This commit is contained in:
aozhiwei 2020-01-09 17:50:39 +08:00
parent e899665c7d
commit 67358105a4
4 changed files with 45 additions and 10 deletions

View File

@ -49,6 +49,7 @@ namespace a8
struct epoll_event ev; struct epoll_event ev;
ev.data.fd = fd_; ev.data.fd = fd_;
ev.events = EPOLLIN | EPOLLET; ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = this;
int ret = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_, &ev); int ret = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_, &ev);
if (ret != 0) { if (ret != 0) {
abort(); abort();

View File

@ -15,10 +15,13 @@
#include <a8/ioloop.h> #include <a8/ioloop.h>
#include <a8/asynctcpclient.h> #include <a8/asynctcpclient.h>
#include <a8/eventfd.h> #include <a8/eventfd.h>
#include <a8/timerfd.h>
#include <a8/udplog.h>
enum IoLoopIMMsg_e enum IoLoopIMMsg_e
{ {
kFreeClient = 1 kFreeClient = 1,
kShutdown = 2
}; };
namespace a8 namespace a8
@ -28,6 +31,7 @@ namespace a8
{ {
int epoll_fd = a8::INVALID_FD; int epoll_fd = a8::INVALID_FD;
a8::EventFD event_fd; a8::EventFD event_fd;
a8::TimerFD timer_fd;
std::mutex im_msg_mutex; std::mutex im_msg_mutex;
std::list<std::tuple<int, a8::XParams>> im_msg_list; std::list<std::tuple<int, a8::XParams>> im_msg_list;
}; };
@ -45,14 +49,21 @@ namespace a8
if (epoll_fd == a8::INVALID_FD) { if (epoll_fd == a8::INVALID_FD) {
abort(); abort();
} }
int event_fd = ::eventfd(0, 0);
if (event_fd == a8::INVALID_FD) {
abort();
}
IoLoopThreadContext* thread_context = new IoLoopThreadContext(); IoLoopThreadContext* thread_context = new IoLoopThreadContext();
thread_context->epoll_fd = epoll_fd; thread_context->epoll_fd = epoll_fd;
thread_context->event_fd.Init(thread_context); thread_context->event_fd.Init(thread_context);
thread_context->event_fd.SetEpollFd(epoll_fd); thread_context->event_fd.SetEpollFd(epoll_fd);
thread_context->event_fd.OnEvent = std::bind(&IoLoop::OnEvent,
this,
std::placeholders::_1,
std::placeholders::_2);
thread_context->timer_fd.Init(thread_context);
thread_context->timer_fd.SetEpollFd(epoll_fd);
thread_context->timer_fd.Start(1000 * 10);
thread_context->timer_fd.OnTimer = std::bind(&IoLoop::OnTimer,
this,
std::placeholders::_1,
std::placeholders::_2);
std::thread* thread = new std::thread(&a8::IoLoop::WorkerThreadProc, this, thread_context); std::thread* thread = new std::thread(&a8::IoLoop::WorkerThreadProc, this, thread_context);
worker_threads_.push_back(thread); worker_threads_.push_back(thread);
thread_contexts_.push_back(thread_context); thread_contexts_.push_back(thread_context);
@ -63,8 +74,11 @@ namespace a8
{ {
worker_thread_shutdown_ = true; worker_thread_shutdown_ = true;
for (int i = 0; i < thread_num_; ++i) { for (int i = 0; i < thread_num_; ++i) {
AddIMMsg(thread_contexts_[i], kShutdown, a8::XParams());
worker_threads_[i]->join(); worker_threads_[i]->join();
delete worker_threads_[i]; delete worker_threads_[i];
thread_contexts_[i]->event_fd.UnInit();
thread_contexts_[i]->timer_fd.UnInit();
::close(thread_contexts_[i]->epoll_fd); ::close(thread_contexts_[i]->epoll_fd);
delete thread_contexts_[i]; delete thread_contexts_[i];
} }
@ -81,6 +95,7 @@ namespace a8
void IoLoop::DestoryAsyncTcpClient(AsyncTcpClient* client) void IoLoop::DestoryAsyncTcpClient(AsyncTcpClient* client)
{ {
client->Close();
AddIMMsg( AddIMMsg(
thread_contexts_[(uintptr_t)client % thread_num_], thread_contexts_[(uintptr_t)client % thread_num_],
kFreeClient, kFreeClient,
@ -94,7 +109,7 @@ namespace a8
epoll_event *events = new epoll_event[max_client_num_]; epoll_event *events = new epoll_event[max_client_num_];
while (!worker_thread_shutdown_) { while (!worker_thread_shutdown_) {
ProcessIMMsg(context); ProcessIMMsg(context);
int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, 1000 * 10); int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, -1);
for (int i = 0; i < nfds; ++i) { for (int i = 0; i < nfds; ++i) {
a8::EpollEventHandler* handler = (a8::EpollEventHandler*)events[i].data.ptr; a8::EpollEventHandler* handler = (a8::EpollEventHandler*)events[i].data.ptr;
if (events[i].events & EPOLLOUT) { if (events[i].events & EPOLLOUT) {
@ -111,6 +126,7 @@ namespace a8
} }
} }
} }
ProcessIMMsg(context);
delete [] events; delete [] events;
} }
@ -149,4 +165,19 @@ namespace a8
} }
} }
void IoLoop::OnEvent(void* context, unsigned long long data)
{
#ifdef DEBUG
a8::UdpLog::Instance()->Info("OnEvent %d", {data});
#endif
}
void IoLoop::OnTimer(void* context, unsigned long long data)
{
AddIMMsg((IoLoopThreadContext*)context, 100, a8::XParams());
#ifdef DEBUG
a8::UdpLog::Instance()->Info("OnTimer %d", {data});
#endif
}
} }

View File

@ -26,6 +26,8 @@ namespace a8
void WorkerThreadProc(IoLoopThreadContext* context); void WorkerThreadProc(IoLoopThreadContext* context);
void AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params); void AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params);
void ProcessIMMsg(IoLoopThreadContext* context); void ProcessIMMsg(IoLoopThreadContext* context);
void OnEvent(void* context, unsigned long long data);
void OnTimer(void* context, unsigned long long data);
private: private:
std::atomic<unsigned long long> event_id_; std::atomic<unsigned long long> event_id_;

View File

@ -41,11 +41,11 @@ namespace a8
{ {
itimerspec new_value; itimerspec new_value;
new_value.it_interval.tv_sec = 0; new_value.it_interval.tv_sec = ms / 1000;
new_value.it_interval.tv_nsec = ms * 1000 * 1000; new_value.it_interval.tv_nsec = (ms % 1000) * 1000 * 1000;
new_value.it_value.tv_sec = 0; new_value.it_value.tv_sec = ms / 1000;
new_value.it_value.tv_nsec = ms * 1000 * 1000; new_value.it_value.tv_nsec = (ms % 1000) * 1000 * 1000;
if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) { if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) {
abort(); abort();
@ -72,6 +72,7 @@ namespace a8
struct epoll_event ev; struct epoll_event ev;
ev.data.fd = fd_; ev.data.fd = fd_;
ev.events = EPOLLIN | EPOLLET; ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = this;
int ret = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_, &ev); int ret = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_, &ev);
if (ret != 0) { if (ret != 0) {
abort(); abort();