针对正式网的事件查询接口块高限制, 优化查询策略

This commit is contained in:
CounterFire2023 2024-01-22 15:53:47 +08:00
parent 69adf77a93
commit f542d62057
8 changed files with 124 additions and 25 deletions

View File

@ -0,0 +1,42 @@
[
{
"chain": 42161,
"address": "0x3F13F83E6363D97d0353cAAfACA08B05D9BF3637",
"event": "Transfer",
"abi": "ERC721_Transfer",
"fromBlock": 104992059,
"eventProcesser": "NftHolder"
},
{
"chain": 42161,
"address": "0x79fc2a4216A1e595DBD09D13c4B4bD3B095d5bb2",
"event": "Transfer",
"abi": "ERC721_Transfer",
"fromBlock": 110028781,
"eventProcesser": "NftHolder"
},
{
"chain": 42161,
"address": "0xD728de3d9ebeD90E84aBe84539280cbC5b18E304",
"event": "Transfer",
"abi": "ERC721_Transfer",
"fromBlock": 110369163,
"eventProcesser": "NftHolder"
},
{
"chain": 42161,
"address": "0xefD4c863E73e7E9Cc33d46fB30CE51510FCFdeb0",
"event": "Transfer",
"abi": "ERC721_Transfer",
"fromBlock": 116977151,
"eventProcesser": "NftHolder"
},
{
"chain": 42161,
"address": "0x0cee888fA25810ca648D697099bc17a2c9E1dfBF",
"event": "Transfer",
"abi": "ERC721_Transfer",
"fromBlock": 148632616,
"eventProcesser": "NftHolder"
}
]

View File

@ -15,7 +15,7 @@
"prod:monitor": "NODE_PATH=./dist node dist/monitor.js",
"dev:scription": "ts-node -r tsconfig-paths/register src/scriptions.ts",
"prod:scription": "TZ='SG' NODE_PATH=./dist node dist/scriptions.js",
"dev:event": "ts-node -r tsconfig-paths/register src/events.ts",
"dev:event": "NODE_ENV=production ts-node -r tsconfig-paths/register src/events.ts",
"prod:event": "TZ='SG' NODE_PATH=./dist node dist/events.js"
},
"author": "z",

View File

