From 9606fc7a1c660a1f8f777e2dd3fa04ae9ab31e51 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sun, 24 Mar 2024 14:17:16 +0800 Subject: [PATCH] 1 --- server/wsproxy/mastermgr.cc | 59 +++++++++++++++++++++++++++++++++++-- server/wsproxy/mastermgr.h | 3 +- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/server/wsproxy/mastermgr.cc b/server/wsproxy/mastermgr.cc index ffb7ed2..6ff7c05 100644 --- a/server/wsproxy/mastermgr.cc +++ b/server/wsproxy/mastermgr.cc @@ -23,6 +23,7 @@ struct HttpTunnelRequest { int socket_handle = 0; long long context_id = 0; + f8::TimerWp timer_wp; std::shared_ptr request; }; @@ -106,7 +107,43 @@ void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr* hdr, const ss::SS_MS_Res void MasterMgr::_SS_MS_HttpTunnelResponse(f8::MsgHdr* hdr, const ss::SS_MS_HttpTunnelResponse& msg) { - + auto req = GetRequestByContextId(msg.context_id()); + if (req) { +#ifdef DEBUG + a8::XPrintf("error_code:%d error_msg:%s host:%s port:%d\n", + { + msg.error_code(), + msg.error_msg(), + msg.host(), + msg.port() + }); +#endif + if (msg.error_code() == 0) { + std::weak_ptr conn = UpStreamMgr::Instance()->RecreateUpStream + ( + msg.host(), + msg.port() + ); + if (!conn.expired()) { + conn.lock()->ForwardClientMsgEx(req->hdr_copy); + f8::MsgHdr::Destroy(req->hdr_copy); + req->conn = conn; + req->hdr_copy = nullptr; + if (!req->timer_wp.expired()) { + f8::Timer::Instance()->FireEvent(req->timer_wp, + ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT, + nullptr); + f8::Timer::Instance()->Delete(req->timer_wp); + } + RemoveRequest(req->socket_handle); + return; + } else { + abort(); + } + } else { + RemoveRequest(req->socket_handle); + } + } } std::shared_ptr MasterMgr::GetConnById(int instance_id) @@ -268,20 +305,36 @@ void MasterMgr::AddHttpTunnelRequest(int socket_handle, std::shared_ptrtimer_wp = f8::Timer::Instance()->SetTimeoutWp + (1000 * 10, + [req] (int event, const a8::Args* args) + { + if (a8::TIMER_EXEC_EVENT == event) { + MasterMgr::Instance()->RemoveHttpTunnelRequest(req->socket_handle); + } else if (ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT == event) { + } + } + ); } } void MasterMgr::RemoveHttpTunnelRequest(int socket_handle) { - auto req = GetHttpTunnelRequest(socket_handle); + auto req = GetHttpTunnelRequestBySocket(socket_handle); if (req) { pending_http_tunnel_socket_hash_.erase(req->socket_handle); pending_http_tunnel_context_hash_.erase(req->context_id); } } -std::shared_ptr MasterMgr::GetHttpTunnelRequest(int socket_handle) +std::shared_ptr MasterMgr::GetHttpTunnelRequestBySocket(int socket_handle) { auto itr = pending_http_tunnel_socket_hash_.find(socket_handle); return itr != pending_http_tunnel_socket_hash_.end() ? itr->second : nullptr; } + +std::shared_ptr MasterMgr::GetHttpTunnelRequestByContextId(long long context_id) +{ + auto itr = pending_http_tunnel_context_hash_.find(context_id); + return itr != pending_http_tunnel_context_hash_.end() ? itr->second : nullptr; +} diff --git a/server/wsproxy/mastermgr.h b/server/wsproxy/mastermgr.h index 4a61653..567aeda 100644 --- a/server/wsproxy/mastermgr.h +++ b/server/wsproxy/mastermgr.h @@ -41,7 +41,8 @@ class MasterMgr : public a8::Singleton void AddHttpTunnelRequest(int socket_handle, std::shared_ptr request); private: - std::shared_ptr GetHttpTunnelRequest(int socket_handle); + std::shared_ptr GetHttpTunnelRequestBySocket(int socket_handle); + std::shared_ptr GetHttpTunnelRequestByContextId(long long conext_id); std::shared_ptr GetRequestBySocket(int socket_handle); std::shared_ptr GetRequestByContextId(long long context_id); std::shared_ptr GetConnById(int instance_id);