This commit is contained in:
yangduo 2024-09-12 16:44:42 +08:00
parent 56a8fa3b25
commit f1771dec9f
2 changed files with 114 additions and 30 deletions

View File

@ -2,6 +2,7 @@ package listener
import ( import (
"encoding/json" "encoding/json"
"f5"
"fmt" "fmt"
"main/service" "main/service"
"net" "net"
@ -74,28 +75,56 @@ func (this *UDPListener) serverRead() {
func (this *UDPListener) serverRead2() { func (this *UDPListener) serverRead2() {
for { for {
buf := make([]byte, jccommon.MAX_PACKET_LEN) buf := make([]byte, jccommon.MAX_PACKET_LEN)
bufLen, _, err := this.udpconn.ReadFrom(buf) bufLen, peer, err := this.udpconn.ReadFromUDP(buf)
if err == nil && bufLen > 0 { if err == nil && bufLen > 0 {
this.ch <- true this.ch <- true
recvdata:=buf[:bufLen] recvdata := buf[:bufLen]
go func(data []byte, ch chan bool) { go func(addr *net.UDPAddr, data []byte, ch chan bool) {
obj := struct { obj := struct {
Topic string `json:"topic"` Req string `json:"req"`
Expire int32 `json:"expire"` Topic string `json:"topic"`
Msg string `json:"msg"` Channel string `json:"channel"`
Expire int32 `json:"expire"`
Msg string `json:"msg"`
}{} }{}
if json.Unmarshal(data, &obj) == nil { if json.Unmarshal(data, &obj) == nil {
expire := obj.Expire retcode := 1
if expire > 86400*7 { content := ""
expire = 86400 * 7 switch obj.Req {
case "pub":
expire := obj.Expire
if expire > 86400*7 {
expire = 86400 * 7
}
if obj.Topic != "" && obj.Msg != "" {
service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire)
retcode = 0
}
case "sub":
if obj.Topic != "" && obj.Msg != "" {
service.MqManager.SubscribeChannel(obj.Topic, obj.Channel)
retcode = 0
}
case "unsub":
if obj.Topic != "" && obj.Msg != "" {
service.MqManager.UnsubscribeChannel(obj.Topic, obj.Channel)
retcode = 0
}
case "consume":
if obj.Topic != "" && obj.Msg != "" {
retcode = 0
content = service.MqManager.ConsumeTopic(obj.Topic, obj.Channel)
}
} }
service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire)
rsp := fmt.Sprintf("%d|%s", retcode, content)
this.udpconn.WriteToUDP([]byte(rsp), addr)
} else { } else {
fmt.Println("bad message") f5.GetSysLog().Info("bad req:%s", string(data))
} }
<-ch <-ch
}(recvdata, this.ch) }(peer, recvdata, this.ch)
} }
} }
} }

View File

@ -9,16 +9,17 @@ import (
) )
type mqManager struct { type mqManager struct {
topicMap q5.ConcurrentMap[string, *topicCache] topicMap q5.ConcurrentMap[string, *topicCache]
lock *sync.Mutex lock *sync.Mutex
publishcount int32 publishcount int32
consumecount int32 consumecount int32
expirecount int32 expirecount int32
} }
type topicCache struct { type topicCache struct {
lock *sync.Mutex lock *sync.Mutex
msgList []*msgItem channelMap q5.ConcurrentMap[string, int64]
msgList []*msgItem
} }
func (this *topicCache) Lock() { func (this *topicCache) Lock() {
@ -30,9 +31,10 @@ func (this *topicCache) Unlock() {
} }
type msgItem struct { type msgItem struct {
content string channels *q5.ConcurrentMap[string, int64]
addTime int64 content string
expire int64 addTime int64
expire int64
} }
func (this *mqManager) init() { func (this *mqManager) init() {
@ -58,24 +60,54 @@ func (this *mqManager) PublishTopic(topic, message string, life int32) {
if !exist { if !exist {
tc := new(topicCache) tc := new(topicCache)
tc.lock = new(sync.Mutex) tc.lock = new(sync.Mutex)
tc.channelMap = q5.ConcurrentMap[string, int64]{}
tc.msgList = make([]*msgItem, 0) tc.msgList = make([]*msgItem, 0)
this.topicMap.Store(topic, tc) this.topicMap.Store(topic, tc)
t = &tc t = &tc
} }
this.Unlock()
(*t).Lock()
newitem := new(msgItem) newitem := new(msgItem)
newitem.addTime = f5.GetApp().GetRealSeconds() newitem.addTime = f5.GetApp().GetRealSeconds()
newitem.content = message newitem.content = message
newitem.expire = newitem.addTime + int64(life) newitem.expire = newitem.addTime + int64(life)
if (*t).channelMap.GetSize() > 0 {
newitem.channels = new(q5.ConcurrentMap[string, int64])
(*t).channelMap.Range(func(key string, value int64) bool {
newitem.channels.Store(key, value)
return true
})
}
this.Unlock()
(*t).Lock()
(*t).msgList = append((*t).msgList, newitem) (*t).msgList = append((*t).msgList, newitem)
(*t).Unlock() (*t).Unlock()
this.IncPublishTimes() this.IncPublishTimes()
} }
func (this *mqManager) ConsumeTopic(topic string) (msg string) { func (this *mqManager) SubscribeChannel(topic, channel string) {
msg = "" this.Lock()
t, exist := this.topicMap.Load(topic)
if !exist {
tc := new(topicCache)
tc.lock = new(sync.Mutex)
tc.msgList = make([]*msgItem, 0)
this.topicMap.Store(topic, tc)
t = &tc
}
(*t).channelMap.Store(channel, 0)
this.Unlock()
}
func (this *mqManager) UnsubscribeChannel(topic, channel string) {
this.Lock()
t, exist := this.topicMap.Load(topic)
if exist {
(*t).channelMap.Delete(channel)
}
this.Unlock()
}
func (this *mqManager) ConsumeTopic(topic, channel string) (msg string) {
t, exist := this.topicMap.Load(topic) t, exist := this.topicMap.Load(topic)
if !exist { if !exist {
return return
@ -88,8 +120,31 @@ func (this *mqManager) ConsumeTopic(topic string) (msg string) {
return return
} }
msg = (*t).msgList[0].content msgidx := -1
(*t).msgList = (*t).msgList[1:] del := false
for index := range (*t).msgList {
if (*t).msgList[index].channels == nil {
msgidx = index
del = true
} else {
_, exist := (*t).msgList[index].channels.LoadAndDelete(channel)
if exist {
msgidx = index
del = (*t).msgList[index].channels.GetSize() == 0
}
}
}
if msgidx > 0 {
msg = (*t).msgList[msgidx].content
if del {
if msgidx+1 < len((*t).msgList) {
(*t).msgList = append((*t).msgList[:msgidx], (*t).msgList[msgidx+1:]...)
} else {
(*t).msgList = (*t).msgList[:msgidx]
}
}
}
this.IncConsumeTimes() this.IncConsumeTimes()
return return