diff --git a/third_party/f8/f8/CMakeLists.txt b/third_party/f8/f8/CMakeLists.txt index fb18b62..190bdb4 100644 --- a/third_party/f8/f8/CMakeLists.txt +++ b/third_party/f8/f8/CMakeLists.txt @@ -28,4 +28,8 @@ aux_source_directory(. F8_SRC_LIST ) +aux_source_directory(./internal + F8_SRC_LIST +) + add_library(f8 STATIC ${F8_SRC_LIST}) diff --git a/third_party/f8/f8/internal/asiotcpclient.cc b/third_party/f8/f8/internal/asiotcpclient.cc new file mode 100644 index 0000000..bbd2152 --- /dev/null +++ b/third_party/f8/f8/internal/asiotcpclient.cc @@ -0,0 +1,217 @@ +#include + +#include +#include + +#include +#include + +const int MAX_RECV_BUFFERSIZE = 1024 * 64; + +namespace f8 +{ + + namespace internal + { + + AsioTcpClient::AsioTcpClient(std::shared_ptr io_context, + const std::string& remote_ip, + int remote_port) + { + Init(io_context, remote_ip, remote_port); + } + + AsioTcpClient::AsioTcpClient(const std::string& remote_ip, int remote_port) + { + Init(f8::IoMgr::Instance()->GetIoContext(0), remote_ip, remote_port); + } + + void AsioTcpClient::Init(std::shared_ptr io_context, + const std::string& remote_ip, + int remote_port) + { + io_context_ = io_context; + remote_address_ = remote_ip; + remote_port_ = remote_port; + endpoint_ = std::make_shared + ( + asio::ip::address::from_string(remote_address_), + remote_port_ + ); + send_buffer_mutex_ = std::make_shared(); + socket_ = std::make_shared(*io_context); + } + + AsioTcpClient::~AsioTcpClient() + { + Close(); + } + + void AsioTcpClient::Open() + { + if (!IsActive()) { + SetActive(true); + } + } + + void AsioTcpClient::Close() + { + if (IsActive()) { + SetActive(false); + } + } + + bool AsioTcpClient::IsActive() + { + return actived_; + } + + bool AsioTcpClient::Connected() + { + return connected_; + } + + void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen) + { + //a8::XPrintf("SendBuff bufflen:%d\n", {bufflen}); + if (bufflen > 0) { + a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); + memset(p, 0, sizeof(a8::SendQueueNode)); + p->buff = (char*)malloc(bufflen); + memmove(p->buff, buff, bufflen); + p->bufflen = bufflen; + send_buffer_mutex_->lock(); + if (bot_node_) { + bot_node_->next = p; + bot_node_ = p; + }else{ + top_node_ = p; + bot_node_ = p; + } + send_buffer_mutex_->unlock(); + DoSend(); + } + } + + void AsioTcpClient::SetActive(bool active) + { + if (active) { + if (!IsActive()) { + ActiveStart(); + } + } else { + if (IsActive()) { + ActiveStop(); + } + } + } + + void AsioTcpClient::ActiveStart() + { + actived_ = true; + connected_ = false; + socket_->async_connect + (*endpoint_, + [this] (const asio::error_code& ec) + { + HandleConnect(ec); + }); + } + + void AsioTcpClient::ActiveStop() + { + actived_ = true; + connected_ = false; + } + + void AsioTcpClient::HandleConnect(const asio::error_code& err) + { + if (err) { + actived_ = false; + connected_ = false; + if (on_error) { + on_error(this, err.value()); + } + return; + } else { + connected_ = true; + if (on_connect) { + on_connect(this); + } + DoRead(); + } + } + + void AsioTcpClient::DoRead() + { + socket_->async_read_some + (asio::buffer(buffer_), + [this](std::error_code ec, std::size_t bytes_transferred) + { + if (!ec) { + if (on_socketread) { + on_socketread(this, buffer_.data(), bytes_transferred); + } + DoRead(); + } else { + a8::XPrintf("DoRead error %s\n", {ec.message()}); + actived_ = false; + connected_ = false; + if (on_disconnect) { + on_disconnect(this, ec.value()); + } + } + }); + } + + void AsioTcpClient::DoSend() + { + if (!work_node_) { + send_buffer_mutex_->lock(); + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + send_buffer_mutex_->unlock(); + } + if (work_node_ && !sending_) { + sending_ = true; + char* buf = work_node_->buff + work_node_->sent_bytes; + int buf_len = work_node_->bufflen - work_node_->sent_bytes; + asio::async_write + (*socket_, +asio::buffer(buf, buf_len), + [this] (const asio::error_code& ec, std::size_t bytes_transferred) + { + if (!ec) { + send_buffer_mutex_->lock(); + if (work_node_) { + work_node_->sent_bytes += bytes_transferred; + if (work_node_->sent_bytes >= work_node_->bufflen) { + auto pdelnode = work_node_; + work_node_ = work_node_->next; + free(pdelnode->buff); + free((void*)pdelnode); + } + if (!work_node_) { + sending_ = false; + } + send_buffer_mutex_->unlock(); + DoSend(); + return; + } + send_buffer_mutex_->unlock(); + } else { + a8::XPrintf("DoSend error %s\n", {ec.message()}); + actived_ = false; + connected_ = false; + if (on_disconnect) { + on_disconnect(this, ec.value()); + } + } + }); + + } + } + + } +} diff --git a/third_party/f8/f8/internal/asiotcpclient.h b/third_party/f8/f8/internal/asiotcpclient.h new file mode 100644 index 0000000..2a44f81 --- /dev/null +++ b/third_party/f8/f8/internal/asiotcpclient.h @@ -0,0 +1,65 @@ +#pragma once + +#include + +using asio::ip::tcp; + +namespace f8 +{ + namespace internal + { + + class AsioTcpClient + { + public: + std::function on_error; + std::function on_connect; + std::function on_disconnect; + std::function on_socketread; + + AsioTcpClient(std::shared_ptr io_context, + const std::string& remote_ip, + int remote_port); + AsioTcpClient(const std::string& remote_ip, int remote_port); + virtual ~AsioTcpClient(); + const std::string& GetRemoteAddress() { return remote_address_; } + int GetRemotePort() { return remote_port_; } + + void Open(); + void Close(); + bool IsActive(); + bool Connected(); + void SendBuff(const char* buff, unsigned int bufflen); + + private: + void Init(std::shared_ptr io_context, + const std::string& remote_ip, + int remote_port); + void HandleConnect(const asio::error_code& err); + void DoRead(); + void DoSend(); + + private: + std::shared_ptr io_context_; + std::string remote_address_; + int remote_port_ = 0; + + std::shared_ptr endpoint_; + std::shared_ptr socket_; + volatile bool actived_ = false; + volatile bool connected_ = false; + std::shared_ptr send_buffer_mutex_; + a8::SendQueueNode *top_node_ = nullptr; + a8::SendQueueNode *bot_node_ = nullptr; + volatile a8::SendQueueNode *work_node_ = nullptr; + volatile bool sending_ = false; + std::array buffer_; + + void SetActive(bool active); + void ActiveStart(); + void ActiveStop(); + }; + + } + +} diff --git a/third_party/f8/f8/iomgr.h b/third_party/f8/f8/iomgr.h index 88cad8c..6e230d8 100644 --- a/third_party/f8/f8/iomgr.h +++ b/third_party/f8/f8/iomgr.h @@ -13,9 +13,9 @@ namespace f8 void Init(); void UnInit(); void Update(); + std::shared_ptr GetIoContext(int type); private: - std::shared_ptr GetIoContext(int type); void WorkerThreadProc(std::shared_ptr io_context); private: diff --git a/third_party/f8/f8/libf8.a b/third_party/f8/f8/libf8.a index 32b00d7..34f9d7c 100644 Binary files a/third_party/f8/f8/libf8.a and b/third_party/f8/f8/libf8.a differ diff --git a/third_party/f8/f8/tcpclient.cc b/third_party/f8/f8/tcpclient.cc index 28520f3..107d5c6 100644 --- a/third_party/f8/f8/tcpclient.cc +++ b/third_party/f8/f8/tcpclient.cc @@ -1,213 +1,39 @@ #include -#include -#include - #include -#include - -const int MAX_RECV_BUFFERSIZE = 1024 * 64; +#include namespace f8 { - TcpClient::TcpClient(std::shared_ptr io_context, - const std::string& remote_ip, - int remote_port) - { - Init(io_context, remote_ip, remote_port); - } - TcpClient::TcpClient(const std::string& remote_ip, int remote_port) { - Init(f8::IoMgr::Instance()->GetIoContext(0), remote_ip, remote_port); - } - - void TcpClient::Init(std::shared_ptr io_context, - const std::string& remote_ip, - int remote_port) - { - io_context_ = io_context; - remote_address_ = remote_ip; - remote_port_ = remote_port; - endpoint_ = std::make_shared - ( - asio::ip::address::from_string(remote_address_), - remote_port_ - ); - send_buffer_mutex_ = std::make_shared(); - socket_ = std::make_shared(*io_context); + impl_ = std::make_shared(remote_ip, remote_port); } TcpClient::~TcpClient() { - Close(); + impl_->Close(); } void TcpClient::Open() { - if (!IsActive()) { - SetActive(true); - } + impl_->Open(); } void TcpClient::Close() { - if (IsActive()) { - SetActive(false); - } - } - - bool TcpClient::IsActive() - { - return actived_; + impl_->Close(); } bool TcpClient::Connected() { - return connected_; + return impl_->Connected(); } void TcpClient::SendBuff(const char* buff, unsigned int bufflen) { - //a8::XPrintf("SendBuff bufflen:%d\n", {bufflen}); - if (bufflen > 0) { - a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); - memset(p, 0, sizeof(a8::SendQueueNode)); - p->buff = (char*)malloc(bufflen); - memmove(p->buff, buff, bufflen); - p->bufflen = bufflen; - send_buffer_mutex_->lock(); - if (bot_node_) { - bot_node_->next = p; - bot_node_ = p; - }else{ - top_node_ = p; - bot_node_ = p; - } - send_buffer_mutex_->unlock(); - DoSend(); - } - } - - void TcpClient::SetActive(bool active) - { - if (active) { - if (!IsActive()) { - ActiveStart(); - } - } else { - if (IsActive()) { - ActiveStop(); - } - } - } - - void TcpClient::ActiveStart() - { - actived_ = true; - connected_ = false; - socket_->async_connect - (*endpoint_, - [this] (const asio::error_code& ec) - { - HandleConnect(ec); - }); - } - - void TcpClient::ActiveStop() - { - actived_ = true; - connected_ = false; - } - - void TcpClient::HandleConnect(const asio::error_code& err) - { - if (err) { - actived_ = false; - connected_ = false; - if (on_error) { - on_error(this, err.value()); - } - return; - } else { - connected_ = true; - if (on_connect) { - on_connect(this); - } - DoRead(); - } - } - - void TcpClient::DoRead() - { - socket_->async_read_some - (asio::buffer(buffer_), - [this](std::error_code ec, std::size_t bytes_transferred) - { - if (!ec) { - if (on_socketread) { - on_socketread(this, buffer_.data(), bytes_transferred); - } - DoRead(); - } else { - a8::XPrintf("DoRead error %s\n", {ec.message()}); - actived_ = false; - connected_ = false; - if (on_disconnect) { - on_disconnect(this, ec.value()); - } - } - }); - } - - void TcpClient::DoSend() - { - if (!work_node_) { - send_buffer_mutex_->lock(); - work_node_ = top_node_; - top_node_ = nullptr; - bot_node_ = nullptr; - send_buffer_mutex_->unlock(); - } - if (work_node_ && !sending_) { - sending_ = true; - char* buf = work_node_->buff + work_node_->sent_bytes; - int buf_len = work_node_->bufflen - work_node_->sent_bytes; - asio::async_write - (*socket_, - asio::buffer(buf, buf_len), - [this] (const asio::error_code& ec, std::size_t bytes_transferred) - { - if (!ec) { - send_buffer_mutex_->lock(); - if (work_node_) { - work_node_->sent_bytes += bytes_transferred; - if (work_node_->sent_bytes >= work_node_->bufflen) { - auto pdelnode = work_node_; - work_node_ = work_node_->next; - free(pdelnode->buff); - free((void*)pdelnode); - } - if (!work_node_) { - sending_ = false; - } - send_buffer_mutex_->unlock(); - DoSend(); - return; - } - send_buffer_mutex_->unlock(); - } else { - a8::XPrintf("DoSend error %s\n", {ec.message()}); - actived_ = false; - connected_ = false; - if (on_disconnect) { - on_disconnect(this, ec.value()); - } - } - }); - - } + impl_->SendBuff(buff, bufflen); } } diff --git a/third_party/f8/f8/tcpclient.h b/third_party/f8/f8/tcpclient.h index 734089c..bc61d48 100644 --- a/third_party/f8/f8/tcpclient.h +++ b/third_party/f8/f8/tcpclient.h @@ -1,11 +1,17 @@ #pragma once -#include - -using asio::ip::tcp; - namespace f8 { + namespace internal + { + class AsioTcpClient; + } +} + +namespace f8 +{ + + using TcpClientImpl = f8::internal::AsioTcpClient; class TcpClient { @@ -15,47 +21,19 @@ namespace f8 std::function on_disconnect; std::function on_socketread; - TcpClient(std::shared_ptr io_context, - const std::string& remote_ip, - int remote_port); TcpClient(const std::string& remote_ip, int remote_port); virtual ~TcpClient(); - const std::string& GetRemoteAddress() { return remote_address_; } - int GetRemotePort() { return remote_port_; } + const std::string& GetRemoteAddress(); + int GetRemotePort(); void Open(); void Close(); - bool IsActive(); bool Connected(); void SendBuff(const char* buff, unsigned int bufflen); private: - void Init(std::shared_ptr io_context, - const std::string& remote_ip, - int remote_port); - void HandleConnect(const asio::error_code& err); - void DoRead(); - void DoSend(); + std::shared_ptr impl_; - private: - std::shared_ptr io_context_; - std::string remote_address_; - int remote_port_ = 0; - - std::shared_ptr endpoint_; - std::shared_ptr socket_; - volatile bool actived_ = false; - volatile bool connected_ = false; - std::shared_ptr send_buffer_mutex_; - a8::SendQueueNode *top_node_ = nullptr; - a8::SendQueueNode *bot_node_ = nullptr; - volatile a8::SendQueueNode *work_node_ = nullptr; - volatile bool sending_ = false; - std::array buffer_; - - void SetActive(bool active); - void ActiveStart(); - void ActiveStop(); }; }