aozhiwei 1af2fc3fa0 1
2024-06-05 13:49:32 +08:00

129 lines
3.0 KiB
Go

package task
import (
"q5"
"f5"
"fmt"
"sync"
"time"
"main/constant"
)
type taskMgr struct {
lastSyncSysMailIdx int64
syncSysMailCond *sync.Cond
}
var TaskMgr = new(taskMgr)
func (this *taskMgr) Init() {
this.syncSysMailCond = sync.NewCond(new(sync.Mutex))
f5.GetGoStyleDb().OrmSelectOne(
constant.MAIL_DB,
"t_param",
[][]string{
{"param_name", constant.PARAM_NAME_LAST_SYNC_SYS_MAIL_IDX},
},
func (err error, ds *f5.DataSet) {
if err != nil {
panic(fmt.Sprintf("taskMgr init err:%s", err))
return
}
if ds.Next() {
this.lastSyncSysMailIdx = q5.ToInt64(ds.GetByName("param_val1"))
}
go this.syncSysMail()
})
}
func (this *taskMgr) UnInit() {
}
func (this *taskMgr) syncSysMail() {
go func () {
var waitMs int64 = 1000 * 2
for {
select {
case <-time.After(time.Millisecond * time.Duration(waitMs)):
waitMs = 1000 * 2
this.syncSysMailCond.Broadcast()
}
}
}()
for true {
this.pullSysMail()
this.syncSysMailCond.L.Lock()
this.syncSysMailCond.Wait()
this.syncSysMailCond.L.Unlock()
}
}
func (this *taskMgr) pullSysMail() {
f5.GetGoStyleDb().SelectCustomQuery(
constant.MAIL_DB,
fmt.Sprintf("SELECT * FROM t_sys_mail WHERE idx>%d LIMIT 1000;", this.lastSyncSysMailIdx),
func (err error, ds *f5.DataSet) {
if err != nil {
f5.GetSysLog().Warning("pullSysMail %s", err)
return
}
for ds.Next() {
idx := q5.ToInt64(ds.GetByName("idx"))
f5.GetSysLog().Info("pullSysMail %s begin", idx)
if !this.writeMail(ds) {
return
}
f5.GetSysLog().Info("pullSysMail %s end", idx)
if idx > this.lastSyncSysMailIdx {
this.lastSyncSysMailIdx = idx
}
}
})
}
func (this *taskMgr) writeMail(ds *f5.DataSet) bool {
unikey := ds.GetByName("unikey")
subject := ds.GetByName("subject")
content := ds.GetByName("content")
recipients := ds.GetByName("recipients")
attachments := ds.GetByName("attachments")
tag1 := q5.ToInt32(ds.GetByName("tag1"))
tag2 := q5.ToInt32(ds.GetByName("tag2"))
sendTime := q5.ToInt32(ds.GetByName("sendtime"))
expireTime := q5.ToInt32(ds.GetByName("expiretime"))
userRegStartTime := q5.ToInt32(ds.GetByName("user_reg_start_time"))
userRegEndTime := q5.ToInt32(ds.GetByName("user_reg_end_time"))
var ok = false
f5.GetGoStyleDb().Upsert(
constant.MAIL_DB,
"t_mail",
[][]string{
{"unikey", unikey},
},
[][]string{
},
[][]string{
{"mail_id", q5.ToString(f5.GetApp().NewLockNodeUuid())},
{"mail_type", q5.ToString(constant.MAIL_TYPE_GROUP)},
{"unikey", unikey},
{"subject", subject},
{"content", content},
{"recipients", recipients},
{"attachments", attachments},
{"sendtime", q5.ToString(sendTime)},
{"user_reg_start_time", q5.ToString(userRegStartTime)},
{"user_reg_end_time", q5.ToString(userRegEndTime)},
{"tag1", q5.ToString(tag1)},
{"tag2", q5.ToString(tag2)},
{"expiretime", q5.ToString(expireTime)},
},
func (err error, lastInsertId int64, rowsAffected int64) {
ok = err == nil
if err != nil {
f5.GetSysLog().Info("writeMail err:%s", err)
}
})
return ok
}