This commit is contained in:
azw 2023-04-09 07:35:24 +00:00
parent 8968761471
commit 5811bcd658
12 changed files with 67 additions and 67 deletions

View File

@ -128,7 +128,7 @@ void GCListener::UnInit()
tcp_listener_ = nullptr; tcp_listener_ = nullptr;
} }
void GCListener::ForwardTargetConnMsg(f8::MsgHdr& hdr) void GCListener::ForwardUpStreamMsg(f8::MsgHdr& hdr)
{ {
char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen); char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen);
f8::PackHead* head = (f8::PackHead*)buff; f8::PackHead* head = (f8::PackHead*)buff;

View File

@ -26,7 +26,7 @@ class GCListener : public a8::Singleton<GCListener>
f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg); f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg);
} }
void ForwardTargetConnMsg(f8::MsgHdr& hdr); void ForwardUpStreamMsg(f8::MsgHdr& hdr);
void SendText(unsigned short sockhandle, const std::string& text); void SendText(unsigned short sockhandle, const std::string& text);
void ForceCloseClient(unsigned short sockhandle); void ForceCloseClient(unsigned short sockhandle);

View File

@ -113,7 +113,7 @@ bool App::Init(int argc, char* argv[])
uuid.SetMachineId((node_id - 1) * MAX_NODE_ID + instance_id); uuid.SetMachineId((node_id - 1) * MAX_NODE_ID + instance_id);
DownStreamMgr::Instance()->Init(); DownStreamMgr::Instance()->Init();
MasterSvrMgr::Instance()->Init(); MasterSvrMgr::Instance()->Init();
TargetConnMgr::Instance()->Init(); UpStreamMgr::Instance()->Init();
GCListener::Instance()->Init(); GCListener::Instance()->Init();
f8::UdpLog::Instance()->Info("wsproxy starting instance_id:%d pid:%d", f8::UdpLog::Instance()->Info("wsproxy starting instance_id:%d pid:%d",
@ -169,7 +169,7 @@ void App::UnInit()
a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n", {instance_id, getpid()}); a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n", {instance_id, getpid()});
GCListener::Instance()->UnInit(); GCListener::Instance()->UnInit();
MasterSvrMgr::Instance()->UnInit(); MasterSvrMgr::Instance()->UnInit();
TargetConnMgr::Instance()->UnInit(); UpStreamMgr::Instance()->UnInit();
DownStreamMgr::Instance()->UnInit(); DownStreamMgr::Instance()->UnInit();
JsonDataMgr::Instance()->UnInit(); JsonDataMgr::Instance()->UnInit();
f8::Timer::Instance()->UnInit(); f8::Timer::Instance()->UnInit();
@ -420,10 +420,10 @@ void App::ProcessTargetServerMsg(f8::MsgHdr& hdr)
return; return;
} }
if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReconnect) { if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReconnect) {
DownStreamMgr::Instance()->BindTargetConn(hdr.socket_handle, hdr.ip_saddr); DownStreamMgr::Instance()->BindUpStream(hdr.socket_handle, hdr.ip_saddr);
GCListener::Instance()->MarkClient(hdr.socket_handle, true); GCListener::Instance()->MarkClient(hdr.socket_handle, true);
} }
GCListener::Instance()->ForwardTargetConnMsg(hdr); GCListener::Instance()->ForwardUpStreamMsg(hdr);
} }
#if 0 #if 0
@ -445,16 +445,16 @@ void App::ProcessIMMsg()
MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender, true); MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender, true);
} }
break; break;
case IM_TargetConnConnect: case IM_UpStreamConnect:
{ {
DownStreamMgr::Instance()->OnTargetServerConnect(pdelnode->params); DownStreamMgr::Instance()->OnTargetServerConnect(pdelnode->params);
TargetConn* conn = TargetConnMgr::Instance()->GetConnById(pdelnode->params.sender); UpStream* conn = UpStreamMgr::Instance()->GetConnById(pdelnode->params.sender);
if (conn && conn->Connected()) { if (conn && conn->Connected()) {
conn->SendStockMsg(); conn->SendStockMsg();
} }
} }
break; break;
case IM_TargetConnDisconnect: case IM_UpStreamDisconnect:
{ {
DownStreamMgr::Instance()->OnTargetServerDisconnect(pdelnode->params); DownStreamMgr::Instance()->OnTargetServerDisconnect(pdelnode->params);
} }

