From 1a7cd39c3268480a05d25ec26a5ab002814d7f5e Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Mon, 27 May 2019 09:38:20 +0800 Subject: [PATCH] add tcpsesion2.* --- a8/tcplistener.cc | 4 + a8/tcpsession.cc | 3 + a8/tcpsession.h | 3 +- a8/tcpsession2.cc | 395 ++++++++++++++++++++++++++++++++++++++++++++++ a8/tcpsession2.h | 74 +++++++++ 5 files changed, 478 insertions(+), 1 deletion(-) create mode 100644 a8/tcpsession2.cc create mode 100644 a8/tcpsession2.h diff --git a/a8/tcplistener.cc b/a8/tcplistener.cc index 9950728..15a79fa 100644 --- a/a8/tcplistener.cc +++ b/a8/tcplistener.cc @@ -10,7 +10,11 @@ #include #include +#ifdef A8_TCP_SESSION2 +#include +#else #include +#endif #include namespace a8 diff --git a/a8/tcpsession.cc b/a8/tcpsession.cc index 6eecfc3..8e17bd9 100644 --- a/a8/tcpsession.cc +++ b/a8/tcpsession.cc @@ -14,6 +14,8 @@ #include #include +#ifndef A8_TCP_SESSION2 + static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10; static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; @@ -390,3 +392,4 @@ namespace a8 } } +#endif diff --git a/a8/tcpsession.h b/a8/tcpsession.h index cf39a4e..cbb30aa 100644 --- a/a8/tcpsession.h +++ b/a8/tcpsession.h @@ -1,6 +1,7 @@ #ifndef A8_TCPSESSION_H #define A8_TCPSESSION_H +#ifndef A8_TCP_SESSION2 #include namespace a8 @@ -69,5 +70,5 @@ namespace a8 }; } - +#endif #endif diff --git a/a8/tcpsession2.cc b/a8/tcpsession2.cc new file mode 100644 index 0000000..420e17a --- /dev/null +++ b/a8/tcpsession2.cc @@ -0,0 +1,395 @@ +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +#ifdef A8_TCP_SESSION2 + +static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10; +static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; + +namespace a8 +{ + + TcpSession::TcpSession() + { + INIT_LIST_HEAD(&session_entry); + max_packet_len_ = DEFAULT_MAX_PACKET_LEN; + } + + TcpSession::~TcpSession() + { + Destory(); + } + + void TcpSession::SetMaxPacketLen(int max_packet_len) + { + max_packet_len_ = std::max(max_packet_len, DEFAULT_MAX_PACKET_LEN); + } + + int TcpSession::Socket() + { + return socket_; + } + + void TcpSession::OnError(int) + { + } + + void TcpSession::OnConnect() + { + } + + void TcpSession::OnDisConnect() + { + } + + void TcpSession::DecodePacket(char* buf, int& offset, unsigned int buflen) + { + DecodeUserPacket(buf, offset, buflen); + } + + bool TcpSession::Alive() + { + return is_activite || time(nullptr) - create_time < 60 * 5; + } + + void TcpSession::SendBuff(const char* buff, unsigned int bufflen) + { + if (socket_ == -1) { + return; + } + if (bufflen > 0) { + a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); + memset(p, 0, sizeof(a8::SendQueueNode)); + p->buff = (char*)malloc(bufflen); + memmove(p->buff, buff, bufflen); + p->bufflen = bufflen; +#ifdef NEW_NET + bool is_first_package = false; + send_buffer_mutex_.lock(); + if (!work_node_ && !top_node_) { + work_node_ = p; + is_first_package = true; + } else { + if (bot_node_) { + bot_node_->next = p; + bot_node_ = p; + } else { + top_node_ = p; + bot_node_ = p; + } + } + send_buffer_mutex_.unlock(); + if (work_node_ && is_first_package) { + AsyncSend(is_first_package); + } +#else + send_buffer_mutex_.lock(); + if(work_node_ == NULL && top_node_ == NULL){ + work_node_ = p; + AsyncSend(true); + }else{ + if (bot_node_){ + bot_node_->next = p; + bot_node_ = p; + }else{ + top_node_ = p; + bot_node_ = p; + } + } + send_buffer_mutex_.unlock(); +#endif + } + } + + void TcpSession::SendText(const std::string& text) + { + SendBuff(text.c_str(), text.size()); + } + + void TcpSession::SetSocket(int sock) + { + socket_ = sock; + } + + bool TcpSession::AllocRecvBuf() + { + if (!recv_buff_) { + recv_buff_ = (char *)malloc(max_packet_len_ + 1); + } + recv_bufflen_ = 0; + return recv_buff_ != nullptr; + } + + void TcpSession::Reset() + { + ClearSendBuff(); + socket_ = -1; + remote_address = ""; + remote_port = 0; + top_node_ = NULL; + bot_node_ = NULL; + work_node_ = NULL; + socket_handle = 0; + recv_bufflen_ = 0; + is_activite = false; + } + + void TcpSession::Destory() + { + if (recv_buff_) { + recv_bufflen_ = 0; + free(recv_buff_); + recv_buff_ = NULL; + } + Close(); + ClearSendBuff(); + } + + void TcpSession::_ForceClose() + { + if (socket_ != -1) { + int oldsocket = socket_; + ::close(socket_); + socket_ = -1; + struct epoll_event ev; + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, oldsocket, &ev); + master->FreeClientWithNoLock(this); + } + } + + void TcpSession::OnSocketRead(char* buf, unsigned int buflen) + { + unsigned int already_read_bytes = 0; + do { + if (already_read_bytes < buflen) { + int read_bytes = std::min(buflen - already_read_bytes, + (unsigned int)max_packet_len_ - recv_bufflen_); + if (read_bytes > 0) { + memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes); + recv_bufflen_ += read_bytes; + already_read_bytes += read_bytes; + } + } + + int offset = 0; + int prev_offset = 0; + do { + prev_offset = offset; + DecodePacket(recv_buff_, offset, recv_bufflen_); + } while (prev_offset < offset && offset < recv_bufflen_); + + if (offset > 0 && offset < recv_bufflen_){ + memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset); + } + recv_bufflen_ -= offset; + if (recv_bufflen_ >= max_packet_len_) { + //收到超长包 + Close(); + return; + } + } while (already_read_bytes < buflen); + } + + void TcpSession::DoClientRecv() + { + if (socket_ == -1) { + return; + } + char recvbuf[DEFAULT_MAX_RECV_BUFFERSIZE]; + while (true) { +#ifdef A8_PERF + a8::tick_t begin_tick = a8::XGetTickCount(); +#endif + int ret = ::recv(socket_, recvbuf, DEFAULT_MAX_RECV_BUFFERSIZE, 0); +#ifdef A8_PERF + a8::tick_t end_tick = a8::XGetTickCount(); + if (end_tick - begin_tick > a8::PerfMonitor::Instance()->max_recv_time) { + a8::PerfMonitor::Instance()->max_recv_time = end_tick - begin_tick; + } +#endif + if (ret < 0) { + if (errno != EAGAIN) { + Close(); + return; + } else { +#ifdef A8_PERF + ++a8::PerfMonitor::Instance()->recv_eagain_times; +#endif + } + break; + } else if (ret == 0) { + Close(); + return; + } else { + OnSocketRead(recvbuf, ret); + if (ret < DEFAULT_MAX_RECV_BUFFERSIZE) { + break; + } + } + } + } + + void TcpSession::DoClientSend() + { + if(socket_ == -1){ + return; + } +#ifdef NEW_NET + if (!work_node_) { + send_buffer_mutex_.lock(); + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + send_buffer_mutex_.unlock(); + } + if (work_node_) { + AsyncSend(false); + } +#else + send_buffer_mutex_.lock(); + if (!work_node_) { + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + } + if (work_node_) { + AsyncSend(false); + } + send_buffer_mutex_.unlock(); +#endif + } + + void TcpSession::Close() + { + if (socket_ != -1){ + master->LockClients(); + _ForceClose(); + master->UnLockClients(); + } + OnDisConnect(); + } + + void TcpSession::ClearSendBuff() + { + a8::SendQueueNode* p_top_node_ = nullptr; + a8::SendQueueNode* p_work_node_ = nullptr; + send_buffer_mutex_.lock(); + p_top_node_ = top_node_; + p_work_node_ = work_node_; + top_node_ = NULL; + bot_node_ = NULL; + work_node_ = NULL; + send_buffer_mutex_.unlock(); + + a8::SendQueueNode *pdelnode = NULL; + while (p_top_node_) { + pdelnode = p_top_node_; + p_top_node_ = p_top_node_->next; + if (pdelnode->buff) { + free(pdelnode->buff); + } + free(pdelnode); + } + while (p_work_node_) { + pdelnode = p_work_node_; + p_work_node_ = p_work_node_->next; + if (pdelnode->buff) { + free(pdelnode->buff); + } + free(pdelnode); + } + } + + void TcpSession::AsyncSend(bool is_first_package) + { + while (work_node_) { +#ifdef A8_PERF + a8::tick_t begin_tick = a8::XGetTickCount(); +#endif + int sentbytes = ::send(socket_, + work_node_->buff + work_node_->sent_bytes, + work_node_->bufflen - work_node_->sent_bytes, + 0); +#ifdef A8_PERF + a8::tick_t end_tick = a8::XGetTickCount(); + if (end_tick - begin_tick > a8::PerfMonitor::Instance()->max_send_time) { + a8::PerfMonitor::Instance()->max_send_time = end_tick - begin_tick; + } +#endif + if (sentbytes <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { +#ifdef A8_PERF + ++a8::PerfMonitor::Instance()->send_eagain_times; +#endif +#ifdef NEW_NET + if (is_first_package) { + struct epoll_event ev; + ev.data.fd = socket_; + ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP; + ev.data.ptr = this; + ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); + return; + } +#endif + continue; + } else { + break; + } + } + work_node_->sent_bytes += sentbytes; + if (work_node_->sent_bytes >= work_node_->bufflen) { + a8::SendQueueNode *pdelnode = work_node_; +#ifdef NEW_NET +#else + a8::SendQueueNode *nextnode = work_node_->next; +#endif +#ifdef NEW_NET + send_buffer_mutex_.lock(); + work_node_ = work_node_->next; + if (!work_node_) { + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + } +#ifdef NEW_NET + if (!work_node_ && !is_first_package) { + struct epoll_event ev; + ev.data.fd = socket_; + ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; + ev.data.ptr = this; + ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); + } +#endif + send_buffer_mutex_.unlock(); +#else + work_node_ = work_node_->next; //!!!!要处理重入问题 +#endif + if (pdelnode->buff) { + free(pdelnode->buff); + } + free(pdelnode); +#ifdef NEW_NET +#else + if (!nextnode) { + break; + } +#endif + } + } + } + +} +#endif diff --git a/a8/tcpsession2.h b/a8/tcpsession2.h new file mode 100644 index 0000000..04385db --- /dev/null +++ b/a8/tcpsession2.h @@ -0,0 +1,74 @@ +#ifndef A8_TCPSESSION2_H +#define A8_TCPSESSION2_H + +#ifdef A8_TCP_SESSION2 +#include + +namespace a8 +{ + + struct SendQueueNode; + class TcpListener; + class TcpSession + { + public: + bool is_activite = false; + time_t create_time = 0; + a8::TcpListener* master = nullptr; + unsigned long saddr = 0; + std::string remote_address; + int remote_port = 0; + int socket_handle = -1; + int epoll_fd = 0; + list_head session_entry; + + public: + TcpSession(); + virtual ~TcpSession(); + + void SetMaxPacketLen(int max_packet_len); + int Socket(); + + virtual void OnError(int); + virtual void OnConnect(); + virtual void OnDisConnect(); + bool Alive(); + + protected: + virtual void SendBuff(const char* buff, unsigned int bufflen); + void SendText(const std::string& text); + void SetSocket(int sock); + bool AllocRecvBuf(); + virtual void Reset(); + virtual void Destory(); + void _ForceClose(); + virtual void OnSocketRead(char* buf, unsigned int buflen); + virtual void DecodePacket(char* buf, int& offset, unsigned int buflen); + virtual void DecodeUserPacket(char* buf, int& offset, unsigned int buflen) = 0; + void DoClientRecv(); + void DoClientSend(); + void Close(); + + private: + + void ClearSendBuff(); + void AsyncSend(bool is_first_package); + + protected: + char *recv_buff_ = nullptr; + int recv_bufflen_ = 0; + int max_packet_len_ = 0; + + private: + int socket_ = 0; + a8::SendQueueNode* top_node_ = nullptr; + a8::SendQueueNode* bot_node_ = nullptr; + a8::SendQueueNode* work_node_ = nullptr; + std::mutex send_buffer_mutex_; + friend class TcpListener; + friend class TcpListenerImpl; + }; + +} +#endif +#endif