From 1ba049779b32362fd06dd73afdd59b20ff50de89 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Tue, 14 Jan 2020 13:27:38 +0800 Subject: [PATCH] 1 --- a8/asynctcpclient.cc | 77 ++++++++++++++++++++++++++++++-------------- a8/asynctcpclient.h | 13 ++++++-- a8/ioloop.cc | 67 ++++++++++++++++++++++++++++++++++++-- a8/ioloop.h | 9 ++++++ 4 files changed, 136 insertions(+), 30 deletions(-) diff --git a/a8/asynctcpclient.cc b/a8/asynctcpclient.cc index 3fdeccd..c5a9228 100644 --- a/a8/asynctcpclient.cc +++ b/a8/asynctcpclient.cc @@ -14,13 +14,14 @@ #include #include -static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10; static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64; namespace a8 { + AsyncTcpClient::AsyncTcpClient() { + ref_count_ = 1; send_buffer_mutex_ = new std::mutex(); } @@ -122,12 +123,23 @@ namespace a8 void AsyncTcpClient::DoSend() { - if (!connected_) { - DoConnect(); - } if(socket_ == -1){ return; } + if (!connected_) { + int error = 0; + socklen_t len = 0; + ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len); + if( !error ) { + DoConnect(); + } else { + connected_ = false; + if (on_error) { + on_error(this, error); + } + return; + } + } if (!work_node_) { send_buffer_mutex_->lock(); work_node_ = top_node_; @@ -151,20 +163,20 @@ namespace a8 void AsyncTcpClient::SetActive(bool active) { if (active) { - ActiveStart(); + a8::IoLoop::Instance()->PostAsyncConnect(this, timeout_ms); } else { - ActiveStop(); + a8::IoLoop::Instance()->PostAsyncClose(this); } } - bool AsyncTcpClient::ActiveStart() + void AsyncTcpClient::DoAsyncConnect() { socket_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (INVALID_SOCKET == socket_) { if (on_error) { on_error(this, errno); } - return false; + return; } //set nodelay { @@ -185,38 +197,53 @@ namespace a8 flags = ::fcntl(socket_, F_GETFL, 0); ::fcntl(socket_, F_SETFL, flags|O_NONBLOCK); } - //add epoll - { - struct epoll_event ev; - ev.data.fd = socket_; - ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR; - ev.data.ptr = this; - int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev); - assert(n == 0); - if (n != 0) { - abort(); - } - } sockaddr_in sa; memset(&sa, 0, sizeof(sa)); sa.sin_family = AF_INET; sa.sin_addr.s_addr = inet_addr(remote_address.c_str()); sa.sin_port = htons(remote_port); int ret = ::connect(socket_, (sockaddr*)&sa, sizeof(sa)); - if (ret < 0) { - if (errno != EINPROGRESS) { + if (ret == 0) { + //add epoll + { + struct epoll_event ev; + ev.data.fd = socket_; + ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; + ev.data.ptr = this; + int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev); + assert(n == 0); + if (n != 0) { + abort(); + } + } + DoConnect(); + } else if (ret < 0) { + if (errno == EINPROGRESS) { + //add epoll + { + struct epoll_event ev; + ev.data.fd = socket_; + ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR; + ev.data.ptr = this; + int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev); + assert(n == 0); + if (n != 0) { + abort(); + } + } + } else { + connected_ = false; if (on_error) { on_error(this, errno); } ::close(socket_); socket_ = INVALID_SOCKET; - return false; + return; } } - return true; } - void AsyncTcpClient::ActiveStop() + void AsyncTcpClient::DoAsyncClose() { sending_ = false; connected_ = false; diff --git a/a8/asynctcpclient.h b/a8/asynctcpclient.h index eb61bb5..ad5c88b 100644 --- a/a8/asynctcpclient.h +++ b/a8/asynctcpclient.h @@ -1,10 +1,12 @@ #ifndef A8_ASYNC_TCPCLIENT_H #define A8_ASYNC_TCPCLIENT_H +#include #include namespace a8 { + class IoLoop; class AsyncTcpClient : public EpollEventHandler { public: @@ -14,6 +16,7 @@ namespace a8 std::function on_socketread; std::string remote_address; int remote_port = 0; + int timeout_ms = 1000 * 10; AsyncTcpClient(); virtual ~AsyncTcpClient(); @@ -32,6 +35,12 @@ namespace a8 virtual void DoError() override; private: + + void DoAsyncConnect(); + void DoAsyncClose(); + + private: + std::atomic ref_count_; volatile int epoll_fd_ = a8::INVALID_FD; volatile int socket_ = a8::INVALID_SOCKET; volatile bool sending_ = false; @@ -42,10 +51,10 @@ namespace a8 SendQueueNode *work_node_ = nullptr; void SetActive(bool active); - bool ActiveStart(); - void ActiveStop(); void NotifyEpollSend(); void AsyncSend(); + + friend class IoLoop; }; } diff --git a/a8/ioloop.cc b/a8/ioloop.cc index 99d8125..4b523eb 100644 --- a/a8/ioloop.cc +++ b/a8/ioloop.cc @@ -21,7 +21,9 @@ enum IoLoopIMMsg_e { kFreeClient = 1, - kShutdown = 2 + kShutdown = 2, + kAsyncConnect = 3, + kAsyncClose = 4, }; namespace a8 @@ -34,6 +36,7 @@ namespace a8 a8::TimerFD timer_fd; std::mutex im_msg_mutex; std::list> im_msg_list; + std::map connect_pending_hash; }; void IoLoop::Init(int thread_num) @@ -156,8 +159,22 @@ namespace a8 switch (cmd) { case kFreeClient: { - AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); - delete client; + _IMFreeClient(context, param); + } + break; + case kShutdown: + { + _IMShutdown(context, param); + } + break; + case kAsyncConnect: + { + _IMAsyncConnect(context, param); + } + break; + case kAsyncClose: + { + _IMAsyncClose(context, param); } break; } @@ -180,4 +197,48 @@ namespace a8 #endif } + void IoLoop::PostAsyncConnect(AsyncTcpClient* client, int timeout_ms) + { + AddIMMsg( + thread_contexts_[(uintptr_t)client % thread_num_], + kAsyncConnect, + a8::XParams() + .SetSender(client) + .SetParam1(timeout_ms) + ); + } + + void IoLoop::PostAsyncClose(AsyncTcpClient* client) + { + AddIMMsg( + thread_contexts_[(uintptr_t)client % thread_num_], + kAsyncClose, + a8::XParams() + .SetSender(client) + ); + } + + void IoLoop::_IMFreeClient(IoLoopThreadContext* context, a8::XParams& param) + { + AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + delete client; + } + + void IoLoop::_IMShutdown(IoLoopThreadContext* context, a8::XParams& param) + { + + } + + void IoLoop::_IMAsyncConnect(IoLoopThreadContext* context, a8::XParams& param) + { + AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + client->DoAsyncConnect(); + } + + void IoLoop::_IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param) + { + AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + client->DoAsyncClose(); + } + } diff --git a/a8/ioloop.h b/a8/ioloop.h index f324286..34224a4 100644 --- a/a8/ioloop.h +++ b/a8/ioloop.h @@ -28,6 +28,13 @@ namespace a8 void ProcessIMMsg(IoLoopThreadContext* context); void OnEvent(void* context, unsigned long long data); void OnTimer(void* context, unsigned long long data); + void PostAsyncConnect(AsyncTcpClient* client, int timeout_ms); + void PostAsyncClose(AsyncTcpClient* client); + + void _IMFreeClient(IoLoopThreadContext* context, a8::XParams& param); + void _IMShutdown(IoLoopThreadContext* context, a8::XParams& param); + void _IMAsyncConnect(IoLoopThreadContext* context, a8::XParams& param); + void _IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param); private: std::atomic event_id_; @@ -36,6 +43,8 @@ namespace a8 volatile bool worker_thread_shutdown_ = false; std::vector thread_contexts_; std::vector worker_threads_; + + friend class AsyncTcpClient; }; }