From 4c8f2a719aff5ffd57873a5e7d9dd396f62cd033 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sun, 3 May 2020 16:00:39 +0800 Subject: [PATCH] 1 --- server/imserver/app.cc | 12 +- server/imserver/gameclientmgr.cc | 8 +- server/imserver/mastersvrmgr.cc | 4 +- server/imserver/target_conn.cc | 267 ----------------------------- server/imserver/target_conn.h | 76 -------- server/imserver/target_conn_mgr.cc | 50 ------ server/imserver/target_conn_mgr.h | 23 --- 7 files changed, 16 insertions(+), 424 deletions(-) delete mode 100644 server/imserver/target_conn.cc delete mode 100644 server/imserver/target_conn.h delete mode 100644 server/imserver/target_conn_mgr.cc delete mode 100644 server/imserver/target_conn_mgr.h diff --git a/server/imserver/app.cc b/server/imserver/app.cc index ae9d4d9..140cf32 100644 --- a/server/imserver/app.cc +++ b/server/imserver/app.cc @@ -22,8 +22,8 @@ #include "ss_msgid.pb.h" #include "ss_proto.pb.h" -#include "target_conn.h" -#include "target_conn_mgr.h" +#include "MSConnMgr.h" +#include "IMConnMgr.h" #include "mastersvr.h" #include "mastersvrmgr.h" @@ -126,7 +126,8 @@ bool App::Init(int argc, char* argv[]) uuid.SetMachineId(instance_id); GameClientMgr::Instance()->Init(); MasterSvrMgr::Instance()->Init(); - TargetConnMgr::Instance()->Init(); + IMConnMgr::Instance()->Init(); + MSConnMgr::Instance()->Init(); WSListener::Instance()->Init(); a8::UdpLog::Instance()->Info("friend_imserver starting instance_id:%d pid:%d ", @@ -176,7 +177,8 @@ void App::UnInit() a8::XPrintf("friend_imserver terminating instance_id:%d pid:%d\n", {instance_id, getpid()}); WSListener::Instance()->UnInit(); MasterSvrMgr::Instance()->UnInit(); - TargetConnMgr::Instance()->UnInit(); + IMConnMgr::Instance()->UnInit(); + MSConnMgr::Instance()->UnInit(); GameClientMgr::Instance()->UnInit(); JsonDataMgr::Instance()->UnInit(); a8::IoLoop::Instance()->UnInit(); @@ -403,7 +405,9 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr) GameClient* client = GameClientMgr::Instance()->GetGameClientBySocket(hdr.socket_handle); if (client && client->conn) { if (client->conn) { + #if 0 client->conn->ForwardClientMsg(hdr); + #endif } } } diff --git a/server/imserver/gameclientmgr.cc b/server/imserver/gameclientmgr.cc index aa3d0c9..d39191e 100644 --- a/server/imserver/gameclientmgr.cc +++ b/server/imserver/gameclientmgr.cc @@ -4,8 +4,6 @@ #include "ss_proto.pb.h" #include "gameclient.h" -#include "target_conn.h" -#include "target_conn_mgr.h" #include "WSListener.h" #include "app.h" @@ -28,7 +26,9 @@ void GameClientMgr::OnClientDisconnect(a8::XParams& param) if (client) { if (client->conn) { ss::SS_WSP_SocketDisconnect msg; + #if 0 client->conn->SendMsg(param.sender, msg); + #endif } socket_hash_.erase(param.sender); delete client; @@ -40,9 +40,11 @@ void GameClientMgr::OnTargetServerDisconnect(a8::XParams& param) { std::list delete_client; for (auto& pair : socket_hash_) { + #if 0 if (pair.second->conn && pair.second->conn->instance_id == param.sender.GetInt()) { delete_client.push_back(pair.second); } + #endif } for (auto& client : delete_client) { RemovePendingAccount(client->socket_handle); @@ -65,6 +67,7 @@ GameClient* GameClientMgr::GetGameClientBySocket(int sockhandle) void GameClientMgr::BindTargetConn(int socket_handle, int conn_instance_id) { + #if 0 TargetConn* conn = TargetConnMgr::Instance()->GetConnById(conn_instance_id); if (conn) { GameClient* client = GetGameClientBySocket(socket_handle); @@ -91,6 +94,7 @@ void GameClientMgr::BindTargetConn(int socket_handle, int conn_instance_id) } } } + #endif } void GameClientMgr::AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick) diff --git a/server/imserver/mastersvrmgr.cc b/server/imserver/mastersvrmgr.cc index 669b386..6724285 100644 --- a/server/imserver/mastersvrmgr.cc +++ b/server/imserver/mastersvrmgr.cc @@ -9,8 +9,6 @@ #include "mastersvr.h" #include "jsondatamgr.h" #include "ss_proto.pb.h" -#include "target_conn.h" -#include "target_conn_mgr.h" #include "app.h" #include "gameclientmgr.h" @@ -48,6 +46,7 @@ void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ bool auto_free = true; int socket_handle = context_hdr->socket_handle; if (msg.error_code() == 0) { + #if 0 TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( msg.host(), msg.port() @@ -57,6 +56,7 @@ void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ auto_free = false; conn->ForwardClientMsgEx(context_hdr); } + #endif } RemoveRequest(socket_handle, msg.context_id(), auto_free); } diff --git a/server/imserver/target_conn.cc b/server/imserver/target_conn.cc deleted file mode 100644 index f076939..0000000 --- a/server/imserver/target_conn.cc +++ /dev/null @@ -1,267 +0,0 @@ -#include "precompile.h" - -#include - -#include "ss_proto.pb.h" -#include "ss_msgid.pb.h" -#include "target_conn.h" -#include -#include -#include -#include -#include -#include "app.h" - -const int PACK_MAX = 1024 * 64 * 2; - -void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_port) -{ - if (remote_ip.empty()) { - abort(); - } - this->instance_id = instance_id; - this->remote_ip = remote_ip; - this->remote_port = remote_port; - - recv_bufflen_ = 0; - last_pong_tick = a8::XGetTickCount(); - recv_buff_ = (char*) malloc(PACK_MAX * 2); - tcp_client_ = a8::IoLoop::Instance()->CreateAsyncTcpClient(); - tcp_client_->remote_address = remote_ip; - tcp_client_->remote_port = remote_port; - tcp_client_->on_error = std::bind(&TargetConn::on_error, this, std::placeholders::_1, std::placeholders::_2); - tcp_client_->on_connect = std::bind(&TargetConn::on_connect, this, std::placeholders::_1); - tcp_client_->on_disconnect = std::bind(&TargetConn::on_disconnect, this, std::placeholders::_1); - tcp_client_->on_socketread = std::bind(&TargetConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); - timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), - a8::XParams().SetSender(this), - [] (const a8::XParams& param) - { - TargetConn* conn = (TargetConn*)param.sender.GetUserData(); - conn->CheckAlive(); - }); -} - -void TargetConn::UnInit() -{ - TargetConnMsgNode* work_node; - work_node = top_node_; - top_node_ = nullptr; - bot_node_ = nullptr; - while (work_node) { - TargetConnMsgNode* pdelnode = work_node; - work_node = work_node->next_node; - delete pdelnode->msg; - delete pdelnode; - } - - a8::Timer::Instance()->DeleteTimer(timer_); - timer_ = nullptr; - tcp_client_->Close(); - a8::IoLoop::Instance()->DestoryAsyncTcpClient(tcp_client_); - tcp_client_ = nullptr; - recv_bufflen_ = 0; - free(recv_buff_); - recv_buff_ = nullptr; -} - -void TargetConn::Open() -{ - tcp_client_->Open(); -} - -void TargetConn::Close() -{ - tcp_client_->Close(); -} - -bool TargetConn::Connected() -{ - return tcp_client_->Connected(); -} - -void TargetConn::SendStockMsg() -{ - TargetConnMsgNode* work_node; - work_node = top_node_; - top_node_ = nullptr; - bot_node_ = nullptr; - while (work_node) { - TargetConnMsgNode* pdelnode = work_node; - work_node = work_node->next_node; - - if (pdelnode->msg) { - f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg); - delete pdelnode->msg; - } - if (pdelnode->hdr) { - ForwardClientMsg(*pdelnode->hdr); - if (pdelnode->hdr->buf) { - free((char*)pdelnode->hdr->buf); - } - free(pdelnode->hdr); - } - delete pdelnode; - } -} - -void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr) -{ - char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); - memset(buff, 0, sizeof(f8::WSProxyPackHead_C)); - f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff; - head->packlen = hdr.buflen; - head->msgid = hdr.msgid; - head->seqid = hdr.seqid; - head->magic_code = f8::MAGIC_CODE; - #if 0 - head->rpc_error_code = 0; - #endif - head->socket_handle = hdr.socket_handle; - head->ip_saddr = hdr.ip_saddr; - - if (hdr.buflen > 0) { - memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen); - } - - tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen); - free(buff); -} - -void TargetConn::ForwardClientMsgEx(f8::MsgHdr* hdr) -{ - if (Connected()) { - if (top_node_) { - SendStockMsg(); - } - ForwardClientMsg(*hdr); - if (hdr->buf) { - free((char*)hdr->buf); - } - free(hdr); - } else { - AddStockMsg(hdr->socket_handle, 0, nullptr, hdr); - } -} - -void TargetConn::on_error(a8::AsyncTcpClient* sender, int errorId) -{ - a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d", - { - errorId, - sender->remote_address, - sender->remote_port - }); -} - -void TargetConn::on_connect(a8::AsyncTcpClient* sender) -{ - recv_bufflen_ = 0; - a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d", - { - sender->remote_address, - sender->remote_port - }); - App::Instance()->AddIMMsg(IM_TargetConnConnect, - a8::XParams() - .SetSender(instance_id) - ); -} - -void TargetConn::on_disconnect(a8::AsyncTcpClient* sender) -{ - recv_bufflen_ = 0; - a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect " - "remote_ip:%s remote_port:%d", - { - instance_id, - sender->remote_address, - sender->remote_port - }); - App::Instance()->AddIMMsg(IM_TargetConnDisconnect, - a8::XParams() - .SetSender(instance_id) - ); -} - -void TargetConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len) -{ - #if 0 - ++App::Instance()->perf.read_count; - #endif - if (recv_bufflen_ + len > 2 * PACK_MAX) { - recv_bufflen_ = 0; - a8::UdpLog::Instance()->Debug("recvied target server too long message", {}); - return; - } else { - memmove(&recv_buff_[recv_bufflen_], buf, len); - recv_bufflen_ += len; - } - - bool warning = false; - unsigned int offset = 0; - while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) { - f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset]; - int real_len = p->packlen + (p->ext_len << 16); - if (p->magic_code == f8::MAGIC_CODE) { - if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) { - break; - } - App::Instance()->AddSocketMsg(SF_TargetServer, - p->socket_handle, - instance_id, - p->msgid, - p->seqid, - &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], - real_len); - offset += sizeof(f8::WSProxyPackHead_S) + real_len; - } else { - warning = true; - offset++; - continue; - } - } - - if (warning) { - a8::UdpLog::Instance()->Debug("recvied bad package", {}); - } - if (offset > 0 && offset < recv_bufflen_) { - memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); - } - recv_bufflen_ -= offset; - #if 1 - last_pong_tick = a8::XGetTickCount(); - #endif -} - -void TargetConn::CheckAlive() -{ - if (!Connected()) { - Open(); - } else { - if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { - last_pong_tick = a8::XGetTickCount(); - Open(); - } else { - ss::SS_Ping msg; - SendMsg(0, msg); - } - } -} - -void TargetConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, - f8::MsgHdr* hdr) -{ - TargetConnMsgNode* node = new TargetConnMsgNode(); - node->socket_handle = socket_handle; - node->msgid = msgid; - node->msg = msg; - node->hdr = hdr; - if (bot_node_) { - bot_node_->next_node = node; - bot_node_ = node; - } else { - top_node_ = node; - bot_node_ = node; - } -} diff --git a/server/imserver/target_conn.h b/server/imserver/target_conn.h deleted file mode 100644 index f587904..0000000 --- a/server/imserver/target_conn.h +++ /dev/null @@ -1,76 +0,0 @@ -#pragma once - -#include "framework/cpp/protoutils.h" - -namespace a8 -{ - class TcpClient; - class AsyncTcpClient; -} - -struct TargetConnMsgNode -{ - unsigned short socket_handle = 0; - int msgid = 0; - ::google::protobuf::Message* msg = nullptr; - f8::MsgHdr* hdr = nullptr; - - TargetConnMsgNode* next_node = nullptr; -}; - -struct timer_list; -class TargetConn -{ - public: - int instance_id = 0; - std::string remote_ip; - int remote_port = 0; - a8::tick_t last_pong_tick = 0; - - public: - void Init(int instance_id, const std::string& remote_ip, int remote_port); - void UnInit(); - - void Open(); - void Close(); - bool Connected(); - - template - void SendMsg(int socket_handle, T& msg) - { - static int msgid = f8::Net_GetMessageId(msg); - if (Connected()) { - if (top_node_) { - SendStockMsg(); - } - f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg); - } else { - T* new_msg = new T(); - *new_msg = msg; - AddStockMsg(socket_handle, msgid, new_msg, nullptr); - } - } - - void SendStockMsg(); - void ForwardClientMsg(f8::MsgHdr& hdr); - void ForwardClientMsgEx(f8::MsgHdr* hdr); - - private: - void on_error(a8::AsyncTcpClient* sender, int errorId); - void on_connect(a8::AsyncTcpClient* sender); - void on_disconnect(a8::AsyncTcpClient* sender); - void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len); - - void CheckAlive(); - void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, - f8::MsgHdr* hdr); - - private: - char *recv_buff_ = nullptr; - unsigned int recv_bufflen_ = 0; - a8::AsyncTcpClient* tcp_client_ = nullptr; - timer_list* timer_ = nullptr; - - TargetConnMsgNode* top_node_ = nullptr; - TargetConnMsgNode* bot_node_ = nullptr; -}; diff --git a/server/imserver/target_conn_mgr.cc b/server/imserver/target_conn_mgr.cc deleted file mode 100644 index 61a5b26..0000000 --- a/server/imserver/target_conn_mgr.cc +++ /dev/null @@ -1,50 +0,0 @@ -#include "precompile.h" - -#include "target_conn_mgr.h" -#include "target_conn.h" -#include "jsondatamgr.h" -#include "app.h" - -void TargetConnMgr::Init() -{ -} - -void TargetConnMgr::UnInit() -{ - for (auto& pair : id_hash_) { - pair.second->UnInit(); - delete pair.second; - } -} - -TargetConn* TargetConnMgr::GetConnByKey(const std::string& key) -{ - auto itr = key_hash_.find(key); - return itr != key_hash_.end() ? itr->second : nullptr; -} - -TargetConn* TargetConnMgr::GetConnById(int instance_id) -{ - auto itr = id_hash_.find(instance_id); - return itr != id_hash_.end() ? itr->second : nullptr; -} - -TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port) -{ - std::string key = host + ":" + a8::XValue(port).GetString(); - TargetConn* conn = GetConnByKey(key); - if (conn) { - return conn; - } - while (GetConnById(++curr_id_)) {}; - int instance_id = curr_id_; - std::string remote_ip = host; - int remote_port = port; - - conn = new TargetConn(); - conn->Init(instance_id, remote_ip, remote_port); - id_hash_[conn->instance_id] = conn; - key_hash_[key] = conn; - conn->Open(); - return conn; -} diff --git a/server/imserver/target_conn_mgr.h b/server/imserver/target_conn_mgr.h deleted file mode 100644 index d939be7..0000000 --- a/server/imserver/target_conn_mgr.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -class TargetConn; -class TargetConnMgr : public a8::Singleton -{ - private: - TargetConnMgr() {}; - friend class a8::Singleton; - - public: - - void Init(); - void UnInit(); - - TargetConn* GetConnByKey(const std::string& key); - TargetConn* GetConnById(int instance_id); - TargetConn* RecreateTargetConn(const std::string& host, int port); - - private: - unsigned short curr_id_ = 1000; - std::map key_hash_; - std::map id_hash_; -};