229 lines
5.4 KiB
TypeScript
229 lines
5.4 KiB
TypeScript
import logger from 'logger/logger'
|
|
import { toBN } from './number.util'
|
|
import { BN } from 'ethereumjs-util'
|
|
import { RedisClient } from 'redis/RedisClient'
|
|
|
|
// Small BN constants reused by the block-range arithmetic in this file.
const ONE = toBN(1)
const TWO = toBN(2)

// Number of blocks added to the upper bound of the query window on each step
// of getPastEventsIter.
const queryRange = toBN(1000)

// When a query returns at least this many records, the block range has to be
// split and fetched again.
const RESULT_LIMIT_COUNT = 99

// A single block holding at least this many events is requested on its own.
const SPLIT_LIMIT_COUNT = 40

// Block number -> block timestamp, filled by processEvents and emptied by
// clearTimeCache.
const blockTimeMap = new Map<number, number>()
|
|
|
|
/**
 * Bisects the inclusive block range [fromBlock, toBlock] and fetches each half
 * through getPastEvents, one half after the other.
 *
 * getPastEvents calls this when the node rejects a query because it matches
 * too many logs; each half may be bisected again by the same mechanism.
 *
 * @param contract  contract object forwarded unchanged to getPastEvents
 * @param event     event name forwarded unchanged to getPastEvents
 * @param fromBlock first block of the range (inclusive)
 * @param toBlock   last block of the range (inclusive)
 * @param options   extra query options forwarded unchanged to getPastEvents
 * @returns events of the lower half followed by events of the upper half
 * @throws Error when the range holds a single block (or is inverted) and
 *         therefore cannot be split any further
 */
async function divQueryPassEvents({
  contract,
  event,
  fromBlock,
  toBlock,
  options,
}: {
  contract: any
  event: string
  fromBlock: BN
  toBlock: BN
  options?: any
}) {
  // A range of one block has no smaller halves. Re-issuing the same query
  // would fail the same way and recurse without end, so fail loudly instead.
  if (!fromBlock.lt(toBlock)) {
    throw new Error(`${event}: cannot split block range ${fromBlock}-${toBlock} any further`)
  }

  // Floor division guarantees fromBlock <= middle < toBlock, so both halves are
  // non-empty and strictly smaller than the input range. (Rounded division
  // maps [n, n + 1] to middle = n + 1, which repeats the whole range as the
  // "first half" and leaves an inverted second half.)
  const middle = fromBlock.add(toBlock).div(TWO)

  const firstHalfEvents = await getPastEvents({
    contract,
    event,
    fromBlock,
    toBlock: middle,
    options,
  })
  const secondHalfEvents = await getPastEvents({
    contract,
    event,
    fromBlock: middle.add(ONE),
    toBlock,
    options,
  })
  return [...firstHalfEvents, ...secondHalfEvents]
}
|
|
/**
 * Some chains return at most 99 records per query. When a result looks capped,
 * the range is queried again in smaller pieces:
 *  1. Count the events per block in the result already received and pick the
 *     blocks whose count reaches SPLIT_LIMIT_COUNT.
 *  2. Example: fromBlock 0, toBlock 100, blocks 51 and 54 are over the limit.
 *  3. Then 0-50, 51-51, 52-53, 54-54 and 55-100 are queried separately.
 *
 * If no block reaches the limit, the range is simply cut in half. If the range
 * is a single block it cannot be narrowed, and the events received so far are
 * returned unchanged.
 *
 * @param contract  contract object forwarded unchanged to getPastEvents
 * @param event     event name forwarded unchanged to getPastEvents
 * @param fromBlock first block of the range (inclusive)
 * @param toBlock   last block of the range (inclusive)
 * @param options   extra query options forwarded unchanged to getPastEvents
 * @param events    result of the query that covered the whole range
 * @returns the events of all sub-queries, in ascending block-range order
 */
