282 lines
7.3 KiB
C++
282 lines
7.3 KiB
C++
#include <string.h>
|
|
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <thread>
|
|
|
|
#include <unistd.h>
|
|
#include <arpa/inet.h>
|
|
#include <fcntl.h>
|
|
#include <sys/epoll.h>
|
|
#include <netinet/tcp.h>
|
|
|
|
#include <a8/a8.h>
|
|
#include <a8/tcpclient2.h>
|
|
#include <a8/ioloop.h>
|
|
|
|
static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10;
|
|
static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64;
|
|
|
|
namespace a8
|
|
{
|
|
TcpClient2::TcpClient2()
|
|
{
|
|
send_buffer_mutex_ = new std::mutex();
|
|
epoll_fd = a8::IoLoop::Instance()->epoll_fd;
|
|
}
|
|
|
|
TcpClient2::~TcpClient2()
|
|
{
|
|
Close();
|
|
delete send_buffer_mutex_;
|
|
send_buffer_mutex_ = nullptr;
|
|
}
|
|
|
|
void TcpClient2::Open()
|
|
{
|
|
if (!IsActive()) {
|
|
SetActive(true);
|
|
}
|
|
}
|
|
|
|
void TcpClient2::Close()
|
|
{
|
|
if (IsActive()) {
|
|
SetActive(false);
|
|
}
|
|
}
|
|
|
|
bool TcpClient2::IsActive()
|
|
{
|
|
return socket_ != a8::INVALID_SOCKET;
|
|
}
|
|
|
|
bool TcpClient2::Connected()
|
|
{
|
|
return connected_;
|
|
}
|
|
|
|
void TcpClient2::SendBuff(const char* buff, unsigned int bufflen)
|
|
{
|
|
if (bufflen > 0) {
|
|
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
|
|
memset(p, 0, sizeof(SendQueueNode));
|
|
p->buff = (char*)malloc(bufflen);
|
|
memmove(p->buff, buff, bufflen);
|
|
p->bufflen = bufflen;
|
|
send_buffer_mutex_->lock();
|
|
if (bot_node_) {
|
|
bot_node_->next = p;
|
|
bot_node_ = p;
|
|
}else{
|
|
top_node_ = p;
|
|
bot_node_ = p;
|
|
}
|
|
if (!sending_) {
|
|
NotifyEpollSend();
|
|
}
|
|
send_buffer_mutex_->unlock();
|
|
}
|
|
}
|
|
|
|
void TcpClient2::DoConnect()
|
|
{
|
|
connected_ = true;
|
|
if (on_connect) {
|
|
on_connect(this);
|
|
}
|
|
}
|
|
|
|
void TcpClient2::DoRecv()
|
|
{
|
|
if (socket_ == -1) {
|
|
return;
|
|
}
|
|
char recvbuf[DEFAULT_MAX_RECV_BUFFERSIZE];
|
|
while (true) {
|
|
int ret = ::recv(socket_, recvbuf, DEFAULT_MAX_RECV_BUFFERSIZE, 0);
|
|
if (ret < 0) {
|
|
if (errno != EAGAIN) {
|
|
Close();
|
|
return;
|
|
} else {
|
|
}
|
|
break;
|
|
} else if (ret == 0) {
|
|
Close();
|
|
return;
|
|
} else {
|
|
if (on_socketread) {
|
|
on_socketread(this, recvbuf, ret);
|
|
}
|
|
if (ret < DEFAULT_MAX_RECV_BUFFERSIZE) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void TcpClient2::DoSend()
|
|
{
|
|
if (!connected_) {
|
|
DoConnect();
|
|
}
|
|
if(socket_ == -1){
|
|
return;
|
|
}
|
|
if (!work_node_) {
|
|
send_buffer_mutex_->lock();
|
|
work_node_ = top_node_;
|
|
top_node_ = nullptr;
|
|
bot_node_ = nullptr;
|
|
send_buffer_mutex_->unlock();
|
|
}
|
|
if (work_node_) {
|
|
AsyncSend();
|
|
}
|
|
}
|
|
|
|
void TcpClient2::DoDisConnect()
|
|
{
|
|
connected_ = false;
|
|
if (on_disconnect) {
|
|
on_disconnect(this);
|
|
}
|
|
}
|
|
|
|
void TcpClient2::SetActive(bool active)
|
|
{
|
|
if (active) {
|
|
ActiveStart();
|
|
} else {
|
|
ActiveStop();
|
|
}
|
|
}
|
|
|
|
bool TcpClient2::ActiveStart()
|
|
{
|
|
socket_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
if (INVALID_SOCKET == socket_) {
|
|
if (on_error) {
|
|
on_error(this, errno);
|
|
}
|
|
return false;
|
|
}
|
|
//set nodelay
|
|
{
|
|
int flag = 1;
|
|
int ret = ::setsockopt(socket_,
|
|
IPPROTO_TCP,
|
|
TCP_NODELAY,
|
|
(char *)&flag,
|
|
sizeof(flag));
|
|
assert(ret >= 0);
|
|
if (ret < 0) {
|
|
abort();
|
|
}
|
|
}
|
|
//set nonblock
|
|
{
|
|
int flags = 0;
|
|
flags = ::fcntl(socket_, F_GETFL, 0);
|
|
::fcntl(socket_, F_SETFL, flags|O_NONBLOCK);
|
|
}
|
|
//add epoll
|
|
{
|
|
struct epoll_event ev;
|
|
ev.data.fd = socket_;
|
|
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR;
|
|
ev.data.ptr = this;
|
|
int n = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_, &ev);
|
|
assert(n == 0);
|
|
if (n != 0) {
|
|
abort();
|
|
}
|
|
}
|
|
sockaddr_in sa;
|
|
memset(&sa, 0, sizeof(sa));
|
|
sa.sin_family = AF_INET;
|
|
sa.sin_addr.s_addr = inet_addr(remote_address.c_str());
|
|
sa.sin_port = htons(remote_port);
|
|
int ret = ::connect(socket_, (sockaddr*)&sa, sizeof(sa));
|
|
if (ret < 0) {
|
|
if (errno != EINPROGRESS) {
|
|
if (on_error) {
|
|
on_error(this, errno);
|
|
}
|
|
::close(socket_);
|
|
socket_ = INVALID_SOCKET;
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void TcpClient2::ActiveStop()
|
|
{
|
|
sending_ = false;
|
|
connected_ = false;
|
|
if (socket_ != INVALID_SOCKET) {
|
|
shutdown(socket_, 2);
|
|
::close(socket_);
|
|
}
|
|
socket_ = INVALID_SOCKET;
|
|
}
|
|
|
|
void TcpClient2::NotifyEpollSend()
|
|
{
|
|
sending_ = true;
|
|
struct epoll_event ev;
|
|
ev.data.fd = socket_;
|
|
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
|
|
ev.data.ptr = this;
|
|
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
|
|
}
|
|
|
|
void TcpClient2::AsyncSend()
|
|
{
|
|
while (work_node_) {
|
|
int sentbytes = ::send(socket_,
|
|
work_node_->buff + work_node_->sent_bytes,
|
|
work_node_->bufflen - work_node_->sent_bytes,
|
|
0);
|
|
if (sentbytes <= 0) {
|
|
auto err_code = errno;
|
|
if (err_code == EAGAIN || err_code == EWOULDBLOCK) {
|
|
break;
|
|
} else {
|
|
Close();
|
|
break;
|
|
}
|
|
}
|
|
work_node_->sent_bytes += sentbytes;
|
|
if (work_node_->sent_bytes >= work_node_->bufflen) {
|
|
a8::SendQueueNode *pdelnode = work_node_;
|
|
work_node_ = work_node_->next;
|
|
if (!work_node_) {
|
|
send_buffer_mutex_->lock();
|
|
if (top_node_) {
|
|
work_node_ = top_node_;
|
|
top_node_ = nullptr;
|
|
bot_node_ = nullptr;
|
|
}
|
|
if (!work_node_) {
|
|
sending_ = false;
|
|
struct epoll_event ev;
|
|
ev.data.fd = socket_;
|
|
ev.events = EPOLLIN | EPOLLRDHUP;
|
|
ev.data.ptr = this;
|
|
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
|
|
}
|
|
send_buffer_mutex_->unlock();
|
|
}
|
|
if (pdelnode->buff) {
|
|
free(pdelnode->buff);
|
|
}
|
|
free(pdelnode);
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|