diff --git a/server/web3spider/tasks/event_center.js b/server/web3spider/tasks/event_center.js new file mode 100644 index 0000000..8599742 --- /dev/null +++ b/server/web3spider/tasks/event_center.js @@ -0,0 +1,485 @@ +const app = require('j7/app'); +const utils = require('j7/utils'); +const bcutils = require('j7/bcutils'); +const log = require('j7/log'); +const event = require('j7/event'); +const BaseService = require('./baseservice'); +const bc = require('../blockchain'); +const C = require('../C'); +const factory = require('./factory'); +const BoxOpenedProcess = require('./_internal/boxopened_process'); +const ActivateProcess = require('./_internal/activate_process'); +const NftTransferProcess = require('./_internal/nft_transfer_process'); +const Nft1155TransferProcess = require('./_internal/nft1155_transfer_process'); +const Activate721NftProcess = require('./_internal/activate721nft_process'); +const Activate1155NftProcess = require('./_internal/activate1155nft_process'); +const ShardMixSuccProcess = require('./_internal/shard_mix_succ_process'); +const ShardMixFailProcess = require('./_internal/shard_mix_fail_process'); +const ShardMintSuccProcess = require('./_internal/shard_mint_succ_process'); +const ShardMintFailProcess = require('./_internal/shard_mint_fail_process'); +const EvolveSuccProcess = require('./_internal/evolve_succ_process'); +const EvolveFailProcess = require('./_internal/evolve_fail_process'); +const ChipPluginProcess = require('./_internal/chip_plugin_process'); +const ChipUnplugProcess = require('./_internal/chip_unplug_process'); + +const LIMIT_COUNT = 100; + +class EventCenter extends BaseService { + + instances = [ + { + 'name': 'HEROInstance', + 'eventName': 'Transfer', + }, + { + 'name': 'WEAPONInstance', + 'eventName': 'Transfer', + }, + // { + // 'name': 'chipInstance', + // 'eventName': 'Transfer', + // }, + // { + // 'name': 'luckboxInstance', + // 'eventName': 'Transfer', + // } + ]; + + instances1155 = [ + { + 'name': 'chipInstance', + 'eventName': 'TransferSingle', + }, + { + 'name': 'chipInstance', + 'eventName': 'TransferBatch', + }, + { + 'name': 'shardInstance', + 'eventName': 'TransferSingle', + }, + { + 'name': 'shardInstance', + 'eventName': 'TransferBatch', + }, + ]; + + // boxInstance = { + // 'name': 'boxproxyInstance', + // 'eventName': 'BoxOpened', + // }; + + // activateInstance = { + // 'name': 'activateproxyInstance', + // 'eventName': 'LogNFTActivate', + // }; + + activate721NftInstance = { + 'name': 'UserMinterFactoryInstance', + 'eventName': 'TokenMinted', + }; + + activate1155NftInstance = { + 'name': 'nftMallInstance', + 'eventName': 'BuyTransactionBatch', + }; + + shardMixByUserFailInstance = { + 'name': 'userFactoryInstance', + 'eventName': 'TokenMintFail', + }; + + shardMixByUserSuccInstance = { + 'name': 'factoryInstance', + 'eventName': 'TokenMinted', + }; + + // shardMintByUserFailInstance = { + // 'name': 'userFactoryInstance', + // 'eventName': 'TokenMintFail', + // }; + + shardMintByUserSuccInstance = { + 'name': 'factoryInstance', + 'eventName': 'TokenMintedBatch', + }; + + evolveFailInstance = { + 'name': 'userProxyInstance', + 'eventName': 'TokenEvolveFail', + }; + + evolveSuccInstance = { + 'name': 'proxyInstance', + 'eventName': 'TokenEvolved', + }; + + chipPluginInstance = { + 'name': 'nftChipLockerInstance', + 'eventName': 'ChipPlugin', + }; + + chipUnplugInstance = { + 'name': 'nftChipLockerInstance', + 'eventName': 'ChipUnplug', + }; + + async init() { + this.lastIdx = 0; + this.pendingConfirmOwnerHash = {}; + this.pendingConfirmBalanceHash = {}; + + const {err, conn} = await app.getDbConn('MarketDb0'); + if (err) { + throw 'db error:' + err; + } + this.conn = conn; + + { + event.addListener(C.DESTORY_EXEC_CONFIRM_OWNER_EVENT, (tokenId) => { + log.info('destory confirm owner:' + tokenId); + try { + if (utils.getVal(this.pendingConfirmOwnerHash, tokenId)) { + delete this.pendingConfirmOwnerHash[tokenId]; + } + } catch(err) { + log.error('destory confirm owner:' + err); + } + }); + event.addListener(C.CREATE_EXEC_CONFIRM_OWNER_EVENT, (tokenId) => { + log.info('create confirm owner:' + tokenId); + try { + this.addConfirmOwnerRequest(tokenId); + } catch(err) { + log.error('create confirm owner:' + err); + } + }); + } + { + event.addListener + (C.DESTORY_EXEC_CONFIRM_BALANCE_EVENT, + (instanceName, address, tokenId) => { + log.info('destory confirm balance:' + instanceName + ' ' + address + tokenId); + try { + const key = this.genBalanceKey(instanceName, address, tokenId); + if (utils.getVal(this.pendingConfirmBalanceHash, key)) { + delete this.pendingConfirmBalanceHash[key]; + } + } catch(err) { + log.error('destory confirm balance:' + err); + } + }); + event.addListener + (C.CREATE_EXEC_CONFIRM_BALANCE_EVENT, + (instanceName, address, tokenIds, idx) => { + log.info('create confirm balance:' + instanceName + ' ' + address + utils.jsonEncode(tokenIds)); + try { + this.addConfirmBalanceRequest(instanceName, address, tokenIds, idx); + } catch(err) { + log.error('create confirm balance:' + err); + } + }); + } + + this.confirmOwner(); + this.confirmBalance(); + //this.addConfirmBalanceRequest('shardInstance', '0xffcf8fdee72ac11b5c542428b35eef5769c409f0', [1], 0); + await this.initEventProcess(); + } + + async initEventProcess() { + const initInstance = (instance) => { + instance['pullCount'] = 0; + instance['eventCount'] = 0; + instance['fromBlock'] = 0; + instance['toBlock'] = 0; + instance['currBlock'] = 0; + }; + const allInstances = []; + { + this.instances.forEach((item) => { + allInstances.push(item); + }); + // this.instances1155.forEach((item) => { + // allInstances.push(item); + // }); + //allInstances.push(this.boxInstance); + //allInstances.push(this.activateInstance); + allInstances.push(this.activate721NftInstance); + //allInstances.push(this.activate1155NftInstance); + //allInstances.push(this.shardMixByUserFailInstance); + //allInstances.push(this.shardMixByUserSuccInstance); + //allInstances.push(this.shardMintByUserFailInstance); + //allInstances.push(this.shardMintByUserSuccInstance); + //allInstances.push(this.evolveFailInstance); + //allInstances.push(this.evolveSuccInstance); + //allInstances.push(this.chipPluginInstance); + //allInstances.push(this.chipUnplugInstance); + console.log(allInstances); + { + const instanceHash = {}; + allInstances.forEach((item) => { + const key = item['name'] + '.' + item['eventName']; + if (utils.hasKey(instanceHash, key)) { + console.log(key); + throw '!!!!duplicate instance'; + } + instanceHash[key] = item; + }); + } + allInstances.forEach((item) => { + initInstance(item); + }); + + let count = 0; + const outputLog = async () => { + while (true) { + log.info(++count + '-------------------------------------------------------------'); + log.info('pendingConfirmOwnerHash.size:' + + Object.keys(this.pendingConfirmOwnerHash).length + + 'pendingConfirmBalanceHash.size:' + + Object.keys(this.pendingConfirmBalanceHash).length); + allInstances.forEach((item) => { + log.info(utils.jsonEncode(item)); + }); + await utils.sleep(1000 * 10); + } + }; + setTimeout(outputLog, 1000 * 3); + } + { + this.instances.forEach(async (item) => { + factory.create('EventProcess', null) + .init(this.conn, item, async (event) => { + await (new NftTransferProcess()).start(item, this.conn, event); + }); + }); + } + // { + // this.instances1155.forEach(async (item) => { + // factory.create('EventProcess', null) + // .init(this.conn, item, async (event) => { + // await (new Nft1155TransferProcess()).start(item, this.conn, event); + // }); + // }); + // } + /* + { + factory.create('EventProcess', null) + .init(this.conn, this.boxInstance, async (event) => { + await (new BoxOpenedProcess()).start(this.boxInstance, this.conn, event); + }); + } + { + factory.create('EventProcess', null) + .init(this.conn, this.activateInstance, async (event) => { + await (new ActivateProcess()).start(this.activateInstance, this.conn, event); + }); + }*/ + { + factory.create('EventProcess', null) + .init(this.conn, this.activate721NftInstance, async (event) => { + await (new Activate721NftProcess()).start(this.activate721NftInstance, this.conn, event); + }); + } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.activate1155NftInstance, async (event) => { + // await (new Activate1155NftProcess()).start(this.activate1155NftInstance, this.conn, event); + // }); + // } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.shardMixByUserFailInstance, async (event) => { + // await (new ShardMixFailProcess()).start(this.shardMixByUserFailInstance, this.conn, event); + // }); + // } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.shardMixByUserSuccInstance, async (event) => { + // await (new ShardMixSuccProcess()).start(this.shardMixByUserSuccInstance, this.conn, event); + // }); + // } + { + /*factory.create('EventProcess', null) + .init(this.conn, this.shardMintByUserFailInstance, async (event) => { + await (new ShardMintFailProcess()).start(this.shardMintByUserFailInstance, this.conn, event); + }); + */ + } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.shardMintByUserSuccInstance, async (event) => { + // await (new ShardMintSuccProcess()).start(this.shardMintByUserSuccInstance, this.conn, event); + // }); + // } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.evolveFailInstance, async (event) => { + // await (new EvolveFailProcess()).start(this.evolveFailInstance, this.conn, event); + // }); + // } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.evolveSuccInstance, async (event) => { + // await (new EvolveSuccProcess()).start(this.evolveSuccInstance, this.conn, event); + // }); + // } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.chipPluginInstance, async (event) => { + // await (new ChipPluginProcess()).start(this.chipPluginInstance, this.conn, event); + // }); + // } + // { + // factory.create('EventProcess', null) + // .init(this.conn, this.chipUnplugInstance, async (event) => { + // await (new ChipUnplugProcess()).start(this.chipUnplugInstance, this.conn, event); + // }); + // } + } + + async addConfirmOwnerRequest(tokenId) { + const pendingRequest = utils.getVal(this.pendingConfirmOwnerHash, tokenId); + if (!pendingRequest) { + const newRequest = new factory.create('ExecConfirmOwner', null); + this.pendingConfirmOwnerHash[tokenId] = newRequest; + newRequest.init(tokenId); + return; + } else { + pendingRequest.addTryCount(); + } + } + + async addConfirmBalanceRequest(instanceName, address, tokenIds, idx) { + const syncTokenIds = []; + tokenIds.forEach( + (element) => { + const tokenId = element; + const key = this.genBalanceKey(instanceName, address, tokenId); + const pendingRequest = utils.getVal(this.pendingConfirmBalanceHash, key); + if (pendingRequest) { + pendingRequest.addTryCount(tokenId); + } else { + syncTokenIds.push(tokenId); + } + } + ); + if (syncTokenIds.length > 0) { + const newRequest = new factory.create('ExecConfirmBalance', null); + syncTokenIds.forEach( + (element) => { + const tokenId = element; + const key = this.genBalanceKey(instanceName, address, tokenId); + this.pendingConfirmBalanceHash[key] = newRequest; + } + ); + newRequest.init(instanceName, address, syncTokenIds, idx); + } + } + + async confirmOwner() { + let maxIdx = 0; + while (true) { + { + const {err, rows} = await this.conn.execQuery( + 'SELECT * FROM t_nft_transfer WHERE `idx` > ? AND ' + + '`owner_confirmed` = 0 LIMIT ' + LIMIT_COUNT, + [this.lastIdx]); + if (!err) { + for (let i in rows) { + const row = rows[i]; + this.addConfirmOwnerRequest(row['token_id']); + if (row['idx'] > this.lastIdx) { + this.lastIdx = row['idx']; + } + } + if (rows.length < 1 && this.lastIdx + LIMIT_COUNT < maxIdx) { + this.lastIdx += LIMIT_COUNT; + continue; + } + } + } + { + const {err, row} = await this.conn.execQueryOne( + 'SELECT max(idx) max_idx FROM t_nft_transfer', []); + if (!err && row['max_idx'] != null) { + maxIdx = row['max_idx']; + } + } + while (Object.keys(this.pendingConfirmOwnerHash).length > 50) { + await utils.sleep(1000 + utils.randRange(500, 1500)); + } + await utils.sleep(2000 + utils.randRange(500, 1500)); + } + } + + async confirmBalance() { + let maxIdx = 0; + let lastIdx = 0; + while (true) { + { + const {err, rows} = await this.conn.execQuery( + 'SELECT * FROM t_nft1155_transfer WHERE `idx` > ? AND ' + + '`owner_confirmed` = 0 LIMIT ' + LIMIT_COUNT, + [lastIdx]); + if (!err) { + for (let i in rows) { + const row = rows[i]; + const instanceName = row['instance_name']; + const tokenIds = this.extractTokenIds(row); + console.log('confirmBalance', instanceName, tokenIds, row); + if (bcutils.isValidAddress(row['_from'])) { + this.addConfirmBalanceRequest(instanceName, row['_from'], tokenIds, row['idx']); + } + if (bcutils.isValidAddress(row['_to'])) { + this.addConfirmBalanceRequest(instanceName, row['_to'], tokenIds, row['idx']); + } + if (row['idx'] > lastIdx) { + lastIdx = row['idx']; + } + } + if (rows.length < 1 && lastIdx + LIMIT_COUNT < maxIdx) { + lastIdx += LIMIT_COUNT; + continue; + } + } + } + { + const {err, row} = await this.conn.execQueryOne( + 'SELECT max(idx) max_idx FROM t_nft1155_transfer', []); + if (!err && row['max_idx'] != null) { + maxIdx = row['max_idx']; + } + } + while (Object.keys(this.pendingConfirmBalanceHash).length > 50) { + await utils.sleep(1000 + utils.randRange(500, 1500)); + } + await utils.sleep(2000 + utils.randRange(500, 1500)); + } + } + + extractTokenIds(row) { + let tokenIds = []; + try { + const returnValues = utils.jsonDecode(row['return_values']); + const eventName = row['event_name']; + console.log(returnValues, eventName); + if (eventName == 'TransferSingle') { + tokenIds.push(returnValues['id']); + } else if (eventName == 'TransferBatch') { + tokenIds = returnValues['ids']; + } + } catch (e) { + + } + return tokenIds; + } + + genBalanceKey(instanceName, address, tokenId) { + return instanceName + '!' + address.toString() + '!' + tokenId; + } + +} + +module.exports = EventCenter;