This commit is contained in:
azw 2023-05-04 15:23:46 +00:00
commit cc60352018
17 changed files with 156 additions and 17 deletions

View File

@ -8,6 +8,11 @@ message SS_CMPing
{
}
message SS_SMPing
{
optional int32 param1 = 1; optional int32 source = 2 [default = 0]; //0:tcp 1:udp
}
message SS_CMLogin_CMReConnect_CommonHead
{
optional int32 server_id = 1;

View File

@ -102,7 +102,6 @@ target_link_libraries(
curl
hiredis
tinyxml2
tcmalloc
)
if (CMAKE_BUILD_TYPE STREQUAL "Debug")

View File

@ -193,3 +193,8 @@ bool GCListener::GetWebSocketUrl(int socket_handle, std::string& url, std::strin
return false;
}
}
int GCListener::GetSocketCount()
{
return tcp_listener_->GetClientSocketCount();
}

View File

@ -40,6 +40,7 @@ class GCListener : public a8::Singleton<GCListener>
long long GetSendNodeNum();
long long GetSentBytesNum();
bool GetWebSocketUrl(int socket_handle, std::string& url, std::string& query_str);
int GetSocketCount();
private:
a8::TcpListener *tcp_listener_ = nullptr;

View File

@ -55,10 +55,10 @@ const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log";
static void SavePerfLog()
{
f8::UdpLog::Instance()->Info(" max_run_delay_time:%d max_timer_idle:%d "
f8::UdpLog::Instance()->Info("max_run_delay_time:%d max_timer_idle:%d "
"in_data_size:%d out_data_size:%d msgnode_size:%d udp_msgnode_size:%d "
"read_count:%d max_login_time:%d "
"max_join_time:%d",
"max_join_time:%d tcp_count:%d udp_count:%d down_stream_count:%d",
{
App::Instance()->GetPerf().max_run_delay_time,
App::Instance()->GetPerf().max_timer_idle,
@ -69,6 +69,9 @@ static void SavePerfLog()
App::Instance()->GetPerf().read_count,
App::Instance()->GetPerf().max_login_time,
App::Instance()->GetPerf().max_join_time,
GCListener::Instance()->GetSocketCount(),
LongSessionMgr::Instance()->GetLongSessionCount(),
DownStreamMgr::Instance()->GetDownStreamCount()
});
if (App::Instance()->HasFlag(2)) {
a8::XPrintf("mainloop_time:%d netmsg_time:%d send_node_num:%d sent_bytes_num:%d\n",
@ -567,8 +570,14 @@ void App::FreeUdpMsgQueue()
}
while (udp_work_node_) {
UdpMsgNode* pdelnode = udp_work_node_;
delete pdelnode->pkt;
udp_work_node_ = udp_work_node_->next;
{
if (pdelnode->pkt->buf) {
free((void*)pdelnode->pkt->buf);
}
delete pdelnode->pkt;
free(pdelnode);
}
if (!udp_work_node_) {
udp_work_node_ = udp_top_node_;
udp_top_node_ = nullptr;
@ -612,6 +621,13 @@ void App::DispatchUdpMsg()
UdpMsgNode *pdelnode = udp_work_node_;
LongSessionMgr::Instance()->ProcUdpPacket(pdelnode->pkt);
udp_work_node_ = pdelnode->next;
{
if (pdelnode->pkt->buf) {
free((void*)pdelnode->pkt->buf);
}
delete pdelnode->pkt;
free(pdelnode);
}
udp_working_msgnode_size_--;
if (a8::XGetTickCount() - starttick > 200) {
break;

View File

@ -37,7 +37,25 @@ void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr)
}
if (auto long_session = long_session_wp_.lock(); !long_session_wp_.expired()) {
if (hdr.msgid == ss::_SS_CMPing) {
ss::SS_SMPing msg;
msg.set_source(1);
{
free(buff);
buff = (char*)malloc(sizeof(f8::PackHead) + msg.ByteSize());
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = msg.ByteSize();
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->ext_len = hdr.buflen >> 16;
msg.SerializeToArray(buff + sizeof(f8::PackHead), head->packlen);
long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
}
} else {
long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
}
} else {
GCListener::Instance()->SendBuf(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
}
@ -58,7 +76,7 @@ void DownStream::OnClose()
void DownStream::ProcCMMsg(f8::MsgHdr& hdr, int tag)
{
if (hdr.msgid == ss::_SS_CMPing && IsLongSession() && tag == ST_Tcp) {
ss::SS_Ping msg;
ss::SS_SMPing msg;
GCListener::Instance()->SendMsgEx(socket_handle_, ss::_SS_CMPing, msg);
if (!long_session_wp_.expired()) {
long_session_wp_.lock()->UpdatePing();

View File

@ -152,3 +152,8 @@ void DownStreamMgr::RemovePendingAccount(int socket_handle)
pending_account_hash_.erase(itr);
}
}
int DownStreamMgr::GetDownStreamCount()
{
return socket_hash_.size();
}

View File

@ -20,6 +20,7 @@ class DownStreamMgr : public a8::Singleton<DownStreamMgr>
std::weak_ptr<DownStream> GetDownStream(int sockhande);
void BindUpStream(int socket_handle, int instance_id);
void AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick);
int GetDownStreamCount();
private:

View File

@ -9,6 +9,7 @@
#include "GCListener.h"
#include "mastermgr.h"
#include "app.h"
#include "jsondatamgr.h"
#include "ss_proto.pb.h"
@ -27,11 +28,30 @@ static void _GMOpsGetNodeId(std::shared_ptr<f8::JsonHttpRequest> request)
request->resp_xobj->SetVal("node_id", App::Instance()->GetNodeId());
}
static void _GMOpsSetKcpSwitch(std::shared_ptr<f8::JsonHttpRequest> request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
if (request->params->HasKey("open")) {
JsonDataMgr::Instance()->SetKcpSwitch(request->params->At("open")->AsXValue().GetInt());
}
request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open);
}
static void _GMOpsGetKcpSwitch(std::shared_ptr<f8::JsonHttpRequest> request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open);
}
void HandlerMgr::Init()
{
RegisterNetMsgHandlers();
RegisterGMMsgHandler("Ops$selfChecking", _GMOpsSelfChecking);
RegisterGMMsgHandler("Ops$getNodeId", _GMOpsGetNodeId);
RegisterGMMsgHandler("Ops$setKcpSwitch", _GMOpsSetKcpSwitch);
RegisterGMMsgHandler("Ops$getKcpSwitch", _GMOpsGetKcpSwitch);
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_ExecGM,

View File

@ -108,3 +108,8 @@ void JsonDataMgr::TraverseMaster(std::function<void (int, std::string, int)> cb)
cb(instance_id, remote_ip, remote_port);
}
}
void JsonDataMgr::SetKcpSwitch(int is_open)
{
kcp_conf_.open = is_open ? 1 : 0;
}

View File

@ -33,6 +33,7 @@ class JsonDataMgr : public a8::Singleton<JsonDataMgr>
std::shared_ptr<a8::XObject> GetConf();
void TraverseMaster(std::function<void (int, std::string, int)> cb);
const KcpConf& GetKcpConf() { return kcp_conf_; }
void SetKcpSwitch(int is_open);
private:
std::string work_path_ = "../config";

View File

@ -95,12 +95,17 @@ void KcpSession::DecodeUserPacket(char* buf, int& offset, unsigned int buflen)
//2 + 2 + 4+ xx + \0 + xx
bool warning = false;
while (buflen - offset >= sizeof(f8::PackHead) + GetSecretKeyLen()) {
long long secret_key = KcpSession::ReadSecretKey(&buf[offset], buflen);
if (secret_key != secret_key_) {
warning = true;
offset++;
continue;
}
f8::PackHead* p = (f8::PackHead*)&buf[offset + GetSecretKeyLen()];
if (p->magic_code == f8::MAGIC_CODE) {
if (buflen - offset < sizeof(f8::PackHead) + p->packlen + GetSecretKeyLen()) {
break;
}
//a8::XPrintf("Recv MsgId:%d\n", {p->msgid});
App::Instance()->AddSocketMsg(SF_Client,
socket_handle_,
0,

View File

@ -39,7 +39,10 @@ void LongSessionMgr::Init()
void LongSessionMgr::UnInit()
{
for (auto& pair : socket_handle_hash_) {
pair.second->UnInit();
}
socket_handle_hash_.clear();
}
void LongSessionMgr::Update()
@ -100,15 +103,60 @@ std::shared_ptr<LongSession> LongSessionMgr::GetSession(int socket_handle)
void LongSessionMgr::ProcUdpPacket(a8::UdpPacket* pkt)
{
#if 0
f8::UdpLog::Instance()->Debug("ProcUdpPacket host:%s port:%d buflen:%d",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
#endif
const int IKCP_OVERHEAD = 24;
if (pkt->buf_len > IKCP_OVERHEAD + KcpSession::GetSecretKeyLen()) {
if (pkt->buf_len < IKCP_OVERHEAD) {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d over_head_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
return;
}
int socket_handle = ikcp_getconv(pkt->buf);
long long secret_key = KcpSession::ReadSecretKey(pkt->buf + IKCP_OVERHEAD, pkt->buf_len);
auto session = GetSession(socket_handle);
if (session && secret_key == session->GetKcpSession()->GetSecretKey()) {
if (!session) {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s socket_handle:%d session_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
socket_handle
});
return;
}
session->GetKcpSession()->OnRecvPacket(pkt);
#if 0
if (pkt->buf_len > IKCP_OVERHEAD + KcpSession::GetSecretKeyLen()) {
long long secret_key = KcpSession::ReadSecretKey(pkt->buf + IKCP_OVERHEAD, pkt->buf_len);
if (secret_key == session->GetKcpSession()->GetSecretKey()) {
session->GetKcpSession()->OnRecvPacket(pkt);
} else {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d socket_handle%d secret_key:%d secret_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
socket_handle,
secret_key
});
}
} else {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d bufflen_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
}
#endif
}
void LongSessionMgr::DelSession(int socket_handle)
@ -126,3 +174,8 @@ void LongSessionMgr::DelSession(int socket_handle)
}
socket_handle_hash_.erase(socket_handle);
}
int LongSessionMgr::GetLongSessionCount()
{
return socket_handle_hash_.size();
}

View File

@ -31,6 +31,7 @@ class LongSessionMgr : public a8::Singleton<LongSessionMgr>
std::shared_ptr<LongSession> GetSession(int socket_handle);
std::shared_ptr<a8::UdpListener> GetUdpListener() { return udp_listener_; }
void DelSession(int socket_handle);
int GetLongSessionCount();
private:
std::shared_ptr<a8::UdpListener> udp_listener_;

View File

@ -48,6 +48,10 @@ void MasterMgr::Init()
void MasterMgr::UnInit()
{
for (auto& pair : mastersvr_hash_) {
pair.second->UnInit();
}
mastersvr_hash_.clear();
}
void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg)

2
third_party/a8 vendored

@ -1 +1 @@
Subproject commit 4fb98b218b1f30f57bfe63a41f8fcd6d1778fc99
Subproject commit 54fe50edf11dda774dcd6fc480134c9e4ce449cd

2
third_party/f8 vendored

@ -1 +1 @@
Subproject commit c15cb12a75cdc0e190a76ffa94f88c0ec06eec41
Subproject commit ae5c84ae98a314a6f29a2e29e9a2937801324c15