diff --git a/src/chain/allchain.ts b/src/chain/allchain.ts index ae1edb2..9cab85f 100644 --- a/src/chain/allchain.ts +++ b/src/chain/allchain.ts @@ -277,7 +277,7 @@ export const AllChains: IChain[] = [ { name: 'Arbitrum Sepolia', type: 'Testnet', - rpc: 'https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3', + rpc: 'https://arb-sepolia.g.alchemy.com/v2/mHoYM0SyjeizxvdjShcdOHiCrXOM_mlg|https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3', id: 421614, network: 'ARB_SEPOLIA', symbol: 'ETH', diff --git a/src/chain/chain.api.ts b/src/chain/chain.api.ts index 9327b1a..6834fe1 100644 --- a/src/chain/chain.api.ts +++ b/src/chain/chain.api.ts @@ -81,16 +81,6 @@ export const batchEthBlocks = async (rpc: string, blockNumbers: number[]) => { } export const batchEthLogs = async (rpc: string, params: any) => { - // let batch = [] - // for (let i = 0; i < params.length; i++) { - // batch.push({ - // jsonrpc: "2.0", - // method: "eth_getLogs", - // params: [params[i]], - // id: ids[i] - // }) - // } - return fetch(rpc, { method: "POST", headers: { diff --git a/src/service/event.batch.service.ts b/src/service/event.batch.service.ts index 11d1bd6..83ae910 100644 --- a/src/service/event.batch.service.ts +++ b/src/service/event.batch.service.ts @@ -1,5 +1,5 @@ import { IChain } from "chain/allchain"; -import { batchEthLogs, ethBlockNumber } from "chain/chain.api"; +import { batchEthLogs, ethGetLogs } from "chain/chain.api"; import { IEventCfg } from "config/events_cfg"; import logger from "logger/logger"; import { GeneralEvent } from "models/GeneralEvent"; @@ -7,6 +7,7 @@ import { GeneralEvent } from "models/GeneralEvent"; import { RedisClient } from "redis/RedisClient"; import { decodeEvent, getTopics } from "utils/event.util"; +const MAX_TOPICS = 4 export class EventBatchSvr { chainCfg: IChain @@ -24,29 +25,85 @@ export class EventBatchSvr { if (!cfg.topic) { cfg.topic = getTopics(cfg) } - this.processer.set(cfg.address+cfg.topic, cfg) + this.processer.set((cfg.address+cfg.topic).toLowerCase(), cfg) } this.redisKey = `event_${this.chainCfg.id}` } async execute() { - logger.info(`begin sync events with chain: ${this.chainCfg.id}`) + try { // let currentBlock = await ethBlockNumber(this.rpc) + let blockStr = await new RedisClient().get(this.redisKey) + if (blockStr) { + this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock) + } + logger.info(`begin sync events with chain: ${this.chainCfg.id}, from block: ${this.fromBlock}`) let params = [] + let uninParams = [] + let topicsSet = new Set() for (let cfg of this.eventCfgs) { - let param = await this.buildQueryParams(cfg) - params.push(param) - } - let results = await batchEthLogs(this.rpc, params) - for (let i = 0; i < results.length; i++) { - if (results[i].error) { - console.log(results[i].error) - continue + await this.fixBlockNumber(cfg) + if (cfg.fromBlock != this.fromBlock) { + let _param = this.buildQueryParams(cfg) + params.push(_param) + } else { + let _param = uninParams[uninParams.length - 1] + if (!_param || topicsSet.size > MAX_TOPICS) { + _param = this.buildQueryParams(cfg) + uninParams.push(_param) + topicsSet = new Set() + } else { + let topic = getTopics(cfg) + if (!topicsSet.has(topic)) { + _param.params[0].topics.push(topic) + topicsSet.add(topic) + } + _param.params[0].address.push(cfg.address) + } } - let events = results[i].result - await this.processEvents(events) } + let nextBlock = this.fromBlock + if (uninParams.length > 1) { + logger.debug(`unin params: ${uninParams.length}`) + let results = await batchEthLogs(this.rpc, uninParams) + for (let i = 0; i < results.length; i++) { + if (results[i].error) { + throw results[i].error + } + let events = results[i].result + let num = await this.processEvents(events) + nextBlock = Math.max(num, nextBlock) + } + } else if (uninParams.length == 1) { + logger.debug(`unin params length 1, use eth_getLogs`) + let result = await ethGetLogs(this.rpc, uninParams[0]) + if (result.error) { + throw result.error + } + let num = await this.processEvents(result.result) + nextBlock = Math.max(num, nextBlock) + } + if (params.length > 0) { + logger.debug(`params: ${params.length}`) + let results = await batchEthLogs(this.rpc, params) + for (let i = 0; i < results.length; i++) { + if (results[i].error) { + throw results[i].error + } + let events = results[i].result + let num = await this.processEvents(events) + nextBlock = Math.max(num, nextBlock) + } + } + nextBlock ++; + for (let cfg of this.eventCfgs) { + cfg.fromBlock = nextBlock + const redisKey = this.buildRedisKey(cfg) + await new RedisClient().set(redisKey, cfg.fromBlock + '') + } + await new RedisClient().set(this.redisKey, nextBlock + '') + } catch (err) { console.log(err) } @@ -56,19 +113,20 @@ export class EventBatchSvr { return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}` } - async buildQueryParams(cfg: IEventCfg, toBlock?: string) { + async fixBlockNumber(cfg: IEventCfg) { const redisKey = this.buildRedisKey(cfg) - - const params: any = { - fromBlock: cfg.fromBlock, - toBlock: toBlock || 'latest', - address: cfg.address - } let blockStr = await new RedisClient().get(redisKey) if (blockStr) { - params.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock) + cfg.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock) + } + } + + buildQueryParams(cfg: IEventCfg) { + const params: any = { + fromBlock: '0x' + cfg.fromBlock.toString(16), + toBlock: 'latest', + address: [cfg.address] } - params.fromBlock = '0x' + params.fromBlock.toString(16) params.topics = [getTopics(cfg)] const result = { jsonrpc: "2.0", @@ -80,27 +138,30 @@ export class EventBatchSvr { } async processEvents(events: any[]) { + let blockNumber = 0 if (events.length === 0) { - return + return blockNumber } - const address = events[0].address - const topic = events[0].topics[0] - const cfg = this.processer.get(address+topic) - logger.info(`process events: ${cfg.chain} | ${address} | ${cfg.event} | ${events.length}`) - + logger.info(`process events: ${events.length}`) for (const event of events) { + const address = events[0].address + const topic = events[0].topics[0] + const cfg = this.processer.get((address+topic).toLowerCase()) + if (!cfg) { + continue; + } event.chain = this.chainCfg.id + '' event.event = cfg.event let result = decodeEvent(cfg, event); - cfg.fromBlock = Math.max (parseInt(event.blockNumber, 16) + 1, cfg.fromBlock) + // cfg.fromBlock = Math.max (parseInt(event.blockNumber, 16) + 1, cfg.fromBlock) event.decodedData = result const record = await GeneralEvent.saveEvent(event) + blockNumber = Math.max(parseInt(event.blockNumber, 16), cfg.fromBlock) if (cfg.eventProcesser) { // @ts-ignore await cfg.eventProcesser.parseEvent(record) - } + } } - const redisKey = this.buildRedisKey(cfg) - await new RedisClient().set(redisKey, cfg.fromBlock + '') + return blockNumber } } \ No newline at end of file