diff --git a/server/web3bcspider/services/erc721_refresher.js b/server/web3bcspider/services/erc721_refresher.js index cbde1e5..259e576 100644 --- a/server/web3bcspider/services/erc721_refresher.js +++ b/server/web3bcspider/services/erc721_refresher.js @@ -7,15 +7,14 @@ const BaseService = require('./baseservice'); class Erc721Refresher extends BaseService { - async init(bc, net, event) { + async init(bc, net, refresher) { const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME); this.conn = conn; - this.lastBlockNumber = 0; this.bc = bc; this.net = net; - this.event = event; - this.eventConf = this.event['eventConf']; - this.progInfo = this.event['progressInfo']; + this.refresher = refresher; + this.conf = this.refresher['conf']; + this.progInfo = this.refresher['progressInfo']; await this.start(); } @@ -27,178 +26,7 @@ class Erc721Refresher extends BaseService { } async pullEvent() { - const logHead = this.getInstanceName() + ' pullEvent: '; - while (true) { - await this.bc.lockQuery(); - try { - const fromBlock = await this.getFromBlock(); - const toBlock = await this.calcToBlock(fromBlock); - if (toBlock > fromBlock) { - const events = await this.bc.getPastEvents( - this.getContractName(), - this.getEventName(), - { - fromBlock: fromBlock, - toBlock: toBlock, - }, - ); - await this.processEvents(events, fromBlock, toBlock); - await this.saveLastBlockNumber(toBlock); - } - ++this.progInfo['pullCount']; - return; - } catch (err) { - log.error(logHead + err); - await utils.sleep(1000 + utils.randRange(10, 2000)); - } finally { - await this.bc.unlockQuery(); - } - } - } - - async processEvents(events, fromBlock, toBlock) { - this.progInfo['fromBlock'] = fromBlock; - this.progInfo['toBlock'] = toBlock; - this.progInfo['currBlock'] = this.bc.getCurrBlockNumber(); - this.progInfo['eventCount'] += events.length; - if (events.length <= 0) { - return; - } - console.log(events); - utils.serial - (events, - async (event) => { - while (true) { - try { - await this.saveToDb(event); - return; - } catch (err) { - log.error(err); - } - await utils.sleep(8000 + utils.randRange(500, 1500)); - } - }); - } - - async getFromBlock() { - const logHead = this.getInstanceName() + ' getFromBlock: '; - const firstBlockNumber = this.getInitBlock(); - while (this.lastBlockNumber < 1) { - try { - const {err, row} = await this.conn.ormSelectOne( - 't_last_block', - [ - ['net_id', this.getNetId()], - ['contract_address', this.getContractAddress()], - ['event_name', this.getEventName()], - ] - ); - if (!err) { - if (row) { - this.lastBlockNumber = Number(row['block_number']); - } else { - this.lastBlockNumber = firstBlockNumber; - } - } - console.log(logHead, this.lastBlockNumber, this.bc.getCurrBlockNumber()); - while (this.lastBlockNumber + 8 > this.bc.getCurrBlockNumber()) { - await utils.sleep(1000 + utils.randRange(500, 1500)); - } - continue; - } catch (err) { - log.error(err); - } - await utils.sleep(5000 + utils.randRange(500, 1500)); - } - return this.lastBlockNumber + 1; - } - - async calcToBlock(fromBlock) { - const currBlockNumber = this.bc.getCurrBlockNumber(); - const distanceBlock = currBlockNumber - fromBlock - 8; - const batchBlockNum = 888; - if (distanceBlock > 0) { - if (distanceBlock > batchBlockNum) { - return fromBlock + batchBlockNum; - } else { - return fromBlock + distanceBlock; - } - } - return fromBlock; - } - - async saveLastBlockNumber(blockNumber) { - const logHead = this.getInstanceName() + ' event_process.saveLastBlockNumber: '; - while (true) { - const {err} = await this.conn.upsert( - 't_last_block', - [ - ['net_id', this.getNetId()], - ['contract_address', this.getContractAddress()], - ['event_name', this.getEventName()], - ], - [ - ['block_number', blockNumber], - ['modifytime', utils.getUtcTime()], - ], - [ - ['net_id', this.getNetId()], - ['contract_address', this.getContractAddress()], - ['event_name', this.getEventName()], - ['block_number', blockNumber], - ['contract_name', this.getContractName()], - ['createtime', utils.getUtcTime()], - ['modifytime', utils.getUtcTime()], - ] - ); - if (!err) { - break; - } - log.error(logHead + err); - await utils.sleep(5000 + utils.randRange(500, 1500)); - } - this.lastBlockNumber = blockNumber; - } - - async saveToDb(event) { - const logHead = this.getInstanceName() + ' event_process.saveToDb: '; - while (true) { - const nowTime = utils.getUtcTime(); - const returnValues = event['returnValues']; - const hashCode = ''; - const {err} = await this.conn.upsert( - 't_blockchain_event', - [ - ['txhash', event['transactionHash']], - ['hash_code', hashCode], - ['log_index', event['logIndex']], - ['net_id', this.bc.getNetId()], - ['event_name', this.getEventName()], - ['contract_address', this.getContractAddress()], - ], - [ - ], - [ - ['txhash', event['transactionHash']], - ['hash_code', hashCode], - ['log_index', event['logIndex']], - ['net_id', this.bc.getNetId()], - ['event_name', this.getEventName()], - ['contract_address', this.getContractAddress()], - ['contract_name', this.getContractName()], - ['block_number', event['blockNumber']], - ['raw_data', utils.jsonEncode(event)], - ['return_values', utils.jsonEncode(returnValues)], - ['createtime', utils.getUtcTime()], - ['modifytime', utils.getUtcTime()], - ] - ); - if (!err) { - break; - } - log.error(logHead + err); - await utils.sleep(5000 + utils.randRange(500, 1500)); - } + const logHead = this.genLogHead() + ' pullEvent: '; } getNetId() { @@ -210,11 +38,11 @@ class Erc721Refresher extends BaseService { } getContractName() { - return this.eventConf['contract_name']; + return this.conf['contract_name']; } - getInstanceName() { - const instName = this.getNetId() + '.' + this.getContractName() + '.' + this.getEventName(); + genLogHead(msg) { + const instName = this.getNetId() + '.' + this.getContractName() + ' erc721 refresher ' + msg; return instName; }