#include "precompile.h"

// NOTE(review): the angle-bracket include targets were missing from the text
// under review. The standard headers below are the ones this file visibly
// uses; the a8/f8 library headers (a8::XPrintf, a8::UdpPacket, f8::UdpLog,
// f8::Timer, f8::MsgQueue, f8::MsgHdr, ...) must be restored from the
// project's original include list unless precompile.h already provides them.
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "app.h"
#include "GCListener.h"
#include "jsondatamgr.h"
#include "handlermgr.h"
#include "downstream.h"
#include "downstreammgr.h"
#include "upstream.h"
#include "upstreammgr.h"
#include "master.h"
#include "mastermgr.h"
#include "longsessionmgr.h"

#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"

// One queued socket message. Nodes and their payload buffers are allocated
// with malloc() in App::AddSocketMsg and released with free() by
// App::DispatchMsg / App::FreeSocketMsgQueue.
struct MsgNode
{
    SocketFrom_e sockfrom;   // selects the Process*Msg handler in DispatchMsg
    int sockhandle;
    unsigned short msgid;
    unsigned int seqid;
    long ip_saddr;
    char* buf;               // private copy of the message body, or nullptr when buflen <= 0
    int buflen;
    int tag;
    MsgNode* next;           // singly linked list link
};

// One queued UDP packet. The node is allocated with malloc() in
// App::AddUdpMsg; the packet pointer is supplied by the caller.
struct UdpMsgNode
{
    a8::UdpPacket* pkt;
    UdpMsgNode* next;
};

const char* const PROJ_LOG_ROOT_FMT = "/data/logs/%s/logs";
const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log";

// Writes the current performance counters to the UDP log (and to stdout when
// flag 2 is set), then resets the "max_*" counters that are tracked per
// reporting interval.
static void SavePerfLog()
{
    auto& perf = App::Instance()->GetPerf();
    f8::UdpLog::Instance()->Info("max_run_delay_time:%d max_timer_idle:%d "
                                 "in_data_size:%d out_data_size:%d msgnode_size:%d udp_msgnode_size:%d "
                                 "read_count:%d max_login_time:%d "
                                 "max_join_time:%d tcp_count:%d udp_count:%d down_stream_count:%d",
                                 {
                                     perf.max_run_delay_time,
                                     perf.max_timer_idle,
                                     perf.in_data_size,
                                     perf.out_data_size,
                                     App::Instance()->GetMsgNodeSize(),
                                     App::Instance()->GetUdpMsgNodeSize(),
                                     perf.read_count,
                                     perf.max_login_time,
                                     perf.max_join_time,
                                     GCListener::Instance()->GetSocketCount(),
                                     LongSessionMgr::Instance()->GetLongSessionCount(),
                                     DownStreamMgr::Instance()->GetDownStreamCount()
                                 });
    if (App::Instance()->HasFlag(2)) {
        a8::XPrintf("mainloop_time:%d netmsg_time:%d send_node_num:%d sent_bytes_num:%d\n",
                    {
                        perf.max_run_delay_time,
                        perf.max_dispatchmsg_time,
                        GCListener::Instance()->GetSendNodeNum(),
                        GCListener::Instance()->GetSentBytesNum()
                    });
    }
    perf.max_run_delay_time = 0;
    perf.max_timer_idle = 0;
    perf.max_login_time = 0;
    perf.max_join_time = 0;
}

