diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc index f71de8a..cf7e3ab 100644 --- a/server/wsproxy/GCListener.cc +++ b/server/wsproxy/GCListener.cc @@ -128,7 +128,7 @@ void GCListener::UnInit() tcp_listener_ = nullptr; } -void GCListener::ForwardTargetConnMsg(f8::MsgHdr& hdr) +void GCListener::ForwardUpStreamMsg(f8::MsgHdr& hdr) { char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen); f8::PackHead* head = (f8::PackHead*)buff; diff --git a/server/wsproxy/GCListener.h b/server/wsproxy/GCListener.h index 962c333..37c3fd3 100644 --- a/server/wsproxy/GCListener.h +++ b/server/wsproxy/GCListener.h @@ -26,7 +26,7 @@ class GCListener : public a8::Singleton f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg); } - void ForwardTargetConnMsg(f8::MsgHdr& hdr); + void ForwardUpStreamMsg(f8::MsgHdr& hdr); void SendText(unsigned short sockhandle, const std::string& text); void ForceCloseClient(unsigned short sockhandle); diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc index 7263621..80a7261 100644 --- a/server/wsproxy/app.cc +++ b/server/wsproxy/app.cc @@ -113,7 +113,7 @@ bool App::Init(int argc, char* argv[]) uuid.SetMachineId((node_id - 1) * MAX_NODE_ID + instance_id); DownStreamMgr::Instance()->Init(); MasterSvrMgr::Instance()->Init(); - TargetConnMgr::Instance()->Init(); + UpStreamMgr::Instance()->Init(); GCListener::Instance()->Init(); f8::UdpLog::Instance()->Info("wsproxy starting instance_id:%d pid:%d", @@ -169,7 +169,7 @@ void App::UnInit() a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n", {instance_id, getpid()}); GCListener::Instance()->UnInit(); MasterSvrMgr::Instance()->UnInit(); - TargetConnMgr::Instance()->UnInit(); + UpStreamMgr::Instance()->UnInit(); DownStreamMgr::Instance()->UnInit(); JsonDataMgr::Instance()->UnInit(); f8::Timer::Instance()->UnInit(); @@ -420,10 +420,10 @@ void App::ProcessTargetServerMsg(f8::MsgHdr& hdr) return; } if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReconnect) { - DownStreamMgr::Instance()->BindTargetConn(hdr.socket_handle, hdr.ip_saddr); + DownStreamMgr::Instance()->BindUpStream(hdr.socket_handle, hdr.ip_saddr); GCListener::Instance()->MarkClient(hdr.socket_handle, true); } - GCListener::Instance()->ForwardTargetConnMsg(hdr); + GCListener::Instance()->ForwardUpStreamMsg(hdr); } #if 0 @@ -445,16 +445,16 @@ void App::ProcessIMMsg() MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender, true); } break; - case IM_TargetConnConnect: + case IM_UpStreamConnect: { DownStreamMgr::Instance()->OnTargetServerConnect(pdelnode->params); - TargetConn* conn = TargetConnMgr::Instance()->GetConnById(pdelnode->params.sender); + UpStream* conn = UpStreamMgr::Instance()->GetConnById(pdelnode->params.sender); if (conn && conn->Connected()) { conn->SendStockMsg(); } } break; - case IM_TargetConnDisconnect: + case IM_UpStreamDisconnect: { DownStreamMgr::Instance()->OnTargetServerDisconnect(pdelnode->params); } diff --git a/server/wsproxy/constant.h b/server/wsproxy/constant.h index 3165ef0..cb1e1bb 100644 --- a/server/wsproxy/constant.h +++ b/server/wsproxy/constant.h @@ -12,9 +12,9 @@ enum InnerMesssage_e IM_ClientSocketDisconnect = 100, IM_PlayerOffline, IM_ExecGM, - IM_TargetConnDisconnect, + IM_UpStreamDisconnect, IM_MasterSvrDisconnect, - IM_TargetConnConnect, + IM_UpStreamConnect, }; //网络处理对象 diff --git a/server/wsproxy/downstream.h b/server/wsproxy/downstream.h index 2ba072d..84c0b8a 100644 --- a/server/wsproxy/downstream.h +++ b/server/wsproxy/downstream.h @@ -1,10 +1,10 @@ #pragma once -class TargetConn; +class UpStream; class DownStream { public: int socket_handle = a8::INVALID_SOCKET_HANDLE; - TargetConn* conn = nullptr; + UpStream* conn = nullptr; }; diff --git a/server/wsproxy/downstreammgr.cc b/server/wsproxy/downstreammgr.cc index 9d2037d..e696877 100644 --- a/server/wsproxy/downstreammgr.cc +++ b/server/wsproxy/downstreammgr.cc @@ -73,9 +73,9 @@ DownStream* DownStreamMgr::GetGameClientBySocket(int sockhandle) return itr != socket_hash_.end() ? itr->second : nullptr; } -void DownStreamMgr::BindTargetConn(int socket_handle, int conn_instance_id) +void DownStreamMgr::BindUpStream(int socket_handle, int conn_instance_id) { - TargetConn* conn = TargetConnMgr::Instance()->GetConnById(conn_instance_id); + UpStream* conn = UpStreamMgr::Instance()->GetConnById(conn_instance_id); if (conn) { DownStream* client = GetGameClientBySocket(socket_handle); if (client) { @@ -85,7 +85,7 @@ void DownStreamMgr::BindTargetConn(int socket_handle, int conn_instance_id) client->socket_handle = socket_handle; client->conn = conn; socket_hash_[client->socket_handle] = client; - f8::UdpLog::Instance()->Info("BindTargetConn socket_handle:%d", {socket_handle}); + f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d", {socket_handle}); { #if 0 auto itr = pending_account_hash_.find(socket_handle); @@ -103,7 +103,7 @@ void DownStreamMgr::BindTargetConn(int socket_handle, int conn_instance_id) cur_tick - req_tick }); } - f8::UdpLog::Instance()->Info("BindTargetConn account_id:%s", {account_id}); + f8::UdpLog::Instance()->Info("BindUpStream account_id:%s", {account_id}); RemovePendingAccount(socket_handle); } #endif diff --git a/server/wsproxy/downstreammgr.h b/server/wsproxy/downstreammgr.h index 4a08efe..8b032b6 100644 --- a/server/wsproxy/downstreammgr.h +++ b/server/wsproxy/downstreammgr.h @@ -20,7 +20,7 @@ class DownStreamMgr : public a8::Singleton void OnTargetServerConnect(a8::XParams& param); #endif DownStream* GetGameClientBySocket(int sockhande); - void BindTargetConn(int socket_handle, int conn_instance_id); + void BindUpStream(int socket_handle, int conn_instance_id); void AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick); private: diff --git a/server/wsproxy/mastersvrmgr.cc b/server/wsproxy/mastersvrmgr.cc index d121ab8..4ae728f 100644 --- a/server/wsproxy/mastersvrmgr.cc +++ b/server/wsproxy/mastersvrmgr.cc @@ -45,7 +45,7 @@ void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ if (context_hdr) { int socket_handle = context_hdr->socket_handle; if (msg.error_code() == 0) { - TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( + UpStream* conn = UpStreamMgr::Instance()->RecreateUpStream( msg.host(), msg.port() ); @@ -76,7 +76,7 @@ void MasterSvrMgr::RequestTargetServer(f8::MsgHdr& hdr, #if 0 #if GAME_ID == 9003 { - TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( + UpStream* conn = UpStreamMgr::Instance()->RecreateUpStream( "10.10.4.4", 8951 ); diff --git a/server/wsproxy/upstream.cc b/server/wsproxy/upstream.cc index eef3ed0..d18070a 100644 --- a/server/wsproxy/upstream.cc +++ b/server/wsproxy/upstream.cc @@ -15,7 +15,7 @@ const int PACK_MAX = 1024 * 64 * 2; -void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_port) +void UpStream::Init(int instance_id, const std::string& remote_ip, int remote_port) { if (remote_ip.empty()) { abort(); @@ -30,29 +30,29 @@ void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_ tcp_client_ = new a8::TcpClient(); tcp_client_->remote_address = remote_ip; tcp_client_->remote_port = remote_port; - tcp_client_->on_error = std::bind(&TargetConn::on_error, this, std::placeholders::_1, std::placeholders::_2); - tcp_client_->on_connect = std::bind(&TargetConn::on_connect, this, std::placeholders::_1); - tcp_client_->on_disconnect = std::bind(&TargetConn::on_disconnect, this, std::placeholders::_1); - tcp_client_->on_socketread = std::bind(&TargetConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + tcp_client_->on_error = std::bind(&UpStream::on_error, this, std::placeholders::_1, std::placeholders::_2); + tcp_client_->on_connect = std::bind(&UpStream::on_connect, this, std::placeholders::_1); + tcp_client_->on_disconnect = std::bind(&UpStream::on_disconnect, this, std::placeholders::_1); + tcp_client_->on_socketread = std::bind(&UpStream::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); #if 0 timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), a8::XParams().SetSender(this), [] (const a8::XParams& param) { - TargetConn* conn = (TargetConn*)param.sender.GetUserData(); + UpStream* conn = (UpStream*)param.sender.GetUserData(); conn->CheckAlive(); }); #endif } -void TargetConn::UnInit() +void UpStream::UnInit() { - TargetConnMsgNode* work_node; + UpStreamMsgNode* work_node; work_node = top_node_; top_node_ = nullptr; bot_node_ = nullptr; while (work_node) { - TargetConnMsgNode* pdelnode = work_node; + UpStreamMsgNode* pdelnode = work_node; work_node = work_node->next_node; delete pdelnode->msg; delete pdelnode; @@ -70,29 +70,29 @@ void TargetConn::UnInit() recv_buff_ = nullptr; } -void TargetConn::Open() +void UpStream::Open() { tcp_client_->Open(); } -void TargetConn::Close() +void UpStream::Close() { tcp_client_->Close(); } -bool TargetConn::Connected() +bool UpStream::Connected() { return tcp_client_->Connected(); } -void TargetConn::SendStockMsg() +void UpStream::SendStockMsg() { - TargetConnMsgNode* work_node; + UpStreamMsgNode* work_node; work_node = top_node_; top_node_ = nullptr; bot_node_ = nullptr; while (work_node) { - TargetConnMsgNode* pdelnode = work_node; + UpStreamMsgNode* pdelnode = work_node; work_node = work_node->next_node; if (pdelnode->msg) { @@ -110,7 +110,7 @@ void TargetConn::SendStockMsg() } } -void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr) +void UpStream::ForwardClientMsg(f8::MsgHdr& hdr) { char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); memset(buff, 0, sizeof(f8::WSProxyPackHead_C)); @@ -133,7 +133,7 @@ void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr) free(buff); } -void TargetConn::ForwardClientMsgEx(f8::MsgHdr* hdr) +void UpStream::ForwardClientMsgEx(f8::MsgHdr* hdr) { if (Connected()) { if (top_node_) { @@ -149,7 +149,7 @@ void TargetConn::ForwardClientMsgEx(f8::MsgHdr* hdr) } } -void TargetConn::on_error(a8::TcpClient* sender, int errorId) +void UpStream::on_error(a8::TcpClient* sender, int errorId) { f8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d", { @@ -159,7 +159,7 @@ void TargetConn::on_error(a8::TcpClient* sender, int errorId) }); } -void TargetConn::on_connect(a8::TcpClient* sender) +void UpStream::on_connect(a8::TcpClient* sender) { recv_bufflen_ = 0; f8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d", @@ -168,14 +168,14 @@ void TargetConn::on_connect(a8::TcpClient* sender) sender->remote_port }); #if 0 - App::Instance()->AddIMMsg(IM_TargetConnConnect, + App::Instance()->AddIMMsg(IM_UpStreamConnect, a8::XParams() .SetSender(instance_id) ); #endif } -void TargetConn::on_disconnect(a8::TcpClient* sender) +void UpStream::on_disconnect(a8::TcpClient* sender) { recv_bufflen_ = 0; f8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect " @@ -186,14 +186,14 @@ void TargetConn::on_disconnect(a8::TcpClient* sender) sender->remote_port }); #if 0 - App::Instance()->AddIMMsg(IM_TargetConnDisconnect, + App::Instance()->AddIMMsg(IM_UpStreamDisconnect, a8::XParams() .SetSender(instance_id) ); #endif } -void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) +void UpStream::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) { #if 0 ++App::Instance()->perf.read_count; @@ -243,7 +243,7 @@ void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int le #endif } -void TargetConn::CheckAlive() +void UpStream::CheckAlive() { if (!Connected()) { Open(); @@ -258,10 +258,10 @@ void TargetConn::CheckAlive() } } -void TargetConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, +void UpStream::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, f8::MsgHdr* hdr) { - TargetConnMsgNode* node = new TargetConnMsgNode(); + UpStreamMsgNode* node = new UpStreamMsgNode(); node->socket_handle = socket_handle; node->msgid = msgid; node->msg = msg; diff --git a/server/wsproxy/upstream.h b/server/wsproxy/upstream.h index f38e29d..8e4c59a 100644 --- a/server/wsproxy/upstream.h +++ b/server/wsproxy/upstream.h @@ -8,18 +8,18 @@ namespace a8 class AsyncTcpClient; } -struct TargetConnMsgNode +struct UpStreamMsgNode { unsigned short socket_handle = 0; int msgid = 0; ::google::protobuf::Message* msg = nullptr; f8::MsgHdr* hdr = nullptr; - TargetConnMsgNode* next_node = nullptr; + UpStreamMsgNode* next_node = nullptr; }; struct timer_list; -class TargetConn +class UpStream { public: int instance_id = 0; @@ -71,6 +71,6 @@ class TargetConn a8::TcpClient* tcp_client_ = nullptr; timer_list* timer_ = nullptr; - TargetConnMsgNode* top_node_ = nullptr; - TargetConnMsgNode* bot_node_ = nullptr; + UpStreamMsgNode* top_node_ = nullptr; + UpStreamMsgNode* bot_node_ = nullptr; }; diff --git a/server/wsproxy/upstreammgr.cc b/server/wsproxy/upstreammgr.cc index 5599131..451e0a5 100644 --- a/server/wsproxy/upstreammgr.cc +++ b/server/wsproxy/upstreammgr.cc @@ -5,11 +5,11 @@ #include "jsondatamgr.h" #include "app.h" -void TargetConnMgr::Init() +void UpStreamMgr::Init() { } -void TargetConnMgr::UnInit() +void UpStreamMgr::UnInit() { for (auto& pair : id_hash_) { pair.second->UnInit(); @@ -17,22 +17,22 @@ void TargetConnMgr::UnInit() } } -TargetConn* TargetConnMgr::GetConnByKey(const std::string& key) +UpStream* UpStreamMgr::GetConnByKey(const std::string& key) { auto itr = key_hash_.find(key); return itr != key_hash_.end() ? itr->second : nullptr; } -TargetConn* TargetConnMgr::GetConnById(int instance_id) +UpStream* UpStreamMgr::GetConnById(int instance_id) { auto itr = id_hash_.find(instance_id); return itr != id_hash_.end() ? itr->second : nullptr; } -TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port) +UpStream* UpStreamMgr::RecreateUpStream(const std::string& host, int port) { std::string key = host + ":" + a8::XValue(port).GetString(); - TargetConn* conn = GetConnByKey(key); + UpStream* conn = GetConnByKey(key); if (conn) { return conn; } @@ -41,7 +41,7 @@ TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port) std::string remote_ip = host; int remote_port = port; - conn = new TargetConn(); + conn = new UpStream(); conn->Init(instance_id, remote_ip, remote_port); id_hash_[conn->instance_id] = conn; key_hash_[key] = conn; diff --git a/server/wsproxy/upstreammgr.h b/server/wsproxy/upstreammgr.h index 9bcd955..bfbdb50 100644 --- a/server/wsproxy/upstreammgr.h +++ b/server/wsproxy/upstreammgr.h @@ -2,24 +2,24 @@ #include -class TargetConn; -class TargetConnMgr : public a8::Singleton +class UpStream; +class UpStreamMgr : public a8::Singleton { private: - TargetConnMgr() {}; - friend class a8::Singleton; + UpStreamMgr() {}; + friend class a8::Singleton; public: void Init(); void UnInit(); - TargetConn* GetConnByKey(const std::string& key); - TargetConn* GetConnById(int instance_id); - TargetConn* RecreateTargetConn(const std::string& host, int port); + UpStream* GetConnByKey(const std::string& key); + UpStream* GetConnById(int instance_id); + UpStream* RecreateUpStream(const std::string& host, int port); private: unsigned short curr_id_ = 1000; - std::map key_hash_; - std::map id_hash_; + std::map key_hash_; + std::map id_hash_; };