diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc index 0fe6f27..59b4faa 100644 --- a/server/wsproxy/GCListener.cc +++ b/server/wsproxy/GCListener.cc @@ -22,6 +22,9 @@ public: virtual void DecodeUserPacket(char* buf, int& offset, unsigned int buflen) override { + #if 1 + is_activite = true; + #endif //packagelen + msgid + magiccode + msgbody //2 + 2 + 4+ xx + \0 + xx bool warning = false; @@ -31,14 +34,15 @@ public: if (buflen - offset < sizeof(f8::PackHead) + p->packlen) { break; } - App::Instance()->AddSocketMsg(SF_Client, - socket_handle, - saddr, - p->msgid, - p->seqid, - &buf[offset + sizeof(f8::PackHead)], - p->packlen, - ST_Tcp); + f8::App::Instance()->AddSocketMsg + (SF_Client, + socket_handle, + saddr, + p->msgid, + p->seqid, + &buf[offset + sizeof(f8::PackHead)], + p->packlen, + ST_Tcp); offset += sizeof(f8::PackHead) + p->packlen; } else { warning = true; @@ -107,11 +111,6 @@ public: }; -static void CreateGameClientSocket(a8::TcpSession **p) -{ - *p = new GCClientSession(); -} - static void GSListeneron_error(a8::TcpListener*, int type, int errorid) { f8::UdpLog::Instance()->Debug("GCListeneron_error %d %d", {type, errorid}); @@ -120,7 +119,7 @@ static void GSListeneron_error(a8::TcpListener*, int type, int errorid) void GCListener::Init() { tcp_listener_ = new a8::TcpListener(); - tcp_listener_->on_create_client_socket = CreateGameClientSocket; + tcp_listener_->RegisterSessionClass(); tcp_listener_->on_error = GSListeneron_error; tcp_listener_->bind_address = "0.0.0.0"; diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc index 03e20bf..9fa99a8 100644 --- a/server/wsproxy/app.cc +++ b/server/wsproxy/app.cc @@ -164,42 +164,6 @@ void App::Update() SlowerExecute(); } -void App::AddSocketMsg(SocketFrom_e sockfrom, - int sockhandle, - long ip_saddr, - unsigned short msgid, - unsigned int seqid, - const char *msgbody, - int bodylen, - int tag) -{ - MsgNode *p = (MsgNode*)malloc(sizeof(MsgNode)); - memset(p, 0, sizeof(MsgNode)); - p->sockfrom = sockfrom; - p->ip_saddr = ip_saddr; - p->sockhandle = sockhandle; - p->msgid = msgid; - p->seqid = seqid; - p->buf = nullptr; - p->buflen = bodylen; - p->tag = tag; - if (bodylen > 0) { - p->buf = (char*)malloc(bodylen); - memmove(p->buf, msgbody, bodylen); - } - msg_mutex_->lock(); - if (bot_node_) { - bot_node_->next = p; - bot_node_ = p; - } else { - top_node_ = p; - bot_node_ = p; - } - ++msgnode_size_; - msg_mutex_->unlock(); - f8::App::Instance()->NotifyLoopCond(); -} - void App::QuickExecute() { f8::Timer::Instance()->Update(); @@ -248,6 +212,7 @@ bool App::HasTask() void App::DispatchMsg() { + #if 0 long long starttick = a8::XGetTickCount(); if (!work_node_ && top_node_) { msg_mutex_->lock(); @@ -299,20 +264,21 @@ void App::DispatchMsg() if (!work_node_) { working_msgnode_size_ = 0; } + #endif } -void App::ProcessClientMsg(f8::MsgHdr& hdr, int tag) +void App::ProcessClientMsg(f8::MsgHdr* hdr, int tag) { - if (hdr.msgid == ss::_SS_CMLogin || - hdr.msgid == ss::_SS_CMReconnect || - hdr.msgid == ss::_SS_CMKcpHandshake) { - auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle); + if (hdr->msgid == ss::_SS_CMLogin || + hdr->msgid == ss::_SS_CMReconnect || + hdr->msgid == ss::_SS_CMKcpHandshake) { + auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr->socket_handle); if (down_wp.expired()) { - switch (hdr.msgid) { + switch (hdr->msgid) { case ss::_SS_CMLogin: { ss::SS_CMLogin msg; - bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset); + bool ok = msg.ParseFromArray(hdr->buf + hdr->offset, hdr->buflen - hdr->offset); if (ok) { MasterMgr::Instance()->RequestTargetServer (hdr, @@ -328,7 +294,7 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr, int tag) case ss::_SS_CMReconnect: { ss::SS_CMReconnect msg; - bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset); + bool ok = msg.ParseFromArray(hdr->buf + hdr->offset, hdr->buflen - hdr->offset); if (ok) { MasterMgr::Instance()->RequestTargetServer (hdr, @@ -344,7 +310,7 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr, int tag) case ss::_SS_CMKcpHandshake: { ss::SS_CMKcpHandshake msg; - bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset); + bool ok = msg.ParseFromArray(hdr->buf + hdr->offset, hdr->buflen - hdr->offset); if (ok) { LongSessionMgr::Instance()->_SS_CMKcpHandshake(hdr, msg); } @@ -358,17 +324,17 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr, int tag) } } } else { - auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle); + auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr->socket_handle); if (auto down = down_wp.lock(); !down_wp.expired()) { down->ProcCMMsg(hdr, tag); } } } -void App::ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag) +void App::ProcessMasterServerMsg(f8::MsgHdr* hdr, int tag) { f8::NetMsgHandler* handler = f8::GetNetMsgHandler(&HandlerMgr::Instance()->msmsghandler, - hdr.msgid); + hdr->msgid); if (handler) { switch (handler->handlerid) { case HID_MasterMgr: @@ -378,17 +344,17 @@ void App::ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag) } } -void App::ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag) +void App::ProcessTargetServerMsg(f8::MsgHdr* hdr, int tag) { - if (hdr.msgid == ss::_SS_ForceCloseSocket) { - GCListener::Instance()->ForceCloseClient(hdr.socket_handle); + if (hdr->msgid == ss::_SS_ForceCloseSocket) { + GCListener::Instance()->ForceCloseClient(hdr->socket_handle); return; } - if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReconnect) { - DownStreamMgr::Instance()->BindUpStream(hdr.socket_handle, hdr.ip_saddr); - GCListener::Instance()->MarkClient(hdr.socket_handle, true); + if (hdr->msgid == ss::_SS_CMLogin || hdr->msgid == ss::_SS_CMReconnect) { + DownStreamMgr::Instance()->BindUpStream(hdr->socket_handle, hdr->ip_saddr); + GCListener::Instance()->MarkClient(hdr->socket_handle, true); } - auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle); + auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr->socket_handle); if (!down_wp.expired()) { down_wp.lock()->ForwardUpStreamMsg(hdr); } @@ -511,3 +477,25 @@ int App::GetGameId() const return GAME_ID; #endif } + +void App::DispatchSocketMsg(f8::MsgHdr* hdr) +{ + switch (hdr->sockfrom) { + case SF_Client: + { + ProcessClientMsg(hdr, hdr->tag); + } + break; + case SF_TargetServer: + { + ProcessTargetServerMsg(hdr, hdr->tag); + } + break; + case SF_MasterServer: + { + ProcessMasterServerMsg(hdr, hdr->tag); + } + break; + } + f8::App::Instance()->FreeSocketMsg(hdr); +} diff --git a/server/wsproxy/app.h b/server/wsproxy/app.h index 7bd3c9c..1db1d61 100644 --- a/server/wsproxy/app.h +++ b/server/wsproxy/app.h @@ -24,15 +24,7 @@ public: virtual void UnInit() override; virtual void Update() override; virtual bool HasTask() override; - - void AddSocketMsg(SocketFrom_e sockfrom, - int sockhandle, - long ip_saddr, - unsigned short msgid, - unsigned int seqid, - const char *msgbody, - int bodylen, - int tag = ST_Tcp); + virtual void DispatchSocketMsg(f8::MsgHdr* hdr) override; void AddUdpMsg(a8::UdpPacket* pkt); @@ -49,9 +41,9 @@ private: void DispatchMsg(); void DispatchUdpMsg(); - void ProcessClientMsg(f8::MsgHdr& hdr, int tag); - void ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag); - void ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag); + void ProcessClientMsg(f8::MsgHdr* hdr, int tag); + void ProcessMasterServerMsg(f8::MsgHdr* hdr, int tag); + void ProcessTargetServerMsg(f8::MsgHdr* hdr, int tag); void FreeSocketMsgQueue(); void FreeUdpMsgQueue(); diff --git a/server/wsproxy/downstream.cc b/server/wsproxy/downstream.cc index c00e1ff..0a4625b 100644 --- a/server/wsproxy/downstream.cc +++ b/server/wsproxy/downstream.cc @@ -23,21 +23,21 @@ void DownStream::ReBindUpStream(std::weak_ptr up) up_ = up; } -void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr) +void DownStream::ForwardUpStreamMsg(f8::MsgHdr* hdr) { - char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen); + char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr->buflen); f8::PackHead* head = (f8::PackHead*)buff; - head->packlen = hdr.buflen; - head->msgid = hdr.msgid; - head->seqid = hdr.seqid; + head->packlen = hdr->buflen; + head->msgid = hdr->msgid; + head->seqid = hdr->seqid; head->magic_code = f8::MAGIC_CODE; - head->ext_len = hdr.buflen >> 16; - if (hdr.buflen > 0) { - memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen); + head->ext_len = hdr->buflen >> 16; + if (hdr->buflen > 0) { + memmove(buff + sizeof(f8::PackHead), hdr->buf, hdr->buflen); } if (auto long_session = long_session_wp_.lock(); !long_session_wp_.expired()) { - if (hdr.msgid == ss::_SS_CMPing) { + if (hdr->msgid == ss::_SS_CMPing) { ss::SS_SMPing msg; msg.set_source(1); { @@ -46,10 +46,10 @@ void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr) buff = (char*)malloc(sizeof(f8::PackHead) + msg.ByteSize()); f8::PackHead* head = (f8::PackHead*)buff; head->packlen = msg.ByteSize(); - head->msgid = hdr.msgid; - head->seqid = hdr.seqid; + head->msgid = hdr->msgid; + head->seqid = hdr->seqid; head->magic_code = f8::MAGIC_CODE; - head->ext_len = hdr.buflen >> 16; + head->ext_len = hdr->buflen >> 16; msg.SerializeToArray(buff + sizeof(f8::PackHead), head->packlen); long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen); } @@ -57,7 +57,7 @@ void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr) long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen); } } else { - GCListener::Instance()->SendBuf(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen); + GCListener::Instance()->SendBuf(hdr->socket_handle, buff, sizeof(f8::PackHead) + head->packlen); } free(buff); } @@ -73,9 +73,9 @@ void DownStream::OnClose() } } -void DownStream::ProcCMMsg(f8::MsgHdr& hdr, int tag) +void DownStream::ProcCMMsg(f8::MsgHdr* hdr, int tag) { - if (hdr.msgid == ss::_SS_CMPing && IsLongSession() && tag == ST_Tcp) { + if (hdr->msgid == ss::_SS_CMPing && IsLongSession() && tag == ST_Tcp) { ss::SS_SMPing msg; GCListener::Instance()->SendMsgEx(socket_handle_, ss::_SS_CMPing, msg); if (!long_session_wp_.expired()) { diff --git a/server/wsproxy/downstream.h b/server/wsproxy/downstream.h index 1a017e7..85f0a56 100644 --- a/server/wsproxy/downstream.h +++ b/server/wsproxy/downstream.h @@ -12,9 +12,9 @@ class DownStream std::weak_ptr GetUpStream() const { return up_; } void ReBindUpStream(std::weak_ptr up); bool IsLongSession() { return is_long_session_; } - void ProcCMMsg(f8::MsgHdr& hdr, int tag); + void ProcCMMsg(f8::MsgHdr* hdr, int tag); - void ForwardUpStreamMsg(f8::MsgHdr& hdr); + void ForwardUpStreamMsg(f8::MsgHdr* hdr); void OnClose(); private: diff --git a/server/wsproxy/handlermgr.cc b/server/wsproxy/handlermgr.cc index 0cddadd..ef1ed16 100644 --- a/server/wsproxy/handlermgr.cc +++ b/server/wsproxy/handlermgr.cc @@ -16,34 +16,34 @@ static void _GMOpsSelfChecking(std::shared_ptr request) { - request->resp_xobj->SetVal("errcode", 0); - request->resp_xobj->SetVal("errmsg", ""); - request->resp_xobj->SetVal("healthy", 1); - request->resp_xobj->SetVal("max_rundelay", 10); + request->GetResp()->SetVal("errcode", 0); + request->GetResp()->SetVal("errmsg", ""); + request->GetResp()->SetVal("healthy", 1); + request->GetResp()->SetVal("max_rundelay", 10); } static void _GMOpsGetNodeId(std::shared_ptr request) { - request->resp_xobj->SetVal("errcode", 0); - request->resp_xobj->SetVal("errmsg", ""); - request->resp_xobj->SetVal("node_id", f8::App::Instance()->GetNodeId()); + request->GetResp()->SetVal("errcode", 0); + request->GetResp()->SetVal("errmsg", ""); + request->GetResp()->SetVal("node_id", f8::App::Instance()->GetNodeId()); } static void _GMOpsSetKcpSwitch(std::shared_ptr request) { - request->resp_xobj->SetVal("errcode", 0); - request->resp_xobj->SetVal("errmsg", ""); - if (request->params->HasKey("open")) { - JsonDataMgr::Instance()->SetKcpSwitch(request->params->At("open")->AsXValue().GetInt()); + request->GetResp()->SetVal("errcode", 0); + request->GetResp()->SetVal("errmsg", ""); + if (request->GetParams()->HasKey("open")) { + JsonDataMgr::Instance()->SetKcpSwitch(request->GetParams()->At("open")->AsXValue().GetInt()); } - request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open); + request->GetResp()->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open); } static void _GMOpsGetKcpSwitch(std::shared_ptr request) { - request->resp_xobj->SetVal("errcode", 0); - request->resp_xobj->SetVal("errmsg", ""); - request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open); + request->GetResp()->SetVal("errcode", 0); + request->GetResp()->SetVal("errmsg", ""); + request->GetResp()->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open); } void HandlerMgr::Init() @@ -89,18 +89,17 @@ void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle, std::string msgname = a8::Get(request, "c").GetString() + "$" + a8::Get(request, "a").GetString(); auto itr = gmhandlers_.find(msgname); if (itr != gmhandlers_.end()) { - auto request = std::make_shared(); - request->saddr = saddr; - request->socket_handle = sockhandle; - request->query_str = querystr; - request->params->ReadFromUrlQueryString(querystr); + auto request = std::make_shared + ( + saddr, + url, + querystr, + [sockhandle] (const a8::Args& args) + { + std::string data = args.Get(0); + GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(data)); + }); itr->second(request); - - if (!request->pending){ - std::string response; - request->resp_xobj->ToJsonStr(response); - GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(response)); - } } else { GCListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}")); } diff --git a/server/wsproxy/kcpsession.cc b/server/wsproxy/kcpsession.cc index 0c0c050..449e7d1 100644 --- a/server/wsproxy/kcpsession.cc +++ b/server/wsproxy/kcpsession.cc @@ -131,15 +131,16 @@ void KcpSession::DecodeUserPacketOld(char* buf, int& offset, unsigned int buflen if (buflen - offset < sizeof(f8::PackHead) + p->packlen + GetSecretKeyLen()) { break; } - App::Instance()->AddSocketMsg(SF_Client, - socket_handle_, - 0, - //saddr, - p->msgid, - p->seqid, - &buf[offset + sizeof(f8::PackHead) + GetSecretKeyLen()], - p->packlen, - ST_Udp); + f8::App::Instance()->AddSocketMsg + (SF_Client, + socket_handle_, + 0, + //saddr, + p->msgid, + p->seqid, + &buf[offset + sizeof(f8::PackHead) + GetSecretKeyLen()], + p->packlen, + ST_Udp); offset += sizeof(f8::PackHead) + p->packlen + GetSecretKeyLen(); } else { warning = true; @@ -162,15 +163,16 @@ void KcpSession::DecodeUserPacketNew(char* buf, int& offset, unsigned int buflen if (buflen - offset < sizeof(f8::PackHead) + p->packlen) { break; } - App::Instance()->AddSocketMsg(SF_Client, - socket_handle_, - 0, - //saddr, - p->msgid, - p->seqid, - &buf[offset + sizeof(f8::PackHead)], - p->packlen, - ST_Udp); + f8::App::Instance()->AddSocketMsg + (SF_Client, + socket_handle_, + 0, + //saddr, + p->msgid, + p->seqid, + &buf[offset + sizeof(f8::PackHead)], + p->packlen, + ST_Udp); offset += sizeof(f8::PackHead) + p->packlen; } else { warning = true; diff --git a/server/wsproxy/longsession.cc b/server/wsproxy/longsession.cc index f41b158..461c78d 100644 --- a/server/wsproxy/longsession.cc +++ b/server/wsproxy/longsession.cc @@ -8,10 +8,10 @@ #include "ss_msgid.pb.h" #include "ss_proto.pb.h" -void LongSession::Init(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg) +void LongSession::Init(f8::MsgHdr* hdr, const ss::SS_CMKcpHandshake& msg) { kcp_session_ = std::make_shared(); - kcp_session_->Init(hdr.socket_handle, msg.secret_key_place()); + kcp_session_->Init(hdr->socket_handle, msg.secret_key_place()); } void LongSession::UnInit() diff --git a/server/wsproxy/longsession.h b/server/wsproxy/longsession.h index 41ec1d0..a2a56ff 100644 --- a/server/wsproxy/longsession.h +++ b/server/wsproxy/longsession.h @@ -11,7 +11,7 @@ class LongSession public: - void Init(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg); + void Init(f8::MsgHdr* hdr, const ss::SS_CMKcpHandshake& msg); void UnInit(); void Update(long long tick); diff --git a/server/wsproxy/longsessionmgr.cc b/server/wsproxy/longsessionmgr.cc index 9720d99..133ed95 100644 --- a/server/wsproxy/longsessionmgr.cc +++ b/server/wsproxy/longsessionmgr.cc @@ -53,17 +53,17 @@ void LongSessionMgr::Update() } } -void LongSessionMgr::_SS_CMKcpHandshake(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg) +void LongSessionMgr::_SS_CMKcpHandshake(f8::MsgHdr* hdr, const ss::SS_CMKcpHandshake& msg) { ss::SS_SMKcpHandshake respmsg; respmsg.set_errcode(0); if (!JsonDataMgr::Instance()->GetKcpConf().open) { respmsg.set_errcode(1); respmsg.set_errmsg("not support kcp"); - GCListener::Instance()->SendMsgEx(hdr.socket_handle, ss::_SS_CMKcpHandshake, respmsg); + GCListener::Instance()->SendMsgEx(hdr->socket_handle, ss::_SS_CMKcpHandshake, respmsg); return; } - if (GetSession(hdr.socket_handle)) { + if (GetSession(hdr->socket_handle)) { #ifdef DEBUG abort(); #endif @@ -77,9 +77,9 @@ void LongSessionMgr::_SS_CMKcpHandshake(f8::MsgHdr& hdr, const ss::SS_CMKcpHands respmsg.set_secret_key(session->GetKcpSession()->GetSecretKeyDataPtr(), KcpSession::GetSecretKeyLen()); respmsg.set_remote_host(JsonDataMgr::Instance()->GetUdpHost()); respmsg.set_remote_port(JsonDataMgr::Instance()->GetUdpPort()); - GCListener::Instance()->SendMsgEx(hdr.socket_handle, ss::_SS_CMKcpHandshake, respmsg); + GCListener::Instance()->SendMsgEx(hdr->socket_handle, ss::_SS_CMKcpHandshake, respmsg); { - int socket_handle = hdr.socket_handle; + int socket_handle = hdr->socket_handle; f8::Timer::Instance()->SetTimeout ( 1000 * 30, diff --git a/server/wsproxy/longsessionmgr.h b/server/wsproxy/longsessionmgr.h index 4595195..bf20b43 100644 --- a/server/wsproxy/longsessionmgr.h +++ b/server/wsproxy/longsessionmgr.h @@ -26,7 +26,7 @@ class LongSessionMgr : public a8::Singleton void UnInit(); void Update(); - void _SS_CMKcpHandshake(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg); + void _SS_CMKcpHandshake(f8::MsgHdr* hdr, const ss::SS_CMKcpHandshake& msg); void ProcUdpPacket(a8::UdpPacket* pkt); std::shared_ptr GetSession(int socket_handle); std::shared_ptr GetUdpListener() { return udp_listener_; } diff --git a/server/wsproxy/master.cc b/server/wsproxy/master.cc index cbc7de9..66d444b 100644 --- a/server/wsproxy/master.cc +++ b/server/wsproxy/master.cc @@ -114,13 +114,15 @@ void Master::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) { break; } - App::Instance()->AddSocketMsg(SF_MasterServer, - 0, - instance_id, - p->msgid, - p->seqid, - &recv_buff_[offset + sizeof(f8::PackHead)], - p->packlen); + f8::App::Instance()->AddSocketMsg + (SF_MasterServer, + 0, + instance_id, + p->msgid, + p->seqid, + &recv_buff_[offset + sizeof(f8::PackHead)], + p->packlen, + 0); offset += sizeof(f8::PackHead) + p->packlen; } else { warning = true; diff --git a/server/wsproxy/mastermgr.cc b/server/wsproxy/mastermgr.cc index ee2a3bd..d280846 100644 --- a/server/wsproxy/mastermgr.cc +++ b/server/wsproxy/mastermgr.cc @@ -54,7 +54,7 @@ void MasterMgr::UnInit() mastersvr_hash_.clear(); } -void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg) +void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr* hdr, const ss::SS_MS_ResponseTargetServer& msg) { auto req = GetRequestByContextId(msg.context_id()); if (req) { @@ -91,7 +91,7 @@ std::shared_ptr MasterMgr::GetConnById(int instance_id) return itr != mastersvr_hash_.end() ? itr->second : nullptr; } -void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr, +void MasterMgr::RequestTargetServer(f8::MsgHdr* hdr, const std::string& team_id, const std::string& account_id, const std::string& session_id, @@ -99,7 +99,7 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr, int is_reconnect, int proto_version) { - if (GetRequestBySocket(hdr.socket_handle)) { + if (GetRequestBySocket(hdr->socket_handle)) { return; } unsigned int code = 0; @@ -122,10 +122,10 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr, ++curr_context_id_; auto req = std::make_shared(); req->context_id = curr_context_id_; - req->socket_handle = hdr.socket_handle; + req->socket_handle = hdr->socket_handle; req->account_id = account_id; req->req_tick = a8::XGetTickCount(); - req->hdr_copy = hdr.Clone(); + req->hdr_copy = hdr->Clone(); ss::SS_WSP_RequestTargetServer msg; msg.set_context_id(curr_context_id_); @@ -139,14 +139,14 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr, std::string url; std::string query_str; { - GCListener::Instance()->GetWebSocketUrl(hdr.socket_handle, url, query_str); + GCListener::Instance()->GetWebSocketUrl(hdr->socket_handle, url, query_str); } msg.set_url(url); msg.set_query_str(query_str); #endif svr->SendMsg(msg); - pending_socket_hash_[hdr.socket_handle] = req; + pending_socket_hash_[hdr->socket_handle] = req; assert(pending_context_hash_.find(curr_context_id_) == pending_context_hash_.end()); pending_context_hash_[curr_context_id_] = req; req->timer_wp = f8::Timer::Instance()->SetTimeoutWp diff --git a/server/wsproxy/mastermgr.h b/server/wsproxy/mastermgr.h index 69047cb..562351c 100644 --- a/server/wsproxy/mastermgr.h +++ b/server/wsproxy/mastermgr.h @@ -26,8 +26,8 @@ class MasterMgr : public a8::Singleton void Init(); void UnInit(); - void _SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg); - void RequestTargetServer(f8::MsgHdr& hdr, + void _SS_MS_ResponseTargetServer(f8::MsgHdr* hdr, const ss::SS_MS_ResponseTargetServer& msg); + void RequestTargetServer(f8::MsgHdr* hdr, const std::string& team_id, const std::string& account_id, const std::string& session_id, diff --git a/server/wsproxy/upstream.cc b/server/wsproxy/upstream.cc index 1532c2f..8e9ac9f 100644 --- a/server/wsproxy/upstream.cc +++ b/server/wsproxy/upstream.cc @@ -99,7 +99,7 @@ void UpStream::SendStockMsg() delete pdelnode->msg; } if (pdelnode->hdr) { - ForwardClientMsg(*pdelnode->hdr); + ForwardClientMsg(pdelnode->hdr); f8::MsgHdr::Destroy(pdelnode->hdr); pdelnode->hdr = nullptr; } @@ -107,23 +107,23 @@ void UpStream::SendStockMsg() } } -void UpStream::ForwardClientMsg(f8::MsgHdr& hdr) +void UpStream::ForwardClientMsg(f8::MsgHdr* hdr) { - char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); + char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr->buflen); memset(buff, 0, sizeof(f8::WSProxyPackHead_C)); f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff; - head->packlen = hdr.buflen; - head->msgid = hdr.msgid; - head->seqid = hdr.seqid; + head->packlen = hdr->buflen; + head->msgid = hdr->msgid; + head->seqid = hdr->seqid; head->magic_code = f8::MAGIC_CODE; #if 0 head->rpc_error_code = 0; #endif - head->socket_handle = hdr.socket_handle; - head->ip_saddr = hdr.ip_saddr; + head->socket_handle = hdr->socket_handle; + head->ip_saddr = hdr->ip_saddr; - if (hdr.buflen > 0) { - memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen); + if (hdr->buflen > 0) { + memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr->buf, hdr->buflen); } tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen); @@ -136,7 +136,7 @@ void UpStream::ForwardClientMsgEx(f8::MsgHdr* hdr) if (top_node_) { SendStockMsg(); } - ForwardClientMsg(*hdr); + ForwardClientMsg(hdr); if (hdr->buf) { free((char*)hdr->buf); } @@ -221,13 +221,15 @@ void UpStream::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) { break; } - App::Instance()->AddSocketMsg(SF_TargetServer, - p->socket_handle, - instance_id, - p->msgid, - p->seqid, - &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], - real_len); + f8::App::Instance()->AddSocketMsg + (SF_TargetServer, + p->socket_handle, + instance_id, + p->msgid, + p->seqid, + &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], + real_len, + 0); offset += sizeof(f8::WSProxyPackHead_S) + real_len; } else { warning = true; diff --git a/server/wsproxy/upstream.h b/server/wsproxy/upstream.h index e0b37c3..7d612fc 100644 --- a/server/wsproxy/upstream.h +++ b/server/wsproxy/upstream.h @@ -50,7 +50,7 @@ class UpStream } void SendStockMsg(); - void ForwardClientMsg(f8::MsgHdr& hdr); + void ForwardClientMsg(f8::MsgHdr* hdr); void ForwardClientMsgEx(f8::MsgHdr* hdr); private: diff --git a/third_party/f8 b/third_party/f8 index 1c141d8..e6570ff 160000 --- a/third_party/f8 +++ b/third_party/f8 @@ -1 +1 @@ -Subproject commit 1c141d8817c21820eb0e79aad3279245a8a72866 +Subproject commit e6570ff2d38b71d20328d875d02237a3a908512d