aozhiwei 55d4098be7 1
2023-07-05 20:05:39 +08:00

236 lines
6.2 KiB
JavaScript

const app = require('j7/app');
const utils = require('j7/utils');
const bcutils = require('j7/bcutils');
const log = require('j7/log');
const BaseService = require('./baseservice');
const LIMIT_COUNT = 100;
const BCEVENT_TABLE_NAME = 't_blockchain_event';
class PullDbEvent extends BaseService {
static #maxIdx = BigInt(0);
static async staticInit() {
const {err, conn} = await app.getDbConn('BcEventDb0');
{
const {err, maxIdx} = await conn.getMaxIdx(BCEVENT_TABLE_NAME);
if (err) {
throw 'PullDbEvent error:' + err;
}
PullDbEvent.#maxIdx = maxIdx;
}
const updateMaxIdxFunc = async () => {
while (true) {
const {err, maxIdx} = await conn.getMaxIdx(BCEVENT_TABLE_NAME);
if (!err) {
PullDbEvent.#maxIdx = maxIdx;
await utils.sleep(500 + utils.randRange(500, 1500));
} else {
await utils.sleep(5000 + utils.randRange(500, 1500));
}
}
};
updateMaxIdxFunc();
}
async init(net, event) {
const {err, conn} = await app.getDbConn('BcEventDb0');
this.conn = conn;
this.net = net;
this.event = event;
this.eventConf = this.event['eventConf'];
this.progInfo = this.event['progressInfo'];
this.contractAddress = this.net.getContractAddressByName(this.getContractName());
this.lastIdx = await this.getLastIdx();
this.progInfo['lastIdx'] = this.lastIdx.toString();
await this.start();
}
async start() {
while (true) {
await this.pullEvent();
await utils.sleep(500 + utils.randRange(500, 1500));
}
}
async pullEvent() {
const logHead = this.getInstanceName() + ' pullDbEvent: ';
try {
const startIdx = this.getStartIdx();
const endIdx = this.getEndIdx();
++this.progInfo['pullCount'];
if (startIdx >= endIdx) {
return;
}
const {err, rows} = await this.conn.execQuery(
'SELECT * FROM t_blockchain_event WHERE `idx` > ? AND `idx` <= ? ' +
'AND net_id = ? AND event_name = ? AND contract_address = ? ' +
'LIMIT ' + LIMIT_COUNT,
[
startIdx.toString(),
endIdx.toString(),
this.getNetId(),
this.getEventName(),
this.getContractAddress()
]);
if (err) {
throw err;
}
if (rows.length > 0) {
await utils.serial(
rows,
async (row) => {
await this.saveToDb(row);
}
);
this.progInfo['eventCount'] += rows.length;
} else {
this.lastIdx = endIdx;
}
this.progInfo['lastIdx'] = this.lastIdx.toString();
await this.saveLastIdx(this.lastIdx);
} catch (err) {
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(1000, 3000));
}
}
getStartIdx() {
return this.lastIdx;
}
getEndIdx() {
return PullDbEvent.#maxIdx;
}
getNetId() {
return this.net['net_id'];
}
getEventName() {
return this.eventConf['event_name'];
}
getContractAddress() {
return this.contractAddress;
}
getContractName() {
return this.eventConf['contract_name'];
}
getInstanceName() {
const instName = this.getNetId() + '.' + this.getContractName() + '.' + this.getEventName();
return instName;
}
getTableName() {
return this.eventConf['table_name'];
}
async getLastIdx() {
const logHead = this.getInstanceName() + ' getLastIdx: ';
while (true) {
const {err, row} = await this.conn.ormSelectOne(
't_dbpull_last_idx',
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
]
);
if (err) {
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
continue;
}
if (row) {
return BigInt(row['last_idx']);
}
}
return BigInt(0);
}
async saveLastIdx(lastIdx) {
const logHead = this.getInstanceName() + ' saveLastIdx: ';
while (true) {
const {err} = await this.conn.upsert(
't_dbpull_last_idx',
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
],
[
['last_idx', lastIdx.toString()],
['modifytime', utils.getUtcTime()],
],
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
['contract_name', this.getContractName()],
['last_idx', lastIdx],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
}
async saveToDb(row) {
const logHead = this.getInstanceName() + ' saveToDb: ';
while (true) {
const {err} = await this.conn.upsert(
this.getTableName(),
[
['txhash', row['txhash']],
['hash_code', row['hash_code']],
['log_index', row['log_index']],
['net_id', row['net_id']],
['event_name', row['event_name']],
['contract_address', row['contract_address']],
],
[
],
[
['txhash', row['txhash']],
['hash_code', row['hash_code']],
['log_index', row['log_index']],
['net_id', row['net_id']],
['event_name', row['event_name']],
['contract_address', row['contract_address']],
['contract_name', row['contract_name']],
['block_number', row['block_number']],
['return_values', row['return_values']],
['src_idx', row['idx']],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
const idx = BigInt(row['idx']);
if (idx > this.lastIdx) {
this.lastIdx = idx;
}
}
getMaxIdx() {
return PullDbEvent.#maxIdx;
}
}
module.exports = PullDbEvent;