This commit is contained in:
aozhiwei 2023-04-24 11:15:00 +08:00
parent 39e1c883b6
commit 0c1f08b877
4 changed files with 18 additions and 15 deletions

View File

@ -14,6 +14,6 @@ void DownStream::OnClose()
{
if (!GetUpStream().expired()) {
ss::SS_WSP_SocketDisconnect msg;
GetUpStream().lock()->SendMsg(socket_handle, msg);
GetUpStream().lock()->SendMsg(socket_handle_, msg);
}
}

View File

@ -4,14 +4,17 @@ class UpStream;
class DownStream
{
public:
int socket_handle = a8::INVALID_SOCKET_HANDLE;
void SetUpStream(std::weak_ptr<UpStream> up) { up_ = up; }
int GetSocketHandle() { return socket_handle_; }
void SetSocketHandle(int socket_handle) { socket_handle_ = socket_handle; }
std::weak_ptr<UpStream> GetUpStream() { return up_; }
void SetUpStream(std::weak_ptr<UpStream> up) { up_ = up; }
void ForwardUpStreamMsg(f8::MsgHdr& hdr);
void OnClose();
private:
int socket_handle_ = a8::INVALID_SOCKET_HANDLE;
int type_ = 0;
std::weak_ptr<UpStream> up_;
};

View File

@ -60,9 +60,9 @@ void DownStreamMgr::OnUpStreamDisconnect(int instance_id)
}
}
for (auto& client : delete_client) {
RemovePendingAccount(client->socket_handle);
GCListener::Instance()->ForceCloseClient(client->socket_handle);
socket_hash_.erase(client->socket_handle);
RemovePendingAccount(client->GetSocketHandle());
GCListener::Instance()->ForceCloseClient(client->GetSocketHandle());
socket_hash_.erase(client->GetSocketHandle());
}
}
@ -79,16 +79,16 @@ std::weak_ptr<DownStream> DownStreamMgr::GetDownStream(int sockhandle)
void DownStreamMgr::BindUpStream(int socket_handle, int conn_instance_id)
{
std::weak_ptr<UpStream> conn = UpStreamMgr::Instance()->GetUpStreamById(conn_instance_id);
if (!conn.expired()) {
std::weak_ptr<UpStream> up_wp = UpStreamMgr::Instance()->GetUpStreamById(conn_instance_id);
if (!up_wp.expired()) {
auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) {
down->SetUpStream(conn);
down->SetUpStream(up_wp);
} else {
down = std::make_shared<DownStream>();
down->socket_handle = socket_handle;
down->SetUpStream(conn);
socket_hash_[down->socket_handle] = down;
down->SetSocketHandle(socket_handle);
down->SetUpStream(up_wp);
socket_hash_[down->GetSocketHandle()] = down;
f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d",
{
socket_handle

View File

@ -15,9 +15,9 @@ void UpStreamMgr::Init()
[this] (const a8::Args& args)
{
int instance_id = args.Get<int>(0);
std::weak_ptr<UpStream> conn = GetUpStreamById(instance_id);
if (!conn.expired() && conn.lock()->Connected()) {
conn.lock()->SendStockMsg();
std::weak_ptr<UpStream> up_wp = GetUpStreamById(instance_id);
if (!up_wp.expired() && up_wp.lock()->Connected()) {
up_wp.lock()->SendStockMsg();
}
});
f8::MsgQueue::Instance()->RegisterCallBack