This commit is contained in:
aozhiwei 2024-07-23 13:28:55 +08:00
parent fb4380095c
commit 3369e54e6e

View File

@ -7,6 +7,8 @@ import (
"q5"
"strings"
"sync"
"sync/atomic"
"math/rand"
"time"
)
@ -27,12 +29,15 @@ type dbPool struct {
style DBStyle
lock sync.Mutex
dataSourceHash map[string]*q5.ListHead
maxIdxMonitorHash q5.ConcurrentMap[string, q5.ConcurrentMap[int64, *chan int64]]
currMonitorIdx int64
monitorLock sync.Mutex
maxIdxMonitorHash *q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]]
}
func (this *dbPool) init(style DBStyle) {
this.style = style
this.dataSourceHash = make(map[string]*q5.ListHead)
this.maxIdxMonitorHash = new(q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]])
}
func (this *dbPool) unInit() {
@ -878,5 +883,41 @@ func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watch
}
func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan int64 {
return nil
this.monitorLock.Lock()
defer this.monitorLock.Unlock()
key := fmt.Sprintf("%s_%s", dataSource, tblName)
chNewMaxIdx := make(chan int64)
newIdx := atomic.AddInt64(&this.currMonitorIdx, 1)
if p, ok := this.maxIdxMonitorHash.Load(key); ok {
(*p).Store(newIdx, &chNewMaxIdx)
} else {
p := new(q5.ConcurrentMap[int64, *chan int64])
p.Store(newIdx, &chNewMaxIdx)
this.maxIdxMonitorHash.Store(key, p)
go func () {
var lastMaxIdx int64
for {
this.RawQuery(
dataSource,
fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", tblName),
[]string{},
func (err error, ds *DataSet) {
if err != nil {
return
}
if ds.Next() {
idx := q5.ToInt64(ds.GetByName("max_idx"))
if idx > lastMaxIdx {
lastMaxIdx = idx
//hasNewDdata = true
GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx)
}
}
})
time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(120)))
}
}()
}
return &chNewMaxIdx
}