This commit is contained in:
aozhiwei 2020-05-03 15:43:50 +08:00
parent 106de8fcc1
commit 99a165eb3f
9 changed files with 836 additions and 0 deletions

267
server/imserver/IMConn.cc Normal file
View File

@ -0,0 +1,267 @@
#include "precompile.h"
#include <string.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "IMConn.h"
#include <a8/tcpclient.h>
#include <a8/udplog.h>
#include <a8/timer.h>
#include <a8/ioloop.h>
#include <a8/asynctcpclient.h>
#include "app.h"
const int PACK_MAX = 1024 * 64 * 2;
void IMConn::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
if (remote_ip.empty()) {
abort();
}
this->instance_id = instance_id;
this->remote_ip = remote_ip;
this->remote_port = remote_port;
recv_bufflen_ = 0;
last_pong_tick = a8::XGetTickCount();
recv_buff_ = (char*) malloc(PACK_MAX * 2);
tcp_client_ = a8::IoLoop::Instance()->CreateAsyncTcpClient();
tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&IMConn::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&IMConn::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&IMConn::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&IMConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150),
a8::XParams().SetSender(this),
[] (const a8::XParams& param)
{
IMConn* conn = (IMConn*)param.sender.GetUserData();
conn->CheckAlive();
});
}
void IMConn::UnInit()
{
IMConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
IMConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
delete pdelnode->msg;
delete pdelnode;
}
a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr;
tcp_client_->Close();
a8::IoLoop::Instance()->DestoryAsyncTcpClient(tcp_client_);
tcp_client_ = nullptr;
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
void IMConn::Open()
{
tcp_client_->Open();
}
void IMConn::Close()
{
tcp_client_->Close();
}
bool IMConn::Connected()
{
return tcp_client_->Connected();
}
void IMConn::SendStockMsg()
{
IMConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
IMConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
if (pdelnode->msg) {
f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg);
delete pdelnode->msg;
}
if (pdelnode->hdr) {
ForwardClientMsg(*pdelnode->hdr);
if (pdelnode->hdr->buf) {
free((char*)pdelnode->hdr->buf);
}
free(pdelnode->hdr);
}
delete pdelnode;
}
}
void IMConn::ForwardClientMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
}
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff);
}
void IMConn::ForwardClientMsgEx(f8::MsgHdr* hdr)
{
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
ForwardClientMsg(*hdr);
if (hdr->buf) {
free((char*)hdr->buf);
}
free(hdr);
} else {
AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
}
}
void IMConn::on_error(a8::AsyncTcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
sender->remote_port
});
}
void IMConn::on_connect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
{
sender->remote_address,
sender->remote_port
});
App::Instance()->AddIMMsg(IM_IMConnConnect,
a8::XParams()
.SetSender(instance_id)
);
}
void IMConn::on_disconnect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
"remote_ip:%s remote_port:%d",
{
instance_id,
sender->remote_address,
sender->remote_port
});
App::Instance()->AddIMMsg(IM_IMConnDisconnect,
a8::XParams()
.SetSender(instance_id)
);
}
void IMConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len)
{
#if 0
++App::Instance()->perf.read_count;
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Debug("recvied target server too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
recv_bufflen_ += len;
}
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
int real_len = p->packlen + (p->ext_len << 16);
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
break;
}
App::Instance()->AddSocketMsg(SF_TargetServer,
p->socket_handle,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
real_len);
offset += sizeof(f8::WSProxyPackHead_S) + real_len;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
a8::UdpLog::Instance()->Debug("recvied bad package", {});
}
if (offset > 0 && offset < recv_bufflen_) {
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
#if 1
last_pong_tick = a8::XGetTickCount();
#endif
}
void IMConn::CheckAlive()
{
if (!Connected()) {
Open();
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Open();
} else {
ss::SS_Ping msg;
SendMsg(0, msg);
}
}
}
void IMConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr)
{
IMConnMsgNode* node = new IMConnMsgNode();
node->socket_handle = socket_handle;
node->msgid = msgid;
node->msg = msg;
node->hdr = hdr;
if (bot_node_) {
bot_node_->next_node = node;
bot_node_ = node;
} else {
top_node_ = node;
bot_node_ = node;
}
}

