1
This commit is contained in:
parent
393bdfff49
commit
d5f5b10714
@ -18,6 +18,10 @@
|
||||
#include "handlermgr.h"
|
||||
#include "ikcp.h"
|
||||
|
||||
#define USE_KCP 1
|
||||
|
||||
static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64;
|
||||
|
||||
class GCClientSession: public a8::WebSocketSession
|
||||
{
|
||||
public:
|
||||
@ -152,10 +156,16 @@ public:
|
||||
kcp_->rx_minrto = 10;
|
||||
kcp_->fastresend = 1;
|
||||
kcp_->output = KcpSession::UdpOutput;
|
||||
recv_buff_ = (char *)malloc(max_packet_len_ + 1);
|
||||
init_tick_ = a8::XGetTickCount();
|
||||
}
|
||||
|
||||
void UnInit()
|
||||
{
|
||||
if (recv_buff_) {
|
||||
free(recv_buff_);
|
||||
recv_buff_ = nullptr;
|
||||
}
|
||||
if (kcp_) {
|
||||
ikcp_release(kcp_);
|
||||
kcp_ = nullptr;
|
||||
@ -164,25 +174,108 @@ public:
|
||||
|
||||
void Update()
|
||||
{
|
||||
|
||||
long long tick = a8::XGetTickCount();
|
||||
ikcp_update(kcp_, tick - init_tick_);
|
||||
UpdateInput();
|
||||
}
|
||||
|
||||
void SendMsg(unsigned short msgid, const ::google::protobuf::Message& msg)
|
||||
void SendClientMsg(char* buf, int buf_len)
|
||||
{
|
||||
|
||||
ikcp_send(kcp_, buf, buf_len);
|
||||
}
|
||||
|
||||
void OnRecvPacket(a8::UdpPacket* pkt)
|
||||
{
|
||||
|
||||
ikcp_input(kcp_, pkt->buf, pkt->buf_len);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
void UpdateInput()
|
||||
{
|
||||
char buf[DEFAULT_MAX_RECV_BUFFERSIZE];
|
||||
int buflen = ikcp_recv(kcp_, buf, DEFAULT_MAX_RECV_BUFFERSIZE - 10);
|
||||
if (buflen <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
unsigned int already_read_bytes = 0;
|
||||
do {
|
||||
if (already_read_bytes < buflen) {
|
||||
int read_bytes = std::min(buflen - already_read_bytes,
|
||||
(unsigned int)max_packet_len_ - recv_bufflen_);
|
||||
if (read_bytes > 0) {
|
||||
memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes);
|
||||
recv_bufflen_ += read_bytes;
|
||||
already_read_bytes += read_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
int offset = 0;
|
||||
int prev_offset = 0;
|
||||
do {
|
||||
prev_offset = offset;
|
||||
DecodePacket(recv_buff_, offset, recv_bufflen_);
|
||||
} while (prev_offset < offset && offset < recv_bufflen_);
|
||||
|
||||
if (offset > 0 && offset < recv_bufflen_){
|
||||
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
|
||||
}
|
||||
recv_bufflen_ -= offset;
|
||||
if (recv_bufflen_ >= max_packet_len_) {
|
||||
//收到超长包
|
||||
#if 1
|
||||
abort();
|
||||
#else
|
||||
//Close();
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
} while (already_read_bytes < buflen);
|
||||
}
|
||||
|
||||
void DecodePacket(char* buf, int& offset, unsigned int buflen)
|
||||
{
|
||||
//packagelen + msgid + magiccode + msgbody
|
||||
//2 + 2 + 4+ xx + \0 + xx
|
||||
bool warning = false;
|
||||
while (buflen - offset >= sizeof(f8::PackHead)) {
|
||||
f8::PackHead* p = (f8::PackHead*)&buf[offset];
|
||||
if (p->magic_code == f8::MAGIC_CODE) {
|
||||
if (buflen - offset < sizeof(f8::PackHead) + p->packlen) {
|
||||
break;
|
||||
}
|
||||
App::Instance()->AddSocketMsg(SF_Client,
|
||||
socket_handle_,
|
||||
0,
|
||||
//saddr,
|
||||
p->msgid,
|
||||
p->seqid,
|
||||
&buf[offset + sizeof(f8::PackHead)],
|
||||
p->packlen);
|
||||
offset += sizeof(f8::PackHead) + p->packlen;
|
||||
} else {
|
||||
warning = true;
|
||||
offset++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (warning) {
|
||||
f8::UdpLog::Instance()->Warning("收到client非法数据包", {});
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
long long init_tick_ = 0;
|
||||
a8::UdpListener* udp_listener_ = nullptr;
|
||||
ikcpcb* kcp_ = nullptr;
|
||||
int socket_handle_ = 0;
|
||||
long long remote_key_ = 0;
|
||||
sockaddr_in remote_addr_ = {};
|
||||
char* recv_buff_ = nullptr;
|
||||
int recv_bufflen_ = 0;
|
||||
int max_packet_len_ = 1024 * 64 *2;
|
||||
};
|
||||
|
||||
static void GSUdpListeneron_error(int errorid)
|
||||
@ -233,7 +326,14 @@ void GCListener::ForwardUpStreamMsg(f8::MsgHdr& hdr)
|
||||
memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen);
|
||||
}
|
||||
|
||||
#ifdef USE_KCP
|
||||
auto session = GetKcpSessionBySocketHandle(hdr.socket_handle);
|
||||
if (session) {
|
||||
session->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
|
||||
}
|
||||
#else
|
||||
tcp_listener_->SendClientMsg(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
|
||||
#endif
|
||||
free(buff);
|
||||
}
|
||||
|
||||
@ -287,3 +387,10 @@ void GCListener::ProcUdpPacket(a8::UdpPacket* pkt)
|
||||
}
|
||||
session->OnRecvPacket(pkt);
|
||||
}
|
||||
|
||||
void GCListener::Update()
|
||||
{
|
||||
for (auto& pair : kcp_session_addr_hash_) {
|
||||
pair.second->Update();
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ class GCListener : public a8::Singleton<GCListener>
|
||||
public:
|
||||
void Init();
|
||||
void UnInit();
|
||||
void Update();
|
||||
|
||||
template <typename T>
|
||||
void SendMsg(unsigned short socket_handle, T& msg)
|
||||
|
@ -252,6 +252,7 @@ void App::QuickExecute()
|
||||
f8::MsgQueue::Instance()->Update();
|
||||
DispatchMsg();
|
||||
DispatchUdpMsg();
|
||||
GCListener::Instance()->Update();
|
||||
f8::Timer::Instance()->Update();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user