aozhiwei f6cff25b5f 1
2022-03-30 01:34:02 +08:00

194 lines
5.5 KiB
JavaScript

const utils = require("./utils");
const db = require("./db");
const sync = require("./sync");
const metamgr = require("./metamgr");
const dbhelper = require("./dbhelper");
const log = require("./log");
const bc = require('./blockchain');
const C = require("./C");
class TransferMgr {
constructor() {
this.pendingWithdrawalHash = {};
this.historyWithdrawalCond = new sync.Cond();
this.newWithdrawalCond = new sync.Cond();
utils.registerEventHandler(
C.REMOVE_PENDING_WITHDRAWAL_EVENT,
(idx) => {
this.removePendingWithdrawal(idx);
}
);
}
init() {
setTimeout(this.start.bind(this), 1000 * 0.1);
setTimeout(this.procCoinEvent.bind(this, 'ceg_coinInstance', 'ceg_last_block_number'), 1000 * 3);
setTimeout(this.procCoinEvent.bind(this, 'cec_coinInstance', 'ceg_last_block_number'), 1000 * 3);
}
async start() {
const {err, row} = await db.execQueryOne(
'SELECT max(idx) max_idx FROM t_withdrawal', []);
if (err) {
throw err;
}
const hisMaxIdx = Math.max(0, utils.emptyReplace(row['max_idx'], 0));
setTimeout(this.historyWithdrawal.bind(this, hisMaxIdx), 1000 * 0.1);
setTimeout(this.newWithdrawal.bind(this, hisMaxIdx), 1000 * 0.1);
}
async historyWithdrawal(hisMaxIdx) {
let lastSyncIdx = hisMaxIdx;
while (lastSyncIdx > 0) {
const startIdx = Math.max(0, lastSyncIdx - 1000);
const {err, rows} = await db.execQuery(
'SELECT * FROM t_withdrawal WHERE idx > ? AND idx <= ? AND state = 0 LIMIT 1000',
[startIdx, lastSyncIdx]);
if (!err) {
this.procHisWithdrawalList(rows);
lastSyncIdx = startIdx;
if (rows.length <= 0) {
continue;
}
}
await this.historyWithdrawalCond.wait(1000 * 1);
}
log.info('history withdrawal loaded done hisMaxIdx:' + hisMaxIdx);
}
async newWithdrawal(hisMaxIdx) {
let lastSyncIdx = hisMaxIdx;
while (true) {
const {err, rows} = await db.execQuery(
'SELECT * FROM t_withdrawal WHERE idx > ? AND state = 0 LIMIT 100',
[lastSyncIdx]);
if (!err) {
const maxIdx = this.procNewWithdrawalList(rows);
if (maxIdx > lastSyncIdx) {
lastSyncIdx = maxIdx;
}
}
await this.newWithdrawalCond.wait(1000 * 3);
}
}
procHisWithdrawalList(rows) {
return this.internalProcWithdrawalList(rows, false);
}
procNewWithdrawalList(rows) {
return this.internalProcWithdrawalList(rows, true);
}
internalProcWithdrawalList(rows, isNewWithdrawal) {
let maxIdx = 0;
rows.forEach(function (withdrawalDb) {
this.addPendingWithdrawal(withdrawalDb, isNewWithdrawal);
if (withdrawalDb['idx'] > maxIdx) {
maxIdx = withdrawalDb['idx'];
}
}.bind(this));
return maxIdx;
}
addPendingWithdrawal(withdrawalDb, isNewWithdrawal) {
if (withdrawalDb['idx'] in this.pendingWithdrawalHash) {
utils.throwError('idx exists ' + withdrawalDb['idx']);
}
const a = require("./withdrawal");
const withdrawal = new a.Withdrawal(withdrawalDb, isNewWithdrawal);
this.pendingWithdrawalHash[withdrawalDb['idx']] = withdrawal;
withdrawal.init();
}
removePendingWithdrawal(withdrawalId) {
delete this.pendingWithdrawalHash[withdrawalId];
}
async procCoinEvent(instanceName, paramName) {
const logClass = 'procCoinEvent_' + instanceName + '_' + paramName;
await bc.mustBeActive();
let lastBlockNumber = await bc.getFirstBlockNumber();
{
const {err, row} = await dbhelper.mustBeOrmSelectOne(
't_parameter',
[
['name', paramName]
],
1000 * 1);
if (row) {
lastBlockNumber = '' + row['value'];
}
}
while (true) {
if (Number(lastBlockNumber) + 6 < Number(bc.getCurrBlockNumber())) {
try {
const events = await bc[instanceName].getPastEvents(
'Transfer',
{
fromBlock: Number(lastBlockNumber) - 4000,
toBlock: Number(lastBlockNumber) + 1000,
});
console.log(events);
if (events.length > 0) {
console.log(events);
for (let i = 0; i < events.length; ++i) {
const event = events[i];
await this.procOneCoinEvent(event);
const blockNumber = event['blockNumber'];
if (Number(blockNumber) > Number(lastBlockNumber)) {
lastBlockNumber = blockNumber;
}
}
}
} catch (err) {
log.error(util.format('%s err:%s',
logClass,
err
)
);
}
}
await utils.sleep(1000 * 1);
}
}
async procOneCoinEvent(event) {
const blockNumber = event['blockNumber'];
while (!bc.isComfirmed(blockNumber)) {
await utils.sleep(1000 * 1);
}
{
const {err, row} = await dbhelper.mustBeOrmSelectOne(
't_transfer',
[
['txhash', event['transactionHash']]
],
1000 * 1);
if (row) {
return;
}
}
{
await dbhelper.mustBeInsert
(
't_transfer',
[
['txhash', event['transactionHash']],
['_from', event.returnValues['from']],
['_to', event.returnValues['to']],
['value', event.returnValues['value']],
['state', 0],
['createtime', utils.getUtcTime()],
['modifytime', utils.getUtcTime()],
],
1000 * 10
);
}
}
}
module.exports = new TransferMgr();