Compare commits

..

No commits in common. "master" and "online" have entirely different histories.

68 changed files with 705 additions and 2831 deletions

24
.gitmodules vendored
View File

@ -1,18 +1,6 @@
[submodule "third_party/tools"] [submodule "third_party/a8engine"]
path = third_party/tools path = third_party/a8engine
url = git@git.kingsome.cn:server_common/tools.git url = git@git.kingsome.cn:server_common/a8engine.git
[submodule "third_party/behaviac"] [submodule "third_party/framework"]
path = third_party/behaviac path = third_party/framework
url = git@git.kingsome.cn:libs/behaviac.git url = git@git.kingsome.cn:server_common/framework.git
[submodule "third_party/recastnavigation"]
path = third_party/recastnavigation
url = git@git.kingsome.cn:libs/recastnavigation.git
[submodule "third_party/a8"]
path = third_party/a8
url = git@git.kingsome.cn:server_common/a8.git
[submodule "third_party/f8"]
path = third_party/f8
url = git@git.kingsome.cn:server_common/f8.git
[submodule "third_party/kcp"]
path = third_party/kcp
url = git@git.kingsome.cn:libs/kcp.git

View File

@ -1,11 +0,0 @@
path_arr=$(echo $PWD|tr '/' '\n')
path_arr=(${path_arr})
GAME_ID=${path_arr[-5]}
if echo $GAME_ID | grep -q '[^0-9]'
then
echo 'game_id参数必须为数字'
exit 1
fi
python ../../third_party/tools/scripts/server/monitor.py wsproxy${GAME_ID} '-n1 -i1'

View File

@ -3,17 +3,13 @@ package ss;
//id定义 //id定义
enum SSMessageId_e enum SSMessageId_e
{ {
_SS_Ping = 8;
_SS_Pong = 9;
_SS_WSP_SocketDisconnect = 10; _SS_WSP_SocketDisconnect = 10;
_SS_WSP_RequestTargetServer = 11;
_SS_MS_ResponseTargetServer = 12;
_SS_ForceCloseSocket = 13;
_SS_CMKcpHandshake = 99;
_SS_CMPing = 101; _SS_CMPing = 101;
_SS_SMRpcError = 102; _SS_SMRpcError = 102;
_SS_CMLogin = 103; _SS_CMLogin = 103;
_SS_CMReconnect = 104; _SS_CMReConnect = 104;
_SS_Ping = 150;
_SS_Pong = 151;
} }

View File

@ -8,76 +8,11 @@ message SS_CMPing
{ {
} }
message SS_SMPing
{
optional int32 param1 = 1; optional int32 source = 2 [default = 0]; //0:tcp 1:udp
}
message SS_CMLogin_CMReConnect_CommonHead message SS_CMLogin_CMReConnect_CommonHead
{ {
optional int32 server_id = 1; optional int32 server_id = 1;
} }
message SS_CMKcpHandshake
{
optional int32 proto_version = 1; //Constant_e.ProtoVersion
optional string account_id = 2; //id
optional string session_id = 3; //session id
optional string team_uuid = 4; //
optional int32 secret_key_place = 5; // 0() 1kcp底层协议头之后()
}
message SS_SMKcpHandshake
{
optional int32 errcode = 1; //errcode != 0kcp
optional string errmsg = 2; //errmsg
optional int32 conv = 3; //conv
optional bytes secret_key = 4; //secret key客户端每次上报的时候加在包头之前
optional string remote_host = 5; //host
optional int32 remote_port = 6; //port
}
message SS_CMLogin
{
optional int32 server_id = 1; //
optional string team_uuid = 2; //
optional string account_id = 3; //id
optional int32 proto_version = 5; //Constant_e.ProtoVersion
optional string session_id = 20; //id
}
message SS_CMReconnect
{
optional int32 server_id = 1; //
optional string team_uuid = 2; //
optional string account_id = 3; //id
optional string session_id = 4; //session_id
optional string room_uuid = 5; //id
optional string server_info = 6; //
}
message SS_WSP_RequestTargetServer
{
optional int64 context_id = 1;
optional string account_id = 2;
optional string team_id = 3;
optional string server_info = 4;
optional int32 is_reconnect = 5;
optional int32 proto_version = 6; //Constant_e.ProtoVersion
optional string url = 7;
optional string query_str = 8;
optional string session_id = 9;
}
message SS_MS_ResponseTargetServer
{
optional int32 error_code = 1;
optional string error_msg = 2;
optional int64 context_id = 3;
optional string host = 4;
optional int32 port = 5;
}
message SS_SMRpcError message SS_SMRpcError
{ {
optional int32 error_code = 1; optional int32 error_code = 1;
@ -95,7 +30,3 @@ message SS_Pong
{ {
optional int32 param1 = 1; optional int32 param1 = 1;
} }
message SS_ForceCloseSocket
{
}

View File

@ -1,11 +1,9 @@
source ./common.sh cd third_party/wsproxy/server/wsproxy
python ../tools/scripts/construct/build_pb.py --nohooks 1
cd ${SOURCE_PATH} cmake $1 .
${PRE_COMPILE_CMD}
cmake ${COMPILE_FLAGS}
make clean make clean
make make
cp ../bin/${SRC_EXE_NAME} ../../../../bin/${PROJECT_NAME} cp ../bin/wsproxy ../../../../bin/
cd ../../../../ cd ../../../../
@ -13,4 +11,4 @@ tag_name=`git status |grep '# On branch '|sed 's/# On branch //g'`
dir_name=`basename $PWD` dir_name=`basename $PWD`
package_name=${dir_name}.tar.gz package_name=${dir_name}.tar.gz
tar --exclude=*.git -chzf target/${package_name} bin common.sh reload.sh restart.sh start_instance.sh manage.py config tar --exclude=*.git -chzf target/${package_name} bin reload.sh restart.sh start_instance.sh manage.py config

View File

@ -28,46 +28,47 @@ def getExePath(pid):
def getExeCmdLine(pid): def getExeCmdLine(pid):
return os.popen('cat /proc/%d/cmdline' % int(pid)).read() return os.popen('cat /proc/%d/cmdline' % int(pid)).read()
def stop(instance_id, node_id, progname): def stop(instance_id):
pids = getRuningProgramPids(progname) wsproxy_ids = getRuningProgramPids('wsproxy')
pids = wsproxy_ids
for pid in pids: for pid in pids:
exepath = getExePath(pid) exepath = getExePath(pid)
cmdline = getExeCmdLine(pid) cmdline = getExeCmdLine(pid)
if cmdline == ("./%s\0-i\0%d\0-n\0%d\0" % (progname, instance_id, node_id)): if cmdline == ("./wsproxy\0-i\0%d\0" % instance_id):
os.popen('kill -9 %d' % int(pid)) os.popen('kill -9 %d' % int(pid))
def listServer(progname): def listserver():
pids = getRuningProgramPids(progname) wsproxy_ids = getRuningProgramPids('wsproxy')
pids = wsproxy_ids
for pid in pids: for pid in pids:
exepath = getExePath(pid) exepath = getExePath(pid)
cmdline = getExeCmdLine(pid) cmdline = getExeCmdLine(pid)
print(pid, exepath, cmdline) print(pid, exepath, cmdline)
def restartServer(str_instance_ids, str_node_id, progname): def restartServer(str_instance_ids):
instance_ids = str_instance_ids.split(',') instance_ids = str_instance_ids.split(',')
node_id = int(str_node_id)
for instance_id in instance_ids: for instance_id in instance_ids:
instance_id = int(instance_id) instance_id = int(instance_id)
stop(instance_id, node_id, progname) stop(instance_id)
time.sleep(0.5) time.sleep(0.5)
print('%s %d %d starting......' % (progname, instance_id, node_id)) print('wsproxy %d starting......' % instance_id)
cmd = 'sh start_instance.sh %d %d' % (instance_id, node_id) cmd = 'sh start_instance.sh %d' % (instance_id)
os.popen(cmd) os.popen(cmd)
time.sleep(0.5) time.sleep(0.5)
def printHelp(): def printHelp():
print('usuage: [restart stop list]') print('usuage: [restart]')
def main(argv): def main(argv):
if len(argv) == 1: if len(argv) == 1:
printHelp() printHelp()
else: else:
if argv[1] == 'restart': if argv[1] == 'restart':
restartServer(argv[2], argv[3], argv[4]) restartServer(argv[2])
elif argv[1] == 'stop': elif argv[1] == 'stop':
stop(argv[2], argv[3], argv[4]) stop()
elif argv[1] == 'list': elif argv[1] == 'list':
listServer(argv[2]) listserver()
if __name__ == '__main__': if __name__ == '__main__':
main(sys.argv) main(sys.argv)

View File

@ -1,2 +0,0 @@
source ./common.sh
echo ${COMPILE_FLAGS}

View File

@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
source ./common.sh python manage.py restart $1
python manage.py restart $1 $2 ${PROJECT_NAME}
echo 'success' echo 'success'

View File

@ -1,6 +1,4 @@
#!/bin/bash #!/bin/bash
source ./common.sh
cd bin cd bin
nohup ./${PROJECT_NAME} -i $1 -n $2 >> ${PROJECT_NAME}$2_$1.out 2>&1 & nohup ./wsproxy -i $1 >> wsproxy$1.out 2>&1 &

View File

@ -1,14 +0,0 @@
cd third_party/wsproxy/server/wsproxy
python ../tools/scripts/construct/build_pb.py --nohooks 1
cmake -DGAME_ID=2001 -DMASTER_MODE=1 .
make clean
make
cp ../bin/wsproxy ../../../../bin/
cd ../../../../
tag_name=`git status |grep '# On branch '|sed 's/# On branch //g'`
dir_name=`basename $PWD`
package_name=${dir_name}.tar.gz
tar --exclude=*.git -chzf target/${package_name} bin reload.sh restart.sh start_instance.sh manage.py config

View File

@ -1,75 +0,0 @@
#!/usr/bin/python
#coding utf8
import os
import sys
import time
def getRuningProgramPids(progname):
pids = []
lines = os.popen('ps -ef | grep %s' % progname).readlines()
for l in lines:
line = ''
oldc = ''
for c in l.strip():
if c in [' ', '\t'] and c == oldc:
continue
oldc = c
line += c
line = line.split(' ')
if line[7] == './%s' % progname:
pids.append(line[1])
return pids
def getExePath(pid):
return os.popen('ls -l /proc/%d | grep "exe ->" | cut -d " " -f 7-' % int(pid)).read()
def getExeCmdLine(pid):
return os.popen('cat /proc/%d/cmdline' % int(pid)).read()
def stop(instance_id, node_id):
wsproxy_ids = getRuningProgramPids('wsproxy')
pids = wsproxy_ids
for pid in pids:
exepath = getExePath(pid)
cmdline = getExeCmdLine(pid)
if cmdline == ("./wsproxy\0-i\0%d\0-n\0%d\0" % (instance_id, node_id)):
os.popen('kill -9 %d' % int(pid))
def listServer():
wsproxy_ids = getRuningProgramPids('wsproxy')
pids = wsproxy_ids
for pid in pids:
exepath = getExePath(pid)
cmdline = getExeCmdLine(pid)
print(pid, exepath, cmdline)
def restartServer(str_instance_ids, str_node_id):
instance_ids = str_instance_ids.split(',')
node_id = int(str_node_id)
for instance_id in instance_ids:
instance_id = int(instance_id)
stop(instance_id, node_id)
time.sleep(0.5)
print('wsproxy %d starting......' % instance_id)
cmd = 'sh start_instance.sh %d %d' % (instance_id, node_id)
os.popen(cmd)
time.sleep(0.5)
def printHelp():
print('usuage: [restart]')
def main(argv):
if len(argv) == 1:
printHelp()
else:
if argv[1] == 'restart':
restartServer(argv[2], argv[3])
elif argv[1] == 'stop':
stop(argv[2], argv[3])
elif argv[1] == 'list':
listServer()
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,5 +0,0 @@
#!/bin/bash
python manage.py restart $1 $2
echo 'success'

View File

@ -1,4 +0,0 @@
#!/bin/bash
cd bin
nohup ./wsproxy -i $1 -n $2 >> wsproxy$2_$1.out 2>&1 &

View File

@ -1,14 +0,0 @@
cd third_party/wsproxy/server/wsproxy
python ../tools/scripts/construct/build_pb.py --nohooks 1
cmake -DGAME_ID=2002 -DMASTER_MODE=1 .
make clean
make
cp ../bin/wsproxy ../../../../bin/wsproxy${GAME_ID}
cd ../../../../
tag_name=`git status |grep '# On branch '|sed 's/# On branch //g'`
dir_name=`basename $PWD`
package_name=${dir_name}.tar.gz
tar --exclude=*.git -chzf target/${package_name} bin reload.sh restart.sh start_instance.sh manage.py config

View File

@ -1,75 +0,0 @@
#!/usr/bin/python
#coding utf8
import os
import sys
import time
def getRuningProgramPids(progname):
pids = []
lines = os.popen('ps -ef | grep %s' % progname).readlines()
for l in lines:
line = ''
oldc = ''
for c in l.strip():
if c in [' ', '\t'] and c == oldc:
continue
oldc = c
line += c
line = line.split(' ')
if line[7] == './%s' % progname:
pids.append(line[1])
return pids
def getExePath(pid):
return os.popen('ls -l /proc/%d | grep "exe ->" | cut -d " " -f 7-' % int(pid)).read()
def getExeCmdLine(pid):
return os.popen('cat /proc/%d/cmdline' % int(pid)).read()
def stop(instance_id, node_id):
wsproxy_ids = getRuningProgramPids('wsproxy')
pids = wsproxy_ids
for pid in pids:
exepath = getExePath(pid)
cmdline = getExeCmdLine(pid)
if cmdline == ("./wsproxy\0-i\0%d\0-n\0%d\0" % (instance_id, node_id)):
os.popen('kill -9 %d' % int(pid))
def listServer():
wsproxy_ids = getRuningProgramPids('wsproxy')
pids = wsproxy_ids
for pid in pids:
exepath = getExePath(pid)
cmdline = getExeCmdLine(pid)
print(pid, exepath, cmdline)
def restartServer(str_instance_ids, str_node_id):
instance_ids = str_instance_ids.split(',')
node_id = int(str_node_id)
for instance_id in instance_ids:
instance_id = int(instance_id)
stop(instance_id, node_id)
time.sleep(0.5)
print('wsproxy %d starting......' % instance_id)
cmd = 'sh start_instance.sh %d %d' % (instance_id, node_id)
os.popen(cmd)
time.sleep(0.5)
def printHelp():
print('usuage: [restart stop list]')
def main(argv):
if len(argv) == 1:
printHelp()
else:
if argv[1] == 'restart':
restartServer(argv[2], argv[3])
elif argv[1] == 'stop':
stop(argv[2], argv[3])
elif argv[1] == 'list':
listServer()
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,5 +0,0 @@
#!/bin/bash
python manage.py restart $1 $2
echo 'success'

View File

@ -1,4 +0,0 @@
#!/bin/bash
cd bin
nohup ./$3 -i $1 -n $2 >> $3$2_$1.out 2>&1 &

View File

@ -36,7 +36,7 @@ def need_rebuild():
return True return True
s1 = os.stat(proto_name + '.pb.cc') s1 = os.stat(proto_name + '.pb.cc')
s2 = os.stat('../tools/protobuild/' + proto_name + '.proto') s2 = os.stat('../tools/protobuild/' + proto_name + '.proto')
if s1.st_size <= 0 or s1.st_mtime < s2.st_mtime: if s1.st_mtime < s2.st_mtime:
return True return True
return False return False
@ -62,6 +62,9 @@ def rebuild():
except Exception as e: except Exception as e:
print('build_protocol rebuild error:' + str(e)) print('build_protocol rebuild error:' + str(e))
def repair_githooks():
os.system('/bin/bash ../tools/scripts/githooks/install.sh')
parser = OptionParser(usage="%prog [options]") parser = OptionParser(usage="%prog [options]")
parser.add_option( parser.add_option(
"-n", "-n",
@ -70,6 +73,8 @@ parser.add_option(
help = "no repair git hooks", help = "no repair git hooks",
) )
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
if not options.nohooks:
repair_githooks()
if need_rebuild(): if need_rebuild():
rebuild() rebuild()

View File

@ -4,71 +4,38 @@ cmake_minimum_required(VERSION 2.8)
if (${GAME_ID}) if (${GAME_ID})
message(GAME_ID: ${GAME_ID}) message(GAME_ID: ${GAME_ID})
else() else()
message(FATAL_ERROR "GAME_ID error") set(GAME_ID 1008)
message(GAME_ID: ${GAME_ID})
endif() endif()
set(LIB_DIR "ubuntu20.04_g++-9") set(CMAKE_BUILD_TYPE "Debug")
message(LIB_DIR: ${LIB_DIR}) set(CMAKE_CXX_FLAGS_RELEASE "-std=gnu++11 -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++11")
if (${RELEASE}) set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++11 -DGAME_ID=${GAME_ID}")
set(CMAKE_BUILD_TYPE "Release")
message("release mode")
else()
set(CMAKE_BUILD_TYPE "Debug")
message("debug mode")
endif()
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -Wall -g -std=gnu++1z -DGAME_ID=${GAME_ID} -DNDEBUG")
if (${ASAN})
set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++1z -DGAME_ID=${GAME_ID} -DDEBUG -fsanitize=address -fno-omit-frame-pointer")
else()
set(CMAKE_CXX_FLAGS_DEBUG "-Wall -g -std=gnu++1z -DGAME_ID=${GAME_ID} -DDEBUG")
endif()
include_directories( include_directories(
AFTER AFTER
../../third_party/a8 ../../third_party/a8engine
../../third_party/f8
/usr/include/mysql /usr/include/mysql
/usr/include/jsoncpp /usr/include/jsoncpp
/usr/include/hiredis /usr/include/hiredis
/usr/include/eigen3
/usr/include/glm
../../third_party ../../third_party
../../third_party/behaviac/inc
../../third_party/recastnavigation/Detour/Include
../../third_party/recastnavigation/DetourTileCache/Include
. .
) )
link_directories( link_directories(
/usr/lib64/mysql /usr/lib64/mysql
/usr/local/lib /usr/local/lib
../../third_party/behaviac/lib/${LIB_DIR}
) )
aux_source_directory(../../third_party/a8/a8 aux_source_directory(../../third_party/a8engine/a8
SRC_LIST SRC_LIST
) )
aux_source_directory(../../third_party/f8/f8 aux_source_directory(../../third_party/framework/cpp
SRC_LIST SRC_LIST
) )
aux_source_directory(../../third_party/recastnavigation/Detour/Source
SRC_LIST
)
aux_source_directory(../../third_party/recastnavigation/DetourTileCache/Source
SRC_LIST
)
execute_process(
COMMAND touch -a ss_proto.pb.h
COMMAND touch -a ss_proto.pb.cc
COMMAND touch -a ss_msgid.pb.h
COMMAND touch -a ss_msgid.pb.cc
)
aux_source_directory(. aux_source_directory(.
SRC_LIST SRC_LIST
) )
@ -77,42 +44,32 @@ set(EXECUTABLE_OUTPUT_PATH
${PROJECT_BINARY_DIR}/../bin ${PROJECT_BINARY_DIR}/../bin
) )
set_directory_properties(PROPERTIES COMPILE_DEFINITIONS_DEBUG "_DEBUG")
add_executable( add_executable(
wsproxy${GAME_ID} ${SRC_LIST} wsproxy ${SRC_LIST}
) )
add_custom_target(script_pb_protocol ALL) add_custom_target(script_pb_protocol ALL)
add_custom_command(TARGET script_pb_protocol add_custom_command(TARGET script_pb_protocol
PRE_BUILD PRE_BUILD
# COMMAND python ../../tools/script/construct/build_script.py
COMMAND python ../tools/scripts/construct/build_pb.py COMMAND python ../tools/scripts/construct/build_pb.py
# COMMAND python ../../tools/script/construct/build_protocol.py
# COMMAND python ../../tools/script/construct/build_version_file.py
) )
add_dependencies(wsproxy${GAME_ID} script_pb_protocol) add_dependencies(wsproxy script_pb_protocol)
target_link_libraries( target_link_libraries(
wsproxy${GAME_ID} wsproxy
pthread pthread
mysqlclient mysqlclient
protobuf protobuf
rt rt
dl
util
crypto crypto
ssl ssl
jsoncpp jsoncpp
curl curl
hiredis hiredis
tinyxml2 tinyxml2
) )
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
target_link_libraries(
wsproxy${GAME_ID}
behaviac_gcc_debug
)
else()
target_link_libraries(
wsproxy${GAME_ID}
tcmalloc
behaviac_gcc_release
)
endif()

View File

@ -1,14 +1,10 @@
#include "precompile.h" #include "precompile.h"
#include <mutex>
#include <google/protobuf/message.h> #include <google/protobuf/message.h>
#include <a8/websocketsession.h> #include <a8/websocketsession.h>
#include <a8/tcplistener.h> #include <a8/tcplistener.h>
#include <f8/netmsghandler.h> #include "framework/cpp/netmsghandler.h"
#include <f8/udplog.h>
#include <f8/msgqueue.h>
#include "app.h" #include "app.h"
#include "GCListener.h" #include "GCListener.h"
@ -37,8 +33,7 @@ public:
p->msgid, p->msgid,
p->seqid, p->seqid,
&buf[offset + sizeof(f8::PackHead)], &buf[offset + sizeof(f8::PackHead)],
p->packlen, p->packlen);
ST_Tcp);
offset += sizeof(f8::PackHead) + p->packlen; offset += sizeof(f8::PackHead) + p->packlen;
} else { } else {
warning = true; warning = true;
@ -48,61 +43,27 @@ public:
} }
if (warning) { if (warning) {
f8::UdpLog::Instance()->Warning("收到client非法数据包", {}); a8::UdpLog::Instance()->Warning("收到client非法数据包", {});
} }
} }
virtual void OnRawHttpGet(const std::string& url, const std::string& querystr, virtual void OnRawHttpGet(const std::string& url, const std::string& querystr,
std::string& response) override std::string& response) override
{ {
f8::MsgQueue::Instance()->PostMsg App::Instance()->AddIMMsg(IM_ExecGM,
( a8::XParams()
IM_ExecGM, .SetSender(socket_handle)
a8::Args .SetParam1(url)
( .SetParam2(querystr)
{ .SetParam3(saddr));
(int)socket_handle,
(std::string)(url + ""),
(std::string)(querystr + ""),
(unsigned long)saddr
}
)
);
}
virtual bool HandleRedirect(const std::string& url, const std::string& querystr,
std::string& location) override
{
#ifdef DEBUG
f8::MsgQueue::Instance()->PostMsg
(
IM_HandleRedirect,
a8::Args
(
{
(int)socket_handle,
(std::string)(url + ""),
(std::string)(querystr + ""),
(unsigned long)saddr
}
)
);
#endif
return false;
} }
virtual void OnDisConnect() override virtual void OnDisConnect() override
{ {
f8::MsgQueue::Instance()->PostMsg App::Instance()->AddIMMsg(IM_ClientSocketDisconnect,
( a8::XParams()
IM_ClientSocketDisconnect, .SetSender(socket_handle)
a8::Args .SetParam1(1));
(
{
(int)socket_handle
}
)
);
} }
}; };
@ -114,7 +75,7 @@ static void CreateGameClientSocket(a8::TcpSession **p)
static void GSListeneron_error(a8::TcpListener*, int type, int errorid) static void GSListeneron_error(a8::TcpListener*, int type, int errorid)
{ {
f8::UdpLog::Instance()->Debug("GCListeneron_error %d %d", {type, errorid}); a8::UdpLog::Instance()->Debug("GCListeneron_error %d %d", {type, errorid});
} }
void GCListener::Init() void GCListener::Init()
@ -126,24 +87,6 @@ void GCListener::Init()
tcp_listener_->bind_address = "0.0.0.0"; tcp_listener_->bind_address = "0.0.0.0";
tcp_listener_->bind_port = JsonDataMgr::Instance()->GetConf()->At("listen_port")->AsXValue(); tcp_listener_->bind_port = JsonDataMgr::Instance()->GetConf()->At("listen_port")->AsXValue();
tcp_listener_->Open(); tcp_listener_->Open();
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_HandleRedirect,
[this] (const a8::Args& args)
{
int socket_handle = args.Get<int>(0);
std::string url = args.Get<std::string>(1);
std::string query_str = args.Get<std::string>(2);
websocket_url_hash_[socket_handle] = std::make_tuple(url, query_str);
});
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_ClientSocketDisconnect,
[this] (const a8::Args& args)
{
int socket_handle = args.Get<int>(0);
websocket_url_hash_.erase(socket_handle);
});
} }
void GCListener::UnInit() void GCListener::UnInit()
@ -152,9 +95,21 @@ void GCListener::UnInit()
tcp_listener_ = nullptr; tcp_listener_ = nullptr;
} }
void GCListener::SendBuf(unsigned short sockhandle, char* buf, int buflen) void GCListener::ForwardTargetConnMsg(f8::MsgHdr& hdr)
{ {
tcp_listener_->SendClientMsg(sockhandle, buf, buflen); char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen);
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->rpc_error_code = 0;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen);
}
tcp_listener_->SendClientMsg(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
free(buff);
} }
void GCListener::SendText(unsigned short sockhandle, const std::string& text) void GCListener::SendText(unsigned short sockhandle, const std::string& text)
@ -171,30 +126,3 @@ void GCListener::MarkClient(unsigned short sockhandle, bool is_active)
{ {
tcp_listener_->MarkClient(sockhandle, is_active); tcp_listener_->MarkClient(sockhandle, is_active);
} }
long long GCListener::GetSendNodeNum()
{
return tcp_listener_->send_node_num;
}
long long GCListener::GetSentBytesNum()
{
return tcp_listener_->sent_bytes_num;
}
bool GCListener::GetWebSocketUrl(int socket_handle, std::string& url, std::string& query_str)
{
auto itr = websocket_url_hash_.find(socket_handle);
if (itr != websocket_url_hash_.end()) {
url = std::get<0>(itr->second);
query_str = std::get<1>(itr->second);
return true;
} else {
return false;
}
}
int GCListener::GetSocketCount()
{
return tcp_listener_->GetClientSocketCount();
}

