添加心跳检查

This commit is contained in:
aozhiwei 2018-12-20 20:32:43 +08:00
parent 77ba240ddb
commit efc2143ed6
12 changed files with 83 additions and 61 deletions

View File

@ -13,6 +13,7 @@ include_directories(
/usr/include/python3.4m /usr/include/python3.4m
/usr/include/hiredis /usr/include/hiredis
../../third_party ../../third_party
.
) )
link_directories( link_directories(

View File

@ -21,10 +21,10 @@ public:
//packagelen + msgid + magiccode + msgbody //packagelen + msgid + magiccode + msgbody
//2 + 2 + 4+ xx + \0 + xx //2 + 2 + 4+ xx + \0 + xx
bool warning = false; bool warning = false;
while (buflen - offset >= sizeof(PackHead)) { while (buflen - offset >= sizeof(f8::PackHead)) {
PackHead* p = (PackHead*)&buf[offset]; f8::PackHead* p = (f8::PackHead*)&buf[offset];
if (p->magic_code == MAGIC_CODE) { if (p->magic_code == f8::MAGIC_CODE) {
if (buflen - offset < sizeof(PackHead) + p->packlen) { if (buflen - offset < sizeof(f8::PackHead) + p->packlen) {
break; break;
} }
App::Instance()->AddSocketMsg(SF_Client, App::Instance()->AddSocketMsg(SF_Client,
@ -32,9 +32,9 @@ public:
saddr, saddr,
p->msgid, p->msgid,
p->seqid, p->seqid,
&buf[offset + sizeof(PackHead)], &buf[offset + sizeof(f8::PackHead)],
p->packlen); p->packlen);
offset += sizeof(PackHead) + p->packlen; offset += sizeof(f8::PackHead) + p->packlen;
} else { } else {
warning = true; warning = true;
offset++; offset++;
@ -54,7 +54,8 @@ public:
a8::XParams() a8::XParams()
.SetSender(socket_handle) .SetSender(socket_handle)
.SetParam1(url) .SetParam1(url)
.SetParam2(querystr)); .SetParam2(querystr)
.SetParam3(saddr));
} }
virtual void OnDisConnect() override virtual void OnDisConnect() override
@ -94,20 +95,20 @@ void GCListener::UnInit()
tcp_listener_ = nullptr; tcp_listener_ = nullptr;
} }
void GCListener::ForwardTargetConnMsg(MsgHdr& hdr) void GCListener::ForwardTargetConnMsg(f8::MsgHdr& hdr)
{ {
char* buff = (char*)malloc(sizeof(PackHead) + hdr.buflen); char* buff = (char*)malloc(sizeof(f8::PackHead) + hdr.buflen);
PackHead* head = (PackHead*)buff; f8::PackHead* head = (f8::PackHead*)buff;
head->packlen = hdr.buflen; head->packlen = hdr.buflen;
head->msgid = hdr.msgid; head->msgid = hdr.msgid;
head->seqid = hdr.seqid; head->seqid = hdr.seqid;
head->magic_code = MAGIC_CODE; head->magic_code = f8::MAGIC_CODE;
head->rpc_error_code = 0; head->rpc_error_code = 0;
if (hdr.buflen > 0) { if (hdr.buflen > 0) {
memmove(buff + sizeof(PackHead), hdr.buf, hdr.buflen); memmove(buff + sizeof(f8::PackHead), hdr.buf, hdr.buflen);
} }
tcp_listener_->SendClientMsg(hdr.socket_handle, buff, sizeof(PackHead) + head->packlen); tcp_listener_->SendClientMsg(hdr.socket_handle, buff, sizeof(f8::PackHead) + head->packlen);
free(buff); free(buff);
} }

View File

@ -22,11 +22,11 @@ class GCListener : public a8::Singleton<GCListener>
template <typename T> template <typename T>
void SendMsg(unsigned short socket_handle, T& msg) void SendMsg(unsigned short socket_handle, T& msg)
{ {
static int msgid = ::Net_GetMessageId(msg); static int msgid = f8::Net_GetMessageId(msg);
Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg); f8::Net_SendMsg(tcp_listener_, socket_handle, 0, msgid, msg);
} }
void ForwardTargetConnMsg(MsgHdr& hdr); void ForwardTargetConnMsg(f8::MsgHdr& hdr);
void SendText(unsigned short sockhandle, const std::string& text); void SendText(unsigned short sockhandle, const std::string& text);
void ForceCloseClient(unsigned short sockhandle); void ForceCloseClient(unsigned short sockhandle);

View File

