wsproxy/server/wsproxy/target_conn.cc
2018-08-03 19:55:28 +08:00

147 lines
4.4 KiB
C++

#include "precompile.h"
#include <string.h>
#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"
#include "target_conn.h"
#include <a8/tcpclient.h>
#include <a8/udplog.h>
#include <a8/timer.h>
#include "app.h"
// Sizing unit (64 KiB) for the receive buffer: Init() allocates 2 * PACK_MAX
// bytes and on_socketread() refuses to buffer more than that amount.
constexpr int PACK_MAX = 64 * 1024;
// Prepares this connection for use without opening it (see Open()):
//   - stores the target endpoint and instance id,
//   - resets the receive state and allocates the receive buffer,
//   - creates the TCP client and wires its callbacks to this object,
//   - registers a repeating timer that calls CheckAlive().
//
//   instance_id  identifier stored on this connection
//   remote_ip    address handed to the TCP client
//   remote_port  port handed to the TCP client
void RSConn::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
    namespace ph = std::placeholders;

    this->instance_id = instance_id;
    this->remote_ip = remote_ip;
    this->remote_port = remote_port;

    recv_bufflen_ = 0;
    last_pong_tick = a8::XGetTickCount();
    // Capacity matches the overflow check performed in on_socketread().
    recv_buff_ = static_cast<char*>(malloc(2 * PACK_MAX));

    tcp_client_ = new a8::TcpClient();
    tcp_client_->remote_address = remote_ip;
    tcp_client_->remote_port = remote_port;
    tcp_client_->on_error = std::bind(&RSConn::on_error, this, ph::_1, ph::_2);
    tcp_client_->on_connect = std::bind(&RSConn::on_connect, this, ph::_1);
    tcp_client_->on_disconnect = std::bind(&RSConn::on_disconnect, this, ph::_1);
    tcp_client_->on_socketread = std::bind(&RSConn::on_socketread, this, ph::_1, ph::_2, ph::_3);

    // Base period of 9000 plus whatever a8::RandEx(500, 150) yields.
    // NOTE(review): presumably milliseconds with a random offset so that
    // multiple connections do not tick in lockstep — confirm against a8.
    const auto check_interval = 1000 * 9 + a8::RandEx(500, 150);
    timer_ = a8::Timer::Instance()->AddRepeatTimer(
        check_interval,
        a8::XParams().SetSender(this),
        [] (const a8::XParams& param)
        {
            // The sender slot carries the RSConn that registered the timer.
            RSConn* self = (RSConn*)param.sender.GetUserData();
            self->CheckAlive();
        });
}
// Releases everything Init() created, in this order: the repeating timer,
// the TCP client (closed, then deleted), and finally the receive buffer.
// Each released member is reset to nullptr / 0 afterwards.
void RSConn::UnInit()
{
    // Remove the timer first so CheckAlive() is no longer scheduled.
    a8::Timer::Instance()->DeleteTimer(timer_);
    timer_ = nullptr;

    // Close before destroying the client.
    tcp_client_->Close();
    delete tcp_client_;
    tcp_client_ = nullptr;

    // Drop any buffered bytes together with the buffer itself.
    recv_bufflen_ = 0;
    free(recv_buff_);
    recv_buff_ = nullptr;
}
// Forwards to the underlying TCP client's Open().
void RSConn::Open()
{
    tcp_client_->Open();
}
// Forwards to the underlying TCP client's Close().
void RSConn::Close()
{
    tcp_client_->Close();
}
// Returns whatever the underlying TCP client reports from Connected().
bool RSConn::Connected()
{
    return tcp_client_->Connected();
}
// TCP client error callback: only logs the error id.
//   sender   client that raised the error (unused here)
//   errorId  error code supplied by the client
void RSConn::on_error(a8::TcpClient* sender, int errorId)
{
    a8::UdpLog::Instance()->Error("RSConn errorid=%d", {errorId});
}
// TCP client connect callback: discards any bytes buffered from a previous
// session so parsing starts clean, then logs the event.
//   sender  client that connected (unused here)
void RSConn::on_connect(a8::TcpClient* sender)
{
    recv_bufflen_ = 0;
    a8::UdpLog::Instance()->Info("room server connected", {});
}
// TCP client disconnect callback: discards buffered bytes, logs the event, and
// posts an IM_RSConnDisconnect message to the App carrying this connection's
// instance id as the sender.
//   sender  client that disconnected (unused here)
void RSConn::on_disconnect(a8::TcpClient* sender)
{
    recv_bufflen_ = 0;
    a8::UdpLog::Instance()->Info("room server %d disconnected after 10s later reconnect", {instance_id});
    App::Instance()->AddIMMsg(IM_RSConnDisconnect, a8::XParams().SetSender(instance_id));
}
// TCP client read callback: appends the received bytes to recv_buff_ and
// dispatches every complete frame found there to the App.
//
// A frame is a PackHead followed by `packlen` payload bytes. Frames are
// recognised by PackHead::magiccode == MAGIC_CODE; when the magic does not
// match, the parser advances one byte at a time to resynchronise and emits a
// single debug log for the whole call. Bytes belonging to an incomplete
// trailing frame are moved to the front of the buffer for the next call.
//
// If the incoming data would not fit in the 2 * PACK_MAX byte buffer, all
// buffered data AND the new data are discarded.
//
//   sender  client that produced the data (unused here)
//   buf     received bytes
//   len     number of bytes in buf
void RSConn::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{
#if 0
    ++App::Instance()->perf.read_count;
#endif
    if (recv_bufflen_ + len > 2 * PACK_MAX) {
        recv_bufflen_ = 0;
        a8::UdpLog::Instance()->Debug("recvied room server too long message", {});
        return;
    }
    memmove(&recv_buff_[recv_bufflen_], buf, len);
    recv_bufflen_ += len;

    bool warning = false;
    unsigned int offset = 0;
    // `>=` rather than `>`: a frame with an empty payload (packlen == 0, which
    // e.g. a protobuf message with no fields set produces) is exactly
    // sizeof(PackHead) bytes long. With `>` such a frame at the end of the
    // buffer was held back until more data happened to arrive.
    while (recv_bufflen_ - offset >= sizeof(PackHead)) {
        PackHead* p = (PackHead*) &recv_buff_[offset];
        if (p->magiccode != MAGIC_CODE) {
            // Not at a frame boundary: skip one byte and try again.
            warning = true;
            offset++;
            continue;
        }
        if (recv_bufflen_ - offset < sizeof(PackHead) + p->packlen) {
            // Header is complete but the payload is not; wait for more data.
            break;
        }
        // NOTE(review): the buffer is compacted below, so AddSocketMsg
        // presumably copies the payload before returning — confirm.
        App::Instance()->AddSocketMsg(SF_RoomServer,
                                      instance_id,
                                      0,
                                      p->msgid,
                                      &recv_buff_[offset + sizeof(PackHead)],
                                      p->packlen);
        offset += sizeof(PackHead) + p->packlen;
    }

    if (warning) {
        a8::UdpLog::Instance()->Debug("recvied bad package", {});
    }
    // Keep only the unconsumed tail, moved to the start of the buffer.
    if (offset > 0 && offset < recv_bufflen_) {
        memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
    }
    recv_bufflen_ -= offset;
}
// Periodic keep-alive step, invoked by the repeating timer set up in Init():
//   - not connected          -> Open()
//   - connected, pong overdue -> restart the pong clock and Open()
//   - otherwise              -> send an SS_Ping to the room server
void RSConn::CheckAlive()
{
    if (!Connected()) {
        Open();
        return;
    }

    // 60 * 10 * 1000 ticks; presumably milliseconds, i.e. 10 minutes — confirm
    // the unit of a8::XGetTickCount().
    const bool pong_overdue = a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000;
    if (pong_overdue) {
        last_pong_tick = a8::XGetTickCount();
        // NOTE(review): Open() is called while the client still reports
        // Connected() and without a preceding Close(); verify that
        // a8::TcpClient::Open() actually re-establishes the connection here.
        Open();
        return;
    }

    ss::SS_Ping msg;
    SendToRoomServer(msg);
}