diff --git a/server/tools/protobuild/ss_msgid.proto b/server/tools/protobuild/ss_msgid.proto index 25fbe40..15982a1 100644 --- a/server/tools/protobuild/ss_msgid.proto +++ b/server/tools/protobuild/ss_msgid.proto @@ -6,6 +6,8 @@ enum SSMessageId_e _SS_Ping = 8; _SS_Pong = 9; _SS_WSP_SocketDisconnect = 10; + _SS_WSP_RequestTargetServer = 11; + _SS_MS_ResponseTargetServer = 12; _SS_CMPing = 101; _SS_SMRpcError = 102; diff --git a/server/tools/protobuild/ss_proto.proto b/server/tools/protobuild/ss_proto.proto index 2d85631..cdc02c9 100644 --- a/server/tools/protobuild/ss_proto.proto +++ b/server/tools/protobuild/ss_proto.proto @@ -13,6 +13,28 @@ message SS_CMLogin_CMReConnect_CommonHead optional int32 server_id = 1; } +message SS_CMLogin_CMReConnect_CommonHead2 +{ + optional int32 server_id = 1; + optional string team_uuid = 2; +} + +message SS_WSP_RequestTargetServer +{ + optional int64 context_id = 1; + optional string account_id = 2; + optional string team_id = 3; +} + +message SS_MS_ResponseTargetServer +{ + optional int32 error_code = 1; + optional string error_msg = 2; + optional int64 context_id = 3; + optional string host = 4; + optional int32 port = 5; +} + message SS_SMRpcError { optional int32 error_code = 1; diff --git a/server/wsproxy/CMakeLists.txt b/server/wsproxy/CMakeLists.txt index 1a88675..bf8ac61 100644 --- a/server/wsproxy/CMakeLists.txt +++ b/server/wsproxy/CMakeLists.txt @@ -8,10 +8,17 @@ else() message(GAME_ID: ${GAME_ID}) endif() +if (${MASTER_MODE}) + message(MASTER_MODE: 1) +else() + set(MASTER_MODE 0) + message(MASTER_MODE: 0) +endif() + set(CMAKE_BUILD_TYPE "Debug") set(CMAKE_CXX_FLAGS_RELEASE "-std=gnu++11 -fsanitize=address -fno-omit-frame-pointer") set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++11") -set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++11 -DGAME_ID=${GAME_ID}") +set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++11 -DGAME_ID=${GAME_ID} -DMASTER_MODE=${MASTER_MODE}") include_directories( AFTER @@ -53,10 +60,7 @@ add_executable( add_custom_target(script_pb_protocol ALL) add_custom_command(TARGET script_pb_protocol PRE_BUILD -# COMMAND python ../../tools/script/construct/build_script.py COMMAND python ../tools/scripts/construct/build_pb.py -# COMMAND python ../../tools/script/construct/build_protocol.py -# COMMAND python ../../tools/script/construct/build_version_file.py ) add_dependencies(wsproxy script_pb_protocol) diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc index ed32398..4360f4a 100644 --- a/server/wsproxy/GCListener.cc +++ b/server/wsproxy/GCListener.cc @@ -58,6 +58,13 @@ public: .SetParam3(saddr)); } + virtual bool HandleRedirect(const std::string& url, const std::string& querystr, + std::string& location) override + { + location = "ws://192.168.100.21:7101"; + return true; + } + virtual void OnDisConnect() override { App::Instance()->AddIMMsg(IM_ClientSocketDisconnect, diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc index 630bdc8..25248b8 100644 --- a/server/wsproxy/app.cc +++ b/server/wsproxy/app.cc @@ -16,13 +16,18 @@ #include "GCListener.h" #include "jsondatamgr.h" #include "handlermgr.h" -#include "target_conn.h" -#include "target_conn_mgr.h" #include "gameclient.h" #include "gameclientmgr.h" #include "ss_msgid.pb.h" #include "ss_proto.pb.h" +#include "target_conn.h" +#include "target_conn_mgr.h" +#if MASTER_MODE +#include "mastersvr.h" +#include "mastersvrmgr.h" +#endif + struct MsgNode { SocketFrom_e sockfrom; @@ -92,6 +97,7 @@ void App::Init(int argc, char* argv[]) GCListener::Instance()->Init(); uuid.SetMachineId(instance_id); GameClientMgr::Instance()->Init(); + MasterSvrMgr::Instance()->Init(); TargetConnMgr::Instance()->Init(); a8::UdpLog::Instance()->Info("masterserver starting instance_id:%d pid:%d", {instance_id, getpid()}); @@ -114,6 +120,7 @@ void App::UnInit() if (terminated) { return; } + MasterSvrMgr::Instance()->UnInit(); TargetConnMgr::Instance()->UnInit(); GameClientMgr::Instance()->UnInit(); GCListener::Instance()->UnInit(); @@ -300,6 +307,11 @@ void App::DispatchMsg() ProcessTargetServerMsg(hdr); } break; + case SF_MasterServer: + { + ProcessMasterServerMsg(hdr); + } + break; } if (pdelnode->buf) { free(pdelnode->buf); @@ -321,12 +333,28 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr) if (hdr.msgid < 100) { return; } +#if MASTER_MODE + if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReConnect) { + ss::SS_CMLogin_CMReConnect_CommonHead2 msg; + bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset); + if (ok) { + MasterSvrMgr::Instance()->RequestTargetServer(hdr, msg.team_uuid()); + } + } else { + GameClient* client = GameClientMgr::Instance()->GetGameClientBySocket(hdr.socket_handle); + if (client && client->conn) { + if (client->conn) { + client->conn->ForwardClientMsg(hdr); + } + } + } +#else TargetConn* conn = nullptr; if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReConnect) { ss::SS_CMLogin_CMReConnect_CommonHead msg; bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset); if (ok) { - conn = TargetConnMgr::Instance()->GetConnByInstanceId(msg.server_id()); + conn = TargetConnMgr::Instance()->GetConnById(msg.server_id()); if (!conn) { ss::SS_SMRpcError respmsg; respmsg.set_error_code(10); @@ -344,6 +372,22 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr) if (conn) { conn->ForwardClientMsg(hdr); } +#endif +} + +void App::ProcessMasterServerMsg(f8::MsgHdr& hdr) +{ +#if MASTER_MODE + f8::NetMsgHandler* handler = f8::GetNetMsgHandler(&HandlerMgr::Instance()->msmsghandler, + hdr.msgid); + if (handler) { + switch (handler->handlerid) { + case HID_MasterSvrMgr: + ProcessNetMsg(handler, MasterSvrMgr::Instance(), hdr); + break; + } + } +#endif } void App::ProcessTargetServerMsg(f8::MsgHdr& hdr) @@ -373,6 +417,16 @@ void App::ProcessIMMsg() case IM_ClientSocketDisconnect: { GameClientMgr::Instance()->OnClientDisconnect(pdelnode->params); + MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender); + } + break; + case IM_TargetConnConnect: + { + GameClientMgr::Instance()->OnTargetServerDisconnect(pdelnode->params); + TargetConn* conn = TargetConnMgr::Instance()->GetConnById(pdelnode->params.sender); + if (conn && conn->Connected()) { + conn->SendStockMsg(); + } } break; case IM_TargetConnDisconnect: diff --git a/server/wsproxy/app.h b/server/wsproxy/app.h index e8c4563..8fdb272 100644 --- a/server/wsproxy/app.h +++ b/server/wsproxy/app.h @@ -42,6 +42,7 @@ private: void ProcessIMMsg(); void ProcessClientMsg(f8::MsgHdr& hdr); + void ProcessMasterServerMsg(f8::MsgHdr& hdr); void ProcessTargetServerMsg(f8::MsgHdr& hdr); void InitLog(); diff --git a/server/wsproxy/constant.h b/server/wsproxy/constant.h index 7e0454f..3673a5a 100644 --- a/server/wsproxy/constant.h +++ b/server/wsproxy/constant.h @@ -4,6 +4,7 @@ enum SocketFrom_e { SF_Client, SF_TargetServer, + SF_MasterServer, }; enum InnerMesssage_e @@ -11,16 +12,17 @@ enum InnerMesssage_e IM_ClientSocketDisconnect = 100, IM_PlayerOffline, IM_ExecGM, - IM_TargetConnDisconnect + IM_TargetConnDisconnect, + IM_MasterSvrDisconnect, + IM_TargetConnConnect, + IM_RequestTargetServerTimeout }; //网络处理对象 enum NetHandler_e { - HID_Player, - HID_PlayerMgr, - HID_RoomSvrMgr, HID_GCListener, + HID_MasterSvrMgr, }; enum PlayerState_e diff --git a/server/wsproxy/gameclientmgr.cc b/server/wsproxy/gameclientmgr.cc index a25a338..a94a2a8 100644 --- a/server/wsproxy/gameclientmgr.cc +++ b/server/wsproxy/gameclientmgr.cc @@ -44,6 +44,11 @@ void GameClientMgr::OnTargetServerDisconnect(a8::XParams& param) } } +void GameClientMgr::OnTargetServerConnect(a8::XParams& param) +{ + +} + GameClient* GameClientMgr::GetGameClientBySocket(int sockhandle) { auto itr = socket_hash_.find(sockhandle); @@ -52,7 +57,7 @@ GameClient* GameClientMgr::GetGameClientBySocket(int sockhandle) void GameClientMgr::BindTargetConn(int socket_handle, int conn_instance_id) { - TargetConn* conn = TargetConnMgr::Instance()->GetConnByInstanceId(conn_instance_id); + TargetConn* conn = TargetConnMgr::Instance()->GetConnById(conn_instance_id); if (conn) { GameClient* client = GetGameClientBySocket(socket_handle); if (client) { diff --git a/server/wsproxy/gameclientmgr.h b/server/wsproxy/gameclientmgr.h index aeeeeb0..28bbb71 100644 --- a/server/wsproxy/gameclientmgr.h +++ b/server/wsproxy/gameclientmgr.h @@ -14,6 +14,7 @@ class GameClientMgr : public a8::Singleton void OnClientDisconnect(a8::XParams& param); void OnTargetServerDisconnect(a8::XParams& param); + void OnTargetServerConnect(a8::XParams& param); GameClient* GetGameClientBySocket(int sockhande); void BindTargetConn(int socket_handle, int conn_instance_id); diff --git a/server/wsproxy/handlermgr.cc b/server/wsproxy/handlermgr.cc index 7608c93..02f368c 100644 --- a/server/wsproxy/handlermgr.cc +++ b/server/wsproxy/handlermgr.cc @@ -5,6 +5,9 @@ #include "handlermgr.h" #include "GCListener.h" +#include "mastersvrmgr.h" + +#include "ss_proto.pb.h" static void _GMOpsSelfChecking(f8::JsonHttpRequest* request) { @@ -26,6 +29,7 @@ void HandlerMgr::UnInit() void HandlerMgr::RegisterNetMsgHandlers() { + RegisterNetMsgHandler(&msmsghandler, &MasterSvrMgr::_SS_MS_ResponseTargetServer); } void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle, diff --git a/server/wsproxy/handlermgr.h b/server/wsproxy/handlermgr.h index 4e92981..6f5fab6 100644 --- a/server/wsproxy/handlermgr.h +++ b/server/wsproxy/handlermgr.h @@ -22,8 +22,7 @@ class HandlerMgr : public a8::Singleton void UnInit(); f8::NetMsgHandlerObject gcmsghandler; - f8::NetMsgHandlerObject rsmsghandler; - f8::NetMsgHandlerObject gsmsghandler; + f8::NetMsgHandlerObject msmsghandler; void ProcGMMsg(unsigned long saddr, int sockhandle, const std::string& url, const std::string& querystr); diff --git a/server/wsproxy/jsondatamgr.cc b/server/wsproxy/jsondatamgr.cc index 29eadf3..54c0289 100644 --- a/server/wsproxy/jsondatamgr.cc +++ b/server/wsproxy/jsondatamgr.cc @@ -6,18 +6,26 @@ void JsonDataMgr::Init() { std::string wsproxyserver_cluster_json_file; + std::string masterserver_cluster_json_file; std::string targetserver_cluster_json_file; if (f8::IsOnlineEnv()) { wsproxyserver_cluster_json_file = a8::Format("../config/game%d.wsproxy.cluster.json", {GAME_ID}); + masterserver_cluster_json_file = a8::Format("../config/game%d.masterserver.cluster.json", {GAME_ID}); targetserver_cluster_json_file = a8::Format("../config/game%d.gameserver.cluster.json", {GAME_ID}); } else { wsproxyserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/wsproxy/game%d.wsproxy.cluster.json", {GAME_ID, GAME_ID}); + masterserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/wsproxy/game%d.masterserver.cluster.json", + {GAME_ID, GAME_ID}); targetserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/wsproxy/game%d.gameserver.cluster.json", {GAME_ID, GAME_ID}); } wsproxyserver_cluster_json_.ReadFromFile(wsproxyserver_cluster_json_file); +#if MASTER_MODE + masterserver_cluster_json_.ReadFromFile(masterserver_cluster_json_file); +#else targetserver_cluster_json_.ReadFromFile(targetserver_cluster_json_file); +#endif } void JsonDataMgr::UnInit() @@ -32,6 +40,11 @@ std::shared_ptr JsonDataMgr::GetConf() return wsproxyserver_cluster_json_[App::Instance()->instance_id - 1]; } +std::shared_ptr JsonDataMgr::GetMasterServerClusterConf() +{ + return std::make_shared(masterserver_cluster_json_); +} + std::shared_ptr JsonDataMgr::GetTargetServerClusterConf() { return std::make_shared(targetserver_cluster_json_); diff --git a/server/wsproxy/jsondatamgr.h b/server/wsproxy/jsondatamgr.h index d99fe56..bdc5bac 100644 --- a/server/wsproxy/jsondatamgr.h +++ b/server/wsproxy/jsondatamgr.h @@ -11,10 +11,12 @@ class JsonDataMgr : public a8::Singleton void UnInit(); std::shared_ptr GetConf(); + std::shared_ptr GetMasterServerClusterConf(); std::shared_ptr GetTargetServerClusterConf(); private: a8::XObject wsproxyserver_cluster_json_; + a8::XObject masterserver_cluster_json_; a8::XObject targetserver_cluster_json_; }; diff --git a/server/wsproxy/mastersvr.cc b/server/wsproxy/mastersvr.cc new file mode 100644 index 0000000..d39dc5a --- /dev/null +++ b/server/wsproxy/mastersvr.cc @@ -0,0 +1,150 @@ +#include "precompile.h" + +#include + +#include "ss_proto.pb.h" +#include "ss_msgid.pb.h" +#include "mastersvr.h" +#include +#include +#include +#include "app.h" + +const int PACK_MAX = 1024 * 64; + +void MasterSvr::Init(int instance_id, const std::string& remote_ip, int remote_port) +{ + this->instance_id = instance_id; + this->remote_ip = remote_ip; + this->remote_port = remote_port; + + recv_bufflen_ = 0; + last_pong_tick = a8::XGetTickCount(); + recv_buff_ = (char*) malloc(PACK_MAX * 2); + tcp_client_ = new a8::TcpClient(); + tcp_client_->remote_address = remote_ip; + tcp_client_->remote_port = remote_port; + tcp_client_->on_error = std::bind(&MasterSvr::on_error, this, std::placeholders::_1, std::placeholders::_2); + tcp_client_->on_connect = std::bind(&MasterSvr::on_connect, this, std::placeholders::_1); + tcp_client_->on_disconnect = std::bind(&MasterSvr::on_disconnect, this, std::placeholders::_1); + tcp_client_->on_socketread = std::bind(&MasterSvr::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), + a8::XParams().SetSender(this), + [] (const a8::XParams& param) + { + MasterSvr* conn = (MasterSvr*)param.sender.GetUserData(); + conn->CheckAlive(); + }); +} + +void MasterSvr::UnInit() +{ + a8::Timer::Instance()->DeleteTimer(timer_); + timer_ = nullptr; + tcp_client_->Close(); + delete tcp_client_; + tcp_client_ = nullptr; + recv_bufflen_ = 0; + free(recv_buff_); + recv_buff_ = nullptr; +} + +void MasterSvr::Open() +{ + tcp_client_->Open(); +} + +void MasterSvr::Close() +{ + tcp_client_->Close(); +} + +bool MasterSvr::Connected() +{ + return tcp_client_->Connected(); +} + +void MasterSvr::on_error(a8::TcpClient* sender, int errorId) +{ + a8::UdpLog::Instance()->Error("MasterSvr errorid=%d", {errorId}); +} + +void MasterSvr::on_connect(a8::TcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server connected", {}); +} + +void MasterSvr::on_disconnect(a8::TcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect", {instance_id}); + App::Instance()->AddIMMsg(IM_MasterSvrDisconnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void MasterSvr::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) +{ + #if 0 + ++App::Instance()->perf.read_count; + #endif + if (recv_bufflen_ + len > 2 * PACK_MAX) { + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Debug("recvied target server too long message", {}); + return; + } else { + memmove(&recv_buff_[recv_bufflen_], buf, len); + recv_bufflen_ += len; + } + + bool warning = false; + unsigned int offset = 0; + while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) { + f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset]; + if (p->magic_code == f8::MAGIC_CODE) { + if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + p->packlen) { + break; + } + App::Instance()->AddSocketMsg(SF_MasterServer, + p->socket_handle, + instance_id, + p->msgid, + p->seqid, + &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], + p->packlen); + offset += sizeof(f8::WSProxyPackHead_S) + p->packlen; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + a8::UdpLog::Instance()->Debug("recvied bad package", {}); + } + if (offset > 0 && offset < recv_bufflen_) { + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + #if 1 + last_pong_tick = a8::XGetTickCount(); + #endif +} + +void MasterSvr::CheckAlive() +{ + if (!Connected()) { + Open(); + } else { + if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { + last_pong_tick = a8::XGetTickCount(); + Open(); + } else { + ss::SS_Ping msg; + SendMsg(msg); + } + } +} diff --git a/server/wsproxy/mastersvr.h b/server/wsproxy/mastersvr.h new file mode 100644 index 0000000..f9b98a4 --- /dev/null +++ b/server/wsproxy/mastersvr.h @@ -0,0 +1,49 @@ +#pragma once + +#include "framework/cpp/protoutils.h" + +namespace a8 +{ + class TcpClient; +} + +struct timer_list; +class MasterSvr +{ + public: + int instance_id = 0; + std::string remote_ip; + int remote_port = 0; + a8::tick_t last_pong_tick = 0; + + public: + void Init(int instance_id, const std::string& remote_ip, int remote_port); + void UnInit(); + + void Open(); + void Close(); + bool Connected(); + + template + void SendMsg(T& msg) + { + static int msgid = f8::Net_GetMessageId(msg); + #if 0 + f8::Net_SendProxyCMsg(tcp_client_, msgid, msg); + #endif + } + + private: + void on_error(a8::TcpClient* sender, int errorId); + void on_connect(a8::TcpClient* sender); + void on_disconnect(a8::TcpClient* sender); + void on_socketread(a8::TcpClient* sender, char* buf, unsigned int len); + + void CheckAlive(); + + private: + char *recv_buff_ = nullptr; + unsigned int recv_bufflen_ = 0; + a8::TcpClient* tcp_client_ = nullptr; + timer_list* timer_ = nullptr; +}; diff --git a/server/wsproxy/mastersvrmgr.cc b/server/wsproxy/mastersvrmgr.cc new file mode 100644 index 0000000..5a8a08c --- /dev/null +++ b/server/wsproxy/mastersvrmgr.cc @@ -0,0 +1,126 @@ +#include "precompile.h" + +#include +#include + +#include "mastersvrmgr.h" +#include "mastersvr.h" +#include "jsondatamgr.h" +#include "ss_proto.pb.h" +#include "target_conn.h" +#include "target_conn_mgr.h" +#include "app.h" + +#include "framework/cpp/netmsghandler.h" + +void MasterSvrMgr::Init() +{ + curr_context_id_ = a8::MakeInt64(0, time(nullptr) + 1000 * 60 * 10); + + auto master_svr_cluster_conf = JsonDataMgr::Instance()->GetMasterServerClusterConf(); + for (int i = 0; i < master_svr_cluster_conf->Size(); ++i) { + auto master_svr_conf = master_svr_cluster_conf->At(i); + int instance_id = master_svr_conf->At("instance_id")->AsXValue(); + std::string remote_ip = master_svr_conf->At("ip")->AsXValue(); + int remote_port = master_svr_conf->At("port")->AsXValue(); + { + MasterSvr* conn = new MasterSvr(); + conn->Init(instance_id, remote_ip, remote_port); + mastersvr_hash_[conn->instance_id] = conn; + conn->Open(); + } + } +} + +void MasterSvrMgr::UnInit() +{ +} + +void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg) +{ + f8::MsgHdr* context_hdr = GetHdrByContextId(msg.context_id()); + if (context_hdr) { + if (msg.error_code() == 0) { + TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( + msg.host(), + msg.port() + ); + assert(conn); + if (conn) { + conn->ForwardClientMsg(hdr); + } + } + RemoveRequest(context_hdr->socket_handle, msg.context_id()); + } +} + +MasterSvr* MasterSvrMgr::GetConnById(int instance_id) +{ + auto itr = mastersvr_hash_.find(instance_id); + return itr != mastersvr_hash_.end() ? itr->second : nullptr; +} + +void MasterSvrMgr::RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_id) +{ + if (GetContextIdBySocket(hdr.socket_handle) == 0) { + return; + } + unsigned int code = a8::openssl::Crc32((unsigned char*)team_id.data(), team_id.size()); + MasterSvr* svr = GetConnById(code % mastersvr_hash_.size() + 1); + if (svr) { + ++curr_context_id_; + a8::TimerAttacher* timer_attacher = new a8::TimerAttacher(); + f8::MsgHdr* new_hdr = hdr.Clone(); + new_hdr->user_data = timer_attacher; + + ss::SS_WSP_RequestTargetServer msg; + msg.set_context_id(curr_context_id_); + msg.set_team_id(team_id); + svr->SendMsg(msg); + + pending_socket_hash_[hdr.socket_handle] = curr_context_id_; + assert(pending_request_hash_.find(curr_context_id_) == pending_request_hash_.end()); + pending_request_hash_[curr_context_id_] = new_hdr; + a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, + a8::XParams() + .SetSender(curr_context_id_) + .SetParam1(hdr.socket_handle), + [] (const a8::XParams& param) + { + App::Instance()->AddIMMsg(IM_RequestTargetServerTimeout, + a8::XParams() + .SetSender(param.sender) + .SetParam1(param.param1)); + }, + &timer_attacher->timer_list_); + } +} + +void MasterSvrMgr::RemoveRequest(int socket_handle, long long context_id) +{ + if (context_id == GetContextIdBySocket(socket_handle)) { + f8::MsgHdr* hdr = GetHdrByContextId(context_id); + if (hdr) { + a8::TimerAttacher* timer_attacher = (a8::TimerAttacher*)hdr->user_data; + delete timer_attacher; + if (hdr->buf) { + free((char*)hdr->buf); + } + free(hdr); + pending_request_hash_.erase(context_id); + } + pending_socket_hash_.erase(socket_handle); + } +} + +long long MasterSvrMgr::GetContextIdBySocket(int socket_handle) +{ + auto itr = pending_socket_hash_.find(socket_handle); + return itr != pending_socket_hash_.end() ? itr->second : 0; +} + +f8::MsgHdr* MasterSvrMgr::GetHdrByContextId(long long context_id) +{ + auto itr = pending_request_hash_.find(context_id); + return itr != pending_request_hash_.end() ? itr->second : nullptr; +} diff --git a/server/wsproxy/mastersvrmgr.h b/server/wsproxy/mastersvrmgr.h new file mode 100644 index 0000000..67833b5 --- /dev/null +++ b/server/wsproxy/mastersvrmgr.h @@ -0,0 +1,44 @@ +#pragma once + +namespace f8 +{ + struct MsgHdr; +} + +namespace ss +{ + class SS_MS_ResponseTargetServer; +} + +class MasterSvr; +class MasterSvrMgr : public a8::Singleton +{ + public: + enum { HID = HID_MasterSvrMgr }; + + private: + MasterSvrMgr() {}; + friend class a8::Singleton; + + public: + + void Init(); + void UnInit(); + + void _SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg); + void RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_id); + void RemoveRequest(int socket_handle, long long context_id); + + private: + long long GetContextIdBySocket(int socket_handle); + f8::MsgHdr* GetHdrByContextId(long long context_id); + MasterSvr* GetConnById(int instance_id); + + private: + int target_conn_id_ = 100; + long long curr_context_id_ = 0; + std::map mastersvr_hash_; + std::map pending_socket_hash_; + std::map pending_request_hash_; + +}; diff --git a/server/wsproxy/target_conn.cc b/server/wsproxy/target_conn.cc index 395baa2..3d6a2b0 100644 --- a/server/wsproxy/target_conn.cc +++ b/server/wsproxy/target_conn.cc @@ -39,6 +39,17 @@ void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_ void TargetConn::UnInit() { + TargetConnMsgNode* work_node; + work_node = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + while (work_node) { + TargetConnMsgNode* pdelnode = work_node; + work_node = work_node->next_node; + delete pdelnode->msg; + delete pdelnode; + } + a8::Timer::Instance()->DeleteTimer(timer_); timer_ = nullptr; tcp_client_->Close(); @@ -64,6 +75,22 @@ bool TargetConn::Connected() return tcp_client_->Connected(); } +void TargetConn::SendStockMsg() +{ + TargetConnMsgNode* work_node; + work_node = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + while (work_node) { + TargetConnMsgNode* pdelnode = work_node; + work_node = work_node->next_node; + + f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg); + delete pdelnode->msg; + delete pdelnode; + } +} + void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr) { char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); @@ -95,6 +122,10 @@ void TargetConn::on_connect(a8::TcpClient* sender) { recv_bufflen_ = 0; a8::UdpLog::Instance()->Info("target server connected", {}); + App::Instance()->AddIMMsg(IM_TargetConnDisconnect, + a8::XParams() + .SetSender(instance_id) + ); } void TargetConn::on_disconnect(a8::TcpClient* sender) @@ -166,9 +197,22 @@ void TargetConn::CheckAlive() Open(); } else { ss::SS_Ping msg; - #if 0 SendMsg(0, msg); - #endif } } } + +void TargetConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg) +{ + TargetConnMsgNode* node = new TargetConnMsgNode(); + node->socket_handle = socket_handle; + node->msgid = msgid; + node->msg = msg; + if (bot_node_) { + bot_node_->next_node = node; + bot_node_ = node; + } else { + top_node_ = node; + bot_node_ = node; + } +} diff --git a/server/wsproxy/target_conn.h b/server/wsproxy/target_conn.h index 47b2fdd..0403e7c 100644 --- a/server/wsproxy/target_conn.h +++ b/server/wsproxy/target_conn.h @@ -7,6 +7,15 @@ namespace a8 class TcpClient; } +struct TargetConnMsgNode +{ + unsigned short socket_handle = 0; + int msgid = 0; + ::google::protobuf::Message* msg = nullptr; + + TargetConnMsgNode* next_node = nullptr; +}; + struct timer_list; class TargetConn { @@ -14,7 +23,6 @@ class TargetConn int instance_id = 0; std::string remote_ip; int remote_port = 0; - int matching_player_num = 0; a8::tick_t last_pong_tick = 0; public: @@ -29,9 +37,19 @@ class TargetConn void SendMsg(int socket_handle, T& msg) { static int msgid = f8::Net_GetMessageId(msg); - f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg); + if (Connected()) { + if (top_node_) { + SendStockMsg(); + } + f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg); + } else { + T* new_msg = new T(); + *new_msg = msg; + AddStockMsg(socket_handle, msgid, new_msg); + } } + void SendStockMsg(); void ForwardClientMsg(f8::MsgHdr& hdr); private: @@ -41,10 +59,14 @@ class TargetConn void on_socketread(a8::TcpClient* sender, char* buf, unsigned int len); void CheckAlive(); + void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg); private: char *recv_buff_ = nullptr; unsigned int recv_bufflen_ = 0; a8::TcpClient* tcp_client_ = nullptr; timer_list* timer_ = nullptr; + + TargetConnMsgNode* top_node_ = nullptr; + TargetConnMsgNode* bot_node_ = nullptr; }; diff --git a/server/wsproxy/target_conn_mgr.cc b/server/wsproxy/target_conn_mgr.cc index 0e19e51..ba1d743 100644 --- a/server/wsproxy/target_conn_mgr.cc +++ b/server/wsproxy/target_conn_mgr.cc @@ -6,6 +6,8 @@ void TargetConnMgr::Init() { +#if MASTER_MODE +#else auto target_server_cluster_conf = JsonDataMgr::Instance()->GetTargetServerClusterConf(); for (int i = 0; i < target_server_cluster_conf->Size(); ++i) { auto target_server_conf = target_server_cluster_conf->At(i); @@ -15,18 +17,44 @@ void TargetConnMgr::Init() { TargetConn* conn = new TargetConn(); conn->Init(instance_id, remote_ip, remote_port); - target_conn_hash_[conn->instance_id] = conn; + id_hash_[conn->instance_id] = conn; conn->Open(); } } +#endif } void TargetConnMgr::UnInit() { } -TargetConn* TargetConnMgr::GetConnByInstanceId(int instance_id) +TargetConn* TargetConnMgr::GetConnByKey(const std::string& key) { - auto itr = target_conn_hash_.find(instance_id); - return itr != target_conn_hash_.end() ? itr->second : nullptr; + auto itr = key_hash_.find(key); + return itr != key_hash_.end() ? itr->second : nullptr; +} + +TargetConn* TargetConnMgr::GetConnById(int instance_id) +{ + auto itr = id_hash_.find(instance_id); + return itr != id_hash_.end() ? itr->second : nullptr; +} + +TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port) +{ + std::string key = host + ":" + a8::XValue(port).GetString(); + TargetConn* conn = GetConnByKey(key); + if (conn) { + return conn; + } + while (GetConnById(++curr_id_)) {}; + int instance_id = curr_id_; + std::string remote_ip = host; + int remote_port = port; + + conn->Init(instance_id, remote_ip, remote_port); + id_hash_[conn->instance_id] = conn; + key_hash_[key] = conn; + conn->Open(); + return conn; } diff --git a/server/wsproxy/target_conn_mgr.h b/server/wsproxy/target_conn_mgr.h index 9519bdb..d939be7 100644 --- a/server/wsproxy/target_conn_mgr.h +++ b/server/wsproxy/target_conn_mgr.h @@ -12,8 +12,12 @@ class TargetConnMgr : public a8::Singleton void Init(); void UnInit(); - TargetConn* GetConnByInstanceId(int instance_id); + TargetConn* GetConnByKey(const std::string& key); + TargetConn* GetConnById(int instance_id); + TargetConn* RecreateTargetConn(const std::string& host, int port); private: - std::map target_conn_hash_; + unsigned short curr_id_ = 1000; + std::map key_hash_; + std::map id_hash_; };