dbproxy/server/dbproxy/handlermgr.cc
aozhiwei 78010ef5fc 1
2019-03-04 20:27:53 +08:00

153 lines
5.7 KiB
C++

#include "precompile.h"
#include <a8/mutable_xobject.h>
#include "handlermgr.h"
#include "GSListener.h"
#include "dbpool.h"
#include "ss_proto.pb.h"
#include "jsondatamgr.h"
#include "app.h"
#include "framework/cpp/dbpool.h"
// GM handler for the "Ops@selfChecking" request: fills the JSON response with
// a fixed health report (healthy = 1, max_rundelay = 10).
//
// errcode is 0 here: ProcGMMsg seeds every response with errcode 0 / errmsg ""
// before dispatch, and a report with healthy = 1 and an empty errmsg must not
// carry a non-zero error code.
static void _GMOpsSelfChecking(f8::JsonHttpRequest* request)
{
    request->resp_xobj->SetVal("errcode", 0);
    request->resp_xobj->SetVal("errmsg", "");
    request->resp_xobj->SetVal("healthy", 1);
    request->resp_xobj->SetVal("max_rundelay", 10);
}
// Installs all handlers: first the network message handlers, then the
// GM (HTTP) handler for the ops self-check request.
void HandlerMgr::Init()
{
    RegisterNetMsgHandlers();

    // GM handlers are keyed by "<c>@<a>" as built in ProcGMMsg.
    static const char* const self_check_name = "Ops@selfChecking";
    RegisterGMMsgHandler(self_check_name, _GMOpsSelfChecking);
}
// Counterpart of Init(); currently there is nothing to tear down.
void HandlerMgr::UnInit()
{
    // Intentionally empty.
}
// Registers the member functions that handle incoming network messages
// (SS_Ping and SS_GSM_ExecAsyncSql) in the gsmsghandler table.
void HandlerMgr::RegisterNetMsgHandlers()
{
    auto* handler_table = &gsmsghandler;

    RegisterNetMsgHandler(handler_table, &HandlerMgr::_SS_Ping);
    RegisterNetMsgHandler(handler_table, &HandlerMgr::_SS_GSM_ExecAsyncSql);
}
void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,
const std::string& url, const std::string& querystr)
{
if (url != "/webapp/index.php") {
GSListener::Instance()->SendText(sockhandle, a8::HttpResponse(404, ""));
return;
}
a8::HTTPRequest request;
a8::ParserUrlQueryString(querystr.c_str(), request);
std::string msgname = a8::Get(request, "c").GetString() + "@" + a8::Get(request, "a").GetString();
auto itr = gmhandlers_.find(msgname);
if (itr != gmhandlers_.end()) {
f8::JsonHttpRequest* request = new f8::JsonHttpRequest;
request->saddr = saddr;
request->socket_handle = sockhandle;
request->query_str = querystr;
request->request.ReadFromUrlQueryString(querystr);
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
itr->second(request);
if (!request->pending){
std::string response;
request->resp_xobj->ToJsonStr(response);
GSListener::Instance()->SendText(sockhandle, a8::HttpResponse(response));
delete request;
}
} else {
GSListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}"));
}
}
// Associates a GM handler function with a message name. Registering the same
// name again replaces the previously stored handler.
void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname,
                                      void (*handler)(f8::JsonHttpRequest*))
{
    auto& slot = gmhandlers_[msgname];
    slot = handler;
}
// Handles SS_GSM_ExecAsyncSql: submits msg.sql() to f8::DBPool and answers the
// requesting socket with an SS_DPM_ExecAsyncSql message.
//
// hdr - message header; hdr.socket_handle identifies where the reply is sent.
// msg - request; query_type 0 runs ExecAsyncQuery, query_type 1 runs
//       ExecAsyncScript, any other value is answered at once with error -1.
//
// The reply echoes msg.context_id() back as its context_id.
void HandlerMgr::_SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg)
{
    // Success callback: copy every row and value of the result set into the reply.
    auto on_ok = [] (a8::XParams& param, const f8::DataSet* data_set)
    {
        ss::SS_DPM_ExecAsyncSql respmsg;
        for (auto& row : *data_set) {
            auto p = respmsg.mutable_data_set()->add_rows();
            for (auto& value : row) {
                p->add_values(value);
            }
        }
        respmsg.set_context_id(param.sender);
        GSListener::Instance()->SendMsg(param.param1, respmsg);
    };
    // Failure callback: forward the error code and message to the requester.
    auto on_error = [] (a8::XParams& param, int error_code, const std::string& error_msg)
    {
        ss::SS_DPM_ExecAsyncSql respmsg;
        respmsg.set_context_id(param.sender);
        respmsg.set_error_code(error_code);
        respmsg.set_error_msg(error_msg);
        GSListener::Instance()->SendMsg(param.param1, respmsg);
    };

    // Context handed to the callbacks: sender = request context id,
    // param1 = socket to reply on.
    a8::XParams reply_ctx = a8::XParams()
        .SetSender(msg.context_id())
        .SetParam1(hdr.socket_handle);

    const auto query_type = msg.query_type();
    if (query_type != 0 && query_type != 1) {
        // Unknown query type; the message text means "invalid parameter".
        on_error(reply_ctx, -1, "参数错误");
        return;
    }

    // Per-request connection description. Owned by unique_ptr so it is freed
    // on every exit path, including exceptions from the calls below.
    std::unique_ptr<a8::MutableXObject> conn_info(a8::MutableXObject::NewObject());
    {
        // Presumably copies the configured MySQL settings into conn_info so the
        // "database" override below does not touch the shared config.
        std::shared_ptr<a8::XObject> conn_cfg = JsonDataMgr::Instance()->GetMysqlConf();
        conn_cfg->DeepCopy(*conn_info);
    }
    conn_info->SetVal("database", a8::Format("gamedb%d_%d", {GAME_ID, App::Instance()->instance_id}));

    // Both query types take identical arguments; only the pool entry point differs.
    if (query_type == 0) {
        f8::DBPool::Instance()->ExecAsyncQuery(*conn_info,
                                               msg.sql().c_str(),
                                               {},
                                               reply_ctx,
                                               on_ok,
                                               on_error,
                                               msg.hash_code()
                                               );
    } else {
        f8::DBPool::Instance()->ExecAsyncScript(*conn_info,
                                                msg.sql().c_str(),
                                                {},
                                                reply_ctx,
                                                on_ok,
                                                on_error,
                                                msg.hash_code()
                                                );
    }
}
// Answers an SS_Ping by sending a default-constructed SS_Pong back on the
// socket the ping arrived on. The ping payload itself is not inspected.
void HandlerMgr::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg)
{
    ss::SS_Pong reply;
    GSListener::Instance()->SendMsg(hdr.socket_handle, reply);
}