This commit is contained in:
aozhiwei 2024-06-29 20:44:27 +08:00
parent 83692803ab
commit 71fae94866

132
dbpool.go
View File

@ -145,85 +145,41 @@ func (this *dbPool) LoopLoad(
doCb func(*DataSet) bool) { doCb func(*DataSet) bool) {
newDataCond := sync.NewCond(new(sync.Mutex)) newDataCond := sync.NewCond(new(sync.Mutex))
chTimer := make(chan int64) chTimer := make(chan int64)
go this.discoverNewData(dataSource, watchTable, watchTimeCb, newDataCond)
go q5.CreateCondTimer(chTimer, newDataCond)
go func () { var lastIdx int64
var lastMaxIdx int64 for true {
for true { hasNextData := false
hasNewDdata := false sql := sqlCb(lastIdx)
this.RawQuery( this.RawQuery(
dataSource, dataSource,
fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable), sql,
[]string{}, params,
func (err error, ds *DataSet) { func (err error, ds *DataSet) {
if err != nil { if err != nil {
return return
}
if ds.Next() {
idx := q5.ToInt64(ds.GetByName("max_idx"))
if idx > lastMaxIdx {
lastMaxIdx = idx
hasNewDdata = true
GetSysLog().Info("%s hasNewData max_idx:%d", watchTable, lastMaxIdx)
}
}
})
if hasNewDdata {
newDataCond.Broadcast()
} else {
time.Sleep(time.Second * time.Duration(watchTimeCb()))
}
}
}()
go func () {
var waitSecond int64 = 10
for {
select {
case waitSecond = <-chTimer:
if waitSecond < 10 {
waitSecond = 10
} }
case <-time.After(time.Second * time.Duration(waitSecond)): for ds.Next() {
waitSecond = 10 idx := q5.ToInt64(ds.GetByName("idx"))
newDataCond.Broadcast() if !doCb(ds) {
}
}
}()
{
var lastIdx int64
for true {
hasNextData := false
sql := sqlCb(lastIdx)
this.RawQuery(
dataSource,
sql,
params,
func (err error, ds *DataSet) {
if err != nil {
return return
} }
for ds.Next() { if idx > lastIdx {
idx := q5.ToInt64(ds.GetByName("idx")) lastIdx = idx
if !doCb(ds) {
return
}
if idx > lastIdx {
lastIdx = idx
}
hasNextData = true
} }
}) hasNextData = true
if hasNextData { }
chTimer <- nextTimeCb() })
} else { if hasNextData {
lastIdx = 0 chTimer <- nextTimeCb()
chTimer <- nextRoundCb() } else {
} lastIdx = 0
newDataCond.L.Lock() chTimer <- nextRoundCb()
newDataCond.Wait()
newDataCond.L.Unlock()
} }
newDataCond.L.Lock()
newDataCond.Wait()
newDataCond.L.Unlock()
} }
} }
@ -699,3 +655,33 @@ func (this *dbPool) queryOne(dataSource string, sql string, params []string,
go this.internalQueryOne(dataSource, sql, params, cb) go this.internalQueryOne(dataSource, sql, params, cb)
} }
} }
func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watchTimeCb func() int64,
cond *sync.Cond) {
var lastMaxIdx int64
for true {
hasNewDdata := false
this.RawQuery(
dataSource,
fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable),
[]string{},
func (err error, ds *DataSet) {
if err != nil {
return
}
if ds.Next() {
idx := q5.ToInt64(ds.GetByName("max_idx"))
if idx > lastMaxIdx {
lastMaxIdx = idx
hasNewDdata = true
GetSysLog().Info("%s hasNewData max_idx:%d", watchTable, lastMaxIdx)
}
}
})
if hasNewDdata {
cond.Broadcast()
} else {
time.Sleep(time.Second * time.Duration(watchTimeCb()))
}
}
}