site-activity-chain/src/service/event.batch.service.ts
2024-01-12 17:12:50 +08:00

167 lines
5.4 KiB
TypeScript

import { IChain } from "chain/allchain";
import { batchEthLogs, ethGetLogs } from "chain/chain.api";
import { IEventCfg } from "config/events_cfg";
import logger from "logger/logger";
import { GeneralEvent } from "models/GeneralEvent";
import { RedisClient } from "redis/RedisClient";
import { decodeEvent, getTopics } from "utils/event.util";
const MAX_TOPICS = 4
export class EventBatchSvr {
chainCfg: IChain
eventCfgs: IEventCfg[] = []
processer: Map<string, IEventCfg> = new Map()
fromBlock: number = Number.MAX_SAFE_INTEGER
redisKey = ''
rpc = '';
constructor(_chainCfg: IChain, _eventCfgs: IEventCfg[]) {
this.chainCfg =_chainCfg
this.eventCfgs = _eventCfgs
this.rpc = _chainCfg.rpc.split('|')[0]
for (let cfg of this.eventCfgs) {
this.fromBlock = Math.min(this.fromBlock, cfg.fromBlock)
if (!cfg.topic) {
cfg.topic = getTopics(cfg)
}
this.processer.set((cfg.address+cfg.topic).toLowerCase(), cfg)
}
this.eventCfgs.sort((a, b) => a.topic.localeCompare(b.topic))
this.redisKey = `event_${this.chainCfg.id}`
}
async execute() {
try {
// let currentBlock = await ethBlockNumber(this.rpc)
let blockStr = await new RedisClient().get(this.redisKey)
if (blockStr) {
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
}
logger.info(`begin sync events with chain: ${this.chainCfg.id}, from block: ${this.fromBlock}`)
let params = []
let uninParams = []
let topicsSet = new Set()
for (let cfg of this.eventCfgs) {
await this.fixBlockNumber(cfg)
if (cfg.fromBlock != this.fromBlock) {
let _param = this.buildQueryParams(cfg)
params.push(_param)
} else {
let _param = uninParams[uninParams.length - 1]
if (!_param || topicsSet.size > MAX_TOPICS) {
_param = this.buildQueryParams(cfg)
uninParams.push(_param)
topicsSet = new Set()
} else {
if (!topicsSet.has(cfg.topic)) {
_param.params[0].topics.push(cfg.topic)
topicsSet.add(cfg.topic)
}
_param.params[0].address.push(cfg.address)
}
}
}
let nextBlock = this.fromBlock
if (uninParams.length > 1) {
logger.debug(`unin params: ${uninParams.length}`)
let results = await batchEthLogs(this.rpc, uninParams)
for (let i = 0; i < results.length; i++) {
if (results[i].error) {
throw results[i].error
}
let events = results[i].result
let num = await this.processEvents(events)
nextBlock = Math.max(num, nextBlock)
}
} else if (uninParams.length == 1) {
logger.debug(`unin params length 1, use eth_getLogs`)
let result = await ethGetLogs(this.rpc, uninParams[0])
if (result.error) {
throw result.error
}
let num = await this.processEvents(result.result)
nextBlock = Math.max(num, nextBlock)
}
if (params.length > 0) {
logger.debug(`params: ${params.length}`)
let results = await batchEthLogs(this.rpc, params)
for (let i = 0; i < results.length; i++) {
if (results[i].error) {
throw results[i].error
}
let events = results[i].result
let num = await this.processEvents(events)
nextBlock = Math.max(num, nextBlock)
}
}
nextBlock ++;
for (let cfg of this.eventCfgs) {
cfg.fromBlock = nextBlock
const redisKey = this.buildRedisKey(cfg)
await new RedisClient().set(redisKey, cfg.fromBlock + '')
}
await new RedisClient().set(this.redisKey, nextBlock + '')
} catch (err) {
console.log(err)
}
}
buildRedisKey(cfg: IEventCfg) {
return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}`
}
async fixBlockNumber(cfg: IEventCfg) {
const redisKey = this.buildRedisKey(cfg)
let blockStr = await new RedisClient().get(redisKey)
if (blockStr) {
cfg.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock)
}
}
buildQueryParams(cfg: IEventCfg) {
const params: any = {
fromBlock: '0x' + cfg.fromBlock.toString(16),
toBlock: 'latest',
address: [cfg.address]
}
params.topics = [cfg.topic]
const result = {
jsonrpc: "2.0",
method: "eth_getLogs",
params: [params],
id: `${cfg.address}_${cfg.event}`
}
return result
}
async processEvents(events: any[]) {
let blockNumber = 0
if (events.length === 0) {
return blockNumber
}
logger.info(`process events: ${events.length}`)
for (const event of events) {
const address = events[0].address
const topic = events[0].topics[0]
const cfg = this.processer.get((address+topic).toLowerCase())
if (!cfg) {
continue;
}
event.chain = this.chainCfg.id + ''
event.event = cfg.event
let result = decodeEvent(cfg, event);
// cfg.fromBlock = Math.max (parseInt(event.blockNumber, 16) + 1, cfg.fromBlock)
event.decodedData = result
const record = await GeneralEvent.saveEvent(event)
blockNumber = Math.max(parseInt(event.blockNumber, 16), cfg.fromBlock)
if (cfg.eventProcesser) {
// @ts-ignore
await cfg.eventProcesser.parseEvent(record)
}
}
return blockNumber
}
}