From 2e198e3ab2ac5b30295da20bb287c4e89a99658a Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Tue, 23 Jul 2024 16:20:00 +0800 Subject: [PATCH] 1 --- dbpool.go | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/dbpool.go b/dbpool.go index fc8a428..5a3f8f7 100644 --- a/dbpool.go +++ b/dbpool.go @@ -25,11 +25,18 @@ type dataSource struct { entry q5.ListHead } +type maxIdxMonitorProgItem struct { + execTimes int64 + errTimes int64 + abortTimes int64 + lastIdx *int64 +} + type maxIdxMonitorProg struct { dataSource string tblName string maxIdx *int64 - itemHash *q5.ConcurrentMap[string, *int64] + itemHash *q5.ConcurrentMap[string, *maxIdxMonitorProgItem] } type dbPool struct { @@ -55,13 +62,16 @@ func (this *dbPool) outputMonitorLog() { if this.maxIdxMonitorProgHash.GetSize() > 0 { GetSysLog().Info("--------------------------------------------------------------------------------------") this.maxIdxMonitorProgHash.Range(func (k string, ele *maxIdxMonitorProg) bool { - ele.itemHash.Range(func (k1 string, ele2 *int64) bool { - GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d", + ele.itemHash.Range(func (k1 string, ele2 *maxIdxMonitorProgItem) bool { + GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d exec:%d err:%d abort:%d", ele.dataSource, ele.tblName, k1, *ele.maxIdx, - *ele2) + *ele2.lastIdx, + ele2.execTimes, + ele2.errTimes, + ele2.abortTimes) return true }) return true @@ -283,8 +293,10 @@ func (this *dbPool) IncrementLoad( var maxIdx int64 var newMaxIdx int64 cond := sync.NewCond(new(sync.Mutex)) + item := new(maxIdxMonitorProgItem) + item.lastIdx = &lastIdx { - chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, &lastIdx) + chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item) go func () { for { select { @@ -306,13 +318,16 @@ func (this *dbPool) IncrementLoad( sql, params, func (err error, ds *DataSet) { + item.execTimes += 1 if err != nil { + item.errTimes += 1 time.Sleep(time.Second * 10) return } for ds.Next() { idx := q5.ToInt64(ds.GetByName("idx")) if !doCb(ds) { + item.abortTimes += 1 return } if idx > lastIdx { @@ -920,7 +935,7 @@ func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watch } func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string, - name string, lastIdx *int64) *chan int64 { + name string, item *maxIdxMonitorProgItem) *chan int64 { this.monitorLock.Lock() defer this.monitorLock.Unlock() @@ -946,7 +961,7 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string, prog.dataSource = dataSource prog.tblName = tblName prog.maxIdx = &lastMaxIdx - prog.itemHash = new(q5.ConcurrentMap[string, *int64]) + prog.itemHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProgItem]) this.maxIdxMonitorProgHash.Store(key, prog) go func () { for { @@ -972,14 +987,14 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string, } }() } - this.addMonitorProgItem(key, name, lastIdx) + this.addMonitorProgItem(key, name, item) return &chNewMaxIdx } -func (this *dbPool) addMonitorProgItem(key string, name string, lastIdx *int64) { +func (this *dbPool) addMonitorProgItem(key string, name string, item *maxIdxMonitorProgItem) { if p, ok := this.maxIdxMonitorProgHash.Load(key); ok { - (*p).itemHash.Store(name, lastIdx) + (*p).itemHash.Store(name, item) } else { - panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name, lastIdx)) + panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name)) } }