From 89a99d32c02817a26a671b3790375ebe3cebede3 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Mon, 12 Jun 2023 19:11:58 +0800 Subject: [PATCH] 1 --- server/web3spider/services/event_process.js | 220 ++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 server/web3spider/services/event_process.js diff --git a/server/web3spider/services/event_process.js b/server/web3spider/services/event_process.js new file mode 100644 index 0000000..f23d46e --- /dev/null +++ b/server/web3spider/services/event_process.js @@ -0,0 +1,220 @@ +const app = require('j7/app'); +const utils = require('j7/utils'); +const bcutils = require('j7/bcutils'); +const log = require('j7/log'); +const bc = require('../blockchain'); +const BaseService = require('./baseservice'); + +class EventProcess extends BaseService { + + async init(conn, instance, cb) { + this.conn = conn; + this.instance = instance; + this.cb = cb; + this.lastBlockNumber = 0; + await this.start(); + } + + async start() { + while (true) { + await this.pullEvent(); + await utils.sleep(8000 + utils.randRange(500, 1500)); + } + } + + async pullEvent() { + const logClass = this.getInstanceName() + ' pullEvent:'; + while (true) { + await bc.lockQuery(); + try { + const fromBlock = await this.getFromBlock(); + const toBlock = await this.calcToBlock(fromBlock); + if (toBlock > fromBlock) { + const events = await bc[this.getInstanceName()].getPastEvents( + this.instance['eventName'], + { + fromBlock: fromBlock, + toBlock: toBlock, + }, + ); + this.instance['fromBlock'] = fromBlock; + this.instance['toBlock'] = toBlock; + this.instance['currBlock'] = bc.getCurrBlockNumber(); + this.instance['eventCount'] += events.length; + if (events.length > 0) { + console.log(events); + await this.updateFirstBlockNumber(fromBlock); + } + await this.processEvents(events, toBlock); + await this.saveLastBlockNumber(toBlock); + } + ++this.instance['pullCount']; + return; + } catch (err) { + log.error(logClass + err); + await utils.sleep(1000 + utils.randRange(10, 2000)); + } finally { + await bc.unlockQuery(); + } + } + } + + getInstanceName() { + return this.instance['name']; + } + + async processEvents(events, toBlock) { + for (let i in events) { + while (true) { + try { + await this.cb(events[i]); + await this.saveToDb(events[i]); + break; + } catch (err) { + log.error(err); + } + await utils.sleep(8000 + utils.randRange(500, 1500)); + } + } + } + + async getFromBlock() { + const logClass = this.getInstanceName() + ' getFromBlock:'; + const firstBlockNumber = await bc.getFirstBlockNumber(); + while (this.lastBlockNumber < 1) { + try { + const {err, row} = await this.conn.ormSelectOne( + 't_parameter', + [ + ['name', this.getBlockNumberDbName()] + ] + ); + if (!err) { + if (row) { + this.lastBlockNumber = Number(row['value']); + } else { + this.lastBlockNumber = firstBlockNumber; + } + } + console.log(logClass, this.lastBlockNumber, bc.getCurrBlockNumber()); + while (this.lastBlockNumber + 8 > bc.getCurrBlockNumber()) { + await utils.sleep(1000 + utils.randRange(500, 1500)); + } + continue; + } catch (err) { + log.log(err); + } + await utils.sleep(5000 + utils.randRange(500, 1500)); + } + return this.lastBlockNumber + 1; + } + + async calcToBlock(fromBlock) { + const currBlockNumber = bc.getCurrBlockNumber(); + const distanceBlock = currBlockNumber - fromBlock - 8; + const batchBlockNum = 888; + if (distanceBlock > 0) { + if (distanceBlock > batchBlockNum) { + return fromBlock + batchBlockNum; + } else { + return fromBlock + distanceBlock; + } + } + return fromBlock; + } + + async saveLastBlockNumber(blockNumber) { + const logClass = 'event_process.saveLastBlockNumber'; + while (true) { + const {err} = await this.conn.upsert( + 't_parameter', + [ + ['name', this.getBlockNumberDbName()] + ], + [ + ['value', blockNumber], + ], + [ + ['name', this.getBlockNumberDbName()], + ['value', blockNumber], + ] + ); + if (!err) { + break; + } + log.error(logClass + err); + await utils.sleep(5000 + utils.randRange(500, 1500)); + } + this.lastBlockNumber = blockNumber; + } + + async updateFirstBlockNumber(blockNumber) { + const logClass = 'event_process.updateFirstBlockNumber'; + while (true) { + const {err} = await this.conn.upsert( + 't_parameter', + [ + ['name', this.getFirstBlockNumberDbName()] + ], + [ + ['!value', () => { + return 'LEAST(CONVERT(value, unsigned), ' + blockNumber + ')'; + }], + ], + [ + ['name', this.getFirstBlockNumberDbName()], + ['value', blockNumber], + ] + ); + if (!err) { + break; + } + log.error(logClass + err); + await utils.sleep(5000 + utils.randRange(500, 1500)); + } + } + + async saveToDb(event) { + const logClass = 'event_process.saveToDb'; + while (true) { + const nowTime = utils.getUtcTime(); + const returnValues = event['returnValues']; + const {err} = await this.conn.upsert( + 't_blockchain_event', + [ + ['txhash', event['transactionHash']], + ['log_index', event['logIndex']], + ], + [ + ], + [ + ['instance_name', this.instance['name']], + ['event_name', this.instance['eventName']], + ['txhash', event['transactionHash']], + ['log_index', event['logIndex']], + ['block_number', event['blockNumber']], + ['raw_data', utils.jsonEncode(event)], + ['return_values', utils.jsonEncode(returnValues)], + ['createtime', utils.getUtcTime()], + ['modifytime', utils.getUtcTime()], + ] + ); + if (!err) { + break; + } + log.error(logClass + err); + await utils.sleep(5000 + utils.randRange(500, 1500)); + } + } + + getBlockNumberDbName() { + return this.instance['name'] + '.' + this.instance['eventName'] + '.last_block_number'; + } + + getFirstBlockNumberDbName() { + return this.instance['name'] + '.' + this.instance['eventName'] + '.first_block_number'; + } + +} + +module.exports = EventProcess;