This commit is contained in:
aozhiwei 2020-05-03 18:07:26 +08:00
parent 21699e1524
commit 828142a941
8 changed files with 59 additions and 193 deletions

View File

@ -44,17 +44,6 @@ void MSConn::Init(int instance_id, const std::string& remote_ip, int remote_port
void MSConn::UnInit()
{
MSConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
MSConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
delete pdelnode->msg;
delete pdelnode;
}
a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr;
tcp_client_->Close();
@ -80,73 +69,9 @@ bool MSConn::Connected()
return tcp_client_->Connected();
}
void MSConn::SendStockMsg()
{
MSConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
MSConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
if (pdelnode->msg) {
f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg);
delete pdelnode->msg;
}
if (pdelnode->hdr) {
ForwardClientMsg(*pdelnode->hdr);
if (pdelnode->hdr->buf) {
free((char*)pdelnode->hdr->buf);
}
free(pdelnode->hdr);
}
delete pdelnode;
}
}
void MSConn::ForwardClientMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
}
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff);
}
void MSConn::ForwardClientMsgEx(f8::MsgHdr* hdr)
{
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
ForwardClientMsg(*hdr);
if (hdr->buf) {
free((char*)hdr->buf);
}
free(hdr);
} else {
AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
}
}
void MSConn::on_error(a8::AsyncTcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
a8::UdpLog::Instance()->Error("master server errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
@ -157,7 +82,7 @@ void MSConn::on_error(a8::AsyncTcpClient* sender, int errorId)
void MSConn::on_connect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
a8::UdpLog::Instance()->Info("master server connected remote_ip:%s remote_port:%d",
{
sender->remote_address,
sender->remote_port
@ -171,7 +96,7 @@ void MSConn::on_connect(a8::AsyncTcpClient* sender)
void MSConn::on_disconnect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
a8::UdpLog::Instance()->Info("master server %d disconnected after 10s later reconnect "
"remote_ip:%s remote_port:%d",
{
instance_id,
@ -191,7 +116,7 @@ void MSConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int l
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Debug("recvied target server too long message", {});
a8::UdpLog::Instance()->Debug("recvied master server too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
@ -200,21 +125,20 @@ void MSConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int l
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
int real_len = p->packlen + (p->ext_len << 16);
while (recv_bufflen_ - offset >= sizeof(f8::PackHead)) {
f8::PackHead* p = (f8::PackHead*) &recv_buff_[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_MSConn,
p->socket_handle,
0,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
real_len);
offset += sizeof(f8::WSProxyPackHead_S) + real_len;
&recv_buff_[offset + sizeof(f8::PackHead)],
p->packlen);
offset += sizeof(f8::PackHead) + p->packlen;
} else {
warning = true;
offset++;
@ -241,6 +165,7 @@ void MSConn::CheckAlive()
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Close();
Open();
} else {
ss::SS_Ping msg;
@ -248,20 +173,3 @@ void MSConn::CheckAlive()
}
}
}
void MSConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr)
{
MSConnMsgNode* node = new MSConnMsgNode();
node->socket_handle = socket_handle;
node->msgid = msgid;
node->msg = msg;
node->hdr = hdr;
if (bot_node_) {
bot_node_->next_node = node;
bot_node_ = node;
} else {
top_node_ = node;
bot_node_ = node;
}
}

View File

@ -8,16 +8,6 @@ namespace a8
class AsyncTcpClient;
}
struct MSConnMsgNode
{
unsigned short socket_handle = 0;
int msgid = 0;
::google::protobuf::Message* msg = nullptr;
f8::MsgHdr* hdr = nullptr;
MSConnMsgNode* next_node = nullptr;
};
struct timer_list;
class MSConn
{
@ -39,22 +29,9 @@ class MSConn
void SendMsg(int socket_handle, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg);
} else {
T* new_msg = new T();
*new_msg = msg;
AddStockMsg(socket_handle, msgid, new_msg, nullptr);
}
f8::Net_SendMsg(tcp_client_, 0, msgid, msg);
}
void SendStockMsg();
void ForwardClientMsg(f8::MsgHdr& hdr);
void ForwardClientMsgEx(f8::MsgHdr* hdr);
private:
void on_error(a8::AsyncTcpClient* sender, int errorId);
void on_connect(a8::AsyncTcpClient* sender);
@ -62,15 +39,10 @@ class MSConn
void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len);
void CheckAlive();
void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr);
private:
char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0;
a8::AsyncTcpClient* tcp_client_ = nullptr;
timer_list* timer_ = nullptr;
MSConnMsgNode* top_node_ = nullptr;
MSConnMsgNode* bot_node_ = nullptr;
};

View File