View File

@ -23,26 +23,15 @@ class GCListener : public a8::Singleton<GCListener>
void SendMsg(unsigned short socket_handle, T& msg) void SendMsg(unsigned short socket_handle, T& msg)
{ {
static int msgid = f8::Net_GetMessageId(msg); static int msgid = f8::Net_GetMessageId(msg);
SendMsgEx(socket_handle, msgid, msg);
}
template <typename T>
void SendMsgEx(unsigned short socket_handle, int msgid, T& msg)
{
f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg); f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg);
} }
void SendBuf(unsigned short sockhandle, char* buf, int buflen); void ForwardTargetConnMsg(f8::MsgHdr& hdr);
void SendText(unsigned short sockhandle, const std::string& text); void SendText(unsigned short sockhandle, const std::string& text);
void ForceCloseClient(unsigned short sockhandle); void ForceCloseClient(unsigned short sockhandle);
void MarkClient(unsigned short sockhandle, bool is_active); void MarkClient(unsigned short sockhandle, bool is_active);
long long GetSendNodeNum();
long long GetSentBytesNum();
bool GetWebSocketUrl(int socket_handle, std::string& url, std::string& query_str);
int GetSocketCount();
private: private:
a8::TcpListener *tcp_listener_ = nullptr; a8::TcpListener *tcp_listener_ = nullptr;
std::map<int, std::tuple<std::string, std::string>> websocket_url_hash_;
}; };

View File

