udp
This commit is contained in:
parent
acd99abe93
commit
17511d90ba
@ -16,7 +16,7 @@ func (this *app) GetPkgName() string {
|
||||
}
|
||||
|
||||
func (this *app) GetHttpListenPort() int32 {
|
||||
return mt.Table.GamesapiCluster.GetHttpListenPort()
|
||||
return mt.Table.MqproxyCluster.GetHttpListenPort()
|
||||
}
|
||||
|
||||
func (this *app) Run(initCb func(), unInitCb func()) {
|
||||
|
@ -9,5 +9,6 @@ const (
|
||||
ROUTER_MODULE_IDX
|
||||
CONTROLLER_MGR_MODULE_IDX
|
||||
SERVICE_MGR_MODULE_IDX
|
||||
LISTENER_MODULE_IDX
|
||||
MAX_MODULE_IDX
|
||||
)
|
||||
|
@ -11,6 +11,7 @@ var modules [constant.MAX_MODULE_IDX]q5.Module
|
||||
var initOrders = []int32{
|
||||
constant.SERVICE_MGR_MODULE_IDX,
|
||||
constant.ROUTER_MODULE_IDX,
|
||||
constant.LISTENER_MODULE_IDX,
|
||||
}
|
||||
|
||||
var app common.App
|
||||
@ -37,6 +38,12 @@ func RegModule(idx int32, m q5.Module) {
|
||||
serviceMgr = m.(common.ServiceMgr)
|
||||
}
|
||||
case constant.ROUTER_MODULE_IDX:
|
||||
{
|
||||
|
||||
}
|
||||
case constant.LISTENER_MODULE_IDX:
|
||||
{
|
||||
}
|
||||
default:
|
||||
{
|
||||
panic("unknow module")
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
_ "main/app"
|
||||
_ "main/controller"
|
||||
. "main/global"
|
||||
_ "main/listener"
|
||||
_ "main/router"
|
||||
_ "main/service"
|
||||
)
|
||||
|
12
server/mqproxy/listener/export.go
Normal file
12
server/mqproxy/listener/export.go
Normal file
@ -0,0 +1,12 @@
|
||||
package listener
|
||||
|
||||
import (
|
||||
"main/constant"
|
||||
"main/global"
|
||||
)
|
||||
|
||||
var _udpListener = new(UDPListener)
|
||||
|
||||
func init() {
|
||||
global.RegModule(constant.LISTENER_MODULE_IDX, _udpListener)
|
||||
}
|
101
server/mqproxy/listener/udplistener.go
Normal file
101
server/mqproxy/listener/udplistener.go
Normal file
@ -0,0 +1,101 @@
|
||||
package listener
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"main/service"
|
||||
"net"
|
||||
|
||||
"jccommon"
|
||||
"main/mt"
|
||||
"q5"
|
||||
// . "main/global"
|
||||
)
|
||||
|
||||
type UDPListener struct {
|
||||
// ss.MsgHandlerImpl
|
||||
udpconn *net.UDPConn
|
||||
ch chan bool
|
||||
}
|
||||
|
||||
func (this *UDPListener) Init() {
|
||||
this.ch = make(chan bool, 64)
|
||||
udpAddr, _ := net.ResolveUDPAddr("udp", "0.0.0.0:"+
|
||||
q5.ToString(mt.Table.MqproxyCluster.GetListenPort()))
|
||||
conn, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
} else {
|
||||
this.udpconn = conn
|
||||
// go this.serverRead()
|
||||
go this.serverRead2()
|
||||
}
|
||||
}
|
||||
|
||||
func (this *UDPListener) UnInit() {
|
||||
if this.udpconn != nil {
|
||||
this.udpconn.Close()
|
||||
}
|
||||
|
||||
close(this.ch)
|
||||
}
|
||||
|
||||
var UDPPACKHEAD = 2
|
||||
|
||||
func (this *UDPListener) serverRead() {
|
||||
for {
|
||||
this.ch <- true
|
||||
go func(conn *net.UDPConn, ch chan bool) {
|
||||
buf := make([]byte, jccommon.MAX_PACKET_LEN+UDPPACKHEAD)
|
||||
bufLen, _, err := this.udpconn.ReadFrom(buf)
|
||||
if err == nil && bufLen > UDPPACKHEAD {
|
||||
dataLen := q5.MkUInt16(buf[0], buf[1])
|
||||
if bufLen >= int(UDPPACKHEAD)+int(dataLen) {
|
||||
obj := struct {
|
||||
Topic string `json:"topic"`
|
||||
Expire int32 `json:"expire"`
|
||||
Msg string `json:"msg"`
|
||||
}{}
|
||||
data := buf[UDPPACKHEAD : UDPPACKHEAD+int(dataLen)]
|
||||
if json.Unmarshal(data, &obj) == nil {
|
||||
expire := obj.Expire
|
||||
if expire > 86400*7 {
|
||||
expire = 86400 * 7
|
||||
}
|
||||
service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire)
|
||||
}
|
||||
}
|
||||
}
|
||||
<-ch
|
||||
}(this.udpconn, this.ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (this *UDPListener) serverRead2() {
|
||||
for {
|
||||
buf := make([]byte, jccommon.MAX_PACKET_LEN)
|
||||
bufLen, _, err := this.udpconn.ReadFrom(buf)
|
||||
if err == nil && bufLen > 0 {
|
||||
this.ch <- true
|
||||
recvdata:=buf[:bufLen]
|
||||
go func(data []byte, ch chan bool) {
|
||||
obj := struct {
|
||||
Topic string `json:"topic"`
|
||||
Expire int32 `json:"expire"`
|
||||
Msg string `json:"msg"`
|
||||
}{}
|
||||
if json.Unmarshal(data, &obj) == nil {
|
||||
expire := obj.Expire
|
||||
if expire > 86400*7 {
|
||||
expire = 86400 * 7
|
||||
}
|
||||
service.MqManager.PublishTopic(obj.Topic, obj.Msg, expire)
|
||||
} else {
|
||||
fmt.Println("bad message")
|
||||
}
|
||||
|
||||
<-ch
|
||||
}(recvdata, this.ch)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,134 +1,122 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"f5"
|
||||
"io/ioutil"
|
||||
"jccommon"
|
||||
"main/service"
|
||||
"net/http"
|
||||
net_url "net/url"
|
||||
"q5"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func CaForward(c *gin.Context) {
|
||||
accountId := c.DefaultQuery("account_id", "")
|
||||
sessionId := c.DefaultQuery("session_id", "")
|
||||
// accountId := c.DefaultQuery("account_id", "")
|
||||
// sessionId := c.DefaultQuery("session_id", "")
|
||||
|
||||
if !jccommon.IsValidSessionId(accountId, sessionId) {
|
||||
f5.RspErr(c, 500, "invalid session_id")
|
||||
c.Abort()
|
||||
service.SApiForward.IncInvalidSessionTimes()
|
||||
return
|
||||
}
|
||||
// if !jccommon.IsValidSessionId(accountId, sessionId) {
|
||||
// f5.RspErr(c, 500, "invalid session_id")
|
||||
// c.Abort()
|
||||
// service.SApiForward.IncInvalidSessionTimes()
|
||||
// return
|
||||
// }
|
||||
|
||||
cLock := service.SApiForward.AcquireLock(accountId)
|
||||
if cLock == nil {
|
||||
f5.RspErr(c, 500, "system busy")
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
defer service.SApiForward.ReleaseLock(cLock)
|
||||
service.SApiForward.IncTotalTimes()
|
||||
beginTick := q5.GetTickCount()
|
||||
defer func() {
|
||||
costTime := q5.GetTickCount() - beginTick
|
||||
service.SApiForward.UpdateCostTime(costTime)
|
||||
}()
|
||||
downStreamUrl, downStreamHost := service.SApiForward.GetDownStreamHost();
|
||||
newUrl := downStreamUrl + c.Request.URL.Path[5:]
|
||||
if !q5.StrContains(newUrl, "?") {
|
||||
newUrl = newUrl + "?"
|
||||
}
|
||||
params := []*[]string{}
|
||||
nonce := uuid.New().String()
|
||||
nowTime := f5.GetApp().GetRealSeconds()
|
||||
u := net_url.Values{}
|
||||
{
|
||||
for k, v := range c.Request.URL.Query() {
|
||||
u.Set(k, v[0])
|
||||
q5.AppendSlice(¶ms, &[]string{k, v[0]})
|
||||
}
|
||||
u.Set("__nonce", nonce)
|
||||
u.Set("__timestamp", q5.ToString(nowTime))
|
||||
}
|
||||
// cLock := service.SApiForward.AcquireLock(accountId)
|
||||
// if cLock == nil {
|
||||
// f5.RspErr(c, 500, "system busy")
|
||||
// c.Abort()
|
||||
// return
|
||||
// }
|
||||
// defer service.SApiForward.ReleaseLock(cLock)
|
||||
// service.SApiForward.IncTotalTimes()
|
||||
// beginTick := q5.GetTickCount()
|
||||
// defer func() {
|
||||
// costTime := q5.GetTickCount() - beginTick
|
||||
// service.SApiForward.UpdateCostTime(costTime)
|
||||
// }()
|
||||
// downStreamUrl, downStreamHost := service.SApiForward.GetDownStreamHost();
|
||||
// newUrl := downStreamUrl + c.Request.URL.Path[5:]
|
||||
// if !q5.StrContains(newUrl, "?") {
|
||||
// newUrl = newUrl + "?"
|
||||
// }
|
||||
// params := []*[]string{}
|
||||
// nonce := uuid.New().String()
|
||||
// nowTime := f5.GetApp().GetRealSeconds()
|
||||
// u := net_url.Values{}
|
||||
// {
|
||||
// for k, v := range c.Request.URL.Query() {
|
||||
// u.Set(k, v[0])
|
||||
// q5.AppendSlice(¶ms, &[]string{k, v[0]})
|
||||
// }
|
||||
// u.Set("__nonce", nonce)
|
||||
// u.Set("__timestamp", q5.ToString(nowTime))
|
||||
// }
|
||||
|
||||
var httpRequest *http.Request
|
||||
var createErr error
|
||||
switch strings.ToUpper(c.Request.Method) {
|
||||
case "GET":
|
||||
{
|
||||
service.SApiForward.IncGetTimes()
|
||||
u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, ""))
|
||||
newUrl += u.Encode()
|
||||
httpRequest, createErr = http.NewRequest("GET", newUrl, nil)
|
||||
if !f5.IsOnlineEnv() {
|
||||
f5.GetSysLog().Info("CaForward method:%s newUrl:%s ", c.Request.Method, newUrl)
|
||||
}
|
||||
}
|
||||
case "POST":
|
||||
{
|
||||
service.SApiForward.IncPostTimes()
|
||||
if postData, err := c.GetRawData(); err == nil {
|
||||
u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, string(postData)))
|
||||
newUrl += u.Encode()
|
||||
httpRequest, createErr = http.NewRequest("POST", newUrl, bytes.NewBuffer(postData))
|
||||
contentType := c.GetHeader("Content-Type")
|
||||
if contentType != "" {
|
||||
httpRequest.Header.Set("Content-Type", contentType)
|
||||
}
|
||||
if !f5.IsOnlineEnv() {
|
||||
f5.GetSysLog().Info("CaForward method:%s newUrl:%s Content-Type:%s postData:%s",
|
||||
c.Request.Method,
|
||||
newUrl,
|
||||
contentType,
|
||||
postData)
|
||||
}
|
||||
} else {
|
||||
createErr = err
|
||||
}
|
||||
}
|
||||
default:
|
||||
{
|
||||
createErr = errors.New("method error")
|
||||
}
|
||||
}
|
||||
if createErr != nil {
|
||||
service.SApiForward.IncCreateErrTimes()
|
||||
f5.RspErr(c, 500, "create request error")
|
||||
c.Abort()
|
||||
f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr)
|
||||
return
|
||||
}
|
||||
if downStreamHost != "" {
|
||||
httpRequest.Host = downStreamHost
|
||||
}
|
||||
client := &http.Client{}
|
||||
if resp, err := client.Do(httpRequest); err == nil {
|
||||
defer resp.Body.Close()
|
||||
if bytes, err := ioutil.ReadAll(resp.Body); err == nil {
|
||||
service.SApiForward.IncOkTimes()
|
||||
c.String(200, string(bytes))
|
||||
c.Abort()
|
||||
return
|
||||
} else {
|
||||
service.SApiForward.IncReadRspErrTimes()
|
||||
f5.RspErr(c, 500, "read response error")
|
||||
c.Abort()
|
||||
f5.GetSysLog().Info("CaForward read response url:%s eror:%s", newUrl, err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
service.SApiForward.IncDoErrTimes()
|
||||
f5.RspErr(c, 500, "client.Do error")
|
||||
c.Abort()
|
||||
f5.GetSysLog().Info("CaForward client.Do url:%s error:%s", newUrl, err)
|
||||
return
|
||||
}
|
||||
// var httpRequest *http.Request
|
||||
// var createErr error
|
||||
// switch strings.ToUpper(c.Request.Method) {
|
||||
// case "GET":
|
||||
// {
|
||||
// service.SApiForward.IncGetTimes()
|
||||
// u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, ""))
|
||||
// newUrl += u.Encode()
|
||||
// httpRequest, createErr = http.NewRequest("GET", newUrl, nil)
|
||||
// if !f5.IsOnlineEnv() {
|
||||
// f5.GetSysLog().Info("CaForward method:%s newUrl:%s ", c.Request.Method, newUrl)
|
||||
// }
|
||||
// }
|
||||
// case "POST":
|
||||
// {
|
||||
// service.SApiForward.IncPostTimes()
|
||||
// if postData, err := c.GetRawData(); err == nil {
|
||||
// u.Set("__sign", service.SApiForward.Sign(params, nonce, nowTime, string(postData)))
|
||||
// newUrl += u.Encode()
|
||||
// httpRequest, createErr = http.NewRequest("POST", newUrl, bytes.NewBuffer(postData))
|
||||
// contentType := c.GetHeader("Content-Type")
|
||||
// if contentType != "" {
|
||||
// httpRequest.Header.Set("Content-Type", contentType)
|
||||
// }
|
||||
// if !f5.IsOnlineEnv() {
|
||||
// f5.GetSysLog().Info("CaForward method:%s newUrl:%s Content-Type:%s postData:%s",
|
||||
// c.Request.Method,
|
||||
// newUrl,
|
||||
// contentType,
|
||||
// postData)
|
||||
// }
|
||||
// } else {
|
||||
// createErr = err
|
||||
// }
|
||||
// }
|
||||
// default:
|
||||
// {
|
||||
// createErr = errors.New("method error")
|
||||
// }
|
||||
// }
|
||||
// if createErr != nil {
|
||||
// service.SApiForward.IncCreateErrTimes()
|
||||
// f5.RspErr(c, 500, "create request error")
|
||||
// c.Abort()
|
||||
// f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr)
|
||||
// return
|
||||
// }
|
||||
// if downStreamHost != "" {
|
||||
// httpRequest.Host = downStreamHost
|
||||
// }
|
||||
// client := &http.Client{}
|
||||
// if resp, err := client.Do(httpRequest); err == nil {
|
||||
// defer resp.Body.Close()
|
||||
// if bytes, err := ioutil.ReadAll(resp.Body); err == nil {
|
||||
// service.SApiForward.IncOkTimes()
|
||||
// c.String(200, string(bytes))
|
||||
// c.Abort()
|
||||
// return
|
||||
// } else {
|
||||
// service.SApiForward.IncReadRspErrTimes()
|
||||
// f5.RspErr(c, 500, "read response error")
|
||||
// c.Abort()
|
||||
// f5.GetSysLog().Info("CaForward read response url:%s eror:%s", newUrl, err)
|
||||
// return
|
||||
// }
|
||||
// } else {
|
||||
// service.SApiForward.IncDoErrTimes()
|
||||
// f5.RspErr(c, 500, "client.Do error")
|
||||
// c.Abort()
|
||||
// f5.GetSysLog().Info("CaForward client.Do url:%s error:%s", newUrl, err)
|
||||
// return
|
||||
// }
|
||||
|
||||
}
|
||||
|
@ -1,34 +0,0 @@
|
||||
package mt
|
||||
|
||||
import (
|
||||
"f5"
|
||||
"main/mtb"
|
||||
)
|
||||
|
||||
type GamesapiCluster struct {
|
||||
mtb.GamesapiCluster
|
||||
}
|
||||
|
||||
type GamesapiClusterTable struct {
|
||||
f5.IdMetaTable[GamesapiCluster]
|
||||
selfConf *GamesapiCluster
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) Init1() {
|
||||
|
||||
}
|
||||
|
||||
func (this *GamesapiClusterTable) GetListenPort() int32 {
|
||||
return this.selfConf.GetListenPort()
|
||||
}
|
||||
|
||||
func (this *GamesapiClusterTable) GetHttpListenPort() int32 {
|
||||
return this.selfConf.GetHttpListenPort()
|
||||
}
|
||||
|
||||
func (this *GamesapiClusterTable) PostInit1() {
|
||||
this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId()))
|
||||
if this.selfConf == nil {
|
||||
panic("gamesapi集群无法读取本服配置")
|
||||
}
|
||||
}
|
@ -5,14 +5,14 @@ import (
|
||||
)
|
||||
|
||||
type table struct {
|
||||
GamesapiCluster *GamesapiClusterTable
|
||||
MqproxyCluster *MqproxyClusterTable
|
||||
Config *ConfigTable
|
||||
ConfDb *ConfDbTable
|
||||
ConfDb *ConfDbTable
|
||||
}
|
||||
|
||||
var Table = f5.New(func(this *table) {
|
||||
this.GamesapiCluster = f5.New(func(this *GamesapiClusterTable) {
|
||||
this.FileName = "../config/gamesapi.cluster.json"
|
||||
this.MqproxyCluster = f5.New(func(this *MqproxyClusterTable) {
|
||||
this.FileName = "../config/mqproxy.cluster.json"
|
||||
this.PrimKey = "instance_id"
|
||||
})
|
||||
|
||||
|
34
server/mqproxy/mt/mqproxyCluster.go
Normal file
34
server/mqproxy/mt/mqproxyCluster.go
Normal file
@ -0,0 +1,34 @@
|
||||
package mt
|
||||
|
||||
import (
|
||||
"f5"
|
||||
"main/mtb"
|
||||
)
|
||||
|
||||
type MqproxyCluster struct {
|
||||
mtb.MqproxyCluster
|
||||
}
|
||||
|
||||
type MqproxyClusterTable struct {
|
||||
f5.IdMetaTable[MqproxyCluster]
|
||||
selfConf *MqproxyCluster
|
||||
}
|
||||
|
||||
func (this *MqproxyCluster) Init1() {
|
||||
|
||||
}
|
||||
|
||||
func (this *MqproxyClusterTable) GetListenPort() int32 {
|
||||
return this.selfConf.GetListenPort()
|
||||
}
|
||||
|
||||
func (this *MqproxyClusterTable) GetHttpListenPort() int32 {
|
||||
return this.selfConf.GetHttpListenPort()
|
||||
}
|
||||
|
||||
func (this *MqproxyClusterTable) PostInit1() {
|
||||
this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId()))
|
||||
if this.selfConf == nil {
|
||||
panic("mqproxy集群无法读取本服配置")
|
||||
}
|
||||
}
|
@ -4,7 +4,7 @@ import (
|
||||
"f5"
|
||||
)
|
||||
|
||||
type GamesapiCluster struct {
|
||||
type MqproxyCluster struct {
|
||||
instance_id int32
|
||||
listen_port int32
|
||||
http_listen_port int32
|
||||
@ -40,27 +40,27 @@ type ConfDb struct {
|
||||
_flags2_ uint64
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) GetInstanceId() int32 {
|
||||
func (this *MqproxyCluster) GetInstanceId() int32 {
|
||||
return this.instance_id
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) HasInstanceId() bool {
|
||||
func (this *MqproxyCluster) HasInstanceId() bool {
|
||||
return (this._flags1_ & (uint64(1) << 1)) > 0
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) GetListenPort() int32 {
|
||||
func (this *MqproxyCluster) GetListenPort() int32 {
|
||||
return this.listen_port
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) HasListenPort() bool {
|
||||
func (this *MqproxyCluster) HasListenPort() bool {
|
||||
return (this._flags1_ & (uint64(1) << 2)) > 0
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) GetHttpListenPort() int32 {
|
||||
func (this *MqproxyCluster) GetHttpListenPort() int32 {
|
||||
return this.http_listen_port
|
||||
}
|
||||
|
||||
func (this *GamesapiCluster) HasHttpListenPort() bool {
|
||||
func (this *MqproxyCluster) HasHttpListenPort() bool {
|
||||
return (this._flags1_ & (uint64(1) << 3)) > 0
|
||||
}
|
||||
|
||||
@ -185,7 +185,7 @@ func (this *ConfDb) HasMaxIdleConns() bool {
|
||||
}
|
||||
|
||||
|
||||
func (this *GamesapiCluster) LoadFromKv(kv map[string]interface{}) {
|
||||
func (this *MqproxyCluster) LoadFromKv(kv map[string]interface{}) {
|
||||
f5.ReadMetaTableField(&this.instance_id, "instance_id", &this._flags1_, 1, kv)
|
||||
f5.ReadMetaTableField(&this.listen_port, "listen_port", &this._flags1_, 2, kv)
|
||||
f5.ReadMetaTableField(&this.http_listen_port, "http_listen_port", &this._flags1_, 3, kv)
|
||||
|
@ -2,7 +2,7 @@ package mt;
|
||||
|
||||
option go_package = ".;mt";
|
||||
|
||||
message GamesapiCluster
|
||||
message MqproxyCluster
|
||||
{
|
||||
optional int32 instance_id = 1;
|
||||
optional int32 listen_port = 2;
|
||||
|
@ -6,7 +6,7 @@ import (
|
||||
)
|
||||
|
||||
var _serviceMgr = new(serviceMgr)
|
||||
var SApiForward *sApiForward
|
||||
var MqManager *mqManager
|
||||
|
||||
func init() {
|
||||
global.RegModule(constant.SERVICE_MGR_MODULE_IDX, _serviceMgr)
|
||||
|
147
server/mqproxy/service/mqmgr.go
Normal file
147
server/mqproxy/service/mqmgr.go
Normal file
@ -0,0 +1,147 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"f5"
|
||||
"q5"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type mqManager struct {
|
||||
topicMap q5.ConcurrentMap[string, *topicCache]
|
||||
lock *sync.Mutex
|
||||
publishcount int32
|
||||
consumecount int32
|
||||
expirecount int32
|
||||
}
|
||||
|
||||
type topicCache struct {
|
||||
lock *sync.Mutex
|
||||
msgList []*msgItem
|
||||
}
|
||||
|
||||
func (this *topicCache) Lock() {
|
||||
this.lock.Lock()
|
||||
}
|
||||
|
||||
func (this *topicCache) Unlock() {
|
||||
this.lock.Unlock()
|
||||
}
|
||||
|
||||
type msgItem struct {
|
||||
content string
|
||||
addTime int64
|
||||
expire int64
|
||||
}
|
||||
|
||||
func (this *mqManager) init() {
|
||||
this.topicMap = q5.ConcurrentMap[string, *topicCache]{}
|
||||
this.lock = new(sync.Mutex)
|
||||
go this.outputMonitorLog()
|
||||
}
|
||||
|
||||
func (this *mqManager) unInit() {
|
||||
}
|
||||
|
||||
func (this *mqManager) Lock() {
|
||||
this.lock.Lock()
|
||||
}
|
||||
|
||||
func (this *mqManager) Unlock() {
|
||||
this.lock.Unlock()
|
||||
}
|
||||
|
||||
func (this *mqManager) PublishTopic(topic, message string, life int32) {
|
||||
this.Lock()
|
||||
t, exist := this.topicMap.Load(topic)
|
||||
if !exist {
|
||||
tc := new(topicCache)
|
||||
tc.lock = new(sync.Mutex)
|
||||
tc.msgList = make([]*msgItem, 0)
|
||||
this.topicMap.Store(topic, tc)
|
||||
t = &tc
|
||||
}
|
||||
this.Unlock()
|
||||
|
||||
(*t).Lock()
|
||||
newitem := new(msgItem)
|
||||
newitem.addTime = f5.GetApp().GetRealSeconds()
|
||||
newitem.content = message
|
||||
newitem.expire = newitem.addTime + int64(life)
|
||||
(*t).msgList = append((*t).msgList, newitem)
|
||||
(*t).Unlock()
|
||||
this.IncPublishTimes()
|
||||
}
|
||||
|
||||
func (this *mqManager) ConsumeTopic(topic string) (msg string) {
|
||||
msg = ""
|
||||
t, exist := this.topicMap.Load(topic)
|
||||
if !exist {
|
||||
return
|
||||
}
|
||||
|
||||
(*t).Lock()
|
||||
defer (*t).Unlock()
|
||||
|
||||
if len((*t).msgList) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
msg = (*t).msgList[0].content
|
||||
(*t).msgList = (*t).msgList[1:]
|
||||
|
||||
this.IncConsumeTimes()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *mqManager) IncPublishTimes() {
|
||||
atomic.AddInt32(&this.publishcount, 1)
|
||||
}
|
||||
|
||||
func (this *mqManager) IncConsumeTimes() {
|
||||
atomic.AddInt32(&this.consumecount, 1)
|
||||
}
|
||||
|
||||
func (this *mqManager) IncExpireTimes() {
|
||||
atomic.AddInt32(&this.expirecount, 1)
|
||||
|
||||
}
|
||||
|
||||
func (this *mqManager) outputMonitorLog() {
|
||||
logtimes := 0
|
||||
for {
|
||||
f5.GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
|
||||
f5.GetSysLog().Info("publishcount:%d, consumecount:%d, expirecount:%d",
|
||||
this.publishcount,
|
||||
this.consumecount,
|
||||
this.expirecount)
|
||||
f5.GetSysLog().Info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
|
||||
|
||||
logtimes++
|
||||
if logtimes > 6 {
|
||||
logtimes = 0
|
||||
this.publishcount = 0
|
||||
this.consumecount = 0
|
||||
this.expirecount = 0
|
||||
|
||||
this.Lock()
|
||||
now := f5.GetApp().GetRealSeconds()
|
||||
this.topicMap.Range(func(key string, value *topicCache) bool {
|
||||
value.Lock()
|
||||
defer value.Unlock()
|
||||
for i := 0; i < len(value.msgList); {
|
||||
if value.msgList[i].expire < now {
|
||||
value.msgList = append(value.msgList[:i], value.msgList[i+1:]...)
|
||||
this.IncExpireTimes()
|
||||
} else {
|
||||
i++
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
this.Unlock()
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
||||
}
|
@ -4,12 +4,10 @@ type serviceMgr struct {
|
||||
}
|
||||
|
||||
func (this *serviceMgr) Init() {
|
||||
SApiForward = new(sApiForward)
|
||||
SApiForward.init()
|
||||
|
||||
go SApiForward.outputMonitorLog()
|
||||
MqManager = new(mqManager)
|
||||
MqManager.init()
|
||||
}
|
||||
|
||||
func (this *serviceMgr) UnInit() {
|
||||
SApiForward.unInit()
|
||||
MqManager.unInit()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user