tcpsession2 ok

This commit is contained in:
aozhiwei 2019-05-27 11:35:35 +08:00
parent 1a7cd39c32
commit ce69fddf1c
3 changed files with 60 additions and 106 deletions

View File

@ -1,7 +1,11 @@
#ifndef A8_BASE_HTTPSESSION_H
#define A8_BASE_HTTPSESSION_H
#ifdef A8_TCP_SESSION2
#include <a8/tcpsession2.h>
#else
#include <a8/tcpsession.h>
#endif
namespace a8
{

View File

@ -10,7 +10,11 @@
#include <mutex>
#include <a8/a8.h>
#ifdef A8_TCP_SESSION2
#include <a8/tcpsession2.h>
#else
#include <a8/tcpsession.h>
#endif
#include <a8/tcplistener.h>
#include <a8/perfmonitor.h>
@ -76,41 +80,22 @@ namespace a8
p->buff = (char*)malloc(bufflen);
memmove(p->buff, buff, bufflen);
p->bufflen = bufflen;
#ifdef NEW_NET
bool is_first_package = false;
send_buffer_mutex_.lock();
if (!work_node_ && !top_node_) {
work_node_ = p;
is_first_package = true;
if (bot_node_) {
bot_node_->next = p;
bot_node_ = p;
} else {
if (bot_node_) {
bot_node_->next = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
top_node_ = p;
bot_node_ = p;
}
#if 1
NotifyEpollSend();
#else
if (!sending_) {
NotifyEpollSend();
}
#endif
send_buffer_mutex_.unlock();
if (work_node_ && is_first_package) {
AsyncSend(is_first_package);
}
#else
send_buffer_mutex_.lock();
if(work_node_ == NULL && top_node_ == NULL){
work_node_ = p;
AsyncSend(true);
}else{
if (bot_node_){
bot_node_->next = p;
bot_node_ = p;
}else{
top_node_ = p;
bot_node_ = p;
}
}
send_buffer_mutex_.unlock();
#endif
}
}
@ -139,9 +124,9 @@ namespace a8
socket_ = -1;
remote_address = "";
remote_port = 0;
top_node_ = NULL;
bot_node_ = NULL;
work_node_ = NULL;
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
socket_handle = 0;
recv_bufflen_ = 0;
is_activite = false;
@ -152,7 +137,7 @@ namespace a8
if (recv_buff_) {
recv_bufflen_ = 0;
free(recv_buff_);
recv_buff_ = NULL;
recv_buff_ = nullptr;
}
Close();
ClearSendBuff();
@ -247,7 +232,6 @@ namespace a8
if(socket_ == -1){
return;
}
#ifdef NEW_NET
if (!work_node_) {
send_buffer_mutex_.lock();
work_node_ = top_node_;
@ -256,20 +240,8 @@ namespace a8
send_buffer_mutex_.unlock();
}
if (work_node_) {
AsyncSend(false);
AsyncSend();
}
#else
send_buffer_mutex_.lock();
if (!work_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
if (work_node_) {
AsyncSend(false);
}
send_buffer_mutex_.unlock();
#endif
}
void TcpSession::Close()
@ -289,12 +261,12 @@ namespace a8
send_buffer_mutex_.lock();
p_top_node_ = top_node_;
p_work_node_ = work_node_;
top_node_ = NULL;
bot_node_ = NULL;
work_node_ = NULL;
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
send_buffer_mutex_.unlock();
a8::SendQueueNode *pdelnode = NULL;
a8::SendQueueNode *pdelnode = nullptr;
while (p_top_node_) {
pdelnode = p_top_node_;
p_top_node_ = p_top_node_->next;
@ -313,83 +285,59 @@ namespace a8
}
}
void TcpSession::AsyncSend(bool is_first_package)
void TcpSession::AsyncSend()
{
while (work_node_) {
#ifdef A8_PERF
a8::tick_t begin_tick = a8::XGetTickCount();
#endif
int sentbytes = ::send(socket_,
work_node_->buff + work_node_->sent_bytes,
work_node_->bufflen - work_node_->sent_bytes,
0);
#ifdef A8_PERF
a8::tick_t end_tick = a8::XGetTickCount();
if (end_tick - begin_tick > a8::PerfMonitor::Instance()->max_send_time) {
a8::PerfMonitor::Instance()->max_send_time = end_tick - begin_tick;
}
#endif
if (sentbytes <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
#ifdef A8_PERF
++a8::PerfMonitor::Instance()->send_eagain_times;
#endif
#ifdef NEW_NET
if (is_first_package) {
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
return;
}
#endif
continue;
break;
} else {
Close();
break;
}
}
work_node_->sent_bytes += sentbytes;
if (work_node_->sent_bytes >= work_node_->bufflen) {
a8::SendQueueNode *pdelnode = work_node_;
#ifdef NEW_NET
#else
a8::SendQueueNode *nextnode = work_node_->next;
#endif
#ifdef NEW_NET
send_buffer_mutex_.lock();
work_node_ = work_node_->next;
if (!work_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_.lock();
if (!top_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
if (!work_node_) {
sending_ = false;
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
}
send_buffer_mutex_.unlock();
}
#ifdef NEW_NET
if (!work_node_ && !is_first_package) {
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
}
#endif
send_buffer_mutex_.unlock();
#else
work_node_ = work_node_->next; //!!!!要处理重入问题
#endif
if (pdelnode->buff) {
free(pdelnode->buff);
}
free(pdelnode);
#ifdef NEW_NET
#else
if (!nextnode) {
break;
}
#endif
}
}
}
void TcpSession::NotifyEpollSend()
{
sending_ = true;
struct epoll_event ev;
ev.data.fd = socket_;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
ev.data.ptr = this;
::epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socket_, &ev);
}
}
#endif

View File

@ -52,7 +52,8 @@ namespace a8
private:
void ClearSendBuff();
void AsyncSend(bool is_first_package);
void AsyncSend();
void NotifyEpollSend();
protected:
char *recv_buff_ = nullptr;
@ -61,6 +62,7 @@ namespace a8
private:
int socket_ = 0;
volatile bool sending_ = false;
a8::SendQueueNode* top_node_ = nullptr;
a8::SendQueueNode* bot_node_ = nullptr;
a8::SendQueueNode* work_node_ = nullptr;