diff --git a/a8/asynctcpclient.cc b/a8/asynctcpclient.cc index 552db50..3fdeccd 100644 --- a/a8/asynctcpclient.cc +++ b/a8/asynctcpclient.cc @@ -22,7 +22,6 @@ namespace a8 AsyncTcpClient::AsyncTcpClient() { send_buffer_mutex_ = new std::mutex(); - epoll_fd = a8::IoLoop::Instance()->epoll_fd; } AsyncTcpClient::~AsyncTcpClient() @@ -87,6 +86,11 @@ namespace a8 } } + void AsyncTcpClient::SetEpollFd(int epoll_fd) + { + epoll_fd_ = epoll_fd; + } + void AsyncTcpClient::DoRecv() { if (socket_ == -1) { @@ -136,7 +140,7 @@ namespace a8 } } - void AsyncTcpClient::DoDisConnect() + void AsyncTcpClient::DoError() { connected_ = false; if (on_disconnect) { @@ -187,7 +191,7 @@ namespace a8 ev.data.fd = socket_; ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR; ev.data.ptr = this; - int n = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_, &ev); + int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev); assert(n == 0); if (n != 0) { abort(); @@ -230,7 +234,7 @@ namespace a8 ev.data.fd = socket_; ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; ev.data.ptr = this; - ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); + ::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, socket_, &ev); } void AsyncTcpClient::AsyncSend() @@ -266,7 +270,7 @@ namespace a8 ev.data.fd = socket_; ev.events = EPOLLIN | EPOLLRDHUP; ev.data.ptr = this; - ::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev); + ::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, socket_, &ev); } send_buffer_mutex_->unlock(); } diff --git a/a8/asynctcpclient.h b/a8/asynctcpclient.h index 38661f7..eb61bb5 100644 --- a/a8/asynctcpclient.h +++ b/a8/asynctcpclient.h @@ -1,9 +1,11 @@ #ifndef A8_ASYNC_TCPCLIENT_H #define A8_ASYNC_TCPCLIENT_H +#include + namespace a8 { - class AsyncTcpClient + class AsyncTcpClient : public EpollEventHandler { public: std::function on_error; @@ -23,12 +25,14 @@ namespace a8 void SendBuff(const char* buff, unsigned int bufflen); void DoConnect(); - void DoRecv(); - void DoSend(); - void DoDisConnect(); + + virtual void SetEpollFd(int epoll_fd) override; + virtual void DoRecv() override; + virtual void DoSend() override; + virtual void DoError() override; private: - volatile int epoll_fd = a8::INVALID_FD; + volatile int epoll_fd_ = a8::INVALID_FD; volatile int socket_ = a8::INVALID_SOCKET; volatile bool sending_ = false; volatile bool connected_ = false; diff --git a/a8/epolleventhandler.h b/a8/epolleventhandler.h new file mode 100644 index 0000000..0c3856d --- /dev/null +++ b/a8/epolleventhandler.h @@ -0,0 +1,16 @@ +#ifndef A8_EPOLLEVENTHANDLER_H +#define A8_EPOLLEVENTHANDLER_H + +namespace a8 +{ + class EpollEventHandler + { + public: + virtual void SetEpollFd(int epoll_fd) = 0; + virtual void DoSend() = 0; + virtual void DoRecv() = 0; + virtual void DoError() = 0; + }; +} + +#endif diff --git a/a8/eventfd.cc b/a8/eventfd.cc new file mode 100644 index 0000000..62a8747 --- /dev/null +++ b/a8/eventfd.cc @@ -0,0 +1,59 @@ +#include + +#include + +#include +#include + +namespace a8 +{ + + EventFD::EventFD() + { + fd_ = ::eventfd(0, 0); + if (fd_ == a8::INVALID_FD) { + abort(); + } + } + + EventFD::~EventFD() + { + ::close(fd_); + fd_ = a8::INVALID_FD; + } + + void EventFD::Init(void* context) + { + context_ = context; + } + + void EventFD::Write(unsigned long long value) + { + eventfd_write(fd_, value); + } + + void EventFD::SetEpollFd(int epoll_fd) + { + epoll_fd_ = epoll_fd; + } + + void EventFD::DoRecv() + { + eventfd_t value = 0; + ::eventfd_read(fd_, &value); + if (OnEvent) { + OnEvent(context_, value); + } + } + + void EventFD::DoSend() + { + abort(); + } + + void EventFD::DoError() + { + abort(); + } + +} diff --git a/a8/eventfd.h b/a8/eventfd.h new file mode 100644 index 0000000..e3a7ea6 --- /dev/null +++ b/a8/eventfd.h @@ -0,0 +1,30 @@ +#ifndef A8_EVENTFD_H +#define A8_EVENTFD_H + +#include + +namespace a8 +{ + class EventFD : public EpollEventHandler + { + public: + std::function OnEvent; + + EventFD(); + virtual ~EventFD(); + + void Init(void* context); + void Write(unsigned long long value); + virtual void SetEpollFd(int epoll_fd) override; + virtual void DoRecv() override; + virtual void DoSend() override; + virtual void DoError() override; + + private: + int epoll_fd_ = a8::INVALID_FD; + void* context_ = nullptr; + int fd_ = a8::INVALID_FD; + }; +} + +#endif diff --git a/a8/ioloop.cc b/a8/ioloop.cc index bea4f4a..6f9b0c0 100644 --- a/a8/ioloop.cc +++ b/a8/ioloop.cc @@ -6,52 +6,147 @@ #include #include #include +#include #include +#include #include #include #include +#include + +enum IoLoopIMMsg_e +{ + kFreeClient = 1 +}; namespace a8 { - void IoLoop::Init() + struct IoLoopThreadContext { + int epoll_fd = a8::INVALID_FD; + a8::EventFD event_fd; + std::mutex im_msg_mutex; + std::list> im_msg_list; + }; + + void IoLoop::Init(int thread_num) + { + if (thread_num < 1) { + abort(); + } + thread_num_ = thread_num; + event_id_ = 1000; max_client_num_ = 10000; - epoll_fd = ::epoll_create(max_client_num_); - assert(epoll_fd != a8::INVALID_FD); - worker_thread_ = new std::thread(&a8::IoLoop::WorkerThreadProc, this); + for (int i = 0; i < thread_num; ++i) { + int epoll_fd = ::epoll_create(max_client_num_); + if (epoll_fd == a8::INVALID_FD) { + abort(); + } + int event_fd = ::eventfd(0, 0); + if (event_fd == a8::INVALID_FD) { + abort(); + } + IoLoopThreadContext* thread_context = new IoLoopThreadContext(); + thread_context->epoll_fd = epoll_fd; + thread_context->event_fd.Init(thread_context); + thread_context->event_fd.SetEpollFd(epoll_fd); + std::thread* thread = new std::thread(&a8::IoLoop::WorkerThreadProc, this, thread_context); + worker_threads_.push_back(thread); + thread_contexts_.push_back(thread_context); + } } void IoLoop::UnInit() { - ::close(epoll_fd); - epoll_fd = a8::INVALID_FD; - delete worker_thread_; - worker_thread_ = nullptr; + worker_thread_shutdown_ = true; + for (int i = 0; i < thread_num_; ++i) { + worker_threads_[i]->join(); + delete worker_threads_[i]; + ::close(thread_contexts_[i]->epoll_fd); + delete thread_contexts_[i]; + } + thread_contexts_.clear(); + worker_threads_.clear(); } - void IoLoop::WorkerThreadProc() + AsyncTcpClient* IoLoop::CreateAsyncTcpClient() + { + AsyncTcpClient* client = new AsyncTcpClient(); + client->SetEpollFd(thread_contexts_[(uintptr_t)client % thread_num_]->epoll_fd); + return client; + } + + void IoLoop::DestoryAsyncTcpClient(AsyncTcpClient* client) + { + AddIMMsg( + thread_contexts_[(uintptr_t)client % thread_num_], + kFreeClient, + a8::XParams() + .SetSender(client) + ); + } + + void IoLoop::WorkerThreadProc(IoLoopThreadContext* context) { epoll_event *events = new epoll_event[max_client_num_]; while (!worker_thread_shutdown_) { - int nfds = ::epoll_wait(epoll_fd, events, max_client_num_, 1000 * 10); + ProcessIMMsg(context); + int nfds = ::epoll_wait(context->epoll_fd, events, max_client_num_, 1000 * 10); for (int i = 0; i < nfds; ++i) { - a8::AsyncTcpClient* session = (a8::AsyncTcpClient*)events[i].data.ptr; + a8::EpollEventHandler* handler = (a8::EpollEventHandler*)events[i].data.ptr; if (events[i].events & EPOLLOUT) { - session->DoSend(); - } else if (events[i].events & EPOLLIN) { - session->DoRecv(); - } else if (events[i].events & EPOLLRDHUP || - events[i].events & EPOLLHUP || - events[i].events & EPOLLERR - ) { - session->DoDisConnect(); + handler->DoSend(); + } + if (events[i].events & EPOLLIN) { + handler->DoRecv(); + } + if (events[i].events & EPOLLRDHUP || + events[i].events & EPOLLHUP || + events[i].events & EPOLLERR + ) { + handler->DoError(); } } } delete [] events; } + void IoLoop::AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params) + { + context->im_msg_mutex.lock(); + context->im_msg_list.push_back(std::make_tuple( + imcmd, + params + )); + context->im_msg_mutex.unlock(); + context->event_fd.Write(++event_id_); + } + + void IoLoop::ProcessIMMsg(IoLoopThreadContext* context) + { + std::list> msg_list; + context->im_msg_mutex.lock(); + if (!context->im_msg_list.empty()) { + context->im_msg_list.swap(msg_list); + } + context->im_msg_mutex.unlock(); + if (!msg_list.empty()) { + for (auto itr = msg_list.begin(); itr != msg_list.end(); ++itr) { + int cmd = std::get<0>(*itr); + a8::XParams& param = std::get<1>(*itr); + switch (cmd) { + case kFreeClient: + { + AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); + delete client; + } + break; + } + } + } + } + } diff --git a/a8/ioloop.h b/a8/ioloop.h index 10e4806..fdb5687 100644 --- a/a8/ioloop.h +++ b/a8/ioloop.h @@ -1,12 +1,14 @@ #ifndef A8_IOLOOP_H #define A8_IOLOOP_H +#include #include #include namespace a8 { - + class AsyncTcpClient; + struct IoLoopThreadContext; class IoLoop : public a8::Singleton { private: @@ -14,17 +16,24 @@ namespace a8 friend class a8::Singleton; public: - void Init(); + void Init(int thread_num); void UnInit(); - volatile int epoll_fd = a8::INVALID_FD; + + AsyncTcpClient* CreateAsyncTcpClient(); + void DestoryAsyncTcpClient(AsyncTcpClient* client); private: - void WorkerThreadProc(); + void WorkerThreadProc(IoLoopThreadContext* context); + void AddIMMsg(IoLoopThreadContext* context, int imcmd, a8::XParams params); + void ProcessIMMsg(IoLoopThreadContext* context); private: - int max_client_num_ = 1000; + std::atomic event_id_; + int thread_num_ = 1; + int max_client_num_ = 10000; volatile bool worker_thread_shutdown_ = false; - std::thread* worker_thread_ = nullptr; + std::vector thread_contexts_; + std::vector worker_threads_; }; } diff --git a/a8/timerfd.cc b/a8/timerfd.cc new file mode 100644 index 0000000..6aab2ce --- /dev/null +++ b/a8/timerfd.cc @@ -0,0 +1,84 @@ +#include + +#include + +#include +#include + +namespace a8 +{ + + TimerFD::TimerFD() + { + fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + if (fd_ == a8::INVALID_FD) { + abort(); + } + } + + TimerFD::~TimerFD() + { + ::close(fd_); + fd_ = a8::INVALID_FD; + } + + void TimerFD::Init(void* context) + { + context_ = context; + } + + void TimerFD::Start(long long ms) + { + itimerspec new_value; + + new_value.it_interval.tv_sec = 0; + new_value.it_interval.tv_nsec = ms * 1000 * 1000; + + new_value.it_value.tv_sec = 0; + new_value.it_value.tv_nsec = ms * 1000 * 1000; + + if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) { + abort(); + } + } + + void TimerFD::Stop() + { + itimerspec new_value; + + new_value.it_interval.tv_sec = 0; + new_value.it_interval.tv_nsec = 0; + + new_value.it_value.tv_sec = 0; + new_value.it_value.tv_nsec = 0; + + if (timerfd_settime(fd_, 0, &new_value, nullptr) != 0) { + abort(); + } + } + + void TimerFD::SetEpollFd(int epoll_fd) + { + epoll_fd_ = epoll_fd; + } + + void TimerFD::DoRecv() + { + unsigned long long value = 0; + ::read(fd_, &value, sizeof(value)); + if (OnTimer) { + OnTimer(context_, value); + } + } + + void TimerFD::DoSend() + { + abort(); + } + + void TimerFD::DoError() + { + abort(); + } + +} diff --git a/a8/timerfd.h b/a8/timerfd.h new file mode 100644 index 0000000..5677b1e --- /dev/null +++ b/a8/timerfd.h @@ -0,0 +1,31 @@ +#ifndef A8_TIMERFD_H +#define A8_TIMERFD_H + +#include + +namespace a8 +{ + class TimerFD : public EpollEventHandler + { + public: + std::function OnTimer; + + TimerFD(); + virtual ~TimerFD(); + + void Init(void* context); + void Start(long long ms); + void Stop(); + virtual void SetEpollFd(int epoll_fd) override; + virtual void DoRecv() override; + virtual void DoSend() override; + virtual void DoError() override; + + private: + int epoll_fd_ = a8::INVALID_FD; + void* context_ = nullptr; + int fd_ = a8::INVALID_FD; + }; +} + +#endif diff --git a/a8/types.h b/a8/types.h index f56afa2..c947814 100644 --- a/a8/types.h +++ b/a8/types.h @@ -3,28 +3,6 @@ namespace a8 { - namespace reflect - { - class Class; - } - - struct ReflectibleObject - { - a8::reflect::Class *metaclass = nullptr; - }; - - struct IReflectible - { - virtual a8::ReflectibleObject* GetReflectibleObject() const = 0; - virtual void set(a8::ReflectibleObject* ) = 0; - }; - - template - struct DataLink - { - T* data = nullptr; - }; - typedef long long tick_t; struct SendQueueNode