优化块同步代码

This commit is contained in:
CounterFire2023 2024-01-11 11:50:22 +08:00
parent 441809ff38
commit 7596a8ac25
6 changed files with 94 additions and 19 deletions

View File

@ -105,7 +105,7 @@ export class ApiServer {
logger.log('REDIS Connected')
}
private initSchedules() {
new BlocknumSchedule().scheduleAll()
// new BlocknumSchedule().scheduleAll()
new PriceSvr().scheduleAll()
}
private restoreChainQueue() {}

View File

@ -1,4 +1,5 @@
import fetch from "node-fetch"
import { retry } from 'utils/promise.util'
const requestChain = async (rpc: string, method: string, params: any) => {
const data = {
@ -21,6 +22,14 @@ export const ethBlockNumber = async (rpc: string) => {
return requestChain(rpc, "eth_blockNumber", [])
}
export const retryEthBlockNumber = async (rpc: string) => {
const res = await retry(() => ethBlockNumber(rpc), { maxRetries: 3, whitelistErrors: [] })
if (res.error) {
throw new Error(res.error.message)
}
return res
}
export const ethGetBlockByNumber = async (rpc: string, blockNumber: string) => {
return requestChain(rpc, "eth_getBlockByNumber", [blockNumber, true])
}
@ -29,7 +38,7 @@ export const ethGetLogs = async (rpc: string, params: any) => {
return requestChain(rpc, "eth_getLogs", [params])
}
export const batchEthBlocks = async (rpc: string, blockNumber: number, amount: number) => {
export const _batchEthBlocks = async (rpc: string, blockNumber: number, amount: number) => {
let batch = []
for (let i = 0; i < amount; i++) {
batch.push({
@ -50,6 +59,27 @@ export const batchEthBlocks = async (rpc: string, blockNumber: number, amount: n
.then((res) => res.json())
}
export const batchEthBlocks = async (rpc: string, blockNumbers: number[]) => {
let batch = []
for (let blockNum of blockNumbers) {
batch.push({
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: ["0x" + blockNum.toString(16), true],
id: blockNum
})
}
return fetch(rpc, {
method: "POST",
headers: {
"Content-Type": "application/json; charset=utf-8"
},
body: JSON.stringify(batch)
})
.then((res) => res.json())
}
export const batchEthLogs = async (rpc: string, params: any) => {
// let batch = []
// for (let i = 0; i < params.length; i++) {

View File

@ -2,9 +2,11 @@ import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoos
import { dbconn } from 'decorators/dbconn'
import { BaseModule } from './Base'
import { formatDate, yesterday } from 'utils/date.util'
import { logger } from '@typegoose/typegoose/lib/logSettings'
@dbconn()
@index({ from: 1 }, { unique: false })
@index({ hash: 1 }, { unique: true })
@index({ from: 1, dateTag: 1}, { unique: true })
@index({ from: 1, blockTime: 1}, { unique: false })
@modelOptions({
@ -39,7 +41,11 @@ export class CheckInClass extends BaseModule {
if (preDayEvent) {
event.count = preDayEvent.count + 1
}
return CheckIn.insertOrUpdate({ hash: event.hash }, event)
try {
await CheckIn.insertOrUpdate({ hash: event.hash }, event)
} catch (err) {
logger.info('save check record error: ', event.dataTag, event.from, err.message || err)
}
}
public toJson() {

View File

@ -58,7 +58,7 @@ async function parseAllEvents() {
await initEventSvrs()
setInterval(function () {
parseAllEvents()
}, 10000)
}, 20000)
parseAllEvents()
})();

View File

@ -1,5 +1,5 @@
import { IChain } from "chain/allchain";
import { ethBlockNumber } from "chain/chain.api";
import { ethBlockNumber, retryEthBlockNumber } from "chain/chain.api";
import { IScriptionCfg } from "config/scriptions_cfg";
import { RedisClient } from "redis/RedisClient";
import { getPastBlocksIter } from "utils/block.util";
@ -9,6 +9,7 @@ export class BlockSyncSvr {
chainCfg: IChain
scriptionCfgs: IScriptionCfg[] = []
fromBlock: number = Number.MAX_SAFE_INTEGER
batchCount: number = 0
redisKey = ''
rpc = '';
constructor(_chainCfg: IChain, _scriptionCfgs: IScriptionCfg[]) {
@ -26,30 +27,40 @@ export class BlockSyncSvr {
async execute() {
try {
let currentBlock = await ethBlockNumber(this.rpc)
if (!this.batchCount) {
let currentBlock = await retryEthBlockNumber(this.rpc)
if (currentBlock.error) {
process.exit(1)
}
this.batchCount = parseInt(currentBlock.result, 16) - this.fromBlock
}
let blockStr = await new RedisClient().get(this.redisKey)
if (blockStr) {
this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock)
}
//@ts-ignore
const amount = parseInt(currentBlock.result, 16) - this.fromBlock
let blocks = getPastBlocksIter({
chainId: this.chainCfg.id,
rpc: this.rpc,
fromBlock: this.fromBlock,
amount
amount: this.batchCount
})
await this.processBlockDatas(blocks)
const amount = await this.processBlockDatas(blocks)
if (amount > 0) {
this.batchCount = amount + 1
}
} catch (err) {
console.log(err)
}
}
async processBlockDatas(iterator: any) {
let count = 0
for (const getPastBlockPromise of iterator) {
const blocks = await getPastBlockPromise
for (const block of blocks) {
// await BlockData.saveBlock(block)
count++
if (!block.transactions || block.transactions.length === 0) {
continue
}
@ -66,8 +77,8 @@ export class BlockSyncSvr {
}
}
}
}
}
return count
}
}

View File

@ -1,6 +1,8 @@
import { batchEthBlocks } from "chain/chain.api";
import logger from "logger/logger";
import { RedisClient } from "redis/RedisClient";
import { retry } from "./promise.util";
import { parse } from "path";
const MAX_BATCH_AMOUNT = 500
const REQUEST_INTERVAL = 0.5 * 1000
@ -19,18 +21,44 @@ export async function getPastBlocks({chainId, rpc, fromBlock, amount}
logger.log(`getPastBlocks: ${chainId} from: ${fromBlock} amount: ${amount}`)
let blockNumber = fromBlock
const redisKey = `blocknum_${chainId}`
try {
let res = await batchEthBlocks(rpc, blockNumber, amount)
if (res.error) {
throw new Error(res.error.message)
let retryCount = 0;
const parseBlocksAndRetry = async (blockNums: number[]) => {
let records = await batchEthBlocks(rpc, blockNums)
if (records.error) {
throw new Error(records.error.message)
}
for (let i = 0; i < res.length; i++) {
const block = res[i].result;
if (block) {
let realAmount = 0
let retryNums: number[] = []
let maxBlockNumber = 0
for (let i = 0; i < records.length; i++) {
const block = records[i].result;
if (block?.hash) {
blocks.push(block)
realAmount++
maxBlockNumber = Math.max(maxBlockNumber, blockNumber + i)
} else {
if (block) {
logger.warn(`block ${blockNumber + i}: ${block}`)
} else {
logger.warn(`block ${blockNumber + i} is null`)
retryNums.push(blockNumber + i)
}
}
}
await new RedisClient().set(redisKey, blockNumber + amount + '')
if (retryNums.length > 0 && ++retryCount < 3) {
logger.info(`${retryCount} retry ${retryNums.length} blocks`)
const retryBlocks = await parseBlocksAndRetry(retryNums)
realAmount += retryBlocks
}
return realAmount
}
try {
const numsArr = Array.from({length: amount}, (v, k) => k + blockNumber)
const realAmount = await parseBlocksAndRetry(numsArr)
if (retryCount > 0) {
blocks.sort((a, b) => parseInt(a.number) - parseInt(b.number))
}
await new RedisClient().set(redisKey, blockNumber + realAmount + '')
await new Promise(resolve => setTimeout(resolve, REQUEST_INTERVAL))
} catch (e) {
logger.log(e.message || e)