This commit is contained in:
aozhiwei 2023-12-08 10:44:16 +08:00
parent 380074e2f7
commit af7a9093a0
4 changed files with 32 additions and 14 deletions

View File

@ -60,6 +60,7 @@ namespace a8
void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen) void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen)
{ {
//a8::XPrintf("SendBuff bufflen:%d\n", {bufflen});
if (bufflen > 0) { if (bufflen > 0) {
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(SendQueueNode)); memset(p, 0, sizeof(SendQueueNode));
@ -140,10 +141,11 @@ namespace a8
} }
DoRead(); DoRead();
} else { } else {
a8::XPrintf("DoRead error %s\n", {ec.message()});
actived_ = false; actived_ = false;
connected_ = false; connected_ = false;
if (on_disconnect) { if (on_disconnect) {
on_disconnect(this); on_disconnect(this, ec.value());
} }
} }
}); });
@ -158,7 +160,8 @@ namespace a8
bot_node_ = nullptr; bot_node_ = nullptr;
send_buffer_mutex_->unlock(); send_buffer_mutex_->unlock();
} }
if (work_node_) { if (work_node_ && !sending_) {
sending_ = true;
char* buf = work_node_->buff + work_node_->sent_bytes; char* buf = work_node_->buff + work_node_->sent_bytes;
int buf_len = work_node_->bufflen - work_node_->sent_bytes; int buf_len = work_node_->bufflen - work_node_->sent_bytes;
asio::async_write asio::async_write
@ -167,16 +170,30 @@ namespace a8
[this] (const asio::error_code& ec, std::size_t bytes_transferred) [this] (const asio::error_code& ec, std::size_t bytes_transferred)
{ {
if (!ec) { if (!ec) {
work_node_->sent_bytes += bytes_transferred; send_buffer_mutex_->lock();
if (work_node_->sent_bytes >= work_node_->bufflen) { if (work_node_) {
auto pdelnode = work_node_; work_node_->sent_bytes += bytes_transferred;
work_node_ = work_node_->next; if (work_node_->sent_bytes >= work_node_->bufflen) {
free(pdelnode->buff); auto pdelnode = work_node_;
free((void*)pdelnode); work_node_ = work_node_->next;
free(pdelnode->buff);
free((void*)pdelnode);
}
if (!work_node_) {
sending_ = false;
}
send_buffer_mutex_->unlock();
DoSend();
return;
} }
DoSend(); send_buffer_mutex_->unlock();
} else { } else {
abort(); a8::XPrintf("DoSend error %s\n", {ec.message()});
actived_ = false;
connected_ = false;
if (on_disconnect) {
on_disconnect(this, ec.value());
}
} }
}); });

View File

@ -14,7 +14,7 @@ namespace a8
public: public:
std::function<void (a8::AsioTcpClient*, int)> on_error; std::function<void (a8::AsioTcpClient*, int)> on_error;
std::function<void (a8::AsioTcpClient*)> on_connect; std::function<void (a8::AsioTcpClient*)> on_connect;
std::function<void (a8::AsioTcpClient*)> on_disconnect; std::function<void (a8::AsioTcpClient*, int)> on_disconnect;
std::function<void (a8::AsioTcpClient*, char*, unsigned int)> on_socketread; std::function<void (a8::AsioTcpClient*, char*, unsigned int)> on_socketread;
AsioTcpClient(std::shared_ptr<asio::io_context> io_context, AsioTcpClient(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip, const std::string& remote_ip,
@ -47,6 +47,7 @@ namespace a8
SendQueueNode *top_node_ = nullptr; SendQueueNode *top_node_ = nullptr;
SendQueueNode *bot_node_ = nullptr; SendQueueNode *bot_node_ = nullptr;
volatile SendQueueNode *work_node_ = nullptr; volatile SendQueueNode *work_node_ = nullptr;
volatile bool sending_ = false;
std::array<char, 1024 * 64> buffer_; std::array<char, 1024 * 64> buffer_;
void SetActive(bool active); void SetActive(bool active);

View File

@ -66,10 +66,10 @@ namespace a8
socket->SendBuff(data.data(), data.size()); socket->SendBuff(data.data(), data.size());
}; };
tcp_client_->on_disconnect = tcp_client_->on_disconnect =
[this] (a8::AsioTcpClient* socket) [this] (a8::AsioTcpClient* socket, int err)
{ {
if (on_disconnect) { if (on_disconnect) {
on_disconnect(this); on_disconnect(this, err);
} }
}; };
tcp_client_->on_socketread = tcp_client_->on_socketread =

View File

@ -19,7 +19,7 @@ namespace a8
std::function<void (a8::WebSocketClient*, int)> on_error; std::function<void (a8::WebSocketClient*, int)> on_error;
std::function<void (a8::WebSocketClient*)> on_connect; std::function<void (a8::WebSocketClient*)> on_connect;
std::function<void (a8::WebSocketClient*)> on_disconnect; std::function<void (a8::WebSocketClient*, int)> on_disconnect;
std::function<void (char*, int&, unsigned int)> on_decode_userpacket; std::function<void (char*, int&, unsigned int)> on_decode_userpacket;
void Open(); void Open();