This commit is contained in:
aozhiwei 2020-05-03 18:13:39 +08:00
parent 828142a941
commit 14faa4a76c
2 changed files with 12 additions and 133 deletions

View File

@ -44,17 +44,6 @@ void IMConn::Init(int instance_id, const std::string& remote_ip, int remote_port
void IMConn::UnInit()
{
IMConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
IMConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
delete pdelnode->msg;
delete pdelnode;
}
a8::Timer::Instance()->DeleteTimer(timer_);
timer_ = nullptr;
tcp_client_->Close();
@ -80,73 +69,9 @@ bool IMConn::Connected()
return tcp_client_->Connected();
}
void IMConn::SendStockMsg()
{
IMConnMsgNode* work_node;
work_node = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
while (work_node) {
IMConnMsgNode* pdelnode = work_node;
work_node = work_node->next_node;
if (pdelnode->msg) {
f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg);
delete pdelnode->msg;
}
if (pdelnode->hdr) {
ForwardClientMsg(*pdelnode->hdr);
if (pdelnode->hdr->buf) {
free((char*)pdelnode->hdr->buf);
}
free(pdelnode->hdr);
}
delete pdelnode;
}
}
void IMConn::ForwardClientMsg(f8::MsgHdr& hdr)
{
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
head->packlen = hdr.buflen;
head->msgid = hdr.msgid;
head->seqid = hdr.seqid;
head->magic_code = f8::MAGIC_CODE;
#if 0
head->rpc_error_code = 0;
#endif
head->socket_handle = hdr.socket_handle;
head->ip_saddr = hdr.ip_saddr;
if (hdr.buflen > 0) {
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
}
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
free(buff);
}
void IMConn::ForwardClientMsgEx(f8::MsgHdr* hdr)
{
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
ForwardClientMsg(*hdr);
if (hdr->buf) {
free((char*)hdr->buf);
}
free(hdr);
} else {
AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
}
}
void IMConn::on_error(a8::AsyncTcpClient* sender, int errorId)
{
a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
a8::UdpLog::Instance()->Error("imserver errorid=%d remote_ip:%s remote_port:%d",
{
errorId,
sender->remote_address,
@ -157,7 +82,7 @@ void IMConn::on_error(a8::AsyncTcpClient* sender, int errorId)
void IMConn::on_connect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
a8::UdpLog::Instance()->Info("imserver connected remote_ip:%s remote_port:%d",
{
sender->remote_address,
sender->remote_port
@ -171,7 +96,7 @@ void IMConn::on_connect(a8::AsyncTcpClient* sender)
void IMConn::on_disconnect(a8::AsyncTcpClient* sender)
{
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
a8::UdpLog::Instance()->Info("imserver %d disconnected after 10s later reconnect "
"remote_ip:%s remote_port:%d",
{
instance_id,
@ -191,7 +116,7 @@ void IMConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int l
#endif
if (recv_bufflen_ + len > 2 * PACK_MAX) {
recv_bufflen_ = 0;
a8::UdpLog::Instance()->Debug("recvied target server too long message", {});
a8::UdpLog::Instance()->Debug("recvied imserver too long message", {});
return;
} else {
memmove(&recv_buff_[recv_bufflen_], buf, len);
@ -200,21 +125,20 @@ void IMConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int l
bool warning = false;
unsigned int offset = 0;
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
int real_len = p->packlen + (p->ext_len << 16);
while (recv_bufflen_ - offset >= sizeof(f8::PackHead)) {
f8::PackHead* p = (f8::PackHead*) &recv_buff_[offset];
if (p->magic_code == f8::MAGIC_CODE) {
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) {
break;
}
App::Instance()->AddSocketMsg(SF_IMConn,
p->socket_handle,
0,
instance_id,
p->msgid,
p->seqid,
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
real_len);
offset += sizeof(f8::WSProxyPackHead_S) + real_len;
&recv_buff_[offset + sizeof(f8::PackHead)],
p->packlen);
offset += sizeof(f8::PackHead) + p->packlen;
} else {
warning = true;
offset++;
@ -248,20 +172,3 @@ void IMConn::CheckAlive()
}
}
}
void IMConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr)
{
IMConnMsgNode* node = new IMConnMsgNode();
node->socket_handle = socket_handle;
node->msgid = msgid;
node->msg = msg;
node->hdr = hdr;
if (bot_node_) {
bot_node_->next_node = node;
bot_node_ = node;
} else {
top_node_ = node;
bot_node_ = node;
}
}

View File

@ -8,16 +8,6 @@ namespace a8
class AsyncTcpClient;
}
struct IMConnMsgNode
{
unsigned short socket_handle = 0;
int msgid = 0;
::google::protobuf::Message* msg = nullptr;
f8::MsgHdr* hdr = nullptr;
IMConnMsgNode* next_node = nullptr;
};
struct timer_list;
class IMConn
{
@ -39,22 +29,9 @@ class IMConn
void SendMsg(int socket_handle, T& msg)
{
static int msgid = f8::Net_GetMessageId(msg);
if (Connected()) {
if (top_node_) {
SendStockMsg();
}
f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg);
} else {
T* new_msg = new T();
*new_msg = msg;
AddStockMsg(socket_handle, msgid, new_msg, nullptr);
}
f8::Net_SendMsg(tcp_client_, 0, msgid, msg);
}
void SendStockMsg();
void ForwardClientMsg(f8::MsgHdr& hdr);
void ForwardClientMsgEx(f8::MsgHdr* hdr);
private:
void on_error(a8::AsyncTcpClient* sender, int errorId);
void on_connect(a8::AsyncTcpClient* sender);
@ -62,15 +39,10 @@ class IMConn
void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len);
void CheckAlive();
void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
f8::MsgHdr* hdr);
private:
char *recv_buff_ = nullptr;
unsigned int recv_bufflen_ = 0;
a8::AsyncTcpClient* tcp_client_ = nullptr;
timer_list* timer_ = nullptr;
IMConnMsgNode* top_node_ = nullptr;
IMConnMsgNode* bot_node_ = nullptr;
};