From 94e634fd9b25d31d97599e4abb789b827316f613 Mon Sep 17 00:00:00 2001 From: yangduo Date: Mon, 2 Sep 2024 11:58:21 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/apigate/app/app.go | 13 +++- server/apigate/constant/constant.go | 4 ++ server/apigate/middleware/caforward.go | 44 +++++++----- server/apigate/mt/ApigateCluster.go | 34 +++++++++ server/apigate/mt/ConfDb.go | 15 ++++ server/apigate/mt/Config.go | 18 +++++ server/apigate/mt/GamesapiCluster.go | 34 --------- server/apigate/mt/export.go | 14 ++-- server/apigate/mtb/mtb.auto_gen.go | 95 ++++++++++++++++++++++--- server/apigate/proto/mt.proto | 13 +++- server/apigate/service/sapi_forward.go | 54 ++++++++++++++ server/gamesapi/service/sapi_forward.go | 12 ++-- 12 files changed, 280 insertions(+), 70 deletions(-) create mode 100644 server/apigate/mt/ApigateCluster.go create mode 100644 server/apigate/mt/ConfDb.go delete mode 100644 server/apigate/mt/GamesapiCluster.go diff --git a/server/apigate/app/app.go b/server/apigate/app/app.go index 79c9b12c..429c1639 100644 --- a/server/apigate/app/app.go +++ b/server/apigate/app/app.go @@ -3,6 +3,7 @@ package app import ( "f5" //. "main/global" + "main/constant" "main/mt" ) @@ -16,7 +17,7 @@ func (this *app) GetPkgName() string { } func (this *app) GetHttpListenPort() int32 { - return mt.Table.GamesapiCluster.GetHttpListenPort() + return mt.Table.ApigateCluster.GetHttpListenPort() } func (this *app) Run(initCb func(), unInitCb func()) { @@ -39,6 +40,16 @@ func (this *app) Update() { } func (this *app) registerDataSources() { + f5.GetGoStyleDb().RegisterDataSource( + constant.CONF_DB, + mt.Table.ConfDb.GetById(0).GetHost(), + mt.Table.ConfDb.GetById(0).GetPort(), + mt.Table.ConfDb.GetById(0).GetUser(), + mt.Table.ConfDb.GetById(0).GetPasswd(), + mt.Table.ConfDb.GetById(0).GetDatabase(), + 1, + mt.Table.ConfDb.GetById(0).GetMaxOpenConns(), + mt.Table.ConfDb.GetById(0).GetMaxIdleConns()) } func (this *app) HasTask() bool { diff --git a/server/apigate/constant/constant.go b/server/apigate/constant/constant.go index c8140346..e281935b 100644 --- a/server/apigate/constant/constant.go +++ b/server/apigate/constant/constant.go @@ -1,5 +1,9 @@ package constant +const ( + CONF_DB = "confdb" +) + const ( APP_MODULE_IDX = iota ROUTER_MODULE_IDX diff --git a/server/apigate/middleware/caforward.go b/server/apigate/middleware/caforward.go index 5edba1a3..36bf9e81 100644 --- a/server/apigate/middleware/caforward.go +++ b/server/apigate/middleware/caforward.go @@ -4,10 +4,9 @@ import ( "bytes" "errors" "f5" - "io/ioutil" + "io" "jccommon" "main/service" - "main/mt" "net/http" net_url "net/url" "q5" @@ -18,37 +17,47 @@ import ( ) /* - 转发规则 - 如果c是OutApp开头则不需要对account_id和session_id参数校验 - 其他的需要校验 +转发规则 +如果c是OutApp开头则不需要对account_id和session_id参数校验 +其他的需要校验 - OutApp不需要限制并发数!!! - */ +OutApp不需要限制并发数!!! +*/ func CaForward(c *gin.Context) { accountId := c.DefaultQuery("account_id", "") sessionId := c.DefaultQuery("session_id", "") - if !jccommon.IsValidSessionId(accountId, sessionId) { + var needlimit bool = !strings.HasPrefix(c.DefaultQuery("c", ""), "OutApp") + if needlimit && !jccommon.IsValidSessionId(accountId, sessionId) { f5.RspErr(c, 500, "invalid session_id") c.Abort() service.SApiForward.IncInvalidSessionTimes() return } - cLock := service.SApiForward.AcquireLock(accountId) - if cLock == nil { - f5.RspErr(c, 500, "system busy") - c.Abort() - return + if needlimit { + f5.GetSysLog().Debug("need limit check") + cLock := service.SApiForward.AcquireLock(accountId) + if cLock == nil { + f5.RspErr(c, 500, "system busy") + c.Abort() + return + } + defer func() { + f5.GetSysLog().Debug("need limit defer call") + service.SApiForward.ReleaseLock(cLock) + }() + } else { + f5.GetSysLog().Debug("no need limit check") } - defer service.SApiForward.ReleaseLock(cLock) service.SApiForward.IncTotalTimes() beginTick := q5.GetTickCount() defer func() { costTime := q5.GetTickCount() - beginTick service.SApiForward.UpdateCostTime(costTime) }() - newUrl := mt.Table.Config.GetById(0).GetRedirectUrl() + c.Request.URL.Path[5:] + downStreamUrl, downStreamHost := service.SApiForward.GetDownStreamHost() + newUrl := downStreamUrl + c.Request.URL.Path[5:] if !q5.StrContains(newUrl, "?") { newUrl = newUrl + "?" } @@ -110,10 +119,13 @@ func CaForward(c *gin.Context) { f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr) return } + if downStreamHost != "" { + httpRequest.Host = downStreamHost + } client := &http.Client{} if resp, err := client.Do(httpRequest); err == nil { defer resp.Body.Close() - if bytes, err := ioutil.ReadAll(resp.Body); err == nil { + if bytes, err := io.ReadAll(resp.Body); err == nil { service.SApiForward.IncOkTimes() c.String(200, string(bytes)) c.Abort() diff --git a/server/apigate/mt/ApigateCluster.go b/server/apigate/mt/ApigateCluster.go new file mode 100644 index 00000000..d23e3d68 --- /dev/null +++ b/server/apigate/mt/ApigateCluster.go @@ -0,0 +1,34 @@ +package mt + +import ( + "f5" + "main/mtb" +) + +type ApigateCluster struct { + mtb.ApigateCluster +} + +type ApigateClusterTable struct { + f5.IdMetaTable[ApigateCluster] + selfConf *ApigateCluster +} + +func (this *ApigateCluster) Init1() { + +} + +func (this *ApigateClusterTable) GetListenPort() int32 { + return this.selfConf.GetListenPort() +} + +func (this *ApigateClusterTable) GetHttpListenPort() int32 { + return this.selfConf.GetHttpListenPort() +} + +func (this *ApigateClusterTable) PostInit1() { + this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId())) + if this.selfConf == nil { + panic("apigate集群无法读取本服配置") + } +} diff --git a/server/apigate/mt/ConfDb.go b/server/apigate/mt/ConfDb.go new file mode 100644 index 00000000..ab948bf7 --- /dev/null +++ b/server/apigate/mt/ConfDb.go @@ -0,0 +1,15 @@ +package mt + +import ( + "f5" + "main/mtb" +) + +type ConfDb struct { + mtb.ConfDb +} + +type ConfDbTable struct { + f5.IdMetaTable[ConfDb] + selfConf *ConfDb +} diff --git a/server/apigate/mt/Config.go b/server/apigate/mt/Config.go index 97c41d11..da0e4b0e 100644 --- a/server/apigate/mt/Config.go +++ b/server/apigate/mt/Config.go @@ -3,10 +3,12 @@ package mt import ( "f5" "main/mtb" + "net/url" ) type Config struct { mtb.Config + redirectHost string } type ConfigTable struct { @@ -14,10 +16,26 @@ type ConfigTable struct { selfConf *Config } +func (this *Config) Init1() { + u, err := url.Parse(this.GetRedirectUrl()) + if err != nil { + panic(err) + } + this.redirectHost = u.Host +} + func (this *ConfigTable) GetMaxConcurrentNum() int32 { return this.selfConf.GetMaxConcurrentNum() } +func (this *ConfigTable) GetRedirectUrl() string { + return this.selfConf.GetRedirectUrl() +} + +func (this *ConfigTable) GetRedirectHost() string{ + return this.selfConf.redirectHost +} + func (this *ConfigTable) PostInit1() { this.selfConf = this.GetById(int64(0)) if this.selfConf == nil { diff --git a/server/apigate/mt/GamesapiCluster.go b/server/apigate/mt/GamesapiCluster.go deleted file mode 100644 index d815dad0..00000000 --- a/server/apigate/mt/GamesapiCluster.go +++ /dev/null @@ -1,34 +0,0 @@ -package mt - -import ( - "f5" - "main/mtb" -) - -type GamesapiCluster struct { - mtb.GamesapiCluster -} - -type GamesapiClusterTable struct { - f5.IdMetaTable[GamesapiCluster] - selfConf *GamesapiCluster -} - -func (this *GamesapiCluster) Init1() { - -} - -func (this *GamesapiClusterTable) GetListenPort() int32 { - return this.selfConf.GetListenPort() -} - -func (this *GamesapiClusterTable) GetHttpListenPort() int32 { - return this.selfConf.GetHttpListenPort() -} - -func (this *GamesapiClusterTable) PostInit1() { - this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId())) - if this.selfConf == nil { - panic("gamesapi集群无法读取本服配置") - } -} diff --git a/server/apigate/mt/export.go b/server/apigate/mt/export.go index 36b98790..df9a5f58 100644 --- a/server/apigate/mt/export.go +++ b/server/apigate/mt/export.go @@ -5,13 +5,14 @@ import ( ) type table struct { - GamesapiCluster *GamesapiClusterTable - Config *ConfigTable + ApigateCluster *ApigateClusterTable + Config *ConfigTable + ConfDb *ConfDbTable } var Table = f5.New(func(this *table) { - this.GamesapiCluster = f5.New(func(this *GamesapiClusterTable) { - this.FileName = "../config/gamesapi.cluster.json" + this.ApigateCluster = f5.New(func(this *ApigateClusterTable) { + this.FileName = "../config/apigate.cluster.json" this.PrimKey = "instance_id" }) @@ -19,4 +20,9 @@ var Table = f5.New(func(this *table) { this.FileName = "../config/config.json" this.PrimKey = "" }) + + this.ConfDb = f5.New(func(this *ConfDbTable) { + this.FileName = "../config/confdb.mysql.json" + this.PrimKey = "" + }) }) diff --git a/server/apigate/mtb/mtb.auto_gen.go b/server/apigate/mtb/mtb.auto_gen.go index 62393f6e..717775f6 100644 --- a/server/apigate/mtb/mtb.auto_gen.go +++ b/server/apigate/mtb/mtb.auto_gen.go @@ -4,7 +4,7 @@ import ( "f5" ) -type GamesapiCluster struct { +type ApigateCluster struct { instance_id int32 listen_port int32 http_listen_port int32 @@ -22,27 +22,40 @@ type Config struct { _flags2_ uint64 } -func (this *GamesapiCluster) GetInstanceId() int32 { +type ConfDb struct { + host string + port int32 + user string + passwd string + database string + max_open_conns int32 + max_idle_conns int32 + + _flags1_ uint64 + _flags2_ uint64 +} + +func (this *ApigateCluster) GetInstanceId() int32 { return this.instance_id } -func (this *GamesapiCluster) HasInstanceId() bool { +func (this *ApigateCluster) HasInstanceId() bool { return (this._flags1_ & (uint64(1) << 1)) > 0 } -func (this *GamesapiCluster) GetListenPort() int32 { +func (this *ApigateCluster) GetListenPort() int32 { return this.listen_port } -func (this *GamesapiCluster) HasListenPort() bool { +func (this *ApigateCluster) HasListenPort() bool { return (this._flags1_ & (uint64(1) << 2)) > 0 } -func (this *GamesapiCluster) GetHttpListenPort() int32 { +func (this *ApigateCluster) GetHttpListenPort() int32 { return this.http_listen_port } -func (this *GamesapiCluster) HasHttpListenPort() bool { +func (this *ApigateCluster) HasHttpListenPort() bool { return (this._flags1_ & (uint64(1) << 3)) > 0 } @@ -70,8 +83,64 @@ func (this *Config) HasRequestOverTime() bool { return (this._flags1_ & (uint64(1) << 3)) > 0 } +func (this *ConfDb) GetHost() string { + return this.host +} -func (this *GamesapiCluster) LoadFromKv(kv map[string]interface{}) { +func (this *ConfDb) HasHost() bool { + return (this._flags1_ & (uint64(1) << 1)) > 0 +} + +func (this *ConfDb) GetPort() int32 { + return this.port +} + +func (this *ConfDb) HasPort() bool { + return (this._flags1_ & (uint64(1) << 2)) > 0 +} + +func (this *ConfDb) GetUser() string { + return this.user +} + +func (this *ConfDb) HasUser() bool { + return (this._flags1_ & (uint64(1) << 3)) > 0 +} + +func (this *ConfDb) GetPasswd() string { + return this.passwd +} + +func (this *ConfDb) HasPasswd() bool { + return (this._flags1_ & (uint64(1) << 4)) > 0 +} + +func (this *ConfDb) GetDatabase() string { + return this.database +} + +func (this *ConfDb) HasDatabase() bool { + return (this._flags1_ & (uint64(1) << 5)) > 0 +} + +func (this *ConfDb) GetMaxOpenConns() int32 { + return this.max_open_conns +} + +func (this *ConfDb) HasMaxOpenConns() bool { + return (this._flags1_ & (uint64(1) << 6)) > 0 +} + +func (this *ConfDb) GetMaxIdleConns() int32 { + return this.max_idle_conns +} + +func (this *ConfDb) HasMaxIdleConns() bool { + return (this._flags1_ & (uint64(1) << 7)) > 0 +} + + +func (this *ApigateCluster) LoadFromKv(kv map[string]interface{}) { f5.ReadMetaTableField(&this.instance_id, "instance_id", &this._flags1_, 1, kv) f5.ReadMetaTableField(&this.listen_port, "listen_port", &this._flags1_, 2, kv) f5.ReadMetaTableField(&this.http_listen_port, "http_listen_port", &this._flags1_, 3, kv) @@ -82,3 +151,13 @@ func (this *Config) LoadFromKv(kv map[string]interface{}) { f5.ReadMetaTableField(&this.max_concurrent_num, "max_concurrent_num", &this._flags1_, 2, kv) f5.ReadMetaTableField(&this.request_over_time, "request_over_time", &this._flags1_, 3, kv) } + +func (this *ConfDb) LoadFromKv(kv map[string]interface{}) { + f5.ReadMetaTableField(&this.host, "host", &this._flags1_, 1, kv) + f5.ReadMetaTableField(&this.port, "port", &this._flags1_, 2, kv) + f5.ReadMetaTableField(&this.user, "user", &this._flags1_, 3, kv) + f5.ReadMetaTableField(&this.passwd, "passwd", &this._flags1_, 4, kv) + f5.ReadMetaTableField(&this.database, "database", &this._flags1_, 5, kv) + f5.ReadMetaTableField(&this.max_open_conns, "max_open_conns", &this._flags1_, 6, kv) + f5.ReadMetaTableField(&this.max_idle_conns, "max_idle_conns", &this._flags1_, 7, kv) +} diff --git a/server/apigate/proto/mt.proto b/server/apigate/proto/mt.proto index b9a74a9f..2621ffe9 100644 --- a/server/apigate/proto/mt.proto +++ b/server/apigate/proto/mt.proto @@ -2,7 +2,7 @@ package mt; option go_package = ".;mt"; -message GamesapiCluster +message ApigateCluster { optional int32 instance_id = 1; optional int32 listen_port = 2; @@ -15,3 +15,14 @@ message Config optional int32 max_concurrent_num = 2; optional int32 request_over_time = 3; } + +message ConfDb +{ + optional string host = 1; + optional int32 port = 2; + optional string user = 3; + optional string passwd = 4; + optional string database = 5; + optional int32 max_open_conns = 6; + optional int32 max_idle_conns = 7; +} diff --git a/server/apigate/service/sapi_forward.go b/server/apigate/service/sapi_forward.go index b598d345..7b1f5f1c 100644 --- a/server/apigate/service/sapi_forward.go +++ b/server/apigate/service/sapi_forward.go @@ -1,16 +1,26 @@ package service import ( + "apigate/constant" "f5" + "fmt" "main/mt" + "math/rand" "q5" "sync" "sync/atomic" "time" ) +type downStreamHost struct { + Host string `json:"host"` + Port int32 `json:"port"` + Url string `json:"url"` +} + type sApiForward struct { userCache []*SApiForwardLockCache + downStreams []downStreamHost insessTimes int32 total int32 getTimes int32 @@ -35,6 +45,7 @@ type SApiForwardLock struct { } func (this *sApiForward) init() { + q5.NewSlice(&this.downStreams, 0, 20) q5.NewSlice(&this.userCache, 1024, 1024) for i := 0; i < len(this.userCache); i++ { p := new(SApiForwardLockCache) @@ -42,6 +53,11 @@ func (this *sApiForward) init() { p.userHash = &map[string]*SApiForwardLock{} this.userCache[i] = p } + this.LoadDownStreams() + go func() { + time.Sleep(time.Second * 60 * 10) + this.LoadDownStreams() + }() } func (this *sApiForward) unInit() { @@ -162,3 +178,41 @@ func (this *sApiForward) outputMonitorLog() { time.Sleep(time.Second * 10) } } + +func (this *sApiForward) LoadDownStreams() error { + err, ds := f5.GetGoStyleDb().NewOrmSelect( + constant.CONF_DB, + "t_apigate_host", + [][]string{}) + if err == nil { + downStreams := []downStreamHost{} + q5.NewSlice(&downStreams, 0, 20) + for ds.Next() { + host := ds.GetByName("apigate_host") + port := q5.ToInt32(ds.GetByName("apigate_port")) + enable := q5.ToInt32(ds.GetByName("enable")) + if enable != 0 { + downSteam := q5.NewSliceElement(&downStreams) + downSteam.Host = host + downSteam.Port = port + downSteam.Url = fmt.Sprintf("http://%s:%d", downSteam.Host, downSteam.Port) + } + } + this.downStreams = downStreams + f5.GetSysLog().Info("LoadDownstreams ok %s", q5.EncodeJson(&downStreams)) + } else { + f5.GetSysLog().Info("LoadDownstreams err %s", err) + } + return err +} + + + +func (this *sApiForward) GetDownStreamHost() (string, string) { + downStreams := this.downStreams + if len(downStreams) <= 0 { + return mt.Table.Config.GetRedirectUrl(), "" + } + downStream := downStreams[rand.Intn(len(downStreams))] + return downStream.Url, mt.Table.Config.GetRedirectHost() +} diff --git a/server/gamesapi/service/sapi_forward.go b/server/gamesapi/service/sapi_forward.go index db4044ca..49eab1d1 100644 --- a/server/gamesapi/service/sapi_forward.go +++ b/server/gamesapi/service/sapi_forward.go @@ -2,14 +2,14 @@ package service import ( "f5" - "main/mt" + "fmt" "main/constant" + "main/mt" + "math/rand" "q5" "sync" "sync/atomic" "time" - "math/rand" - "fmt" ) type downStreamHost struct { @@ -194,14 +194,14 @@ func (this *sApiForward) outputMonitorLog() { func (this *sApiForward) LoadDownStreams() error { err, ds := f5.GetGoStyleDb().NewOrmSelect( constant.CONF_DB, - "t_apigate_host", + "t_internal_gameapi_host", [][]string{}) if err == nil { downStreams := []downStreamHost{} q5.NewSlice(&downStreams, 0, 20) for ds.Next() { - host := ds.GetByName("apigate_host") - port := q5.ToInt32(ds.GetByName("apigate_port")) + host := ds.GetByName("gameapi_host") + port := q5.ToInt32(ds.GetByName("gameapi_port")) enable := q5.ToInt32(ds.GetByName("enable")) if enable != 0 { downSteam := q5.NewSliceElement(&downStreams)