This commit is contained in:
aozhiwei 2024-04-18 14:33:34 +08:00
parent d15c5a061f
commit c31eaf2c49
2 changed files with 61 additions and 7 deletions

View File

@ -10,6 +10,7 @@ type DataSet struct {
columns []string columns []string
values []interface{} values []interface{}
numOfReaded int64 numOfReaded int64
closed bool
} }
//已读取函数(调用Next成功的次数) //已读取函数(调用Next成功的次数)
@ -17,7 +18,17 @@ func (this *DataSet) NumOfReaded() int64 {
return this.numOfReaded return this.numOfReaded
} }
func (this *DataSet) close() {
if this.rows != nil {
this.rows.Close()
}
this.closed = true
}
func (this *DataSet) Next() bool { func (this *DataSet) Next() bool {
if this.closed {
panic("DataSet is closed")
}
ret := this.rows.Next() ret := this.rows.Next()
if !ret { if !ret {
return ret return ret

View File

@ -91,6 +91,7 @@ func (this *dbPool) SelectCustomQuery(dataSource string, sql string, cb QueryRes
func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) { func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) {
chDone := make(chan bool) chDone := make(chan bool)
chCbDone := make(chan bool)
params := []string{} params := []string{}
var e error var e error
var d *DataSet var d *DataSet
@ -99,12 +100,20 @@ func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb Quer
e = err e = err
d = ds d = ds
chDone <- true chDone <- true
for {
select {
case <-chCbDone:
close(chCbDone)
return
}
}
}) })
for { for {
select { select {
case <-chDone: case <-chDone:
close(chDone) close(chDone)
cb(e, d) cb(e, d)
chCbDone <- true
return return
} }
} }
@ -387,8 +396,9 @@ func (this *dbPool) internalExec(dataSource string, sql string, params []string,
cb(errors.New("borrowConn error"), int64(0), int64(0)) cb(errors.New("borrowConn error"), int64(0), int64(0))
return return
} }
result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...)
this.returnConn(ds) this.returnConn(ds)
result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...)
//this.returnConn(ds)
var lastInsertId int64 var lastInsertId int64
var rowsAffected int64 var rowsAffected int64
@ -417,18 +427,31 @@ func (this *dbPool) internalQuery(dataSource string, sql string, params []string
cb(errors.New("borrowConn error"), nil) cb(errors.New("borrowConn error"), nil)
return return
} }
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
this.returnConn(ds) this.returnConn(ds)
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
//this.returnConn(ds)
if err != nil { if err != nil {
GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql) GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql)
} }
var dataSet *DataSet
if err == nil {
dataSet = NewDataSet(rows)
}
freeFunc := func () {
if dataSet != nil {
dataSet.close()
}
}
if this.style == GO_STYLE_DB { if this.style == GO_STYLE_DB {
cb(err, NewDataSet(rows)) defer freeFunc()
cb(err, dataSet)
} else { } else {
_app.RegisterMainThreadCb( _app.RegisterMainThreadCb(
func() { func() {
cb(err, NewDataSet(rows)) defer freeFunc()
cb(err, dataSet)
}) })
} }
} }
@ -440,13 +463,25 @@ func (this *dbPool) internalQueryNoMainThread(dataSource string, sql string, par
cb(errors.New("borrowConn error"), nil) cb(errors.New("borrowConn error"), nil)
return return
} }
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
this.returnConn(ds) this.returnConn(ds)
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
//this.returnConn(ds)
if err != nil { if err != nil {
GetSysLog().Warning("f5.dbpool.internalQueryNoMainThread error:%s sql:%s", err, sql) GetSysLog().Warning("f5.dbpool.internalQueryNoMainThread error:%s sql:%s", err, sql)
} }
cb(err, NewDataSet(rows)) var dataSet *DataSet
if err == nil {
dataSet = NewDataSet(rows)
}
freeFunc := func () {
if dataSet != nil {
dataSet.close()
}
}
defer freeFunc()
cb(err, dataSet)
} }
func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string, func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string,
@ -456,8 +491,9 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str
cb(errors.New("borrowConn error"), nil) cb(errors.New("borrowConn error"), nil)
return return
} }
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
this.returnConn(ds) this.returnConn(ds)
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
//this.returnConn(ds)
if err != nil { if err != nil {
GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql) GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql)
} }
@ -466,12 +502,19 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str
if err == nil { if err == nil {
dataSet = NewDataSet(rows) dataSet = NewDataSet(rows)
} }
freeFunc := func () {
if dataSet != nil {
dataSet.close()
}
}
if this.style == GO_STYLE_DB { if this.style == GO_STYLE_DB {
defer freeFunc()
cb(err, dataSet) cb(err, dataSet)
} else { } else {
_app.RegisterMainThreadCb( _app.RegisterMainThreadCb(
func() { func() {
defer freeFunc()
cb(err, dataSet) cb(err, dataSet)
}) })
} }