wsproxy/server/wsproxy/downstream.cc
2023-04-27 06:41:08 +00:00

90 lines
2.7 KiB
C++

#include "precompile.h"

#include <cstdlib>
#include <cstring>
#include <memory>

#include "downstream.h"
#include "upstream.h"
#include "longsessionmgr.h"
#include "GCListener.h"
#include "longsession.h"
#include "kcpsession.h"
#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"
// Binds this downstream to a client socket and its upstream peer, then looks
// up the long session registered for that socket handle and caches whether
// one currently exists.
void DownStream::Init(int socket_handle, std::weak_ptr<UpStream> up)
{
    up_ = up;
    socket_handle_ = socket_handle;

    auto&& session_mgr = LongSessionMgr::Instance();
    long_session_wp_ = session_mgr->GetSession(socket_handle_);

    // The flag is a snapshot taken at init time; it is not refreshed here if
    // the session goes away later.
    const bool session_alive = !long_session_wp_.expired();
    is_long_session_ = session_alive;
}
// Replaces the upstream this downstream forwards to.
void DownStream::ReBindUpStream(std::weak_ptr<UpStream> up)
{
    // `up` is a by-value copy, so swapping installs the new reference and
    // lets the previous one be released when the parameter goes out of scope.
    up_.swap(up);
}
void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen);
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->ext_len = hdr.buflen >> 16;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen);
}
if (auto long_session = long_session_wp_.lock(); !long_session_wp_.expired()) {
if (hdr.msgid == ss::_SS_CMPing) {
ss::SS_SMPing msg;
msg.set_source(1);
{
free(buff);
buff = (char*)malloc(sizeof(f8::PackHead) + msg.ByteSize());
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = msg.ByteSize();
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->ext_len = hdr.buflen >> 16;
msg.SerializeToArray(buff + sizeof(f8::PackHead), head->packlen);
long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
}
} else {
long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
}
} else {
GCListener::Instance()->SendBuf(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
}
free(buff);
}
// Called when the client connection closes.
//
// Sends an ss::SS_WSP_SocketDisconnect for this socket handle to the upstream
// if it is still alive, and removes the long session registered under this
// socket handle if one still exists.
void DownStream::OnClose()
{
    // Lock once and test the result: a separate expired() check followed by
    // lock() could dereference null if the upstream is released in between.
    if (auto up = GetUpStream().lock()) {
        ss::SS_WSP_SocketDisconnect msg;
        up->SendMsg(socket_handle_, msg);
    }
    if (!long_session_wp_.expired()) {
        LongSessionMgr::Instance()->DelSession(socket_handle_);
    }
}
// Handles a message coming from the client.
//
// A ping (ss::_SS_CMPing) that arrives with tag ST_Tcp on a long-session
// connection is answered locally: an empty ss::SS_SMPing is sent back through
// GCListener under the same message id, the long session's ping state is
// updated, and nothing is forwarded. Every other message is forwarded to the
// upstream if it is still alive, and silently dropped otherwise.
void DownStream::ProcCMMsg(f8::MsgHdr& hdr, int tag)
{
    if (hdr.msgid == ss::_SS_CMPing && IsLongSession() && tag == ST_Tcp) {
        ss::SS_SMPing msg;
        GCListener::Instance()->SendMsgEx(socket_handle_, ss::_SS_CMPing, msg);
        // Lock once and use the result, so the session cannot disappear
        // between the liveness check and the call.
        if (auto long_session = long_session_wp_.lock()) {
            long_session->UpdatePing();
        }
        return;
    }
    // Same pattern for the upstream: a single lock() replaces expired() + lock().
    if (auto up = GetUpStream().lock()) {
        up->ForwardClientMsg(hdr);
    }
}