import { getModelForClass, index, modelOptions, mongoose, pre, prop, Ref, Severity } from '@typegoose/typegoose' import { BlockChain } from 'chain/BlockChain' import { MAX_BATCH_REQ_COUNT, ZERO_BYTES32 } from 'common/Constants' import { dbconn } from 'decorators/dbconn' import { generateRandomBytes32 } from 'utils/wallet.util' import { BaseModule } from './Base' import { ReqTaskStatus, RequestTask, RequestTaskClass } from './RequestTask' export enum TaskStatus { NOTSTART = 0, PEDING = 1, SUCCESS = 2, TX_ALL_CONFIRM = 3, TX_PART_ERROR = 8, TX_ERROR = 9, } @dbconn() @index({ taskId: 1 }, { unique: false }) @modelOptions({ schemaOptions: { collection: 'chain_task', timestamps: true }, options: { allowMixed: Severity.ALLOW }, }) export class ChainTaskClass extends BaseModule { @prop({ required: true }) public taskId!: string @prop() public name: string @prop() public desc: string @prop() public starter: string @prop({ type: mongoose.Schema.Types.Mixed }) public taskData: any @prop({ enum: TaskStatus, default: TaskStatus.NOTSTART }) public status: TaskStatus @prop() public startTime: number @prop() public endTime: number @prop({ required: true, default: true }) public newRecord: boolean @prop({ required: true, default: false }) public allEnd: boolean @prop({ required: true, default: 0 }) public successCount: number @prop({ required: true, default: 0 }) public errorCount: number @prop({ type: String, required: true, default: [] }) public tasks!: Array /** * 检查是否所有的任务都已完成(成功或重试次数达到上限) * 调用时机 * 1. 每个 request task 成功 * 2. 每个 request task 重试达到预设次数 * @param chainTaskId */ public static async checkStatus(chainTaskId: string) { let record = await ChainTask.findById(chainTaskId) let sCount = 0 let errCount = 0 for (let subId of record.tasks) { let subData = await RequestTask.findById(subId) if (subData.status === ReqTaskStatus.SUCCESS) { sCount += 1 } else if (subData.status === ReqTaskStatus.ERROR) { errCount += 1 } } record.successCount = sCount record.errorCount = errCount if (sCount === record.tasks.length) { record.status = TaskStatus.TX_ALL_CONFIRM record.allEnd = true } else { record.allEnd = false if (record.status === TaskStatus.NOTSTART && sCount > 0) { record.status = TaskStatus.PEDING } else if (errCount === record.tasks.length) { record.status = TaskStatus.TX_ERROR record.allEnd = true } else if (errCount + sCount === record.tasks.length) { record.status = TaskStatus.TX_PART_ERROR record.allEnd = true } } await record.save() if (record.allEnd) { setImmediate(async function () { // TODO:: 通知前端 }) } } public static async allUnFinishedTask() { return ChainTask.find({ allEnd: false }) } /** * 解析企业微信审批信息 */ public static async parseWxApprovalInfo({ taskId, name, desc, data, starter }) { let maxTryCount = parseInt(process.env.CHAIN_MAX_TRY) let chainTask = await ChainTask.insertOrUpdate({ taskId }, { name, desc, starter, data }) let subTasks: any = [] if (chainTask.newRecord) { let subTask let index = 0 for (let sub of data) { if (!subTask || subTask.reqDatas.length >= MAX_BATCH_REQ_COUNT) { index += 1 subTask = new RequestTask({ taskId, index, chainTaskId: chainTask.id, reqDatas: [], maxTryCount, predecessor: ZERO_BYTES32, salt: generateRandomBytes32(), }) subTasks.push(subTask) } subTask.reqDatas.push(sub) subTask.targets.push(sub.address) let abi = await new BlockChain().generateFunAbi(sub) subTask.datas.push(abi) subTask.values.push('0') await subTask.save() chainTask.tasks.pushOnce(subTask.id) } } chainTask.newRecord = false await chainTask.save() return subTasks } } export const ChainTask = getModelForClass(ChainTaskClass, { existingConnection: ChainTaskClass['db'] })