This commit is contained in:
aozhiwei 2023-07-09 13:50:46 +08:00
parent d356b9c3db
commit 53ba20dfd3
3 changed files with 49 additions and 3 deletions

View File

@ -0,0 +1,5 @@
const EVENTDB_STATE_PENDING = 0;
const EVENTDB_STATE_HANDLED = 1;
exports.EVENTDB_STATE_PENDING = EVENTDB_STATE_PENDING;
exports.EVENTDB_STATE_HANDLED = EVENTDB_STATE_HANDLED;

View File

@ -2,6 +2,7 @@ const app = require('j7/app');
const utils = require('j7/utils');
const bcutils = require('j7/bcutils');
const log = require('j7/log');
const constant = require('common/constant');
const BaseService = require('./baseservice');
const metaFactory = require('../metadata/factory');
const eventsFactory = require('./events/factory');
@ -58,7 +59,9 @@ class DbEventProcess extends BaseService {
async start() {
while (true) {
if (this.eventHandle) {
await this.pullEvent();
}
await utils.sleep(500 + utils.randRange(500, 1500));
}
}
@ -75,7 +78,8 @@ class DbEventProcess extends BaseService {
}
const {err, rows} = await this.conn.execQuery(
'SELECT * FROM ${tableName} WHERE `idx` > ? AND `idx` <= ? ' +
'AND net_id = ? AND event_name = ? AND contract_address = ? ' +
'AND net_id = ? AND event_name = ? AND contract_address = ? AND ' +
'state = ' + constant.EVENTDB_STATE_PENDING + ' ' +
'LIMIT ' + LIMIT_COUNT,
[
startIdx.toString(),
@ -91,7 +95,7 @@ class DbEventProcess extends BaseService {
await utils.serial(
rows,
async (row) => {
await this.saveToDb(row);
await this.process(row);
}
);
this.progInfo['procEventCount'] += rows.length;
@ -206,6 +210,15 @@ class DbEventProcess extends BaseService {
}
}
async process(row) {
if (!this.eventHandle) {
return false;
}
const handle = eventsFactory.createHash(this);
await handle.start();
return true;
}
}
module.exports = DbEventProcess;

View File

@ -3,6 +3,7 @@ const app = require('j7/app');
const bcutils = require('j7/bcutils');
const utils = require('j7/utils');
const j7event = require('j7/event');
const constant = require('common/constant');
const metaFactory = require('../../metadata/factory');
class BaseEventProcess {
@ -35,6 +36,33 @@ class BaseEventProcess {
throw errMsg;
}
async markOk() {
await this.updateEventDb(
[
['state', constant.EVENTDB_STATE_HANDLED],
['modifytime', utils.getUtcTime()],
]
);
}
async updateEventDb(fields) {
const logHead = this.genLogHead('updateEventDb');
while (true) {
const {err} = await this.conn.update(
this.eventProc.getTableName(),
[
['idx', this.getEventDb['idx']],
],
fields
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
}
}
module.exports = BaseEventProcess;