From 17511d90ba516d0047f360c2b6ca85e60af48652 Mon Sep 17 00:00:00 2001 From: yangduo Date: Thu, 12 Sep 2024 11:05:19 +0800 Subject: [PATCH] udp --- server/mqproxy/app/app.go | 2 +- server/mqproxy/constant/constant.go | 1 + server/mqproxy/global/global.go | 7 + server/mqproxy/initialize/enter.go | 1 + server/mqproxy/listener/export.go | 12 ++ server/mqproxy/listener/udplistener.go | 101 +++++++++++ server/mqproxy/middleware/caforward.go | 232 ++++++++++++------------- server/mqproxy/mt/GamesapiCluster.go | 34 ---- server/mqproxy/mt/export.go | 8 +- server/mqproxy/mt/mqproxyCluster.go | 34 ++++ server/mqproxy/mtb/mtb.auto_gen.go | 16 +- server/mqproxy/proto/mt.proto | 2 +- server/mqproxy/service/export.go | 2 +- server/mqproxy/service/mqmgr.go | 147 ++++++++++++++++ server/mqproxy/service/servicemgr.go | 8 +- 15 files changed, 431 insertions(+), 176 deletions(-) create mode 100644 server/mqproxy/listener/export.go create mode 100644 server/mqproxy/listener/udplistener.go delete mode 100644 server/mqproxy/mt/GamesapiCluster.go create mode 100644 server/mqproxy/mt/mqproxyCluster.go create mode 100644 server/mqproxy/service/mqmgr.go diff --git a/server/mqproxy/app/app.go b/server/mqproxy/app/app.go index 79eb7305..a979d19b 100644 --- a/server/mqproxy/app/app.go +++ b/server/mqproxy/app/app.go @@ -16,7 +16,7 @@ func (this *app) GetPkgName() string { } func (this *app) GetHttpListenPort() int32 { - return mt.Table.GamesapiCluster.GetHttpListenPort() + return mt.Table.MqproxyCluster.GetHttpListenPort() } func (this *app) Run(initCb func(), unInitCb func()) { diff --git a/server/mqproxy/constant/constant.go b/server/mqproxy/constant/constant.go index e281935b..35464470 100644 --- a/server/mqproxy/constant/constant.go +++ b/server/mqproxy/constant/constant.go @@ -9,5 +9,6 @@ const ( ROUTER_MODULE_IDX CONTROLLER_MGR_MODULE_IDX SERVICE_MGR_MODULE_IDX + LISTENER_MODULE_IDX MAX_MODULE_IDX ) diff --git a/server/mqproxy/global/global.go b/server/mqproxy/global/global.go index 8fe71005..89e5bed6 100644 --- a/server/mqproxy/global/global.go +++ b/server/mqproxy/global/global.go @@ -11,6 +11,7 @@ var modules [constant.MAX_MODULE_IDX]q5.Module var initOrders = []int32{ constant.SERVICE_MGR_MODULE_IDX, constant.ROUTER_MODULE_IDX, + constant.LISTENER_MODULE_IDX, } var app common.App @@ -37,6 +38,12 @@ func RegModule(idx int32, m q5.Module) { serviceMgr = m.(common.ServiceMgr) } case constant.ROUTER_MODULE_IDX: + { + + } + case constant.LISTENER_MODULE_IDX: + { + } default: { panic("unknow module") diff --git a/server/mqproxy/initialize/enter.go b/server/mqproxy/initialize/enter.go index 04ac6528..7eab695a 100644 --- a/server/mqproxy/initialize/enter.go +++ b/server/mqproxy/initialize/enter.go @@ -4,6 +4,7 @@ import ( _ "main/app" _ "main/controller" . "main/global" + _ "main/listener" _ "main/router" _ "main/service" ) diff --git a/server/mqproxy/listener/export.go b/server/mqproxy/listener/export.go new file mode 100644 index 00000000..d6695555 --- /dev/null +++ b/server/mqproxy/listener/export.go @@ -0,0 +1,12 @@ +package listener + +import ( + "main/constant" + "main/global" +) + +var _udpListener = new(UDPListener) + +func init() { + global.RegModule(constant.LISTENER_MODULE_IDX, _udpListener) +} diff --git a/server/mqproxy/listener/udplistener.go b/server/mqproxy/listener/udplistener.go new file mode 100644 index 00000000..7dd6488d --- /dev/null +++ b/server/mqproxy/listener/udplistener.go @@ -0,0 +1,101 @@ +package listener + +import ( + "encoding/json" + "fmt" + "main/service" + "net" + + "jccommon" + "main/mt" + "q5" + // . "main/global" +) + +type UDPListener struct { + // ss.MsgHandlerImpl + udpconn *net.UDPConn + ch chan bool +} + +func (this *UDPListener) Init() { + this.ch = make(chan bool, 64) + udpAddr, _ := net.ResolveUDPAddr("udp", "0.0.0.0:"+ + q5.ToString(mt.Table.MqproxyCluster.GetListenPort())) + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + panic(err) + } else { + this.udpconn = conn + // go this.serverRead() + go this.serverRead2() + } +} + +func (this *UDPListener) UnInit() { + if this.udpconn != nil { + this.udpconn.Close() + } + + close(this.ch) +} + +var UDPPACKHEAD = 2 + +func (this *UDPListener) serverRead() { + for { + this.ch <- true + go func(conn *net.UDPConn, ch chan bool) { + buf := make([]byte, jccommon.MAX_PACKET_LEN+UDPPACKHEAD) + bufLen, _, err := this.udpconn.ReadFrom(buf) + if err == nil && bufLen > UDPPACKHEAD { + dataLen := q5.MkUInt16(buf[0], buf[1]) + if bufLen >= int(UDPPACKHEAD)+int(dataLen) { + obj := struct { + Topic string `json:"topic"` + Expire int32 `json:"expire"` + Msg string `json:"msg"` + }{} + data := buf[UDPPACKHEAD : UDPPACKHEAD+int(dataLen)] + if json.Unmarshal(data, &obj) == nil { + expire := obj.Expire + if expire > 86400*7 { + expire = 86400 * 7 + } + service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire) + } + } + } + <-ch + }(this.udpconn, this.ch) + } +} + +func (this *UDPListener) serverRead2() { + for { + buf := make([]byte, jccommon.MAX_PACKET_LEN) + bufLen, _, err := this.udpconn.ReadFrom(buf) + if err == nil && bufLen > 0 { + this.ch <- true + recvdata:=buf[:bufLen] + go func(data []byte, ch chan bool) { + obj := struct { + Topic string `json:"topic"` + Expire int32 `json:"expire"` + Msg string `json:"msg"` + }{} + if json.Unmarshal(data, &obj) == nil { + expire := obj.Expire + if expire > 86400*7 { + expire = 86400 * 7 + } + service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire) + } else { + fmt.Println("bad message") + } + + <-ch + }(recvdata, this.ch) + } + } +} diff --git a/server/mqproxy/middleware/caforward.go b/server/mqproxy/middleware/caforward.go index 72817d5c..31eb2a2e 100644 --- a/server/mqproxy/middleware/caforward.go +++ b/server/mqproxy/middleware/caforward.go @@ -1,134 +1,122 @@ package middleware import ( - "bytes" - "errors" - "f5" - "io/ioutil" - "jccommon" - "main/service" - "net/http" - net_url "net/url" - "q5" - "strings" - "github.com/gin-gonic/gin" - "github.com/google/uuid" ) func CaForward(c *gin.Context) { - accountId := c.DefaultQuery("account_id", "") - sessionId := c.DefaultQuery("session_id", "") + // accountId := c.DefaultQuery("account_id", "") + // sessionId := c.DefaultQuery("session_id", "") - if !jccommon.IsValidSessionId(accountId, sessionId) { - f5.RspErr(c, 500, "invalid session_id") - c.Abort() - service.SApiForward.IncInvalidSessionTimes() - return - } + // if !jccommon.IsValidSessionId(accountId, sessionId) { + // f5.RspErr(c, 500, "invalid session_id") + // c.Abort() + // service.SApiForward.IncInvalidSessionTimes() + // return + // } - cLock := service.SApiForward.AcquireLock(accountId) - if cLock == nil { - f5.RspErr(c, 500, "system busy") - c.Abort() - return - } - defer service.SApiForward.ReleaseLock(cLock) - service.SApiForward.IncTotalTimes() - beginTick := q5.GetTickCount() - defer func() { - costTime := q5.GetTickCount() - beginTick - service.SApiForward.UpdateCostTime(costTime) - }() - downStreamUrl, downStreamHost := service.SApiForward.GetDownStreamHost(); - newUrl := downStreamUrl + c.Request.URL.Path[5:] - if !q5.StrContains(newUrl, "?") { - newUrl = newUrl + "?" - } - params := []*[]string{} - nonce := uuid.New().String() - nowTime := f5.GetApp().GetRealSeconds() - u := net_url.Values{} - { - for k, v := range c.Request.URL.Query() { - u.Set(k, v[0]) - q5.AppendSlice(¶ms, &[]string{k, v[0]}) - } - u.Set("__nonce", nonce) - u.Set("__timestamp", q5.ToString(nowTime)) - } + // cLock := service.SApiForward.AcquireLock(accountId) + // if cLock == nil { + // f5.RspErr(c, 500, "system busy") + // c.Abort() + // return + // } + // defer service.SApiForward.ReleaseLock(cLock) + // service.SApiForward.IncTotalTimes() + // beginTick := q5.GetTickCount() + // defer func() { + // costTime := q5.GetTickCount() - beginTick + // service.SApiForward.UpdateCostTime(costTime) + // }() + // downStreamUrl, downStreamHost := service.SApiForward.GetDownStreamHost(); + // newUrl := downStreamUrl + c.Request.URL.Path[5:] + // if !q5.StrContains(newUrl, "?") { + // newUrl = newUrl + "?" + // } + // params := []*[]string{} + // nonce := uuid.New().String() + // nowTime := f5.GetApp().GetRealSeconds() + // u := net_url.Values{} + // { + // for k, v := range c.Request.URL.Query() { + // u.Set(k, v[0]) + // q5.AppendSlice(¶ms, &[]string{k, v[0]}) + // } + // u.Set("__nonce", nonce) + // u.Set("__timestamp", q5.ToString(nowTime)) + // } - var httpRequest *http.Request - var createErr error - switch strings.ToUpper(c.Request.Method) { - case "GET": - { - service.SApiForward.IncGetTimes() - u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, "")) - newUrl += u.Encode() - httpRequest, createErr = http.NewRequest("GET", newUrl, nil) - if !f5.IsOnlineEnv() { - f5.GetSysLog().Info("CaForward method:%s newUrl:%s ", c.Request.Method, newUrl) - } - } - case "POST": - { - service.SApiForward.IncPostTimes() - if postData, err := c.GetRawData(); err == nil { - u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, string(postData))) - newUrl += u.Encode() - httpRequest, createErr = http.NewRequest("POST", newUrl, bytes.NewBuffer(postData)) - contentType := c.GetHeader("Content-Type") - if contentType != "" { - httpRequest.Header.Set("Content-Type", contentType) - } - if !f5.IsOnlineEnv() { - f5.GetSysLog().Info("CaForward method:%s newUrl:%s Content-Type:%s postData:%s", - c.Request.Method, - newUrl, - contentType, - postData) - } - } else { - createErr = err - } - } - default: - { - createErr = errors.New("method error") - } - } - if createErr != nil { - service.SApiForward.IncCreateErrTimes() - f5.RspErr(c, 500, "create request error") - c.Abort() - f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr) - return - } - if downStreamHost != "" { - httpRequest.Host = downStreamHost - } - client := &http.Client{} - if resp, err := client.Do(httpRequest); err == nil { - defer resp.Body.Close() - if bytes, err := ioutil.ReadAll(resp.Body); err == nil { - service.SApiForward.IncOkTimes() - c.String(200, string(bytes)) - c.Abort() - return - } else { - service.SApiForward.IncReadRspErrTimes() - f5.RspErr(c, 500, "read response error") - c.Abort() - f5.GetSysLog().Info("CaForward read response url:%s eror:%s", newUrl, err) - return - } - } else { - service.SApiForward.IncDoErrTimes() - f5.RspErr(c, 500, "client.Do error") - c.Abort() - f5.GetSysLog().Info("CaForward client.Do url:%s error:%s", newUrl, err) - return - } + // var httpRequest *http.Request + // var createErr error + // switch strings.ToUpper(c.Request.Method) { + // case "GET": + // { + // service.SApiForward.IncGetTimes() + // u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, "")) + // newUrl += u.Encode() + // httpRequest, createErr = http.NewRequest("GET", newUrl, nil) + // if !f5.IsOnlineEnv() { + // f5.GetSysLog().Info("CaForward method:%s newUrl:%s ", c.Request.Method, newUrl) + // } + // } + // case "POST": + // { + // service.SApiForward.IncPostTimes() + // if postData, err := c.GetRawData(); err == nil { + // u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, string(postData))) + // newUrl += u.Encode() + // httpRequest, createErr = http.NewRequest("POST", newUrl, bytes.NewBuffer(postData)) + // contentType := c.GetHeader("Content-Type") + // if contentType != "" { + // httpRequest.Header.Set("Content-Type", contentType) + // } + // if !f5.IsOnlineEnv() { + // f5.GetSysLog().Info("CaForward method:%s newUrl:%s Content-Type:%s postData:%s", + // c.Request.Method, + // newUrl, + // contentType, + // postData) + // } + // } else { + // createErr = err + // } + // } + // default: + // { + // createErr = errors.New("method error") + // } + // } + // if createErr != nil { + // service.SApiForward.IncCreateErrTimes() + // f5.RspErr(c, 500, "create request error") + // c.Abort() + // f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr) + // return + // } + // if downStreamHost != "" { + // httpRequest.Host = downStreamHost + // } + // client := &http.Client{} + // if resp, err := client.Do(httpRequest); err == nil { + // defer resp.Body.Close() + // if bytes, err := ioutil.ReadAll(resp.Body); err == nil { + // service.SApiForward.IncOkTimes() + // c.String(200, string(bytes)) + // c.Abort() + // return + // } else { + // service.SApiForward.IncReadRspErrTimes() + // f5.RspErr(c, 500, "read response error") + // c.Abort() + // f5.GetSysLog().Info("CaForward read response url:%s eror:%s", newUrl, err) + // return + // } + // } else { + // service.SApiForward.IncDoErrTimes() + // f5.RspErr(c, 500, "client.Do error") + // c.Abort() + // f5.GetSysLog().Info("CaForward client.Do url:%s error:%s", newUrl, err) + // return + // } } diff --git a/server/mqproxy/mt/GamesapiCluster.go b/server/mqproxy/mt/GamesapiCluster.go deleted file mode 100644 index d815dad0..00000000 --- a/server/mqproxy/mt/GamesapiCluster.go +++ /dev/null @@ -1,34 +0,0 @@ -package mt - -import ( - "f5" - "main/mtb" -) - -type GamesapiCluster struct { - mtb.GamesapiCluster -} - -type GamesapiClusterTable struct { - f5.IdMetaTable[GamesapiCluster] - selfConf *GamesapiCluster -} - -func (this *GamesapiCluster) Init1() { - -} - -func (this *GamesapiClusterTable) GetListenPort() int32 { - return this.selfConf.GetListenPort() -} - -func (this *GamesapiClusterTable) GetHttpListenPort() int32 { - return this.selfConf.GetHttpListenPort() -} - -func (this *GamesapiClusterTable) PostInit1() { - this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId())) - if this.selfConf == nil { - panic("gamesapi集群无法读取本服配置") - } -} diff --git a/server/mqproxy/mt/export.go b/server/mqproxy/mt/export.go index c23badaa..23ef80af 100644 --- a/server/mqproxy/mt/export.go +++ b/server/mqproxy/mt/export.go @@ -5,14 +5,14 @@ import ( ) type table struct { - GamesapiCluster *GamesapiClusterTable + MqproxyCluster *MqproxyClusterTable Config *ConfigTable - ConfDb *ConfDbTable + ConfDb *ConfDbTable } var Table = f5.New(func(this *table) { - this.GamesapiCluster = f5.New(func(this *GamesapiClusterTable) { - this.FileName = "../config/gamesapi.cluster.json" + this.MqproxyCluster = f5.New(func(this *MqproxyClusterTable) { + this.FileName = "../config/mqproxy.cluster.json" this.PrimKey = "instance_id" }) diff --git a/server/mqproxy/mt/mqproxyCluster.go b/server/mqproxy/mt/mqproxyCluster.go new file mode 100644 index 00000000..c96681bb --- /dev/null +++ b/server/mqproxy/mt/mqproxyCluster.go @@ -0,0 +1,34 @@ +package mt + +import ( + "f5" + "main/mtb" +) + +type MqproxyCluster struct { + mtb.MqproxyCluster +} + +type MqproxyClusterTable struct { + f5.IdMetaTable[MqproxyCluster] + selfConf *MqproxyCluster +} + +func (this *MqproxyCluster) Init1() { + +} + +func (this *MqproxyClusterTable) GetListenPort() int32 { + return this.selfConf.GetListenPort() +} + +func (this *MqproxyClusterTable) GetHttpListenPort() int32 { + return this.selfConf.GetHttpListenPort() +} + +func (this *MqproxyClusterTable) PostInit1() { + this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId())) + if this.selfConf == nil { + panic("mqproxy集群无法读取本服配置") + } +} diff --git a/server/mqproxy/mtb/mtb.auto_gen.go b/server/mqproxy/mtb/mtb.auto_gen.go index 337b9e43..a20427a2 100644 --- a/server/mqproxy/mtb/mtb.auto_gen.go +++ b/server/mqproxy/mtb/mtb.auto_gen.go @@ -4,7 +4,7 @@ import ( "f5" ) -type GamesapiCluster struct { +type MqproxyCluster struct { instance_id int32 listen_port int32 http_listen_port int32 @@ -40,27 +40,27 @@ type ConfDb struct { _flags2_ uint64 } -func (this *GamesapiCluster) GetInstanceId() int32 { +func (this *MqproxyCluster) GetInstanceId() int32 { return this.instance_id } -func (this *GamesapiCluster) HasInstanceId() bool { +func (this *MqproxyCluster) HasInstanceId() bool { return (this._flags1_ & (uint64(1) << 1)) > 0 } -func (this *GamesapiCluster) GetListenPort() int32 { +func (this *MqproxyCluster) GetListenPort() int32 { return this.listen_port } -func (this *GamesapiCluster) HasListenPort() bool { +func (this *MqproxyCluster) HasListenPort() bool { return (this._flags1_ & (uint64(1) << 2)) > 0 } -func (this *GamesapiCluster) GetHttpListenPort() int32 { +func (this *MqproxyCluster) GetHttpListenPort() int32 { return this.http_listen_port } -func (this *GamesapiCluster) HasHttpListenPort() bool { +func (this *MqproxyCluster) HasHttpListenPort() bool { return (this._flags1_ & (uint64(1) << 3)) > 0 } @@ -185,7 +185,7 @@ func (this *ConfDb) HasMaxIdleConns() bool { } -func (this *GamesapiCluster) LoadFromKv(kv map[string]interface{}) { +func (this *MqproxyCluster) LoadFromKv(kv map[string]interface{}) { f5.ReadMetaTableField(&this.instance_id, "instance_id", &this._flags1_, 1, kv) f5.ReadMetaTableField(&this.listen_port, "listen_port", &this._flags1_, 2, kv) f5.ReadMetaTableField(&this.http_listen_port, "http_listen_port", &this._flags1_, 3, kv) diff --git a/server/mqproxy/proto/mt.proto b/server/mqproxy/proto/mt.proto index 2dfd5885..69b8b502 100644 --- a/server/mqproxy/proto/mt.proto +++ b/server/mqproxy/proto/mt.proto @@ -2,7 +2,7 @@ package mt; option go_package = ".;mt"; -message GamesapiCluster +message MqproxyCluster { optional int32 instance_id = 1; optional int32 listen_port = 2; diff --git a/server/mqproxy/service/export.go b/server/mqproxy/service/export.go index bbfe94fe..9668098e 100644 --- a/server/mqproxy/service/export.go +++ b/server/mqproxy/service/export.go @@ -6,7 +6,7 @@ import ( ) var _serviceMgr = new(serviceMgr) -var SApiForward *sApiForward +var MqManager *mqManager func init() { global.RegModule(constant.SERVICE_MGR_MODULE_IDX, _serviceMgr) diff --git a/server/mqproxy/service/mqmgr.go b/server/mqproxy/service/mqmgr.go new file mode 100644 index 00000000..63876a9d --- /dev/null +++ b/server/mqproxy/service/mqmgr.go @@ -0,0 +1,147 @@ +package service + +import ( + "f5" + "q5" + "sync" + "sync/atomic" + "time" +) + +type mqManager struct { + topicMap q5.ConcurrentMap[string, *topicCache] + lock *sync.Mutex + publishcount int32 + consumecount int32 + expirecount int32 +} + +type topicCache struct { + lock *sync.Mutex + msgList []*msgItem +} + +func (this *topicCache) Lock() { + this.lock.Lock() +} + +func (this *topicCache) Unlock() { + this.lock.Unlock() +} + +type msgItem struct { + content string + addTime int64 + expire int64 +} + +func (this *mqManager) init() { + this.topicMap = q5.ConcurrentMap[string, *topicCache]{} + this.lock = new(sync.Mutex) + go this.outputMonitorLog() +} + +func (this *mqManager) unInit() { +} + +func (this *mqManager) Lock() { + this.lock.Lock() +} + +func (this *mqManager) Unlock() { + this.lock.Unlock() +} + +func (this *mqManager) PublishTopic(topic, message string, life int32) { + this.Lock() + t, exist := this.topicMap.Load(topic) + if !exist { + tc := new(topicCache) + tc.lock = new(sync.Mutex) + tc.msgList = make([]*msgItem, 0) + this.topicMap.Store(topic, tc) + t = &tc + } + this.Unlock() + + (*t).Lock() + newitem := new(msgItem) + newitem.addTime = f5.GetApp().GetRealSeconds() + newitem.content = message + newitem.expire = newitem.addTime + int64(life) + (*t).msgList = append((*t).msgList, newitem) + (*t).Unlock() + this.IncPublishTimes() +} + +func (this *mqManager) ConsumeTopic(topic string) (msg string) { + msg = "" + t, exist := this.topicMap.Load(topic) + if !exist { + return + } + + (*t).Lock() + defer (*t).Unlock() + + if len((*t).msgList) == 0 { + return + } + + msg = (*t).msgList[0].content + (*t).msgList = (*t).msgList[1:] + + this.IncConsumeTimes() + return +} + +func (this *mqManager) IncPublishTimes() { + atomic.AddInt32(&this.publishcount, 1) +} + +func (this *mqManager) IncConsumeTimes() { + atomic.AddInt32(&this.consumecount, 1) +} + +func (this *mqManager) IncExpireTimes() { + atomic.AddInt32(&this.expirecount, 1) + +} + +func (this *mqManager) outputMonitorLog() { + logtimes := 0 + for { + f5.GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + f5.GetSysLog().Info("publishcount:%d, consumecount:%d, expirecount:%d", + this.publishcount, + this.consumecount, + this.expirecount) + f5.GetSysLog().Info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + + logtimes++ + if logtimes > 6 { + logtimes = 0 + this.publishcount = 0 + this.consumecount = 0 + this.expirecount = 0 + + this.Lock() + now := f5.GetApp().GetRealSeconds() + this.topicMap.Range(func(key string, value *topicCache) bool { + value.Lock() + defer value.Unlock() + for i := 0; i < len(value.msgList); { + if value.msgList[i].expire < now { + value.msgList = append(value.msgList[:i], value.msgList[i+1:]...) + this.IncExpireTimes() + } else { + i++ + } + } + return true + }) + this.Unlock() + } + time.Sleep(time.Second * 10) + } +} diff --git a/server/mqproxy/service/servicemgr.go b/server/mqproxy/service/servicemgr.go index acd67915..52a0bf08 100644 --- a/server/mqproxy/service/servicemgr.go +++ b/server/mqproxy/service/servicemgr.go @@ -4,12 +4,10 @@ type serviceMgr struct { } func (this *serviceMgr) Init() { - SApiForward = new(sApiForward) - SApiForward.init() - - go SApiForward.outputMonitorLog() + MqManager = new(mqManager) + MqManager.init() } func (this *serviceMgr) UnInit() { - SApiForward.unInit() + MqManager.unInit() }