package f5 import ( "errors" "fmt" "math" "q5" "strings" "sync" "time" ) type DBStyle int32 const ( GO_STYLE_DB DBStyle = iota JS_STYLE_DB ) type dataSource struct { name string conn *q5.Mysql entry q5.ListHead } type dbPool struct { style DBStyle lock sync.Mutex dataSourceHash map[string]*q5.ListHead } func (this *dbPool) init(style DBStyle) { this.style = style this.dataSourceHash = make(map[string]*q5.ListHead) } func (this *dbPool) unInit() { } func (this *dbPool) RegisterDataSource(name string, host string, port int32, user string, passwd string, dataBase string, size int32) { this.lock.Lock() defer this.lock.Unlock() var head *q5.ListHead if val, ok := this.dataSourceHash[name]; ok { head = val } else { head = q5.NewListHead() this.dataSourceHash[name] = head } for i := int32(0); i < size; i++ { ds := dataSource{} ds.name = name ds.conn = q5.NewMysql(host, port, user, passwd, dataBase) ds.entry.Init(&ds) head.AddTail(&ds.entry) err := ds.conn.Open() if err != nil { panic(fmt.Sprintf("RegisterDataSource err:%s", err)) } } } func (this *dbPool) Select( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryResultCb) { params := []string{} sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName) this.joinWhere(&sql, ¶ms, whereKv) this.query(dataSource, sql, params, cb) } func (this *dbPool) OrmSelect( dataSource string, tblName string, whereKv [][]string, cb QueryResultCb) { params := []string{} sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName) this.joinWhere(&sql, ¶ms, whereKv) this.query(dataSource, sql, params, cb) } func (this *dbPool) RawQuery(dataSource string, sql string, params []string, cb QueryResultCb) { this.query(dataSource, sql, params, cb) } func (this *dbPool) SelectCustomQuery(dataSource string, sql string, cb QueryResultCb) { params := []string{} this.query(dataSource, sql, params, cb) } func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) { params := []string{} this.syncInternalQuery(dataSource, sql, params, cb) } func (this *dbPool) SyncBatchLoadFullTable(dataSource string, sqlTpl string, cb func(*DataSet), errCb func(error)) int64 { var lastIdx int64 var done = false for !done { this.SyncSelectCustomQuery( dataSource, fmt.Sprintf(sqlTpl, lastIdx), func (err error, ds *DataSet) { if err != nil { errCb(err) return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) cb(ds) if idx > lastIdx { lastIdx = idx } else { panic(fmt.Sprintf("SyncBatchLoadFullTable idx error:%s %s", idx, lastIdx)) } } if ds.NumOfReaded() <= 0 { done = true } }) } return lastIdx } func (this *dbPool) SelectLike( dataSource string, tblName string, fields []string, whereKv [][]string, likeWhere [][]string, start int64, limit int, cb QueryResultCb) { var params []string sql := fmt.Sprintf("SELECT %s FROM %s WHERE idx > %d", this.joinSelectFields(fields), tblName, start) this.joinWhere(&sql, ¶ms, whereKv) this.joinWhereLike(&sql, ¶ms, likeWhere) sql = fmt.Sprintf("%s ORDER BY idx ASC LIMIT %d", sql, limit) this.query(dataSource, sql, params, cb) } func (this *dbPool) SelectOne( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryOneCb) { params := []string{} sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName) this.joinWhere(&sql, ¶ms, whereKv) this.queryOne(dataSource, sql, params, cb) } func (this *dbPool) OrmSelectOne( dataSource string, tblName string, whereKv [][]string, cb QueryOneCb) { params := []string{} sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName) this.joinWhere(&sql, ¶ms, whereKv) this.queryOne(dataSource, sql, params, cb) } func (this *dbPool) Update( dataSource string, tblName string, whereKv [][]string, fieldsKv [][]string, cb ExecResultCb) { params := []string{} sql := "UPDATE `" + tblName + "` SET " + this.joinUpdateFields(fieldsKv, ¶ms) + " WHERE 1=1" this.joinWhere(&sql, ¶ms, whereKv) this.exec(dataSource, sql, params, cb) } func (this *dbPool) Insert( dataSource string, tblName string, fieldsKv [][]string, cb ExecResultCb) { params := []string{} sql := "INSERT INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, ¶ms) this.exec(dataSource, sql, params, cb) } func (this *dbPool) Upsert( dataSource string, tblName string, whereKv [][]string, updateKv [][]string, insertKv [][]string, cb ExecResultCb) { this.OrmSelectOne(dataSource, tblName, whereKv, func(err error, ds *DataSet) { if err != nil { cb(err, 0, 0) return } if ds.Next() { if len(updateKv) > 0 { this.Update(dataSource, tblName, whereKv, updateKv, cb) } } else { this.Insert(dataSource, tblName, insertKv, cb) } }) } func (this *dbPool) UpsertEx( dataSource string, tblName string, whereKv [][]string, updateKv [][]string, insertKv [][]string, cb ExecResultCb, updateCb func(*DataSet) bool) { this.OrmSelectOne(dataSource, tblName, whereKv, func(err error, ds *DataSet) { if err != nil { cb(err, 0, 0) return } if ds.Next() { if len(updateKv) > 0 { if updateCb(ds) { this.Update(dataSource, tblName, whereKv, updateKv, cb) } } } else { this.Insert(dataSource, tblName, insertKv, cb) } }) } func (this *dbPool) PageQuery( dataSource string, perPage int32, page int32, sql string, params []string, filter DbQueryFilter, orderBy string, cb PageQueryCb) { var pagination Pagination pagination.PerPage = q5.Max(1, perPage) pagination.CurrentPage = q5.Max(1, page) finalySql := sql if filter != nil { finalySql += filter.GenSql() } if orderBy != "" { finalySql += " " + orderBy + " " } //GetSysLog().Info("finalySql:%s", finalySql) this.queryOne( dataSource, fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql), params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } if rows != nil && rows.Next() { pagination.Total = q5.ToInt32(rows.GetByIndex(0)) pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(rows.GetByIndex(0)) / float64(pagination.PerPage))) } start := pagination.PerPage * (pagination.CurrentPage - 1) limit := pagination.PerPage this.query( dataSource, fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit), params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } pagination.Rows = rows cb(nil, &pagination) }) }) } func (this *dbPool) StreamPageQuery( dataSource string, pageSize int32, cursor int64, sql string, params []string, filter DbQueryFilter, orderBy string, cb SteamPageQueryCb, fillCb func(*DataSet)) { if (pageSize <= 0) { pageSize = 1 } if (pageSize > 1000) { pageSize = 1000 } var pagination StreamPagination finalySql := sql if filter != nil { finalySql += filter.GenSql() } finalySql += fmt.Sprintf(" LIMIT %d ", pageSize + 1) if orderBy != "" { finalySql += " " + orderBy + " " } GetSysLog().Info("finalySql:%s", finalySql) this.queryOne( dataSource, finalySql, params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } pagination.PreviousCursor = cursor for rows.Next() { if (rows.NumOfReaded() <= int64(pageSize)) { fillCb(rows) } else if (rows.NumOfReaded() == int64(pageSize)) { pagination.NextCursor = q5.ToInt64(rows.GetByName("idx")) } else if (rows.NumOfReaded() > int64(pageSize)) { pagination.Remaining = 1 } } cb(nil, &pagination) }) } func (this *dbPool) borrowConn(name string) *dataSource { tryCount := 0 for tryCount < 5 { { this.lock.Lock() if head, ok := this.dataSourceHash[name]; ok { if !head.Empty() { ds := head.FirstEntry().(*dataSource) ds.entry.DelInit() this.lock.Unlock() return ds } } this.lock.Unlock() } time.Sleep(time.Second * 1) tryCount++ } return nil } func (this *dbPool) returnConn(ds *dataSource) { this.lock.Lock() defer this.lock.Unlock() if head, ok := this.dataSourceHash[ds.name]; ok { head.AddTail(&ds.entry) } else { panic(fmt.Sprintf("returnConn error %s", ds.name)) } } func (this *dbPool) joinSelectFields(fields []string) string { return strings.Join( q5.Map(fields, func(val string) string { return "`" + val + "`" }), ", ") } func (this *dbPool) joinWhere(sql *string, params *[]string, whereKv [][]string) { for _, items := range whereKv { *sql += " AND " + items[0] + "=?" *params = append(*params, items[1]) } } func (this *dbPool) joinWhereLike(sql *string, params *[]string, whereKv [][]string) { for _, items := range whereKv { *sql += " AND " + items[0] + " LIKE ? " *params = append(*params, items[1]) } } func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) string { sql := "" for index, items := range fieldsKv { suffix := "" if index+1 < len(fieldsKv) { suffix = "," } if items[0][0] == '!' { sql += " `" + items[0][1:] + "`=" + items[1] + suffix } else { sql += " `" + items[0] + "`=?" + suffix *params = append(*params, items[1]) } } return sql } func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) string { sql := " (" for index, items := range fieldsKv { suffix := "" if index+1 < len(fieldsKv) { suffix = "," } sql += "`" + items[0] + "`" + suffix } sql += ")" sql += " VALUES(" for index, items := range fieldsKv { suffix := "" if index+1 < len(fieldsKv) { suffix = "," } sql += "?" + suffix *params = append(*params, items[1]) } sql += ")" return sql } func (this *dbPool) internalExec(dataSource string, sql string, params []string, cb ExecResultCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), int64(0), int64(0)) return } this.returnConn(ds) result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...) //this.returnConn(ds) var lastInsertId int64 var rowsAffected int64 if err == nil { if id, err := result.LastInsertId(); err == nil { lastInsertId = id } if id, err := result.RowsAffected(); err == nil { rowsAffected = id } } if this.style == GO_STYLE_DB { cb(err, lastInsertId, rowsAffected) } else { _app.RegisterMainThreadCb( func() { cb(err, lastInsertId, rowsAffected) }) } } func (this *dbPool) internalQuery(dataSource string, sql string, params []string, cb QueryResultCb) { this.internalQueryEx(dataSource, sql, params, cb, false) } func (this *dbPool) syncInternalQuery(dataSource string, sql string, params []string, cb QueryResultCb) { this.internalQueryEx(dataSource, sql, params, cb, true) } func (this *dbPool) internalQueryEx(dataSource string, sql string, params []string, cb QueryResultCb, noMainThread bool) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), nil) return } this.returnConn(ds) rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql) } var dataSet *DataSet if err == nil { dataSet = NewDataSet(rows) } freeFunc := func () { if dataSet != nil { dataSet.close() } } if this.style == GO_STYLE_DB || noMainThread { defer freeFunc() cb(err, dataSet) } else { _app.RegisterMainThreadCb( func() { defer freeFunc() cb(err, dataSet) }) } } func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string, cb QueryOneCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), nil) return } this.returnConn(ds) rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql) } var dataSet *DataSet if err == nil { dataSet = NewDataSet(rows) } freeFunc := func () { if dataSet != nil { dataSet.close() } } if this.style == GO_STYLE_DB { defer freeFunc() cb(err, dataSet) } else { _app.RegisterMainThreadCb( func() { defer freeFunc() cb(err, dataSet) }) } } func (this* dbPool) query(dataSource string, sql string, params []string, cb QueryResultCb) { if this.style == GO_STYLE_DB { this.internalQuery(dataSource, sql, params, cb) } else { go this.internalQuery(dataSource, sql, params, cb) } } func (this *dbPool) exec(dataSource string, sql string, params []string, cb ExecResultCb) { if this.style == GO_STYLE_DB { this.internalExec(dataSource, sql, params, cb) } else { go this.internalExec(dataSource, sql, params, cb) } } func (this *dbPool) queryOne(dataSource string, sql string, params []string, cb QueryOneCb) { if this.style == GO_STYLE_DB { this.internalQueryOne(dataSource, sql, params, cb) } else { go this.internalQueryOne(dataSource, sql, params, cb) } }