228 lines
5.3 KiB
Go
228 lines
5.3 KiB
Go
package service
|
|
|
|
import (
|
|
"f5"
|
|
"main/mt"
|
|
"main/constant"
|
|
"q5"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"math/rand"
|
|
"fmt"
|
|
)
|
|
|
|
type downStreamHost struct {
|
|
host string
|
|
port int32
|
|
url string
|
|
}
|
|
|
|
type sApiForward struct {
|
|
userCache []*SApiForwardLockCache
|
|
downStreams []*downStreamHost
|
|
insessTimes int32
|
|
total int32
|
|
getTimes int32
|
|
postTimes int32
|
|
createErrTimes int32
|
|
doErrTimes int32
|
|
okTimes int32
|
|
readRspErrTimes int32
|
|
refuseTimes int32
|
|
maxCostTime int64
|
|
}
|
|
|
|
type SApiForwardLockCache struct {
|
|
lock *sync.Mutex
|
|
userHash *map[string]*SApiForwardLock
|
|
}
|
|
|
|
type SApiForwardLock struct {
|
|
accountId string
|
|
lockTimes int32
|
|
lock *sync.Mutex
|
|
}
|
|
|
|
func (this *sApiForward) init() {
|
|
q5.NewSlice(&this.downStreams, 0, 20)
|
|
q5.NewSlice(&this.userCache, 1024, 1024)
|
|
for i := 0; i < len(this.userCache); i++ {
|
|
p := new(SApiForwardLockCache)
|
|
p.lock = new(sync.Mutex)
|
|
p.userHash = &map[string]*SApiForwardLock{}
|
|
this.userCache[i] = p
|
|
}
|
|
this.LoadDownStreams()
|
|
}
|
|
|
|
func (this *sApiForward) unInit() {
|
|
}
|
|
|
|
func (this *sApiForward) AcquireLock(accountId string) *SApiForwardLock {
|
|
crc32 := q5.Crc32(accountId)
|
|
c := this.userCache[int64(crc32)%int64(len(this.userCache))]
|
|
u := this.getOrCreate(c, accountId)
|
|
if atomic.AddInt32(&u.lockTimes, 1) > mt.Table.Config.GetMaxConcurrentNum() {
|
|
atomic.AddInt32(&u.lockTimes, -1)
|
|
this.IncRefuseTimes()
|
|
return nil
|
|
}
|
|
u.lock.Lock()
|
|
return u
|
|
}
|
|
|
|
func (this *sApiForward) ReleaseLock(l *SApiForwardLock) {
|
|
l.lock.Unlock()
|
|
if atomic.AddInt32(&l.lockTimes, -1) < 1 {
|
|
crc32 := q5.Crc32(l.accountId)
|
|
c := this.userCache[int64(crc32)%int64(len(this.userCache))]
|
|
delete(*c.userHash, l.accountId)
|
|
}
|
|
}
|
|
|
|
func (this *sApiForward) IncInvalidSessionTimes() {
|
|
atomic.AddInt32(&this.insessTimes, 1)
|
|
}
|
|
|
|
func (this *sApiForward) IncTotalTimes() {
|
|
atomic.AddInt32(&this.total, 1)
|
|
}
|
|
|
|
func (this *sApiForward) IncGetTimes() {
|
|
atomic.AddInt32(&this.getTimes, 1)
|
|
}
|
|
|
|
func (this *sApiForward) IncPostTimes() {
|
|
atomic.AddInt32(&this.postTimes, 1)
|
|
}
|
|
|
|
func (this *sApiForward) IncCreateErrTimes() {
|
|
atomic.AddInt32(&this.createErrTimes, 1)
|
|
}
|
|
|
|
func (this *sApiForward) IncDoErrTimes() {
|
|
atomic.AddInt32(&this.doErrTimes, 1)
|
|
|
|
}
|
|
|
|
func (this *sApiForward) IncOkTimes() {
|
|
atomic.AddInt32(&this.okTimes, 1)
|
|
|
|
}
|
|
|
|
func (this *sApiForward) IncReadRspErrTimes() {
|
|
atomic.AddInt32(&this.readRspErrTimes, 1)
|
|
|
|
}
|
|
|
|
func (this *sApiForward) IncRefuseTimes() {
|
|
atomic.AddInt32(&this.refuseTimes, 1)
|
|
}
|
|
|
|
func (this *sApiForward) UpdateCostTime(costTime int64) {
|
|
if this.maxCostTime < costTime {
|
|
this.maxCostTime = costTime
|
|
}
|
|
}
|
|
|
|
func (this *sApiForward) getOrCreate(c *SApiForwardLockCache, accountId string) *SApiForwardLock {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
if u, ok := (*c.userHash)[accountId]; ok {
|
|
return u
|
|
} else {
|
|
u = new(SApiForwardLock)
|
|
u.accountId = accountId
|
|
u.lock = new(sync.Mutex)
|
|
(*c.userHash)[accountId] = u
|
|
return u
|
|
}
|
|
}
|
|
|
|
func (this *sApiForward) Sign(params []*[]string, nonce string, timeStamp int64, postData string) string {
|
|
signData := ""
|
|
q5.Sort(params, func(a *[]string, b *[]string) bool {
|
|
return (*a)[0] < (*b)[0]
|
|
})
|
|
for _, v := range params {
|
|
signData += (*v)[0] + "=" + (*v)[1] + "&"
|
|
}
|
|
signData += nonce + q5.ToString(timeStamp) + postData + mt.Table.Config.GetRedirectSecretKey()
|
|
return q5.Md5Str(signData)
|
|
}
|
|
|
|
func (this *sApiForward) outputMonitorLog() {
|
|
logtimes := 0
|
|
for {
|
|
f5.GetSysLog().Info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
|
|
f5.GetSysLog().Info("total:%d, invalid_session:%d, get:%d,post:%d, create_error:%d, do_error:%d, ok:%d, read_rsp_err:%d, refuse:%d, max_cost_time:%d",
|
|
this.total,
|
|
this.insessTimes,
|
|
this.getTimes,
|
|
this.postTimes,
|
|
this.createErrTimes,
|
|
this.doErrTimes,
|
|
this.okTimes,
|
|
this.readRspErrTimes,
|
|
this.refuseTimes,
|
|
this.maxCostTime)
|
|
f5.GetSysLog().Info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
|
|
|
|
logtimes++
|
|
if logtimes > 6 {
|
|
logtimes = 0
|
|
this.insessTimes = 0
|
|
this.total = 0
|
|
this.getTimes = 0
|
|
this.postTimes = 0
|
|
this.createErrTimes = 0
|
|
this.doErrTimes = 0
|
|
this.okTimes = 0
|
|
this.readRspErrTimes = 0
|
|
this.refuseTimes = 0
|
|
this.maxCostTime = 0
|
|
}
|
|
time.Sleep(time.Second * 10)
|
|
}
|
|
}
|
|
|
|
func (this *sApiForward) LoadDownStreams() error {
|
|
err, ds := f5.GetGoStyleDb().NewOrmSelect(
|
|
constant.CONF_DB,
|
|
"t_apigate_host",
|
|
[][]string{})
|
|
if err == nil {
|
|
downStreams := []*downStreamHost{}
|
|
q5.NewSlice(&downStreams, 0, 20)
|
|
for ds.Next() {
|
|
host := ds.GetByName("apigate_host")
|
|
port := q5.ToInt32(ds.GetByName("apigate_port"))
|
|
enable := q5.ToInt32(ds.GetByName("enable"))
|
|
if enable != 0 {
|
|
downSteam := *q5.NewSliceElement(&downStreams)
|
|
downSteam.host = host
|
|
downSteam.port = port
|
|
downSteam.url = fmt.Sprintf("http://%s:%d", downSteam.host, downSteam.port)
|
|
}
|
|
}
|
|
if len(downStreams) > 0 {
|
|
this.downStreams = downStreams
|
|
}
|
|
f5.GetSysLog().Info("LoadDownstreams ok %s", q5.EncodeJson(&downStreams))
|
|
} else {
|
|
f5.GetSysLog().Info("LoadDownstreams err %s", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (this *sApiForward) GetDownStreamHost() (string, string) {
|
|
downStreams := this.downStreams
|
|
if len(downStreams) <= 0 {
|
|
newUrl := mt.Table.Config.GetById(0).GetRedirectUrl()
|
|
return newUrl, ""
|
|
}
|
|
downStream := downStreams[rand.Intn(len(downStreams))]
|
|
return downStream.url, downStream.host
|
|
}
|