This commit is contained in:
aozhiwei 2024-07-23 13:42:26 +08:00
parent 3369e54e6e
commit c842ce5e34

View File

@ -895,27 +895,36 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan i
p := new(q5.ConcurrentMap[int64, *chan int64]) p := new(q5.ConcurrentMap[int64, *chan int64])
p.Store(newIdx, &chNewMaxIdx) p.Store(newIdx, &chNewMaxIdx)
this.maxIdxMonitorHash.Store(key, p) this.maxIdxMonitorHash.Store(key, p)
notifyFunc := func (newMaxIdx int64) {
p.Range(func(id int64, ch *chan int64) bool {
go func() {
(*ch) <- newMaxIdx
}()
return true
})
}
go func () { go func () {
var lastMaxIdx int64 var lastMaxIdx int64
for { for {
this.RawQuery( this.RawQuery(
dataSource, dataSource,
fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", tblName), fmt.Sprintf("SELECT MAX(idx) FROM %s", tblName),
[]string{}, []string{},
func (err error, ds *DataSet) { func (err error, ds *DataSet) {
if err != nil { if err != nil {
time.Sleep(time.Millisecond * time.Duration(8000 + rand.Intn(1200)))
return return
} }
if ds.Next() { if ds.Next() {
idx := q5.ToInt64(ds.GetByName("max_idx")) idx := q5.ToInt64(ds.GetByIndex(0))
if idx > lastMaxIdx { if idx > lastMaxIdx {
lastMaxIdx = idx lastMaxIdx = idx
//hasNewDdata = true
GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx) GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx)
} }
} }
}) })
time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(120))) go notifyFunc(lastMaxIdx)
time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(1200)))
} }
}() }()
} }