// Package task contains the background task that copies rows from
// t_sys_mail into t_mail and records how far it has got in t_param.
package task

import (
	"fmt"
	"sync"
	"time"

	"f5"
	"main/constant"
	"main/model/system"
	"q5"
)

// taskMgr periodically pulls new rows from t_sys_mail, writes them into
// t_mail and persists the highest processed idx in t_param.
type taskMgr struct {
	// lastSyncSysMailIdx is the largest t_sys_mail.idx processed so far.
	lastSyncSysMailIdx int64
	// syncSysMailCond wakes the sync loop; a ticker goroutine started in
	// syncSysMail broadcasts on it.
	syncSysMailCond *sync.Cond
}

// TaskMgr is the package-level task manager instance.
var TaskMgr = new(taskMgr)

// Init creates the wake-up condition variable, loads the persisted sync
// position from t_param and then starts the sync loop in its own goroutine.
// It panics if the position cannot be read.
func (this *taskMgr) Init() {
	this.syncSysMailCond = sync.NewCond(new(sync.Mutex))
	f5.GetGoStyleDb().OrmSelectOne(
		constant.MAIL_DB,
		"t_param",
		[][]string{
			{"param_name", constant.PARAM_NAME_LAST_SYNC_SYS_MAIL_IDX},
		},
		func(err error, ds *f5.DataSet) {
			if err != nil {
				panic(fmt.Sprintf("taskMgr init err:%s", err))
			}
			// No row means nothing was synced yet; the index stays at zero.
			if ds.Next() {
				this.lastSyncSysMailIdx = q5.ToInt64(ds.GetByName("param_val1"))
			}
			go this.syncSysMail()
		})
}

// UnInit is the counterpart of Init; it currently does nothing.
func (this *taskMgr) UnInit() {
}

// syncSysMail runs forever: it pulls pending system mail, then blocks on
// syncSysMailCond until the next broadcast. A helper goroutine broadcasts
// every two seconds.
func (this *taskMgr) syncSysMail() {
	const wakeInterval = 2 * time.Second

	go func() {
		ticker := time.NewTicker(wakeInterval)
		defer ticker.Stop()
		for range ticker.C {
			this.syncSysMailCond.Broadcast()
		}
	}()

	lastOutTick := q5.GetTickCount()
	for {
		this.pullSysMail()

		// A broadcast that fires while pullSysMail is still running is not
		// remembered; the loop then simply waits for the following tick.
		this.syncSysMailCond.L.Lock()
		this.syncSysMailCond.Wait()
		this.syncSysMailCond.L.Unlock()

		// NOTE(review): the unit of q5.GetTickCount is not visible here; if it
		// is milliseconds, this threshold logs on every cycle — confirm intent.
		if now := q5.GetTickCount(); now-lastOutTick > 15 {
			lastOutTick = now
			f5.GetSysLog().Info("taskMgr lastSyncSysMailIdx:%d", this.lastSyncSysMailIdx)
		}
	}
}

// pullSysMail fetches up to 1000 t_sys_mail rows with idx greater than the
// current sync position, writes each one through writeMail and advances the
// position. Processing stops at the first row that fails to write; whatever
// progress was made before that is still persisted.
func (this *taskMgr) pullSysMail() {
	// ORDER BY idx makes the LIMIT window deterministic. Without it an
	// arbitrary subset could be returned and rows with a lower idx than the
	// new position would never be picked up again.
	query := fmt.Sprintf(
		"SELECT * FROM t_sys_mail WHERE idx>%d ORDER BY idx LIMIT 1000;",
		this.lastSyncSysMailIdx)

	f5.GetGoStyleDb().SelectCustomQuery(
		constant.MAIL_DB,
		query,
		func(err error, ds *f5.DataSet) {
			if err != nil {
				f5.GetSysLog().Warning("pullSysMail %s", err)
				return
			}
			advanced := false
			for ds.Next() {
				idx := q5.ToInt64(ds.GetByName("idx"))
				f5.GetSysLog().Info("pullSysMail %d begin", idx)
				if !this.writeMail(ds) {
					// Leave this row for the next cycle.
					break
				}
				f5.GetSysLog().Info("pullSysMail %d end", idx)
				if idx > this.lastSyncSysMailIdx {
					this.lastSyncSysMailIdx = idx
					advanced = true
				}
			}
			// Keep the stored position in step with the in-memory one so a
			// restart does not re-process rows that were already written.
			if advanced {
				this.saveLastSyncSysMailIdx()
			}
		})
}

