This commit is contained in:
aozhiwei 2020-12-14 19:46:45 +08:00
parent 2d53acb2b8
commit 045641ada2
6 changed files with 184 additions and 35 deletions

View File

@ -16,11 +16,13 @@ func (this *App_) Init() {
this.App_.Init()
G.MetaMgr = new(MetaMgr).Init()
G.HttpServer = new(f5.HttpServer).Init("httpserver", 1000 * 60)
G.OrderMgr = new(OrderMgr).Init()
G.HttpServer.Start(G.MetaMgr.GetServer(1).GetListenPort());
}
func (this *App_) UnInit() {
G.OrderMgr.UnInit()
G.HttpServer.UnInit()
G.MetaMgr.UnInit()
this.App_.UnInit()

View File

@ -1,3 +1,7 @@
package main
const SESSION_KEY = "f3a6a9a5-217a-4079-ab99-b5d69b8212be"
const (
IM_SYNC_ORDER = 100
)

View File

@ -6,6 +6,7 @@ import (
type GlobalVar struct {
MetaMgr *MetaMgr
OrderMgr *OrderMgr
HttpServer *f5.HttpServer
}

View File

@ -145,6 +145,7 @@ type MysqlConf struct {
Port *int32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"`
User *string `protobuf:"bytes,4,opt,name=user" json:"user,omitempty"`
Passwd *string `protobuf:"bytes,5,opt,name=passwd" json:"passwd,omitempty"`
Database *string `protobuf:"bytes,6,opt,name=database" json:"database,omitempty"`
}
func (x *MysqlConf) Reset() {
@ -214,6 +215,13 @@ func (x *MysqlConf) GetPasswd() string {
return ""
}
func (x *MysqlConf) GetDatabase() string {
if x != nil && x.Database != nil {
return *x.Database
}
return ""
}
type MysqlConfMetas struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -382,24 +390,25 @@ var file_mt_proto_rawDesc = []byte{
0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x22, 0x31, 0x0a, 0x0f, 0x53, 0x65, 0x72,
0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1e, 0x0a, 0x06,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6d,
0x74, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x5a, 0x0a, 0x09,
0x74, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x6c, 0x0a, 0x09,
0x4d, 0x79, 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x13, 0x0a, 0x0b, 0x69, 0x6e, 0x73,
0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0c,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x12, 0x0c, 0x0a, 0x04,
0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0c, 0x0a, 0x04, 0x75, 0x73,
0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x12, 0x0e, 0x0a, 0x06, 0x70, 0x61, 0x73, 0x73,
0x77, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2f, 0x0a, 0x0e, 0x4d, 0x79, 0x73, 0x71,
0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1d, 0x0a, 0x06, 0x76, 0x61,
0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x74, 0x2e,
0x4d, 0x79, 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x22, 0x3f, 0x0a, 0x08, 0x47, 0x61, 0x6d,
0x65, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x0e, 0x0a, 0x06, 0x67, 0x61, 0x6d, 0x65, 0x69, 0x64, 0x18,
0x01, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0f, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x12, 0x12, 0x0a, 0x0a, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79,
0x5f, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2d, 0x0a, 0x0d, 0x47, 0x61,
0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1c, 0x0a, 0x06, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x6d, 0x74,
0x2e, 0x47, 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x6d,
0x74,
0x77, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x12, 0x10, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61,
0x62, 0x61, 0x73, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2f, 0x0a, 0x0e, 0x4d, 0x79,
0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1d, 0x0a, 0x06,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d,
0x74, 0x2e, 0x4d, 0x79, 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x22, 0x3f, 0x0a, 0x08, 0x47,
0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x0e, 0x0a, 0x06, 0x67, 0x61, 0x6d, 0x65, 0x69,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0f, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e,
0x65, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x12, 0x12, 0x0a, 0x0a, 0x6e, 0x6f, 0x74, 0x69,
0x66, 0x79, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2d, 0x0a, 0x0d,
0x47, 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1c, 0x0a,
0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e,
0x6d, 0x74, 0x2e, 0x47, 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x42, 0x06, 0x5a, 0x04, 0x2e,
0x3b, 0x6d, 0x74,
}
var (

View File

@ -3,49 +3,91 @@ package main
import (
"sync"
"sync/atomic"
"net/http"
"q5"
"f5"
)
type GameMgr struct {
type OrderInfo struct {
dbInstanceId int32
idx int64
accountId string
orderId string
roleId string
serverId int32
channel int32
polySdkChannel int32
unifiedChannel int32
tryCount int32
price int32
createTime int64
}
type OrderMgr struct {
fetchEventAtomic int64
mysqlClusterMutex sync.RWMutex
dbIdxHash sync.Map
mysqlCluster []*MtwMysqlConf
gameHash map[int64]*MtwGameConf
orderMutex sync.RWMutex
orderList []*OrderInfo
orderHash map[string]*OrderInfo
}
func (this *GameMgr) Init() *GameMgr {
f5.Timer().AddSimpleRepeatTimer(1000 * 10,
func (this *OrderMgr) Init() *OrderMgr {
this.orderList = make([]*OrderInfo, 0)
this.orderHash = make(map[string]*OrderInfo)
f5.Timer().AddSimpleRepeatTimer(1000 * 3,
func (params* q5.XParams) {
if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 {
atomic.AddInt64(&this.fetchEventAtomic, 1)
go this.FetchEventGo()
} else {
f5.SysLog().Warning("FetchEvent last pending")
}
})
G.HttpServer.RegisterHandle("OrderMgr", "syncOrder",
func (w* http.ResponseWriter, r *http.Request) {
G.OrderMgr.syncOrder()
q5.ResponseOk(w)
})
G.HttpServer.RegisterHandle("OrderMgr", "reissueOrder",
func (w* http.ResponseWriter, r *http.Request) {
G.OrderMgr.reissueOrder(w, r)
})
f5.App.RegisterIMMsgHandle(IM_SYNC_ORDER,
func (msgId int16, params *q5.XParams) {
f5.Timer().AddSimpleDeadLineTimer(1000,
func (params* q5.XParams) {
if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 {
atomic.AddInt64(&this.fetchEventAtomic, 1)
go this.FetchEventGo()
}
})
})
return this
}
func (this *GameMgr) UnInit() {
func (this *OrderMgr) UnInit() {
}
func (this *GameMgr) FetchEventGo() {
atomic.AddInt64(&this.fetchEventAtomic, 1)
func (this *OrderMgr) FetchEventGo() {
defer atomic.AddInt64(&this.fetchEventAtomic, -1)
for i := 0; i < this.getMysqlClusterSize(); i++ {
conf := this.getMysqlCluster(i)
if conf == nil {
dbConf := this.getMysqlCluster(i)
if dbConf == nil {
continue
}
conn := q5.NewMysql(
conf.GetHost(),
conf.GetPort(),
conf.GetUser(),
conf.GetPasswd(),
"")
dbConf.GetHost(),
dbConf.GetPort(),
dbConf.GetUser(),
dbConf.GetPasswd(),
dbConf.GetDatabase())
defer conn.Close()
if err := conn.Open(); err != nil {
f5.SysLog().Warning(
@ -54,21 +96,20 @@ func (this *GameMgr) FetchEventGo() {
err)
return
}
if !this.FetchEventOneDB(conf, conn) {
if !this.FetchEventOneDB(dbConf, conn) {
f5.SysLog().Warning("FetchEventOneDB error")
return
}
}
}
func (this *GameMgr) getMysqlClusterSize() int {
func (this *OrderMgr) getMysqlClusterSize() int {
this.mysqlClusterMutex.Lock()
defer this.mysqlClusterMutex.Unlock()
return len(this.mysqlCluster)
}
func (this *GameMgr) getMysqlCluster(index int) *MtwMysqlConf {
func (this *OrderMgr) getMysqlCluster(index int) *MtwMysqlConf {
this.mysqlClusterMutex.Lock()
defer this.mysqlClusterMutex.Unlock()
if index >= 0 && index < len(this.mysqlCluster) {
@ -78,11 +119,23 @@ func (this *GameMgr) getMysqlCluster(index int) *MtwMysqlConf {
}
}
func (this *GameMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool {
var lastIdx int64 = 0
func (this *OrderMgr) GetDBIdx(instanceId int32) int64 {
if val, ok := this.dbIdxHash.Load(instanceId); ok {
return val.(int64)
} else {
return 0
}
}
func (this *OrderMgr) SetDBIdx(instanceId int32, idx int64) {
this.dbIdxHash.Store(instanceId, idx)
}
func (this *OrderMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool {
lastIdx := this.GetDBIdx(conf.GetInstanceId())
for true {
rows, err := conn.Query("SELECT idx, account_id, orderid, roleid, server_id, " +
" channel, poly_sdk_channel, unified_channel, try_count, price " +
" channel, poly_sdk_channel, unified_channel, try_count, price, createtime " +
"FROM `orderinfo` " +
"WHERE idx > ? AND sp_pay_result = 1 AND status = 0 " +
"LIMIT 0, 1000;",
@ -107,9 +160,23 @@ func (this *GameMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool {
var unifiedChannel int32
var tryCount int32
var price int32
var createTime int64
hasData = true
rows.Scan(&idx, &accountId, &orderId, &roleId, &serverId,
&channel, &polySdkChannel, &unifiedChannel, &tryCount, &price)
rows.Scan(&idx, &accountId, &orderId, &roleId, &serverId, &channel,
&polySdkChannel, &unifiedChannel, &tryCount, &price, createTime)
this.AddOrder(
conf.GetInstanceId(),
idx,
accountId,
orderId,
roleId,
serverId,
channel,
polySdkChannel,
unifiedChannel,
tryCount,
price,
createTime)
if idx > lastIdx {
lastIdx = idx
}
@ -118,5 +185,70 @@ func (this *GameMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool {
break
}
}//end for
this.SetDBIdx(conf.GetInstanceId(), lastIdx)
return true
}
func (this *OrderMgr) syncOrder() {
if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 {
atomic.AddInt64(&this.fetchEventAtomic, 1)
go this.FetchEventGo()
} else {
f5.App.AddIMMsg(IM_SYNC_ORDER, new(q5.XParams))
}
}
func (this *OrderMgr) reissueOrder(w* http.ResponseWriter, r *http.Request) {
orderId := q5.Request(r, "order_id").GetString()
dbIndex := int(q5.Crc32(orderId)) % this.getMysqlClusterSize()
dbConf := this.getMysqlCluster(dbIndex)
if dbConf == nil {
q5.ResponseErr(w, 1, "获取数据库配置失败")
return
}
conn := q5.NewMysql(
dbConf.GetHost(),
dbConf.GetPort(),
dbConf.GetUser(),
dbConf.GetPasswd(),
dbConf.GetDatabase())
defer conn.Close()
if err := conn.Open(); err != nil {
q5.ResponseErr(w, 2, "数据库连接失败")
return
}
}
func (this *OrderMgr) AddOrder(
dbInstanceId int32,
idx int64,
accountId string,
orderId string,
roleId string,
serverId int32,
channel int32,
polySdkChannel int32,
unifiedChannel int32,
tryCount int32,
price int32,
createTime int64) {
this.orderMutex.Lock()
defer this.orderMutex.Unlock()
if _, ok := this.orderHash[orderId]; ok {
panic("orderId已经存在" + orderId)
}
orderInfo := new(OrderInfo)
orderInfo.dbInstanceId = dbInstanceId
orderInfo.idx = idx
orderInfo.accountId = accountId
orderInfo.roleId = roleId
orderInfo.serverId = serverId
orderInfo.channel = channel
orderInfo.polySdkChannel = polySdkChannel
orderInfo.unifiedChannel = unifiedChannel
orderInfo.tryCount = tryCount
orderInfo.price = price
orderInfo.createTime = createTime
this.orderList = append(this.orderList, orderInfo)
this.orderHash[orderId] = orderInfo
}

View File

@ -21,6 +21,7 @@ message MysqlConf
optional int32 port = 3;
optional string user = 4;
optional string passwd = 5;
optional string database = 6;
}
message MysqlConfMetas