功能: 为企业微信添加任务队列和子任务解析。

- 在`api.server.ts`中注册`fastify-xml-body-parser`插件。
- 在`task.queue.ts`中添加`ChainQueue`类和`addTaskToQueue`方法。
- 改进`wechatwork.service.ts`和`task.service.ts`中任务解析和处理。
- 在`ChainTask.ts`中添加新属性和方法到`ChainTaskClass`中。
This commit is contained in:
zhl 2023-04-06 18:06:05 +08:00
parent a1ce07f8fb
commit ab782820fe
5 changed files with 125 additions and 3 deletions

View File

@ -30,6 +30,7 @@ export class ApiServer {
}
private registerPlugins() {
this.server.register(require('fastify-formbody'))
this.server.register(require('fastify-xml-body-parser'))
this.server.register(zReqParserPlugin)
this.server.register(helmet, { hidePoweredBy: false })
this.server.register(zTokenParserPlugin)

View File

@ -20,6 +20,10 @@ export enum TaskStatus {
export class ChainTaskClass extends BaseModule {
@prop({ required: true })
public taskId!: string
@prop()
public name: string
@prop()
public desc: string
@prop({ type: mongoose.Schema.Types.Mixed })
public taskData: any
@ -95,6 +99,32 @@ export class ChainTaskClass extends BaseModule {
public static async allUnFinishedTask() {
return ChainTask.find({ allEnd: false })
}
/**
*
*/
public static async parseWxApprovalInfo({ taskId, name, desc, data }) {
let maxTryCount = parseInt(process.env.CHAIN_MAX_TRY)
let chainTask = await ChainTask.insertOrUpdate({ taskId }, { name, desc, data })
let subTasks: any = []
if (chainTask.newRecord) {
for (let sub of data) {
let subType = sub.type
let subTask = new RequestTask({
taskId,
chainTaskId: chainTask.id,
taskType: subType,
reqData: sub,
maxTryCount,
})
await subTask.save()
chainTask.tasks.pushOnce(subTask.id)
subTasks.push(subTask)
}
}
await chainTask.save()
return subTasks
}
}
export const ChainTask = getModelForClass(ChainTaskClass, { existingConnection: ChainTaskClass['db'] })

29
src/queue/task.queue.ts Normal file
View File

@ -0,0 +1,29 @@
import { AsyncQueue, createAsyncQueue } from 'common/AsyncQueue'
import { singleton } from 'decorators/singleton'
import { DocumentType } from '@typegoose/typegoose'
import { ReqTaskStatus, RequestTaskClass } from 'models/RequestTask'
import { BlockChain } from 'chain/BlockChain'
import { ChainTask } from 'models/ChainTask'
import logger from 'logger/logger'
import { TaskSvr } from 'service/task.service'
@singleton
export class ChainQueue {
private queue: AsyncQueue
private blockChain: BlockChain
constructor() {
this.queue = createAsyncQueue()
this.blockChain = new BlockChain()
}
public async addTaskToQueue(spNo: string) {
this.queue.push(async () => {
try {
await new TaskSvr().parseOneTask(spNo)
} catch (err) {
logger.error('error add task: ' + err)
}
})
}
}

View File

@ -1,6 +1,15 @@
import { singleton } from 'decorators/singleton'
import { ChainTask } from 'models/ChainTask'
import { ChainQueue } from 'queue/chain.queue'
import { WechatWorkService } from './wechatwork.service'
@singleton
export class TaskSvr {
public async parseOneTask(data: any) {}
public async parseOneTask(spNo: string) {
let data = await new WechatWorkService().parseOneTask(spNo)
let subTasks = await ChainTask.parseWxApprovalInfo(data)
for (let subTask of subTasks) {
new ChainQueue().addTaskToQueue(subTask)
}
}
}

View File

@ -1,6 +1,20 @@
import axios, { AxiosRequestConfig } from 'axios'
import { singleton } from 'decorators/singleton'
import fs from 'fs'
import os from 'os'
import path from 'path'
import { excelToJson } from 'utils/excel.util'
// 1-审批中2-已通过3-已驳回4-已撤销6-通过后撤销7-已删除10-已支付
export enum TaskStatus {
PEDING = 1,
PASS = 2,
REJECT = 3,
CANCEL = 4,
PASS_CANCEL = 6,
DELETE = 7,
PAY = 10,
}
const WX_API_HOST = 'https://qyapi.weixin.qq.com'
@singleton
@ -72,6 +86,42 @@ export class WechatWorkService {
let response = await axios.request(config).then(response => {
return response.data
})
return response
}
/**
* ,
* @param spNo
*/
public async parseOneTask(spNo: string) {
let detail: any = await this.fetchApprovalDetail(spNo)
if (detail.errcode) {
throw new Error('approval detail error, code: ' + detail.errcode + ' errmsg: ' + detail.errmsg)
}
const { info } = detail
if (info.status !== TaskStatus.PASS) {
throw new Error('approval status error, status: ' + info.status)
}
const { apply_data } = info
const { contents } = apply_data
let name = ''
let desc = ''
let fileId = ''
for (let content of contents) {
let { control, value, title } = content
if (control === 'Text' && title.text == '名字') {
name = value.text
} else if (control === 'Text' && title.text == '描述') {
desc = value.text
} else if (control === 'File' && value.files.length > 0) {
fileId = value.files[0].file_id
}
}
if (!fileId) {
throw new Error('no file')
}
let filePath = await this.fetchFile(fileId)
let data = excelToJson(filePath)
return { taskId: spNo, name, desc, data }
}
/**
@ -91,8 +141,11 @@ export class WechatWorkService {
media_id: mediaId,
},
}
let response = await axios.request(config).then(response => {
response.data.pipe(fs.createWriteStream('ada_lovelace.jpg'))
let filename = `${mediaId}.xlsx`
const filePath = path.join(os.tmpdir(), filename)
await axios.request(config).then(response => {
response.data.pipe(fs.createWriteStream(filePath))
})
return filePath
}
}