This commit is contained in:
zhulongjun 2019-03-04 21:05:20 +08:00
commit 76aa2999c2
7 changed files with 116 additions and 12 deletions

View File

@ -11,6 +11,8 @@
#include <a8/uuid.h>
#include "framework/cpp/netmsghandler.h"
#include "framework/cpp/msgqueue.h"
#include "framework/cpp/dbpool.h"
#include "app.h"
#include "GSListener.h"
@ -18,7 +20,6 @@
#include "handlermgr.h"
#include "ss_msgid.pb.h"
#include "ss_proto.pb.h"
#include "dbpool.h"
struct MsgNode
{
@ -86,8 +87,10 @@ void App::Init(int argc, char* argv[])
InitLog();
HandlerMgr::Instance()->Init();
a8::Timer::Instance()->Init();
f8::MsgQueue::Instance()->Init();
JsonDataMgr::Instance()->Init();
DBPool::Instance()->Init();
f8::DBPool::Instance()->Init();
f8::DBPool::Instance()->SetThreadNum(10);
GSListener::Instance()->Init();
uuid.SetMachineId(instance_id);
@ -112,8 +115,9 @@ void App::UnInit()
return;
}
GSListener::Instance()->UnInit();
DBPool::Instance()->UnInit();
f8::DBPool::Instance()->UnInit();
JsonDataMgr::Instance()->UnInit();
f8::MsgQueue::Instance()->UnInit();
a8::Timer::Instance()->UnInit();
HandlerMgr::Instance()->UnInit();
UnInitLog();
@ -320,7 +324,7 @@ void App::ProcessGSMsg(f8::MsgHdr& hdr)
break;
case HID_DBPool:
{
ProcessNetMsg(handler, DBPool::Instance(), hdr);
// ProcessNetMsg(handler, DBPool::Instance(), hdr);
}
break;
}
@ -339,6 +343,15 @@ void App::ProcessIMMsg()
while (im_work_node_) {
IMMsgNode *pdelnode = im_work_node_;
switch (im_work_node_->msgid) {
case f8::IM_SysMsgQueue:
{
const a8::XParams* param = (const a8::XParams*)pdelnode->params.param1.GetUserData();
f8::MsgQueue::Instance()->ProcessMsg(pdelnode->params.sender.GetInt(),
*param
);
delete param;
}
break;
case IM_ExecGM:
{
HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.param3,

View File

@ -6,6 +6,10 @@
#include "GSListener.h"
#include "dbpool.h"
#include "ss_proto.pb.h"
#include "jsondatamgr.h"
#include "app.h"
#include "framework/cpp/dbpool.h"
static void _GMOpsSelfChecking(f8::JsonHttpRequest* request)
{
@ -28,7 +32,7 @@ void HandlerMgr::UnInit()
void HandlerMgr::RegisterNetMsgHandlers()
{
RegisterNetMsgHandler(&gsmsghandler, &HandlerMgr::_SS_Ping);
RegisterNetMsgHandler(&gsmsghandler, &DBPool::_SS_GSM_ExecAsyncSql);
RegisterNetMsgHandler(&gsmsghandler, &HandlerMgr::_SS_GSM_ExecAsyncSql);
}
void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,
@ -72,6 +76,75 @@ void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname,
gmhandlers_[msgname] = handler;
}
void HandlerMgr::_SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg)
{
auto on_ok = [] (a8::XParams& param, const f8::DataSet* data_set)
{
ss::SS_DPM_ExecAsyncSql respmsg;
for (auto& row : *data_set) {
auto p = respmsg.mutable_data_set()->add_rows();
for (auto& value : row) {
p->add_values(value);
}
}
respmsg.set_context_id(param.sender);
GSListener::Instance()->SendMsg(param.param1, respmsg);
};
auto on_error = [] (a8::XParams& param, int error_code, const std::string& error_msg)
{
ss::SS_DPM_ExecAsyncSql respmsg;
respmsg.set_context_id(param.sender);
respmsg.set_error_code(error_code);
respmsg.set_error_msg(error_msg);
GSListener::Instance()->SendMsg(param.param1, respmsg);
};
if (msg.query_type() == 0) {
a8::MutableXObject* conn_info = a8::MutableXObject::NewObject();
{
std::shared_ptr<a8::XObject> conn_cfg = JsonDataMgr::Instance()->GetMysqlConf();
conn_cfg->DeepCopy(*conn_info);
}
conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id}));
f8::DBPool::Instance()->ExecAsyncQuery(*conn_info,
msg.sql().c_str(),
{},
a8::XParams()
.SetSender(msg.context_id())
.SetParam1(hdr.socket_handle),
on_ok,
on_error,
msg.hash_code()
);
delete conn_info;
} else if (msg.query_type() == 1) {
a8::MutableXObject* conn_info = a8::MutableXObject::NewObject();
{
std::shared_ptr<a8::XObject> conn_cfg = JsonDataMgr::Instance()->GetMysqlConf();
conn_cfg->DeepCopy(*conn_info);
}
conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id}));
f8::DBPool::Instance()->ExecAsyncScript(*conn_info,
msg.sql().c_str(),
{},
a8::XParams()
.SetSender(msg.context_id())
.SetParam1(hdr.socket_handle),
on_ok,
on_error,
msg.hash_code()
);
delete conn_info;
} else {
on_error(a8::XParams()
.SetSender(msg.context_id())
.SetParam1(hdr.socket_handle),
-1,
"参数错误"
);
}
}
void HandlerMgr::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg)
{
ss::SS_Pong pongmsg;

View File

@ -12,6 +12,7 @@ namespace a8
namespace ss
{
class SS_Ping;
class SS_GSM_ExecAsyncSql;
}
class HandlerMgr : public a8::Singleton<HandlerMgr>
@ -35,6 +36,7 @@ class HandlerMgr : public a8::Singleton<HandlerMgr>
const std::string& url, const std::string& quyerstr);
void _SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg);
void _SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg);
private:
void RegisterNetMsgHandlers();

