http sub unsub consume

This commit is contained in:
yangduo 2024-09-18 16:54:02 +08:00
parent 4d428b8e7d
commit c278174a95
5 changed files with 66 additions and 5 deletions

View File

@ -51,6 +51,7 @@ func (this *UDPListener) serverRead() {
Req string `json:"req"` Req string `json:"req"`
Topic string `json:"topic"` Topic string `json:"topic"`
Channel string `json:"channel"` Channel string `json:"channel"`
Notice string `json:"noticeurl"`
Expire int32 `json:"expire"` Expire int32 `json:"expire"`
Msg string `json:"msg"` Msg string `json:"msg"`
}{} }{}
@ -69,7 +70,7 @@ func (this *UDPListener) serverRead() {
} }
case "sub": case "sub":
if obj.Topic != "" && obj.Channel != "" { if obj.Topic != "" && obj.Channel != "" {
service.MqManager.SubscribeChannel(obj.Topic, obj.Channel) service.MqManager.SubscribeChannel(obj.Topic, obj.Channel, obj.Notice)
retcode = 0 retcode = 0
} }
case "unsub": case "unsub":

View File

@ -0,0 +1,36 @@
package middleware
import (
"f5"
"main/service"
"net/http"
"github.com/gin-gonic/gin"
)
func Subscribe(c *gin.Context) {
topic := c.DefaultQuery("topic", "")
channel := c.DefaultQuery("channel", "")
noticeurl := c.DefaultQuery("noticeurl", "")
service.MqManager.SubscribeChannel(topic, channel, noticeurl)
f5.GetSysLog().Debug("subscribe:%s, %s, %s", topic, channel, noticeurl)
c.String(http.StatusOK, "")
}
func Unsubscribe(c *gin.Context) {
topic := c.DefaultQuery("topic", "")
channel := c.DefaultQuery("channel", "")
service.MqManager.UnsubscribeChannel(topic, channel)
c.String(http.StatusOK, "")
}
func Consume(c *gin.Context) {
topic := c.DefaultQuery("topic", "")
channel := c.DefaultQuery("channel", "")
msg := service.MqManager.ConsumeTopic(topic, channel)
f5.GetSysLog().Debug("consume:%s, %s, %s", topic, channel, msg)
c.String(http.StatusOK, msg)
}

View File

@ -51,6 +51,6 @@ func (this *ConfigTable) GetMaxConcurrentNum() int32 {
func (this *ConfigTable) PostInit1() { func (this *ConfigTable) PostInit1() {
this.selfConf = this.GetById(int64(0)) this.selfConf = this.GetById(int64(0))
if this.selfConf == nil { if this.selfConf == nil {
panic("gamesapi config无法读取本服配置") panic("mqproxy config无法读取本服配置")
} }
} }

View File

@ -15,6 +15,11 @@ func (this *routerMgr) Init() {
redirectGroup := f5.GetApp().GetGinEngine().Group("/sapi") redirectGroup := f5.GetApp().GetGinEngine().Group("/sapi")
redirectGroup.Any("webapp/index.php", middleware.CaForward) redirectGroup.Any("webapp/index.php", middleware.CaForward)
mqGroup := f5.GetApp().GetGinEngine().Group("/mqproxy")
mqGroup.Any("webapp/subscribe.php", middleware.Subscribe)
mqGroup.Any("webapp/unsubscribe.php", middleware.Unsubscribe)
mqGroup.Any("webapp/consume.php", middleware.Consume)
f5.GetSysLog().Info("routerMgr.init") f5.GetSysLog().Info("routerMgr.init")
} }

View File

@ -19,6 +19,7 @@ type mqManager struct {
type topicCache struct { type topicCache struct {
lock *sync.Mutex lock *sync.Mutex
channelMap q5.ConcurrentMap[string, int64] channelMap q5.ConcurrentMap[string, int64]
noticeMap q5.ConcurrentMap[string, string]
msgList []*msgItem msgList []*msgItem
} }
@ -40,7 +41,7 @@ type msgItem struct {
func (this *mqManager) init() { func (this *mqManager) init() {
this.topicMap = q5.ConcurrentMap[string, *topicCache]{} this.topicMap = q5.ConcurrentMap[string, *topicCache]{}
this.lock = new(sync.Mutex) this.lock = new(sync.Mutex)
go this.outputMonitorLog() go this.asyncTasks()
} }
func (this *mqManager) unInit() { func (this *mqManager) unInit() {
@ -61,6 +62,7 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) {
tc := new(topicCache) tc := new(topicCache)
tc.lock = new(sync.Mutex) tc.lock = new(sync.Mutex)
tc.channelMap = q5.ConcurrentMap[string, int64]{} tc.channelMap = q5.ConcurrentMap[string, int64]{}
tc.noticeMap = q5.ConcurrentMap[string, string]{}
tc.msgList = make([]*msgItem, 0) tc.msgList = make([]*msgItem, 0)
this.topicMap.Store(topic, tc) this.topicMap.Store(topic, tc)
t = &tc t = &tc
@ -76,6 +78,19 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) {
return true return true
}) })
} }
(*t).noticeMap.Range(func(channel, noticeurl string) bool {
f5.GetHttpCliMgr().SendGoStylePost(
noticeurl,
map[string]string{},
"",
message,
func(rsp f5.HttpCliResponse) {
})
return true
})
this.Unlock() this.Unlock()
(*t).Lock() (*t).Lock()
@ -84,7 +99,7 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) {
this.IncPublishTimes() this.IncPublishTimes()
} }
func (this *mqManager) SubscribeChannel(topic, channel string) { func (this *mqManager) SubscribeChannel(topic, channel, noticeurl string) {
this.Lock() this.Lock()
t, exist := this.topicMap.Load(topic) t, exist := this.topicMap.Load(topic)
if !exist { if !exist {
@ -95,6 +110,9 @@ func (this *mqManager) SubscribeChannel(topic, channel string) {
t = &tc t = &tc
} }
(*t).channelMap.Store(channel, 0) (*t).channelMap.Store(channel, 0)
if noticeurl != "" {
(*t).noticeMap.Store(channel, noticeurl)
}
this.Unlock() this.Unlock()
} }
@ -103,6 +121,7 @@ func (this *mqManager) UnsubscribeChannel(topic, channel string) {
t, exist := this.topicMap.Load(topic) t, exist := this.topicMap.Load(topic)
if exist { if exist {
(*t).channelMap.Delete(channel) (*t).channelMap.Delete(channel)
(*t).noticeMap.Delete(channel)
} }
this.Unlock() this.Unlock()
} }
@ -163,7 +182,7 @@ func (this *mqManager) IncExpireTimes() {
} }
func (this *mqManager) outputMonitorLog() { func (this *mqManager) asyncTasks() {
logtimes := 0 logtimes := 0
for { for {
f5.GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") f5.GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")