wsproxy/server/wsproxy/upstreammgr.cc
2023-04-14 08:41:25 +00:00

68 lines
1.7 KiB
C++

#include "precompile.h"
#include <f8/msgqueue.h>
#include "upstreammgr.h"
#include "upstream.h"
#include "jsondatamgr.h"
#include "app.h"
void UpStreamMgr::Init()
{
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_UpStreamConnect,
[this] (const a8::Args& args)
{
int instance_id = args.Get<int>(0);
std::weak_ptr<UpStream> conn = GetConnById(instance_id);
if (!conn.expired() && conn.lock()->Connected()) {
conn.lock()->SendStockMsg();
}
});
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_UpStreamDisconnect,
[this] (const a8::Args& args)
{
});
}
void UpStreamMgr::UnInit()
{
for (auto& pair : id_hash_) {
pair.second->UnInit();
}
}
std::weak_ptr<UpStream> UpStreamMgr::GetConnByKey(const std::string& key)
{
auto itr = key_hash_.find(key);
return itr != key_hash_.end() ? itr->second : nullptr;
}
std::weak_ptr<UpStream> UpStreamMgr::GetConnById(int instance_id)
{
auto itr = id_hash_.find(instance_id);
return itr != id_hash_.end() ? itr->second : nullptr;
}
std::weak_ptr<UpStream> UpStreamMgr::RecreateUpStream(const std::string& host, int port)
{
std::string key = host + ":" + a8::XValue(port).GetString();
if (!GetConnByKey(key).expired()) {
return GetConnByKey(key);
}
while (GetConnById(++curr_id_).expired()) {};
int instance_id = curr_id_;
std::string remote_ip = host;
int remote_port = port;
std::shared_ptr<UpStream> conn = std::make_shared<UpStream>();
conn->Init(instance_id, remote_ip, remote_port);
id_hash_[conn->instance_id] = conn;
key_hash_[key] = conn;
conn->Open();
return conn;
}