add timerfd eventfd

This commit is contained in:
aozhiwei 2020-01-09 15:59:26 +08:00
parent f9f17a60fb
commit 28ec90a305
10 changed files with 367 additions and 57 deletions

View File

@ -22,7 +22,6 @@ namespace a8
AsyncTcpClient::AsyncTcpClient()
{
send_buffer_mutex_ = new std::mutex();
epoll_fd = a8::IoLoop::Instance()->epoll_fd;
}
AsyncTcpClient::~AsyncTcpClient()
@ -87,6 +86,11 @@ namespace a8
}
}
void AsyncTcpClient::SetEpollFd(int epoll_fd)
{
epoll_fd_ = epoll_fd;
}
void AsyncTcpClient::DoRecv()
{
if (socket_ == -1) {
@ -136,7 +140,7 @@ namespace a8
}
}
void AsyncTcpClient::DoDisConnect()
void AsyncTcpClient::DoError()
{
connected_ = false;
if (on_disconnect) {
@ -187,7 +191,7 @@ namespace a8
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR;
ev.data.ptr = this;
int n = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_, &ev);
int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev);
assert(n == 0);
if (n != 0) {
abort();
@ -230,7 +234,7 @@ namespace a8
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, socket_, &ev);
}
void AsyncTcpClient::AsyncSend()
@ -266,7 +270,7 @@ namespace a8
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, socket_, &ev);
}
send_buffer_mutex_->unlock();
}

View File

