This commit is contained in:
aozhiwei 2024-12-22 09:11:02 +08:00
parent c64eba3f15
commit b684e835d7
7 changed files with 307 additions and 217 deletions

View File

@ -28,4 +28,8 @@ aux_source_directory(.
F8_SRC_LIST
)
aux_source_directory(./internal
F8_SRC_LIST
)
add_library(f8 STATIC ${F8_SRC_LIST})

View File

@ -0,0 +1,217 @@
#include <f8/internal/pch.h>
#include <string.h>
#include <assert.h>
#include <f8/internal/asiotcpclient.h>
#include <f8/iomgr.h>
const int MAX_RECV_BUFFERSIZE = 1024 * 64;
namespace f8
{
namespace internal
{
AsioTcpClient::AsioTcpClient(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port)
{
Init(io_context, remote_ip, remote_port);
}
AsioTcpClient::AsioTcpClient(const std::string& remote_ip, int remote_port)
{
Init(f8::IoMgr::Instance()->GetIoContext(0), remote_ip, remote_port);
}
void AsioTcpClient::Init(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port)
{
io_context_ = io_context;
remote_address_ = remote_ip;
remote_port_ = remote_port;
endpoint_ = std::make_shared<asio::ip::tcp::endpoint>
(
asio::ip::address::from_string(remote_address_),
remote_port_
);
send_buffer_mutex_ = std::make_shared<std::mutex>();
socket_ = std::make_shared<asio::ip::tcp::socket>(*io_context);
}
AsioTcpClient::~AsioTcpClient()
{
Close();
}
void AsioTcpClient::Open()
{
if (!IsActive()) {
SetActive(true);
}
}
void AsioTcpClient::Close()
{
if (IsActive()) {
SetActive(false);
}
}
bool AsioTcpClient::IsActive()
{
return actived_;
}
bool AsioTcpClient::Connected()
{
return connected_;
}
void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen)
{
//a8::XPrintf("SendBuff bufflen:%d\n", {bufflen});
if (bufflen > 0) {
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(a8::SendQueueNode));
p->buff = (char*)malloc(bufflen);
memmove(p->buff, buff, bufflen);
p->bufflen = bufflen;
send_buffer_mutex_->lock();
if (bot_node_) {
bot_node_->next = p;
bot_node_ = p;
}else{
top_node_ = p;
bot_node_ = p;
}
send_buffer_mutex_->unlock();
DoSend();
}
}
void AsioTcpClient::SetActive(bool active)
{
if (active) {
if (!IsActive()) {
ActiveStart();
}
} else {
if (IsActive()) {
ActiveStop();
}
}
}
void AsioTcpClient::ActiveStart()
{
actived_ = true;
connected_ = false;
socket_->async_connect
(*endpoint_,
[this] (const asio::error_code& ec)
{
HandleConnect(ec);
});
}
void AsioTcpClient::ActiveStop()
{
actived_ = true;
connected_ = false;
}
void AsioTcpClient::HandleConnect(const asio::error_code& err)
{
if (err) {
actived_ = false;
connected_ = false;
if (on_error) {
on_error(this, err.value());
}
return;
} else {
connected_ = true;
if (on_connect) {
on_connect(this);
}
DoRead();
}
}
void AsioTcpClient::DoRead()
{
socket_->async_read_some
(asio::buffer(buffer_),
[this](std::error_code ec, std::size_t bytes_transferred)
{
if (!ec) {
if (on_socketread) {
on_socketread(this, buffer_.data(), bytes_transferred);
}
DoRead();
} else {
a8::XPrintf("DoRead error %s\n", {ec.message()});
actived_ = false;
connected_ = false;
if (on_disconnect) {
on_disconnect(this, ec.value());
}
}
});
}
void AsioTcpClient::DoSend()
{
if (!work_node_) {
send_buffer_mutex_->lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_->unlock();
}
if (work_node_ && !sending_) {
sending_ = true;
char* buf = work_node_->buff + work_node_->sent_bytes;
int buf_len = work_node_->bufflen - work_node_->sent_bytes;
asio::async_write
(*socket_,
asio::buffer(buf, buf_len),
[this] (const asio::error_code& ec, std::size_t bytes_transferred)
{
if (!ec) {
send_buffer_mutex_->lock();
if (work_node_) {
work_node_->sent_bytes += bytes_transferred;
if (work_node_->sent_bytes >= work_node_->bufflen) {
auto pdelnode = work_node_;
work_node_ = work_node_->next;
free(pdelnode->buff);
free((void*)pdelnode);
}
if (!work_node_) {
sending_ = false;
}
send_buffer_mutex_->unlock();
DoSend();
return;
}
send_buffer_mutex_->unlock();
} else {
a8::XPrintf("DoSend error %s\n", {ec.message()});
actived_ = false;
connected_ = false;
if (on_disconnect) {
on_disconnect(this, ec.value());
}
}
});
}
}
}
}

