This commit is contained in:
aozhiwei 2023-04-24 11:07:22 +08:00
parent 54336693e4
commit 5ed753636c
4 changed files with 14 additions and 11 deletions

View File

@ -420,8 +420,8 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr)
} else { } else {
auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle); auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) { if (auto down = down_wp.lock(); !down_wp.expired()) {
if (!down->conn.expired()) { if (!down->GetUpStream().expired()) {
down->conn.lock()->ForwardClientMsg(hdr); down->GetUpStream().lock()->ForwardClientMsg(hdr);
} }
} }
} }

View File

@ -12,6 +12,8 @@ void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr)
void DownStream::OnClose() void DownStream::OnClose()
{ {
if (!GetUpStream().expired()) {
ss::SS_WSP_SocketDisconnect msg; ss::SS_WSP_SocketDisconnect msg;
conn.lock()->SendMsg(socket_handle, msg); GetUpStream().lock()->SendMsg(socket_handle, msg);
}
} }

View File

@ -5,11 +5,13 @@ class DownStream
{ {
public: public:
int socket_handle = a8::INVALID_SOCKET_HANDLE; int socket_handle = a8::INVALID_SOCKET_HANDLE;
std::weak_ptr<UpStream> conn;
void SetUpStream(std::weak_ptr<UpStream> up) { up_ = up; }
std::weak_ptr<UpStream> GetUpStream() { return up_; }
void ForwardUpStreamMsg(f8::MsgHdr& hdr); void ForwardUpStreamMsg(f8::MsgHdr& hdr);
void OnClose(); void OnClose();
private: private:
int type_ = 0; int type_ = 0;
std::weak_ptr<UpStream> up_;
}; };

View File

@ -43,9 +43,7 @@ void DownStreamMgr::OnClientDisconnect(int socket_handle)
{ {
auto down_wp = GetDownStream(socket_handle); auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) { if (auto down = down_wp.lock(); !down_wp.expired()) {
if (!down->conn.expired()) {
down->OnClose(); down->OnClose();
}
socket_hash_.erase(socket_handle); socket_hash_.erase(socket_handle);
} }
RemovePendingAccount(socket_handle); RemovePendingAccount(socket_handle);
@ -56,7 +54,8 @@ void DownStreamMgr::OnUpStreamDisconnect(int instance_id)
{ {
std::list<std::shared_ptr<DownStream>> delete_client; std::list<std::shared_ptr<DownStream>> delete_client;
for (auto& pair : socket_hash_) { for (auto& pair : socket_hash_) {
if (!pair.second->conn.expired() && pair.second->conn.lock()->instance_id == instance_id) { if (!pair.second->GetUpStream().expired() &&
pair.second->GetUpStream().lock()->instance_id == instance_id) {
delete_client.push_back(pair.second); delete_client.push_back(pair.second);
} }
} }
@ -84,11 +83,11 @@ void DownStreamMgr::BindUpStream(int socket_handle, int conn_instance_id)
if (!conn.expired()) { if (!conn.expired()) {
auto down_wp = GetDownStream(socket_handle); auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) { if (auto down = down_wp.lock(); !down_wp.expired()) {
down->conn = conn; down->SetUpStream(conn);
} else { } else {
down = std::make_shared<DownStream>(); down = std::make_shared<DownStream>();
down->socket_handle = socket_handle; down->socket_handle = socket_handle;
down->conn = conn; down->SetUpStream(conn);
socket_hash_[down->socket_handle] = down; socket_hash_[down->socket_handle] = down;
f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d", f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d",
{ {