From 67358105a4e2301c48f351d02fde1c59ef8a4a3e Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Thu, 9 Jan 2020 17:50:39 +0800 Subject: [PATCH] 1 --- a8/eventfd.cc | 1 + a8/ioloop.cc | 43 +++++++++++++++++++++++++++++++++++++------ a8/ioloop.h | 2 ++ a8/timerfd.cc | 9 +++++---- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/a8/eventfd.cc b/a8/eventfd.cc index c0dd8cb..b10a51c 100644 --- a/a8/eventfd.cc +++ b/a8/eventfd.cc @@ -49,6 +49,7 @@ namespace a8 struct epoll_event ev; ev.data.fd = fd_; ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = this; int ret = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_, &ev); if (ret != 0) { abort(); diff --git a/a8/ioloop.cc b/a8/ioloop.cc index 6f9b0c0..99d8125 100644 --- a/a8/ioloop.cc +++ b/a8/ioloop.cc @@ -15,10 +15,13 @@ #include #include #include +#include +#include enum IoLoopIMMsg_e { - kFreeClient = 1 + kFreeClient = 1, + kShutdown = 2 }; namespace a8 @@ -28,6 +31,7 @@ namespace a8 { int epoll_fd = a8::INVALID_FD; a8::EventFD event_fd; + a8::TimerFD timer_fd; std::mutex im_msg_mutex; std::list> im_msg_list; }; @@ -45,14 +49,21 @@ namespace a8 if (epoll_fd == a8::INVALID_FD) { abort(); } - int event_fd = ::eventfd(0, 0); - if (event_fd == a8::INVALID_FD) { - abort(); - } IoLoopThreadContext* thread_context = new IoLoopThreadContext(); thread_context->epoll_fd = epoll_fd; thread_context->event_fd.Init(thread_context); thread_context->event_fd.SetEpollFd(epoll_fd); + thread_context->event_fd.OnEvent = std::bind(&IoLoop::OnEvent, + this, + std::placeholders::_1, + std::placeholders::_2); + thread_context->timer_fd.Init(thread_context); + thread_context->timer_fd.SetEpollFd(epoll_fd); + thread_context->timer_fd.Start(1000 * 10); + thread_context->timer_fd.OnTimer = std::bind(&IoLoop::OnTimer, + this, + std::placeholders::_1, + std::placeholders::_2); std::thread* thread = new std::thread(&a8::IoLoop::WorkerThreadProc, this, thread_context); worker_threads_.push_back(thread); thread_contexts_.push_back(thread_context); @@ -63,8 +74,11 @@ namespace a8 { worker_thread_shutdown_ = true; for (int i = 0; i < thread_num_; ++i) { + AddIMMsg(thread_contexts_[i], kShutdown, a8::XParams()); worker_threads_[i]->join(); delete worker_threads_[i]; + thread_contexts_[i]->event_fd.UnInit(); + thread_contexts_[i]->timer_fd.UnInit(); ::close(thread_contexts_[i]->epoll_fd); delete thread_contexts_[i]; } @@ -81,6 +95,7 @@ namespace a8 void IoLoop::DestoryAsyncTcpClient(AsyncTcpClient* client) { + client->Close(); AddIMMsg( thread_contexts_[(uintptr_t)client % thread_num_], kFreeClient, @@ -94,7 +109,7 @@ namespace a8 epoll_event *events = new epoll_event[max_client_num_]; while (!worker_thread_shutdown_) { ProcessIMMsg(context); - int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, 1000 * 10); + int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, -1); for (int i = 0; i < nfds; ++i) { a8::EpollEventHandler* handler = (a8::EpollEventHandler*)events[i].data.ptr; if (events[i].events & EPOLLOUT) { @@ -111,6 +126,7 @@ namespace a8 } } } + ProcessIMMsg(context); delete [] events; } @@ -149,4 +165,19 @@ namespace a8 } } + void IoLoop::OnEvent(void* context, unsigned long long data) + { + #ifdef DEBUG + a8::UdpLog::Instance()->Info("OnEvent %d", {data}); + #endif + } + + void IoLoop::OnTimer(void* context, unsigned long long data) + { + AddIMMsg((IoLoopThreadContext*)context, 100, a8::XParams()); + #ifdef DEBUG + a8::UdpLog::Instance()->Info("OnTimer %d", {data}); + #endif + } + } diff --git a/a8/ioloop.h b/a8/ioloop.h index fdb5687..f324286 100644 --- a/a8/ioloop.h +++ b/a8/ioloop.h @@ -26,6 +26,8 @@ namespace a8 void WorkerThreadProc(IoLoopThreadContext* context); void AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params); void ProcessIMMsg(IoLoopThreadContext* context); + void OnEvent(void* context, unsigned long long data); + void OnTimer(void* context, unsigned long long data); private: std::atomic event_id_; diff --git a/a8/timerfd.cc b/a8/timerfd.cc index c69d332..8a03346 100644 --- a/a8/timerfd.cc +++ b/a8/timerfd.cc @@ -41,11 +41,11 @@ namespace a8 { itimerspec new_value; - new_value.it_interval.tv_sec = 0; - new_value.it_interval.tv_nsec = ms * 1000 * 1000; + new_value.it_interval.tv_sec = ms / 1000; + new_value.it_interval.tv_nsec = (ms % 1000) * 1000 * 1000; - new_value.it_value.tv_sec = 0; - new_value.it_value.tv_nsec = ms * 1000 * 1000; + new_value.it_value.tv_sec = ms / 1000; + new_value.it_value.tv_nsec = (ms % 1000) * 1000 * 1000; if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) { abort(); @@ -72,6 +72,7 @@ namespace a8 struct epoll_event ev; ev.data.fd = fd_; ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = this; int ret = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_, &ev); if (ret != 0) { abort();