async function splitQueryEvents({
  contract,
  event,
  fromBlock,
  toBlock,
  options,
  events,
}: {
  contract: any
  event: string
  fromBlock: BN
  toBlock: BN
  options?: any
  events: any[]
}) {
  // A single block is the smallest unit that can be queried. Asking for it
  // again would yield the same result and recurse forever.
  if (!fromBlock.lt(toBlock)) {
    return events
  }

  // Events per block number in the result we already have.
  const countByBlock = new Map<number, number>()
  for (const item of events) {
    if (item?.blockNumber == null) continue
    const blockNumber = Number(item.blockNumber)
    if (!Number.isFinite(blockNumber)) continue
    countByBlock.set(blockNumber, (countByBlock.get(blockNumber) ?? 0) + 1)
  }

  const heavyBlocks = Array.from(countByBlock)
    .filter(([, count]) => count >= SPLIT_LIMIT_COUNT)
    .map(([blockNumber]) => blockNumber)
    .sort((a, b) => a - b)

  const query = (subFrom: BN, subTo: BN) =>
    getPastEvents({ contract, event, fromBlock: subFrom, toBlock: subTo, options })

  if (heavyBlocks.length === 0) {
    // No block stands out, so there is nothing to isolate. Halve the range
    // instead of returning nothing; floor division keeps both halves non-empty.
    const middle = fromBlock.add(toBlock).div(TWO)
    const lowerEvents = await query(fromBlock, middle)
    const upperEvents = await query(middle.add(ONE), toBlock)
    return [...lowerEvents, ...upperEvents]
  }

  let results: any[] = []
  // First block that has not been covered by a sub-query yet.
  let cursor = fromBlock
  for (const blockNumber of heavyBlocks) {
    const block = toBN(blockNumber)
    // Gap between the previous heavy block (or fromBlock) and this one.
    if (cursor.lt(block)) {
      results = results.concat(await query(cursor, block.sub(ONE)))
    }
    // The heavy block on its own.
    results = results.concat(await query(block, block))
    cursor = block.add(ONE)
  }
  // Remainder after the last heavy block.
  if (!cursor.gt(toBlock)) {
    results = results.concat(await query(cursor, toBlock))
  }
  return results
}
|
|
|
|
/**
 * Fetches `event` logs of `contract` for the inclusive block range
 * [fromBlock, toBlock] via `contract.getPastEvents`.
 *
 * Two situations trigger a re-query in smaller pieces:
 *  - the node rejects the query with a "too many results" style error: the
 *    range is bisected by divQueryPassEvents;
 *  - the query succeeds but returns at least RESULT_LIMIT_COUNT records: the
 *    range is re-fetched by splitQueryEvents.
 *
 * @param contract  object exposing `options.address` and `getPastEvents`
 * @param event     event name passed to `contract.getPastEvents`
 * @param fromBlock first block of the range (inclusive)
 * @param toBlock   last block of the range (inclusive)
 * @param options   extra query options; `fromBlock`/`toBlock` override any
 *                  values of the same name found in it
 * @returns the events found in the range
 * @throws the original error for any failure other than a result-limit error
 *         (non-Error rejections are wrapped in an Error)
 */
export async function getPastEvents({
  contract,
  event,
  fromBlock,
  toBlock,
  options,
}: {
  contract: any
  event: string
  fromBlock: BN
  toBlock: BN
  options?: any
}) {
  logger.debug(`${contract.options.address}: ${event} from: ${fromBlock} to: ${toBlock}`)

  let events
  try {
    events = await contract.getPastEvents(event, {
      ...options,
      fromBlock,
      toBlock,
    })
  } catch (e) {
    const message = (e as any)?.message
    const hitResultLimit =
      Boolean(message) &&
      (/query returned more than \d+ results/.test(String(message)) ||
        /logs matched by query exceeds limit of \d/.test(String(message)))

    if (hitResultLimit) {
      return divQueryPassEvents({
        contract,
        event,
        fromBlock,
        toBlock,
        options,
      })
    }
    // Rethrow the original error so its type, stack and extra fields survive;
    // only non-Error rejections need wrapping.
    throw e instanceof Error ? e : new Error(String(e))
  }

  // The result may have been capped by the node: fetch it again piecewise.
  // This call sits outside the try block, so errors from the sub-queries
  // propagate to the caller instead of triggering a bisect of the whole range.
  if (events.length >= RESULT_LIMIT_COUNT) {
    return splitQueryEvents({
      contract,
      event,
      fromBlock,
      toBlock,
      options,
      events,
    })
  }
  return events
}
|
|
|
|
/**
 * Generator that walks the inclusive block range [fromBlock, toBlock] in
 * windows and yields, alternately:
 *  - the promise returned by getPastEvents for the current window, and
 *  - the value returned by `RedisClient#set`, which stores the next block to
 *    start from under the key `<chain>_<lowercased contract address>_<event>`.
 *
 * The first window spans `queryRange + 1` blocks and every following one
 * `queryRange` blocks; the last window ends exactly at `toBlock`. Because the
 * generator is only resumed when the consumer asks for the next value, each
 * checkpoint write is issued after the consumer finished with the preceding
 * window.
 *
 * Nothing is yielded (and no checkpoint is written) when fromBlock > toBlock.
 *
 * @param chain     chain id, used only to build the Redis key
 * @param contract  object exposing `options.address`; forwarded to getPastEvents
 * @param event     event name
 * @param fromBlock first block to query (inclusive)
 * @param toBlock   last block to query (inclusive)
 * @param options   extra query options forwarded to getPastEvents
 */
