wsproxy/server/wsproxy/longsessionmgr.cc
2023-06-09 06:57:46 +00:00

183 lines
6.1 KiB
C++

#include "precompile.h"
#include <a8/udplistener.h>
#include <f8/netmsghandler.h>
#include <f8/udplog.h>
#include "longsessionmgr.h"
#include "app.h"
#include "jsondatamgr.h"
#include "longsession.h"
#include "kcpsession.h"
#include "GCListener.h"
#include "downstreammgr.h"
#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"
static void GSUdpListeneron_error(int errorid)
{
f8::UdpLog::Instance()->Debug("GCUdpListeneron_error %d", {errorid});
}
static void GSUdpListeneron_recv_packet(a8::UdpPacket* pkt)
{
App::Instance()->AddUdpMsg(pkt);
}
void LongSessionMgr::Init()
{
udp_listener_ = std::make_shared<a8::UdpListener>();
udp_listener_->on_error = GSUdpListeneron_error;
udp_listener_->on_recv_packet = GSUdpListeneron_recv_packet;
udp_listener_->bind_address = "0.0.0.0";
udp_listener_->bind_port = JsonDataMgr::Instance()->GetUdpPort();
udp_listener_->Open();
}
void LongSessionMgr::UnInit()
{
for (auto& pair : socket_handle_hash_) {
pair.second->UnInit();
}
socket_handle_hash_.clear();
}
void LongSessionMgr::Update()
{
long long tick = a8::XGetTickCount();
for (auto& pair : socket_handle_hash_) {
pair.second->Update(tick);
}
}
void LongSessionMgr::_SS_CMKcpHandshake(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg)
{
ss::SS_SMKcpHandshake respmsg;
respmsg.set_errcode(0);
if (!JsonDataMgr::Instance()->GetKcpConf().open) {
respmsg.set_errcode(1);
respmsg.set_errmsg("not support kcp");
GCListener::Instance()->SendMsgEx(hdr.socket_handle, ss::_SS_CMKcpHandshake, respmsg);
return;
}
if (GetSession(hdr.socket_handle)) {
#ifdef DEBUG
abort();
#endif
return;
}
auto session = std::make_shared<LongSession>();
session->Init(hdr, msg);
socket_handle_hash_[session->GetKcpSession()->GetSocketHandle()] = session;
respmsg.set_conv(session->GetKcpSession()->GetSocketHandle());
respmsg.set_secret_key(session->GetKcpSession()->GetSecretKeyDataPtr(), KcpSession::GetSecretKeyLen());
respmsg.set_remote_host(JsonDataMgr::Instance()->GetUdpHost());
respmsg.set_remote_port(JsonDataMgr::Instance()->GetUdpPort());
GCListener::Instance()->SendMsgEx(hdr.socket_handle, ss::_SS_CMKcpHandshake, respmsg);
{
int socket_handle = hdr.socket_handle;
f8::Timer::Instance()->SetTimeout
(
1000 * 30,
[this, socket_handle] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
if (DownStreamMgr::Instance()->GetDownStream(socket_handle).expired()) {
GCListener::Instance()->ForceCloseClient(socket_handle);
}
}
}
);
}
}
std::shared_ptr<LongSession> LongSessionMgr::GetSession(int socket_handle)
{
auto itr = socket_handle_hash_.find(socket_handle);
return itr != socket_handle_hash_.end() ? itr->second : nullptr;
}
void LongSessionMgr::ProcUdpPacket(a8::UdpPacket* pkt)
{
#if 0
f8::UdpLog::Instance()->Debug("ProcUdpPacket host:%s port:%d buflen:%d",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
#endif
const int IKCP_OVERHEAD = 24;
if (pkt->buf_len < IKCP_OVERHEAD) {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d over_head_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
return;
}
int socket_handle = ikcp_getconv(pkt->buf);
auto session = GetSession(socket_handle);
if (!session) {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s socket_handle:%d session_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
socket_handle
});
return;
}
if (session->GetSecretKeyPlace()) {
if (pkt->buf_len >= IKCP_OVERHEAD + KcpSession::GetSecretKeyLen()) {
long long secret_key = KcpSession::ReadSecretKey(pkt->buf + IKCP_OVERHEAD, pkt->buf_len);
if (secret_key == session->GetKcpSession()->GetSecretKey()) {
session->GetKcpSession()->OnRecvPacket(pkt);
} else {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d socket_handle%d secret_key:%d secret_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
socket_handle,
secret_key
});
}
} else {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d bufflen_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
}
} else {
session->GetKcpSession()->OnRecvPacket(pkt);
}
}
void LongSessionMgr::DelSession(int socket_handle)
{
{
auto session = GetSession(socket_handle);
if (session) {
if (session.use_count() != 2) {
#ifdef DEBUG
abort();
#endif
}
session->UnInit();
}
}
socket_handle_hash_.erase(socket_handle);
}
int LongSessionMgr::GetLongSessionCount()
{
return socket_handle_hash_.size();
}