@ -1,9 +1,11 @@
#ifndef A8_ASYNC_TCPCLIENT_H
#define A8_ASYNC_TCPCLIENT_H
#include <a8/epolleventhandler.h>
namespace a8
{
class AsyncTcpClient
class AsyncTcpClient : public EpollEventHandler
{
public:
std::function<void (a8::AsyncTcpClient*, int)> on_error;
@ -23,12 +25,14 @@ namespace a8
void SendBuff(const char* buff, unsigned int bufflen);
void DoConnect();
void DoRecv();
void DoSend();
void DoDisConnect();
virtual void SetEpollFd(int epoll_fd) override;
virtual void DoRecv() override;
virtual void DoSend() override;
virtual void DoError() override;
private:
volatile int epoll_fd = a8::INVALID_FD;
volatile int epoll_fd_ = a8::INVALID_FD;
volatile int socket_ = a8::INVALID_SOCKET;
volatile bool sending_ = false;
volatile bool connected_ = false;

16
a8/epolleventhandler.h Normal file
View File

@ -0,0 +1,16 @@
#ifndef A8_EPOLLEVENTHANDLER_H
#define A8_EPOLLEVENTHANDLER_H
namespace a8
{
class EpollEventHandler
{
public:
virtual void SetEpollFd(int epoll_fd) = 0;
virtual void DoSend() = 0;
virtual void DoRecv() = 0;
virtual void DoError() = 0;
};
}
#endif

59
a8/eventfd.cc Normal file
View File

@ -0,0 +1,59 @@
#include <unistd.h>
#include <sys/eventfd.h>
#include <a8/a8.h>
#include <a8/eventfd.h>
namespace a8
{
EventFD::EventFD()
{
fd_ = ::eventfd(0, 0);
if (fd_ == a8::INVALID_FD) {
abort();
}
}
EventFD::~EventFD()
{
::close(fd_);
fd_ = a8::INVALID_FD;
}
void EventFD::Init(void* context)
{
context_ = context;
}
void EventFD::Write(unsigned long long value)
{
eventfd_write(fd_, value);
}
void EventFD::SetEpollFd(int epoll_fd)
{
epoll_fd_ = epoll_fd;
}
void EventFD::DoRecv()
{
eventfd_t value = 0;
::eventfd_read(fd_, &value);
if (OnEvent) {
OnEvent(context_, value);
}
}
void EventFD::DoSend()
{
abort();
}
void EventFD::DoError()
{
abort();
}
}

30
a8/eventfd.h Normal file
View File

@ -0,0 +1,30 @@
#ifndef A8_EVENTFD_H
#define A8_EVENTFD_H
#include <a8/epolleventhandler.h>
namespace a8
{
class EventFD : public EpollEventHandler
{
public:
std::function<void (void*, unsigned long long)> OnEvent;
EventFD();
virtual ~EventFD();
void Init(void* context);
void Write(unsigned long long value);
virtual void SetEpollFd(int epoll_fd) override;
virtual void DoRecv() override;
virtual void DoSend() override;
virtual void DoError() override;
private:
int epoll_fd_ = a8::INVALID_FD;
void* context_ = nullptr;
int fd_ = a8::INVALID_FD;
};
}
#endif

View File

@ -6,52 +6,147 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <netinet/tcp.h>
#include <mutex>
#include <thread>
#include <a8/ioloop.h>
#include <a8/asynctcpclient.h>
#include <a8/eventfd.h>
enum IoLoopIMMsg_e
{
kFreeClient = 1
};
namespace a8
{
void IoLoop::Init()
struct IoLoopThreadContext
{
int epoll_fd = a8::INVALID_FD;
a8::EventFD event_fd;
std::mutex im_msg_mutex;
std::list<std::tuple<int, a8::XParams>> im_msg_list;
};
void IoLoop::Init(int thread_num)
{
if (thread_num < 1) {
abort();
}
thread_num_ = thread_num;
event_id_ = 1000;
max_client_num_ = 10000;
epoll_fd = ::epoll_create(max_client_num_);
assert(epoll_fd != a8::INVALID_FD);
worker_thread_ = new std::thread(&a8::IoLoop::WorkerThreadProc, this);
for (int i = 0; i < thread_num; ++i) {
int epoll_fd = ::epoll_create(max_client_num_);
if (epoll_fd == a8::INVALID_FD) {
abort();
}
int event_fd = ::eventfd(0, 0);
if (event_fd == a8::INVALID_FD) {
abort();
}
IoLoopThreadContext* thread_context = new IoLoopThreadContext();
thread_context->epoll_fd = epoll_fd;
thread_context->event_fd.Init(thread_context);
thread_context->event_fd.SetEpollFd(epoll_fd);
std::thread* thread = new std::thread(&a8::IoLoop::WorkerThreadProc, this, thread_context);
worker_threads_.push_back(thread);
thread_contexts_.push_back(thread_context);
}
}
void IoLoop::UnInit()
{
::close(epoll_fd);
epoll_fd = a8::INVALID_FD;
delete worker_thread_;
worker_thread_ = nullptr;
worker_thread_shutdown_ = true;
for (int i = 0; i < thread_num_; ++i) {
worker_threads_[i]->join();
delete worker_threads_[i];
::close(thread_contexts_[i]->epoll_fd);
delete thread_contexts_[i];
}
thread_contexts_.clear();
worker_threads_.clear();
}
void IoLoop::WorkerThreadProc()
AsyncTcpClient* IoLoop::CreateAsyncTcpClient()
{
AsyncTcpClient* client = new AsyncTcpClient();
client->SetEpollFd(thread_contexts_[(uintptr_t)client % thread_num_]->epoll_fd);
return client;
}
void IoLoop::DestoryAsyncTcpClient(AsyncTcpClient* client)
{
AddIMMsg(
thread_contexts_[(uintptr_t)client % thread_num_],
kFreeClient,
a8::XParams()
.SetSender(client)
);
}
void IoLoop::WorkerThreadProc(IoLoopThreadContext* context)
{
epoll_event *events = new epoll_event[max_client_num_];
while (!worker_thread_shutdown_) {
int nfds = ::epoll_wait(epoll_fd, events, max_client_num_, 1000 * 10);
ProcessIMMsg(context);
int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, 1000 * 10);
for (int i = 0; i < nfds; ++i) {
a8::AsyncTcpClient* session = (a8::AsyncTcpClient*)events[i].data.ptr;
a8::EpollEventHandler* handler = (a8::EpollEventHandler*)events[i].data.ptr;
if (events[i].events & EPOLLOUT) {
session->DoSend();
} else if (events[i].events & EPOLLIN) {
session->DoRecv();
} else if (events[i].events & EPOLLRDHUP ||
events[i].events & EPOLLHUP ||
events[i].events & EPOLLERR
) {
session->DoDisConnect();
handler->DoSend();
}
if (events[i].events & EPOLLIN) {
handler->DoRecv();
}
if (events[i].events & EPOLLRDHUP ||
events[i].events & EPOLLHUP ||
events[i].events & EPOLLERR
) {
handler->DoError();
}
}
}
delete [] events;
}
void IoLoop::AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params)
{
context->im_msg_mutex.lock();
context->im_msg_list.push_back(std::make_tuple(
imcmd,
params
));
context->im_msg_mutex.unlock();
context->event_fd.Write(++event_id_);
}
void IoLoop::ProcessIMMsg(IoLoopThreadContext* context)
{
std::list<std::tuple<int, a8::XParams>> msg_list;
context->im_msg_mutex.lock();
if (!context->im_msg_list.empty()) {
context->im_msg_list.swap(msg_list);
}
context->im_msg_mutex.unlock();
if (!msg_list.empty()) {
for (auto itr = msg_list.begin(); itr != msg_list.end(); ++itr) {
int cmd = std::get<0>(*itr);
a8::XParams& param = std::get<1>(*itr);
switch (cmd) {
case kFreeClient:
{
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
delete client;
}
break;
}
}
}
}
}

