This commit is contained in:
aozhiwei 2023-06-12 19:11:58 +08:00
parent 09e20c2cbb
commit 89a99d32c0

View File

@ -0,0 +1,220 @@
const app = require('j7/app');
const utils = require('j7/utils');
const bcutils = require('j7/bcutils');
const log = require('j7/log');
const bc = require('../blockchain');
const BaseService = require('./baseservice');
class EventProcess extends BaseService {
async init(conn, instance, cb) {
this.conn = conn;
this.instance = instance;
this.cb = cb;
this.lastBlockNumber = 0;
await this.start();
}
async start() {
while (true) {
await this.pullEvent();
await utils.sleep(8000 + utils.randRange(500, 1500));
}
}
async pullEvent() {
const logClass = this.getInstanceName() + ' pullEvent:';
while (true) {
await bc.lockQuery();
try {
const fromBlock = await this.getFromBlock();
const toBlock = await this.calcToBlock(fromBlock);
if (toBlock > fromBlock) {
const events = await bc[this.getInstanceName()].getPastEvents(
this.instance['eventName'],
{
fromBlock: fromBlock,
toBlock: toBlock,
},
);
this.instance['fromBlock'] = fromBlock;
this.instance['toBlock'] = toBlock;
this.instance['currBlock'] = bc.getCurrBlockNumber();
this.instance['eventCount'] += events.length;
if (events.length > 0) {
console.log(events);
await this.updateFirstBlockNumber(fromBlock);
}
await this.processEvents(events, toBlock);
await this.saveLastBlockNumber(toBlock);
}
++this.instance['pullCount'];
return;
} catch (err) {
log.error(logClass + err);
await utils.sleep(1000 + utils.randRange(10, 2000));
} finally {
await bc.unlockQuery();
}
}
}
getInstanceName() {
return this.instance['name'];
}
async processEvents(events, toBlock) {
for (let i in events) {
while (true) {
try {
await this.cb(events[i]);
await this.saveToDb(events[i]);
break;
} catch (err) {
log.error(err);
}
await utils.sleep(8000 + utils.randRange(500, 1500));
}
}
}
async getFromBlock() {
const logClass = this.getInstanceName() + ' getFromBlock:';
const firstBlockNumber = await bc.getFirstBlockNumber();
while (this.lastBlockNumber < 1) {
try {
const {err, row} = await this.conn.ormSelectOne(
't_parameter',
[
['name', this.getBlockNumberDbName()]
]
);
if (!err) {
if (row) {
this.lastBlockNumber = Number(row['value']);
} else {
this.lastBlockNumber = firstBlockNumber;
}
}
console.log(logClass, this.lastBlockNumber, bc.getCurrBlockNumber());
while (this.lastBlockNumber + 8 > bc.getCurrBlockNumber()) {
await utils.sleep(1000 + utils.randRange(500, 1500));
}
continue;
} catch (err) {
log.log(err);
}
await utils.sleep(5000 + utils.randRange(500, 1500));
}
return this.lastBlockNumber + 1;
}
async calcToBlock(fromBlock) {
const currBlockNumber = bc.getCurrBlockNumber();
const distanceBlock = currBlockNumber - fromBlock - 8;
const batchBlockNum = 888;
if (distanceBlock > 0) {
if (distanceBlock > batchBlockNum) {
return fromBlock + batchBlockNum;
} else {
return fromBlock + distanceBlock;
}
}
return fromBlock;
}
async saveLastBlockNumber(blockNumber) {
const logClass = 'event_process.saveLastBlockNumber';
while (true) {
const {err} = await this.conn.upsert(
't_parameter',
[
['name', this.getBlockNumberDbName()]
],
[
['value', blockNumber],
],
[
['name', this.getBlockNumberDbName()],
['value', blockNumber],
]
);
if (!err) {
break;
}
log.error(logClass + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
this.lastBlockNumber = blockNumber;
}
async updateFirstBlockNumber(blockNumber) {
const logClass = 'event_process.updateFirstBlockNumber';
while (true) {
const {err} = await this.conn.upsert(
't_parameter',
[
['name', this.getFirstBlockNumberDbName()]
],
[
['!value', () => {
return 'LEAST(CONVERT(value, unsigned), ' + blockNumber + ')';
}],
],
[
['name', this.getFirstBlockNumberDbName()],
['value', blockNumber],
]
);
if (!err) {
break;
}
log.error(logClass + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
}
async saveToDb(event) {
const logClass = 'event_process.saveToDb';
while (true) {
const nowTime = utils.getUtcTime();
const returnValues = event['returnValues'];
const {err} = await this.conn.upsert(
't_blockchain_event',
[
['txhash', event['transactionHash']],
['log_index', event['logIndex']],
],
[
],
[
['instance_name', this.instance['name']],
['event_name', this.instance['eventName']],
['txhash', event['transactionHash']],
['log_index', event['logIndex']],
['block_number', event['blockNumber']],
['raw_data', utils.jsonEncode(event)],
['return_values', utils.jsonEncode(returnValues)],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logClass + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
}
getBlockNumberDbName() {
return this.instance['name'] + '.' + this.instance['eventName'] + '.last_block_number';
}
getFirstBlockNumberDbName() {
return this.instance['name'] + '.' + this.instance['eventName'] + '.first_block_number';
}
}
module.exports = EventProcess;