@ -6,28 +6,20 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <a8/xtimer.h> #include <a8/redis.h>
#include <a8/timer.h>
#include <a8/uuid.h> #include <a8/uuid.h>
#include <a8/udplistener.h>
#include <f8/netmsghandler.h> #include "framework/cpp/netmsghandler.h"
#include <f8/udplog.h>
#include <f8/msgqueue.h>
#include "app.h" #include "app.h"
#include "GCListener.h" #include "GCListener.h"
#include "jsondatamgr.h" #include "jsondatamgr.h"
#include "handlermgr.h" #include "handlermgr.h"
#include "downstream.h" #include "target_conn.h"
#include "downstreammgr.h" #include "target_conn_mgr.h"
#include "gameclient.h"
#include "upstream.h" #include "gameclientmgr.h"
#include "upstreammgr.h"
#include "master.h"
#include "mastermgr.h"
#include "longsessionmgr.h"
#include "ss_msgid.pb.h" #include "ss_msgid.pb.h"
#include "ss_proto.pb.h" #include "ss_proto.pb.h"
@ -40,14 +32,15 @@ struct MsgNode
long ip_saddr; long ip_saddr;
char* buf; char* buf;
int buflen; int buflen;
int tag;
MsgNode* next; MsgNode* next;
}; };
struct UdpMsgNode struct IMMsgNode
{ {
a8::UdpPacket* pkt; unsigned short msgid;
UdpMsgNode* next; a8::XParams params;
IMMsgNode* next = nullptr;
}; };
const char* const PROJ_LOG_ROOT_FMT = "/data/logs/%s/logs"; const char* const PROJ_LOG_ROOT_FMT = "/data/logs/%s/logs";
@ -55,170 +48,104 @@ const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log";
static void SavePerfLog() static void SavePerfLog()
{ {
f8::UdpLog::Instance()->Info("max_run_delay_time:%d max_timer_idle:%d " a8::UdpLog::Instance()->Info(" max_run_delay_time:%d max_timer_idle:%d "
"in_data_size:%d out_data_size:%d msgnode_size:%d udp_msgnode_size:%d " "in_data_size:%d out_data_size:%d msgnode_size:%d read_count:%d",
"read_count:%d max_login_time:%d "
"max_join_time:%d tcp_count:%d udp_count:%d down_stream_count:%d",
{ {
App::Instance()->GetPerf().max_run_delay_time, App::Instance()->perf.max_run_delay_time,
App::Instance()->GetPerf().max_timer_idle, App::Instance()->perf.max_timer_idle,
App::Instance()->GetPerf().in_data_size, App::Instance()->perf.in_data_size,
App::Instance()->GetPerf().out_data_size, App::Instance()->perf.out_data_size,
App::Instance()->GetMsgNodeSize(), App::Instance()->msgnode_size_,
App::Instance()->GetUdpMsgNodeSize(), App::Instance()->perf.read_count,
App::Instance()->GetPerf().read_count,
App::Instance()->GetPerf().max_login_time,
App::Instance()->GetPerf().max_join_time,
GCListener::Instance()->GetSocketCount(),
LongSessionMgr::Instance()->GetLongSessionCount(),
DownStreamMgr::Instance()->GetDownStreamCount()
}); });
if (App::Instance()->HasFlag(2)) { #if 1
a8::XPrintf("mainloop_time:%d netmsg_time:%d send_node_num:%d sent_bytes_num:%d\n", App::Instance()->perf.max_run_delay_time = 0;
{ App::Instance()->perf.max_timer_idle = 0;
App::Instance()->GetPerf().max_run_delay_time, #else
App::Instance()->GetPerf().max_dispatchmsg_time, App::Instance()->perf = PerfMonitor();
GCListener::Instance()->GetSendNodeNum(), #endif
GCListener::Instance()->GetSentBytesNum()
});
}
App::Instance()->GetPerf().max_run_delay_time = 0;
App::Instance()->GetPerf().max_timer_idle = 0;
App::Instance()->GetPerf().max_login_time = 0;
App::Instance()->GetPerf().max_join_time = 0;
} }
bool App::Init(int argc, char* argv[]) void App::Init(int argc, char* argv[])
{ {
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
this->argc_ = argc; this->argc = argc;
this->argv_ = argv; this->argv = argv;
if (!ParseOpt()) { if (!ParseOpt()) {
terminated_ = true; terminated = true;
if (node_id_ <= 0) { a8::XPrintf("masterserver启动失败,缺少-i参数\n", {});
a8::XPrintf("gameserver启动失败,缺少-n参数\n", {}); return;
} else if (node_id_ > MAX_NODE_ID) {
a8::XPrintf("gameserver启动失败,-n参数不能大于%d\n", {MAX_NODE_ID});
} else if (instance_id_ <= 0) {
a8::XPrintf("gameserver启动失败,缺少-i参数\n", {});
} else if (instance_id_ > MAX_INSTANCE_ID) {
a8::XPrintf("gameserver启动失败,-i参数不能大于%d\n", {MAX_INSTANCE_ID});
} }
return false; a8::XPrintf("masterserver starting instance_id:%d pid:%d\n", {instance_id, getpid()});
}
a8::XPrintf("wsproxy starting node_id:%d instance_id:%d pid:%d\n",
{
node_id_,
instance_id_,
getpid()
});
uuid_ = std::make_shared<a8::uuid::SnowFlake>();
loop_mutex_ = new std::mutex(); loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable(); loop_cond_ = new std::condition_variable();
msg_mutex_ = new std::mutex(); msg_mutex_ = new std::mutex();
udp_msg_mutex_ = new std::mutex(); im_msg_mutex_ = new std::mutex();
srand(time(nullptr)); srand(time(nullptr));
InitLog(); InitLog();
f8::MsgQueue::Instance()->Init();
HandlerMgr::Instance()->Init(); HandlerMgr::Instance()->Init();
f8::Timer::Instance()->Init(); a8::Timer::Instance()->Init();
JsonDataMgr::Instance()->Init(); JsonDataMgr::Instance()->Init();
uuid_->SetMachineId((node_id_ - 1) * MAX_NODE_ID + instance_id_);
DownStreamMgr::Instance()->Init();
MasterMgr::Instance()->Init();
UpStreamMgr::Instance()->Init();
LongSessionMgr::Instance()->Init();
GCListener::Instance()->Init(); GCListener::Instance()->Init();
uuid.SetMachineId(instance_id);
GameClientMgr::Instance()->Init();
TargetConnMgr::Instance()->Init();
f8::UdpLog::Instance()->Info("wsproxy starting instance_id:%d pid:%d", a8::UdpLog::Instance()->Info("masterserver starting instance_id:%d pid:%d", {instance_id, getpid()});
{
instance_id_,
getpid(),
});
{ {
int perf_log_time = 1000 * 60 * 5; int perf_log_time = 1000 * 60 * 5;
if (getenv("is_dev_env")) { if (getenv("is_dev_env")) {
perf_log_time = 1000 * 10; perf_log_time = 1000 * 10;
} }
f8::Timer::Instance()->SetInterval a8::Timer::Instance()->AddRepeatTimer(perf_log_time,
(perf_log_time, a8::XParams(),
[] (int event, const a8::Args* args) [] (const a8::XParams& param)
{ {
if (a8::TIMER_EXEC_EVENT == event) {
SavePerfLog(); SavePerfLog();
}
}); });
} }
if (HasFlag(1)) {
f8::Timer::Instance()->SetTimeout
(
1000 * 60,
[] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
App::Instance()->terminated_ = true;
App::Instance()->NotifyLoopCond();
}
}
);
}
if (HasFlag(4)) {
f8::Timer::Instance()->SetTimeout
(
1000 * 30,
[] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
App::Instance()->shutdowned_ = true;
a8::XPrintf("shutdowned\n", {});
}
}
);
}
return true;
} }
void App::UnInit() void App::UnInit()
{ {
a8::XPrintf("wsproxy terminating instance_id:%d pid:%d\n", {instance_id_, getpid()}); if (terminated) {
return;
}
TargetConnMgr::Instance()->UnInit();
GameClientMgr::Instance()->UnInit();
GCListener::Instance()->UnInit(); GCListener::Instance()->UnInit();
LongSessionMgr::Instance()->UnInit();
MasterMgr::Instance()->UnInit();
UpStreamMgr::Instance()->UnInit();
DownStreamMgr::Instance()->UnInit();
JsonDataMgr::Instance()->UnInit(); JsonDataMgr::Instance()->UnInit();
f8::Timer::Instance()->UnInit(); a8::Timer::Instance()->UnInit();
f8::MsgQueue::Instance()->UnInit();
HandlerMgr::Instance()->UnInit(); HandlerMgr::Instance()->UnInit();
UnInitLog(); UnInitLog();
FreeSocketMsgQueue(); delete im_msg_mutex_;
FreeUdpMsgQueue(); im_msg_mutex_ = nullptr;
delete msg_mutex_; delete msg_mutex_;
msg_mutex_ = nullptr; msg_mutex_ = nullptr;
delete udp_msg_mutex_;
udp_msg_mutex_ = nullptr;
delete loop_cond_; delete loop_cond_;
loop_cond_ = nullptr; loop_cond_ = nullptr;
delete loop_mutex_; delete loop_mutex_;
loop_mutex_ = nullptr; loop_mutex_ = nullptr;
a8::XPrintf("wsproxy terminated instance_id:%d pid:%d\n", {instance_id_, getpid()});
} }
int App::Run() int App::Run()
{ {
if (terminated) {
return 0;
}
int ret = 0; int ret = 0;
f8::UdpLog::Instance()->Info("wsproxy running", {}); a8::UdpLog::Instance()->Info("masterserver running", {});
while (!terminated_) { while (!terminated) {
a8::tick_t begin_tick = a8::XGetTickCount(); a8::tick_t begin_tick = a8::XGetTickCount();
QuickExecute(); QuickExecute();
SlowerExecute(); SlowerExecute();
a8::tick_t end_tick = a8::XGetTickCount(); a8::tick_t end_tick = a8::XGetTickCount();
if (end_tick - begin_tick > GetPerf().max_run_delay_time) { if (end_tick - begin_tick > perf.max_run_delay_time) {
GetPerf().max_run_delay_time = end_tick - begin_tick; perf.max_run_delay_time = end_tick - begin_tick;
} }
Schedule(); Schedule();
} }
@ -231,10 +158,9 @@ void App::AddSocketMsg(SocketFrom_e sockfrom,
unsigned short msgid, unsigned short msgid,
unsigned int seqid, unsigned int seqid,
const char *msgbody, const char *msgbody,
int bodylen, int bodylen)
int tag)
{ {
MsgNode *p = (MsgNode*)malloc(sizeof(MsgNode)); MsgNode *p = (MsgNode*) malloc(sizeof(MsgNode));
memset(p, 0, sizeof(MsgNode)); memset(p, 0, sizeof(MsgNode));
p->sockfrom = sockfrom; p->sockfrom = sockfrom;
p->ip_saddr = ip_saddr; p->ip_saddr = ip_saddr;
@ -243,7 +169,6 @@ void App::AddSocketMsg(SocketFrom_e sockfrom,
p->seqid = seqid; p->seqid = seqid;
p->buf = nullptr; p->buf = nullptr;
p->buflen = bodylen; p->buflen = bodylen;
p->tag = tag;
if (bodylen > 0) { if (bodylen > 0) {
p->buf = (char*)malloc(bodylen); p->buf = (char*)malloc(bodylen);
memmove(p->buf, msgbody, bodylen); memmove(p->buf, msgbody, bodylen);
@ -261,13 +186,29 @@ void App::AddSocketMsg(SocketFrom_e sockfrom,
NotifyLoopCond(); NotifyLoopCond();
} }
void App::AddIMMsg(unsigned short imcmd, a8::XParams params)
{
IMMsgNode *p = new IMMsgNode;
p->msgid = imcmd;
p->params = params;
p->next = nullptr;
im_msg_mutex_->lock();
if (im_bot_node_) {
im_bot_node_->next = p;
im_bot_node_ = p;
} else {
im_top_node_ = p;
im_bot_node_ = p;
}
im_msg_mutex_->unlock();
NotifyLoopCond();
}
void App::QuickExecute() void App::QuickExecute()
{ {
f8::Timer::Instance()->Update(); ProcessIMMsg();
f8::MsgQueue::Instance()->Update();
DispatchMsg(); DispatchMsg();
DispatchUdpMsg(); a8::Timer::Instance()->Update();
LongSessionMgr::Instance()->Update();
} }
void App::SlowerExecute() void App::SlowerExecute()
@ -284,21 +225,30 @@ void App::Schedule()
{ {
std::unique_lock<std::mutex> lk(*loop_mutex_); std::unique_lock<std::mutex> lk(*loop_mutex_);
if (!HasTask()) { if (!HasTask()) {
#if 1 int sleep_time = a8::Timer::Instance()->GetIdleableMillSeconds();
int sleep_time = 1;
loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time));
#else
int sleep_time = f8::Timer::Instance()->GetIdleableMillSeconds();
loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time)); loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time));
if (sleep_time > perf.max_timer_idle) { if (sleep_time > perf.max_timer_idle) {
perf.max_timer_idle = sleep_time; perf.max_timer_idle = sleep_time;
} }
#endif
} }
} }
bool App::HasTask() bool App::HasTask()
{ {
{
if (!im_work_node_) {
im_msg_mutex_->lock();
if (!im_work_node_ && im_top_node_) {
im_work_node_ = im_top_node_;
im_top_node_ = nullptr;
im_bot_node_ = nullptr;
}
im_msg_mutex_->unlock();
}
if (im_work_node_) {
return true;
}
}
{ {
if (!work_node_) { if (!work_node_) {
msg_mutex_->lock(); msg_mutex_->lock();
@ -313,20 +263,6 @@ bool App::HasTask()
return true; return true;
} }
} }
{
if (!udp_work_node_) {
udp_msg_mutex_->lock();
if (!udp_work_node_ && udp_top_node_) {
udp_work_node_ = udp_top_node_;
udp_top_node_ = nullptr;
udp_bot_node_ = nullptr;
}
udp_msg_mutex_->unlock();
}
if (udp_work_node_) {
return true;
}
}
return false; return false;
} }
@ -356,17 +292,12 @@ void App::DispatchMsg()
switch (pdelnode->sockfrom) { switch (pdelnode->sockfrom) {
case SF_Client: case SF_Client:
{ {
ProcessClientMsg(hdr, pdelnode->tag); ProcessClientMsg(hdr);
} }
break; break;
case SF_TargetServer: case SF_TargetServer:
{ {
ProcessTargetServerMsg(hdr, pdelnode->tag); ProcessTargetServerMsg(hdr);
}
break;
case SF_MasterServer:
{
ProcessMasterServerMsg(hdr, pdelnode->tag);
} }
break; break;
} }
@ -385,96 +316,82 @@ void App::DispatchMsg()
} }
} }
void App::ProcessClientMsg(f8::MsgHdr& hdr, int tag) void App::ProcessClientMsg(f8::MsgHdr& hdr)
{ {
if (hdr.msgid == ss::_SS_CMLogin || if (hdr.msgid < 100) {
hdr.msgid == ss::_SS_CMReconnect ||
hdr.msgid == ss::_SS_CMKcpHandshake) {
auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle);
if (down_wp.expired()) {
switch (hdr.msgid) {
case ss::_SS_CMLogin:
{
ss::SS_CMLogin msg;
bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
if (ok) {
MasterMgr::Instance()->RequestTargetServer
(hdr,
msg.team_uuid(),
msg.account_id(),
msg.session_id(),
"",
0,
msg.proto_version());
}
}
break;
case ss::_SS_CMReconnect:
{
ss::SS_CMReconnect msg;
bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
if (ok) {
MasterMgr::Instance()->RequestTargetServer
(hdr,
msg.team_uuid(),
msg.account_id(),
msg.session_id(),
msg.server_info(),
1,
0);
}
}
break;
case ss::_SS_CMKcpHandshake:
{
ss::SS_CMKcpHandshake msg;
bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
if (ok) {
LongSessionMgr::Instance()->_SS_CMKcpHandshake(hdr, msg);
}
}
break;
default:
{
abort();
}
break;
}
}
} else {
auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) {
down->ProcCMMsg(hdr, tag);
}
}
}
void App::ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag)
{
f8::NetMsgHandler* handler = f8::GetNetMsgHandler(&HandlerMgr::Instance()->msmsghandler,
hdr.msgid);
if (handler) {
switch (handler->handlerid) {
case HID_MasterMgr:
ProcessNetMsg(handler, MasterMgr::Instance(), hdr);
break;
}
}
}
void App::ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag)
{
if (hdr.msgid == ss::_SS_ForceCloseSocket) {
GCListener::Instance()->ForceCloseClient(hdr.socket_handle);
return; return;
} }
if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReconnect) { TargetConn* conn = nullptr;
DownStreamMgr::Instance()->BindUpStream(hdr.socket_handle, hdr.ip_saddr); if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReConnect) {
ss::SS_CMLogin_CMReConnect_CommonHead msg;
bool ok = msg.ParseFromArray(hdr.buf + hdr.offset, hdr.buflen - hdr.offset);
if (ok) {
conn = TargetConnMgr::Instance()->GetConnByInstanceId(msg.server_id());
if (!conn) {
ss::SS_SMRpcError respmsg;
respmsg.set_error_code(10);
GCListener::Instance()->SendMsg(hdr.socket_handle, respmsg);
}
} else {
return;
}
} else {
GameClient* client = GameClientMgr::Instance()->GetGameClientBySocket(hdr.socket_handle);
if (client) {
conn = client->conn;
}
}
if (conn) {
conn->ForwardClientMsg(hdr);
}
}
void App::ProcessTargetServerMsg(f8::MsgHdr& hdr)
{
if (hdr.msgid < 100) {
return;
}
if (hdr.msgid == ss::_SS_CMLogin || hdr.msgid == ss::_SS_CMReConnect) {
GameClientMgr::Instance()->BindTargetConn(hdr.socket_handle, hdr.ip_saddr);
GCListener::Instance()->MarkClient(hdr.socket_handle, true); GCListener::Instance()->MarkClient(hdr.socket_handle, true);
} }
auto down_wp = DownStreamMgr::Instance()->GetDownStream(hdr.socket_handle); GCListener::Instance()->ForwardTargetConnMsg(hdr);
if (!down_wp.expired()) { }
down_wp.lock()->ForwardUpStreamMsg(hdr);
void App::ProcessIMMsg()
{
if (!im_work_node_ && im_top_node_) {
im_msg_mutex_->lock();
im_work_node_ = im_top_node_;
im_top_node_ = nullptr;
im_bot_node_ = nullptr;
im_msg_mutex_->unlock();
}
while (im_work_node_) {
IMMsgNode *pdelnode = im_work_node_;
switch (im_work_node_->msgid) {
case IM_ClientSocketDisconnect:
{
GameClientMgr::Instance()->OnClientDisconnect(pdelnode->params);
}
break;
case IM_TargetConnDisconnect:
{
GameClientMgr::Instance()->OnTargetServerDisconnect(pdelnode->params);
}
break;
case IM_ExecGM:
{
HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.param3,
pdelnode->params.sender,
pdelnode->params.param1.GetString(),
pdelnode->params.param2.GetString()
);
}
break;
}
im_work_node_ = im_work_node_->next;
delete pdelnode;
} }
} }
@ -490,156 +407,46 @@ void App::InitLog()
a8::MkDir(proj_root_dir); a8::MkDir(proj_root_dir);
a8::MkDir(proj_log_root_dir); a8::MkDir(proj_log_root_dir);
f8::UdpLog::Instance()->SetLogFileName(log_file_name); a8::UdpLog::Instance()->SetLogFileName(log_file_name);
f8::UdpLog::Instance()->Init(); a8::UdpLog::Instance()->Init();
f8::UdpLog::Instance()->Info("proj_root_dir:%s", {proj_root_dir}); a8::UdpLog::Instance()->Info("proj_root_dir:%s", {proj_root_dir});
f8::UdpLog::Instance()->Info("proj_log_root_dir:%s", {proj_log_root_dir}); a8::UdpLog::Instance()->Info("proj_log_root_dir:%s", {proj_log_root_dir});
f8::UdpLog::Instance()->Info("log_file_name:%s", {log_file_name}); a8::UdpLog::Instance()->Info("log_file_name:%s", {log_file_name});
} }
void App::UnInitLog() void App::UnInitLog()
{ {
f8::UdpLog::Instance()->UnInit(); a8::UdpLog::Instance()->UnInit();
} }
bool App::ParseOpt() bool App::ParseOpt()
{ {
int ch = 0; int ch = 0;
while ((ch = getopt(argc_, argv_, "n:i:f:")) != -1) { while ((ch = getopt(argc, argv, "i:")) != -1) {
switch (ch) { switch (ch) {
case 'n':
{
node_id_ = a8::XValue(optarg);
}
break;
case 'i': case 'i':
{ {
instance_id_ = a8::XValue(optarg); instance_id = a8::XValue(optarg);
}
break;
case 'f':
{
std::vector<std::string> strings;
a8::Split(optarg, strings, ',');
for (auto& str : strings) {
flags_.insert(a8::XValue(str).GetInt());
}
} }
break; break;
} }
} }
return instance_id_ > 0 && node_id_ > 0; return instance_id > 0;
} }
bool App::HasFlag(int flag) a8::XParams* App::AddContext(long long context_id)
{ {
return flags_.find(flag) != flags_.end(); context_hash_[context_id] = a8::XParams();
return GetContext(context_id);
} }
void App::FreeSocketMsgQueue() void App::DelContext(long long context_id)
{ {
msg_mutex_->lock(); context_hash_.erase(context_id);
if (!work_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
while (work_node_) {
MsgNode* pdelnode = work_node_;
work_node_ = work_node_->next;
if (pdelnode->buf) {
free(pdelnode->buf);
}
free(pdelnode);
if (!work_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
}
msg_mutex_->unlock();
} }
void App::FreeUdpMsgQueue() a8::XParams* App::GetContext(long long context_id)
{ {
udp_msg_mutex_->lock(); auto itr = context_hash_.find(context_id);
if (!udp_work_node_) { return itr != context_hash_.end() ? &(itr->second) : nullptr;
udp_work_node_ = udp_top_node_;
udp_top_node_ = nullptr;
udp_bot_node_ = nullptr;
}
while (udp_work_node_) {
UdpMsgNode* pdelnode = udp_work_node_;
udp_work_node_ = udp_work_node_->next;
{
if (pdelnode->pkt->buf) {
free((void*)pdelnode->pkt->buf);
}
delete pdelnode->pkt;
free(pdelnode);
}
if (!udp_work_node_) {
udp_work_node_ = udp_top_node_;
udp_top_node_ = nullptr;
udp_bot_node_ = nullptr;
}
}
udp_msg_mutex_->unlock();
}
void App::AddUdpMsg(a8::UdpPacket* pkt)
{
UdpMsgNode *p = (UdpMsgNode*) malloc(sizeof(UdpMsgNode));
memset(p, 0, sizeof(UdpMsgNode));
p->pkt = pkt;
udp_msg_mutex_->lock();
if (udp_bot_node_) {
udp_bot_node_->next = p;
udp_bot_node_ = p;
} else {
udp_top_node_ = p;
udp_bot_node_ = p;
}
++udp_msgnode_size_;
udp_msg_mutex_->unlock();
NotifyLoopCond();
}
void App::DispatchUdpMsg()
{
long long starttick = a8::XGetTickCount();
if (!udp_work_node_ && udp_top_node_) {
udp_msg_mutex_->lock();
udp_work_node_ = udp_top_node_;
udp_top_node_ = nullptr;
udp_bot_node_ = nullptr;
udp_working_msgnode_size_ = udp_msgnode_size_;
udp_msg_mutex_->unlock();
}
while (udp_work_node_) {
UdpMsgNode *pdelnode = udp_work_node_;
LongSessionMgr::Instance()->ProcUdpPacket(pdelnode->pkt);
udp_work_node_ = pdelnode->next;
{
if (pdelnode->pkt->buf) {
free((void*)pdelnode->pkt->buf);
}
delete pdelnode->pkt;
free(pdelnode);
}
udp_working_msgnode_size_--;
if (a8::XGetTickCount() - starttick > 200) {
break;
}
}//end while
if (!udp_work_node_) {
udp_working_msgnode_size_ = 0;
}
}
long long App::NewUuid()
{
return uuid_->Generate();
} }

View File

@ -1,27 +1,18 @@
#pragma once #pragma once
#include <a8/singleton.h> #include <a8/uuid.h>
namespace a8
{
struct UdpPacket;
namespace uuid
{
class SnowFlake;
}
}
struct MsgNode; struct MsgNode;
struct UdpMsgNode; struct IMMsgNode;
class App : public a8::Singleton<App> class App : public a8::Singleton<App>
{ {
private: private:
App() {}; App() {};
friend class a8::Singleton<App>; friend class a8::Singleton<App>;
public: public:
bool Init(int argc, char* argv[]); void Init(int argc, char* argv[]);
void UnInit(); void UnInit();
int Run(); int Run();
@ -32,19 +23,14 @@ public:
unsigned short msgid, unsigned short msgid,
unsigned int seqid, unsigned int seqid,
const char *msgbody, const char *msgbody,
int bodylen, int bodylen);
int tag = ST_Tcp); void AddIMMsg(unsigned short imcmd, a8::XParams params);
void AddUdpMsg(a8::UdpPacket* pkt);
void NotifyLoopCond(); void NotifyLoopCond();
bool HasFlag(int flag);
long long NewUuid(); a8::XParams* AddContext(long long context_id);
int GetNodeId() { return node_id_; } void DelContext(long long context_id);
int GetInstanceId() { return instance_id_; } a8::XParams* GetContext(long long context_id);
PerfMonitor& GetPerf() { return perf_; }
int GetMsgNodeSize() { return msgnode_size_;}
int GetUdpMsgNodeSize() { return udp_msgnode_size_;}
private: private:
void QuickExecute(); void QuickExecute();
@ -53,47 +39,44 @@ private:
bool HasTask(); bool HasTask();
void DispatchMsg(); void DispatchMsg();
void DispatchUdpMsg(); void ProcessIMMsg();
void ProcessClientMsg(f8::MsgHdr& hdr, int tag); void ProcessClientMsg(f8::MsgHdr& hdr);
void ProcessMasterServerMsg(f8::MsgHdr& hdr, int tag); void ProcessTargetServerMsg(f8::MsgHdr& hdr);
void ProcessTargetServerMsg(f8::MsgHdr& hdr, int tag);
void InitLog(); void InitLog();
void UnInitLog(); void UnInitLog();
bool ParseOpt(); bool ParseOpt();
void FreeSocketMsgQueue();
void FreeUdpMsgQueue();
private: public:
int argc_ = 0; int argc = 0;
char** argv_ = nullptr; char** argv = nullptr;
PerfMonitor perf_; volatile bool terminated = false;
volatile bool terminated_ = false; PerfMonitor perf;
volatile bool shutdowned_ = false; a8::uuid::SnowFlake uuid;
int node_id_ = 0; public:
int instance_id_ = 0; int instance_id = 0;
std::set<int> flags_;
std::shared_ptr<a8::uuid::SnowFlake> uuid_; private:
std::mutex *loop_mutex_ = nullptr; std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr; std::condition_variable *loop_cond_ = nullptr;
std::mutex* msg_mutex_ = nullptr; std::mutex *msg_mutex_ = nullptr;
MsgNode* top_node_ = nullptr; MsgNode* top_node_ = nullptr;
MsgNode* bot_node_ = nullptr; MsgNode* bot_node_ = nullptr;
MsgNode* work_node_ = nullptr; MsgNode* work_node_ = nullptr;
std::mutex* udp_msg_mutex_ = nullptr; std::mutex* im_msg_mutex_ = nullptr;
UdpMsgNode* udp_top_node_ = nullptr; IMMsgNode* im_top_node_ = nullptr;
UdpMsgNode* udp_bot_node_ = nullptr; IMMsgNode* im_bot_node_ = nullptr;
UdpMsgNode* udp_work_node_ = nullptr; IMMsgNode* im_work_node_ = nullptr;
std::map<long long, a8::XParams> context_hash_;
public:
int msgnode_size_ = 0 ; int msgnode_size_ = 0 ;
int udp_msgnode_size_ = 0 ;
int working_msgnode_size_ = 0; int working_msgnode_size_ = 0;
int udp_working_msgnode_size_ = 0;
}; };

