diff --git a/a8/ioloop.h b/a8/ioloop.h index 25730c4..10e4806 100644 --- a/a8/ioloop.h +++ b/a8/ioloop.h @@ -1,6 +1,9 @@ #ifndef A8_IOLOOP_H #define A8_IOLOOP_H +#include +#include + namespace a8 { @@ -13,13 +16,13 @@ namespace a8 public: void Init(); void UnInit(); + volatile int epoll_fd = a8::INVALID_FD; private: void WorkerThreadProc(); private: int max_client_num_ = 1000; - volatile int epoll_fd = a8::INVALID_FD; volatile bool worker_thread_shutdown_ = false; std::thread* worker_thread_ = nullptr; }; diff --git a/a8/tcpclient2.cc b/a8/tcpclient2.cc index 5636b1e..b4fb83c 100644 --- a/a8/tcpclient2.cc +++ b/a8/tcpclient2.cc @@ -12,6 +12,7 @@ #include #include +#include static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10; static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; @@ -21,6 +22,7 @@ namespace a8 TcpClient2::TcpClient2() { send_buffer_mutex_ = new std::mutex(); + epoll_fd = a8::IoLoop::Instance()->epoll_fd; } TcpClient2::~TcpClient2() @@ -116,21 +118,24 @@ namespace a8 void TcpClient2::DoSend() { + if (!connected_) { + if (on_connect) { + on_connect(this); + } + } if(socket_ == -1){ return; } - #if 0 if (!work_node_) { - send_buffer_mutex_.lock(); + send_buffer_mutex_->lock(); work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; - send_buffer_mutex_.unlock(); + send_buffer_mutex_->unlock(); } if (work_node_) { AsyncSend(); } - #endif } void TcpClient2::DoDisConnect() @@ -214,13 +219,56 @@ namespace a8 void TcpClient2::NotifyEpollSend() { sending_ = true; - #if 0 struct epoll_event ev; ev.data.fd = socket_; ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; ev.data.ptr = this; ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); - #endif + } + + void TcpClient2::AsyncSend() + { + while (work_node_) { + int sentbytes = ::send(socket_, + work_node_->buff + work_node_->sent_bytes, + work_node_->bufflen - work_node_->sent_bytes, + 0); + if (sentbytes <= 0) { + auto err_code = errno; + if (err_code == EAGAIN || err_code == EWOULDBLOCK) { + break; + } else { + Close(); + break; + } + } + work_node_->sent_bytes += sentbytes; + if (work_node_->sent_bytes >= work_node_->bufflen) { + a8::SendQueueNode *pdelnode = work_node_; + work_node_ = work_node_->next; + if (!work_node_) { + send_buffer_mutex_->lock(); + if (top_node_) { + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + } + if (!work_node_) { + sending_ = false; + struct epoll_event ev; + ev.data.fd = socket_; + ev.events = EPOLLIN | EPOLLRDHUP; + ev.data.ptr = this; + ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); + } + send_buffer_mutex_->unlock(); + } + if (pdelnode->buff) { + free(pdelnode->buff); + } + free(pdelnode); + } + } } } diff --git a/a8/tcpclient2.h b/a8/tcpclient2.h index 2db684e..8a61f6e 100644 --- a/a8/tcpclient2.h +++ b/a8/tcpclient2.h @@ -28,17 +28,20 @@ namespace a8 void DoDisConnect(); private: + volatile int epoll_fd = a8::INVALID_FD; volatile int socket_ = a8::INVALID_SOCKET; volatile bool sending_ = false; volatile bool connected_ = false; std::mutex* send_buffer_mutex_ = nullptr; SendQueueNode *top_node_ = nullptr; SendQueueNode *bot_node_ = nullptr; + SendQueueNode *work_node_ = nullptr; void SetActive(bool active); bool ActiveStart(); void ActiveStop(); void NotifyEpollSend(); + void AsyncSend(); }; }