From ab782820fe6021d63595e5fc5c84e0870c605e85 Mon Sep 17 00:00:00 2001 From: zhl Date: Thu, 6 Apr 2023 18:06:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD:=20=E4=B8=BA=E4=BC=81?= =?UTF-8?q?=E4=B8=9A=E5=BE=AE=E4=BF=A1=E6=B7=BB=E5=8A=A0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=98=9F=E5=88=97=E5=92=8C=E5=AD=90=E4=BB=BB=E5=8A=A1=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在`api.server.ts`中注册`fastify-xml-body-parser`插件。 - 在`task.queue.ts`中添加`ChainQueue`类和`addTaskToQueue`方法。 - 改进`wechatwork.service.ts`和`task.service.ts`中任务解析和处理。 - 在`ChainTask.ts`中添加新属性和方法到`ChainTaskClass`中。 --- src/api.server.ts | 1 + src/models/ChainTask.ts | 30 ++++++++++++++++ src/queue/task.queue.ts | 29 ++++++++++++++++ src/service/task.service.ts | 11 +++++- src/service/wechatwork.service.ts | 57 +++++++++++++++++++++++++++++-- 5 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 src/queue/task.queue.ts diff --git a/src/api.server.ts b/src/api.server.ts index 6c4e030..077a392 100644 --- a/src/api.server.ts +++ b/src/api.server.ts @@ -30,6 +30,7 @@ export class ApiServer { } private registerPlugins() { this.server.register(require('fastify-formbody')) + this.server.register(require('fastify-xml-body-parser')) this.server.register(zReqParserPlugin) this.server.register(helmet, { hidePoweredBy: false }) this.server.register(zTokenParserPlugin) diff --git a/src/models/ChainTask.ts b/src/models/ChainTask.ts index 10b16d3..298b333 100644 --- a/src/models/ChainTask.ts +++ b/src/models/ChainTask.ts @@ -20,6 +20,10 @@ export enum TaskStatus { export class ChainTaskClass extends BaseModule { @prop({ required: true }) public taskId!: string + @prop() + public name: string + @prop() + public desc: string @prop({ type: mongoose.Schema.Types.Mixed }) public taskData: any @@ -95,6 +99,32 @@ export class ChainTaskClass extends BaseModule { public static async allUnFinishedTask() { return ChainTask.find({ allEnd: false }) } + + /** + * 解析企业微信审批信息 + */ + public static async parseWxApprovalInfo({ taskId, name, desc, data }) { + let maxTryCount = parseInt(process.env.CHAIN_MAX_TRY) + let chainTask = await ChainTask.insertOrUpdate({ taskId }, { name, desc, data }) + let subTasks: any = [] + if (chainTask.newRecord) { + for (let sub of data) { + let subType = sub.type + let subTask = new RequestTask({ + taskId, + chainTaskId: chainTask.id, + taskType: subType, + reqData: sub, + maxTryCount, + }) + await subTask.save() + chainTask.tasks.pushOnce(subTask.id) + subTasks.push(subTask) + } + } + await chainTask.save() + return subTasks + } } export const ChainTask = getModelForClass(ChainTaskClass, { existingConnection: ChainTaskClass['db'] }) diff --git a/src/queue/task.queue.ts b/src/queue/task.queue.ts new file mode 100644 index 0000000..e1840ae --- /dev/null +++ b/src/queue/task.queue.ts @@ -0,0 +1,29 @@ +import { AsyncQueue, createAsyncQueue } from 'common/AsyncQueue' +import { singleton } from 'decorators/singleton' +import { DocumentType } from '@typegoose/typegoose' +import { ReqTaskStatus, RequestTaskClass } from 'models/RequestTask' +import { BlockChain } from 'chain/BlockChain' +import { ChainTask } from 'models/ChainTask' +import logger from 'logger/logger' +import { TaskSvr } from 'service/task.service' + +@singleton +export class ChainQueue { + private queue: AsyncQueue + private blockChain: BlockChain + + constructor() { + this.queue = createAsyncQueue() + this.blockChain = new BlockChain() + } + + public async addTaskToQueue(spNo: string) { + this.queue.push(async () => { + try { + await new TaskSvr().parseOneTask(spNo) + } catch (err) { + logger.error('error add task: ' + err) + } + }) + } +} diff --git a/src/service/task.service.ts b/src/service/task.service.ts index 3ee0273..5fbc2a8 100644 --- a/src/service/task.service.ts +++ b/src/service/task.service.ts @@ -1,6 +1,15 @@ import { singleton } from 'decorators/singleton' +import { ChainTask } from 'models/ChainTask' +import { ChainQueue } from 'queue/chain.queue' +import { WechatWorkService } from './wechatwork.service' @singleton export class TaskSvr { - public async parseOneTask(data: any) {} + public async parseOneTask(spNo: string) { + let data = await new WechatWorkService().parseOneTask(spNo) + let subTasks = await ChainTask.parseWxApprovalInfo(data) + for (let subTask of subTasks) { + new ChainQueue().addTaskToQueue(subTask) + } + } } diff --git a/src/service/wechatwork.service.ts b/src/service/wechatwork.service.ts index 0cd7efe..cce3dfd 100644 --- a/src/service/wechatwork.service.ts +++ b/src/service/wechatwork.service.ts @@ -1,6 +1,20 @@ import axios, { AxiosRequestConfig } from 'axios' import { singleton } from 'decorators/singleton' import fs from 'fs' +import os from 'os' +import path from 'path' +import { excelToJson } from 'utils/excel.util' + +// 1-审批中;2-已通过;3-已驳回;4-已撤销;6-通过后撤销;7-已删除;10-已支付 +export enum TaskStatus { + PEDING = 1, + PASS = 2, + REJECT = 3, + CANCEL = 4, + PASS_CANCEL = 6, + DELETE = 7, + PAY = 10, +} const WX_API_HOST = 'https://qyapi.weixin.qq.com' @singleton @@ -72,6 +86,42 @@ export class WechatWorkService { let response = await axios.request(config).then(response => { return response.data }) + return response + } + /** + * 调用企业微信审核详情接口, 并处理返回数据 + * @param spNo 审批单号 + */ + public async parseOneTask(spNo: string) { + let detail: any = await this.fetchApprovalDetail(spNo) + if (detail.errcode) { + throw new Error('approval detail error, code: ' + detail.errcode + ' errmsg: ' + detail.errmsg) + } + const { info } = detail + if (info.status !== TaskStatus.PASS) { + throw new Error('approval status error, status: ' + info.status) + } + const { apply_data } = info + const { contents } = apply_data + let name = '' + let desc = '' + let fileId = '' + for (let content of contents) { + let { control, value, title } = content + if (control === 'Text' && title.text == '名字') { + name = value.text + } else if (control === 'Text' && title.text == '描述') { + desc = value.text + } else if (control === 'File' && value.files.length > 0) { + fileId = value.files[0].file_id + } + } + if (!fileId) { + throw new Error('no file') + } + let filePath = await this.fetchFile(fileId) + let data = excelToJson(filePath) + return { taskId: spNo, name, desc, data } } /** @@ -91,8 +141,11 @@ export class WechatWorkService { media_id: mediaId, }, } - let response = await axios.request(config).then(response => { - response.data.pipe(fs.createWriteStream('ada_lovelace.jpg')) + let filename = `${mediaId}.xlsx` + const filePath = path.join(os.tmpdir(), filename) + await axios.request(config).then(response => { + response.data.pipe(fs.createWriteStream(filePath)) }) + return filePath } }