657 lines
14 KiB
Go
657 lines
14 KiB
Go
package f5
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"q5"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// DBStyle selects how dbPool dispatches statements and delivers callbacks.
type DBStyle int32

const (
	// GO_STYLE_DB runs each statement on the calling goroutine and invokes
	// the callback before the call returns.
	GO_STYLE_DB DBStyle = iota
	// JS_STYLE_DB runs each statement on a new goroutine and hands the
	// callback to _app.RegisterMainThreadCb.
	JS_STYLE_DB
)
|
|
|
|
type dataSource struct {
|
|
name string
|
|
conn *q5.Mysql
|
|
entry q5.ListHead
|
|
}
|
|
|
|
type dbPool struct {
|
|
style DBStyle
|
|
lock sync.Mutex
|
|
dataSourceHash map[string]*q5.ListHead
|
|
}
|
|
|
|
func (this *dbPool) init(style DBStyle) {
|
|
this.style = style
|
|
this.dataSourceHash = make(map[string]*q5.ListHead)
|
|
}
|
|
|
|
func (this *dbPool) unInit() {
|
|
}
|
|
|
|
func (this *dbPool) RegisterDataSource(name string, host string, port int32,
|
|
user string, passwd string, dataBase string, size int32, maxOpenConns int32, maxIdleConns int32) {
|
|
this.lock.Lock()
|
|
defer this.lock.Unlock()
|
|
var head *q5.ListHead
|
|
if val, ok := this.dataSourceHash[name]; ok {
|
|
head = val
|
|
} else {
|
|
head = q5.NewListHead()
|
|
this.dataSourceHash[name] = head
|
|
}
|
|
if maxOpenConns <= 0 {
|
|
maxOpenConns = 1
|
|
}
|
|
if maxIdleConns <= 0 {
|
|
maxIdleConns = 1
|
|
}
|
|
for i := int32(0); i < size; i++ {
|
|
ds := dataSource{}
|
|
ds.name = name
|
|
ds.conn = q5.NewMysql(host, port, user, passwd, dataBase, maxOpenConns, maxIdleConns)
|
|
ds.entry.Init(&ds)
|
|
head.AddTail(&ds.entry)
|
|
err := ds.conn.Open()
|
|
if err != nil {
|
|
panic(fmt.Sprintf("RegisterDataSource err:%s", err))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) Select(
|
|
dataSource string,
|
|
tblName string,
|
|
fields []string,
|
|
whereKv [][]string,
|
|
cb QueryResultCb) {
|
|
params := []string{}
|
|
sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName)
|
|
this.joinWhere(&sql, ¶ms, whereKv)
|
|
this.query(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) OrmSelect(
|
|
dataSource string,
|
|
tblName string,
|
|
whereKv [][]string,
|
|
cb QueryResultCb) {
|
|
params := []string{}
|
|
sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName)
|
|
this.joinWhere(&sql, ¶ms, whereKv)
|
|
this.query(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) RawQuery(dataSource string, sql string, params []string, cb QueryResultCb) {
|
|
this.query(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) SelectCustomQuery(dataSource string, sql string, cb QueryResultCb) {
|
|
params := []string{}
|
|
this.query(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) {
|
|
params := []string{}
|
|
this.syncInternalQuery(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) SyncBatchLoadFullTable(dataSource string, sqlTpl string,
|
|
cb func(*DataSet), errCb func(error)) int64 {
|
|
var lastIdx int64
|
|
var done = false
|
|
for !done {
|
|
this.SyncSelectCustomQuery(
|
|
dataSource,
|
|
fmt.Sprintf(sqlTpl, lastIdx),
|
|
func (err error, ds *DataSet) {
|
|
if err != nil {
|
|
errCb(err)
|
|
return
|
|
}
|
|
for ds.Next() {
|
|
idx := q5.ToInt64(ds.GetByName("idx"))
|
|
cb(ds)
|
|
if idx > lastIdx {
|
|
lastIdx = idx
|
|
} else {
|
|
panic(fmt.Sprintf("SyncBatchLoadFullTable idx error:%s %s", idx, lastIdx))
|
|
}
|
|
}
|
|
if ds.NumOfReaded() <= 0 {
|
|
done = true
|
|
}
|
|
})
|
|
}
|
|
return lastIdx
|
|
}
|
|
|
|
func (this *dbPool) LoopLoad(
|
|
dataSource string,
|
|
watchTable string,
|
|
watchTimeCb func() int64,
|
|
sqlCb func(int64) string,
|
|
params []string,
|
|
nextTimeCb func() time.Duration,
|
|
nextRoundCb func() time.Duration,
|
|
doCb func(*DataSet) bool) {
|
|
{
|
|
go func () {
|
|
this.RawQuery(
|
|
dataSource,
|
|
fmt.Sprintf("SELECT MAX(idx) FROM %s", watchTable),
|
|
params,
|
|
func (err error, ds *DataSet) {
|
|
})
|
|
}()
|
|
}
|
|
var lastIdx int64
|
|
for true {
|
|
hasNextData := false
|
|
sql := sqlCb(lastIdx)
|
|
this.RawQuery(
|
|
dataSource,
|
|
sql,
|
|
params,
|
|
func (err error, ds *DataSet) {
|
|
if err == nil {
|
|
for ds.Next() {
|
|
idx := q5.ToInt64(ds.GetByName("idx"))
|
|
if !doCb(ds) {
|
|
return
|
|
}
|
|
if idx > lastIdx {
|
|
lastIdx = idx
|
|
}
|
|
hasNextData = true
|
|
}
|
|
}
|
|
})
|
|
if hasNextData {
|
|
time.Sleep(nextTimeCb())
|
|
} else {
|
|
lastIdx = 0
|
|
time.Sleep(nextRoundCb())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) SelectLike(
|
|
dataSource string,
|
|
tblName string,
|
|
fields []string,
|
|
whereKv [][]string,
|
|
likeWhere [][]string,
|
|
start int64,
|
|
limit int,
|
|
cb QueryResultCb) {
|
|
var params []string
|
|
sql := fmt.Sprintf("SELECT %s FROM %s WHERE idx > %d", this.joinSelectFields(fields), tblName, start)
|
|
this.joinWhere(&sql, ¶ms, whereKv)
|
|
this.joinWhereLike(&sql, ¶ms, likeWhere)
|
|
sql = fmt.Sprintf("%s ORDER BY idx ASC LIMIT %d", sql, limit)
|
|
|
|
this.query(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) SelectOne(
|
|
dataSource string,
|
|
tblName string,
|
|
fields []string,
|
|
whereKv [][]string,
|
|
cb QueryOneCb) {
|
|
params := []string{}
|
|
sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName)
|
|
this.joinWhere(&sql, ¶ms, whereKv)
|
|
this.queryOne(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) OrmSelectOne(
|
|
dataSource string,
|
|
tblName string,
|
|
whereKv [][]string,
|
|
cb QueryOneCb) {
|
|
params := []string{}
|
|
sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName)
|
|
this.joinWhere(&sql, ¶ms, whereKv)
|
|
this.queryOne(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) Update(
|
|
dataSource string,
|
|
tblName string,
|
|
whereKv [][]string,
|
|
fieldsKv [][]string,
|
|
cb ExecResultCb) {
|
|
params := []string{}
|
|
sql := "UPDATE `" + tblName + "` SET " + this.joinUpdateFields(fieldsKv, ¶ms) +
|
|
" WHERE 1=1"
|
|
this.joinWhere(&sql, ¶ms, whereKv)
|
|
this.exec(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) Insert(
|
|
dataSource string,
|
|
tblName string,
|
|
fieldsKv [][]string,
|
|
cb ExecResultCb) {
|
|
params := []string{}
|
|
sql := "INSERT INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, ¶ms)
|
|
this.exec(dataSource, sql, params, cb)
|
|
}
|
|
|
|
func (this *dbPool) Upsert(
|
|
dataSource string,
|
|
tblName string,
|
|
whereKv [][]string,
|
|
updateKv [][]string,
|
|
insertKv [][]string,
|
|
cb ExecResultCb) {
|
|
this.OrmSelectOne(dataSource, tblName, whereKv,
|
|
func(err error, ds *DataSet) {
|
|
if err != nil {
|
|
cb(err, 0, 0)
|
|
return
|
|
}
|
|
if ds.Next() {
|
|
if len(updateKv) > 0 {
|
|
this.Update(dataSource, tblName, whereKv, updateKv, cb)
|
|
}
|
|
} else {
|
|
this.Insert(dataSource, tblName, insertKv, cb)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (this *dbPool) UpsertEx(
|
|
dataSource string,
|
|
tblName string,
|
|
whereKv [][]string,
|
|
updateKv [][]string,
|
|
insertKv [][]string,
|
|
cb ExecResultCb,
|
|
updateCb func(*DataSet) bool) {
|
|
this.OrmSelectOne(dataSource, tblName, whereKv,
|
|
func(err error, ds *DataSet) {
|
|
if err != nil {
|
|
cb(err, 0, 0)
|
|
return
|
|
}
|
|
if ds.Next() {
|
|
if len(updateKv) > 0 {
|
|
if updateCb(ds) {
|
|
this.Update(dataSource, tblName, whereKv, updateKv, cb)
|
|
}
|
|
}
|
|
} else {
|
|
this.Insert(dataSource, tblName, insertKv, cb)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (this *dbPool) PageQuery(
|
|
dataSource string,
|
|
perPage int32,
|
|
page int32,
|
|
sql string,
|
|
params []string,
|
|
filter DbQueryFilter,
|
|
orderBy string,
|
|
cb PageQueryCb) {
|
|
var pagination Pagination
|
|
pagination.PerPage = q5.Max(1, perPage)
|
|
pagination.CurrentPage = q5.Max(1, page)
|
|
finalySql := sql
|
|
if filter != nil {
|
|
finalySql += filter.GenSql()
|
|
}
|
|
if orderBy != "" {
|
|
finalySql += " " + orderBy + " "
|
|
}
|
|
//GetSysLog().Info("finalySql:%s", finalySql)
|
|
this.queryOne(
|
|
dataSource,
|
|
fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql),
|
|
params,
|
|
func(err error, rows *DataSet) {
|
|
if err != nil {
|
|
cb(err, &pagination)
|
|
return
|
|
}
|
|
if rows != nil && rows.Next() {
|
|
pagination.Total = q5.ToInt32(rows.GetByIndex(0))
|
|
pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(rows.GetByIndex(0)) /
|
|
float64(pagination.PerPage)))
|
|
}
|
|
start := pagination.PerPage * (pagination.CurrentPage - 1)
|
|
limit := pagination.PerPage
|
|
this.query(
|
|
dataSource,
|
|
fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit),
|
|
params,
|
|
func(err error, rows *DataSet) {
|
|
if err != nil {
|
|
cb(err, &pagination)
|
|
return
|
|
}
|
|
pagination.Rows = rows
|
|
cb(nil, &pagination)
|
|
})
|
|
})
|
|
}
|
|
|
|
func (this *dbPool) StreamPageQuery(
|
|
dataSource string,
|
|
pageSize int32,
|
|
cursor int64,
|
|
sql string,
|
|
params []string,
|
|
filter DbQueryFilter,
|
|
orderBy string,
|
|
cb SteamPageQueryCb,
|
|
fillCb func(*DataSet)) {
|
|
if (pageSize <= 0) {
|
|
pageSize = 1
|
|
}
|
|
if (pageSize > 1000) {
|
|
pageSize = 1000
|
|
}
|
|
var pagination StreamPagination
|
|
finalySql := sql
|
|
if filter != nil {
|
|
finalySql += filter.GenSql()
|
|
}
|
|
finalySql += fmt.Sprintf(" LIMIT %d ", pageSize + 1)
|
|
if orderBy != "" {
|
|
finalySql += " " + orderBy + " "
|
|
}
|
|
GetSysLog().Info("finalySql:%s", finalySql)
|
|
this.queryOne(
|
|
dataSource,
|
|
finalySql,
|
|
params,
|
|
func(err error, rows *DataSet) {
|
|
if err != nil {
|
|
cb(err, &pagination)
|
|
return
|
|
}
|
|
pagination.PreviousCursor = cursor
|
|
for rows.Next() {
|
|
if (rows.NumOfReaded() <= int64(pageSize)) {
|
|
fillCb(rows)
|
|
} else if (rows.NumOfReaded() == int64(pageSize)) {
|
|
pagination.NextCursor = q5.ToInt64(rows.GetByName("idx"))
|
|
} else if (rows.NumOfReaded() > int64(pageSize)) {
|
|
pagination.Remaining = 1
|
|
}
|
|
}
|
|
cb(nil, &pagination)
|
|
})
|
|
}
|
|
|
|
func (this *dbPool) borrowConn(name string) *dataSource {
|
|
tryCount := 0
|
|
for tryCount < 5 {
|
|
{
|
|
this.lock.Lock()
|
|
if head, ok := this.dataSourceHash[name]; ok {
|
|
if !head.Empty() {
|
|
ds := head.FirstEntry().(*dataSource)
|
|
//ds.entry.DelInit()
|
|
this.lock.Unlock()
|
|
return ds
|
|
}
|
|
}
|
|
this.lock.Unlock()
|
|
}
|
|
time.Sleep(time.Second * 1)
|
|
tryCount++
|
|
}
|
|
return nil
|
|
/*
|
|
tryCount := 0
|
|
for tryCount < 5 {
|
|
{
|
|
this.lock.Lock()
|
|
if head, ok := this.dataSourceHash[name]; ok {
|
|
if !head.Empty() {
|
|
ds := head.FirstEntry().(*dataSource)
|
|
ds.entry.DelInit()
|
|
this.lock.Unlock()
|
|
return ds
|
|
}
|
|
}
|
|
this.lock.Unlock()
|
|
}
|
|
time.Sleep(time.Second * 1)
|
|
tryCount++
|
|
}
|
|
return nil*/
|
|
}
|
|
|
|
func (this *dbPool) returnConn(ds *dataSource) {
|
|
this.lock.Lock()
|
|
defer this.lock.Unlock()
|
|
|
|
/*
|
|
if head, ok := this.dataSourceHash[ds.name]; ok {
|
|
head.AddTail(&ds.entry)
|
|
} else {
|
|
panic(fmt.Sprintf("returnConn error %s", ds.name))
|
|
}*/
|
|
}
|
|
|
|
func (this *dbPool) joinSelectFields(fields []string) string {
|
|
return strings.Join(
|
|
q5.Map(fields,
|
|
func(val string) string {
|
|
return "`" + val + "`"
|
|
}),
|
|
", ")
|
|
}
|
|
|
|
func (this *dbPool) joinWhere(sql *string, params *[]string, whereKv [][]string) {
|
|
for _, items := range whereKv {
|
|
*sql += " AND " + items[0] + "=?"
|
|
*params = append(*params, items[1])
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) joinWhereLike(sql *string, params *[]string, whereKv [][]string) {
|
|
for _, items := range whereKv {
|
|
*sql += " AND " + items[0] + " LIKE ? "
|
|
*params = append(*params, items[1])
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) string {
|
|
sql := ""
|
|
for index, items := range fieldsKv {
|
|
suffix := ""
|
|
if index+1 < len(fieldsKv) {
|
|
suffix = ","
|
|
}
|
|
if items[0][0] == '!' {
|
|
sql += " `" + items[0][1:] + "`=" + items[1] + suffix
|
|
} else {
|
|
sql += " `" + items[0] + "`=?" + suffix
|
|
*params = append(*params, items[1])
|
|
}
|
|
}
|
|
return sql
|
|
}
|
|
|
|
func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) string {
|
|
sql := " ("
|
|
for index, items := range fieldsKv {
|
|
suffix := ""
|
|
if index+1 < len(fieldsKv) {
|
|
suffix = ","
|
|
}
|
|
sql += "`" + items[0] + "`" + suffix
|
|
}
|
|
sql += ")"
|
|
sql += " VALUES("
|
|
for index, items := range fieldsKv {
|
|
suffix := ""
|
|
if index+1 < len(fieldsKv) {
|
|
suffix = ","
|
|
}
|
|
sql += "?" + suffix
|
|
*params = append(*params, items[1])
|
|
}
|
|
sql += ")"
|
|
return sql
|
|
}
|
|
|
|
func (this *dbPool) internalExec(dataSource string, sql string, params []string,
|
|
cb ExecResultCb) {
|
|
ds := this.borrowConn(dataSource)
|
|
if ds == nil {
|
|
cb(errors.New("borrowConn error"), int64(0), int64(0))
|
|
return
|
|
}
|
|
this.returnConn(ds)
|
|
result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...)
|
|
//this.returnConn(ds)
|
|
if err != nil {
|
|
GetSysLog().Warning("f5.dbpool.internalExec error:%s sql:%s\\%s", err, sql, q5.GetCallStack())
|
|
}
|
|
|
|
var lastInsertId int64
|
|
var rowsAffected int64
|
|
if err == nil {
|
|
if id, err := result.LastInsertId(); err == nil {
|
|
lastInsertId = id
|
|
}
|
|
if id, err := result.RowsAffected(); err == nil {
|
|
rowsAffected = id
|
|
}
|
|
}
|
|
if this.style == GO_STYLE_DB {
|
|
cb(err, lastInsertId, rowsAffected)
|
|
} else {
|
|
_app.RegisterMainThreadCb(
|
|
func() {
|
|
cb(err, lastInsertId, rowsAffected)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) internalQuery(dataSource string, sql string, params []string,
|
|
cb QueryResultCb) {
|
|
this.internalQueryEx(dataSource, sql, params, cb, false)
|
|
}
|
|
|
|
func (this *dbPool) syncInternalQuery(dataSource string, sql string, params []string,
|
|
cb QueryResultCb) {
|
|
this.internalQueryEx(dataSource, sql, params, cb, true)
|
|
}
|
|
|
|
func (this *dbPool) internalQueryEx(dataSource string, sql string, params []string,
|
|
cb QueryResultCb, noMainThread bool) {
|
|
ds := this.borrowConn(dataSource)
|
|
if ds == nil {
|
|
cb(errors.New("borrowConn error"), nil)
|
|
return
|
|
}
|
|
this.returnConn(ds)
|
|
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
|
|
//this.returnConn(ds)
|
|
if err != nil {
|
|
GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql)
|
|
}
|
|
|
|
var dataSet *DataSet
|
|
if err == nil {
|
|
dataSet = newDataSet(rows)
|
|
}
|
|
freeFunc := func () {
|
|
if dataSet != nil {
|
|
dataSet.close()
|
|
}
|
|
}
|
|
|
|
if this.style == GO_STYLE_DB || noMainThread {
|
|
defer freeFunc()
|
|
cb(err, dataSet)
|
|
} else {
|
|
_app.RegisterMainThreadCb(
|
|
func() {
|
|
defer freeFunc()
|
|
cb(err, dataSet)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string,
|
|
cb QueryOneCb) {
|
|
ds := this.borrowConn(dataSource)
|
|
if ds == nil {
|
|
cb(errors.New("borrowConn error"), nil)
|
|
return
|
|
}
|
|
this.returnConn(ds)
|
|
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
|
|
//this.returnConn(ds)
|
|
if err != nil {
|
|
GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql)
|
|
}
|
|
|
|
var dataSet *DataSet
|
|
if err == nil {
|
|
dataSet = newDataSet(rows)
|
|
}
|
|
freeFunc := func () {
|
|
if dataSet != nil {
|
|
dataSet.close()
|
|
}
|
|
}
|
|
|
|
if this.style == GO_STYLE_DB {
|
|
defer freeFunc()
|
|
cb(err, dataSet)
|
|
} else {
|
|
_app.RegisterMainThreadCb(
|
|
func() {
|
|
defer freeFunc()
|
|
cb(err, dataSet)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (this* dbPool) query(dataSource string, sql string, params []string,
|
|
cb QueryResultCb) {
|
|
if this.style == GO_STYLE_DB {
|
|
this.internalQuery(dataSource, sql, params, cb)
|
|
} else {
|
|
go this.internalQuery(dataSource, sql, params, cb)
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) exec(dataSource string, sql string, params []string,
|
|
cb ExecResultCb) {
|
|
if this.style == GO_STYLE_DB {
|
|
this.internalExec(dataSource, sql, params, cb)
|
|
} else {
|
|
go this.internalExec(dataSource, sql, params, cb)
|
|
}
|
|
}
|
|
|
|
func (this *dbPool) queryOne(dataSource string, sql string, params []string,
|
|
cb QueryOneCb) {
|
|
if this.style == GO_STYLE_DB {
|
|
this.internalQueryOne(dataSource, sql, params, cb)
|
|
} else {
|
|
go this.internalQueryOne(dataSource, sql, params, cb)
|
|
}
|
|
}
|