// Parses the command line, creates the synchronisation objects, initialises
// every subsystem and registers the periodic/diagnostic timers.
// Returns false (with terminated_ set) when the command line is rejected.
bool App::Init(int argc, char* argv[])
{
    // Ignore SIGPIPE so a write to a closed socket does not kill the process.
    signal(SIGPIPE, SIG_IGN);
    this->argc_ = argc;
    this->argv_ = argv;

    if (!ParseOpt()) {
        terminated_ = true;
        // NOTE(review): ParseOpt only rejects ids <= 0, so the "greater than
        // MAX" messages below are reachable only in combination with a
        // missing other id - confirm whether upper bounds should be enforced.
        if (node_id_ <= 0) {
            a8::XPrintf("gameserver启动失败,缺少-n参数\n", {});
        } else if (node_id_ > MAX_NODE_ID) {
            a8::XPrintf("gameserver启动失败,-n参数不能大于%d\n", {MAX_NODE_ID});
        } else if (instance_id_ <= 0) {
            a8::XPrintf("gameserver启动失败,缺少-i参数\n", {});
        } else if (instance_id_ > MAX_INSTANCE_ID) {
            a8::XPrintf("gameserver启动失败,-i参数不能大于%d\n", {MAX_INSTANCE_ID});
        }
        return false;
    }
    a8::XPrintf("wsproxy starting node_id:%d instance_id:%d pid:%d\n",
                {
                    node_id_,
                    instance_id_,
                    getpid()
                });

    // NOTE(review): the explicit template argument was not visible in the
    // reviewed text; deriving it from uuid_ keeps the two in sync.
    uuid_ = std::make_shared<decltype(uuid_)::element_type>();
    loop_mutex_ = new std::mutex();
    loop_cond_ = new std::condition_variable();
    msg_mutex_ = new std::mutex();
    udp_msg_mutex_ = new std::mutex();

    srand(time(nullptr));
    InitLog();
    f8::MsgQueue::Instance()->Init();
    HandlerMgr::Instance()->Init();
    f8::Timer::Instance()->Init();
    JsonDataMgr::Instance()->Init();
    uuid_->SetMachineId((node_id_ - 1) * MAX_NODE_ID + instance_id_);
    DownStreamMgr::Instance()->Init();
    MasterMgr::Instance()->Init();
    UpStreamMgr::Instance()->Init();
    LongSessionMgr::Instance()->Init();
    GCListener::Instance()->Init();

    f8::UdpLog::Instance()->Info("wsproxy starting instance_id:%d pid:%d",
                                 {
                                     instance_id_,
                                     getpid(),
                                 });
    {
        // Perf log every 5 minutes, or every 10 seconds when the
        // "is_dev_env" environment variable is present.
        int perf_log_time = 1000 * 60 * 5;
        if (getenv("is_dev_env")) {
            perf_log_time = 1000 * 10;
        }
        f8::Timer::Instance()->SetInterval
            (perf_log_time,
             [] (int event, const a8::Args* args)
             {
                 if (a8::TIMER_EXEC_EVENT == event) {
                     SavePerfLog();
                 }
             });
    }
    if (HasFlag(1)) {
        // Flag 1: terminate the main loop after 60 seconds.
        f8::Timer::Instance()->SetTimeout
            (
             1000 * 60,
             [] (int event, const a8::Args* args)
             {
                 if (a8::TIMER_EXEC_EVENT == event) {
                     App::Instance()->terminated_ = true;
                     App::Instance()->NotifyLoopCond();
                 }
             }
             );
    }
    if (HasFlag(4)) {
        // Flag 4: set shutdowned_ after 30 seconds.
        f8::Timer::Instance()->SetTimeout
            (
             1000 * 30,
             [] (int event, const a8::Args* args)
             {
                 if (a8::TIMER_EXEC_EVENT == event) {
                     App::Instance()->shutdowned_ = true;
                     a8::XPrintf("shutdowned\n", {});
                 }
             }
             );
    }
    return true;
}

// Shuts the subsystems down, drains both message queues and destroys the
// synchronisation objects created in Init.
void App::UnInit()
{
    a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n", {instance_id_, getpid()});
    GCListener::Instance()->UnInit();
    LongSessionMgr::Instance()->UnInit();
    MasterMgr::Instance()->UnInit();
    UpStreamMgr::Instance()->UnInit();
    DownStreamMgr::Instance()->UnInit();
    JsonDataMgr::Instance()->UnInit();
    f8::Timer::Instance()->UnInit();
    f8::MsgQueue::Instance()->UnInit();
    HandlerMgr::Instance()->UnInit();
    UnInitLog();

    // The queues must be drained while their mutexes still exist.
    FreeSocketMsgQueue();
    FreeUdpMsgQueue();
    delete msg_mutex_;
    msg_mutex_ = nullptr;
    delete udp_msg_mutex_;
    udp_msg_mutex_ = nullptr;
    delete loop_cond_;
    loop_cond_ = nullptr;
    delete loop_mutex_;
    loop_mutex_ = nullptr;
    a8::XPrintf("wsproxy terminated instance_id:%d pid:%d\n", {instance_id_, getpid()});
}

