diff --git a/server/mqproxy/listener/udplistener.go b/server/mqproxy/listener/udplistener.go index 15508200..182c7087 100644 --- a/server/mqproxy/listener/udplistener.go +++ b/server/mqproxy/listener/udplistener.go @@ -51,6 +51,7 @@ func (this *UDPListener) serverRead() { Req string `json:"req"` Topic string `json:"topic"` Channel string `json:"channel"` + Notice string `json:"noticeurl"` Expire int32 `json:"expire"` Msg string `json:"msg"` }{} @@ -69,7 +70,7 @@ func (this *UDPListener) serverRead() { } case "sub": if obj.Topic != "" && obj.Channel != "" { - service.MqManager.SubscribeChannel(obj.Topic, obj.Channel) + service.MqManager.SubscribeChannel(obj.Topic, obj.Channel, obj.Notice) retcode = 0 } case "unsub": diff --git a/server/mqproxy/middleware/mq.go b/server/mqproxy/middleware/mq.go new file mode 100644 index 00000000..7a739cc8 --- /dev/null +++ b/server/mqproxy/middleware/mq.go @@ -0,0 +1,36 @@ +package middleware + +import ( + "f5" + "main/service" + "net/http" + + "github.com/gin-gonic/gin" +) + +func Subscribe(c *gin.Context) { + topic := c.DefaultQuery("topic", "") + channel := c.DefaultQuery("channel", "") + noticeurl := c.DefaultQuery("noticeurl", "") + + service.MqManager.SubscribeChannel(topic, channel, noticeurl) + f5.GetSysLog().Debug("subscribe:%s, %s, %s", topic, channel, noticeurl) + c.String(http.StatusOK, "") +} + +func Unsubscribe(c *gin.Context) { + topic := c.DefaultQuery("topic", "") + channel := c.DefaultQuery("channel", "") + + service.MqManager.UnsubscribeChannel(topic, channel) + c.String(http.StatusOK, "") +} + +func Consume(c *gin.Context) { + topic := c.DefaultQuery("topic", "") + channel := c.DefaultQuery("channel", "") + + msg := service.MqManager.ConsumeTopic(topic, channel) + f5.GetSysLog().Debug("consume:%s, %s, %s", topic, channel, msg) + c.String(http.StatusOK, msg) +} diff --git a/server/mqproxy/mt/Config.go b/server/mqproxy/mt/Config.go index 0cf8b7f6..8e378b46 100644 --- a/server/mqproxy/mt/Config.go +++ b/server/mqproxy/mt/Config.go @@ -51,6 +51,6 @@ func (this *ConfigTable) GetMaxConcurrentNum() int32 { func (this *ConfigTable) PostInit1() { this.selfConf = this.GetById(int64(0)) if this.selfConf == nil { - panic("gamesapi config无法读取本服配置") + panic("mqproxy config无法读取本服配置") } } diff --git a/server/mqproxy/router/routermgr.go b/server/mqproxy/router/routermgr.go index 7ab29e0f..418df795 100644 --- a/server/mqproxy/router/routermgr.go +++ b/server/mqproxy/router/routermgr.go @@ -15,6 +15,11 @@ func (this *routerMgr) Init() { redirectGroup := f5.GetApp().GetGinEngine().Group("/sapi") redirectGroup.Any("webapp/index.php", middleware.CaForward) + mqGroup := f5.GetApp().GetGinEngine().Group("/mqproxy") + mqGroup.Any("webapp/subscribe.php", middleware.Subscribe) + mqGroup.Any("webapp/unsubscribe.php", middleware.Unsubscribe) + mqGroup.Any("webapp/consume.php", middleware.Consume) + f5.GetSysLog().Info("routerMgr.init") } diff --git a/server/mqproxy/service/mqmgr.go b/server/mqproxy/service/mqmgr.go index 31ca600d..1834875d 100644 --- a/server/mqproxy/service/mqmgr.go +++ b/server/mqproxy/service/mqmgr.go @@ -19,6 +19,7 @@ type mqManager struct { type topicCache struct { lock *sync.Mutex channelMap q5.ConcurrentMap[string, int64] + noticeMap q5.ConcurrentMap[string, string] msgList []*msgItem } @@ -40,7 +41,7 @@ type msgItem struct { func (this *mqManager) init() { this.topicMap = q5.ConcurrentMap[string, *topicCache]{} this.lock = new(sync.Mutex) - go this.outputMonitorLog() + go this.asyncTasks() } func (this *mqManager) unInit() { @@ -61,6 +62,7 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) { tc := new(topicCache) tc.lock = new(sync.Mutex) tc.channelMap = q5.ConcurrentMap[string, int64]{} + tc.noticeMap = q5.ConcurrentMap[string, string]{} tc.msgList = make([]*msgItem, 0) this.topicMap.Store(topic, tc) t = &tc @@ -76,6 +78,19 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) { return true }) } + + (*t).noticeMap.Range(func(channel, noticeurl string) bool { + f5.GetHttpCliMgr().SendGoStylePost( + noticeurl, + map[string]string{}, + "", + message, + func(rsp f5.HttpCliResponse) { + + }) + return true + }) + this.Unlock() (*t).Lock() @@ -84,7 +99,7 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) { this.IncPublishTimes() } -func (this *mqManager) SubscribeChannel(topic, channel string) { +func (this *mqManager) SubscribeChannel(topic, channel, noticeurl string) { this.Lock() t, exist := this.topicMap.Load(topic) if !exist { @@ -95,6 +110,9 @@ func (this *mqManager) SubscribeChannel(topic, channel string) { t = &tc } (*t).channelMap.Store(channel, 0) + if noticeurl != "" { + (*t).noticeMap.Store(channel, noticeurl) + } this.Unlock() } @@ -103,6 +121,7 @@ func (this *mqManager) UnsubscribeChannel(topic, channel string) { t, exist := this.topicMap.Load(topic) if exist { (*t).channelMap.Delete(channel) + (*t).noticeMap.Delete(channel) } this.Unlock() } @@ -163,7 +182,7 @@ func (this *mqManager) IncExpireTimes() { } -func (this *mqManager) outputMonitorLog() { +func (this *mqManager) asyncTasks() { logtimes := 0 for { f5.GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")