251 lines
6.6 KiB
JavaScript
251 lines
6.6 KiB
JavaScript
const app = require('j7/app');
|
|
const utils = require('j7/utils');
|
|
const bcutils = require('j7/bcutils');
|
|
const log = require('j7/log');
|
|
const constant = require('common/constant');
|
|
const BaseService = require('./baseservice');
|
|
const metaFactory = require('../metadata/factory');
|
|
const eventsFactory = require('./events/factory');
|
|
|
|
const LIMIT_COUNT = 100;
|
|
|
|
let g_bcevent_conn = null;
|
|
|
|
class DbEventProcess extends BaseService {
|
|
|
|
static #tableMaxIdxHash = {};
|
|
|
|
static async staticInit() {
|
|
const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME);
|
|
const tables = metaFactory.getAllTables();
|
|
g_bcevent_conn = conn;
|
|
await utils.serial
|
|
(
|
|
Object.values(tables),
|
|
async (tblObj) => {
|
|
{
|
|
const {err, maxIdx} = await conn.getMaxIdx(tblObj['table_name']);
|
|
if (err) {
|
|
throw 'DbEventProcess error:' + err;
|
|
}
|
|
DbEventProcess.#tableMaxIdxHash[tblObj['table_name']] = maxIdx;
|
|
}
|
|
const updateMaxIdxFunc = async () => {
|
|
while (true) {
|
|
const {err, maxIdx} = await conn.getMaxIdx(tblObj['table_name']);
|
|
if (!err) {
|
|
DbEventProcess.#tableMaxIdxHash[tblObj['table_name']] = maxIdx;
|
|
await utils.sleep(500 + utils.randRange(500, 1500));
|
|
} else {
|
|
await utils.sleep(5000 + utils.randRange(500, 1500));
|
|
}
|
|
}
|
|
};
|
|
updateMaxIdxFunc();
|
|
}
|
|
);
|
|
}
|
|
|
|
async init(net, event) {
|
|
this.net = net;
|
|
this.event = event;
|
|
this.lastIdx = BigInt(0);
|
|
this.eventConf = this.event['eventConf'];
|
|
this.progInfo = this.event['progressInfo'];
|
|
this.contractAddress = this.net.getContractAddressByName(this.getContractName());
|
|
this.conn = g_bcevent_conn;
|
|
this.lastIdx = await this.getLastIdx();
|
|
this.progInfo['proclastIdx'] = this.lastIdx.toString();
|
|
this.eventHandle = eventsFactory.getEventHandle(this);
|
|
await this.start();
|
|
}
|
|
|
|
async start() {
|
|
while (true) {
|
|
if (this.eventHandle) {
|
|
await this.pullEvent();
|
|
}
|
|
await utils.sleep(500 + utils.randRange(500, 1500));
|
|
}
|
|
}
|
|
|
|
async pullEvent() {
|
|
const logHead = this.getInstanceName() + ' pullDbEvent: ';
|
|
const tableName = this.getTableName();
|
|
try {
|
|
const startIdx = this.getStartIdx();
|
|
const endIdx = this.getEndIdx();
|
|
++this.progInfo['procPullCount'];
|
|
if (startIdx >= endIdx) {
|
|
return;
|
|
}
|
|
const {err, rows} = await this.conn.execQuery(
|
|
`SELECT * FROM ${tableName} WHERE idx > ? AND idx <= ? ` +
|
|
'AND net_id = ? AND event_name = ? AND contract_address = ? AND ' +
|
|
'status = ' + constant.EVENTDB_STATE_PENDING + ' ' +
|
|
'LIMIT ' + LIMIT_COUNT,
|
|
[
|
|
startIdx.toString(),
|
|
endIdx.toString(),
|
|
this.getNetId(),
|
|
this.getEventName(),
|
|
this.getContractAddress()
|
|
]);
|
|
if (err) {
|
|
throw new Error(err);
|
|
}
|
|
if (rows.length > 0) {
|
|
this.progInfo['procEventCount'] += rows.length;
|
|
await utils.serial(
|
|
rows,
|
|
async (row) => {
|
|
await this.process(row);
|
|
}
|
|
);
|
|
} else {
|
|
this.lastIdx = endIdx;
|
|
}
|
|
this.progInfo['procLastIdx'] = this.lastIdx.toString();
|
|
await this.saveLastIdx(this.lastIdx);
|
|
} catch (err) {
|
|
utils.safeDumpErrStack(err);
|
|
log.error(logHead + err);
|
|
await utils.sleep(5000 + utils.randRange(1000, 3000));
|
|
}
|
|
}
|
|
|
|
getStartIdx() {
|
|
return this.lastIdx;
|
|
}
|
|
|
|
getEndIdx() {
|
|
return this.getMaxIdx();
|
|
}
|
|
|
|
getNetId() {
|
|
return this.net['net_id'];
|
|
}
|
|
|
|
getEventName() {
|
|
return this.eventConf['event_name'];
|
|
}
|
|
|
|
getContractAddress() {
|
|
return this.contractAddress;
|
|
}
|
|
|
|
getContractName() {
|
|
return this.eventConf['contract_name'];
|
|
}
|
|
|
|
getContractAddressByName(name) {
|
|
return this.net['getContractAddressByName'](name);
|
|
}
|
|
|
|
getInstanceName() {
|
|
const instName = this.getNetId() + ' ' + this.getContractName() + '.' + this.getEventName() + ' dbprocess';
|
|
return instName;
|
|
}
|
|
|
|
genLogHead(msg) {
|
|
const logHead = this.getNetId() + ' ' + this.getContractName() + '.' + this.getEventName() +
|
|
' dbprocess ' + msg ;
|
|
return logHead;
|
|
}
|
|
|
|
getTableName() {
|
|
return this.eventConf['table_name'];
|
|
}
|
|
|
|
async getLastIdx() {
|
|
const logHead = this.getInstanceName() + ' getLastIdx: ';
|
|
while (true) {
|
|
const {err, row} = await this.conn.ormSelectOne(
|
|
't_dbprocess_last_idx',
|
|
[
|
|
['net_id', this.getNetId()],
|
|
['contract_address', this.getContractAddress()],
|
|
['event_name', this.getEventName()],
|
|
]
|
|
);
|
|
if (err) {
|
|
log.error(logHead + err);
|
|
await utils.sleep(5000 + utils.randRange(500, 1500));
|
|
continue;
|
|
}
|
|
if (row) {
|
|
return BigInt(row['last_idx']);
|
|
}
|
|
break;
|
|
}
|
|
return BigInt(0);
|
|
}
|
|
|
|
async saveLastIdx(lastIdx) {
|
|
const logHead = this.getInstanceName() + ' saveLastIdx: ';
|
|
while (true) {
|
|
const {err} = await this.conn.upsert(
|
|
't_dbprocess_last_idx',
|
|
[
|
|
['net_id', this.getNetId()],
|
|
['contract_address', this.getContractAddress()],
|
|
['event_name', this.getEventName()],
|
|
],
|
|
[
|
|
['last_idx', lastIdx.toString()],
|
|
['modifytime', utils.getUtcTime()],
|
|
],
|
|
[
|
|
['net_id', this.getNetId()],
|
|
['contract_address', this.getContractAddress()],
|
|
['event_name', this.getEventName()],
|
|
['contract_name', this.getContractName()],
|
|
['last_idx', lastIdx],
|
|
['createtime', utils.getUtcTime()],
|
|
['modifytime', utils.getUtcTime()],
|
|
]
|
|
);
|
|
if (!err) {
|
|
break;
|
|
}
|
|
log.error(logHead + err);
|
|
await utils.sleep(5000 + utils.randRange(500, 1500));
|
|
}
|
|
}
|
|
|
|
getMaxIdx() {
|
|
if (utils.hasKey(DbEventProcess.#tableMaxIdxHash, this.getTableName())) {
|
|
return DbEventProcess.#tableMaxIdxHash[this.getTableName()];
|
|
} else {
|
|
return BigInt(0);
|
|
}
|
|
}
|
|
|
|
async process(row) {
|
|
const logHead = this.getInstanceName() + ' process: ';
|
|
while (true) {
|
|
try {
|
|
const handle = eventsFactory.createEventHandle(this, this.conn, row);
|
|
try {
|
|
await handle.start();
|
|
break;
|
|
} finally {
|
|
await handle.safeRelease();
|
|
}
|
|
} catch (err) {
|
|
utils.safeDumpErrStack(err);
|
|
log.error(logHead + err);
|
|
await utils.sleep(5000 + utils.randRange(500, 1500));
|
|
}
|
|
}
|
|
const idx = BigInt(row['idx']);
|
|
if (idx > this.lastIdx) {
|
|
this.lastIdx = idx;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
}
|
|
|
|
module.exports = DbEventProcess;
|