add target_conn_mg

This commit is contained in:
aozhiwei 2018-08-06 20:26:16 +08:00
parent 6ec1c2fab5
commit 2394a9c854
13 changed files with 149 additions and 61 deletions

View File

@ -23,7 +23,7 @@ public:
bool warning = false; bool warning = false;
while (buflen - offset >= sizeof(PackHead)) { while (buflen - offset >= sizeof(PackHead)) {
PackHead* p = (PackHead*)&buf[offset]; PackHead* p = (PackHead*)&buf[offset];
if (p->magiccode == MAGIC_CODE) { if (p->magic_code == MAGIC_CODE) {
if (buflen - offset < sizeof(PackHead) + p->packlen) { if (buflen - offset < sizeof(PackHead) + p->packlen) {
break; break;
} }
@ -31,6 +31,7 @@ public:
socket_handle, socket_handle,
saddr, saddr,
p->msgid, p->msgid,
p->seqid,
&buf[offset + sizeof(PackHead)], &buf[offset + sizeof(PackHead)],
p->packlen); p->packlen);
offset += sizeof(PackHead) + p->packlen; offset += sizeof(PackHead) + p->packlen;
@ -98,17 +99,6 @@ void GCListener::SendText(unsigned short sockhandle, const std::string& text)
tcp_listener_->SendClientMsg(sockhandle, text.data(), text.size()); tcp_listener_->SendClientMsg(sockhandle, text.data(), text.size());
} }
void GCListener::SendErrorMsg(unsigned short sockhandle, int msgid,
int error_code, const std::string& error_msg)
{
#if 0
ss::SS_Error msg;
msg.mutable_result()->set_error_code(error_code);
msg.mutable_result()->set_error_msg(error_msg);
Net_SendMsg(tcp_listener_, sockhandle, msgid, msg);
#endif
}
void GCListener::ForceCloseClient(unsigned short sockhandle) void GCListener::ForceCloseClient(unsigned short sockhandle)
{ {
tcp_listener_->ForceCloseClient(sockhandle); tcp_listener_->ForceCloseClient(sockhandle);

View File

@ -32,9 +32,6 @@ class GCListener : public a8::Singleton<GCListener>
} }
void SendText(unsigned short sockhandle, const std::string& text); void SendText(unsigned short sockhandle, const std::string& text);
void SendErrorMsg(unsigned short sockhandle, int msgid,
int error_code, const std::string& error_msg);
void ForceCloseClient(unsigned short sockhandle); void ForceCloseClient(unsigned short sockhandle);
void MarkClient(unsigned short sockhandle, bool is_active); void MarkClient(unsigned short sockhandle, bool is_active);

View File

