From fe32c02b9e7c96c8f018cc29c638c5ada4d9c4ba Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Tue, 14 Jan 2020 21:16:04 +0800 Subject: [PATCH] 1 --- a8/asynctcpclient.cc | 34 +++++++++++++++++++++++++++------- a8/asynctcpclient.h | 11 +++++++---- a8/ioloop.cc | 40 ++++++++++++++++++++++++++++++++++++++-- a8/sysutils.cc | 13 +++++++++++++ a8/sysutils.h | 2 ++ 5 files changed, 87 insertions(+), 13 deletions(-) diff --git a/a8/asynctcpclient.cc b/a8/asynctcpclient.cc index c5a9228..301e121 100644 --- a/a8/asynctcpclient.cc +++ b/a8/asynctcpclient.cc @@ -21,15 +21,17 @@ namespace a8 AsyncTcpClient::AsyncTcpClient() { - ref_count_ = 1; + ref_count = 1; send_buffer_mutex_ = new std::mutex(); } AsyncTcpClient::~AsyncTcpClient() { + if (!list_empty(&connect_timer_attacher.timer_list_)) { + abort(); + } Close(); - delete send_buffer_mutex_; - send_buffer_mutex_ = nullptr; + A8_SAFE_DELETE(send_buffer_mutex_); } void AsyncTcpClient::Open() @@ -128,12 +130,18 @@ namespace a8 } if (!connected_) { int error = 0; - socklen_t len = 0; + socklen_t len = sizeof(error); ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len); if( !error ) { DoConnect(); } else { connected_ = false; + if (socket_ != a8::INVALID_SOCKET) { + ::shutdown(socket_, 2); + ::close(socket_); + } + socket_ = INVALID_SOCKET; + ClearSendBuff(); if (on_error) { on_error(this, error); } @@ -247,11 +255,23 @@ namespace a8 { sending_ = false; connected_ = false; - if (socket_ != INVALID_SOCKET) { - shutdown(socket_, 2); + if (socket_ != a8::INVALID_SOCKET) { + ::shutdown(socket_, 2); ::close(socket_); } - socket_ = INVALID_SOCKET; + socket_ = a8::INVALID_SOCKET; + ClearSendBuff(); + } + + void AsyncTcpClient::ClearSendBuff() + { + send_buffer_mutex_->lock(); + a8::ClearSendQueue(top_node_); + top_node_ = nullptr; + bot_node_ = nullptr; + send_buffer_mutex_->unlock(); + a8::ClearSendQueue(work_node_); + work_node_ = nullptr; } void AsyncTcpClient::NotifyEpollSend() diff --git a/a8/asynctcpclient.h b/a8/asynctcpclient.h index ad5c88b..41fcff5 100644 --- a/a8/asynctcpclient.h +++ b/a8/asynctcpclient.h @@ -2,6 +2,7 @@ #define A8_ASYNC_TCPCLIENT_H #include +#include #include namespace a8 @@ -17,6 +18,8 @@ namespace a8 std::string remote_address; int remote_port = 0; int timeout_ms = 1000 * 10; + a8::XTimerAttacher connect_timer_attacher; + std::atomic ref_count; AsyncTcpClient(); virtual ~AsyncTcpClient(); @@ -38,17 +41,17 @@ namespace a8 void DoAsyncConnect(); void DoAsyncClose(); + void ClearSendBuff(); private: - std::atomic ref_count_; volatile int epoll_fd_ = a8::INVALID_FD; volatile int socket_ = a8::INVALID_SOCKET; volatile bool sending_ = false; volatile bool connected_ = false; std::mutex* send_buffer_mutex_ = nullptr; - SendQueueNode *top_node_ = nullptr; - SendQueueNode *bot_node_ = nullptr; - SendQueueNode *work_node_ = nullptr; + a8::SendQueueNode *top_node_ = nullptr; + a8::SendQueueNode *bot_node_ = nullptr; + a8::SendQueueNode *work_node_ = nullptr; void SetActive(bool active); void NotifyEpollSend(); diff --git a/a8/ioloop.cc b/a8/ioloop.cc index 4b523eb..9853016 100644 --- a/a8/ioloop.cc +++ b/a8/ioloop.cc @@ -17,6 +17,7 @@ #include #include #include +#include enum IoLoopIMMsg_e { @@ -32,11 +33,20 @@ namespace a8 struct IoLoopThreadContext { int epoll_fd = a8::INVALID_FD; + long long tick = 0; a8::EventFD event_fd; a8::TimerFD timer_fd; + a8::XTimer xtimer; std::mutex im_msg_mutex; std::list> im_msg_list; std::map connect_pending_hash; + + static long long XGetTickCountFunc(void* context) + { + IoLoopThreadContext* thread_context = (IoLoopThreadContext*)context; + return thread_context->tick; + } + }; void IoLoop::Init(int thread_num) @@ -54,6 +64,10 @@ namespace a8 } IoLoopThreadContext* thread_context = new IoLoopThreadContext(); thread_context->epoll_fd = epoll_fd; + thread_context->xtimer.Init(&IoLoopThreadContext::XGetTickCountFunc, + thread_context, + 60, + 1000); thread_context->event_fd.Init(thread_context); thread_context->event_fd.SetEpollFd(epoll_fd); thread_context->event_fd.OnEvent = std::bind(&IoLoop::OnEvent, @@ -92,7 +106,9 @@ namespace a8 AsyncTcpClient* IoLoop::CreateAsyncTcpClient() { AsyncTcpClient* client = new AsyncTcpClient(); - client->SetEpollFd(thread_contexts_[(uintptr_t)client % thread_num_]->epoll_fd); + IoLoopThreadContext* thread_context = thread_contexts_[(uintptr_t)client % thread_num_]; + client->connect_timer_attacher.xtimer = &thread_context->xtimer; + client->SetEpollFd(thread_context->epoll_fd); return client; } @@ -191,7 +207,8 @@ namespace a8 void IoLoop::OnTimer(void* context, unsigned long long data) { - AddIMMsg((IoLoopThreadContext*)context, 100, a8::XParams()); + IoLoopThreadContext* thread_context = (IoLoopThreadContext*)context; + ++thread_context->tick; #ifdef DEBUG a8::UdpLog::Instance()->Info("OnTimer %d", {data}); #endif @@ -221,6 +238,9 @@ namespace a8 void IoLoop::_IMFreeClient(IoLoopThreadContext* context, a8::XParams& param) { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + if (client->ref_count != 1) { + abort(); + } delete client; } @@ -233,11 +253,27 @@ namespace a8 { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); client->DoAsyncConnect(); + client->connect_timer_attacher.ClearTimerList(); + context->xtimer.AddDeadLineTimerAndAttach( + param.param1, + a8::XParams() + .SetSender(client), + [] (const a8::XParams& param) + { + AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + client->DoAsyncClose(); + if (client->on_error) { + client->on_error(client, 111); + } + }, + &client->connect_timer_attacher.timer_list_ + ); } void IoLoop::_IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param) { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + client->connect_timer_attacher.ClearTimerList(); client->DoAsyncClose(); } diff --git a/a8/sysutils.cc b/a8/sysutils.cc index 9c50bba..b01e928 100644 --- a/a8/sysutils.cc +++ b/a8/sysutils.cc @@ -251,4 +251,17 @@ namespace a8 return angle; } + void ClearSendQueue(a8::SendQueueNode* node) + { + a8::SendQueueNode* tmp_node = node; + while (tmp_node) { + a8::SendQueueNode* pdelnode = tmp_node; + tmp_node = pdelnode->next; + if (pdelnode->buff) { + free(pdelnode->buff); + } + free(pdelnode); + } + } + } diff --git a/a8/sysutils.h b/a8/sysutils.h index 0ace7a6..b66b8b7 100644 --- a/a8/sysutils.h +++ b/a8/sysutils.h @@ -88,6 +88,8 @@ namespace a8 int High32(long long int64_val); float RandAngle(); + + void ClearSendQueue(a8::SendQueueNode* node); } #endif