76
server/imserver/IMConn.h Normal file
View File

@ -0,0 +1,76 @@
#pragma once
#include "framework/cpp/protoutils.h"
namespace a8
{
class TcpClient;
class AsyncTcpClient;
}
struct IMConnMsgNode
{
unsigned short socket_handle = 0;
int msgid = 0;
::google::protobuf::Message* msg = nullptr;
f8::MsgHdr* hdr = nullptr;
IMConnMsgNode* next_node = nullptr;
};
struct timer_list;
class IMConn
{
public:
int instance_id = 0;
std::string remote_ip;
int remote_port = 0;
a8::tick_t last_pong_tick = 0;
public:
void Init(int instance_id, const std::string& remote_ip, int remote_port);
void UnInit();
void Open();
void Close();
bool Connected();
template <typename T>
void SendMsg(int socket_handle, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg);
} else {
T* new_msg = new T();
*new_msg = msg;
AddStockMsg(socket_handle, msgid, new_msg, nullptr);
}
}
void SendStockMsg();
void ForwardClientMsg(f8::MsgHdr& hdr);
void ForwardClientMsgEx(f8::MsgHdr* hdr);
private:
void on_error(a8::AsyncTcpClient* sender, int errorId);
void on_connect(a8::AsyncTcpClient* sender);
void on_disconnect(a8::AsyncTcpClient* sender);
void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len);
void CheckAlive();
void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr);
private:
char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0;
a8::AsyncTcpClient* tcp_client_ = nullptr;
timer_list* timer_ = nullptr;
IMConnMsgNode* top_node_ = nullptr;
IMConnMsgNode* bot_node_ = nullptr;
};

View File

@ -0,0 +1,50 @@
#include "precompile.h"
#include "IMConnMgr.h"
#include "IMConn.h"
#include "jsondatamgr.h"
#include "app.h"
void IMConnMgr::Init()
{
}
void IMConnMgr::UnInit()
{
for (auto& pair : id_hash_) {
pair.second->UnInit();
delete pair.second;
}
}
IMConn* IMConnMgr::GetConnByKey(const std::string& key)
{
auto itr = key_hash_.find(key);
return itr != key_hash_.end() ? itr->second : nullptr;
}
IMConn* IMConnMgr::GetConnById(int instance_id)
{
auto itr = id_hash_.find(instance_id);
return itr != id_hash_.end() ? itr->second : nullptr;
}
IMConn* IMConnMgr::RecreateIMConn(const std::string& host, int port)
{
std::string key = host + ":" + a8::XValue(port).GetString();
IMConn* conn = GetConnByKey(key);
if (conn) {
return conn;
}
while (GetConnById(++curr_id_)) {};
int instance_id = curr_id_;
std::string remote_ip = host;
int remote_port = port;
conn = new IMConn();
conn->Init(instance_id, remote_ip, remote_port);
id_hash_[conn->instance_id] = conn;
key_hash_[key] = conn;
conn->Open();
return conn;
}

View File

@ -0,0 +1,23 @@
#pragma once
class IMConn;
class IMConnMgr : public a8::Singleton<IMConnMgr>
{
private:
IMConnMgr() {};
friend class a8::Singleton<IMConnMgr>;
public:
void Init();
void UnInit();
IMConn* GetConnByKey(const std::string& key);
IMConn* GetConnById(int instance_id);
IMConn* RecreateIMConn(const std::string& host, int port);
private:
unsigned short curr_id_ = 1000;
std::map<std::string, IMConn*> key_hash_;
std::map<int, IMConn*> id_hash_;
};

267
server/imserver/MSConn.cc Normal file
View File

