diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc index 35043eb..103c771 100644 --- a/server/wsproxy/GCListener.cc +++ b/server/wsproxy/GCListener.cc @@ -17,11 +17,10 @@ #include "ss_proto.pb.h" #include "handlermgr.h" #include "ikcp.h" +#include "kcpsession.h" #define USE_KCP 1 -static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; - class GCClientSession: public a8::WebSocketSession { public: @@ -129,156 +128,6 @@ static void GSListeneron_error(a8::TcpListener*, int type, int errorid) f8::UdpLog::Instance()->Debug("GCListeneron_error %d %d", {type, errorid}); } -class KcpSession -{ -public: - - static int UdpOutput(const char *buf, int len, ikcpcb *kcp, void *user) - { - KcpSession* session = (KcpSession*)user; - a8::UdpPacket* pkt = new a8::UdpPacket(); - pkt->buf = buf; - pkt->buf_len = len; - pkt->remote_addr = session->remote_addr_; - session->udp_listener_->SendUdpPacket(pkt); - return 0; - } - - void Init(a8::UdpListener* udp_listener, int socket_handle, a8::UdpPacket* pkt) - { - udp_listener_ = udp_listener; - remote_key_ = pkt->GetRemoteKey(); - remote_addr_ = pkt->remote_addr; - socket_handle_ = socket_handle; - kcp_ = ikcp_create(socket_handle_, (void*)this); - ikcp_wndsize(kcp_, 128, 128); - ikcp_nodelay(kcp_, 2, 10, 2, 1); - kcp_->rx_minrto = 10; - kcp_->fastresend = 1; - kcp_->output = KcpSession::UdpOutput; - recv_buff_ = (char *)malloc(max_packet_len_ + 1); - init_tick_ = a8::XGetTickCount(); - } - - void UnInit() - { - if (recv_buff_) { - free(recv_buff_); - recv_buff_ = nullptr; - } - if (kcp_) { - ikcp_release(kcp_); - kcp_ = nullptr; - } - } - - void Update() - { - long long tick = a8::XGetTickCount(); - ikcp_update(kcp_, tick - init_tick_); - UpdateInput(); - } - - void SendClientMsg(char* buf, int buf_len) - { - ikcp_send(kcp_, buf, buf_len); - } - - void OnRecvPacket(a8::UdpPacket* pkt) - { - ikcp_input(kcp_, pkt->buf, pkt->buf_len); - } - -private: - - void UpdateInput() - { - char buf[DEFAULT_MAX_RECV_BUFFERSIZE]; - int buflen = ikcp_recv(kcp_, buf, DEFAULT_MAX_RECV_BUFFERSIZE - 10); - if (buflen <= 0) { - return; - } - - unsigned int already_read_bytes = 0; - do { - if (already_read_bytes < buflen) { - int read_bytes = std::min(buflen - already_read_bytes, - (unsigned int)max_packet_len_ - recv_bufflen_); - if (read_bytes > 0) { - memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes); - recv_bufflen_ += read_bytes; - already_read_bytes += read_bytes; - } - } - - int offset = 0; - int prev_offset = 0; - do { - prev_offset = offset; - DecodePacket(recv_buff_, offset, recv_bufflen_); - } while (prev_offset < offset && offset < recv_bufflen_); - - if (offset > 0 && offset < recv_bufflen_){ - memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); - } - recv_bufflen_ -= offset; - if (recv_bufflen_ >= max_packet_len_) { - //收到超长包 - #if 1 - abort(); - #else - //Close(); - #endif - return; - } - } while (already_read_bytes < buflen); - } - - void DecodePacket(char* buf, int& offset, unsigned int buflen) - { - //packagelen + msgid + magiccode + msgbody - //2 + 2 + 4+ xx + \0 + xx - bool warning = false; - while (buflen - offset >= sizeof(f8::PackHead)) { - f8::PackHead* p = (f8::PackHead*)&buf[offset]; - if (p->magic_code == f8::MAGIC_CODE) { - if (buflen - offset < sizeof(f8::PackHead) + p->packlen) { - break; - } - //a8::XPrintf("Recv MsgId:%d\n", {p->msgid}); - App::Instance()->AddSocketMsg(SF_Client, - socket_handle_, - 0, - //saddr, - p->msgid, - p->seqid, - &buf[offset + sizeof(f8::PackHead)], - p->packlen); - offset += sizeof(f8::PackHead) + p->packlen; - } else { - warning = true; - offset++; - continue; - } - } - - if (warning) { - f8::UdpLog::Instance()->Warning("收到client非法数据包", {}); - } - } - -private: - long long init_tick_ = 0; - a8::UdpListener* udp_listener_ = nullptr; - ikcpcb* kcp_ = nullptr; - int socket_handle_ = 0; - long long remote_key_ = 0; - sockaddr_in remote_addr_ = {}; - char* recv_buff_ = nullptr; - int recv_bufflen_ = 0; - int max_packet_len_ = 1024 * 64 *2; -}; - static void GSUdpListeneron_error(int errorid) { f8::UdpLog::Instance()->Debug("GCUdpListeneron_error %d", {errorid}); diff --git a/server/wsproxy/kcpsession.cc b/server/wsproxy/kcpsession.cc new file mode 100644 index 0000000..0438bea --- /dev/null +++ b/server/wsproxy/kcpsession.cc @@ -0,0 +1,144 @@ +#include "precompile.h" + +#include + +#include +#include + +#include "kcpsession.h" +#include "app.h" + +static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; + +static int UdpOutput(const char *buf, int len, ikcpcb *kcp, void *user) +{ + KcpSession* session = (KcpSession*)user; + a8::UdpPacket* pkt = new a8::UdpPacket(); + pkt->buf = buf; + pkt->buf_len = len; + pkt->remote_addr = session->GetAddr(); + session->GetUdpListener()->SendUdpPacket(pkt); + return 0; +} + +void KcpSession::Init(a8::UdpListener* udp_listener, int socket_handle, a8::UdpPacket* pkt) +{ + udp_listener_ = udp_listener; + remote_key_ = pkt->GetRemoteKey(); + remote_addr_ = pkt->remote_addr; + socket_handle_ = socket_handle; + kcp_ = ikcp_create(socket_handle_, (void*)this); + ikcp_wndsize(kcp_, 128, 128); + ikcp_nodelay(kcp_, 2, 10, 2, 1); + kcp_->rx_minrto = 10; + kcp_->fastresend = 1; + kcp_->output = UdpOutput; + recv_buff_ = (char *)malloc(max_packet_len_ + 1); + init_tick_ = a8::XGetTickCount(); +} + +void KcpSession::UnInit() +{ + if (recv_buff_) { + free(recv_buff_); + recv_buff_ = nullptr; + } + if (kcp_) { + ikcp_release(kcp_); + kcp_ = nullptr; + } +} + +void KcpSession::Update() +{ + long long tick = a8::XGetTickCount(); + ikcp_update(kcp_, tick - init_tick_); + UpdateInput(); +} + +void KcpSession::SendClientMsg(char* buf, int buf_len) +{ + ikcp_send(kcp_, buf, buf_len); +} + +void KcpSession::OnRecvPacket(a8::UdpPacket* pkt) +{ + ikcp_input(kcp_, pkt->buf, pkt->buf_len); +} + + +void KcpSession::UpdateInput() +{ + char buf[DEFAULT_MAX_RECV_BUFFERSIZE]; + int buflen = ikcp_recv(kcp_, buf, DEFAULT_MAX_RECV_BUFFERSIZE - 10); + if (buflen <= 0) { + return; + } + + unsigned int already_read_bytes = 0; + do { + if (already_read_bytes < buflen) { + int read_bytes = std::min(buflen - already_read_bytes, + (unsigned int)max_packet_len_ - recv_bufflen_); + if (read_bytes > 0) { + memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes); + recv_bufflen_ += read_bytes; + already_read_bytes += read_bytes; + } + } + + int offset = 0; + int prev_offset = 0; + do { + prev_offset = offset; + DecodePacket(recv_buff_, offset, recv_bufflen_); + } while (prev_offset < offset && offset < recv_bufflen_); + + if (offset > 0 && offset < recv_bufflen_){ + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + if (recv_bufflen_ >= max_packet_len_) { + //收到超长包 +#if 1 + abort(); +#else + //Close(); +#endif + return; + } + } while (already_read_bytes < buflen); +} + +void KcpSession::DecodePacket(char* buf, int& offset, unsigned int buflen) +{ + //packagelen + msgid + magiccode + msgbody + //2 + 2 + 4+ xx + \0 + xx + bool warning = false; + while (buflen - offset >= sizeof(f8::PackHead)) { + f8::PackHead* p = (f8::PackHead*)&buf[offset]; + if (p->magic_code == f8::MAGIC_CODE) { + if (buflen - offset < sizeof(f8::PackHead) + p->packlen) { + break; + } + //a8::XPrintf("Recv MsgId:%d\n", {p->msgid}); + App::Instance()->AddSocketMsg(SF_Client, + socket_handle_, + 0, + //saddr, + p->msgid, + p->seqid, + &buf[offset + sizeof(f8::PackHead)], + p->packlen); + offset += sizeof(f8::PackHead) + p->packlen; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + f8::UdpLog::Instance()->Warning("收到client非法数据包", {}); + } +} diff --git a/server/wsproxy/kcpsession.h b/server/wsproxy/kcpsession.h new file mode 100644 index 0000000..5723bc1 --- /dev/null +++ b/server/wsproxy/kcpsession.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include + +#include "ikcp.h" + +class KcpSession +{ +public: + + void Init(a8::UdpListener* udp_listener, int socket_handle, a8::UdpPacket* pkt); + void UnInit(); + void Update(); + void SendClientMsg(char* buf, int buf_len); + void OnRecvPacket(a8::UdpPacket* pkt); + const sockaddr_in& GetAddr() const { return remote_addr_; } + a8::UdpListener* GetUdpListener() { return udp_listener_; } + void UpdateInput(); + + private: + void DecodePacket(char* buf, int& offset, unsigned int buflen); + +private: + long long init_tick_ = 0; + a8::UdpListener* udp_listener_ = nullptr; + ikcpcb* kcp_ = nullptr; + int socket_handle_ = 0; + long long remote_key_ = 0; + sockaddr_in remote_addr_ = {}; + char* recv_buff_ = nullptr; + int recv_bufflen_ = 0; + int max_packet_len_ = 1024 * 64 *2; +}; diff --git a/server/wsproxy/longsession.cc b/server/wsproxy/longsession.cc index 8e9fc06..e2fc1d4 100644 --- a/server/wsproxy/longsession.cc +++ b/server/wsproxy/longsession.cc @@ -1,3 +1,13 @@ #include "precompile.h" #include "longsession.h" + +void LongSession::Init() +{ + +} + +void LongSession::UnInit() +{ + +} diff --git a/server/wsproxy/longsession.h b/server/wsproxy/longsession.h index 16bbefa..fe86693 100644 --- a/server/wsproxy/longsession.h +++ b/server/wsproxy/longsession.h @@ -1,9 +1,18 @@ #pragma once +class KcpSession; class LongSession { + public: + + void Init(); + void UnInit(); + int GetSocketHandle() const { return socket_handle_; } + long long GetSecretKey() const { return secret_key_; } + private: int socket_handle_ = a8::INVALID_SOCKET_HANDLE; - + long long secret_key_ = 0; + std::shared_ptr kcp_session_; }; diff --git a/server/wsproxy/longsessionmgr.h b/server/wsproxy/longsessionmgr.h index f099339..8f850ea 100644 --- a/server/wsproxy/longsessionmgr.h +++ b/server/wsproxy/longsessionmgr.h @@ -14,9 +14,9 @@ class LongSessionMgr : public a8::Singleton void Init(); void UnInit(); - std::shared_ptr GetSessionBySocketHandle(int socket_handle); + std::shared_ptr GetSession(int socket_handle); private: - std::map> session_hash_; + std::map> socket_handle_hash_; };