package f5 import ( "q5" "time" "fmt" "math" "strings" "sync" "errors" ) type DBStyle int32 const ( GO_STYLE_DB DBStyle = iota JS_STYLE_DB ) type dataSource struct { name string conn *q5.Mysql entry q5.ListHead } type dbPool struct { style DBStyle lock sync.Mutex dataSourceHash map[string]*q5.ListHead } func (this *dbPool) init() { } func (this *dbPool) unInit() { } func (this *dbPool) RegisterDataSource(name string, host string, port int32, user string, passwd string, dataBase string, size int32) { this.lock.Lock() defer this.lock.Unlock() var head *q5.ListHead if val, ok := this.dataSourceHash[name]; ok { head = val } else { head = q5.NewListHead() this.dataSourceHash[name] = head } for i := int32(0); i < size; i++ { ds := dataSource{} ds.name = name ds.conn = q5.NewMysql(host, port, user, passwd, dataBase) ds.entry.Init(&ds) head.AddTail(&ds.entry) err := ds.conn.Open() if err != nil { panic(fmt.Sprintf("RegisterDataSource err:%s", err)) } } } func (this *dbPool) Select( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryResultCb) { params := []string{} sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName) this.joinWhere(&sql, ¶ms, whereKv) this.internalQuery(dataSource, sql, params, cb) } func (this *dbPool) OrmSelect( dataSource string, tblName string, whereKv [][]string, cb QueryResultCb) { params := []string{} sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName) this.joinWhere(&sql, ¶ms, whereKv) this.internalQuery(dataSource, sql, params, cb) } func (this *dbPool) SelectOne( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryOneCb) { params := []string{} sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName) this.joinWhere(&sql, ¶ms, whereKv) this.internalQueryOne(dataSource, sql, params, cb) } func (this *dbPool) OrmSelectOne( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryOneCb) { params := []string{} sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName) this.joinWhere(&sql, ¶ms, whereKv) this.internalQueryOne(dataSource, sql, params, cb) } func (this *dbPool) Update( dataSource string, tblName string, fieldsKv [][]string, whereKv [][]string, cb ExecResultCb) { params := []string{} sql := "UPDATE `" + tblName + "` SET " + this.joinUpdateFields(fieldsKv, ¶ms) + " WHERE 1=1" this.joinWhere(&sql, ¶ms, whereKv) this.internalExec(dataSource, sql, params, cb) } func (this *dbPool) Insert( dataSource string, tblName string, fieldsKv [][]string, cb ExecResultCb) { params := []string{} sql := "INSERT INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, ¶ms) this.internalExec(dataSource, sql, params, cb) } func (this *dbPool) Upsert( dataSource string, tblName string, whereKv map[string]string, updateKv map[string]string, insertKv map[string]string, cb ExecResultCb) { } func (this *dbPool) PageQuery( dataSource string, perPage int32, page int32, sql string, params []string, filter DbQueryFilter, orderBy string, cb PageQueryCb) { var pagination Pagination pagination.PerPage = q5.Max(1, perPage) pagination.CurrentPage = q5.Max(1, page) finalySql := sql if filter != nil { finalySql += filter.GenSql() } if orderBy != "" { finalySql += " " + orderBy + " " } this.internalQueryOne( dataSource, fmt.Sprintf("SELECT COUNT(*) FROM (%s)", finalySql), params, func (err error, row *[]*string) { if err != nil { cb(err, &pagination) return } pagination.Total = q5.ToInt32(*(*row)[0]) pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*(*row)[0]) / float64(pagination.PerPage))) start := pagination.PerPage * (pagination.CurrentPage - 1) limit := pagination.PerPage this.internalQuery( dataSource, fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit), params, func (err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } pagination.Rows = rows cb(nil, &pagination) }) }) } func (this *dbPool) borrowConn(name string) *dataSource { tryCount := 0 for tryCount < 5 { { this.lock.Lock() defer this.lock.Unlock() if head, ok := this.dataSourceHash[name]; ok { if !head.Empty() { next := head.Next() next.Del() return next.GetData().(*dataSource) } } } time.Sleep(time.Second * 1) tryCount++ } return nil } func (this *dbPool) returnConn(ds *dataSource) { this.lock.Lock() defer this.lock.Unlock() if head, ok := this.dataSourceHash[ds.name]; ok { head.AddTail(&ds.entry) } else { panic(fmt.Sprintf("returnConn error %s", ds.name)) } } func (this *dbPool) joinSelectFields(fields []string) string { return strings.Join( q5.Map(fields, func(val string) string { return "`" + val + "`" }), ", ") } func (this *dbPool) joinWhere(sql *string, params *[]string, whereKv [][]string) { for _, items := range whereKv { *sql += " AND " + items[0] + "=?" *params = append(*params, items[1]) } } func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) string { return "" } func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) string { return "" } func (this *dbPool) internalExec(dataSource string, sql string, params []string, cb ExecResultCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), int64(0), int64(0)) return } result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...) this.returnConn(ds) var lastInsertId int64 var rowsAffected int64 if err == nil { if id, err := result.LastInsertId(); err == nil { lastInsertId = id } if id, err := result.RowsAffected(); err == nil { rowsAffected = id } } if this.style == GO_STYLE_DB { cb(err, lastInsertId, rowsAffected) } else { _app.RegisterMainThreadCb( func () { cb(err, lastInsertId, rowsAffected) }) } } func (this *dbPool) internalQuery(dataSource string, sql string, params []string, cb QueryResultCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), int64(0), int64(0)) return } rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) if this.style == GO_STYLE_DB { cb(err, NewDataSet(rows)) } else { _app.RegisterMainThreadCb( func () { cb(err, NewDataSet(rows)) }) } } func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string, cb QueryOneCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), int64(0), int64(0)) return } rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) values := &[]*string{} if err == nil { dataSet := NewDataSet(rows) if dataSet.Next() { values = dataSet.GetValues() } } if this.style == GO_STYLE_DB { cb(err, values) } else { _app.RegisterMainThreadCb( func () { cb(err, values) }) } }