This commit is contained in:
aozhiwei 2024-07-23 14:44:11 +08:00
parent c842ce5e34
commit ea3ed6fc66

View File

@ -240,12 +240,14 @@ func (this *dbPool) IncrementLoad(
dataSource string, dataSource string,
name string, name string,
tblName string, tblName string,
initIdx int64,
sqlCb func(int64, int64) (string, []string), sqlCb func(int64, int64) (string, []string),
lastIdxChgCb func(int64),
doCb func(*DataSet) bool) { doCb func(*DataSet) bool) {
if this.style != GO_STYLE_DB { if this.style != GO_STYLE_DB {
panic("dbpool.IncrementLoad is not gostyle") panic("dbpool.IncrementLoad is not gostyle")
} }
var lastIdx int64 lastIdx := initIdx
var maxIdx int64 var maxIdx int64
var newMaxIdx int64 var newMaxIdx int64
cond := sync.NewCond(new(sync.Mutex)) cond := sync.NewCond(new(sync.Mutex))
@ -256,7 +258,7 @@ func (this *dbPool) IncrementLoad(
select { select {
case idx := <-*chNewMaxIdx: case idx := <-*chNewMaxIdx:
{ {
q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &idx) q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx)
cond.Broadcast() cond.Broadcast()
} }
} }
@ -265,6 +267,7 @@ func (this *dbPool) IncrementLoad(
} }
for { for {
if lastIdx < maxIdx { if lastIdx < maxIdx {
oldLastIdx := lastIdx
sql, params := sqlCb(lastIdx, maxIdx) sql, params := sqlCb(lastIdx, maxIdx)
this.syncInternalQuery( this.syncInternalQuery(
dataSource, dataSource,
@ -277,7 +280,6 @@ func (this *dbPool) IncrementLoad(
} }
for ds.Next() { for ds.Next() {
idx := q5.ToInt64(ds.GetByName("idx")) idx := q5.ToInt64(ds.GetByName("idx"))
fmt.Println("IncrementLoad idx:%d lastIdx:%d", idx, lastIdx)
if !doCb(ds) { if !doCb(ds) {
return return
} }
@ -291,6 +293,9 @@ func (this *dbPool) IncrementLoad(
lastIdx = maxIdx lastIdx = maxIdx
} }
}) })
if lastIdx > oldLastIdx {
lastIdxChgCb(lastIdx)
}
} }
q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx) q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx)
if lastIdx >= maxIdx { if lastIdx >= maxIdx {