From 3369e54e6ef21f83493904ba0338b6a1b0198234 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Tue, 23 Jul 2024 13:28:55 +0800 Subject: [PATCH] 1 --- dbpool.go | 45 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/dbpool.go b/dbpool.go index 45ec978..c5b46b0 100644 --- a/dbpool.go +++ b/dbpool.go @@ -7,6 +7,8 @@ import ( "q5" "strings" "sync" + "sync/atomic" + "math/rand" "time" ) @@ -27,12 +29,15 @@ type dbPool struct { style DBStyle lock sync.Mutex dataSourceHash map[string]*q5.ListHead - maxIdxMonitorHash q5.ConcurrentMap[string, q5.ConcurrentMap[int64, *chan int64]] + currMonitorIdx int64 + monitorLock sync.Mutex + maxIdxMonitorHash *q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]] } func (this *dbPool) init(style DBStyle) { this.style = style this.dataSourceHash = make(map[string]*q5.ListHead) + this.maxIdxMonitorHash = new(q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]]) } func (this *dbPool) unInit() { @@ -878,5 +883,41 @@ func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watch } func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan int64 { - return nil + this.monitorLock.Lock() + defer this.monitorLock.Unlock() + + key := fmt.Sprintf("%s_%s", dataSource, tblName) + chNewMaxIdx := make(chan int64) + newIdx := atomic.AddInt64(&this.currMonitorIdx, 1) + if p, ok := this.maxIdxMonitorHash.Load(key); ok { + (*p).Store(newIdx, &chNewMaxIdx) + } else { + p := new(q5.ConcurrentMap[int64, *chan int64]) + p.Store(newIdx, &chNewMaxIdx) + this.maxIdxMonitorHash.Store(key, p) + go func () { + var lastMaxIdx int64 + for { + this.RawQuery( + dataSource, + fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", tblName), + []string{}, + func (err error, ds *DataSet) { + if err != nil { + return + } + if ds.Next() { + idx := q5.ToInt64(ds.GetByName("max_idx")) + if idx > lastMaxIdx { + lastMaxIdx = idx + //hasNewDdata = true + GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx) + } + } + }) + time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(120))) + } + }() + } + return &chNewMaxIdx }