This commit is contained in:
aozhiwei 2020-01-14 13:27:38 +08:00
parent 67358105a4
commit 1ba049779b
4 changed files with 136 additions and 30 deletions

View File

@ -14,13 +14,14 @@
#include <a8/asynctcpclient.h>
#include <a8/ioloop.h>
static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10;
static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64;
namespace a8
{
AsyncTcpClient::AsyncTcpClient()
{
ref_count_ = 1;
send_buffer_mutex_ = new std::mutex();
}
@ -122,12 +123,23 @@ namespace a8
void AsyncTcpClient::DoSend()
{
if (!connected_) {
DoConnect();
}
if(socket_ == -1){
return;
}
if (!connected_) {
int error = 0;
socklen_t len = 0;
::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len);
if( !error ) {
DoConnect();
} else {
connected_ = false;
if (on_error) {
on_error(this, error);
}
return;
}
}
if (!work_node_) {
send_buffer_mutex_->lock();
work_node_ = top_node_;
@ -151,20 +163,20 @@ namespace a8
void AsyncTcpClient::SetActive(bool active)
{
if (active) {
ActiveStart();
a8::IoLoop::Instance()->PostAsyncConnect(this, timeout_ms);
} else {
ActiveStop();
a8::IoLoop::Instance()->PostAsyncClose(this);
}
}
bool AsyncTcpClient::ActiveStart()
void AsyncTcpClient::DoAsyncConnect()
{
socket_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (INVALID_SOCKET == socket_) {
if (on_error) {
on_error(this, errno);
}
return false;
return;
}
//set nodelay
{
@ -185,38 +197,53 @@ namespace a8
flags = ::fcntl(socket_, F_GETFL, 0);
::fcntl(socket_, F_SETFL, flags|O_NONBLOCK);
}
//add epoll
{
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR;
ev.data.ptr = this;
int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev);
assert(n == 0);
if (n != 0) {
abort();
}
}
sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = inet_addr(remote_address.c_str());
sa.sin_port = htons(remote_port);
int ret = ::connect(socket_, (sockaddr*)&sa, sizeof(sa));
if (ret < 0) {
if (errno != EINPROGRESS) {
if (ret == 0) {
//add epoll
{
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
ev.data.ptr = this;
int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev);
assert(n == 0);
if (n != 0) {
abort();
}
}
DoConnect();
} else if (ret < 0) {
if (errno == EINPROGRESS) {
//add epoll
{
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR;
ev.data.ptr = this;
int n = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, socket_, &ev);
assert(n == 0);
if (n != 0) {
abort();
}
}
} else {
connected_ = false;
if (on_error) {
on_error(this, errno);
}
::close(socket_);
socket_ = INVALID_SOCKET;
return false;
return;
}
}
return true;
}
void AsyncTcpClient::ActiveStop()
void AsyncTcpClient::DoAsyncClose()
{
sending_ = false;
connected_ = false;

View File

@ -1,10 +1,12 @@
#ifndef A8_ASYNC_TCPCLIENT_H
#define A8_ASYNC_TCPCLIENT_H
#include <atomic>
#include <a8/epolleventhandler.h>
namespace a8
{
class IoLoop;
class AsyncTcpClient : public EpollEventHandler
{
public:
@ -14,6 +16,7 @@ namespace a8
std::function<void (a8::AsyncTcpClient*, char*, unsigned int)> on_socketread;
std::string remote_address;
int remote_port = 0;
int timeout_ms = 1000 * 10;
AsyncTcpClient();
virtual ~AsyncTcpClient();
@ -32,6 +35,12 @@ namespace a8
virtual void DoError() override;
private:
void DoAsyncConnect();
void DoAsyncClose();
private:
std::atomic<long long> ref_count_;
volatile int epoll_fd_ = a8::INVALID_FD;
volatile int socket_ = a8::INVALID_SOCKET;
volatile bool sending_ = false;
@ -42,10 +51,10 @@ namespace a8
SendQueueNode *work_node_ = nullptr;
void SetActive(bool active);
bool ActiveStart();
void ActiveStop();
void NotifyEpollSend();
void AsyncSend();
friend class IoLoop;
};
}

View File

@ -21,7 +21,9 @@
enum IoLoopIMMsg_e
{
kFreeClient = 1,
kShutdown = 2
kShutdown = 2,
kAsyncConnect = 3,
kAsyncClose = 4,
};
namespace a8
@ -34,6 +36,7 @@ namespace a8
a8::TimerFD timer_fd;
std::mutex im_msg_mutex;
std::list<std::tuple<int, a8::XParams>> im_msg_list;
std::map<AsyncTcpClient*, long long> connect_pending_hash;
};
void IoLoop::Init(int thread_num)
@ -156,8 +159,22 @@ namespace a8
switch (cmd) {
case kFreeClient:
{
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
delete client;
_IMFreeClient(context, param);
}
break;
case kShutdown:
{
_IMShutdown(context, param);
}
break;
case kAsyncConnect:
{
_IMAsyncConnect(context, param);
}
break;
case kAsyncClose:
{
_IMAsyncClose(context, param);
}
break;
}
@ -180,4 +197,48 @@ namespace a8
#endif
}
void IoLoop::PostAsyncConnect(AsyncTcpClient* client, int timeout_ms)
{
AddIMMsg(
thread_contexts_[(uintptr_t)client % thread_num_],
kAsyncConnect,
a8::XParams()
.SetSender(client)
.SetParam1(timeout_ms)
);
}
void IoLoop::PostAsyncClose(AsyncTcpClient* client)
{
AddIMMsg(
thread_contexts_[(uintptr_t)client % thread_num_],
kAsyncClose,
a8::XParams()
.SetSender(client)
);
}
void IoLoop::_IMFreeClient(IoLoopThreadContext* context, a8::XParams& param)
{
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
delete client;
}
void IoLoop::_IMShutdown(IoLoopThreadContext* context, a8::XParams& param)
{
}
void IoLoop::_IMAsyncConnect(IoLoopThreadContext* context, a8::XParams& param)
{
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
client->DoAsyncConnect();
}
void IoLoop::_IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param)
{
AsyncTcpClient* client = (AsyncTcpClient*)param.sender.GetUserData();
client->DoAsyncClose();
}
}

View File

@ -28,6 +28,13 @@ namespace a8
void ProcessIMMsg(IoLoopThreadContext* context);
void OnEvent(void* context, unsigned long long data);
void OnTimer(void* context, unsigned long long data);
void PostAsyncConnect(AsyncTcpClient* client, int timeout_ms);
void PostAsyncClose(AsyncTcpClient* client);
void _IMFreeClient(IoLoopThreadContext* context, a8::XParams& param);
void _IMShutdown(IoLoopThreadContext* context, a8::XParams& param);
void _IMAsyncConnect(IoLoopThreadContext* context, a8::XParams& param);
void _IMAsyncClose(IoLoopThreadContext* context, a8::XParams& param);
private:
std::atomic<unsigned long long> event_id_;
@ -36,6 +43,8 @@ namespace a8
volatile bool worker_thread_shutdown_ = false;
std::vector<IoLoopThreadContext*> thread_contexts_;
std::vector<std::thread*> worker_threads_;
friend class AsyncTcpClient;
};
}