View File

@ -3,16 +3,23 @@
#include "jsondatamgr.h"
#include "app.h"
#include "framework/cpp/utils.h"
void JsonDataMgr::Init()
{
std::string dbproxyserver_cluster_json_file;
if (getenv("is_dev_env")) {
std::string mysql_cluster_json_file;
if (!f8::IsOnlineEnv()) {
dbproxyserver_cluster_json_file = a8::Format("/var/data/conf_test/game%d/dbproxy/game%d.dbproxy.cluster.json",
{GAME_ID, GAME_ID});
mysql_cluster_json_file = a8::Format("/var/data/conf_test/game%d/dbproxy/game%d.dbproxy.mysql.cluster.json",
{GAME_ID, GAME_ID});
} else {
dbproxyserver_cluster_json_file = a8::Format("../config/game%d.dbproxy.cluster.json", {GAME_ID});
mysql_cluster_json_file = a8::Format("../config/game%d.dbproxy.mysql.cluster.json", {GAME_ID});
}
dbproxyserver_cluster_json_.ReadFromFile(dbproxyserver_cluster_json_file);
mysql_cluster_json_.ReadFromFile(mysql_cluster_json_file);
}
void JsonDataMgr::UnInit()
@ -26,3 +33,11 @@ std::shared_ptr<a8::XObject> JsonDataMgr::GetConf()
}
return dbproxyserver_cluster_json_[App::Instance()->instance_id - 1];
}
std::shared_ptr<a8::XObject> JsonDataMgr::GetMysqlConf()
{
if (App::Instance()->instance_id < 1 || App::Instance()->instance_id > mysql_cluster_json_.Size()) {
abort();
}
return mysql_cluster_json_[App::Instance()->instance_id - 1];
}

View File

@ -11,7 +11,9 @@ class JsonDataMgr : public a8::Singleton<JsonDataMgr>
void UnInit();
std::shared_ptr<a8::XObject> GetConf();
std::shared_ptr<a8::XObject> GetMysqlConf();
private:
a8::XObject dbproxyserver_cluster_json_;
a8::XObject mysql_cluster_json_;
};

View File

@ -38,5 +38,4 @@ message SS_DPM_ExecAsyncSql
optional int32 error_code = 3; //
optional bytes error_msg = 4; //
optional MFDataSet data_set = 5; //
}
}

View File

@ -18,7 +18,7 @@ def printp_stdout(p):
if len(line) > 0:
print(line, end = '')
except Exception as e:
print('build_pb stdout error:' + e)
print('build_pb stdout error:' + str(e))
def printp_stderr(p):
try:
@ -27,7 +27,7 @@ def printp_stderr(p):
if len(line) > 0:
print(line, end = '')
except Exception as e:
print('build_pb stderr error:' + e)
print('build_pb stderr error:' + str(e))
def need_rebuild():
for proto_name in ('ss_proto', 'ss_msgid'):
@ -45,8 +45,8 @@ def rebuild():
p = subprocess.Popen(
'protoc --proto_path=../tools/protobuild --cpp_out=../dbproxy ../tools/protobuild/ss_proto.proto && ' +
'protoc --proto_path=../tools/protobuild --cpp_out=../dbproxy ../tools/protobuild/ss_msgid.proto ',
stdin = None,
stdout = None,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = None,
shell = True)
t1 = threading.Thread(target = printp_stdout, args=(p, ))