This commit is contained in:
aozhiwei 2020-05-03 16:22:19 +08:00
parent 4c8f2a719a
commit 19f75b6929
7 changed files with 23 additions and 423 deletions

View File

@ -7,6 +7,21 @@
void MSConnMgr::Init()
{
#if MASTER_MODE
auto master_svr_cluster_conf = JsonDataMgr::Instance()->GetMasterServerClusterConf();
for (int i = 0; i < master_svr_cluster_conf->Size(); ++i) {
auto master_svr_conf = master_svr_cluster_conf->At(i);
int instance_id = master_svr_conf->At("instance_id")->AsXValue();
std::string remote_ip = master_svr_conf->At("ip")->AsXValue();
int remote_port = master_svr_conf->At("port")->AsXValue();
{
MasterSvr* conn = new MasterSvr();
conn->Init(instance_id, remote_ip, remote_port);
mastersvr_hash_[conn->instance_id] = conn;
conn->Open();
}
}
#endif
}
void MSConnMgr::UnInit()

View File

@ -24,8 +24,6 @@
#include "MSConnMgr.h"
#include "IMConnMgr.h"
#include "mastersvr.h"
#include "mastersvrmgr.h"
struct MsgNode
{
@ -125,7 +123,6 @@ bool App::Init(int argc, char* argv[])
JsonDataMgr::Instance()->Init();
uuid.SetMachineId(instance_id);
GameClientMgr::Instance()->Init();
MasterSvrMgr::Instance()->Init();
IMConnMgr::Instance()->Init();
MSConnMgr::Instance()->Init();
WSListener::Instance()->Init();
@ -176,7 +173,6 @@ void App::UnInit()
{
a8::XPrintf("friend_imserver terminating instance_id:%d pid:%d\n", {instance_id, getpid()});
WSListener::Instance()->UnInit();
MasterSvrMgr::Instance()->UnInit();
IMConnMgr::Instance()->UnInit();
MSConnMgr::Instance()->UnInit();
GameClientMgr::Instance()->UnInit();
@ -399,7 +395,9 @@ void App::ProcessClientMsg(f8::MsgHdr& hdr)
ss::SS_CMLogin_CMReConnect_CommonHead2 msg;
bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
if (ok) {
#if 0
MasterSvrMgr::Instance()->RequestTargetServer(hdr, msg.team_uuid(), msg.account_id());
#endif
}
} else {
GameClient* client = GameClientMgr::Instance()->GetGameClientBySocket(hdr.socket_handle);
@ -418,11 +416,13 @@ void App::ProcessMasterServerMsg(f8::MsgHdr& hdr)
f8::NetMsgHandler* handler = f8::GetNetMsgHandler(&HandlerMgr::Instance()->msmsghandler,
hdr.msgid);
if (handler) {
#if 0
switch (handler->handlerid) {
case HID_MasterSvrMgr:
ProcessNetMsg(handler, MasterSvrMgr::Instance(), hdr);
break;
}
#endif
}
}
@ -457,7 +457,9 @@ void App::ProcessIMMsg()
case IM_WSProxySocketDisconnect:
{
GameClientMgr::Instance()->OnClientDisconnect(pdelnode->params);
#if 0
MasterSvrMgr::Instance()->RemoveRequest(pdelnode->params.param1, pdelnode->params.sender, true);
#endif
}
break;
#if 0

View File

@ -5,7 +5,6 @@
#include "handlermgr.h"
#include "WSListener.h"
#include "mastersvrmgr.h"
#include "app.h"
#include "ss_proto.pb.h"
@ -30,7 +29,9 @@ void HandlerMgr::UnInit()
void HandlerMgr::RegisterNetMsgHandlers()
{
#if 0
RegisterNetMsgHandler(&msmsghandler, &MasterSvrMgr::_SS_MS_ResponseTargetServer);
#endif
}
void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,

View File

@ -1,158 +0,0 @@
#include "precompile.h"
#include <string.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "mastersvr.h"
#include <a8/tcpclient.h>
#include <a8/asynctcpclient.h>
#include <a8/ioloop.h>
#include <a8/udplog.h>
#include <a8/timer.h>
#include "app.h"
const int PACK_MAX = 1024 * 64;
void MasterSvr::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
this->instance_id = instance_id;
this->remote_ip = remote_ip;
this->remote_port = remote_port;
recv_bufflen_ = 0;
last_pong_tick = a8::XGetTickCount();
recv_buff_ = (char*) malloc(PACK_MAX * 2);
tcp_client_ = a8::IoLoop::Instance()->CreateAsyncTcpClient();
tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&MasterSvr::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&MasterSvr::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&MasterSvr::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&MasterSvr::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150),
a8::XParams().SetSender(this),
[] (const a8::XParams& param)
{
MasterSvr* conn = (MasterSvr*)param.sender.GetUserData();
conn->CheckAlive();
});
}
void MasterSvr::UnInit()
{
a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr;
tcp_client_->Close();
a8::IoLoop::Instance()->DestoryAsyncTcpClient(tcp_client_);
tcp_client_ = nullptr;
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
void MasterSvr::Open()
{
tcp_client_->Open();
}
void MasterSvr::Close()
{
tcp_client_->Close();
}
bool MasterSvr::Connected()
{
return tcp_client_->Connected();
}
void MasterSvr::on_error(a8::AsyncTcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("MasterSvr errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
sender->remote_port
});
}
void MasterSvr::on_connect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("masterserver connected", {});
}
void MasterSvr::on_disconnect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("masterserver %d disconnected after 10s later reconnect", {instance_id});
App::Instance()->AddIMMsg(IM_MasterSvrDisconnect,
a8::XParams()
.SetSender(instance_id)
);
}
void MasterSvr::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len)
{
#if 0
++App::Instance()->perf.read_count;
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Debug("recvied masterserver too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
recv_bufflen_ += len;
}
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::PackHead)) {
f8::PackHead* p = (f8::PackHead*) &recv_buff_[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_MasterServer,
0,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::PackHead)],
p->packlen);
offset += sizeof(f8::PackHead) + p->packlen;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
a8::UdpLog::Instance()->Debug("recvied bad package", {});
}
if (offset > 0 && offset < recv_bufflen_) {
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
#if 1
last_pong_tick = a8::XGetTickCount();
#endif
}
void MasterSvr::CheckAlive()
{
if (!Connected()) {
Open();
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Close();
Open();
} else {
ss::SS_Ping msg;
SendMsg(msg);
}
}
}

