#include "precompile.h" #include #include #include #include #include "ss_proto.pb.h" #include "ss_msgid.pb.h" #include "master.h" #include "app.h" void Master::Init(int instance_id, const std::string& remote_ip, int remote_port) { this->instance_id = instance_id; this->remote_ip = remote_ip; this->remote_port = remote_port; recv_bufflen_ = 0; last_pong_tick = a8::XGetTickCount(); recv_buff_ = (char*) malloc(PACK_MAX * 2); tcp_client_ = std::make_shared(); tcp_client_->remote_address = remote_ip; tcp_client_->remote_port = remote_port; tcp_client_->on_error = std::bind(&Master::on_error, this, std::placeholders::_1, std::placeholders::_2); tcp_client_->on_connect = std::bind(&Master::on_connect, this, std::placeholders::_1); tcp_client_->on_disconnect = std::bind(&Master::on_disconnect, this, std::placeholders::_1); tcp_client_->on_socketread = std::bind(&Master::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); timer_wp_ = f8::Timer::Instance()->SetIntervalWpEx (1000 * 9 + a8::RandEx(500, 150), [this] (int event, const a8::Args* args) { if (a8::TIMER_EXEC_EVENT == event) { CheckAlive(); } }, &attacher_); } void Master::UnInit() { if (!timer_wp_.expired()) { f8::Timer::Instance()->Delete(timer_wp_); } tcp_client_->Close(); if (tcp_client_.use_count() != 1) { abort(); } tcp_client_ = nullptr; recv_bufflen_ = 0; free(recv_buff_); recv_buff_ = nullptr; } void Master::Open() { tcp_client_->Open(); } void Master::Close() { tcp_client_->Close(); } bool Master::Connected() { return tcp_client_->Connected(); } void Master::on_error(a8::TcpClient* sender, int errorId) { f8::UdpLog::Instance()->Error("Master errorid=%d remote_ip:%s remote_port:%d", { errorId, sender->remote_address, sender->remote_port }); } void Master::on_connect(a8::TcpClient* sender) { recv_bufflen_ = 0; f8::UdpLog::Instance()->Info("masterserver connected", {}); } void Master::on_disconnect(a8::TcpClient* sender) { recv_bufflen_ = 0; f8::UdpLog::Instance()->Info("masterserver %d disconnected after 10s later reconnect", {instance_id}); } void Master::on_socketread(a8::TcpClient* sender, char* buf, unsigned int len) { #if 0 ++App::Instance()->perf.read_count; #endif if (recv_bufflen_ + len > 2 * PACK_MAX) { recv_bufflen_ = 0; f8::UdpLog::Instance()->Debug("recvied masterserver too long message", {}); return; } else { memmove(&recv_buff_[recv_bufflen_], buf, len); recv_bufflen_ += len; } bool warning = false; unsigned int offset = 0; while (recv_bufflen_ - offset >= sizeof(f8::PackHead)) { f8::PackHead* p = (f8::PackHead*) &recv_buff_[offset]; if (p->magic_code == f8::MAGIC_CODE) { if (recv_bufflen_ - offset < sizeof(f8::PackHead) + p->packlen) { break; } App::Instance()->AddSocketMsg(SF_MasterServer, 0, instance_id, p->msgid, p->seqid, &recv_buff_[offset + sizeof(f8::PackHead)], p->packlen); offset += sizeof(f8::PackHead) + p->packlen; } else { warning = true; offset++; continue; } } if (warning) { f8::UdpLog::Instance()->Debug("recvied bad package", {}); } if (offset > 0 && offset < recv_bufflen_) { memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); } recv_bufflen_ -= offset; #if 1 last_pong_tick = a8::XGetTickCount(); #endif } void Master::CheckAlive() { if (!Connected()) { Open(); } else { if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { last_pong_tick = a8::XGetTickCount(); Close(); Open(); } else { ss::SS_Ping msg; SendMsg(msg); } } }