Optimize the block-height sync service: switch to batch queries for fetching block data

CounterFire2023 2024-01-08 14:55:20 +08:00
parent c2bad5d5e3
commit ad86cb3eb8
10 changed files with 180 additions and 58 deletions
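The gist of the change: instead of issuing one eth_getBlockByNumber call per block, the sync service now POSTs a single JSON-RPC 2.0 batch covering a range of heights, stores the raw blocks, and then filters their transactions. A minimal sketch of the response shape the new code relies on; the EthBatchReply name is illustrative and not part of the repository:

// Illustrative type only. batchEthBlocks (below) sends an array of eth_getBlockByNumber calls
// in one HTTP POST; the node answers with an array of the same length, each entry carrying
// either result or error and echoing the id (set to the block number) so replies can be matched.
interface EthBatchReply {
  jsonrpc: '2.0'
  id: number
  result?: any // the block, including full transaction objects when params[1] is true
  error?: { code: number; message: string }
}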

@@ -277,7 +277,7 @@ export const AllChains: IChain[] = [
{
name: 'Arbitrum Sepolia',
type: 'Testnet',
rpc: 'https://arbitrum-sepolia.infura.io/v3/b6bf7d3508c941499b10025c0776eaf8',
rpc: 'https://sepolia-rollup.arbitrum.io/rpc|https://arbitrum-sepolia.infura.io/v3/b6bf7d3508c941499b10025c0776eaf8',
id: 421614,
network: 'ARB_SEPOLIA',
symbol: 'ETH',
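The chain entry now packs a primary endpoint and a fallback into one pipe-separated rpc string; BlockSyncSvr below picks the first entry with rpc.split('|')[0]. A small sketch of unpacking such a value, using a hypothetical helper name:

// Hypothetical helper: split the pipe-separated rpc field into primary and fallback endpoints.
function splitRpcList(rpc: string): { primary: string; fallbacks: string[] } {
  const [primary, ...fallbacks] = rpc.split('|').map((s) => s.trim()).filter((s) => s.length > 0)
  return { primary, fallbacks }
}
// splitRpcList(chain.rpc).primary === 'https://sepolia-rollup.arbitrum.io/rpc'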

@@ -26,4 +26,25 @@ export const ethGetBlockByNumber = async (rpc: string, blockNumber: string) => {
export const ethGetLogs = async (rpc: string, params: any) => {
return requestChain(rpc, "eth_getLogs", [params])
}
export const batchEthBlocks = async (rpc: string, blockNumber: number, amount: number) => {
let batch = []
for (let i = 0; i < amount; i++) {
batch.push({
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: ["0x" + (blockNumber + i).toString(16), true],
id: blockNumber + i
})
}
return fetch(rpc, {
method: "POST",
headers: {
"Content-Type": "application/json; charset=utf-8"
},
body: JSON.stringify(batch)
})
.then((res) => res.json())
}
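A usage sketch for the new batchEthBlocks helper, assuming the endpoint accepts JSON-RPC batch requests; the demo function and block numbers are illustrative:

// Fetch three consecutive blocks in one HTTP round trip (batchEthBlocks is assumed to be in scope).
async function demoBatch(rpc: string) {
  const replies = await batchEthBlocks(rpc, 5063559, 3)
  for (const reply of replies) {
    if (reply.error || !reply.result) continue // skip entries the node could not serve
    const block = reply.result
    console.log(parseInt(block.number, 16), block.transactions.length)
  }
}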

@@ -2,21 +2,24 @@ import { CheckIn } from "models/CheckIn"
export interface IScriptionCfg {
chain: number,
rpc?: string,
fromBlock: number,
filter: (event: any) => boolean,
eventProcesser: (event: any) => Promise<void>,
process: (event: any) => Promise<void>,
}
export const SCRIPTIONS_CFG: IScriptionCfg[] = [
{
chain: 421614,
// rpc: 'https://arbitrum-sepolia.infura.io/v3/25559ac58e714177b31ff48d507e7ac9',
rpc: 'https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3',
fromBlock: 5063559,
filter: (event: any) => {
return ( event.input === '0x646174613a2c7b2270223a2263662d3230222c226f70223a22636869636b227d'
&& event.to.toLowerCase() === '0x50a8e60041a206acaa5f844a1104896224be6f39')
},
eventProcesser: async (event: any) => {
return CheckIn.saveEvent(event)
process: async (tx: any) => {
return CheckIn.saveEvent(tx)
}
}
]
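For reference, the filter matches raw calldata: the hex literal is just the UTF-8 bytes of an inscription-style data URI, and matching transactions are handed to CheckIn.saveEvent. A small decoding sketch, assuming Node's Buffer is available:

// Decode the calldata the filter looks for; the hex is plain UTF-8 text.
const input = '0x646174613a2c7b2270223a2263662d3230222c226f70223a22636869636b227d'
const decoded = Buffer.from(input.slice(2), 'hex').toString('utf8')
console.log(decoded) // data:,{"p":"cf-20","op":"chick"}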

@@ -9,26 +9,32 @@ class TaskController extends BaseController {
@role('anon')
@router('post /task/check_in')
async checkDailyCheckIn(req, res) {
let { address, days } = req.params
if (!address || !days) {
throw new ZError(10, 'address is required')
let { address, days, limit } = req.params
if (!address || (!days && !limit)) {
throw new ZError(10, 'params mismatch')
}
let query: any = { from: address }
if (typeof days === 'number') {
let begin = getNDayAgo(days, true)
query.blockTime = {$gt: begin.getTime() / 1000 | 0}
} else if (typeof days === 'string') {
if (days === '1month') {
let date = getMonthBegin(new Date())
query.blockTime = {$gt: date.getTime() / 1000 | 0}
} else {
query.dateTag = days
if (!limit) {
if (typeof days === 'number') {
let begin = getNDayAgo(days, true)
query.blockTime = {$gt: begin.getTime() / 1000 | 0}
} else if (typeof days === 'string') {
if (days === '1month') {
let date = getMonthBegin(new Date())
query.blockTime = {$gt: date.getTime() / 1000 | 0}
} else {
query.dateTag = days
}
} else if (Array.isArray(days)) {
query.dateTag = {$in: days}
}
} else if (Array.isArray(days)) {
query.dateTag = {$in: days}
}
let records = await CheckIn.find({ from: address, dateTag: {$in: days}})
let records
if (limit) {
records = await CheckIn.find(query).limit(limit)
} else {
records = await CheckIn.find(query)
}
let result = []
for (let record of records) {
result.push(record.toJson())
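For context, the reworked handler now accepts several query styles; the bodies below are illustrative examples, not fixtures from the repo (the addresses are placeholders, and the dateTag strings must follow whatever format formatDate in utils/date.util produces):

// POST /task/check_in -- example request bodies after this change:
const byDayCount = { address: '0xYourAddress', days: 7 }              // check-ins since N days ago
const byMonth    = { address: '0xYourAddress', days: '1month' }       // check-ins since the start of this month
const byDateTags = { address: '0xYourAddress', days: ['2024-01-08'] } // exact dateTag matches
const firstN     = { address: '0xYourAddress', limit: 30 }            // first N records for the address; days is ignored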

src/models/BlockData.ts (new file, 27 lines)

@@ -0,0 +1,27 @@
import { Severity, getModelForClass, index, modelOptions, mongoose, prop } from '@typegoose/typegoose'
import { dbconn } from 'decorators/dbconn'
import { BaseModule } from './Base'
@dbconn()
@index({ hash: 1 }, { unique: true })
@modelOptions({
schemaOptions: { collection: 'block_data', timestamps: true },
options: { allowMixed: Severity.ALLOW },
})
export class BlockDataClass extends BaseModule {
@prop({ required: true })
public hash!: string
@prop()
public chainId: number
@prop({ type: mongoose.Schema.Types.Mixed})
public data: any
public static async saveBlock(event: any) {
return BlockData.insertOrUpdate({ hash: event.hash }, {data: event})
}
}
export const BlockData = getModelForClass(BlockDataClass, {
existingConnection: BlockDataClass['db'],
})
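A short usage sketch for the new model: thanks to the unique index on hash, saveBlock acts as an idempotent upsert keyed by block hash (insertOrUpdate is inherited from the project's BaseModule and is assumed to take a filter plus an update document, as used above):

// Because of the unique hash index, re-running the sync over the same block range is safe.
async function demoSave(block: any) {
  await BlockData.saveBlock(block) // first run inserts the raw block
  await BlockData.saveBlock(block) // replays only update the existing document
}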

@@ -36,7 +36,7 @@ async function initEventSvrs() {
async function parseAllEvents() {
if (lock) {
logger.info('sync in process, cancel.')
logger.warn('sync in process, cancel.')
return
}
lock = true
@@ -59,7 +59,7 @@ async function parseAllEvents()
await initEventSvrs()
setInterval(function () {
parseAllEvents()
}, 500)
}, 10000)
parseAllEvents()
})();

@@ -1,67 +1,73 @@
import { IChain } from "chain/allchain";
import { ethGetBlockByNumber } from "chain/chain.api";
import { ethBlockNumber } from "chain/chain.api";
import { IScriptionCfg } from "config/scriptions_cfg";
import logger from "logger/logger";
import { BlockData } from "models/BlockData";
import { RedisClient } from "redis/RedisClient";
import { getPastBlocksIter } from "utils/block.util";
import { formatDate } from "utils/date.util";
import { retry } from "utils/promise.util";
export class BlockSyncSvr {
chainCfg: IChain
scriptionCfgs: IScriptionCfg[] = []
fromBlock: number = Number.MAX_SAFE_INTEGER
redisKey = ''
rpc = '';
constructor(_chainCfg: IChain, _scriptionCfgs: IScriptionCfg[]) {
this.chainCfg =_chainCfg
this.scriptionCfgs = _scriptionCfgs
this.rpc = _chainCfg.rpc.split('|')[0]
for (let cfg of _scriptionCfgs) {
this.fromBlock = Math.min(this.fromBlock, cfg.fromBlock)
if (cfg.rpc) {
this.rpc = cfg.rpc
}
}
this.redisKey = `blocknum_${this.chainCfg.id}`
}
async execute() {
try {
let currentBlock = await ethBlockNumber(this.rpc)
let blockStr = await new RedisClient().get(this.redisKey)
if (blockStr) {
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
}
const block = await this.fetchBlock(this.chainCfg.rpc.split('|'), this.fromBlock);
if (!block.result.hash) {
logger.log(`${this.fromBlock} block not found`)
return
}
for (let i = 0; i < block.result.transactions.length; i++) {
const tx = block.result.transactions[i];
if (block.result.timestamp) {
tx.blockTime = parseInt(block.result.timestamp, 16);
tx.dateTag = formatDate(new Date(tx.blockTime * 1000));
}
for (let j = 0; j < this.scriptionCfgs.length; j++) {
const cfg = this.scriptionCfgs[j];
if (cfg.filter(tx)) {
await cfg.eventProcesser(tx)
}
}
}
await new RedisClient().set(this.redisKey, this.fromBlock + 1 + '')
const amount = parseInt(currentBlock.result, 16) - this.fromBlock
let blocks = getPastBlocksIter({
chainId: this.chainCfg.id,
rpc: this.rpc,
fromBlock: this.fromBlock,
amount
})
await this.processBlockDatas(blocks)
} catch (err) {
console.log(err)
}
}
async fetchBlock(rpcs: string[], blockNumber: number) {
const blockNumberHex = '0x' + blockNumber.toString(16)
for (let rpc of rpcs) {
try {
let res = await retry(() => ethGetBlockByNumber(rpc, blockNumberHex), { maxRetries: 3, whitelistErrors: [] })
return res;
} catch (err) {
console.log(err)
throw err
async processBlockDatas(iterator: any) {
for (const getPastBlockPromise of iterator) {
const blocks = await getPastBlockPromise
for (const block of blocks) {
await BlockData.saveBlock(block)
if (!block.transactions || block.transactions.length === 0) {
continue
}
for (let i = 0; i < block.transactions.length; i++) {
const tx = block.transactions[i];
if (block.timestamp) {
tx.blockTime = parseInt(block.timestamp, 16);
tx.dateTag = formatDate(new Date(tx.blockTime * 1000));
}
for (let j = 0; j < this.scriptionCfgs.length; j++) {
const cfg = this.scriptionCfgs[j];
if (cfg.filter(tx)) {
await cfg.process(tx)
}
}
}
}
}
}
}
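initEventSvrs itself is not part of this hunk, so the wiring below is only a sketch, assuming one BlockSyncSvr per chain id and the 10-second loop from the entry file above; the import path for BlockSyncSvr is a guess:

// Hypothetical wiring: pair each chain with its scription configs and run execute() on a timer.
import { AllChains } from 'chain/allchain'
import { SCRIPTIONS_CFG } from 'config/scriptions_cfg'
import { BlockSyncSvr } from 'service/blocksync.svr' // path assumed, not shown in the diff

const svrs = AllChains
  .filter((chain) => SCRIPTIONS_CFG.some((cfg) => cfg.chain === chain.id))
  .map((chain) => new BlockSyncSvr(chain, SCRIPTIONS_CFG.filter((cfg) => cfg.chain === chain.id)))

setInterval(() => {
  for (const svr of svrs) svr.execute() // execute() catches and logs its own errors
}, 10000)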

@@ -2,7 +2,6 @@ import assert from 'assert'
import { AllChains } from 'chain/allchain'
import { HttpRetryProvider } from 'chain/HttpRetryProvider'
import logger from 'logger/logger'
import { NftTransferEvent } from 'models/NftTransferEvent'
import { RedisClient } from 'redis/RedisClient'
import { clearTimeCache, getPastEventsIter, processEvents } from 'utils/contract.util'

src/utils/block.util.ts (new file, 60 lines)

@@ -0,0 +1,60 @@
import { batchEthBlocks } from "chain/chain.api";
import logger from "logger/logger";
import { RedisClient } from "redis/RedisClient";
const MAX_BATCH_AMOUNT = 500
const REQUEST_INTERVAL = 0.5 * 1000
export async function divQueryPassBlocks({chainId, rpc, fromBlock, amount}
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
const middleBlock = fromBlock + Math.floor(amount / 2)
const firstBlocks = await getPastBlocks({chainId, rpc, fromBlock, amount: middleBlock - fromBlock})
const secondBlocks = await getPastBlocks({chainId, rpc, fromBlock: middleBlock, amount: amount - (middleBlock - fromBlock)})
return [...firstBlocks, ...secondBlocks]
}
export async function getPastBlocks({chainId, rpc, fromBlock, amount}
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
let blocks = []
logger.log(`getPastBlocks: ${chainId} from: ${fromBlock} amount: ${amount}`)
let blockNumber = fromBlock
const redisKey = `blocknum_${chainId}`
try {
let res = await batchEthBlocks(rpc, blockNumber, amount)
if (res.error) {
throw new Error(res.error.message)
}
for (let i = 0; i < res.length; i++) {
const block = res[i].result;
if (block) {
blocks.push(block)
}
}
await new RedisClient().set(redisKey, blockNumber + amount + '')
await new Promise(resolve => setTimeout(resolve, REQUEST_INTERVAL))
} catch (e) {
logger.log(e.message || e)
if (e.message && /Too Many Requests/.test(e.message) && amount > 1) {
blocks = await divQueryPassBlocks({chainId, rpc, fromBlock, amount})
} else if (e.message && /Public RPC Rate Limit Hit, limit will reset in \d+ seconds/.test(e.message)) {
const match = e.message.match(/Public RPC Rate Limit Hit, limit will reset in (\d+) seconds/)
const seconds = parseInt(match[1])
await new Promise(resolve => setTimeout(resolve, seconds * 1000))
blocks = await getPastBlocks({chainId, rpc, fromBlock, amount})
} else {
throw e
}
}
return blocks
}
export function* getPastBlocksIter({chainId, rpc, fromBlock, amount}
: {chainId: number, rpc: string, fromBlock: number, amount: number}) {
logger.debug(`*getPastBlocksIter: ${chainId} from: ${fromBlock} amount: ${amount}`)
let remain = amount
while (remain > 0) {
yield getPastBlocks({chainId, rpc, fromBlock, amount: Math.min(MAX_BATCH_AMOUNT, remain)})
fromBlock += MAX_BATCH_AMOUNT
remain -= MAX_BATCH_AMOUNT
}
}
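A consumption sketch for the generator: each yield is a promise for one batch of at most MAX_BATCH_AMOUNT (500) blocks, so awaiting inside the loop keeps the batches sequential, and getPastBlocks advances the blocknum_<chainId> cursor in Redis after every successful batch. The demo values are illustrative:

// Walk 1200 blocks in sequential batches of up to 500.
async function demoIter(rpc: string) {
  const iter = getPastBlocksIter({ chainId: 421614, rpc, fromBlock: 5063559, amount: 1200 })
  for (const batchPromise of iter) {
    const blocks = await batchPromise // the next batch is only requested after this one resolves
    console.log('fetched', blocks.length, 'blocks')
  }
}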

@@ -144,7 +144,7 @@ export async function getPastEvents({
toBlock,
})
if (events.length >= RESULT_LIMIT_COUNT) {
events = splitQueryEvents({
events = await splitQueryEvents({
contract,
event,
fromBlock,
@@ -155,7 +155,7 @@
}
} catch (e) {
if (e.message && /query returned more than \d+ results/.test(e.message)) {
events = divQueryPassEvents({
events = await divQueryPassEvents({
contract,
event,
fromBlock,