#include "precompile.h"

#include <cstring>
#include <memory>

#include "downstream.h"
#include "upstream.h"
#include "longsessionmgr.h"
#include "GCListener.h"
#include "longsession.h"
#include "kcpsession.h"

#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"

namespace
{

    // Allocates a zero-initialised buffer large enough for one f8::PackHead
    // followed by `payload_len` payload bytes and fills in the header.
    //
    // msgid and seqid are copied from `hdr`; packlen and ext_len are both
    // derived from `payload_len`, so the header always describes the payload
    // that is actually placed behind it. The payload bytes themselves are
    // left for the caller to write.
    //
    // Zero-initialising means any PackHead field not assigned here is sent as
    // zero instead of as indeterminate heap contents.
    std::unique_ptr<char[]> NewPacket(const f8::MsgHdr* hdr, std::size_t payload_len)
    {
        auto packet = std::make_unique<char[]>(sizeof(f8::PackHead) + payload_len);
        auto* head = reinterpret_cast<f8::PackHead*>(packet.get());
        // Explicit casts keep the same (possibly narrowing) conversions the
        // plain assignments performed before, without conversion warnings.
        head->packlen = static_cast<decltype(head->packlen)>(payload_len);
        head->msgid = hdr->msgid;
        head->seqid = hdr->seqid;
        head->magic_code = f8::MAGIC_CODE;
        head->ext_len = static_cast<decltype(head->ext_len)>(payload_len >> 16);
        return packet;
    }

}

// Binds this downstream to a client socket and its upstream, and looks up
// whether a long session is registered for that socket.
//
// NOTE(review): the weak_ptr element type `UpStream` is inferred from
// upstream.h / GetUpStream(); confirm it matches the declaration in
// downstream.h.
void DownStream::Init(int socket_handle, std::weak_ptr<UpStream> up)
{
    socket_handle_ = socket_handle;
    up_ = up;
    long_session_wp_ = LongSessionMgr::Instance()->GetSession(socket_handle_);
    // Snapshot taken once at Init time; it is not refreshed if the session
    // expires later.
    is_long_session_ = !long_session_wp_.expired();
}

// Replaces the upstream this downstream forwards to.
void DownStream::ReBindUpStream(std::weak_ptr<UpStream> up)
{
    up_ = up;
}

// Wraps a message received from the upstream in an f8::PackHead and sends it
// to the client.
//
// - If a long session is alive, the packet goes through its KCP session. In
//   that case a message whose id is ss::_SS_CMPing is not forwarded verbatim:
//   it is replaced by a freshly serialised ss::SS_SMPing (source = 1) that
//   reuses the incoming msgid and seqid.
// - Otherwise the packet is sent through GCListener to hdr->socket_handle.
void DownStream::ForwardUpStreamMsg(f8::MsgHdr* hdr)
{
    // Lock once and test the resulting shared_ptr, so the session cannot go
    // away between the check and its use.
    const auto long_session = long_session_wp_.lock();

    if (long_session && hdr->msgid == ss::_SS_CMPing) {
        ss::SS_SMPing msg;
        msg.set_source(1);
        const int body_size = msg.ByteSize();
        // Build the reply directly: the upstream payload is never sent on
        // this path, so it is not copied, and packlen/ext_len describe the
        // serialised reply rather than the discarded payload.
        const auto packet = NewPacket(hdr, static_cast<std::size_t>(body_size));
        msg.SerializeToArray(packet.get() + sizeof(f8::PackHead), body_size);
        long_session->GetKcpSession()->SendClientMsg
            (packet.get(), sizeof(f8::PackHead) + static_cast<std::size_t>(body_size));
        return;
    }

    // A non-positive buflen is treated as "no payload".
    const std::size_t payload_len =
        hdr->buflen > 0 ? static_cast<std::size_t>(hdr->buflen) : 0;
    const auto packet = NewPacket(hdr, payload_len);
    if (payload_len > 0) {
        std::memcpy(packet.get() + sizeof(f8::PackHead), hdr->buf, payload_len);
    }
    // The byte count is computed from the real payload size instead of being
    // read back from head->packlen, so it stays correct even if packlen is
    // narrower than the payload length.
    // NOTE(review): the separate ext_len field (buflen >> 16) suggests packlen
    // only holds the low bits - confirm against f8::PackHead.
    const std::size_t packet_len = sizeof(f8::PackHead) + payload_len;

    if (long_session) {
        long_session->GetKcpSession()->SendClientMsg(packet.get(), packet_len);
    } else {
        GCListener::Instance()->SendBuf(hdr->socket_handle, packet.get(), packet_len);
    }
}

// Called when the client connection closes: sends SS_WSP_SocketDisconnect to
// the upstream (if it is still alive) and removes the long session registered
// for this socket (if one is still alive).
void DownStream::OnClose()
{
    // lock() once instead of expired() followed by lock(): the upstream is
    // either held for the duration of the call or skipped, never
    // dereferenced as null.
    if (const auto up = GetUpStream().lock()) {
        ss::SS_WSP_SocketDisconnect msg;
        up->SendMsg(socket_handle_, msg);
    }
    if (!long_session_wp_.expired()) {
        LongSessionMgr::Instance()->DelSession(socket_handle_);
    }
}

// Handles a message received from the client.
//
// A ss::_SS_CMPing arriving with tag ST_Tcp on a long-session downstream is
// answered locally with an empty ss::SS_SMPing through GCListener and
// refreshes the long session's ping; it is not forwarded. Every other message
// is forwarded to the upstream when that is still alive, and dropped
// otherwise.
void DownStream::ProcCMMsg(f8::MsgHdr* hdr)
{
    if (hdr->msgid == ss::_SS_CMPing && IsLongSession() && hdr->tag == ST_Tcp) {
        ss::SS_SMPing msg;
        GCListener::Instance()->SendMsgEx(socket_handle_, ss::_SS_CMPing, msg);
        if (const auto long_session = long_session_wp_.lock()) {
            long_session->UpdatePing();
        }
        return;
    }
    if (const auto up = GetUpStream().lock()) {
        up->ForwardClientMsg(hdr);
    }
}