wsproxy/server/wsproxy/mastermgr.cc
2023-04-14 11:46:55 +00:00

190 lines
6.6 KiB
C++

#include "precompile.h"
#include <unistd.h>
#include <a8/openssl.h>
#include <f8/timer.h>
#include <f8/netmsghandler.h>
#include <f8/protoutils.h>
#include "mastermgr.h"
#include "master.h"
#include "jsondatamgr.h"
#include "ss_proto.pb.h"
#include "upstream.h"
#include "upstreammgr.h"
#include "app.h"
#include "downstreammgr.h"
class RequestTarget
{
public:
long long context_id = 0;
int socket_handle = 0;
std::string account_id;
f8::MsgHdr* hdr_copy = nullptr;
f8::TimerWp timer_wp;
long long req_tick = 0;
std::weak_ptr<UpStream> conn;
};
void MasterMgr::Init()
{
curr_context_id_ = a8::MakeInt64(0, time(nullptr) + 1000 * 60 * 10);
auto master_svr_cluster_conf = JsonDataMgr::Instance()->GetMasterServerClusterConf();
for (int i = 0; i < master_svr_cluster_conf->Size(); ++i) {
auto master_svr_conf = master_svr_cluster_conf->At(i);
int instance_id = master_svr_conf->At("instance_id")->AsXValue();
std::string remote_ip = master_svr_conf->At("ip")->AsXValue();
int remote_port = master_svr_conf->At("port")->AsXValue();
{
auto conn = std::make_shared<Master>();
conn->Init(instance_id, remote_ip, remote_port);
mastersvr_hash_[conn->instance_id] = conn;
conn->Open();
}
}
}
void MasterMgr::UnInit()
{
}
void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg)
{
auto req = GetRequestByContextId(msg.context_id());
if (req) {
if (msg.error_code() == 0) {
std::weak_ptr<UpStream> conn = UpStreamMgr::Instance()->RecreateUpStream
(
msg.host(),
msg.port()
);
if (!conn.expired()) {
conn.lock()->ForwardClientMsgEx(req->hdr_copy);
req->conn = conn;
req->hdr_copy = nullptr;
if (!req->timer_wp.expired()) {
f8::Timer::Instance()->FireEvent(req->timer_wp,
ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT,
nullptr);
f8::Timer::Instance()->Delete(req->timer_wp);
}
RemoveRequest(req->socket_handle);
return;
} else {
abort();
}
} else {
RemoveRequest(req->socket_handle);
}
}
}
std::shared_ptr<Master> MasterMgr::GetConnById(int instance_id)
{
auto itr = mastersvr_hash_.find(instance_id);
return itr != mastersvr_hash_.end() ? itr->second : nullptr;
}
void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
const std::string& team_id,
const std::string& account_id,
const std::string& server_info,
int is_reconnect,
int proto_version)
{
if (GetRequestBySocket(hdr.socket_handle)) {
return;
}
unsigned int code = 0;
std::string team_uuid = team_id;
if (!team_id.empty()) {
code = a8::openssl::Crc32((unsigned char*)team_id.data(), team_id.size());
} else {
std::string data = a8::Format("!%s_%s_%d_%d",
{
account_id,
App::Instance()->uuid.Generate(),
getpid(),
rand()
});
team_uuid = data;
code = a8::openssl::Crc32((unsigned char*)data.data(), data.size());
}
std::shared_ptr<Master> svr = GetConnById(code % mastersvr_hash_.size() + 1);
if (svr) {
++curr_context_id_;
auto req = std::make_shared<RequestTarget>();
req->context_id = curr_context_id_;
req->socket_handle = hdr.socket_handle;
req->account_id = account_id;
req->req_tick = a8::XGetTickCount();
req->hdr_copy = hdr.Clone();
ss::SS_WSP_RequestTargetServer msg;
msg.set_context_id(curr_context_id_);
msg.set_account_id(account_id);
msg.set_team_id(team_uuid);
msg.set_server_info(server_info);
msg.set_is_reconnect(is_reconnect);
msg.set_proto_version(proto_version);
svr->SendMsg(msg);
pending_socket_hash_[hdr.socket_handle] = req;
assert(pending_context_hash_.find(curr_context_id_) == pending_context_hash_.end());
pending_context_hash_[curr_context_id_] = req;
req->timer_wp = f8::Timer::Instance()->SetTimeoutWp
(1000 * 10,
[req] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
MasterMgr::Instance()->RemoveRequest
(
req->socket_handle
);
long long req_handle_time = a8::XGetTickCount() - req->req_tick;
if (req_handle_time > App::Instance()->perf.max_login_time) {
App::Instance()->perf.max_login_time = req_handle_time;
}
DownStreamMgr::Instance()->AddPendingAccount(req->account_id, req->socket_handle, req->req_tick);
} else if (ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT == event) {
long long req_handle_time = a8::XGetTickCount() - req->req_tick;
if (req_handle_time > App::Instance()->perf.max_login_time) {
App::Instance()->perf.max_login_time = req_handle_time;
}
DownStreamMgr::Instance()->AddPendingAccount(req->account_id, req->socket_handle, req->req_tick);
}
}
);
}
}
void MasterMgr::RemoveRequest(int socket_handle)
{
auto req = GetRequestBySocket(socket_handle);
if (req) {
if (req->hdr_copy) {
f8::MsgHdr::Destroy(req->hdr_copy);
req->hdr_copy = nullptr;
}
pending_context_hash_.erase(req->context_id);
pending_socket_hash_.erase(socket_handle);
}
}
std::shared_ptr<RequestTarget> MasterMgr::GetRequestBySocket(int socket_handle)
{
auto itr = pending_socket_hash_.find(socket_handle);
return itr != pending_socket_hash_.end() ? itr->second : nullptr;
}
std::shared_ptr<RequestTarget> MasterMgr::GetRequestByContextId(long long context_id)
{
auto itr = pending_context_hash_.find(context_id);
return itr != pending_context_hash_.end() ? itr->second : nullptr;
}