// Main loop: runs until terminated_ is set, recording the longest iteration
// in the perf counters. Always returns 0.
int App::Run()
{
    f8::UdpLog::Instance()->Info("wsproxy running", {});
    while (!terminated_) {
        a8::tick_t begin_tick = a8::XGetTickCount();
        QuickExecute();
        SlowerExecute();
        a8::tick_t end_tick = a8::XGetTickCount();
        if (end_tick - begin_tick > GetPerf().max_run_delay_time) {
            GetPerf().max_run_delay_time = end_tick - begin_tick;
        }
        Schedule();
    }
    return 0;
}

// Copies the message into a freshly allocated MsgNode, appends it to the
// pending list under msg_mutex_ and wakes the main loop.
// msgbody is only read when bodylen > 0.
void App::AddSocketMsg(SocketFrom_e sockfrom,
                       int sockhandle,
                       long ip_saddr,
                       unsigned short msgid,
                       unsigned int seqid,
                       const char *msgbody,
                       int bodylen,
                       int tag)
{
    MsgNode* p = static_cast<MsgNode*>(malloc(sizeof(MsgNode)));
    memset(p, 0, sizeof(MsgNode));
    p->sockfrom = sockfrom;
    p->ip_saddr = ip_saddr;
    p->sockhandle = sockhandle;
    p->msgid = msgid;
    p->seqid = seqid;
    p->buf = nullptr;
    p->buflen = bodylen;
    p->tag = tag;
    if (bodylen > 0) {
        p->buf = static_cast<char*>(malloc(bodylen));
        memmove(p->buf, msgbody, bodylen);
    }
    {
        std::lock_guard<std::mutex> guard(*msg_mutex_);
        if (bot_node_) {
            bot_node_->next = p;
            bot_node_ = p;
        } else {
            top_node_ = p;
            bot_node_ = p;
        }
        ++msgnode_size_;
    }
    // Notify after releasing msg_mutex_, matching the original ordering.
    NotifyLoopCond();
}

// Per-iteration work: timers, the f8 message queue, both network queues and
// the long-session manager.
void App::QuickExecute()
{
    f8::Timer::Instance()->Update();
    f8::MsgQueue::Instance()->Update();
    DispatchMsg();
    DispatchUdpMsg();
    LongSessionMgr::Instance()->Update();
}

// Intentionally empty.
void App::SlowerExecute()
{
}

// Wakes every waiter on loop_cond_ (the main loop sleeping in Schedule).
void App::NotifyLoopCond()
{
    std::unique_lock<std::mutex> lk(*loop_mutex_);
    loop_cond_->notify_all();
}

// Sleeps for up to 1ms on loop_cond_ when neither queue has pending work.
void App::Schedule()
{
    std::unique_lock<std::mutex> lk(*loop_mutex_);
    if (!HasTask()) {
#if 1
        int sleep_time = 1;
        loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time));
#else
        int sleep_time = f8::Timer::Instance()->GetIdleableMillSeconds();
        loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time));
        if (sleep_time > perf.max_timer_idle) {
            perf.max_timer_idle = sleep_time;
        }
#endif
    }
}

// Returns true when either the socket or the UDP working list is non-empty,
// first moving the pending list into the working list if the latter is empty.
bool App::HasTask()
{
    if (!work_node_) {
        std::lock_guard<std::mutex> guard(*msg_mutex_);
        if (!work_node_ && top_node_) {
            work_node_ = top_node_;
            top_node_ = nullptr;
            bot_node_ = nullptr;
        }
    }
    if (work_node_) {
        return true;
    }

    if (!udp_work_node_) {
        std::lock_guard<std::mutex> guard(*udp_msg_mutex_);
        if (!udp_work_node_ && udp_top_node_) {
            udp_work_node_ = udp_top_node_;
            udp_top_node_ = nullptr;
            udp_bot_node_ = nullptr;
        }
    }
    return udp_work_node_ != nullptr;
}

