diff --git a/dbpool.go b/dbpool.go index fcdfd3f..66c7e78 100644 --- a/dbpool.go +++ b/dbpool.go @@ -240,12 +240,14 @@ func (this *dbPool) IncrementLoad( dataSource string, name string, tblName string, + initIdx int64, sqlCb func(int64, int64) (string, []string), + lastIdxChgCb func(int64), doCb func(*DataSet) bool) { if this.style != GO_STYLE_DB { panic("dbpool.IncrementLoad is not gostyle") } - var lastIdx int64 + lastIdx := initIdx var maxIdx int64 var newMaxIdx int64 cond := sync.NewCond(new(sync.Mutex)) @@ -256,7 +258,7 @@ func (this *dbPool) IncrementLoad( select { case idx := <-*chNewMaxIdx: { - q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &idx) + q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx) cond.Broadcast() } } @@ -265,6 +267,7 @@ func (this *dbPool) IncrementLoad( } for { if lastIdx < maxIdx { + oldLastIdx := lastIdx sql, params := sqlCb(lastIdx, maxIdx) this.syncInternalQuery( dataSource, @@ -277,7 +280,6 @@ func (this *dbPool) IncrementLoad( } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) - fmt.Println("IncrementLoad idx:%d lastIdx:%d", idx, lastIdx) if !doCb(ds) { return } @@ -291,6 +293,9 @@ func (this *dbPool) IncrementLoad( lastIdx = maxIdx } }) + if lastIdx > oldLastIdx { + lastIdxChgCb(lastIdx) + } } q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx) if lastIdx >= maxIdx {