From a25f7de5c5e4565e1d8fce2b88a8583ef4cae1ea Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Fri, 3 Aug 2018 19:05:53 +0800 Subject: [PATCH] add server directory --- server/tools/protobuild/ss_msgid.proto | 9 + server/tools/protobuild/ss_proto.proto | 10 + .../tools/scripts/ci/masterserver/boundle.sh | 14 + .../tools/scripts/ci/masterserver/manage.py | 74 +++ .../tools/scripts/ci/masterserver/reload.sh | 0 .../tools/scripts/ci/masterserver/restart.sh | 5 + .../scripts/ci/masterserver/start_instance.sh | 4 + server/tools/scripts/ci/roomserver/boundle.sh | 14 + server/tools/scripts/ci/roomserver/manage.py | 74 +++ server/tools/scripts/ci/roomserver/reload.sh | 0 server/tools/scripts/ci/roomserver/restart.sh | 5 + .../scripts/ci/roomserver/start_instance.sh | 4 + server/tools/scripts/construct/build_pb.py | 71 +++ server/tools/scripts/githooks/applypatch-msg | 1 + server/tools/scripts/githooks/commit-msg | 0 server/tools/scripts/githooks/install.sh | 40 ++ server/tools/scripts/githooks/post-update | 0 server/tools/scripts/githooks/pre-applypatch | 0 server/tools/scripts/githooks/pre-commit | 22 + server/tools/scripts/githooks/pre-push | 0 server/tools/scripts/githooks/pre-rebase | 0 .../tools/scripts/githooks/prepare-commit-msg | 0 server/tools/scripts/githooks/update | 0 server/wsproxy/CMakeLists.txt | 67 +++ server/wsproxy/GCListener.cc | 160 ++++++ server/wsproxy/GCListener.h | 45 ++ server/wsproxy/app.cc | 454 ++++++++++++++++++ server/wsproxy/app.h | 81 ++++ server/wsproxy/constant.h | 37 ++ server/wsproxy/handlermgr.cc | 92 ++++ server/wsproxy/handlermgr.h | 36 ++ server/wsproxy/jsondatamgr.cc | 36 ++ server/wsproxy/jsondatamgr.h | 20 + server/wsproxy/main.cc | 11 + server/wsproxy/precompile.h | 18 + server/wsproxy/target_conn.cc | 146 ++++++ server/wsproxy/target_conn.h | 46 ++ server/wsproxy/types.cc | 0 server/wsproxy/types.h | 10 + 39 files changed, 1606 insertions(+) create mode 100644 server/tools/protobuild/ss_msgid.proto create mode 100644 server/tools/protobuild/ss_proto.proto create mode 100644 server/tools/scripts/ci/masterserver/boundle.sh create mode 100755 server/tools/scripts/ci/masterserver/manage.py create mode 100644 server/tools/scripts/ci/masterserver/reload.sh create mode 100755 server/tools/scripts/ci/masterserver/restart.sh create mode 100755 server/tools/scripts/ci/masterserver/start_instance.sh create mode 100644 server/tools/scripts/ci/roomserver/boundle.sh create mode 100755 server/tools/scripts/ci/roomserver/manage.py create mode 100644 server/tools/scripts/ci/roomserver/reload.sh create mode 100755 server/tools/scripts/ci/roomserver/restart.sh create mode 100755 server/tools/scripts/ci/roomserver/start_instance.sh create mode 100644 server/tools/scripts/construct/build_pb.py create mode 100755 server/tools/scripts/githooks/applypatch-msg create mode 100755 server/tools/scripts/githooks/commit-msg create mode 100755 server/tools/scripts/githooks/install.sh create mode 100755 server/tools/scripts/githooks/post-update create mode 100755 server/tools/scripts/githooks/pre-applypatch create mode 100755 server/tools/scripts/githooks/pre-commit create mode 100755 server/tools/scripts/githooks/pre-push create mode 100755 server/tools/scripts/githooks/pre-rebase create mode 100755 server/tools/scripts/githooks/prepare-commit-msg create mode 100755 server/tools/scripts/githooks/update create mode 100644 server/wsproxy/CMakeLists.txt create mode 100644 server/wsproxy/GCListener.cc create mode 100644 server/wsproxy/GCListener.h create mode 100644 server/wsproxy/app.cc create mode 100644 server/wsproxy/app.h create mode 100644 server/wsproxy/constant.h create mode 100644 server/wsproxy/handlermgr.cc create mode 100644 server/wsproxy/handlermgr.h create mode 100644 server/wsproxy/jsondatamgr.cc create mode 100644 server/wsproxy/jsondatamgr.h create mode 100644 server/wsproxy/main.cc create mode 100644 server/wsproxy/precompile.h create mode 100644 server/wsproxy/target_conn.cc create mode 100644 server/wsproxy/target_conn.h create mode 100644 server/wsproxy/types.cc create mode 100644 server/wsproxy/types.h diff --git a/server/tools/protobuild/ss_msgid.proto b/server/tools/protobuild/ss_msgid.proto new file mode 100644 index 0000000..f364360 --- /dev/null +++ b/server/tools/protobuild/ss_msgid.proto @@ -0,0 +1,9 @@ +package ss; + +//消息id定义 +enum SSMessageId_e +{ + + _SS_Ping = 100; + _SS_Pong = 101; +} diff --git a/server/tools/protobuild/ss_proto.proto b/server/tools/protobuild/ss_proto.proto new file mode 100644 index 0000000..6631495 --- /dev/null +++ b/server/tools/protobuild/ss_proto.proto @@ -0,0 +1,10 @@ +package ss; + +message SS_Ping +{ +} + +message SS_Pong +{ + optional int32 param1 = 1; +} diff --git a/server/tools/scripts/ci/masterserver/boundle.sh b/server/tools/scripts/ci/masterserver/boundle.sh new file mode 100644 index 0000000..da97e59 --- /dev/null +++ b/server/tools/scripts/ci/masterserver/boundle.sh @@ -0,0 +1,14 @@ +cd third_party/matchvs/server/masterserver +python ../tools/scripts/construct/build_pb.py +cmake . +make clean +make +cp ../bin/masterserver ../../../../bin/ + +cd ../../../../ + +tag_name=`git status |grep '# On branch '|sed 's/# On branch //g'` +dir_name=`basename $PWD` +package_name=${dir_name}.tar.gz + +tar --exclude=*.git -chzf target/${package_name} bin reload.sh restart.sh start_instance.sh manage.py config diff --git a/server/tools/scripts/ci/masterserver/manage.py b/server/tools/scripts/ci/masterserver/manage.py new file mode 100755 index 0000000..c1ed810 --- /dev/null +++ b/server/tools/scripts/ci/masterserver/manage.py @@ -0,0 +1,74 @@ +#!/usr/bin/python +#coding utf8 + +import os +import sys +import time + +def getRuningProgramPids(progname): + pids = [] + lines = os.popen('ps -ef | grep %s' % progname).readlines() + for l in lines: + line = '' + oldc = '' + for c in l.strip(): + if c in [' ', '\t'] and c == oldc: + continue + oldc = c + line += c + line = line.split(' ') + + if line[7] == './%s' % progname: + pids.append(line[1]) + return pids + +def getExePath(pid): + return os.popen('ls -l /proc/%d | grep "exe ->" | cut -d " " -f 7-' % int(pid)).read() + +def getExeCmdLine(pid): + return os.popen('cat /proc/%d/cmdline' % int(pid)).read() + +def stop(instance_id): + masterserver_ids = getRuningProgramPids('masterserver') + pids = masterserver_ids + for pid in pids: + exepath = getExePath(pid) + cmdline = getExeCmdLine(pid) + if cmdline == ("./masterserver\0-i\0%d\0" % instance_id): + os.popen('kill -9 %d' % int(pid)) + +def listserver(): + masterserver_ids = getRuningProgramPids('masterserver') + pids = masterserver_ids + for pid in pids: + exepath = getExePath(pid) + cmdline = getExeCmdLine(pid) + print pid, exepath, cmdline + +def restartServer(str_instance_ids): + instance_ids = str_instance_ids.split(',') + for instance_id in instance_ids: + instance_id = int(instance_id) + stop(instance_id) + time.sleep(0.5) + print 'masterserver %d starting......' % instance_id + cmd = 'sh start_instance.sh %d' % (instance_id) + os.popen(cmd) + time.sleep(0.5) + +def printHelp(): + print 'usuage: [restart]' + +def main(argv): + if len(argv) == 1: + printHelp() + else: + if argv[1] == 'restart': + restartServer(argv[2]) + elif argv[1] == 'stop': + stop() + elif argv[1] == 'list': + listserver() + +if __name__ == '__main__': + main(sys.argv) diff --git a/server/tools/scripts/ci/masterserver/reload.sh b/server/tools/scripts/ci/masterserver/reload.sh new file mode 100644 index 0000000..e69de29 diff --git a/server/tools/scripts/ci/masterserver/restart.sh b/server/tools/scripts/ci/masterserver/restart.sh new file mode 100755 index 0000000..2d4ea42 --- /dev/null +++ b/server/tools/scripts/ci/masterserver/restart.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +python manage.py restart $1 + +echo 'success' diff --git a/server/tools/scripts/ci/masterserver/start_instance.sh b/server/tools/scripts/ci/masterserver/start_instance.sh new file mode 100755 index 0000000..2bb259a --- /dev/null +++ b/server/tools/scripts/ci/masterserver/start_instance.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +cd bin +`nohup ./masterserver -i $1 >> masterserver$1.out 2>&1 &` diff --git a/server/tools/scripts/ci/roomserver/boundle.sh b/server/tools/scripts/ci/roomserver/boundle.sh new file mode 100644 index 0000000..eebefd1 --- /dev/null +++ b/server/tools/scripts/ci/roomserver/boundle.sh @@ -0,0 +1,14 @@ +cd third_party/matchvs/server/roomserver +python ../tools/scripts/construct/build_pb.py +cmake . +make clean +make +cp ../bin/roomserver ../../../../bin/ + +cd ../../../../ + +tag_name=`git status |grep '# On branch '|sed 's/# On branch //g'` +dir_name=`basename $PWD` +package_name=${dir_name}.tar.gz + +tar --exclude=*.git -chzf target/${package_name} bin reload.sh restart.sh start_instance.sh manage.py config diff --git a/server/tools/scripts/ci/roomserver/manage.py b/server/tools/scripts/ci/roomserver/manage.py new file mode 100755 index 0000000..b830a57 --- /dev/null +++ b/server/tools/scripts/ci/roomserver/manage.py @@ -0,0 +1,74 @@ +#!/usr/bin/python +#coding utf8 + +import os +import sys +import time + +def getRuningProgramPids(progname): + pids = [] + lines = os.popen('ps -ef | grep %s' % progname).readlines() + for l in lines: + line = '' + oldc = '' + for c in l.strip(): + if c in [' ', '\t'] and c == oldc: + continue + oldc = c + line += c + line = line.split(' ') + + if line[7] == './%s' % progname: + pids.append(line[1]) + return pids + +def getExePath(pid): + return os.popen('ls -l /proc/%d | grep "exe ->" | cut -d " " -f 7-' % int(pid)).read() + +def getExeCmdLine(pid): + return os.popen('cat /proc/%d/cmdline' % int(pid)).read() + +def stop(instance_id): + roomserver_ids = getRuningProgramPids('roomserver') + pids = roomserver_ids + for pid in pids: + exepath = getExePath(pid) + cmdline = getExeCmdLine(pid) + if cmdline == ("./roomserver\0-i\0%d\0" % instance_id): + os.popen('kill -9 %d' % int(pid)) + +def listserver(): + roomserver_ids = getRuningProgramPids('roomserver') + pids = roomserver_ids + for pid in pids: + exepath = getExePath(pid) + cmdline = getExeCmdLine(pid) + print pid, exepath, cmdline + +def restartServer(str_instance_ids): + instance_ids = str_instance_ids.split(',') + for instance_id in instance_ids: + instance_id = int(instance_id) + stop(instance_id) + time.sleep(0.5) + print 'roomserver %d starting......' % instance_id + cmd = 'sh start_instance.sh %d' % (instance_id) + os.popen(cmd) + time.sleep(0.5) + +def printHelp(): + print 'usuage: [restart]' + +def main(argv): + if len(argv) == 1: + printHelp() + else: + if argv[1] == 'restart': + restartServer(argv[2]) + elif argv[1] == 'stop': + stop() + elif argv[1] == 'list': + listserver() + +if __name__ == '__main__': + main(sys.argv) diff --git a/server/tools/scripts/ci/roomserver/reload.sh b/server/tools/scripts/ci/roomserver/reload.sh new file mode 100644 index 0000000..e69de29 diff --git a/server/tools/scripts/ci/roomserver/restart.sh b/server/tools/scripts/ci/roomserver/restart.sh new file mode 100755 index 0000000..2d4ea42 --- /dev/null +++ b/server/tools/scripts/ci/roomserver/restart.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +python manage.py restart $1 + +echo 'success' diff --git a/server/tools/scripts/ci/roomserver/start_instance.sh b/server/tools/scripts/ci/roomserver/start_instance.sh new file mode 100755 index 0000000..ee0c3b3 --- /dev/null +++ b/server/tools/scripts/ci/roomserver/start_instance.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +cd bin +nohup ./roomserver -i $1 >> roomserver$1.out 2>&1 & diff --git a/server/tools/scripts/construct/build_pb.py b/server/tools/scripts/construct/build_pb.py new file mode 100644 index 0000000..022d375 --- /dev/null +++ b/server/tools/scripts/construct/build_pb.py @@ -0,0 +1,71 @@ +#coding utf8 +#!/usr/bin/python + +import os +import sys +import time +import threading +import subprocess + +g_is_terminated = False +def is_terminated(): + return g_is_terminated + +def printp_stdout(p): + try: + while not is_terminated(): + line = p.stdout.readline() + if len(line) > 0: + print line, + except Exception, e: + print 'build_pb stdout error:' + e + +def printp_stderr(p): + try: + while is_terminated(): + line = p.stderr.readline() + if len(line) > 0: + print line, + except Exception, e: + print 'build_pb stderr error:' + e + +def need_rebuild(): + for proto_name in ('vs_proto', 'vs_msgid', 'ss_proto', 'ss_msgid'): + if not os.path.isfile(proto_name + '.pb.cc'): + return True + s1 = os.stat(proto_name + '.pb.cc') + s2 = os.stat('../tools/protobuild/' + proto_name + '.proto') + if s1.st_mtime < s2.st_mtime: + return True + return False + +def rebuild(): + global g_is_terminated + try: + p = subprocess.Popen( + 'protoc --proto_path=../tools/protobuild --cpp_out=../wsproxy ../tools/protobuild/ss_proto.proto && ' + + 'protoc --proto_path=../tools/protobuild --cpp_out=../wsproxy ../tools/protobuild/ss_msgid.proto ', + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + shell = True) + t1 = threading.Thread(target = printp_stdout, args=(p, )) + t2 = threading.Thread(target = printp_stderr, args=(p, )) + t1.start() + t2.start() + p.wait() + g_is_terminated = True + t1.join() + t2.join() + sys.exit(p.returncode) + except Exception, e: + print 'build_protocol rebuild error:' + str(e) + +def repair_githooks(): + os.system('/bin/bash ../tools/scripts/githooks/install.sh') + +repair_githooks() +if need_rebuild(): + rebuild() +else: + print 'pb files already is the latest' diff --git a/server/tools/scripts/githooks/applypatch-msg b/server/tools/scripts/githooks/applypatch-msg new file mode 100755 index 0000000..5ceb5fa --- /dev/null +++ b/server/tools/scripts/githooks/applypatch-msg @@ -0,0 +1 @@ +#!/bin/python diff --git a/server/tools/scripts/githooks/commit-msg b/server/tools/scripts/githooks/commit-msg new file mode 100755 index 0000000..e69de29 diff --git a/server/tools/scripts/githooks/install.sh b/server/tools/scripts/githooks/install.sh new file mode 100755 index 0000000..ee13630 --- /dev/null +++ b/server/tools/scripts/githooks/install.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +basepath=$(cd `dirname $0`; pwd) +githooks_dir=${basepath}/../../../../.git/hooks + +if [ ! -L $githooks_dir/applypatch-msg ]; then + ln -s $basepath/applypatch-msg $githooks_dir/applypatch-msg +fi + +if [ ! -L $githooks_dir/commit-msg ]; then + ln -s $basepath/commit-msg $githooks_dir/commit-msg +fi + +if [ ! -L $githooks_dir/post-update ]; then + ln -s $basepath/post-update $githooks_dir/post-update +fi + +if [ ! -L $githooks_dir/pre-applypatch ]; then + ln -s $basepath/pre-applypatch $githooks_dir/pre-applypatch +fi + +if [ ! -L $githooks_dir/pre-commit ]; then + ln -s $basepath/pre-commit $githooks_dir/pre-commit +fi + +if [ ! -L $githooks_dir/prepare-commit-msg ]; then + ln -s $basepath/prepare-commit-msg $githooks_dir/prepare-commit-msg +fi + +if [ ! -L $githooks_dir/pre-push ]; then + ln -s $basepath/pre-push $githooks_dir/pre-push +fi + +if [ ! -L $githooks_dir/pre-rebase ]; then + ln -s $basepath/pre-rebase $githooks_dir/pre-rebase +fi + +if [ ! -L $githooks_dir/update ]; then + ln -s $basepath/update $githooks_dir/update +fi diff --git a/server/tools/scripts/githooks/post-update b/server/tools/scripts/githooks/post-update new file mode 100755 index 0000000..e69de29 diff --git a/server/tools/scripts/githooks/pre-applypatch b/server/tools/scripts/githooks/pre-applypatch new file mode 100755 index 0000000..e69de29 diff --git a/server/tools/scripts/githooks/pre-commit b/server/tools/scripts/githooks/pre-commit new file mode 100755 index 0000000..d6b2a25 --- /dev/null +++ b/server/tools/scripts/githooks/pre-commit @@ -0,0 +1,22 @@ +#!/bin/bash + +# env |grep kol_env +# if [ $? -ne 0 ]; then +# exit 0 +# fi + +# git branch|grep -q "* dev" +# if [ $? -ne 0 ]; then +# echo 'not dev branch, exit' +# exit 0 +# fi + +# echo 'no sync protocol to svn' + +git branch|grep -q "* master" +if [ $? -eq 0 ]; then + cp -f server/tools/protobuild/kingsomevs.proto ~/opensource/server + cp -f server/tools/protobuild/messages.proto ~/opensource/server + svn update ~/opensource/server + svn commit -m '同步协议' ~/opensource/server +fi diff --git a/server/tools/scripts/githooks/pre-push b/server/tools/scripts/githooks/pre-push new file mode 100755 index 0000000..e69de29 diff --git a/server/tools/scripts/githooks/pre-rebase b/server/tools/scripts/githooks/pre-rebase new file mode 100755 index 0000000..e69de29 diff --git a/server/tools/scripts/githooks/prepare-commit-msg b/server/tools/scripts/githooks/prepare-commit-msg new file mode 100755 index 0000000..e69de29 diff --git a/server/tools/scripts/githooks/update b/server/tools/scripts/githooks/update new file mode 100755 index 0000000..e69de29 diff --git a/server/wsproxy/CMakeLists.txt b/server/wsproxy/CMakeLists.txt new file mode 100644 index 0000000..bccac8b --- /dev/null +++ b/server/wsproxy/CMakeLists.txt @@ -0,0 +1,67 @@ +project(masterserver) +cmake_minimum_required(VERSION 2.8) + + +set(CMAKE_BUILD_TYPE "Debug") +set(CMAKE_CXX_FLAGS_RELEASE "-std=gnu++11 -fsanitize=address -fno-omit-frame-pointer") +set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++11") + +include_directories( + AFTER + ../../third_party/a8engine + /usr/include/mysql + /usr/include/jsoncpp + /usr/include/python3.4m + /usr/include/hiredis +) + +link_directories( + /usr/lib64/mysql + /usr/local/lib +) + +aux_source_directory(../../third_party/a8engine/a8 + SRC_LIST +) + +aux_source_directory(../common + SRC_LIST +) + +aux_source_directory(. + SRC_LIST +) + +set(EXECUTABLE_OUTPUT_PATH + ${PROJECT_BINARY_DIR}/../bin +) + +set_directory_properties(PROPERTIES COMPILE_DEFINITIONS_DEBUG "_DEBUG") + +add_executable( + masterserver ${SRC_LIST} +) + +add_custom_target(script_pb_protocol ALL) +add_custom_command(TARGET script_pb_protocol + PRE_BUILD +# COMMAND python ../../tools/script/construct/build_script.py + COMMAND python ../tools/scripts/construct/build_pb.py +# COMMAND python ../../tools/script/construct/build_protocol.py +# COMMAND python ../../tools/script/construct/build_version_file.py +) +add_dependencies(masterserver script_pb_protocol) + +target_link_libraries( + masterserver + pthread + mysqlclient + protobuf + rt + crypto + ssl + python3.4m + jsoncpp + curl + hiredis +) diff --git a/server/wsproxy/GCListener.cc b/server/wsproxy/GCListener.cc new file mode 100644 index 0000000..6b3296a --- /dev/null +++ b/server/wsproxy/GCListener.cc @@ -0,0 +1,160 @@ +#include "precompile.h" + +#include +#include +#include + +#include "../common/netmsghandler.h" + +#include "app.h" +#include "GCListener.h" +#include "jsondatamgr.h" +#include "ss_proto.pb.h" +#include "handlermgr.h" +#include "player.h" +#include "playermgr.h" + +class GCClientSession: public a8::WebSocketSession +{ +public: + + virtual void DecodeUserPacket(char* buf, int& offset, unsigned int buflen) override + { + //packagelen + msgid + magiccode + msgbody + //2 + 2 + 4+ xx + \0 + xx + bool warning = false; + while (buflen - offset >= sizeof(PackHead)) { + PackHead* p = (PackHead*)&buf[offset]; + if (p->magiccode == MAGIC_CODE) { + if (buflen - offset < sizeof(PackHead) + p->packlen) { + break; + } + App::Instance()->AddSocketMsg(SF_Client, + socket_handle, + saddr, + p->msgid, + &buf[offset + sizeof(PackHead)], + p->packlen); + offset += sizeof(PackHead) + p->packlen; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + a8::UdpLog::Instance()->Warning("收到client非法数据包", {}); + } + } + + virtual void OnRawHttpGet(const std::string& url, const std::string& querystr, + std::string& response) override + { + App::Instance()->AddIMMsg(IM_ExecGM, + a8::XParams() + .SetSender(socket_handle) + .SetParam1(url) + .SetParam2(querystr)); + } + + virtual void OnDisConnect() override + { + App::Instance()->AddIMMsg(IM_ClientSocketDisconnect, + a8::XParams() + .SetSender(socket_handle) + .SetParam1(1)); + } + +}; + +static void CreateGameClientSocket(a8::TcpSession **p) +{ + *p = new GCClientSession(); +} + +static void GSListeneron_error(a8::TcpListener*, int type, int errorid) +{ + a8::UdpLog::Instance()->Debug("GCListeneron_error %d %d", {type, errorid}); +} + +void GCListener::Init() +{ + tcp_listener_ = new a8::TcpListener(); + tcp_listener_->on_create_client_socket = CreateGameClientSocket; + tcp_listener_->on_error = GSListeneron_error; + + tcp_listener_->bind_address = "0.0.0.0"; + tcp_listener_->bind_port = JsonDataMgr::Instance()->GetConf()->At("listen_port")->AsXValue(); + tcp_listener_->Open(); +} + +void GCListener::UnInit() +{ + delete tcp_listener_; + tcp_listener_ = nullptr; +} + +void GCListener::SendText(unsigned short sockhandle, const std::string& text) +{ + tcp_listener_->SendClientMsg(sockhandle, text.data(), text.size()); +} + +void GCListener::SendErrorMsg(unsigned short sockhandle, int msgid, + int error_code, const std::string& error_msg) +{ + ss::SS_Error msg; + msg.mutable_result()->set_error_code(error_code); + msg.mutable_result()->set_error_msg(error_msg); + Net_SendMsg(tcp_listener_, sockhandle, msgid, msg); +} + +void GCListener::ForceCloseClient(unsigned short sockhandle) +{ + tcp_listener_->ForceCloseClient(sockhandle); +} + +void GCListener::MarkClient(unsigned short sockhandle, bool is_active) +{ + tcp_listener_->MarkClient(sockhandle, is_active); +} + +void GCListener::_SS_RS_ForwardClientMsg(MsgHdr& hdr, const ss::SS_RS_ForwardClientMsg& msg) +{ + Player* hum = PlayerMgr::Instance()->GetPlayerBySocket(msg.context().socket_handle()); + if (msg.context().src_msg_id() != cs::SMMessageId_e::_SMReConnect) { + if (!hum || hum->session_handle != msg.context().session_handle()) { + return; + } + tcp_listener_->SendClientMsg(msg.context().socket_handle(), + msg.src_msg_head_and_body().data(), + msg.src_msg_head_and_body().size()); + } + + NetMsgHandler* handler = GetNetMsgHandler(&HandlerMgr::Instance()->gsmsghandler, + msg.context().src_msg_id()); + if (handler && msg.src_msg_head_and_body().size() > sizeof(PackHead)) { + MsgHdr gs_hdr; + gs_hdr.msgid = msg.context().src_msg_id(); + gs_hdr.socket_handle = msg.context().socket_handle(); + gs_hdr.buf = msg.src_msg_head_and_body().data() + sizeof(PackHead); + gs_hdr.buflen = msg.src_msg_head_and_body().size() - sizeof(PackHead); + gs_hdr.offset = 0; + gs_hdr.ip_saddr = msg.context().ip_saddr(); + gs_hdr.user_data = &msg.context(); + switch (handler->handlerid) { + case HID_Player: + { + if (hum) { + ProcessNetMsg(handler, hum, gs_hdr); + } + } + break; + case HID_PlayerMgr: + { + ProcessNetMsg(handler, PlayerMgr::Instance(), gs_hdr); + } + break; + } + } +} diff --git a/server/wsproxy/GCListener.h b/server/wsproxy/GCListener.h new file mode 100644 index 0000000..2e759e2 --- /dev/null +++ b/server/wsproxy/GCListener.h @@ -0,0 +1,45 @@ +#pragma once + +//game client listener +namespace a8 +{ + class TcpListener; +} + +namespace ss +{ + class SS_RS_ForwardClientMsg; +} + +class GCListener : public a8::Singleton +{ + private: + GCListener() {}; + friend class a8::Singleton; + + public: + enum { HID = HID_GCListener }; + + public: + void Init(); + void UnInit(); + + template + void SendToClient(unsigned short sockhandle, T& msg) + { + static int msgid = ::Net_GetMessageId(msg); + Net_SendMsg(tcp_listener_, sockhandle, msgid, msg); + } + void SendText(unsigned short sockhandle, const std::string& text); + + void SendErrorMsg(unsigned short sockhandle, int msgid, + int error_code, const std::string& error_msg); + + void ForceCloseClient(unsigned short sockhandle); + void MarkClient(unsigned short sockhandle, bool is_active); + + void _SS_RS_ForwardClientMsg(MsgHdr& hdr, const ss::SS_RS_ForwardClientMsg& msg); + + private: + a8::TcpListener *tcp_listener_ = nullptr; +}; diff --git a/server/wsproxy/app.cc b/server/wsproxy/app.cc new file mode 100644 index 0000000..8940e5d --- /dev/null +++ b/server/wsproxy/app.cc @@ -0,0 +1,454 @@ +#include "precompile.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "vs_proto.pb.h" +#include "vs_msgid.pb.h" + +#include "../common/netmsghandler.h" + +#include "app.h" +#include "GCListener.h" +#include "jsondatamgr.h" +#include "vs_msgid.pb.h" +#include "handlermgr.h" +#include "playermgr.h" +#include "player.h" +#include "roomsvrmgr.h" + +struct MsgNode +{ + SocketFrom_e sockfrom; + int sockhandle; + unsigned short msgid; + long ip_saddr; + char* buf; + int buflen; + MsgNode* next; +}; + +struct IMMsgNode +{ + unsigned short msgid; + a8::XParams params; + IMMsgNode* next = nullptr; + +}; + +const char* const PROJ_LOG_ROOT = "/data/logs/%s/logs"; +const char* const PROJ_LOG_FILENAME = "log_$pid_%Y%m%d.log"; + +static void SavePerfLog() +{ + a8::UdpLog::Instance()->Info(" max_run_delay_time:%d max_timer_idle:%d " + "in_data_size:%d out_data_size:%d msgnode_size:%d read_count:%d", + { + App::Instance()->perf.max_run_delay_time, + App::Instance()->perf.max_timer_idle, + App::Instance()->perf.in_data_size, + App::Instance()->perf.out_data_size, + App::Instance()->msgnode_size_, + App::Instance()->perf.read_count, + }); + #if 1 + App::Instance()->perf.max_run_delay_time = 0; + App::Instance()->perf.max_timer_idle = 0; + #else + App::Instance()->perf = PerfMonitor(); + #endif +} + +void App::Init(int argc, char* argv[]) +{ + this->argc = argc; + this->argv = argv; + + if (!ParseOpt()) { + terminated = true; + a8::XPrintf("masterserver启动失败,缺少-i参数\n", {}); + return; + } + a8::XPrintf("masterserver starting instance_id:%d pid:%d\n", {instance_id, getpid()}); + + loop_mutex_ = new std::mutex(); + loop_cond_ = new std::condition_variable(); + msg_mutex_ = new std::mutex(); + im_msg_mutex_ = new std::mutex(); + + srand(time(nullptr)); + InitLog(); + HandlerMgr::Instance()->Init(); + a8::Timer::Instance()->Init(); + PlayerMgr::Instance()->Init(); + JsonDataMgr::Instance()->Init(); + GCListener::Instance()->Init(); + RoomSvrMgr::Instance()->Init(); + uuid.SetMachineId(instance_id); + + a8::UdpLog::Instance()->Info("masterserver starting instance_id:%d pid:%d", {instance_id, getpid()}); + { + int perf_log_time = 1000 * 60 * 5; + if (getenv("machine_type")) { + perf_log_time = 1000 * 10; + } + a8::Timer::Instance()->AddRepeatTimer(perf_log_time, + a8::XParams(), + [] (const a8::XParams& param) + { + SavePerfLog(); + }); + } +} + +void App::UnInit() +{ + if (terminated) { + return; + } + RoomSvrMgr::Instance()->UnInit(); + GCListener::Instance()->UnInit(); + JsonDataMgr::Instance()->UnInit(); + PlayerMgr::Instance()->UnInit(); + a8::Timer::Instance()->UnInit(); + HandlerMgr::Instance()->UnInit(); + UnInitLog(); + + delete im_msg_mutex_; + im_msg_mutex_ = nullptr; + delete msg_mutex_; + msg_mutex_ = nullptr; + delete loop_cond_; + loop_cond_ = nullptr; + delete loop_mutex_; + loop_mutex_ = nullptr; +} + +int App::Run() +{ + if (terminated) { + return 0; + } + int ret = 0; + a8::UdpLog::Instance()->Info("masterserver running", {}); + while (!terminated) { + a8::tick_t begin_tick = a8::XGetTickCount(); + QuickExecute(); + SlowerExecute(); + a8::tick_t end_tick = a8::XGetTickCount(); + if (end_tick - begin_tick > perf.max_run_delay_time) { + perf.max_run_delay_time = end_tick - begin_tick; + } + Schedule(); + } + return ret; +} + +void App::AddSocketMsg(SocketFrom_e sockfrom, + int sockhandle, + long ip_saddr, + unsigned short msgid, + const char *msgbody, + int bodylen) +{ + MsgNode *p = (MsgNode*) malloc(sizeof(MsgNode)); + memset(p, 0, sizeof(MsgNode)); + p->sockfrom = sockfrom; + p->ip_saddr = ip_saddr; + p->sockhandle = sockhandle; + p->msgid = msgid; + p->buf = nullptr; + p->buflen = bodylen; + if (bodylen > 0) { + p->buf = (char*)malloc(bodylen); + memmove(p->buf, msgbody, bodylen); + } + msg_mutex_->lock(); + if (bot_node_) { + bot_node_->next = p; + bot_node_ = p; + } else { + top_node_ = p; + bot_node_ = p; + } + ++msgnode_size_; + msg_mutex_->unlock(); + NotifyLoopCond(); +} + +void App::AddIMMsg(unsigned short imcmd, a8::XParams params) +{ + IMMsgNode *p = new IMMsgNode; + p->msgid = imcmd; + p->params = params; + p->next = nullptr; + im_msg_mutex_->lock(); + if (im_bot_node_) { + im_bot_node_->next = p; + im_bot_node_ = p; + } else { + im_top_node_ = p; + im_bot_node_ = p; + } + im_msg_mutex_->unlock(); + NotifyLoopCond(); +} + +void App::QuickExecute() +{ + ProcessIMMsg(); + DispatchMsg(); + a8::Timer::Instance()->Update(); +} + +void App::SlowerExecute() +{ +} + +void App::NotifyLoopCond() +{ + std::unique_lock lk(*loop_mutex_); + loop_cond_->notify_all(); +} + +void App::Schedule() +{ + std::unique_lock lk(*loop_mutex_); + if (!HasTask()) { + int sleep_time = a8::Timer::Instance()->GetIdleableMillSeconds(); + loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time)); + if (sleep_time > perf.max_timer_idle) { + perf.max_timer_idle = sleep_time; + } + } +} + +bool App::HasTask() +{ + { + if (!im_work_node_) { + im_msg_mutex_->lock(); + if (!im_work_node_ && im_top_node_) { + im_work_node_ = im_top_node_; + im_top_node_ = nullptr; + im_bot_node_ = nullptr; + } + im_msg_mutex_->unlock(); + } + if (im_work_node_) { + return true; + } + } + { + if (!work_node_) { + msg_mutex_->lock(); + if (!work_node_ && top_node_) { + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + } + msg_mutex_->unlock(); + } + if (work_node_) { + return true; + } + } + return false; +} + +void App::DispatchMsg() +{ + long long starttick = a8::XGetTickCount(); + if (!work_node_ && top_node_) { + msg_mutex_->lock(); + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + working_msgnode_size_ = msgnode_size_; + msg_mutex_->unlock(); + } + + MsgHdr hdr; + while (work_node_) { + MsgNode *pdelnode = work_node_; + work_node_ = pdelnode->next; + hdr.msgid = pdelnode->msgid; + hdr.socket_handle = pdelnode->sockhandle; + hdr.buf = pdelnode->buf; + hdr.buflen = pdelnode->buflen; + hdr.offset = 0; + hdr.ip_saddr = pdelnode->ip_saddr; + switch (pdelnode->sockfrom) { + case SF_Client: + { + ProcessClientMsg(hdr); + } + break; + case SF_RoomServer: + { + ProcessRoomServerMsg(hdr); + } + break; + } + if (pdelnode->buf) { + free(pdelnode->buf); + } + free(pdelnode); + working_msgnode_size_--; + if (a8::XGetTickCount() - starttick > 200) { + break; + } + }//end while + + if (!work_node_) { + working_msgnode_size_ = 0; + } +} + +void App::ProcessClientMsg(MsgHdr& hdr) +{ + NetMsgHandler* handler = GetNetMsgHandler(&HandlerMgr::Instance()->gcmsghandler, + hdr.msgid); + if (handler) { + switch (handler->handlerid) { + case HID_PlayerMgr: + { + ProcessNetMsg(handler, PlayerMgr::Instance(), hdr); + } + break; + case HID_Player: + { + Player* hum = PlayerMgr::Instance()->GetPlayerBySocket(hdr.socket_handle); + if (hum) { + ProcessNetMsg(handler, hum, hdr); + } + } + break; + } + } else { + Player* hum = PlayerMgr::Instance()->GetPlayerBySocket(hdr.socket_handle); + if (hum) { + hum->ForwardClientMsg(hdr); + } + } +} + +void App::ProcessRoomServerMsg(MsgHdr& hdr) +{ + NetMsgHandler* handler = GetNetMsgHandler(&HandlerMgr::Instance()->rsmsghandler, + hdr.msgid); + if (handler) { + switch (handler->handlerid) { + case HID_GCListener: + ProcessNetMsg(handler, GCListener::Instance(), hdr); + break; + case HID_RoomSvrMgr: + ProcessNetMsg(handler, RoomSvrMgr::Instance(), hdr); + break; + case HID_PlayerMgr: + { + ProcessNetMsg(handler, PlayerMgr::Instance(), hdr); + } + break; + } + } +} + +void App::ProcessIMMsg() +{ + if (!im_work_node_ && im_top_node_) { + im_msg_mutex_->lock(); + im_work_node_ = im_top_node_; + im_top_node_ = nullptr; + im_bot_node_ = nullptr; + im_msg_mutex_->unlock(); + } + while (im_work_node_) { + IMMsgNode *pdelnode = im_work_node_; + switch (im_work_node_->msgid) { + case IM_ClientSocketDisconnect: + { + PlayerMgr::Instance()->OnClientDisconnect(pdelnode->params); + } + break; + case IM_PlayerOffline: + { + PlayerMgr::Instance()->OnPlayerOffline(pdelnode->params); + } + break; + case IM_ExecGM: + { + HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.sender, + pdelnode->params.param1.GetString(), + pdelnode->params.param2.GetString() + ); + } + break; + case IM_RSConnDisconnect: + { + PlayerMgr::Instance()->OnRSConnDisconnect(pdelnode->params); + } + break; + } + im_work_node_ = im_work_node_->next; + delete pdelnode; + } +} + +void App::InitLog() +{ + std::string filename_fmt = PROJ_LOG_FILENAME; + if (getenv("machine_type")) { + a8::ReplaceString(filename_fmt, "$pid", a8::XValue(0)); + } else { + a8::ReplaceString(filename_fmt, "$pid", a8::XValue(getpid())); + } + + a8::MkDir(a8::Format(PROJ_ROOT, {PROJ_NAME})); + a8::MkDir(a8::Format(PROJ_LOG_ROOT, {PROJ_NAME})); + a8::UdpLog::Instance()->SetLogFileName(a8::Format(PROJ_LOG_ROOT, {PROJ_NAME}) + "/" + filename_fmt); + a8::UdpLog::Instance()->Init(); +} + +void App::UnInitLog() +{ + a8::UdpLog::Instance()->UnInit(); +} + +bool App::ParseOpt() +{ + int ch = 0; + while ((ch = getopt(argc, argv, "i:")) != -1) { + switch (ch) { + case 'i': + { + instance_id = a8::XValue(optarg); + } + break; + } + } + return instance_id > 0; +} + +a8::XParams* App::AddContext(long long context_id) +{ + context_hash_[context_id] = a8::XParams(); + return GetContext(context_id); +} + +void App::DelContext(long long context_id) +{ + context_hash_.erase(context_id); +} + +a8::XParams* App::GetContext(long long context_id) +{ + auto itr = context_hash_.find(context_id); + return itr != context_hash_.end() ? &(itr->second) : nullptr; +} diff --git a/server/wsproxy/app.h b/server/wsproxy/app.h new file mode 100644 index 0000000..97da542 --- /dev/null +++ b/server/wsproxy/app.h @@ -0,0 +1,81 @@ +#pragma once + +#include + +struct MsgNode; +struct IMMsgNode; +class App : public a8::Singleton +{ + private: + App() {}; + friend class a8::Singleton; + + public: + + void Init(int argc, char* argv[]); + void UnInit(); + + int Run(); + + void AddSocketMsg(SocketFrom_e sockfrom, + int sockhandle, + long ip_saddr, + unsigned short msgid, + const char *msgbody, + int bodylen); + void AddIMMsg(unsigned short imcmd, a8::XParams params); + + void NotifyLoopCond(); + + a8::XParams* AddContext(long long context_id); + void DelContext(long long context_id); + a8::XParams* GetContext(long long context_id); + +private: + void QuickExecute(); + void SlowerExecute(); + void Schedule(); + bool HasTask(); + + void DispatchMsg(); + void ProcessIMMsg(); + + void ProcessClientMsg(MsgHdr& hdr); + void ProcessRoomServerMsg(MsgHdr& hdr); + + void InitLog(); + void UnInitLog(); + + bool ParseOpt(); + + public: + int argc = 0; + char** argv = nullptr; + volatile bool terminated = false; + PerfMonitor perf; + a8::uuid::SnowFlake uuid; + +public: + int instance_id = 0; + + private: + std::mutex *loop_mutex_ = nullptr; + std::condition_variable *loop_cond_ = nullptr; + + std::mutex *msg_mutex_ = nullptr; + MsgNode* top_node_ = nullptr; + MsgNode* bot_node_ = nullptr; + MsgNode* work_node_ = nullptr; + + std::mutex* im_msg_mutex_ = nullptr; + IMMsgNode* im_top_node_ = nullptr; + IMMsgNode* im_bot_node_ = nullptr; + IMMsgNode* im_work_node_ = nullptr; + + std::map context_hash_; + + public: + int msgnode_size_ = 0 ; + int working_msgnode_size_ = 0; + +}; diff --git a/server/wsproxy/constant.h b/server/wsproxy/constant.h new file mode 100644 index 0000000..b73c6ba --- /dev/null +++ b/server/wsproxy/constant.h @@ -0,0 +1,37 @@ +#pragma once + +enum SocketFrom_e +{ + SF_Client, + SF_RoomServer, +}; + +enum InnerMesssage_e +{ + IM_ClientSocketDisconnect = 100, + IM_PlayerOffline, + IM_ExecGM, + IM_RSConnDisconnect +}; + +//网络处理对象 +enum NetHandler_e +{ + HID_Player, + HID_PlayerMgr, + HID_RoomSvrMgr, + HID_GCListener, +}; + +enum PlayerState_e +{ + PS_None, + PS_InRoom, + PS_Matching, + PS_WaitingMatch +}; + +const char* const PROJ_NAME = "matchvs_masterserver"; +const char* const PROJ_ROOT = "/data/logs/%s"; + +const int POSTFIX_LEN = 7; diff --git a/server/wsproxy/handlermgr.cc b/server/wsproxy/handlermgr.cc new file mode 100644 index 0000000..512e3c2 --- /dev/null +++ b/server/wsproxy/handlermgr.cc @@ -0,0 +1,92 @@ +#include "precompile.h" + +#include + +#include "vs_proto.pb.h" +#include "vs_msgid.pb.h" +namespace cs = kingsomevs; + +#include "handlermgr.h" + +#include "player.h" +#include "playermgr.h" +#include "roomsvrmgr.h" +#include "GCListener.h" + +void _GMAppEcho(a8::HTTPRequest& request, a8::MutableXObject* xobj) +{ + xobj->SetVal("error_code", 1); + xobj->SetVal("error_msg", ""); + xobj->SetVal("error_msg", a8::Get(request, "msg")); +} + +void HandlerMgr::Init() +{ + RegisterNetMsgHandlers(); + RegisterGMMsgHandler("app$echo", _GMAppEcho); +} + +void HandlerMgr::UnInit() +{ +} + +void HandlerMgr::RegisterNetMsgHandlers() +{ + RegisterNetMsgHandler(&gcmsghandler, &PlayerMgr::_CMLogin); + RegisterNetMsgHandler(&gcmsghandler, &PlayerMgr::_CMReConnect); + + RegisterNetMsgHandler(&rsmsghandler, &GCListener::_SS_RS_ForwardClientMsg); + + RegisterNetMsgHandler(&rsmsghandler, &RoomSvrMgr::_SS_Pong); + RegisterNetMsgHandler(&rsmsghandler, &RoomSvrMgr::_SS_RS_JoinRandomRoom); + RegisterNetMsgHandler(&rsmsghandler, &RoomSvrMgr::_SS_RS_CreateRandomRoom); + RegisterNetMsgHandler(&rsmsghandler, &RoomSvrMgr::_SS_RS_UpdateMatchInfo); + RegisterNetMsgHandler(&rsmsghandler, &PlayerMgr::_SS_RS_ReConnect); + + RegisterNetMsgHandler(&gcmsghandler, &Player::_CMPing); + RegisterNetMsgHandler(&gcmsghandler, &Player::_CMCreateRoom); + RegisterNetMsgHandler(&gcmsghandler, &Player::_CMJoinRandomRoom); + RegisterNetMsgHandler(&gcmsghandler, &Player::_CMJoinRoom); + RegisterNetMsgHandler(&gcmsghandler, &Player::_CMLeaveRoom); + + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMCreateRoom); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMJoinRandomRoom); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMJoinRoom); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMKickPlayer); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMLeaveRoom); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMRoomKickPlayerNotify); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMRoomPeerLeaveNotify); + RegisterNetMsgHandler(&gsmsghandler, &Player::_SMRoomDisbandNotify); +} + +void HandlerMgr::ProcGMMsg(int sockhandle, const std::string& url, const std::string& querystr) +{ + if (url != "/index.php") { + GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(404, "")); + return; + } + + a8::HTTPRequest request; + a8::ParserUrlQueryString(querystr.c_str(), request); + + std::string msgname = a8::Get(request, "c").GetString() + "$" + a8::Get(request, "a").GetString(); + auto itr = gmhandlers_.find(msgname); + if (itr != gmhandlers_.end()) { + a8::MutableXObject* xobj = a8::MutableXObject::NewObject(); + itr->second(request, xobj); + + std::string response; + xobj->ToJsonStr(response); + GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(response)); + + delete xobj; + } else { + GCListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}")); + } +} + +void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname, + void (*handler)(a8::HTTPRequest&, a8::MutableXObject*)) +{ + gmhandlers_[msgname] = handler; +} diff --git a/server/wsproxy/handlermgr.h b/server/wsproxy/handlermgr.h new file mode 100644 index 0000000..6176fd4 --- /dev/null +++ b/server/wsproxy/handlermgr.h @@ -0,0 +1,36 @@ +#pragma once + +#include + +#include "../common/netmsghandler.h" + +namespace a8 +{ + class MutableXObject; +} + +class HandlerMgr : public a8::Singleton +{ + + private: + HandlerMgr() {}; + friend class a8::Singleton; + + public: + + void Init(); + void UnInit(); + + NetMsgHandlerObject gcmsghandler; + NetMsgHandlerObject rsmsghandler; + NetMsgHandlerObject gsmsghandler; + + void ProcGMMsg(int sockhandle, const std::string& url, const std::string& quyerstr); + + private: + void RegisterNetMsgHandlers(); + void RegisterGMMsgHandler(const std::string& msgname, + void (*)(a8::HTTPRequest&, a8::MutableXObject*)); + + std::map gmhandlers_; +}; diff --git a/server/wsproxy/jsondatamgr.cc b/server/wsproxy/jsondatamgr.cc new file mode 100644 index 0000000..97012e7 --- /dev/null +++ b/server/wsproxy/jsondatamgr.cc @@ -0,0 +1,36 @@ +#include "precompile.h" + +#include "jsondatamgr.h" +#include "app.h" + +void JsonDataMgr::Init() +{ + std::string masterserver_cluster_json_file; + std::string roomserver_cluster_json_file; + if (getenv("machine_type")) { + masterserver_cluster_json_file = "/var/data/conf_test/matchvs/masterserver/matchvs.masterserver.cluster.json"; + roomserver_cluster_json_file = "/var/data/conf_test/matchvs/masterserver/matchvs.roomserver.cluster.json"; + } else { + masterserver_cluster_json_file = "../config/matchvs.masterserver.cluster.json"; + roomserver_cluster_json_file = "../config/matchvs.roomserver.cluster.json"; + } + masterserver_cluster_json_.ReadFromFile(masterserver_cluster_json_file); + roomserver_cluster_json_.ReadFromFile(roomserver_cluster_json_file); +} + +void JsonDataMgr::UnInit() +{ +} + +std::shared_ptr JsonDataMgr::GetConf() +{ + if (App::Instance()->instance_id < 1 || App::Instance()->instance_id > masterserver_cluster_json_.Size()) { + abort(); + } + return masterserver_cluster_json_[App::Instance()->instance_id - 1]; +} + +std::shared_ptr JsonDataMgr::GetRoomServerClusterConf() +{ + return std::make_shared(roomserver_cluster_json_); +} diff --git a/server/wsproxy/jsondatamgr.h b/server/wsproxy/jsondatamgr.h new file mode 100644 index 0000000..bf232be --- /dev/null +++ b/server/wsproxy/jsondatamgr.h @@ -0,0 +1,20 @@ +#pragma once + +class JsonDataMgr : public a8::Singleton +{ + private: + JsonDataMgr() {}; + friend class a8::Singleton; + + public: + void Init(); + void UnInit(); + + std::shared_ptr GetConf(); + std::shared_ptr GetRoomServerClusterConf(); + + private: + a8::XObject masterserver_cluster_json_; + a8::XObject roomserver_cluster_json_; + +}; diff --git a/server/wsproxy/main.cc b/server/wsproxy/main.cc new file mode 100644 index 0000000..7dffbf1 --- /dev/null +++ b/server/wsproxy/main.cc @@ -0,0 +1,11 @@ +#include "precompile.h" +#include "app.h" + +int main(int argc, char* argv[]) +{ + int exitcode = 0; + App::Instance()->Init(argc, argv); + exitcode = App::Instance()->Run(); + App::Instance()->UnInit(); + return exitcode; +} diff --git a/server/wsproxy/precompile.h b/server/wsproxy/precompile.h new file mode 100644 index 0000000..95b68c3 --- /dev/null +++ b/server/wsproxy/precompile.h @@ -0,0 +1,18 @@ +#pragma once + + +#include +#include + +#include "constant.h" +#include "types.h" + +namespace google +{ + namespace protobuf + { + class Message; + } +} + +#include "../common/protoutils.h" diff --git a/server/wsproxy/target_conn.cc b/server/wsproxy/target_conn.cc new file mode 100644 index 0000000..207bb62 --- /dev/null +++ b/server/wsproxy/target_conn.cc @@ -0,0 +1,146 @@ +#include "precompile.h" + +#include + +#include "ss_proto.pb.h" +#include "ss_msgid.pb.h" +#include "RSConn.h" +#include +#include +#include +#include "app.h" + +const int PACK_MAX = 1024 * 64; + +void RSConn::Init(int instance_id, const std::string& remote_ip, int remote_port) +{ + this->instance_id = instance_id; + this->remote_ip = remote_ip; + this->remote_port = remote_port; + + recv_bufflen_ = 0; + last_pong_tick = a8::XGetTickCount(); + recv_buff_ = (char*) malloc(PACK_MAX * 2); + tcp_client_ = new a8::TcpClient(); + tcp_client_->remote_address = remote_ip; + tcp_client_->remote_port = remote_port; + tcp_client_->on_error = std::bind(&RSConn::on_error, this, std::placeholders::_1, std::placeholders::_2); + tcp_client_->on_connect = std::bind(&RSConn::on_connect, this, std::placeholders::_1); + tcp_client_->on_disconnect = std::bind(&RSConn::on_disconnect, this, std::placeholders::_1); + tcp_client_->on_socketread = std::bind(&RSConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), + a8::XParams().SetSender(this), + [] (const a8::XParams& param) + { + RSConn* conn = (RSConn*)param.sender.GetUserData(); + conn->CheckAlive(); + }); +} + +void RSConn::UnInit() +{ + a8::Timer::Instance()->DeleteTimer(timer_); + timer_ = nullptr; + tcp_client_->Close(); + delete tcp_client_; + tcp_client_ = nullptr; + recv_bufflen_ = 0; + free(recv_buff_); + recv_buff_ = nullptr; +} + +void RSConn::Open() +{ + tcp_client_->Open(); +} + +void RSConn::Close() +{ + tcp_client_->Close(); +} + +bool RSConn::Connected() +{ + return tcp_client_->Connected(); +} + +void RSConn::on_error(a8::TcpClient* sender, int errorId) +{ + a8::UdpLog::Instance()->Error("RSConn errorid=%d", {errorId}); +} + +void RSConn::on_connect(a8::TcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("room server connected", {}); +} + +void RSConn::on_disconnect(a8::TcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("room server %d disconnected after 10s later reconnect", {instance_id}); + App::Instance()->AddIMMsg(IM_RSConnDisconnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void RSConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) +{ + #if 0 + ++App::Instance()->perf.read_count; + #endif + if (recv_bufflen_ + len > 2 * PACK_MAX) { + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Debug("recvied room server too long message", {}); + return; + } else { + memmove(&recv_buff_[recv_bufflen_], buf, len); + recv_bufflen_ += len; + } + + bool warning = false; + unsigned int offset = 0; + while (recv_bufflen_ - offset > sizeof(PackHead)) { + PackHead* p = (PackHead*) &recv_buff_[offset]; + if (p->magiccode == MAGIC_CODE) { + if (recv_bufflen_ - offset < sizeof(PackHead) + p->packlen) { + break; + } + App::Instance()->AddSocketMsg(SF_RoomServer, + instance_id, + 0, + p->msgid, + &recv_buff_[offset + sizeof(PackHead)], + p->packlen); + offset += sizeof(PackHead) + p->packlen; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + a8::UdpLog::Instance()->Debug("recvied bad package", {}); + } + if (offset > 0 && offset < recv_bufflen_) { + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; +} + +void RSConn::CheckAlive() +{ + if (!Connected()) { + Open(); + } else { + if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { + last_pong_tick = a8::XGetTickCount(); + Open(); + } else { + ss::SS_Ping msg; + SendToRoomServer(msg); + } + } +} diff --git a/server/wsproxy/target_conn.h b/server/wsproxy/target_conn.h new file mode 100644 index 0000000..bfa1382 --- /dev/null +++ b/server/wsproxy/target_conn.h @@ -0,0 +1,46 @@ +#pragma once + +namespace a8 +{ + class TcpClient; +} + +struct timer_list; +class RSConn +{ + public: + int instance_id = 0; + std::string remote_ip; + int remote_port = 0; + int matching_player_num = 0; + a8::tick_t last_pong_tick = 0; + + public: + void Init(int instance_id, const std::string& remote_ip, int remote_port); + void UnInit(); + + void Open(); + void Close(); + bool Connected(); + + template + void SendToRoomServer(T& msg) + { + static int msgid = ::Net_GetMessageId(msg); + Net_SendMsg(tcp_client_, msgid, msg); + } + + private: + void on_error(a8::TcpClient* sender, int errorId); + void on_connect(a8::TcpClient* sender); + void on_disconnect(a8::TcpClient* sender); + void on_socketread(a8::TcpClient* sender, char* buf, unsigned int len); + + void CheckAlive(); + + private: + char *recv_buff_ = nullptr; + unsigned int recv_bufflen_ = 0; + a8::TcpClient* tcp_client_ = nullptr; + timer_list* timer_ = nullptr; +}; diff --git a/server/wsproxy/types.cc b/server/wsproxy/types.cc new file mode 100644 index 0000000..e69de29 diff --git a/server/wsproxy/types.h b/server/wsproxy/types.h new file mode 100644 index 0000000..1467ba7 --- /dev/null +++ b/server/wsproxy/types.h @@ -0,0 +1,10 @@ +#pragma once + +struct PerfMonitor +{ + int max_run_delay_time = 0; + int max_timer_idle = 0; + long long out_data_size = 0; + long long in_data_size = 0; + long long read_count = 0; +};