diff --git a/src/app.js b/src/app.js index 98889ac..caddd22 100644 --- a/src/app.js +++ b/src/app.js @@ -9,6 +9,7 @@ import book from './sites/book'; import bookChapter from './sites/bookChapter'; import dandanzan from './sites/dandanzan'; import proxy from './sites/proxy'; +import commonTask from './sites/common'; mongoose.Promise = Promise; @@ -18,14 +19,15 @@ db.on('error', function (err) { logger.error(err); process.exit(1); }); -db.once('open', function () { +db.once('open', async function () { logger.info('Connected to db.'); + // await proxy.run(); // hoh8.run(); // book.run(); // movie.run(); // bookChapter.run(); - dandanzan.run(); - // proxy.run(); + // await dandanzan.run(); + await commonTask.run(); }); mongoose.connect(config.db, {promiseLibrary: Promise, useNewUrlParser: true}); diff --git a/src/models/spider/ProxyInfo.js b/src/models/spider/ProxyInfo.js index bb3bc52..46e395c 100644 --- a/src/models/spider/ProxyInfo.js +++ b/src/models/spider/ProxyInfo.js @@ -16,6 +16,7 @@ let ProxyInfoSchema = new Schema({ status: {type: Number, default: 0}, // 连续尝试次数 try_count: {type: Number, default: 0}, + err_count: {type: Number, default: 0} }, { collection: 'proxy_info', timestamps: true @@ -30,8 +31,12 @@ ProxyInfoModel.updateOne = async function (link, record) { } // 返回所有待检查的代理 -ProxyInfoModel.needCheckList = function() { - return ProxyInfoModel.find({status: {$in: [0 ,1]}}); +ProxyInfoModel.needCheckList = function(all) { + if (all) { + return ProxyInfoModel.find({status: {$in: [0 ,1, -1]}}); + } else { + return ProxyInfoModel.find({status: {$in: [0 ,1]}}); + } } // 所有可用代理 diff --git a/src/sites/common.js b/src/sites/common.js new file mode 100644 index 0000000..39c6fa8 --- /dev/null +++ b/src/sites/common.js @@ -0,0 +1,52 @@ +import CrawlRecord from '../models/spider/CrawlRecord'; +import generalQueue from '../utils/general.queue'; + +/** + * 获取抓取记录中 lastStatus = false的记录,再次抓取 + * */ + +const parseOneRecord = async function (record) { + let classTask = require(`./${record.className}`).default; + try { + if(typeof classTask[record.methodName] === 'function'){ + await classTask[record.methodName](record.params); + } + } catch (err) { + console.log('error parse false record', record.id); + } +} + +const parseAllRecord = async function () { + let records; + try { + records = await CrawlRecord.find({lastStatus: false}); + } catch (err) { + console.log('error get records'); + } + return new Promise(async (resolve, reject) => { + console.time('all') + generalQueue.setCb(function () { + console.log('parse all false records finished'); + resolve() + }) + if (records) { + for(const record of records) { + try { + generalQueue.addQueue({ + run: async function () { + await parseOneRecord(record); + } + }) + } catch (err) { + console.log('error parse record:', record); + } + } + } + }); +} + +export default { + run: async () => { + await parseAllRecord(); + } +} diff --git a/src/sites/dandanzan.js b/src/sites/dandanzan.js index 1ed5102..23b6b64 100644 --- a/src/sites/dandanzan.js +++ b/src/sites/dandanzan.js @@ -167,10 +167,11 @@ const parseAllMovie = async (category, beginNo = 1) => { export default { run: async () => { - // await proxy.run(); await parseAllMovie('movie'); await parseAllMovie('tv'); await parseAllMovie('show'); await parseAllMovie('cartoon'); - } + }, + parseListPage: parseListPage, + parseOnePage: parseOnePage } diff --git a/src/utils/general.queue.js b/src/utils/general.queue.js index c0b94f6..36ade86 100644 --- a/src/utils/general.queue.js +++ b/src/utils/general.queue.js @@ -11,7 +11,7 @@ let q = async.queue( async (reqObj, cb) => { } catch (err) { cb(err); } -}, 10); +}, 20); q.drain = function(){ console.info('all queue done'); console.timeEnd('all'); diff --git a/src/utils/proxy.util.js b/src/utils/proxy.util.js index bdbc3ca..ae210cd 100644 --- a/src/utils/proxy.util.js +++ b/src/utils/proxy.util.js @@ -59,7 +59,7 @@ export default { if(response && response.statusCode === 200 ){ resolve(response.text); } else { - console.log('parse page with statusCode: ', response.statusCode, url); + console.log('parse page with statusCode: ', (response) && (response.statusCode), url); try { proxy = 'http://' + proxys[stringUtil.randomNum(0, proxys.length - 1)].link; response = await request.get(url) @@ -70,7 +70,7 @@ export default { if(response && response.statusCode === 200 ){ resolve(response.text); } else { - reject(new Error('parse page with error statusCode: ' + response.statusCode)) + reject(new Error('parse page with error statusCode: ' + (response) && (response.statusCode))) } } catch (err2) { reject(err2)