@ -278,7 +278,7 @@ void App::DispatchMsg()
msg_mutex_->unlock(); msg_mutex_->unlock();
} }
MsgHdr hdr; f8::MsgHdr hdr;
while (work_node_) { while (work_node_) {
MsgNode *pdelnode = work_node_; MsgNode *pdelnode = work_node_;
work_node_ = pdelnode->next; work_node_ = pdelnode->next;
@ -316,7 +316,7 @@ void App::DispatchMsg()
} }
} }
void App::ProcessClientMsg(MsgHdr& hdr) void App::ProcessClientMsg(f8::MsgHdr& hdr)
{ {
if (hdr.msgid < 100) { if (hdr.msgid < 100) {
return; return;
@ -346,7 +346,7 @@ void App::ProcessClientMsg(MsgHdr& hdr)
} }
} }
void App::ProcessTargetServerMsg(MsgHdr& hdr) void App::ProcessTargetServerMsg(f8::MsgHdr& hdr)
{ {
if (hdr.msgid < 100) { if (hdr.msgid < 100) {
return; return;
@ -382,7 +382,8 @@ void App::ProcessIMMsg()
break; break;
case IM_ExecGM: case IM_ExecGM:
{ {
HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.sender, HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.param3,
pdelnode->params.sender,
pdelnode->params.param1.GetString(), pdelnode->params.param1.GetString(),
pdelnode->params.param2.GetString() pdelnode->params.param2.GetString()
); );

View File

@ -41,8 +41,8 @@ private:
void DispatchMsg(); void DispatchMsg();
void ProcessIMMsg(); void ProcessIMMsg();
void ProcessClientMsg(MsgHdr& hdr); void ProcessClientMsg(f8::MsgHdr& hdr);
void ProcessTargetServerMsg(MsgHdr& hdr); void ProcessTargetServerMsg(f8::MsgHdr& hdr);
void InitLog(); void InitLog();
void UnInitLog(); void UnInitLog();

View File

@ -6,17 +6,26 @@
#include "GCListener.h" #include "GCListener.h"
void _GMAppEcho(a8::HTTPRequest& request, a8::MutableXObject* xobj) static void _GMAppEcho(f8::JsonHttpRequest* request)
{ {
xobj->SetVal("error_code", 1); request->resp_xobj->SetVal("error_code", 1);
xobj->SetVal("error_msg", ""); request->resp_xobj->SetVal("error_msg", "");
xobj->SetVal("error_msg", a8::Get(request, "msg")); request->resp_xobj->SetVal("error_msg", request->request.Get("msg"));
}
static void _GMOpsSelfChecking(f8::JsonHttpRequest* request)
{
request->resp_xobj->SetVal("errcode", 0);
request->resp_xobj->SetVal("errmsg", "");
request->resp_xobj->SetVal("healthy", 1);
request->resp_xobj->SetVal("max_rundelay", 10);
} }
void HandlerMgr::Init() void HandlerMgr::Init()
{ {
RegisterNetMsgHandlers(); RegisterNetMsgHandlers();
RegisterGMMsgHandler("app$echo", _GMAppEcho); RegisterGMMsgHandler("App$echo", _GMAppEcho);
RegisterGMMsgHandler("Ops$selfChecking", _GMOpsSelfChecking);
} }
void HandlerMgr::UnInit() void HandlerMgr::UnInit()
@ -27,9 +36,10 @@ void HandlerMgr::RegisterNetMsgHandlers()
{ {
} }
void HandlerMgr::ProcGMMsg(int sockhandle, const std::string& url, const std::string& querystr) void HandlerMgr::ProcGMMsg(unsigned long saddr, int sockhandle,
const std::string& url, const std::string& querystr)
{ {
if (url != "/index.php") { if (url != "/webapp/index.php") {
GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(404, "")); GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(404, ""));
return; return;
} }
@ -37,24 +47,30 @@ void HandlerMgr::ProcGMMsg(int sockhandle, const std::string& url, const std::st
a8::HTTPRequest request; a8::HTTPRequest request;
a8::ParserUrlQueryString(querystr.c_str(), request); a8::ParserUrlQueryString(querystr.c_str(), request);
std::string msgname = a8::Get(request, "c").GetString() + "$" + a8::Get(request, "a").GetString(); std::string msgname = a8::Get(request, "c").GetString() + "@" + a8::Get(request, "a").GetString();
auto itr = gmhandlers_.find(msgname); auto itr = gmhandlers_.find(msgname);
if (itr != gmhandlers_.end()) { if (itr != gmhandlers_.end()) {
a8::MutableXObject* xobj = a8::MutableXObject::NewObject(); f8::JsonHttpRequest* request = new f8::JsonHttpRequest;
itr->second(request, xobj); request->saddr = saddr;
request->socket_handle = sockhandle;
request->query_str = querystr;
request->request.ReadFromUrlQueryString(querystr);
itr->second(request);
if (!request->pending){
std::string response; std::string response;
xobj->ToJsonStr(response); request->resp_xobj->ToJsonStr(response);
GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(response)); GCListener::Instance()->SendText(sockhandle, a8::HttpResponse(response));
delete xobj; delete request;
}
} else { } else {
GCListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}")); GCListener::Instance()->SendText(sockhandle, a8::HttpResponse("{}"));
} }
} }
void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname, void HandlerMgr::RegisterGMMsgHandler(const std::string& msgname,
void (*handler)(a8::HTTPRequest&, a8::MutableXObject*)) void (*handler)(f8::JsonHttpRequest*))
{ {
gmhandlers_[msgname] = handler; gmhandlers_[msgname] = handler;
} }

