This commit is contained in:
aozhiwei 2024-06-22 13:38:53 +08:00
parent 330f53df3d
commit 04c7b77bd7
2 changed files with 45 additions and 1 deletions

View File

@ -128,6 +128,44 @@ func (this *dbPool) SyncBatchLoadFullTable(dataSource string, sqlTpl string,
return lastIdx
}
func (this *dbPool) LoopLoad(
dataSource string,
sqlCb func(int64) string,
params []string,
nextTimeCb func() time.Duration,
nextRoundCb func() time.Duration,
doCb func(*DataSet) bool) {
var lastIdx int64
for true {
hasNextData := false
sql := sqlCb(lastIdx)
this.RawQuery(
dataSource,
sql,
params,
func (err error, ds *DataSet) {
if err == nil {
for ds.Next() {
idx := q5.ToInt64(ds.GetByName("idx"))
if idx > lastIdx {
lastIdx = idx
}
if doCb(ds) {
}
hasNextData = true
}
}
})
if hasNextData {
time.Sleep(nextTimeCb())
} else {
lastIdx = 0
time.Sleep(nextRoundCb())
}
}
}
func (this *dbPool) SelectLike(
dataSource string,
tblName string,
@ -446,6 +484,9 @@ func (this *dbPool) internalExec(dataSource string, sql string, params []string,
this.returnConn(ds)
result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...)
//this.returnConn(ds)
if err != nil {
GetSysLog().Warning("f5.dbpool.internalExec error:%s sql:%s\\%s", err, sql, q5.GetCallStack())
}
var lastInsertId int64
var rowsAffected int64

View File

@ -130,7 +130,10 @@ func (this *RawMetaTable[T]) Load() {
panic(fmt.Sprintf("error json format %s", this.FileName))
}
var rows []map[string]interface{}
json.Unmarshal([]byte(jsonStr), &rows)
err1 := json.Unmarshal([]byte(jsonStr), &rows)
if err1 != nil {
panic(fmt.Sprintf("load metafile json decode aerror %s %s", this.FileName, err1))
}
for _, row := range rows {
var obj = new(T)
var x interface{} = obj