diff --git a/server/tools/protobuild/ss_proto.proto b/server/tools/protobuild/ss_proto.proto index 28d3de4..7b6952f 100644 --- a/server/tools/protobuild/ss_proto.proto +++ b/server/tools/protobuild/ss_proto.proto @@ -8,6 +8,11 @@ message SS_CMPing { } +message SS_SMPing +{ + optional int32 param1 = 1; optional int32 source = 2 [default = 0]; //0:tcp 1:udp +} + message SS_CMLogin_CMReConnect_CommonHead { optional int32 server_id = 1; diff --git a/server/wsproxy/CMakeLists.txt b/server/wsproxy/CMakeLists.txt index 0af43ba..8c3beaf 100644 --- a/server/wsproxy/CMakeLists.txt +++ b/server/wsproxy/CMakeLists.txt @@ -102,7 +102,6 @@ target_link_libraries( curl hiredis tinyxml2 - tcmalloc ) if (CMAKE_BUILD_TYPE STREQUAL "Debug") diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc index d3c3220..0fe6f27 100644 --- a/server/wsproxy/GCListener.cc +++ b/server/wsproxy/GCListener.cc @@ -193,3 +193,8 @@ bool GCListener::GetWebSocketUrl(int socket_handle, std::string& url, std::strin return false; } } + +int GCListener::GetSocketCount() +{ + return tcp_listener_->GetClientSocketCount(); +} diff --git a/server/wsproxy/GCListener.h b/server/wsproxy/GCListener.h index 4aa30d5..dd3bfd7 100644 --- a/server/wsproxy/GCListener.h +++ b/server/wsproxy/GCListener.h @@ -40,6 +40,7 @@ class GCListener : public a8::Singleton long long GetSendNodeNum(); long long GetSentBytesNum(); bool GetWebSocketUrl(int socket_handle, std::string& url, std::string& query_str); + int GetSocketCount(); private: a8::TcpListener *tcp_listener_ = nullptr; diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc index 3c99376..b45c0e0 100644 --- a/server/wsproxy/app.cc +++ b/server/wsproxy/app.cc @@ -55,10 +55,10 @@ const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log"; static void SavePerfLog() { - f8::UdpLog::Instance()->Info(" max_run_delay_time:%d max_timer_idle:%d " + f8::UdpLog::Instance()->Info("max_run_delay_time:%d max_timer_idle:%d " "in_data_size:%d out_data_size:%d msgnode_size:%d udp_msgnode_size:%d " "read_count:%d max_login_time:%d " - "max_join_time:%d", + "max_join_time:%d tcp_count:%d udp_count:%d down_stream_count:%d", { App::Instance()->GetPerf().max_run_delay_time, App::Instance()->GetPerf().max_timer_idle, @@ -69,6 +69,9 @@ static void SavePerfLog() App::Instance()->GetPerf().read_count, App::Instance()->GetPerf().max_login_time, App::Instance()->GetPerf().max_join_time, + GCListener::Instance()->GetSocketCount(), + LongSessionMgr::Instance()->GetLongSessionCount(), + DownStreamMgr::Instance()->GetDownStreamCount() }); if (App::Instance()->HasFlag(2)) { a8::XPrintf("mainloop_time:%d netmsg_time:%d send_node_num:%d sent_bytes_num:%d\n", @@ -567,8 +570,14 @@ void App::FreeUdpMsgQueue() } while (udp_work_node_) { UdpMsgNode* pdelnode = udp_work_node_; - delete pdelnode->pkt; udp_work_node_ = udp_work_node_->next; + { + if (pdelnode->pkt->buf) { + free((void*)pdelnode->pkt->buf); + } + delete pdelnode->pkt; + free(pdelnode); + } if (!udp_work_node_) { udp_work_node_ = udp_top_node_; udp_top_node_ = nullptr; @@ -612,6 +621,13 @@ void App::DispatchUdpMsg() UdpMsgNode *pdelnode = udp_work_node_; LongSessionMgr::Instance()->ProcUdpPacket(pdelnode->pkt); udp_work_node_ = pdelnode->next; + { + if (pdelnode->pkt->buf) { + free((void*)pdelnode->pkt->buf); + } + delete pdelnode->pkt; + free(pdelnode); + } udp_working_msgnode_size_--; if (a8::XGetTickCount() - starttick > 200) { break; diff --git a/server/wsproxy/downstream.cc b/server/wsproxy/downstream.cc index 868a806..c00e1ff 100644 --- a/server/wsproxy/downstream.cc +++ b/server/wsproxy/downstream.cc @@ -37,7 +37,25 @@ void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr) } if (auto long_session = long_session_wp_.lock(); !long_session_wp_.expired()) { - long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen); + if (hdr.msgid == ss::_SS_CMPing) { + ss::SS_SMPing msg; + msg.set_source(1); + { + free(buff); + + buff = (char*)malloc(sizeof(f8::PackHead) + msg.ByteSize()); + f8::PackHead* head = (f8::PackHead*)buff; + head->packlen = msg.ByteSize(); + head->msgid = hdr.msgid; + head->seqid = hdr.seqid; + head->magic_code = f8::MAGIC_CODE; + head->ext_len = hdr.buflen >> 16; + msg.SerializeToArray(buff + sizeof(f8::PackHead), head->packlen); + long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen); + } + } else { + long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen); + } } else { GCListener::Instance()->SendBuf(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen); } @@ -58,7 +76,7 @@ void DownStream::OnClose() void DownStream::ProcCMMsg(f8::MsgHdr& hdr, int tag) { if (hdr.msgid == ss::_SS_CMPing && IsLongSession() && tag == ST_Tcp) { - ss::SS_Ping msg; + ss::SS_SMPing msg; GCListener::Instance()->SendMsgEx(socket_handle_, ss::_SS_CMPing, msg); if (!long_session_wp_.expired()) { long_session_wp_.lock()->UpdatePing(); diff --git a/server/wsproxy/downstreammgr.cc b/server/wsproxy/downstreammgr.cc index 531acc7..ef99152 100644 --- a/server/wsproxy/downstreammgr.cc +++ b/server/wsproxy/downstreammgr.cc @@ -152,3 +152,8 @@ void DownStreamMgr::RemovePendingAccount(int socket_handle) pending_account_hash_.erase(itr); } } + +int DownStreamMgr::GetDownStreamCount() +{ + return socket_hash_.size(); +} diff --git a/server/wsproxy/downstreammgr.h b/server/wsproxy/downstreammgr.h index 55dd8dc..09c8dcd 100644 --- a/server/wsproxy/downstreammgr.h +++ b/server/wsproxy/downstreammgr.h @@ -20,6 +20,7 @@ class DownStreamMgr : public a8::Singleton std::weak_ptr GetDownStream(int sockhande); void BindUpStream(int socket_handle, int instance_id); void AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick); + int GetDownStreamCount(); private: diff --git a/server/wsproxy/handlermgr.cc b/server/wsproxy/handlermgr.cc index fdf963c..2f562f5 100644 --- a/server/wsproxy/handlermgr.cc +++ b/server/wsproxy/handlermgr.cc @@ -9,6 +9,7 @@ #include "GCListener.h" #include "mastermgr.h" #include "app.h" +#include "jsondatamgr.h" #include "ss_proto.pb.h" @@ -27,11 +28,30 @@ static void _GMOpsGetNodeId(std::shared_ptr request) request->resp_xobj->SetVal("node_id", App::Instance()->GetNodeId()); } +static void _GMOpsSetKcpSwitch(std::shared_ptr request) +{ + request->resp_xobj->SetVal("errcode", 0); + request->resp_xobj->SetVal("errmsg", ""); + if (request->params->HasKey("open")) { + JsonDataMgr::Instance()->SetKcpSwitch(request->params->At("open")->AsXValue().GetInt()); + } + request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open); +} + +static void _GMOpsGetKcpSwitch(std::shared_ptr request) +{ + request->resp_xobj->SetVal("errcode", 0); + request->resp_xobj->SetVal("errmsg", ""); + request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open); +} + void HandlerMgr::Init() { RegisterNetMsgHandlers(); RegisterGMMsgHandler("Ops$selfChecking", _GMOpsSelfChecking); RegisterGMMsgHandler("Ops$getNodeId", _GMOpsGetNodeId); + RegisterGMMsgHandler("Ops$setKcpSwitch", _GMOpsSetKcpSwitch); + RegisterGMMsgHandler("Ops$getKcpSwitch", _GMOpsGetKcpSwitch); f8::MsgQueue::Instance()->RegisterCallBack ( IM_ExecGM, diff --git a/server/wsproxy/jsondatamgr.cc b/server/wsproxy/jsondatamgr.cc index 970e2b7..486e933 100644 --- a/server/wsproxy/jsondatamgr.cc +++ b/server/wsproxy/jsondatamgr.cc @@ -108,3 +108,8 @@ void JsonDataMgr::TraverseMaster(std::function cb) cb(instance_id, remote_ip, remote_port); } } + +void JsonDataMgr::SetKcpSwitch(int is_open) +{ + kcp_conf_.open = is_open ? 1 : 0; +} diff --git a/server/wsproxy/jsondatamgr.h b/server/wsproxy/jsondatamgr.h index 66bb92d..a2bc1b6 100644 --- a/server/wsproxy/jsondatamgr.h +++ b/server/wsproxy/jsondatamgr.h @@ -33,6 +33,7 @@ class JsonDataMgr : public a8::Singleton std::shared_ptr GetConf(); void TraverseMaster(std::function cb); const KcpConf& GetKcpConf() { return kcp_conf_; } + void SetKcpSwitch(int is_open); private: std::string work_path_ = "../config"; diff --git a/server/wsproxy/kcpsession.cc b/server/wsproxy/kcpsession.cc index efe4b3d..3010fc3 100644 --- a/server/wsproxy/kcpsession.cc +++ b/server/wsproxy/kcpsession.cc @@ -95,12 +95,17 @@ void KcpSession::DecodeUserPacket(char* buf, int& offset, unsigned int buflen) //2 + 2 + 4+ xx + \0 + xx bool warning = false; while (buflen - offset >= sizeof(f8::PackHead) + GetSecretKeyLen()) { + long long secret_key = KcpSession::ReadSecretKey(&buf[offset], buflen); + if (secret_key != secret_key_) { + warning = true; + offset++; + continue; + } f8::PackHead* p = (f8::PackHead*)&buf[offset + GetSecretKeyLen()]; if (p->magic_code == f8::MAGIC_CODE) { if (buflen - offset < sizeof(f8::PackHead) + p->packlen + GetSecretKeyLen()) { break; } - //a8::XPrintf("Recv MsgId:%d\n", {p->msgid}); App::Instance()->AddSocketMsg(SF_Client, socket_handle_, 0, diff --git a/server/wsproxy/longsessionmgr.cc b/server/wsproxy/longsessionmgr.cc index 3915a67..f66d778 100644 --- a/server/wsproxy/longsessionmgr.cc +++ b/server/wsproxy/longsessionmgr.cc @@ -39,7 +39,10 @@ void LongSessionMgr::Init() void LongSessionMgr::UnInit() { - + for (auto& pair : socket_handle_hash_) { + pair.second->UnInit(); + } + socket_handle_hash_.clear(); } void LongSessionMgr::Update() @@ -100,15 +103,60 @@ std::shared_ptr LongSessionMgr::GetSession(int socket_handle) void LongSessionMgr::ProcUdpPacket(a8::UdpPacket* pkt) { + #if 0 + f8::UdpLog::Instance()->Debug("ProcUdpPacket host:%s port:%d buflen:%d", + { + inet_ntoa(pkt->remote_addr.sin_addr), + pkt->remote_addr.sin_port, + pkt->buf_len + }); + #endif const int IKCP_OVERHEAD = 24; - if (pkt->buf_len > IKCP_OVERHEAD + KcpSession::GetSecretKeyLen()) { - int socket_handle = ikcp_getconv(pkt->buf); - long long secret_key = KcpSession::ReadSecretKey(pkt->buf + IKCP_OVERHEAD, pkt->buf_len); - auto session = GetSession(socket_handle); - if (session && secret_key == session->GetKcpSession()->GetSecretKey()) { - session->GetKcpSession()->OnRecvPacket(pkt); - } + + if (pkt->buf_len < IKCP_OVERHEAD) { + f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d over_head_error", + { + inet_ntoa(pkt->remote_addr.sin_addr), + pkt->remote_addr.sin_port, + pkt->buf_len + }); + return; } + int socket_handle = ikcp_getconv(pkt->buf); + auto session = GetSession(socket_handle); + if (!session) { + f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s socket_handle:%d session_error", + { + inet_ntoa(pkt->remote_addr.sin_addr), + pkt->remote_addr.sin_port, + socket_handle + }); + return; + } + session->GetKcpSession()->OnRecvPacket(pkt); +#if 0 + if (pkt->buf_len > IKCP_OVERHEAD + KcpSession::GetSecretKeyLen()) { + long long secret_key = KcpSession::ReadSecretKey(pkt->buf + IKCP_OVERHEAD, pkt->buf_len); + if (secret_key == session->GetKcpSession()->GetSecretKey()) { + session->GetKcpSession()->OnRecvPacket(pkt); + } else { + f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d socket_handle%d secret_key:%d secret_error", + { + inet_ntoa(pkt->remote_addr.sin_addr), + pkt->remote_addr.sin_port, + socket_handle, + secret_key + }); + } + } else { + f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d bufflen_error", + { + inet_ntoa(pkt->remote_addr.sin_addr), + pkt->remote_addr.sin_port, + pkt->buf_len + }); + } +#endif } void LongSessionMgr::DelSession(int socket_handle) @@ -126,3 +174,8 @@ void LongSessionMgr::DelSession(int socket_handle) } socket_handle_hash_.erase(socket_handle); } + +int LongSessionMgr::GetLongSessionCount() +{ + return socket_handle_hash_.size(); +} diff --git a/server/wsproxy/longsessionmgr.h b/server/wsproxy/longsessionmgr.h index 0be1d50..4595195 100644 --- a/server/wsproxy/longsessionmgr.h +++ b/server/wsproxy/longsessionmgr.h @@ -31,6 +31,7 @@ class LongSessionMgr : public a8::Singleton std::shared_ptr GetSession(int socket_handle); std::shared_ptr GetUdpListener() { return udp_listener_; } void DelSession(int socket_handle); + int GetLongSessionCount(); private: std::shared_ptr udp_listener_; diff --git a/server/wsproxy/mastermgr.cc b/server/wsproxy/mastermgr.cc index 0ed36cc..8049235 100644 --- a/server/wsproxy/mastermgr.cc +++ b/server/wsproxy/mastermgr.cc @@ -48,6 +48,10 @@ void MasterMgr::Init() void MasterMgr::UnInit() { + for (auto& pair : mastersvr_hash_) { + pair.second->UnInit(); + } + mastersvr_hash_.clear(); } void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg) diff --git a/third_party/a8 b/third_party/a8 index 4fb98b2..54fe50e 160000 --- a/third_party/a8 +++ b/third_party/a8 @@ -1 +1 @@ -Subproject commit 4fb98b218b1f30f57bfe63a41f8fcd6d1778fc99 +Subproject commit 54fe50edf11dda774dcd6fc480134c9e4ce449cd diff --git a/third_party/f8 b/third_party/f8 index c15cb12..ae5c84a 160000 --- a/third_party/f8 +++ b/third_party/f8 @@ -1 +1 @@ -Subproject commit c15cb12a75cdc0e190a76ffa94f88c0ec06eec41 +Subproject commit ae5c84ae98a314a6f29a2e29e9a2937801324c15