From c1fe4f0cb152185fd022522a67267b317acb51a7 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Tue, 9 Apr 2024 21:15:05 +0800 Subject: [PATCH] 1 --- async_task.go | 40 ---------- lock_async_task.go | 179 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 40 deletions(-) create mode 100644 lock_async_task.go diff --git a/async_task.go b/async_task.go index 14b18d8..583b152 100644 --- a/async_task.go +++ b/async_task.go @@ -1,14 +1,8 @@ package f5 import ( - "q5" ) -type taskLock struct { - key string - entry q5.ListHead -} - type AsyncTask struct { status int32 debugInfo string @@ -21,8 +15,6 @@ type AsyncTask struct { lockKeys map[string]*taskLock } -type LockAsyncTask = AsyncTask - func (this *AsyncTask) init(cb func(*AsyncTask)) *AsyncTask { this.cb = cb return this @@ -153,35 +145,3 @@ func NewAsyncTask(cb func(*AsyncTask)) *AsyncTask { p.lockKeys = make(map[string]*taskLock) return p.init(cb).Continue() } - -func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask { - p := new(AsyncTask) - p.lockKeys = make(map[string]*taskLock) - for _, val := range(keys) { - if len(val) != 2 { - panic("NewLockAsyncTask len error:" + q5.EncodeJson(val)) - } - lock := new(taskLock) - lock.key = val[0] + val[1] - lock.entry.Init(p) - if _, ok := p.lockKeys[lock.key]; ok { - panic("NewLockAsyncTask duplicate key:" + lock.key) - } - p.lockKeys[lock.key] = lock - _app.forcePendingAsyncTask(lock.key).AddTail(&lock.entry) - } - if q5.IsDebug() { - p.debugInfo = q5.GetCallStack() - _timer.SetTimeout( - 1000 * 2, - func (e int32, args* q5.Args) { - if e == q5.TIMER_EXEC_EVENT { - if p.IsRunning() { - _sysLog.Warning("LockAsyncaskTimeOut %s", p.debugInfo) - panic("LockAsyncaskTimeOut") - } - } - }) - } - return p.init(cb).checkDo() -} diff --git a/lock_async_task.go b/lock_async_task.go new file mode 100644 index 0000000..059a3fa --- /dev/null +++ b/lock_async_task.go @@ -0,0 +1,179 @@ +package f5 + +import ( + "q5" +) + +type taskLock struct { + key string + entry q5.ListHead +} + +type LockAsyncTask struct { + status int32 + debugInfo string + cb func(*LockAsyncTask) + succCb func(*LockAsyncTask) + failCb func(*LockAsyncTask) + exitCb func(*LockAsyncTask) + preExecTimes int64 + execTimes int64 + lockKeys map[string]*taskLock +} + +func (this *LockAsyncTask) init(cb func(*LockAsyncTask)) *LockAsyncTask { + this.cb = cb + return this +} + +func (this *LockAsyncTask) GetExecTimes() int64 { + return this.execTimes +} + +func (this *LockAsyncTask) IsRunning() bool { + return this.status == 0 +} + +func (this *LockAsyncTask) IsSucc() bool { + return this.status > 0 +} + +func (this *LockAsyncTask) IsFail() bool { + return this.status < 0 +} + +func (this *LockAsyncTask) SetSucc() { + if !this.IsRunning() { + panic("task is not runing") + } + this.status = 1 + this.ClearLocks() + if this.succCb != nil { + this.succCb(this) + } + if this.exitCb != nil { + this.exitCb(this) + } +} + +func (this *LockAsyncTask) SetFail() { + if !this.IsRunning() { + panic("task is not runing") + } + this.status = -1 + this.ClearLocks() + if this.failCb != nil { + this.failCb(this) + } + if this.exitCb != nil { + this.exitCb(this) + } +} + +func (this *LockAsyncTask) Continue() *LockAsyncTask { + if !this.IsRunning() { + panic("task is not runing") + } + if len(this.lockKeys) > 0 { + panic("lockTask not support continue") + } + GetApp().RegisterMainThreadCb( + func () { + this.cb(this) + this.execTimes += 1 + }) + return this +} + +func (this *LockAsyncTask) checkDo() *LockAsyncTask { + if !this.IsRunning() { + panic("task is not runing") + } + if this.allIsReady() { + if this.preExecTimes <= 0 { + this.preExecTimes++ + if this.preExecTimes > 1 { + panic("locktask preExectimes > 0") + } + GetApp().RegisterMainThreadCb( + func () { + this.cb(this) + if this.execTimes > 0 { + panic("locktask only run once") + } + this.execTimes += 1 + }) + } + } + return this +} + +func (this *LockAsyncTask) OnSucc(cb func(*LockAsyncTask)) *LockAsyncTask { + this.succCb = cb + return this +} + +func (this *LockAsyncTask) OnFail(cb func(*LockAsyncTask)) *LockAsyncTask { + this.failCb = cb + return this +} + +func (this *LockAsyncTask) OnExit(cb func(*LockAsyncTask)) *LockAsyncTask { + this.exitCb = cb + return this +} + +func (this *LockAsyncTask) allIsReady() bool { + var allReady = true + for _, lock := range(this.lockKeys) { + if !_app.isFirstAsyncTask(lock.key, &lock.entry) { + allReady = false + break + } + } + return allReady +} + +func (this *LockAsyncTask) ClearLocks() { + for _, lock := range(this.lockKeys) { + lock.entry.DelInit() + } + for _, lock := range(this.lockKeys) { + _app.clearEmptyPendingAsyncTask(lock.key) + } + for _, lock := range(this.lockKeys) { + _app.notifyNextTask(lock.key) + } +} + +func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask { + p := new(LockAsyncTask) + p.lockKeys = make(map[string]*taskLock) + for _, val := range(keys) { + if len(val) != 2 { + panic("NewLockAsyncTask len error:" + q5.EncodeJson(val)) + } + lock := new(taskLock) + lock.key = val[0] + val[1] + lock.entry.Init(p) + if _, ok := p.lockKeys[lock.key]; ok { + panic("NewLockAsyncTask duplicate key:" + lock.key) + } + p.lockKeys[lock.key] = lock + _app.forcePendingAsyncTask(lock.key).AddTail(&lock.entry) + } + if q5.IsDebug() { + p.debugInfo = q5.GetCallStack() + _timer.SetTimeout( + 1000 * 2, + func (e int32, args* q5.Args) { + if e == q5.TIMER_EXEC_EVENT { + if p.IsRunning() { + _sysLog.Warning("LockAsyncaskTimeOut %s", p.debugInfo) + panic("LockAsyncaskTimeOut") + } + } + }) + } + return p.init(cb).checkDo() +}