This commit is contained in:
aozhiwei 2024-07-23 16:00:52 +08:00
parent ea3ed6fc66
commit ed4f91fb1a

View File

@ -25,6 +25,13 @@ type dataSource struct {
entry q5.ListHead
}
type maxIdxMonitorProg struct {
dataSource string
tblName string
maxIdx *int64
itemHash *q5.ConcurrentMap[string, *int64]
}
type dbPool struct {
style DBStyle
lock sync.Mutex
@ -32,12 +39,37 @@ type dbPool struct {
currMonitorIdx int64
monitorLock sync.Mutex
maxIdxMonitorHash *q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]]
maxIdxMonitorProgHash *q5.ConcurrentMap[string, *maxIdxMonitorProg]
}
func (this *dbPool) init(style DBStyle) {
this.style = style
this.dataSourceHash = make(map[string]*q5.ListHead)
this.maxIdxMonitorHash = new(q5.ConcurrentMap[string, *q5.ConcurrentMap[int64, *chan int64]])
this.maxIdxMonitorProgHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProg])
go this.outputMonitorLog()
}
func (this *dbPool) outputMonitorLog() {
for {
if this.maxIdxMonitorProgHash.GetSize() > 0 {
GetSysLog().Info("-------------------------------------------------------------")
this.maxIdxMonitorProgHash.Range(func (k string, ele *maxIdxMonitorProg) bool {
ele.itemHash.Range(func (k1 string, ele2 *int64) bool {
GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d",
ele.dataSource,
ele.tblName,
k1,
*ele.maxIdx,
*ele2)
return true
})
return true
})
GetSysLog().Info("-------------------------------------------------------------")
}
time.Sleep(time.Second * 10)
}
}
func (this *dbPool) unInit() {
@ -252,7 +284,7 @@ func (this *dbPool) IncrementLoad(
var newMaxIdx int64
cond := sync.NewCond(new(sync.Mutex))
{
chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName)
chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, &lastIdx)
go func () {
for {
select {
@ -887,7 +919,8 @@ func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watch
}
}
func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan int64 {
func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string,
name string, lastIdx *int64) *chan int64 {
this.monitorLock.Lock()
defer this.monitorLock.Unlock()
@ -908,8 +941,14 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan i
return true
})
}
var lastMaxIdx int64
prog := new(maxIdxMonitorProg)
prog.dataSource = dataSource
prog.tblName = tblName
prog.maxIdx = &lastMaxIdx
prog.itemHash = new(q5.ConcurrentMap[string, *int64])
this.maxIdxMonitorProgHash.Store(key, prog)
go func () {
var lastMaxIdx int64
for {
this.RawQuery(
dataSource,
@ -933,5 +972,14 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string) *chan i
}
}()
}
this.addMonitorProgItem(key, name, lastIdx)
return &chNewMaxIdx
}
func (this *dbPool) addMonitorProgItem(key string, name string, lastIdx *int64) {
if p, ok := this.maxIdxMonitorProgHash.Load(key); ok {
(*p).itemHash.Store(name, lastIdx)
} else {
panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name, lastIdx))
}
}