diff --git a/dbpool.go b/dbpool.go index 66c7e78..b2752b5 100644 --- a/dbpool.go +++ b/dbpool.go @@ -25,6 +25,13 @@ type dataSource struct { entry q5.ListHead } +type maxIdxMonitorProg struct { + dataSource string + tblName string + maxIdx *int64 + itemHash *q5.ConcurrentMap[string, *int64] +} + type dbPool struct { style DBStyle lock sync.Mutex @@ -32,12 +39,37 @@ type dbPool struct { currMonitorIdx int64 monitorLock sync.Mutex maxIdxMonitorHash *q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]] + maxIdxMonitorProgHash *q5.ConcurrentMap[string, *maxIdxMonitorProg] } func (this *dbPool) init(style DBStyle) { this.style = style this.dataSourceHash = make(map[string]*q5.ListHead) this.maxIdxMonitorHash = new(q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]]) + this.maxIdxMonitorProgHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProg]) + go this.outputMonitorLog() +} + +func (this *dbPool) outputMonitorLog() { + for { + if this.maxIdxMonitorProgHash.GetSize() > 0 { + GetSysLog().Info("-------------------------------------------------------------") + this.maxIdxMonitorProgHash.Range(func (k string, ele *maxIdxMonitorProg) bool { + ele.itemHash.Range(func (k1 string, ele2 *int64) bool { + GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d", + ele.dataSource, + ele.tblName, + k1, + *ele.maxIdx, + *ele2) + return true + }) + return true + }) + GetSysLog().Info("-------------------------------------------------------------") + } + time.Sleep(time.Second * 10) + } } func (this *dbPool) unInit() { @@ -252,7 +284,7 @@ func (this *dbPool) IncrementLoad( var newMaxIdx int64 cond := sync.NewCond(new(sync.Mutex)) { - chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName) + chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, &lastIdx) go func () { for { select { @@ -887,7 +919,8 @@ func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watch } } -func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan int64 { +func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string, + name string, lastIdx *int64) *chan int64 { this.monitorLock.Lock() defer this.monitorLock.Unlock() @@ -908,8 +941,14 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan i return true }) } + var lastMaxIdx int64 + prog := new(maxIdxMonitorProg) + prog.dataSource = dataSource + prog.tblName = tblName + prog.maxIdx = &lastMaxIdx + prog.itemHash = new(q5.ConcurrentMap[string, *int64]) + this.maxIdxMonitorProgHash.Store(key, prog) go func () { - var lastMaxIdx int64 for { this.RawQuery( dataSource, @@ -933,5 +972,14 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan i } }() } + this.addMonitorProgItem(key, name, lastIdx) return &chNewMaxIdx } + +func (this *dbPool) addMonitorProgItem(key string, name string, lastIdx *int64) { + if p, ok := this.maxIdxMonitorProgHash.Load(key); ok { + (*p).itemHash.Store(name, lastIdx) + } else { + panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name, lastIdx)) + } +}