From 461f66af6a994d308e6db1b5ac660e1ed3fd2e5f Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sat, 27 Jul 2024 15:09:33 +0800 Subject: [PATCH] 1 --- dbpool.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/dbpool.go b/dbpool.go index 3cdeea1..e8d71ac 100644 --- a/dbpool.go +++ b/dbpool.go @@ -358,6 +358,83 @@ func (this *dbPool) IncrementLoad( } } +func (this *dbPool) LoopLoadNew( + dataSource string, + name string, + tblName string, + initIdx int64, + sqlCb func(int64, int64) (string, []string), + doCb func(*DataSet) bool) { + if this.style != GO_STYLE_DB { + panic("dbpool.IncrementLoad is not gostyle") + } + lastIdx := initIdx + var maxIdx int64 + var newMaxIdx int64 + cond := sync.NewCond(new(sync.Mutex)) + item := new(maxIdxMonitorProgItem) + item.lastIdx = &lastIdx + { + chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item) + go func () { + for { + select { + case idx := <-*chNewMaxIdx: + { + q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx) + cond.Broadcast() + } + } + } + }() + } + for { + if lastIdx < maxIdx { + sql, params := sqlCb(lastIdx, maxIdx) + this.syncInternalQuery( + dataSource, + sql, + params, + func (err error, ds *DataSet) { + item.execTimes += 1 + if err != nil { + item.errTimes += 1 + time.Sleep(time.Second * 10) + return + } + for ds.Next() { + idx := q5.ToInt64(ds.GetByName("idx")) + if !doCb(ds) { + item.abortTimes += 1 + return + } + item.okTimes += 1 + if idx > lastIdx { + lastIdx = idx + } else { + panic(fmt.Sprintf("IncrementLoad idx error:%s %s", idx, lastIdx)) + } + } + if ds.NumOfReaded() <= 0 { + lastIdx = maxIdx + } + }) + } + q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx) + if lastIdx >= maxIdx { + go func() { + time.Sleep(time.Millisecond * 1000 * 60) + cond.Broadcast() + }() + cond.L.Lock() + cond.Wait() + cond.L.Unlock() + } else { + time.Sleep(time.Millisecond * 1000 * 3) + } + } +} + func (this *dbPool) SelectLike( dataSource string, tblName string,