From 46d9c483f2e1ac6999a171a937bf30a0b0a74ea6 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Fri, 14 Jun 2024 15:32:42 +0800 Subject: [PATCH] 1 --- server/marketserver/task/taskmgr.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/server/marketserver/task/taskmgr.go b/server/marketserver/task/taskmgr.go index a1dd893c..e7cbc4f2 100644 --- a/server/marketserver/task/taskmgr.go +++ b/server/marketserver/task/taskmgr.go @@ -12,14 +12,14 @@ type taskMgr struct { } func (this* taskMgr) Init() { - go this.loadWebHookEvent(constant.ORDER_UPDATE_EVENT) + go this.loadWebHookEvent(constant.ORDER_UPDATE_EVENT, this.orderUpdatedCb) } func (this* taskMgr) UnInit() { } -func (this* taskMgr) loadWebHookEvent(eventName string) { +func (this* taskMgr) loadWebHookEvent(eventName string, cb func(ds *f5.DataSet) bool) { var lastSyncIdx = this.getLastIdx(eventName) for true { if lastSyncIdx < 0 { @@ -28,15 +28,31 @@ func (this* taskMgr) loadWebHookEvent(eventName string) { continue } } else { + var newLastSyncIdx int64 f5.GetGoStyleDb().RawQuery( constant.BCEVENT_DB, - "SELECT * FROM t_webhook_process_last_idx WHERE idx > ?", + "SELECT * FROM t_webhook_process_last_idx WHERE idx > ? LIMIT 1000", []string{ q5.ToString(lastSyncIdx), }, func (err error, ds *f5.DataSet) { - + if err == nil { + for ds.Next() { + idx := q5.ToInt64(ds.GetByName("idx")) + if cb(ds) { + if idx > newLastSyncIdx { + newLastSyncIdx = idx + } + } else { + break + } + } + } }) + if newLastSyncIdx > lastSyncIdx { + lastSyncIdx = newLastSyncIdx + this.saveLastIdx(eventName, lastSyncIdx) + } } time.Sleep(time.Second * 3) } @@ -86,3 +102,7 @@ func (this* taskMgr) saveLastIdx(eventName string, lastIdx int64) bool { }) return result } + +func (this* taskMgr) orderUpdatedCb(ds *f5.DataSet) bool { + return true +}