diff --git a/app.go b/app.go index e14289b..d069fbf 100644 --- a/app.go +++ b/app.go @@ -79,13 +79,14 @@ type app struct { ormDbHash map[string]*gorm.DB caHandlersMutex sync.RWMutex caHandlers map[string]GinHandlerFunc - pendingAsyncTask map[string]map[string][]*LockAsyncTask + pendingAsyncTask map[string]*q5.ListHead } func (this *app) init(userApp UserApp) { this.userApp = userApp this.nowTime = time.Now() this.pid = os.Getpid() + this.pendingAsyncTask = make(map[string]*q5.ListHead) this.ormDbHash = make(map[string]*gorm.DB) this.SetTimeOffset(0) atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano()) @@ -385,6 +386,45 @@ func (this *app) GetPid() int { return this.pid } +func (this *app) forcePendingAsyncTask(key string) *q5.ListHead { + if l, ok := this.pendingAsyncTask[key]; ok { + return l + } else { + l := q5.NewListHead() + this.pendingAsyncTask[key] = l + return l + } +} + +func (this *app) isFirstAsyncTask(key string, entry *q5.ListHead) bool { + if l, ok := this.pendingAsyncTask[key]; ok { + return l.IsFirst(entry) + } else { + panic("app.isFirstAsyncTask key not exists:" + key) + } +} + +func (this *app) notifyNextTask(key string) { + if l, ok := this.pendingAsyncTask[key]; ok { + if !l.Empty() { + task := l.FirstEntry().(*LockAsyncTask) + task.checkDo() + } + } +} + +func (this *app) clearEmptyPendingAsyncTask(key string) { + deletedKeys := make(map[string]int32) + if l, ok := this.pendingAsyncTask[key]; ok { + if l.Empty() { + deletedKeys[key] = 1 + } + } + for k, _ := range(deletedKeys) { + delete(this.pendingAsyncTask, k) + } +} + func parseArgs() (int, int) { args := os.Args[1:] if len(args) <= 0 { diff --git a/async_task.go b/async_task.go index 7d95133..b60699a 100644 --- a/async_task.go +++ b/async_task.go @@ -5,6 +5,7 @@ import ( ) type taskLock struct { + key string entry q5.ListHead } @@ -14,7 +15,7 @@ type AsyncTask struct { succCb func(*AsyncTask) failCb func(*AsyncTask) execTimes int64 - lockKeys [][]string + lockKeys map[string]*taskLock } type LockAsyncTask = AsyncTask @@ -72,36 +73,76 @@ func (this *AsyncTask) Continue() *AsyncTask { return this } -func (this *AsyncTask) do() *AsyncTask { +func (this *AsyncTask) checkDo() *AsyncTask { if !this.IsRunning() { panic("task is not runing") } - GetApp().RegisterMainThreadCb( - func () { - this.cb(this) - this.execTimes += 1 - }) + if this.allIsReady() { + GetApp().RegisterMainThreadCb( + func () { + this.cb(this) + this.execTimes += 1 + }) + } return this } func (this *AsyncTask) OnSucc(cb func(*AsyncTask)) *AsyncTask { + this.ClearLocks() this.succCb = cb return this } func (this *AsyncTask) OnFail(cb func(*AsyncTask)) *AsyncTask { + this.ClearLocks() this.failCb = cb return this } +func (this *AsyncTask) allIsReady() bool { + var allReady = true + for _, lock := range(this.lockKeys) { + if !_app.isFirstAsyncTask(lock.key, &lock.entry) { + allReady = false + break + } + } + return allReady +} + +func (this *AsyncTask) ClearLocks() { + for _, lock := range(this.lockKeys) { + lock.entry.DelInit() + } + for _, lock := range(this.lockKeys) { + _app.clearEmptyPendingAsyncTask(lock.key) + } + for _, lock := range(this.lockKeys) { + _app.notifyNextTask(lock.key) + } +} + func NewAsyncTask(cb func(*AsyncTask)) *AsyncTask { p := new(AsyncTask) - p.lockKeys = [][]string{} + p.lockKeys = make(map[string]*taskLock) return p.init(cb).Continue() } func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask { p := new(AsyncTask) - p.lockKeys = [][]string{} - return p.init(cb).Continue() + p.lockKeys = make(map[string]*taskLock) + for _, val := range(keys) { + if len(val) != 2 { + panic("NewLockAsyncTask len error:" + q5.EncodeJson(val)) + } + lock := new(taskLock) + lock.key = val[0] + val[1] + lock.entry.Init(p) + if _, ok := p.lockKeys[lock.key]; ok { + panic("NewLockAsyncTask duplicate key:" + lock.key) + } + p.lockKeys[lock.key] = lock + _app.forcePendingAsyncTask(lock.key).AddTail(&lock.entry) + } + return p.init(cb).checkDo() }