import { getModelForClass, index, modelOptions, mongoose, pre, prop, Ref, Severity } from '@typegoose/typegoose'
import { BlockChain } from 'chain/BlockChain'
import { MAX_BATCH_REQ_COUNT, ZERO_BYTES32 } from 'common/Constants'
import { dbconn } from 'decorators/dbconn'
import { MailQueue } from 'queue/mail.queue'
import { TaskSvr } from 'service/task.service'
import { generateRandomBytes32 } from 'utils/wallet.util'
import { BaseModule } from './Base'
import { ReqTaskStatus, RequestTask, RequestTaskClass, TaskType } from './RequestTask'

/**
 * Lifecycle status stored on a chain task record.
 *
 * NOTE: `PEDING` is a misspelling of "PENDING"; the name is kept because it is
 * part of the exported enum and may be referenced elsewhere.
 */
export enum TaskStatus {
  NOTSTART = 0,
  PEDING = 1,
  SUCCESS = 2,
  TX_SCHEDULE_CONFIRM = 3,
  TX_EXEC_CONFIRM = 4,
  TX_PART_ERROR = 8,
  TX_EXEC_ERROR = 9,
  TX_SCHEDULE_ERROR = 10,
}

/**
 * Document stored in the `chain_task` collection. One chain task groups
 * several request tasks, whose ids are kept in `tasks`.
 */
@dbconn()
@index({ taskId: 1 }, { unique: false })
@modelOptions({
  schemaOptions: { collection: 'chain_task', timestamps: true },
  options: { allowMixed: Severity.ALLOW },
})
export class ChainTaskClass extends BaseModule {
  @prop({ required: true })
  public taskId!: string

  @prop()
  public chain: number

  @prop()
  public name: string

  @prop()
  public desc: string

  // Who started the task
  @prop()
  public starter: string

  // Display name of the starter
  @prop()
  public starterName: string

  @prop({ type: mongoose.Schema.Types.Mixed })
  public taskData: any

  @prop({ enum: TaskStatus, default: TaskStatus.NOTSTART })
  public status: TaskStatus

  @prop()
  public startTime: number

  @prop()
  public endTime: number

  // True until parseWxApprovalInfo has generated the sub tasks for this record
  @prop({ required: true, default: true })
  public newRecord: boolean

  // True once checkStatus has seen every sub task reach SUCCESS or ERROR
  @prop({ required: true, default: false })
  public allEnd: boolean

  @prop({ required: true, default: 0 })
  public successCount: number

  @prop({ required: true, default: 0 })
  public errorCount: number

  // Ids of the request tasks that belong to this chain task
  @prop({ type: String, required: true, default: [] })
  public tasks!: Array<string>

  @prop({ type: mongoose.Schema.Types.Mixed })
  public notify: any

  /**
   * Check whether all sub tasks have finished (succeeded, or reached the retry
   * limit) and update the record's counters, status and end time accordingly.
   *
   * Intended to be called:
   * 1. each time a request task succeeds
   * 2. each time a request task reaches its preset retry limit
   *
   * When the record is finished, a result notification is sent asynchronously
   * and its result is stored in `notify`.
   *
   * @param chainTaskId id of the chain task record to re-evaluate
   */
  public static async checkStatus(chainTaskId: string) {
    let record = await ChainTask.findById(chainTaskId)
    let sCount = 0
    let errCount = 0
    for (let subId of record.tasks) {
      let subData = await RequestTask.findById(subId)
      if (subData.status === ReqTaskStatus.SUCCESS) {
        sCount += 1
      } else if (subData.status === ReqTaskStatus.ERROR) {
        errCount += 1
      }
    }
    record.successCount = sCount
    record.errorCount = errCount

    // Terminal outcomes are evaluated first. Previously the
    // "NOTSTART -> PEDING" transition was tested before the error branches, so a
    // record that was still NOTSTART when its last sub task finished with a mix
    // of successes and failures was moved to PEDING and never marked as ended.
    const total = record.tasks.length
    if (sCount === total) {
      record.status = TaskStatus.TX_EXEC_CONFIRM
      record.allEnd = true
    } else if (errCount === total) {
      record.status = TaskStatus.TX_EXEC_ERROR
      record.allEnd = true
    } else if (errCount + sCount === total) {
      record.status = TaskStatus.TX_PART_ERROR
      record.allEnd = true
    } else {
      record.allEnd = false
      if (record.status === TaskStatus.NOTSTART && sCount > 0) {
        record.status = TaskStatus.PEDING
      }
    }

    if (record.allEnd) {
      record.endTime = Date.now()
    }
    await record.save()

    if (record.allEnd) {
      // Notify outside of the caller's flow; a notification failure must not
      // reject checkStatus, but it must not become an unhandled rejection either.
      setImmediate(async () => {
        try {
          let desc = `总数: ${record.tasks.length}, 成功: ${sCount}, 失败: ${errCount}`
          // `await` is a no-op for a plain value and prevents persisting a
          // Promise object if sendResultNotify is asynchronous.
          let result = await new TaskSvr().sendResultNotify(record, desc)
          record.notify = result
          await record.save()
        } catch (err) {
          console.error(`failed to send result notify for chain task ${chainTaskId}`, err)
        }
      })
    }
  }

