471 lines
10 KiB
Go
471 lines
10 KiB
Go
package f5
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"q5"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"net/http"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"gorm.io/driver/mysql"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type App interface {
|
|
GetPkgName() string
|
|
NewGlobalUuid() string
|
|
NewNodeUuid() int64
|
|
GetInstanceId() int32
|
|
GetNodeId() int32
|
|
GetZoneId() int32
|
|
HasFlag(int32) bool
|
|
AddIMMsg(uint16, q5.Args)
|
|
RegisterIMMsgHandle(uint16, func(q5.Args))
|
|
NotifyLoopCond()
|
|
GetNowSeconds() int64
|
|
GetNowMillis() int64
|
|
GetNowNano() int64
|
|
GetTimeOffset() int32
|
|
SetTimeOffset(int32)
|
|
GetLocation() *time.Location
|
|
RegisterMainThreadCb(func())
|
|
GetGinEngine() *gin.Engine
|
|
RegisterOrmDb(name string, host string, port int32,
|
|
user string, passwd string, dataBase string)
|
|
GetOrmDb(string) *gorm.DB
|
|
RegisterCaHandle(c string, a string, handle GinHandlerFunc)
|
|
GetPid() int
|
|
}
|
|
|
|
type UserApp interface {
|
|
GetPkgName() string
|
|
HasTask() bool
|
|
Init()
|
|
Update()
|
|
UnInit()
|
|
GetHttpListenPort() int32
|
|
}
|
|
|
|
type app struct {
|
|
pid int
|
|
zoneId int32
|
|
nodeId int32
|
|
instanceId int32
|
|
terminated bool
|
|
flags map[int32]int32
|
|
servicing bool
|
|
contextHash map[int32]q5.Args
|
|
loopCond *sync.Cond
|
|
imTopNode *IMMsgNode
|
|
imBotNode *IMMsgNode
|
|
imWorkNode *IMMsgNode
|
|
imMsgMutex sync.RWMutex
|
|
chGoLoopTimerExit chan int
|
|
chGoLoopWait chan int64
|
|
nowTime time.Time
|
|
nowUnixNano int64
|
|
timeOffset int32
|
|
location *time.Location
|
|
imMsgHandlers [1024]func(q5.Args)
|
|
maxRunDelay int64
|
|
maxScheduleTime int64
|
|
uuid q5.Uuid
|
|
userApp UserApp
|
|
ginEngine *gin.Engine
|
|
ormDbLock sync.Mutex
|
|
ormDbHash map[string]*gorm.DB
|
|
caHandlersMutex sync.RWMutex
|
|
caHandlers map[string]GinHandlerFunc
|
|
pendingAsyncTask map[string]*q5.ListHead
|
|
}
|
|
|
|
func (this *app) init(userApp UserApp) {
|
|
this.userApp = userApp
|
|
this.nowTime = time.Now()
|
|
this.pid = os.Getpid()
|
|
this.pendingAsyncTask = make(map[string]*q5.ListHead)
|
|
this.ormDbHash = make(map[string]*gorm.DB)
|
|
this.SetTimeOffset(0)
|
|
atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano())
|
|
_timer = new(timer)
|
|
_timer.init()
|
|
_sysLog = new(sysLog)
|
|
_sysLog.init()
|
|
_tgLog = new(tgLog)
|
|
_tgLog.init()
|
|
_dbFilter = new(dbFilter)
|
|
_dbFilter.init()
|
|
_goStyleDb = new(dbPool)
|
|
_goStyleDb.init(GO_STYLE_DB)
|
|
_jsStyleDb = new(dbPool)
|
|
_jsStyleDb.init(JS_STYLE_DB)
|
|
_httpCliMgr = new(httpCliMgr)
|
|
_httpCliMgr.init()
|
|
{
|
|
tmpNodeId, tmpInstanceId := parseArgs()
|
|
this.nodeId = int32(tmpNodeId)
|
|
this.instanceId = int32(tmpInstanceId)
|
|
}
|
|
this.uuid.SetMachineId((this.nodeId-1)*MAX_NODE_ID + this.instanceId)
|
|
this.loopCond = sync.NewCond(new(sync.Mutex))
|
|
this.chGoLoopTimerExit = make(chan int)
|
|
this.chGoLoopWait = make(chan int64)
|
|
_app.RegisterIMMsgHandle(
|
|
IM_EXEC_MAIN_THREAD_CB,
|
|
func(args q5.Args) {
|
|
cb := args[0].(func())
|
|
cb()
|
|
})
|
|
this.outputRuningLog()
|
|
GetSysLog().Info("node_id:%d instance_id:%d pid:%d",
|
|
this.nodeId,
|
|
this.instanceId,
|
|
this.pid)
|
|
this.installTimer()
|
|
go this.goLoopTimer()
|
|
this.ginEngine = gin.New()
|
|
this.ginEngine.Use(gin.Recovery())
|
|
this.caHandlers = make(map[string]GinHandlerFunc)
|
|
this.userApp.Init()
|
|
go this.ginEngine.Run(fmt.Sprintf(":%d", this.userApp.GetHttpListenPort()))
|
|
this.RegisterCaHandle("Ops", "selfChecking", func (c *gin.Context) {
|
|
var msg struct {
|
|
ErrCode int32 `json:"errcode"`
|
|
ErrMsg string `json:"errmsg"`
|
|
}
|
|
msg.ErrCode = 0
|
|
msg.ErrMsg = ""
|
|
c.JSON(http.StatusOK, msg)
|
|
})
|
|
this.ginEngine.GET("/webapp/index.php", func (c *gin.Context) {
|
|
handleName := c.DefaultQuery("c", "") + "$" + c.DefaultQuery("a", "")
|
|
handle := this.getCaHandle(handleName)
|
|
if handle == nil {
|
|
var msg struct {
|
|
ErrCode int32 `json:"errcode"`
|
|
ErrMsg string `json:"errmsg"`
|
|
}
|
|
msg.ErrCode = 404
|
|
msg.ErrMsg = "not fond"
|
|
c.JSON(http.StatusOK, msg)
|
|
return
|
|
}
|
|
handle(c)
|
|
})
|
|
}
|
|
|
|
func (this *app) unInit() {
|
|
this.chGoLoopTimerExit <- 1
|
|
_httpCliMgr.unInit()
|
|
_goStyleDb.unInit()
|
|
_jsStyleDb.unInit()
|
|
_dbFilter.unInit()
|
|
_timer.unInit()
|
|
_timer = nil
|
|
this.userApp.UnInit()
|
|
}
|
|
|
|
func (this *app) run() {
|
|
for !this.terminated {
|
|
this.nowTime = time.Now()
|
|
atomic.StoreInt64(&this.nowUnixNano, this.nowTime.UnixNano())
|
|
|
|
beginTick := q5.GetTickCount()
|
|
_timer.update()
|
|
this.dispatchIMMsg()
|
|
this.userApp.Update()
|
|
endTick := q5.GetTickCount()
|
|
if this.maxRunDelay < endTick-beginTick {
|
|
this.maxRunDelay = endTick - beginTick
|
|
}
|
|
|
|
this.schedule()
|
|
}
|
|
}
|
|
|
|
func (this *app) NewGlobalUuid() string {
|
|
return q5.ToString(this.uuid.Generate())
|
|
}
|
|
|
|
func (this *app) NewNodeUuid() int64 {
|
|
return this.uuid.Generate()
|
|
}
|
|
|
|
func (this *app) GetInstanceId() int32 {
|
|
return this.instanceId
|
|
}
|
|
|
|
func (this *app) GetNodeId() int32 {
|
|
return this.nodeId
|
|
}
|
|
|
|
func (this *app) GetZoneId() int32 {
|
|
return this.nodeId
|
|
}
|
|
|
|
func (this *app) GetPkgName() string {
|
|
return this.userApp.GetPkgName()
|
|
}
|
|
|
|
func (this *app) HasFlag(flag int32) bool {
|
|
_, ok := this.flags[flag]
|
|
return ok
|
|
}
|
|
|
|
func (this *app) AddIMMsg(msgId uint16, params q5.Args) {
|
|
p := new(IMMsgNode)
|
|
p.msgId = msgId
|
|
p.params = params
|
|
|
|
this.imMsgMutex.Lock()
|
|
if this.imBotNode != nil {
|
|
this.imBotNode.next = p
|
|
this.imBotNode = p
|
|
} else {
|
|
this.imTopNode = p
|
|
this.imBotNode = p
|
|
}
|
|
this.imMsgMutex.Unlock()
|
|
|
|
this.loopCond.Broadcast()
|
|
}
|
|
|
|
func (this *app) RegisterIMMsgHandle(msgId uint16, handle func(q5.Args)) {
|
|
this.imMsgHandlers[msgId] = handle
|
|
}
|
|
|
|
func (this *app) RegisterMainThreadCb(cb func()) {
|
|
_app.AddIMMsg(IM_EXEC_MAIN_THREAD_CB,
|
|
[]interface{}{
|
|
cb,
|
|
})
|
|
}
|
|
|
|
func (this *app) goLoopTimer() {
|
|
var waitMs int64 = 1000 * 10
|
|
for {
|
|
select {
|
|
case <-this.chGoLoopTimerExit:
|
|
return
|
|
case waitMs = <-this.chGoLoopWait:
|
|
if waitMs < 1 {
|
|
waitMs = 1
|
|
}
|
|
case <-time.After(time.Millisecond * time.Duration(waitMs)):
|
|
waitMs = 1000 * 10
|
|
this.loopCond.Broadcast()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *app) schedule() {
|
|
if !this.hasTask() {
|
|
this.chGoLoopWait <- GetTimer().GetIdleTime()
|
|
this.loopCond.L.Lock()
|
|
this.loopCond.Wait()
|
|
this.loopCond.L.Unlock()
|
|
}
|
|
}
|
|
|
|
func (this *app) NotifyLoopCond() {
|
|
this.loopCond.Broadcast()
|
|
}
|
|
|
|
func (this *app) GetNowSeconds() int64 {
|
|
return this.nowUnixNano / int64(time.Second)
|
|
}
|
|
|
|
func (this *app) GetNowMillis() int64 {
|
|
return this.nowUnixNano / int64(time.Millisecond)
|
|
}
|
|
|
|
func (this *app) GetNowNano() int64 {
|
|
return this.nowUnixNano
|
|
}
|
|
|
|
func (this *app) GetTimeOffset() int32 {
|
|
return this.timeOffset
|
|
}
|
|
|
|
func (this *app) SetTimeOffset(offset int32) {
|
|
this.timeOffset = offset
|
|
this.location = time.FixedZone("UTC-f5", int(offset))
|
|
if this.location == nil {
|
|
panic("SetTimeOffset error")
|
|
}
|
|
}
|
|
|
|
func (this *app) GetLocation() *time.Location {
|
|
return this.location
|
|
}
|
|
|
|
func (this *app) outputRuningLog() {
|
|
}
|
|
|
|
func (this *app) dispatchIMMsg() {
|
|
this.imMsgMutex.Lock()
|
|
this.imWorkNode = this.imTopNode
|
|
this.imTopNode = nil
|
|
this.imBotNode = nil
|
|
this.imMsgMutex.Unlock()
|
|
|
|
for this.imWorkNode != nil {
|
|
currNode := this.imWorkNode
|
|
this.imWorkNode = this.imWorkNode.next
|
|
if this.imMsgHandlers[currNode.msgId] != nil {
|
|
this.imMsgHandlers[currNode.msgId](currNode.params)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *app) installTimer() {
|
|
GetTimer().SetInterval(1000*60,
|
|
func(ev int32, params *q5.Args) {
|
|
if ev == q5.TIMER_EXEC_EVENT {
|
|
GetSysLog().Info("max_run_delay:%d max_schedule_time:%d",
|
|
_app.maxRunDelay,
|
|
_app.maxScheduleTime)
|
|
_app.maxRunDelay = 0
|
|
_app.maxScheduleTime = 0
|
|
}
|
|
})
|
|
}
|
|
|
|
func (this *app) GetGinEngine() *gin.Engine {
|
|
return this.ginEngine
|
|
}
|
|
|
|
func (this *app) RegisterOrmDb(name string, host string, port int32,
|
|
user string, passwd string, dataBase string) {
|
|
this.ormDbLock.Lock()
|
|
defer this.ormDbLock.Unlock()
|
|
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8&parseTime=True&loc=Local",
|
|
user,
|
|
passwd,
|
|
host,
|
|
port,
|
|
dataBase)
|
|
db, err := gorm.Open(mysql.New(mysql.Config{
|
|
DSN: dsn,
|
|
}), &gorm.Config{})
|
|
if err != nil {
|
|
panic("")
|
|
}
|
|
this.ormDbHash[name] = db
|
|
}
|
|
|
|
func (this *app) GetOrmDb(name string) *gorm.DB {
|
|
this.ormDbLock.Lock()
|
|
defer this.ormDbLock.Unlock()
|
|
if db, ok := this.ormDbHash[name]; ok {
|
|
return db
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (this *app) RegisterCaHandle(c string, a string, handle GinHandlerFunc) {
|
|
this.caHandlersMutex.Lock()
|
|
defer this.caHandlersMutex.Unlock()
|
|
handleName := c + "$" + a
|
|
this.caHandlers[handleName] = handle
|
|
}
|
|
|
|
func (this *app) getCaHandle(handleName string) GinHandlerFunc {
|
|
this.caHandlersMutex.Lock()
|
|
defer this.caHandlersMutex.Unlock()
|
|
if handle, ok := this.caHandlers[handleName]; ok {
|
|
return handle
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (this *app) GetPid() int {
|
|
return this.pid
|
|
}
|
|
|
|
func (this *app) forcePendingAsyncTask(key string) *q5.ListHead {
|
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
|
return l
|
|
} else {
|
|
l := q5.NewListHead()
|
|
this.pendingAsyncTask[key] = l
|
|
return l
|
|
}
|
|
}
|
|
|
|
func (this *app) isFirstAsyncTask(key string, entry *q5.ListHead) bool {
|
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
|
return l.IsFirst(entry)
|
|
} else {
|
|
panic("app.isFirstAsyncTask key not exists:" + key)
|
|
}
|
|
}
|
|
|
|
func (this *app) notifyNextTask(key string) {
|
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
|
if !l.Empty() {
|
|
task := l.FirstEntry().(*LockAsyncTask)
|
|
task.checkDo()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *app) clearEmptyPendingAsyncTask(key string) {
|
|
deletedKeys := make(map[string]int32)
|
|
if l, ok := this.pendingAsyncTask[key]; ok {
|
|
if l.Empty() {
|
|
deletedKeys[key] = 1
|
|
}
|
|
}
|
|
for k, _ := range(deletedKeys) {
|
|
delete(this.pendingAsyncTask, k)
|
|
}
|
|
}
|
|
|
|
func (this *app) hasTask() bool {
|
|
hasTask := false
|
|
{
|
|
this.imMsgMutex.Lock()
|
|
hasTask = this.imTopNode != nil || this.imWorkNode != nil
|
|
this.imMsgMutex.Unlock()
|
|
}
|
|
if !hasTask {
|
|
hasTask = this.userApp.HasTask()
|
|
}
|
|
return hasTask
|
|
}
|
|
|
|
func parseArgs() (int, int) {
|
|
args := os.Args[1:]
|
|
if len(args) <= 0 {
|
|
return 0, 0
|
|
}
|
|
var nodeId, instanceId int
|
|
for i := 0; i < len(args); i++ {
|
|
arg := args[i]
|
|
if strings.HasPrefix(arg, "-n") {
|
|
if len(arg) > 2 {
|
|
fmt.Sscanf(arg[2:], "%d", &nodeId)
|
|
} else if i+1 < len(args) {
|
|
fmt.Sscanf(args[i+1], "%d", &nodeId)
|
|
i++
|
|
}
|
|
} else if strings.HasPrefix(arg, "-i") {
|
|
if len(arg) > 2 {
|
|
fmt.Sscanf(arg[2:], "%d", &instanceId)
|
|
} else if i+1 < len(args) {
|
|
fmt.Sscanf(args[i+1], "%d", &instanceId)
|
|
i++
|
|
}
|
|
}
|
|
}
|
|
|
|
return nodeId, instanceId
|
|
}
|