const util = require("util");
const utils = require("./utils");
const db = require("./db");
const sync = require("./sync");
const metamgr = require("./metamgr");
const dbhelper = require("./dbhelper");
const log = require("./log");
const bc = require('./blockchain');
const C = require("./C");

/**
 * Tracks pending withdrawals read from `t_withdrawal` and polls the chain for
 * 'Transfer' events, which are recorded in `t_transfer` and reported to the
 * game API callback.
 *
 * A single shared instance is exported from this module.
 */
class TransferMgr {

  constructor() {
    // idx -> Withdrawal instance for every withdrawal currently being tracked.
    this.pendingWithdrawalHash = {};
    this.historyWithdrawalCond = new sync.Cond();
    this.newWithdrawalCond = new sync.Cond();
    utils.registerEventHandler(
      C.REMOVE_PENDING_WITHDRAWAL_EVENT,
      (idx) => {
        this.removePendingWithdrawal(idx);
      }
    );
  }

  /**
   * Schedules the background work: the withdrawal loaders after 100 ms and one
   * event poller per coin contract after 3 s.
   */
  init() {
    setTimeout(this.start.bind(this), 1000 * 0.1);
    setTimeout(this.procCoinEvent.bind(this, 'goldInstance', 'ceg_last_block_number', 1), 1000 * 3);
    setTimeout(this.procCoinEvent.bind(this, 'coinInstance', 'cec_last_block_number', 2), 1000 * 3);
  }

  /**
   * Reads the current maximum `idx` of `t_withdrawal` and uses it as the split
   * point: rows up to it are loaded by historyWithdrawal(), rows above it by
   * newWithdrawal().
   *
   * @throws the error reported by the query, if any.
   */
  async start() {
    const {err, row} = await db.execQueryOne(
      'SELECT max(idx) max_idx FROM t_withdrawal',
      []);
    if (err) {
      throw err;
    }
    const hisMaxIdx = Math.max(0, utils.emptyReplace(row['max_idx'], 0));
    setTimeout(this.historyWithdrawal.bind(this, hisMaxIdx), 1000 * 0.1);
    setTimeout(this.newWithdrawal.bind(this, hisMaxIdx), 1000 * 0.1);
  }

  /**
   * Loads the unprocessed (state = 0) withdrawals with idx <= hisMaxIdx,
   * walking backwards in windows of 1000 idx values until idx 0 is reached.
   *
   * @param {number} hisMaxIdx highest idx that counts as "history".
   */
  async historyWithdrawal(hisMaxIdx) {
    let lastSyncIdx = hisMaxIdx;
    while (lastSyncIdx > 0) {
      const startIdx = Math.max(0, lastSyncIdx - 1000);
      const {err, rows} = await db.execQuery(
        'SELECT * FROM t_withdrawal WHERE idx > ? AND idx <= ? AND state = 0 LIMIT 1000',
        [startIdx, lastSyncIdx]);
      if (!err) {
        this.procHisWithdrawalList(rows);
        lastSyncIdx = startIdx;
        if (rows.length <= 0) {
          // Empty window: move straight on to the next one without pausing.
          continue;
        }
      }
      // Pause after a non-empty window, or before retrying a failed query.
      await this.historyWithdrawalCond.wait(1000 * 1);
    }
    log.info('history withdrawal loaded done hisMaxIdx:' + hisMaxIdx);
  }

  /**
   * Polls forever for unprocessed (state = 0) withdrawals with an idx above
   * the last one seen, at most 100 per round.
   *
   * @param {number} hisMaxIdx idx above which rows count as "new".
   */
  async newWithdrawal(hisMaxIdx) {
    let lastSyncIdx = hisMaxIdx;
    while (true) {
      // ORDER BY is required: the cursor advances to the highest idx returned,
      // so an unordered LIMIT could jump over rows that were not in the batch
      // and they would never be loaded.
      const {err, rows} = await db.execQuery(
        'SELECT * FROM t_withdrawal WHERE idx > ? AND state = 0 ORDER BY idx ASC LIMIT 100',
        [lastSyncIdx]);
      if (!err) {
        const maxIdx = this.procNewWithdrawalList(rows);
        if (maxIdx > lastSyncIdx) {
          lastSyncIdx = maxIdx;
        }
      }
      await this.newWithdrawalCond.wait(1000 * 3);
    }
  }