View File

@ -0,0 +1,65 @@
#pragma once
#include <asio.hpp>
using asio::ip::tcp;
namespace f8
{
namespace internal
{
class AsioTcpClient
{
public:
std::function<void (f8::internal::AsioTcpClient*, int)> on_error;
std::function<void (f8::internal::AsioTcpClient*)> on_connect;
std::function<void (f8::internal::AsioTcpClient*, int)> on_disconnect;
std::function<void (f8::internal::AsioTcpClient*, char*, unsigned int)> on_socketread;
AsioTcpClient(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port);
AsioTcpClient(const std::string& remote_ip, int remote_port);
virtual ~AsioTcpClient();
const std::string& GetRemoteAddress() { return remote_address_; }
int GetRemotePort() { return remote_port_; }
void Open();
void Close();
bool IsActive();
bool Connected();
void SendBuff(const char* buff, unsigned int bufflen);
private:
void Init(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port);
void HandleConnect(const asio::error_code& err);
void DoRead();
void DoSend();
private:
std::shared_ptr<asio::io_context> io_context_;
std::string remote_address_;
int remote_port_ = 0;
std::shared_ptr<asio::ip::tcp::endpoint> endpoint_;
std::shared_ptr<asio::ip::tcp::socket> socket_;
volatile bool actived_ = false;
volatile bool connected_ = false;
std::shared_ptr<std::mutex> send_buffer_mutex_;
a8::SendQueueNode *top_node_ = nullptr;
a8::SendQueueNode *bot_node_ = nullptr;
volatile a8::SendQueueNode *work_node_ = nullptr;
volatile bool sending_ = false;
std::array<char, 1024 * 64> buffer_;
void SetActive(bool active);
void ActiveStart();
void ActiveStop();
};
}
}

View File

@ -13,9 +13,9 @@ namespace f8
void Init();
void UnInit();
void Update();
std::shared_ptr<asio::io_context> GetIoContext(int type);
private:
std::shared_ptr<asio::io_context> GetIoContext(int type);
void WorkerThreadProc(std::shared_ptr<asio::io_context> io_context);
private:

Binary file not shown.

View File