@ -0,0 +1,267 @@
#include "precompile.h"
#include <string.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "MSConn.h"
#include <a8/tcpclient.h>
#include <a8/udplog.h>
#include <a8/timer.h>
#include <a8/ioloop.h>
#include <a8/asynctcpclient.h>
#include "app.h"
const int PACK_MAX = 1024 * 64 * 2;
void MSConn::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
if (remote_ip.empty()) {
abort();
}
this->instance_id = instance_id;
this->remote_ip = remote_ip;
this->remote_port = remote_port;
recv_bufflen_ = 0;
last_pong_tick = a8::XGetTickCount();
recv_buff_ = (char*) malloc(PACK_MAX * 2);
tcp_client_ = a8::IoLoop::Instance()->CreateAsyncTcpClient();
tcp_client_->remote_address = remote_ip;
tcp_client_->remote_port = remote_port;
tcp_client_->on_error = std::bind(&MSConn::on_error, this, std::placeholders::_1, std::placeholders::_2);
tcp_client_->on_connect = std::bind(&MSConn::on_connect, this, std::placeholders::_1);
tcp_client_->on_disconnect = std::bind(&MSConn::on_disconnect, this, std::placeholders::_1);
tcp_client_->on_socketread = std::bind(&MSConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150),
a8::XParams().SetSender(this),
[] (const a8::XParams& param)
{
MSConn* conn = (MSConn*)param.sender.GetUserData();
conn->CheckAlive();
});
}
void MSConn::UnInit()
{
MSConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
MSConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
delete pdelnode->msg;
delete pdelnode;
}
a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr;
tcp_client_->Close();
a8::IoLoop::Instance()->DestoryAsyncTcpClient(tcp_client_);
tcp_client_ = nullptr;
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
void MSConn::Open()
{
tcp_client_->Open();
}
void MSConn::Close()
{
tcp_client_->Close();
}
bool MSConn::Connected()
{
return tcp_client_->Connected();
}
void MSConn::SendStockMsg()
{
MSConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
MSConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
if (pdelnode->msg) {
f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg);
delete pdelnode->msg;
}
if (pdelnode->hdr) {
ForwardClientMsg(*pdelnode->hdr);
if (pdelnode->hdr->buf) {
free((char*)pdelnode->hdr->buf);
}
free(pdelnode->hdr);
}
delete pdelnode;
}
}
void MSConn::ForwardClientMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
}
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff);
}
void MSConn::ForwardClientMsgEx(f8::MsgHdr* hdr)
{
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
ForwardClientMsg(*hdr);
if (hdr->buf) {
free((char*)hdr->buf);
}
free(hdr);
} else {
AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
}
}
void MSConn::on_error(a8::AsyncTcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
sender->remote_port
});
}
void MSConn::on_connect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
{
sender->remote_address,
sender->remote_port
});
App::Instance()->AddIMMsg(IM_MSConnConnect,
a8::XParams()
.SetSender(instance_id)
);
}
void MSConn::on_disconnect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
"remote_ip:%s remote_port:%d",
{
instance_id,
sender->remote_address,
sender->remote_port
});
App::Instance()->AddIMMsg(IM_MSConnDisconnect,
a8::XParams()
.SetSender(instance_id)
);
}
void MSConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len)
{
#if 0
++App::Instance()->perf.read_count;
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Debug("recvied target server too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
recv_bufflen_ += len;
}
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
int real_len = p->packlen + (p->ext_len << 16);
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
break;
}
App::Instance()->AddSocketMsg(SF_TargetServer,
p->socket_handle,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
real_len);
offset += sizeof(f8::WSProxyPackHead_S) + real_len;
} else {
warning = true;
offset++;
continue;
}
}
if (warning) {
a8::UdpLog::Instance()->Debug("recvied bad package", {});
}
if (offset > 0 && offset < recv_bufflen_) {
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
#if 1
last_pong_tick = a8::XGetTickCount();
#endif
}
void MSConn::CheckAlive()
{
if (!Connected()) {
Open();
} else {
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
last_pong_tick = a8::XGetTickCount();
Open();
} else {
ss::SS_Ping msg;
SendMsg(0, msg);
}
}
}
void MSConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr)
{
MSConnMsgNode* node = new MSConnMsgNode();
node->socket_handle = socket_handle;
node->msgid = msgid;
node->msg = msg;
node->hdr = hdr;
if (bot_node_) {
bot_node_->next_node = node;
bot_node_ = node;
} else {
top_node_ = node;
bot_node_ = node;
}
}

