import { IChain } from 'chain/allchain' import { batchEthLogs, ethGetLogs, retryEthBlockNumber } from 'chain/chain.api' import logger from 'logger/logger' import { GeneralEvent } from 'models/GeneralEvent' import { RedisClient } from 'redis/RedisClient' import { decodeEvent, getTopics } from 'utils/event.util' import { NftHolder } from 'models/NftHolder' import { TokenHolder } from 'models/TokenHolder' import { NftStake } from 'models/NftStake' import { IEventCfg } from 'interface/IEventCfg' let eventProcessers = { NftHolder: NftHolder, TokenHolder: TokenHolder, NftStake: NftStake, } export class EventBatchSvr { chainCfg: IChain eventCfgs: IEventCfg[] = [] processer: Map = new Map() fromBlock: number = Number.MAX_SAFE_INTEGER redisKey = '' rpc = '' constructor(_chainCfg: IChain, _eventCfgs: IEventCfg[]) { this.chainCfg = _chainCfg this.eventCfgs = _eventCfgs this.rpc = _chainCfg.rpc.split('|')[0] for (let cfg of this.eventCfgs) { this.fromBlock = Math.min(this.fromBlock, cfg.fromBlock) if (!cfg.topic) { cfg.topic = getTopics(cfg) } this.processer.set((cfg.address + cfg.topic).toLowerCase(), cfg) } this.eventCfgs.sort((a, b) => a.topic.localeCompare(b.topic)) this.redisKey = `event_${this.chainCfg.id}` } async execute() { let currentBlock = await retryEthBlockNumber(this.rpc) let toBlock = parseInt(currentBlock.result, 16) let blockStr = await new RedisClient().get(this.redisKey) if (blockStr) { this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock) } logger.info(`sync events with chain: ${this.chainCfg.id}, from: ${this.fromBlock}, to: ${toBlock}`) let params = [] let uninParams = [] let topicsSet = new Set() for (let cfg of this.eventCfgs) { await this.fixBlockNumber(cfg) if (cfg.fromBlock != this.fromBlock) { let _param = this.buildQueryParams(cfg, toBlock) params.push(_param) } else { let _param = uninParams[uninParams.length - 1] if (!_param || !topicsSet.has(cfg.topic)) { _param = this.buildQueryParams(cfg, toBlock) uninParams.push(_param) topicsSet = new Set() topicsSet.add(cfg.topic) } else { if (!topicsSet.has(cfg.topic)) { _param.params[0].topics.push(cfg.topic) topicsSet.add(cfg.topic) } _param.params[0].address.push(cfg.address) } } } let nextBlock = this.fromBlock if (uninParams.length > 1) { logger.debug(`unin params: ${uninParams.length}`) let results = await batchEthLogs(this.rpc, uninParams) for (let i = 0; i < results.length; i++) { if (results[i].error) { throw results[i].error } let events = results[i].result await this.processEvents(events) } } else if (uninParams.length == 1) { logger.debug(`unin params length 1, use eth_getLogs`) let result = await ethGetLogs(this.rpc, uninParams[0].params) if (result.error) { throw result.error } await this.processEvents(result.result) } if (params.length > 0) { logger.debug(`params: ${params.length}`) let results = await batchEthLogs(this.rpc, params) for (let i = 0; i < results.length; i++) { if (results[i].error) { throw results[i].error } let events = results[i].result await this.processEvents(events) } } nextBlock = toBlock + 1 for (let cfg of this.eventCfgs) { cfg.fromBlock = nextBlock const redisKey = this.buildRedisKey(cfg) await new RedisClient().set(redisKey, cfg.fromBlock + '') } await new RedisClient().set(this.redisKey, nextBlock + '') } buildRedisKey(cfg: IEventCfg) { return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}` } async fixBlockNumber(cfg: IEventCfg) { const redisKey = this.buildRedisKey(cfg) let blockStr = await new RedisClient().get(redisKey) if (blockStr) { cfg.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock) } } buildQueryParams(cfg: IEventCfg, toBlock?: number) { const params: any = { fromBlock: '0x' + cfg.fromBlock.toString(16), toBlock: toBlock ? '0x' + toBlock?.toString(16) : 'latest', address: [cfg.address], } params.topics = [cfg.topic] const result = { jsonrpc: '2.0', method: 'eth_getLogs', params: [params], id: `${cfg.address}_${cfg.event}`, } return result } async processEvents(events: any[]) { let blockNumber = 0 if (events.length === 0) { return blockNumber } logger.info(`process events: ${events.length}`) for (const event of events) { const address = events[0].address const topic = events[0].topics[0] const cfg = this.processer.get((address + topic).toLowerCase()) if (!cfg) { continue } event.chain = this.chainCfg.id + '' event.event = cfg.event let result = decodeEvent(cfg, event) // cfg.fromBlock = Math.max (parseInt(event.blockNumber, 16) + 1, cfg.fromBlock) event.decodedData = result const record = await GeneralEvent.saveEvent(event) blockNumber = Math.max(parseInt(event.blockNumber, 16), cfg.fromBlock) if (cfg.eventProcesser && eventProcessers[cfg.eventProcesser]) { try { await eventProcessers[cfg.eventProcesser].parseEvent(record) } catch (err) { logger.error('process event error: ', err.message || err) } } } return blockNumber } }