diff --git a/a8/asiotcpclient.cc b/a8/asiotcpclient.cc index abfef0a..cd949d1 100644 --- a/a8/asiotcpclient.cc +++ b/a8/asiotcpclient.cc @@ -54,7 +54,9 @@ namespace a8 bool AsioTcpClient::IsActive() { + #if 0 return socket_ != a8::INVALID_SOCKET; + #endif } bool AsioTcpClient::Connected() @@ -103,145 +105,20 @@ namespace a8 bool AsioTcpClient::ActiveStart() { - socket_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (INVALID_SOCKET == socket_) { - if (on_error) { - on_error(this, errno); - } - return false; - } - sockaddr_in sa; - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = inet_addr(remote_address.c_str()); - sa.sin_port = htons(remote_port); - if (::connect(socket_, (sockaddr*)&sa, sizeof(sa)) < 0) { - if (on_error) { - on_error(this, errno); - } - ::close(socket_); - socket_ = INVALID_SOCKET; - return false; - } - //set nodelay - { - int flag = 1; - int ret = ::setsockopt(socket_, - IPPROTO_TCP, - TCP_NODELAY, - (char *)&flag, - sizeof(flag)); - assert(ret >= 0); - if (ret < 0) { - abort(); - } - } - connected_ = true; - if (on_connect) { - on_connect(this); - } return true; } void AsioTcpClient::ActiveStop() { connected_ = false; - if (socket_ != INVALID_SOCKET) { - shutdown(socket_, 2); - ::close(socket_); - } - if (worker_thread_) { - worker_thread_shutdown_ = true; - worker_thread_->join(); - delete worker_thread_; - worker_thread_ = nullptr; - } - socket_ = INVALID_SOCKET; } void AsioTcpClient::WorkerThreadProc() { - if (!ActiveStart()) { - return; - } - - sender_thread_shutdown_ = false; - std::thread* senderthread = new std::thread(&AsioTcpClient::SenderThreadProc, this); - char recvBuf[MAX_RECV_BUFFERSIZE]; - while (!worker_thread_shutdown_) { - int ret = ::recv(socket_, recvBuf, MAX_RECV_BUFFERSIZE, 0); - if (ret < 0) { - connected_ = false; - if (on_disconnect) { - on_disconnect(this); - } - worker_thread_shutdown_ = true; - break; - } else if(ret == 0) { - connected_ = false; - if (on_disconnect) { - on_disconnect(this); - } - worker_thread_shutdown_ = true; - break; - } else { - if (on_socketread) { - on_socketread(this, recvBuf, (unsigned int)ret); - } - } - } - sender_thread_shutdown_ = true; - senderthread->join(); - delete senderthread; - senderthread = nullptr; - socket_ = INVALID_SOCKET; } void AsioTcpClient::SenderThreadProc() { - a8::SendQueueNode* worknode = nullptr; - a8::SendQueueNode* currnode = nullptr; - while (!sender_thread_shutdown_) { - if (!worknode && top_node_) { - send_buffer_mutex_->lock(); - worknode = top_node_; - top_node_ = nullptr; - bot_node_ = nullptr; - send_buffer_mutex_->unlock(); - } - - while (worknode && !sender_thread_shutdown_) { - currnode = worknode; - while (currnode->sent_bytes < currnode->bufflen && !sender_thread_shutdown_) { - int len = ::send(socket_, - currnode->buff + currnode->sent_bytes, - currnode->bufflen - currnode->sent_bytes, - 0); - if (len > 0) { - currnode->sent_bytes += len; - } else { - break; - } - } - //send - if (currnode->sent_bytes >= currnode->bufflen) { - worknode = worknode->next; - free(currnode->buff); - free(currnode); - } - - } - { - std::unique_lock lk(*send_cond_mutex_); - send_cond_->wait_for(lk, std::chrono::seconds(1)); - } - } - while (worknode) { - currnode = worknode; - worknode = worknode->next; - free(currnode->buff); - free(currnode); - } } void AsioTcpClient::NotifySendCond() diff --git a/a8/asiotcpclient.h b/a8/asiotcpclient.h index 71dbfee..061efe8 100644 --- a/a8/asiotcpclient.h +++ b/a8/asiotcpclient.h @@ -2,6 +2,8 @@ #ifdef USE_ASIO +#include + namespace a8 { @@ -25,7 +27,8 @@ namespace a8 void SendBuff(const char* buff, unsigned int bufflen); private: - volatile int socket_ = a8::INVALID_SOCKET; + std::shared_ptr resolver_; + std::shared_ptr socket_; volatile bool connected_ = false; volatile bool sender_thread_shutdown_ = false; volatile bool worker_thread_shutdown_ = false;