wsproxy/server/wsproxy/downstreammgr.cc
2023-04-26 06:58:30 +00:00

160 lines
5.1 KiB
C++

#include "precompile.h"
#include <f8/udplog.h>
#include <f8/msgqueue.h>
#include "downstreammgr.h"
#include "ss_proto.pb.h"
#include "downstream.h"
#include "upstream.h"
#include "upstreammgr.h"
#include "GCListener.h"
#include "app.h"
#include "mastermgr.h"
struct PendingAccount
{
int socket_handle = 0;
std::string account_id;
long long add_tick = 0;
f8::TimerWp timer_wp;
};
void DownStreamMgr::Init()
{
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_ClientSocketDisconnect,
[this] (const a8::Args& args)
{
int socket_handle = args.Get<int>(0);
OnClientDisconnect(socket_handle);
});
}
void DownStreamMgr::UnInit()
{
socket_hash_.clear();
pending_account_hash_.clear();
}
void DownStreamMgr::OnClientDisconnect(int socket_handle)
{
auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) {
down->OnClose();
socket_hash_.erase(socket_handle);
}
RemovePendingAccount(socket_handle);
MasterMgr::Instance()->RemoveRequest(socket_handle);
}
void DownStreamMgr::OnUpStreamDisconnect(int instance_id)
{
std::list<std::shared_ptr<DownStream>> delete_client;
for (auto& pair : socket_hash_) {
if (!pair.second->GetUpStream().expired() &&
pair.second->GetUpStream().lock()->instance_id == instance_id) {
delete_client.push_back(pair.second);
}
}
for (auto& client : delete_client) {
RemovePendingAccount(client->GetSocketHandle());
GCListener::Instance()->ForceCloseClient(client->GetSocketHandle());
socket_hash_.erase(client->GetSocketHandle());
}
}
void DownStreamMgr::OnUpStreamConnect(int instance_id)
{
}
std::weak_ptr<DownStream> DownStreamMgr::GetDownStream(int sockhandle)
{
auto itr = socket_hash_.find(sockhandle);
return itr != socket_hash_.end() ? itr->second : nullptr;
}
void DownStreamMgr::BindUpStream(int socket_handle, int instance_id)
{
std::weak_ptr<UpStream> up_wp = UpStreamMgr::Instance()->GetUpStreamById(instance_id);
if (!up_wp.expired()) {
auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) {
down->ReBindUpStream(up_wp);
} else {
down = std::make_shared<DownStream>();
down->Init(socket_handle, up_wp);
socket_hash_[down->GetSocketHandle()] = down;
f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d",
{
socket_handle
});
{
if (auto pending_account = GetPendingAccount(socket_handle)) {
long long cur_tick = a8::XGetTickCount();
if (cur_tick - pending_account->add_tick > App::Instance()->GetPerf().max_join_time) {
App::Instance()->GetPerf().max_join_time = cur_tick - pending_account->add_tick;
}
f8::UdpLog::Instance()->Info("BindUpStream account_id:%s",
{
pending_account->account_id
});
RemovePendingAccount(socket_handle);
}
}
}
}
}
void DownStreamMgr::AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick)
{
f8::UdpLog::Instance()->Info("AddPendingAccount %s %d", {account_id, socket_handle});
if (!GetPendingAccount(socket_handle)){
auto timer_wp = f8::Timer::Instance()->SetTimeoutWpEx
(
1000 * 10,
[this, socket_handle] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
pending_account_hash_.erase(socket_handle);
App::Instance()->GetPerf().max_join_time =
std::max((long long)1000 * 10, App::Instance()->GetPerf().max_join_time);
}
},
&timer_attacher_
);
auto p = std::make_shared<PendingAccount>();
p->socket_handle = socket_handle;
p->account_id = account_id;
p->add_tick = req_tick;
p->timer_wp = timer_wp;
pending_account_hash_[socket_handle] = p;
}
}
std::shared_ptr<PendingAccount> DownStreamMgr::GetPendingAccount(int socket_handle)
{
auto itr = pending_account_hash_.find(socket_handle);
return itr != pending_account_hash_.end() ? itr->second : nullptr;
}
void DownStreamMgr::RemovePendingAccount(int socket_handle)
{
auto itr = pending_account_hash_.find(socket_handle);
if (itr != pending_account_hash_.end()) {
f8::UdpLog::Instance()->Info("RemovePendingAccount %d", {socket_handle});
if (!itr->second->timer_wp.expired()) {
f8::Timer::Instance()->Delete(itr->second->timer_wp);
}
pending_account_hash_.erase(itr);
}
}
int DownStreamMgr::GetDownStreamCount()
{
return socket_hash_.size();
}