View File

@ -12,9 +12,9 @@ enum InnerMesssage_e
IM_ClientSocketDisconnect = 100, IM_ClientSocketDisconnect = 100,
IM_PlayerOffline, IM_PlayerOffline,
IM_ExecGM, IM_ExecGM,
IM_TargetConnDisconnect, IM_UpStreamDisconnect,
IM_MasterSvrDisconnect, IM_MasterSvrDisconnect,
IM_TargetConnConnect, IM_UpStreamConnect,
}; };
//网络处理对象 //网络处理对象

View File

@ -1,10 +1,10 @@
#pragma once #pragma once
class TargetConn; class UpStream;
class DownStream class DownStream
{ {
public: public:
int socket_handle = a8::INVALID_SOCKET_HANDLE; int socket_handle = a8::INVALID_SOCKET_HANDLE;
TargetConn* conn = nullptr; UpStream* conn = nullptr;
}; };

View File

@ -73,9 +73,9 @@ DownStream* DownStreamMgr::GetGameClientBySocket(int sockhandle)
return itr != socket_hash_.end() ? itr->second : nullptr; return itr != socket_hash_.end() ? itr->second : nullptr;
} }
void DownStreamMgr::BindTargetConn(int socket_handle, int conn_instance_id) void DownStreamMgr::BindUpStream(int socket_handle, int conn_instance_id)
{ {
TargetConn* conn = TargetConnMgr::Instance()->GetConnById(conn_instance_id); UpStream* conn = UpStreamMgr::Instance()->GetConnById(conn_instance_id);
if (conn) { if (conn) {
DownStream* client = GetGameClientBySocket(socket_handle); DownStream* client = GetGameClientBySocket(socket_handle);
if (client) { if (client) {
@ -85,7 +85,7 @@ void DownStreamMgr::BindTargetConn(int socket_handle, int conn_instance_id)
client->socket_handle = socket_handle; client->socket_handle = socket_handle;
client->conn = conn; client->conn = conn;
socket_hash_[client->socket_handle] = client; socket_hash_[client->socket_handle] = client;
f8::UdpLog::Instance()->Info("BindTargetConn socket_handle:%d", {socket_handle}); f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d", {socket_handle});
{ {
#if 0 #if 0
auto itr = pending_account_hash_.find(socket_handle); auto itr = pending_account_hash_.find(socket_handle);
@ -103,7 +103,7 @@ void DownStreamMgr::BindTargetConn(int socket_handle, int conn_instance_id)
cur_tick - req_tick cur_tick - req_tick
}); });
} }
f8::UdpLog::Instance()->Info("BindTargetConn account_id:%s", {account_id}); f8::UdpLog::Instance()->Info("BindUpStream account_id:%s", {account_id});
RemovePendingAccount(socket_handle); RemovePendingAccount(socket_handle);
} }
#endif #endif

View File

@ -20,7 +20,7 @@ class DownStreamMgr : public a8::Singleton<DownStreamMgr>
void OnTargetServerConnect(a8::XParams& param); void OnTargetServerConnect(a8::XParams& param);
#endif #endif
DownStream* GetGameClientBySocket(int sockhande); DownStream* GetGameClientBySocket(int sockhande);
void BindTargetConn(int socket_handle, int conn_instance_id); void BindUpStream(int socket_handle, int conn_instance_id);
void AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick); void AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick);
private: private:

View File

