This commit is contained in:
aozhiwei 2023-07-10 12:31:31 +08:00
parent b55a5c097e
commit 24bc7f1110

View File

@ -7,15 +7,14 @@ const BaseService = require('./baseservice');
class Erc721Refresher extends BaseService { class Erc721Refresher extends BaseService {
async init(bc, net, event) { async init(bc, net, refresher) {
const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME); const {err, conn} = await app.getDbConn(constant.BCEVENTDB_NAME);
this.conn = conn; this.conn = conn;
this.lastBlockNumber = 0;
this.bc = bc; this.bc = bc;
this.net = net; this.net = net;
this.event = event; this.refresher = refresher;
this.eventConf = this.event['eventConf']; this.conf = this.refresher['conf'];
this.progInfo = this.event['progressInfo']; this.progInfo = this.refresher['progressInfo'];
await this.start(); await this.start();
} }
@ -27,178 +26,7 @@ class Erc721Refresher extends BaseService {
} }
async pullEvent() { async pullEvent() {
const logHead = this.getInstanceName() + ' pullEvent: '; const logHead = this.genLogHead() + ' pullEvent: ';
while (true) {
await this.bc.lockQuery();
try {
const fromBlock = await this.getFromBlock();
const toBlock = await this.calcToBlock(fromBlock);
if (toBlock > fromBlock) {
const events = await this.bc.getPastEvents(
this.getContractName(),
this.getEventName(),
{
fromBlock: fromBlock,
toBlock: toBlock,
},
);
await this.processEvents(events, fromBlock, toBlock);
await this.saveLastBlockNumber(toBlock);
}
++this.progInfo['pullCount'];
return;
} catch (err) {
log.error(logHead + err);
await utils.sleep(1000 + utils.randRange(10, 2000));
} finally {
await this.bc.unlockQuery();
}
}
}
async processEvents(events, fromBlock, toBlock) {
this.progInfo['fromBlock'] = fromBlock;
this.progInfo['toBlock'] = toBlock;
this.progInfo['currBlock'] = this.bc.getCurrBlockNumber();
this.progInfo['eventCount'] += events.length;
if (events.length <= 0) {
return;
}
console.log(events);
utils.serial
(events,
async (event) => {
while (true) {
try {
await this.saveToDb(event);
return;
} catch (err) {
log.error(err);
}
await utils.sleep(8000 + utils.randRange(500, 1500));
}
});
}
async getFromBlock() {
const logHead = this.getInstanceName() + ' getFromBlock: ';
const firstBlockNumber = this.getInitBlock();
while (this.lastBlockNumber < 1) {
try {
const {err, row} = await this.conn.ormSelectOne(
't_last_block',
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
]
);
if (!err) {
if (row) {
this.lastBlockNumber = Number(row['block_number']);
} else {
this.lastBlockNumber = firstBlockNumber;
}
}
console.log(logHead, this.lastBlockNumber, this.bc.getCurrBlockNumber());
while (this.lastBlockNumber + 8 > this.bc.getCurrBlockNumber()) {
await utils.sleep(1000 + utils.randRange(500, 1500));
}
continue;
} catch (err) {
log.error(err);
}
await utils.sleep(5000 + utils.randRange(500, 1500));
}
return this.lastBlockNumber + 1;
}
async calcToBlock(fromBlock) {
const currBlockNumber = this.bc.getCurrBlockNumber();
const distanceBlock = currBlockNumber - fromBlock - 8;
const batchBlockNum = 888;
if (distanceBlock > 0) {
if (distanceBlock > batchBlockNum) {
return fromBlock + batchBlockNum;
} else {
return fromBlock + distanceBlock;
}
}
return fromBlock;
}
async saveLastBlockNumber(blockNumber) {
const logHead = this.getInstanceName() + ' event_process.saveLastBlockNumber: ';
while (true) {
const {err} = await this.conn.upsert(
't_last_block',
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
],
[
['block_number', blockNumber],
['modifytime', utils.getUtcTime()],
],
[
['net_id', this.getNetId()],
['contract_address', this.getContractAddress()],
['event_name', this.getEventName()],
['block_number', blockNumber],
['contract_name', this.getContractName()],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
this.lastBlockNumber = blockNumber;
}
async saveToDb(event) {
const logHead = this.getInstanceName() + ' event_process.saveToDb: ';
while (true) {
const nowTime = utils.getUtcTime();
const returnValues = event['returnValues'];
const hashCode = '';
const {err} = await this.conn.upsert(
't_blockchain_event',
[
['txhash', event['transactionHash']],
['hash_code', hashCode],
['log_index', event['logIndex']],
['net_id', this.bc.getNetId()],
['event_name', this.getEventName()],
['contract_address', this.getContractAddress()],
],
[
],
[
['txhash', event['transactionHash']],
['hash_code', hashCode],
['log_index', event['logIndex']],
['net_id', this.bc.getNetId()],
['event_name', this.getEventName()],
['contract_address', this.getContractAddress()],
['contract_name', this.getContractName()],
['block_number', event['blockNumber']],
['raw_data', utils.jsonEncode(event)],
['return_values', utils.jsonEncode(returnValues)],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
]
);
if (!err) {
break;
}
log.error(logHead + err);
await utils.sleep(5000 + utils.randRange(500, 1500));
}
} }
getNetId() { getNetId() {
@ -210,11 +38,11 @@ class Erc721Refresher extends BaseService {
} }
getContractName() { getContractName() {
return this.eventConf['contract_name']; return this.conf['contract_name'];
} }
getInstanceName() { genLogHead(msg) {
const instName = this.getNetId() + '.' + this.getContractName() + '.' + this.getEventName(); const instName = this.getNetId() + '.' + this.getContractName() + ' erc721 refresher ' + msg;
return instName; return instName;
} }