  /**
   * Registers rows loaded by historyWithdrawal().
   *
   * @param {Array<Object>} rows `t_withdrawal` rows.
   * @returns {number} highest idx among the rows, 0 when there are none.
   */
  procHisWithdrawalList(rows) {
    return this.internalProcWithdrawalList(rows, false);
  }

  /**
   * Registers rows loaded by newWithdrawal().
   *
   * @param {Array<Object>} rows `t_withdrawal` rows.
   * @returns {number} highest idx among the rows, 0 when there are none.
   */
  procNewWithdrawalList(rows) {
    return this.internalProcWithdrawalList(rows, true);
  }

  /**
   * Registers every row as a pending withdrawal.
   *
   * @param {Array<Object>} rows `t_withdrawal` rows.
   * @param {boolean} isNewWithdrawal passed through to the Withdrawal object.
   * @returns {number} highest idx among the rows, 0 when there are none.
   */
  internalProcWithdrawalList(rows, isNewWithdrawal) {
    let maxIdx = 0;
    for (const withdrawalDb of rows) {
      this.addPendingWithdrawal(withdrawalDb, isNewWithdrawal);
      if (withdrawalDb['idx'] > maxIdx) {
        maxIdx = withdrawalDb['idx'];
      }
    }
    return maxIdx;
  }

  /**
   * Creates a Withdrawal for the row, stores it under its idx and starts it.
   * Reports an error through utils.throwError when the idx is already tracked.
   *
   * @param {Object} withdrawalDb `t_withdrawal` row.
   * @param {boolean} isNewWithdrawal passed through to the Withdrawal object.
   */
  addPendingWithdrawal(withdrawalDb, isNewWithdrawal) {
    if (withdrawalDb['idx'] in this.pendingWithdrawalHash) {
      utils.throwError('idx exists ' + withdrawalDb['idx']);
    }
    // NOTE(review): required lazily, presumably to avoid a circular
    // dependency between this module and ./withdrawal - confirm.
    const a = require("./withdrawal");
    const withdrawal = new a.Withdrawal(withdrawalDb, isNewWithdrawal);
    this.pendingWithdrawalHash[withdrawalDb['idx']] = withdrawal;
    withdrawal.init();
  }

  /**
   * Stops tracking a withdrawal; unknown ids are ignored.
   *
   * @param {number|string} withdrawalId idx of the withdrawal.
   */
  removePendingWithdrawal(withdrawalId) {
    delete this.pendingWithdrawalHash[withdrawalId];
  }

  /**
   * Polls one contract for 'Transfer' events forever, staying 6 blocks behind
   * the current block and covering at most 1000 blocks per query. The last
   * scanned block number is persisted in `t_parameter` under paramName.
   *
   * @param {string} instanceName property of the blockchain module holding the contract.
   * @param {string} paramName `t_parameter` name that stores the scan position.
   * @param {number} coinType value written to `t_transfer.type` and sent to the game API.
   */
  async procCoinEvent(instanceName, paramName, coinType) {
    const logClass = 'procCoinEvent_' + instanceName + '_' + paramName;
    await bc.mustBeActive();
    let lastBlockNumber = await bc.getFirstBlockNumber();
    {
      // Resume from the stored position, or store the starting block on first run.
      const {row} = await dbhelper.mustBeOrmSelectOne(
        't_parameter',
        [
          ['name', paramName]
        ],
        1000 * 1);
      if (row) {
        lastBlockNumber = '' + row['value'];
      } else {
        await dbhelper.mustBeInsert(
          't_parameter',
          [
            ['name', paramName],
            ['value', lastBlockNumber]
          ],
          1000 * 10);
      }
    }
    while (true) {
      const distance = Number(bc.getCurrBlockNumber()) - Number(lastBlockNumber);
      if (distance >= 6) {
        try {
          const toBlock = Number(lastBlockNumber) + Math.min(distance - 6, 1000);
          // NOTE(review): the next range starts at this toBlock again, so the
          // boundary block is queried twice; procOneCoinEvent() skips transfers
          // whose state is no longer 0.
          const events = await bc[instanceName].getPastEvents(
            'Transfer',
            {
              fromBlock: Number(lastBlockNumber),
              toBlock: toBlock
            });
          for (const event of events) {
            await this.procOneCoinEvent(event, coinType);
          }
          // Only advance once every event of the range was handled; on any
          // failure the same range is retried on the next round.
          lastBlockNumber = toBlock;
          await dbhelper.update(
            't_parameter',
            [
              ['name', paramName],
            ],
            [
              ['value', lastBlockNumber]
            ]
          );
        } catch (err) {
          log.error(util.format('%s err:%s', logClass, err));
        }
      }
      await utils.sleep(1000 * 1);
    }
  }

