This commit is contained in:
azw 2023-09-05 13:27:55 +00:00
parent 676965d292
commit 438247f086
8 changed files with 54 additions and 217 deletions

View File

@ -1,10 +1,6 @@
#include "precompile.h"
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include <a8/xtimer.h>
#include <a8/uuid.h>
@ -12,6 +8,7 @@
#include <f8/netmsghandler.h>
#include <f8/udplog.h>
#include <f8/app.h>
#include <f8/msgqueue.h>
#include "app.h"
@ -50,9 +47,6 @@ struct UdpMsgNode
UdpMsgNode* next;
};
const char* const PROJ_LOG_ROOT_FMT = "/data/logs/%s/logs";
const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log";
static void SavePerfLog()
{
f8::UdpLog::Instance()->Info("max_run_delay_time:%d max_timer_idle:%d "
@ -73,7 +67,7 @@ static void SavePerfLog()
LongSessionMgr::Instance()->GetLongSessionCount(),
DownStreamMgr::Instance()->GetDownStreamCount()
});
if (App::Instance()->HasFlag(2)) {
if (f8::App::Instance()->HasFlag(2)) {
a8::XPrintf("mainloop_time:%d netmsg_time:%d send_node_num:%d sent_bytes_num:%d\n",
{
App::Instance()->GetPerf().max_run_delay_time,
@ -88,45 +82,18 @@ static void SavePerfLog()
App::Instance()->GetPerf().max_join_time = 0;
}
bool App::Init(int argc, char* argv[])
const std::string App::GetPkgName()
{
signal(SIGPIPE, SIG_IGN);
this->argc_ = argc;
this->argv_ = argv;
return "wsproxy";
}
if (!ParseOpt()) {
terminated_ = true;
if (node_id_ <= 0) {
a8::XPrintf("gameserver启动失败,缺少-n参数\n", {});
} else if (node_id_ > MAX_NODE_ID) {
a8::XPrintf("gameserver启动失败,-n参数不能大于%d\n", {MAX_NODE_ID});
} else if (instance_id_ <= 0) {
a8::XPrintf("gameserver启动失败,缺少-i参数\n", {});
} else if (instance_id_ > MAX_INSTANCE_ID) {
a8::XPrintf("gameserver启动失败,-i参数不能大于%d\n", {MAX_INSTANCE_ID});
}
return false;
}
a8::XPrintf("wsproxy starting node_id:%d instance_id:%d pid:%d\n",
{
node_id_,
instance_id_,
getpid()
});
uuid_ = std::make_shared<a8::uuid::SnowFlake>();
loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable();
void App::Init()
{
msg_mutex_ = new std::mutex();
udp_msg_mutex_ = new std::mutex();
srand(time(nullptr));
InitLog();
f8::MsgQueue::Instance()->Init();
HandlerMgr::Instance()->Init();
f8::Timer::Instance()->Init();
JsonDataMgr::Instance()->Init();
uuid_->SetMachineId((node_id_ - 1) * MAX_NODE_ID + instance_id_);
DownStreamMgr::Instance()->Init();
MasterMgr::Instance()->Init();
UpStreamMgr::Instance()->Init();
@ -135,8 +102,8 @@ bool App::Init(int argc, char* argv[])
f8::UdpLog::Instance()->Info("wsproxy starting instance_id:%d pid:%d",
{
instance_id_,
getpid(),
f8::App::Instance()->GetInstanceId(),
f8::App::Instance()->GetPid(),
});
{
int perf_log_time = 1000 * 60 * 5;
@ -152,48 +119,33 @@ bool App::Init(int argc, char* argv[])
}
});
}
if (HasFlag(1)) {
if (f8::App::Instance()->HasFlag(1)) {
f8::Timer::Instance()->SetTimeout
(
1000 * 60,
[] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
App::Instance()->terminated_ = true;
App::Instance()->NotifyLoopCond();
f8::App::Instance()->Terminated();
f8::App::Instance()->NotifyLoopCond();
}
}
);
}
if (HasFlag(4)) {
f8::Timer::Instance()->SetTimeout
(
1000 * 30,
[] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
App::Instance()->shutdowned_ = true;
a8::XPrintf("shutdowned\n", {});
}
}
);
}
return true;
}
void App::UnInit()
{
a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n", {instance_id_, getpid()});
a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n",
{f8::App::Instance()->GetInstanceId(),
f8::App::Instance()->GetPid()});
GCListener::Instance()->UnInit();
LongSessionMgr::Instance()->UnInit();
MasterMgr::Instance()->UnInit();
UpStreamMgr::Instance()->UnInit();
DownStreamMgr::Instance()->UnInit();
JsonDataMgr::Instance()->UnInit();
f8::Timer::Instance()->UnInit();
f8::MsgQueue::Instance()->UnInit();
HandlerMgr::Instance()->UnInit();
UnInitLog();
FreeSocketMsgQueue();
FreeUdpMsgQueue();
@ -201,28 +153,21 @@ void App::UnInit()
msg_mutex_ = nullptr;
delete udp_msg_mutex_;
udp_msg_mutex_ = nullptr;
delete loop_cond_;
loop_cond_ = nullptr;
delete loop_mutex_;
loop_mutex_ = nullptr;
a8::XPrintf("wsproxy terminated instance_id:%d pid:%d\n", {instance_id_, getpid()});
a8::XPrintf("wsproxy terminated instance_id:%d pid:%d\n",
{f8::App::Instance()->GetInstanceId(),
f8::App::Instance()->GetPid()});
}
int App::Run()
void App::Update()
{
int ret = 0;
f8::UdpLog::Instance()->Info("wsproxy running", {});
while (!terminated_) {
a8::tick_t begin_tick = a8::XGetTickCount();
QuickExecute();
SlowerExecute();
a8::tick_t end_tick = a8::XGetTickCount();
if (end_tick - begin_tick > GetPerf().max_run_delay_time) {
GetPerf().max_run_delay_time = end_tick - begin_tick;
}
Schedule();
QuickExecute();
SlowerExecute();
#if 0
a8::tick_t end_tick = a8::XGetTickCount();
if (end_tick - begin_tick > GetPerf().max_run_delay_time) {
GetPerf().max_run_delay_time = end_tick - begin_tick;
}
return ret;
#endif
}
void App::AddSocketMsg(SocketFrom_e sockfrom,
@ -258,7 +203,7 @@ void App::AddSocketMsg(SocketFrom_e sockfrom,
}
++msgnode_size_;
msg_mutex_->unlock();
NotifyLoopCond();
f8::App::Instance()->NotifyLoopCond();
}
void App::QuickExecute()
@ -274,29 +219,6 @@ void App::SlowerExecute()
{
}
void App::NotifyLoopCond()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
loop_cond_->notify_all();
}
void App::Schedule()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
if (!HasTask()) {
#if 1
int sleep_time = 1;
loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time));
#else
int sleep_time = f8::Timer::Instance()->GetIdleableMillSeconds();
loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time));
if (sleep_time > perf.max_timer_idle) {
perf.max_timer_idle = sleep_time;
}
#endif
}
}
bool App::HasTask()
{
{
@ -478,64 +400,6 @@ void App::ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag)
}
}
void App::InitLog()
{
std::string filename_fmt = PROJ_LOG_FILENAME_FMT;
a8::ReplaceString(filename_fmt, "$pid", a8::XValue(getpid()));
std::string proj_root_dir = a8::Format(PROJ_ROOT_FMT, {a8::Format(PROJ_NAME_FMT,{GAME_ID})});
std::string proj_log_root_dir = a8::Format(PROJ_LOG_ROOT_FMT, {a8::Format(PROJ_NAME_FMT, {GAME_ID})});
std::string log_file_name = a8::Format(PROJ_LOG_ROOT_FMT,
{a8::Format(PROJ_NAME_FMT, {GAME_ID})}) + "/" + filename_fmt;
a8::MkDir(proj_root_dir);
a8::MkDir(proj_log_root_dir);
f8::UdpLog::Instance()->SetLogFileName(log_file_name);
f8::UdpLog::Instance()->Init();
f8::UdpLog::Instance()->Info("proj_root_dir:%s", {proj_root_dir});
f8::UdpLog::Instance()->Info("proj_log_root_dir:%s", {proj_log_root_dir});
f8::UdpLog::Instance()->Info("log_file_name:%s", {log_file_name});
}
void App::UnInitLog()
{
f8::UdpLog::Instance()->UnInit();
}
bool App::ParseOpt()
{
int ch = 0;
while ((ch = getopt(argc_, argv_, "n:i:f:")) != -1) {
switch (ch) {
case 'n':
{
node_id_ = a8::XValue(optarg);
}
break;
case 'i':
{
instance_id_ = a8::XValue(optarg);
}
break;
case 'f':
{
std::vector<std::string> strings;
a8::Split(optarg, strings, ',');
for (auto& str : strings) {
flags_.insert(a8::XValue(str).GetInt());
}
}
break;
}
}
return instance_id_ > 0 && node_id_ > 0;
}
bool App::HasFlag(int flag)
{
return flags_.find(flag) != flags_.end();
}
void App::FreeSocketMsgQueue()
{
msg_mutex_->lock();
@ -602,7 +466,7 @@ void App::AddUdpMsg(a8::UdpPacket* pkt)
}
++udp_msgnode_size_;
udp_msg_mutex_->unlock();
NotifyLoopCond();
f8::App::Instance()->NotifyLoopCond();
}
void App::DispatchUdpMsg()
@ -638,8 +502,3 @@ void App::DispatchUdpMsg()
udp_working_msgnode_size_ = 0;
}
}
long long App::NewUuid()
{
return uuid_->Generate();
}

