From c842ce5e34e6b8a9e145913bcc87ea99219d45b5 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Tue, 23 Jul 2024 13:42:26 +0800 Subject: [PATCH] 1 --- dbpool.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/dbpool.go b/dbpool.go index c5b46b0..fcdfd3f 100644 --- a/dbpool.go +++ b/dbpool.go @@ -895,27 +895,36 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan i p := new(q5.ConcurrentMap[int64, *chan int64]) p.Store(newIdx, &chNewMaxIdx) this.maxIdxMonitorHash.Store(key, p) + notifyFunc := func (newMaxIdx int64) { + p.Range(func(id int64, ch *chan int64) bool { + go func() { + (*ch) <- newMaxIdx + }() + return true + }) + } go func () { var lastMaxIdx int64 for { this.RawQuery( dataSource, - fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", tblName), + fmt.Sprintf("SELECT MAX(idx) FROM %s", tblName), []string{}, func (err error, ds *DataSet) { if err != nil { + time.Sleep(time.Millisecond * time.Duration(8000 + rand.Intn(1200))) return } if ds.Next() { - idx := q5.ToInt64(ds.GetByName("max_idx")) + idx := q5.ToInt64(ds.GetByIndex(0)) if idx > lastMaxIdx { lastMaxIdx = idx - //hasNewDdata = true GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx) } } }) - time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(120))) + go notifyFunc(lastMaxIdx) + time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(1200))) } }() }