282 lines
7.8 KiB
C++
282 lines
7.8 KiB
C++
#include "precompile.h"
|
|
|
|
#include <string.h>
|
|
|
|
#include <a8/tcpclient.h>
|
|
|
|
#include <f8/udplog.h>
|
|
#include <f8/timer.h>
|
|
#include <f8/msgqueue.h>
|
|
|
|
#include "ss_proto.pb.h"
|
|
#include "ss_msgid.pb.h"
|
|
#include "upstream.h"
|
|
|
|
#include "app.h"
|
|
|
|
const int PACK_MAX = 1024 * 64 * 2;
|
|
|
|
void UpStream::Init(int instance_id, const std::string& remote_ip, int remote_port)
|
|
{
|
|
if (remote_ip.empty()) {
|
|
abort();
|
|
}
|
|
this->instance_id = instance_id;
|
|
this->remote_ip = remote_ip;
|
|
this->remote_port = remote_port;
|
|
|
|
recv_bufflen_ = 0;
|
|
last_pong_tick = a8::XGetTickCount();
|
|
recv_buff_ = (char*) malloc(PACK_MAX * 2);
|
|
tcp_client_ = std::make_shared<a8::TcpClient>(remote_ip, remote_port);
|
|
tcp_client_->on_error = std::bind(&UpStream::on_error, this, std::placeholders::_1, std::placeholders::_2);
|
|
tcp_client_->on_connect = std::bind(&UpStream::on_connect, this, std::placeholders::_1);
|
|
tcp_client_->on_disconnect = std::bind(&UpStream::on_disconnect, this, std::placeholders::_1);
|
|
tcp_client_->on_socketread = std::bind(&UpStream::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
|
|
timer_wp_ = f8::Timer::Instance()->SetIntervalWpEx
|
|
(1000 * 9 + a8::RandEx(500, 150),
|
|
[this] (int event, const a8::Args* args)
|
|
{
|
|
if (a8::TIMER_EXEC_EVENT == event) {
|
|
CheckAlive();
|
|
}
|
|
},
|
|
&attacher_);
|
|
}
|
|
|
|
void UpStream::UnInit()
|
|
{
|
|
UpStreamMsgNode* work_node;
|
|
work_node = top_node_;
|
|
top_node_ = nullptr;
|
|
bot_node_ = nullptr;
|
|
while (work_node) {
|
|
UpStreamMsgNode* pdelnode = work_node;
|
|
work_node = work_node->next_node;
|
|
delete pdelnode->msg;
|
|
delete pdelnode;
|
|
}
|
|
|
|
if (!timer_wp_.expired()) {
|
|
f8::Timer::Instance()->Delete(timer_wp_);
|
|
}
|
|
tcp_client_->Close();
|
|
if(tcp_client_.use_count() != 1) {
|
|
abort();
|
|
}
|
|
recv_bufflen_ = 0;
|
|
free(recv_buff_);
|
|
recv_buff_ = nullptr;
|
|
}
|
|
|
|
void UpStream::Open()
|
|
{
|
|
tcp_client_->Open();
|
|
}
|
|
|
|
void UpStream::Close()
|
|
{
|
|
tcp_client_->Close();
|
|
}
|
|
|
|
bool UpStream::Connected()
|
|
{
|
|
return tcp_client_->Connected();
|
|
}
|
|
|
|
void UpStream::SendStockMsg()
|
|
{
|
|
UpStreamMsgNode* work_node;
|
|
work_node = top_node_;
|
|
top_node_ = nullptr;
|
|
bot_node_ = nullptr;
|
|
while (work_node) {
|
|
UpStreamMsgNode* pdelnode = work_node;
|
|
work_node = work_node->next_node;
|
|
|
|
if (pdelnode->msg) {
|
|
f8::Net_SendProxyCMsg(tcp_client_.get(), pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg);
|
|
delete pdelnode->msg;
|
|
}
|
|
if (pdelnode->hdr) {
|
|
ForwardClientMsg(*pdelnode->hdr);
|
|
f8::MsgHdr::Destroy(pdelnode->hdr);
|
|
pdelnode->hdr = nullptr;
|
|
}
|
|
delete pdelnode;
|
|
}
|
|
}
|
|
|
|
void UpStream::ForwardClientMsg(f8::MsgHdr& hdr)
|
|
{
|
|
char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
|
|
memset(buff, 0, sizeof(f8::WSProxyPackHead_C));
|
|
f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
|
|
head->packlen = hdr.buflen;
|
|
head->msgid = hdr.msgid;
|
|
head->seqid = hdr.seqid;
|
|
head->magic_code = f8::MAGIC_CODE;
|
|
#if 0
|
|
head->rpc_error_code = 0;
|
|
#endif
|
|
head->socket_handle = hdr.socket_handle;
|
|
head->ip_saddr = hdr.ip_saddr;
|
|
|
|
if (hdr.buflen > 0) {
|
|
memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
|
|
}
|
|
|
|
tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
|
|
free(buff);
|
|
}
|
|
|
|
void UpStream::ForwardClientMsgEx(f8::MsgHdr* hdr)
|
|
{
|
|
if (Connected()) {
|
|
if (top_node_) {
|
|
SendStockMsg();
|
|
}
|
|
ForwardClientMsg(*hdr);
|
|
if (hdr->buf) {
|
|
free((char*)hdr->buf);
|
|
}
|
|
free(hdr);
|
|
} else {
|
|
AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
|
|
}
|
|
}
|
|
|
|
void UpStream::on_error(a8::TcpClient* sender, int errorId)
|
|
{
|
|
f8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
|
|
{
|
|
errorId,
|
|
sender->GetRemoteAddress(),
|
|
sender->GetRemotePort()
|
|
});
|
|
}
|
|
|
|
void UpStream::on_connect(a8::TcpClient* sender)
|
|
{
|
|
recv_bufflen_ = 0;
|
|
f8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
|
|
{
|
|
sender->GetRemoteAddress(),
|
|
sender->GetRemotePort()
|
|
});
|
|
f8::MsgQueue::Instance()->PostMsg
|
|
(
|
|
IM_UpStreamConnect,
|
|
a8::Args
|
|
(
|
|
{
|
|
instance_id
|
|
}
|
|
)
|
|
);
|
|
}
|
|
|
|
void UpStream::on_disconnect(a8::TcpClient* sender)
|
|
{
|
|
recv_bufflen_ = 0;
|
|
f8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
|
|
"remote_ip:%s remote_port:%d",
|
|
{
|
|
instance_id,
|
|
sender->GetRemoteAddress(),
|
|
sender->GetRemotePort()
|
|
});
|
|
f8::MsgQueue::Instance()->PostMsg
|
|
(
|
|
IM_UpStreamDisconnect,
|
|
a8::Args
|
|
(
|
|
{
|
|
instance_id
|
|
}
|
|
)
|
|
);
|
|
}
|
|
|
|
void UpStream::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
|
|
{
|
|
#if 0
|
|
++App::Instance()->perf.read_count;
|
|
#endif
|
|
if (recv_bufflen_ + len > 2 * PACK_MAX) {
|
|
recv_bufflen_ = 0;
|
|
f8::UdpLog::Instance()->Debug("recvied target server too long message", {});
|
|
return;
|
|
} else {
|
|
memmove(&recv_buff_[recv_bufflen_], buf, len);
|
|
recv_bufflen_ += len;
|
|
}
|
|
|
|
bool warning = false;
|
|
unsigned int offset = 0;
|
|
while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
|
|
f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
|
|
int real_len = p->packlen + (p->ext_len << 16);
|
|
if (p->magic_code == f8::MAGIC_CODE) {
|
|
if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
|
|
break;
|
|
}
|
|
App::Instance()->AddSocketMsg(SF_TargetServer,
|
|
p->socket_handle,
|
|
instance_id,
|
|
p->msgid,
|
|
p->seqid,
|
|
&recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
|
|
real_len);
|
|
offset += sizeof(f8::WSProxyPackHead_S) + real_len;
|
|
} else {
|
|
warning = true;
|
|
offset++;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (warning) {
|
|
f8::UdpLog::Instance()->Debug("recvied bad package", {});
|
|
}
|
|
if (offset > 0 && offset < recv_bufflen_) {
|
|
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
|
|
}
|
|
recv_bufflen_ -= offset;
|
|
#if 1
|
|
last_pong_tick = a8::XGetTickCount();
|
|
#endif
|
|
}
|
|
|
|
void UpStream::CheckAlive()
|
|
{
|
|
if (!Connected()) {
|
|
Open();
|
|
} else {
|
|
if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
|
|
last_pong_tick = a8::XGetTickCount();
|
|
Open();
|
|
} else {
|
|
ss::SS_Ping msg;
|
|
SendMsg(0, msg);
|
|
}
|
|
}
|
|
}
|
|
|
|
void UpStream::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg,
|
|
f8::MsgHdr* hdr)
|
|
{
|
|
UpStreamMsgNode* node = new UpStreamMsgNode();
|
|
node->socket_handle = socket_handle;
|
|
node->msgid = msgid;
|
|
node->msg = msg;
|
|
node->hdr = hdr;
|
|
if (bot_node_) {
|
|
bot_node_->next_node = node;
|
|
bot_node_ = node;
|
|
} else {
|
|
top_node_ = node;
|
|
bot_node_ = node;
|
|
}
|
|
}
|