// Drains the socket working list, routing each node by its sockfrom value.
// Stops early once more than 200 ticks have elapsed so other per-iteration
// work is not starved; the remainder is processed on the next call.
void App::DispatchMsg()
{
    long long starttick = a8::XGetTickCount();
    if (!work_node_) {
        // top_node_ is written by AddSocketMsg under msg_mutex_, so it is
        // inspected under the same lock.
        std::lock_guard<std::mutex> guard(*msg_mutex_);
        if (top_node_) {
            work_node_ = top_node_;
            top_node_ = nullptr;
            bot_node_ = nullptr;
            // NOTE(review): msgnode_size_ is never reset in this file, so
            // this is a running total - confirm against GetMsgNodeSize().
            working_msgnode_size_ = msgnode_size_;
        }
    }

    f8::MsgHdr hdr;
    while (work_node_) {
        MsgNode* pdelnode = work_node_;
        // Advance before dispatching so the list head never points at a node
        // that is about to be freed.
        work_node_ = pdelnode->next;
        hdr.msgid = pdelnode->msgid;
        hdr.seqid = pdelnode->seqid;
        hdr.socket_handle = pdelnode->sockhandle;
        hdr.buf = pdelnode->buf;
        hdr.buflen = pdelnode->buflen;
        hdr.offset = 0;
        hdr.ip_saddr = pdelnode->ip_saddr;
        switch (pdelnode->sockfrom) {
        case SF_Client:
            {
                ProcessClientMsg(hdr, pdelnode->tag);
            }
            break;
        case SF_TargetServer:
            {
                ProcessTargetServerMsg(hdr, pdelnode->tag);
            }
            break;
        case SF_MasterServer:
            {
                ProcessMasterServerMsg(hdr, pdelnode->tag);
            }
            break;
        }
        if (pdelnode->buf) {
            free(pdelnode->buf);
        }
        free(pdelnode);
        working_msgnode_size_--;
        if (a8::XGetTickCount() - starttick > 200) {
            break;
        }
    }//end while

    if (!work_node_) {
        working_msgnode_size_ = 0;
    }
}

// Handles a message received from a client socket.
// Login / reconnect / KCP-handshake messages are only acted on while the
// socket has no live DownStream; every other message is forwarded to the
// socket's DownStream when one exists.
void App::ProcessClientMsg(f8::MsgHdr& hdr, int tag)
{
    if (hdr.msgid == ss::_SS_CMLogin ||
        hdr.msgid == ss::_SS_CMReconnect ||
        hdr.msgid == ss::_SS_CMKcpHandshake) {
        auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle);
        if (down_wp.expired()) {
            switch (hdr.msgid) {
            case ss::_SS_CMLogin:
                {
                    ss::SS_CMLogin msg;
                    bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
                    if (ok) {
                        MasterMgr::Instance()->RequestTargetServer
                            (hdr,
                             msg.team_uuid(),
                             msg.account_id(),
                             msg.session_id(),
                             "",
                             0,
                             msg.proto_version());
                    }
                }
                break;
            case ss::_SS_CMReconnect:
                {
                    ss::SS_CMReconnect msg;
                    bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
                    if (ok) {
                        MasterMgr::Instance()->RequestTargetServer
                            (hdr,
                             msg.team_uuid(),
                             msg.account_id(),
                             msg.session_id(),
                             msg.server_info(),
                             1,
                             0);
                    }
                }
                break;
            case ss::_SS_CMKcpHandshake:
                {
                    ss::SS_CMKcpHandshake msg;
                    bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
                    if (ok) {
                        LongSessionMgr::Instance()->_SS_CMKcpHandshake(hdr, msg);
                    }
                }
                break;
            default:
                {
                    // Unreachable given the enclosing msgid check.
                    abort();
                }
                break;
            }
        }
    } else {
        auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle);
        // Test the locked pointer itself rather than re-querying expired().
        if (auto down = down_wp.lock()) {
            down->ProcCMMsg(hdr, tag);
        }
    }
}