View File

@ -21,16 +21,17 @@ class HandlerMgr : public a8::Singleton<HandlerMgr>
void Init(); void Init();
void UnInit(); void UnInit();
NetMsgHandlerObject gcmsghandler; f8::NetMsgHandlerObject gcmsghandler;
NetMsgHandlerObject rsmsghandler; f8::NetMsgHandlerObject rsmsghandler;
NetMsgHandlerObject gsmsghandler; f8::NetMsgHandlerObject gsmsghandler;
void ProcGMMsg(int sockhandle, const std::string& url, const std::string& quyerstr); void ProcGMMsg(unsigned long saddr, int sockhandle,
const std::string& url, const std::string& querystr);
private: private:
void RegisterNetMsgHandlers(); void RegisterNetMsgHandlers();
void RegisterGMMsgHandler(const std::string& msgname, void RegisterGMMsgHandler(const std::string& msgname,
void (*)(a8::HTTPRequest&, a8::MutableXObject*)); void (*)(f8::JsonHttpRequest*));
std::map<std::string, void (*)(a8::HTTPRequest&, a8::MutableXObject*)> gmhandlers_; std::map<std::string, void (*)(f8::JsonHttpRequest*)> gmhandlers_;
}; };

View File

@ -15,4 +15,6 @@ namespace google
} }
} }
#include "framework/cpp/types.h"
#include "framework/cpp/utils.h"
#include "framework/cpp/protoutils.h" #include "framework/cpp/protoutils.h"

View File

@ -64,14 +64,14 @@ bool TargetConn::Connected()
return tcp_client_->Connected(); return tcp_client_->Connected();
} }
void TargetConn::ForwardClientMsg(MsgHdr& hdr) void TargetConn::ForwardClientMsg(f8::MsgHdr& hdr)
{ {
char* buff = (char*)malloc(sizeof(WSProxyPackHead_C) + hdr.buflen); char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
WSProxyPackHead_C* head = (WSProxyPackHead_C*)buff; f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen; head->packlen = hdr.buflen;
head->msgid = hdr.msgid; head->msgid = hdr.msgid;
head->seqid = hdr.seqid; head->seqid = hdr.seqid;
head->magic_code = MAGIC_CODE; head->magic_code = f8::MAGIC_CODE;
#if 0 #if 0
head->rpc_error_code = 0; head->rpc_error_code = 0;
#endif #endif
@ -79,10 +79,10 @@ void TargetConn::ForwardClientMsg(MsgHdr& hdr)
head->ip_saddr = hdr.ip_saddr; head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) { if (hdr.buflen > 0) {
memmove(buff + sizeof(WSProxyPackHead_C), hdr.buf, hdr.buflen); memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
} }
tcp_client_->SendBuff(buff, sizeof(WSProxyPackHead_C) + head->packlen); tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff); free(buff);
} }
@ -123,10 +123,10 @@ void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int le
bool warning = false; bool warning = false;
unsigned int offset = 0; unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(WSProxyPackHead_S)) { while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
WSProxyPackHead_S* p = (WSProxyPackHead_S*) &recv_buff_[offset]; f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
if (p->magic_code == MAGIC_CODE) { if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(PackHead) + p->packlen) { if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) {
break; break;
} }
App::Instance()->AddSocketMsg(SF_TargetServer, App::Instance()->AddSocketMsg(SF_TargetServer,
@ -134,9 +134,9 @@ void TargetConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int le
instance_id, instance_id,
p->msgid, p->msgid,
p->seqid, p->seqid,
&recv_buff_[offset + sizeof(WSProxyPackHead_S)], &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
p->packlen); p->packlen);
offset += sizeof(PackHead) + p->packlen; offset += sizeof(f8::PackHead) + p->packlen;
} else { } else {
warning = true; warning = true;
offset++; offset++;

View File

@ -28,15 +28,15 @@ class TargetConn
template <typename T> template <typename T>
void SendMsg(T& msg) void SendMsg(T& msg)
{ {
static int msgid = ::Net_GetMessageId(msg); static int msgid = f8::Net_GetMessageId(msg);
#if 1 #if 1
Net_SendProxyCMsg(tcp_client_, msgid, msg); f8::Net_SendProxyCMsg(tcp_client_, msgid, msg);
#else #else
Net_SendMsg(tcp_client_, 0, msgid, msg); f8::Net_SendMsg(tcp_client_, 0, msgid, msg);
#endif #endif
} }
void ForwardClientMsg(MsgHdr& hdr); void ForwardClientMsg(f8::MsgHdr& hdr);
private: private:
void on_error(a8::TcpClient* sender, int errorId); void on_error(a8::TcpClient* sender, int errorId);

@ -1 +1 @@
Subproject commit 52552756404c7047c4438cf11425299c88cb74a7 Subproject commit 58c82113e3ab51acaf44fda66efd30f8e326e394

@ -1 +1 @@
Subproject commit 2dafab9662fe759a442a7ef11ead0bf5885b433f Subproject commit 6614d9299ac02b3f8b12f0921a660341a7a7ccfe