View File

@ -4,13 +4,6 @@ enum SocketFrom_e
{ {
SF_Client, SF_Client,
SF_TargetServer, SF_TargetServer,
SF_MasterServer,
};
enum SocketTag_e
{
ST_Tcp = 1,
ST_Udp,
}; };
enum InnerMesssage_e enum InnerMesssage_e
@ -18,22 +11,27 @@ enum InnerMesssage_e
IM_ClientSocketDisconnect = 100, IM_ClientSocketDisconnect = 100,
IM_PlayerOffline, IM_PlayerOffline,
IM_ExecGM, IM_ExecGM,
IM_UpStreamDisconnect, IM_TargetConnDisconnect
IM_UpStreamConnect,
IM_HandleRedirect
}; };
//网络处理对象 //网络处理对象
enum NetHandler_e enum NetHandler_e
{ {
HID_Player,
HID_PlayerMgr,
HID_RoomSvrMgr,
HID_GCListener, HID_GCListener,
HID_MasterMgr, };
enum PlayerState_e
{
PS_None,
PS_InRoom,
PS_Matching,
PS_WaitingMatch
}; };
const char* const PROJ_NAME_FMT = "game%d_wsproxy"; const char* const PROJ_NAME_FMT = "game%d_wsproxy";
const char* const PROJ_ROOT_FMT = "/data/logs/%s"; const char* const PROJ_ROOT_FMT = "/data/logs/%s";
const int MAX_NODE_ID = 8; const int POSTFIX_LEN = 7;
const int MAX_INSTANCE_ID = 500;
const int ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT = a8::TIMER_USER_EVENT + 1;

View File

@ -1,89 +0,0 @@
#include "precompile.h"
#include "downstream.h"
#include "upstream.h"
#include "longsessionmgr.h"
#include "GCListener.h"
#include "longsession.h"
#include "kcpsession.h"
#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"
void DownStream::Init(int socket_handle, std::weak_ptr<UpStream> up)
{
socket_handle_ = socket_handle;
up_ = up;
long_session_wp_ = LongSessionMgr::Instance()->GetSession(socket_handle_);
is_long_session_ = !long_session_wp_.expired();
}
void DownStream::ReBindUpStream(std::weak_ptr<UpStream> up)
{
up_ = up;
}
void DownStream::ForwardUpStreamMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen);
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->ext_len = hdr.buflen >> 16;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen);
}
if (auto long_session = long_session_wp_.lock(); !long_session_wp_.expired()) {
if (hdr.msgid == ss::_SS_CMPing) {
ss::SS_SMPing msg;
msg.set_source(1);
{
free(buff);
buff = (char*)malloc(sizeof(f8::PackHead) + msg.ByteSize());
f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = msg.ByteSize();
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
head->ext_len = hdr.buflen >> 16;
msg.SerializeToArray(buff + sizeof(f8::PackHead), head->packlen);
long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
}
} else {
long_session->GetKcpSession()->SendClientMsg(buff, sizeof(f8::PackHead) + head->packlen);
}
} else {
GCListener::Instance()->SendBuf(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
}
free(buff);
}
void DownStream::OnClose()
{
if (!GetUpStream().expired()) {
ss::SS_WSP_SocketDisconnect msg;
GetUpStream().lock()->SendMsg(socket_handle_, msg);
}
if (!long_session_wp_.expired()) {
LongSessionMgr::Instance()->DelSession(socket_handle_);
}
}
void DownStream::ProcCMMsg(f8::MsgHdr& hdr, int tag)
{
if (hdr.msgid == ss::_SS_CMPing && IsLongSession() && tag == ST_Tcp) {
ss::SS_SMPing msg;
GCListener::Instance()->SendMsgEx(socket_handle_, ss::_SS_CMPing, msg);
if (!long_session_wp_.expired()) {
long_session_wp_.lock()->UpdatePing();
}
return;
}
if (!GetUpStream().expired()) {
GetUpStream().lock()->ForwardClientMsg(hdr);
}
}

View File

@ -1,25 +0,0 @@
#pragma once
class UpStream;
class LongSession;
class DownStream
{
public:
void Init(int socket_handle, std::weak_ptr<UpStream> up);
int GetSocketHandle() const { return socket_handle_; }
std::weak_ptr<UpStream> GetUpStream() const { return up_; }
void ReBindUpStream(std::weak_ptr<UpStream> up);
bool IsLongSession() { return is_long_session_; }
void ProcCMMsg(f8::MsgHdr& hdr, int tag);
void ForwardUpStreamMsg(f8::MsgHdr& hdr);
void OnClose();
private:
int socket_handle_ = a8::INVALID_SOCKET_HANDLE;
std::weak_ptr<UpStream> up_;
bool is_long_session_ = false;
std::weak_ptr<LongSession> long_session_wp_;
};

View File

