This commit is contained in:
aozhiwei 2023-07-11 20:14:48 +08:00
parent f429bbe5ef
commit 8913e000f4
3 changed files with 12 additions and 11 deletions

View File

@ -9,6 +9,8 @@ const eventsFactory = require('./events/factory');
const LIMIT_COUNT = 100; const LIMIT_COUNT = 100;
let g_bcevent_conn = null;
class DbEventProcess extends BaseService { class DbEventProcess extends BaseService {
static #tableMaxIdxHash = {}; static #tableMaxIdxHash = {};
@ -16,6 +18,7 @@ class DbEventProcess extends BaseService {
static async staticInit() { static async staticInit() {
const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME); const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME);
const tables = metaFactory.getAllTables(); const tables = metaFactory.getAllTables();
g_bcevent_conn = conn;
await utils.serial await utils.serial
( (
Object.values(tables), Object.values(tables),
@ -50,8 +53,7 @@ class DbEventProcess extends BaseService {
this.eventConf = this.event['eventConf']; this.eventConf = this.event['eventConf'];
this.progInfo = this.event['progressInfo']; this.progInfo = this.event['progressInfo'];
this.contractAddress = this.net.getContractAddressByName(this.getContractName()); this.contractAddress = this.net.getContractAddressByName(this.getContractName());
const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME); this.conn = g_bcevent_conn;
this.conn = conn;
this.lastIdx = await this.getLastIdx(); this.lastIdx = await this.getLastIdx();
this.progInfo['proclastIdx'] = this.lastIdx.toString(); this.progInfo['proclastIdx'] = this.lastIdx.toString();
this.eventHandle = eventsFactory.getEventHandle(this); this.eventHandle = eventsFactory.getEventHandle(this);
@ -93,13 +95,13 @@ class DbEventProcess extends BaseService {
throw err; throw err;
} }
if (rows.length > 0) { if (rows.length > 0) {
this.progInfo['procEventCount'] += rows.length;
await utils.serial( await utils.serial(
rows, rows,
async (row) => { async (row) => {
await this.process(row); await this.process(row);
} }
); );
this.progInfo['procEventCount'] += rows.length;
} else { } else {
this.lastIdx = endIdx; this.lastIdx = endIdx;
} }

View File

@ -75,15 +75,12 @@ class BaseEventProcess {
} }
async markOk() { async markOk() {
const {err} = await this.updateEventDb( await this.updateEventDb(
[ [
['state', constant.EVENTDB_STATE_HANDLED], ['status', constant.EVENTDB_STATE_HANDLED],
['modifytime', utils.getUtcTime()], ['modifytime', utils.getUtcTime()],
] ]
); );
if (err) {
this.throwError('markOk error');
}
} }
async updateEventDb(fields) { async updateEventDb(fields) {
@ -93,7 +90,7 @@ class BaseEventProcess {
'update', 'update',
this.eventProc.getTableName(), this.eventProc.getTableName(),
[ [
['idx', this.getEventDb['idx']], ['idx', this.getEventDb()['idx']],
], ],
fields fields
); );

View File

@ -8,6 +8,8 @@ const BaseService = require('./baseservice');
const LIMIT_COUNT = 100; const LIMIT_COUNT = 100;
const BCEVENT_TABLE_NAME = 't_blockchain_event'; const BCEVENT_TABLE_NAME = 't_blockchain_event';
let g_bcevent_conn = null;
class PullDbEvent extends BaseService { class PullDbEvent extends BaseService {
static #maxIdx = BigInt(0); static #maxIdx = BigInt(0);
@ -21,6 +23,7 @@ class PullDbEvent extends BaseService {
} }
PullDbEvent.#maxIdx = maxIdx; PullDbEvent.#maxIdx = maxIdx;
} }
g_bcevent_conn = conn;
const updateMaxIdxFunc = async () => { const updateMaxIdxFunc = async () => {
while (true) { while (true) {
const {err, maxIdx} = await conn.getMaxIdx(BCEVENT_TABLE_NAME); const {err, maxIdx} = await conn.getMaxIdx(BCEVENT_TABLE_NAME);
@ -36,8 +39,7 @@ class PullDbEvent extends BaseService {
} }
async init(net, event) { async init(net, event) {
const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME); this.conn = g_bcevent_conn;
this.conn = conn;
this.net = net; this.net = net;
this.event = event; this.event = event;
this.eventConf = this.event['eventConf']; this.eventConf = this.event['eventConf'];