diff --git a/database/bcevent.sql b/database/bcevent.sql index f799c0b..8d7ef29 100644 --- a/database/bcevent.sql +++ b/database/bcevent.sql @@ -106,7 +106,7 @@ CREATE TABLE `t_dbpull_last_idx` ( `contract_address` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_address', `contract_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_name', `event_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'event_name', - `block_number` bigint NOT NULL DEFAULT '0' COMMENT 'block_number', + `last_idx` bigint NOT NULL DEFAULT '0' COMMENT 'last_idx', `createtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间', `modifytime` int(11) NOT NULL DEFAULT '0' COMMENT '修改时间', PRIMARY KEY (`idx`), @@ -127,7 +127,7 @@ CREATE TABLE `t_dbprocess_last_idx` ( `contract_address` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_address', `contract_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_name', `event_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'event_name', - `block_number` bigint NOT NULL DEFAULT '0' COMMENT 'block_number', + `last_idx` bigint NOT NULL DEFAULT '0' COMMENT 'last_idx', `createtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间', `modifytime` int(11) NOT NULL DEFAULT '0' COMMENT '修改时间', PRIMARY KEY (`idx`), diff --git a/server/web3dbspider/services/pull_dbevent.js b/server/web3dbspider/services/pull_dbevent.js index af3dce9..c156aef 100644 --- a/server/web3dbspider/services/pull_dbevent.js +++ b/server/web3dbspider/services/pull_dbevent.js @@ -8,6 +8,12 @@ const LIMIT_COUNT = 100; class PullDbEvent extends BaseService { + static maxIdx = BigInt(0); + + static async staticInit() { + + } + async init(net, event) { const {err, conn} = await app.getDbConn('BcEventDb0'); this.conn = conn; @@ -31,13 +37,16 @@ class PullDbEvent extends BaseService { try { const startIdx = this.getStartIdx(); const endIdx = this.getEndIdx(); + if (startIdx >= endIdx) { + return; + } const {err, rows} = await this.conn.execQuery( - 'SELECT * FROM t_blockchain_event WHERE `idx` > ? AND `idx` < ? ' + + 'SELECT * FROM t_blockchain_event WHERE `idx` > ? AND `idx` <= ? ' + 'AND net_id = ? AND event_name = ? AND contract_address = ? ' + 'LIMIT ' + LIMIT_COUNT, [ - startIdx, - endIdx, + startIdx.toString(), + endIdx.toString(), this.getNetId(), this.getEventName(), this.getContractAddress() @@ -46,22 +55,28 @@ class PullDbEvent extends BaseService { throw err; } if (rows.length > 0) { - + await utils.serial( + rows, + async (row) => { + await this.saveToDb(row); + } + ); } else { this.lastIdx = endIdx; } + await this.saveLastIdx(this.lastIdx); } catch (err) { log.error(logHead + err); await utils.sleep(5000 + utils.randRange(1000, 3000)); } } - async getStartIdx() { - + getStartIdx() { + return this.lastIdx; } - async getEndIdx() { - + getEndIdx() { + return maxIdx; } getEventName() { @@ -81,6 +96,84 @@ class PullDbEvent extends BaseService { return instName; } + getTableName() { + + } + + async saveLastIdx(lastIdx) { + const logHead = this.getInstanceName() + ' saveLastIdx: '; + while (true) { + const {err} = await this.conn.upsert( + 't_dbpull_last_idx', + [ + ['net_id', this.getNetId()], + ['contract_address', this.getContractAddress()], + ['event_name', this.getEventName()], + ], + [ + ['last_idx', lastIdx.toString()], + ['modifytime', utils.getUtcTime()], + ], + [ + ['net_id', this.getNetId()], + ['contract_address', this.getContractAddress()], + ['event_name', this.getEventName()], + ['contract_name', this.getContractName()], + ['last_idx', lastIdx], + ['createtime', utils.getUtcTime()], + ['modifytime', utils.getUtcTime()], + ] + ); + if (!err) { + break; + } + log.error(logHead + err); + await utils.sleep(5000 + utils.randRange(500, 1500)); + } + } + + async saveToDb(row) { + const logHead = this.getInstanceName() + ' saveToDb: '; + while (true) { + const {err} = await this.conn.upsert( + this.getTableName(), + [ + ['txhash', row['txhash']], + ['hash_code', row['hash_code']], + ['log_index', row['log_index']], + ['net_id', row['net_id']], + ['event_name', row['event_name']], + ['contract_address', row['contract_address']], + ], + [ + ], + [ + ['txhash', row['txhash']], + ['hash_code', row['hash_code']], + ['log_index', row['log_index']], + ['net_id', row['net_id']], + ['event_name', row['event_name']], + ['contract_address', row['contract_address']], + ['contract_name', row['contract_name']], + ['block_number', row['block_number']], + ['return_values', row['return_values']], + ['src_idx', row['idx']], + ['createtime', utils.getUtcTime()], + ['modifytime', utils.getUtcTime()], + ] + ); + if (!err) { + break; + } + log.error(logHead + err); + await utils.sleep(5000 + utils.randRange(500, 1500)); + } + const idx = BigInt(row['idx']); + if (idx > this.lastIdx) { + this.lastIdx = idx; + } + } + } module.exports = PullDbEvent;