View File

@ -1,12 +1,14 @@
#ifndef A8_IOLOOP_H
#define A8_IOLOOP_H
#include <atomic>
#include <a8/a8.h>
#include <a8/types.h>
namespace a8
{
class AsyncTcpClient;
struct IoLoopThreadContext;
class IoLoop : public a8::Singleton<IoLoop>
{
private:
@ -14,17 +16,24 @@ namespace a8
friend class a8::Singleton<IoLoop>;
public:
void Init();
void Init(int thread_num);
void UnInit();
volatile int epoll_fd = a8::INVALID_FD;
AsyncTcpClient* CreateAsyncTcpClient();
void DestoryAsyncTcpClient(AsyncTcpClient* client);
private:
void WorkerThreadProc();
void WorkerThreadProc(IoLoopThreadContext* context);
void AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params);
void ProcessIMMsg(IoLoopThreadContext* context);
private:
int max_client_num_ = 1000;
std::atomic<unsigned long long> event_id_;
int thread_num_ = 1;
int max_client_num_ = 10000;
volatile bool worker_thread_shutdown_ = false;
std::thread* worker_thread_ = nullptr;
std::vector<IoLoopThreadContext*> thread_contexts_;
std::vector<std::thread*> worker_threads_;
};
}

84
a8/timerfd.cc Normal file
View File

@ -0,0 +1,84 @@
#include <unistd.h>
#include <sys/timerfd.h>
#include <a8/a8.h>
#include <a8/timerfd.h>
namespace a8
{
TimerFD::TimerFD()
{
fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (fd_ == a8::INVALID_FD) {
abort();
}
}
TimerFD::~TimerFD()
{
::close(fd_);
fd_ = a8::INVALID_FD;
}
void TimerFD::Init(void* context)
{
context_ = context;
}
void TimerFD::Start(long long ms)
{
itimerspec new_value;
new_value.it_interval.tv_sec = 0;
new_value.it_interval.tv_nsec = ms * 1000 * 1000;
new_value.it_value.tv_sec = 0;
new_value.it_value.tv_nsec = ms * 1000 * 1000;
if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) {
abort();
}
}
void TimerFD::Stop()
{
itimerspec new_value;
new_value.it_interval.tv_sec = 0;
new_value.it_interval.tv_nsec = 0;
new_value.it_value.tv_sec = 0;
new_value.it_value.tv_nsec = 0;
if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) {
abort();
}
}
void TimerFD::SetEpollFd(int epoll_fd)
{
epoll_fd_ = epoll_fd;
}
void TimerFD::DoRecv()
{
unsigned long long value = 0;
::read(fd_, &value, sizeof(value));
if (OnTimer) {
OnTimer(context_, value);
}
}
void TimerFD::DoSend()
{
abort();
}
void TimerFD::DoError()
{
abort();
}
}

31
a8/timerfd.h Normal file
View File

@ -0,0 +1,31 @@
#ifndef A8_TIMERFD_H
#define A8_TIMERFD_H
#include <a8/epolleventhandler.h>
namespace a8
{
class TimerFD : public EpollEventHandler
{
public:
std::function<void (void*, unsigned long long)> OnTimer;
TimerFD();
virtual ~TimerFD();
void Init(void* context);
void Start(long long ms);
void Stop();
virtual void SetEpollFd(int epoll_fd) override;
virtual void DoRecv() override;
virtual void DoSend() override;
virtual void DoError() override;
private:
int epoll_fd_ = a8::INVALID_FD;
void* context_ = nullptr;
int fd_ = a8::INVALID_FD;
};
}
#endif

View File

@ -3,28 +3,6 @@
namespace a8
{
namespace reflect
{
class Class;
}
struct ReflectibleObject
{
a8::reflect::Class *metaclass = nullptr;
};
struct IReflectible
{
virtual a8::ReflectibleObject* GetReflectibleObject() const = 0;
virtual void set(a8::ReflectibleObject* ) = 0;
};
template<class T>
struct DataLink
{
T* data = nullptr;
};
typedef long long tick_t;
struct SendQueueNode