From d5f5b10714ffa476452cb9f7cefcfd65978e86fc Mon Sep 17 00:00:00 2001 From: azw Date: Mon, 17 Apr 2023 11:41:08 +0000 Subject: [PATCH] 1 --- server/wsproxy/GCListener.cc | 115 +++++++++++++++++++++++++++++++++-- server/wsproxy/GCListener.h | 1 + server/wsproxy/app.cc | 1 + 3 files changed, 113 insertions(+), 4 deletions(-) diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc index 7f68bf4..7710c9b 100644 --- a/server/wsproxy/GCListener.cc +++ b/server/wsproxy/GCListener.cc @@ -18,6 +18,10 @@ #include "handlermgr.h" #include "ikcp.h" +#define USE_KCP 1 + +static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; + class GCClientSession: public a8::WebSocketSession { public: @@ -152,10 +156,16 @@ public: kcp_->rx_minrto = 10; kcp_->fastresend = 1; kcp_->output = KcpSession::UdpOutput; + recv_buff_ = (char *)malloc(max_packet_len_ + 1); + init_tick_ = a8::XGetTickCount(); } void UnInit() { + if (recv_buff_) { + free(recv_buff_); + recv_buff_ = nullptr; + } if (kcp_) { ikcp_release(kcp_); kcp_ = nullptr; @@ -164,25 +174,108 @@ public: void Update() { - + long long tick = a8::XGetTickCount(); + ikcp_update(kcp_, tick - init_tick_); + UpdateInput(); } - void SendMsg(unsigned short msgid, const ::google::protobuf::Message& msg) + void SendClientMsg(char* buf, int buf_len) { - + ikcp_send(kcp_, buf, buf_len); } void OnRecvPacket(a8::UdpPacket* pkt) { - + ikcp_input(kcp_, pkt->buf, pkt->buf_len); } private: + + void UpdateInput() + { + char buf[DEFAULT_MAX_RECV_BUFFERSIZE]; + int buflen = ikcp_recv(kcp_, buf, DEFAULT_MAX_RECV_BUFFERSIZE - 10); + if (buflen <= 0) { + return; + } + + unsigned int already_read_bytes = 0; + do { + if (already_read_bytes < buflen) { + int read_bytes = std::min(buflen - already_read_bytes, + (unsigned int)max_packet_len_ - recv_bufflen_); + if (read_bytes > 0) { + memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes); + recv_bufflen_ += read_bytes; + already_read_bytes += read_bytes; + } + } + + int offset = 0; + int prev_offset = 0; + do { + prev_offset = offset; + DecodePacket(recv_buff_, offset, recv_bufflen_); + } while (prev_offset < offset && offset < recv_bufflen_); + + if (offset > 0 && offset < recv_bufflen_){ + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + if (recv_bufflen_ >= max_packet_len_) { + //收到超长包 + #if 1 + abort(); + #else + //Close(); + #endif + return; + } + } while (already_read_bytes < buflen); + } + + void DecodePacket(char* buf, int& offset, unsigned int buflen) + { + //packagelen + msgid + magiccode + msgbody + //2 + 2 + 4+ xx + \0 + xx + bool warning = false; + while (buflen - offset >= sizeof(f8::PackHead)) { + f8::PackHead* p = (f8::PackHead*)&buf[offset]; + if (p->magic_code == f8::MAGIC_CODE) { + if (buflen - offset < sizeof(f8::PackHead) + p->packlen) { + break; + } + App::Instance()->AddSocketMsg(SF_Client, + socket_handle_, + 0, + //saddr, + p->msgid, + p->seqid, + &buf[offset + sizeof(f8::PackHead)], + p->packlen); + offset += sizeof(f8::PackHead) + p->packlen; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + f8::UdpLog::Instance()->Warning("收到client非法数据包", {}); + } + } + +private: + long long init_tick_ = 0; a8::UdpListener* udp_listener_ = nullptr; ikcpcb* kcp_ = nullptr; int socket_handle_ = 0; long long remote_key_ = 0; sockaddr_in remote_addr_ = {}; + char* recv_buff_ = nullptr; + int recv_bufflen_ = 0; + int max_packet_len_ = 1024 * 64 *2; }; static void GSUdpListeneron_error(int errorid) @@ -233,7 +326,14 @@ void GCListener::ForwardUpStreamMsg(f8::MsgHdr& hdr) memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen); } + #ifdef USE_KCP + auto session = GetKcpSessionBySocketHandle(hdr.socket_handle); + if (session) { + session->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen); + } + #else tcp_listener_->SendClientMsg(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen); + #endif free(buff); } @@ -287,3 +387,10 @@ void GCListener::ProcUdpPacket(a8::UdpPacket* pkt) } session->OnRecvPacket(pkt); } + +void GCListener::Update() +{ + for (auto& pair : kcp_session_addr_hash_) { + pair.second->Update(); + } +} diff --git a/server/wsproxy/GCListener.h b/server/wsproxy/GCListener.h index 7a3798f..f004263 100644 --- a/server/wsproxy/GCListener.h +++ b/server/wsproxy/GCListener.h @@ -21,6 +21,7 @@ class GCListener : public a8::Singleton public: void Init(); void UnInit(); + void Update(); template void SendMsg(unsigned short socket_handle, T& msg) diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc index 2a6ca60..165e5d3 100644 --- a/server/wsproxy/app.cc +++ b/server/wsproxy/app.cc @@ -252,6 +252,7 @@ void App::QuickExecute() f8::MsgQueue::Instance()->Update(); DispatchMsg(); DispatchUdpMsg(); + GCListener::Instance()->Update(); f8::Timer::Instance()->Update(); }