2020-12-28 10:47:04 +08:00

310 lines
9.0 KiB
C++

#include "precompile.h"
#include <a8/mutable_xobject.h>
#include <a8/timer.h>
#include <a8/udplog.h>
#include "svrmgr.h"
#include "app.h"
#include "WSListener.h"
#include "IMListener.h"
#include "cachemgr.h"
/// Registers the repeating housekeeping timer that sweeps timed-out nodes.
void SvrMgr::Init()
{
    // Interval handed to the timer (presumably milliseconds, i.e. two seconds
    // -- confirm against a8::Timer).
    const int sweep_interval = 2 * 1000;

    // The callback ignores its parameters and delegates to the singleton.
    auto sweep_stale_nodes =
        [] (const a8::XParams& param)
        {
            SvrMgr::Instance()->ClearTimeOutNode();
        };

    a8::Timer::Instance()->AddRepeatTimer(sweep_interval, a8::XParams(), sweep_stale_nodes);
}
// Shutdown counterpart of Init(). Currently performs no work.
// NOTE(review): nodes still held in the manager's containers are not released
// here -- confirm whether that is intentional at process exit.
void SvrMgr::UnInit()
{
}
/// Answers a target-server request: asks AllocNode() for a node for the
/// requesting account and replies on the originating socket with either the
/// node's host/port or an error code and message.
void SvrMgr::_SS_WSP_RequestTargetServer(f8::MsgHdr& hdr, const ss::SS_WSP_RequestTargetServer& msg)
{
    ss::SS_MS_ResponseTargetServer reply;
    // The request's context id is copied into the response unchanged.
    reply.set_context_id(msg.context_id());

    SvrNode* const target = AllocNode(msg.account_id());
    if (!target) {
        // No node could be allocated: report the failure to the requester.
        reply.set_error_code(1);
        reply.set_error_msg("无法分配节点");
    } else {
        reply.set_host(target->ip);
        reply.set_port(target->port);
    }

    WSListener::Instance()->SendMsg(hdr.socket_handle, reply);
}
/// Processes a status report from an IM server.
///
/// - Unknown socket: registers a new node keyed by "ip:port", unless that key
///   is already registered, in which case the connection is force-closed.
/// - Known socket: the reported instance id / ip / port must match what was
///   registered, otherwise the connection is force-closed. Load changes are
///   stored and the sorted list is rebuilt; the activity tick is refreshed.
///
/// On every accepted report a confirmation message is sent back.
void SvrMgr::_SS_IM_ReportServerInfo(f8::MsgHdr& hdr, const ss::SS_IM_ReportServerInfo& msg)
{
    SvrNode* node = GetNodeBySocket(hdr.socket_handle);
    if (!node) {
        // First report seen on this socket.
        const std::string node_key = msg.ip() + ":" + a8::XValue(msg.port()).GetString();
        if (GetNodeByKey(node_key)) {
            // The address is already registered by an existing node; reject.
            a8::UdpLog::Instance()->Warning
                (
                 "server %s:%d registered",
                 {
                     msg.ip(),
                     msg.port()
                 });
            IMListener::Instance()->ForceCloseClient(hdr.socket_handle);
            return;
        }

        node = new SvrNode;
        node->socket_handle = hdr.socket_handle;
        node->key = node_key;
        node->node_idx = App::Instance()->NewUuid();
        node->instance_id = msg.instance_id();
        node->online_num = msg.online_num();
        node->ip = msg.ip();
        node->port = msg.port();
        node->servicing = msg.servicing();
        node->last_active_tick = a8::XGetTickCount();
        a8::UdpLog::Instance()->Info
            (
             "register new node socket_handle:%d node_idx:%d instance_id:%d "
             "ip:%s port:%d online_num:%d servicing:%d",
             {
                 node->socket_handle,
                 node->node_idx,
                 node->instance_id,
                 node->ip,
                 node->port,
                 node->online_num,
                 node->servicing
             }
             );

        // Index the node by key and by socket, then place it in the sorted list.
        node_key_hash_[node_key] = node;
        socket_hash_[hdr.socket_handle] = node;
        node_sorted_list_.push_back(node);
        RearrangeNode();
    } else {
        // A registered socket must keep reporting the identity it registered with.
        const bool identity_changed =
            node->instance_id != msg.instance_id() ||
            node->ip != msg.ip() ||
            node->port != msg.port();
        if (identity_changed) {
            a8::UdpLog::Instance()->Warning
                (
                 "serverA %s:%d-%d serverB %s:%d-%d",
                 {
                     node->ip,
                     node->port,
                     node->instance_id,
                     msg.ip(),
                     msg.port(),
                     msg.instance_id()
                 });
            IMListener::Instance()->ForceCloseClient(hdr.socket_handle);
            return;
        }

        // Only re-sort when a field that influences the ordering has changed.
        const bool load_changed =
            node->online_num != msg.online_num() ||
            node->servicing != msg.servicing();
        if (load_changed) {
            node->online_num = msg.online_num();
            node->servicing = msg.servicing();
            RearrangeNode();
        }
        node->last_active_tick = a8::XGetTickCount();
    }

    ss::SS_MS_ConfirmedServerInfo ack;
    IMListener::Instance()->SendMsg(hdr.socket_handle, ack);
}
/// Replies to the requesting socket with one entry per registered node
/// (instance id, online count, ip, port, servicing flag).
void SvrMgr::_SS_IM_IMServerList(f8::MsgHdr& hdr, const ss::SS_IM_IMServerList& msg)
{
    ss::SS_MS_IMServerList reply;
    for (auto& entry : node_key_hash_) {
        SvrNode* const node = entry.second;
        auto item = reply.add_server_list();
        item->set_instance_id(node->instance_id);
        item->set_online_num(node->online_num);
        item->set_ip(node->ip);
        item->set_port(node->port);
        item->set_servicing(node->servicing);
    }
    IMListener::Instance()->SendMsg(hdr.socket_handle, reply);
}
/// Fills the HTTP response with two arrays describing the registered nodes:
/// "node_list" (iteration order of the key index) and "sorted_node_list"
/// (order of the sorted list), alongside errcode 0 and an empty errmsg.
void SvrMgr::___GSList(f8::JsonHttpRequest* request)
{
    // Serialises one node into a temporary object and appends it to `out`.
    auto append_node =
        [] (a8::MutableXObject* out, SvrNode* src)
        {
            a8::MutableXObject* item = a8::MutableXObject::NewObject();
            item->SetVal("instance_id", src->instance_id);
            item->SetVal("online_num", src->online_num);
            item->SetVal("ip", src->ip);
            item->SetVal("port", src->port);
            item->SetVal("servicing", src->servicing);
            out->Push(*item);
            delete item;
        };

    // Nodes as stored in the key index.
    {
        a8::MutableXObject* by_key = a8::MutableXObject::NewArray();
        for (auto& entry : node_key_hash_) {
            append_node(by_key, entry.second);
        }
        request->resp_xobj->SetVal("errcode", 0);
        request->resp_xobj->SetVal("errmsg", "");
        request->resp_xobj->SetVal("node_list", *by_key);
        delete by_key;
    }

    // Nodes in allocation-priority order.
    {
        a8::MutableXObject* by_rank = a8::MutableXObject::NewArray();
        for (SvrNode* ranked : node_sorted_list_) {
            append_node(by_rank, ranked);
        }
        request->resp_xobj->SetVal("errcode", 0);
        request->resp_xobj->SetVal("errmsg", "");
        request->resp_xobj->SetVal("sorted_node_list", *by_rank);
        delete by_rank;
    }
}
/// Disconnect notification for an IM server socket: if a node is registered
/// for the socket given in `param.sender`, it is removed and the sorted list
/// is rebuilt. Unknown sockets are ignored.
void SvrMgr::OnIMServerDisconnect(a8::XParams& param)
{
    SvrNode* const departed = GetNodeBySocket(param.sender);
    if (!departed) {
        return;
    }
    RemoveNode(departed);
    RearrangeNode();
}
/// Returns how many nodes are currently registered in the key index.
size_t SvrMgr::GetNodeNum()
{
    const size_t registered = node_key_hash_.size();
    return registered;
}
/// Chooses the node that should serve `account_id`.
///
/// If the cache already associates the account with a node, that node is
/// returned. Otherwise a random start index among the first (at most two)
/// entries of the sorted list is picked and the list is walked towards index
/// 0 until a servicing node is found. Returns nullptr, after logging a
/// warning, when no node qualifies.
SvrNode* SvrMgr::AllocNode(const std::string& account_id)
{
    Friend* cached = CacheMgr::Instance()->GetFriendData(account_id);
    if (cached && cached->svr_node) {
        return cached->svr_node;
    }

    if (!node_sorted_list_.empty()) {
        // Spread load across the top two entries instead of always taking the first.
        const size_t candidates = std::min(static_cast<size_t>(2), node_sorted_list_.size());
        const int start = static_cast<int>(rand() % candidates);
        for (int i = start; i >= 0; --i) {
            SvrNode* candidate = node_sorted_list_[i];
            if (candidate->servicing) {
                return candidate;
            }
        }
    }

    a8::UdpLog::Instance()->Warning
        (
         "节点分配失败 node_sorted_list.size:%d node_list.size:%d",
         {
             node_sorted_list_.size(),
             node_key_hash_.size()
         });
    return nullptr;
}
/// Re-sorts the node list into allocation priority order:
///   1. servicing nodes come before non-servicing ones;
///   2. among servicing nodes, fewer online users comes first;
///   3. remaining ties (and all non-servicing nodes) are ordered by
///      descending node_idx.
void SvrMgr::RearrangeNode()
{
    auto higher_priority =
        [] (const SvrNode* lhs, const SvrNode* rhs)
        {
            const bool lhs_on = static_cast<bool>(lhs->servicing);
            const bool rhs_on = static_cast<bool>(rhs->servicing);
            if (lhs_on != rhs_on) {
                // Exactly one of the two is servicing; that one wins.
                return lhs_on;
            }
            if (lhs_on && lhs->online_num != rhs->online_num) {
                // Both servicing: the less loaded node wins.
                return lhs->online_num < rhs->online_num;
            }
            return lhs->node_idx > rhs->node_idx;
        };
    std::sort(node_sorted_list_.begin(), node_sorted_list_.end(), higher_priority);
}
void SvrMgr::ClearTimeOutNode()
{
std::vector<SvrNode*> time_out_nodes;
for (auto& pair : node_key_hash_) {
if (a8::XGetTickCount() - pair.second->last_active_tick > 1000 * 3) {
time_out_nodes.push_back(pair.second);
}
}
for (SvrNode* node : time_out_nodes) {
RemoveNode(node);
}
RearrangeNode();
}
/// Looks up a node by its registration key; returns nullptr when the key is
/// not registered.
SvrNode* SvrMgr::GetNodeByKey(const std::string& key)
{
    const auto found = node_key_hash_.find(key);
    if (found == node_key_hash_.end()) {
        return nullptr;
    }
    return found->second;
}
/// Looks up the node registered for a socket handle; returns nullptr when no
/// node is bound to that socket.
SvrNode* SvrMgr::GetNodeBySocket(int socket_handle)
{
    const auto found = socket_hash_.find(socket_handle);
    if (found == socket_hash_.end()) {
        return nullptr;
    }
    return found->second;
}
/// Unregisters and destroys a node.
///
/// Steps, in order: log the removal, drop the node from the sorted list,
/// detach every Friend linked on the node's human_list (clearing its svr_node
/// pointer), erase the node from both lookup tables, and finally delete it.
/// `svr_node` must not be used after this call.
void SvrMgr::RemoveNode(SvrNode* svr_node)
{
    a8::UdpLog::Instance()->Warning
        (
         "remove node socket_handle:%d node_idx:%d instance_id:%d "
         "ip:%s port:%d online_num:%d servicing:%d",
         {
             svr_node->socket_handle,
             svr_node->node_idx,
             svr_node->instance_id,
             svr_node->ip,
             svr_node->port,
             svr_node->online_num,
             svr_node->servicing
         }
         );

    // Erase the first (expected only) occurrence from the sorted list.
    auto slot = std::find(node_sorted_list_.begin(), node_sorted_list_.end(), svr_node);
    if (slot != node_sorted_list_.end()) {
        node_sorted_list_.erase(slot);
    }

    // Detach all friends still pointing at this node so none keeps a dangling
    // svr_node pointer; the "safe" iteration allows unlinking while walking.
    Friend* member = nullptr;
    Friend* lookahead = nullptr;
    list_for_each_entry_safe(member, lookahead, &svr_node->human_list, human_entry) {
        list_del_init(&member->human_entry);
        member->svr_node = nullptr;
    }

    node_key_hash_.erase(svr_node->key);
    socket_hash_.erase(svr_node->socket_handle);

    delete svr_node;
}