diff --git a/package.json b/package.json index d8b580d..2b1fbb6 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,9 @@ "lint": "eslint --ext .ts src/**", "format": "eslint --ext .ts src/** --fix", "dev:monitor": "NODE_ENV=development ts-node -r tsconfig-paths/register src/monitor.ts", - "prod:monitor": "NODE_PATH=./dist node dist/monitor.js" + "prod:monitor": "NODE_PATH=./dist node dist/monitor.js", + "dev:scription": "ts-node -r tsconfig-paths/register src/scriptions.ts", + "prod:scription": "TZ='SG' NODE_PATH=./dist node dist/scriptions.js" }, "author": "z", "license": "ISC", diff --git a/src/chain/allchain.ts b/src/chain/allchain.ts index 4aa0136..e6d4cc3 100644 --- a/src/chain/allchain.ts +++ b/src/chain/allchain.ts @@ -1,4 +1,13 @@ -export const AllChains = [ +export interface IChain { + name: string + type: string + rpc: string + id: number + symbol: string + network?: string + explorerurl: string +} +export const AllChains: IChain[] = [ { name: 'Ethereum Mainnet RPC', type: 'Mainnet', @@ -210,7 +219,7 @@ export const AllChains = [ { name: 'Arbitrum One', type: 'Mainnet', - rpc: 'https://arb1.arbitrum.io/rpc', + rpc: 'https://arbitrum-mainnet.infura.io/v3/b6bf7d3508c941499b10025c0776eaf8', id: 42161, network: 'ARBITRUM', symbol: 'ETH', @@ -265,6 +274,15 @@ export const AllChains = [ symbol: 'AGOR', explorerurl: 'https://goerli-rollup-explorer.arbitrum.io', }, + { + name: 'Arbitrum Sepolia', + type: 'Testnet', + rpc: 'https://arbitrum-sepolia.infura.io/v3/b6bf7d3508c941499b10025c0776eaf8', + id: 421614, + network: 'ARB_SEPOLIA', + symbol: 'ETH', + explorerurl: 'https://sepolia.arbiscan.io', + }, { name: 'Harmony Mainnet RPC', type: 'Mainnet', diff --git a/src/chain/chain.api.ts b/src/chain/chain.api.ts new file mode 100644 index 0000000..a2e10db --- /dev/null +++ b/src/chain/chain.api.ts @@ -0,0 +1,29 @@ + +const requestChain = async (rpc: string, method: string, params: any) => { + const data = { + id: Date.now(), + jsonrpc: "2.0", + method, + params + } + return fetch(rpc, { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8" + }, + body: JSON.stringify(data) + }) + .then((res) => res.json()) +} + +export const ethBlockNumber = async (rpc: string) => { + return requestChain(rpc, "eth_blockNumber", []) +} + +export const ethGetBlockByNumber = async (rpc: string, blockNumber: string) => { + return requestChain(rpc, "eth_getBlockByNumber", [blockNumber, true]) +} + +export const ethGetLogs = async (rpc: string, params: any) => { + return requestChain(rpc, "eth_getLogs", [params]) +} \ No newline at end of file diff --git a/src/config/scriptions_cfg.ts b/src/config/scriptions_cfg.ts new file mode 100644 index 0000000..e43c354 --- /dev/null +++ b/src/config/scriptions_cfg.ts @@ -0,0 +1,22 @@ +import { CheckIn } from "models/CheckIn" + +export interface IScriptionCfg { + chain: number, + fromBlock: number, + filter: (event: any) => boolean, + eventProcesser: (event: any) => Promise, +} + +export const SCRIPTIONS_CFG: IScriptionCfg[] = [ + { + chain: 421614, + fromBlock: 5063559, + filter: (event: any) => { + return ( event.input === '0x646174613a2c7b2270223a2263662d3230222c226f70223a22636869636b227d' + && event.to.toLowerCase() === '0x50a8e60041a206acaa5f844a1104896224be6f39') + }, + eventProcesser: async (event: any) => { + return CheckIn.saveEvent(event) + } + } +] \ No newline at end of file diff --git a/src/models/CheckIn.ts b/src/models/CheckIn.ts new file mode 100644 index 0000000..25b843c --- /dev/null +++ b/src/models/CheckIn.ts @@ -0,0 +1,38 @@ +import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoose' +import { dbconn } from 'decorators/dbconn' +import { BaseModule } from './Base' + +@dbconn() +@index({ address: 1 }, { unique: false }) +@index({ transactionHash: 1, from: 1, to: 1 }, { unique: true }) +@modelOptions({ + schemaOptions: { collection: 'check_in_event', timestamps: true }, +}) +export class CheckInClass extends BaseModule { + @prop({ required: true }) + public from!: string + @prop() + public to: string + @prop({ required: true }) + public hash: string + @prop() + public blockNumber: string + @prop() + public blockHash: string + @prop() + public blockTime: number + @prop() + public dateTag: string + @prop() + public value: string + @prop() + public input: string + + public static async saveEvent(event: any) { + return CheckIn.insertOrUpdate({ hash: event.hash }, event) + } +} + +export const CheckIn = getModelForClass(CheckInClass, { + existingConnection: CheckInClass['db'], +}) diff --git a/src/scriptions.ts b/src/scriptions.ts new file mode 100644 index 0000000..28c4401 --- /dev/null +++ b/src/scriptions.ts @@ -0,0 +1,65 @@ +import * as dotenv from 'dotenv' +import logger from 'logger/logger' +import { RedisClient } from 'redis/RedisClient' +const envFile = process.env.NODE_ENV && process.env.NODE_ENV === 'production' ? `.env.production` : '.env.development' +dotenv.config({ path: envFile }) +import { EventSyncSvr } from 'service/event.sync.service' +import {IScriptionCfg, SCRIPTIONS_CFG} from 'config/scriptions_cfg' + + +import 'common/Extend' +import { AllChains, IChain } from 'chain/allchain' +import { BlockSyncSvr } from 'service/block.sync.service' + +let svrs: any[] = [] +let lock = false + + +async function initEventSvrs() { + const cfgMap: Map = new Map(); + for (let cfg of SCRIPTIONS_CFG) { + const chainCfg = AllChains.find((chain) => chain.id === cfg.chain) + if (!chainCfg) { + logger.error('chainCfg not found: ' + cfg.chain) + process.exit(1) + } + if (!cfgMap.has(chainCfg)) { + cfgMap.set(chainCfg, []) + } + cfgMap.get(chainCfg)?.push(cfg) + } + for (let chainCfg of cfgMap.keys()) { + const svr = new BlockSyncSvr(chainCfg, cfgMap.get(chainCfg)!) + svrs.push(svr) + } +} + +async function parseAllEvents() { + if (lock) { + logger.info('sync in process, cancel.') + return + } + lock = true + logger.info('begin sync block: ' + svrs.length) + for (let svr of svrs) { + try { + await svr.execute() + } catch (err) { + logger.info('sync block with error:: chain: ' + svr.chainCfg.id ) + logger.info(err) + } + } + lock = false +} + +;(async () => { + let opts = { url: process.env.REDIS } + new RedisClient(opts) + logger.info('REDIS Connected') + await initEventSvrs() + setInterval(function () { + parseAllEvents() + }, 500) + parseAllEvents() +})(); + diff --git a/src/service/block.sync.service.ts b/src/service/block.sync.service.ts new file mode 100644 index 0000000..64e0392 --- /dev/null +++ b/src/service/block.sync.service.ts @@ -0,0 +1,67 @@ +import { IChain } from "chain/allchain"; +import { ethGetBlockByNumber } from "chain/chain.api"; +import { IScriptionCfg } from "config/scriptions_cfg"; +import logger from "logger/logger"; +import { RedisClient } from "redis/RedisClient"; +import { formatDate } from "utils/date.util"; +import { retry } from "utils/promise.util"; + + +export class BlockSyncSvr { + chainCfg: IChain + scriptionCfgs: IScriptionCfg[] = [] + fromBlock: number = Number.MAX_SAFE_INTEGER + redisKey = '' + constructor(_chainCfg: IChain, _scriptionCfgs: IScriptionCfg[]) { + this.chainCfg =_chainCfg + this.scriptionCfgs = _scriptionCfgs + for (let cfg of _scriptionCfgs) { + this.fromBlock = Math.min(this.fromBlock, cfg.fromBlock) + } + this.redisKey = `blocknum_${this.chainCfg.id}` + } + + async execute() { + try { + let blockStr = await new RedisClient().get(this.redisKey) + if (blockStr) { + this.fromBlock = Math.max(parseInt(blockStr), this.fromBlock) + } + const block = await this.fetchBlock(this.chainCfg.rpc.split('|'), this.fromBlock); + if (!block.result.hash) { + logger.log(`${this.fromBlock} block not found`) + return + } + for (let i = 0; i < block.result.transactions.length; i++) { + const tx = block.result.transactions[i]; + if (block.result.timestamp) { + tx.blockTime = parseInt(block.result.timestamp, 16); + tx.dateTag = formatDate(new Date(tx.blockTime * 1000)); + } + for (let j = 0; j < this.scriptionCfgs.length; j++) { + const cfg = this.scriptionCfgs[j]; + if (cfg.filter(tx)) { + await cfg.eventProcesser(tx) + } + } + } + await new RedisClient().set(this.redisKey, this.fromBlock + 1 + '') + } catch (err) { + console.log(err) + } + } + + async fetchBlock(rpcs: string[], blockNumber: number) { + const blockNumberHex = '0x' + blockNumber.toString(16) + for (let rpc of rpcs) { + try { + let res = await retry(() => ethGetBlockByNumber(rpc, blockNumberHex), { maxRetries: 3, whitelistErrors: [] }) + return res; + } catch (err) { + console.log(err) + throw err + } + } + + } +} \ No newline at end of file diff --git a/src/service/chain.service.ts b/src/service/chain.service.ts index a9ce27b..88d5f97 100644 --- a/src/service/chain.service.ts +++ b/src/service/chain.service.ts @@ -14,4 +14,4 @@ export async function restartAllUnFinishedTask() { chainQueue.addTaskToQueue(subTask) } } -} +} \ No newline at end of file diff --git a/src/utils/date.util.ts b/src/utils/date.util.ts new file mode 100644 index 0000000..76b98ee --- /dev/null +++ b/src/utils/date.util.ts @@ -0,0 +1,14 @@ +// format the date to the format we want +export const formatDate = (date: Date): string => { + const year = date.getFullYear(); + const month = (date.getMonth() + 1 + '').padStart(2, '0'); + const day = (date.getDate() + '').padStart(2, '0'); + return `${year}${month}${day}`; +}; + +// get formated datestring of yesterday +export const yesterday = () => { + const date = new Date(); + date.setDate(date.getDate() - 1); + return date; +}; \ No newline at end of file