#include "precompile.h"

// NOTE(review): the include list in the reviewed text had five entries whose
// targets were not recoverable. The standard headers below cover the library
// names this file uses directly (malloc/free/abort, memset/memmove, std::bind,
// std::make_shared, std::string). Restore any additional project headers from
// version control if precompile.h does not already provide them.
#include <cstdlib>
#include <cstring>
#include <functional>
#include <memory>
#include <string>

#include "ss_proto.pb.h"
#include "ss_msgid.pb.h"

#include "upstream.h"
#include "app.h"

// Size unit for the receive buffer: Init() allocates 2 * PACK_MAX bytes and
// on_socketread() rejects input that would exceed that amount.
const int PACK_MAX = 1024 * 64 * 2;

/**
 * Prepares this upstream for use: stores the endpoint, allocates the receive
 * buffer, creates the TCP client, wires its callbacks to this object and
 * registers the periodic CheckAlive() timer.
 *
 * Aborts the process when remote_ip is empty.
 *
 * @param instance_id  identifier posted with connect/disconnect messages and
 *                     passed along with every received packet.
 * @param remote_ip    address handed to the TCP client.
 * @param remote_port  port handed to the TCP client.
 */
void UpStream::Init(int instance_id, const std::string& remote_ip, int remote_port)
{
    if (remote_ip.empty()) {
        abort();
    }

    this->instance_id = instance_id;
    this->remote_ip = remote_ip;
    this->remote_port = remote_port;

    recv_bufflen_ = 0;
    last_pong_tick = a8::XGetTickCount();
    recv_buff_ = (char*) malloc(PACK_MAX * 2);

    // NOTE(review): the template argument was missing in the reviewed text;
    // a8::TcpClient is inferred from the callback signatures below - confirm
    // against the declaration of tcp_client_ in upstream.h.
    tcp_client_ = std::make_shared<a8::TcpClient>();
    tcp_client_->remote_address = remote_ip;
    tcp_client_->remote_port = remote_port;
    tcp_client_->on_error =
        std::bind(&UpStream::on_error,
                  this,
                  std::placeholders::_1,
                  std::placeholders::_2);
    tcp_client_->on_connect =
        std::bind(&UpStream::on_connect,
                  this,
                  std::placeholders::_1);
    tcp_client_->on_disconnect =
        std::bind(&UpStream::on_disconnect,
                  this,
                  std::placeholders::_1);
    tcp_client_->on_socketread =
        std::bind(&UpStream::on_socketread,
                  this,
                  std::placeholders::_1,
                  std::placeholders::_2,
                  std::placeholders::_3);

    // Interval is a fixed base plus a random component (presumably
    // milliseconds, so the instances do not all tick together - TODO confirm
    // the unit and the argument order expected by a8::RandEx).
    timer_wp_ = f8::Timer::Instance()->SetIntervalWpEx
        (1000 * 9 + a8::RandEx(500, 150),
         [this] (int event, const a8::Args* args)
         {
             if (a8::TIMER_EXEC_EVENT == event) {
                 CheckAlive();
             }
         },
         &attacher_);
}

/**
 * Tears the upstream down: discards every queued message, removes the
 * keep-alive timer, closes the TCP client and releases the receive buffer.
 *
 * Aborts the process when anything other than this object still holds a
 * reference to the TCP client.
 */
void UpStream::UnInit()
{
    UpStreamMsgNode* pending = top_node_;
    top_node_ = nullptr;
    bot_node_ = nullptr;
    while (pending) {
        UpStreamMsgNode* node = pending;
        pending = pending->next_node;
        delete node->msg;
        // A queued node may carry a raw client header instead of a protobuf
        // message (see AddStockMsg); release it the same way SendStockMsg
        // does so headers queued while disconnected are not leaked.
        if (node->hdr) {
            f8::MsgHdr::Destroy(node->hdr);
            node->hdr = nullptr;
        }
        delete node;
    }

    if (!timer_wp_.expired()) {
        f8::Timer::Instance()->Delete(timer_wp_);
    }

    tcp_client_->Close();
    if (tcp_client_.use_count() != 1) {
        abort();
    }

    recv_bufflen_ = 0;
    free(recv_buff_);
    recv_buff_ = nullptr;
}

