aozhiwei 03a8c08040 1
2020-08-22 21:35:43 +08:00

323 lines
10 KiB
C++

#include "precompile.h"
#include <a8/mutable_xobject.h>
#include <a8/timer.h>
#include "gsmgr.h"
#include "app.h"
#include "GGListener.h"
#include "framework/cpp/utils.h"
void GSMgr::Init()
{
a8::Timer::Instance()->AddRepeatTimer(1000 * 2,
a8::XParams(),
[] (const a8::XParams& param)
{
GSMgr::Instance()->ClearTimeOutNode();
});
}
void GSMgr::UnInit()
{
}
void GSMgr::_SS_WSP_RequestTargetServer(f8::MsgHdr& hdr, const ss::SS_WSP_RequestTargetServer& msg)
{
int target_channel = GetTargetChannel(msg);
ss::SS_MS_ResponseTargetServer respmsg;
respmsg.set_context_id(msg.context_id());
if (msg.is_reconnect()) {
GSNode* node = GetNodeByNodeKey(msg.server_info());
if (node) {
respmsg.set_host(node->ip);
respmsg.set_port(node->port);
} else {
respmsg.set_error_code(1);
respmsg.set_error_msg("无法分配节点");
}
} else {
GSNode* node = GetNodeByTeamId(msg.team_id());
if (node && node->channel == target_channel) {
respmsg.set_host(node->ip);
respmsg.set_port(node->port);
} else {
node = AllocNode(target_channel);
if (node) {
respmsg.set_host(node->ip);
respmsg.set_port(node->port);
team_hash_[msg.team_id()] = node;
} else {
respmsg.set_error_code(1);
respmsg.set_error_msg("无法分配节点");
}
}
}
GGListener::Instance()->SendMsg(hdr.socket_handle, respmsg);
}
void GSMgr::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg)
{
ss::SS_Pong pongmsg;
GGListener::Instance()->SendMsg(hdr.socket_handle, pongmsg);
}
void GSMgr::___GSReport(f8::JsonHttpRequest* request)
{
std::string ip = request->request.Get("ip");
int port = request->request.Get("port");
int alive_count = request->request.Get("alive_count");
int online_num = request->request.Get("online_num");
int room_num = request->request.Get("room_num");
int instance_id = request->request.Get("instance_id");
int node_id = request->request.Get("node_id");
int channel = request->request.Get("channel");
bool servicing = request->request.Get("servicing");
std::string key = ip + ":" + a8::XValue(port).GetString();
auto itr = node_key_hash_.find(key);
if (itr != node_key_hash_.end()) {
if (itr->second.online_num != online_num ||
itr->second.room_num != room_num ||
itr->second.alive_count != alive_count ||
itr->second.servicing != servicing ||
itr->second.channel != channel
) {
itr->second.online_num = online_num;
itr->second.room_num = room_num;
itr->second.servicing = servicing;
if (itr->second.channel != channel) {
itr->second.channel = channel;
OnChannelChange(&itr->second);
}
RearrangeNode();
}
itr->second.alive_count = alive_count;
itr->second.last_active_tick = a8::XGetTickCount();
} else {
GSNode gs;
gs.key = key;
gs.node_id = node_id;
gs.node_idx = App::Instance()->NewUuid();
gs.instance_id = instance_id;
gs.alive_count = alive_count;
gs.online_num = online_num;
gs.room_num = room_num;
gs.ip = ip;
gs.port = port;
gs.servicing = servicing;
gs.channel = channel;
gs.last_active_tick = a8::XGetTickCount();
node_key_hash_[key] = gs;
AddNodeToSortedNodes(&node_key_hash_[key]);
RearrangeNode();
}
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
}
void GSMgr::___GSList(f8::JsonHttpRequest* request)
{
{
a8::MutableXObject* node_list = a8::MutableXObject::NewArray();
for (auto& pair : node_key_hash_) {
a8::MutableXObject* node = a8::MutableXObject::NewObject();
node->SetVal("node_id", pair.second.node_id);
node->SetVal("instance_id", pair.second.instance_id);
node->SetVal("room_num", pair.second.room_num);
node->SetVal("alive_count", pair.second.alive_count);
node->SetVal("online_num", pair.second.online_num);
node->SetVal("ip", pair.second.ip);
node->SetVal("port", pair.second.port);
node->SetVal("servicing", pair.second.servicing);
node->SetVal("channel", pair.second.channel);
node_list->Push(*node);
delete node;
}
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("node_list", *node_list);
delete node_list;
}
{
a8::MutableXObject* node_list = a8::MutableXObject::NewArray();
for (auto& pair : sorted_node_hash_) {
for (GSNode* gs_node : pair.second) {
a8::MutableXObject* node = a8::MutableXObject::NewObject();
node->SetVal("node_id", gs_node->node_id);
node->SetVal("instance_id", gs_node->instance_id);
node->SetVal("room_num", gs_node->room_num);
node->SetVal("online_num", gs_node->online_num);
node->SetVal("ip", gs_node->ip);
node->SetVal("port", gs_node->port);
node->SetVal("servicing", gs_node->servicing);
node->SetVal("channel", gs_node->channel);
node_list->Push(*node);
delete node;
}
}
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("sorted_node_list", *node_list);
delete node_list;
}
}
GSNode* GSMgr::GetNodeByTeamId(const std::string& team_id)
{
auto itr = team_hash_.find(team_id);
return itr != team_hash_.end() ? itr->second : nullptr;
}
GSNode* GSMgr::GetNodeByNodeKey(const std::string& node_key)
{
auto itr = node_key_hash_.find(node_key);
return itr != node_key_hash_.end() ? &itr->second : nullptr;
}
GSNode* GSMgr::AllocNode(int channel)
{
std::vector<GSNode*>* sorted_nodes = GetSortedNodesByChannel(channel);
if (sorted_nodes && !sorted_nodes->empty()) {
size_t rnd = std::min((size_t)2, sorted_nodes->size());
int idx = rand() % rnd;
while (idx >= 0) {
if (sorted_nodes->at(idx)->servicing) {
return sorted_nodes->at(idx);
}
--idx;
}
}
a8::UdpLog::Instance()->Warning
("节点分配失败 sorted_node_hashlist.size:%d node_list.size:%d sorted_node.size:%d channel:%d",
{
sorted_node_hash_.size(),
node_key_hash_.size(),
sorted_nodes ? sorted_nodes->size() : 0,
channel
});
return nullptr;
}
void GSMgr::RearrangeNode()
{
for (auto& pair : sorted_node_hash_) {
std::sort(pair.second.begin(), pair.second.end(),
[] (const GSNode* a, const GSNode* b)
{
if (a->servicing && b->servicing) {
if (a->online_num < b->online_num) {
return true;
}
if (a->online_num > b->online_num) {
return false;
}
if (a->room_num < b->room_num) {
return true;
}
if (a->room_num > b->room_num) {
return false;
}
return a->node_idx > b->node_idx;
}
if (a->servicing) {
return true;
}
if (b->servicing) {
return false;
}
return a->node_idx > b->node_idx;
}
);
}
}
void GSMgr::ClearTimeOutNode()
{
std::vector<GSNode*> time_out_nodes;
for (auto& pair : node_key_hash_) {
if (a8::XGetTickCount() - pair.second.last_active_tick > 1000 * 5) {
time_out_nodes.push_back(&pair.second);
}
}
for (GSNode* node : time_out_nodes) {
RemoveNodeFromSortedNodes(node);
{
std::vector<std::string> deleted_teams;
for (auto& pair : team_hash_) {
if (pair.second == node) {
deleted_teams.push_back(pair.first);
}
}
for (const std::string& team_id : deleted_teams) {
team_hash_.erase(team_id);
}
}
node_key_hash_.erase(node->key);
}
RearrangeNode();
}
void GSMgr::AddNodeToSortedNodes(GSNode* node)
{
auto itr = sorted_node_hash_.find(node->channel);
if (itr != sorted_node_hash_.end()) {
itr->second.push_back(node);
} else {
sorted_node_hash_[node->channel] = std::vector<GSNode*>({node});
}
}
void GSMgr::RemoveNodeFromSortedNodes(GSNode* node)
{
for (auto& pair : sorted_node_hash_) {
std::vector<GSNode*>& node_list = pair.second;
for (size_t i = 0; i < node_list.size(); ++i) {
if (node_list[i] == node) {
node_list.erase(node_list.begin() + i);
break;
}
}
}
}
std::vector<GSNode*>* GSMgr::GetSortedNodesByChannel(int channel)
{
auto itr = sorted_node_hash_.find(channel);
if (itr != sorted_node_hash_.end()) {
return &itr->second;
} else {
return nullptr;
}
}
void GSMgr::OnChannelChange(GSNode* node)
{
RemoveNodeFromSortedNodes(node);
AddNodeToSortedNodes(node);
}
int GSMgr::GetTargetChannel(const ss::SS_WSP_RequestTargetServer& msg)
{
int channel = f8::ExtractChannelIdFromAccountId(msg.account_id());
if (channel == 6000 && !msg.url().empty()) {
std::vector<std::string> strings;
a8::Split(msg.url(), strings, '/');
if (!strings.empty()) {
channel = a8::XValue(strings[0]);
}
}
if (!App::Instance()->IsSeparateChannel(channel)) {
channel = 0;
}
return channel;
}