This commit is contained in:
azw 2023-05-05 23:39:14 +00:00
parent c5e180a96a
commit ace8590f25
2 changed files with 6 additions and 126 deletions

View File

@ -54,7 +54,9 @@ namespace a8
bool AsioTcpClient::IsActive()
{
#if 0
return socket_ != a8::INVALID_SOCKET;
#endif
}
bool AsioTcpClient::Connected()
@ -103,145 +105,20 @@ namespace a8
bool AsioTcpClient::ActiveStart()
{
socket_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (INVALID_SOCKET == socket_) {
if (on_error) {
on_error(this, errno);
}
return false;
}
sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = inet_addr(remote_address.c_str());
sa.sin_port = htons(remote_port);
if (::connect(socket_, (sockaddr*)&sa, sizeof(sa)) < 0) {
if (on_error) {
on_error(this, errno);
}
::close(socket_);
socket_ = INVALID_SOCKET;
return false;
}
//set nodelay
{
int flag = 1;
int ret = ::setsockopt(socket_,
IPPROTO_TCP,
TCP_NODELAY,
(char *)&flag,
sizeof(flag));
assert(ret >= 0);
if (ret < 0) {
abort();
}
}
connected_ = true;
if (on_connect) {
on_connect(this);
}
return true;
}
void AsioTcpClient::ActiveStop()
{
connected_ = false;
if (socket_ != INVALID_SOCKET) {
shutdown(socket_, 2);
::close(socket_);
}
if (worker_thread_) {
worker_thread_shutdown_ = true;
worker_thread_->join();
delete worker_thread_;
worker_thread_ = nullptr;
}
socket_ = INVALID_SOCKET;
}
void AsioTcpClient::WorkerThreadProc()
{
if (!ActiveStart()) {
return;
}
sender_thread_shutdown_ = false;
std::thread* senderthread = new std::thread(&AsioTcpClient::SenderThreadProc, this);
char recvBuf[MAX_RECV_BUFFERSIZE];
while (!worker_thread_shutdown_) {
int ret = ::recv(socket_, recvBuf, MAX_RECV_BUFFERSIZE, 0);
if (ret < 0) {
connected_ = false;
if (on_disconnect) {
on_disconnect(this);
}
worker_thread_shutdown_ = true;
break;
} else if(ret == 0) {
connected_ = false;
if (on_disconnect) {
on_disconnect(this);
}
worker_thread_shutdown_ = true;
break;
} else {
if (on_socketread) {
on_socketread(this, recvBuf, (unsigned int)ret);
}
}
}
sender_thread_shutdown_ = true;
senderthread->join();
delete senderthread;
senderthread = nullptr;
socket_ = INVALID_SOCKET;
}
void AsioTcpClient::SenderThreadProc()
{
a8::SendQueueNode* worknode = nullptr;
a8::SendQueueNode* currnode = nullptr;
while (!sender_thread_shutdown_) {
if (!worknode && top_node_) {
send_buffer_mutex_->lock();
worknode = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_->unlock();
}
while (worknode && !sender_thread_shutdown_) {
currnode = worknode;
while (currnode->sent_bytes < currnode->bufflen && !sender_thread_shutdown_) {
int len = ::send(socket_,
currnode->buff + currnode->sent_bytes,
currnode->bufflen - currnode->sent_bytes,
0);
if (len > 0) {
currnode->sent_bytes += len;
} else {
break;
}
}
//send
if (currnode->sent_bytes >= currnode->bufflen) {
worknode = worknode->next;
free(currnode->buff);
free(currnode);
}
}
{
std::unique_lock<std::mutex> lk(*send_cond_mutex_);
send_cond_->wait_for(lk, std::chrono::seconds(1));
}
}
while (worknode) {
currnode = worknode;
worknode = worknode->next;
free(currnode->buff);
free(currnode);
}
}
void AsioTcpClient::NotifySendCond()

View File

@ -2,6 +2,8 @@
#ifdef USE_ASIO
#include <asio.hpp>
namespace a8
{
@ -25,7 +27,8 @@ namespace a8
void SendBuff(const char* buff, unsigned int bufflen);
private:
volatile int socket_ = a8::INVALID_SOCKET;
std::shared_ptr<asio::ip::tcp::resolver> resolver_;
std::shared_ptr<asio::ip::tcp::socket> socket_;
volatile bool connected_ = false;
volatile bool sender_thread_shutdown_ = false;
volatile bool worker_thread_shutdown_ = false;