@ -1,159 +0,0 @@
#include "precompile.h"
#include <f8/udplog.h>
#include <f8/msgqueue.h>
#include "downstreammgr.h"
#include "ss_proto.pb.h"
#include "downstream.h"
#include "upstream.h"
#include "upstreammgr.h"
#include "GCListener.h"
#include "app.h"
#include "mastermgr.h"
struct PendingAccount
{
int socket_handle = 0;
std::string account_id;
long long add_tick = 0;
f8::TimerWp timer_wp;
};
void DownStreamMgr::Init()
{
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_ClientSocketDisconnect,
[this] (const a8::Args& args)
{
int socket_handle = args.Get<int>(0);
OnClientDisconnect(socket_handle);
});
}
void DownStreamMgr::UnInit()
{
socket_hash_.clear();
pending_account_hash_.clear();
}
void DownStreamMgr::OnClientDisconnect(int socket_handle)
{
auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) {
down->OnClose();
socket_hash_.erase(socket_handle);
}
RemovePendingAccount(socket_handle);
MasterMgr::Instance()->RemoveRequest(socket_handle);
}
void DownStreamMgr::OnUpStreamDisconnect(int instance_id)
{
std::list<std::shared_ptr<DownStream>> delete_client;
for (auto& pair : socket_hash_) {
if (!pair.second->GetUpStream().expired() &&
pair.second->GetUpStream().lock()->instance_id == instance_id) {
delete_client.push_back(pair.second);
}
}
for (auto& client : delete_client) {
RemovePendingAccount(client->GetSocketHandle());
GCListener::Instance()->ForceCloseClient(client->GetSocketHandle());
socket_hash_.erase(client->GetSocketHandle());
}
}
void DownStreamMgr::OnUpStreamConnect(int instance_id)
{
}
std::weak_ptr<DownStream> DownStreamMgr::GetDownStream(int sockhandle)
{
auto itr = socket_hash_.find(sockhandle);
return itr != socket_hash_.end() ? itr->second : nullptr;
}
void DownStreamMgr::BindUpStream(int socket_handle, int instance_id)
{
std::weak_ptr<UpStream> up_wp = UpStreamMgr::Instance()->GetUpStreamById(instance_id);
if (!up_wp.expired()) {
auto down_wp = GetDownStream(socket_handle);
if (auto down = down_wp.lock(); !down_wp.expired()) {
down->ReBindUpStream(up_wp);
} else {
down = std::make_shared<DownStream>();
down->Init(socket_handle, up_wp);
socket_hash_[down->GetSocketHandle()] = down;
f8::UdpLog::Instance()->Info("BindUpStream socket_handle:%d",
{
socket_handle
});
{
if (auto pending_account = GetPendingAccount(socket_handle)) {
long long cur_tick = a8::XGetTickCount();
if (cur_tick - pending_account->add_tick > App::Instance()->GetPerf().max_join_time) {
App::Instance()->GetPerf().max_join_time = cur_tick - pending_account->add_tick;
}
f8::UdpLog::Instance()->Info("BindUpStream account_id:%s",
{
pending_account->account_id
});
RemovePendingAccount(socket_handle);
}
}
}
}
}
void DownStreamMgr::AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick)
{
f8::UdpLog::Instance()->Info("AddPendingAccount %s %d", {account_id, socket_handle});
if (!GetPendingAccount(socket_handle)){
auto timer_wp = f8::Timer::Instance()->SetTimeoutWpEx
(
1000 * 10,
[this, socket_handle] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
pending_account_hash_.erase(socket_handle);
App::Instance()->GetPerf().max_join_time =
std::max((long long)1000 * 10, App::Instance()->GetPerf().max_join_time);
}
},
&timer_attacher_
);
auto p = std::make_shared<PendingAccount>();
p->socket_handle = socket_handle;
p->account_id = account_id;
p->add_tick = req_tick;
p->timer_wp = timer_wp;
pending_account_hash_[socket_handle] = p;
}
}
std::shared_ptr<PendingAccount> DownStreamMgr::GetPendingAccount(int socket_handle)
{
auto itr = pending_account_hash_.find(socket_handle);
return itr != pending_account_hash_.end() ? itr->second : nullptr;
}
void DownStreamMgr::RemovePendingAccount(int socket_handle)
{
auto itr = pending_account_hash_.find(socket_handle);
if (itr != pending_account_hash_.end()) {
f8::UdpLog::Instance()->Info("RemovePendingAccount %d", {socket_handle});
if (!itr->second->timer_wp.expired()) {
f8::Timer::Instance()->Delete(itr->second->timer_wp);
}
pending_account_hash_.erase(itr);
}
}
int DownStreamMgr::GetDownStreamCount()
{
return socket_hash_.size();
}

View File

@ -1,35 +0,0 @@
#pragma once
#include <f8/timer.h>
class DownStream;
struct PendingAccount;
class DownStreamMgr : public a8::Singleton<DownStreamMgr>
{
private:
DownStreamMgr() {};
friend class a8::Singleton<DownStreamMgr>;
public:
void Init();
void UnInit();
void OnUpStreamDisconnect(int instance_id);
void OnUpStreamConnect(int instance_id);
std::weak_ptr<DownStream> GetDownStream(int sockhande);
void BindUpStream(int socket_handle, int instance_id);
void AddPendingAccount(const std::string& account_id, int socket_handle, long long req_tick);
int GetDownStreamCount();
private:
void OnClientDisconnect(int socket_handle);
std::shared_ptr<PendingAccount> GetPendingAccount(int socket_handle);
void RemovePendingAccount(int socket_handle);
private:
f8::Attacher timer_attacher_;
std::map<int, std::shared_ptr<DownStream>> socket_hash_;
std::map<int, std::shared_ptr<PendingAccount>> pending_account_hash_;
};

View File

@ -0,0 +1,3 @@
#include "precompile.h"
#include "gameclient.h"

View File

@ -0,0 +1,10 @@
#pragma once
class TargetConn;
class GameClient
{
public:
int socket_handle = a8::INVALID_SOCKET_HANDLE;
TargetConn* conn = nullptr;
};

View File

@ -0,0 +1,67 @@
#include "precompile.h"
#include "gameclientmgr.h"
#include "ss_proto.pb.h"
#include "gameclient.h"
#include "target_conn.h"
#include "target_conn_mgr.h"
#include "GCListener.h"
void GameClientMgr::Init()
{
}
void GameClientMgr::UnInit()
{
}
void GameClientMgr::OnClientDisconnect(a8::XParams& param)
{
GameClient* client = GetGameClientBySocket(param.sender);
if (client) {
if (client->conn) {
ss::SS_WSP_SocketDisconnect msg;
client->conn->SendMsg(msg);
}
socket_hash_.erase(param.sender);
delete client;
}
}
void GameClientMgr::OnTargetServerDisconnect(a8::XParams& param)
{
std::list<GameClient*> delete_client;
for (auto& pair : socket_hash_) {
if (pair.second->conn && pair.second->conn->instance_id == param.sender.GetInt()) {
delete_client.push_back(pair.second);
}
}
for (auto& client : delete_client) {
GCListener::Instance()->ForceCloseClient(client->socket_handle);
socket_hash_.erase(client->socket_handle);
delete client;
}
}
GameClient* GameClientMgr::GetGameClientBySocket(int sockhandle)
{
auto itr = socket_hash_.find(sockhandle);
return itr != socket_hash_.end() ? itr->second : nullptr;
}
void GameClientMgr::BindTargetConn(int socket_handle, int conn_instance_id)
{
TargetConn* conn = TargetConnMgr::Instance()->GetConnByInstanceId(conn_instance_id);
if (conn) {
GameClient* client = GetGameClientBySocket(socket_handle);
if (client) {
client->conn = conn;
} else {
client = new GameClient();
client->socket_handle = socket_handle;
client->conn = conn;
socket_hash_[client->socket_handle] = client;
}
}
}

View File

@ -0,0 +1,22 @@
#pragma once
class GameClient;
class GameClientMgr : public a8::Singleton<GameClientMgr>
{
private:
GameClientMgr() {};
friend class a8::Singleton<GameClientMgr>;
public:
void Init();
void UnInit();
void OnClientDisconnect(a8::XParams& param);
void OnTargetServerDisconnect(a8::XParams& param);
GameClient* GetGameClientBySocket(int sockhande);
void BindTargetConn(int socket_handle, int conn_instance_id);
private:
std::map<int, GameClient*> socket_hash_;
};

View File

@ -1,19 +1,12 @@
#include "precompile.h" #include "precompile.h"
#include <a8/mutable_xobject.h> #include <a8/mutable_xobject.h>
#include <f8/msgqueue.h>
#include <f8/jsonhttprequest.h>
#include "handlermgr.h" #include "handlermgr.h"
#include "GCListener.h" #include "GCListener.h"
#include "mastermgr.h"
#include "app.h"
#include "jsondatamgr.h"
#include "ss_proto.pb.h" static void _GMOpsSelfChecking(f8::JsonHttpRequest* request)
static void _GMOpsSelfChecking(std::shared_ptr<f8::JsonHttpRequest> request)
{ {
request->resp_xobj->SetVal("errcode", 0); request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", ""); request->resp_xobj->SetVal("errmsg", "");
@ -21,48 +14,10 @@ static void _GMOpsSelfChecking(std::shared_ptr<f8::JsonHttpRequest> request)
request->resp_xobj->SetVal("max_rundelay", 10); request->resp_xobj->SetVal("max_rundelay", 10);
} }
static void _GMOpsGetNodeId(std::shared_ptr<f8::JsonHttpRequest> request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("node_id", App::Instance()->GetNodeId());
}
static void _GMOpsSetKcpSwitch(std::shared_ptr<f8::JsonHttpRequest> request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
if (request->params->HasKey("open")) {
JsonDataMgr::Instance()->SetKcpSwitch(request->params->At("open")->AsXValue().GetInt());
}
request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open);
}
static void _GMOpsGetKcpSwitch(std::shared_ptr<f8::JsonHttpRequest> request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("is_open", JsonDataMgr::Instance()->GetKcpConf().open);
}
void HandlerMgr::Init() void HandlerMgr::Init()
{ {
RegisterNetMsgHandlers(); RegisterNetMsgHandlers();
RegisterGMMsgHandler("Ops$selfChecking", _GMOpsSelfChecking); RegisterGMMsgHandler("Ops$selfChecking", _GMOpsSelfChecking);
RegisterGMMsgHandler("Ops$getNodeId", _GMOpsGetNodeId);
RegisterGMMsgHandler("Ops$setKcpSwitch", _GMOpsSetKcpSwitch);
RegisterGMMsgHandler("Ops$getKcpSwitch", _GMOpsGetKcpSwitch);
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_ExecGM,
[this] (const a8::Args& args)
{
int socket_handle = args.Get<int>(0);
std::string url = args.Get<std::string>(1);
std::string query_str = args.Get<std::string>(2);
unsigned long saddr = args.Get<unsigned long>(3);
ProcGMMsg(saddr, socket_handle, url, query_str);
});
} }
void HandlerMgr::UnInit() void HandlerMgr::UnInit()
@ -71,7 +26,6 @@ void HandlerMgr::UnInit()
void HandlerMgr::RegisterNetMsgHandlers() void HandlerMgr::RegisterNetMsgHandlers()
{ {
RegisterNetMsgHandler(&msmsghandler, &MasterMgr::_SS_MS_ResponseTargetServer);
} }
void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle, void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,
@ -88,17 +42,19 @@ void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,
std::string msgname = a8::Get(request, "c").GetString() + "$" + a8::Get(request, "a").GetString(); std::string msgname = a8::Get(request, "c").GetString() + "$" + a8::Get(request, "a").GetString();
auto itr = gmhandlers_.find(msgname); auto itr = gmhandlers_.find(msgname);
if (itr != gmhandlers_.end()) { if (itr != gmhandlers_.end()) {
auto request = std::make_shared<f8::JsonHttpRequest>(); f8::JsonHttpRequest* request = new f8::JsonHttpRequest;
request->saddr = saddr; request->saddr = saddr;
request->socket_handle = sockhandle; request->socket_handle = sockhandle;
request->query_str = querystr; request->query_str = querystr;
request->params->ReadFromUrlQueryString(querystr); request->request.ReadFromUrlQueryString(querystr);
itr->second(request); itr->second(request);
if (!request->pending){ if (!request->pending){
std::string response; std::string response;
request->resp_xobj->ToJsonStr(response); request->resp_xobj->ToJsonStr(response);
GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(response)); GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(response));
delete request;
} }
} else { } else {
GCListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}")); GCListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}"));
@ -106,7 +62,7 @@ void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,
} }
void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname, void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname,
void (*handler)(std::shared_ptr<f8::JsonHttpRequest>)) void (*handler)(f8::JsonHttpRequest*))
{ {
gmhandlers_[msgname] = handler; gmhandlers_[msgname] = handler;
} }

View File

@ -2,7 +2,7 @@
#include <a8/basehttpsession.h> #include <a8/basehttpsession.h>
#include <f8/netmsghandler.h> #include "framework/cpp/netmsghandler.h"
namespace a8 namespace a8
{ {
@ -22,7 +22,8 @@ class HandlerMgr : public a8::Singleton<HandlerMgr>
void UnInit(); void UnInit();
f8::NetMsgHandlerObject gcmsghandler; f8::NetMsgHandlerObject gcmsghandler;
f8::NetMsgHandlerObject msmsghandler; f8::NetMsgHandlerObject rsmsghandler;
f8::NetMsgHandlerObject gsmsghandler;
void ProcGMMsg(unsigned long saddr, int sockhandle, void ProcGMMsg(unsigned long saddr, int sockhandle,
const std::string& url, const std::string& querystr); const std::string& url, const std::string& querystr);
@ -30,7 +31,7 @@ class HandlerMgr : public a8::Singleton<HandlerMgr>
private: private:
void RegisterNetMsgHandlers(); void RegisterNetMsgHandlers();
void RegisterGMMsgHandler(const std::string& msgname, void RegisterGMMsgHandler(const std::string& msgname,
void (*)(std::shared_ptr<f8::JsonHttpRequest>)); void (*)(f8::JsonHttpRequest*));
std::map<std::string, void (*)(std::shared_ptr<f8::JsonHttpRequest>)> gmhandlers_; std::map<std::string, void (*)(f8::JsonHttpRequest*)> gmhandlers_;
}; };

View File

@ -1 +0,0 @@
../../third_party/kcp/ikcp.c

View File

@ -1 +0,0 @@
../../third_party/kcp/ikcp.h

View File

