diff --git a/server/gamepay_backend/app.go b/server/gamepay_backend/app.go index 8a06d3c..38c7988 100644 --- a/server/gamepay_backend/app.go +++ b/server/gamepay_backend/app.go @@ -16,11 +16,13 @@ func (this *App_) Init() { this.App_.Init() G.MetaMgr = new(MetaMgr).Init() G.HttpServer = new(f5.HttpServer).Init("httpserver", 1000 * 60) + G.OrderMgr = new(OrderMgr).Init() G.HttpServer.Start(G.MetaMgr.GetServer(1).GetListenPort()); } func (this *App_) UnInit() { + G.OrderMgr.UnInit() G.HttpServer.UnInit() G.MetaMgr.UnInit() this.App_.UnInit() diff --git a/server/gamepay_backend/constant.go b/server/gamepay_backend/constant.go index 2617d5e..25b7436 100644 --- a/server/gamepay_backend/constant.go +++ b/server/gamepay_backend/constant.go @@ -1,3 +1,7 @@ package main const SESSION_KEY = "f3a6a9a5-217a-4079-ab99-b5d69b8212be" + +const ( + IM_SYNC_ORDER = 100 +) diff --git a/server/gamepay_backend/g.go b/server/gamepay_backend/g.go index 3448b61..0b688f2 100644 --- a/server/gamepay_backend/g.go +++ b/server/gamepay_backend/g.go @@ -6,6 +6,7 @@ import ( type GlobalVar struct { MetaMgr *MetaMgr + OrderMgr *OrderMgr HttpServer *f5.HttpServer } diff --git a/server/gamepay_backend/mt/mt.pb.go b/server/gamepay_backend/mt/mt.pb.go index 121f0f3..07ffdac 100644 --- a/server/gamepay_backend/mt/mt.pb.go +++ b/server/gamepay_backend/mt/mt.pb.go @@ -145,6 +145,7 @@ type MysqlConf struct { Port *int32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"` User *string `protobuf:"bytes,4,opt,name=user" json:"user,omitempty"` Passwd *string `protobuf:"bytes,5,opt,name=passwd" json:"passwd,omitempty"` + Database *string `protobuf:"bytes,6,opt,name=database" json:"database,omitempty"` } func (x *MysqlConf) Reset() { @@ -214,6 +215,13 @@ func (x *MysqlConf) GetPasswd() string { return "" } +func (x *MysqlConf) GetDatabase() string { + if x != nil && x.Database != nil { + return *x.Database + } + return "" +} + type MysqlConfMetas struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -382,24 +390,25 @@ var file_mt_proto_rawDesc = []byte{ 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x22, 0x31, 0x0a, 0x0f, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1e, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6d, - 0x74, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x5a, 0x0a, 0x09, + 0x74, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x6c, 0x0a, 0x09, 0x4d, 0x79, 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x13, 0x0a, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0c, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x12, 0x0c, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0c, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x12, 0x0e, 0x0a, 0x06, 0x70, 0x61, 0x73, 0x73, - 0x77, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2f, 0x0a, 0x0e, 0x4d, 0x79, 0x73, 0x71, - 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1d, 0x0a, 0x06, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x74, 0x2e, - 0x4d, 0x79, 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x22, 0x3f, 0x0a, 0x08, 0x47, 0x61, 0x6d, - 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x0e, 0x0a, 0x06, 0x67, 0x61, 0x6d, 0x65, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0f, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x12, 0x12, 0x0a, 0x0a, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79, - 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2d, 0x0a, 0x0d, 0x47, 0x61, - 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1c, 0x0a, 0x06, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x6d, 0x74, - 0x2e, 0x47, 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x6d, - 0x74, + 0x77, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x12, 0x10, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, + 0x62, 0x61, 0x73, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2f, 0x0a, 0x0e, 0x4d, 0x79, + 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1d, 0x0a, 0x06, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, + 0x74, 0x2e, 0x4d, 0x79, 0x73, 0x71, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x22, 0x3f, 0x0a, 0x08, 0x47, + 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x0e, 0x0a, 0x06, 0x67, 0x61, 0x6d, 0x65, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x12, 0x0f, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, + 0x65, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x12, 0x12, 0x0a, 0x0a, 0x6e, 0x6f, 0x74, 0x69, + 0x66, 0x79, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x22, 0x2d, 0x0a, 0x0d, + 0x47, 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x12, 0x1c, 0x0a, + 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, + 0x6d, 0x74, 0x2e, 0x47, 0x61, 0x6d, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x42, 0x06, 0x5a, 0x04, 0x2e, + 0x3b, 0x6d, 0x74, } var ( diff --git a/server/gamepay_backend/ordermgr.go b/server/gamepay_backend/ordermgr.go index d8cd377..c7ce014 100644 --- a/server/gamepay_backend/ordermgr.go +++ b/server/gamepay_backend/ordermgr.go @@ -3,49 +3,91 @@ package main import ( "sync" "sync/atomic" + "net/http" "q5" "f5" ) -type GameMgr struct { +type OrderInfo struct { + dbInstanceId int32 + idx int64 + accountId string + orderId string + roleId string + serverId int32 + channel int32 + polySdkChannel int32 + unifiedChannel int32 + tryCount int32 + price int32 + createTime int64 +} + +type OrderMgr struct { fetchEventAtomic int64 + mysqlClusterMutex sync.RWMutex dbIdxHash sync.Map mysqlCluster []*MtwMysqlConf gameHash map[int64]*MtwGameConf + + orderMutex sync.RWMutex + orderList []*OrderInfo + orderHash map[string]*OrderInfo } -func (this *GameMgr) Init() *GameMgr { - f5.Timer().AddSimpleRepeatTimer(1000 * 10, +func (this *OrderMgr) Init() *OrderMgr { + this.orderList = make([]*OrderInfo, 0) + this.orderHash = make(map[string]*OrderInfo) + f5.Timer().AddSimpleRepeatTimer(1000 * 3, func (params* q5.XParams) { if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 { + atomic.AddInt64(&this.fetchEventAtomic, 1) go this.FetchEventGo() } else { f5.SysLog().Warning("FetchEvent last pending") } }) + G.HttpServer.RegisterHandle("OrderMgr", "syncOrder", + func (w* http.ResponseWriter, r *http.Request) { + G.OrderMgr.syncOrder() + q5.ResponseOk(w) + }) + G.HttpServer.RegisterHandle("OrderMgr", "reissueOrder", + func (w* http.ResponseWriter, r *http.Request) { + G.OrderMgr.reissueOrder(w, r) + }) + f5.App.RegisterIMMsgHandle(IM_SYNC_ORDER, + func (msgId int16, params *q5.XParams) { + f5.Timer().AddSimpleDeadLineTimer(1000, + func (params* q5.XParams) { + if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 { + atomic.AddInt64(&this.fetchEventAtomic, 1) + go this.FetchEventGo() + } + }) + }) return this } -func (this *GameMgr) UnInit() { +func (this *OrderMgr) UnInit() { } -func (this *GameMgr) FetchEventGo() { - atomic.AddInt64(&this.fetchEventAtomic, 1) +func (this *OrderMgr) FetchEventGo() { defer atomic.AddInt64(&this.fetchEventAtomic, -1) for i := 0; i < this.getMysqlClusterSize(); i++ { - conf := this.getMysqlCluster(i) - if conf == nil { + dbConf := this.getMysqlCluster(i) + if dbConf == nil { continue } conn := q5.NewMysql( - conf.GetHost(), - conf.GetPort(), - conf.GetUser(), - conf.GetPasswd(), - "") + dbConf.GetHost(), + dbConf.GetPort(), + dbConf.GetUser(), + dbConf.GetPasswd(), + dbConf.GetDatabase()) defer conn.Close() if err := conn.Open(); err != nil { f5.SysLog().Warning( @@ -54,21 +96,20 @@ func (this *GameMgr) FetchEventGo() { err) return } - if !this.FetchEventOneDB(conf, conn) { + if !this.FetchEventOneDB(dbConf, conn) { f5.SysLog().Warning("FetchEventOneDB error") return } } } - -func (this *GameMgr) getMysqlClusterSize() int { +func (this *OrderMgr) getMysqlClusterSize() int { this.mysqlClusterMutex.Lock() defer this.mysqlClusterMutex.Unlock() return len(this.mysqlCluster) } -func (this *GameMgr) getMysqlCluster(index int) *MtwMysqlConf { +func (this *OrderMgr) getMysqlCluster(index int) *MtwMysqlConf { this.mysqlClusterMutex.Lock() defer this.mysqlClusterMutex.Unlock() if index >= 0 && index < len(this.mysqlCluster) { @@ -78,11 +119,23 @@ func (this *GameMgr) getMysqlCluster(index int) *MtwMysqlConf { } } -func (this *GameMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool { - var lastIdx int64 = 0 +func (this *OrderMgr) GetDBIdx(instanceId int32) int64 { + if val, ok := this.dbIdxHash.Load(instanceId); ok { + return val.(int64) + } else { + return 0 + } +} + +func (this *OrderMgr) SetDBIdx(instanceId int32, idx int64) { + this.dbIdxHash.Store(instanceId, idx) +} + +func (this *OrderMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool { + lastIdx := this.GetDBIdx(conf.GetInstanceId()) for true { rows, err := conn.Query("SELECT idx, account_id, orderid, roleid, server_id, " + - " channel, poly_sdk_channel, unified_channel, try_count, price " + + " channel, poly_sdk_channel, unified_channel, try_count, price, createtime " + "FROM `orderinfo` " + "WHERE idx > ? AND sp_pay_result = 1 AND status = 0 " + "LIMIT 0, 1000;", @@ -107,9 +160,23 @@ func (this *GameMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool { var unifiedChannel int32 var tryCount int32 var price int32 + var createTime int64 hasData = true - rows.Scan(&idx, &accountId, &orderId, &roleId, &serverId, - &channel, &polySdkChannel, &unifiedChannel, &tryCount, &price) + rows.Scan(&idx, &accountId, &orderId, &roleId, &serverId, &channel, + &polySdkChannel, &unifiedChannel, &tryCount, &price, createTime) + this.AddOrder( + conf.GetInstanceId(), + idx, + accountId, + orderId, + roleId, + serverId, + channel, + polySdkChannel, + unifiedChannel, + tryCount, + price, + createTime) if idx > lastIdx { lastIdx = idx } @@ -118,5 +185,70 @@ func (this *GameMgr) FetchEventOneDB(conf *MtwMysqlConf, conn *q5.Mysql) bool { break } }//end for + this.SetDBIdx(conf.GetInstanceId(), lastIdx) return true } + +func (this *OrderMgr) syncOrder() { + if atomic.LoadInt64(&this.fetchEventAtomic) <= 0 { + atomic.AddInt64(&this.fetchEventAtomic, 1) + go this.FetchEventGo() + } else { + f5.App.AddIMMsg(IM_SYNC_ORDER, new(q5.XParams)) + } +} + +func (this *OrderMgr) reissueOrder(w* http.ResponseWriter, r *http.Request) { + orderId := q5.Request(r, "order_id").GetString() + dbIndex := int(q5.Crc32(orderId)) % this.getMysqlClusterSize() + dbConf := this.getMysqlCluster(dbIndex) + if dbConf == nil { + q5.ResponseErr(w, 1, "获取数据库配置失败") + return + } + conn := q5.NewMysql( + dbConf.GetHost(), + dbConf.GetPort(), + dbConf.GetUser(), + dbConf.GetPasswd(), + dbConf.GetDatabase()) + defer conn.Close() + if err := conn.Open(); err != nil { + q5.ResponseErr(w, 2, "数据库连接失败") + return + } +} + +func (this *OrderMgr) AddOrder( + dbInstanceId int32, + idx int64, + accountId string, + orderId string, + roleId string, + serverId int32, + channel int32, + polySdkChannel int32, + unifiedChannel int32, + tryCount int32, + price int32, + createTime int64) { + this.orderMutex.Lock() + defer this.orderMutex.Unlock() + if _, ok := this.orderHash[orderId]; ok { + panic("orderId已经存在" + orderId) + } + orderInfo := new(OrderInfo) + orderInfo.dbInstanceId = dbInstanceId + orderInfo.idx = idx + orderInfo.accountId = accountId + orderInfo.roleId = roleId + orderInfo.serverId = serverId + orderInfo.channel = channel + orderInfo.polySdkChannel = polySdkChannel + orderInfo.unifiedChannel = unifiedChannel + orderInfo.tryCount = tryCount + orderInfo.price = price + orderInfo.createTime = createTime + this.orderList = append(this.orderList, orderInfo) + this.orderHash[orderId] = orderInfo +} diff --git a/server/gamepay_backend/proto/mt.proto b/server/gamepay_backend/proto/mt.proto index ef994a3..367979b 100644 --- a/server/gamepay_backend/proto/mt.proto +++ b/server/gamepay_backend/proto/mt.proto @@ -21,6 +21,7 @@ message MysqlConf optional int32 port = 3; optional string user = 4; optional string passwd = 5; + optional string database = 6; } message MysqlConfMetas