From 9cded5efc9b3834294490a2cd24893959efdfbd5 Mon Sep 17 00:00:00 2001 From: zhl Date: Fri, 14 Apr 2023 14:53:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E4=B8=BB=E5=8A=A8=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E7=9A=84=E6=96=B9=E6=B3=95=E6=9B=BF=E6=8D=A2=E4=BC=81?= =?UTF-8?q?=E4=B8=9A=E5=BE=AE=E4=BF=A1=E7=9A=84=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api.server.ts | 9 ++++-- src/controllers/workflow.controller.ts | 20 ++++++------ src/queue/confirm.queue.ts | 2 +- src/schedule/wxtask.schedule.ts | 31 ++++++++++++++++++ src/service/mail.service.ts | 3 ++ src/service/wechatwork.service.ts | 45 ++++++++++++++++++++++++++ 6 files changed, 96 insertions(+), 14 deletions(-) create mode 100644 src/schedule/wxtask.schedule.ts diff --git a/src/api.server.ts b/src/api.server.ts index d80370d..78e7f96 100644 --- a/src/api.server.ts +++ b/src/api.server.ts @@ -9,6 +9,8 @@ import { mongoose } from '@typegoose/typegoose' import logger from 'logger/logger' import BlocknumSchedule from 'schedule/blocknum.schedule' import path from 'path' +import { RedisClient } from 'redis/RedisClient' +import WxTaskSchedule from 'schedule/wxtask.schedule' const zReqParserPlugin = require('plugins/zReqParser') @@ -109,11 +111,12 @@ export class ApiServer { logger.log(`DB Connection Error: ${err.message}`) } let opts = { url: process.env.REDIS } - // new RedisClient(opts) - // logger.log('REDIS Connected') + new RedisClient(opts) + logger.log('REDIS Connected') } private initSchedules() { new BlocknumSchedule().scheduleAll() + new WxTaskSchedule().scheduleAll() } private restoreChainQueue() {} private setErrHandler() { @@ -165,7 +168,7 @@ export class ApiServer { self.registerRouter() self.setErrHandler() self.setFormatSend() - // self.initSchedules() + self.initSchedules() // restartAllUnFinishedTask() this.server.listen({ port: parseInt(process.env.API_PORT) }, (err: any, address: any) => { if (err) { diff --git a/src/controllers/workflow.controller.ts b/src/controllers/workflow.controller.ts index 2c2594f..a127e3b 100644 --- a/src/controllers/workflow.controller.ts +++ b/src/controllers/workflow.controller.ts @@ -36,16 +36,16 @@ class WorkFlowController extends BaseController { if (msg_signature !== signature) { throw new ZError(10, 'sign check failed') } - const { message, id } = decrypt(aesKey, xml.Encrypt) - let parser = new XMLParser() - let jsonData = parser.parse(message) - let spStatus = jsonData.xml?.ApprovalInfo?.SpStatus - if (spStatus === TaskStatus.PASS) { - let spNo = jsonData.xml?.ApprovalInfo?.SpNo - if (spNo) { - new TaskQueue().addTaskToQueue(spNo) - } - } + // const { message, id } = decrypt(aesKey, xml.Encrypt) + // let parser = new XMLParser() + // let jsonData = parser.parse(message) + // let spStatus = jsonData.xml?.ApprovalInfo?.SpStatus + // if (spStatus === TaskStatus.PASS) { + // let spNo = jsonData.xml?.ApprovalInfo?.SpNo + // if (spNo) { + // new TaskQueue().addTaskToQueue(spNo) + // } + // } res.send('success') } diff --git a/src/queue/confirm.queue.ts b/src/queue/confirm.queue.ts index d8d872e..9fd3ca2 100644 --- a/src/queue/confirm.queue.ts +++ b/src/queue/confirm.queue.ts @@ -23,7 +23,7 @@ export class ConfirmQueue { this.queue.push(async () => { try { let receipt = await waitTransaction(this.web3, task.txHash) - logger.info(`receipt confirmed: ${task.txHash}`) + logger.info(`receipt confirmed: status: ${receipt.statue}, txhash: ${task.txHash}`) if (isSuccessfulTransaction(receipt)) { if (task.status === ReqTaskStatus.WAIT_CONFIRM) { task.status = ReqTaskStatus.WAIT_EXEC diff --git a/src/schedule/wxtask.schedule.ts b/src/schedule/wxtask.schedule.ts new file mode 100644 index 0000000..f306d65 --- /dev/null +++ b/src/schedule/wxtask.schedule.ts @@ -0,0 +1,31 @@ +import { singleton } from 'decorators/singleton' +import logger from 'logger/logger' +import { ChainTask } from 'models/ChainTask' +import * as schedule from 'node-schedule' +import { TaskQueue } from 'queue/task.queue' +import { WechatWorkService } from 'service/wechatwork.service' + +@singleton +export default class WxTaskSchedule { + async parseAllRecord() { + let detail: any = await new WechatWorkService().queryTasks() + if (detail.errcode) { + logger.info('approval list error, code: ' + detail.errcode + ' errmsg: ' + detail.errmsg) + return + } + const { sp_no_list } = detail + for (let spNo of sp_no_list) { + let record = await ChainTask.findOne({ taskId: spNo }) + if (record) { + continue + } + logger.info('got one task: ' + spNo) + new TaskQueue().addTaskToQueue(spNo) + } + } + scheduleAll() { + const job = schedule.scheduleJob(' */1 * * * *', async () => { + await this.parseAllRecord() + }) + } +} diff --git a/src/service/mail.service.ts b/src/service/mail.service.ts index c18bb18..639bb94 100644 --- a/src/service/mail.service.ts +++ b/src/service/mail.service.ts @@ -1,4 +1,5 @@ import { singleton } from 'decorators/singleton' +import logger from 'logger/logger' import { createTransport, Transporter } from 'nodemailer' import Mail from 'nodemailer/lib/mailer' @@ -21,6 +22,8 @@ export class MailService { } public async send(message: Mail.Options) { + logger.info('begin send mail: ') + logger.info(JSON.stringify(message)) await this.transporter.verify() return this.transporter.sendMail(message) } diff --git a/src/service/wechatwork.service.ts b/src/service/wechatwork.service.ts index 9c56b38..1e05f82 100644 --- a/src/service/wechatwork.service.ts +++ b/src/service/wechatwork.service.ts @@ -3,6 +3,7 @@ import { singleton } from 'decorators/singleton' import fs from 'fs' import os from 'os' import path from 'path' +import { RedisClient } from 'redis/RedisClient' import { excelToJson } from 'utils/excel.util' // 1-审批中;2-已通过;3-已驳回;4-已撤销;6-通过后撤销;7-已删除;10-已支付 @@ -25,6 +26,7 @@ export class WechatWorkService { private wxAesKey: string private wxCorpId: string private wxCorpSecret: string + private timePre: number constructor() { this.wxToken = process.env.WX_TOKEN @@ -148,4 +150,47 @@ export class WechatWorkService { fs.writeFileSync(filePath, res.data) return filePath } + // 查询审批列表 + public async queryTasks() { + const url = `${WX_API_HOST}/cgi-bin/oa/getapprovalinfo` + const access_token = await this.getAccessToken() + let starttime = this.timePre + if (!this.timePre) { + let timeStr = await new RedisClient().get('qywx_time_cache') + if (timeStr) { + starttime = parseInt(timeStr) + } + } + starttime = starttime || 1681401600 + let endtime = (Date.now() / 1000) | 0 + let config: AxiosRequestConfig = { + method: 'post', + url, + params: { + access_token, + }, + data: { + starttime, + endtime, + cursor: 0, + size: 100, + filters: [ + { + key: 'template_id', + value: process.env.WX_TEMPLATE_ID, + }, + { + key: 'sp_status', + value: '2', + }, + ], + }, + } + let response = await axios.request(config).then(response => { + return response.data + }) + this.timePre = starttime + await new RedisClient().set('qywx_time_cache', starttime + '') + return response + } }