This commit is contained in:
aozhiwei 2023-07-05 16:42:11 +08:00
parent d630f75e32
commit bd9d1f6278
2 changed files with 103 additions and 10 deletions

View File

@ -106,7 +106,7 @@ CREATE TABLE `t_dbpull_last_idx` (
`contract_address` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_address',
`contract_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_name',
`event_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'event_name',
`block_number` bigint NOT NULL DEFAULT '0' COMMENT 'block_number',
`last_idx` bigint NOT NULL DEFAULT '0' COMMENT 'last_idx',
`createtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`modifytime` int(11) NOT NULL DEFAULT '0' COMMENT '修改时间',
PRIMARY KEY (`idx`),
@ -127,7 +127,7 @@ CREATE TABLE `t_dbprocess_last_idx` (
`contract_address` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_address',
`contract_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'contract_name',
`event_name` varchar(60) NOT NULL DEFAULT '' COMMENT 'event_name',
`block_number` bigint NOT NULL DEFAULT '0' COMMENT 'block_number',
`last_idx` bigint NOT NULL DEFAULT '0' COMMENT 'last_idx',
`createtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`modifytime` int(11) NOT NULL DEFAULT '0' COMMENT '修改时间',
PRIMARY KEY (`idx`),

View File

@ -8,6 +8,12 @@ const LIMIT_COUNT = 100;
class PullDbEvent extends BaseService {
static maxIdx = BigInt(0);
static async staticInit() {
}
async init(net, event) {
const {err, conn} = await app.getDbConn('BcEventDb0');
this.conn = conn;
@ -31,13 +37,16 @@ class PullDbEvent extends BaseService {
try {
const startIdx = this.getStartIdx();
const endIdx = this.getEndIdx();
if (startIdx >= endIdx) {
return;
}
const {err, rows} = await this.conn.execQuery(
'SELECT * FROM t_blockchain_event WHERE `idx` > ? AND `idx` < ? ' +
'SELECT * FROM t_blockchain_event WHERE `idx` > ? AND `idx` <= ? ' +
'AND net_id = ? AND event_name = ? AND contract_address = ? ' +
'LIMIT ' + LIMIT_COUNT,
[
startIdx,
endIdx,
startIdx.toString(),
endIdx.toString(),
this.getNetId(),
this.getEventName(),
this.getContractAddress()
@ -46,22 +55,28 @@ class PullDbEvent extends BaseService {
throw err;
}
if (rows.length > 0) {
await utils.serial(
rows,
async (row) => {
await this.saveToDb(row);
}
);
} else {
this.lastIdx = endIdx;
}
await this.saveLastIdx(this.lastIdx);
} catch (err) {
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(1000, 3000));
}
}
async getStartIdx() {
getStartIdx() {
return this.lastIdx;
}
async getEndIdx() {
getEndIdx() {
return maxIdx;
}
getEventName() {
@ -81,6 +96,84 @@ class PullDbEvent extends BaseService {
return instName;
}
getTableName() {
}
async saveLastIdx(lastIdx) {
const logHead = this.getInstanceName() + ' saveLastIdx: ';
while (true) {
const {err} = await this.conn.upsert(
't_dbpull_last_idx',
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
],
[
['last_idx', lastIdx.toString()],
['modifytime', utils.getUtcTime()],
],
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
['contract_name', this.getContractName()],
['last_idx', lastIdx],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
}
async saveToDb(row) {
const logHead = this.getInstanceName() + ' saveToDb: ';
while (true) {
const {err} = await this.conn.upsert(
this.getTableName(),
[
['txhash', row['txhash']],
['hash_code', row['hash_code']],
['log_index', row['log_index']],
['net_id', row['net_id']],
['event_name', row['event_name']],
['contract_address', row['contract_address']],
],
[
],
[
['txhash', row['txhash']],
['hash_code', row['hash_code']],
['log_index', row['log_index']],
['net_id', row['net_id']],
['event_name', row['event_name']],
['contract_address', row['contract_address']],
['contract_name', row['contract_name']],
['block_number', row['block_number']],
['return_values', row['return_values']],
['src_idx', row['idx']],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
const idx = BigInt(row['idx']);
if (idx > this.lastIdx) {
this.lastIdx = idx;
}
}
}
module.exports = PullDbEvent;