1
This commit is contained in:
parent
5cc155cd2b
commit
9606fc7a1c
@ -23,6 +23,7 @@ struct HttpTunnelRequest
|
||||
{
|
||||
int socket_handle = 0;
|
||||
long long context_id = 0;
|
||||
f8::TimerWp timer_wp;
|
||||
std::shared_ptr<f8::JsonHttpRequest> request;
|
||||
};
|
||||
|
||||
@ -106,7 +107,43 @@ void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr* hdr, const ss::SS_MS_Res
|
||||
|
||||
void MasterMgr::_SS_MS_HttpTunnelResponse(f8::MsgHdr* hdr, const ss::SS_MS_HttpTunnelResponse& msg)
|
||||
{
|
||||
|
||||
auto req = GetRequestByContextId(msg.context_id());
|
||||
if (req) {
|
||||
#ifdef DEBUG
|
||||
a8::XPrintf("error_code:%d error_msg:%s host:%s port:%d\n",
|
||||
{
|
||||
msg.error_code(),
|
||||
msg.error_msg(),
|
||||
msg.host(),
|
||||
msg.port()
|
||||
});
|
||||
#endif
|
||||
if (msg.error_code() == 0) {
|
||||
std::weak_ptr<UpStream> conn = UpStreamMgr::Instance()->RecreateUpStream
|
||||
(
|
||||
msg.host(),
|
||||
msg.port()
|
||||
);
|
||||
if (!conn.expired()) {
|
||||
conn.lock()->ForwardClientMsgEx(req->hdr_copy);
|
||||
f8::MsgHdr::Destroy(req->hdr_copy);
|
||||
req->conn = conn;
|
||||
req->hdr_copy = nullptr;
|
||||
if (!req->timer_wp.expired()) {
|
||||
f8::Timer::Instance()->FireEvent(req->timer_wp,
|
||||
ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT,
|
||||
nullptr);
|
||||
f8::Timer::Instance()->Delete(req->timer_wp);
|
||||
}
|
||||
RemoveRequest(req->socket_handle);
|
||||
return;
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
} else {
|
||||
RemoveRequest(req->socket_handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Master> MasterMgr::GetConnById(int instance_id)
|
||||
@ -268,20 +305,36 @@ void MasterMgr::AddHttpTunnelRequest(int socket_handle, std::shared_ptr<f8::Json
|
||||
|
||||
pending_http_tunnel_socket_hash_[socket_handle] = req;
|
||||
pending_http_tunnel_context_hash_[curr_context_id_] = req;
|
||||
req->timer_wp = f8::Timer::Instance()->SetTimeoutWp
|
||||
(1000 * 10,
|
||||
[req] (int event, const a8::Args* args)
|
||||
{
|
||||
if (a8::TIMER_EXEC_EVENT == event) {
|
||||
MasterMgr::Instance()->RemoveHttpTunnelRequest(req->socket_handle);
|
||||
} else if (ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT == event) {
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
void MasterMgr::RemoveHttpTunnelRequest(int socket_handle)
|
||||
{
|
||||
auto req = GetHttpTunnelRequest(socket_handle);
|
||||
auto req = GetHttpTunnelRequestBySocket(socket_handle);
|
||||
if (req) {
|
||||
pending_http_tunnel_socket_hash_.erase(req->socket_handle);
|
||||
pending_http_tunnel_context_hash_.erase(req->context_id);
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<HttpTunnelRequest> MasterMgr::GetHttpTunnelRequest(int socket_handle)
|
||||
std::shared_ptr<HttpTunnelRequest> MasterMgr::GetHttpTunnelRequestBySocket(int socket_handle)
|
||||
{
|
||||
auto itr = pending_http_tunnel_socket_hash_.find(socket_handle);
|
||||
return itr != pending_http_tunnel_socket_hash_.end() ? itr->second : nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<HttpTunnelRequest> MasterMgr::GetHttpTunnelRequestByContextId(long long context_id)
|
||||
{
|
||||
auto itr = pending_http_tunnel_context_hash_.find(context_id);
|
||||
return itr != pending_http_tunnel_context_hash_.end() ? itr->second : nullptr;
|
||||
}
|
||||
|
@ -41,7 +41,8 @@ class MasterMgr : public a8::Singleton<MasterMgr>
|
||||
void AddHttpTunnelRequest(int socket_handle, std::shared_ptr<f8::JsonHttpRequest> request);
|
||||
|
||||
private:
|
||||
std::shared_ptr<HttpTunnelRequest> GetHttpTunnelRequest(int socket_handle);
|
||||
std::shared_ptr<HttpTunnelRequest> GetHttpTunnelRequestBySocket(int socket_handle);
|
||||
std::shared_ptr<HttpTunnelRequest> GetHttpTunnelRequestByContextId(long long conext_id);
|
||||
std::shared_ptr<RequestTarget> GetRequestBySocket(int socket_handle);
|
||||
std::shared_ptr<RequestTarget> GetRequestByContextId(long long context_id);
|
||||
std::shared_ptr<Master> GetConnById(int instance_id);
|
||||
|
Loading…
x
Reference in New Issue
Block a user