@ -1,86 +1,23 @@
#include "precompile.h" #include "precompile.h"
#include <mutex>
#include <f8/utils.h>
#include <f8/udplog.h>
#include "jsondatamgr.h" #include "jsondatamgr.h"
#include "app.h" #include "app.h"
void JsonDataMgr::Init() void JsonDataMgr::Init()
{ {
if (!f8::IsOnlineEnv()) { std::string wsproxyserver_cluster_json_file;
work_path_ = a8::Format std::string targetserver_cluster_json_file;
("../../../conf_test/game%d/%s", if (f8::IsOnlineEnv()) {
{ wsproxyserver_cluster_json_file = a8::Format("../config/game%d.wsproxy.cluster.json", {GAME_ID});
GAME_ID, targetserver_cluster_json_file = a8::Format("../config/game%d.gameserver.cluster.json", {GAME_ID});
f8::IsTestEnv() ? "wsproxy.test" : "wsproxy.dev" } else {
}); wsproxyserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/wsproxy/game%d.wsproxy.cluster.json",
} {GAME_ID, GAME_ID});
targetserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/wsproxy/game%d.gameserver.cluster.json",
std::string wsproxy_cluster_json_file; {GAME_ID, GAME_ID});
std::string master_cluster_json_file;
std::string kcp_conf_json_file;
wsproxy_cluster_json_file = a8::Format
("%s/node%d/game%d.wsproxy.cluster.json",
{
work_path_,
App::Instance()->GetNodeId(),
GAME_ID
});
master_cluster_json_file = a8::Format
("%s/node%d/game%d.masterserver.cluster.json",
{
work_path_,
App::Instance()->GetNodeId(),
GAME_ID
});
kcp_conf_json_file = a8::Format
("%s/kcp_conf.json",
{
work_path_,
});
wsproxy_cluster_json_.ReadFromFile(wsproxy_cluster_json_file);
master_cluster_json_.ReadFromFile(master_cluster_json_file);
kcp_conf_json_.ReadFromFile(kcp_conf_json_file);
udp_host_ = GetConf()->At("listen_udp_host")->AsXValue().GetString();
udp_port_ = GetConf()->At("listen_udp_port")->AsXValue();
{
kcp_conf_.open = kcp_conf_json_.At("open")->AsXValue();
kcp_conf_.sndwnd = kcp_conf_json_.At("sndwnd")->AsXValue();
kcp_conf_.rcvwnd = kcp_conf_json_.At("rcvwnd")->AsXValue();
kcp_conf_.nodelay = kcp_conf_json_.At("nodelay")->AsXValue();
kcp_conf_.interval = kcp_conf_json_.At("interval")->AsXValue();
kcp_conf_.resend = kcp_conf_json_.At("resend")->AsXValue();
kcp_conf_.nc = kcp_conf_json_.At("nc")->AsXValue();
kcp_conf_.rx_minrto = kcp_conf_json_.At("rx_minrto")->AsXValue();
kcp_conf_.fastresend = kcp_conf_json_.At("fastresend")->AsXValue();
f8::UdpLog::Instance()->Info
(" kcp_conf open:%d sndwnd:%d rcvwnd:%d "
"nodelay:%d interval:%d resend:%d nc:%d "
"rx_minrto:%d fastresend:%d",
{
kcp_conf_.open,
kcp_conf_.sndwnd,
kcp_conf_.rcvwnd,
kcp_conf_.nodelay,
kcp_conf_.interval,
kcp_conf_.resend,
kcp_conf_.nc,
kcp_conf_.rx_minrto,
kcp_conf_.fastresend
});
} }
wsproxyserver_cluster_json_.ReadFromFile(wsproxyserver_cluster_json_file);
targetserver_cluster_json_.ReadFromFile(targetserver_cluster_json_file);
} }
void JsonDataMgr::UnInit() void JsonDataMgr::UnInit()
@ -89,27 +26,13 @@ void JsonDataMgr::UnInit()
std::shared_ptr<a8::XObject> JsonDataMgr::GetConf() std::shared_ptr<a8::XObject> JsonDataMgr::GetConf()
{ {
for (int i = 0; i < wsproxy_cluster_json_.Size(); ++i) { if (App::Instance()->instance_id < 1 || App::Instance()->instance_id > wsproxyserver_cluster_json_.Size()) {
std::shared_ptr<a8::XObject> conf = wsproxy_cluster_json_.At(i); abort();
if (conf->At("instance_id")->AsXValue().GetInt() == App::Instance()->GetInstanceId()) {
return conf;
} }
} return wsproxyserver_cluster_json_[App::Instance()->instance_id - 1];
A8_ABORT();
} }
void JsonDataMgr::TraverseMaster(std::function<void (int, std::string, int)> cb) std::shared_ptr<a8::XObject> JsonDataMgr::GetTargetServerClusterConf()
{ {
for (int i = 0; i < master_cluster_json_.Size(); ++i) { return std::make_shared<a8::XObject>(targetserver_cluster_json_);
auto master_svr_conf = master_cluster_json_.At(i);
int instance_id = master_svr_conf->At("instance_id")->AsXValue();
std::string remote_ip = master_svr_conf->At("ip")->AsXValue();
int remote_port = master_svr_conf->At("port")->AsXValue();
cb(instance_id, remote_ip, remote_port);
}
}
void JsonDataMgr::SetKcpSwitch(int is_open)
{
kcp_conf_.open = is_open ? 1 : 0;
} }

View File

@ -1,23 +1,5 @@
#pragma once #pragma once
#include <a8/singleton.h>
struct KcpConf
{
int open = 0;
int sndwnd = 0;
int rcvwnd = 0;
int nodelay = 0;
int interval = 0;
int resend = 0;
int nc = 0;
int rx_minrto = 0;
int fastresend = 0;
};
class JsonDataMgr : public a8::Singleton<JsonDataMgr> class JsonDataMgr : public a8::Singleton<JsonDataMgr>
{ {
private: private:
@ -28,19 +10,11 @@ class JsonDataMgr : public a8::Singleton<JsonDataMgr>
void Init(); void Init();
void UnInit(); void UnInit();
std::string GetUdpHost() { return udp_host_; }
int GetUdpPort() { return udp_port_; }
std::shared_ptr<a8::XObject> GetConf(); std::shared_ptr<a8::XObject> GetConf();
void TraverseMaster(std::function<void (int, std::string, int)> cb); std::shared_ptr<a8::XObject> GetTargetServerClusterConf();
const KcpConf& GetKcpConf() { return kcp_conf_; }
void SetKcpSwitch(int is_open);
private: private:
std::string work_path_ = "../config"; a8::XObject wsproxyserver_cluster_json_;
a8::XObject wsproxy_cluster_json_; a8::XObject targetserver_cluster_json_;
a8::XObject master_cluster_json_;
a8::XObject kcp_conf_json_;
std::string udp_host_;
int udp_port_ = 0;
KcpConf kcp_conf_ = {};
}; };

View File

@ -1,185 +0,0 @@
#include "precompile.h"
#include <memory.h>
#include <string.h>
#include <f8/netmsghandler.h>
#include <f8/udplog.h>
#include "kcpsession.h"
#include "longsessionmgr.h"
#include "jsondatamgr.h"
#include "app.h"
static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64;
static int UdpOutput(const char *buf, int len, ikcpcb *kcp, void *user)
{
KcpSession* session = (KcpSession*)user;
a8::UdpPacket pkt;
pkt.buf = buf;
pkt.buf_len = len;
pkt.remote_addr = session->GetAddr();
LongSessionMgr::Instance()->GetUdpListener()->SendUdpPacket(&pkt);
return 0;
}
KcpSession::KcpSession()
{
}
KcpSession::~KcpSession()
{
}
void KcpSession::Init(int socket_handle, int secret_key_place)
{
socket_handle_ = socket_handle;
secret_key_ = App::Instance()->NewUuid();
kcp_ = ikcp_create(socket_handle_, (void*)this);
const KcpConf& kcp_conf = JsonDataMgr::Instance()->GetKcpConf();
ikcp_wndsize(kcp_, kcp_conf.sndwnd, kcp_conf.rcvwnd);
ikcp_nodelay(kcp_, kcp_conf.nodelay, kcp_conf.interval, kcp_conf.resend, kcp_conf.nc);
kcp_->rx_minrto = kcp_conf.rx_minrto;
kcp_->fastresend = kcp_conf.fastresend;
secret_key_place_ = secret_key_place;
if (secret_key_place_ > 0) {
secret_key_place_ = 1;
} else if (secret_key_place_ < 0) {
secret_key_place_ = 0;
}
kcp_->output = UdpOutput;
init_tick_ = a8::XGetTickCount();
}
void KcpSession::UnInit()
{
if (kcp_) {
ikcp_release(kcp_);
kcp_ = nullptr;
}
}
void KcpSession::Update(long long tick)
{
ikcp_update(kcp_, tick - init_tick_);
UpdateInput();
}
void KcpSession::SendClientMsg(char* buf, int buf_len)
{
ikcp_send(kcp_, buf, buf_len);
}
void KcpSession::OnRecvPacket(a8::UdpPacket* pkt)
{
const int IKCP_OVERHEAD = 24;
remote_addr_ = pkt->remote_addr;
if (GetSecretKeyPlace()) {
char* buf = (char*)malloc(pkt->buf_len - GetSecretKeyLen());
int buflen = pkt->buf_len - GetSecretKeyLen();
memmove(buf, pkt->buf, IKCP_OVERHEAD);
if (pkt->buf_len > IKCP_OVERHEAD + GetSecretKeyLen()) {
memmove(buf + IKCP_OVERHEAD, pkt->buf + IKCP_OVERHEAD + GetSecretKeyLen(), buflen - IKCP_OVERHEAD);
}
ikcp_input(kcp_, buf, buflen);
free(buf);
} else {
ikcp_input(kcp_, pkt->buf, pkt->buf_len);
}
}
void KcpSession::UpdateInput()
{
char buf[DEFAULT_MAX_RECV_BUFFERSIZE];
int buflen = ikcp_recv(kcp_, buf, DEFAULT_MAX_RECV_BUFFERSIZE - 10);
if (buflen <= 0) {
return;
}
OnSocketRead(buf, buflen);
}
void KcpSession::DecodeUserPacket(char* buf, int& offset, unsigned int buflen)
{
if (GetSecretKeyPlace()) {
DecodeUserPacketNew(buf, offset, buflen);
} else {
DecodeUserPacketOld(buf, offset, buflen);
}
}
int KcpSession::GetSecretKeyPlace()
{
return secret_key_place_;
}
void KcpSession::DecodeUserPacketOld(char* buf, int& offset, unsigned int buflen)
{
bool warning = false;
while (buflen - offset >= sizeof(f8::PackHead) + GetSecretKeyLen()) {
long long secret_key = KcpSession::ReadSecretKey(&buf[offset], buflen);
if (secret_key != secret_key_) {
warning = true;
offset++;
continue;
}
f8::PackHead* p = (f8::PackHead*)&buf[offset + GetSecretKeyLen()];
if (p->magic_code == f8::MAGIC_CODE) {
if (buflen - offset < sizeof(f8::PackHead) + p->packlen + GetSecretKeyLen()) {
break;
}
App::Instance()->AddSocketMsg(SF_Client,
socket_handle_,
0,
//saddr,
p->msgid,
p->seqid,
&buf[offset + sizeof(f8::PackHead) + GetSecretKeyLen()],
p->packlen,
ST_Udp);
offset += sizeof(f8::PackHead) + p->packlen + GetSecretKeyLen();
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
f8::UdpLog::Instance()->Warning("收到kcp client非法数据包1", {});
}
}
void KcpSession::DecodeUserPacketNew(char* buf, int& offset, unsigned int buflen)
{
bool warning = false;
while (buflen - offset >= sizeof(f8::PackHead)) {
f8::PackHead* p = (f8::PackHead*)&buf[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (buflen - offset < sizeof(f8::PackHead) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_Client,
socket_handle_,
0,
//saddr,
p->msgid,
p->seqid,
&buf[offset + sizeof(f8::PackHead)],
p->packlen,
ST_Udp);
offset += sizeof(f8::PackHead) + p->packlen;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
f8::UdpLog::Instance()->Warning("收到kcp client非法数据包2", {});
}
}

View File

@ -1,49 +0,0 @@
#pragma once
#include <a8/udpsession.h>
#include <a8/udplistener.h>
#include "ikcp.h"
class KcpSession : public a8::UdpSession
{
public:
KcpSession();
~KcpSession();
void Init(int socket_handle, int secret_key_place);
void UnInit();
void Update(long long tick);
const sockaddr_in& GetAddr() const { return remote_addr_; }
int GetSocketHandle() { return socket_handle_; }
long long GetSecretKey() { return secret_key_; }
void* GetSecretKeyDataPtr() { return &secret_key_; }
int GetSecretKeyPlace();
void SendClientMsg(char* buf, int buf_len);
virtual void OnRecvPacket(a8::UdpPacket* pkt) override;
static int GetSecretKeyLen() { return sizeof(long long); }
static long long ReadSecretKey(const char* buf, int buf_len)
{
return buf_len < GetSecretKeyLen() ? 0 : *((long long*)buf);
}
protected:
virtual void DecodeUserPacket(char* buf, int& offset, unsigned int buflen) override;
void DecodeUserPacketOld(char* buf, int& offset, unsigned int buflen);
void DecodeUserPacketNew(char* buf, int& offset, unsigned int buflen);
private:
void UpdateInput();
private:
int secret_key_place_ = 0;
int socket_handle_ = 0;
long long secret_key_ = 0;
ikcpcb* kcp_ = nullptr;
long long init_tick_ = 0;
sockaddr_in remote_addr_ = {};
};

View File

@ -1,35 +0,0 @@
#include "precompile.h"
#include <f8/netmsghandler.h>
#include "longsession.h"
#include "kcpsession.h"
#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"
void LongSession::Init(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg)
{
kcp_session_ = std::make_shared<KcpSession>();
kcp_session_->Init(hdr.socket_handle, msg.secret_key_place());
}
void LongSession::UnInit()
{
kcp_session_->UnInit();
}
void LongSession::Update(long long tick)
{
GetKcpSession()->Update(tick);
}
void LongSession::UpdatePing()
{
last_ping_tick_ = a8::XGetTickCount();
}
int LongSession::GetSecretKeyPlace()
{
return kcp_session_->GetSecretKeyPlace();
}

View File

@ -1,25 +0,0 @@
#pragma once
namespace ss
{
class SS_CMKcpHandshake;
}
class KcpSession;
class LongSession
{
public:
void Init(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg);
void UnInit();
void Update(long long tick);
std::shared_ptr<KcpSession> GetKcpSession() { return kcp_session_; }
void UpdatePing();
int GetSecretKeyPlace();
private:
long long last_ping_tick_ = 0;
std::shared_ptr<KcpSession> kcp_session_;
};

View File

@ -1,182 +0,0 @@
#include "precompile.h"
#include <a8/udplistener.h>
#include <f8/netmsghandler.h>
#include <f8/udplog.h>
#include "longsessionmgr.h"
#include "app.h"
#include "jsondatamgr.h"
#include "longsession.h"
#include "kcpsession.h"
#include "GCListener.h"
#include "downstreammgr.h"
#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"
static void GSUdpListeneron_error(int errorid)
{
f8::UdpLog::Instance()->Debug("GCUdpListeneron_error %d", {errorid});
}
static void GSUdpListeneron_recv_packet(a8::UdpPacket* pkt)
{
App::Instance()->AddUdpMsg(pkt);
}
void LongSessionMgr::Init()
{
udp_listener_ = std::make_shared<a8::UdpListener>();
udp_listener_->on_error = GSUdpListeneron_error;
udp_listener_->on_recv_packet = GSUdpListeneron_recv_packet;
udp_listener_->bind_address = "0.0.0.0";
udp_listener_->bind_port = JsonDataMgr::Instance()->GetUdpPort();
udp_listener_->Open();
}
void LongSessionMgr::UnInit()
{
for (auto& pair : socket_handle_hash_) {
pair.second->UnInit();
}
socket_handle_hash_.clear();
}
void LongSessionMgr::Update()
{
long long tick = a8::XGetTickCount();
for (auto& pair : socket_handle_hash_) {
pair.second->Update(tick);
}
}
void LongSessionMgr::_SS_CMKcpHandshake(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg)
{
ss::SS_SMKcpHandshake respmsg;
respmsg.set_errcode(0);
if (!JsonDataMgr::Instance()->GetKcpConf().open) {
respmsg.set_errcode(1);
respmsg.set_errmsg("not support kcp");
GCListener::Instance()->SendMsgEx(hdr.socket_handle, ss::_SS_CMKcpHandshake, respmsg);
return;
}
if (GetSession(hdr.socket_handle)) {
#ifdef DEBUG
abort();
#endif
return;
}
auto session = std::make_shared<LongSession>();
session->Init(hdr, msg);
socket_handle_hash_[session->GetKcpSession()->GetSocketHandle()] = session;
respmsg.set_conv(session->GetKcpSession()->GetSocketHandle());
respmsg.set_secret_key(session->GetKcpSession()->GetSecretKeyDataPtr(), KcpSession::GetSecretKeyLen());
respmsg.set_remote_host(JsonDataMgr::Instance()->GetUdpHost());
respmsg.set_remote_port(JsonDataMgr::Instance()->GetUdpPort());
GCListener::Instance()->SendMsgEx(hdr.socket_handle, ss::_SS_CMKcpHandshake, respmsg);
{
int socket_handle = hdr.socket_handle;
f8::Timer::Instance()->SetTimeout
(
1000 * 30,
[this, socket_handle] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
if (DownStreamMgr::Instance()->GetDownStream(socket_handle).expired()) {
GCListener::Instance()->ForceCloseClient(socket_handle);
}
}
}
);
}
}
std::shared_ptr<LongSession> LongSessionMgr::GetSession(int socket_handle)
{
auto itr = socket_handle_hash_.find(socket_handle);
return itr != socket_handle_hash_.end() ? itr->second : nullptr;
}
void LongSessionMgr::ProcUdpPacket(a8::UdpPacket* pkt)
{
#if 0
f8::UdpLog::Instance()->Debug("ProcUdpPacket host:%s port:%d buflen:%d",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
#endif
const int IKCP_OVERHEAD = 24;
if (pkt->buf_len < IKCP_OVERHEAD) {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d over_head_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
return;
}
int socket_handle = ikcp_getconv(pkt->buf);
auto session = GetSession(socket_handle);
if (!session) {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s socket_handle:%d session_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
socket_handle
});
return;
}
if (session->GetSecretKeyPlace()) {
if (pkt->buf_len >= IKCP_OVERHEAD + KcpSession::GetSecretKeyLen()) {
long long secret_key = KcpSession::ReadSecretKey(pkt->buf + IKCP_OVERHEAD, pkt->buf_len);
if (secret_key == session->GetKcpSession()->GetSecretKey()) {
session->GetKcpSession()->OnRecvPacket(pkt);
} else {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d socket_handle%d secret_key:%d secret_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
socket_handle,
secret_key
});
}
} else {
f8::UdpLog::Instance()->Warning("ProcUdpPacket host:%s port:%d buflen:%d bufflen_error",
{
inet_ntoa(pkt->remote_addr.sin_addr),
pkt->remote_addr.sin_port,
pkt->buf_len
});
}
} else {
session->GetKcpSession()->OnRecvPacket(pkt);
}
}
void LongSessionMgr::DelSession(int socket_handle)
{
{
auto session = GetSession(socket_handle);
if (session) {
if (session.use_count() != 2) {
#ifdef DEBUG
abort();
#endif
}
session->UnInit();
}
}
socket_handle_hash_.erase(socket_handle);
}
int LongSessionMgr::GetLongSessionCount()
{
return socket_handle_hash_.size();
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <a8/singleton.h>
namespace a8
{
class UdpListener;
struct UdpPacket;
}
namespace ss
{
class SS_CMKcpHandshake;
}
class LongSession;
class LongSessionMgr : public a8::Singleton<LongSessionMgr>
{
private:
LongSessionMgr() {};
friend class a8::Singleton<LongSessionMgr>;
public:
void Init();
void UnInit();
void Update();
void _SS_CMKcpHandshake(f8::MsgHdr& hdr, const ss::SS_CMKcpHandshake& msg);
void ProcUdpPacket(a8::UdpPacket* pkt);
std::shared_ptr<LongSession> GetSession(int socket_handle);
std::shared_ptr<a8::UdpListener> GetUdpListener() { return udp_listener_; }
void DelSession(int socket_handle);
int GetLongSessionCount();
private:
std::shared_ptr<a8::UdpListener> udp_listener_;
std::map<int, std::shared_ptr<LongSession>> socket_handle_hash_;
};

View File

@ -4,9 +4,8 @@
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
int exitcode = 0; int exitcode = 0;
if (App::Instance()->Init(argc, argv)) { App::Instance()->Init(argc, argv);
exitcode = App::Instance()->Run(); exitcode = App::Instance()->Run();
App::Instance()->UnInit(); App::Instance()->UnInit();
}
return exitcode; return exitcode;
} }