View File

@ -2,18 +2,16 @@
#include <a8/singleton.h>
#include <f8/app.h>
namespace a8
{
struct UdpPacket;
namespace uuid
{
class SnowFlake;
}
}
struct MsgNode;
struct UdpMsgNode;
class App : public a8::Singleton<App>
class App : public a8::Singleton<App>, public f8::UserApp
{
private:
App() {};
@ -21,10 +19,11 @@ private:
public:
bool Init(int argc, char* argv[]);
void UnInit();
int Run();
virtual const std::string GetPkgName() override;
virtual void Init() override;
virtual void UnInit() override;
virtual void Update() override;
virtual bool HasTask() override;
void AddSocketMsg(SocketFrom_e sockfrom,
int sockhandle,
@ -37,11 +36,6 @@ public:
void AddUdpMsg(a8::UdpPacket* pkt);
void NotifyLoopCond();
bool HasFlag(int flag);
long long NewUuid();
int GetNodeId() { return node_id_; }
int GetInstanceId() { return instance_id_; }
PerfMonitor& GetPerf() { return perf_; }
int GetMsgNodeSize() { return msgnode_size_;}
int GetUdpMsgNodeSize() { return udp_msgnode_size_;}
@ -49,8 +43,6 @@ public:
private:
void QuickExecute();
void SlowerExecute();
void Schedule();
bool HasTask();
void DispatchMsg();
void DispatchUdpMsg();
@ -59,27 +51,11 @@ private:
void ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag);
void ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag);
void InitLog();
void UnInitLog();
bool ParseOpt();
void FreeSocketMsgQueue();
void FreeUdpMsgQueue();
private:
int argc_ = 0;
char** argv_ = nullptr;
PerfMonitor perf_;
volatile bool terminated_ = false;
volatile bool shutdowned_ = false;
int node_id_ = 0;
int instance_id_ = 0;
std::set<int> flags_;
std::shared_ptr<a8::uuid::SnowFlake> uuid_;
std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr;
std::mutex* msg_mutex_ = nullptr;
MsgNode* top_node_ = nullptr;

