From f1771dec9f58a9e74194f073626d557b8ef3555f Mon Sep 17 00:00:00 2001 From: yangduo Date: Thu, 12 Sep 2024 16:44:42 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/mqproxy/listener/udplistener.go | 55 ++++++++++++---- server/mqproxy/service/mqmgr.go | 89 +++++++++++++++++++++----- 2 files changed, 114 insertions(+), 30 deletions(-) diff --git a/server/mqproxy/listener/udplistener.go b/server/mqproxy/listener/udplistener.go index 7dd6488d..f9394289 100644 --- a/server/mqproxy/listener/udplistener.go +++ b/server/mqproxy/listener/udplistener.go @@ -2,6 +2,7 @@ package listener import ( "encoding/json" + "f5" "fmt" "main/service" "net" @@ -74,28 +75,56 @@ func (this *UDPListener) serverRead() { func (this *UDPListener) serverRead2() { for { buf := make([]byte, jccommon.MAX_PACKET_LEN) - bufLen, _, err := this.udpconn.ReadFrom(buf) - if err == nil && bufLen > 0 { + bufLen, peer, err := this.udpconn.ReadFromUDP(buf) + if err == nil && bufLen > 0 { this.ch <- true - recvdata:=buf[:bufLen] - go func(data []byte, ch chan bool) { + recvdata := buf[:bufLen] + go func(addr *net.UDPAddr, data []byte, ch chan bool) { obj := struct { - Topic string `json:"topic"` - Expire int32 `json:"expire"` - Msg string `json:"msg"` + Req string `json:"req"` + Topic string `json:"topic"` + Channel string `json:"channel"` + Expire int32 `json:"expire"` + Msg string `json:"msg"` }{} if json.Unmarshal(data, &obj) == nil { - expire := obj.Expire - if expire > 86400*7 { - expire = 86400 * 7 + retcode := 1 + content := "" + switch obj.Req { + case "pub": + expire := obj.Expire + if expire > 86400*7 { + expire = 86400 * 7 + } + if obj.Topic != "" && obj.Msg != "" { + service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire) + retcode = 0 + } + case "sub": + if obj.Topic != "" && obj.Msg != "" { + service.MqManager.SubscribeChannel(obj.Topic, obj.Channel) + retcode = 0 + } + case "unsub": + if obj.Topic != "" && obj.Msg != "" { + service.MqManager.UnsubscribeChannel(obj.Topic, obj.Channel) + retcode = 0 + } + case "consume": + if obj.Topic != "" && obj.Msg != "" { + retcode = 0 + content = service.MqManager.ConsumeTopic(obj.Topic, obj.Channel) + } } - service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire) + + rsp := fmt.Sprintf("%d|%s", retcode, content) + this.udpconn.WriteToUDP([]byte(rsp), addr) } else { - fmt.Println("bad message") + f5.GetSysLog().Info("bad req:%s", string(data)) } <-ch - }(recvdata, this.ch) + }(peer, recvdata, this.ch) } } } diff --git a/server/mqproxy/service/mqmgr.go b/server/mqproxy/service/mqmgr.go index 63876a9d..9060c6f7 100644 --- a/server/mqproxy/service/mqmgr.go +++ b/server/mqproxy/service/mqmgr.go @@ -9,16 +9,17 @@ import ( ) type mqManager struct { - topicMap q5.ConcurrentMap[string, *topicCache] - lock *sync.Mutex - publishcount int32 - consumecount int32 - expirecount int32 + topicMap q5.ConcurrentMap[string, *topicCache] + lock *sync.Mutex + publishcount int32 + consumecount int32 + expirecount int32 } type topicCache struct { - lock *sync.Mutex - msgList []*msgItem + lock *sync.Mutex + channelMap q5.ConcurrentMap[string, int64] + msgList []*msgItem } func (this *topicCache) Lock() { @@ -30,9 +31,10 @@ func (this *topicCache) Unlock() { } type msgItem struct { - content string - addTime int64 - expire int64 + channels *q5.ConcurrentMap[string, int64] + content string + addTime int64 + expire int64 } func (this *mqManager) init() { @@ -58,24 +60,54 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) { if !exist { tc := new(topicCache) tc.lock = new(sync.Mutex) + tc.channelMap = q5.ConcurrentMap[string, int64]{} tc.msgList = make([]*msgItem, 0) this.topicMap.Store(topic, tc) t = &tc } - this.Unlock() - - (*t).Lock() newitem := new(msgItem) newitem.addTime = f5.GetApp().GetRealSeconds() newitem.content = message newitem.expire = newitem.addTime + int64(life) + if (*t).channelMap.GetSize() > 0 { + newitem.channels = new(q5.ConcurrentMap[string, int64]) + (*t).channelMap.Range(func(key string, value int64) bool { + newitem.channels.Store(key, value) + return true + }) + } + this.Unlock() + + (*t).Lock() (*t).msgList = append((*t).msgList, newitem) (*t).Unlock() this.IncPublishTimes() } -func (this *mqManager) ConsumeTopic(topic string) (msg string) { - msg = "" +func (this *mqManager) SubscribeChannel(topic, channel string) { + this.Lock() + t, exist := this.topicMap.Load(topic) + if !exist { + tc := new(topicCache) + tc.lock = new(sync.Mutex) + tc.msgList = make([]*msgItem, 0) + this.topicMap.Store(topic, tc) + t = &tc + } + (*t).channelMap.Store(channel, 0) + this.Unlock() +} + +func (this *mqManager) UnsubscribeChannel(topic, channel string) { + this.Lock() + t, exist := this.topicMap.Load(topic) + if exist { + (*t).channelMap.Delete(channel) + } + this.Unlock() +} + +func (this *mqManager) ConsumeTopic(topic, channel string) (msg string) { t, exist := this.topicMap.Load(topic) if !exist { return @@ -88,8 +120,31 @@ func (this *mqManager) ConsumeTopic(topic string) (msg string) { return } - msg = (*t).msgList[0].content - (*t).msgList = (*t).msgList[1:] + msgidx := -1 + del := false + for index := range (*t).msgList { + if (*t).msgList[index].channels == nil { + msgidx = index + del = true + } else { + _, exist := (*t).msgList[index].channels.LoadAndDelete(channel) + if exist { + msgidx = index + del = (*t).msgList[index].channels.GetSize() == 0 + } + } + } + + if msgidx > 0 { + msg = (*t).msgList[msgidx].content + if del { + if msgidx+1 < len((*t).msgList) { + (*t).msgList = append((*t).msgList[:msgidx], (*t).msgList[msgidx+1:]...) + } else { + (*t).msgList = (*t).msgList[:msgidx] + } + } + } this.IncConsumeTimes() return