diff --git a/dataset.go b/dataset.go index 30aec0a..1065da6 100644 --- a/dataset.go +++ b/dataset.go @@ -10,6 +10,7 @@ type DataSet struct { columns []string values []interface{} numOfReaded int64 + closed bool } //已读取函数(调用Next成功的次数) @@ -17,7 +18,17 @@ func (this *DataSet) NumOfReaded() int64 { return this.numOfReaded } +func (this *DataSet) close() { + if this.rows != nil { + this.rows.Close() + } + this.closed = true +} + func (this *DataSet) Next() bool { + if this.closed { + panic("DataSet is closed") + } ret := this.rows.Next() if !ret { return ret diff --git a/dbpool.go b/dbpool.go index 10462d7..1d31b54 100644 --- a/dbpool.go +++ b/dbpool.go @@ -91,6 +91,7 @@ func (this *dbPool) SelectCustomQuery(dataSource string, sql string, cb QueryRes func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) { chDone := make(chan bool) + chCbDone := make(chan bool) params := []string{} var e error var d *DataSet @@ -99,12 +100,20 @@ func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb Quer e = err d = ds chDone <- true + for { + select { + case <-chCbDone: + close(chCbDone) + return + } + } }) for { select { case <-chDone: close(chDone) cb(e, d) + chCbDone <- true return } } @@ -387,8 +396,9 @@ func (this *dbPool) internalExec(dataSource string, sql string, params []string, cb(errors.New("borrowConn error"), int64(0), int64(0)) return } - result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...) this.returnConn(ds) + result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...) + //this.returnConn(ds) var lastInsertId int64 var rowsAffected int64 @@ -417,18 +427,31 @@ func (this *dbPool) internalQuery(dataSource string, sql string, params []string cb(errors.New("borrowConn error"), nil) return } - rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) + rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) + //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql) } + var dataSet *DataSet + if err == nil { + dataSet = NewDataSet(rows) + } + freeFunc := func () { + if dataSet != nil { + dataSet.close() + } + } + if this.style == GO_STYLE_DB { - cb(err, NewDataSet(rows)) + defer freeFunc() + cb(err, dataSet) } else { _app.RegisterMainThreadCb( func() { - cb(err, NewDataSet(rows)) + defer freeFunc() + cb(err, dataSet) }) } } @@ -440,13 +463,25 @@ func (this *dbPool) internalQueryNoMainThread(dataSource string, sql string, par cb(errors.New("borrowConn error"), nil) return } - rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) + rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) + //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQueryNoMainThread error:%s sql:%s", err, sql) } - cb(err, NewDataSet(rows)) + var dataSet *DataSet + if err == nil { + dataSet = NewDataSet(rows) + } + freeFunc := func () { + if dataSet != nil { + dataSet.close() + } + } + + defer freeFunc() + cb(err, dataSet) } func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string, @@ -456,8 +491,9 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str cb(errors.New("borrowConn error"), nil) return } - rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) + rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) + //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql) } @@ -466,12 +502,19 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str if err == nil { dataSet = NewDataSet(rows) } + freeFunc := func () { + if dataSet != nil { + dataSet.close() + } + } if this.style == GO_STYLE_DB { + defer freeFunc() cb(err, dataSet) } else { _app.RegisterMainThreadCb( func() { + defer freeFunc() cb(err, dataSet) }) }