View File

@ -1,160 +0,0 @@
#include "precompile.h"
#include <string.h>
#include <a8/tcpclient.h>
#include <f8/udplog.h>
#include <f8/timer.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "master.h"
#include "app.h"
const int PACK_MAX = 1024 * 64;
void Master::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
this->instance_id = instance_id;
this->remote_ip = remote_ip;
this->remote_port = remote_port;
recv_bufflen_ = 0;
last_pong_tick = a8::XGetTickCount();
recv_buff_ = (char*) malloc(PACK_MAX * 2);
tcp_client_ = std::make_shared<a8::TcpClient>();
tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&Master::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&Master::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&Master::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&Master::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_wp_ = f8::Timer::Instance()->SetIntervalWpEx
(1000 * 9 + a8::RandEx(500, 150),
[this] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
CheckAlive();
}
},
&attacher_);
}
void Master::UnInit()
{
if (!timer_wp_.expired()) {
f8::Timer::Instance()->Delete(timer_wp_);
}
tcp_client_->Close();
if (tcp_client_.use_count() != 1) {
abort();
}
tcp_client_ = nullptr;
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
void Master::Open()
{
tcp_client_->Open();
}
void Master::Close()
{
tcp_client_->Close();
}
bool Master::Connected()
{
return tcp_client_->Connected();
}
void Master::on_error(a8::TcpClient* sender, int errorId)
{
f8::UdpLog::Instance()->Error("Master errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
sender->remote_port
});
}
void Master::on_connect(a8::TcpClient* sender)
{
recv_bufflen_ = 0;
f8::UdpLog::Instance()->Info("masterserver connected", {});
}
void Master::on_disconnect(a8::TcpClient* sender)
{
recv_bufflen_ = 0;
f8::UdpLog::Instance()->Info("masterserver %d disconnected after 10s later reconnect", {instance_id});
}
void Master::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{
#if 0
++App::Instance()->perf.read_count;
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
f8::UdpLog::Instance()->Debug("recvied masterserver too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
recv_bufflen_ += len;
}
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::PackHead)) {
f8::PackHead* p = (f8::PackHead*) &recv_buff_[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_MasterServer,
0,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::PackHead)],
p->packlen);
offset += sizeof(f8::PackHead) + p->packlen;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
f8::UdpLog::Instance()->Debug("recvied bad package", {});
}
if (offset > 0 && offset < recv_bufflen_) {
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
#if 1
last_pong_tick = a8::XGetTickCount();
#endif
}
void Master::CheckAlive()
{
if (!Connected()) {
Open();
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Close();
Open();
} else {
ss::SS_Ping msg;
SendMsg(msg);
}
}
}

View File

@ -1,201 +0,0 @@
#include "precompile.h"
#include <unistd.h>
#include <a8/openssl.h>
#include <f8/timer.h>
#include <f8/netmsghandler.h>
#include <f8/protoutils.h>
#include "mastermgr.h"
#include "master.h"
#include "jsondatamgr.h"
#include "ss_proto.pb.h"
#include "upstream.h"
#include "upstreammgr.h"
#include "app.h"
#include "downstreammgr.h"
#include "GCListener.h"
class RequestTarget
{
public:
long long context_id = 0;
int socket_handle = 0;
std::string account_id;
f8::MsgHdr* hdr_copy = nullptr;
f8::TimerWp timer_wp;
long long req_tick = 0;
std::weak_ptr<UpStream> conn;
};
void MasterMgr::Init()
{
curr_context_id_ = a8::MakeInt64(0, time(nullptr) + 1000 * 60 * 10);
JsonDataMgr::Instance()->TraverseMaster
(
[this] (int instance_id, std::string remote_ip, int remote_port)
{
auto conn = std::make_shared<Master>();
conn->Init(instance_id, remote_ip, remote_port);
mastersvr_hash_[conn->instance_id] = conn;
conn->Open();
});
}
void MasterMgr::UnInit()
{
for (auto& pair : mastersvr_hash_) {
pair.second->UnInit();
}
mastersvr_hash_.clear();
}
void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg)
{
auto req = GetRequestByContextId(msg.context_id());
if (req) {
if (msg.error_code() == 0) {
std::weak_ptr<UpStream> conn = UpStreamMgr::Instance()->RecreateUpStream
(
msg.host(),
msg.port()
);
if (!conn.expired()) {
conn.lock()->ForwardClientMsgEx(req->hdr_copy);
req->conn = conn;
req->hdr_copy = nullptr;
if (!req->timer_wp.expired()) {
f8::Timer::Instance()->FireEvent(req->timer_wp,
ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT,
nullptr);
f8::Timer::Instance()->Delete(req->timer_wp);
}
RemoveRequest(req->socket_handle);
return;
} else {
abort();
}
} else {
RemoveRequest(req->socket_handle);
}
}
}
std::shared_ptr<Master> MasterMgr::GetConnById(int instance_id)
{
auto itr = mastersvr_hash_.find(instance_id);
return itr != mastersvr_hash_.end() ? itr->second : nullptr;
}
void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
const std::string& team_id,
const std::string& account_id,
const std::string& session_id,
const std::string& server_info,
int is_reconnect,
int proto_version)
{
if (GetRequestBySocket(hdr.socket_handle)) {
return;
}
unsigned int code = 0;
std::string team_uuid = team_id;
if (!team_id.empty()) {
code = a8::openssl::Crc32((unsigned char*)team_id.data(), team_id.size());
} else {
std::string data = a8::Format("!%s_%s_%d_%d",
{
account_id,
App::Instance()->NewUuid(),
getpid(),
rand()
});
team_uuid = data;
code = a8::openssl::Crc32((unsigned char*)data.data(), data.size());
}
std::shared_ptr<Master> svr = GetConnById(code % mastersvr_hash_.size() + 1);
if (svr) {
++curr_context_id_;
auto req = std::make_shared<RequestTarget>();
req->context_id = curr_context_id_;
req->socket_handle = hdr.socket_handle;
req->account_id = account_id;
req->req_tick = a8::XGetTickCount();
req->hdr_copy = hdr.Clone();
ss::SS_WSP_RequestTargetServer msg;
msg.set_context_id(curr_context_id_);
msg.set_account_id(account_id);
msg.set_session_id(session_id);
msg.set_team_id(team_uuid);
msg.set_server_info(server_info);
msg.set_is_reconnect(is_reconnect);
msg.set_proto_version(proto_version);
#ifdef DEBUG
std::string url;
std::string query_str;
{
GCListener::Instance()->GetWebSocketUrl(hdr.socket_handle, url, query_str);
}
msg.set_url(url);
msg.set_query_str(query_str);
#endif
svr->SendMsg(msg);
pending_socket_hash_[hdr.socket_handle] = req;
assert(pending_context_hash_.find(curr_context_id_) == pending_context_hash_.end());
pending_context_hash_[curr_context_id_] = req;
req->timer_wp = f8::Timer::Instance()->SetTimeoutWp
(1000 * 10,
[req] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
MasterMgr::Instance()->RemoveRequest
(
req->socket_handle
);
long long req_handle_time = a8::XGetTickCount() - req->req_tick;
if (req_handle_time > App::Instance()->GetPerf().max_login_time) {
App::Instance()->GetPerf().max_login_time = req_handle_time;
}
DownStreamMgr::Instance()->AddPendingAccount(req->account_id, req->socket_handle, req->req_tick);
} else if (ALLOC_TARGET_SERVER_SUCCESS_TIMER_EVENT == event) {
long long req_handle_time = a8::XGetTickCount() - req->req_tick;
if (req_handle_time > App::Instance()->GetPerf().max_login_time) {
App::Instance()->GetPerf().max_login_time = req_handle_time;
}
DownStreamMgr::Instance()->AddPendingAccount(req->account_id, req->socket_handle, req->req_tick);
}
}
);
}
}
void MasterMgr::RemoveRequest(int socket_handle)
{
auto req = GetRequestBySocket(socket_handle);
if (req) {
if (req->hdr_copy) {
f8::MsgHdr::Destroy(req->hdr_copy);
req->hdr_copy = nullptr;
}
pending_context_hash_.erase(req->context_id);
pending_socket_hash_.erase(socket_handle);
}
}
std::shared_ptr<RequestTarget> MasterMgr::GetRequestBySocket(int socket_handle)
{
auto itr = pending_socket_hash_.find(socket_handle);
return itr != pending_socket_hash_.end() ? itr->second : nullptr;
}
std::shared_ptr<RequestTarget> MasterMgr::GetRequestByContextId(long long context_id)
{
auto itr = pending_context_hash_.find(context_id);
return itr != pending_context_hash_.end() ? itr->second : nullptr;
}

View File

@ -1,50 +0,0 @@
#pragma once
namespace f8
{
struct MsgHdr;
}
namespace ss
{
class SS_MS_ResponseTargetServer;
}
class RequestTarget;
class Master;
class MasterMgr : public a8::Singleton<MasterMgr>
{
public:
enum { HID = HID_MasterMgr };
private:
MasterMgr() {};
friend class a8::Singleton<MasterMgr>;
public:
void Init();
void UnInit();
void _SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg);
void RequestTargetServer(f8::MsgHdr& hdr,
const std::string& team_id,
const std::string& account_id,
const std::string& session_id,
const std::string& server_info,
int is_reconnect,
int proto_version);
void RemoveRequest(int socket_handle);
private:
std::shared_ptr<RequestTarget> GetRequestBySocket(int socket_handle);
std::shared_ptr<RequestTarget> GetRequestByContextId(long long context_id);
std::shared_ptr<Master> GetConnById(int instance_id);
private:
long long curr_context_id_ = 0;
std::map<int, std::shared_ptr<Master>> mastersvr_hash_;
std::map<int, std::shared_ptr<RequestTarget>> pending_socket_hash_;
std::map<long long, std::shared_ptr<RequestTarget>> pending_context_hash_;
};

View File

@ -1,8 +1,20 @@
#pragma once #pragma once
#include <a8/a8.h> #include <a8/a8.h>
#include <f8/f8.h> #include <a8/udplog.h>
#include <f8/timer.h>
#include "constant.h" #include "constant.h"
#include "types.h" #include "types.h"
namespace google
{
namespace protobuf
{
class Message;
}
}
#include "framework/cpp/types.h"
#include "framework/cpp/utils.h"
#include "framework/cpp/protoutils.h"

View File

