From 275709d03a7c1c54d5cd1c46411ef713c11e7591 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Wed, 5 Jun 2019 14:40:11 +0800 Subject: [PATCH] master mode ok --- server/wsproxy/app.cc | 5 +++-- server/wsproxy/mastersvr.cc | 12 +++++------ server/wsproxy/mastersvr.h | 4 +--- server/wsproxy/mastersvrmgr.cc | 22 ++++++++++++------- server/wsproxy/mastersvrmgr.h | 2 +- server/wsproxy/target_conn.cc | 35 +++++++++++++++++++++++++++---- server/wsproxy/target_conn.h | 7 +++++-- server/wsproxy/target_conn_mgr.cc | 1 + third_party/a8engine | 2 +- third_party/framework | 2 +- 10 files changed, 65 insertions(+), 27 deletions(-) diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc index 97a8cc8..aa9c5e7 100644 --- a/server/wsproxy/app.cc +++ b/server/wsproxy/app.cc @@ -466,13 +466,13 @@ void App::ProcessIMMsg() { GameClientMgr::Instance()->OnClientDisconnect(pdelnode->params); #if MASTER_MODE - MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender); + MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender, true); #endif } break; case IM_TargetConnConnect: { - GameClientMgr::Instance()->OnTargetServerDisconnect(pdelnode->params); + GameClientMgr::Instance()->OnTargetServerConnect(pdelnode->params); TargetConn* conn = TargetConnMgr::Instance()->GetConnById(pdelnode->params.sender); if (conn && conn->Connected()) { conn->SendStockMsg(); @@ -626,3 +626,4 @@ void App::FreeIMMsgQueue() } im_msg_mutex_->unlock(); } + diff --git a/server/wsproxy/mastersvr.cc b/server/wsproxy/mastersvr.cc index d39dc5a..538a2b3 100644 --- a/server/wsproxy/mastersvr.cc +++ b/server/wsproxy/mastersvr.cc @@ -101,20 +101,20 @@ void MasterSvr::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len bool warning = false; unsigned int offset = 0; - while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) { - f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset]; + while (recv_bufflen_ - offset >= sizeof(f8::PackHead)) { + f8::PackHead* p = (f8::PackHead*) &recv_buff_[offset]; if (p->magic_code == f8::MAGIC_CODE) { - if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + p->packlen) { + if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) { break; } App::Instance()->AddSocketMsg(SF_MasterServer, - p->socket_handle, + 0, instance_id, p->msgid, p->seqid, - &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], + &recv_buff_[offset + sizeof(f8::PackHead)], p->packlen); - offset += sizeof(f8::WSProxyPackHead_S) + p->packlen; + offset += sizeof(f8::PackHead) + p->packlen; } else { warning = true; offset++; diff --git a/server/wsproxy/mastersvr.h b/server/wsproxy/mastersvr.h index f9b98a4..49d445b 100644 --- a/server/wsproxy/mastersvr.h +++ b/server/wsproxy/mastersvr.h @@ -28,9 +28,7 @@ class MasterSvr void SendMsg(T& msg) { static int msgid = f8::Net_GetMessageId(msg); - #if 0 - f8::Net_SendProxyCMsg(tcp_client_, msgid, msg); - #endif + f8::Net_SendMsg(tcp_client_, 0, msgid, msg); } private: diff --git a/server/wsproxy/mastersvrmgr.cc b/server/wsproxy/mastersvrmgr.cc index 770dda5..8738e07 100644 --- a/server/wsproxy/mastersvrmgr.cc +++ b/server/wsproxy/mastersvrmgr.cc @@ -42,6 +42,7 @@ void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ { f8::MsgHdr* context_hdr = GetHdrByContextId(msg.context_id()); if (context_hdr) { + bool auto_free = true; if (msg.error_code() == 0) { TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( msg.host(), @@ -49,10 +50,11 @@ void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ ); assert(conn); if (conn) { - conn->ForwardClientMsg(hdr); + auto_free = false; + conn->ForwardClientMsgEx(context_hdr); } } - RemoveRequest(context_hdr->socket_handle, msg.context_id()); + RemoveRequest(context_hdr->socket_handle, msg.context_id(), auto_free); } } @@ -64,11 +66,15 @@ MasterSvr* MasterSvrMgr::GetConnById(int instance_id) void MasterSvrMgr::RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_id) { - if (GetContextIdBySocket(hdr.socket_handle) == 0) { + if (GetContextIdBySocket(hdr.socket_handle) != 0) { return; } unsigned int code = a8::openssl::Crc32((unsigned char*)team_id.data(), team_id.size()); + #if 1 + MasterSvr* svr = GetConnById(2); + #else MasterSvr* svr = GetConnById(code % mastersvr_hash_.size() + 1); + #endif if (svr) { ++curr_context_id_; a8::TimerAttacher* timer_attacher = new a8::TimerAttacher(); @@ -98,17 +104,19 @@ void MasterSvrMgr::RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_ } } -void MasterSvrMgr::RemoveRequest(int socket_handle, long long context_id) +void MasterSvrMgr::RemoveRequest(int socket_handle, long long context_id, bool auto_free) { if (context_id == GetContextIdBySocket(socket_handle)) { f8::MsgHdr* hdr = GetHdrByContextId(context_id); if (hdr) { a8::TimerAttacher* timer_attacher = (a8::TimerAttacher*)hdr->user_data; delete timer_attacher; - if (hdr->buf) { - free((char*)hdr->buf); + if (auto_free) { + if (hdr->buf) { + free((char*)hdr->buf); + } + free(hdr); } - free(hdr); pending_request_hash_.erase(context_id); } pending_socket_hash_.erase(socket_handle); diff --git a/server/wsproxy/mastersvrmgr.h b/server/wsproxy/mastersvrmgr.h index 67833b5..23c421c 100644 --- a/server/wsproxy/mastersvrmgr.h +++ b/server/wsproxy/mastersvrmgr.h @@ -27,7 +27,7 @@ class MasterSvrMgr : public a8::Singleton void _SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg); void RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_id); - void RemoveRequest(int socket_handle, long long context_id); + void RemoveRequest(int socket_handle, long long context_id, bool auto_free); private: long long GetContextIdBySocket(int socket_handle); diff --git a/server/wsproxy/target_conn.cc b/server/wsproxy/target_conn.cc index 209359b..e1a01d9 100644 --- a/server/wsproxy/target_conn.cc +++ b/server/wsproxy/target_conn.cc @@ -85,8 +85,17 @@ void TargetConn::SendStockMsg() TargetConnMsgNode* pdelnode = work_node; work_node = work_node->next_node; - f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg); - delete pdelnode->msg; + if (pdelnode->msg) { + f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg); + delete pdelnode->msg; + } + if (pdelnode->hdr) { + ForwardClientMsg(*pdelnode->hdr); + if (pdelnode->hdr->buf) { + free((char*)pdelnode->hdr->buf); + } + free(pdelnode->hdr); + } delete pdelnode; } } @@ -114,6 +123,22 @@ void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr) free(buff); } +void TargetConn::ForwardClientMsgEx(f8::MsgHdr* hdr) +{ + if (Connected()) { + if (top_node_) { + SendStockMsg(); + } + ForwardClientMsg(*hdr); + if (hdr->buf) { + free((char*)hdr->buf); + } + free(hdr); + } else { + AddStockMsg(hdr->socket_handle, 0, nullptr, hdr); + } +} + void TargetConn::on_error(a8::TcpClient* sender, int errorId) { a8::UdpLog::Instance()->Error("TargetConn errorid=%d", {errorId}); @@ -123,7 +148,7 @@ void TargetConn::on_connect(a8::TcpClient* sender) { recv_bufflen_ = 0; a8::UdpLog::Instance()->Info("target server connected", {}); - App::Instance()->AddIMMsg(IM_TargetConnDisconnect, + App::Instance()->AddIMMsg(IM_TargetConnConnect, a8::XParams() .SetSender(instance_id) ); @@ -204,12 +229,14 @@ void TargetConn::CheckAlive() } } -void TargetConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg) +void TargetConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, + f8::MsgHdr* hdr) { TargetConnMsgNode* node = new TargetConnMsgNode(); node->socket_handle = socket_handle; node->msgid = msgid; node->msg = msg; + node->hdr = hdr; if (bot_node_) { bot_node_->next_node = node; bot_node_ = node; diff --git a/server/wsproxy/target_conn.h b/server/wsproxy/target_conn.h index 0403e7c..8624dd9 100644 --- a/server/wsproxy/target_conn.h +++ b/server/wsproxy/target_conn.h @@ -12,6 +12,7 @@ struct TargetConnMsgNode unsigned short socket_handle = 0; int msgid = 0; ::google::protobuf::Message* msg = nullptr; + f8::MsgHdr* hdr = nullptr; TargetConnMsgNode* next_node = nullptr; }; @@ -45,12 +46,13 @@ class TargetConn } else { T* new_msg = new T(); *new_msg = msg; - AddStockMsg(socket_handle, msgid, new_msg); + AddStockMsg(socket_handle, msgid, new_msg, nullptr); } } void SendStockMsg(); void ForwardClientMsg(f8::MsgHdr& hdr); + void ForwardClientMsgEx(f8::MsgHdr* hdr); private: void on_error(a8::TcpClient* sender, int errorId); @@ -59,7 +61,8 @@ class TargetConn void on_socketread(a8::TcpClient* sender, char* buf, unsigned int len); void CheckAlive(); - void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg); + void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, + f8::MsgHdr* hdr); private: char *recv_buff_ = nullptr; diff --git a/server/wsproxy/target_conn_mgr.cc b/server/wsproxy/target_conn_mgr.cc index 4edd198..3abe90d 100644 --- a/server/wsproxy/target_conn_mgr.cc +++ b/server/wsproxy/target_conn_mgr.cc @@ -63,6 +63,7 @@ TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port) std::string remote_ip = host; int remote_port = port; + conn = new TargetConn(); conn->Init(instance_id, remote_ip, remote_port); id_hash_[conn->instance_id] = conn; key_hash_[key] = conn; diff --git a/third_party/a8engine b/third_party/a8engine index 3e1f116..f9222e3 160000 --- a/third_party/a8engine +++ b/third_party/a8engine @@ -1 +1 @@ -Subproject commit 3e1f116639d40a78e5ef625009685a611c5bf7bc +Subproject commit f9222e376f8a678a252932c4a3fb93860d7625f2 diff --git a/third_party/framework b/third_party/framework index ca38a14..628e40d 160000 --- a/third_party/framework +++ b/third_party/framework @@ -1 +1 @@ -Subproject commit ca38a14dc4384b5654d21261130dc1bf75c8d5a8 +Subproject commit 628e40d32269414703ba6a74b534593611aa5be8