This commit is contained in:
aozhiwei 2020-01-14 21:16:04 +08:00
parent 1ba049779b
commit fe32c02b9e
5 changed files with 87 additions and 13 deletions

View File

@ -21,15 +21,17 @@ namespace a8
AsyncTcpClient::AsyncTcpClient() AsyncTcpClient::AsyncTcpClient()
{ {
ref_count_ = 1; ref_count = 1;
send_buffer_mutex_ = new std::mutex(); send_buffer_mutex_ = new std::mutex();
} }
AsyncTcpClient::~AsyncTcpClient() AsyncTcpClient::~AsyncTcpClient()
{ {
if (!list_empty(&connect_timer_attacher.timer_list_)) {
abort();
}
Close(); Close();
delete send_buffer_mutex_; A8_SAFE_DELETE(send_buffer_mutex_);
send_buffer_mutex_ = nullptr;
} }
void AsyncTcpClient::Open() void AsyncTcpClient::Open()
@ -128,12 +130,18 @@ namespace a8
} }
if (!connected_) { if (!connected_) {
int error = 0; int error = 0;
socklen_t len = 0; socklen_t len = sizeof(error);
::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len); ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len);
if( !error ) { if( !error ) {
DoConnect(); DoConnect();
} else { } else {
connected_ = false; connected_ = false;
if (socket_ != a8::INVALID_SOCKET) {
::shutdown(socket_, 2);
::close(socket_);
}
socket_ = INVALID_SOCKET;
ClearSendBuff();
if (on_error) { if (on_error) {
on_error(this, error); on_error(this, error);
} }
@ -247,11 +255,23 @@ namespace a8
{ {
sending_ = false; sending_ = false;
connected_ = false; connected_ = false;
if (socket_ != INVALID_SOCKET) { if (socket_ != a8::INVALID_SOCKET) {
shutdown(socket_, 2); ::shutdown(socket_, 2);
::close(socket_); ::close(socket_);
} }
socket_ = INVALID_SOCKET; socket_ = a8::INVALID_SOCKET;
ClearSendBuff();
}
void AsyncTcpClient::ClearSendBuff()
{
send_buffer_mutex_->lock();
a8::ClearSendQueue(top_node_);
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_->unlock();
a8::ClearSendQueue(work_node_);
work_node_ = nullptr;
} }
void AsyncTcpClient::NotifyEpollSend() void AsyncTcpClient::NotifyEpollSend()

View File

@ -2,6 +2,7 @@
#define A8_ASYNC_TCPCLIENT_H #define A8_ASYNC_TCPCLIENT_H
#include <atomic> #include <atomic>
#include <a8/timer_attacher.h>
#include <a8/epolleventhandler.h> #include <a8/epolleventhandler.h>
namespace a8 namespace a8
@ -17,6 +18,8 @@ namespace a8
std::string remote_address; std::string remote_address;
int remote_port = 0; int remote_port = 0;
int timeout_ms = 1000 * 10; int timeout_ms = 1000 * 10;
a8::XTimerAttacher connect_timer_attacher;
std::atomic<long long> ref_count;
AsyncTcpClient(); AsyncTcpClient();
virtual ~AsyncTcpClient(); virtual ~AsyncTcpClient();
@ -38,17 +41,17 @@ namespace a8
void DoAsyncConnect(); void DoAsyncConnect();
void DoAsyncClose(); void DoAsyncClose();
void ClearSendBuff();
private: private:
std::atomic<long long> ref_count_;
volatile int epoll_fd_ = a8::INVALID_FD; volatile int epoll_fd_ = a8::INVALID_FD;
volatile int socket_ = a8::INVALID_SOCKET; volatile int socket_ = a8::INVALID_SOCKET;
volatile bool sending_ = false; volatile bool sending_ = false;
volatile bool connected_ = false; volatile bool connected_ = false;
std::mutex* send_buffer_mutex_ = nullptr; std::mutex* send_buffer_mutex_ = nullptr;
SendQueueNode *top_node_ = nullptr; a8::SendQueueNode *top_node_ = nullptr;
SendQueueNode *bot_node_ = nullptr; a8::SendQueueNode *bot_node_ = nullptr;
SendQueueNode *work_node_ = nullptr; a8::SendQueueNode *work_node_ = nullptr;
void SetActive(bool active); void SetActive(bool active);
void NotifyEpollSend(); void NotifyEpollSend();

View File

@ -17,6 +17,7 @@
#include <a8/eventfd.h> #include <a8/eventfd.h>
#include <a8/timerfd.h> #include <a8/timerfd.h>
#include <a8/udplog.h> #include <a8/udplog.h>
#include <a8/xtimer.h>
enum IoLoopIMMsg_e enum IoLoopIMMsg_e
{ {
@ -32,11 +33,20 @@ namespace a8
struct IoLoopThreadContext struct IoLoopThreadContext
{ {
int epoll_fd = a8::INVALID_FD; int epoll_fd = a8::INVALID_FD;
long long tick = 0;
a8::EventFD event_fd; a8::EventFD event_fd;
a8::TimerFD timer_fd; a8::TimerFD timer_fd;
a8::XTimer xtimer;
std::mutex im_msg_mutex; std::mutex im_msg_mutex;
std::list<std::tuple<int, a8::XParams>> im_msg_list; std::list<std::tuple<int, a8::XParams>> im_msg_list;
std::map<AsyncTcpClient*, long long> connect_pending_hash; std::map<AsyncTcpClient*, long long> connect_pending_hash;
static long long XGetTickCountFunc(void* context)
{
IoLoopThreadContext* thread_context = (IoLoopThreadContext*)context;
return thread_context->tick;
}
}; };
void IoLoop::Init(int thread_num) void IoLoop::Init(int thread_num)
@ -54,6 +64,10 @@ namespace a8
} }
IoLoopThreadContext* thread_context = new IoLoopThreadContext(); IoLoopThreadContext* thread_context = new IoLoopThreadContext();
thread_context->epoll_fd = epoll_fd; thread_context->epoll_fd = epoll_fd;
thread_context->xtimer.Init(&IoLoopThreadContext::XGetTickCountFunc,
thread_context,
60,
1000);
thread_context->event_fd.Init(thread_context); thread_context->event_fd.Init(thread_context);
thread_context->event_fd.SetEpollFd(epoll_fd); thread_context->event_fd.SetEpollFd(epoll_fd);
thread_context->event_fd.OnEvent = std::bind(&IoLoop::OnEvent, thread_context->event_fd.OnEvent = std::bind(&IoLoop::OnEvent,
@ -92,7 +106,9 @@ namespace a8
AsyncTcpClient* IoLoop::CreateAsyncTcpClient() AsyncTcpClient* IoLoop::CreateAsyncTcpClient()
{ {
AsyncTcpClient* client = new AsyncTcpClient(); AsyncTcpClient* client = new AsyncTcpClient();
client->SetEpollFd(thread_contexts_[(uintptr_t)client % thread_num_]->epoll_fd); IoLoopThreadContext* thread_context = thread_contexts_[(uintptr_t)client % thread_num_];
client->connect_timer_attacher.xtimer = &thread_context->xtimer;
client->SetEpollFd(thread_context->epoll_fd);
return client; return client;
} }
@ -191,7 +207,8 @@ namespace a8
void IoLoop::OnTimer(void* context, unsigned long long data) void IoLoop::OnTimer(void* context, unsigned long long data)
{ {
AddIMMsg((IoLoopThreadContext*)context, 100, a8::XParams()); IoLoopThreadContext* thread_context = (IoLoopThreadContext*)context;
++thread_context->tick;
#ifdef DEBUG #ifdef DEBUG
a8::UdpLog::Instance()->Info("OnTimer %d", {data}); a8::UdpLog::Instance()->Info("OnTimer %d", {data});
#endif #endif
@ -221,6 +238,9 @@ namespace a8
void IoLoop::_IMFreeClient(IoLoopThreadContext* context, a8::XParams& param) void IoLoop::_IMFreeClient(IoLoopThreadContext* context, a8::XParams& param)
{ {
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
if (client->ref_count != 1) {
abort();
}
delete client; delete client;
} }
@ -233,11 +253,27 @@ namespace a8
{ {
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
client->DoAsyncConnect(); client->DoAsyncConnect();
client->connect_timer_attacher.ClearTimerList();
context->xtimer.AddDeadLineTimerAndAttach(
param.param1,
a8::XParams()
.SetSender(client),
[] (const a8::XParams& param)
{
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
client->DoAsyncClose();
if (client->on_error) {
client->on_error(client, 111);
}
},
&client->connect_timer_attacher.timer_list_
);
} }
void IoLoop::_IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param) void IoLoop::_IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param)
{ {
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData(); AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
client->connect_timer_attacher.ClearTimerList();
client->DoAsyncClose(); client->DoAsyncClose();
} }

View File

@ -251,4 +251,17 @@ namespace a8
return angle; return angle;
} }
void ClearSendQueue(a8::SendQueueNode* node)
{
a8::SendQueueNode* tmp_node = node;
while (tmp_node) {
a8::SendQueueNode* pdelnode = tmp_node;
tmp_node = pdelnode->next;
if (pdelnode->buff) {
free(pdelnode->buff);
}
free(pdelnode);
}
}
} }

View File

@ -88,6 +88,8 @@ namespace a8
int High32(long long int64_val); int High32(long long int64_val);
float RandAngle(); float RandAngle();
void ClearSendQueue(a8::SendQueueNode* node);
} }
#endif #endif