This commit is contained in:
aozhiwei 2024-07-23 16:20:00 +08:00
parent 9aa9a8e438
commit 2e198e3ab2

View File

@ -25,11 +25,18 @@ type dataSource struct {
entry q5.ListHead entry q5.ListHead
} }
type maxIdxMonitorProgItem struct {
execTimes int64
errTimes int64
abortTimes int64
lastIdx *int64
}
type maxIdxMonitorProg struct { type maxIdxMonitorProg struct {
dataSource string dataSource string
tblName string tblName string
maxIdx *int64 maxIdx *int64
itemHash *q5.ConcurrentMap[string, *int64] itemHash *q5.ConcurrentMap[string, *maxIdxMonitorProgItem]
} }
type dbPool struct { type dbPool struct {
@ -55,13 +62,16 @@ func (this *dbPool) outputMonitorLog() {
if this.maxIdxMonitorProgHash.GetSize() > 0 { if this.maxIdxMonitorProgHash.GetSize() > 0 {
GetSysLog().Info("--------------------------------------------------------------------------------------") GetSysLog().Info("--------------------------------------------------------------------------------------")
this.maxIdxMonitorProgHash.Range(func (k string, ele *maxIdxMonitorProg) bool { this.maxIdxMonitorProgHash.Range(func (k string, ele *maxIdxMonitorProg) bool {
ele.itemHash.Range(func (k1 string, ele2 *int64) bool { ele.itemHash.Range(func (k1 string, ele2 *maxIdxMonitorProgItem) bool {
GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d", GetSysLog().Info("%s.%s.%s maxIdx:%d lastIdx:%d exec:%d err:%d abort:%d",
ele.dataSource, ele.dataSource,
ele.tblName, ele.tblName,
k1, k1,
*ele.maxIdx, *ele.maxIdx,
*ele2) *ele2.lastIdx,
ele2.execTimes,
ele2.errTimes,
ele2.abortTimes)
return true return true
}) })
return true return true
@ -283,8 +293,10 @@ func (this *dbPool) IncrementLoad(
var maxIdx int64 var maxIdx int64
var newMaxIdx int64 var newMaxIdx int64
cond := sync.NewCond(new(sync.Mutex)) cond := sync.NewCond(new(sync.Mutex))
item := new(maxIdxMonitorProgItem)
item.lastIdx = &lastIdx
{ {
chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, &lastIdx) chNewMaxIdx := this.registerNewMaxIdx(dataSource, tblName, name, item)
go func () { go func () {
for { for {
select { select {
@ -306,13 +318,16 @@ func (this *dbPool) IncrementLoad(
sql, sql,
params, params,
func (err error, ds *DataSet) { func (err error, ds *DataSet) {
item.execTimes += 1
if err != nil { if err != nil {
item.errTimes += 1
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
return return
} }
for ds.Next() { for ds.Next() {
idx := q5.ToInt64(ds.GetByName("idx")) idx := q5.ToInt64(ds.GetByName("idx"))
if !doCb(ds) { if !doCb(ds) {
item.abortTimes += 1
return return
} }
if idx > lastIdx { if idx > lastIdx {
@ -920,7 +935,7 @@ func (this *dbPool ) discoverNewData(dataSource string, watchTable string, watch
} }
func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string, func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string,
name string, lastIdx *int64) *chan int64 { name string, item *maxIdxMonitorProgItem) *chan int64 {
this.monitorLock.Lock() this.monitorLock.Lock()
defer this.monitorLock.Unlock() defer this.monitorLock.Unlock()
@ -946,7 +961,7 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string,
prog.dataSource = dataSource prog.dataSource = dataSource
prog.tblName = tblName prog.tblName = tblName
prog.maxIdx = &lastMaxIdx prog.maxIdx = &lastMaxIdx
prog.itemHash = new(q5.ConcurrentMap[string, *int64]) prog.itemHash = new(q5.ConcurrentMap[string, *maxIdxMonitorProgItem])
this.maxIdxMonitorProgHash.Store(key, prog) this.maxIdxMonitorProgHash.Store(key, prog)
go func () { go func () {
for { for {
@ -972,14 +987,14 @@ func (this *dbPool) registerNewMaxIdx(dataSource string, tblName string,
} }
}() }()
} }
this.addMonitorProgItem(key, name, lastIdx) this.addMonitorProgItem(key, name, item)
return &chNewMaxIdx return &chNewMaxIdx
} }
func (this *dbPool) addMonitorProgItem(key string, name string, lastIdx *int64) { func (this *dbPool) addMonitorProgItem(key string, name string, item *maxIdxMonitorProgItem) {
if p, ok := this.maxIdxMonitorProgHash.Load(key); ok { if p, ok := this.maxIdxMonitorProgHash.Load(key); ok {
(*p).itemHash.Store(name, lastIdx) (*p).itemHash.Store(name, item)
} else { } else {
panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name, lastIdx)) panic(fmt.Sprintf("addMonitorProgItem key:%s name:%s lastIdx:%s", key, name))
} }
} }