/** Asks the TCP client to open the connection. */
void UpStream::Open()
{
    tcp_client_->Open();
}

/** Asks the TCP client to close the connection. */
void UpStream::Close()
{
    tcp_client_->Close();
}

/** @return whatever the TCP client reports as its connected state. */
bool UpStream::Connected()
{
    return tcp_client_->Connected();
}

/**
 * Flushes the pending-message queue in FIFO order.
 *
 * The queue is detached from the object before iterating. Each node is sent
 * (protobuf message and/or raw client header), its payload released, and the
 * node deleted.
 */
void UpStream::SendStockMsg()
{
    UpStreamMsgNode* pending = top_node_;
    top_node_ = nullptr;
    bot_node_ = nullptr;
    while (pending) {
        UpStreamMsgNode* node = pending;
        pending = pending->next_node;
        if (node->msg) {
            f8::Net_SendProxyCMsg(tcp_client_.get(),
                                  node->socket_handle,
                                  node->msgid,
                                  *node->msg);
            delete node->msg;
        }
        if (node->hdr) {
            ForwardClientMsg(*node->hdr);
            f8::MsgHdr::Destroy(node->hdr);
            node->hdr = nullptr;
        }
        delete node;
    }
}

/**
 * Wraps a client message in a WSProxyPackHead_C header and sends it through
 * the TCP client.
 *
 * A temporary buffer holding header + payload is allocated, sent and freed;
 * hdr itself is neither modified nor released here.
 *
 * @param hdr  message to forward; buf/buflen describe the payload.
 */
void UpStream::ForwardClientMsg(f8::MsgHdr& hdr)
{
    char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen);
    // Only the header part is zeroed; the payload area is overwritten below.
    memset(buff, 0, sizeof(f8::WSProxyPackHead_C));

    f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff;
    head->packlen = hdr.buflen;
    head->msgid = hdr.msgid;
    head->seqid = hdr.seqid;
    head->magic_code = f8::MAGIC_CODE;
#if 0
    head->rpc_error_code = 0;
#endif
    head->socket_handle = hdr.socket_handle;
    head->ip_saddr = hdr.ip_saddr;

    if (hdr.buflen > 0) {
        memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen);
    }

    // The send length is taken from the header field that was just written.
    tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen);
    free(buff);
}

/**
 * Forwards a client message, or queues it when the upstream is not connected.
 *
 * When connected, any previously queued messages are flushed first so order
 * is preserved, then hdr is forwarded and released. When not connected, hdr
 * is appended to the pending queue and released later, once the queue is
 * flushed or discarded.
 *
 * @param hdr  message handed over by the caller; this function (or the
 *             queue) releases it.
 */
void UpStream::ForwardClientMsgEx(f8::MsgHdr* hdr)
{
    if (Connected()) {
        if (top_node_) {
            SendStockMsg();
        }
        ForwardClientMsg(*hdr);
        // NOTE(review): the queued path releases headers through
        // f8::MsgHdr::Destroy while this path frees by hand - presumably
        // equivalent; confirm and unify.
        if (hdr->buf) {
            free((char*)hdr->buf);
        }
        free(hdr);
    } else {
        AddStockMsg(hdr->socket_handle, 0, nullptr, hdr);
    }
}

/** TCP client error callback: logs the error id and the remote endpoint. */
void UpStream::on_error(a8::TcpClient* sender, int errorId)
{
    f8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d",
                                  {
                                      errorId,
                                      sender->remote_address,
                                      sender->remote_port
                                  });
}

/**
 * TCP client connect callback: discards any buffered receive data, logs the
 * event and posts IM_UpStreamConnect carrying instance_id.
 */
void UpStream::on_connect(a8::TcpClient* sender)
{
    recv_bufflen_ = 0;
    f8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d",
                                 {
                                     sender->remote_address,
                                     sender->remote_port
                                 });
    f8::MsgQueue::Instance()->PostMsg
        (
         IM_UpStreamConnect,
         a8::Args
         (
          {
              instance_id
          }
         )
        );
}

/**
 * TCP client disconnect callback: discards any buffered receive data, logs
 * the event and posts IM_UpStreamDisconnect carrying instance_id.
 */