  /**
   * Queue a mail for the chain task once every one of its sub tasks is in the
   * WAIT_EXEC state. Does nothing otherwise.
   *
   * @param chainTaskId id of the chain task record to inspect
   */
  public static async checkScheduleStatus(chainTaskId: string) {
    let record = await ChainTask.findById(chainTaskId)
    let sCount = 0
    for (let subId of record.tasks) {
      let subData = await RequestTask.findById(subId)
      if (subData.status === ReqTaskStatus.WAIT_EXEC) {
        sCount += 1
      }
    }
    if (sCount === record.tasks.length) {
      // send mail to confirmer
      new MailQueue(record.chain).addTaskToQueue(record)
    }
  }

  /**
   * Find every chain task whose `allEnd` flag is still false.
   */
  public static async allUnFinishedTask() {
    return ChainTask.find({ allEnd: false })
  }

  /**
   * Parse WeCom (Enterprise WeChat) approval info: upsert the chain task and,
   * if the record is new, split `data` into batched request tasks.
   *
   * @returns the request tasks created by this call (empty when the record
   *          already existed)
   */
  public static async parseWxApprovalInfo({ taskId, name, desc, data, starter, starterName, chain }) {
    // NOTE(review): yields NaN when CHAIN_MAX_TRY is unset — confirm it is always configured.
    let maxTryCount = parseInt(process.env.CHAIN_MAX_TRY)
    // NOTE(review): the payload is passed as `data`, while the schema declares
    // `taskData` — verify how insertOrUpdate maps this field.
    let chainTask = await ChainTask.insertOrUpdate({ taskId }, { name, desc, starter, starterName, data, chain })
    let subTasks: any = []
    if (chainTask.newRecord) {
      let subTask
      let index = 0
      // Running weight of the current batch; an NFT mint with amount > 1 counts
      // as `amount` requests, everything else counts as 1.
      let count = 0
      for (let sub of data) {
        // NOTE(review): loose `==` here vs strict `===` below on sub.type — kept
        // as in the original; confirm which comparison is intended.
        if (sub.type == TaskType.MINT_NFT && sub.amount && parseInt(sub.amount) > 1) {
          count += parseInt(sub.amount)
        } else {
          count += 1
        }
        // Start a new batch for the first item, or when adding the current item
        // reaches the limit; the current item becomes the first entry of the new batch.
        if (!subTask || count >= MAX_BATCH_REQ_COUNT) {
          index += 1
          count = sub.type === TaskType.MINT_NFT && sub.amount && parseInt(sub.amount) > 1 ? parseInt(sub.amount) : 1
          subTask = new RequestTask({
            taskId,
            index,
            chain,
            chainTaskId: chainTask.id,
            reqDatas: [],
            maxTryCount,
            predecessor: ZERO_BYTES32,
            salt: generateRandomBytes32(),
          })
          subTasks.push(subTask)
        }
        subTask.reqDatas.push(sub)
        subTask.targets.push(sub.address)
        let abi = await new BlockChain().generateFunAbi(sub)
        subTask.datas.push(abi)
        subTask.values.push('0')
        await subTask.save()
        chainTask.tasks.pushOnce(subTask.id)
      }
    }
    chainTask.newRecord = false
    await chainTask.save()
    return subTasks
  }
}

export const ChainTask = getModelForClass(ChainTaskClass, { existingConnection: ChainTaskClass['db'] })