76
server/imserver/MSConn.h Normal file
View File

@ -0,0 +1,76 @@
#pragma once
#include "framework/cpp/protoutils.h"
namespace a8
{
class TcpClient;
class AsyncTcpClient;
}
struct MSConnMsgNode
{
unsigned short socket_handle = 0;
int msgid = 0;
::google::protobuf::Message* msg = nullptr;
f8::MsgHdr* hdr = nullptr;
MSConnMsgNode* next_node = nullptr;
};
struct timer_list;
class MSConn
{
public:
int instance_id = 0;
std::string remote_ip;
int remote_port = 0;
a8::tick_t last_pong_tick = 0;
public:
void Init(int instance_id, const std::string& remote_ip, int remote_port);
void UnInit();
void Open();
void Close();
bool Connected();
template <typename T>
void SendMsg(int socket_handle, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg);
} else {
T* new_msg = new T();
*new_msg = msg;
AddStockMsg(socket_handle, msgid, new_msg, nullptr);
}
}
void SendStockMsg();
void ForwardClientMsg(f8::MsgHdr& hdr);
void ForwardClientMsgEx(f8::MsgHdr* hdr);
private:
void on_error(a8::AsyncTcpClient* sender, int errorId);
void on_connect(a8::AsyncTcpClient* sender);
void on_disconnect(a8::AsyncTcpClient* sender);
void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len);
void CheckAlive();
void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr);
private:
char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0;
a8::AsyncTcpClient* tcp_client_ = nullptr;
timer_list* timer_ = nullptr;
MSConnMsgNode* top_node_ = nullptr;
MSConnMsgNode* bot_node_ = nullptr;
};

View File

@ -0,0 +1,50 @@
#include "precompile.h"
#include "MSConnMgr.h"
#include "MSConn.h"
#include "jsondatamgr.h"
#include "app.h"
void MSConnMgr::Init()
{
}
void MSConnMgr::UnInit()
{
for (auto& pair : id_hash_) {
pair.second->UnInit();
delete pair.second;
}
}
MSConn* MSConnMgr::GetConnByKey(const std::string& key)
{
auto itr = key_hash_.find(key);
return itr != key_hash_.end() ? itr->second : nullptr;
}
MSConn* MSConnMgr::GetConnById(int instance_id)
{
auto itr = id_hash_.find(instance_id);
return itr != id_hash_.end() ? itr->second : nullptr;
}
MSConn* MSConnMgr::RecreateMSConn(const std::string& host, int port)
{
std::string key = host + ":" + a8::XValue(port).GetString();
MSConn* conn = GetConnByKey(key);
if (conn) {
return conn;
}
while (GetConnById(++curr_id_)) {};
int instance_id = curr_id_;
std::string remote_ip = host;
int remote_port = port;
conn = new MSConn();
conn->Init(instance_id, remote_ip, remote_port);
id_hash_[conn->instance_id] = conn;
key_hash_[key] = conn;
conn->Open();
return conn;
}

View File

@ -0,0 +1,23 @@
#pragma once
class MSConn;
class MSConnMgr : public a8::Singleton<MSConnMgr>
{
private:
MSConnMgr() {};
friend class a8::Singleton<MSConnMgr>;
public:
void Init();
void UnInit();
MSConn* GetConnByKey(const std::string& key);
MSConn* GetConnById(int instance_id);
MSConn* RecreateMSConn(const std::string& host, int port);
private:
unsigned short curr_id_ = 1000;
std::map<std::string, MSConn*> key_hash_;
std::map<int, MSConn*> id_hash_;
};

View File

@ -17,6 +17,10 @@ enum InnerMesssage_e
IM_TargetConnConnect,
IM_TargetConnDisconnect,
IM_IMServerSocketDisconnect,
IM_IMConnConnect,
IM_IMConnDisconnect,
IM_MSConnConnect,
IM_MSConnDisconnect
};
//网络处理对象