optimiz event catcher
This commit is contained in:
parent
0c33c9abaa
commit
981067421e
@ -277,7 +277,7 @@ export const AllChains: IChain[] = [
|
|||||||
{
|
{
|
||||||
name: 'Arbitrum Sepolia',
|
name: 'Arbitrum Sepolia',
|
||||||
type: 'Testnet',
|
type: 'Testnet',
|
||||||
rpc: 'https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3',
|
rpc: 'https://arb-sepolia.g.alchemy.com/v2/mHoYM0SyjeizxvdjShcdOHiCrXOM_mlg|https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3',
|
||||||
id: 421614,
|
id: 421614,
|
||||||
network: 'ARB_SEPOLIA',
|
network: 'ARB_SEPOLIA',
|
||||||
symbol: 'ETH',
|
symbol: 'ETH',
|
||||||
|
@ -81,16 +81,6 @@ export const batchEthBlocks = async (rpc: string, blockNumbers: number[]) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const batchEthLogs = async (rpc: string, params: any) => {
|
export const batchEthLogs = async (rpc: string, params: any) => {
|
||||||
// let batch = []
|
|
||||||
// for (let i = 0; i < params.length; i++) {
|
|
||||||
// batch.push({
|
|
||||||
// jsonrpc: "2.0",
|
|
||||||
// method: "eth_getLogs",
|
|
||||||
// params: [params[i]],
|
|
||||||
// id: ids[i]
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
|
|
||||||
return fetch(rpc, {
|
return fetch(rpc, {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: {
|
headers: {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import { IChain } from "chain/allchain";
|
import { IChain } from "chain/allchain";
|
||||||
import { batchEthLogs, ethBlockNumber } from "chain/chain.api";
|
import { batchEthLogs, ethGetLogs } from "chain/chain.api";
|
||||||
import { IEventCfg } from "config/events_cfg";
|
import { IEventCfg } from "config/events_cfg";
|
||||||
import logger from "logger/logger";
|
import logger from "logger/logger";
|
||||||
import { GeneralEvent } from "models/GeneralEvent";
|
import { GeneralEvent } from "models/GeneralEvent";
|
||||||
@ -7,6 +7,7 @@ import { GeneralEvent } from "models/GeneralEvent";
|
|||||||
import { RedisClient } from "redis/RedisClient";
|
import { RedisClient } from "redis/RedisClient";
|
||||||
import { decodeEvent, getTopics } from "utils/event.util";
|
import { decodeEvent, getTopics } from "utils/event.util";
|
||||||
|
|
||||||
|
const MAX_TOPICS = 4
|
||||||
|
|
||||||
export class EventBatchSvr {
|
export class EventBatchSvr {
|
||||||
chainCfg: IChain
|
chainCfg: IChain
|
||||||
@ -24,29 +25,85 @@ export class EventBatchSvr {
|
|||||||
if (!cfg.topic) {
|
if (!cfg.topic) {
|
||||||
cfg.topic = getTopics(cfg)
|
cfg.topic = getTopics(cfg)
|
||||||
}
|
}
|
||||||
this.processer.set(cfg.address+cfg.topic, cfg)
|
this.processer.set((cfg.address+cfg.topic).toLowerCase(), cfg)
|
||||||
}
|
}
|
||||||
this.redisKey = `event_${this.chainCfg.id}`
|
this.redisKey = `event_${this.chainCfg.id}`
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute() {
|
async execute() {
|
||||||
logger.info(`begin sync events with chain: ${this.chainCfg.id}`)
|
|
||||||
try {
|
try {
|
||||||
// let currentBlock = await ethBlockNumber(this.rpc)
|
// let currentBlock = await ethBlockNumber(this.rpc)
|
||||||
|
let blockStr = await new RedisClient().get(this.redisKey)
|
||||||
|
if (blockStr) {
|
||||||
|
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
|
||||||
|
}
|
||||||
|
logger.info(`begin sync events with chain: ${this.chainCfg.id}, from block: ${this.fromBlock}`)
|
||||||
let params = []
|
let params = []
|
||||||
|
let uninParams = []
|
||||||
|
let topicsSet = new Set()
|
||||||
for (let cfg of this.eventCfgs) {
|
for (let cfg of this.eventCfgs) {
|
||||||
let param = await this.buildQueryParams(cfg)
|
await this.fixBlockNumber(cfg)
|
||||||
params.push(param)
|
if (cfg.fromBlock != this.fromBlock) {
|
||||||
}
|
let _param = this.buildQueryParams(cfg)
|
||||||
let results = await batchEthLogs(this.rpc, params)
|
params.push(_param)
|
||||||
for (let i = 0; i < results.length; i++) {
|
} else {
|
||||||
if (results[i].error) {
|
let _param = uninParams[uninParams.length - 1]
|
||||||
console.log(results[i].error)
|
if (!_param || topicsSet.size > MAX_TOPICS) {
|
||||||
continue
|
_param = this.buildQueryParams(cfg)
|
||||||
|
uninParams.push(_param)
|
||||||
|
topicsSet = new Set()
|
||||||
|
} else {
|
||||||
|
let topic = getTopics(cfg)
|
||||||
|
if (!topicsSet.has(topic)) {
|
||||||
|
_param.params[0].topics.push(topic)
|
||||||
|
topicsSet.add(topic)
|
||||||
|
}
|
||||||
|
_param.params[0].address.push(cfg.address)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let events = results[i].result
|
|
||||||
await this.processEvents(events)
|
|
||||||
}
|
}
|
||||||
|
let nextBlock = this.fromBlock
|
||||||
|
if (uninParams.length > 1) {
|
||||||
|
logger.debug(`unin params: ${uninParams.length}`)
|
||||||
|
let results = await batchEthLogs(this.rpc, uninParams)
|
||||||
|
for (let i = 0; i < results.length; i++) {
|
||||||
|
if (results[i].error) {
|
||||||
|
throw results[i].error
|
||||||
|
}
|
||||||
|
let events = results[i].result
|
||||||
|
let num = await this.processEvents(events)
|
||||||
|
nextBlock = Math.max(num, nextBlock)
|
||||||
|
}
|
||||||
|
} else if (uninParams.length == 1) {
|
||||||
|
logger.debug(`unin params length 1, use eth_getLogs`)
|
||||||
|
let result = await ethGetLogs(this.rpc, uninParams[0])
|
||||||
|
if (result.error) {
|
||||||
|
throw result.error
|
||||||
|
}
|
||||||
|
let num = await this.processEvents(result.result)
|
||||||
|
nextBlock = Math.max(num, nextBlock)
|
||||||
|
}
|
||||||
|
if (params.length > 0) {
|
||||||
|
logger.debug(`params: ${params.length}`)
|
||||||
|
let results = await batchEthLogs(this.rpc, params)
|
||||||
|
for (let i = 0; i < results.length; i++) {
|
||||||
|
if (results[i].error) {
|
||||||
|
throw results[i].error
|
||||||
|
}
|
||||||
|
let events = results[i].result
|
||||||
|
let num = await this.processEvents(events)
|
||||||
|
nextBlock = Math.max(num, nextBlock)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nextBlock ++;
|
||||||
|
for (let cfg of this.eventCfgs) {
|
||||||
|
cfg.fromBlock = nextBlock
|
||||||
|
const redisKey = this.buildRedisKey(cfg)
|
||||||
|
await new RedisClient().set(redisKey, cfg.fromBlock + '')
|
||||||
|
}
|
||||||
|
await new RedisClient().set(this.redisKey, nextBlock + '')
|
||||||
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.log(err)
|
console.log(err)
|
||||||
}
|
}
|
||||||
@ -56,19 +113,20 @@ export class EventBatchSvr {
|
|||||||
return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}`
|
return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}`
|
||||||
}
|
}
|
||||||
|
|
||||||
async buildQueryParams(cfg: IEventCfg, toBlock?: string) {
|
async fixBlockNumber(cfg: IEventCfg) {
|
||||||
const redisKey = this.buildRedisKey(cfg)
|
const redisKey = this.buildRedisKey(cfg)
|
||||||
|
|
||||||
const params: any = {
|
|
||||||
fromBlock: cfg.fromBlock,
|
|
||||||
toBlock: toBlock || 'latest',
|
|
||||||
address: cfg.address
|
|
||||||
}
|
|
||||||
let blockStr = await new RedisClient().get(redisKey)
|
let blockStr = await new RedisClient().get(redisKey)
|
||||||
if (blockStr) {
|
if (blockStr) {
|
||||||
params.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock)
|
cfg.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buildQueryParams(cfg: IEventCfg) {
|
||||||
|
const params: any = {
|
||||||
|
fromBlock: '0x' + cfg.fromBlock.toString(16),
|
||||||
|
toBlock: 'latest',
|
||||||
|
address: [cfg.address]
|
||||||
}
|
}
|
||||||
params.fromBlock = '0x' + params.fromBlock.toString(16)
|
|
||||||
params.topics = [getTopics(cfg)]
|
params.topics = [getTopics(cfg)]
|
||||||
const result = {
|
const result = {
|
||||||
jsonrpc: "2.0",
|
jsonrpc: "2.0",
|
||||||
@ -80,27 +138,30 @@ export class EventBatchSvr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async processEvents(events: any[]) {
|
async processEvents(events: any[]) {
|
||||||
|
let blockNumber = 0
|
||||||
if (events.length === 0) {
|
if (events.length === 0) {
|
||||||
return
|
return blockNumber
|
||||||
}
|
}
|
||||||
const address = events[0].address
|
logger.info(`process events: ${events.length}`)
|
||||||
const topic = events[0].topics[0]
|
|
||||||
const cfg = this.processer.get(address+topic)
|
|
||||||
logger.info(`process events: ${cfg.chain} | ${address} | ${cfg.event} | ${events.length}`)
|
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
|
const address = events[0].address
|
||||||
|
const topic = events[0].topics[0]
|
||||||
|
const cfg = this.processer.get((address+topic).toLowerCase())
|
||||||
|
if (!cfg) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
event.chain = this.chainCfg.id + ''
|
event.chain = this.chainCfg.id + ''
|
||||||
event.event = cfg.event
|
event.event = cfg.event
|
||||||
let result = decodeEvent(cfg, event);
|
let result = decodeEvent(cfg, event);
|
||||||
cfg.fromBlock = Math.max (parseInt(event.blockNumber, 16) + 1, cfg.fromBlock)
|
// cfg.fromBlock = Math.max (parseInt(event.blockNumber, 16) + 1, cfg.fromBlock)
|
||||||
event.decodedData = result
|
event.decodedData = result
|
||||||
const record = await GeneralEvent.saveEvent(event)
|
const record = await GeneralEvent.saveEvent(event)
|
||||||
|
blockNumber = Math.max(parseInt(event.blockNumber, 16), cfg.fromBlock)
|
||||||
if (cfg.eventProcesser) {
|
if (cfg.eventProcesser) {
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
await cfg.eventProcesser.parseEvent(record)
|
await cfg.eventProcesser.parseEvent(record)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const redisKey = this.buildRedisKey(cfg)
|
return blockNumber
|
||||||
await new RedisClient().set(redisKey, cfg.fromBlock + '')
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user