f5/dbpool.go
aozhiwei 461f66af6a 1
2024-07-27 15:09:33 +08:00

1081 lines
24 KiB
Go

package f5
import (
"errors"
"fmt"
"math"
"q5"
"strings"
"sync"
"sync/atomic"
"math/rand"
"time"
)
// DBStyle selects how a dbPool dispatches queries and delivers their callbacks.
type DBStyle int32

const (
	// GO_STYLE_DB runs queries on the calling goroutine and invokes the
	// callback inline.
	GO_STYLE_DB DBStyle = 0
	// JS_STYLE_DB runs queries on a new goroutine and hands the callback to
	// _app.RegisterMainThreadCb.
	JS_STYLE_DB DBStyle = 1
)
// dataSource is one MySQL handle registered in the pool under a name.
type dataSource struct {
// name is the data source name this entry was registered under.
name string
// conn is the handle created with q5.NewMysql in RegisterDataSource.
conn *q5.Mysql
// entry links this dataSource into the per-name list stored in
// dbPool.dataSourceHash.
entry q5.ListHead
}
// maxIdxMonitorProgItem holds the progress counters of one incremental loader
// (IncrementLoad / LoopLoadNew); outputMonitorLog prints them periodically.
type maxIdxMonitorProgItem struct {
	// execTimes counts query callbacks, okTimes counts rows handled,
	// errTimes counts failed queries and abortTimes counts runs stopped
	// because doCb returned false.
	execTimes, okTimes, errTimes, abortTimes int64
	// lastIdx points at the loader's own lastIdx variable.
	lastIdx *int64
}
// maxIdxMonitorProg is the monitoring record for one watched table: where it
// lives, the latest MAX(idx) seen for it, and the loaders following it.
type maxIdxMonitorProg struct {
	// dataSource and tblName identify the watched table.
	dataSource, tblName string
	// maxIdx points at the poller's latest observed MAX(idx).
	maxIdx *int64
	// itemHash maps a loader name to that loader's progress counters.
	itemHash *q5.ConcurrentMap[string, *maxIdxMonitorProgItem]
}
// dbPool keeps the registered MySQL data sources and the bookkeeping used by
// the max-idx monitors that drive incremental loading.
type dbPool struct {
// style decides whether queries run inline (GO_STYLE_DB) or on a separate
// goroutine with callbacks posted through _app.RegisterMainThreadCb.
style DBStyle
// lock is held while dataSourceHash is read or modified.
lock sync.Mutex
// dataSourceHash maps a data source name to the list of its dataSource entries.
dataSourceHash map[string]*q5.ListHead
// currMonitorIdx is incremented to produce a subscriber id in registerNewMaxIdx.
currMonitorIdx int64
// monitorLock is held for the whole of registerNewMaxIdx.
monitorLock sync.Mutex
// maxIdxMonitorHash maps "<dataSource>_<tblName>" to the subscriber channels
// (keyed by subscriber id) that receive the table's latest MAX(idx).
maxIdxMonitorHash *q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]]
// maxIdxMonitorProgHash maps "<dataSource>_<tblName>" to the progress record
// printed by outputMonitorLog.
maxIdxMonitorProgHash *q5.ConcurrentMap[string, *maxIdxMonitorProg]
}
// init prepares the pool for use: it allocates the monitor maps and the data
// source table, records the requested style, and starts the background
// goroutine that logs monitor progress.
func (this *dbPool) init(style DBStyle) {
	this.maxIdxMonitorProgHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProg])
	this.maxIdxMonitorHash = new(q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]])
	this.dataSourceHash = make(map[string]*q5.ListHead)
	this.style = style

	// Started last so the logger never sees a nil map.
	go this.outputMonitorLog()
}
// outputMonitorLog runs forever; every ten seconds, if any table is being
// monitored, it logs one line per registered loader with that loader's
// counters. It has no stop condition.
func (this *dbPool) outputMonitorLog() {
	// The post statement runs after every pass, including the ones skipped
	// with continue, so the ten second pause is unconditional.
	for ; ; time.Sleep(time.Second * 10) {
		if this.maxIdxMonitorProgHash.GetSize() <= 0 {
			continue
		}
		GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
		this.maxIdxMonitorProgHash.Range(func(_ string, prog *maxIdxMonitorProg) bool {
			prog.itemHash.Range(func(loaderName string, item *maxIdxMonitorProgItem) bool {
				GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d exec:%d ok:%d err:%d abort:%d",
					prog.dataSource,
					prog.tblName,
					loaderName,
					*prog.maxIdx,
					*item.lastIdx,
					item.execTimes,
					item.okTimes,
					item.errTimes,
					item.abortTimes)
				return true
			})
			return true
		})
		GetSysLog().Info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
	}
}
// unInit is the counterpart of init. It currently does nothing: the goroutine
// started by init is not stopped and no connection is closed here.
func (this *dbPool) unInit() {
}
// RegisterDataSource creates size MySQL handles for the given server and adds
// them to the list registered under name, creating that list on first use.
//
// maxOpenConns and maxIdleConns are raised to 1 when they are not positive.
// Each handle is opened right after it is queued; a failed Open panics.
func (this *dbPool) RegisterDataSource(name string, host string, port int32,
	user string, passwd string, dataBase string, size int32, maxOpenConns int32, maxIdleConns int32) {
	this.lock.Lock()
	defer this.lock.Unlock()

	head, found := this.dataSourceHash[name]
	if !found {
		head = q5.NewListHead()
		this.dataSourceHash[name] = head
	}
	if maxOpenConns < 1 {
		maxOpenConns = 1
	}
	if maxIdleConns < 1 {
		maxIdleConns = 1
	}
	for remaining := size; remaining > 0; remaining-- {
		ds := new(dataSource)
		ds.name = name
		ds.conn = q5.NewMysql(host, port, user, passwd, dataBase, maxOpenConns, maxIdleConns)
		ds.entry.Init(ds)
		head.AddTail(&ds.entry)
		if err := ds.conn.Open(); err != nil {
			panic(fmt.Sprintf("RegisterDataSource err:%s", err))
		}
	}
}
// Select queries the given fields of tblName, filtered by the equality
// conditions in whereKv (each entry is {column, value}), and passes the result
// to cb.
func (this *dbPool) Select(
	dataSource string,
	tblName string,
	fields []string,
	whereKv [][]string,
	cb QueryResultCb) {
	args := make([]string, 0, len(whereKv))
	stmt := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName)
	this.joinWhere(&stmt, &args, whereKv)
	this.query(dataSource, stmt, args, cb)
}
// OrmSelect queries every column of tblName, filtered by the equality
// conditions in whereKv (each entry is {column, value}), and passes the result
// to cb.
func (this *dbPool) OrmSelect(
	dataSource string,
	tblName string,
	whereKv [][]string,
	cb QueryResultCb) {
	args := make([]string, 0, len(whereKv))
	stmt := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName)
	this.joinWhere(&stmt, &args, whereKv)
	this.query(dataSource, stmt, args, cb)
}
// RawQuery runs the caller-supplied sql with params as bound arguments and
// passes the result to cb. Dispatch follows the pool style (see query).
func (this *dbPool) RawQuery(dataSource string, sql string, params []string, cb QueryResultCb) {
this.query(dataSource, sql, params, cb)
}
// SelectCustomQuery runs the caller-supplied sql with no bound arguments and
// passes the result to cb. Dispatch follows the pool style (see query).
func (this *dbPool) SelectCustomQuery(dataSource string, sql string, cb QueryResultCb) {
	this.query(dataSource, sql, []string{}, cb)
}
// SyncSelectCustomQuery runs the caller-supplied sql with no bound arguments
// and invokes cb on the calling goroutine, whatever the pool style.
func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) {
	this.syncInternalQuery(dataSource, sql, []string{}, cb)
}
// SyncBatchLoadFullTable reads a whole table in batches on the calling
// goroutine and returns the largest idx it saw.
//
// sqlTpl is a fmt template that receives the last idx read so far (0 on the
// first batch); every row it yields must carry an "idx" column that is
// strictly greater than the previous one, otherwise the function panics.
// cb is called once per row. Loading ends when a batch returns no rows.
//
// When a query fails, errCb is called and loading stops, which matches
// BatchLoadFullTable. Previously the loop retried the same batch forever with
// no back-off.
func (this *dbPool) SyncBatchLoadFullTable(dataSource string, sqlTpl string,
	cb func(*DataSet), errCb func(error)) int64 {
	var lastIdx int64
	done := false
	for !done {
		this.SyncSelectCustomQuery(
			dataSource,
			fmt.Sprintf(sqlTpl, lastIdx),
			func(err error, ds *DataSet) {
				if err != nil {
					// Set before errCb so the loop ends even if errCb
					// recovers from its own panic.
					done = true
					errCb(err)
					return
				}
				for ds.Next() {
					idx := q5.ToInt64(ds.GetByName("idx"))
					cb(ds)
					if idx > lastIdx {
						lastIdx = idx
					} else {
						panic(fmt.Sprintf("SyncBatchLoadFullTable idx error:%d %d", idx, lastIdx))
					}
				}
				if ds.NumOfReaded() <= 0 {
					done = true
				}
			})
	}
	return lastIdx
}
// BatchLoadFullTable reads a whole table in batches on the calling goroutine.
// It panics unless the pool is GO_STYLE_DB.
//
// sqlCb receives the last idx read so far (0 on the first batch) and returns
// the SQL for the next batch; params are bound to every batch. Each row must
// carry an "idx" column strictly greater than the previous one, otherwise the
// function panics. doCb is called once per row; its result is ignored.
// nextTimeCb is called after every batch, including the final empty one and a
// failed one.
//
// Loading ends when a batch returns no rows or a query fails; the query error,
// if any, is returned.
func (this *dbPool) BatchLoadFullTable(
	dataSource string,
	sqlCb func(int64) string,
	params []string,
	nextTimeCb func(),
	doCb func(*DataSet) bool) error {
	if this.style != GO_STYLE_DB {
		panic("dbpool.BatchLoadFullTable is not gostyle")
	}
	var resultErr error
	var lastIdx int64
	done := false
	for !done && resultErr == nil {
		this.syncInternalQuery(
			dataSource,
			sqlCb(lastIdx),
			params,
			func(err error, ds *DataSet) {
				if err != nil {
					resultErr = err
					return
				}
				for ds.Next() {
					idx := q5.ToInt64(ds.GetByName("idx"))
					// Printf, not Println: the message contains format verbs.
					fmt.Printf("BatchLoadFullTable idx:%d lastIdx:%d\n", idx, lastIdx)
					doCb(ds)
					if idx > lastIdx {
						lastIdx = idx
					} else {
						panic(fmt.Sprintf("BatchLoadFullTable idx error:%d %d", idx, lastIdx))
					}
				}
				if ds.NumOfReaded() <= 0 {
					done = true
				}
			})
		nextTimeCb()
	}
	return resultErr
}
// LoopLoad pages through a table forever; it never returns.
//
// Each pass runs sqlCb(lastIdx) with params and feeds every row to doCb,
// advancing lastIdx to the largest "idx" seen. A pass stops early when doCb
// returns false. If the pass produced rows, nextTimeCb() is sent to the timer
// channel; otherwise lastIdx is reset to 0 (a new round) and nextRoundCb() is
// sent. The loop then waits on a condition variable that is signalled by
// discoverNewData when watchTable's MAX(idx) grows, and presumably also by
// q5.CreateCondTimer after the delay sent on the channel (TODO confirm
// against q5).
//
// NOTE(review): hasNextData is read right after RawQuery returns, so this
// relies on the callback having run by then, i.e. on GO_STYLE_DB; the style is
// not checked here.
func (this *dbPool) LoopLoad(
	dataSource string,
	name string,
	watchTable string,
	watchTimeCb func() int64,
	sqlCb func(int64) string,
	params []string,
	nextTimeCb func() int64,
	nextRoundCb func() int64,
	doCb func(*DataSet) bool) {
	cond := sync.NewCond(new(sync.Mutex))
	chTimer := make(chan int64)
	go this.discoverNewData(dataSource, watchTable, watchTimeCb, cond)
	go q5.CreateCondTimer(chTimer, cond, 10)
	var lastIdx int64
	for {
		hasNextData := false
		sql := sqlCb(lastIdx)
		this.RawQuery(
			dataSource,
			sql,
			params,
			func(err error, ds *DataSet) {
				if err != nil {
					return
				}
				for ds.Next() {
					idx := q5.ToInt64(ds.GetByName("idx"))
					if !doCb(ds) {
						return
					}
					if idx > lastIdx {
						lastIdx = idx
					}
					hasNextData = true
				}
			})
		if hasNextData {
			chTimer <- nextTimeCb()
		} else {
			lastIdx = 0
			chTimer <- nextRoundCb()
		}
		cond.L.Lock()
		cond.Wait()
		cond.L.Unlock()
		// The two messages used to be swapped: rows in the last pass mean the
		// next fetch continues the current round ("next batch"); no rows mean
		// lastIdx was reset and a new round begins.
		if hasNextData {
			GetSysLog().Info("dbpool.LoopLoad %s fetch next batch last_idx:%d", name, lastIdx)
		} else {
			GetSysLog().Info("dbpool.LoopLoad %s fetch next round last_idx:%d", name, lastIdx)
		}
	}
}
// IncrementLoad follows a table forever, loading rows whose idx lies between
// the last one processed and the table's latest known MAX(idx). It never
// returns and panics unless the pool is GO_STYLE_DB.
//
// name identifies this loader in the monitor log. initIdx is the idx to resume
// from. sqlCb receives (lastIdx, maxIdx) and returns the SQL and its bound
// arguments. doCb is called per row; returning false aborts the current batch.
// Each row must carry an "idx" column strictly greater than the previous one,
// otherwise the function panics. lastIdxChgCb is called after a batch that
// advanced lastIdx.
//
// New MAX(idx) values arrive through the channel handed out by
// registerNewMaxIdx; the loader sleeps on a condition variable while it is
// caught up and polls every three seconds while it is behind.
func (this *dbPool) IncrementLoad(
	dataSource string,
	name string,
	tblName string,
	initIdx int64,
	sqlCb func(int64, int64) (string, []string),
	lastIdxChgCb func(int64),
	doCb func(*DataSet) bool) {
	if this.style != GO_STYLE_DB {
		panic("dbpool.IncrementLoad is not gostyle")
	}
	lastIdx := initIdx
	var maxIdx int64
	var newMaxIdx int64
	cond := sync.NewCond(new(sync.Mutex))
	item := new(maxIdxMonitorProgItem)
	item.lastIdx = &lastIdx
	{
		chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item)
		go func() {
			for {
				idx := <-*chNewMaxIdx
				// Presumably raises newMaxIdx to idx when idx is larger
				// (TODO confirm argument order against q5).
				q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx)
				cond.Broadcast()
			}
		}()
	}
	for {
		if lastIdx < maxIdx {
			oldLastIdx := lastIdx
			sql, params := sqlCb(lastIdx, maxIdx)
			this.syncInternalQuery(
				dataSource,
				sql,
				params,
				func(err error, ds *DataSet) {
					item.execTimes += 1
					if err != nil {
						item.errTimes += 1
						// Back off before the outer loop retries.
						time.Sleep(time.Second * 10)
						return
					}
					for ds.Next() {
						idx := q5.ToInt64(ds.GetByName("idx"))
						if !doCb(ds) {
							item.abortTimes += 1
							return
						}
						item.okTimes += 1
						if idx > lastIdx {
							lastIdx = idx
						} else {
							panic(fmt.Sprintf("IncrementLoad idx error:%d %d", idx, lastIdx))
						}
					}
					if ds.NumOfReaded() <= 0 {
						// Nothing left in (lastIdx, maxIdx]: mark as caught up.
						lastIdx = maxIdx
					}
				})
			if lastIdx > oldLastIdx {
				lastIdxChgCb(lastIdx)
			}
		}
		q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx)
		if lastIdx >= maxIdx {
			cond.L.Lock()
			cond.Wait()
			cond.L.Unlock()
		} else {
			time.Sleep(time.Millisecond * 1000 * 3)
		}
	}
}
// LoopLoadNew follows a table forever, loading rows whose idx lies between the
// last one processed and the table's latest known MAX(idx). It never returns
// and panics unless the pool is GO_STYLE_DB.
//
// It works like IncrementLoad without the lastIdx-changed callback, and while
// caught up it additionally wakes itself after 60 seconds even when no new
// MAX(idx) arrived.
//
// name identifies this loader in the monitor log. initIdx is the idx to resume
// from. sqlCb receives (lastIdx, maxIdx) and returns the SQL and its bound
// arguments. doCb is called per row; returning false aborts the current batch.
// Each row must carry an "idx" column strictly greater than the previous one,
// otherwise the function panics.
func (this *dbPool) LoopLoadNew(
	dataSource string,
	name string,
	tblName string,
	initIdx int64,
	sqlCb func(int64, int64) (string, []string),
	doCb func(*DataSet) bool) {
	if this.style != GO_STYLE_DB {
		// The message used to name IncrementLoad (copy-paste slip).
		panic("dbpool.LoopLoadNew is not gostyle")
	}
	lastIdx := initIdx
	var maxIdx int64
	var newMaxIdx int64
	cond := sync.NewCond(new(sync.Mutex))
	item := new(maxIdxMonitorProgItem)
	item.lastIdx = &lastIdx
	{
		chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item)
		go func() {
			for {
				idx := <-*chNewMaxIdx
				// Presumably raises newMaxIdx to idx when idx is larger
				// (TODO confirm argument order against q5).
				q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx)
				cond.Broadcast()
			}
		}()
	}
	for {
		if lastIdx < maxIdx {
			sql, params := sqlCb(lastIdx, maxIdx)
			this.syncInternalQuery(
				dataSource,
				sql,
				params,
				func(err error, ds *DataSet) {
					item.execTimes += 1
					if err != nil {
						item.errTimes += 1
						// Back off before the outer loop retries.
						time.Sleep(time.Second * 10)
						return
					}
					for ds.Next() {
						idx := q5.ToInt64(ds.GetByName("idx"))
						if !doCb(ds) {
							item.abortTimes += 1
							return
						}
						item.okTimes += 1
						if idx > lastIdx {
							lastIdx = idx
						} else {
							panic(fmt.Sprintf("LoopLoadNew idx error:%d %d", idx, lastIdx))
						}
					}
					if ds.NumOfReaded() <= 0 {
						// Nothing left in (lastIdx, maxIdx]: mark as caught up.
						lastIdx = maxIdx
					}
				})
		}
		q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx)
		if lastIdx >= maxIdx {
			// Safety wake-up: one short-lived goroutine per wait.
			go func() {
				time.Sleep(time.Millisecond * 1000 * 60)
				cond.Broadcast()
			}()
			cond.L.Lock()
			cond.Wait()
			cond.L.Unlock()
		} else {
			time.Sleep(time.Millisecond * 1000 * 3)
		}
	}
}
// SelectLike queries the given fields of tblName for rows with idx > start
// that satisfy the equality conditions in whereKv and the LIKE conditions in
// likeWhere (each entry is {column, value}). Rows are ordered by idx ascending
// and capped at limit; the result is passed to cb.
func (this *dbPool) SelectLike(
	dataSource string,
	tblName string,
	fields []string,
	whereKv [][]string,
	likeWhere [][]string,
	start int64,
	limit int,
	cb QueryResultCb) {
	var args []string
	stmt := fmt.Sprintf("SELECT %s FROM %s WHERE idx > %d", this.joinSelectFields(fields), tblName, start)
	this.joinWhere(&stmt, &args, whereKv)
	this.joinWhereLike(&stmt, &args, likeWhere)
	stmt = fmt.Sprintf("%s ORDER BY idx ASC LIMIT %d", stmt, limit)
	this.query(dataSource, stmt, args, cb)
}
// SelectOne queries the given fields of tblName, filtered by the equality
// conditions in whereKv (each entry is {column, value}), and passes the result
// to the single-row callback cb.
func (this *dbPool) SelectOne(
	dataSource string,
	tblName string,
	fields []string,
	whereKv [][]string,
	cb QueryOneCb) {
	args := make([]string, 0, len(whereKv))
	stmt := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName)
	this.joinWhere(&stmt, &args, whereKv)
	this.queryOne(dataSource, stmt, args, cb)
}
// OrmSelectOne queries every column of tblName, filtered by the equality
// conditions in whereKv (each entry is {column, value}), and passes the result
// to the single-row callback cb.
func (this *dbPool) OrmSelectOne(
	dataSource string,
	tblName string,
	whereKv [][]string,
	cb QueryOneCb) {
	args := make([]string, 0, len(whereKv))
	stmt := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName)
	this.joinWhere(&stmt, &args, whereKv)
	this.queryOne(dataSource, stmt, args, cb)
}
// Update sets the columns listed in fieldsKv on the rows of tblName matching
// the equality conditions in whereKv, then reports the outcome through cb.
// See joinUpdateFields for the "!" raw-expression prefix on field names.
func (this *dbPool) Update(
	dataSource string,
	tblName string,
	whereKv [][]string,
	fieldsKv [][]string,
	cb ExecResultCb) {
	args := []string{}
	// SET arguments are appended before WHERE arguments, matching the
	// placeholder order in the statement.
	setClause := this.joinUpdateFields(fieldsKv, &args)
	stmt := "UPDATE `" + tblName + "` SET " + setClause + " WHERE 1=1"
	this.joinWhere(&stmt, &args, whereKv)
	this.exec(dataSource, stmt, args, cb)
}
// Insert adds one row to tblName with the column/value pairs in fieldsKv and
// reports the outcome through cb.
func (this *dbPool) Insert(
	dataSource string,
	tblName string,
	fieldsKv [][]string,
	cb ExecResultCb) {
	args := []string{}
	columnsAndValues := this.joinInsertFields(fieldsKv, &args)
	stmt := "INSERT INTO `" + tblName + "` " + columnsAndValues
	this.exec(dataSource, stmt, args, cb)
}
// Upsert looks for a row of tblName matching whereKv. If one exists it is
// updated with updateKv (or cb is called with zero results when updateKv is
// empty); otherwise a row is inserted from insertKv. A lookup error is passed
// straight to cb.
//
// The lookup and the write are separate statements, not one atomic operation.
func (this *dbPool) Upsert(
	dataSource string,
	tblName string,
	whereKv [][]string,
	updateKv [][]string,
	insertKv [][]string,
	cb ExecResultCb) {
	this.OrmSelectOne(dataSource, tblName, whereKv,
		func(err error, ds *DataSet) {
			switch {
			case err != nil:
				cb(err, 0, 0)
			case !ds.Next():
				this.Insert(dataSource, tblName, insertKv, cb)
			case len(updateKv) > 0:
				this.Update(dataSource, tblName, whereKv, updateKv, cb)
			default:
				cb(nil, 0, 0)
			}
		})
}
// UpsertEx looks for a row of tblName matching whereKv. If one exists and
// updateKv is non-empty, updateCb decides whether the update runs; if none
// exists and insertKv is non-empty, a row is inserted. A lookup error is
// passed straight to cb.
//
// NOTE(review): cb is not invoked when the row exists but updateKv is empty or
// updateCb returns false, nor when the row is missing and insertKv is empty.
// Upsert calls cb(nil, 0, 0) in the comparable case; confirm the difference is
// intended.
func (this *dbPool) UpsertEx(
	dataSource string,
	tblName string,
	whereKv [][]string,
	updateKv [][]string,
	insertKv [][]string,
	cb ExecResultCb,
	updateCb func(*DataSet) bool) {
	this.OrmSelectOne(dataSource, tblName, whereKv,
		func(err error, ds *DataSet) {
			if err != nil {
				cb(err, 0, 0)
				return
			}
			if !ds.Next() {
				if len(insertKv) > 0 {
					this.Insert(dataSource, tblName, insertKv, cb)
				}
				return
			}
			if len(updateKv) > 0 && updateCb(ds) {
				this.Update(dataSource, tblName, whereKv, updateKv, cb)
			}
		})
}
// PageQuery runs sql (plus the optional filter and orderBy fragments) as a
// page-numbered query. perPage and page are raised to at least 1.
//
// It first counts the rows of the full statement to fill Total and TotalPages,
// then fetches the requested page with LIMIT offset, count. cb receives the
// pagination with Rows set on success, or the error from whichever query
// failed.
func (this *dbPool) PageQuery(
	dataSource string,
	perPage int32,
	page int32,
	sql string,
	params []string,
	filter DbQueryFilter,
	orderBy string,
	cb PageQueryCb) {
	var pagination Pagination
	pagination.PerPage = q5.Max(1, perPage)
	pagination.CurrentPage = q5.Max(1, page)

	finalySql := sql
	if filter != nil {
		finalySql += filter.GenSql()
	}
	if orderBy != "" {
		finalySql += " " + orderBy + " "
	}

	// Second step: deliver the requested page.
	onPage := func(err error, rows *DataSet) {
		if err != nil {
			cb(err, &pagination)
			return
		}
		pagination.Rows = rows
		cb(nil, &pagination)
	}
	// First step: total row count, then chain into the page fetch.
	onCount := func(err error, rows *DataSet) {
		if err != nil {
			cb(err, &pagination)
			return
		}
		if rows != nil && rows.Next() {
			pagination.Total = q5.ToInt32(rows.GetByIndex(0))
			pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(rows.GetByIndex(0)) /
				float64(pagination.PerPage)))
		}
		offset := pagination.PerPage * (pagination.CurrentPage - 1)
		count := pagination.PerPage
		this.query(
			dataSource,
			fmt.Sprintf("%s LIMIT %d, %d", finalySql, offset, count),
			params,
			onPage)
	}
	this.queryOne(
		dataSource,
		fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql),
		params,
		onCount)
}
// StreamPageQuery2 runs sql (plus the optional filter and orderBy fragments)
// limited to pageSize+1 rows; the extra row only signals that more data
// exists. pageSize is clamped to [1, 1000].
//
// fillCb is called for each of the first pageSize rows. NextCursor is set to
// the "idx" of the pageSize-th row, Remaining to 1 when the extra row was
// returned, and PreviousCursor to the cursor argument. The cursor is not
// applied to the SQL here; the caller's sql/filter must do that.
func (this *dbPool) StreamPageQuery2(
	dataSource string,
	pageSize int32,
	cursor int64,
	sql string,
	params []string,
	filter DbQueryFilter,
	orderBy string,
	cb SteamPageQueryCb,
	fillCb func(*DataSet)) {
	switch {
	case pageSize <= 0:
		pageSize = 1
	case pageSize > 1000:
		pageSize = 1000
	}
	var pagination StreamPagination
	finalySql := sql
	if filter != nil {
		finalySql += filter.GenSql()
	}
	if orderBy != "" {
		finalySql += " " + orderBy + " "
	}
	finalySql += fmt.Sprintf(" LIMIT %d ", pageSize+1)
	this.query(
		dataSource,
		finalySql,
		params,
		func(err error, rows *DataSet) {
			if err != nil {
				cb(err, &pagination)
				return
			}
			pagination.PreviousCursor = cursor
			pageLen := int64(pageSize)
			for rows.Next() {
				if rows.NumOfReaded() > pageLen {
					// The look-ahead row: there is at least one more page.
					pagination.Remaining = 1
					continue
				}
				pagination.Count += 1
				fillCb(rows)
				if rows.NumOfReaded() == pageLen {
					pagination.NextCursor = q5.ToInt64(rows.GetByName("idx"))
				}
			}
			cb(nil, &pagination)
		})
}
// StreamPageQuery runs sql (plus the optional filter and orderBy fragments) as
// a page-numbered query reported through a StreamPagination. pageSize is
// clamped to [1, 1000] and cursor, used as a 1-based page number, is raised to
// at least 1.
//
// It first counts the rows of the full statement, then fetches the requested
// page with LIMIT offset, count and calls fillCb for each row. NextCursor and
// Remaining are set only when further pages exist; TotalCount is the counted
// total. cb receives the error from whichever query failed.
func (this *dbPool) StreamPageQuery(
	dataSource string,
	pageSize int32,
	cursor int64,
	sql string,
	params []string,
	filter DbQueryFilter,
	orderBy string,
	cb SteamPageQueryCb,
	fillCb func(*DataSet)) {
	switch {
	case pageSize <= 0:
		pageSize = 1
	case pageSize > 1000:
		pageSize = 1000
	}
	var pagination StreamPagination
	finalySql := sql
	if filter != nil {
		finalySql += filter.GenSql()
	}
	if orderBy != "" {
		finalySql += " " + orderBy + " "
	}

	onCount := func(err error, rows *DataSet) {
		if err != nil {
			cb(err, &pagination)
			return
		}
		var total, totalPages int32
		if rows != nil && rows.Next() {
			total = q5.ToInt32(rows.GetByIndex(0))
			totalPages = int32(math.Ceil(q5.ToFloat64(rows.GetByIndex(0)) /
				float64(pageSize)))
		}
		if cursor <= 0 {
			cursor = 1
		}
		offset := pageSize * (int32(cursor) - 1)
		count := pageSize
		this.query(
			dataSource,
			fmt.Sprintf("%s LIMIT %d, %d", finalySql, offset, count),
			params,
			func(err error, rows *DataSet) {
				if err != nil {
					cb(err, &pagination)
					return
				}
				pagination.PreviousCursor = cursor
				for rows.Next() {
					fillCb(rows)
					pagination.Count += 1
				}
				if int32(cursor) < totalPages {
					pagination.NextCursor = cursor + 1
					pagination.Remaining = 1
				}
				pagination.TotalCount = total
				cb(nil, &pagination)
			})
	}
	this.queryOne(
		dataSource,
		fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql),
		params,
		onCount)
}
// borrowConn returns the first dataSource registered under name, or nil if
// none is available after five attempts spaced one second apart.
//
// The entry is NOT removed from the list (the exclusive-borrow variant that
// called ds.entry.DelInit() is disabled), so concurrent callers share the same
// handle.
func (this *dbPool) borrowConn(name string) *dataSource {
	// first reports the head entry of the list for name, if there is one.
	first := func() (*dataSource, bool) {
		this.lock.Lock()
		defer this.lock.Unlock()
		head, ok := this.dataSourceHash[name]
		if !ok || head.Empty() {
			return nil, false
		}
		return head.FirstEntry().(*dataSource), true
	}
	for attempt := 0; attempt < 5; attempt++ {
		if ds, ok := first(); ok {
			return ds
		}
		time.Sleep(time.Second * 1)
	}
	return nil
}
// returnConn is the counterpart of borrowConn. Because borrowConn no longer
// removes entries from the list, the re-queueing logic is disabled and this
// only acquires and releases the pool lock.
func (this *dbPool) returnConn(ds *dataSource) {
	this.lock.Lock()
	// Disabled: head.AddTail(&ds.entry) for this.dataSourceHash[ds.name],
	// panicking with "returnConn error <name>" when the name is unknown.
	this.lock.Unlock()
}
// joinSelectFields wraps each field name in backticks and joins them with
// ", " for use in a SELECT column list.
func (this *dbPool) joinSelectFields(fields []string) string {
	quote := func(field string) string {
		return "`" + field + "`"
	}
	quoted := q5.Map(fields, quote)
	return strings.Join(quoted, ", ")
}
// joinWhere appends " AND <column>=?" to *sql for every {column, value} pair
// in whereKv and appends the value to *params. The column name is inserted
// as-is, so it must come from trusted code.
func (this *dbPool) joinWhere(sql *string, params *[]string, whereKv [][]string) {
	for i := range whereKv {
		pair := whereKv[i]
		*sql = *sql + " AND " + pair[0] + "=?"
		*params = append(*params, pair[1])
	}
}
// joinWhereLike appends " AND <column> LIKE ? " to *sql for every
// {column, pattern} pair in whereKv and appends the pattern to *params. The
// column name is inserted as-is, so it must come from trusted code.
func (this *dbPool) joinWhereLike(sql *string, params *[]string, whereKv [][]string) {
	for i := range whereKv {
		pair := whereKv[i]
		*sql = *sql + " AND " + pair[0] + " LIKE ? "
		*params = append(*params, pair[1])
	}
}
// joinUpdateFields builds the comma-separated assignment list of an UPDATE
// statement from {column, value} pairs.
//
// A normal pair becomes " `column`=?" and its value is appended to *params.
// A column prefixed with "!" marks a raw expression: the prefix is stripped
// and the value is written into the SQL verbatim (nothing is appended to
// *params), so such values must come from trusted code.
//
// An empty column name no longer panics; it is treated as a normal column.
func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) string {
	var sql strings.Builder
	last := len(fieldsKv) - 1
	for index, items := range fieldsKv {
		field := items[0]
		// HasPrefix is safe on an empty name, unlike indexing field[0].
		if strings.HasPrefix(field, "!") {
			sql.WriteString(" `" + field[1:] + "`=" + items[1])
		} else {
			sql.WriteString(" `" + field + "`=?")
			*params = append(*params, items[1])
		}
		if index < last {
			sql.WriteString(",")
		}
	}
	return sql.String()
}
// joinInsertFields builds the " (`c1`,`c2`) VALUES(?,?)" tail of an INSERT
// statement from {column, value} pairs and appends every value to *params in
// column order.
func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) string {
	columns := make([]string, 0, len(fieldsKv))
	for _, pair := range fieldsKv {
		columns = append(columns, "`"+pair[0]+"`")
	}
	placeholders := make([]string, 0, len(fieldsKv))
	for _, pair := range fieldsKv {
		placeholders = append(placeholders, "?")
		*params = append(*params, pair[1])
	}
	return " (" + strings.Join(columns, ",") + ")" +
		" VALUES(" + strings.Join(placeholders, ",") + ")"
}
// internalExec runs a write statement on a handle of dataSource and reports
// (err, lastInsertId, rowsAffected) to cb. Both numbers stay 0 when the
// statement failed or the driver could not supply them.
//
// Under GO_STYLE_DB cb runs inline; otherwise it is posted through
// _app.RegisterMainThreadCb. If no handle can be borrowed, cb is called inline
// with a "borrowConn error".
func (this *dbPool) internalExec(dataSource string, sql string, params []string,
	cb ExecResultCb) {
	ds := this.borrowConn(dataSource)
	if ds == nil {
		cb(errors.New("borrowConn error"), int64(0), int64(0))
		return
	}
	// Handles are shared, so the (no-op) return happens before use.
	this.returnConn(ds)

	result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...)
	var lastInsertId, rowsAffected int64
	if err != nil {
		GetSysLog().Warning("f5.dbpool.internalExec error:%s sql:%s\\%s", err, sql, q5.GetCallStack())
	} else {
		if id, idErr := result.LastInsertId(); idErr == nil {
			lastInsertId = id
		}
		if n, nErr := result.RowsAffected(); nErr == nil {
			rowsAffected = n
		}
	}

	deliver := func() {
		cb(err, lastInsertId, rowsAffected)
	}
	if this.style == GO_STYLE_DB {
		deliver()
		return
	}
	_app.RegisterMainThreadCb(deliver)
}
// internalQuery runs a read query whose callback delivery follows the pool
// style (inline for GO_STYLE_DB, posted via _app.RegisterMainThreadCb
// otherwise).
func (this *dbPool) internalQuery(dataSource string, sql string, params []string,
	cb QueryResultCb) {
	const noMainThread = false
	this.internalQueryEx(dataSource, sql, params, cb, noMainThread)
}
// syncInternalQuery runs a read query and always invokes cb inline on the
// calling goroutine, regardless of the pool style.
func (this *dbPool) syncInternalQuery(dataSource string, sql string, params []string,
	cb QueryResultCb) {
	const noMainThread = true
	this.internalQueryEx(dataSource, sql, params, cb, noMainThread)
}
// internalQueryEx runs a read query on a handle of dataSource and hands
// (err, dataSet) to cb; dataSet is nil when the query failed. The data set is
// closed as soon as cb returns, so cb must not keep it.
//
// cb runs inline when the pool is GO_STYLE_DB or noMainThread is true;
// otherwise it is posted through _app.RegisterMainThreadCb. If no handle can
// be borrowed, cb is called inline with a "borrowConn error".
func (this *dbPool) internalQueryEx(dataSource string, sql string, params []string,
	cb QueryResultCb, noMainThread bool) {
	ds := this.borrowConn(dataSource)
	if ds == nil {
		cb(errors.New("borrowConn error"), nil)
		return
	}
	// Handles are shared, so the (no-op) return happens before use.
	this.returnConn(ds)

	var dataSet *DataSet
	rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
	if err != nil {
		GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql)
	} else {
		dataSet = newDataSet(rows)
	}

	deliver := func() {
		// Deferred so the rows are released even if cb panics.
		defer func() {
			if dataSet != nil {
				dataSet.close()
			}
		}()
		cb(err, dataSet)
	}
	if this.style == GO_STYLE_DB || noMainThread {
		deliver()
		return
	}
	_app.RegisterMainThreadCb(deliver)
}
// internalQueryOne runs a read query on a handle of dataSource and hands
// (err, dataSet) to the single-row callback cb; dataSet is nil when the query
// failed. The data set is closed as soon as cb returns, so cb must not keep
// it.
//
// Under GO_STYLE_DB cb runs inline; otherwise it is posted through
// _app.RegisterMainThreadCb. If no handle can be borrowed, cb is called inline
// with a "borrowConn error".
func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string,
	cb QueryOneCb) {
	ds := this.borrowConn(dataSource)
	if ds == nil {
		cb(errors.New("borrowConn error"), nil)
		return
	}
	// Handles are shared, so the (no-op) return happens before use.
	this.returnConn(ds)

	var dataSet *DataSet
	rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
	if err != nil {
		GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql)
	} else {
		dataSet = newDataSet(rows)
	}

	deliver := func() {
		// Deferred so the rows are released even if cb panics.
		defer func() {
			if dataSet != nil {
				dataSet.close()
			}
		}()
		cb(err, dataSet)
	}
	if this.style == GO_STYLE_DB {
		deliver()
		return
	}
	_app.RegisterMainThreadCb(deliver)
}
// query dispatches a read query according to the pool style: inline on the
// calling goroutine for GO_STYLE_DB, on a new goroutine otherwise.
func (this *dbPool) query(dataSource string, sql string, params []string,
	cb QueryResultCb) {
	if this.style != GO_STYLE_DB {
		go this.internalQuery(dataSource, sql, params, cb)
		return
	}
	this.internalQuery(dataSource, sql, params, cb)
}
// exec dispatches a write statement according to the pool style: inline on the
// calling goroutine for GO_STYLE_DB, on a new goroutine otherwise.
func (this *dbPool) exec(dataSource string, sql string, params []string,
	cb ExecResultCb) {
	if this.style != GO_STYLE_DB {
		go this.internalExec(dataSource, sql, params, cb)
		return
	}
	this.internalExec(dataSource, sql, params, cb)
}
// queryOne dispatches a single-row query according to the pool style: inline
// on the calling goroutine for GO_STYLE_DB, on a new goroutine otherwise.
func (this *dbPool) queryOne(dataSource string, sql string, params []string,
	cb QueryOneCb) {
	if this.style != GO_STYLE_DB {
		go this.internalQueryOne(dataSource, sql, params, cb)
		return
	}
	this.internalQueryOne(dataSource, sql, params, cb)
}
// discoverNewData polls MAX(idx) of watchTable forever. Whenever the value
// grows it broadcasts on cond and polls again immediately; otherwise it sleeps
// for watchTimeCb() seconds before the next poll. Query errors are ignored and
// treated as "no new data".
//
// NOTE(review): the grew flag is read right after RawQuery returns, so this
// relies on the callback having run by then (GO_STYLE_DB).
func (this *dbPool) discoverNewData(dataSource string, watchTable string, watchTimeCb func() int64,
	cond *sync.Cond) {
	var knownMaxIdx int64
	for {
		grew := false
		this.RawQuery(
			dataSource,
			fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable),
			[]string{},
			func(err error, ds *DataSet) {
				if err != nil || !ds.Next() {
					return
				}
				if idx := q5.ToInt64(ds.GetByName("max_idx")); idx > knownMaxIdx {
					knownMaxIdx = idx
					grew = true
					GetSysLog().Info("%s hasNewData max_idx:%d", watchTable, knownMaxIdx)
				}
			})
		if !grew {
			time.Sleep(time.Second * time.Duration(watchTimeCb()))
			continue
		}
		cond.Broadcast()
	}
}
// registerNewMaxIdx subscribes a loader to MAX(idx) notifications for
// dataSource/tblName and returns the unbuffered channel on which they arrive.
//
// The first subscription for a given "<dataSource>_<tblName>" key also creates
// the table's progress record and starts a polling goroutine that never stops.
// In every case the loader's progress item is recorded under name.
// The whole function runs under monitorLock.
func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string,
name string, item *maxIdxMonitorProgItem) *chan int64 {
this.monitorLock.Lock()
defer this.monitorLock.Unlock()
key := fmt.Sprintf("%s_%s", dataSource, tblName)
chNewMaxIdx := make(chan int64)
// newIdx is the subscriber id under which the channel is stored.
newIdx := atomic.AddInt64(&this.currMonitorIdx, 1)
if p, ok := this.maxIdxMonitorHash.Load(key); ok {
// The table is already being polled: just add another subscriber.
(*p).Store(newIdx, &chNewMaxIdx)
} else {
p := new(q5.ConcurrentMap[int64, *chan int64])
p.Store(newIdx, &chNewMaxIdx)
this.maxIdxMonitorHash.Store(key, p)
// notifyFunc sends newMaxIdx to every subscriber. Each send runs in its
// own goroutine because the channels are unbuffered and a busy subscriber
// would otherwise block the others.
notifyFunc := func (newMaxIdx int64) {
p.Range(func(id int64, ch *chan int64) bool {
go func() {
(*ch) <- newMaxIdx
}()
return true
})
}
// lastMaxIdx is written by the polling callback below and exposed to
// outputMonitorLog through prog.maxIdx.
var lastMaxIdx int64
prog := new(maxIdxMonitorProg)
prog.dataSource = dataSource
prog.tblName = tblName
prog.maxIdx = &lastMaxIdx
prog.itemHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProgItem])
this.maxIdxMonitorProgHash.Store(key, prog)
go func () {
for {
this.RawQuery(
dataSource,
fmt.Sprintf("SELECT MAX(idx) FROM %s", tblName),
[]string{},
func (err error, ds *DataSet) {
if err != nil {
// Back off for 8000-9199 ms after a failed poll.
// NOTE(review): the regular pause below uses 800, confirm 8000 is intended.
time.Sleep(time.Millisecond * time.Duration(8000 + rand.Intn(1200)))
return
}
if ds.Next() {
idx := q5.ToInt64(ds.GetByIndex(0))
if idx > lastMaxIdx {
lastMaxIdx = idx
GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx)
}
}
})
// Subscribers are notified on every pass, whether or not the value changed.
go notifyFunc(lastMaxIdx)
// Pause 800-1999 ms between polls.
time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(1200)))
}
}()
}
this.addMonitorProgItem(key, name, item)
return &chNewMaxIdx
}
// addMonitorProgItem records item as the progress counters of loader name
// under the monitored table identified by key ("<dataSource>_<tblName>").
// It panics when no progress record exists for key.
func (this *dbPool) addMonitorProgItem(key string, name string, item *maxIdxMonitorProgItem) {
	if p, ok := this.maxIdxMonitorProgHash.Load(key); ok {
		(*p).itemHash.Store(name, item)
		return
	}
	// The message used to have a third verb with no argument and printed
	// "%!s(MISSING)"; supply the loader's last idx, guarding against nil.
	var lastIdx int64
	if item != nil && item.lastIdx != nil {
		lastIdx = *item.lastIdx
	}
	panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%d", key, name, lastIdx))
}