import { AsyncQueue, createAsyncQueue } from 'common/AsyncQueue'
import { singleton } from 'decorators/singleton'
import { DocumentType } from '@typegoose/typegoose'
import { ReqTaskStatus, RequestTaskClass } from 'models/RequestTask'
import { BlockChain } from 'chain/BlockChain'
import { ChainTask } from 'models/ChainTask'
import logger from 'logger/logger'

/**
 * Turns an arbitrary thrown value into a string suitable for `errMsg`.
 *
 * `JSON.stringify` is tried first so structured error payloads keep their
 * fields. A plain `Error` serializes to `"{}"` (its `message`/`stack` are not
 * enumerable), so in that case `name: message` is used instead.
 */
function describeError(err: unknown): string {
  let serialized: string | undefined
  try {
    serialized = JSON.stringify(err)
  } catch {
    // Circular structures, BigInt values, etc. cannot be serialized.
    serialized = undefined
  }
  if (err instanceof Error && (serialized === undefined || serialized === '{}')) {
    return `${err.name}: ${err.message}`
  }
  return serialized ?? String(err)
}

/**
 * Queue that submits request sub-tasks to the chain one job at a time via an
 * `AsyncQueue`, and hands them over to the block chain's confirm queue once
 * the request has been sent.
 */
@singleton
export class ChainQueue {
  private readonly queue: AsyncQueue
  private readonly blockChain: BlockChain

  constructor() {
    this.queue = createAsyncQueue()
    this.blockChain = new BlockChain()
  }

  /**
   * Schedules `subTask` for submission, or routes it elsewhere depending on
   * its current state:
   *
   * - retry limit exceeded: marked `ERROR`, saved, and
   *   `ChainTask.checkStatus` is called with the sub-task's `chainTaskId`;
   * - `WAIT_CONFIRM`: forwarded straight to the confirm queue;
   * - otherwise: a job is pushed that calls `subTask.requestChain()` and, on
   *   failure, records the error and re-enqueues the sub-task.
   *
   * @param subTask The request-task document to process. It is mutated and
   *   saved as it moves through the states above.
   * @returns A promise that settles once the sub-task has been routed or
   *   enqueued; it does not wait for the queued job itself to run.
   */
  public async addTaskToQueue(subTask: DocumentType<RequestTaskClass>): Promise<void> {
    // NOTE(review): `tryCount` is incremented at the start of every attempt,
    // so this check allows maxTryCount + 1 attempts before giving up.
    // Confirm whether `maxTryCount` is meant as "attempts" or "retries".
    if (subTask.maxTryCount && subTask.tryCount > subTask.maxTryCount) {
      subTask.status = ReqTaskStatus.ERROR
      await subTask.save()
      await ChainTask.checkStatus(subTask.chainTaskId)
      return
    }

    if (subTask.status === ReqTaskStatus.NOTSTART) {
      // Remember the block number at which the task was first enqueued.
      subTask.blockAdd = this.blockChain.currentBlockNum
      await subTask.save()
    }

    if (subTask.status === ReqTaskStatus.WAIT_CONFIRM) {
      // Already submitted: only confirmation is outstanding.
      this.blockChain.confirmQueue.addTaskToQueue(subTask)
      return
    }

    this.queue.push(async () => {
      try {
        subTask.tryCount += 1
        if (subTask.status === ReqTaskStatus.NOTSTART) {
          subTask.status = ReqTaskStatus.PEDING
        }
        if (!subTask.startTime) {
          subTask.startTime = Date.now()
        }
        await subTask.save()

        try {
          await subTask.requestChain()
        } catch (reqerr) {
          logger.info(reqerr)
          subTask.errMsg.push(describeError(reqerr))
          await subTask.save()
          // TODO: rule out the case where the data was already submitted to
          // the chain and the network error occurred while the response was
          // being returned (retrying then would submit it twice).
          // Awaited so that a failure while re-enqueueing is captured by the
          // outer catch instead of becoming an unhandled rejection.
          await this.addTaskToQueue(subTask)
          return
        }

        this.blockChain.confirmQueue.addTaskToQueue(subTask)
      } catch (err) {
        subTask.errMsg.push(describeError(err))
        try {
          await subTask.save()
        } catch (saveErr) {
          // Persisting the error failed as well; log it rather than letting
          // the rejection escape the queued job.
          logger.info(saveErr)
        }
      }
    })
  }
}