This commit is contained in:
aozhiwei 2024-04-09 21:15:05 +08:00
parent d785be5e37
commit c1fe4f0cb1
2 changed files with 179 additions and 40 deletions

View File

@ -1,14 +1,8 @@
package f5
import (
"q5"
)
type taskLock struct {
key string
entry q5.ListHead
}
type AsyncTask struct {
status int32
debugInfo string
@ -21,8 +15,6 @@ type AsyncTask struct {
lockKeys map[string]*taskLock
}
type LockAsyncTask = AsyncTask
func (this *AsyncTask) init(cb func(*AsyncTask)) *AsyncTask {
this.cb = cb
return this
@ -153,35 +145,3 @@ func NewAsyncTask(cb func(*AsyncTask)) *AsyncTask {
p.lockKeys = make(map[string]*taskLock)
return p.init(cb).Continue()
}
func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask {
p := new(AsyncTask)
p.lockKeys = make(map[string]*taskLock)
for _, val := range(keys) {
if len(val) != 2 {
panic("NewLockAsyncTask len error:" + q5.EncodeJson(val))
}
lock := new(taskLock)
lock.key = val[0] + val[1]
lock.entry.Init(p)
if _, ok := p.lockKeys[lock.key]; ok {
panic("NewLockAsyncTask duplicate key:" + lock.key)
}
p.lockKeys[lock.key] = lock
_app.forcePendingAsyncTask(lock.key).AddTail(&lock.entry)
}
if q5.IsDebug() {
p.debugInfo = q5.GetCallStack()
_timer.SetTimeout(
1000 * 2,
func (e int32, args* q5.Args) {
if e == q5.TIMER_EXEC_EVENT {
if p.IsRunning() {
_sysLog.Warning("LockAsyncaskTimeOut %s", p.debugInfo)
panic("LockAsyncaskTimeOut")
}
}
})
}
return p.init(cb).checkDo()
}

179
lock_async_task.go Normal file
View File

@ -0,0 +1,179 @@
package f5
import (
"q5"
)
type taskLock struct {
key string
entry q5.ListHead
}
type LockAsyncTask struct {
status int32
debugInfo string
cb func(*LockAsyncTask)
succCb func(*LockAsyncTask)
failCb func(*LockAsyncTask)
exitCb func(*LockAsyncTask)
preExecTimes int64
execTimes int64
lockKeys map[string]*taskLock
}
func (this *LockAsyncTask) init(cb func(*LockAsyncTask)) *LockAsyncTask {
this.cb = cb
return this
}
func (this *LockAsyncTask) GetExecTimes() int64 {
return this.execTimes
}
func (this *LockAsyncTask) IsRunning() bool {
return this.status == 0
}
func (this *LockAsyncTask) IsSucc() bool {
return this.status > 0
}
func (this *LockAsyncTask) IsFail() bool {
return this.status < 0
}
func (this *LockAsyncTask) SetSucc() {
if !this.IsRunning() {
panic("task is not runing")
}
this.status = 1
this.ClearLocks()
if this.succCb != nil {
this.succCb(this)
}
if this.exitCb != nil {
this.exitCb(this)
}
}
func (this *LockAsyncTask) SetFail() {
if !this.IsRunning() {
panic("task is not runing")
}
this.status = -1
this.ClearLocks()
if this.failCb != nil {
this.failCb(this)
}
if this.exitCb != nil {
this.exitCb(this)
}
}
func (this *LockAsyncTask) Continue() *LockAsyncTask {
if !this.IsRunning() {
panic("task is not runing")
}
if len(this.lockKeys) > 0 {
panic("lockTask not support continue")
}
GetApp().RegisterMainThreadCb(
func () {
this.cb(this)
this.execTimes += 1
})
return this
}
func (this *LockAsyncTask) checkDo() *LockAsyncTask {
if !this.IsRunning() {
panic("task is not runing")
}
if this.allIsReady() {
if this.preExecTimes <= 0 {
this.preExecTimes++
if this.preExecTimes > 1 {
panic("locktask preExectimes > 0")
}
GetApp().RegisterMainThreadCb(
func () {
this.cb(this)
if this.execTimes > 0 {
panic("locktask only run once")
}
this.execTimes += 1
})
}
}
return this
}
func (this *LockAsyncTask) OnSucc(cb func(*LockAsyncTask)) *LockAsyncTask {
this.succCb = cb
return this
}
func (this *LockAsyncTask) OnFail(cb func(*LockAsyncTask)) *LockAsyncTask {
this.failCb = cb
return this
}
func (this *LockAsyncTask) OnExit(cb func(*LockAsyncTask)) *LockAsyncTask {
this.exitCb = cb
return this
}
func (this *LockAsyncTask) allIsReady() bool {
var allReady = true
for _, lock := range(this.lockKeys) {
if !_app.isFirstAsyncTask(lock.key, &lock.entry) {
allReady = false
break
}
}
return allReady
}
func (this *LockAsyncTask) ClearLocks() {
for _, lock := range(this.lockKeys) {
lock.entry.DelInit()
}
for _, lock := range(this.lockKeys) {
_app.clearEmptyPendingAsyncTask(lock.key)
}
for _, lock := range(this.lockKeys) {
_app.notifyNextTask(lock.key)
}
}
func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask {
p := new(LockAsyncTask)
p.lockKeys = make(map[string]*taskLock)
for _, val := range(keys) {
if len(val) != 2 {
panic("NewLockAsyncTask len error:" + q5.EncodeJson(val))
}
lock := new(taskLock)
lock.key = val[0] + val[1]
lock.entry.Init(p)
if _, ok := p.lockKeys[lock.key]; ok {
panic("NewLockAsyncTask duplicate key:" + lock.key)
}
p.lockKeys[lock.key] = lock
_app.forcePendingAsyncTask(lock.key).AddTail(&lock.entry)
}
if q5.IsDebug() {
p.debugInfo = q5.GetCallStack()
_timer.SetTimeout(
1000 * 2,
func (e int32, args* q5.Args) {
if e == q5.TIMER_EXEC_EVENT {
if p.IsRunning() {
_sysLog.Warning("LockAsyncaskTimeOut %s", p.debugInfo)
panic("LockAsyncaskTimeOut")
}
}
})
}
return p.init(cb).checkDo()
}