This commit is contained in:
aozhiwei 2019-06-03 14:55:59 +08:00
parent 8cee06b528
commit 9c2a3520f5
4 changed files with 119 additions and 18 deletions

View File

@ -11,23 +11,47 @@
#include <thread>
#include <a8/ioloop.h>
#include <a8/tcpclient2.h>
namespace a8
{
void IoLoop::Init()
{
epoll_fd = ::epoll_create(10000);
max_client_num_ = 10000;
epoll_fd = ::epoll_create(max_client_num_);
assert(epoll_fd != a8::INVALID_FD);
worker_thread_ = new std::thread(&a8::IoLoop::WorkerThreadProc, this);
}
void IoLoop::UnInit()
{
::close(epoll_fd);
epoll_fd = a8::INVALID_FD;
delete worker_thread_;
worker_thread_ = nullptr;
}
void IoLoop::Update()
void IoLoop::WorkerThreadProc()
{
epoll_event *events = new epoll_event[max_client_num_];
while (!worker_thread_shutdown_) {
int nfds = ::epoll_wait(epoll_fd, events, max_client_num_, 1000 * 10);
for (int i = 0; i < nfds; ++i) {
a8::TcpClient2* session = (a8::TcpClient2*)events[i].data.ptr;
if (events[i].events & EPOLLOUT) {
session->DoSend();
} else if (events[i].events & EPOLLIN) {
session->DoRecv();
} else if (events[i].events & EPOLLRDHUP ||
events[i].events & EPOLLHUP ||
events[i].events & EPOLLERR
) {
session->DoDisConnect();
}
}
}
delete [] events;
}
}

View File

@ -3,6 +3,7 @@
namespace a8
{
class IoLoop : public a8::Singleton<IoLoop>
{
private:
@ -12,11 +13,17 @@ namespace a8
public:
void Init();
void UnInit();
void Update();
private:
void WorkerThreadProc();
private:
int max_client_num_ = 1000;
volatile int epoll_fd = a8::INVALID_FD;
volatile bool worker_thread_shutdown_ = false;
std::thread* worker_thread_ = nullptr;
};
}
#endif

View File

@ -13,15 +13,14 @@
#include <a8/a8.h>
#include <a8/tcpclient2.h>
const int MAX_RECV_BUFFERSIZE = 1024 * 10;
static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10;
static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64;
namespace a8
{
TcpClient2::TcpClient2()
{
send_buffer_mutex_ = new std::mutex();
send_cond_mutex_ = new std::mutex();
send_cond_ = new std::condition_variable();
}
TcpClient2::~TcpClient2()
@ -29,10 +28,6 @@ namespace a8
Close();
delete send_buffer_mutex_;
send_buffer_mutex_ = nullptr;
delete send_cond_mutex_;
send_cond_mutex_ = nullptr;
delete send_cond_;
send_cond_ = nullptr;
}
void TcpClient2::Open()
@ -75,8 +70,72 @@ namespace a8
top_node_ = p;
bot_node_ = p;
}
if (!sending_) {
NotifyEpollSend();
}
send_buffer_mutex_->unlock();
NotifySendCond();
}
}
void TcpClient2::DoConnect()
{
connected_ = true;
if (on_connect) {
on_connect(this);
}
}
void TcpClient2::DoRecv()
{
if (socket_ == -1) {
return;
}
char recvbuf[DEFAULT_MAX_RECV_BUFFERSIZE];
while (true) {
int ret = ::recv(socket_, recvbuf, DEFAULT_MAX_RECV_BUFFERSIZE, 0);
if (ret < 0) {
if (errno != EAGAIN) {
Close();
return;
} else {
}
break;
} else if (ret == 0) {
Close();
return;
} else {
if (on_socketread) {
on_socketread(this, recvbuf, ret);
}
if (ret < DEFAULT_MAX_RECV_BUFFERSIZE) {
break;
}
}
}
}
void TcpClient2::DoSend()
{
if(socket_ == -1){
return;
}
if (!work_node_) {
send_buffer_mutex_.lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_.unlock();
}
if (work_node_) {
AsyncSend();
}
}
void TcpClient2::DoDisConnect()
{
connected_ = false;
if (on_disconnect) {
on_disconnect(this);
}
}
@ -141,6 +200,7 @@ namespace a8
void TcpClient2::ActiveStop()
{
sending_ = false;
connected_ = false;
if (socket_ != INVALID_SOCKET) {
shutdown(socket_, 2);
@ -149,10 +209,16 @@ namespace a8
socket_ = INVALID_SOCKET;
}
void TcpClient2::NotifySendCond()
void TcpClient2::NotifyEpollSend()
{
std::unique_lock<std::mutex> lk(*send_cond_mutex_);
send_cond_->notify_all();
sending_ = true;
#if 0
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
#endif
}
}

View File

@ -22,19 +22,23 @@ namespace a8
bool Connected();
void SendBuff(const char* buff, unsigned int bufflen);
void DoConnect();
void DoRecv();
void DoSend();
void DoDisConnect();
private:
volatile int socket_ = a8::INVALID_SOCKET;
volatile bool sending_ = false;
volatile bool connected_ = false;
std::mutex* send_buffer_mutex_ = nullptr;
SendQueueNode *top_node_ = nullptr;
SendQueueNode *bot_node_ = nullptr;
std::mutex *send_cond_mutex_ = nullptr;
std::condition_variable *send_cond_ = nullptr;
void SetActive(bool active);
bool ActiveStart();
void ActiveStop();
void NotifySendCond();
void NotifyEpollSend();
};
}