@ -0,0 +1,172 @@
#include "precompile.h"
#include <string.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "target_conn.h"
#include <a8/tcpclient.h>
#include <a8/udplog.h>
#include <a8/timer.h>
#include "app.h"
const int PACK_MAX = 1024 * 64;
void TargetConn::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
this->instance_id = instance_id;
this->remote_ip = remote_ip;
this->remote_port = remote_port;
recv_bufflen_ = 0;
last_pong_tick = a8::XGetTickCount();
recv_buff_ = (char*) malloc(PACK_MAX * 2);
tcp_client_ = new a8::TcpClient();
tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&TargetConn::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&TargetConn::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&TargetConn::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&TargetConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150),
a8::XParams().SetSender(this),
[] (const a8::XParams& param)
{
TargetConn* conn = (TargetConn*)param.sender.GetUserData();
conn->CheckAlive();
});
}
void TargetConn::UnInit()
{
a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr;
tcp_client_->Close();
delete tcp_client_;
tcp_client_ = nullptr;
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
void TargetConn::Open()
{
tcp_client_->Open();
}
void TargetConn::Close()
{
tcp_client_->Close();
}
bool TargetConn::Connected()
{
return tcp_client_->Connected();
}
void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
}
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff);
}
void TargetConn::on_error(a8::TcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("TargetConn errorid=%d", {errorId});
}
void TargetConn::on_connect(a8::TcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server connected", {});
}
void TargetConn::on_disconnect(a8::TcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect", {instance_id});
App::Instance()->AddIMMsg(IM_TargetConnDisconnect,
a8::XParams()
.SetSender(instance_id)
);
}
void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{
#if 0
++App::Instance()->perf.read_count;
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Debug("recvied target server too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
recv_bufflen_ += len;
}
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_TargetServer,
p->socket_handle,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
p->packlen);
offset += sizeof(f8::WSProxyPackHead_S) + p->packlen;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
a8::UdpLog::Instance()->Debug("recvied bad package", {});
}
if (offset > 0 && offset < recv_bufflen_) {
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
#if 1
last_pong_tick = a8::XGetTickCount();
#endif
}
void TargetConn::CheckAlive()
{
if (!Connected()) {
Open();
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Open();
} else {
ss::SS_Ping msg;
SendMsg(msg);
}
}
}

View File

@ -1,18 +1,20 @@
#pragma once #pragma once
#include <f8/protoutils.h> #include "framework/cpp/protoutils.h"
namespace a8 namespace a8
{ {
class TcpClient; class TcpClient;
} }
class Master struct timer_list;
class TargetConn
{ {
public: public:
int instance_id = 0; int instance_id = 0;
std::string remote_ip; std::string remote_ip;
int remote_port = 0; int remote_port = 0;
int matching_player_num = 0;
a8::tick_t last_pong_tick = 0; a8::tick_t last_pong_tick = 0;
public: public:
@ -27,9 +29,15 @@ class Master
void SendMsg(T& msg) void SendMsg(T& msg)
{ {
static int msgid = f8::Net_GetMessageId(msg); static int msgid = f8::Net_GetMessageId(msg);
f8::Net_SendMsg(tcp_client_.get(), 0, msgid, msg); #if 1
f8::Net_SendProxyCMsg(tcp_client_, msgid, msg);
#else
f8::Net_SendMsg(tcp_client_, 0, msgid, msg);
#endif
} }
void ForwardClientMsg(f8::MsgHdr& hdr);
private: private:
void on_error(a8::TcpClient* sender, int errorId); void on_error(a8::TcpClient* sender, int errorId);
void on_connect(a8::TcpClient* sender); void on_connect(a8::TcpClient* sender);
@ -41,7 +49,6 @@ class Master
private: private:
char *recv_buff_ = nullptr; char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0; unsigned int recv_bufflen_ = 0;
std::shared_ptr<a8::TcpClient> tcp_client_; a8::TcpClient* tcp_client_ = nullptr;
f8::TimerWp timer_wp_; timer_list* timer_ = nullptr;
f8::Attacher attacher_;
}; };

View File

@ -0,0 +1,32 @@
#include "precompile.h"
#include "target_conn_mgr.h"
#include "target_conn.h"
#include "jsondatamgr.h"
void TargetConnMgr::Init()
{
auto target_server_cluster_conf = JsonDataMgr::Instance()->GetTargetServerClusterConf();
for (int i = 0; i < target_server_cluster_conf->Size(); ++i) {
auto target_server_conf = target_server_cluster_conf->At(i);
int instance_id = target_server_conf->At("instance_id")->AsXValue();
std::string remote_ip = target_server_conf->At("ip")->AsXValue();
int remote_port = target_server_conf->At("port")->AsXValue();
{
TargetConn* conn = new TargetConn();
conn->Init(instance_id, remote_ip, remote_port);
target_conn_hash_[conn->instance_id] = conn;
conn->Open();
}
}
}
void TargetConnMgr::UnInit()
{
}
TargetConn* TargetConnMgr::GetConnByInstanceId(int instance_id)
{
auto itr = target_conn_hash_.find(instance_id);
return itr != target_conn_hash_.end() ? itr->second : nullptr;
}

View File

@ -0,0 +1,19 @@
#pragma once
class TargetConn;
class TargetConnMgr : public a8::Singleton<TargetConnMgr>
{
private:
TargetConnMgr() {};
friend class a8::Singleton<TargetConnMgr>;
public:
void Init();
void UnInit();
TargetConn* GetConnByInstanceId(int instance_id);
private:
std::map<int, TargetConn*> target_conn_hash_;
};

View File

@ -3,10 +3,7 @@
struct PerfMonitor struct PerfMonitor
{ {
int max_run_delay_time = 0; int max_run_delay_time = 0;
int max_dispatchmsg_time = 0;
int max_timer_idle = 0; int max_timer_idle = 0;
long long max_login_time = 0;
long long max_join_time = 0;
long long out_data_size = 0; long long out_data_size = 0;
long long in_data_size = 0; long long in_data_size = 0;
long long read_count = 0; long long read_count = 0;

View File

@ -1,283 +0,0 @@
#include "precompile.h"
#include <string.h>
#include <a8/tcpclient.h>
#include <f8/udplog.h>
#include <f8/timer.h>
#include <f8/msgqueue.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "upstream.h"
#include "app.h"
const int PACK_MAX = 1024 * 64 * 2;
void UpStream::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
if (remote_ip.empty()) {
abort();
}
this->instance_id = instance_id;
this->remote_ip = remote_ip;
this->remote_port = remote_port;
recv_bufflen_ = 0;
last_pong_tick = a8::XGetTickCount();
recv_buff_ = (char*) malloc(PACK_MAX * 2);
tcp_client_ = std::make_shared<a8::TcpClient>();
tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&UpStream::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&UpStream::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&UpStream::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&UpStream::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_wp_ = f8::Timer::Instance()->SetIntervalWpEx
(1000 * 9 + a8::RandEx(500, 150),
[this] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
CheckAlive();
}
},
&attacher_);
}
void UpStream::UnInit()
{
UpStreamMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
UpStreamMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
delete pdelnode->msg;
delete pdelnode;
}
if (!timer_wp_.expired()) {
f8::Timer::Instance()->Delete(timer_wp_);
}
tcp_client_->Close();
if(tcp_client_.use_count() != 1) {
abort();
}
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
void UpStream::Open()
{
tcp_client_->Open();
}
void UpStream::Close()
{
tcp_client_->Close();
}
bool UpStream::Connected()
{
return tcp_client_->Connected();
}
void UpStream::SendStockMsg()
{
UpStreamMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
UpStreamMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
if (pdelnode->msg) {
f8::Net_SendProxyCMsg(tcp_client_.get(), pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg);
delete pdelnode->msg;
}
if (pdelnode->hdr) {
ForwardClientMsg(*pdelnode->hdr);
f8::MsgHdr::Destroy(pdelnode->hdr);
pdelnode->hdr = nullptr;
}
delete pdelnode;
}
}
void UpStream::ForwardClientMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
}
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff);
}
void UpStream::ForwardClientMsgEx(f8::MsgHdr* hdr)
{
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
ForwardClientMsg(*hdr);
if (hdr->buf) {
free((char*)hdr->buf);
}
free(hdr);
} else {
AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
}
}
void UpStream::on_error(a8::TcpClient* sender, int errorId)
{
f8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
sender->remote_port
});
}
void UpStream::on_connect(a8::TcpClient* sender)
{
recv_bufflen_ = 0;
f8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
{
sender->remote_address,
sender->remote_port
});
f8::MsgQueue::Instance()->PostMsg
(
IM_UpStreamConnect,
a8::Args
(
{
instance_id
}
)
);
}
void UpStream::on_disconnect(a8::TcpClient* sender)
{
recv_bufflen_ = 0;
f8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
"remote_ip:%s remote_port:%d",
{
instance_id,
sender->remote_address,
sender->remote_port
});
f8::MsgQueue::Instance()->PostMsg
(
IM_UpStreamDisconnect,
a8::Args
(
{
instance_id
}
)
);
}
void UpStream::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{
#if 0
++App::Instance()->perf.read_count;
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
f8::UdpLog::Instance()->Debug("recvied target server too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
recv_bufflen_ += len;
}
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
int real_len = p->packlen + (p->ext_len << 16);
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
break;
}
App::Instance()->AddSocketMsg(SF_TargetServer,
p->socket_handle,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
real_len);
offset += sizeof(f8::WSProxyPackHead_S) + real_len;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
f8::UdpLog::Instance()->Debug("recvied bad package", {});
}
if (offset > 0 && offset < recv_bufflen_) {
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
#if 1
last_pong_tick = a8::XGetTickCount();
#endif
}
void UpStream::CheckAlive()
{
if (!Connected()) {
Open();
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Open();
} else {
ss::SS_Ping msg;
SendMsg(0, msg);
}
}
}
void UpStream::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr)
{
UpStreamMsgNode* node = new UpStreamMsgNode();
node->socket_handle = socket_handle;
node->msgid = msgid;
node->msg = msg;
node->hdr = hdr;
if (bot_node_) {
bot_node_->next_node = node;
bot_node_ = node;
} else {
top_node_ = node;
bot_node_ = node;
}
}

View File

@ -1,75 +0,0 @@
#pragma once
#include <f8/protoutils.h>
namespace a8
{
class TcpClient;
}
struct UpStreamMsgNode
{
unsigned short socket_handle = 0;
int msgid = 0;
::google::protobuf::Message* msg = nullptr;
f8::MsgHdr* hdr = nullptr;
UpStreamMsgNode* next_node = nullptr;
};
class UpStream
{
public:
int instance_id = 0;
std::string remote_ip;
int remote_port = 0;
a8::tick_t last_pong_tick = 0;
public:
void Init(int instance_id, const std::string& remote_ip, int remote_port);
void UnInit();
void Open();
void Close();
bool Connected();
template <typename T>
void SendMsg(int socket_handle, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
f8::Net_SendProxyCMsg(tcp_client_.get(), socket_handle, msgid, msg);
} else {
T* new_msg = new T();
*new_msg = msg;
AddStockMsg(socket_handle, msgid, new_msg, nullptr);
}
}
void SendStockMsg();
void ForwardClientMsg(f8::MsgHdr& hdr);
void ForwardClientMsgEx(f8::MsgHdr* hdr);
private:
void on_error(a8::TcpClient* sender, int errorId);
void on_connect(a8::TcpClient* sender);
void on_disconnect(a8::TcpClient* sender);
void on_socketread(a8::TcpClient* sender, char* buf, unsigned int len);
void CheckAlive();
void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr);
private:
char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0;
std::shared_ptr<a8::TcpClient> tcp_client_;
f8::TimerWp timer_wp_;
f8::Attacher attacher_;
UpStreamMsgNode* top_node_ = nullptr;
UpStreamMsgNode* bot_node_ = nullptr;
};

View File

@ -1,65 +0,0 @@
#include "precompile.h"
#include <f8/msgqueue.h>
#include "upstreammgr.h"
#include "upstream.h"
void UpStreamMgr::Init()
{
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_UpStreamConnect,
[this] (const a8::Args& args)
{
int instance_id = args.Get<int>(0);
std::weak_ptr<UpStream> up_wp = GetUpStreamById(instance_id);
if (!up_wp.expired() && up_wp.lock()->Connected()) {
up_wp.lock()->SendStockMsg();
}
});
f8::MsgQueue::Instance()->RegisterCallBack
(
IM_UpStreamDisconnect,
[this] (const a8::Args& args)
{
});
}
void UpStreamMgr::UnInit()
{
for (auto& pair : id_hash_) {
pair.second->UnInit();
}
}
std::weak_ptr<UpStream> UpStreamMgr::GetUpStreamByKey(const std::string& key)
{
auto itr = key_hash_.find(key);
return itr != key_hash_.end() ? itr->second : nullptr;
}
std::weak_ptr<UpStream> UpStreamMgr::GetUpStreamById(int instance_id)
{
auto itr = id_hash_.find(instance_id);
return itr != id_hash_.end() ? itr->second : nullptr;
}
std::weak_ptr<UpStream> UpStreamMgr::RecreateUpStream(const std::string& host, int port)
{
std::string key = host + ":" + a8::XValue(port).GetString();
if (!GetUpStreamByKey(key).expired()) {
return GetUpStreamByKey(key);
}
while (!GetUpStreamById(++curr_id_).expired()) {};
int instance_id = curr_id_;
std::string remote_ip = host;
int remote_port = port;
std::shared_ptr<UpStream> conn = std::make_shared<UpStream>();
conn->Init(instance_id, remote_ip, remote_port);
id_hash_[conn->instance_id] = conn;
key_hash_[key] = conn;
conn->Open();
return conn;
}

View File

@ -1,25 +0,0 @@
#pragma once
#include <a8/singleton.h>
class UpStream;
class UpStreamMgr : public a8::Singleton<UpStreamMgr>
{
private:
UpStreamMgr() {};
friend class a8::Singleton<UpStreamMgr>;
public:
void Init();
void UnInit();
std::weak_ptr<UpStream> GetUpStreamByKey(const std::string& key);
std::weak_ptr<UpStream> GetUpStreamById(int instance_id);
std::weak_ptr<UpStream> RecreateUpStream(const std::string& host, int port);
private:
unsigned short curr_id_ = 1000;
std::map<std::string, std::shared_ptr<UpStream>> key_hash_;
std::map<int, std::shared_ptr<UpStream>> id_hash_;
};

1
third_party/a8 vendored

@ -1 +0,0 @@
Subproject commit 1e577389c8a2870db9ddbf18577bfca24def049b

1
third_party/a8engine vendored Submodule

@ -0,0 +1 @@
Subproject commit bc1e1e002cdfbbac07abdf14151afb0bbd8025a8

@ -1 +0,0 @@
Subproject commit 8cd7e2432785bf4027e89c5dc74cb4980e4cd3c1

1
third_party/f8 vendored

@ -1 +0,0 @@
Subproject commit 243bbe515ef4a01089f9a6cf608c93d4097018de

1
third_party/framework vendored Submodule

@ -0,0 +1 @@
Subproject commit 18133846b6672634219c080064b7a24720d17588

1
third_party/kcp vendored

@ -1 +0,0 @@
Subproject commit 10ee2b30c8739408c0db98211df4ebce7b01c0ed

@ -1 +0,0 @@
Subproject commit 0165dc279d62af5536f122ea30c3cd3f642f34ee

1
third_party/tools vendored

@ -1 +0,0 @@
Subproject commit d38ce0b86c7b9262391f48108d808ddb62854760