diff --git a/dbpool.go b/dbpool.go index c9a2fea..67b5671 100644 --- a/dbpool.go +++ b/dbpool.go @@ -145,85 +145,41 @@ func (this *dbPool) LoopLoad( doCb func(*DataSet) bool) { newDataCond := sync.NewCond(new(sync.Mutex)) chTimer := make(chan int64) + go this.discoverNewData(dataSource, watchTable, watchTimeCb, newDataCond) + go q5.CreateCondTimer(chTimer, newDataCond) - go func () { - var lastMaxIdx int64 - for true { - hasNewDdata := false - this.RawQuery( - dataSource, - fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable), - []string{}, - func (err error, ds *DataSet) { - if err != nil { - return - } - if ds.Next() { - idx := q5.ToInt64(ds.GetByName("max_idx")) - if idx > lastMaxIdx { - lastMaxIdx = idx - hasNewDdata = true - GetSysLog().Info("%s hasNewData max_idx:%d", watchTable, lastMaxIdx) - } - } - }) - if hasNewDdata { - newDataCond.Broadcast() - } else { - time.Sleep(time.Second * time.Duration(watchTimeCb())) - } - } - }() - - go func () { - var waitSecond int64 = 10 - for { - select { - case waitSecond = <-chTimer: - if waitSecond < 10 { - waitSecond = 10 + var lastIdx int64 + for true { + hasNextData := false + sql := sqlCb(lastIdx) + this.RawQuery( + dataSource, + sql, + params, + func (err error, ds *DataSet) { + if err != nil { + return } - case <-time.After(time.Second * time.Duration(waitSecond)): - waitSecond = 10 - newDataCond.Broadcast() - } - } - }() - - { - var lastIdx int64 - for true { - hasNextData := false - sql := sqlCb(lastIdx) - this.RawQuery( - dataSource, - sql, - params, - func (err error, ds *DataSet) { - if err != nil { + for ds.Next() { + idx := q5.ToInt64(ds.GetByName("idx")) + if !doCb(ds) { return } - for ds.Next() { - idx := q5.ToInt64(ds.GetByName("idx")) - if !doCb(ds) { - return - } - if idx > lastIdx { - lastIdx = idx - } - hasNextData = true + if idx > lastIdx { + lastIdx = idx } - }) - if hasNextData { - chTimer <- nextTimeCb() - } else { - lastIdx = 0 - chTimer <- nextRoundCb() - } - newDataCond.L.Lock() - newDataCond.Wait() - newDataCond.L.Unlock() + hasNextData = true + } + }) + if hasNextData { + chTimer <- nextTimeCb() + } else { + lastIdx = 0 + chTimer <- nextRoundCb() } + newDataCond.L.Lock() + newDataCond.Wait() + newDataCond.L.Unlock() } } @@ -699,3 +655,33 @@ func (this *dbPool) queryOne(dataSource string, sql string, params []string, go this.internalQueryOne(dataSource, sql, params, cb) } } + +func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watchTimeCb func() int64, + cond *sync.Cond) { + var lastMaxIdx int64 + for true { + hasNewDdata := false + this.RawQuery( + dataSource, + fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable), + []string{}, + func (err error, ds *DataSet) { + if err != nil { + return + } + if ds.Next() { + idx := q5.ToInt64(ds.GetByName("max_idx")) + if idx > lastMaxIdx { + lastMaxIdx = idx + hasNewDdata = true + GetSysLog().Info("%s hasNewData max_idx:%d", watchTable, lastMaxIdx) + } + } + }) + if hasNewDdata { + cond.Broadcast() + } else { + time.Sleep(time.Second * time.Duration(watchTimeCb())) + } + } +}