From b9d6982944b248e02a0e1e5ea8de4a69f96c1457 Mon Sep 17 00:00:00 2001 From: azw Date: Fri, 5 May 2023 01:12:44 +0000 Subject: [PATCH] 1 --- a8/asiotcpclient.cc | 251 ++++++++++++++++++++++++++++++++++++++++++++ a8/asiotcpclient.h | 45 ++++++++ 2 files changed, 296 insertions(+) create mode 100644 a8/asiotcpclient.cc create mode 100644 a8/asiotcpclient.h diff --git a/a8/asiotcpclient.cc b/a8/asiotcpclient.cc new file mode 100644 index 0000000..6462601 --- /dev/null +++ b/a8/asiotcpclient.cc @@ -0,0 +1,251 @@ +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +const int MAX_RECV_BUFFERSIZE = 1024 * 64; + +namespace a8 +{ + AsioTcpClient::AsioTcpClient() + { + send_buffer_mutex_ = new std::mutex(); + send_cond_mutex_ = new std::mutex(); + send_cond_ = new std::condition_variable(); + } + + AsioTcpClient::~AsioTcpClient() + { + Close(); + delete send_buffer_mutex_; + send_buffer_mutex_ = nullptr; + delete send_cond_mutex_; + send_cond_mutex_ = nullptr; + delete send_cond_; + send_cond_ = nullptr; + } + + void AsioTcpClient::Open() + { + if (!IsActive()) { + SetActive(true); + } + } + + void AsioTcpClient::Close() + { + if (IsActive()) { + SetActive(false); + } + } + + bool AsioTcpClient::IsActive() + { + return socket_ != a8::INVALID_SOCKET; + } + + bool AsioTcpClient::Connected() + { + return connected_; + } + + void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen) + { + if (bufflen > 0) { + a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); + memset(p, 0, sizeof(SendQueueNode)); + p->buff = (char*)malloc(bufflen); + memmove(p->buff, buff, bufflen); + p->bufflen = bufflen; + send_buffer_mutex_->lock(); + if (bot_node_) { + bot_node_->next = p; + bot_node_ = p; + }else{ + top_node_ = p; + bot_node_ = p; + } + send_buffer_mutex_->unlock(); + NotifySendCond(); + } + } + + void AsioTcpClient::SetActive(bool active) + { + if (active) { + if (IsActive()) { + return; + } + if (worker_thread_) { + ActiveStop(); + } + worker_thread_shutdown_ = false; + worker_thread_ = new std::thread(&AsioTcpClient::WorkerThreadProc, this); + } else { + if (worker_thread_) { + ActiveStop(); + } + } + } + + bool AsioTcpClient::ActiveStart() + { + socket_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (INVALID_SOCKET == socket_) { + if (on_error) { + on_error(this, errno); + } + return false; + } + sockaddr_in sa; + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = inet_addr(remote_address.c_str()); + sa.sin_port = htons(remote_port); + if (::connect(socket_, (sockaddr*)&sa, sizeof(sa)) < 0) { + if (on_error) { + on_error(this, errno); + } + ::close(socket_); + socket_ = INVALID_SOCKET; + return false; + } + //set nodelay + { + int flag = 1; + int ret = ::setsockopt(socket_, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + assert(ret >= 0); + if (ret < 0) { + abort(); + } + } + connected_ = true; + if (on_connect) { + on_connect(this); + } + return true; + } + + void AsioTcpClient::ActiveStop() + { + connected_ = false; + if (socket_ != INVALID_SOCKET) { + shutdown(socket_, 2); + ::close(socket_); + } + if (worker_thread_) { + worker_thread_shutdown_ = true; + worker_thread_->join(); + delete worker_thread_; + worker_thread_ = nullptr; + } + socket_ = INVALID_SOCKET; + } + + void AsioTcpClient::WorkerThreadProc() + { + if (!ActiveStart()) { + return; + } + + sender_thread_shutdown_ = false; + std::thread* senderthread = new std::thread(&AsioTcpClient::SenderThreadProc, this); + char recvBuf[MAX_RECV_BUFFERSIZE]; + while (!worker_thread_shutdown_) { + int ret = ::recv(socket_, recvBuf, MAX_RECV_BUFFERSIZE, 0); + if (ret < 0) { + connected_ = false; + if (on_disconnect) { + on_disconnect(this); + } + worker_thread_shutdown_ = true; + break; + } else if(ret == 0) { + connected_ = false; + if (on_disconnect) { + on_disconnect(this); + } + worker_thread_shutdown_ = true; + break; + } else { + if (on_socketread) { + on_socketread(this, recvBuf, (unsigned int)ret); + } + } + } + sender_thread_shutdown_ = true; + senderthread->join(); + delete senderthread; + senderthread = nullptr; + socket_ = INVALID_SOCKET; + } + + void AsioTcpClient::SenderThreadProc() + { + a8::SendQueueNode* worknode = nullptr; + a8::SendQueueNode* currnode = nullptr; + while (!sender_thread_shutdown_) { + if (!worknode && top_node_) { + send_buffer_mutex_->lock(); + worknode = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + send_buffer_mutex_->unlock(); + } + + while (worknode && !sender_thread_shutdown_) { + currnode = worknode; + while (currnode->sent_bytes < currnode->bufflen && !sender_thread_shutdown_) { + int len = ::send(socket_, + currnode->buff + currnode->sent_bytes, + currnode->bufflen - currnode->sent_bytes, + 0); + if (len > 0) { + currnode->sent_bytes += len; + } else { + break; + } + } + //send + if (currnode->sent_bytes >= currnode->bufflen) { + worknode = worknode->next; + free(currnode->buff); + free(currnode); + } + + } + { + std::unique_lock lk(*send_cond_mutex_); + send_cond_->wait_for(lk, std::chrono::seconds(1)); + } + } + while (worknode) { + currnode = worknode; + worknode = worknode->next; + free(currnode->buff); + free(currnode); + } + } + + void AsioTcpClient::NotifySendCond() + { + std::unique_lock lk(*send_cond_mutex_); + send_cond_->notify_all(); + } + +} diff --git a/a8/asiotcpclient.h b/a8/asiotcpclient.h new file mode 100644 index 0000000..e676552 --- /dev/null +++ b/a8/asiotcpclient.h @@ -0,0 +1,45 @@ +#pragma once + +namespace a8 +{ + + class AsioTcpClient + { + public: + std::function on_error; + std::function on_connect; + std::function on_disconnect; + std::function on_socketread; + std::string remote_address; + int remote_port = 0; + + AsioTcpClient(); + virtual ~AsioTcpClient(); + + void Open(); + void Close(); + bool IsActive(); + bool Connected(); + void SendBuff(const char* buff, unsigned int bufflen); + + private: + volatile int socket_ = a8::INVALID_SOCKET; + volatile bool connected_ = false; + volatile bool sender_thread_shutdown_ = false; + volatile bool worker_thread_shutdown_ = false; + std::thread* worker_thread_ = nullptr; + std::mutex* send_buffer_mutex_ = nullptr; + SendQueueNode *top_node_ = nullptr; + SendQueueNode *bot_node_ = nullptr; + std::mutex *send_cond_mutex_ = nullptr; + std::condition_variable *send_cond_ = nullptr; + + void SetActive(bool active); + bool ActiveStart(); + void ActiveStop(); + void WorkerThreadProc(); + void SenderThreadProc(); + void NotifySendCond(); + }; + +}