// Dispatches a message from the master server to its registered handler;
// messages without a handler are ignored.
void App::ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag)
{
    f8::NetMsgHandler* handler = f8::GetNetMsgHandler(&HandlerMgr::Instance()->msmsghandler,
                                                      hdr.msgid);
    if (handler) {
        switch (handler->handlerid) {
        case HID_MasterMgr:
            ProcessNetMsg(handler, MasterMgr::Instance(), hdr);
            break;
        }
    }
}

// Handles a message tagged SF_TargetServer: force-close requests close the
// client socket, login/reconnect messages bind the upstream first, and the
// message is then forwarded through the socket's DownStream if it exists.
void App::ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag)
{
    if (hdr.msgid == ss::_SS_ForceCloseSocket) {
        GCListener::Instance()->ForceCloseClient(hdr.socket_handle);
        return;
    }
    if (hdr.msgid == ss::_SS_CMLogin ||
        hdr.msgid == ss::_SS_CMReconnect) {
        DownStreamMgr::Instance()->BindUpStream(hdr.socket_handle, hdr.ip_saddr);
        GCListener::Instance()->MarkClient(hdr.socket_handle, true);
    }
    auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle);
    // Lock once and use that pointer, instead of expired() followed by a
    // second lock() that is dereferenced unchecked.
    if (auto down = down_wp.lock()) {
        down->ForwardUpStreamMsg(hdr);
    }
}

// Builds the project/log directory names, creates the directories and starts
// the UDP logger with a per-pid log file name.
void App::InitLog()
{
    std::string filename_fmt = PROJ_LOG_FILENAME_FMT;
    a8::ReplaceString(filename_fmt, "$pid", a8::XValue(getpid()));

    std::string proj_root_dir = a8::Format(PROJ_ROOT_FMT, {a8::Format(PROJ_NAME_FMT, {GAME_ID})});
    std::string proj_log_root_dir = a8::Format(PROJ_LOG_ROOT_FMT, {a8::Format(PROJ_NAME_FMT, {GAME_ID})});
    std::string log_file_name = proj_log_root_dir + "/" + filename_fmt;

    a8::MkDir(proj_root_dir);
    a8::MkDir(proj_log_root_dir);
    f8::UdpLog::Instance()->SetLogFileName(log_file_name);
    f8::UdpLog::Instance()->Init();
    f8::UdpLog::Instance()->Info("proj_root_dir:%s", {proj_root_dir});
    f8::UdpLog::Instance()->Info("proj_log_root_dir:%s", {proj_log_root_dir});
    f8::UdpLog::Instance()->Info("log_file_name:%s", {log_file_name});
}

void App::UnInitLog()
{
    f8::UdpLog::Instance()->UnInit();
}

// Parses -n <node_id>, -i <instance_id> and -f <flag[,flag...]>.
// Returns true when both ids are positive.
// NOTE(review): MAX_NODE_ID / MAX_INSTANCE_ID are not checked here even
// though App::Init has error messages for exceeding them - confirm intent.
bool App::ParseOpt()
{
    int ch = 0;
    while ((ch = getopt(argc_, argv_, "n:i:f:")) != -1) {
        switch (ch) {
        case 'n':
            {
                node_id_ = a8::XValue(optarg);
            }
            break;
        case 'i':
            {
                instance_id_ = a8::XValue(optarg);
            }
            break;
        case 'f':
            {
                std::vector<std::string> strings;
                a8::Split(optarg, strings, ',');
                for (auto& str : strings) {
                    flags_.insert(a8::XValue(str).GetInt());
                }
            }
            break;
        }
    }
    return instance_id_ > 0 && node_id_ > 0;
}

