优化抓取逻辑
This commit is contained in:
parent
65abad1eb3
commit
4ce4eac368
@ -1,7 +1,7 @@
|
|||||||
[
|
[
|
||||||
{
|
{
|
||||||
"chain": 421614,
|
"chain": 421614,
|
||||||
"rpc": "https://arb-sepolia.g.alchemy.com/v2/mHoYM0SyjeizxvdjShcdOHiCrXOM_mlg",
|
"rpc": "https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3",
|
||||||
"fromBlock": 5624211,
|
"fromBlock": 5624211,
|
||||||
"filters": [{
|
"filters": [{
|
||||||
"key": "input",
|
"key": "input",
|
||||||
|
@ -2,7 +2,6 @@ import { IChain } from "chain/allchain";
|
|||||||
import { retryEthBlockNumber } from "chain/chain.api";
|
import { retryEthBlockNumber } from "chain/chain.api";
|
||||||
import { IScriptionCfg } from "interface/IScriptionCfg";
|
import { IScriptionCfg } from "interface/IScriptionCfg";
|
||||||
import logger from "logger/logger";
|
import logger from "logger/logger";
|
||||||
import { CheckIn } from "models/CheckIn";
|
|
||||||
import { RedisClient } from "redis/RedisClient";
|
import { RedisClient } from "redis/RedisClient";
|
||||||
import { getPastBlocksIter } from "utils/block.util";
|
import { getPastBlocksIter } from "utils/block.util";
|
||||||
import { formatDate } from "utils/date.util";
|
import { formatDate } from "utils/date.util";
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import { IChain } from "chain/allchain";
|
import { IChain } from "chain/allchain";
|
||||||
import { batchEthLogs, ethGetLogs } from "chain/chain.api";
|
import { batchEthLogs, ethBlockNumber, ethGetLogs, retryEthBlockNumber } from "chain/chain.api";
|
||||||
import logger from "logger/logger";
|
import logger from "logger/logger";
|
||||||
import { GeneralEvent } from "models/GeneralEvent";
|
import { GeneralEvent } from "models/GeneralEvent";
|
||||||
|
|
||||||
@ -10,6 +10,7 @@ import { NftHolder } from 'models/NftHolder'
|
|||||||
import { TokenHolder } from 'models/TokenHolder'
|
import { TokenHolder } from 'models/TokenHolder'
|
||||||
import { NftStake } from 'models/NftStake'
|
import { NftStake } from 'models/NftStake'
|
||||||
import { IEventCfg } from "interface/IEventCfg";
|
import { IEventCfg } from "interface/IEventCfg";
|
||||||
|
import { parse } from "path";
|
||||||
|
|
||||||
let eventProcessers = {
|
let eventProcessers = {
|
||||||
NftHolder: NftHolder,
|
NftHolder: NftHolder,
|
||||||
@ -44,24 +45,25 @@ export class EventBatchSvr {
|
|||||||
async execute() {
|
async execute() {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// let currentBlock = await ethBlockNumber(this.rpc)
|
let currentBlock = await retryEthBlockNumber(this.rpc)
|
||||||
|
let toBlock = parseInt(currentBlock.result, 16)
|
||||||
let blockStr = await new RedisClient().get(this.redisKey)
|
let blockStr = await new RedisClient().get(this.redisKey)
|
||||||
if (blockStr) {
|
if (blockStr) {
|
||||||
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
|
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
|
||||||
}
|
}
|
||||||
logger.info(`begin sync events with chain: ${this.chainCfg.id}, from block: ${this.fromBlock}`)
|
logger.info(`sync events with chain: ${this.chainCfg.id}, from: ${this.fromBlock}, to: ${toBlock}`)
|
||||||
let params = []
|
let params = []
|
||||||
let uninParams = []
|
let uninParams = []
|
||||||
let topicsSet = new Set()
|
let topicsSet = new Set()
|
||||||
for (let cfg of this.eventCfgs) {
|
for (let cfg of this.eventCfgs) {
|
||||||
await this.fixBlockNumber(cfg)
|
await this.fixBlockNumber(cfg);
|
||||||
if (cfg.fromBlock != this.fromBlock) {
|
if (cfg.fromBlock != this.fromBlock) {
|
||||||
let _param = this.buildQueryParams(cfg)
|
let _param = this.buildQueryParams(cfg, toBlock)
|
||||||
params.push(_param)
|
params.push(_param)
|
||||||
} else {
|
} else {
|
||||||
let _param = uninParams[uninParams.length - 1]
|
let _param = uninParams[uninParams.length - 1]
|
||||||
if (!_param || topicsSet.size > MAX_TOPICS) {
|
if (!_param || topicsSet.size > MAX_TOPICS) {
|
||||||
_param = this.buildQueryParams(cfg)
|
_param = this.buildQueryParams(cfg, toBlock)
|
||||||
uninParams.push(_param)
|
uninParams.push(_param)
|
||||||
topicsSet = new Set()
|
topicsSet = new Set()
|
||||||
} else {
|
} else {
|
||||||
@ -82,8 +84,7 @@ export class EventBatchSvr {
|
|||||||
throw results[i].error
|
throw results[i].error
|
||||||
}
|
}
|
||||||
let events = results[i].result
|
let events = results[i].result
|
||||||
let num = await this.processEvents(events)
|
await this.processEvents(events)
|
||||||
nextBlock = Math.max(num, nextBlock)
|
|
||||||
}
|
}
|
||||||
} else if (uninParams.length == 1) {
|
} else if (uninParams.length == 1) {
|
||||||
logger.debug(`unin params length 1, use eth_getLogs`)
|
logger.debug(`unin params length 1, use eth_getLogs`)
|
||||||
@ -91,8 +92,7 @@ export class EventBatchSvr {
|
|||||||
if (result.error) {
|
if (result.error) {
|
||||||
throw result.error
|
throw result.error
|
||||||
}
|
}
|
||||||
let num = await this.processEvents(result.result)
|
await this.processEvents(result.result)
|
||||||
nextBlock = Math.max(num, nextBlock)
|
|
||||||
}
|
}
|
||||||
if (params.length > 0) {
|
if (params.length > 0) {
|
||||||
logger.debug(`params: ${params.length}`)
|
logger.debug(`params: ${params.length}`)
|
||||||
@ -102,11 +102,10 @@ export class EventBatchSvr {
|
|||||||
throw results[i].error
|
throw results[i].error
|
||||||
}
|
}
|
||||||
let events = results[i].result
|
let events = results[i].result
|
||||||
let num = await this.processEvents(events)
|
await this.processEvents(events)
|
||||||
nextBlock = Math.max(num, nextBlock)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nextBlock ++;
|
nextBlock = toBlock + 1;
|
||||||
for (let cfg of this.eventCfgs) {
|
for (let cfg of this.eventCfgs) {
|
||||||
cfg.fromBlock = nextBlock
|
cfg.fromBlock = nextBlock
|
||||||
const redisKey = this.buildRedisKey(cfg)
|
const redisKey = this.buildRedisKey(cfg)
|
||||||
@ -131,10 +130,10 @@ export class EventBatchSvr {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
buildQueryParams(cfg: IEventCfg) {
|
buildQueryParams(cfg: IEventCfg, toBlock?: number) {
|
||||||
const params: any = {
|
const params: any = {
|
||||||
fromBlock: '0x' + cfg.fromBlock.toString(16),
|
fromBlock: '0x' + cfg.fromBlock.toString(16),
|
||||||
toBlock: 'latest',
|
toBlock: toBlock ? '0x' + toBlock?.toString(16) : 'latest',
|
||||||
address: [cfg.address]
|
address: [cfg.address]
|
||||||
}
|
}
|
||||||
params.topics = [cfg.topic]
|
params.topics = [cfg.topic]
|
||||||
|
@ -18,7 +18,7 @@ export async function divQueryPassBlocks({chainId, rpc, fromBlock, amount}
|
|||||||
export async function getPastBlocks({chainId, rpc, fromBlock, amount}
|
export async function getPastBlocks({chainId, rpc, fromBlock, amount}
|
||||||
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
|
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
|
||||||
let blocks = []
|
let blocks = []
|
||||||
logger.log(`getPastBlocks: ${chainId} from: ${fromBlock} amount: ${amount}`)
|
logger.info(`getPastBlocks: ${chainId} from: ${fromBlock} amount: ${amount}`)
|
||||||
let blockNumber = fromBlock
|
let blockNumber = fromBlock
|
||||||
const redisKey = `blocknum_${chainId}`
|
const redisKey = `blocknum_${chainId}`
|
||||||
let retryCount = 0;
|
let retryCount = 0;
|
||||||
@ -78,7 +78,7 @@ export async function getPastBlocks({chainId, rpc, fromBlock, amount}
|
|||||||
|
|
||||||
export function* getPastBlocksIter({chainId, rpc, fromBlock, amount}
|
export function* getPastBlocksIter({chainId, rpc, fromBlock, amount}
|
||||||
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
|
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
|
||||||
logger.debug(`*getPastBlocksIter: ${chainId} from: ${fromBlock} amount: ${amount}`)
|
logger.info(`*getPastBlocksIter: ${chainId} from: ${fromBlock} amount: ${amount}`)
|
||||||
let remain = amount
|
let remain = amount
|
||||||
while (remain > 0) {
|
while (remain > 0) {
|
||||||
yield getPastBlocks({chainId, rpc, fromBlock, amount: Math.min(MAX_BATCH_AMOUNT, remain)})
|
yield getPastBlocks({chainId, rpc, fromBlock, amount: Math.min(MAX_BATCH_AMOUNT, remain)})
|
||||||
|
Loading…
x
Reference in New Issue
Block a user