@ -45,7 +45,7 @@ void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_
if (context_hdr) { if (context_hdr) {
int socket_handle = context_hdr->socket_handle; int socket_handle = context_hdr->socket_handle;
if (msg.error_code() == 0) { if (msg.error_code() == 0) {
TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( UpStream* conn = UpStreamMgr::Instance()->RecreateUpStream(
msg.host(), msg.host(),
msg.port() msg.port()
); );
@ -76,7 +76,7 @@ void MasterSvrMgr::RequestTargetServer(f8::MsgHdr& hdr,
#if 0 #if 0
#if GAME_ID == 9003 #if GAME_ID == 9003
{ {
TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn( UpStream* conn = UpStreamMgr::Instance()->RecreateUpStream(
"10.10.4.4", "10.10.4.4",
8951 8951
); );

View File

@ -15,7 +15,7 @@
const int PACK_MAX = 1024 * 64 * 2; const int PACK_MAX = 1024 * 64 * 2;
void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_port) void UpStream::Init(int instance_id, const std::string& remote_ip, int remote_port)
{ {
if (remote_ip.empty()) { if (remote_ip.empty()) {
abort(); abort();
@ -30,29 +30,29 @@ void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_
tcp_client_ = new a8::TcpClient(); tcp_client_ = new a8::TcpClient();
tcp_client_->remote_address = remote_ip; tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port; tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&TargetConn::on_error, this, std::placeholders::_1, std::placeholders::_2); tcp_client_->on_error = std::bind(&UpStream::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&TargetConn::on_connect, this, std::placeholders::_1); tcp_client_->on_connect = std::bind(&UpStream::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&TargetConn::on_disconnect, this, std::placeholders::_1); tcp_client_->on_disconnect = std::bind(&UpStream::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&TargetConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); tcp_client_->on_socketread = std::bind(&UpStream::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
#if 0 #if 0
timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150),
a8::XParams().SetSender(this), a8::XParams().SetSender(this),
[] (const a8::XParams& param) [] (const a8::XParams& param)
{ {
TargetConn* conn = (TargetConn*)param.sender.GetUserData(); UpStream* conn = (UpStream*)param.sender.GetUserData();
conn->CheckAlive(); conn->CheckAlive();
}); });
#endif #endif
} }
void TargetConn::UnInit() void UpStream::UnInit()
{ {
TargetConnMsgNode* work_node; UpStreamMsgNode* work_node;
work_node = top_node_; work_node = top_node_;
top_node_ = nullptr; top_node_ = nullptr;
bot_node_ = nullptr; bot_node_ = nullptr;
while (work_node) { while (work_node) {
TargetConnMsgNode* pdelnode = work_node; UpStreamMsgNode* pdelnode = work_node;
work_node = work_node->next_node; work_node = work_node->next_node;
delete pdelnode->msg; delete pdelnode->msg;
delete pdelnode; delete pdelnode;
@ -70,29 +70,29 @@ void TargetConn::UnInit()
recv_buff_ = nullptr; recv_buff_ = nullptr;
} }
void TargetConn::Open() void UpStream::Open()
{ {
tcp_client_->Open(); tcp_client_->Open();
} }
void TargetConn::Close() void UpStream::Close()
{ {
tcp_client_->Close(); tcp_client_->Close();
} }
bool TargetConn::Connected() bool UpStream::Connected()
{ {
return tcp_client_->Connected(); return tcp_client_->Connected();
} }
void TargetConn::SendStockMsg() void UpStream::SendStockMsg()
{ {
TargetConnMsgNode* work_node; UpStreamMsgNode* work_node;
work_node = top_node_; work_node = top_node_;
top_node_ = nullptr; top_node_ = nullptr;
bot_node_ = nullptr; bot_node_ = nullptr;
while (work_node) { while (work_node) {
TargetConnMsgNode* pdelnode = work_node; UpStreamMsgNode* pdelnode = work_node;
work_node = work_node->next_node; work_node = work_node->next_node;
if (pdelnode->msg) { if (pdelnode->msg) {
@ -110,7 +110,7 @@ void TargetConn::SendStockMsg()
} }
} }
void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr) void UpStream::ForwardClientMsg(f8::MsgHdr& hdr)
{ {
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
memset(buff, 0, sizeof(f8::WSProxyPackHead_C)); memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
@ -133,7 +133,7 @@ void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr)
free(buff); free(buff);
} }
void TargetConn::ForwardClientMsgEx(f8::MsgHdr* hdr) void UpStream::ForwardClientMsgEx(f8::MsgHdr* hdr)
{ {
if (Connected()) { if (Connected()) {
if (top_node_) { if (top_node_) {
@ -149,7 +149,7 @@ void TargetConn::ForwardClientMsgEx(f8::MsgHdr* hdr)
} }
} }
void TargetConn::on_error(a8::TcpClient* sender, int errorId) void UpStream::on_error(a8::TcpClient* sender, int errorId)
{ {
f8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d", f8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
{ {
@ -159,7 +159,7 @@ void TargetConn::on_error(a8::TcpClient* sender, int errorId)
}); });
} }
void TargetConn::on_connect(a8::TcpClient* sender) void UpStream::on_connect(a8::TcpClient* sender)
{ {
recv_bufflen_ = 0; recv_bufflen_ = 0;
f8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d", f8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
@ -168,14 +168,14 @@ void TargetConn::on_connect(a8::TcpClient* sender)
sender->remote_port sender->remote_port
}); });
#if 0 #if 0
App::Instance()->AddIMMsg(IM_TargetConnConnect, App::Instance()->AddIMMsg(IM_UpStreamConnect,
a8::XParams() a8::XParams()
.SetSender(instance_id) .SetSender(instance_id)
); );
#endif #endif
} }
void TargetConn::on_disconnect(a8::TcpClient* sender) void UpStream::on_disconnect(a8::TcpClient* sender)
{ {
recv_bufflen_ = 0; recv_bufflen_ = 0;
f8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect " f8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
@ -186,14 +186,14 @@ void TargetConn::on_disconnect(a8::TcpClient* sender)
sender->remote_port sender->remote_port
}); });
#if 0 #if 0
App::Instance()->AddIMMsg(IM_TargetConnDisconnect, App::Instance()->AddIMMsg(IM_UpStreamDisconnect,
a8::XParams() a8::XParams()
.SetSender(instance_id) .SetSender(instance_id)
); );
#endif #endif
} }
void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) void UpStream::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{ {
#if 0 #if 0
++App::Instance()->perf.read_count; ++App::Instance()->perf.read_count;
@ -243,7 +243,7 @@ void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int le
#endif #endif
} }
void TargetConn::CheckAlive() void UpStream::CheckAlive()
{ {
if (!Connected()) { if (!Connected()) {
Open(); Open();
@ -258,10 +258,10 @@ void TargetConn::CheckAlive()
} }
} }
void TargetConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, void UpStream::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr) f8::MsgHdr* hdr)
{ {
TargetConnMsgNode* node = new TargetConnMsgNode(); UpStreamMsgNode* node = new UpStreamMsgNode();
node->socket_handle = socket_handle; node->socket_handle = socket_handle;
node->msgid = msgid; node->msgid = msgid;
node->msg = msg; node->msg = msg;

View File

@ -8,18 +8,18 @@ namespace a8
class AsyncTcpClient; class AsyncTcpClient;
} }
struct TargetConnMsgNode struct UpStreamMsgNode
{ {
unsigned short socket_handle = 0; unsigned short socket_handle = 0;
int msgid = 0; int msgid = 0;
::google::protobuf::Message* msg = nullptr; ::google::protobuf::Message* msg = nullptr;
f8::MsgHdr* hdr = nullptr; f8::MsgHdr* hdr = nullptr;
TargetConnMsgNode* next_node = nullptr; UpStreamMsgNode* next_node = nullptr;
}; };
struct timer_list; struct timer_list;
class TargetConn class UpStream
{ {
public: public:
int instance_id = 0; int instance_id = 0;
@ -71,6 +71,6 @@ class TargetConn
a8::TcpClient* tcp_client_ = nullptr; a8::TcpClient* tcp_client_ = nullptr;
timer_list* timer_ = nullptr; timer_list* timer_ = nullptr;
TargetConnMsgNode* top_node_ = nullptr; UpStreamMsgNode* top_node_ = nullptr;
TargetConnMsgNode* bot_node_ = nullptr; UpStreamMsgNode* bot_node_ = nullptr;
}; };

