This commit is contained in:
aozhiwei 2024-06-29 19:30:24 +08:00
parent 191048f40d
commit 0de6e748f6

View File

@ -140,30 +140,53 @@ func (this *dbPool) LoopLoad(
watchTimeCb func() int64, watchTimeCb func() int64,
sqlCb func(int64) string, sqlCb func(int64) string,
params []string, params []string,
nextTimeCb func() time.Duration, nextTimeCb func() int64,
nextRoundCb func() time.Duration, nextRoundCb func() int64,
doCb func(*DataSet) bool) { doCb func(*DataSet) bool) {
{ newDataCond := sync.NewCond(new(sync.Mutex))
go func () { chTimer := make(chan int64)
go func () {
var lastMaxIdx int64
for true {
hasNewDdata := false
this.RawQuery( this.RawQuery(
dataSource, dataSource,
fmt.Sprintf("SELECT MAX(idx) FROM %s", watchTable), fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable),
[]string{}, []string{},
func (err error, ds *DataSet) { func (err error, ds *DataSet) {
if err != nil {
return
}
if ds.Next() {
idx := q5.ToInt64(ds.GetByName("max_idx"))
if idx > lastMaxIdx {
lastMaxIdx = idx
hasNewDdata = true
}
}
}) })
time.Sleep(time.Second * 3) if hasNewDdata {
}() newDataCond.Broadcast()
} } else {
var lastIdx int64 time.Sleep(time.Second * time.Duration(watchTimeCb()))
for true { }
hasNextData := false }
sql := sqlCb(lastIdx) }()
this.RawQuery(
dataSource, {
sql, var lastIdx int64
params, for true {
func (err error, ds *DataSet) { hasNextData := false
if err == nil { sql := sqlCb(lastIdx)
this.RawQuery(
dataSource,
sql,
params,
func (err error, ds *DataSet) {
if err != nil {
return
}
for ds.Next() { for ds.Next() {
idx := q5.ToInt64(ds.GetByName("idx")) idx := q5.ToInt64(ds.GetByName("idx"))
if !doCb(ds) { if !doCb(ds) {
@ -174,13 +197,16 @@ func (this *dbPool) LoopLoad(
} }
hasNextData = true hasNextData = true
} }
} })
}) if hasNextData {
if hasNextData { chTimer <- nextTimeCb()
time.Sleep(nextTimeCb()) } else {
} else { lastIdx = 0
lastIdx = 0 chTimer <- nextRoundCb()
time.Sleep(nextRoundCb()) }
newDataCond.L.Lock()
newDataCond.Wait()
newDataCond.L.Unlock()
} }
} }
} }