#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include enum IoLoopIMMsg_e { kFreeClient = 1, kShutdown = 2, kAsyncConnect = 3, kAsyncClose = 4, }; namespace a8 { struct IoLoopThreadContext { int epoll_fd = a8::INVALID_FD; long long tick = 0; a8::EventFD event_fd; a8::TimerFD timer_fd; a8::XTimer xtimer; std::mutex im_msg_mutex; std::list> im_msg_list; std::map connect_pending_hash; static long long XGetTickCountFunc(void* context) { IoLoopThreadContext* thread_context = (IoLoopThreadContext*)context; return thread_context->tick; } }; void IoLoop::Init(int thread_num) { if (thread_num < 1) { abort(); } thread_num_ = thread_num; event_id_ = 1000; max_client_num_ = 10000; for (int i = 0; i < thread_num; ++i) { int epoll_fd = ::epoll_create(max_client_num_); if (epoll_fd == a8::INVALID_FD) { abort(); } IoLoopThreadContext* thread_context = new IoLoopThreadContext(); thread_context->epoll_fd = epoll_fd; thread_context->xtimer.Init(&IoLoopThreadContext::XGetTickCountFunc, thread_context, 60, 1000); thread_context->event_fd.Init(thread_context); thread_context->event_fd.SetEpollFd(epoll_fd); thread_context->event_fd.OnEvent = std::bind(&IoLoop::OnEvent, this, std::placeholders::_1, std::placeholders::_2); thread_context->timer_fd.Init(thread_context); thread_context->timer_fd.SetEpollFd(epoll_fd); thread_context->timer_fd.Start(1000 * 10); thread_context->timer_fd.OnTimer = std::bind(&IoLoop::OnTimer, this, std::placeholders::_1, std::placeholders::_2); std::thread* thread = new std::thread(&a8::IoLoop::WorkerThreadProc, this, thread_context); worker_threads_.push_back(thread); thread_contexts_.push_back(thread_context); } } void IoLoop::UnInit() { worker_thread_shutdown_ = true; for (int i = 0; i < thread_num_; ++i) { AddIMMsg(thread_contexts_[i], kShutdown, a8::XParams()); worker_threads_[i]->join(); delete worker_threads_[i]; thread_contexts_[i]->event_fd.UnInit(); thread_contexts_[i]->timer_fd.UnInit(); ::close(thread_contexts_[i]->epoll_fd); delete thread_contexts_[i]; } thread_contexts_.clear(); worker_threads_.clear(); } AsyncTcpClient* IoLoop::CreateAsyncTcpClient() { AsyncTcpClient* client = new AsyncTcpClient(); IoLoopThreadContext* thread_context = thread_contexts_[(uintptr_t)client % thread_num_]; client->connect_timer_attacher.xtimer = &thread_context->xtimer; client->SetEpollFd(thread_context->epoll_fd); return client; } void IoLoop::DestoryAsyncTcpClient(AsyncTcpClient* client) { client->Close(); AddIMMsg( thread_contexts_[(uintptr_t)client % thread_num_], kFreeClient, a8::XParams() .SetSender(client) ); } void IoLoop::WorkerThreadProc(IoLoopThreadContext* context) { epoll_event *events = new epoll_event[max_client_num_]; while (!worker_thread_shutdown_) { ProcessIMMsg(context); int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, -1); for (int i = 0; i < nfds; ++i) { a8::EpollEventHandler* handler = (a8::EpollEventHandler*)events[i].data.ptr; if (events[i].events & EPOLLOUT) { handler->DoSend(); ++send_times; } if (events[i].events & EPOLLIN) { handler->DoRecv(); ++recv_times; } if (events[i].events & EPOLLRDHUP || events[i].events & EPOLLHUP || events[i].events & EPOLLERR ) { handler->DoError(); ++error_times; } } ++run_times; } ProcessIMMsg(context); delete [] events; } void IoLoop::AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params) { context->im_msg_mutex.lock(); context->im_msg_list.push_back(std::make_tuple( imcmd, params )); context->im_msg_mutex.unlock(); context->event_fd.Write(++event_id_); } void IoLoop::ProcessIMMsg(IoLoopThreadContext* context) { std::list> msg_list; context->im_msg_mutex.lock(); if (!context->im_msg_list.empty()) { context->im_msg_list.swap(msg_list); } context->im_msg_mutex.unlock(); if (!msg_list.empty()) { for (auto itr = msg_list.begin(); itr != msg_list.end(); ++itr) { int cmd = std::get<0>(*itr); a8::XParams& param = std::get<1>(*itr); switch (cmd) { case kFreeClient: { _IMFreeClient(context, param); ++free_times; } break; case kShutdown: { _IMShutdown(context, param); ++shutdown_times; } break; case kAsyncConnect: { _IMAsyncConnect(context, param); ++connect_times; } break; case kAsyncClose: { _IMAsyncClose(context, param); ++close_times; } break; } } ++immsg_times; } } void IoLoop::OnEvent(void* context, unsigned long long data) { #ifdef DEBUG a8::UdpLog::Instance()->Info("OnEvent %d", {data}); #endif ++event_times; } void IoLoop::OnTimer(void* context, unsigned long long data) { IoLoopThreadContext* thread_context = (IoLoopThreadContext*)context; ++thread_context->tick; ++timer_times; #ifdef DEBUG a8::UdpLog::Instance()->Info("OnTimer %d", {data}); #endif } void IoLoop::PostAsyncConnect(AsyncTcpClient* client, int timeout_ms) { AddIMMsg( thread_contexts_[(uintptr_t)client % thread_num_], kAsyncConnect, a8::XParams() .SetSender(client) .SetParam1(timeout_ms) ); } void IoLoop::PostAsyncClose(AsyncTcpClient* client) { AddIMMsg( thread_contexts_[(uintptr_t)client % thread_num_], kAsyncClose, a8::XParams() .SetSender(client) ); } void IoLoop::_IMFreeClient(IoLoopThreadContext* context, a8::XParams& param) { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); if (client->ref_count != 1) { abort(); } delete client; } void IoLoop::_IMShutdown(IoLoopThreadContext* context, a8::XParams& param) { } void IoLoop::_IMAsyncConnect(IoLoopThreadContext* context, a8::XParams& param) { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); client->DoAsyncConnect(); client->connect_timer_attacher.ClearTimerList(); context->xtimer.AddDeadLineTimerAndAttach( param.param1, a8::XParams() .SetSender(client), [] (const a8::XParams& param) { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); client->DoAsyncClose(); if (client->on_error) { client->on_error(client, 111); } }, &client->connect_timer_attacher.timer_list_ ); } void IoLoop::_IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param) { AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); client->connect_timer_attacher.ClearTimerList(); client->DoAsyncClose(); } }