const app = require('j7/app');
const utils = require('j7/utils');
const bcutils = require('j7/bcutils');
const log = require('j7/log');
const bc = require('../blockchain');
const BaseService = require('./baseservice');

// Number of blocks kept between the scanned range and the current chain head.
// NOTE(review): presumably a confirmation margin against reorgs — confirm.
const HEAD_MARGIN_BLOCKS = 8;

// Upper bound on how far a single pull may advance past its fromBlock.
const MAX_BLOCKS_PER_PULL = 3000;

/**
 * Polls past events of one contract event type, hands each event to a
 * callback, and persists the last scanned block number in `t_parameter`.
 */
class EventProcess extends BaseService {

  /**
   * Stores the collaborators and enters the polling loop.
   *
   * Because `start()` loops forever, the returned promise does not resolve;
   * it only settles if an awaited call inside the loop rejects.
   *
   * @param {object} conn - DB connection exposing `ormSelectOne` and `upsert`.
   * @param {object} instance - Descriptor read via its `name` and `eventName` keys.
   * @param {Function} cb - Async callback invoked once per event.
   */
  async init(conn, instance, cb) {
    this.conn = conn;
    this.instance = instance;
    this.cb = cb;
    this.lastBlockNumber = 0;
    await this.start();
  }

  /**
   * Endless loop: pull one batch of events, then sleep 8.5s-9.5s
   * (per the `utils.randRange(500, 1500)` jitter) before the next pull.
   */
  async start() {
    while (true) {
      await this.pullEvent();
      await utils.sleep(8000 + utils.randRange(500, 1500));
    }
  }

  /**
   * Fetches the events in [fromBlock, toBlock], processes them and saves
   * `toBlock` as the new last block number. Retries until one attempt
   * completes without throwing.
   *
   * NOTE(review): the next pull starts again at the saved `toBlock` (see
   * `getFromBlock`), so if `getPastEvents` treats both bounds as inclusive the
   * events of that boundary block are delivered to the callback again —
   * confirm the callback is idempotent.
   */
  async pullEvent() {
    const logClass = this.getInstanceName() + ' pullEvent:';
    while (true) {
      try {
        console.log('pullEvent1', utils.jsonEncode(this.instance));
        const fromBlock = await this.getFromBlock();
        console.log('pullEvent2', utils.jsonEncode(this.instance));
        const toBlock = await this.calcToBlock(fromBlock);
        console.log('pullEvent', fromBlock, toBlock, bc.getCurrBlockNumber(), utils.jsonEncode(this.instance));
        if (toBlock >= fromBlock) {
          const events = await bc[this.getInstanceName()].getPastEvents(
            this.instance['eventName'],
            {
              fromBlock: fromBlock,
              toBlock: toBlock,
            },
          );
          console.log(events);
          await this.processEvents(events, toBlock);
          await this.saveLastBlockNumber(toBlock);
        }
        return;
      } catch (err) {
        log.error(logClass + err);
      }
      // Back off before retrying; retrying immediately would spin in a tight
      // loop against the node / database for as long as the failure persists.
      await utils.sleep(5000 + utils.randRange(500, 1500));
    }
  }

  /**
   * @returns {string} The `name` entry of the instance descriptor.
   */
  getInstanceName() {
    return this.instance['name'];
  }

  /**
   * Passes every event to the callback, in order. A failing callback is
   * logged and retried (after a jittered sleep) until it succeeds, so one
   * persistently failing event blocks all later ones.
   *
   * @param {Array<object>} events - Events returned by `getPastEvents`.
   * @param {number} toBlock - Upper bound of the pulled range (unused here).
   */
  async processEvents(events, toBlock) {
    // Object.values visits own entries in index order and, unlike for...in,
    // never yields inherited enumerable keys; `|| {}` tolerates a null result.
    for (const event of Object.values(events || {})) {
      while (true) {
        try {
          await this.cb(event);
          break;
        } catch (err) {
          log.error(err);
        }
        await utils.sleep(8000 + utils.randRange(500, 1500));
      }
    }
  }

  /**
   * Returns the block number to start the next pull from.
   *
   * While no block number is cached (`lastBlockNumber < 1`) it is loaded from
   * `t_parameter`, falling back to `bc.getFirstBlockNumber()` when no row
   * exists; the method then waits until the chain head is at least
   * HEAD_MARGIN_BLOCKS ahead of it. Failed or unusable lookups are retried
   * after a jittered sleep.
   *
   * @returns {Promise<number>} The cached last block number.
   */
  async getFromBlock() {
    const firstBlockNumber = await bc.getFirstBlockNumber();
    while (this.lastBlockNumber < 1) {
      try {
        const {err, row} = await this.conn.ormSelectOne(
          't_parameter',
          [
            ['name', this.getBlockNumberDbName()]
          ]
        );
        if (!err) {
          this.lastBlockNumber = row ? Number(row['value']) : firstBlockNumber;
          console.log('getFromBlock', this.lastBlockNumber, bc.getCurrBlockNumber(), utils.jsonEncode(this.instance));
          while (this.lastBlockNumber + HEAD_MARGIN_BLOCKS > bc.getCurrBlockNumber()) {
            await utils.sleep(1000 + utils.randRange(500, 1500));
          }
          // Done as soon as the outer loop condition no longer holds; a value
          // that is still < 1 falls through to the back-off below instead of
          // re-querying the database in a tight loop.
          if (!(this.lastBlockNumber < 1)) {
            break;
          }
        } else {
          log.error(this.getInstanceName() + ' getFromBlock:' + err);
        }
      } catch (err) {
        console.log(err);
      }
      await utils.sleep(5000 + utils.randRange(500, 1500));
    }
    return this.lastBlockNumber;
  }

  /**
   * Computes the upper bound of the next pull: the range stays
   * HEAD_MARGIN_BLOCKS behind the current head and advances by at most
   * MAX_BLOCKS_PER_PULL. When the head is not far enough ahead, `fromBlock`
   * itself is returned.
   *
   * @param {number} fromBlock - Lower bound of the pull.
   * @returns {Promise<number>} Upper bound of the pull.
   */
  async calcToBlock(fromBlock) {
    const currBlockNumber = bc.getCurrBlockNumber();
    const distanceBlock = currBlockNumber - fromBlock - HEAD_MARGIN_BLOCKS;
    if (distanceBlock > 0) {
      return fromBlock + Math.min(distanceBlock, MAX_BLOCKS_PER_PULL);
    }
    return fromBlock;
  }

  /**
   * Upserts `blockNumber` into `t_parameter`, retrying after a jittered sleep
   * until the upsert reports no error, then caches it on the instance.
   *
   * @param {number} blockNumber - Block number to persist.
   */
  async saveLastBlockNumber(blockNumber) {
    while (true) {
      const {err} = await this.conn.upsert(
        't_parameter',
        [
          ['name', this.getBlockNumberDbName()]
        ],
        [
          ['value', blockNumber],
        ],
        [
          ['name', this.getBlockNumberDbName()],
          ['value', blockNumber],
        ]
      );
      if (!err) {
        break;
      }
      log.error(this.getInstanceName() + ' saveLastBlockNumber:' + err);
      await utils.sleep(5000 + utils.randRange(500, 1500));
    }
    this.lastBlockNumber = blockNumber;
  }

  /**
   * @returns {string} The `t_parameter` key for this instance/event pair:
   *   `<name>.<eventName>.last_block_number`.
   */
  getBlockNumberDbName() {
    return this.instance['name'] + '.' + this.instance['eventName'] + '.last_block_number';
  }

}

module.exports = EventProcess;