From 9c2a3520f5459f6260ac4b9b34958ef4c8776a80 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Mon, 3 Jun 2019 14:55:59 +0800 Subject: [PATCH] 1 --- a8/ioloop.cc | 30 +++++++++++++++-- a8/ioloop.h | 9 ++++- a8/tcpclient2.cc | 88 ++++++++++++++++++++++++++++++++++++++++++------ a8/tcpclient2.h | 10 ++++-- 4 files changed, 119 insertions(+), 18 deletions(-) diff --git a/a8/ioloop.cc b/a8/ioloop.cc index a251f91..7262148 100644 --- a/a8/ioloop.cc +++ b/a8/ioloop.cc @@ -11,23 +11,47 @@ #include #include +#include namespace a8 { + void IoLoop::Init() { - epoll_fd = ::epoll_create(10000); + max_client_num_ = 10000; + epoll_fd = ::epoll_create(max_client_num_); assert(epoll_fd != a8::INVALID_FD); + worker_thread_ = new std::thread(&a8::IoLoop::WorkerThreadProc, this); } void IoLoop::UnInit() { ::close(epoll_fd); epoll_fd = a8::INVALID_FD; + delete worker_thread_; + worker_thread_ = nullptr; } - void IoLoop::Update() + void IoLoop::WorkerThreadProc() { - + epoll_event *events = new epoll_event[max_client_num_]; + while (!worker_thread_shutdown_) { + int nfds = ::epoll_wait(epoll_fd, events, max_client_num_, 1000 * 10); + for (int i = 0; i < nfds; ++i) { + a8::TcpClient2* session = (a8::TcpClient2*)events[i].data.ptr; + if (events[i].events & EPOLLOUT) { + session->DoSend(); + } else if (events[i].events & EPOLLIN) { + session->DoRecv(); + } else if (events[i].events & EPOLLRDHUP || + events[i].events & EPOLLHUP || + events[i].events & EPOLLERR + ) { + session->DoDisConnect(); + } + } + } + delete [] events; } + } diff --git a/a8/ioloop.h b/a8/ioloop.h index b32966e..25730c4 100644 --- a/a8/ioloop.h +++ b/a8/ioloop.h @@ -3,6 +3,7 @@ namespace a8 { + class IoLoop : public a8::Singleton { private: @@ -12,11 +13,17 @@ namespace a8 public: void Init(); void UnInit(); - void Update(); private: + void WorkerThreadProc(); + + private: + int max_client_num_ = 1000; volatile int epoll_fd = a8::INVALID_FD; + volatile bool worker_thread_shutdown_ = false; + std::thread* worker_thread_ = nullptr; }; + } #endif diff --git a/a8/tcpclient2.cc b/a8/tcpclient2.cc index 550b104..95b1f4f 100644 --- a/a8/tcpclient2.cc +++ b/a8/tcpclient2.cc @@ -13,15 +13,14 @@ #include #include -const int MAX_RECV_BUFFERSIZE = 1024 * 10; +static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10; +static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; namespace a8 { TcpClient2::TcpClient2() { send_buffer_mutex_ = new std::mutex(); - send_cond_mutex_ = new std::mutex(); - send_cond_ = new std::condition_variable(); } TcpClient2::~TcpClient2() @@ -29,10 +28,6 @@ namespace a8 Close(); delete send_buffer_mutex_; send_buffer_mutex_ = nullptr; - delete send_cond_mutex_; - send_cond_mutex_ = nullptr; - delete send_cond_; - send_cond_ = nullptr; } void TcpClient2::Open() @@ -75,8 +70,72 @@ namespace a8 top_node_ = p; bot_node_ = p; } + if (!sending_) { + NotifyEpollSend(); + } send_buffer_mutex_->unlock(); - NotifySendCond(); + } + } + + void TcpClient2::DoConnect() + { + connected_ = true; + if (on_connect) { + on_connect(this); + } + } + + void TcpClient2::DoRecv() + { + if (socket_ == -1) { + return; + } + char recvbuf[DEFAULT_MAX_RECV_BUFFERSIZE]; + while (true) { + int ret = ::recv(socket_, recvbuf, DEFAULT_MAX_RECV_BUFFERSIZE, 0); + if (ret < 0) { + if (errno != EAGAIN) { + Close(); + return; + } else { + } + break; + } else if (ret == 0) { + Close(); + return; + } else { + if (on_socketread) { + on_socketread(this, recvbuf, ret); + } + if (ret < DEFAULT_MAX_RECV_BUFFERSIZE) { + break; + } + } + } + } + + void TcpClient2::DoSend() + { + if(socket_ == -1){ + return; + } + if (!work_node_) { + send_buffer_mutex_.lock(); + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + send_buffer_mutex_.unlock(); + } + if (work_node_) { + AsyncSend(); + } + } + + void TcpClient2::DoDisConnect() + { + connected_ = false; + if (on_disconnect) { + on_disconnect(this); } } @@ -141,6 +200,7 @@ namespace a8 void TcpClient2::ActiveStop() { + sending_ = false; connected_ = false; if (socket_ != INVALID_SOCKET) { shutdown(socket_, 2); @@ -149,10 +209,16 @@ namespace a8 socket_ = INVALID_SOCKET; } - void TcpClient2::NotifySendCond() + void TcpClient2::NotifyEpollSend() { - std::unique_lock lk(*send_cond_mutex_); - send_cond_->notify_all(); + sending_ = true; + #if 0 + struct epoll_event ev; + ev.data.fd = socket_; + ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; + ev.data.ptr = this; + ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); + #endif } } diff --git a/a8/tcpclient2.h b/a8/tcpclient2.h index 4a91ab2..2db684e 100644 --- a/a8/tcpclient2.h +++ b/a8/tcpclient2.h @@ -22,19 +22,23 @@ namespace a8 bool Connected(); void SendBuff(const char* buff, unsigned int bufflen); + void DoConnect(); + void DoRecv(); + void DoSend(); + void DoDisConnect(); + private: volatile int socket_ = a8::INVALID_SOCKET; + volatile bool sending_ = false; volatile bool connected_ = false; std::mutex* send_buffer_mutex_ = nullptr; SendQueueNode *top_node_ = nullptr; SendQueueNode *bot_node_ = nullptr; - std::mutex *send_cond_mutex_ = nullptr; - std::condition_variable *send_cond_ = nullptr; void SetActive(bool active); bool ActiveStart(); void ActiveStop(); - void NotifySendCond(); + void NotifyEpollSend(); }; }