diff --git a/src/api.server.ts b/src/api.server.ts index 59964e0..6a177e1 100644 --- a/src/api.server.ts +++ b/src/api.server.ts @@ -105,7 +105,7 @@ export class ApiServer { logger.log('REDIS Connected') } private initSchedules() { - new BlocknumSchedule().scheduleAll() + // new BlocknumSchedule().scheduleAll() new PriceSvr().scheduleAll() } private restoreChainQueue() {} diff --git a/src/chain/chain.api.ts b/src/chain/chain.api.ts index 6c237c5..9327b1a 100644 --- a/src/chain/chain.api.ts +++ b/src/chain/chain.api.ts @@ -1,4 +1,5 @@ import fetch from "node-fetch" +import { retry } from 'utils/promise.util' const requestChain = async (rpc: string, method: string, params: any) => { const data = { @@ -21,6 +22,14 @@ export const ethBlockNumber = async (rpc: string) => { return requestChain(rpc, "eth_blockNumber", []) } +export const retryEthBlockNumber = async (rpc: string) => { + const res = await retry(() => ethBlockNumber(rpc), { maxRetries: 3, whitelistErrors: [] }) + if (res.error) { + throw new Error(res.error.message) + } + return res +} + export const ethGetBlockByNumber = async (rpc: string, blockNumber: string) => { return requestChain(rpc, "eth_getBlockByNumber", [blockNumber, true]) } @@ -29,7 +38,7 @@ export const ethGetLogs = async (rpc: string, params: any) => { return requestChain(rpc, "eth_getLogs", [params]) } -export const batchEthBlocks = async (rpc: string, blockNumber: number, amount: number) => { +export const _batchEthBlocks = async (rpc: string, blockNumber: number, amount: number) => { let batch = [] for (let i = 0; i < amount; i++) { batch.push({ @@ -50,6 +59,27 @@ export const batchEthBlocks = async (rpc: string, blockNumber: number, amount: n .then((res) => res.json()) } +export const batchEthBlocks = async (rpc: string, blockNumbers: number[]) => { + let batch = [] + for (let blockNum of blockNumbers) { + batch.push({ + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: ["0x" + blockNum.toString(16), true], + id: blockNum + }) + } + + return fetch(rpc, { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8" + }, + body: JSON.stringify(batch) + }) + .then((res) => res.json()) +} + export const batchEthLogs = async (rpc: string, params: any) => { // let batch = [] // for (let i = 0; i < params.length; i++) { diff --git a/src/models/CheckIn.ts b/src/models/CheckIn.ts index cf42e2c..756a4cf 100644 --- a/src/models/CheckIn.ts +++ b/src/models/CheckIn.ts @@ -2,9 +2,11 @@ import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoos import { dbconn } from 'decorators/dbconn' import { BaseModule } from './Base' import { formatDate, yesterday } from 'utils/date.util' +import { logger } from '@typegoose/typegoose/lib/logSettings' @dbconn() @index({ from: 1 }, { unique: false }) +@index({ hash: 1 }, { unique: true }) @index({ from: 1, dateTag: 1}, { unique: true }) @index({ from: 1, blockTime: 1}, { unique: false }) @modelOptions({ @@ -39,7 +41,11 @@ export class CheckInClass extends BaseModule { if (preDayEvent) { event.count = preDayEvent.count + 1 } - return CheckIn.insertOrUpdate({ hash: event.hash }, event) + try { + await CheckIn.insertOrUpdate({ hash: event.hash }, event) + } catch (err) { + logger.info('save check record error: ', event.dataTag, event.from, err.message || err) + } } public toJson() { diff --git a/src/scriptions.ts b/src/scriptions.ts index f5784fa..7b29ded 100644 --- a/src/scriptions.ts +++ b/src/scriptions.ts @@ -58,7 +58,7 @@ async function parseAllEvents() { await initEventSvrs() setInterval(function () { parseAllEvents() - }, 10000) + }, 20000) parseAllEvents() })(); diff --git a/src/service/block.sync.service.ts b/src/service/block.sync.service.ts index e7f609f..94e1adc 100644 --- a/src/service/block.sync.service.ts +++ b/src/service/block.sync.service.ts @@ -1,5 +1,5 @@ import { IChain } from "chain/allchain"; -import { ethBlockNumber } from "chain/chain.api"; +import { ethBlockNumber, retryEthBlockNumber } from "chain/chain.api"; import { IScriptionCfg } from "config/scriptions_cfg"; import { RedisClient } from "redis/RedisClient"; import { getPastBlocksIter } from "utils/block.util"; @@ -9,6 +9,7 @@ export class BlockSyncSvr { chainCfg: IChain scriptionCfgs: IScriptionCfg[] = [] fromBlock: number = Number.MAX_SAFE_INTEGER + batchCount: number = 0 redisKey = '' rpc = ''; constructor(_chainCfg: IChain, _scriptionCfgs: IScriptionCfg[]) { @@ -26,30 +27,40 @@ export class BlockSyncSvr { async execute() { try { - let currentBlock = await ethBlockNumber(this.rpc) + if (!this.batchCount) { + let currentBlock = await retryEthBlockNumber(this.rpc) + if (currentBlock.error) { + process.exit(1) + } + this.batchCount = parseInt(currentBlock.result, 16) - this.fromBlock + } + let blockStr = await new RedisClient().get(this.redisKey) if (blockStr) { this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock) } - //@ts-ignore - const amount = parseInt(currentBlock.result, 16) - this.fromBlock let blocks = getPastBlocksIter({ chainId: this.chainCfg.id, rpc: this.rpc, fromBlock: this.fromBlock, - amount + amount: this.batchCount }) - await this.processBlockDatas(blocks) + const amount = await this.processBlockDatas(blocks) + if (amount > 0) { + this.batchCount = amount + 1 + } } catch (err) { console.log(err) } } async processBlockDatas(iterator: any) { + let count = 0 for (const getPastBlockPromise of iterator) { const blocks = await getPastBlockPromise for (const block of blocks) { // await BlockData.saveBlock(block) + count++ if (!block.transactions || block.transactions.length === 0) { continue } @@ -66,8 +77,8 @@ export class BlockSyncSvr { } } } - } } + return count } } \ No newline at end of file diff --git a/src/utils/block.util.ts b/src/utils/block.util.ts index ff06ef7..9ced7e4 100644 --- a/src/utils/block.util.ts +++ b/src/utils/block.util.ts @@ -1,6 +1,8 @@ import { batchEthBlocks } from "chain/chain.api"; import logger from "logger/logger"; import { RedisClient } from "redis/RedisClient"; +import { retry } from "./promise.util"; +import { parse } from "path"; const MAX_BATCH_AMOUNT = 500 const REQUEST_INTERVAL = 0.5 * 1000 @@ -19,18 +21,44 @@ export async function getPastBlocks({chainId, rpc, fromBlock, amount} logger.log(`getPastBlocks: ${chainId} from: ${fromBlock} amount: ${amount}`) let blockNumber = fromBlock const redisKey = `blocknum_${chainId}` - try { - let res = await batchEthBlocks(rpc, blockNumber, amount) - if (res.error) { - throw new Error(res.error.message) + let retryCount = 0; + const parseBlocksAndRetry = async (blockNums: number[]) => { + let records = await batchEthBlocks(rpc, blockNums) + if (records.error) { + throw new Error(records.error.message) } - for (let i = 0; i < res.length; i++) { - const block = res[i].result; - if (block) { + let realAmount = 0 + let retryNums: number[] = [] + let maxBlockNumber = 0 + for (let i = 0; i < records.length; i++) { + const block = records[i].result; + if (block?.hash) { blocks.push(block) + realAmount++ + maxBlockNumber = Math.max(maxBlockNumber, blockNumber + i) + } else { + if (block) { + logger.warn(`block ${blockNumber + i}: ${block}`) + } else { + logger.warn(`block ${blockNumber + i} is null`) + retryNums.push(blockNumber + i) + } } } - await new RedisClient().set(redisKey, blockNumber + amount + '') + if (retryNums.length > 0 && ++retryCount < 3) { + logger.info(`${retryCount} retry ${retryNums.length} blocks`) + const retryBlocks = await parseBlocksAndRetry(retryNums) + realAmount += retryBlocks + } + return realAmount + } + try { + const numsArr = Array.from({length: amount}, (v, k) => k + blockNumber) + const realAmount = await parseBlocksAndRetry(numsArr) + if (retryCount > 0) { + blocks.sort((a, b) => parseInt(a.number) - parseInt(b.number)) + } + await new RedisClient().set(redisKey, blockNumber + realAmount + '') await new Promise(resolve => setTimeout(resolve, REQUEST_INTERVAL)) } catch (e) { logger.log(e.message || e)