@ -1,213 +1,39 @@
#include <f8/internal/pch.h>
#include <string.h>
#include <assert.h>
#include <f8/tcpclient.h>
#include <f8/iomgr.h>
const int MAX_RECV_BUFFERSIZE = 1024 * 64;
#include <f8/internal/asiotcpclient.h>
namespace f8
{
TcpClient::TcpClient(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port)
{
Init(io_context, remote_ip, remote_port);
}
TcpClient::TcpClient(const std::string& remote_ip, int remote_port)
{
Init(f8::IoMgr::Instance()->GetIoContext(0), remote_ip, remote_port);
}
void TcpClient::Init(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port)
{
io_context_ = io_context;
remote_address_ = remote_ip;
remote_port_ = remote_port;
endpoint_ = std::make_shared<asio::ip::tcp::endpoint>
(
asio::ip::address::from_string(remote_address_),
remote_port_
);
send_buffer_mutex_ = std::make_shared<std::mutex>();
socket_ = std::make_shared<asio::ip::tcp::socket>(*io_context);
impl_ = std::make_shared<TcpClientImpl>(remote_ip, remote_port);
}
TcpClient::~TcpClient()
{
Close();
impl_->Close();
}
void TcpClient::Open()
{
if (!IsActive()) {
SetActive(true);
}
impl_->Open();
}
void TcpClient::Close()
{
if (IsActive()) {
SetActive(false);
}
}
bool TcpClient::IsActive()
{
return actived_;
impl_->Close();
}
bool TcpClient::Connected()
{
return connected_;
return impl_->Connected();
}
void TcpClient::SendBuff(const char* buff, unsigned int bufflen)
{
//a8::XPrintf("SendBuff bufflen:%d\n", {bufflen});
if (bufflen > 0) {
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(a8::SendQueueNode));
p->buff = (char*)malloc(bufflen);
memmove(p->buff, buff, bufflen);
p->bufflen = bufflen;
send_buffer_mutex_->lock();
if (bot_node_) {
bot_node_->next = p;
bot_node_ = p;
}else{
top_node_ = p;
bot_node_ = p;
}
send_buffer_mutex_->unlock();
DoSend();
}
}
void TcpClient::SetActive(bool active)
{
if (active) {
if (!IsActive()) {
ActiveStart();
}
} else {
if (IsActive()) {
ActiveStop();
}
}
}
void TcpClient::ActiveStart()
{
actived_ = true;
connected_ = false;
socket_->async_connect
(*endpoint_,
[this] (const asio::error_code& ec)
{
HandleConnect(ec);
});
}
void TcpClient::ActiveStop()
{
actived_ = true;
connected_ = false;
}
void TcpClient::HandleConnect(const asio::error_code& err)
{
if (err) {
actived_ = false;
connected_ = false;
if (on_error) {
on_error(this, err.value());
}
return;
} else {
connected_ = true;
if (on_connect) {
on_connect(this);
}
DoRead();
}
}
void TcpClient::DoRead()
{
socket_->async_read_some
(asio::buffer(buffer_),
[this](std::error_code ec, std::size_t bytes_transferred)
{
if (!ec) {
if (on_socketread) {
on_socketread(this, buffer_.data(), bytes_transferred);
}
DoRead();
} else {
a8::XPrintf("DoRead error %s\n", {ec.message()});
actived_ = false;
connected_ = false;
if (on_disconnect) {
on_disconnect(this, ec.value());
}
}
});
}
void TcpClient::DoSend()
{
if (!work_node_) {
send_buffer_mutex_->lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
send_buffer_mutex_->unlock();
}
if (work_node_ && !sending_) {
sending_ = true;
char* buf = work_node_->buff + work_node_->sent_bytes;
int buf_len = work_node_->bufflen - work_node_->sent_bytes;
asio::async_write
(*socket_,
asio::buffer(buf, buf_len),
[this] (const asio::error_code& ec, std::size_t bytes_transferred)
{
if (!ec) {
send_buffer_mutex_->lock();
if (work_node_) {
work_node_->sent_bytes += bytes_transferred;
if (work_node_->sent_bytes >= work_node_->bufflen) {
auto pdelnode = work_node_;
work_node_ = work_node_->next;
free(pdelnode->buff);
free((void*)pdelnode);
}
if (!work_node_) {
sending_ = false;
}
send_buffer_mutex_->unlock();
DoSend();
return;
}
send_buffer_mutex_->unlock();
} else {
a8::XPrintf("DoSend error %s\n", {ec.message()});
actived_ = false;
connected_ = false;
if (on_disconnect) {
on_disconnect(this, ec.value());
}
}
});
}
impl_->SendBuff(buff, bufflen);
}
}

View File

@ -1,11 +1,17 @@
#pragma once
#include <asio.hpp>
using asio::ip::tcp;
namespace f8
{
namespace internal
{
class AsioTcpClient;
}
}
namespace f8
{
using TcpClientImpl = f8::internal::AsioTcpClient;
class TcpClient
{
@ -15,47 +21,19 @@ namespace f8
std::function<void (f8::TcpClient*, int)> on_disconnect;
std::function<void (f8::TcpClient*, char*, unsigned int)> on_socketread;
TcpClient(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port);
TcpClient(const std::string& remote_ip, int remote_port);
virtual ~TcpClient();
const std::string& GetRemoteAddress() { return remote_address_; }
int GetRemotePort() { return remote_port_; }
const std::string& GetRemoteAddress();
int GetRemotePort();
void Open();
void Close();
bool IsActive();
bool Connected();
void SendBuff(const char* buff, unsigned int bufflen);
private:
void Init(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip,
int remote_port);
void HandleConnect(const asio::error_code& err);
void DoRead();
void DoSend();
std::shared_ptr<TcpClientImpl> impl_;
private:
std::shared_ptr<asio::io_context> io_context_;
std::string remote_address_;
int remote_port_ = 0;
std::shared_ptr<asio::ip::tcp::endpoint> endpoint_;
std::shared_ptr<asio::ip::tcp::socket> socket_;
volatile bool actived_ = false;
volatile bool connected_ = false;
std::shared_ptr<std::mutex> send_buffer_mutex_;
a8::SendQueueNode *top_node_ = nullptr;
a8::SendQueueNode *bot_node_ = nullptr;
volatile a8::SendQueueNode *work_node_ = nullptr;
volatile bool sending_ = false;
std::array<char, 1024 * 64> buffer_;
void SetActive(bool active);
void ActiveStart();
void ActiveStop();
};
}