View File

@ -3,6 +3,7 @@
#include <a8/mutable_xobject.h>
#include <f8/msgqueue.h>
#include <f8/jsonhttprequest.h>
#include <f8/app.h>
#include "handlermgr.h"
@ -25,7 +26,7 @@ static void _GMOpsGetNodeId(std::shared_ptr<f8::JsonHttpRequest> request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("node_id", App::Instance()->GetNodeId());
request->resp_xobj->SetVal("node_id", f8::App::Instance()->GetNodeId());
}
static void _GMOpsSetKcpSwitch(std::shared_ptr<f8::JsonHttpRequest> request)

View File

@ -23,23 +23,26 @@ void JsonDataMgr::Init()
std::string master_cluster_json_file;
std::string kcp_conf_json_file;
wsproxy_cluster_json_file = a8::Format
("%s/node%d/game%d.wsproxy.cluster.json",
("%s/zone%d/node%d/game%d.wsproxy.cluster.json",
{
work_path_,
App::Instance()->GetNodeId(),
f8::App::Instance()->GetZoneId(),
f8::App::Instance()->GetNodeId(),
GAME_ID
});
master_cluster_json_file = a8::Format
("%s/node%d/game%d.masterserver.cluster.json",
("%s/zone%d/node%d/game%d.masterserver.cluster.json",
{
work_path_,
App::Instance()->GetNodeId(),
f8::App::Instance()->GetZoneId(),
f8::App::Instance()->GetNodeId(),
GAME_ID
});
kcp_conf_json_file = a8::Format
("%s/kcp_conf.json",
("%s/zone%d/kcp_conf.json",
{
work_path_,
f8::App::Instance()->GetZoneId(),
});
wsproxy_cluster_json_.ReadFromFile(wsproxy_cluster_json_file);
@ -91,7 +94,7 @@ std::shared_ptr<a8::XObject> JsonDataMgr::GetConf()
{
for (int i = 0; i < wsproxy_cluster_json_.Size(); ++i) {
std::shared_ptr<a8::XObject> conf = wsproxy_cluster_json_.At(i);
if (conf->At("instance_id")->AsXValue().GetInt() == App::Instance()->GetInstanceId()) {
if (conf->At("instance_id")->AsXValue().GetInt() == f8::App::Instance()->GetInstanceId()) {
return conf;
}
}

View File

@ -37,7 +37,7 @@ KcpSession::~KcpSession()
void KcpSession::Init(int socket_handle, int secret_key_place)
{
socket_handle_ = socket_handle;
secret_key_ = App::Instance()->NewUuid();
secret_key_ = f8::App::Instance()->NewNodeUuid();
kcp_ = ikcp_create(socket_handle_, (void*)this);
const KcpConf& kcp_conf = JsonDataMgr::Instance()->GetKcpConf();
ikcp_wndsize(kcp_, kcp_conf.sndwnd, kcp_conf.rcvwnd);

View File

@ -1,12 +1,10 @@
#include "precompile.h"
#include <f8/app.h>
#include "app.h"
int main(int argc, char* argv[])
{
int exitcode = 0;
if (App::Instance()->Init(argc, argv)) {
exitcode = App::Instance()->Run();
App::Instance()->UnInit();
}
return exitcode;
return f8::App::Instance()->Run(argc, argv, App::Instance());
}

View File

@ -110,7 +110,7 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
std::string data = a8::Format("!%s_%s_%d_%d",
{
account_id,
App::Instance()->NewUuid(),
f8::App::Instance()->NewNodeUuid(),
getpid(),
rand()
});

2
third_party/f8 vendored

@ -1 +1 @@
Subproject commit 243bbe515ef4a01089f9a6cf608c93d4097018de
Subproject commit 389b2df366a33d73510c373fcf0875427a31d482