@ -21,20 +21,20 @@ public:
//packagelen + msgid + magiccode + msgbody
//2 + 2 + 4+ xx + \0 + xx
bool warning = false;
while (buflen - offset >= sizeof(f8::PackHead)) {
f8::PackHead* p = (f8::PackHead*)&buf[offset];
while (buflen - offset >= sizeof(f8::WSProxyPackHead_C)) {
f8::WSProxyPackHead_C* p = (f8::WSProxyPackHead_C*)&buf[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (buflen - offset < sizeof(f8::PackHead) + p->packlen) {
if (buflen - offset < sizeof(f8::WSProxyPackHead_C) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_WSProxy,
socket_handle,
saddr,
(socket_handle << 16) + p->socket_handle,
p->ip_saddr,
p->msgid,
p->seqid,
&buf[offset + sizeof(f8::PackHead)],
&buf[offset + sizeof(f8::WSProxyPackHead_C)],
p->packlen);
offset += sizeof(f8::PackHead) + p->packlen;
offset += sizeof(f8::WSProxyPackHead_C) + p->packlen;
} else {
warning = true;
offset++;
@ -95,23 +95,6 @@ void WSListener::UnInit()
tcp_listener_ = nullptr;
}
void WSListener::ForwardTargetConnMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen);
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->ext_len = hdr.buflen >> 16;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen);
}
tcp_listener_->SendClientMsg(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
free(buff);
}
void WSListener::SendText(unsigned short sockhandle, const std::string& text)
{
tcp_listener_->SendClientMsg(sockhandle, text.data(), text.size());
@ -126,3 +109,8 @@ void WSListener::MarkClient(unsigned short sockhandle, bool is_active)
{
tcp_listener_->MarkClient(sockhandle, is_active);
}
void WSListener::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg)
{
}

View File

@ -6,6 +6,11 @@ namespace a8
class TcpListener;
}
namespace ss
{
class SS_Ping;
}
class WSListener : public a8::Singleton<WSListener>
{
private:
@ -20,18 +25,24 @@ class WSListener : public a8::Singleton<WSListener>
void UnInit();
template <typename T>
void SendMsg(unsigned short socket_handle, T& msg)
void SendProxyMsg(int sockhandle, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg);
f8::Net_SendProxyMsg(tcp_listener_, sockhandle, 0, 0, msgid, msg);
}
void ForwardTargetConnMsg(f8::MsgHdr& hdr);
void SendText(unsigned short sockhandle, const std::string& text);
template <typename T>
void SendToClient(int sockhandle, unsigned int seqid, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
f8::Net_SendProxyMsg(tcp_listener_, sockhandle, seqid, 0, msgid, msg);
}
void SendText(unsigned short sockhandle, const std::string& text);
void ForceCloseClient(unsigned short sockhandle);
void MarkClient(unsigned short sockhandle, bool is_active);
void _SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg);
private:
a8::TcpListener *tcp_listener_ = nullptr;
};

View File

@ -348,7 +348,7 @@ void App::DispatchMsg()
switch (pdelnode->sockfrom) {
case SF_WSProxy:
{
ProcessClientMsg(hdr);
ProcessWSProxyMsg(hdr);
}
break;
case SF_IMServer:
@ -361,13 +361,11 @@ void App::DispatchMsg()
}
break;
#if 0
case SF_MasterServer:
default:
{
ProcessMasterServerMsg(hdr);
}
break;
#endif
}
if (pdelnode->buf) {
free(pdelnode->buf);
@ -384,30 +382,22 @@ void App::DispatchMsg()
}
}
void App::ProcessClientMsg(f8::MsgHdr& hdr)
void App::ProcessWSProxyMsg(f8::MsgHdr& hdr)
{
if (hdr.msgid < 100) {
return;
}
if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReConnect) {
ss::SS_CMLogin_CMReConnect_CommonHead2 msg;
bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
if (ok) {
#if 0
MasterSvrMgr::Instance()->RequestTargetServer(hdr, msg.team_uuid(), msg.account_id());
#endif
}
} else {
#if 0
GameClient* client = GameClientMgr::Instance()->GetGameClientBySocket(hdr.socket_handle);
if (client && client->conn) {
if (client->conn) {
#if 0
client->conn->ForwardClientMsg(hdr);
#endif
f8::NetMsgHandler* handler = f8::GetNetMsgHandler(&HandlerMgr::Instance()->wsmsghandler,
hdr.msgid);
if (handler) {
switch (handler->handlerid) {
case HID_WSListener:
{
ProcessNetMsg(handler, WSListener::Instance(), hdr);
}
break;
default:
{
}
break;
}
#endif
}
}
@ -441,7 +431,6 @@ void App::ProcessTargetServerMsg(f8::MsgHdr& hdr)
#endif
WSListener::Instance()->MarkClient(hdr.socket_handle, true);
}
WSListener::Instance()->ForwardTargetConnMsg(hdr);
}
void App::ProcessIMMsg()

View File

@ -42,7 +42,7 @@ private:
void DispatchMsg();
void ProcessIMMsg();
void ProcessClientMsg(f8::MsgHdr& hdr);
void ProcessWSProxyMsg(f8::MsgHdr& hdr);
void ProcessMasterServerMsg(f8::MsgHdr& hdr);
void ProcessTargetServerMsg(f8::MsgHdr& hdr);

View File

@ -29,9 +29,7 @@ void HandlerMgr::UnInit()
void HandlerMgr::RegisterNetMsgHandlers()
{
#if 0
RegisterNetMsgHandler(&msmsghandler, &MasterSvrMgr::_SS_MS_ResponseTargetServer);
#endif
RegisterNetMsgHandler(&wsmsghandler, &WSListener::_SS_Ping);
}
void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,

View File

@ -21,7 +21,7 @@ class HandlerMgr : public a8::Singleton<HandlerMgr>
void Init();
void UnInit();
f8::NetMsgHandlerObject gcmsghandler;
f8::NetMsgHandlerObject wsmsghandler;
f8::NetMsgHandlerObject msmsghandler;
void ProcGMMsg(unsigned long saddr, int sockhandle,