From 9b0fa4e908f421ab362a0b332161fb010726eb72 Mon Sep 17 00:00:00 2001 From: yangduo Date: Thu, 19 Sep 2024 14:46:36 +0800 Subject: [PATCH] mqconsume --- bin/mqconsume/bin/.gitkeep | 0 bin/mqconsume/config/confdb.mysql.json | 9 ++ bin/mqconsume/config/config.json | 5 + bin/mqconsume/config/mqconsume.cluster.json | 7 + server/mqconsume/app/app.go | 56 +++++++ server/mqconsume/app/export.go | 12 ++ server/mqconsume/common/types.go | 11 ++ server/mqconsume/constant/constant.go | 14 ++ server/mqconsume/controller/export.go | 5 + server/mqconsume/global/global.go | 65 ++++++++ server/mqconsume/go.mod | 58 +++++++ server/mqconsume/go.sum | 105 +++++++++++++ server/mqconsume/initialize/enter.go | 13 ++ server/mqconsume/main.go | 9 ++ server/mqconsume/makefile | 17 ++ server/mqconsume/middleware/marquee.go | 28 ++++ server/mqconsume/mt/ConfDb.go | 15 ++ server/mqconsume/mt/Config.go | 38 +++++ server/mqconsume/mt/export.go | 29 ++++ server/mqconsume/mt/mqconsumeCluster.go | 34 ++++ server/mqconsume/mtb/mtb.auto_gen.go | 163 ++++++++++++++++++++ server/mqconsume/proto/mt.proto | 28 ++++ server/mqconsume/proto/protoc-gen.bat | 2 + server/mqconsume/router/export.go | 12 ++ server/mqconsume/router/routermgr.go | 23 +++ server/mqconsume/service/export.go | 13 ++ server/mqconsume/service/mqconsumer.go | 108 +++++++++++++ server/mqconsume/service/servicemgr.go | 13 ++ 28 files changed, 892 insertions(+) create mode 100644 bin/mqconsume/bin/.gitkeep create mode 100644 bin/mqconsume/config/confdb.mysql.json create mode 100644 bin/mqconsume/config/config.json create mode 100644 bin/mqconsume/config/mqconsume.cluster.json create mode 100644 server/mqconsume/app/app.go create mode 100644 server/mqconsume/app/export.go create mode 100644 server/mqconsume/common/types.go create mode 100644 server/mqconsume/constant/constant.go create mode 100644 server/mqconsume/controller/export.go create mode 100644 server/mqconsume/global/global.go create mode 100644 server/mqconsume/go.mod create mode 100644 server/mqconsume/go.sum create mode 100644 server/mqconsume/initialize/enter.go create mode 100644 server/mqconsume/main.go create mode 100644 server/mqconsume/makefile create mode 100644 server/mqconsume/middleware/marquee.go create mode 100644 server/mqconsume/mt/ConfDb.go create mode 100644 server/mqconsume/mt/Config.go create mode 100644 server/mqconsume/mt/export.go create mode 100644 server/mqconsume/mt/mqconsumeCluster.go create mode 100644 server/mqconsume/mtb/mtb.auto_gen.go create mode 100644 server/mqconsume/proto/mt.proto create mode 100644 server/mqconsume/proto/protoc-gen.bat create mode 100644 server/mqconsume/router/export.go create mode 100644 server/mqconsume/router/routermgr.go create mode 100644 server/mqconsume/service/export.go create mode 100644 server/mqconsume/service/mqconsumer.go create mode 100644 server/mqconsume/service/servicemgr.go diff --git a/bin/mqconsume/bin/.gitkeep b/bin/mqconsume/bin/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/bin/mqconsume/config/confdb.mysql.json b/bin/mqconsume/config/confdb.mysql.json new file mode 100644 index 00000000..c3477c54 --- /dev/null +++ b/bin/mqconsume/config/confdb.mysql.json @@ -0,0 +1,9 @@ +{ + "host": "mysql-test.kingsome.cn", + "port": 3306, + "user": "root", + "passwd": "keji178", + "database": "confdb_dev_1", + "max_open_conns": 1, + "max_idle_conns": 1 +} diff --git a/bin/mqconsume/config/config.json b/bin/mqconsume/config/config.json new file mode 100644 index 00000000..59cb74cd --- /dev/null +++ b/bin/mqconsume/config/config.json @@ -0,0 +1,5 @@ +{ + "mqproxy_url": "http://127.0.0.1:3010", + "notice_url": "http://127.0.0.1:3012", + "notice_channel_id": "world_room_1" +} \ No newline at end of file diff --git a/bin/mqconsume/config/mqconsume.cluster.json b/bin/mqconsume/config/mqconsume.cluster.json new file mode 100644 index 00000000..5d81fc52 --- /dev/null +++ b/bin/mqconsume/config/mqconsume.cluster.json @@ -0,0 +1,7 @@ +[ + { + "instance_id": 1, + "listen_port": 3013, + "http_listen_port": 3012 + } +] diff --git a/server/mqconsume/app/app.go b/server/mqconsume/app/app.go new file mode 100644 index 00000000..cb3bee35 --- /dev/null +++ b/server/mqconsume/app/app.go @@ -0,0 +1,56 @@ +package app + +import ( + "f5" + "main/constant" + "main/mt" +) + +type app struct { + initCb func() + unInitCb func() +} + +func (this *app) GetPkgName() string { + return "mqconsume" +} + +func (this *app) GetHttpListenPort() int32 { + return mt.Table.MqconsumeCluster.GetHttpListenPort() +} + +func (this *app) Run(initCb func(), unInitCb func()) { + this.initCb = initCb + this.unInitCb = unInitCb + f5.Run(this) +} + +func (this *app) Init() { + f5.LoadMetaTable(mt.Table) + this.registerDataSources() + this.initCb() +} + +func (this *app) UnInit() { + this.unInitCb() +} + +func (this *app) Update() { +} + +func (this *app) registerDataSources() { + f5.GetGoStyleDb().RegisterDataSource( + constant.CONF_DB, + mt.Table.ConfDb.GetById(0).GetHost(), + mt.Table.ConfDb.GetById(0).GetPort(), + mt.Table.ConfDb.GetById(0).GetUser(), + mt.Table.ConfDb.GetById(0).GetPasswd(), + mt.Table.ConfDb.GetById(0).GetDatabase(), + 1, + mt.Table.ConfDb.GetById(0).GetMaxOpenConns(), + mt.Table.ConfDb.GetById(0).GetMaxIdleConns()) +} + +func (this *app) HasTask() bool { + return false +} diff --git a/server/mqconsume/app/export.go b/server/mqconsume/app/export.go new file mode 100644 index 00000000..668918d5 --- /dev/null +++ b/server/mqconsume/app/export.go @@ -0,0 +1,12 @@ +package app + +import ( + "main/constant" + "main/global" +) + +var _app = new(app) + +func init() { + global.RegModule(constant.APP_MODULE_IDX, _app) +} diff --git a/server/mqconsume/common/types.go b/server/mqconsume/common/types.go new file mode 100644 index 00000000..e8696e44 --- /dev/null +++ b/server/mqconsume/common/types.go @@ -0,0 +1,11 @@ +package common + +type App interface { + Run(func(), func()) +} + +type RouterGroup interface { +} + +type ServiceMgr interface { +} diff --git a/server/mqconsume/constant/constant.go b/server/mqconsume/constant/constant.go new file mode 100644 index 00000000..35464470 --- /dev/null +++ b/server/mqconsume/constant/constant.go @@ -0,0 +1,14 @@ +package constant + +const ( + CONF_DB = "confdb" +) + +const ( + APP_MODULE_IDX = iota + ROUTER_MODULE_IDX + CONTROLLER_MGR_MODULE_IDX + SERVICE_MGR_MODULE_IDX + LISTENER_MODULE_IDX + MAX_MODULE_IDX +) diff --git a/server/mqconsume/controller/export.go b/server/mqconsume/controller/export.go new file mode 100644 index 00000000..6ea7e9ae --- /dev/null +++ b/server/mqconsume/controller/export.go @@ -0,0 +1,5 @@ +package controller + +import ( + +) diff --git a/server/mqconsume/global/global.go b/server/mqconsume/global/global.go new file mode 100644 index 00000000..f73e95a2 --- /dev/null +++ b/server/mqconsume/global/global.go @@ -0,0 +1,65 @@ +package global + +import ( + "fmt" + "main/common" + "main/constant" + "q5" +) + +var modules [constant.MAX_MODULE_IDX]q5.Module +var initOrders = []int32{ + constant.ROUTER_MODULE_IDX, + constant.SERVICE_MGR_MODULE_IDX, +} + +var app common.App +var serviceMgr common.ServiceMgr + +func GetApp() common.App { + return app +} + +func GetServiceMgr() common.ServiceMgr { + return serviceMgr +} + +func RegModule(idx int32, m q5.Module) { + fmt.Printf("RegModule module %d\n", idx) + modules[idx] = m + switch idx { + case constant.APP_MODULE_IDX: + { + app = m.(common.App) + } + case constant.SERVICE_MGR_MODULE_IDX: + { + serviceMgr = m.(common.ServiceMgr) + } + case constant.ROUTER_MODULE_IDX: + { + + } + case constant.LISTENER_MODULE_IDX: + { + } + default: + { + panic("unknow module") + } + } +} + +func InitModules() { + for _, val := range initOrders { + fmt.Printf("init module %d\n", val) + modules[val].Init() + } +} + +func UnInitModules() { + for _, val := range initOrders { + fmt.Printf("unInit module %d", val) + modules[val].UnInit() + } +} diff --git a/server/mqconsume/go.mod b/server/mqconsume/go.mod new file mode 100644 index 00000000..5a636399 --- /dev/null +++ b/server/mqconsume/go.mod @@ -0,0 +1,58 @@ +module mqconsume + +go 1.20 + +require q5 v1.0.0 + +require f5 v1.0.0 + +require jccommon v1.0.0 + +require main v1.0.0 + +require ( + github.com/gin-gonic/gin v1.9.1 + github.com/google/uuid v1.6.0 +) + +require ( + github.com/bytedance/sonic v1.10.1 // indirect + github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect + github.com/chenzhuoyu/iasm v0.9.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.2 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.15.4 // indirect + github.com/go-sql-driver/mysql v1.7.1 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gomodule/redigo v1.8.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/leodido/go-urn v1.2.4 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect + golang.org/x/arch v0.5.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/mysql v1.5.1 // indirect + gorm.io/gorm v1.25.4 // indirect +) + +replace q5 => ../../third_party/q5 + +replace f5 => ../../third_party/f5 + +replace jccommon => ../jccommon + +replace main => ./ diff --git a/server/mqconsume/go.sum b/server/mqconsume/go.sum new file mode 100644 index 00000000..73daf4be --- /dev/null +++ b/server/mqconsume/go.sum @@ -0,0 +1,105 @@ +github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= +github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= +github.com/bytedance/sonic v1.10.1 h1:7a1wuFXL1cMy7a3f7/VFcEtriuXQnUBhtoVfOZiaysc= +github.com/bytedance/sonic v1.10.1/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4= +github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= +github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA= +github.com/chenzhuoyu/iasm v0.9.0 h1:9fhXjVzq5hUy2gkhhgHl95zG2cEAhw9OSGs8toWWAwo= +github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= +github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= +github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.15.4 h1:zMXza4EpOdooxPel5xDqXEdXG5r+WggpvnAKMsalBjs= +github.com/go-playground/validator/v10 v10.15.4/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/gomodule/redigo v1.8.3 h1:HR0kYDX2RJZvAup8CsiJwxB4dTCSC0AaUq6S4SiLwUc= +github.com/gomodule/redigo v1.8.3/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= +github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.5.0 h1:jpGode6huXQxcskEIpOCvrU+tzo81b6+oFLUYXWtH/Y= +golang.org/x/arch v0.5.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.5.1 h1:WUEH5VF9obL/lTtzjmML/5e6VfFR/788coz2uaVCAZw= +gorm.io/driver/mysql v1.5.1/go.mod h1:Jo3Xu7mMhCyj8dlrb3WoCaRd1FhsVh+yMXb1jUInf5o= +gorm.io/gorm v1.25.1/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw= +gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/server/mqconsume/initialize/enter.go b/server/mqconsume/initialize/enter.go new file mode 100644 index 00000000..04ac6528 --- /dev/null +++ b/server/mqconsume/initialize/enter.go @@ -0,0 +1,13 @@ +package initialize + +import ( + _ "main/app" + _ "main/controller" + . "main/global" + _ "main/router" + _ "main/service" +) + +func Init() { + GetApp().Run(InitModules, UnInitModules) +} diff --git a/server/mqconsume/main.go b/server/mqconsume/main.go new file mode 100644 index 00000000..ac0e77e9 --- /dev/null +++ b/server/mqconsume/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "main/initialize" +) + +func main() { + initialize.Init() +} diff --git a/server/mqconsume/makefile b/server/mqconsume/makefile new file mode 100644 index 00000000..ded8907d --- /dev/null +++ b/server/mqconsume/makefile @@ -0,0 +1,17 @@ +compile: + @. /etc/profile + + @export GOPROXY=https://goproxy.io + @go build -gcflags=all="-N -l" -o ../../bin/mqconsume/bin + @echo "compile done" + +debug: + @. /etc/profile + + @export GOPROXY=https://goproxy.io + @go build -gcflags=all="-N -l" -ldflags "-X q5.optDebug=1" -o ../../bin/mqconsume/bin + @echo "compile done" + +clean: + @rm -f ../../bin/mqconsume/bin + @echo "clean done" diff --git a/server/mqconsume/middleware/marquee.go b/server/mqconsume/middleware/marquee.go new file mode 100644 index 00000000..1aec2068 --- /dev/null +++ b/server/mqconsume/middleware/marquee.go @@ -0,0 +1,28 @@ +package middleware + +import ( + "f5" + "main/service" + + "github.com/gin-gonic/gin" +) + +func Marquee(c *gin.Context) { + reqObj := struct { + Content string `json:"content"` + Loop int32 `json:"loop"` + Interval int32 `json:"interval"` + }{} + + if err := c.ShouldBindJSON(&reqObj); err != nil { + f5.RspErr2(c, 1, err.Error()) + return + } + + rsp := service.MqConsumer.RequestMarquee(reqObj.Content, reqObj.Loop, reqObj.Interval) + var code int32 = 0 + if rsp != "" { + code = 1 + } + f5.RspErr2(c, code, rsp) +} diff --git a/server/mqconsume/mt/ConfDb.go b/server/mqconsume/mt/ConfDb.go new file mode 100644 index 00000000..ab948bf7 --- /dev/null +++ b/server/mqconsume/mt/ConfDb.go @@ -0,0 +1,15 @@ +package mt + +import ( + "f5" + "main/mtb" +) + +type ConfDb struct { + mtb.ConfDb +} + +type ConfDbTable struct { + f5.IdMetaTable[ConfDb] + selfConf *ConfDb +} diff --git a/server/mqconsume/mt/Config.go b/server/mqconsume/mt/Config.go new file mode 100644 index 00000000..147bdaa9 --- /dev/null +++ b/server/mqconsume/mt/Config.go @@ -0,0 +1,38 @@ +package mt + +import ( + "f5" + "main/mtb" +) + +type Config struct { + mtb.Config +} + +type ConfigTable struct { + f5.IdMetaTable[Config] + selfConf *Config +} + +func (this *Config) Init1() { + +} + +func (this *ConfigTable) GetMqproxyUrl() string { + return this.selfConf.GetMqproxyUrl() +} + +func (this *ConfigTable) GetNoticeUrl() string { + return this.selfConf.GetNoticeUrl() +} + +func (this *ConfigTable) GetNoticeChannelId() string { + return this.selfConf.GetNoticeChannelId() +} + +func (this *ConfigTable) PostInit1() { + this.selfConf = this.GetById(int64(0)) + if this.selfConf == nil { + panic("mqconsume config无法读取本服配置") + } +} diff --git a/server/mqconsume/mt/export.go b/server/mqconsume/mt/export.go new file mode 100644 index 00000000..b6ab37bb --- /dev/null +++ b/server/mqconsume/mt/export.go @@ -0,0 +1,29 @@ +package mt + +import ( + "f5" +) + +type table struct { + MqconsumeCluster *MqconsumeClusterTable + Config *ConfigTable + ConfDb *ConfDbTable +} + +var Table = f5.New(func(this *table) { + this.MqconsumeCluster = f5.New(func(this *MqconsumeClusterTable) { + this.FileName = "../config/mqconsume.cluster.json" + this.PrimKey = "instance_id" + }) + + this.Config = f5.New(func(this *ConfigTable) { + this.FileName = "../config/config.json" + this.PrimKey = "" + }) + + this.ConfDb = f5.New(func(this *ConfDbTable) { + this.FileName = "../config/confdb.mysql.json" + this.PrimKey = "" + }) + +}) diff --git a/server/mqconsume/mt/mqconsumeCluster.go b/server/mqconsume/mt/mqconsumeCluster.go new file mode 100644 index 00000000..35ee7d1c --- /dev/null +++ b/server/mqconsume/mt/mqconsumeCluster.go @@ -0,0 +1,34 @@ +package mt + +import ( + "f5" + "main/mtb" +) + +type MqconsumeCluster struct { + mtb.MqconsumeCluster +} + +type MqconsumeClusterTable struct { + f5.IdMetaTable[MqconsumeCluster] + selfConf *MqconsumeCluster +} + +func (this *MqconsumeCluster) Init1() { + +} + +func (this *MqconsumeClusterTable) GetListenPort() int32 { + return this.selfConf.GetListenPort() +} + +func (this *MqconsumeClusterTable) GetHttpListenPort() int32 { + return this.selfConf.GetHttpListenPort() +} + +func (this *MqconsumeClusterTable) PostInit1() { + this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId())) + if this.selfConf == nil { + panic("mqconsume集群无法读取本服配置") + } +} diff --git a/server/mqconsume/mtb/mtb.auto_gen.go b/server/mqconsume/mtb/mtb.auto_gen.go new file mode 100644 index 00000000..31ae084c --- /dev/null +++ b/server/mqconsume/mtb/mtb.auto_gen.go @@ -0,0 +1,163 @@ +package mtb + +import ( + "f5" +) + +type MqconsumeCluster struct { + instance_id int32 + listen_port int32 + http_listen_port int32 + + _flags1_ uint64 + _flags2_ uint64 +} + +type Config struct { + mqproxy_url string + notice_url string + notice_channel_id string + + _flags1_ uint64 + _flags2_ uint64 +} + +type ConfDb struct { + host string + port int32 + user string + passwd string + database string + max_open_conns int32 + max_idle_conns int32 + + _flags1_ uint64 + _flags2_ uint64 +} + +func (this *MqconsumeCluster) GetInstanceId() int32 { + return this.instance_id +} + +func (this *MqconsumeCluster) HasInstanceId() bool { + return (this._flags1_ & (uint64(1) << 1)) > 0 +} + +func (this *MqconsumeCluster) GetListenPort() int32 { + return this.listen_port +} + +func (this *MqconsumeCluster) HasListenPort() bool { + return (this._flags1_ & (uint64(1) << 2)) > 0 +} + +func (this *MqconsumeCluster) GetHttpListenPort() int32 { + return this.http_listen_port +} + +func (this *MqconsumeCluster) HasHttpListenPort() bool { + return (this._flags1_ & (uint64(1) << 3)) > 0 +} + +func (this *Config) GetMqproxyUrl() string { + return this.mqproxy_url +} + +func (this *Config) HasMqproxyUrl() bool { + return (this._flags1_ & (uint64(1) << 1)) > 0 +} + +func (this *Config) GetNoticeUrl() string { + return this.notice_url +} + +func (this *Config) HasNoticeUrl() bool { + return (this._flags1_ & (uint64(1) << 2)) > 0 +} + +func (this *Config) GetNoticeChannelId() string { + return this.notice_channel_id +} + +func (this *Config) HasNoticeChannelId() bool { + return (this._flags1_ & (uint64(1) << 3)) > 0 +} + +func (this *ConfDb) GetHost() string { + return this.host +} + +func (this *ConfDb) HasHost() bool { + return (this._flags1_ & (uint64(1) << 1)) > 0 +} + +func (this *ConfDb) GetPort() int32 { + return this.port +} + +func (this *ConfDb) HasPort() bool { + return (this._flags1_ & (uint64(1) << 2)) > 0 +} + +func (this *ConfDb) GetUser() string { + return this.user +} + +func (this *ConfDb) HasUser() bool { + return (this._flags1_ & (uint64(1) << 3)) > 0 +} + +func (this *ConfDb) GetPasswd() string { + return this.passwd +} + +func (this *ConfDb) HasPasswd() bool { + return (this._flags1_ & (uint64(1) << 4)) > 0 +} + +func (this *ConfDb) GetDatabase() string { + return this.database +} + +func (this *ConfDb) HasDatabase() bool { + return (this._flags1_ & (uint64(1) << 5)) > 0 +} + +func (this *ConfDb) GetMaxOpenConns() int32 { + return this.max_open_conns +} + +func (this *ConfDb) HasMaxOpenConns() bool { + return (this._flags1_ & (uint64(1) << 6)) > 0 +} + +func (this *ConfDb) GetMaxIdleConns() int32 { + return this.max_idle_conns +} + +func (this *ConfDb) HasMaxIdleConns() bool { + return (this._flags1_ & (uint64(1) << 7)) > 0 +} + + +func (this *MqconsumeCluster) LoadFromKv(kv map[string]interface{}) { + f5.ReadMetaTableField(&this.instance_id, "instance_id", &this._flags1_, 1, kv) + f5.ReadMetaTableField(&this.listen_port, "listen_port", &this._flags1_, 2, kv) + f5.ReadMetaTableField(&this.http_listen_port, "http_listen_port", &this._flags1_, 3, kv) +} + +func (this *Config) LoadFromKv(kv map[string]interface{}) { + f5.ReadMetaTableField(&this.mqproxy_url, "mqproxy_url", &this._flags1_, 1, kv) + f5.ReadMetaTableField(&this.notice_url, "notice_url", &this._flags1_, 2, kv) + f5.ReadMetaTableField(&this.notice_channel_id, "notice_channel_id", &this._flags1_, 3, kv) +} + +func (this *ConfDb) LoadFromKv(kv map[string]interface{}) { + f5.ReadMetaTableField(&this.host, "host", &this._flags1_, 1, kv) + f5.ReadMetaTableField(&this.port, "port", &this._flags1_, 2, kv) + f5.ReadMetaTableField(&this.user, "user", &this._flags1_, 3, kv) + f5.ReadMetaTableField(&this.passwd, "passwd", &this._flags1_, 4, kv) + f5.ReadMetaTableField(&this.database, "database", &this._flags1_, 5, kv) + f5.ReadMetaTableField(&this.max_open_conns, "max_open_conns", &this._flags1_, 6, kv) + f5.ReadMetaTableField(&this.max_idle_conns, "max_idle_conns", &this._flags1_, 7, kv) +} diff --git a/server/mqconsume/proto/mt.proto b/server/mqconsume/proto/mt.proto new file mode 100644 index 00000000..75e307af --- /dev/null +++ b/server/mqconsume/proto/mt.proto @@ -0,0 +1,28 @@ +package mt; + +option go_package = ".;mt"; + +message MqconsumeCluster +{ + optional int32 instance_id = 1; + optional int32 listen_port = 2; + optional int32 http_listen_port = 3; +} + +message Config +{ + optional string mqproxy_url = 1; + optional string notice_url = 2; + optional string notice_channel_id = 3; +} + +message ConfDb +{ + optional string host = 1; + optional int32 port = 2; + optional string user = 3; + optional string passwd = 4; + optional string database = 5; + optional int32 max_open_conns = 6; + optional int32 max_idle_conns = 7; +} diff --git a/server/mqconsume/proto/protoc-gen.bat b/server/mqconsume/proto/protoc-gen.bat new file mode 100644 index 00000000..61b25a95 --- /dev/null +++ b/server/mqconsume/proto/protoc-gen.bat @@ -0,0 +1,2 @@ +protoc --go_out=..\cs .\cs_*.proto +protoc --go_out=..\ss .\ss_*.proto diff --git a/server/mqconsume/router/export.go b/server/mqconsume/router/export.go new file mode 100644 index 00000000..4c2a87fa --- /dev/null +++ b/server/mqconsume/router/export.go @@ -0,0 +1,12 @@ +package router + +import ( + "main/constant" + "main/global" +) + +var _routerMgr = new(routerMgr) + +func init() { + global.RegModule(constant.ROUTER_MODULE_IDX, _routerMgr) +} diff --git a/server/mqconsume/router/routermgr.go b/server/mqconsume/router/routermgr.go new file mode 100644 index 00000000..56df3151 --- /dev/null +++ b/server/mqconsume/router/routermgr.go @@ -0,0 +1,23 @@ +package router + +import ( + "f5" + "main/middleware" + //. "main/global" + //"main/router/system" +) + +type routerMgr struct { + //system system.RouterGroup +} + +func (this *routerMgr) Init() { + mqGroup := f5.GetApp().GetGinEngine().Group("/mqconsumer") + mqGroup.POST("webapp/marquee.php", middleware.Marquee) + + f5.GetSysLog().Info("routerMgr.init") +} + +func (this *routerMgr) UnInit() { + +} diff --git a/server/mqconsume/service/export.go b/server/mqconsume/service/export.go new file mode 100644 index 00000000..374f63cc --- /dev/null +++ b/server/mqconsume/service/export.go @@ -0,0 +1,13 @@ +package service + +import ( + "main/constant" + "main/global" +) + +var _serviceMgr = new(serviceMgr) +var MqConsumer *mqConsumer + +func init() { + global.RegModule(constant.SERVICE_MGR_MODULE_IDX, _serviceMgr) +} diff --git a/server/mqconsume/service/mqconsumer.go b/server/mqconsume/service/mqconsumer.go new file mode 100644 index 00000000..791a817c --- /dev/null +++ b/server/mqconsume/service/mqconsumer.go @@ -0,0 +1,108 @@ +package service + +import ( + "crypto/sha1" + "encoding/json" + "f5" + "fmt" + "io" + "main/mt" + "q5" + "time" +) + +type mqConsumer struct { +} + +func (this *mqConsumer) init() { + subscribelist := map[string]string{ + "marquee": mt.Table.Config.GetNoticeUrl() + "/mqconsumer/webapp/marquee.php", + } + + go this.SubscribeTopics(subscribelist) +} + +func (this *mqConsumer) unInit() { +} + +func (this *mqConsumer) RequestMarquee(content string, loop, interval int32) (rspstr string) { + + appkey := "YOUME1838B3633FF1410BDC9124BBD806F245B9D2E5AC" + identifier := "admin" + appsecret := "q6B570yTyj/00Nk4mYZtgDwyew5v05t13V1vo4mxpEuAaWUiinAyVxG41sNu3vsFe8sipOLfKfIVYGhzpQrqzvj5sId3mrBfj/s65a2gp36yDrI/nX5BnUAJB317SEosR6xLoPuhBvHU+/1DWI7nKSKaRNxnQiC46PJKFc2kX50BAAE=" + channelid := mt.Table.Config.GetNoticeChannelId() + data := map[string]interface{}{ + "Notice": map[string]interface{}{ + "ChannelID": channelid, //频道 ID,app 内唯一,字符串 + "NoticeType": 1, //1 为跑马灯公告,2 为聊天框公告,3 为置顶公告,整数 + "LoopType": 1, // 公告循环类型,1 为一次性公告,2 为周期性公告,整数 + "Title": "", // 公告标题,字符串 + "Content": content, // 公告内容,字符串 + "LinkKeyWords": "", // 链接关键字,字符串 + "LinkAddr": "", // 链接地址,字符串 + "Creator": "", // 公告添加人,字符串 + "EnableStatus": 2, // 公告启用状态,1 为停用,2 为启用,整数 + "SendStartTime": "", // 公告发送时间,格式为 HH:MM:SS,字符串 + "SendInstantly": 1, // 是否即时发送,也属于公告 json 里的字段。1 是,0 否,不指定则不起作用 + "SendTimes": loop, // 公告发送次数,整数(需要传入) + "SendInterval": interval, // 公告发送多次时,每次的间隔,单位秒,整数(需要传入) + }} + + url := "https://sgapi.youme.im/v1/im/add_notice" + if f5.IsTestEnv() { + url = "https://api.youme.im/v1/im/add_notice" + } + curtime := q5.ToString(f5.GetApp().GetRealSeconds()) + sha1c := sha1.New() + io.WriteString(sha1c, appsecret+curtime) + checksum := fmt.Sprintf("%x", sha1c.Sum(nil)) + + params := map[string]string{ + "appkey": appkey, + "identifier": identifier, + "curtime": curtime, + "checksum": checksum, + } + + d, _ := json.Marshal(data) + f5.GetHttpCliMgr().SendGoStyleJsonRspPost( + url, + params, + nil, + "application/json", + string(d), + func(rsp f5.HttpCliResponse) { + if rsp.GetErr() != nil { + rspstr = rsp.GetErr().Error() + return + } + // f5.GetSysLog().Debug("marquee request rsp:%s", rsp.GetRawData()) + }) + return rspstr +} + +func (this *mqConsumer) SubscribeTopics(topics map[string]string) { + channel := fmt.Sprintf("%s%03x", f5.GetApp().GetPkgName(), f5.GetApp().GetInstanceId()) + suburl := mt.Table.Config.GetMqproxyUrl() + "/mqproxy/webapp/subscribe.php" + for len(topics) > 0 { + subscribedTopics := map[string]int{} + for topicitem, cburl := range topics { + f5.GetHttpCliMgr().SendGoStyleRequest( + suburl, + map[string]string{ + "topic": topicitem, + "channel": channel, + "noticeurl": cburl, + }, + func(hcr f5.HttpCliResponse) { + if hcr.GetErr() == nil && hcr.GetRawData() == "" { + subscribedTopics[topicitem] = 0 + } + }) + } + for subitem := range subscribedTopics { + delete(topics, subitem) + } + time.Sleep(time.Second * 10) + } +} diff --git a/server/mqconsume/service/servicemgr.go b/server/mqconsume/service/servicemgr.go new file mode 100644 index 00000000..593242b2 --- /dev/null +++ b/server/mqconsume/service/servicemgr.go @@ -0,0 +1,13 @@ +package service + +type serviceMgr struct { +} + +func (this *serviceMgr) Init() { + MqConsumer = new(mqConsumer) + MqConsumer.init() +} + +func (this *serviceMgr) UnInit() { + MqConsumer.unInit() +}