a8/a8/tcpsession.cc
2019-07-13 12:58:47 +08:00

368 lines
9.9 KiB
C++

#include <memory.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
#include <thread>
#include <mutex>
#include <a8/a8.h>
#include <a8/tcpsession.h>
#include <a8/tcplistener.h>
#include <a8/perfmonitor.h>
static const int DEFAULT_MAX_PACKET_LEN = 1024 * 10;
static const int DEFAULT_MAX_RECV_BUFFERSIZE = 1024 * 64;
namespace a8
{
TcpSession::TcpSession()
{
INIT_LIST_HEAD(&session_entry);
max_packet_len_ = DEFAULT_MAX_PACKET_LEN;
}
TcpSession::~TcpSession()
{
}
void TcpSession::SetMaxPacketLen(int max_packet_len)
{
max_packet_len_ = std::max(max_packet_len, DEFAULT_MAX_PACKET_LEN);
}
int TcpSession::Socket()
{
return socket_;
}
void TcpSession::OnError(int)
{
}
void TcpSession::OnConnect()
{
}
void TcpSession::OnDisConnect()
{
}
void TcpSession::DecodePacket(char* buf, int& offset, unsigned int buflen)
{
DecodeUserPacket(buf, offset, buflen);
}
bool TcpSession::Alive()
{
return is_activite || time(nullptr) - create_time < 60 * 5;
}
void TcpSession::SendBuff(const char* buff, unsigned int bufflen)
{
if (socket_ == -1) {
return;
}
if (bufflen > 0) {
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(a8::SendQueueNode));
p->buff = (char*)malloc(bufflen);
memmove(p->buff, buff, bufflen);
p->bufflen = bufflen;
send_buffer_mutex_.lock();
if (bot_node_) {
bot_node_->next = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
if (!sending_) {
NotifyEpollSend();
}
send_buffer_mutex_.unlock();
++master->send_node_num;
}
}
void TcpSession::SendText(const std::string& text)
{
SendBuff(text.c_str(), text.size());
}
void TcpSession::SetSocket(int sock)
{
socket_ = sock;
}
bool TcpSession::AllocRecvBuf()
{
if (!recv_buff_) {
recv_buff_ = (char *)malloc(max_packet_len_ + 1);
}
recv_bufflen_ = 0;
return recv_buff_ != nullptr;
}
void TcpSession::Reset()
{
ClearSendBuff();
ClearWorkBuff();
socket_ = -1;
remote_address = "";
remote_port = 0;
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
socket_handle = 0;
recv_bufflen_ = 0;
is_activite = false;
sending_ = false;
}
void TcpSession::Destory()
{
if (recv_buff_) {
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = nullptr;
}
Close();
ClearSendBuff();
ClearWorkBuff();
}
void TcpSession::_ForceClose()
{
if (socket_ != -1) {
int oldsocket = socket_;
::close(socket_);
socket_ = -1;
struct epoll_event ev;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, oldsocket, &ev);
master->FreeClientWithNoLock(this);
}
}
void TcpSession::OnSocketRead(char* buf, unsigned int buflen)
{
unsigned int already_read_bytes = 0;
do {
if (already_read_bytes < buflen) {
int read_bytes = std::min(buflen - already_read_bytes,
(unsigned int)max_packet_len_ - recv_bufflen_);
if (read_bytes > 0) {
memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes);
recv_bufflen_ += read_bytes;
already_read_bytes += read_bytes;
}
}
int offset = 0;
int prev_offset = 0;
do {
prev_offset = offset;
DecodePacket(recv_buff_, offset, recv_bufflen_);
} while (prev_offset < offset && offset < recv_bufflen_);
if (offset > 0 && offset < recv_bufflen_){
memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
}
recv_bufflen_ -= offset;
if (recv_bufflen_ >= max_packet_len_) {
//收到超长包
Close();
return;
}
} while (already_read_bytes < buflen);
}
void TcpSession::DoClientRecv()
{
if (socket_ == -1) {
return;
}
char recvbuf[DEFAULT_MAX_RECV_BUFFERSIZE];
while (true) {
#ifdef A8_PERF
a8::tick_t begin_tick = a8::XGetTickCount();
#endif
int ret = ::recv(socket_, recvbuf, DEFAULT_MAX_RECV_BUFFERSIZE, 0);
#ifdef A8_PERF
a8::tick_t end_tick = a8::XGetTickCount();
if (end_tick - begin_tick > a8::PerfMonitor::Instance()->max_recv_time) {
a8::PerfMonitor::Instance()->max_recv_time = end_tick - begin_tick;
}
#endif
if (ret < 0) {
if (errno != EAGAIN) {
Close();
return;
} else {
#ifdef A8_PERF
++a8::PerfMonitor::Instance()->recv_eagain_times;
#endif
}
break;
} else if (ret == 0) {
Close();
return;
} else {
OnSocketRead(recvbuf, ret);
if (ret < DEFAULT_MAX_RECV_BUFFERSIZE) {
break;
}
}
}
}
void TcpSession::DoClientSend()
{
if(socket_ == -1){
return;
}
++epoll_out_times;
if (!work_node_) {
send_buffer_mutex_.lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_.unlock();
}
if (work_node_) {
AsyncSend();
}
}
void TcpSession::Close()
{
if (socket_ != -1){
master->LockClients();
_ForceClose();
master->UnLockClients();
}
OnDisConnect();
}
void TcpSession::ClearSendBuff()
{
a8::SendQueueNode* p_top_node_ = nullptr;
send_buffer_mutex_.lock();
p_top_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_.unlock();
a8::SendQueueNode *pdelnode = nullptr;
while (p_top_node_) {
pdelnode = p_top_node_;
p_top_node_ = p_top_node_->next;
if (pdelnode->buff) {
free(pdelnode->buff);
}
free(pdelnode);
--master->send_node_num;
}
}
void TcpSession::ClearWorkBuff()
{
while (work_node_) {
a8::SendQueueNode *pdelnode = work_node_;
work_node_ = work_node_->next;
if (pdelnode->buff) {
free(pdelnode->buff);
}
free(pdelnode);
--master->send_node_num;
}
}
void TcpSession::AsyncSend()
{
while (work_node_) {
int sentbytes = ::send(socket_,
work_node_->buff + work_node_->sent_bytes,
work_node_->bufflen - work_node_->sent_bytes,
0);
if (sentbytes <= 0) {
auto err_code = errno;
if (err_code == EAGAIN || err_code == EWOULDBLOCK) {
break;
} else {
Close();
break;
}
}
work_node_->sent_bytes += sentbytes;
if (work_node_->sent_bytes >= work_node_->bufflen) {
a8::SendQueueNode *pdelnode = work_node_;
work_node_ = work_node_->next;
--master->send_node_num;
++master->sent_node_num;
master->sent_bytes_num += pdelnode->sent_bytes;
if (!work_node_) {
send_buffer_mutex_.lock();
if (top_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
if (!work_node_) {
sending_ = false;
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
}
send_buffer_mutex_.unlock();
}
if (pdelnode->buff) {
free(pdelnode->buff);
}
free(pdelnode);
}
}
}
void TcpSession::NotifyEpollSend()
{
sending_ = true;
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
}
int TcpSession::DirectSend(a8::SendQueueNode* p)
{
++direct_send_times;
while (true) {
int sentbytes = ::send(socket_,
p->buff + p->sent_bytes,
p->bufflen - p->sent_bytes,
0);
if (sentbytes <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return -1;
} else {
Close();
return -2;
}
} else {
p->sent_bytes += sentbytes;
if (p->sent_bytes >= p->bufflen) {
return 0;
}
}
}
}
}