@ -15,12 +15,16 @@
#include "GCListener.h" #include "GCListener.h"
#include "jsondatamgr.h" #include "jsondatamgr.h"
#include "handlermgr.h" #include "handlermgr.h"
#include "target_conn.h"
#include "gameclient.h"
#include "gameclientmgr.h"
struct MsgNode struct MsgNode
{ {
SocketFrom_e sockfrom; SocketFrom_e sockfrom;
int sockhandle; int sockhandle;
unsigned short msgid; unsigned short msgid;
unsigned int seqid;
long ip_saddr; long ip_saddr;
char* buf; char* buf;
int buflen; int buflen;
@ -82,6 +86,7 @@ void App::Init(int argc, char* argv[])
JsonDataMgr::Instance()->Init(); JsonDataMgr::Instance()->Init();
GCListener::Instance()->Init(); GCListener::Instance()->Init();
uuid.SetMachineId(instance_id); uuid.SetMachineId(instance_id);
GameClientMgr::Instance()->Init();
a8::UdpLog::Instance()->Info("masterserver starting instance_id:%d pid:%d", {instance_id, getpid()}); a8::UdpLog::Instance()->Info("masterserver starting instance_id:%d pid:%d", {instance_id, getpid()});
{ {
@ -103,6 +108,7 @@ void App::UnInit()
if (terminated) { if (terminated) {
return; return;
} }
GameClientMgr::Instance()->UnInit();
GCListener::Instance()->UnInit(); GCListener::Instance()->UnInit();
JsonDataMgr::Instance()->UnInit(); JsonDataMgr::Instance()->UnInit();
a8::Timer::Instance()->UnInit(); a8::Timer::Instance()->UnInit();
@ -143,6 +149,7 @@ void App::AddSocketMsg(SocketFrom_e sockfrom,
int sockhandle, int sockhandle,
long ip_saddr, long ip_saddr,
unsigned short msgid, unsigned short msgid,
unsigned int seqid,
const char *msgbody, const char *msgbody,
int bodylen) int bodylen)
{ {
@ -152,6 +159,7 @@ void App::AddSocketMsg(SocketFrom_e sockfrom,
p->ip_saddr = ip_saddr; p->ip_saddr = ip_saddr;
p->sockhandle = sockhandle; p->sockhandle = sockhandle;
p->msgid = msgid; p->msgid = msgid;
p->seqid = seqid;
p->buf = nullptr; p->buf = nullptr;
p->buflen = bodylen; p->buflen = bodylen;
if (bodylen > 0) { if (bodylen > 0) {
@ -279,9 +287,9 @@ void App::DispatchMsg()
ProcessClientMsg(hdr); ProcessClientMsg(hdr);
} }
break; break;
case SF_RoomServer: case SF_TargetServer:
{ {
ProcessRoomServerMsg(hdr); ProcessTargetServerMsg(hdr);
} }
break; break;
} }
@ -302,24 +310,16 @@ void App::DispatchMsg()
void App::ProcessClientMsg(MsgHdr& hdr) void App::ProcessClientMsg(MsgHdr& hdr)
{ {
NetMsgHandler* handler = GetNetMsgHandler(&HandlerMgr::Instance()->gcmsghandler, if (hdr.msgid < 100) {
hdr.msgid); return;
if (handler) {
#if 0
switch (handler->handlerid) {
} }
#endif GameClient* client = GameClientMgr::Instance()->GetGameClientBySocket(hdr.socket_handle);
} else { if (client && client->conn) {
#if 0 client->conn->ForwardClientMsg(hdr);
Player* hum = PlayerMgr::Instance()->GetPlayerBySocket(hdr.socket_handle);
if (hum) {
hum->ForwardClientMsg(hdr);
}
#endif
} }
} }
void App::ProcessRoomServerMsg(MsgHdr& hdr) void App::ProcessTargetServerMsg(MsgHdr& hdr)
{ {
NetMsgHandler* handler = GetNetMsgHandler(&HandlerMgr::Instance()->rsmsghandler, NetMsgHandler* handler = GetNetMsgHandler(&HandlerMgr::Instance()->rsmsghandler,
hdr.msgid); hdr.msgid);

View File

@ -21,6 +21,7 @@ class App : public a8::Singleton<App>
int sockhandle, int sockhandle,
long ip_saddr, long ip_saddr,
unsigned short msgid, unsigned short msgid,
unsigned int seqid,
const char *msgbody, const char *msgbody,
int bodylen); int bodylen);
void AddIMMsg(unsigned short imcmd, a8::XParams params); void AddIMMsg(unsigned short imcmd, a8::XParams params);
@ -41,7 +42,7 @@ private:
void ProcessIMMsg(); void ProcessIMMsg();
void ProcessClientMsg(MsgHdr& hdr); void ProcessClientMsg(MsgHdr& hdr);
void ProcessRoomServerMsg(MsgHdr& hdr); void ProcessTargetServerMsg(MsgHdr& hdr);
void InitLog(); void InitLog();
void UnInitLog(); void UnInitLog();

View File

@ -3,7 +3,7 @@
enum SocketFrom_e enum SocketFrom_e
{ {
SF_Client, SF_Client,
SF_RoomServer, SF_TargetServer,
}; };
enum InnerMesssage_e enum InnerMesssage_e
@ -11,7 +11,7 @@ enum InnerMesssage_e
IM_ClientSocketDisconnect = 100, IM_ClientSocketDisconnect = 100,
IM_PlayerOffline, IM_PlayerOffline,
IM_ExecGM, IM_ExecGM,
IM_RSConnDisconnect IM_TargetConnDisconnect
}; };
//网络处理对象 //网络处理对象

View File

@ -0,0 +1,3 @@
#include "precompile.h"
#include "gameclient.h"

View File

@ -0,0 +1,9 @@
#pragma once
class TargetConn;
class GameClient
{
public:
TargetConn* conn = nullptr;
};

View File

@ -0,0 +1,17 @@
#include "precompile.h"
#include "gameclientmgr.h"
void GameClientMgr::Init()
{
}
void GameClientMgr::UnInit()
{
}
GameClient* GameClientMgr::GetGameClientBySocket(int sockhandle)
{
auto itr = socket_hash_.find(sockhandle);
return itr != socket_hash_.end() ? itr->second : nullptr;
}

View File

@ -0,0 +1,19 @@
#pragma once
class GameClient;
class GameClientMgr : public a8::Singleton<GameClientMgr>
{
private:
GameClientMgr() {};
friend class a8::Singleton<GameClientMgr>;
public:
void Init();
void UnInit();
GameClient* GetGameClientBySocket(int sockhande);
private:
std::map<int, GameClient*> socket_hash_;
};

View File

@ -12,7 +12,7 @@
const int PACK_MAX = 1024 * 64; const int PACK_MAX = 1024 * 64;
void RSConn::Init(int instance_id, const std::string& remote_ip, int remote_port) void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_port)
{ {
this->instance_id = instance_id; this->instance_id = instance_id;
this->remote_ip = remote_ip; this->remote_ip = remote_ip;
@ -24,20 +24,20 @@ void RSConn::Init(int instance_id, const std::string& remote_ip, int remote_port
tcp_client_ = new a8::TcpClient(); tcp_client_ = new a8::TcpClient();
tcp_client_->remote_address = remote_ip; tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port; tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&RSConn::on_error, this, std::placeholders::_1, std::placeholders::_2); tcp_client_->on_error = std::bind(&TargetConn::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&RSConn::on_connect, this, std::placeholders::_1); tcp_client_->on_connect = std::bind(&TargetConn::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&RSConn::on_disconnect, this, std::placeholders::_1); tcp_client_->on_disconnect = std::bind(&TargetConn::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&RSConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); tcp_client_->on_socketread = std::bind(&TargetConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150),
a8::XParams().SetSender(this), a8::XParams().SetSender(this),
[] (const a8::XParams& param) [] (const a8::XParams& param)
{ {
RSConn* conn = (RSConn*)param.sender.GetUserData(); TargetConn* conn = (TargetConn*)param.sender.GetUserData();
conn->CheckAlive(); conn->CheckAlive();
}); });
} }
void RSConn::UnInit() void TargetConn::UnInit()
{ {
a8::Timer::Instance()->DeleteTimer(timer_); a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr; timer_ = nullptr;
@ -49,43 +49,61 @@ void RSConn::UnInit()
recv_buff_ = nullptr; recv_buff_ = nullptr;
} }
void RSConn::Open() void TargetConn::Open()
{ {
tcp_client_->Open(); tcp_client_->Open();
} }
void RSConn::Close() void TargetConn::Close()
{ {
tcp_client_->Close(); tcp_client_->Close();
} }
bool RSConn::Connected() bool TargetConn::Connected()
{ {
return tcp_client_->Connected(); return tcp_client_->Connected();
} }
void RSConn::on_error(a8::TcpClient* sender, int errorId) void TargetConn::ForwardClientMsg(MsgHdr& hdr)
{ {
a8::UdpLog::Instance()->Error("RSConn errorid=%d", {errorId}); char* buff = (char*)malloc(sizeof(WSProxyPackHead_C) + hdr.buflen);
WSProxyPackHead_C* head = (WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
tcp_client_->SendBuff(buff, sizeof(WSProxyPackHead_C*) + head->packlen);
free(buff);
} }
void RSConn::on_connect(a8::TcpClient* sender) void TargetConn::on_error(a8::TcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("TargetConn errorid=%d", {errorId});
}
void TargetConn::on_connect(a8::TcpClient* sender)
{ {
recv_bufflen_ = 0; recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("room server connected", {}); a8::UdpLog::Instance()->Info("room server connected", {});
} }
void RSConn::on_disconnect(a8::TcpClient* sender) void TargetConn::on_disconnect(a8::TcpClient* sender)
{ {
recv_bufflen_ = 0; recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("room server %d disconnected after 10s later reconnect", {instance_id}); a8::UdpLog::Instance()->Info("room server %d disconnected after 10s later reconnect", {instance_id});
App::Instance()->AddIMMsg(IM_RSConnDisconnect, App::Instance()->AddIMMsg(IM_TargetConnDisconnect,
a8::XParams() a8::XParams()
.SetSender(instance_id) .SetSender(instance_id)
); );
} }
void RSConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{ {
#if 0 #if 0
++App::Instance()->perf.read_count; ++App::Instance()->perf.read_count;
@ -103,14 +121,15 @@ void RSConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
unsigned int offset = 0; unsigned int offset = 0;
while (recv_bufflen_ - offset > sizeof(PackHead)) { while (recv_bufflen_ - offset > sizeof(PackHead)) {
PackHead* p = (PackHead*) &recv_buff_[offset]; PackHead* p = (PackHead*) &recv_buff_[offset];
if (p->magiccode == MAGIC_CODE) { if (p->magic_code == MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(PackHead) + p->packlen) { if (recv_bufflen_ - offset < sizeof(PackHead) + p->packlen) {
break; break;
} }
App::Instance()->AddSocketMsg(SF_RoomServer, App::Instance()->AddSocketMsg(SF_TargetServer,
instance_id, instance_id,
0, 0,
p->msgid, p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(PackHead)], &recv_buff_[offset + sizeof(PackHead)],
p->packlen); p->packlen);
offset += sizeof(PackHead) + p->packlen; offset += sizeof(PackHead) + p->packlen;
@ -130,7 +149,7 @@ void RSConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
recv_bufflen_ -= offset; recv_bufflen_ -= offset;
} }
void RSConn::CheckAlive() void TargetConn::CheckAlive()
{ {
if (!Connected()) { if (!Connected()) {
Open(); Open();
@ -139,8 +158,10 @@ void RSConn::CheckAlive()
last_pong_tick = a8::XGetTickCount(); last_pong_tick = a8::XGetTickCount();
Open(); Open();
} else { } else {
#if 0
ss::SS_Ping msg; ss::SS_Ping msg;
SendToRoomServer(msg); SendToRoomServer(msg);
#endif
} }
} }
} }

View File

@ -6,7 +6,7 @@ namespace a8
} }
struct timer_list; struct timer_list;
class RSConn class TargetConn
{ {
public: public:
int instance_id = 0; int instance_id = 0;
@ -23,12 +23,7 @@ class RSConn
void Close(); void Close();
bool Connected(); bool Connected();
template <typename T> void ForwardClientMsg(MsgHdr& hdr);
void SendToRoomServer(T& msg)
{
static int msgid = ::Net_GetMessageId(msg);
Net_SendMsg(tcp_client_, msgid, msg);
}
private: private:
void on_error(a8::TcpClient* sender, int errorId); void on_error(a8::TcpClient* sender, int errorId);

View File

@ -0,0 +1,17 @@
#include "precompile.h"
#include "target_conn_mgr.h"
void TargetConnMgr::Init()
{
}
void TargetConnMgr::UnInit()
{
}
TargetConn* TargetConnMgr::GetConnBySocket(int instance_id)
{
auto itr = target_conn_hash_.find(instance_id);
return itr != target_conn_hash_.end() ? itr->second : nullptr;
}

View File

@ -0,0 +1,19 @@
#pragma once
class TargetConn;
class TargetConnMgr : public a8::Singleton<TargetConnMgr>
{
private:
TargetConnMgr() {};
friend class a8::Singleton<TargetConnMgr>;
public:
void Init();
void UnInit();
TargetConn* GetConnBySocket(int instance_id);
private:
std::map<int, TargetConn*> target_conn_hash_;
};