package f5 import ( "fmt" "os" "q5" "strings" "sync" "sync/atomic" "time" "net/http" "github.com/gin-gonic/gin" "gorm.io/driver/mysql" "gorm.io/gorm" ) type App interface { GetPkgName() string NewGlobalUuid() string NewNodeUuid() int64 NewLockNodeUuid() int64 GetInstanceId() int32 GetNodeId() int32 GetZoneId() int32 HasFlag(int32) bool AddIMMsg(uint16, q5.Args) RegisterIMMsgHandle(uint16, func(q5.Args)) NotifyLoopCond() GetNowSeconds() int64 GetNowMillis() int64 GetNowNano() int64 GetRealSeconds() int64 GetRealMills() int64 GetTimeOffset() int32 SetTimeOffset(int32) GetLocation() *time.Location RegisterMainThreadCb(func()) GetGinEngine() *gin.Engine RegisterOrmDb(name string, host string, port int32, user string, passwd string, dataBase string) GetOrmDb(string) *gorm.DB RegisterCaHandle(c string, a string, handle GinHandlerFunc) GetPid() int } type UserApp interface { GetPkgName() string HasTask() bool Init() Update() UnInit() GetHttpListenPort() int32 } type app struct { pid int zoneId int32 nodeId int32 instanceId int32 terminated bool flags map[int32]int32 servicing bool contextHash map[int32]q5.Args loopCond *sync.Cond imTopNode *IMMsgNode imBotNode *IMMsgNode imWorkNode *IMMsgNode imMsgMutex sync.RWMutex chGoLoopTimerExit chan int chGoLoopWait chan int64 nowTime time.Time nowUnixNano int64 timeOffset int32 location *time.Location imMsgHandlers [1024]func(q5.Args) maxRunDelay int64 maxScheduleTime int64 uuid q5.Uuid lockUuid q5.Uuid uuidMutex sync.RWMutex userApp UserApp ginEngine *gin.Engine ormDbLock sync.Mutex ormDbHash map[string]*gorm.DB caHandlers sync.Map pendingAsyncTask map[string]*q5.ListHead } func (this *app) init(userApp UserApp) { this.userApp = userApp this.nowTime = time.Now() this.pid = os.Getpid() this.pendingAsyncTask = make(map[string]*q5.ListHead) this.ormDbHash = make(map[string]*gorm.DB) this.SetTimeOffset(0) atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano()) _timer = new(timer) _timer.init() _sysLog = new(sysLog) _sysLog.init() _tgLog = new(tgLog) _tgLog.init() _dbFilter = new(dbFilter) _dbFilter.init() _goStyleDb = new(goDbPool) _goStyleDb.init(GO_STYLE_DB) _jsStyleDb = new(dbPool) _jsStyleDb.init(JS_STYLE_DB) _httpCliMgr = new(httpCliMgr) _httpCliMgr.init() _msgQueue = new(msgQueue) _msgQueue.init() { tmpNodeId, tmpInstanceId := parseArgs() this.nodeId = int32(tmpNodeId) this.instanceId = int32(tmpInstanceId) } this.uuid.SetMachineId((this.nodeId-1)*MAX_NODE_ID + this.instanceId) this.loopCond = sync.NewCond(new(sync.Mutex)) this.chGoLoopTimerExit = make(chan int) this.chGoLoopWait = make(chan int64) _app.RegisterIMMsgHandle( IM_EXEC_MAIN_THREAD_CB, func(args q5.Args) { cb := args[0].(func()) cb() }) this.outputRuningLog() GetSysLog().Info("node_id:%d instance_id:%d pid:%d", this.nodeId, this.instanceId, this.pid) this.installTimer() go this.goLoopTimer() this.ginEngine = gin.New() this.ginEngine.Use(gin.Recovery()) this.userApp.Init() go this.ginEngine.Run(fmt.Sprintf(":%d", this.userApp.GetHttpListenPort())) this.RegisterCaHandle("Ops", "selfChecking", func (c *gin.Context) { var msg struct { ErrCode int32 `json:"errcode"` ErrMsg string `json:"errmsg"` } msg.ErrCode = 0 msg.ErrMsg = "" c.JSON(http.StatusOK, msg) }) this.ginEngine.Any("/webapp/index.php", func (c *gin.Context) { handleName := c.DefaultQuery("c", "") + "$" + c.DefaultQuery("a", "") handle := this.getCaHandle(handleName) if handle == nil { var msg struct { ErrCode int32 `json:"errcode"` ErrMsg string `json:"errmsg"` } msg.ErrCode = 404 msg.ErrMsg = "not fond" c.JSON(http.StatusOK, msg) return } handle(c) }) } func (this *app) unInit() { this.chGoLoopTimerExit <- 1 _msgQueue.unInit() _httpCliMgr.unInit() _goStyleDb.unInit() _jsStyleDb.unInit() _dbFilter.unInit() _timer.unInit() _timer = nil this.userApp.UnInit() } func (this *app) run() { for !this.terminated { this.nowTime = time.Now() atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano()) beginTick := q5.GetTickCount() _timer.update() this.dispatchIMMsg() this.userApp.Update() endTick := q5.GetTickCount() if this.maxRunDelay < endTick-beginTick { this.maxRunDelay = endTick - beginTick } this.schedule() } } func (this *app) NewGlobalUuid() string { return q5.ToString(this.uuid.Generate()) } func (this *app) NewNodeUuid() int64 { return this.uuid.Generate() } func (this *app) NewLockNodeUuid() int64 { defer this.uuidMutex.Unlock() this.uuidMutex.Lock() return this.lockUuid.Generate() } func (this *app) GetInstanceId() int32 { return this.instanceId } func (this *app) GetNodeId() int32 { return this.nodeId } func (this *app) GetZoneId() int32 { return this.nodeId } func (this *app) GetPkgName() string { return this.userApp.GetPkgName() } func (this *app) HasFlag(flag int32) bool { _, ok := this.flags[flag] return ok } func (this *app) AddIMMsg(msgId uint16, params q5.Args) { p := new(IMMsgNode) p.msgId = msgId p.params = params this.imMsgMutex.Lock() if this.imBotNode != nil { this.imBotNode.next = p this.imBotNode = p } else { this.imTopNode = p this.imBotNode = p } this.imMsgMutex.Unlock() this.loopCond.Broadcast() } func (this *app) RegisterIMMsgHandle(msgId uint16, handle func(q5.Args)) { this.imMsgHandlers[msgId] = handle } func (this *app) RegisterMainThreadCb(cb func()) { _app.AddIMMsg(IM_EXEC_MAIN_THREAD_CB, []interface{}{ cb, }) } func (this *app) goLoopTimer() { var waitMs int64 = 1000 * 10 for { select { case <-this.chGoLoopTimerExit: return case waitMs = <-this.chGoLoopWait: if waitMs < 1 { waitMs = 1 } case <-time.After(time.Millisecond * time.Duration(waitMs)): waitMs = 1000 * 10 this.loopCond.Broadcast() } } } func (this *app) schedule() { if !this.hasTask() { this.chGoLoopWait <- GetTimer().GetIdleTime() this.loopCond.L.Lock() this.loopCond.Wait() this.loopCond.L.Unlock() } } func (this *app) NotifyLoopCond() { this.loopCond.Broadcast() } func (this *app) GetNowSeconds() int64 { return this.nowUnixNano / int64(time.Second) } func (this *app) GetNowMillis() int64 { return this.nowUnixNano / int64(time.Millisecond) } func (this *app) GetNowNano() int64 { return this.nowUnixNano } func (this *app) GetRealSeconds() int64 { return time.Now().UnixNano() / int64(time.Second) } func (this *app) GetRealMills() int64 { return time.Now().UnixNano() / int64(time.Millisecond) } func (this *app) GetTimeOffset() int32 { return this.timeOffset } func (this *app) SetTimeOffset(offset int32) { this.timeOffset = offset this.location = time.FixedZone("UTC-f5", int(offset)) if this.location == nil { panic("SetTimeOffset error") } } func (this *app) GetLocation() *time.Location { return this.location } func (this *app) outputRuningLog() { } func (this *app) dispatchIMMsg() { this.imMsgMutex.Lock() this.imWorkNode = this.imTopNode this.imTopNode = nil this.imBotNode = nil this.imMsgMutex.Unlock() for this.imWorkNode != nil { currNode := this.imWorkNode this.imWorkNode = this.imWorkNode.next if this.imMsgHandlers[currNode.msgId] != nil { this.imMsgHandlers[currNode.msgId](currNode.params) } } } func (this *app) installTimer() { GetTimer().SetInterval(1000*60, func(ev int32, params *q5.Args) { if ev == q5.TIMER_EXEC_EVENT { GetSysLog().Info("max_run_delay:%d max_schedule_time:%d", _app.maxRunDelay, _app.maxScheduleTime) _app.maxRunDelay = 0 _app.maxScheduleTime = 0 } }) } func (this *app) GetGinEngine() *gin.Engine { return this.ginEngine } func (this *app) RegisterOrmDb(name string, host string, port int32, user string, passwd string, dataBase string) { this.ormDbLock.Lock() defer this.ormDbLock.Unlock() dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8&parseTime=True&loc=Local", user, passwd, host, port, dataBase) db, err := gorm.Open(mysql.New(mysql.Config{ DSN: dsn, }), &gorm.Config{}) if err != nil { panic("") } this.ormDbHash[name] = db } func (this *app) GetOrmDb(name string) *gorm.DB { this.ormDbLock.Lock() defer this.ormDbLock.Unlock() if db, ok := this.ormDbHash[name]; ok { return db } else { return nil } } func (this *app) RegisterCaHandle(c string, a string, handle GinHandlerFunc) { handleName := c + "$" + a this.caHandlers.Store(handleName, handle) } func (this *app) getCaHandle(handleName string) GinHandlerFunc { if p, ok := this.caHandlers.Load(handleName); ok { return p.(GinHandlerFunc) } else { return nil } } func (this *app) GetPid() int { return this.pid } func (this *app) forcePendingAsyncTask(key string) *q5.ListHead { if l, ok := this.pendingAsyncTask[key]; ok { return l } else { l := q5.NewListHead() this.pendingAsyncTask[key] = l return l } } func (this *app) isFirstAsyncTask(key string, entry *q5.ListHead) bool { if l, ok := this.pendingAsyncTask[key]; ok { return l.IsFirst(entry) } else { panic("app.isFirstAsyncTask key not exists:" + key) } } func (this *app) notifyNextTask(key string) { if l, ok := this.pendingAsyncTask[key]; ok { if !l.Empty() { task := l.FirstEntry().(*LockAsyncTask) task.checkDo() } } } func (this *app) clearEmptyPendingAsyncTask(key string) { deletedKeys := make(map[string]int32) if l, ok := this.pendingAsyncTask[key]; ok { if l.Empty() { deletedKeys[key] = 1 } } for k, _ := range(deletedKeys) { delete(this.pendingAsyncTask, k) } } func (this *app) hasTask() bool { hasTask := false { this.imMsgMutex.Lock() hasTask = this.imTopNode != nil || this.imWorkNode != nil this.imMsgMutex.Unlock() } if !hasTask { hasTask = this.userApp.HasTask() } return hasTask } func parseArgs() (int, int) { args := os.Args[1:] if len(args) <= 0 { return 0, 0 } var nodeId, instanceId int for i := 0; i < len(args); i++ { arg := args[i] if strings.HasPrefix(arg, "-n") { if len(arg) > 2 { fmt.Sscanf(arg[2:], "%d", &nodeId) } else if i+1 < len(args) { fmt.Sscanf(args[i+1], "%d", &nodeId) i++ } } else if strings.HasPrefix(arg, "-i") { if len(arg) > 2 { fmt.Sscanf(arg[2:], "%d", &instanceId) } else if i+1 < len(args) { fmt.Sscanf(args[i+1], "%d", &instanceId) i++ } } } return nodeId, instanceId }