增加客户端confirm和schedule执行

This commit is contained in:
zhl 2023-04-14 13:28:36 +08:00
parent bf03e5476c
commit 25e173915b
21 changed files with 26010 additions and 34505 deletions

View File

@ -44,6 +44,7 @@
"fastify-xml-body-parser": "^2.2.0", "fastify-xml-body-parser": "^2.2.0",
"mongoose": "5.10.3", "mongoose": "5.10.3",
"mongoose-findorcreate": "^3.0.0", "mongoose-findorcreate": "^3.0.0",
"nodemailer": "^6.9.1",
"node-schedule": "^2.0.0", "node-schedule": "^2.0.0",
"node-xlsx": "^0.21.0", "node-xlsx": "^0.21.0",
"redis": "^3.1.2", "redis": "^3.1.2",
@ -53,6 +54,7 @@
"devDependencies": { "devDependencies": {
"@types/dotenv": "^8.2.0", "@types/dotenv": "^8.2.0",
"@types/node": "^14.14.20", "@types/node": "^14.14.20",
"@types/nodemailer": "^6.4.7",
"@types/node-schedule": "^2.1.0", "@types/node-schedule": "^2.1.0",
"@types/redis": "^2.8.28", "@types/redis": "^2.8.28",
"@typescript-eslint/eslint-plugin": "^4.25.0", "@typescript-eslint/eslint-plugin": "^4.25.0",

File diff suppressed because one or more lines are too long

View File

@ -43,9 +43,9 @@ async function confirmTask() {
let ids = wallet.scheduleList let ids = wallet.scheduleList
showLoading() showLoading()
try { try {
let gas = await wallet.contract.methods.confirmTransactionBatch(ids).estimateGas() let gas = await wallet.contract.methods.confirmTransaction(ids).estimateGas()
gas = gas | 0 gas = gas | 0
await wallet.contract.methods.confirmTransactionBatch(ids).send({ gas }) await wallet.contract.methods.confirmTransaction(ids).send({ gas })
} catch (err) { } catch (err) {
console.log('error confirm task', err) console.log('error confirm task', err)
} }
@ -57,9 +57,9 @@ async function rejectTask() {
let ids = wallet.scheduleList let ids = wallet.scheduleList
showLoading() showLoading()
try { try {
let gas = await wallet.contract.methods.revokeConfirmationBatch(ids).estimateGas() let gas = await wallet.contract.methods.revokeConfirmation(ids).estimateGas()
gas = gas | 0 gas = gas | 0
await wallet.contract.methods.revokeConfirmationBatch(ids).send({ gas }) await wallet.contract.methods.revokeConfirmation(ids).send({ gas })
} catch (err) { } catch (err) {
console.log('error confirm task', err) console.log('error confirm task', err)
} }

File diff suppressed because one or more lines are too long

View File

@ -38,16 +38,12 @@ export class WalletReactor {
*/ */
async beginSchedule(operation: IOperationData, seconds: number) { async beginSchedule(operation: IOperationData, seconds: number) {
// let operation: any = this.genOperation(contractAddress, 0, data, ZERO_BYTES32, salt) // let operation: any = this.genOperation(contractAddress, 0, data, ZERO_BYTES32, salt)
let gas = await this.contract.methods
.schedule(operation.targets, operation.values, operation.datas, operation.predecessor, operation.salt, seconds)
.estimateGas({ from: this.account.address })
let res = await this.contract.methods let res = await this.contract.methods
.scheduleBatch( .schedule(operation.targets, operation.values, operation.datas, operation.predecessor, operation.salt, seconds)
operation.targets, .send({ gas: gas | 0 })
operation.values,
operation.datas,
operation.predecessor,
operation.salt,
seconds,
)
.send({ gas: 1000000 })
operation.transactionHash = res.transactionHash operation.transactionHash = res.transactionHash
return operation return operation
} }
@ -58,7 +54,8 @@ export class WalletReactor {
* @returns * @returns
*/ */
async cancelSchedule(id) { async cancelSchedule(id) {
let res = await this.contract.methods.cancel(id).send({ gas: 1000000 }) let gas = await this.contract.methods.cancel(id).estimateGas({ from: this.account.address })
let res = await this.contract.methods.cancel(id).send({ gas: gas | 0 })
return res return res
} }
/** /**
@ -89,11 +86,11 @@ export class WalletReactor {
*/ */
async executeSchedule(operation: IOperationData) { async executeSchedule(operation: IOperationData) {
let gas = await this.contract.methods let gas = await this.contract.methods
.executeBatch(operation.targets, operation.values, operation.datas, operation.predecessor, operation.salt) .execute(operation.targets, operation.values, operation.datas, operation.predecessor, operation.salt)
.estimateGas({ from: this.account.address }) .estimateGas({ from: this.account.address })
gas = gas | 0 gas = gas | 0
let res = await this.contract.methods let res = await this.contract.methods
.executeBatch(operation.targets, operation.values, operation.datas, operation.predecessor, operation.salt) .execute(operation.targets, operation.values, operation.datas, operation.predecessor, operation.salt)
.send({ gas }) .send({ gas })
return res return res
} }

View File

@ -3,3 +3,11 @@ export const ZERO_ADDRESS = '0x0000000000000000000000000000000000000000'
export const ZERO_BYTES32 = '0x0000000000000000000000000000000000000000000000000000000000000000' export const ZERO_BYTES32 = '0x0000000000000000000000000000000000000000000000000000000000000000'
export const MAX_BATCH_REQ_COUNT = 50 export const MAX_BATCH_REQ_COUNT = 50
export const CONFIRM_MAIL_HTML = `
<h1>西<h1>
<p>{{title}}</p>
<p>{{desc}}</p>
<p>, , , , 使MetaMask的浏览器打开</p>
<p><a href="{{link}}" target="_blank">{{link2}}</a></p>
`

View File

@ -8,6 +8,8 @@ import { TaskStatus } from 'service/wechatwork.service'
import { RequestTask } from 'models/RequestTask' import { RequestTask } from 'models/RequestTask'
import { BlockChain } from 'chain/BlockChain' import { BlockChain } from 'chain/BlockChain'
import { ChainTask } from 'models/ChainTask' import { ChainTask } from 'models/ChainTask'
import { isObjectId } from 'utils/string.util'
import { ChainQueue } from 'queue/chain.queue'
class WorkFlowController extends BaseController { class WorkFlowController extends BaseController {
@role(ROLE_ANON) @role(ROLE_ANON)
@ -54,6 +56,9 @@ class WorkFlowController extends BaseController {
if (!id) { if (!id) {
return res.view('/templates/confirm_err_page.ejs', { msg: '参数错误' }) return res.view('/templates/confirm_err_page.ejs', { msg: '参数错误' })
} }
if (!isObjectId(id)) {
return res.view('/templates/confirm_err_page.ejs', { msg: '参数错误' })
}
const chainTask = await ChainTask.findById(id) const chainTask = await ChainTask.findById(id)
if (!chainTask) { if (!chainTask) {
return res.view('/templates/confirm_err_page.ejs', { msg: '任务未找到' }) return res.view('/templates/confirm_err_page.ejs', { msg: '任务未找到' })
@ -66,6 +71,14 @@ class WorkFlowController extends BaseController {
return res.view('/templates/confirm_page.ejs', { id: id, subtasks: requestTasks, mainTask: chainTask, address }) return res.view('/templates/confirm_page.ejs', { id: id, subtasks: requestTasks, mainTask: chainTask, address })
} }
@role(ROLE_ANON)
@router('get /workflow/redirect/:id')
async rejectPage(req, res) {
const { id } = req.params
res.header('Content-Security-Policy', "script-src 'nonce-rAnd0m'")
return res.view('/templates/redirect_page.ejs', { id })
}
@role(ROLE_ANON) @role(ROLE_ANON)
@router('get /workflow/update_required') @router('get /workflow/update_required')
async updateRequired(req, res) { async updateRequired(req, res) {
@ -95,14 +108,22 @@ class WorkFlowController extends BaseController {
// let fileId = 'WWME_g-oYEAAAzSUkPNpznkoGbgD2f1bDCA.xlsx' // let fileId = 'WWME_g-oYEAAAzSUkPNpznkoGbgD2f1bDCA.xlsx'
// await new WechatWorkService().fetchFile(fileId) // await new WechatWorkService().fetchFile(fileId)
// console.log('11') // console.log('11')
// let spNo = '202304070010' let spNo = '202304070010'
// new TaskQueue().addTaskToQueue(spNo) // new TaskQueue().addTaskToQueue(spNo)
let { id } = req.params let task = await ChainTask.findById('642fe42611845ce0e1def316')
let record = await RequestTask.findById(id) for (let tid of task.tasks) {
if (record) { let subTask = await RequestTask.findById(tid)
let res = await new BlockChain().walletReactor.executeSchedule(record) let { scheduleId } = new BlockChain().walletReactor.genOperation(subTask)
return res subTask.scheduleId = scheduleId
await subTask.save()
new ChainQueue().addTaskToQueue(subTask)
} }
// let { id } = req.params
// let record = await RequestTask.findById(id)
// if (record) {
// let res = await new BlockChain().walletReactor.executeSchedule(record)
// return res
// }
return {} return {}
} }
} }

View File

@ -2,6 +2,7 @@ import { getModelForClass, index, modelOptions, mongoose, pre, prop, Ref, Severi
import { BlockChain } from 'chain/BlockChain' import { BlockChain } from 'chain/BlockChain'
import { MAX_BATCH_REQ_COUNT, ZERO_BYTES32 } from 'common/Constants' import { MAX_BATCH_REQ_COUNT, ZERO_BYTES32 } from 'common/Constants'
import { dbconn } from 'decorators/dbconn' import { dbconn } from 'decorators/dbconn'
import { MailQueue } from 'queue/mail.queue'
import { generateRandomBytes32 } from 'utils/wallet.util' import { generateRandomBytes32 } from 'utils/wallet.util'
import { BaseModule } from './Base' import { BaseModule } from './Base'
@ -11,9 +12,11 @@ export enum TaskStatus {
NOTSTART = 0, NOTSTART = 0,
PEDING = 1, PEDING = 1,
SUCCESS = 2, SUCCESS = 2,
TX_ALL_CONFIRM = 3, TX_SCHEDULE_CONFIRM = 3,
TX_EXEC_CONFIRM = 4,
TX_PART_ERROR = 8, TX_PART_ERROR = 8,
TX_ERROR = 9, TX_EXEC_ERROR = 9,
TX_SCHEDULE_ERROR = 10,
} }
@dbconn() @dbconn()
@ -81,26 +84,41 @@ export class ChainTaskClass extends BaseModule {
record.successCount = sCount record.successCount = sCount
record.errorCount = errCount record.errorCount = errCount
if (sCount === record.tasks.length) { if (sCount === record.tasks.length) {
record.status = TaskStatus.TX_ALL_CONFIRM record.status = TaskStatus.TX_EXEC_CONFIRM
record.allEnd = true record.allEnd = true
} else { } else {
record.allEnd = false record.allEnd = false
if (record.status === TaskStatus.NOTSTART && sCount > 0) { if (record.status === TaskStatus.NOTSTART && sCount > 0) {
record.status = TaskStatus.PEDING record.status = TaskStatus.PEDING
} else if (errCount === record.tasks.length) { } else if (errCount === record.tasks.length) {
record.status = TaskStatus.TX_ERROR record.status = TaskStatus.TX_EXEC_ERROR
record.allEnd = true record.allEnd = true
} else if (errCount + sCount === record.tasks.length) { } else if (errCount + sCount === record.tasks.length) {
record.status = TaskStatus.TX_PART_ERROR record.status = TaskStatus.TX_PART_ERROR
record.allEnd = true record.allEnd = true
} }
} }
await record.save()
if (record.allEnd) { if (record.allEnd) {
setImmediate(async function () { setImmediate(async function () {
// TODO:: 通知前端 // TODO:: 通知前端
}) })
} }
await record.save()
}
public static async checkScheduleStatus(chainTaskId: string) {
let record = await ChainTask.findById(chainTaskId)
let sCount = 0
for (let subId of record.tasks) {
let subData = await RequestTask.findById(subId)
if (subData.status === ReqTaskStatus.WAIT_EXEC) {
sCount += 1
}
}
if (sCount === record.tasks.length) {
// send mail to confirmer
new MailQueue().addTaskToQueue(record)
}
} }
public static async allUnFinishedTask() { public static async allUnFinishedTask() {

View File

@ -18,13 +18,15 @@ export enum ReqTaskStatus {
PEDING = 1, PEDING = 1,
WAIT_CONFIRM = 2, WAIT_CONFIRM = 2,
WAIT_EXEC = 3, WAIT_EXEC = 3,
SUCCESS = 4, WAIT_EXEC_CONFIRM = 4,
REVERT = 8, SUCCESS = 5,
ERROR = 9, SCHEDULE_REVERT = 8,
EXEC_REVERT = 9,
ERROR = 99,
} }
@dbconn() @dbconn()
@index({ scheduleId: 1 }, { unique: false }) @index({ scheduleId: 1 }, { unique: true })
@modelOptions({ @modelOptions({
schemaOptions: { collection: 'chain_request_task', timestamps: true }, schemaOptions: { collection: 'chain_request_task', timestamps: true },
options: { allowMixed: Severity.ALLOW }, options: { allowMixed: Severity.ALLOW },
@ -42,6 +44,9 @@ export class RequestTaskClass extends BaseModule {
@prop({ required: true, default: 0 }) @prop({ required: true, default: 0 })
public tryCount: number public tryCount: number
@prop({ required: true, default: 0 })
public execCount: number
@prop({ required: true, default: 0 }) @prop({ required: true, default: 0 })
public maxTryCount: number public maxTryCount: number
@ -58,7 +63,10 @@ export class RequestTaskClass extends BaseModule {
public startTime: number public startTime: number
@prop() @prop()
public endTime: number public endScheduleTime: number
@prop()
public endExecTime: number
//begin for schedule //begin for schedule
@prop() @prop()
@ -82,6 +90,9 @@ export class RequestTaskClass extends BaseModule {
@prop() @prop()
public txHash: string public txHash: string
@prop()
public execHash: string
/** /**
* block num * block num
*/ */
@ -108,6 +119,16 @@ export class RequestTaskClass extends BaseModule {
await self.save() await self.save()
} }
public async execSchdule(this: DocumentType<RequestTaskClass>) {
let self = this
let result = await new BlockChain().walletReactor.executeSchedule(self)
logger.info(result)
let { transactionHash } = result
self.execHash = transactionHash
self.statue = ReqTaskStatus.WAIT_EXEC_CONFIRM
await self.save()
}
public static async allUnFinishedTask(chainTaskId: string) { public static async allUnFinishedTask(chainTaskId: string) {
return RequestTask.find({ chainTaskId, status: { $ne: ReqTaskStatus.SUCCESS } }) return RequestTask.find({ chainTaskId, status: { $ne: ReqTaskStatus.SUCCESS } })
} }

View File

@ -3,8 +3,7 @@ import { dbconn } from 'decorators/dbconn'
import { BaseModule } from './Base' import { BaseModule } from './Base'
@dbconn() @dbconn()
@index({ tokenId: 1 }, { unique: false }) @index({ transactionHash: 1 }, { unique: true })
@index({ transactionHash: 1, tokenId: 1, from: 1, to: 1 }, { unique: true })
@modelOptions({ @modelOptions({
schemaOptions: { collection: 'schedule_confirm_event', timestamps: true }, schemaOptions: { collection: 'schedule_confirm_event', timestamps: true },
}) })
@ -23,8 +22,8 @@ export class ScheduleConfirmEventClass extends BaseModule {
public removed: boolean public removed: boolean
@prop() @prop()
public operater: string public operater: string
@prop() @prop({ type: () => [String] })
public scheduleId: string public scheduleIds: string[]
@prop() @prop()
public blockTime: number public blockTime: number
@prop({ default: 0 }) @prop({ default: 0 })
@ -40,7 +39,7 @@ export class ScheduleConfirmEventClass extends BaseModule {
blockNumber: event.blockHeight, blockNumber: event.blockHeight,
removed: event.removed, removed: event.removed,
operater: event.sender, operater: event.sender,
scheduleId: event.id, scheduleIds: event.ids,
transactionHash: event.hash, transactionHash: event.hash,
blockTime: new Date(event.time).getTime(), blockTime: new Date(event.time).getTime(),
$inc: { version: 1 }, $inc: { version: 1 },

View File

@ -0,0 +1,53 @@
import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoose'
import { dbconn } from 'decorators/dbconn'
import { BaseModule } from './Base'
@dbconn()
@index({ transactionHash: 1, scheduleId: 1 }, { unique: true })
@modelOptions({
schemaOptions: { collection: 'schedule_executed_event', timestamps: true },
})
export class ScheduleExecutedEventClass extends BaseModule {
@prop({ required: true })
public address!: string
@prop()
public event: string
@prop({ required: true })
public transactionHash: string
@prop()
public blockNumber: number
@prop()
public blockHash: string
@prop()
public removed: boolean
@prop()
public operater: string
@prop()
public scheduleId: string
@prop()
public blockTime: number
@prop({ default: 0 })
public version: number
public static async saveEvent(event: any) {
if (!event.success) {
return
}
const data = {
address: event.tokenAddress,
blockNumber: event.blockHeight,
removed: event.removed,
operater: event.sender,
transactionHash: event.hash,
blockTime: new Date(event.time).getTime(),
$inc: { version: 1 },
}
return ScheduleExecutedEvent.insertOrUpdate({ transactionHash: event.hash, scheduleId: event.id }, data)
}
}
export const ScheduleExecutedEvent = getModelForClass(ScheduleExecutedEventClass, {
existingConnection: ScheduleExecutedEventClass['db'],
})

View File

@ -0,0 +1,53 @@
import { getModelForClass, index, modelOptions, prop } from '@typegoose/typegoose'
import { dbconn } from 'decorators/dbconn'
import { BaseModule } from './Base'
@dbconn()
@index({ transactionHash: 1, scheduleId: 1 }, { unique: true })
@modelOptions({
schemaOptions: { collection: 'schedule_added_event', timestamps: true },
})
export class ScheduledAddedEventClass extends BaseModule {
@prop({ required: true })
public address!: string
@prop()
public event: string
@prop({ required: true })
public transactionHash: string
@prop()
public blockNumber: number
@prop()
public blockHash: string
@prop()
public removed: boolean
@prop()
public operater: string
@prop()
public scheduleId: string
@prop()
public blockTime: number
@prop({ default: 0 })
public version: number
public static async saveEvent(event: any) {
if (!event.success) {
return
}
const data = {
address: event.tokenAddress,
blockNumber: event.blockHeight,
removed: event.removed,
operater: event.sender,
transactionHash: event.hash,
blockTime: new Date(event.time).getTime(),
$inc: { version: 1 },
}
return ScheduledAddedEvent.insertOrUpdate({ transactionHash: event.hash, scheduleId: event.id }, data)
}
}
export const ScheduledAddedEvent = getModelForClass(ScheduledAddedEventClass, {
existingConnection: ScheduledAddedEventClass['db'],
})

View File

@ -6,6 +6,12 @@ import { BlockChain } from 'chain/BlockChain'
import { ChainTask } from 'models/ChainTask' import { ChainTask } from 'models/ChainTask'
import logger from 'logger/logger' import logger from 'logger/logger'
const EXCLUDE_STATUS = [
ReqTaskStatus.SUCCESS,
ReqTaskStatus.WAIT_EXEC,
ReqTaskStatus.WAIT_EXEC_CONFIRM,
ReqTaskStatus.EXEC_REVERT,
]
@singleton @singleton
export class ChainQueue { export class ChainQueue {
private queue: AsyncQueue private queue: AsyncQueue
@ -17,6 +23,9 @@ export class ChainQueue {
} }
public async addTaskToQueue(subTask: DocumentType<RequestTaskClass>) { public async addTaskToQueue(subTask: DocumentType<RequestTaskClass>) {
if (EXCLUDE_STATUS.indexOf(subTask.status) >= 0) {
return
}
if (subTask.maxTryCount && subTask.tryCount > subTask.maxTryCount) { if (subTask.maxTryCount && subTask.tryCount > subTask.maxTryCount) {
subTask.status = ReqTaskStatus.ERROR subTask.status = ReqTaskStatus.ERROR
await subTask.save() await subTask.save()
@ -31,6 +40,7 @@ export class ChainQueue {
this.blockChain.confirmQueue.addTaskToQueue(subTask) this.blockChain.confirmQueue.addTaskToQueue(subTask)
return return
} }
this.queue.push(async () => { this.queue.push(async () => {
try { try {
subTask.tryCount += 1 subTask.tryCount += 1

View File

@ -7,6 +7,7 @@ import { isSuccessfulTransaction, waitTransaction } from 'chain/TransactionConfi
import { ChainTask } from 'models/ChainTask' import { ChainTask } from 'models/ChainTask'
import { ChainQueue } from './chain.queue' import { ChainQueue } from './chain.queue'
import logger from 'logger/logger' import logger from 'logger/logger'
import { ExecQueue } from './exec.queue'
@singleton @singleton
export class ConfirmQueue { export class ConfirmQueue {
@ -24,20 +25,35 @@ export class ConfirmQueue {
let receipt = await waitTransaction(this.web3, task.txHash) let receipt = await waitTransaction(this.web3, task.txHash)
logger.info(`receipt confirmed: ${task.txHash}`) logger.info(`receipt confirmed: ${task.txHash}`)
if (isSuccessfulTransaction(receipt)) { if (isSuccessfulTransaction(receipt)) {
if (task.status === ReqTaskStatus.WAIT_CONFIRM) {
task.status = ReqTaskStatus.WAIT_EXEC
task.endScheduleTime = Date.now()
await ChainTask.checkScheduleStatus(task.chainTaskId)
} else if (task.status === ReqTaskStatus.WAIT_EXEC_CONFIRM) {
task.status = ReqTaskStatus.SUCCESS task.status = ReqTaskStatus.SUCCESS
task.endTime = Date.now() task.endExecTime = Date.now()
await task.save()
await ChainTask.checkStatus(task.chainTaskId) await ChainTask.checkStatus(task.chainTaskId)
} else { }
task.status = ReqTaskStatus.REVERT
await task.save() await task.save()
} else {
if (task.status === ReqTaskStatus.WAIT_CONFIRM) {
task.status = ReqTaskStatus.SCHEDULE_REVERT
new ChainQueue().addTaskToQueue(task) new ChainQueue().addTaskToQueue(task)
} else if (task.status === ReqTaskStatus.WAIT_EXEC_CONFIRM) {
task.status = ReqTaskStatus.EXEC_REVERT
new ExecQueue().addTaskToQueue(task)
}
await task.save()
} }
} catch (err) { } catch (err) {
logger.error(err) logger.error(err)
task.errMsg.push(err) task.errMsg.push(err)
await task.save() await task.save()
if (task.status === ReqTaskStatus.WAIT_CONFIRM) {
new ChainQueue().addTaskToQueue(task) new ChainQueue().addTaskToQueue(task)
} else if (task.status === ReqTaskStatus.WAIT_EXEC_CONFIRM) {
new ExecQueue().addTaskToQueue(task)
}
} }
}) })
} }

60
src/queue/exec.queue.ts Normal file
View File

@ -0,0 +1,60 @@
import { AsyncQueue, createAsyncQueue } from 'common/AsyncQueue'
import { singleton } from 'decorators/singleton'
import { BlockChain } from 'chain/BlockChain'
import logger from 'logger/logger'
import { ReqTaskStatus, RequestTaskClass } from 'models/RequestTask'
import { DocumentType } from '@typegoose/typegoose'
import { ChainTask } from 'models/ChainTask'
const EXCLUDE_STATUS = [
ReqTaskStatus.NOTSTART,
ReqTaskStatus.SUCCESS,
ReqTaskStatus.PEDING,
ReqTaskStatus.WAIT_CONFIRM,
ReqTaskStatus.SCHEDULE_REVERT,
]
@singleton
export class ExecQueue {
private queue: AsyncQueue
private blockChain: BlockChain
constructor() {
this.queue = createAsyncQueue()
this.blockChain = new BlockChain()
}
public async addTaskToQueue(subTask: DocumentType<RequestTaskClass>) {
if (EXCLUDE_STATUS.indexOf(subTask.status) >= 0) {
return
}
if (subTask.maxTryCount && subTask.execCount > subTask.maxTryCount) {
subTask.status = ReqTaskStatus.ERROR
await subTask.save()
await ChainTask.checkStatus(subTask.chainTaskId)
return
}
if (subTask.status === ReqTaskStatus.WAIT_EXEC_CONFIRM) {
this.blockChain.confirmQueue.addTaskToQueue(subTask)
return
}
this.queue.push(async () => {
subTask.execCount += 1
try {
try {
await subTask.execSchdule()
} catch (reqerr) {
logger.info(reqerr)
subTask.errMsg.push(JSON.stringify(reqerr))
await subTask.save()
// TODO:: 要排除数据已经提交到链上, 返回过程中的网络错误
this.addTaskToQueue(subTask)
return
}
} catch (err) {
logger.error('error add task: ' + err)
subTask.errMsg.push(err)
await subTask.save()
}
})
}
}

57
src/queue/mail.queue.ts Normal file
View File

@ -0,0 +1,57 @@
import { AsyncQueue, createAsyncQueue } from 'common/AsyncQueue'
import { singleton } from 'decorators/singleton'
import { DocumentType } from '@typegoose/typegoose'
import logger from 'logger/logger'
import { ChainTaskClass } from 'models/ChainTask'
import { MailService } from 'service/mail.service'
import { Deferred } from 'utils/promise.util'
import {CONFIRM_MAIL_HTML} from 'common/Constants'
export interface IMailData {
from?: string
to: string
subject?: string
text?: string
html?: string
}
const DEFAULT_MSG_DATA: IMailData = {
from: 'CEBG <noreply@cebg.games>',
to: process.env.MAIL_DEFAULT_ADDRESS,
subject: 'CEBG',
}
@singleton
export class MailQueue {
private queue: AsyncQueue
constructor() {
this.queue = createAsyncQueue()
}
public async addTaskToQueue(task: DocumentType<ChainTaskClass>) {
let html = CONFIRM_MAIL_HTML
html = html.replace("{{title}}", task.name)
html = html.replace("{{desc}}", task.desc)
let link = `${process.env.WEB_BASE_URL}/workflow/redirect/${task.id}`
html = html.replace("{{link}}", link)
let link2 = `${process.env.WEB_BASE_URL}/workflow/confirm/${task.id}`
html = html.replace("{{link2}}", link2)
let data: any = {html}
Object(DEFAULT_MSG_DATA).zssign(data)
let deferred = new Deferred()
this.queue.push(async () => {
try {
let info = await new MailService().send(data)
logger.info(
`send mail success:: from: ${data.from}, to: ${data.to}, subject: ${data.subject}, messageId: ${info.messageId}`,
)
deferred.resolve(info)
} catch (err) {
logger.info(`send mail error:: from: ${data.from}, to: ${data.to}, subject: ${data.subject}`)
logger.error(err)
deferred.reject(err)
}
})
return deferred.promise
}
}

View File

@ -2,16 +2,19 @@ import logger from 'logger/logger'
import { ChainTask } from 'models/ChainTask' import { ChainTask } from 'models/ChainTask'
import { RequestTask } from 'models/RequestTask' import { RequestTask } from 'models/RequestTask'
import { ChainQueue } from 'queue/chain.queue' import { ChainQueue } from 'queue/chain.queue'
import { ExecQueue } from 'queue/exec.queue'
export async function restartAllUnFinishedTask() { export async function restartAllUnFinishedTask() {
let chainTasks = await ChainTask.allUnFinishedTask() let chainTasks = await ChainTask.allUnFinishedTask()
logger.info(`restore ${chainTasks.length} chain tasks`) logger.info(`restore ${chainTasks.length} chain tasks`)
let chainQueue = new ChainQueue() let chainQueue = new ChainQueue()
let execQueue = new ExecQueue()
for (let cTask of chainTasks) { for (let cTask of chainTasks) {
let subTasks = await RequestTask.allUnFinishedTask(cTask.id) let subTasks = await RequestTask.allUnFinishedTask(cTask.id)
logger.info(`restore ${subTasks.length} req tasks fro ${cTask.id}`) logger.info(`restore ${subTasks.length} req tasks fro ${cTask.id}`)
for (let subTask of subTasks) { for (let subTask of subTasks) {
chainQueue.addTaskToQueue(subTask) chainQueue.addTaskToQueue(subTask)
execQueue.addTaskToQueue(subTask)
} }
} }
} }

View File

@ -0,0 +1,27 @@
import { singleton } from 'decorators/singleton'
import { createTransport, Transporter } from 'nodemailer'
import Mail from 'nodemailer/lib/mailer'
@singleton
export class MailService {
private transporter: Transporter
constructor() {
const options = {
host: process.env.MAIL_SMTP_HOST,
secure: true,
auth: {
user: process.env.MAIL_SMTP_USER,
pass: process.env.MAIL_SMTP_PASS,
},
logger: true,
debug: false,
}
// @ts-ignore
this.transporter = createTransport(options, {})
}
public async send(message: Mail.Options) {
await this.transporter.verify()
return this.transporter.sendMail(message)
}
}

View File

@ -1,7 +1,9 @@
import { BlockChain } from 'chain/BlockChain' import { BlockChain } from 'chain/BlockChain'
import { singleton } from 'decorators/singleton' import { singleton } from 'decorators/singleton'
import { ChainTask } from 'models/ChainTask' import { ChainTask } from 'models/ChainTask'
import { ReqTaskStatus, RequestTask } from 'models/RequestTask'
import { ChainQueue } from 'queue/chain.queue' import { ChainQueue } from 'queue/chain.queue'
import { ExecQueue } from 'queue/exec.queue'
import { WechatWorkService } from './wechatwork.service' import { WechatWorkService } from './wechatwork.service'
@singleton @singleton
@ -16,4 +18,14 @@ export class TaskSvr {
new ChainQueue().addTaskToQueue(subTask) new ChainQueue().addTaskToQueue(subTask)
} }
} }
public async parseOneSchedule(scheduleId: string) {
let record = await RequestTask.findOne({ scheduleId })
if (!record) {
return
}
record.statue = ReqTaskStatus.WAIT_EXEC
await record.save()
new ExecQueue().addTaskToQueue(record)
}
} }

View File

@ -0,0 +1,47 @@
<!DOCTYPE HTML>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<title>任务详情</title>
<meta charset="utf-8">
<meta name="renderer" content="webkit">
<meta http-equiv="Content-Security-Policy" content="script-src 'nonce-rAnd0m'">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta http-equiv="Expires" content="0">
<meta http-equiv="Pragma" content="no-cache">
<meta http-equiv="Cache-control" content="no-cache">
<meta http-equiv="Cache" content="no-cache">
<meta name="viewport" content="width=device-width,initial-scale=1, minimum-scale=1, maximum-scale=1, user-scalable=no, minimal-ui" />
<meta name="apple-mobile-web-app-capable" content="yes" />
<meta name="full-screen" content="true" />
<meta name="screen-orientation" content="portrait" />
<meta name="x5-fullscreen" content="true" />
<meta name="360-fullscreen" content="true" />
<meta name="apple-mobile-web-app-title" content="WJTX">
<meta name="apple-mobile-web-app-capable" content="yes">
<meta name="apple-mobile-web-app-status-bar-style" content="black">
</head>
<body>
<h1>跳转中...</h1>
<script nonce="rAnd0m">
(function() {
var url1 = 'https://metamask.app.link/dapp/www.cebg.games'
var url2 = 'https://www.cebg.games'
var url = ''
// if current devices is mobile, redirect url1, else redirect url2
if (/(iPhone|iPad|iPod|iOS|Android)/i.test(navigator.userAgent)) {
url = url1
} else {
url = url2
}
console.log(url)
location.href = url
})()
</script>
</body>
</html>

View File

@ -557,6 +557,13 @@
resolved "https://registry.npmmirror.com/@types/node/-/node-16.11.64.tgz#9171f327298b619e2c52238b120c19056415d820" resolved "https://registry.npmmirror.com/@types/node/-/node-16.11.64.tgz#9171f327298b619e2c52238b120c19056415d820"
integrity sha512-z5hPTlVFzNwtJ2LNozTpJcD1Cu44c4LNuzaq1mwxmiHWQh2ULdR6Vjwo1UGldzRpzL0yUEdZddnfqGW2G70z6Q== integrity sha512-z5hPTlVFzNwtJ2LNozTpJcD1Cu44c4LNuzaq1mwxmiHWQh2ULdR6Vjwo1UGldzRpzL0yUEdZddnfqGW2G70z6Q==
"@types/nodemailer@^6.4.7":
version "6.4.7"
resolved "https://registry.yarnpkg.com/@types/nodemailer/-/nodemailer-6.4.7.tgz#658f4bca47c1a895b1d7e054b3b54030a5e1f5e0"
integrity sha512-f5qCBGAn/f0qtRcd4SEn88c8Fp3Swct1731X4ryPKqS61/A3LmmzN8zaEz7hneJvpjFbUUgY7lru/B/7ODTazg==
dependencies:
"@types/node" "*"
"@types/pbkdf2@^3.0.0": "@types/pbkdf2@^3.0.0":
version "3.1.0" version "3.1.0"
resolved "https://registry.npmmirror.com/@types/pbkdf2/-/pbkdf2-3.1.0.tgz#039a0e9b67da0cdc4ee5dab865caa6b267bb66b1" resolved "https://registry.npmmirror.com/@types/pbkdf2/-/pbkdf2-3.1.0.tgz#039a0e9b67da0cdc4ee5dab865caa6b267bb66b1"
@ -3467,6 +3474,11 @@ node-xlsx@^0.21.0:
dependencies: dependencies:
xlsx "^0.17.4" xlsx "^0.17.4"
nodemailer@^6.9.1:
version "6.9.1"
resolved "https://registry.yarnpkg.com/nodemailer/-/nodemailer-6.9.1.tgz#8249d928a43ed85fec17b13d2870c8f758a126ed"
integrity sha512-qHw7dOiU5UKNnQpXktdgQ1d3OFgRAekuvbJLcdG5dnEo/GtcTHRYM7+UfJARdOFU9WUQO8OiIamgWPmiSFHYAA==
normalize-path@^3.0.0, normalize-path@~3.0.0: normalize-path@^3.0.0, normalize-path@~3.0.0:
version "3.0.0" version "3.0.0"
resolved "https://registry.npmmirror.com/normalize-path/-/normalize-path-3.0.0.tgz#0dcd69ff23a1c9b11fd0978316644a0388216a65" resolved "https://registry.npmmirror.com/normalize-path/-/normalize-path-3.0.0.tgz#0dcd69ff23a1c9b11fd0978316644a0388216a65"