export function* getPastEventsIter({
  chain,
  contract,
  event,
  fromBlock,
  toBlock,
  options,
}: {
  chain: number
  contract: any
  event: string
  fromBlock: number
  toBlock: number
  options?: any
}) {
  const address = contract.options.address
  const redisKey = `${chain}_${address.toLowerCase()}_${event}`
  logger.debug(`*getPastEventsIter: ${event} from: ${fromBlock} to: ${toBlock}`)

  // Empty range: an inverted query is pointless and writing `toBlock + 1`
  // could move the stored checkpoint backwards.
  if (fromBlock > toBlock) {
    return
  }

  const lastBlock = toBN(toBlock)
  let from = toBN(fromBlock)
  let to = from.add(queryRange)

  while (to.lt(lastBlock)) {
    yield getPastEvents({ contract, event, fromBlock: from, toBlock: to, options })
    from = to.add(ONE)
    to = to.add(queryRange)
    // Checkpoint: the next run can resume at the first block not yet queried.
    yield new RedisClient().set(redisKey, from.toString())
  }

  yield getPastEvents({ contract, event, fromBlock: from, toBlock: lastBlock, options })
  yield new RedisClient().set(redisKey, lastBlock.add(ONE).toString())
}
|
|
|
|
/**
 * Drains `iterator`, awaiting every yielded value in turn. For each batch of
 * events it stamps every event with `timestamp` (the timestamp of its block)
 * and `chain`, then awaits `processedEvent(event)` before moving on.
 *
 * Block timestamps are looked up with `web3.eth.getBlock` and cached in the
 * module-level `blockTimeMap`.
 * NOTE(review): the cache is keyed by block number only, so when several
 * chains are processed it presumably has to be cleared between them
 * (clearTimeCache) - confirm against the callers.
 *
 * Yielded values that are not iterable objects are awaited and then skipped.
 * getPastEventsIter interleaves such values (its checkpoint writes) with the
 * event batches.
 *
 * @param web3            object exposing `eth.getBlock(blockNumber)`
 * @param iterator        iterable of promises (or plain values) to await
 * @param chain           chain id written to `event.chain`
 * @param processedEvent  callback awaited once per event
 * @throws Error when `web3.eth.getBlock` returns no data for an event's block
 */
export async function processEvents(web3, iterator, chain: number, processedEvent) {
  for (const pending of iterator) {
    const batch = await pending

    // Only iterable objects can be event batches. Strings, undefined, booleans
    // and the like are results of side-effect steps and carry no events.
    if (
      batch == null ||
      typeof batch !== 'object' ||
      typeof batch[Symbol.iterator] !== 'function'
    ) {
      continue
    }

    for (const event of batch) {
      // Properties cannot be attached to null or a primitive.
      if (event == null || typeof event !== 'object') {
        continue
      }

      const blockNumber = event.blockNumber
      // `!= null` rather than a truthiness test so block 0 is handled too.
      if (blockNumber != null) {
        let timestamp = blockTimeMap.get(blockNumber)
        if (timestamp === undefined) {
          const blockData = await web3.eth.getBlock(blockNumber)
          if (blockData == null) {
            throw new Error(`block ${blockNumber} not found on chain ${chain}`)
          }
          timestamp = blockData.timestamp
          blockTimeMap.set(blockNumber, timestamp)
        }
        event.timestamp = timestamp
      }

      event.chain = chain
      await processedEvent(event)
    }
  }
}
|
|
|
|
/**
 * Empties `blockTimeMap`, the module-level cache of block timestamps that
 * processEvents fills.
 */
export function clearTimeCache(): void {
  blockTimeMap.clear()
}
|