View File

@ -1,48 +0,0 @@
#pragma once
#include "framework/cpp/protoutils.h"
namespace a8
{
class TcpClient;
class AsyncTcpClient;
}
struct timer_list;
class MasterSvr
{
public:
int instance_id = 0;
std::string remote_ip;
int remote_port = 0;
a8::tick_t last_pong_tick = 0;
public:
void Init(int instance_id, const std::string& remote_ip, int remote_port);
void UnInit();
void Open();
void Close();
bool Connected();
template <typename T>
void SendMsg(T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
f8::Net_SendMsg(tcp_client_, 0, msgid, msg);
}
private:
void on_error(a8::AsyncTcpClient* sender, int errorId);
void on_connect(a8::AsyncTcpClient* sender);
void on_disconnect(a8::AsyncTcpClient* sender);
void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len);
void CheckAlive();
private:
char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0;
a8::AsyncTcpClient* tcp_client_ = nullptr;
timer_list* timer_ = nullptr;
};

View File

@ -1,168 +0,0 @@
#include "precompile.h"
#include <unistd.h>
#include <a8/openssl.h>
#include <a8/timer.h>
#include "mastersvrmgr.h"
#include "mastersvr.h"
#include "jsondatamgr.h"
#include "ss_proto.pb.h"
#include "app.h"
#include "gameclientmgr.h"
#include "framework/cpp/netmsghandler.h"
void MasterSvrMgr::Init()
{
curr_context_id_ = a8::MakeInt64(0, time(nullptr) + 1000 * 60 * 10);
#if MASTER_MODE
auto master_svr_cluster_conf = JsonDataMgr::Instance()->GetMasterServerClusterConf();
for (int i = 0; i < master_svr_cluster_conf->Size(); ++i) {
auto master_svr_conf = master_svr_cluster_conf->At(i);
int instance_id = master_svr_conf->At("instance_id")->AsXValue();
std::string remote_ip = master_svr_conf->At("ip")->AsXValue();
int remote_port = master_svr_conf->At("port")->AsXValue();
{
MasterSvr* conn = new MasterSvr();
conn->Init(instance_id, remote_ip, remote_port);
mastersvr_hash_[conn->instance_id] = conn;
conn->Open();
}
}
#endif
}
void MasterSvrMgr::UnInit()
{
}
void MasterSvrMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg)
{
f8::MsgHdr* context_hdr = GetHdrByContextId(msg.context_id());
if (context_hdr) {
bool auto_free = true;
int socket_handle = context_hdr->socket_handle;
if (msg.error_code() == 0) {
#if 0
TargetConn* conn = TargetConnMgr::Instance()->RecreateTargetConn(
msg.host(),
msg.port()
);
assert(conn);
if (conn) {
auto_free = false;
conn->ForwardClientMsgEx(context_hdr);
}
#endif
}
RemoveRequest(socket_handle, msg.context_id(), auto_free);
}
}
MasterSvr* MasterSvrMgr::GetConnById(int instance_id)
{
auto itr = mastersvr_hash_.find(instance_id);
return itr != mastersvr_hash_.end() ? itr->second : nullptr;
}
void MasterSvrMgr::RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_id, const std::string& account_id)
{
if (GetContextIdBySocket(hdr.socket_handle) != 0) {
return;
}
unsigned int code = 0;
std::string team_uuid = team_id;
if (!team_id.empty()) {
code = a8::openssl::Crc32((unsigned char*)team_id.data(), team_id.size());
} else {
std::string data = a8::Format("!%s_%s_%d_%d",
{
account_id,
App::Instance()->uuid.Generate(),
getpid(),
rand()
});
team_uuid = data;
code = a8::openssl::Crc32((unsigned char*)data.data(), data.size());
}
MasterSvr* svr = GetConnById(code % mastersvr_hash_.size() + 1);
if (svr) {
++curr_context_id_;
a8::TimerAttacher* timer_attacher = new a8::TimerAttacher();
f8::MsgHdr* new_hdr = hdr.Clone();
new_hdr->user_data = timer_attacher;
ss::SS_WSP_RequestTargetServer msg;
msg.set_context_id(curr_context_id_);
msg.set_team_id(team_uuid);
svr->SendMsg(msg);
pending_socket_hash_[hdr.socket_handle] = curr_context_id_;
assert(pending_request_hash_.find(curr_context_id_) == pending_request_hash_.end());
pending_request_hash_[curr_context_id_] = new_hdr;
auto timer_func =
[] (const a8::XParams& param)
{
a8::Timer::Instance()->RemoveTimerAfterFunc(a8::Timer::Instance()->GetRunningTimer());
MasterSvrMgr::Instance()->RemoveRequest(
param.param1,
param.sender,
true
);
};
auto timer_after_func =
[] (const a8::XParams& param)
{
long long req_handle_time = a8::XGetTickCount() - param.param3.GetInt64();
if (req_handle_time > App::Instance()->perf.max_login_time) {
App::Instance()->perf.max_login_time = req_handle_time;
}
GameClientMgr::Instance()->AddPendingAccount(param.param2.GetString(), param.param1, param.param3);
};
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams()
.SetSender(curr_context_id_)
.SetParam1(hdr.socket_handle)
.SetParam2(account_id)
.SetParam3(a8::XGetTickCount()),
timer_func,
&timer_attacher->timer_list_,
timer_after_func
);
}
}
void MasterSvrMgr::RemoveRequest(int socket_handle, long long context_id, bool auto_free)
{
if (context_id == GetContextIdBySocket(socket_handle)) {
f8::MsgHdr* hdr = GetHdrByContextId(context_id);
if (hdr) {
a8::TimerAttacher* timer_attacher = (a8::TimerAttacher*)hdr->user_data;
delete timer_attacher;
hdr->user_data = nullptr;
if (auto_free) {
if (hdr->buf) {
free((char*)hdr->buf);
}
free(hdr);
}
}
pending_request_hash_.erase(context_id);
pending_socket_hash_.erase(socket_handle);
}
}
long long MasterSvrMgr::GetContextIdBySocket(int socket_handle)
{
auto itr = pending_socket_hash_.find(socket_handle);
return itr != pending_socket_hash_.end() ? itr->second : 0;
}
f8::MsgHdr* MasterSvrMgr::GetHdrByContextId(long long context_id)
{
auto itr = pending_request_hash_.find(context_id);
return itr != pending_request_hash_.end() ? itr->second : nullptr;
}

View File

@ -1,44 +0,0 @@
#pragma once
namespace f8
{
struct MsgHdr;
}
namespace ss
{
class SS_MS_ResponseTargetServer;
}
class MasterSvr;
class MasterSvrMgr : public a8::Singleton<MasterSvrMgr>
{
public:
enum { HID = HID_MasterSvrMgr };
private:
MasterSvrMgr() {};
friend class a8::Singleton<MasterSvrMgr>;
public:
void Init();
void UnInit();
void _SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg);
void RequestTargetServer(f8::MsgHdr& hdr, const std::string& team_id, const std::string& account_id);
void RemoveRequest(int socket_handle, long long context_id, bool auto_free);
private:
long long GetContextIdBySocket(int socket_handle);
f8::MsgHdr* GetHdrByContextId(long long context_id);
MasterSvr* GetConnById(int instance_id);
private:
int target_conn_id_ = 100;
long long curr_context_id_ = 0;
std::map<int, MasterSvr*> mastersvr_hash_;
std::map<int, long long> pending_socket_hash_;
std::map<long long, f8::MsgHdr*> pending_request_hash_;
};