diff --git a/config/event_list_production.json b/config/event_list_production.json new file mode 100644 index 0000000..92dc6cb --- /dev/null +++ b/config/event_list_production.json @@ -0,0 +1,42 @@ +[ + { + "chain": 42161, + "address": "0x3F13F83E6363D97d0353cAAfACA08B05D9BF3637", + "event": "Transfer", + "abi": "ERC721_Transfer", + "fromBlock": 104992059, + "eventProcesser": "NftHolder" + }, + { + "chain": 42161, + "address": "0x79fc2a4216A1e595DBD09D13c4B4bD3B095d5bb2", + "event": "Transfer", + "abi": "ERC721_Transfer", + "fromBlock": 110028781, + "eventProcesser": "NftHolder" + }, + { + "chain": 42161, + "address": "0xD728de3d9ebeD90E84aBe84539280cbC5b18E304", + "event": "Transfer", + "abi": "ERC721_Transfer", + "fromBlock": 110369163, + "eventProcesser": "NftHolder" + }, + { + "chain": 42161, + "address": "0xefD4c863E73e7E9Cc33d46fB30CE51510FCFdeb0", + "event": "Transfer", + "abi": "ERC721_Transfer", + "fromBlock": 116977151, + "eventProcesser": "NftHolder" + }, + { + "chain": 42161, + "address": "0x0cee888fA25810ca648D697099bc17a2c9E1dfBF", + "event": "Transfer", + "abi": "ERC721_Transfer", + "fromBlock": 148632616, + "eventProcesser": "NftHolder" + } +] \ No newline at end of file diff --git a/package.json b/package.json index 2f29b02..5d7d0ff 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "prod:monitor": "NODE_PATH=./dist node dist/monitor.js", "dev:scription": "ts-node -r tsconfig-paths/register src/scriptions.ts", "prod:scription": "TZ='SG' NODE_PATH=./dist node dist/scriptions.js", - "dev:event": "ts-node -r tsconfig-paths/register src/events.ts", + "dev:event": "NODE_ENV=production ts-node -r tsconfig-paths/register src/events.ts", "prod:event": "TZ='SG' NODE_PATH=./dist node dist/events.js" }, "author": "z", diff --git a/src/chain/allchain.ts b/src/chain/allchain.ts index 9cab85f..b14d02f 100644 --- a/src/chain/allchain.ts +++ b/src/chain/allchain.ts @@ -6,6 +6,7 @@ export interface IChain { symbol: string network?: string explorerurl: string + maxEvents?: number } export const AllChains: IChain[] = [ { @@ -219,9 +220,10 @@ export const AllChains: IChain[] = [ { name: 'Arbitrum One', type: 'Mainnet', - rpc: 'https://arb-mainnet.g.alchemy.com/v2/2wVx68PmeMUCVgcMc9H-bKcnLDFBYlFS', + rpc: 'https://arb-mainnet.g.alchemy.com/v2/XdKzWdaghDdUzC-XT-sVzCzjH9EHOHpV', id: 42161, network: 'ARBITRUM', + maxEvents: 2000, symbol: 'ETH', explorerurl: 'https://arbiscan.io/', }, diff --git a/src/events.ts b/src/events.ts index 00da7b2..e2c6b56 100644 --- a/src/events.ts +++ b/src/events.ts @@ -3,7 +3,11 @@ import logger from 'logger/logger' const envFile = process.env.NODE_ENV && process.env.NODE_ENV === 'production' ? `.env.production` : '.env.development' dotenv.config({ path: envFile }) -const events = require('../config/event_list.json') +const listFile = + process.env.NODE_ENV && process.env.NODE_ENV === 'production' + ? `../config/event_list_production.json` + : '../config/event_list.json' +const events = require(listFile) import 'common/Extend' import { AllChains, IChain } from 'chain/allchain' diff --git a/src/interface/IEventCfg.ts b/src/interface/IEventCfg.ts index 30ec788..51b3263 100644 --- a/src/interface/IEventCfg.ts +++ b/src/interface/IEventCfg.ts @@ -3,6 +3,7 @@ export interface IEventCfg { event: string abi: any fromBlock: number + toBlock?: number eventProcesser?: string chain: number topic?: string diff --git a/src/models/CheckIn.ts b/src/models/CheckIn.ts index 0f12c52..3c2a7cf 100644 --- a/src/models/CheckIn.ts +++ b/src/models/CheckIn.ts @@ -2,7 +2,7 @@ import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoos import { dbconn } from 'decorators/dbconn' import { BaseModule } from './Base' import { formatDate, yesterday } from 'zutils/utils/date.util' -import { logger } from '@typegoose/typegoose/lib/logSettings' +import logger from 'logger/logger' @dbconn() @index({ from: 1 }, { unique: false }) diff --git a/src/service/event.batch.service.ts b/src/service/event.batch.service.ts index 3763112..d87ad40 100644 --- a/src/service/event.batch.service.ts +++ b/src/service/event.batch.service.ts @@ -17,6 +17,7 @@ let eventProcessers = { NftStake: NftStake, } +const INTERVAL = 100 export class EventBatchSvr { chainCfg: IChain eventCfgs: IEventCfg[] = [] @@ -39,6 +40,37 @@ export class EventBatchSvr { this.redisKey = `event_${this.chainCfg.id}` } + async divQueryPassEvents(cfg: IEventCfg, fromBlock: number, toBlock: number) { + const middle = ((fromBlock + toBlock) / 2) | 0 + const middlePlusOne = middle + 1 + const firstHalfEvents = await this.getPastEvents(cfg, fromBlock, middle) + const secondHalfEvents = await this.getPastEvents(cfg, middlePlusOne, toBlock) + return [...firstHalfEvents, ...secondHalfEvents] + } + + async getPastEvents(cfg: IEventCfg, fromBlock: number, toBlock: number) { + let events = [] + logger.info(`get past events: ${cfg.address}, ${cfg.event}, ${fromBlock}, ${toBlock}`) + try { + const _param = this.buildQueryParams(cfg, fromBlock, toBlock) + const result = await ethGetLogs(this.rpc, _param.params) + if (result.error) { + if (result.error.message && /Log response size exceeded/.test(result.error.message)) { + events = await this.divQueryPassEvents(cfg, fromBlock, toBlock) + } else { + throw result.error + } + logger.info('fetch history error: ', result.error.message || result.error) + } else { + events = result.result + } + } catch (e) { + if (e.message && /Log response size exceeded/.test(e.message)) { + events = await this.divQueryPassEvents(cfg, fromBlock, toBlock) + } + } + return events + } async execute() { let currentBlock = await retryEthBlockNumber(this.rpc) let toBlock = parseInt(currentBlock.result, 16) @@ -47,18 +79,14 @@ export class EventBatchSvr { this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock) } logger.info(`sync events with chain: ${this.chainCfg.id}, from: ${this.fromBlock}, to: ${toBlock}`) - let params = [] let uninParams = [] let topicsSet = new Set() for (let cfg of this.eventCfgs) { - await this.fixBlockNumber(cfg) - if (cfg.fromBlock != this.fromBlock) { - let _param = this.buildQueryParams(cfg, toBlock) - params.push(_param) - } else { + await this.fixBlockNumber(cfg, this.chainCfg, toBlock) + if (cfg.fromBlock === this.fromBlock && cfg.toBlock === toBlock) { let _param = uninParams[uninParams.length - 1] if (!_param || !topicsSet.has(cfg.topic)) { - _param = this.buildQueryParams(cfg, toBlock) + _param = this.buildQueryParams(cfg) uninParams.push(_param) topicsSet = new Set() topicsSet.add(cfg.topic) @@ -69,6 +97,9 @@ export class EventBatchSvr { } _param.params[0].address.push(cfg.address) } + } else { + const historyEvents = await this.getPastEvents(cfg, cfg.fromBlock, toBlock) + await this.processEvents(historyEvents) } } let nextBlock = this.fromBlock @@ -90,20 +121,10 @@ export class EventBatchSvr { } await this.processEvents(result.result) } - if (params.length > 0) { - logger.debug(`params: ${params.length}`) - let results = await batchEthLogs(this.rpc, params) - for (let i = 0; i < results.length; i++) { - if (results[i].error) { - throw results[i].error - } - let events = results[i].result - await this.processEvents(events) - } - } nextBlock = toBlock + 1 for (let cfg of this.eventCfgs) { cfg.fromBlock = nextBlock + cfg.toBlock = 0 const redisKey = this.buildRedisKey(cfg) await new ZRedisClient().set(redisKey, cfg.fromBlock + '') } @@ -114,17 +135,24 @@ export class EventBatchSvr { return `event_${this.chainCfg.id}_${cfg.address}_${cfg.event}` } - async fixBlockNumber(cfg: IEventCfg) { + async fixBlockNumber(cfg: IEventCfg, chainCfg: IChain, toBlock: number) { const redisKey = this.buildRedisKey(cfg) let blockStr = await new ZRedisClient().get(redisKey) if (blockStr) { cfg.fromBlock = Math.max(parseInt(blockStr), cfg.fromBlock) } + if (chainCfg.maxEvents) { + cfg.toBlock = Math.min(cfg.fromBlock + chainCfg.maxEvents, toBlock) + } else { + cfg.toBlock = toBlock + } } - buildQueryParams(cfg: IEventCfg, toBlock?: number) { + buildQueryParams(cfg: IEventCfg, fromBlock?: number, toBlock?: number) { + fromBlock = fromBlock || cfg.fromBlock + toBlock = toBlock || cfg.toBlock const params: any = { - fromBlock: '0x' + cfg.fromBlock.toString(16), + fromBlock: '0x' + fromBlock.toString(16), toBlock: toBlock ? '0x' + toBlock?.toString(16) : 'latest', address: [cfg.address], } diff --git a/start_event_release.json b/start_event_release.json new file mode 100644 index 0000000..109c65f --- /dev/null +++ b/start_event_release.json @@ -0,0 +1,22 @@ +{ + "apps": [ + { + "name": "chain-event", + "script": "npm", + "args": "run prod:event", + "cwd": "/home/kingsome/code/web_chain_client", + "max_memory_restart": "1024M", + "log_date_format": "YYYY-MM-DD HH:mm Z", + "watch": false, + "ignore_watch": ["node_modules", "logs", "fixtures", "tasks"], + "instances": 1, + "exec_mode": "fork", + "env": { + "NODE_ENV": "production" + }, + "env_production": { + "NODE_ENV": "production" + } + } + ] +}