// writeMail upserts the current row of ds into t_mail, keyed by unikey, as a
// group mail with a freshly generated mail_id. On success it also creates a
// MailEvent named EVENT_MAIL_UPDATE that references the new mail id.
//
// It returns true when the upsert callback reported no error. This presumably
// relies on the callback having run before Upsert returns — TODO confirm.
func (this *taskMgr) writeMail(ds *f5.DataSet) bool {
	mailId := q5.ToString(f5.GetApp().NewLockNodeUuid())
	unikey := ds.GetByName("unikey")
	subject := ds.GetByName("subject")
	content := ds.GetByName("content")
	recipients := ds.GetByName("recipients")
	attachments := ds.GetByName("attachments")
	tag1 := q5.ToInt32(ds.GetByName("tag1"))
	tag2 := q5.ToInt32(ds.GetByName("tag2"))
	sendTime := q5.ToInt32(ds.GetByName("sendtime"))
	expireTime := q5.ToInt32(ds.GetByName("expiretime"))
	userRegStartTime := q5.ToInt32(ds.GetByName("user_reg_start_time"))
	userRegEndTime := q5.ToInt32(ds.GetByName("user_reg_end_time"))

	ok := false
	f5.GetGoStyleDb().Upsert(
		constant.MAIL_DB,
		"t_mail",
		[][]string{
			{"unikey", unikey},
		},
		// No update fields: presumably an existing row with this unikey is
		// left untouched — verify against the Upsert implementation.
		[][]string{},
		[][]string{
			{"mail_id", mailId},
			{"mail_type", q5.ToString(constant.MAIL_TYPE_GROUP)},
			{"unikey", unikey},
			{"subject", subject},
			{"content", content},
			{"recipients", recipients},
			{"attachments", attachments},
			{"sendtime", q5.ToString(sendTime)},
			{"user_reg_start_time", q5.ToString(userRegStartTime)},
			{"user_reg_end_time", q5.ToString(userRegEndTime)},
			{"tag1", q5.ToString(tag1)},
			{"tag2", q5.ToString(tag2)},
			{"expiretime", q5.ToString(expireTime)},
		},
		func(err error, lastInsertId int64, rowsAffected int64) {
			ok = err == nil
			if err != nil {
				f5.GetSysLog().Info("writeMail err:%s", err)
				return
			}
			nowDaySeconds := int32(f5.GetApp().GetRealSeconds())
			e := new(system.MailEvent)
			e.EventName = constant.EVENT_MAIL_UPDATE
			e.Param1 = mailId
			e.CreateTime = nowDaySeconds
			e.ModifyTime = nowDaySeconds
			// NOTE(review): the result of Create is not checked, so a failed
			// event insert goes unnoticed.
			f5.GetApp().GetOrmDb(constant.MAIL_DB).Create(e)
		})
	return ok
}

// saveLastSyncSysMailIdx upserts the current sync position into t_param under
// PARAM_NAME_LAST_SYNC_SYS_MAIL_IDX. Failures are only logged.
func (this *taskMgr) saveLastSyncSysMailIdx() {
	idxStr := q5.ToString(this.lastSyncSysMailIdx)
	f5.GetGoStyleDb().Upsert(
		constant.MAIL_DB,
		"t_param",
		[][]string{
			{"param_name", constant.PARAM_NAME_LAST_SYNC_SYS_MAIL_IDX},
		},
		[][]string{
			{"param_val1", idxStr},
		},
		[][]string{
			{"param_name", constant.PARAM_NAME_LAST_SYNC_SYS_MAIL_IDX},
			{"param_val1", idxStr},
		},
		func(err error, lastInsertId int64, rowsAffected int64) {
			if err != nil {
				f5.GetSysLog().Info("saveLastSyncSysMailIdx err:%s", err)
			}
		})
}