package f5 import ( "errors" "fmt" "math" "q5" "strings" "sync" "sync/atomic" "math/rand" "time" ) type DBStyle int32 const ( GO_STYLE_DB DBStyle = iota JS_STYLE_DB ) type dataSource struct { name string conn *q5.Mysql entry q5.ListHead } type maxIdxMonitorProgItem struct { execTimes int64 okTimes int64 errTimes int64 abortTimes int64 lastIdx *int64 } type maxIdxMonitorProg struct { dataSource string tblName string maxIdx *int64 itemHash *q5.ConcurrentMap[string, *maxIdxMonitorProgItem] } type dbPool struct { style DBStyle lock sync.Mutex dataSourceHash map[string]*q5.ListHead currMonitorIdx int64 monitorLock sync.Mutex maxIdxMonitorHash *q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]] maxIdxMonitorProgHash *q5.ConcurrentMap[string, *maxIdxMonitorProg] } func (this *dbPool) init(style DBStyle) { this.style = style this.dataSourceHash = make(map[string]*q5.ListHead) this.maxIdxMonitorHash = new(q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]]) this.maxIdxMonitorProgHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProg]) go this.outputMonitorLog() } func (this *dbPool) outputMonitorLog() { for { if this.maxIdxMonitorProgHash.GetSize() > 0 { GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") this.maxIdxMonitorProgHash.Range(func (k string, ele *maxIdxMonitorProg) bool { ele.itemHash.Range(func (k1 string, ele2 *maxIdxMonitorProgItem) bool { GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d exec:%d ok:%d err:%d abort:%d", ele.dataSource, ele.tblName, k1, *ele.maxIdx, *ele2.lastIdx, ele2.execTimes, ele2.okTimes, ele2.errTimes, ele2.abortTimes) return true }) return true }) GetSysLog().Info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") } time.Sleep(time.Second * 10) } } func (this *dbPool) unInit() { } func (this *dbPool) RegisterDataSource(name string, host string, port int32, user string, passwd string, dataBase string, size int32, maxOpenConns int32, maxIdleConns int32) { this.lock.Lock() defer this.lock.Unlock() var head *q5.ListHead if val, ok := this.dataSourceHash[name]; ok { head = val } else { head = q5.NewListHead() this.dataSourceHash[name] = head } if maxOpenConns <= 0 { maxOpenConns = 1 } if maxIdleConns <= 0 { maxIdleConns = 1 } for i := int32(0); i < size; i++ { ds := dataSource{} ds.name = name ds.conn = q5.NewMysql(host, port, user, passwd, dataBase, maxOpenConns, maxIdleConns) ds.entry.Init(&ds) head.AddTail(&ds.entry) err := ds.conn.Open() if err != nil { panic(fmt.Sprintf("RegisterDataSource err:%s", err)) } } } func (this *dbPool) Select( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryResultCb) { params := []string{} sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName) this.joinWhere(&sql, ¶ms, whereKv) this.query(dataSource, sql, params, cb) } func (this *dbPool) OrmSelect( dataSource string, tblName string, whereKv [][]string, cb QueryResultCb) { params := []string{} sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName) this.joinWhere(&sql, ¶ms, whereKv) this.query(dataSource, sql, params, cb) } func (this *dbPool) RawQuery(dataSource string, sql string, params []string, cb QueryResultCb) { this.query(dataSource, sql, params, cb) } func (this *dbPool) SelectCustomQuery(dataSource string, sql string, cb QueryResultCb) { params := []string{} this.query(dataSource, sql, params, cb) } func (this *dbPool) SyncSelectCustomQuery(dataSource string, sql string, cb QueryResultCb) { params := []string{} this.syncInternalQuery(dataSource, sql, params, cb) } func (this *dbPool) SyncBatchLoadFullTable(dataSource string, sqlTpl string, cb func(*DataSet), errCb func(error)) int64 { var lastIdx int64 var done = false for !done { this.SyncSelectCustomQuery( dataSource, fmt.Sprintf(sqlTpl, lastIdx), func (err error, ds *DataSet) { if err != nil { errCb(err) return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) cb(ds) if idx > lastIdx { lastIdx = idx } else { panic(fmt.Sprintf("SyncBatchLoadFullTable idx error:%s %s", idx, lastIdx)) } } if ds.NumOfReaded() <= 0 { done = true } }) } return lastIdx } func (this *dbPool) BatchLoadFullTable( dataSource string, sqlCb func(int64) string, params []string, nextTimeCb func(), doCb func(*DataSet) bool) error { if this.style != GO_STYLE_DB { panic("dbpool.BatchLoadFullTable is not gostyle") } var resultErr error var lastIdx int64 done := false for !done && resultErr == nil { this.syncInternalQuery( dataSource, sqlCb(lastIdx), params, func (err error, ds *DataSet) { if err != nil { resultErr = err return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) fmt.Println("BatchLoadFullTable idx:%d lastIdx:%d", idx, lastIdx) doCb(ds) if idx > lastIdx { lastIdx = idx } else { panic(fmt.Sprintf("BatchLoadFullTable idx error:%s %s", idx, lastIdx)) } } if ds.NumOfReaded() <= 0 { done = true } }) nextTimeCb() } return resultErr } func (this *dbPool) LoopLoad( dataSource string, name string, watchTable string, watchTimeCb func() int64, sqlCb func(int64) string, params []string, nextTimeCb func() int64, nextRoundCb func() int64, doCb func(*DataSet) bool) { cond := sync.NewCond(new(sync.Mutex)) chTimer := make(chan int64) go this.discoverNewData(dataSource, watchTable, watchTimeCb, cond) go q5.CreateCondTimer(chTimer, cond, 10) var lastIdx int64 for true { hasNextData := false sql := sqlCb(lastIdx) this.RawQuery( dataSource, sql, params, func (err error, ds *DataSet) { if err != nil { return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) if !doCb(ds) { return } if idx > lastIdx { lastIdx = idx } hasNextData = true } }) if hasNextData { chTimer <- nextTimeCb() } else { lastIdx = 0 chTimer <- nextRoundCb() } cond.L.Lock() cond.Wait() cond.L.Unlock() if !hasNextData { GetSysLog().Info("dbpool.LoopLoad %s fetch next batch last_idx:%d", name, lastIdx) } else { GetSysLog().Info("dbpool.LoopLoad %s fetch next round last_idx:%d", name, lastIdx) } } } func (this *dbPool) IncrementLoad( dataSource string, name string, tblName string, initIdx int64, sqlCb func(int64, int64) (string, []string), lastIdxChgCb func(int64), doCb func(*DataSet) bool) { if this.style != GO_STYLE_DB { panic("dbpool.IncrementLoad is not gostyle") } lastIdx := initIdx var maxIdx int64 var newMaxIdx int64 cond := sync.NewCond(new(sync.Mutex)) item := new(maxIdxMonitorProgItem) item.lastIdx = &lastIdx { chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item) go func () { for { select { case idx := <-*chNewMaxIdx: { q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx) cond.Broadcast() } } } }() } for { if lastIdx < maxIdx { oldLastIdx := lastIdx sql, params := sqlCb(lastIdx, maxIdx) this.syncInternalQuery( dataSource, sql, params, func (err error, ds *DataSet) { item.execTimes += 1 if err != nil { item.errTimes += 1 time.Sleep(time.Second * 10) return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) if !doCb(ds) { item.abortTimes += 1 return } item.okTimes += 1 if idx > lastIdx { lastIdx = idx } else { panic(fmt.Sprintf("IncrementLoad idx error:%s %s", idx, lastIdx)) } } if ds.NumOfReaded() <= 0 { lastIdx = maxIdx } }) if lastIdx > oldLastIdx { lastIdxChgCb(lastIdx) } } q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx) if lastIdx >= maxIdx { cond.L.Lock() cond.Wait() cond.L.Unlock() } else { time.Sleep(time.Millisecond * 1000 * 3) } } } func (this *dbPool) LoopLoadNew( dataSource string, name string, tblName string, initIdx int64, sqlCb func(int64, int64) (string, []string), doCb func(*DataSet) bool) { if this.style != GO_STYLE_DB { panic("dbpool.IncrementLoad is not gostyle") } lastIdx := initIdx var maxIdx int64 var newMaxIdx int64 cond := sync.NewCond(new(sync.Mutex)) item := new(maxIdxMonitorProgItem) item.lastIdx = &lastIdx { chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item) go func () { for { select { case idx := <-*chNewMaxIdx: { q5.AtomicStoreInt64WhenGreater(&idx, &newMaxIdx) cond.Broadcast() } } } }() } for { if lastIdx < maxIdx { sql, params := sqlCb(lastIdx, maxIdx) this.syncInternalQuery( dataSource, sql, params, func (err error, ds *DataSet) { item.execTimes += 1 if err != nil { item.errTimes += 1 time.Sleep(time.Second * 10) return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) if !doCb(ds) { item.abortTimes += 1 return } item.okTimes += 1 if idx > lastIdx { lastIdx = idx } else { panic(fmt.Sprintf("IncrementLoad idx error:%s %s", idx, lastIdx)) } } if ds.NumOfReaded() <= 0 { lastIdx = maxIdx } }) } q5.AtomicStoreInt64WhenGreater(&newMaxIdx, &maxIdx) if lastIdx >= maxIdx { go func() { time.Sleep(time.Millisecond * 1000 * 60) cond.Broadcast() }() cond.L.Lock() cond.Wait() cond.L.Unlock() lastIdx = 0 } else { time.Sleep(time.Millisecond * 1000 * 3) } } } func (this *dbPool) SelectLike( dataSource string, tblName string, fields []string, whereKv [][]string, likeWhere [][]string, start int64, limit int, cb QueryResultCb) { var params []string sql := fmt.Sprintf("SELECT %s FROM %s WHERE idx > %d", this.joinSelectFields(fields), tblName, start) this.joinWhere(&sql, ¶ms, whereKv) this.joinWhereLike(&sql, ¶ms, likeWhere) sql = fmt.Sprintf("%s ORDER BY idx ASC LIMIT %d", sql, limit) this.query(dataSource, sql, params, cb) } func (this *dbPool) SelectOne( dataSource string, tblName string, fields []string, whereKv [][]string, cb QueryOneCb) { params := []string{} sql := fmt.Sprintf("SELECT %s FROM %s WHERE 1=1 ", this.joinSelectFields(fields), tblName) this.joinWhere(&sql, ¶ms, whereKv) this.queryOne(dataSource, sql, params, cb) } func (this *dbPool) OrmSelectOne( dataSource string, tblName string, whereKv [][]string, cb QueryOneCb) { params := []string{} sql := fmt.Sprintf("SELECT * FROM %s WHERE 1=1 ", tblName) this.joinWhere(&sql, ¶ms, whereKv) this.queryOne(dataSource, sql, params, cb) } func (this *dbPool) Update( dataSource string, tblName string, whereKv [][]string, fieldsKv [][]string, cb ExecResultCb) { params := []string{} sql := "UPDATE `" + tblName + "` SET " + this.joinUpdateFields(fieldsKv, ¶ms) + " WHERE 1=1" this.joinWhere(&sql, ¶ms, whereKv) this.exec(dataSource, sql, params, cb) } func (this *dbPool) Insert( dataSource string, tblName string, fieldsKv [][]string, cb ExecResultCb) { params := []string{} sql := "INSERT INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, ¶ms) this.exec(dataSource, sql, params, cb) } func (this *dbPool) Upsert( dataSource string, tblName string, whereKv [][]string, updateKv [][]string, insertKv [][]string, cb ExecResultCb) { this.OrmSelectOne(dataSource, tblName, whereKv, func(err error, ds *DataSet) { if err != nil { cb(err, 0, 0) return } if ds.Next() { if len(updateKv) > 0 { this.Update(dataSource, tblName, whereKv, updateKv, cb) } else { cb(nil, 0, 0) } } else { this.Insert(dataSource, tblName, insertKv, cb) } }) } func (this *dbPool) UpsertEx( dataSource string, tblName string, whereKv [][]string, updateKv [][]string, insertKv [][]string, cb ExecResultCb, updateCb func(*DataSet) bool) { this.OrmSelectOne(dataSource, tblName, whereKv, func(err error, ds *DataSet) { if err != nil { cb(err, 0, 0) return } if ds.Next() { if len(updateKv) > 0 { if updateCb(ds) { this.Update(dataSource, tblName, whereKv, updateKv, cb) } } } else { if len(insertKv) > 0 { this.Insert(dataSource, tblName, insertKv, cb) } } }) } func (this *dbPool) PageQuery( dataSource string, perPage int32, page int32, sql string, params []string, filter DbQueryFilter, orderBy string, cb PageQueryCb) { var pagination Pagination pagination.PerPage = q5.Max(1, perPage) pagination.CurrentPage = q5.Max(1, page) finalySql := sql if filter != nil { finalySql += filter.GenSql() } if orderBy != "" { finalySql += " " + orderBy + " " } //GetSysLog().Info("finalySql:%s", finalySql) this.queryOne( dataSource, fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql), params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } if rows != nil && rows.Next() { pagination.Total = q5.ToInt32(rows.GetByIndex(0)) pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(rows.GetByIndex(0)) / float64(pagination.PerPage))) } start := pagination.PerPage * (pagination.CurrentPage - 1) limit := pagination.PerPage this.query( dataSource, fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit), params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } pagination.Rows = rows cb(nil, &pagination) }) }) } func (this *dbPool) StreamPageQuery2( dataSource string, pageSize int32, cursor int64, sql string, params []string, filter DbQueryFilter, orderBy string, cb SteamPageQueryCb, fillCb func(*DataSet)) { if (pageSize <= 0) { pageSize = 1 } if (pageSize > 1000) { pageSize = 1000 } var pagination StreamPagination finalySql := sql if filter != nil { finalySql += filter.GenSql() } if orderBy != "" { finalySql += " " + orderBy + " " } finalySql += fmt.Sprintf(" LIMIT %d ", pageSize + 1) //GetSysLog().Info("finalySql:%s", finalySql) this.query( dataSource, finalySql, params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } pagination.PreviousCursor = cursor for rows.Next() { if (rows.NumOfReaded() <= int64(pageSize)) { pagination.Count += 1 fillCb(rows) if (rows.NumOfReaded() == int64(pageSize)) { pagination.NextCursor = q5.ToInt64(rows.GetByName("idx")) } } else if (rows.NumOfReaded() > int64(pageSize)) { pagination.Remaining = 1 } } cb(nil, &pagination) }) } func (this *dbPool) StreamPageQuery( dataSource string, pageSize int32, cursor int64, sql string, params []string, filter DbQueryFilter, orderBy string, cb SteamPageQueryCb, fillCb func(*DataSet)) { if (pageSize <= 0) { pageSize = 1 } if (pageSize > 1000) { pageSize = 1000 } var pagination StreamPagination finalySql := sql if filter != nil { finalySql += filter.GenSql() } if orderBy != "" { finalySql += " " + orderBy + " " } //finalySql += fmt.Sprintf(" LIMIT %d ", pageSize + 1) //GetSysLog().Info("finalySql:%s", finalySql) this.queryOne( dataSource, fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql), params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } var total int32 var totalPages int32 if rows != nil && rows.Next() { total = q5.ToInt32(rows.GetByIndex(0)) totalPages = int32(math.Ceil(q5.ToFloat64(rows.GetByIndex(0)) / float64(pageSize))) } if cursor <= 0 { cursor = 1 } start := pageSize * (int32(cursor) - 1) limit := pageSize this.query( dataSource, fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit), params, func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } //pagination.Rows = rows pagination.PreviousCursor = cursor for rows.Next() { fillCb(rows) pagination.Count += 1 } if int32(cursor) < totalPages { pagination.NextCursor = cursor + 1 pagination.Remaining = 1 } pagination.TotalCount = total cb(nil, &pagination) }) }) } func (this *dbPool) borrowConn(name string) *dataSource { tryCount := 0 for tryCount < 5 { { this.lock.Lock() if head, ok := this.dataSourceHash[name]; ok { if !head.Empty() { ds := head.FirstEntry().(*dataSource) //ds.entry.DelInit() this.lock.Unlock() return ds } } this.lock.Unlock() } time.Sleep(time.Second * 1) tryCount++ } return nil /* tryCount := 0 for tryCount < 5 { { this.lock.Lock() if head, ok := this.dataSourceHash[name]; ok { if !head.Empty() { ds := head.FirstEntry().(*dataSource) ds.entry.DelInit() this.lock.Unlock() return ds } } this.lock.Unlock() } time.Sleep(time.Second * 1) tryCount++ } return nil*/ } func (this *dbPool) returnConn(ds *dataSource) { this.lock.Lock() defer this.lock.Unlock() /* if head, ok := this.dataSourceHash[ds.name]; ok { head.AddTail(&ds.entry) } else { panic(fmt.Sprintf("returnConn error %s", ds.name)) }*/ } func (this *dbPool) joinSelectFields(fields []string) string { return strings.Join( q5.Map(fields, func(val string) string { return "`" + val + "`" }), ", ") } func (this *dbPool) joinWhere(sql *string, params *[]string, whereKv [][]string) { for _, items := range whereKv { *sql += " AND " + items[0] + "=?" *params = append(*params, items[1]) } } func (this *dbPool) joinWhereLike(sql *string, params *[]string, whereKv [][]string) { for _, items := range whereKv { *sql += " AND " + items[0] + " LIKE ? " *params = append(*params, items[1]) } } func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) string { sql := "" for index, items := range fieldsKv { suffix := "" if index+1 < len(fieldsKv) { suffix = "," } if items[0][0] == '!' { sql += " `" + items[0][1:] + "`=" + items[1] + suffix } else { sql += " `" + items[0] + "`=?" + suffix *params = append(*params, items[1]) } } return sql } func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) string { sql := " (" for index, items := range fieldsKv { suffix := "" if index+1 < len(fieldsKv) { suffix = "," } sql += "`" + items[0] + "`" + suffix } sql += ")" sql += " VALUES(" for index, items := range fieldsKv { suffix := "" if index+1 < len(fieldsKv) { suffix = "," } sql += "?" + suffix *params = append(*params, items[1]) } sql += ")" return sql } func (this *dbPool) internalExec(dataSource string, sql string, params []string, cb ExecResultCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), int64(0), int64(0)) return } this.returnConn(ds) result, err := ds.conn.Exec(sql, q5.ToInterfaces(params)...) //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalExec error:%s sql:%s\\%s", err, sql, q5.GetCallStack()) } var lastInsertId int64 var rowsAffected int64 if err == nil { if id, err := result.LastInsertId(); err == nil { lastInsertId = id } if id, err := result.RowsAffected(); err == nil { rowsAffected = id } } if this.style == GO_STYLE_DB { cb(err, lastInsertId, rowsAffected) } else { _app.RegisterMainThreadCb( func() { cb(err, lastInsertId, rowsAffected) }) } } func (this *dbPool) internalQuery(dataSource string, sql string, params []string, cb QueryResultCb) { this.internalQueryEx(dataSource, sql, params, cb, false) } func (this *dbPool) syncInternalQuery(dataSource string, sql string, params []string, cb QueryResultCb) { this.internalQueryEx(dataSource, sql, params, cb, true) } func (this *dbPool) internalQueryEx(dataSource string, sql string, params []string, cb QueryResultCb, noMainThread bool) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), nil) return } this.returnConn(ds) rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQuery error:%s sql:%s", err, sql) } var dataSet *DataSet if err == nil { dataSet = newDataSet(rows) } freeFunc := func () { if dataSet != nil { dataSet.close() } } if this.style == GO_STYLE_DB || noMainThread { defer freeFunc() cb(err, dataSet) } else { _app.RegisterMainThreadCb( func() { defer freeFunc() cb(err, dataSet) }) } } func (this *dbPool) internalQueryOne(dataSource string, sql string, params []string, cb QueryOneCb) { ds := this.borrowConn(dataSource) if ds == nil { cb(errors.New("borrowConn error"), nil) return } this.returnConn(ds) rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) //this.returnConn(ds) if err != nil { GetSysLog().Warning("f5.dbpool.internalQueryOne error:%s sql:%s", err, sql) } var dataSet *DataSet if err == nil { dataSet = newDataSet(rows) } freeFunc := func () { if dataSet != nil { dataSet.close() } } if this.style == GO_STYLE_DB { defer freeFunc() cb(err, dataSet) } else { _app.RegisterMainThreadCb( func() { defer freeFunc() cb(err, dataSet) }) } } func (this* dbPool) query(dataSource string, sql string, params []string, cb QueryResultCb) { if this.style == GO_STYLE_DB { this.internalQuery(dataSource, sql, params, cb) } else { go this.internalQuery(dataSource, sql, params, cb) } } func (this *dbPool) exec(dataSource string, sql string, params []string, cb ExecResultCb) { if this.style == GO_STYLE_DB { this.internalExec(dataSource, sql, params, cb) } else { go this.internalExec(dataSource, sql, params, cb) } } func (this *dbPool) queryOne(dataSource string, sql string, params []string, cb QueryOneCb) { if this.style == GO_STYLE_DB { this.internalQueryOne(dataSource, sql, params, cb) } else { go this.internalQueryOne(dataSource, sql, params, cb) } } func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watchTimeCb func() int64, cond *sync.Cond) { var lastMaxIdx int64 for true { hasNewDdata := false this.RawQuery( dataSource, fmt.Sprintf("SELECT MAX(idx) AS max_idx FROM %s", watchTable), []string{}, func (err error, ds *DataSet) { if err != nil { return } if ds.Next() { idx := q5.ToInt64(ds.GetByName("max_idx")) if idx > lastMaxIdx { lastMaxIdx = idx hasNewDdata = true GetSysLog().Info("%s hasNewData max_idx:%d", watchTable, lastMaxIdx) } } }) if hasNewDdata { cond.Broadcast() } else { time.Sleep(time.Second * time.Duration(watchTimeCb())) } } } func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string, name string, item *maxIdxMonitorProgItem) *chan int64 { this.monitorLock.Lock() defer this.monitorLock.Unlock() key := fmt.Sprintf("%s_%s", dataSource, tblName) chNewMaxIdx := make(chan int64) newIdx := atomic.AddInt64(&this.currMonitorIdx, 1) if p, ok := this.maxIdxMonitorHash.Load(key); ok { (*p).Store(newIdx, &chNewMaxIdx) } else { p := new(q5.ConcurrentMap[int64, *chan int64]) p.Store(newIdx, &chNewMaxIdx) this.maxIdxMonitorHash.Store(key, p) notifyFunc := func (newMaxIdx int64) { p.Range(func(id int64, ch *chan int64) bool { go func() { (*ch) <- newMaxIdx }() return true }) } var lastMaxIdx int64 prog := new(maxIdxMonitorProg) prog.dataSource = dataSource prog.tblName = tblName prog.maxIdx = &lastMaxIdx prog.itemHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProgItem]) this.maxIdxMonitorProgHash.Store(key, prog) go func () { for { this.RawQuery( dataSource, fmt.Sprintf("SELECT MAX(idx) FROM %s", tblName), []string{}, func (err error, ds *DataSet) { if err != nil { time.Sleep(time.Millisecond * time.Duration(8000 + rand.Intn(1200))) return } if ds.Next() { idx := q5.ToInt64(ds.GetByIndex(0)) if idx > lastMaxIdx { lastMaxIdx = idx GetSysLog().Info("%s hasNewData max_idx:%d", tblName, lastMaxIdx) } } }) go notifyFunc(lastMaxIdx) time.Sleep(time.Millisecond * time.Duration(800 + rand.Intn(1200))) } }() } this.addMonitorProgItem(key, name, item) return &chNewMaxIdx } func (this *dbPool) addMonitorProgItem(key string, name string, item *maxIdxMonitorProgItem) { if p, ok := this.maxIdxMonitorProgHash.Load(key); ok { (*p).itemHash.Store(name, item) } else { panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name)) } }