View File

@ -5,11 +5,11 @@
#include "jsondatamgr.h" #include "jsondatamgr.h"
#include "app.h" #include "app.h"
void TargetConnMgr::Init() void UpStreamMgr::Init()
{ {
} }
void TargetConnMgr::UnInit() void UpStreamMgr::UnInit()
{ {
for (auto& pair : id_hash_) { for (auto& pair : id_hash_) {
pair.second->UnInit(); pair.second->UnInit();
@ -17,22 +17,22 @@ void TargetConnMgr::UnInit()
} }
} }
TargetConn* TargetConnMgr::GetConnByKey(const std::string& key) UpStream* UpStreamMgr::GetConnByKey(const std::string& key)
{ {
auto itr = key_hash_.find(key); auto itr = key_hash_.find(key);
return itr != key_hash_.end() ? itr->second : nullptr; return itr != key_hash_.end() ? itr->second : nullptr;
} }
TargetConn* TargetConnMgr::GetConnById(int instance_id) UpStream* UpStreamMgr::GetConnById(int instance_id)
{ {
auto itr = id_hash_.find(instance_id); auto itr = id_hash_.find(instance_id);
return itr != id_hash_.end() ? itr->second : nullptr; return itr != id_hash_.end() ? itr->second : nullptr;
} }
TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port) UpStream* UpStreamMgr::RecreateUpStream(const std::string& host, int port)
{ {
std::string key = host + ":" + a8::XValue(port).GetString(); std::string key = host + ":" + a8::XValue(port).GetString();
TargetConn* conn = GetConnByKey(key); UpStream* conn = GetConnByKey(key);
if (conn) { if (conn) {
return conn; return conn;
} }
@ -41,7 +41,7 @@ TargetConn* TargetConnMgr::RecreateTargetConn(const std::string& host, int port)
std::string remote_ip = host; std::string remote_ip = host;
int remote_port = port; int remote_port = port;
conn = new TargetConn(); conn = new UpStream();
conn->Init(instance_id, remote_ip, remote_port); conn->Init(instance_id, remote_ip, remote_port);
id_hash_[conn->instance_id] = conn; id_hash_[conn->instance_id] = conn;
key_hash_[key] = conn; key_hash_[key] = conn;

View File

@ -2,24 +2,24 @@
#include <a8/singleton.h> #include <a8/singleton.h>
class TargetConn; class UpStream;
class TargetConnMgr : public a8::Singleton<TargetConnMgr> class UpStreamMgr : public a8::Singleton<UpStreamMgr>
{ {
private: private:
TargetConnMgr() {}; UpStreamMgr() {};
friend class a8::Singleton<TargetConnMgr>; friend class a8::Singleton<UpStreamMgr>;
public: public:
void Init(); void Init();
void UnInit(); void UnInit();
TargetConn* GetConnByKey(const std::string& key); UpStream* GetConnByKey(const std::string& key);
TargetConn* GetConnById(int instance_id); UpStream* GetConnById(int instance_id);
TargetConn* RecreateTargetConn(const std::string& host, int port); UpStream* RecreateUpStream(const std::string& host, int port);
private: private:
unsigned short curr_id_ = 1000; unsigned short curr_id_ = 1000;
std::map<std::string, TargetConn*> key_hash_; std::map<std::string, UpStream*> key_hash_;
std::map<int, TargetConn*> id_hash_; std::map<int, UpStream*> id_hash_;
}; };