@ -6,6 +6,7 @@ export interface IChain {
symbol: string
network?: string
explorerurl: string
maxEvents?: number
}
export const AllChains: IChain[] = [
{
@ -219,9 +220,10 @@ export const AllChains: IChain[] = [
{
name: 'Arbitrum One',
type: 'Mainnet',
rpc: 'https://arb-mainnet.g.alchemy.com/v2/2wVx68PmeMUCVgcMc9H-bKcnLDFBYlFS',
rpc: 'https://arb-mainnet.g.alchemy.com/v2/XdKzWdaghDdUzC-XT-sVzCzjH9EHOHpV',
id: 42161,
network: 'ARBITRUM',
maxEvents: 2000,
symbol: 'ETH',
explorerurl: 'https://arbiscan.io/',
},

View File

@ -3,7 +3,11 @@ import logger from 'logger/logger'
const envFile = process.env.NODE_ENV && process.env.NODE_ENV === 'production' ? `.env.production` : '.env.development'
dotenv.config({ path: envFile })
const events = require('../config/event_list.json')
const listFile =
process.env.NODE_ENV && process.env.NODE_ENV === 'production'
? `../config/event_list_production.json`
: '../config/event_list.json'
const events = require(listFile)
import 'common/Extend'
import { AllChains, IChain } from 'chain/allchain'

View File

@ -3,6 +3,7 @@ export interface IEventCfg {
event: string
abi: any
fromBlock: number
toBlock?: number
eventProcesser?: string
chain: number
topic?: string

View File

@ -2,7 +2,7 @@ import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoos
import { dbconn } from 'decorators/dbconn'
import { BaseModule } from './Base'
import { formatDate, yesterday } from 'zutils/utils/date.util'
import { logger } from '@typegoose/typegoose/lib/logSettings'
import logger from 'logger/logger'
@dbconn()
@index({ from: 1 }, { unique: false })

View File

@ -17,6 +17,7 @@ let eventProcessers = {
NftStake: NftStake,
}
const INTERVAL = 100
export class EventBatchSvr {
chainCfg: IChain
eventCfgs: IEventCfg[] = []
@ -39,6 +40,37 @@ export class EventBatchSvr {
this.redisKey = `event_${this.chainCfg.id}`
}
/**
 * Splits a block range in half and queries each half via getPastEvents,
 * recursing until the provider stops rejecting the range for returning
 * too many logs.
 * @param cfg       event subscription being queried
 * @param fromBlock inclusive lower bound of the block range
 * @param toBlock   inclusive upper bound of the block range
 * @returns logs from both halves, lower half first
 * @throws when the range can no longer be split (a single block still
 *         exceeds the provider's response limit)
 */
async divQueryPassEvents(cfg: IEventCfg, fromBlock: number, toBlock: number) {
  // BUG FIX: a one-block range cannot be split further. Previously this
  // re-queried the identical range, and since getPastEvents calls back
  // into this method on the size-exceeded error, it recursed forever.
  if (fromBlock >= toBlock) {
    throw new Error(`log range [${fromBlock}, ${toBlock}] cannot be split but still exceeds provider limit`)
  }
  const middle = ((fromBlock + toBlock) / 2) | 0
  // Query the halves sequentially (not Promise.all) to avoid bursting
  // the RPC endpoint while it is already complaining about load.
  const firstHalfEvents = await this.getPastEvents(cfg, fromBlock, middle)
  const secondHalfEvents = await this.getPastEvents(cfg, middle + 1, toBlock)
  return [...firstHalfEvents, ...secondHalfEvents]
}
/**
 * Fetches past logs for one event config over [fromBlock, toBlock].
 * When the provider rejects the range with "Log response size exceeded",
 * the range is bisected via divQueryPassEvents and retried.
 * @param cfg       event subscription (address/topic come from buildQueryParams)
 * @param fromBlock inclusive lower bound of the block range
 * @param toBlock   inclusive upper bound of the block range
 * @returns array of raw log objects (empty when none were found)
 * @throws any provider error other than the size-exceeded one
 */
async getPastEvents(cfg: IEventCfg, fromBlock: number, toBlock: number) {
  let events = []
  logger.info(`get past events: ${cfg.address}, ${cfg.event}, ${fromBlock}, ${toBlock}`)
  try {
    const _param = this.buildQueryParams(cfg, fromBlock, toBlock)
    const result = await ethGetLogs(this.rpc, _param.params)
    if (result.error) {
      if (result.error.message && /Log response size exceeded/.test(result.error.message)) {
        // Range too large for the provider: split and retry.
        events = await this.divQueryPassEvents(cfg, fromBlock, toBlock)
      } else {
        throw result.error
      }
      logger.info('fetch history error: ', result.error.message || result.error)
    } else {
      events = result.result
    }
  } catch (e) {
    if (e.message && /Log response size exceeded/.test(e.message)) {
      events = await this.divQueryPassEvents(cfg, fromBlock, toBlock)
    } else {
      // BUG FIX: the original swallowed every other error here, which
      // defeated the `throw result.error` above and silently returned []
      // on genuine RPC failures. Propagate so the caller can react.
      throw e
    }
  }
  return events
}
async execute() {
let currentBlock = await retryEthBlockNumber(this.rpc)
let toBlock = parseInt(currentBlock.result, 16)
@ -47,18 +79,14 @@ export class EventBatchSvr {
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
}
logger.info(`sync events with chain: ${this.chainCfg.id}, from: ${this.fromBlock}, to: ${toBlock}`)
let params = []
let uninParams = []
let topicsSet = new Set()
for (let cfg of this.eventCfgs) {
await this.fixBlockNumber(cfg)
if (cfg.fromBlock != this.fromBlock) {
let _param = this.buildQueryParams(cfg, toBlock)
params.push(_param)
} else {
await this.fixBlockNumber(cfg, this.chainCfg, toBlock)
if (cfg.fromBlock === this.fromBlock && cfg.toBlock === toBlock) {
let _param = uninParams[uninParams.length - 1]
if (!_param || !topicsSet.has(cfg.topic)) {
_param = this.buildQueryParams(cfg, toBlock)
_param = this.buildQueryParams(cfg)
uninParams.push(_param)
topicsSet = new Set()
topicsSet.add(cfg.topic)
@ -69,6 +97,9 @@ export class EventBatchSvr {
}
_param.params[0].address.push(cfg.address)
}
} else {
const historyEvents = await this.getPastEvents(cfg, cfg.fromBlock, toBlock)
await this.processEvents(historyEvents)
}
}
let nextBlock = this.fromBlock
@ -90,20 +121,10 @@ export class EventBatchSvr {
}
await this.processEvents(result.result)
}
if (params.length > 0) {
logger.debug(`params: ${params.length}`)
let results = await batchEthLogs(this.rpc, params)
for (let i = 0; i < results.length; i++) {
if (results[i].error) {
throw results[i].error
}
let events = results[i].result
await this.processEvents(events)
}
}
nextBlock = toBlock + 1
for (let cfg of this.eventCfgs) {
cfg.fromBlock = nextBlock
cfg.toBlock = 0
const redisKey = this.buildRedisKey(cfg)
await new ZRedisClient().set(redisKey, cfg.fromBlock + '')
}
@ -114,17 +135,24 @@ export class EventBatchSvr {
return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}`
}
async fixBlockNumber(cfg: IEventCfg) {
/**
 * Restores a config's sync cursor from redis and fixes the upper block
 * bound for the next query window.
 * fromBlock advances to the persisted cursor when that is newer; toBlock
 * is capped at fromBlock + chainCfg.maxEvents when the chain declares a
 * per-query limit, otherwise it is set to the current chain head.
 */
async fixBlockNumber(cfg: IEventCfg, chainCfg: IChain, toBlock: number) {
  const persisted = await new ZRedisClient().get(this.buildRedisKey(cfg))
  if (persisted) {
    cfg.fromBlock = Math.max(parseInt(persisted), cfg.fromBlock)
  }
  cfg.toBlock = chainCfg.maxEvents ? Math.min(cfg.fromBlock + chainCfg.maxEvents, toBlock) : toBlock
}
buildQueryParams(cfg: IEventCfg, toBlock?: number) {
buildQueryParams(cfg: IEventCfg, fromBlock?: number, toBlock?: number) {
fromBlock = fromBlock || cfg.fromBlock
toBlock = toBlock || cfg.toBlock
const params: any = {
fromBlock: '0x' + cfg.fromBlock.toString(16),
fromBlock: '0x' + fromBlock.toString(16),
toBlock: toBlock ? '0x' + toBlock?.toString(16) : 'latest',
address: [cfg.address],
}

22
start_event_release.json Normal file
View File

@ -0,0 +1,22 @@
{
"apps": [
{
"name": "chain-event",
"script": "npm",
"args": "run prod:event",
"cwd": "/home/kingsome/code/web_chain_client",
"max_memory_restart": "1024M",
"log_date_format": "YYYY-MM-DD HH:mm Z",
"watch": false,
"ignore_watch": ["node_modules", "logs", "fixtures", "tasks"],
"instances": 1,
"exec_mode": "fork",
"env": {
"NODE_ENV": "production"
},
"env_production": {
"NODE_ENV": "production"
}
}
]
}