void UpStream::on_disconnect(a8::TcpClient* sender)
{
    recv_bufflen_ = 0;
    f8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect "
                                 "remote_ip:%s remote_port:%d",
                                 {
                                     instance_id,
                                     sender->remote_address,
                                     sender->remote_port
                                 });
    f8::MsgQueue::Instance()->PostMsg
        (
         IM_UpStreamDisconnect,
         a8::Args
         (
          {
              instance_id
          }
         )
        );
}

/**
 * TCP client read callback: appends the incoming bytes to the receive buffer
 * and dispatches every complete packet found in it.
 *
 * If the new data would not fit into the 2 * PACK_MAX byte buffer, everything
 * buffered so far is dropped together with the new data. Bytes that do not
 * start a header with the expected magic code are skipped one at a time. A
 * trailing partial packet stays buffered for the next call.
 *
 * @param sender  the TCP client that received the data (unused).
 * @param buf     received bytes.
 * @param len     number of bytes in buf.
 */
void UpStream::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len)
{
#if 0
    ++App::Instance()->perf.read_count;
#endif
    if (recv_bufflen_ + len > 2 * PACK_MAX) {
        recv_bufflen_ = 0;
        f8::UdpLog::Instance()->Debug("recvied target server too long message", {});
        return;
    } else {
        memmove(&recv_buff_[recv_bufflen_], buf, len);
        recv_bufflen_ += len;
    }

    bool warning = false;
    unsigned int offset = 0;
    while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) {
        f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset];
        // Payload length is split across two header fields: packlen holds
        // the low part, ext_len the part above bit 16.
        int real_len = p->packlen + (p->ext_len << 16);
        if (p->magic_code == f8::MAGIC_CODE) {
            if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) {
                // Header is complete but the payload is not; wait for more data.
                break;
            }
            App::Instance()->AddSocketMsg(SF_TargetServer,
                                          p->socket_handle,
                                          instance_id,
                                          p->msgid,
                                          p->seqid,
                                          &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)],
                                          real_len);
            offset += sizeof(f8::WSProxyPackHead_S) + real_len;
        } else {
            // Not a packet boundary: advance one byte and try again.
            warning = true;
            offset++;
        }
    }

    if (warning) {
        f8::UdpLog::Instance()->Debug("recvied bad package", {});
    }

    // Move the unconsumed tail to the front of the buffer.
    if (offset > 0 && offset < recv_bufflen_) {
        memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
    }
    recv_bufflen_ -= offset;
#if 1
    // Any accepted read counts as a sign of life for CheckAlive().
    last_pong_tick = a8::XGetTickCount();
#endif
}

/**
 * Periodic keep-alive, driven by the timer registered in Init().
 *
 * Not connected: tries to open the connection. Connected but nothing has
 * been read for more than 60 * 10 * 1000 ticks (presumably milliseconds,
 * i.e. ten minutes - TODO confirm the unit of a8::XGetTickCount): resets the
 * timestamp and calls Open(). Otherwise sends an SS_Ping.
 */
void UpStream::CheckAlive()
{
    if (!Connected()) {
        Open();
    } else {
        if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) {
            last_pong_tick = a8::XGetTickCount();
            // NOTE(review): Open() is called on a client that still reports
            // itself connected, without a preceding Close() - verify that the
            // TCP client handles this as a reconnect.
            Open();
        } else {
            ss::SS_Ping msg;
            SendMsg(0, msg);
        }
    }
}

/**
 * Appends an entry to the tail of the pending-message queue.
 *
 * The pointers are stored as given; SendStockMsg() deletes msg and destroys
 * hdr after sending.
 *
 * @param socket_handle  client socket the message belongs to.
 * @param msgid          message id used when msg is sent.
 * @param msg            protobuf message to send later, or nullptr.
 * @param hdr            raw client message to forward later, or nullptr.
 */
void UpStream::AddStockMsg(unsigned short socket_handle,
                           int msgid,
                           ::google::protobuf::Message* msg,
                           f8::MsgHdr* hdr)
{
    UpStreamMsgNode* node = new UpStreamMsgNode();
    node->socket_handle = socket_handle;
    node->msgid = msgid;
    node->msg = msg;
    node->hdr = hdr;
    if (bot_node_) {
        bot_node_->next_node = node;
        bot_node_ = node;
    } else {
        top_node_ = node;
        bot_node_ = node;
    }
}