From ad86cb3eb8b56efc795b51a484f29d2e8a3fbab9 Mon Sep 17 00:00:00 2001 From: CounterFire2023 <136581895+CounterFire2023@users.noreply.github.com> Date: Mon, 8 Jan 2024 14:55:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9D=97=E9=AB=98=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E7=A8=8B=E5=BA=8F,=20=E6=94=B9=E7=94=A8=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E6=9F=A5=E8=AF=A2=E6=9D=A5=E8=8E=B7=E5=8F=96=E5=9D=97?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chain/allchain.ts | 2 +- src/chain/chain.api.ts | 21 +++++++++ src/config/scriptions_cfg.ts | 9 ++-- src/controllers/task.controllers.ts | 38 ++++++++------- src/models/BlockData.ts | 27 +++++++++++ src/scriptions.ts | 4 +- src/service/block.sync.service.ts | 72 ++++++++++++++++------------- src/service/event.sync.service.ts | 1 - src/utils/block.util.ts | 60 ++++++++++++++++++++++++ src/utils/contract.util.ts | 4 +- 10 files changed, 180 insertions(+), 58 deletions(-) create mode 100644 src/models/BlockData.ts create mode 100644 src/utils/block.util.ts diff --git a/src/chain/allchain.ts b/src/chain/allchain.ts index e6d4cc3..af1c5ec 100644 --- a/src/chain/allchain.ts +++ b/src/chain/allchain.ts @@ -277,7 +277,7 @@ export const AllChains: IChain[] = [ { name: 'Arbitrum Sepolia', type: 'Testnet', - rpc: 'https://arbitrum-sepolia.infura.io/v3/b6bf7d3508c941499b10025c0776eaf8', + rpc: 'https://sepolia-rollup.arbitrum.io/rpc|https://arbitrum-sepolia.infura.io/v3/b6bf7d3508c941499b10025c0776eaf8', id: 421614, network: 'ARB_SEPOLIA', symbol: 'ETH', diff --git a/src/chain/chain.api.ts b/src/chain/chain.api.ts index a2e10db..4569260 100644 --- a/src/chain/chain.api.ts +++ b/src/chain/chain.api.ts @@ -26,4 +26,25 @@ export const ethGetBlockByNumber = async (rpc: string, blockNumber: string) => { export const ethGetLogs = async (rpc: string, params: any) => { return requestChain(rpc, "eth_getLogs", [params]) +} + +export const batchEthBlocks = async (rpc: string, blockNumber: number, amount: number) => { + let batch = [] + for (let i = 0; i < amount; i++) { + batch.push({ + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: ["0x" + (blockNumber + i).toString(16), true], + id: blockNumber + i + }) + } + + return fetch(rpc, { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8" + }, + body: JSON.stringify(batch) + }) + .then((res) => res.json()) } \ No newline at end of file diff --git a/src/config/scriptions_cfg.ts b/src/config/scriptions_cfg.ts index e43c354..69a9c23 100644 --- a/src/config/scriptions_cfg.ts +++ b/src/config/scriptions_cfg.ts @@ -2,21 +2,24 @@ import { CheckIn } from "models/CheckIn" export interface IScriptionCfg { chain: number, + rpc?: string, fromBlock: number, filter: (event: any) => boolean, - eventProcesser: (event: any) => Promise, + process: (event: any) => Promise, } export const SCRIPTIONS_CFG: IScriptionCfg[] = [ { chain: 421614, + // rpc: 'https://arbitrum-sepolia.infura.io/v3/25559ac58e714177b31ff48d507e7ac9', + rpc: 'https://arb-sepolia.g.alchemy.com/v2/EKR1je8ZGia332kkemNc4mtXQuFskIq3', fromBlock: 5063559, filter: (event: any) => { return ( event.input === '0x646174613a2c7b2270223a2263662d3230222c226f70223a22636869636b227d' && event.to.toLowerCase() === '0x50a8e60041a206acaa5f844a1104896224be6f39') }, - eventProcesser: async (event: any) => { - return CheckIn.saveEvent(event) + process: async (tx: any) => { + return CheckIn.saveEvent(tx) } } ] \ No newline at end of file diff --git a/src/controllers/task.controllers.ts b/src/controllers/task.controllers.ts index fbc5474..3262e5c 100644 --- a/src/controllers/task.controllers.ts +++ b/src/controllers/task.controllers.ts @@ -9,26 +9,32 @@ class TaskController extends BaseController { @role('anon') @router('post /task/check_in') async checkDailyCheckIn(req, res) { - let { address, days } = req.params - if (!address || !days) { - throw new ZError(10, 'address is required') + let { address, days, limit } = req.params + if (!address || (!days && !limit)) { + throw new ZError(10, 'params mismatch') } let query: any = { from: address } - if (typeof days === 'number') { - let begin = getNDayAgo(days, true) - query.blockTime = {$gt: begin.getTime() / 1000 | 0} - } else if (typeof days === 'string') { - if (days === '1month') { - let date = getMonthBegin(new Date()) - query.blockTime = {$gt: date.getTime() / 1000 | 0} - } else { - query.dateTag = days + if (!limit) { + if (typeof days === 'number') { + let begin = getNDayAgo(days, true) + query.blockTime = {$gt: begin.getTime() / 1000 | 0} + } else if (typeof days === 'string') { + if (days === '1month') { + let date = getMonthBegin(new Date()) + query.blockTime = {$gt: date.getTime() / 1000 | 0} + } else { + query.dateTag = days + } + } else if (Array.isArray(days)) { + query.dateTag = {$in: days} } - - } else if (Array.isArray(days)) { - query.dateTag = {$in: days} } - let records = await CheckIn.find({ from: address, dateTag: {$in: days}}) + let records + if (limit) { + records = await CheckIn.find(query).limit(limit) + } else { + records = await CheckIn.find(query) + } let result = [] for (let record of records) { result.push(record.toJson()) diff --git a/src/models/BlockData.ts b/src/models/BlockData.ts new file mode 100644 index 0000000..a3e120d --- /dev/null +++ b/src/models/BlockData.ts @@ -0,0 +1,27 @@ +import { Severity, getModelForClass, index, modelOptions, mongoose, prop } from '@typegoose/typegoose' +import { dbconn } from 'decorators/dbconn' +import { BaseModule } from './Base' + +@dbconn() +@index({ hash: 1 }, { unique: true }) +@modelOptions({ + schemaOptions: { collection: 'block_data', timestamps: true }, + options: { allowMixed: Severity.ALLOW }, +}) +export class BlockDataClass extends BaseModule { + @prop({ required: true }) + public hash!: string + @prop() + public chainId: number + + @prop({ type: mongoose.Schema.Types.Mixed}) + public data: any + + public static async saveBlock(event: any) { + return BlockData.insertOrUpdate({ hash: event.hash }, {data: event}) + } +} + +export const BlockData = getModelForClass(BlockDataClass, { + existingConnection: BlockDataClass['db'], +}) diff --git a/src/scriptions.ts b/src/scriptions.ts index 28c4401..1cb4178 100644 --- a/src/scriptions.ts +++ b/src/scriptions.ts @@ -36,7 +36,7 @@ async function initEventSvrs() { async function parseAllEvents() { if (lock) { - logger.info('sync in process, cancel.') + logger.warn('sync in process, cancel.') return } lock = true @@ -59,7 +59,7 @@ async function parseAllEvents() { await initEventSvrs() setInterval(function () { parseAllEvents() - }, 500) + }, 10000) parseAllEvents() })(); diff --git a/src/service/block.sync.service.ts b/src/service/block.sync.service.ts index 64e0392..a1b0a9a 100644 --- a/src/service/block.sync.service.ts +++ b/src/service/block.sync.service.ts @@ -1,67 +1,73 @@ import { IChain } from "chain/allchain"; -import { ethGetBlockByNumber } from "chain/chain.api"; +import { ethBlockNumber } from "chain/chain.api"; import { IScriptionCfg } from "config/scriptions_cfg"; -import logger from "logger/logger"; +import { BlockData } from "models/BlockData"; import { RedisClient } from "redis/RedisClient"; +import { getPastBlocksIter } from "utils/block.util"; import { formatDate } from "utils/date.util"; -import { retry } from "utils/promise.util"; - export class BlockSyncSvr { chainCfg: IChain scriptionCfgs: IScriptionCfg[] = [] fromBlock: number = Number.MAX_SAFE_INTEGER redisKey = '' + rpc = ''; constructor(_chainCfg: IChain, _scriptionCfgs: IScriptionCfg[]) { this.chainCfg =_chainCfg this.scriptionCfgs = _scriptionCfgs + this.rpc = _chainCfg.rpc.split('|')[0] for (let cfg of _scriptionCfgs) { this.fromBlock = Math.min(this.fromBlock, cfg.fromBlock) + if (cfg.rpc) { + this.rpc = cfg.rpc + } } this.redisKey = `blocknum_${this.chainCfg.id}` } async execute() { try { + let currentBlock = await ethBlockNumber(this.rpc) let blockStr = await new RedisClient().get(this.redisKey) if (blockStr) { this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock) } - const block = await this.fetchBlock(this.chainCfg.rpc.split('|'), this.fromBlock); - if (!block.result.hash) { - logger.log(`${this.fromBlock} block not found`) - return - } - for (let i = 0; i < block.result.transactions.length; i++) { - const tx = block.result.transactions[i]; - if (block.result.timestamp) { - tx.blockTime = parseInt(block.result.timestamp, 16); - tx.dateTag = formatDate(new Date(tx.blockTime * 1000)); - } - for (let j = 0; j < this.scriptionCfgs.length; j++) { - const cfg = this.scriptionCfgs[j]; - if (cfg.filter(tx)) { - await cfg.eventProcesser(tx) - } - } - } - await new RedisClient().set(this.redisKey, this.fromBlock + 1 + '') + const amount = parseInt(currentBlock.result, 16) - this.fromBlock + let blocks = getPastBlocksIter({ + chainId: this.chainCfg.id, + rpc: this.rpc, + fromBlock: this.fromBlock, + amount + }) + await this.processBlockDatas(blocks) } catch (err) { console.log(err) } } - async fetchBlock(rpcs: string[], blockNumber: number) { - const blockNumberHex = '0x' + blockNumber.toString(16) - for (let rpc of rpcs) { - try { - let res = await retry(() => ethGetBlockByNumber(rpc, blockNumberHex), { maxRetries: 3, whitelistErrors: [] }) - return res; - } catch (err) { - console.log(err) - throw err + async processBlockDatas(iterator: any) { + for (const getPastBlockPromise of iterator) { + const blocks = await getPastBlockPromise + for (const block of blocks) { + await BlockData.saveBlock(block) + if (!block.transactions || block.transactions.length === 0) { + continue + } + for (let i = 0; i < block.transactions.length; i++) { + const tx = block.transactions[i]; + if (block.timestamp) { + tx.blockTime = parseInt(block.timestamp, 16); + tx.dateTag = formatDate(new Date(tx.blockTime * 1000)); + } + for (let j = 0; j < this.scriptionCfgs.length; j++) { + const cfg = this.scriptionCfgs[j]; + if (cfg.filter(tx)) { + await cfg.process(tx) + } + } + } + } } - } } \ No newline at end of file diff --git a/src/service/event.sync.service.ts b/src/service/event.sync.service.ts index 8dc0cd2..6d7f052 100644 --- a/src/service/event.sync.service.ts +++ b/src/service/event.sync.service.ts @@ -2,7 +2,6 @@ import assert from 'assert' import { AllChains } from 'chain/allchain' import { HttpRetryProvider } from 'chain/HttpRetryProvider' import logger from 'logger/logger' -import { NftTransferEvent } from 'models/NftTransferEvent' import { RedisClient } from 'redis/RedisClient' import { clearTimeCache, getPastEventsIter, processEvents } from 'utils/contract.util' diff --git a/src/utils/block.util.ts b/src/utils/block.util.ts new file mode 100644 index 0000000..ff06ef7 --- /dev/null +++ b/src/utils/block.util.ts @@ -0,0 +1,60 @@ +import { batchEthBlocks } from "chain/chain.api"; +import logger from "logger/logger"; +import { RedisClient } from "redis/RedisClient"; + +const MAX_BATCH_AMOUNT = 500 +const REQUEST_INTERVAL = 0.5 * 1000 + +export async function divQueryPassBlocks({chainId, rpc, fromBlock, amount} + : {chainId: number, rpc: string, fromBlock: number, amount: number}) { + const middleBlock = fromBlock + Math.floor(amount / 2) + const firstBlocks = await getPastBlocks({chainId, rpc, fromBlock, amount: middleBlock - fromBlock}) + const secondBlocks = await getPastBlocks({chainId, rpc, fromBlock: middleBlock, amount: amount - (middleBlock - fromBlock)}) + return [...firstBlocks, ...secondBlocks] +} + +export async function getPastBlocks({chainId, rpc, fromBlock, amount} + : {chainId: number, rpc: string, fromBlock: number, amount: number}) { + let blocks = [] + logger.log(`getPastBlocks: ${chainId} from: ${fromBlock} amount: ${amount}`) + let blockNumber = fromBlock + const redisKey = `blocknum_${chainId}` + try { + let res = await batchEthBlocks(rpc, blockNumber, amount) + if (res.error) { + throw new Error(res.error.message) + } + for (let i = 0; i < res.length; i++) { + const block = res[i].result; + if (block) { + blocks.push(block) + } + } + await new RedisClient().set(redisKey, blockNumber + amount + '') + await new Promise(resolve => setTimeout(resolve, REQUEST_INTERVAL)) + } catch (e) { + logger.log(e.message || e) + if (e.message && /Too Many Requests/.test(e.message) && amount > 1) { + blocks = await divQueryPassBlocks({chainId, rpc, fromBlock, amount}) + } else if (e.message && /Public RPC Rate Limit Hit, limit will reset in \d+ seconds/.test(e.message)) { + const match = e.message.match(/Public RPC Rate Limit Hit, limit will reset in (\d+) seconds/) + const seconds = parseInt(match[1]) + await new Promise(resolve => setTimeout(resolve, seconds * 1000)) + blocks = await getPastBlocks({chainId, rpc, fromBlock, amount}) + }else { + throw e + } + } + return blocks +} + +export function* getPastBlocksIter({chainId, rpc, fromBlock, amount} + : {chainId: number, rpc: string, fromBlock: number, amount: number}) { + logger.debug(`*getPastBlocksIter: ${chainId} from: ${fromBlock} amount: ${amount}`) + let remain = amount + while (remain > 0) { + yield getPastBlocks({chainId, rpc, fromBlock, amount: Math.min(MAX_BATCH_AMOUNT, remain)}) + fromBlock += MAX_BATCH_AMOUNT + remain -= MAX_BATCH_AMOUNT + } +} diff --git a/src/utils/contract.util.ts b/src/utils/contract.util.ts index 0c08cc5..caa3751 100644 --- a/src/utils/contract.util.ts +++ b/src/utils/contract.util.ts @@ -144,7 +144,7 @@ export async function getPastEvents({ toBlock, }) if (events.length >= RESULT_LIMIT_COUNT) { - events = splitQueryEvents({ + events = await splitQueryEvents({ contract, event, fromBlock, @@ -155,7 +155,7 @@ export async function getPastEvents({ } } catch (e) { if (e.message && /query returned more than \d+ results/.test(e.message)) { - events = divQueryPassEvents({ + events = await divQueryPassEvents({ contract, event, fromBlock,