diff --git a/server/dbproxy/handlermgr.cc b/server/dbproxy/handlermgr.cc index 50e1920..9513cb5 100644 --- a/server/dbproxy/handlermgr.cc +++ b/server/dbproxy/handlermgr.cc @@ -6,6 +6,10 @@ #include "GSListener.h" #include "dbpool.h" #include "ss_proto.pb.h" +#include "jsondatamgr.h" +#include "app.h" + +#include "framework/cpp/dbpool.h" static void _GMOpsSelfChecking(f8::JsonHttpRequest* request) { @@ -74,7 +78,63 @@ void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname, void HandlerMgr::_SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg) { + auto on_ok = [] (a8::XParams& param, const f8::DataSet* data_set) + { + ss::SS_DPM_ExecAsyncSql respmsg; + for (auto& row : *data_set) { + auto p = respmsg.mutable_data_set()->add_rows(); + for (auto& value : row) { + p->add_values(value); + } + } + respmsg.set_context_id(param.sender); + GSListener::Instance()->SendMsg(param.param1, respmsg); + }; + auto on_error = [] (a8::XParams& param, int error_code, const std::string& error_msg) + { + ss::SS_DPM_ExecAsyncSql respmsg; + respmsg.set_context_id(param.sender); + respmsg.set_error_code(error_code); + respmsg.set_error_msg(error_msg); + GSListener::Instance()->SendMsg(param.param1, respmsg); + }; + if (msg.query_type() == 0) { + a8::MutableXObject* conn_info = a8::MutableXObject::NewObject(); + conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id})); + f8::DBPool::Instance()->ExecAsyncQuery(*conn_info, + msg.sql().c_str(), + {}, + a8::XParams() + .SetSender(msg.context_id()) + .SetParam1(hdr.socket_handle), + on_ok, + on_error, + msg.hash_code() + ); + delete conn_info; + } else if (msg.query_type() == 1) { + a8::MutableXObject* conn_info = a8::MutableXObject::NewObject(); + conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id})); + f8::DBPool::Instance()->ExecAsyncScript(*conn_info, + msg.sql().c_str(), + {}, + a8::XParams() + .SetSender(msg.context_id()) + .SetParam1(hdr.socket_handle), + on_ok, + on_error, + msg.hash_code() + ); + delete conn_info; + } else { + on_error(a8::XParams() + .SetSender(msg.context_id()) + .SetParam1(hdr.socket_handle), + -1, + "参数错误" + ); + } } void HandlerMgr::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg) diff --git a/server/dbproxy/jsondatamgr.cc b/server/dbproxy/jsondatamgr.cc index 58ba622..75300d4 100644 --- a/server/dbproxy/jsondatamgr.cc +++ b/server/dbproxy/jsondatamgr.cc @@ -3,16 +3,23 @@ #include "jsondatamgr.h" #include "app.h" +#include "framework/cpp/utils.h" + void JsonDataMgr::Init() { std::string dbproxyserver_cluster_json_file; - if (getenv("is_dev_env")) { + std::string mysql_cluster_json_file; + if (!f8::IsOnlineEnv()) { dbproxyserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/dbproxy/game%d.dbproxy.cluster.json", {GAME_ID, GAME_ID}); + mysql_cluster_json_file = a8::Format("/var/data/conf_test/game%d/dbproxy/game%d.dbproxy.cluster.json", + {GAME_ID, GAME_ID}); } else { dbproxyserver_cluster_json_file = a8::Format("../config/game%d.dbproxy.cluster.json", {GAME_ID}); + mysql_cluster_json_file = a8::Format("../config/game%d.dbproxy.mysql.cluster.json", {GAME_ID}); } dbproxyserver_cluster_json_.ReadFromFile(dbproxyserver_cluster_json_file); + mysql_cluster_json_.ReadFromFile(mysql_cluster_json_file); } void JsonDataMgr::UnInit() @@ -26,3 +33,11 @@ std::shared_ptr JsonDataMgr::GetConf() } return dbproxyserver_cluster_json_[App::Instance()->instance_id - 1]; } + +std::shared_ptr JsonDataMgr::GetMysqlConf() +{ + if (App::Instance()->instance_id < 1 || App::Instance()->instance_id > mysql_cluster_json_.Size()) { + abort(); + } + return mysql_cluster_json_[App::Instance()->instance_id - 1]; +} diff --git a/server/dbproxy/jsondatamgr.h b/server/dbproxy/jsondatamgr.h index 5eb6c5d..363954a 100644 --- a/server/dbproxy/jsondatamgr.h +++ b/server/dbproxy/jsondatamgr.h @@ -11,7 +11,9 @@ class JsonDataMgr : public a8::Singleton void UnInit(); std::shared_ptr GetConf(); + std::shared_ptr GetMysqlConf(); private: a8::XObject dbproxyserver_cluster_json_; + a8::XObject mysql_cluster_json_; }; diff --git a/server/tools/protobuild/ss_proto.proto b/server/tools/protobuild/ss_proto.proto index b4de4cb..9732e01 100644 --- a/server/tools/protobuild/ss_proto.proto +++ b/server/tools/protobuild/ss_proto.proto @@ -12,7 +12,7 @@ message SS_Pong //行数据 message MFRow { - repeated string values = 1; //数据 + repeated bytes values = 1; //数据 } //数据集 @@ -27,16 +27,15 @@ message SS_GSM_ExecAsyncSql optional int32 exec_type = 1; //查询类型0: single 1:pool optional int32 query_type = 2; //查询类型0: execquery 1:execscript optional int64 context_id = 3; //上下文id - optional string sql = 4; //sql语句 + optional bytes sql = 4; //sql语句 optional int64 hash_code = 5; //hash值 } message SS_DPM_ExecAsyncSql { optional int64 context_id = 1; //上下文id - optional string sql = 2; //sql语句 + optional bytes sql = 2; //sql语句 optional int32 error_code = 3; //错误码 optional string error_msg = 4; //错误描述 optional MFDataSet data_set = 5; //数据集 -} - +} \ No newline at end of file diff --git a/server/tools/scripts/construct/build_pb.py b/server/tools/scripts/construct/build_pb.py index 7a7493a..6ddc7f4 100644 --- a/server/tools/scripts/construct/build_pb.py +++ b/server/tools/scripts/construct/build_pb.py @@ -18,7 +18,7 @@ def printp_stdout(p): if len(line) > 0: print(line, end = '') except Exception as e: - print('build_pb stdout error:' + e) + print('build_pb stdout error:' + str(e)) def printp_stderr(p): try: @@ -27,7 +27,7 @@ def printp_stderr(p): if len(line) > 0: print(line, end = '') except Exception as e: - print('build_pb stderr error:' + e) + print('build_pb stderr error:' + str(e)) def need_rebuild(): for proto_name in ('ss_proto', 'ss_msgid'): @@ -45,8 +45,8 @@ def rebuild(): p = subprocess.Popen( 'protoc --proto_path=../tools/protobuild --cpp_out=../dbproxy ../tools/protobuild/ss_proto.proto && ' + 'protoc --proto_path=../tools/protobuild --cpp_out=../dbproxy ../tools/protobuild/ss_msgid.proto ', - stdin = None, - stdout = None, + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, stderr = None, shell = True) t1 = threading.Thread(target = printp_stdout, args=(p, ))