From 99a165eb3f0de846b59312f7500ab30cc508efd7 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sun, 3 May 2020 15:43:50 +0800 Subject: [PATCH] 1 --- server/imserver/IMConn.cc | 267 +++++++++++++++++++++++++++++++++++ server/imserver/IMConn.h | 76 ++++++++++ server/imserver/IMConnMgr.cc | 50 +++++++ server/imserver/IMConnMgr.h | 23 +++ server/imserver/MSConn.cc | 267 +++++++++++++++++++++++++++++++++++ server/imserver/MSConn.h | 76 ++++++++++ server/imserver/MSConnMgr.cc | 50 +++++++ server/imserver/MSConnMgr.h | 23 +++ server/imserver/constant.h | 4 + 9 files changed, 836 insertions(+) create mode 100644 server/imserver/IMConn.cc create mode 100644 server/imserver/IMConn.h create mode 100644 server/imserver/IMConnMgr.cc create mode 100644 server/imserver/IMConnMgr.h create mode 100644 server/imserver/MSConn.cc create mode 100644 server/imserver/MSConn.h create mode 100644 server/imserver/MSConnMgr.cc create mode 100644 server/imserver/MSConnMgr.h diff --git a/server/imserver/IMConn.cc b/server/imserver/IMConn.cc new file mode 100644 index 0000000..e623902 --- /dev/null +++ b/server/imserver/IMConn.cc @@ -0,0 +1,267 @@ +#include "precompile.h" + +#include + +#include "ss_proto.pb.h" +#include "ss_msgid.pb.h" +#include "IMConn.h" +#include +#include +#include +#include +#include +#include "app.h" + +const int PACK_MAX = 1024 * 64 * 2; + +void IMConn::Init(int instance_id, const std::string& remote_ip, int remote_port) +{ + if (remote_ip.empty()) { + abort(); + } + this->instance_id = instance_id; + this->remote_ip = remote_ip; + this->remote_port = remote_port; + + recv_bufflen_ = 0; + last_pong_tick = a8::XGetTickCount(); + recv_buff_ = (char*) malloc(PACK_MAX * 2); + tcp_client_ = a8::IoLoop::Instance()->CreateAsyncTcpClient(); + tcp_client_->remote_address = remote_ip; + tcp_client_->remote_port = remote_port; + tcp_client_->on_error = std::bind(&IMConn::on_error, this, std::placeholders::_1, std::placeholders::_2); + tcp_client_->on_connect = std::bind(&IMConn::on_connect, this, std::placeholders::_1); + tcp_client_->on_disconnect = std::bind(&IMConn::on_disconnect, this, std::placeholders::_1); + tcp_client_->on_socketread = std::bind(&IMConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), + a8::XParams().SetSender(this), + [] (const a8::XParams& param) + { + IMConn* conn = (IMConn*)param.sender.GetUserData(); + conn->CheckAlive(); + }); +} + +void IMConn::UnInit() +{ + IMConnMsgNode* work_node; + work_node = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + while (work_node) { + IMConnMsgNode* pdelnode = work_node; + work_node = work_node->next_node; + delete pdelnode->msg; + delete pdelnode; + } + + a8::Timer::Instance()->DeleteTimer(timer_); + timer_ = nullptr; + tcp_client_->Close(); + a8::IoLoop::Instance()->DestoryAsyncTcpClient(tcp_client_); + tcp_client_ = nullptr; + recv_bufflen_ = 0; + free(recv_buff_); + recv_buff_ = nullptr; +} + +void IMConn::Open() +{ + tcp_client_->Open(); +} + +void IMConn::Close() +{ + tcp_client_->Close(); +} + +bool IMConn::Connected() +{ + return tcp_client_->Connected(); +} + +void IMConn::SendStockMsg() +{ + IMConnMsgNode* work_node; + work_node = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + while (work_node) { + IMConnMsgNode* pdelnode = work_node; + work_node = work_node->next_node; + + if (pdelnode->msg) { + f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg); + delete pdelnode->msg; + } + if (pdelnode->hdr) { + ForwardClientMsg(*pdelnode->hdr); + if (pdelnode->hdr->buf) { + free((char*)pdelnode->hdr->buf); + } + free(pdelnode->hdr); + } + delete pdelnode; + } +} + +void IMConn::ForwardClientMsg(f8::MsgHdr& hdr) +{ + char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); + memset(buff, 0, sizeof(f8::WSProxyPackHead_C)); + f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff; + head->packlen = hdr.buflen; + head->msgid = hdr.msgid; + head->seqid = hdr.seqid; + head->magic_code = f8::MAGIC_CODE; + #if 0 + head->rpc_error_code = 0; + #endif + head->socket_handle = hdr.socket_handle; + head->ip_saddr = hdr.ip_saddr; + + if (hdr.buflen > 0) { + memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen); + } + + tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen); + free(buff); +} + +void IMConn::ForwardClientMsgEx(f8::MsgHdr* hdr) +{ + if (Connected()) { + if (top_node_) { + SendStockMsg(); + } + ForwardClientMsg(*hdr); + if (hdr->buf) { + free((char*)hdr->buf); + } + free(hdr); + } else { + AddStockMsg(hdr->socket_handle, 0, nullptr, hdr); + } +} + +void IMConn::on_error(a8::AsyncTcpClient* sender, int errorId) +{ + a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d", + { + errorId, + sender->remote_address, + sender->remote_port + }); +} + +void IMConn::on_connect(a8::AsyncTcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d", + { + sender->remote_address, + sender->remote_port + }); + App::Instance()->AddIMMsg(IM_IMConnConnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void IMConn::on_disconnect(a8::AsyncTcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect " + "remote_ip:%s remote_port:%d", + { + instance_id, + sender->remote_address, + sender->remote_port + }); + App::Instance()->AddIMMsg(IM_IMConnDisconnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void IMConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len) +{ + #if 0 + ++App::Instance()->perf.read_count; + #endif + if (recv_bufflen_ + len > 2 * PACK_MAX) { + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Debug("recvied target server too long message", {}); + return; + } else { + memmove(&recv_buff_[recv_bufflen_], buf, len); + recv_bufflen_ += len; + } + + bool warning = false; + unsigned int offset = 0; + while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) { + f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset]; + int real_len = p->packlen + (p->ext_len << 16); + if (p->magic_code == f8::MAGIC_CODE) { + if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) { + break; + } + App::Instance()->AddSocketMsg(SF_TargetServer, + p->socket_handle, + instance_id, + p->msgid, + p->seqid, + &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], + real_len); + offset += sizeof(f8::WSProxyPackHead_S) + real_len; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + a8::UdpLog::Instance()->Debug("recvied bad package", {}); + } + if (offset > 0 && offset < recv_bufflen_) { + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + #if 1 + last_pong_tick = a8::XGetTickCount(); + #endif +} + +void IMConn::CheckAlive() +{ + if (!Connected()) { + Open(); + } else { + if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { + last_pong_tick = a8::XGetTickCount(); + Open(); + } else { + ss::SS_Ping msg; + SendMsg(0, msg); + } + } +} + +void IMConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, + f8::MsgHdr* hdr) +{ + IMConnMsgNode* node = new IMConnMsgNode(); + node->socket_handle = socket_handle; + node->msgid = msgid; + node->msg = msg; + node->hdr = hdr; + if (bot_node_) { + bot_node_->next_node = node; + bot_node_ = node; + } else { + top_node_ = node; + bot_node_ = node; + } +} diff --git a/server/imserver/IMConn.h b/server/imserver/IMConn.h new file mode 100644 index 0000000..26f21f2 --- /dev/null +++ b/server/imserver/IMConn.h @@ -0,0 +1,76 @@ +#pragma once + +#include "framework/cpp/protoutils.h" + +namespace a8 +{ + class TcpClient; + class AsyncTcpClient; +} + +struct IMConnMsgNode +{ + unsigned short socket_handle = 0; + int msgid = 0; + ::google::protobuf::Message* msg = nullptr; + f8::MsgHdr* hdr = nullptr; + + IMConnMsgNode* next_node = nullptr; +}; + +struct timer_list; +class IMConn +{ + public: + int instance_id = 0; + std::string remote_ip; + int remote_port = 0; + a8::tick_t last_pong_tick = 0; + + public: + void Init(int instance_id, const std::string& remote_ip, int remote_port); + void UnInit(); + + void Open(); + void Close(); + bool Connected(); + + template + void SendMsg(int socket_handle, T& msg) + { + static int msgid = f8::Net_GetMessageId(msg); + if (Connected()) { + if (top_node_) { + SendStockMsg(); + } + f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg); + } else { + T* new_msg = new T(); + *new_msg = msg; + AddStockMsg(socket_handle, msgid, new_msg, nullptr); + } + } + + void SendStockMsg(); + void ForwardClientMsg(f8::MsgHdr& hdr); + void ForwardClientMsgEx(f8::MsgHdr* hdr); + + private: + void on_error(a8::AsyncTcpClient* sender, int errorId); + void on_connect(a8::AsyncTcpClient* sender); + void on_disconnect(a8::AsyncTcpClient* sender); + void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len); + + void CheckAlive(); + void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, + f8::MsgHdr* hdr); + + private: + char *recv_buff_ = nullptr; + unsigned int recv_bufflen_ = 0; + a8::AsyncTcpClient* tcp_client_ = nullptr; + timer_list* timer_ = nullptr; + + IMConnMsgNode* top_node_ = nullptr; + IMConnMsgNode* bot_node_ = nullptr; +}; diff --git a/server/imserver/IMConnMgr.cc b/server/imserver/IMConnMgr.cc new file mode 100644 index 0000000..ef8308d --- /dev/null +++ b/server/imserver/IMConnMgr.cc @@ -0,0 +1,50 @@ +#include "precompile.h" + +#include "IMConnMgr.h" +#include "IMConn.h" +#include "jsondatamgr.h" +#include "app.h" + +void IMConnMgr::Init() +{ +} + +void IMConnMgr::UnInit() +{ + for (auto& pair : id_hash_) { + pair.second->UnInit(); + delete pair.second; + } +} + +IMConn* IMConnMgr::GetConnByKey(const std::string& key) +{ + auto itr = key_hash_.find(key); + return itr != key_hash_.end() ? itr->second : nullptr; +} + +IMConn* IMConnMgr::GetConnById(int instance_id) +{ + auto itr = id_hash_.find(instance_id); + return itr != id_hash_.end() ? itr->second : nullptr; +} + +IMConn* IMConnMgr::RecreateIMConn(const std::string& host, int port) +{ + std::string key = host + ":" + a8::XValue(port).GetString(); + IMConn* conn = GetConnByKey(key); + if (conn) { + return conn; + } + while (GetConnById(++curr_id_)) {}; + int instance_id = curr_id_; + std::string remote_ip = host; + int remote_port = port; + + conn = new IMConn(); + conn->Init(instance_id, remote_ip, remote_port); + id_hash_[conn->instance_id] = conn; + key_hash_[key] = conn; + conn->Open(); + return conn; +} diff --git a/server/imserver/IMConnMgr.h b/server/imserver/IMConnMgr.h new file mode 100644 index 0000000..bde0d2b --- /dev/null +++ b/server/imserver/IMConnMgr.h @@ -0,0 +1,23 @@ +#pragma once + +class IMConn; +class IMConnMgr : public a8::Singleton +{ + private: + IMConnMgr() {}; + friend class a8::Singleton; + + public: + + void Init(); + void UnInit(); + + IMConn* GetConnByKey(const std::string& key); + IMConn* GetConnById(int instance_id); + IMConn* RecreateIMConn(const std::string& host, int port); + + private: + unsigned short curr_id_ = 1000; + std::map key_hash_; + std::map id_hash_; +}; diff --git a/server/imserver/MSConn.cc b/server/imserver/MSConn.cc new file mode 100644 index 0000000..8fe1e51 --- /dev/null +++ b/server/imserver/MSConn.cc @@ -0,0 +1,267 @@ +#include "precompile.h" + +#include + +#include "ss_proto.pb.h" +#include "ss_msgid.pb.h" +#include "MSConn.h" +#include +#include +#include +#include +#include +#include "app.h" + +const int PACK_MAX = 1024 * 64 * 2; + +void MSConn::Init(int instance_id, const std::string& remote_ip, int remote_port) +{ + if (remote_ip.empty()) { + abort(); + } + this->instance_id = instance_id; + this->remote_ip = remote_ip; + this->remote_port = remote_port; + + recv_bufflen_ = 0; + last_pong_tick = a8::XGetTickCount(); + recv_buff_ = (char*) malloc(PACK_MAX * 2); + tcp_client_ = a8::IoLoop::Instance()->CreateAsyncTcpClient(); + tcp_client_->remote_address = remote_ip; + tcp_client_->remote_port = remote_port; + tcp_client_->on_error = std::bind(&MSConn::on_error, this, std::placeholders::_1, std::placeholders::_2); + tcp_client_->on_connect = std::bind(&MSConn::on_connect, this, std::placeholders::_1); + tcp_client_->on_disconnect = std::bind(&MSConn::on_disconnect, this, std::placeholders::_1); + tcp_client_->on_socketread = std::bind(&MSConn::on_socketread, this ,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + timer_ = a8::Timer::Instance()->AddRepeatTimer(1000 * 9 + a8::RandEx(500, 150), + a8::XParams().SetSender(this), + [] (const a8::XParams& param) + { + MSConn* conn = (MSConn*)param.sender.GetUserData(); + conn->CheckAlive(); + }); +} + +void MSConn::UnInit() +{ + MSConnMsgNode* work_node; + work_node = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + while (work_node) { + MSConnMsgNode* pdelnode = work_node; + work_node = work_node->next_node; + delete pdelnode->msg; + delete pdelnode; + } + + a8::Timer::Instance()->DeleteTimer(timer_); + timer_ = nullptr; + tcp_client_->Close(); + a8::IoLoop::Instance()->DestoryAsyncTcpClient(tcp_client_); + tcp_client_ = nullptr; + recv_bufflen_ = 0; + free(recv_buff_); + recv_buff_ = nullptr; +} + +void MSConn::Open() +{ + tcp_client_->Open(); +} + +void MSConn::Close() +{ + tcp_client_->Close(); +} + +bool MSConn::Connected() +{ + return tcp_client_->Connected(); +} + +void MSConn::SendStockMsg() +{ + MSConnMsgNode* work_node; + work_node = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + while (work_node) { + MSConnMsgNode* pdelnode = work_node; + work_node = work_node->next_node; + + if (pdelnode->msg) { + f8::Net_SendProxyCMsg(tcp_client_, pdelnode->socket_handle, pdelnode->msgid, *pdelnode->msg); + delete pdelnode->msg; + } + if (pdelnode->hdr) { + ForwardClientMsg(*pdelnode->hdr); + if (pdelnode->hdr->buf) { + free((char*)pdelnode->hdr->buf); + } + free(pdelnode->hdr); + } + delete pdelnode; + } +} + +void MSConn::ForwardClientMsg(f8::MsgHdr& hdr) +{ + char* buff = (char*)malloc(sizeof(f8::WSProxyPackHead_C) + hdr.buflen); + memset(buff, 0, sizeof(f8::WSProxyPackHead_C)); + f8::WSProxyPackHead_C* head = (f8::WSProxyPackHead_C*)buff; + head->packlen = hdr.buflen; + head->msgid = hdr.msgid; + head->seqid = hdr.seqid; + head->magic_code = f8::MAGIC_CODE; + #if 0 + head->rpc_error_code = 0; + #endif + head->socket_handle = hdr.socket_handle; + head->ip_saddr = hdr.ip_saddr; + + if (hdr.buflen > 0) { + memmove(buff + sizeof(f8::WSProxyPackHead_C), hdr.buf, hdr.buflen); + } + + tcp_client_->SendBuff(buff, sizeof(f8::WSProxyPackHead_C) + head->packlen); + free(buff); +} + +void MSConn::ForwardClientMsgEx(f8::MsgHdr* hdr) +{ + if (Connected()) { + if (top_node_) { + SendStockMsg(); + } + ForwardClientMsg(*hdr); + if (hdr->buf) { + free((char*)hdr->buf); + } + free(hdr); + } else { + AddStockMsg(hdr->socket_handle, 0, nullptr, hdr); + } +} + +void MSConn::on_error(a8::AsyncTcpClient* sender, int errorId) +{ + a8::UdpLog::Instance()->Error("target server errorid=%d remote_ip:%s remote_port:%d", + { + errorId, + sender->remote_address, + sender->remote_port + }); +} + +void MSConn::on_connect(a8::AsyncTcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server connected remote_ip:%s remote_port:%d", + { + sender->remote_address, + sender->remote_port + }); + App::Instance()->AddIMMsg(IM_MSConnConnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void MSConn::on_disconnect(a8::AsyncTcpClient* sender) +{ + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Info("target server %d disconnected after 10s later reconnect " + "remote_ip:%s remote_port:%d", + { + instance_id, + sender->remote_address, + sender->remote_port + }); + App::Instance()->AddIMMsg(IM_MSConnDisconnect, + a8::XParams() + .SetSender(instance_id) + ); +} + +void MSConn::on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len) +{ + #if 0 + ++App::Instance()->perf.read_count; + #endif + if (recv_bufflen_ + len > 2 * PACK_MAX) { + recv_bufflen_ = 0; + a8::UdpLog::Instance()->Debug("recvied target server too long message", {}); + return; + } else { + memmove(&recv_buff_[recv_bufflen_], buf, len); + recv_bufflen_ += len; + } + + bool warning = false; + unsigned int offset = 0; + while (recv_bufflen_ - offset >= sizeof(f8::WSProxyPackHead_S)) { + f8::WSProxyPackHead_S* p = (f8::WSProxyPackHead_S*) &recv_buff_[offset]; + int real_len = p->packlen + (p->ext_len << 16); + if (p->magic_code == f8::MAGIC_CODE) { + if (recv_bufflen_ - offset < sizeof(f8::WSProxyPackHead_S) + real_len) { + break; + } + App::Instance()->AddSocketMsg(SF_TargetServer, + p->socket_handle, + instance_id, + p->msgid, + p->seqid, + &recv_buff_[offset + sizeof(f8::WSProxyPackHead_S)], + real_len); + offset += sizeof(f8::WSProxyPackHead_S) + real_len; + } else { + warning = true; + offset++; + continue; + } + } + + if (warning) { + a8::UdpLog::Instance()->Debug("recvied bad package", {}); + } + if (offset > 0 && offset < recv_bufflen_) { + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + #if 1 + last_pong_tick = a8::XGetTickCount(); + #endif +} + +void MSConn::CheckAlive() +{ + if (!Connected()) { + Open(); + } else { + if (a8::XGetTickCount() - last_pong_tick > 60 * 10 * 1000) { + last_pong_tick = a8::XGetTickCount(); + Open(); + } else { + ss::SS_Ping msg; + SendMsg(0, msg); + } + } +} + +void MSConn::AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, + f8::MsgHdr* hdr) +{ + MSConnMsgNode* node = new MSConnMsgNode(); + node->socket_handle = socket_handle; + node->msgid = msgid; + node->msg = msg; + node->hdr = hdr; + if (bot_node_) { + bot_node_->next_node = node; + bot_node_ = node; + } else { + top_node_ = node; + bot_node_ = node; + } +} diff --git a/server/imserver/MSConn.h b/server/imserver/MSConn.h new file mode 100644 index 0000000..1b0a93f --- /dev/null +++ b/server/imserver/MSConn.h @@ -0,0 +1,76 @@ +#pragma once + +#include "framework/cpp/protoutils.h" + +namespace a8 +{ + class TcpClient; + class AsyncTcpClient; +} + +struct MSConnMsgNode +{ + unsigned short socket_handle = 0; + int msgid = 0; + ::google::protobuf::Message* msg = nullptr; + f8::MsgHdr* hdr = nullptr; + + MSConnMsgNode* next_node = nullptr; +}; + +struct timer_list; +class MSConn +{ + public: + int instance_id = 0; + std::string remote_ip; + int remote_port = 0; + a8::tick_t last_pong_tick = 0; + + public: + void Init(int instance_id, const std::string& remote_ip, int remote_port); + void UnInit(); + + void Open(); + void Close(); + bool Connected(); + + template + void SendMsg(int socket_handle, T& msg) + { + static int msgid = f8::Net_GetMessageId(msg); + if (Connected()) { + if (top_node_) { + SendStockMsg(); + } + f8::Net_SendProxyCMsg(tcp_client_, socket_handle, msgid, msg); + } else { + T* new_msg = new T(); + *new_msg = msg; + AddStockMsg(socket_handle, msgid, new_msg, nullptr); + } + } + + void SendStockMsg(); + void ForwardClientMsg(f8::MsgHdr& hdr); + void ForwardClientMsgEx(f8::MsgHdr* hdr); + + private: + void on_error(a8::AsyncTcpClient* sender, int errorId); + void on_connect(a8::AsyncTcpClient* sender); + void on_disconnect(a8::AsyncTcpClient* sender); + void on_socketread(a8::AsyncTcpClient* sender, char* buf, unsigned int len); + + void CheckAlive(); + void AddStockMsg(unsigned short socket_handle, int msgid, ::google::protobuf::Message* msg, + f8::MsgHdr* hdr); + + private: + char *recv_buff_ = nullptr; + unsigned int recv_bufflen_ = 0; + a8::AsyncTcpClient* tcp_client_ = nullptr; + timer_list* timer_ = nullptr; + + MSConnMsgNode* top_node_ = nullptr; + MSConnMsgNode* bot_node_ = nullptr; +}; diff --git a/server/imserver/MSConnMgr.cc b/server/imserver/MSConnMgr.cc new file mode 100644 index 0000000..acc303c --- /dev/null +++ b/server/imserver/MSConnMgr.cc @@ -0,0 +1,50 @@ +#include "precompile.h" + +#include "MSConnMgr.h" +#include "MSConn.h" +#include "jsondatamgr.h" +#include "app.h" + +void MSConnMgr::Init() +{ +} + +void MSConnMgr::UnInit() +{ + for (auto& pair : id_hash_) { + pair.second->UnInit(); + delete pair.second; + } +} + +MSConn* MSConnMgr::GetConnByKey(const std::string& key) +{ + auto itr = key_hash_.find(key); + return itr != key_hash_.end() ? itr->second : nullptr; +} + +MSConn* MSConnMgr::GetConnById(int instance_id) +{ + auto itr = id_hash_.find(instance_id); + return itr != id_hash_.end() ? itr->second : nullptr; +} + +MSConn* MSConnMgr::RecreateMSConn(const std::string& host, int port) +{ + std::string key = host + ":" + a8::XValue(port).GetString(); + MSConn* conn = GetConnByKey(key); + if (conn) { + return conn; + } + while (GetConnById(++curr_id_)) {}; + int instance_id = curr_id_; + std::string remote_ip = host; + int remote_port = port; + + conn = new MSConn(); + conn->Init(instance_id, remote_ip, remote_port); + id_hash_[conn->instance_id] = conn; + key_hash_[key] = conn; + conn->Open(); + return conn; +} diff --git a/server/imserver/MSConnMgr.h b/server/imserver/MSConnMgr.h new file mode 100644 index 0000000..074d6a2 --- /dev/null +++ b/server/imserver/MSConnMgr.h @@ -0,0 +1,23 @@ +#pragma once + +class MSConn; +class MSConnMgr : public a8::Singleton +{ + private: + MSConnMgr() {}; + friend class a8::Singleton; + + public: + + void Init(); + void UnInit(); + + MSConn* GetConnByKey(const std::string& key); + MSConn* GetConnById(int instance_id); + MSConn* RecreateMSConn(const std::string& host, int port); + + private: + unsigned short curr_id_ = 1000; + std::map key_hash_; + std::map id_hash_; +}; diff --git a/server/imserver/constant.h b/server/imserver/constant.h index 5a26681..a03de9a 100644 --- a/server/imserver/constant.h +++ b/server/imserver/constant.h @@ -17,6 +17,10 @@ enum InnerMesssage_e IM_TargetConnConnect, IM_TargetConnDisconnect, IM_IMServerSocketDisconnect, + IM_IMConnConnect, + IM_IMConnDisconnect, + IM_MSConnConnect, + IM_MSConnDisconnect }; //网络处理对象