1
This commit is contained in:
parent
01ba77f7ae
commit
0e32fe0327
42
app.go
42
app.go
@ -79,13 +79,14 @@ type app struct {
|
|||||||
ormDbHash map[string]*gorm.DB
|
ormDbHash map[string]*gorm.DB
|
||||||
caHandlersMutex sync.RWMutex
|
caHandlersMutex sync.RWMutex
|
||||||
caHandlers map[string]GinHandlerFunc
|
caHandlers map[string]GinHandlerFunc
|
||||||
pendingAsyncTask map[string]map[string][]*LockAsyncTask
|
pendingAsyncTask map[string]*q5.ListHead
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *app) init(userApp UserApp) {
|
func (this *app) init(userApp UserApp) {
|
||||||
this.userApp = userApp
|
this.userApp = userApp
|
||||||
this.nowTime = time.Now()
|
this.nowTime = time.Now()
|
||||||
this.pid = os.Getpid()
|
this.pid = os.Getpid()
|
||||||
|
this.pendingAsyncTask = make(map[string]*q5.ListHead)
|
||||||
this.ormDbHash = make(map[string]*gorm.DB)
|
this.ormDbHash = make(map[string]*gorm.DB)
|
||||||
this.SetTimeOffset(0)
|
this.SetTimeOffset(0)
|
||||||
atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano())
|
atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano())
|
||||||
@ -385,6 +386,45 @@ func (this *app) GetPid() int {
|
|||||||
return this.pid
|
return this.pid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *app) forcePendingAsyncTask(key string) *q5.ListHead {
|
||||||
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
||||||
|
return l
|
||||||
|
} else {
|
||||||
|
l := q5.NewListHead()
|
||||||
|
this.pendingAsyncTask[key] = l
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *app) isFirstAsyncTask(key string, entry *q5.ListHead) bool {
|
||||||
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
||||||
|
return l.IsFirst(entry)
|
||||||
|
} else {
|
||||||
|
panic("app.isFirstAsyncTask key not exists:" + key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *app) notifyNextTask(key string) {
|
||||||
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
||||||
|
if !l.Empty() {
|
||||||
|
task := l.FirstEntry().(*LockAsyncTask)
|
||||||
|
task.checkDo()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *app) clearEmptyPendingAsyncTask(key string) {
|
||||||
|
deletedKeys := make(map[string]int32)
|
||||||
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
||||||
|
if l.Empty() {
|
||||||
|
deletedKeys[key] = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, _ := range(deletedKeys) {
|
||||||
|
delete(this.pendingAsyncTask, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func parseArgs() (int, int) {
|
func parseArgs() (int, int) {
|
||||||
args := os.Args[1:]
|
args := os.Args[1:]
|
||||||
if len(args) <= 0 {
|
if len(args) <= 0 {
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type taskLock struct {
|
type taskLock struct {
|
||||||
|
key string
|
||||||
entry q5.ListHead
|
entry q5.ListHead
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -14,7 +15,7 @@ type AsyncTask struct {
|
|||||||
succCb func(*AsyncTask)
|
succCb func(*AsyncTask)
|
||||||
failCb func(*AsyncTask)
|
failCb func(*AsyncTask)
|
||||||
execTimes int64
|
execTimes int64
|
||||||
lockKeys [][]string
|
lockKeys map[string]*taskLock
|
||||||
}
|
}
|
||||||
|
|
||||||
type LockAsyncTask = AsyncTask
|
type LockAsyncTask = AsyncTask
|
||||||
@ -72,36 +73,76 @@ func (this *AsyncTask) Continue() *AsyncTask {
|
|||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *AsyncTask) do() *AsyncTask {
|
func (this *AsyncTask) checkDo() *AsyncTask {
|
||||||
if !this.IsRunning() {
|
if !this.IsRunning() {
|
||||||
panic("task is not runing")
|
panic("task is not runing")
|
||||||
}
|
}
|
||||||
GetApp().RegisterMainThreadCb(
|
if this.allIsReady() {
|
||||||
func () {
|
GetApp().RegisterMainThreadCb(
|
||||||
this.cb(this)
|
func () {
|
||||||
this.execTimes += 1
|
this.cb(this)
|
||||||
})
|
this.execTimes += 1
|
||||||
|
})
|
||||||
|
}
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *AsyncTask) OnSucc(cb func(*AsyncTask)) *AsyncTask {
|
func (this *AsyncTask) OnSucc(cb func(*AsyncTask)) *AsyncTask {
|
||||||
|
this.ClearLocks()
|
||||||
this.succCb = cb
|
this.succCb = cb
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *AsyncTask) OnFail(cb func(*AsyncTask)) *AsyncTask {
|
func (this *AsyncTask) OnFail(cb func(*AsyncTask)) *AsyncTask {
|
||||||
|
this.ClearLocks()
|
||||||
this.failCb = cb
|
this.failCb = cb
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *AsyncTask) allIsReady() bool {
|
||||||
|
var allReady = true
|
||||||
|
for _, lock := range(this.lockKeys) {
|
||||||
|
if !_app.isFirstAsyncTask(lock.key, &lock.entry) {
|
||||||
|
allReady = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allReady
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *AsyncTask) ClearLocks() {
|
||||||
|
for _, lock := range(this.lockKeys) {
|
||||||
|
lock.entry.DelInit()
|
||||||
|
}
|
||||||
|
for _, lock := range(this.lockKeys) {
|
||||||
|
_app.clearEmptyPendingAsyncTask(lock.key)
|
||||||
|
}
|
||||||
|
for _, lock := range(this.lockKeys) {
|
||||||
|
_app.notifyNextTask(lock.key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewAsyncTask(cb func(*AsyncTask)) *AsyncTask {
|
func NewAsyncTask(cb func(*AsyncTask)) *AsyncTask {
|
||||||
p := new(AsyncTask)
|
p := new(AsyncTask)
|
||||||
p.lockKeys = [][]string{}
|
p.lockKeys = make(map[string]*taskLock)
|
||||||
return p.init(cb).Continue()
|
return p.init(cb).Continue()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask {
|
func NewLockAsyncTask(keys [][]string, cb func(*LockAsyncTask)) *LockAsyncTask {
|
||||||
p := new(AsyncTask)
|
p := new(AsyncTask)
|
||||||
p.lockKeys = [][]string{}
|
p.lockKeys = make(map[string]*taskLock)
|
||||||
return p.init(cb).Continue()
|
for _, val := range(keys) {
|
||||||
|
if len(val) != 2 {
|
||||||
|
panic("NewLockAsyncTask len error:" + q5.EncodeJson(val))
|
||||||
|
}
|
||||||
|
lock := new(taskLock)
|
||||||
|
lock.key = val[0] + val[1]
|
||||||
|
lock.entry.Init(p)
|
||||||
|
if _, ok := p.lockKeys[lock.key]; ok {
|
||||||
|
panic("NewLockAsyncTask duplicate key:" + lock.key)
|
||||||
|
}
|
||||||
|
p.lockKeys[lock.key] = lock
|
||||||
|
_app.forcePendingAsyncTask(lock.key).AddTail(&lock.entry)
|
||||||
|
}
|
||||||
|
return p.init(cb).checkDo()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user