diff --git a/dbpool.go b/dbpool.go index 7073aca..d30bba5 100644 --- a/dbpool.go +++ b/dbpool.go @@ -140,30 +140,53 @@ func (this *dbPool) LoopLoad( watchTimeCb func() int64, sqlCb func(int64) string, params []string, - nextTimeCb func() time.Duration, - nextRoundCb func() time.Duration, + nextTimeCb func() int64, + nextRoundCb func() int64, doCb func(*DataSet) bool) { - { - go func () { + newDataCond := sync.NewCond(new(sync.Mutex)) + chTimer := make(chan int64) + + go func () { + var lastMaxIdx int64 + for true { + hasNewDdata := false this.RawQuery( dataSource, - fmt.Sprintf("SELECT MAX(idx) FROM %s", watchTable), + fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable), []string{}, func (err error, ds *DataSet) { + if err != nil { + return + } + if ds.Next() { + idx := q5.ToInt64(ds.GetByName("max_idx")) + if idx > lastMaxIdx { + lastMaxIdx = idx + hasNewDdata = true + } + } }) - time.Sleep(time.Second * 3) - }() - } - var lastIdx int64 - for true { - hasNextData := false - sql := sqlCb(lastIdx) - this.RawQuery( - dataSource, - sql, - params, - func (err error, ds *DataSet) { - if err == nil { + if hasNewDdata { + newDataCond.Broadcast() + } else { + time.Sleep(time.Second * time.Duration(watchTimeCb())) + } + } + }() + + { + var lastIdx int64 + for true { + hasNextData := false + sql := sqlCb(lastIdx) + this.RawQuery( + dataSource, + sql, + params, + func (err error, ds *DataSet) { + if err != nil { + return + } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) if !doCb(ds) { @@ -174,13 +197,16 @@ func (this *dbPool) LoopLoad( } hasNextData = true } - } - }) - if hasNextData { - time.Sleep(nextTimeCb()) - } else { - lastIdx = 0 - time.Sleep(nextRoundCb()) + }) + if hasNextData { + chTimer <- nextTimeCb() + } else { + lastIdx = 0 + chTimer <- nextRoundCb() + } + newDataCond.L.Lock() + newDataCond.Wait() + newDataCond.L.Unlock() } } }