This commit is contained in:
aozhiwei 2020-12-16 16:02:44 +08:00
parent 5704178ac6
commit 5cdaaf350c
4 changed files with 14 additions and 257 deletions

View File

@ -16,13 +16,13 @@ func (this *App_) Init() {
this.App_.Init()
G.MetaMgr = new(MetaMgr).Init()
G.HttpServer = new(f5.HttpServer).Init("httpserver", 1000 * 60)
G.OrderMgr = new(OrderMgr).Init()
G.UserMgr = new(UserMgr).Init()
G.HttpServer.Start(G.MetaMgr.GetServer(1).GetListenPort());
}
func (this *App_) UnInit() {
G.OrderMgr.UnInit()
G.UserMgr.UnInit()
G.HttpServer.UnInit()
G.MetaMgr.UnInit()
this.App_.UnInit()

View File

@ -6,7 +6,7 @@ import (
type GlobalVar struct {
MetaMgr *MetaMgr
OrderMgr *OrderMgr
UserMgr *UserMgr
HttpServer *f5.HttpServer
}

View File

@ -1,274 +1,31 @@
package main
import (
"sync"
"sync/atomic"
"net/http"
"database/sql"
"q5"
"f5"
)
type OrderInfo struct {
dbInstanceId int32
idx int64
accountId string
orderId string
roleId string
serverId int32
channel int32
polySdkChannel int32
unifiedChannel int32
tryCount int32
price int32
createTime int64
type UserMgr struct {
}
func (this *OrderInfo) ScanFromRows(rows *sql.Rows) (error) {
err := rows.Scan(
&this.idx,
&this.accountId,
&this.orderId,
&this.roleId,
&this.serverId,
&this.channel,
&this.polySdkChannel,
&this.unifiedChannel,
&this.tryCount,
&this.price,
&this.createTime)
return err
}
func (this *OrderInfo) ScanFromRow(row *sql.Row) (error) {
err := row.Scan(
&this.idx,
&this.accountId,
&this.orderId,
&this.roleId,
&this.serverId,
&this.channel,
&this.polySdkChannel,
&this.unifiedChannel,
&this.tryCount,
&this.price,
&this.createTime)
return err
}
type OrderMgr struct {
fetchEventAtomic int64
mysqlClusterMutex sync.RWMutex
dbIdxHash sync.Map
mysqlCluster []*MtwMysqlConf
gameHash map[int64]*MtwGameConf
orderMutex sync.RWMutex
orderList []*OrderInfo
orderHash map[string]*OrderInfo
}
func (this *OrderMgr) Init() *OrderMgr {
this.orderList = make([]*OrderInfo, 0)
this.orderHash = make(map[string]*OrderInfo)
f5.Timer().AddSimpleRepeatTimer(1000 * 3,
func (params* q5.XParams) {
if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 {
atomic.AddInt64(&this.fetchEventAtomic, 1)
go this.FetchEventGo()
} else {
f5.SysLog().Warning("FetchEvent last pending")
}
})
G.HttpServer.RegisterHandle("OrderMgr", "syncOrder",
func (w* http.ResponseWriter, r *http.Request) {
G.OrderMgr.syncOrder()
q5.ResponseOk(w)
})
G.HttpServer.RegisterHandle("OrderMgr", "reissueOrder",
func (w* http.ResponseWriter, r *http.Request) {
G.OrderMgr.reissueOrder(w, r)
})
f5.App.RegisterIMMsgHandle(IM_SYNC_ORDER,
func (msgId int16, params *q5.XParams) {
f5.Timer().AddSimpleDeadLineTimer(1000,
func (params* q5.XParams) {
if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 {
atomic.AddInt64(&this.fetchEventAtomic, 1)
go this.FetchEventGo()
}
})
})
func (this *UserMgr) Init() *UserMgr {
G.HttpServer.RegisterRestHandle("/user/login", this._userLogin)
G.HttpServer.RegisterRestHandle("/user/logout", this._userLogout)
G.HttpServer.RegisterRestHandle("/user/info", this._userInfo)
return this
}
func (this *OrderMgr) UnInit() {
func (this *UserMgr) UnInit() {
}
func (this *OrderMgr) FetchEventGo() {
defer atomic.AddInt64(&this.fetchEventAtomic, -1)
func (this *UserMgr) _userLogin(w *http.ResponseWriter, r *http.Request) {
for i := 0; i < this.getMysqlClusterSize(); i++ {
dbConf := this.getMysqlCluster(i)
if dbConf == nil {
continue
}
conn := q5.NewMysql(
dbConf.GetHost(),
dbConf.GetPort(),
dbConf.GetUser(),
dbConf.GetPasswd(),
dbConf.GetDatabase())
defer conn.Close()
if err := conn.Open(); err != nil {
f5.SysLog().Warning(
"FetchEventGo can't connnect %s error:%s",
conn,
err)
return
}
if !this.FetchEventOneDB(dbConf, conn) {
f5.SysLog().Warning("FetchEventOneDB error")
return
}
}
}
func (this *OrderMgr) getMysqlClusterSize() int {
this.mysqlClusterMutex.Lock()
defer this.mysqlClusterMutex.Unlock()
return len(this.mysqlCluster)
func (this *UserMgr) _userInfo(w *http.ResponseWriter, r *http.Request) {
}
func (this *OrderMgr) getMysqlCluster(index int) *MtwMysqlConf {
this.mysqlClusterMutex.Lock()
defer this.mysqlClusterMutex.Unlock()
if index >= 0 && index < len(this.mysqlCluster) {
return this.mysqlCluster[index]
} else {
return nil
}
}
func (this *UserMgr) _userLogout(w *http.ResponseWriter, r *http.Request) {
func (this *OrderMgr) GetDBIdx(instanceId int32) int64 {
if val, ok := this.dbIdxHash.Load(instanceId); ok {
return val.(int64)
} else {
return 0
}
}
func (this *OrderMgr) SetDBIdx(instanceId int32, idx int64) {
this.dbIdxHash.Store(instanceId, idx)
}
func (this *OrderMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool {
lastIdx := this.GetDBIdx(conf.GetInstanceId())
for true {
rows, err := conn.Query(this.GenSelect() +
"FROM `orderinfo` " +
"WHERE idx > ? AND sp_pay_result = 1 AND status = 0 " +
"LIMIT 0, 1000;",
lastIdx)
if err != nil {
f5.SysLog().Warning(
"FetchEventOneDB queryError %s error:%s",
conn,
err)
return false
}
defer rows.Close()
hasData := false
for rows.Next() {
hasData = true
orderInfo := new(OrderInfo)
orderInfo.ScanFromRows(rows)
this.AddOrder(conf.GetInstanceId(), orderInfo)
if orderInfo.idx > lastIdx {
lastIdx = orderInfo.idx
}
}
if !hasData {
break
}
}//end for
this.SetDBIdx(conf.GetInstanceId(), lastIdx)
return true
}
func (this *OrderMgr) syncOrder() {
if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 {
atomic.AddInt64(&this.fetchEventAtomic, 1)
go this.FetchEventGo()
} else {
f5.App.AddIMMsg(IM_SYNC_ORDER, new(q5.XParams))
}
}
func (this *OrderMgr) reissueOrder(w* http.ResponseWriter, r *http.Request) {
orderId := q5.Request(r, "order_id").GetString()
dbIndex := int(q5.Crc32(orderId)) % this.getMysqlClusterSize()
dbConf := this.getMysqlCluster(dbIndex)
if dbConf == nil {
q5.ResponseErr(w, 1, "获取数据库配置失败")
return
}
conn := q5.NewMysql(
dbConf.GetHost(),
dbConf.GetPort(),
dbConf.GetUser(),
dbConf.GetPasswd(),
dbConf.GetDatabase())
defer conn.Close()
if err := conn.Open(); err != nil {
q5.ResponseErr(w, 2, "数据库连接失败")
return
}
{
row := conn.QueryRow(this.GenSelect() +
"FROM `orderinfo` " +
"WHERE orderid = '?';",
orderId)
orderInfo := new(OrderInfo)
if err := orderInfo.ScanFromRow(row); err != nil {
q5.ResponseErr(w, 3, "订单不存在")
return
}
if _, err := conn.Exec("UPDATE `orderinfo` SET status = 0 WHERE orderid='?';", orderId);
err != nil {
q5.ResponseErr(w, 4, "更新订单状态失败")
return
}
this.AddOrder(dbConf.GetInstanceId(), orderInfo)
this.PromoteToTop(orderInfo.orderId)
q5.ResponseOk(w)
}
}
func (this *OrderMgr) AddOrder(
dbInstanceId int32,
orderInfo *OrderInfo) {
this.orderMutex.Lock()
defer this.orderMutex.Unlock()
if _, ok := this.orderHash[orderInfo.orderId]; ok {
panic("orderId已经存在" + orderInfo.orderId)
}
orderInfo.dbInstanceId = dbInstanceId
this.orderList = append(this.orderList, orderInfo)
this.orderHash[orderInfo.orderId] = orderInfo
}
func (this *OrderMgr) GenSelect() string {
selectSql := "SELECT idx, account_id, orderid, roleid, server_id, " +
"channel, poly_sdk_channel, unified_channel, try_count, price, createtime "
return selectSql
}
func (this *OrderMgr) PromoteToTop(orderId string) {
this.orderMutex.Lock()
defer this.orderMutex.Unlock()
if _, ok := this.orderHash[orderId]; ok {
}
}

2
third_party/f5 vendored

@ -1 +1 @@
Subproject commit 8f0700f501db978d79167ec246c4aedb36e99b8a
Subproject commit 60518a8f3ccb28de37ba2b386fd36923b085b965