  /**
   * Handles one 'Transfer' event: waits until its block is confirmed, makes
   * sure a `t_transfer` row exists, and - when the user address is the sender
   * or the receiver - reports the transfer to the game API and stores the
   * outcome (state 1 = accepted, state 2 = failed, with the reason).
   *
   * @param {Object} event event object returned by getPastEvents().
   * @param {number} coinType value written to `t_transfer.type` and sent to the game API.
   */
  async procOneCoinEvent(event, coinType) {
    const blockNumber = event['blockNumber'];
    const txhash = event['transactionHash'];
    const from = event.returnValues['from'];
    const to = event.returnValues['to'];
    const value = event.returnValues['value'];

    while (!bc.isComfirmed(blockNumber)) {
      await utils.sleep(1000 * 1);
    }

    const {row: transDb} = await dbhelper.mustBeOrmSelectOne(
      't_transfer',
      [
        ['txhash', txhash]
      ],
      1000 * 1);
    if (transDb && transDb['state'] != 0) {
      // Already settled by an earlier run.
      return;
    }
    if (!transDb) {
      await dbhelper.mustBeInsert(
        't_transfer',
        [
          ['txhash', txhash],
          ['block_number', blockNumber],
          ['type', coinType],
          ['_from', from],
          ['_to', to],
          ['value', value],
          ['state', 0],
          ['createtime', utils.getUtcTime()],
          ['modifytime', utils.getUtcTime()],
        ],
        1000 * 10
      );
    }

    // dir 1: the user address sent the coins; dir 0: it received them.
    // Transfers that do not involve the user address are recorded only.
    let dir = 0;
    let target = '';
    if (utils.isSameAccount(from, metamgr.getUserAddress())) {
      dir = 1;
      target = to;
    } else if (utils.isSameAccount(to, metamgr.getUserAddress())) {
      dir = 0;
      target = from;
    } else {
      return;
    }

    const {err, data} = await utils.httpGet(
      metamgr.getServerConf()['gameapi_url'] + '/webapp/index.php',
      {
        'c': 'Callback',
        'a': 'transfer',
        'dir': dir,
        'txhash': txhash,
        'account': target,
        'type': coinType,
        'value': value
      });
    // data can be missing when the request failed, so it must not be
    // dereferenced unconditionally; doing so used to throw before the failure
    // could be recorded. The payload is used as-is, without JSON decoding.
    const rspObj = data ? data['data'] : undefined;
    console.log(err, txhash, rspObj);

    if (err) {
      await dbhelper.mustBeUpdate(
        't_transfer',
        [
          ['txhash', txhash],
        ],
        [
          ['state', 2],
          ['reason', err],
        ],
        1000 * 10
      );
      return;
    }

    console.log(rspObj);
    // Loose comparison kept on purpose: errcode may arrive as a string.
    if (rspObj && rspObj['errcode'] == 0) {
      await dbhelper.mustBeUpdate(
        't_transfer',
        [
          ['txhash', txhash],
        ],
        [
          ['state', 1],
        ],
        1000 * 10
      );
    } else {
      await dbhelper.mustBeUpdate(
        't_transfer',
        [
          ['txhash', txhash],
        ],
        [
          ['state', 2],
          ['reason', rspObj],
        ],
        1000 * 10
      );
    }
  }

}

module.exports = new TransferMgr();