// Returns true when the given flag was passed via -f.
bool App::HasFlag(int flag)
{
    return flags_.find(flag) != flags_.end();
}

// Frees every node (and payload buffer) in both the working and the pending
// socket lists.
void App::FreeSocketMsgQueue()
{
    std::lock_guard<std::mutex> guard(*msg_mutex_);
    if (!work_node_) {
        work_node_ = top_node_;
        top_node_ = nullptr;
        bot_node_ = nullptr;
    }
    while (work_node_) {
        MsgNode* pdelnode = work_node_;
        work_node_ = work_node_->next;
        if (pdelnode->buf) {
            free(pdelnode->buf);
        }
        free(pdelnode);
        // Working list exhausted: continue with whatever is still pending.
        if (!work_node_) {
            work_node_ = top_node_;
            top_node_ = nullptr;
            bot_node_ = nullptr;
        }
    }
}

// Deletes every undelivered packet and frees its node in both the working
// and the pending UDP lists.
void App::FreeUdpMsgQueue()
{
    std::lock_guard<std::mutex> guard(*udp_msg_mutex_);
    if (!udp_work_node_) {
        udp_work_node_ = udp_top_node_;
        udp_top_node_ = nullptr;
        udp_bot_node_ = nullptr;
    }
    while (udp_work_node_) {
        UdpMsgNode* pdelnode = udp_work_node_;
        udp_work_node_ = pdelnode->next;
        delete pdelnode->pkt;
        // The node itself comes from malloc() in AddUdpMsg and was
        // previously never released.
        free(pdelnode);
        if (!udp_work_node_) {
            udp_work_node_ = udp_top_node_;
            udp_top_node_ = nullptr;
            udp_bot_node_ = nullptr;
        }
    }
}

// Wraps pkt in a freshly allocated UdpMsgNode, appends it to the pending UDP
// list under udp_msg_mutex_ and wakes the main loop.
void App::AddUdpMsg(a8::UdpPacket* pkt)
{
    UdpMsgNode* p = static_cast<UdpMsgNode*>(malloc(sizeof(UdpMsgNode)));
    memset(p, 0, sizeof(UdpMsgNode));
    p->pkt = pkt;
    {
        std::lock_guard<std::mutex> guard(*udp_msg_mutex_);
        if (udp_bot_node_) {
            udp_bot_node_->next = p;
            udp_bot_node_ = p;
        } else {
            udp_top_node_ = p;
            udp_bot_node_ = p;
        }
        ++udp_msgnode_size_;
    }
    NotifyLoopCond();
}

// Drains the UDP working list, handing each packet to LongSessionMgr.
// Stops early once more than 200 ticks have elapsed; the remainder is
// processed on the next call.
void App::DispatchUdpMsg()
{
    long long starttick = a8::XGetTickCount();
    if (!udp_work_node_) {
        // udp_top_node_ is written by AddUdpMsg under udp_msg_mutex_, so it
        // is inspected under the same lock.
        std::lock_guard<std::mutex> guard(*udp_msg_mutex_);
        if (udp_top_node_) {
            udp_work_node_ = udp_top_node_;
            udp_top_node_ = nullptr;
            udp_bot_node_ = nullptr;
            udp_working_msgnode_size_ = udp_msgnode_size_;
        }
    }

    while (udp_work_node_) {
        UdpMsgNode* pdelnode = udp_work_node_;
        // The packet is not deleted here, as in the original.
        // NOTE(review): presumably ProcUdpPacket takes ownership - confirm.
        LongSessionMgr::Instance()->ProcUdpPacket(pdelnode->pkt);
        udp_work_node_ = pdelnode->next;
        // Release the list node; it was previously leaked for every packet.
        free(pdelnode);
        udp_working_msgnode_size_--;
        if (a8::XGetTickCount() - starttick > 200) {
            break;
        }
    }//end while

    if (!udp_work_node_) {
        udp_working_msgnode_size_ = 0;
    }
}

// Returns the next id from the uuid generator created in Init.
long long App::NewUuid()
{
    return uuid_->Generate();
}