This commit is contained in:
yangduo 2024-09-02 11:58:21 +08:00
parent 75100ea988
commit 94e634fd9b
12 changed files with 280 additions and 70 deletions

View File

@ -3,6 +3,7 @@ package app
import ( import (
"f5" "f5"
//. "main/global" //. "main/global"
"main/constant"
"main/mt" "main/mt"
) )
@ -16,7 +17,7 @@ func (this *app) GetPkgName() string {
} }
func (this *app) GetHttpListenPort() int32 { func (this *app) GetHttpListenPort() int32 {
return mt.Table.GamesapiCluster.GetHttpListenPort() return mt.Table.ApigateCluster.GetHttpListenPort()
} }
func (this *app) Run(initCb func(), unInitCb func()) { func (this *app) Run(initCb func(), unInitCb func()) {
@ -39,6 +40,16 @@ func (this *app) Update() {
} }
func (this *app) registerDataSources() { func (this *app) registerDataSources() {
f5.GetGoStyleDb().RegisterDataSource(
constant.CONF_DB,
mt.Table.ConfDb.GetById(0).GetHost(),
mt.Table.ConfDb.GetById(0).GetPort(),
mt.Table.ConfDb.GetById(0).GetUser(),
mt.Table.ConfDb.GetById(0).GetPasswd(),
mt.Table.ConfDb.GetById(0).GetDatabase(),
1,
mt.Table.ConfDb.GetById(0).GetMaxOpenConns(),
mt.Table.ConfDb.GetById(0).GetMaxIdleConns())
} }
func (this *app) HasTask() bool { func (this *app) HasTask() bool {

View File

@ -1,5 +1,9 @@
package constant package constant
const (
CONF_DB = "confdb"
)
const ( const (
APP_MODULE_IDX = iota APP_MODULE_IDX = iota
ROUTER_MODULE_IDX ROUTER_MODULE_IDX

View File

@ -4,10 +4,9 @@ import (
"bytes" "bytes"
"errors" "errors"
"f5" "f5"
"io/ioutil" "io"
"jccommon" "jccommon"
"main/service" "main/service"
"main/mt"
"net/http" "net/http"
net_url "net/url" net_url "net/url"
"q5" "q5"
@ -28,27 +27,37 @@ func CaForward(c *gin.Context) {
accountId := c.DefaultQuery("account_id", "") accountId := c.DefaultQuery("account_id", "")
sessionId := c.DefaultQuery("session_id", "") sessionId := c.DefaultQuery("session_id", "")
if !jccommon.IsValidSessionId(accountId, sessionId) { var needlimit bool = !strings.HasPrefix(c.DefaultQuery("c", ""), "OutApp")
if needlimit && !jccommon.IsValidSessionId(accountId, sessionId) {
f5.RspErr(c, 500, "invalid session_id") f5.RspErr(c, 500, "invalid session_id")
c.Abort() c.Abort()
service.SApiForward.IncInvalidSessionTimes() service.SApiForward.IncInvalidSessionTimes()
return return
} }
if needlimit {
f5.GetSysLog().Debug("need limit check")
cLock := service.SApiForward.AcquireLock(accountId) cLock := service.SApiForward.AcquireLock(accountId)
if cLock == nil { if cLock == nil {
f5.RspErr(c, 500, "system busy") f5.RspErr(c, 500, "system busy")
c.Abort() c.Abort()
return return
} }
defer service.SApiForward.ReleaseLock(cLock) defer func() {
f5.GetSysLog().Debug("need limit defer call")
service.SApiForward.ReleaseLock(cLock)
}()
} else {
f5.GetSysLog().Debug("no need limit check")
}
service.SApiForward.IncTotalTimes() service.SApiForward.IncTotalTimes()
beginTick := q5.GetTickCount() beginTick := q5.GetTickCount()
defer func() { defer func() {
costTime := q5.GetTickCount() - beginTick costTime := q5.GetTickCount() - beginTick
service.SApiForward.UpdateCostTime(costTime) service.SApiForward.UpdateCostTime(costTime)
}() }()
newUrl := mt.Table.Config.GetById(0).GetRedirectUrl() + c.Request.URL.Path[5:] downStreamUrl, downStreamHost := service.SApiForward.GetDownStreamHost()
newUrl := downStreamUrl + c.Request.URL.Path[5:]
if !q5.StrContains(newUrl, "?") { if !q5.StrContains(newUrl, "?") {
newUrl = newUrl + "?" newUrl = newUrl + "?"
} }
@ -110,10 +119,13 @@ func CaForward(c *gin.Context) {
f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr) f5.GetSysLog().Info("CaForward create request url:%s error:%s", newUrl, createErr)
return return
} }
if downStreamHost != "" {
httpRequest.Host = downStreamHost
}
client := &http.Client{} client := &http.Client{}
if resp, err := client.Do(httpRequest); err == nil { if resp, err := client.Do(httpRequest); err == nil {
defer resp.Body.Close() defer resp.Body.Close()
if bytes, err := ioutil.ReadAll(resp.Body); err == nil { if bytes, err := io.ReadAll(resp.Body); err == nil {
service.SApiForward.IncOkTimes() service.SApiForward.IncOkTimes()
c.String(200, string(bytes)) c.String(200, string(bytes))
c.Abort() c.Abort()

View File

@ -0,0 +1,34 @@
package mt
import (
"f5"
"main/mtb"
)
type ApigateCluster struct {
mtb.ApigateCluster
}
type ApigateClusterTable struct {
f5.IdMetaTable[ApigateCluster]
selfConf *ApigateCluster
}
func (this *ApigateCluster) Init1() {
}
func (this *ApigateClusterTable) GetListenPort() int32 {
return this.selfConf.GetListenPort()
}
func (this *ApigateClusterTable) GetHttpListenPort() int32 {
return this.selfConf.GetHttpListenPort()
}
func (this *ApigateClusterTable) PostInit1() {
this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId()))
if this.selfConf == nil {
panic("apigate集群无法读取本服配置")
}
}

View File

@ -0,0 +1,15 @@
package mt
import (
"f5"
"main/mtb"
)
type ConfDb struct {
mtb.ConfDb
}
type ConfDbTable struct {
f5.IdMetaTable[ConfDb]
selfConf *ConfDb
}

View File

@ -3,10 +3,12 @@ package mt
import ( import (
"f5" "f5"
"main/mtb" "main/mtb"
"net/url"
) )
type Config struct { type Config struct {
mtb.Config mtb.Config
redirectHost string
} }
type ConfigTable struct { type ConfigTable struct {
@ -14,10 +16,26 @@ type ConfigTable struct {
selfConf *Config selfConf *Config
} }
func (this *Config) Init1() {
u, err := url.Parse(this.GetRedirectUrl())
if err != nil {
panic(err)
}
this.redirectHost = u.Host
}
func (this *ConfigTable) GetMaxConcurrentNum() int32 { func (this *ConfigTable) GetMaxConcurrentNum() int32 {
return this.selfConf.GetMaxConcurrentNum() return this.selfConf.GetMaxConcurrentNum()
} }
func (this *ConfigTable) GetRedirectUrl() string {
return this.selfConf.GetRedirectUrl()
}
func (this *ConfigTable) GetRedirectHost() string{
return this.selfConf.redirectHost
}
func (this *ConfigTable) PostInit1() { func (this *ConfigTable) PostInit1() {
this.selfConf = this.GetById(int64(0)) this.selfConf = this.GetById(int64(0))
if this.selfConf == nil { if this.selfConf == nil {

View File

@ -1,34 +0,0 @@
package mt
import (
"f5"
"main/mtb"
)
type GamesapiCluster struct {
mtb.GamesapiCluster
}
type GamesapiClusterTable struct {
f5.IdMetaTable[GamesapiCluster]
selfConf *GamesapiCluster
}
func (this *GamesapiCluster) Init1() {
}
func (this *GamesapiClusterTable) GetListenPort() int32 {
return this.selfConf.GetListenPort()
}
func (this *GamesapiClusterTable) GetHttpListenPort() int32 {
return this.selfConf.GetHttpListenPort()
}
func (this *GamesapiClusterTable) PostInit1() {
this.selfConf = this.GetById(int64(f5.GetApp().GetInstanceId()))
if this.selfConf == nil {
panic("gamesapi集群无法读取本服配置")
}
}

View File

@ -5,13 +5,14 @@ import (
) )
type table struct { type table struct {
GamesapiCluster *GamesapiClusterTable ApigateCluster *ApigateClusterTable
Config *ConfigTable Config *ConfigTable
ConfDb *ConfDbTable
} }
var Table = f5.New(func(this *table) { var Table = f5.New(func(this *table) {
this.GamesapiCluster = f5.New(func(this *GamesapiClusterTable) { this.ApigateCluster = f5.New(func(this *ApigateClusterTable) {
this.FileName = "../config/gamesapi.cluster.json" this.FileName = "../config/apigate.cluster.json"
this.PrimKey = "instance_id" this.PrimKey = "instance_id"
}) })
@ -19,4 +20,9 @@ var Table = f5.New(func(this *table) {
this.FileName = "../config/config.json" this.FileName = "../config/config.json"
this.PrimKey = "" this.PrimKey = ""
}) })
this.ConfDb = f5.New(func(this *ConfDbTable) {
this.FileName = "../config/confdb.mysql.json"
this.PrimKey = ""
})
}) })

View File

@ -4,7 +4,7 @@ import (
"f5" "f5"
) )
type GamesapiCluster struct { type ApigateCluster struct {
instance_id int32 instance_id int32
listen_port int32 listen_port int32
http_listen_port int32 http_listen_port int32
@ -22,27 +22,40 @@ type Config struct {
_flags2_ uint64 _flags2_ uint64
} }
func (this *GamesapiCluster) GetInstanceId() int32 { type ConfDb struct {
host string
port int32
user string
passwd string
database string
max_open_conns int32
max_idle_conns int32
_flags1_ uint64
_flags2_ uint64
}
func (this *ApigateCluster) GetInstanceId() int32 {
return this.instance_id return this.instance_id
} }
func (this *GamesapiCluster) HasInstanceId() bool { func (this *ApigateCluster) HasInstanceId() bool {
return (this._flags1_ & (uint64(1) << 1)) > 0 return (this._flags1_ & (uint64(1) << 1)) > 0
} }
func (this *GamesapiCluster) GetListenPort() int32 { func (this *ApigateCluster) GetListenPort() int32 {
return this.listen_port return this.listen_port
} }
func (this *GamesapiCluster) HasListenPort() bool { func (this *ApigateCluster) HasListenPort() bool {
return (this._flags1_ & (uint64(1) << 2)) > 0 return (this._flags1_ & (uint64(1) << 2)) > 0
} }
func (this *GamesapiCluster) GetHttpListenPort() int32 { func (this *ApigateCluster) GetHttpListenPort() int32 {
return this.http_listen_port return this.http_listen_port
} }
func (this *GamesapiCluster) HasHttpListenPort() bool { func (this *ApigateCluster) HasHttpListenPort() bool {
return (this._flags1_ & (uint64(1) << 3)) > 0 return (this._flags1_ & (uint64(1) << 3)) > 0
} }
@ -70,8 +83,64 @@ func (this *Config) HasRequestOverTime() bool {
return (this._flags1_ & (uint64(1) << 3)) > 0 return (this._flags1_ & (uint64(1) << 3)) > 0
} }
func (this *ConfDb) GetHost() string {
return this.host
}
func (this *GamesapiCluster) LoadFromKv(kv map[string]interface{}) { func (this *ConfDb) HasHost() bool {
return (this._flags1_ & (uint64(1) << 1)) > 0
}
func (this *ConfDb) GetPort() int32 {
return this.port
}
func (this *ConfDb) HasPort() bool {
return (this._flags1_ & (uint64(1) << 2)) > 0
}
func (this *ConfDb) GetUser() string {
return this.user
}
func (this *ConfDb) HasUser() bool {
return (this._flags1_ & (uint64(1) << 3)) > 0
}
func (this *ConfDb) GetPasswd() string {
return this.passwd
}
func (this *ConfDb) HasPasswd() bool {
return (this._flags1_ & (uint64(1) << 4)) > 0
}
func (this *ConfDb) GetDatabase() string {
return this.database
}
func (this *ConfDb) HasDatabase() bool {
return (this._flags1_ & (uint64(1) << 5)) > 0
}
func (this *ConfDb) GetMaxOpenConns() int32 {
return this.max_open_conns
}
func (this *ConfDb) HasMaxOpenConns() bool {
return (this._flags1_ & (uint64(1) << 6)) > 0
}
func (this *ConfDb) GetMaxIdleConns() int32 {
return this.max_idle_conns
}
func (this *ConfDb) HasMaxIdleConns() bool {
return (this._flags1_ & (uint64(1) << 7)) > 0
}
func (this *ApigateCluster) LoadFromKv(kv map[string]interface{}) {
f5.ReadMetaTableField(&this.instance_id, "instance_id", &this._flags1_, 1, kv) f5.ReadMetaTableField(&this.instance_id, "instance_id", &this._flags1_, 1, kv)
f5.ReadMetaTableField(&this.listen_port, "listen_port", &this._flags1_, 2, kv) f5.ReadMetaTableField(&this.listen_port, "listen_port", &this._flags1_, 2, kv)
f5.ReadMetaTableField(&this.http_listen_port, "http_listen_port", &this._flags1_, 3, kv) f5.ReadMetaTableField(&this.http_listen_port, "http_listen_port", &this._flags1_, 3, kv)
@ -82,3 +151,13 @@ func (this *Config) LoadFromKv(kv map[string]interface{}) {
f5.ReadMetaTableField(&this.max_concurrent_num, "max_concurrent_num", &this._flags1_, 2, kv) f5.ReadMetaTableField(&this.max_concurrent_num, "max_concurrent_num", &this._flags1_, 2, kv)
f5.ReadMetaTableField(&this.request_over_time, "request_over_time", &this._flags1_, 3, kv) f5.ReadMetaTableField(&this.request_over_time, "request_over_time", &this._flags1_, 3, kv)
} }
func (this *ConfDb) LoadFromKv(kv map[string]interface{}) {
f5.ReadMetaTableField(&this.host, "host", &this._flags1_, 1, kv)
f5.ReadMetaTableField(&this.port, "port", &this._flags1_, 2, kv)
f5.ReadMetaTableField(&this.user, "user", &this._flags1_, 3, kv)
f5.ReadMetaTableField(&this.passwd, "passwd", &this._flags1_, 4, kv)
f5.ReadMetaTableField(&this.database, "database", &this._flags1_, 5, kv)
f5.ReadMetaTableField(&this.max_open_conns, "max_open_conns", &this._flags1_, 6, kv)
f5.ReadMetaTableField(&this.max_idle_conns, "max_idle_conns", &this._flags1_, 7, kv)
}

View File

@ -2,7 +2,7 @@ package mt;
option go_package = ".;mt"; option go_package = ".;mt";
message GamesapiCluster message ApigateCluster
{ {
optional int32 instance_id = 1; optional int32 instance_id = 1;
optional int32 listen_port = 2; optional int32 listen_port = 2;
@ -15,3 +15,14 @@ message Config
optional int32 max_concurrent_num = 2; optional int32 max_concurrent_num = 2;
optional int32 request_over_time = 3; optional int32 request_over_time = 3;
} }
message ConfDb
{
optional string host = 1;
optional int32 port = 2;
optional string user = 3;
optional string passwd = 4;
optional string database = 5;
optional int32 max_open_conns = 6;
optional int32 max_idle_conns = 7;
}

View File

@ -1,16 +1,26 @@
package service package service
import ( import (
"apigate/constant"
"f5" "f5"
"fmt"
"main/mt" "main/mt"
"math/rand"
"q5" "q5"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
) )
type downStreamHost struct {
Host string `json:"host"`
Port int32 `json:"port"`
Url string `json:"url"`
}
type sApiForward struct { type sApiForward struct {
userCache []*SApiForwardLockCache userCache []*SApiForwardLockCache
downStreams []downStreamHost
insessTimes int32 insessTimes int32
total int32 total int32
getTimes int32 getTimes int32
@ -35,6 +45,7 @@ type SApiForwardLock struct {
} }
func (this *sApiForward) init() { func (this *sApiForward) init() {
q5.NewSlice(&this.downStreams, 0, 20)
q5.NewSlice(&this.userCache, 1024, 1024) q5.NewSlice(&this.userCache, 1024, 1024)
for i := 0; i < len(this.userCache); i++ { for i := 0; i < len(this.userCache); i++ {
p := new(SApiForwardLockCache) p := new(SApiForwardLockCache)
@ -42,6 +53,11 @@ func (this *sApiForward) init() {
p.userHash = &map[string]*SApiForwardLock{} p.userHash = &map[string]*SApiForwardLock{}
this.userCache[i] = p this.userCache[i] = p
} }
this.LoadDownStreams()
go func() {
time.Sleep(time.Second * 60 * 10)
this.LoadDownStreams()
}()
} }
func (this *sApiForward) unInit() { func (this *sApiForward) unInit() {
@ -162,3 +178,41 @@ func (this *sApiForward) outputMonitorLog() {
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }
} }
func (this *sApiForward) LoadDownStreams() error {
err, ds := f5.GetGoStyleDb().NewOrmSelect(
constant.CONF_DB,
"t_apigate_host",
[][]string{})
if err == nil {
downStreams := []downStreamHost{}
q5.NewSlice(&downStreams, 0, 20)
for ds.Next() {
host := ds.GetByName("apigate_host")
port := q5.ToInt32(ds.GetByName("apigate_port"))
enable := q5.ToInt32(ds.GetByName("enable"))
if enable != 0 {
downSteam := q5.NewSliceElement(&downStreams)
downSteam.Host = host
downSteam.Port = port
downSteam.Url = fmt.Sprintf("http://%s:%d", downSteam.Host, downSteam.Port)
}
}
this.downStreams = downStreams
f5.GetSysLog().Info("LoadDownstreams ok %s", q5.EncodeJson(&downStreams))
} else {
f5.GetSysLog().Info("LoadDownstreams err %s", err)
}
return err
}
func (this *sApiForward) GetDownStreamHost() (string, string) {
downStreams := this.downStreams
if len(downStreams) <= 0 {
return mt.Table.Config.GetRedirectUrl(), ""
}
downStream := downStreams[rand.Intn(len(downStreams))]
return downStream.Url, mt.Table.Config.GetRedirectHost()
}

View File

@ -2,14 +2,14 @@ package service
import ( import (
"f5" "f5"
"main/mt" "fmt"
"main/constant" "main/constant"
"main/mt"
"math/rand"
"q5" "q5"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"math/rand"
"fmt"
) )
type downStreamHost struct { type downStreamHost struct {
@ -194,14 +194,14 @@ func (this *sApiForward) outputMonitorLog() {
func (this *sApiForward) LoadDownStreams() error { func (this *sApiForward) LoadDownStreams() error {
err, ds := f5.GetGoStyleDb().NewOrmSelect( err, ds := f5.GetGoStyleDb().NewOrmSelect(
constant.CONF_DB, constant.CONF_DB,
"t_apigate_host", "t_internal_gameapi_host",
[][]string{}) [][]string{})
if err == nil { if err == nil {
downStreams := []downStreamHost{} downStreams := []downStreamHost{}
q5.NewSlice(&downStreams, 0, 20) q5.NewSlice(&downStreams, 0, 20)
for ds.Next() { for ds.Next() {
host := ds.GetByName("apigate_host") host := ds.GetByName("gameapi_host")
port := q5.ToInt32(ds.GetByName("apigate_port")) port := q5.ToInt32(ds.GetByName("gameapi_port"))
enable := q5.ToInt32(ds.GetByName("enable")) enable := q5.ToInt32(ds.GetByName("enable"))
if enable != 0 { if enable != 0 {
downSteam := q5.NewSliceElement(&downStreams) downSteam := q5.NewSliceElement(&downStreams)