This commit is contained in:
aozhiwei 2024-07-27 15:09:33 +08:00
parent 31e33a0e2d
commit 461f66af6a

View File

@ -358,6 +358,83 @@ func (this *dbPool) IncrementLoad(
}
}
func (this *dbPool) LoopLoadNew(
dataSource string,
name string,
tblName string,
initIdx int64,
sqlCb func(int64, int64) (string, []string),
doCb func(*DataSet) bool) {
if this.style != GO_STYLE_DB {
panic("dbpool.IncrementLoad is not gostyle")
}
lastIdx := initIdx
var maxIdx int64
var newMaxIdx int64
cond := sync.NewCond(new(sync.Mutex))
item := new(maxIdxMonitorProgItem)
item.lastIdx = &lastIdx
{
chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item)
go func () {
for {
select {
case idx := <-*chNewMaxIdx:
{
q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx)
cond.Broadcast()
}
}
}
}()
}
for {
if lastIdx < maxIdx {
sql, params := sqlCb(lastIdx, maxIdx)
this.syncInternalQuery(
dataSource,
sql,
params,
func (err error, ds *DataSet) {
item.execTimes += 1
if err != nil {
item.errTimes += 1
time.Sleep(time.Second * 10)
return
}
for ds.Next() {
idx := q5.ToInt64(ds.GetByName("idx"))
if !doCb(ds) {
item.abortTimes += 1
return
}
item.okTimes += 1
if idx > lastIdx {
lastIdx = idx
} else {
panic(fmt.Sprintf("IncrementLoad idx error:%s %s", idx, lastIdx))
}
}
if ds.NumOfReaded() <= 0 {
lastIdx = maxIdx
}
})
}
q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx)
if lastIdx >= maxIdx {
go func() {
time.Sleep(time.Millisecond * 1000 * 60)
cond.Broadcast()
}()
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
} else {
time.Sleep(time.Millisecond * 1000 * 3)
}
}
}
func (this *dbPool) SelectLike(
dataSource string,
tblName string,