增加一个更新抓取数据中失败记录的task

This commit is contained in:
zhl 2019-05-09 16:33:05 +08:00
parent e1dcbabd1f
commit 32f3d89035
6 changed files with 70 additions and 10 deletions

View File

@ -9,6 +9,7 @@ import book from './sites/book';
import bookChapter from './sites/bookChapter'; import bookChapter from './sites/bookChapter';
import dandanzan from './sites/dandanzan'; import dandanzan from './sites/dandanzan';
import proxy from './sites/proxy'; import proxy from './sites/proxy';
import commonTask from './sites/common';
mongoose.Promise = Promise; mongoose.Promise = Promise;
@ -18,14 +19,15 @@ db.on('error', function (err) {
logger.error(err); logger.error(err);
process.exit(1); process.exit(1);
}); });
db.once('open', function () { db.once('open', async function () {
logger.info('Connected to db.'); logger.info('Connected to db.');
// await proxy.run();
// hoh8.run(); // hoh8.run();
// book.run(); // book.run();
// movie.run(); // movie.run();
// bookChapter.run(); // bookChapter.run();
dandanzan.run(); // await dandanzan.run();
// proxy.run(); await commonTask.run();
}); });
mongoose.connect(config.db, {promiseLibrary: Promise, useNewUrlParser: true}); mongoose.connect(config.db, {promiseLibrary: Promise, useNewUrlParser: true});

View File

@ -16,6 +16,7 @@ let ProxyInfoSchema = new Schema({
status: {type: Number, default: 0}, status: {type: Number, default: 0},
// 连续尝试次数 // 连续尝试次数
try_count: {type: Number, default: 0}, try_count: {type: Number, default: 0},
err_count: {type: Number, default: 0}
}, { }, {
collection: 'proxy_info', collection: 'proxy_info',
timestamps: true timestamps: true
@ -30,9 +31,13 @@ ProxyInfoModel.updateOne = async function (link, record) {
} }
// 返回所有待检查的代理 // 返回所有待检查的代理
ProxyInfoModel.needCheckList = function() { ProxyInfoModel.needCheckList = function(all) {
if (all) {
return ProxyInfoModel.find({status: {$in: [0 ,1, -1]}});
} else {
return ProxyInfoModel.find({status: {$in: [0 ,1]}}); return ProxyInfoModel.find({status: {$in: [0 ,1]}});
} }
}
// 所有可用代理 // 所有可用代理
ProxyInfoModel.availableList = function() { ProxyInfoModel.availableList = function() {

52
src/sites/common.js Normal file
View File

@ -0,0 +1,52 @@
import CrawlRecord from '../models/spider/CrawlRecord';
import generalQueue from '../utils/general.queue';
/**
* 获取抓取记录中 lastStatus = false的记录再次抓取
* */
const parseOneRecord = async function (record) {
let classTask = require(`./${record.className}`).default;
try {
if(typeof classTask[record.methodName] === 'function'){
await classTask[record.methodName](record.params);
}
} catch (err) {
console.log('error parse false record', record.id);
}
}
const parseAllRecord = async function () {
let records;
try {
records = await CrawlRecord.find({lastStatus: false});
} catch (err) {
console.log('error get records');
}
return new Promise(async (resolve, reject) => {
console.time('all')
generalQueue.setCb(function () {
console.log('parse all false records finished');
resolve()
})
if (records) {
for(const record of records) {
try {
generalQueue.addQueue({
run: async function () {
await parseOneRecord(record);
}
})
} catch (err) {
console.log('error parse record:', record);
}
}
}
});
}
export default {
run: async () => {
await parseAllRecord();
}
}

View File

@ -167,10 +167,11 @@ const parseAllMovie = async (category, beginNo = 1) => {
export default { export default {
run: async () => { run: async () => {
// await proxy.run();
await parseAllMovie('movie'); await parseAllMovie('movie');
await parseAllMovie('tv'); await parseAllMovie('tv');
await parseAllMovie('show'); await parseAllMovie('show');
await parseAllMovie('cartoon'); await parseAllMovie('cartoon');
} },
parseListPage: parseListPage,
parseOnePage: parseOnePage
} }

View File

@ -11,7 +11,7 @@ let q = async.queue( async (reqObj, cb) => {
} catch (err) { } catch (err) {
cb(err); cb(err);
} }
}, 10); }, 20);
q.drain = function(){ q.drain = function(){
console.info('all queue done'); console.info('all queue done');
console.timeEnd('all'); console.timeEnd('all');

View File

@ -59,7 +59,7 @@ export default {
if(response && response.statusCode === 200 ){ if(response && response.statusCode === 200 ){
resolve(response.text); resolve(response.text);
} else { } else {
console.log('parse page with statusCode: ', response.statusCode, url); console.log('parse page with statusCode: ', (response) && (response.statusCode), url);
try { try {
proxy = 'http://' + proxys[stringUtil.randomNum(0, proxys.length - 1)].link; proxy = 'http://' + proxys[stringUtil.randomNum(0, proxys.length - 1)].link;
response = await request.get(url) response = await request.get(url)
@ -70,7 +70,7 @@ export default {
if(response && response.statusCode === 200 ){ if(response && response.statusCode === 200 ){
resolve(response.text); resolve(response.text);
} else { } else {
reject(new Error('parse page with error statusCode: ' + response.statusCode)) reject(new Error('parse page with error statusCode: ' + (response) && (response.statusCode)))
} }
} catch (err2) { } catch (err2) {
reject(err2) reject(err2)