diff --git a/server/dbproxy/app.cc b/server/dbproxy/app.cc index ed9ebae..f631e41 100644 --- a/server/dbproxy/app.cc +++ b/server/dbproxy/app.cc @@ -11,6 +11,8 @@ #include #include "framework/cpp/netmsghandler.h" +#include "framework/cpp/msgqueue.h" +#include "framework/cpp/dbpool.h" #include "app.h" #include "GSListener.h" @@ -18,7 +20,6 @@ #include "handlermgr.h" #include "ss_msgid.pb.h" #include "ss_proto.pb.h" -#include "dbpool.h" struct MsgNode { @@ -86,8 +87,10 @@ void App::Init(int argc, char* argv[]) InitLog(); HandlerMgr::Instance()->Init(); a8::Timer::Instance()->Init(); + f8::MsgQueue::Instance()->Init(); JsonDataMgr::Instance()->Init(); - DBPool::Instance()->Init(); + f8::DBPool::Instance()->Init(); + f8::DBPool::Instance()->SetThreadNum(10); GSListener::Instance()->Init(); uuid.SetMachineId(instance_id); @@ -112,8 +115,9 @@ void App::UnInit() return; } GSListener::Instance()->UnInit(); - DBPool::Instance()->UnInit(); + f8::DBPool::Instance()->UnInit(); JsonDataMgr::Instance()->UnInit(); + f8::MsgQueue::Instance()->UnInit(); a8::Timer::Instance()->UnInit(); HandlerMgr::Instance()->UnInit(); UnInitLog(); @@ -320,7 +324,7 @@ void App::ProcessGSMsg(f8::MsgHdr& hdr) break; case HID_DBPool: { - ProcessNetMsg(handler, DBPool::Instance(), hdr); + // ProcessNetMsg(handler, DBPool::Instance(), hdr); } break; } @@ -339,6 +343,15 @@ void App::ProcessIMMsg() while (im_work_node_) { IMMsgNode *pdelnode = im_work_node_; switch (im_work_node_->msgid) { + case f8::IM_SysMsgQueue: + { + const a8::XParams* param = (const a8::XParams*)pdelnode->params.param1.GetUserData(); + f8::MsgQueue::Instance()->ProcessMsg(pdelnode->params.sender.GetInt(), + *param + ); + delete param; + } + break; case IM_ExecGM: { HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.param3, diff --git a/server/dbproxy/handlermgr.cc b/server/dbproxy/handlermgr.cc index 4f60b68..26feee9 100644 --- a/server/dbproxy/handlermgr.cc +++ b/server/dbproxy/handlermgr.cc @@ -6,6 +6,10 @@ #include "GSListener.h" #include "dbpool.h" #include "ss_proto.pb.h" +#include "jsondatamgr.h" +#include "app.h" + +#include "framework/cpp/dbpool.h" static void _GMOpsSelfChecking(f8::JsonHttpRequest* request) { @@ -28,7 +32,7 @@ void HandlerMgr::UnInit() void HandlerMgr::RegisterNetMsgHandlers() { RegisterNetMsgHandler(&gsmsghandler, &HandlerMgr::_SS_Ping); - RegisterNetMsgHandler(&gsmsghandler, &DBPool::_SS_GSM_ExecAsyncSql); + RegisterNetMsgHandler(&gsmsghandler, &HandlerMgr::_SS_GSM_ExecAsyncSql); } void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle, @@ -72,6 +76,75 @@ void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname, gmhandlers_[msgname] = handler; } +void HandlerMgr::_SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg) +{ + auto on_ok = [] (a8::XParams& param, const f8::DataSet* data_set) + { + ss::SS_DPM_ExecAsyncSql respmsg; + for (auto& row : *data_set) { + auto p = respmsg.mutable_data_set()->add_rows(); + for (auto& value : row) { + p->add_values(value); + } + } + respmsg.set_context_id(param.sender); + GSListener::Instance()->SendMsg(param.param1, respmsg); + }; + auto on_error = [] (a8::XParams& param, int error_code, const std::string& error_msg) + { + ss::SS_DPM_ExecAsyncSql respmsg; + respmsg.set_context_id(param.sender); + respmsg.set_error_code(error_code); + respmsg.set_error_msg(error_msg); + GSListener::Instance()->SendMsg(param.param1, respmsg); + }; + + if (msg.query_type() == 0) { + a8::MutableXObject* conn_info = a8::MutableXObject::NewObject(); + { + std::shared_ptr conn_cfg = JsonDataMgr::Instance()->GetMysqlConf(); + conn_cfg->DeepCopy(*conn_info); + } + conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id})); + f8::DBPool::Instance()->ExecAsyncQuery(*conn_info, + msg.sql().c_str(), + {}, + a8::XParams() + .SetSender(msg.context_id()) + .SetParam1(hdr.socket_handle), + on_ok, + on_error, + msg.hash_code() + ); + delete conn_info; + } else if (msg.query_type() == 1) { + a8::MutableXObject* conn_info = a8::MutableXObject::NewObject(); + { + std::shared_ptr conn_cfg = JsonDataMgr::Instance()->GetMysqlConf(); + conn_cfg->DeepCopy(*conn_info); + } + conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id})); + f8::DBPool::Instance()->ExecAsyncScript(*conn_info, + msg.sql().c_str(), + {}, + a8::XParams() + .SetSender(msg.context_id()) + .SetParam1(hdr.socket_handle), + on_ok, + on_error, + msg.hash_code() + ); + delete conn_info; + } else { + on_error(a8::XParams() + .SetSender(msg.context_id()) + .SetParam1(hdr.socket_handle), + -1, + "参数错误" + ); + } +} + void HandlerMgr::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg) { ss::SS_Pong pongmsg; diff --git a/server/dbproxy/handlermgr.h b/server/dbproxy/handlermgr.h index 7e7c8ca..c43943c 100644 --- a/server/dbproxy/handlermgr.h +++ b/server/dbproxy/handlermgr.h @@ -12,6 +12,7 @@ namespace a8 namespace ss { class SS_Ping; + class SS_GSM_ExecAsyncSql; } class HandlerMgr : public a8::Singleton @@ -35,6 +36,7 @@ class HandlerMgr : public a8::Singleton const std::string& url, const std::string& quyerstr); void _SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg); + void _SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg); private: void RegisterNetMsgHandlers(); diff --git a/server/dbproxy/jsondatamgr.cc b/server/dbproxy/jsondatamgr.cc index 58ba622..a282ef3 100644 --- a/server/dbproxy/jsondatamgr.cc +++ b/server/dbproxy/jsondatamgr.cc @@ -3,16 +3,23 @@ #include "jsondatamgr.h" #include "app.h" +#include "framework/cpp/utils.h" + void JsonDataMgr::Init() { std::string dbproxyserver_cluster_json_file; - if (getenv("is_dev_env")) { + std::string mysql_cluster_json_file; + if (!f8::IsOnlineEnv()) { dbproxyserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/dbproxy/game%d.dbproxy.cluster.json", {GAME_ID, GAME_ID}); + mysql_cluster_json_file = a8::Format("/var/data/conf_test/game%d/dbproxy/game%d.dbproxy.mysql.cluster.json", + {GAME_ID, GAME_ID}); } else { dbproxyserver_cluster_json_file = a8::Format("../config/game%d.dbproxy.cluster.json", {GAME_ID}); + mysql_cluster_json_file = a8::Format("../config/game%d.dbproxy.mysql.cluster.json", {GAME_ID}); } dbproxyserver_cluster_json_.ReadFromFile(dbproxyserver_cluster_json_file); + mysql_cluster_json_.ReadFromFile(mysql_cluster_json_file); } void JsonDataMgr::UnInit() @@ -26,3 +33,11 @@ std::shared_ptr JsonDataMgr::GetConf() } return dbproxyserver_cluster_json_[App::Instance()->instance_id - 1]; } + +std::shared_ptr JsonDataMgr::GetMysqlConf() +{ + if (App::Instance()->instance_id < 1 || App::Instance()->instance_id > mysql_cluster_json_.Size()) { + abort(); + } + return mysql_cluster_json_[App::Instance()->instance_id - 1]; +} diff --git a/server/dbproxy/jsondatamgr.h b/server/dbproxy/jsondatamgr.h index 5eb6c5d..363954a 100644 --- a/server/dbproxy/jsondatamgr.h +++ b/server/dbproxy/jsondatamgr.h @@ -11,7 +11,9 @@ class JsonDataMgr : public a8::Singleton void UnInit(); std::shared_ptr GetConf(); + std::shared_ptr GetMysqlConf(); private: a8::XObject dbproxyserver_cluster_json_; + a8::XObject mysql_cluster_json_; }; diff --git a/server/tools/protobuild/ss_proto.proto b/server/tools/protobuild/ss_proto.proto index 75c22c4..b9e006c 100644 --- a/server/tools/protobuild/ss_proto.proto +++ b/server/tools/protobuild/ss_proto.proto @@ -38,5 +38,4 @@ message SS_DPM_ExecAsyncSql optional int32 error_code = 3; //错误码 optional bytes error_msg = 4; //错误描述 optional MFDataSet data_set = 5; //数据集 -} - +} \ No newline at end of file diff --git a/server/tools/scripts/construct/build_pb.py b/server/tools/scripts/construct/build_pb.py index 7a7493a..6ddc7f4 100644 --- a/server/tools/scripts/construct/build_pb.py +++ b/server/tools/scripts/construct/build_pb.py @@ -18,7 +18,7 @@ def printp_stdout(p): if len(line) > 0: print(line, end = '') except Exception as e: - print('build_pb stdout error:' + e) + print('build_pb stdout error:' + str(e)) def printp_stderr(p): try: @@ -27,7 +27,7 @@ def printp_stderr(p): if len(line) > 0: print(line, end = '') except Exception as e: - print('build_pb stderr error:' + e) + print('build_pb stderr error:' + str(e)) def need_rebuild(): for proto_name in ('ss_proto', 'ss_msgid'): @@ -45,8 +45,8 @@ def rebuild(): p = subprocess.Popen( 'protoc --proto_path=../tools/protobuild --cpp_out=../dbproxy ../tools/protobuild/ss_proto.proto && ' + 'protoc --proto_path=../tools/protobuild --cpp_out=../dbproxy ../tools/protobuild/ss_msgid.proto ', - stdin = None, - stdout = None, + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, stderr = None, shell = True) t1 = threading.Thread(target = printp_stdout, args=(p, ))