From 268f62cff44d2481f7f112aeb8bdcb20319fbd86 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Wed, 15 May 2019 11:29:02 +0800 Subject: [PATCH] add master* --- server/wsproxy/constant.h | 3 +- server/wsproxy/mastersvr.cc | 172 +++++++++++++++++++++++++++++++++ server/wsproxy/mastersvr.h | 54 +++++++++++ server/wsproxy/mastersvrmgr.cc | 0 server/wsproxy/mastersvrmgr.h | 19 ++++ 5 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 server/wsproxy/mastersvr.cc create mode 100644 server/wsproxy/mastersvr.h create mode 100644 server/wsproxy/mastersvrmgr.cc create mode 100644 server/wsproxy/mastersvrmgr.h diff --git a/server/wsproxy/constant.h b/server/wsproxy/constant.h index 7e0454f..4a5a9c8 100644 --- a/server/wsproxy/constant.h +++ b/server/wsproxy/constant.h @@ -11,7 +11,8 @@ enum InnerMesssage_e IM_ClientSocketDisconnect = 100, IM_PlayerOffline, IM_ExecGM, - IM_TargetConnDisconnect + IM_TargetConnDisconnect, + IM_MasterSvrDisconnect }; //网络处理对象 diff --git a/server/wsproxy/mastersvr.cc b/server/wsproxy/mastersvr.cc new file mode 100644 index 0000000..f6a7b31 --- /dev/null +++ b/server/wsproxy/mastersvr.cc @@ -0,0 +1,172 @@ +#include "precompile.h" + +#include + +#include "ss_proto.pb.h" +#include "ss_msgid.pb.h" +#include "mastersvr.h" +#include +#include +#include +#include "app.h" + +const int PACK_MAX = 1024 * 64; + +void MasterSvr::Init(int instance_id, const std::string& remote_ip, int remote_port) +{ + this->instance_id = instance_id; + this->remote_ip = remote_ip; + this->remote_port = remote_port; + + recv_bufflen_ = 0; + last_pong_tick = a8::XGetTickCount(); + recv_buff_ = (char*) malloc(PACK_MAX * 2); + tcp_client_ = new a8::TcpClient(); + tcp_client_->remote_address = remote_ip; + tcp_client_->remote_port = remote_port; + tcp_client_->on_error = std::bind(&MasterSvr::on_error, this, std::placeholders::_1, std::placeholders::_2); + tcp_client_->on_connect = std::bind(&MasterSvr::on_connect, this, std::placeholders::_1); + tcp_client_->on_disconnect = std::bind(&MasterSvr::on_disconnect, this, std::placeholders::_1); + tcp_client_->on_socketread = std::bind(&MasterSvr::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), + a8::XParams().SetSender(this), + [] (const a8::XParams& param) + { + MasterSvr* conn = (MasterSvr*)param.sender.GetUserData(); + conn->CheckAlive(); + }); +} + +void MasterSvr::UnInit() +{ + a8::Timer::Instance()->DeleteTimer(timer_); + timer_ = nullptr; + tcp_client_->Close(); + delete tcp_client_; + tcp_client_ = nullptr; + recv_bufflen_ = 0; + free(recv_buff_); + recv_buff_ = nullptr; +} + +void MasterSvr::Open() +{ + tcp_client_->Open(); +} + +void MasterSvr::Close() +{ + tcp_client_->Close(); +} + +bool MasterSvr::Connected() +{ + return tcp_client_->Connected(); +} + +void MasterSvr::ForwardClientMsg(f8::MsgHdr& hdr) +{ + char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); + f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff; + head->packlen = hdr.buflen; + head->msgid = hdr.msgid; + head->seqid = hdr.seqid; + head->magic_code = f8::MAGIC_CODE; + #if 0 + head->rpc_error_code = 0; + #endif + head->socket_handle = hdr.socket_handle; + head->ip_saddr = hdr.ip_saddr; + + if (hdr.buflen > 0) { + memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen); + } + + tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen); + free(buff); +} + +void MasterSvr::on_error(a8::TcpClient* sender, int errorId) +{ + a8::UdpLog::Instance()->Error("MasterSvr errorid=%d", {errorId}); +} + +void MasterSvr::on_connect(a8::TcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server connected", {}); +} + +void MasterSvr::on_disconnect(a8::TcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect", {instance_id}); + App::Instance()->AddIMMsg(IM_MasterSvrDisconnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void MasterSvr::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) +{ + #if 0 + ++App::Instance()->perf.read_count; + #endif + if (recv_bufflen_ + len > 2 * PACK_MAX) { + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Debug("recvied target server too long message", {}); + return; + } else { + memmove(&recv_buff_[recv_bufflen_], buf, len); + recv_bufflen_ += len; + } + + bool warning = false; + unsigned int offset = 0; + while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) { + f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset]; + if (p->magic_code == f8::MAGIC_CODE) { + if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + p->packlen) { + break; + } + App::Instance()->AddSocketMsg(SF_TargetServer, + p->socket_handle, + instance_id, + p->msgid, + p->seqid, + &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], + p->packlen); + offset += sizeof(f8::WSProxyPackHead_S) + p->packlen; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + a8::UdpLog::Instance()->Debug("recvied bad package", {}); + } + if (offset > 0 && offset < recv_bufflen_) { + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + #if 1 + last_pong_tick = a8::XGetTickCount(); + #endif +} + +void MasterSvr::CheckAlive() +{ + if (!Connected()) { + Open(); + } else { + if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { + last_pong_tick = a8::XGetTickCount(); + Open(); + } else { + ss::SS_Ping msg; + SendMsg(msg); + } + } +} diff --git a/server/wsproxy/mastersvr.h b/server/wsproxy/mastersvr.h new file mode 100644 index 0000000..e79d435 --- /dev/null +++ b/server/wsproxy/mastersvr.h @@ -0,0 +1,54 @@ +#pragma once + +#include "framework/cpp/protoutils.h" + +namespace a8 +{ + class TcpClient; +} + +struct timer_list; +class MasterSvr +{ + public: + int instance_id = 0; + std::string remote_ip; + int remote_port = 0; + int matching_player_num = 0; + a8::tick_t last_pong_tick = 0; + + public: + void Init(int instance_id, const std::string& remote_ip, int remote_port); + void UnInit(); + + void Open(); + void Close(); + bool Connected(); + + template + void SendMsg(T& msg) + { + static int msgid = f8::Net_GetMessageId(msg); + #if 1 + f8::Net_SendProxyCMsg(tcp_client_, msgid, msg); + #else + f8::Net_SendMsg(tcp_client_, 0, msgid, msg); + #endif + } + + void ForwardClientMsg(f8::MsgHdr& hdr); + + private: + void on_error(a8::TcpClient* sender, int errorId); + void on_connect(a8::TcpClient* sender); + void on_disconnect(a8::TcpClient* sender); + void on_socketread(a8::TcpClient* sender, char* buf, unsigned int len); + + void CheckAlive(); + + private: + char *recv_buff_ = nullptr; + unsigned int recv_bufflen_ = 0; + a8::TcpClient* tcp_client_ = nullptr; + timer_list* timer_ = nullptr; +}; diff --git a/server/wsproxy/mastersvrmgr.cc b/server/wsproxy/mastersvrmgr.cc new file mode 100644 index 0000000..e69de29 diff --git a/server/wsproxy/mastersvrmgr.h b/server/wsproxy/mastersvrmgr.h new file mode 100644 index 0000000..1fda6ad --- /dev/null +++ b/server/wsproxy/mastersvrmgr.h @@ -0,0 +1,19 @@ +#pragma once + +class MasterSvr; +class MasterSvrMgr : public a8::Singleton +{ + private: + MasterSvrMgr() {}; + friend class a8::Singleton; + + public: + + void Init(); + void UnInit(); + + MasterSvr* GetConnByInstanceId(int instance_id); + + private: + std::map target_conn_hash_; +};