Compare commits

..

4 Commits

Author SHA1 Message Date
aozhiwei
e4cc44cf80 1 2024-09-27 10:49:06 +08:00
aozhiwei
5e891d67a7 1 2024-09-26 15:53:03 +08:00
aozhiwei
c99c33e2e9 1 2024-09-26 15:11:16 +08:00
aozhiwei
71fcbd8bc5 1 2024-09-26 15:09:12 +08:00
12 changed files with 118 additions and 90 deletions

View File

@ -60,7 +60,6 @@ namespace a8
void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen) void AsioTcpClient::SendBuff(const char* buff, unsigned int bufflen)
{ {
//a8::XPrintf("SendBuff bufflen:%d\n", {bufflen});
if (bufflen > 0) { if (bufflen > 0) {
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(SendQueueNode)); memset(p, 0, sizeof(SendQueueNode));
@ -141,11 +140,10 @@ namespace a8
} }
DoRead(); DoRead();
} else { } else {
a8::XPrintf("DoRead error %s\n", {ec.message()});
actived_ = false; actived_ = false;
connected_ = false; connected_ = false;
if (on_disconnect) { if (on_disconnect) {
on_disconnect(this, ec.value()); on_disconnect(this);
} }
} }
}); });
@ -160,8 +158,7 @@ namespace a8
bot_node_ = nullptr; bot_node_ = nullptr;
send_buffer_mutex_->unlock(); send_buffer_mutex_->unlock();
} }
if (work_node_ && !sending_) { if (work_node_) {
sending_ = true;
char* buf = work_node_->buff + work_node_->sent_bytes; char* buf = work_node_->buff + work_node_->sent_bytes;
int buf_len = work_node_->bufflen - work_node_->sent_bytes; int buf_len = work_node_->bufflen - work_node_->sent_bytes;
asio::async_write asio::async_write
@ -170,30 +167,16 @@ namespace a8
[this] (const asio::error_code& ec, std::size_t bytes_transferred) [this] (const asio::error_code& ec, std::size_t bytes_transferred)
{ {
if (!ec) { if (!ec) {
send_buffer_mutex_->lock(); work_node_->sent_bytes += bytes_transferred;
if (work_node_) { if (work_node_->sent_bytes >= work_node_->bufflen) {
work_node_->sent_bytes += bytes_transferred; auto pdelnode = work_node_;
if (work_node_->sent_bytes >= work_node_->bufflen) { work_node_ = work_node_->next;
auto pdelnode = work_node_; free(pdelnode->buff);
work_node_ = work_node_->next; free((void*)pdelnode);
free(pdelnode->buff);
free((void*)pdelnode);
}
if (!work_node_) {
sending_ = false;
}
send_buffer_mutex_->unlock();
DoSend();
return;
} }
send_buffer_mutex_->unlock(); DoSend();
} else { } else {
a8::XPrintf("DoSend error %s\n", {ec.message()}); abort();
actived_ = false;
connected_ = false;
if (on_disconnect) {
on_disconnect(this, ec.value());
}
} }
}); });

View File

@ -14,7 +14,7 @@ namespace a8
public: public:
std::function<void (a8::AsioTcpClient*, int)> on_error; std::function<void (a8::AsioTcpClient*, int)> on_error;
std::function<void (a8::AsioTcpClient*)> on_connect; std::function<void (a8::AsioTcpClient*)> on_connect;
std::function<void (a8::AsioTcpClient*, int)> on_disconnect; std::function<void (a8::AsioTcpClient*)> on_disconnect;
std::function<void (a8::AsioTcpClient*, char*, unsigned int)> on_socketread; std::function<void (a8::AsioTcpClient*, char*, unsigned int)> on_socketread;
AsioTcpClient(std::shared_ptr<asio::io_context> io_context, AsioTcpClient(std::shared_ptr<asio::io_context> io_context,
const std::string& remote_ip, const std::string& remote_ip,
@ -47,7 +47,6 @@ namespace a8
SendQueueNode *top_node_ = nullptr; SendQueueNode *top_node_ = nullptr;
SendQueueNode *bot_node_ = nullptr; SendQueueNode *bot_node_ = nullptr;
volatile SendQueueNode *work_node_ = nullptr; volatile SendQueueNode *work_node_ = nullptr;
volatile bool sending_ = false;
std::array<char, 1024 * 64> buffer_; std::array<char, 1024 * 64> buffer_;
void SetActive(bool active); void SetActive(bool active);

View File

@ -1,6 +1,8 @@
#include <a8/a8.h> #include <a8/a8.h>
#include <a8/perfmonitor.h> #include <a8/perfmonitor.h>
#include <mutex>
namespace a8 namespace a8
{ {
@ -10,10 +12,60 @@ namespace a8
void PerfMonitor::Init() void PerfMonitor::Init()
{ {
mutex_ = std::make_shared<std::mutex>();
} }
void PerfMonitor::UnInit() void PerfMonitor::UnInit()
{ {
} }
long long PerfMonitor::AddV(const std::string key, long long val)
{
long long result = 0;
mutex_->lock();
auto itr = dyn_hash_.find(key);
if (itr == dyn_hash_.end()) {
dyn_hash_[key] = val;
result = val;
} else {
itr->second = itr->second + val;
result = itr->second;
}
mutex_->unlock();
return result;
}
long long PerfMonitor::SubV(const std::string key, long long val)
{
return AddV(key, -val);
}
void PerfMonitor::SetV(const std::string key, long long val)
{
mutex_->lock();
dyn_hash_[key] = val;
mutex_->unlock();
}
long long PerfMonitor::GetV(const std::string key)
{
long long result = 0;
mutex_->lock();
auto itr = dyn_hash_.find(key);
if (itr != dyn_hash_.end()) {
result = itr->second;
}
mutex_->unlock();
return result;
}
void PerfMonitor::Dump()
{
mutex_->lock();
for (auto& pair : dyn_hash_) {
a8::XPrintf("Perfmonitor::V %s = %d\n", {pair.first, pair.second});
}
mutex_->unlock();
}
} }

View File

@ -1,7 +1,5 @@
#pragma once #pragma once
#include <atomic>
#include <a8/singleton.h> #include <a8/singleton.h>
namespace a8 namespace a8
@ -17,16 +15,21 @@ namespace a8
void Init(); void Init();
void UnInit(); void UnInit();
long long AddV(const std::string key, long long val);
long long SubV(const std::string key, long long val);
void SetV(const std::string key, long long val);
long long GetV(const std::string key);
void Dump();
public: public:
long long send_eagain_times = 0; long long send_eagain_times = 0;
long long recv_eagain_times = 0; long long recv_eagain_times = 0;
long long max_send_time = 0; long long max_send_time = 0;
long long max_recv_time = 0; long long max_recv_time = 0;
std::atomic<long long> server_send_bytes = {0};
std::atomic<long long> server_consume_bytes = {0};
std::atomic<long long> conn_send_bytes = {0};
std::atomic<long long> conn_consume_bytes = {0};
private:
std::shared_ptr<std::mutex> mutex_;
std::map<std::string, long long> dyn_hash_;
}; };
} }

View File

@ -342,4 +342,13 @@ namespace a8
abort(); abort();
} }
void TestSleep(int count, int interval, const std::string text)
{
int i = 0;
while (i++ < count) {
sleep(interval);
}
XPrintf("TestSleep %s\n", {text});
}
} }

View File

@ -97,4 +97,5 @@ namespace a8
void ClearSendQueue(a8::SendQueueNode* node); void ClearSendQueue(a8::SendQueueNode* node);
void Abort(); void Abort();
void TestSleep(int count, int interval, const std::string text);
} }

View File

@ -12,8 +12,8 @@
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <a8/a8.h> #include <a8/a8.h>
#include <a8/tcpclient.h>
#include <a8/perfmonitor.h> #include <a8/perfmonitor.h>
#include <a8/tcpclient.h>
const int MAX_RECV_BUFFERSIZE = 1024 * 64; const int MAX_RECV_BUFFERSIZE = 1024 * 64;
@ -66,9 +66,6 @@ namespace a8
void TcpClient::SendBuff(const char* buff, unsigned int bufflen) void TcpClient::SendBuff(const char* buff, unsigned int bufflen)
{ {
if (bufflen > 0) { if (bufflen > 0) {
#ifdef A8_PERFT
PerfMonitor::Instance()->conn_send_bytes += bufflen;
#endif
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(SendQueueNode)); memset(p, 0, sizeof(SendQueueNode));
p->buff = (char*)malloc(bufflen); p->buff = (char*)malloc(bufflen);
@ -84,6 +81,9 @@ namespace a8
} }
send_buffer_mutex_->unlock(); send_buffer_mutex_->unlock();
NotifySendCond(); NotifySendCond();
#ifdef MMCHK
a8::PerfMonitor::Instance()->AddV("a8.TcpClient.count", 1);
#endif
} }
} }
@ -221,9 +221,6 @@ namespace a8
currnode->buff + currnode->sent_bytes, currnode->buff + currnode->sent_bytes,
currnode->bufflen - currnode->sent_bytes, currnode->bufflen - currnode->sent_bytes,
0); 0);
#ifdef A8_PERFT
PerfMonitor::Instance()->conn_consume_bytes += len;
#endif
if (len > 0) { if (len > 0) {
currnode->sent_bytes += len; currnode->sent_bytes += len;
} else { } else {
@ -235,6 +232,9 @@ namespace a8
worknode = worknode->next; worknode = worknode->next;
free(currnode->buff); free(currnode->buff);
free(currnode); free(currnode);
#ifdef MMCHK
a8::PerfMonitor::Instance()->SubV("a8.TcpClient.count", 1);
#endif
} }
} }
@ -248,6 +248,9 @@ namespace a8
worknode = worknode->next; worknode = worknode->next;
free(currnode->buff); free(currnode->buff);
free(currnode); free(currnode);
#ifdef MMCHK
a8::PerfMonitor::Instance()->SubV("a8.TcpClient.count", 1);
#endif
} }
} }

View File

@ -24,10 +24,16 @@ namespace a8
{ {
INIT_LIST_HEAD(&session_entry); INIT_LIST_HEAD(&session_entry);
max_packet_len_ = DEFAULT_MAX_PACKET_LEN; max_packet_len_ = DEFAULT_MAX_PACKET_LEN;
#ifdef MMCHK
a8::PerfMonitor::Instance()->AddV("a8.TcpSession.count", 1);
#endif
} }
TcpSession::~TcpSession() TcpSession::~TcpSession()
{ {
#ifdef MMCHK
a8::PerfMonitor::Instance()->SubV("a8.TcpSession.count", 1);
#endif
} }
void TcpSession::SetMaxPacketLen(int max_packet_len) void TcpSession::SetMaxPacketLen(int max_packet_len)
@ -68,9 +74,6 @@ namespace a8
return; return;
} }
if (bufflen > 0) { if (bufflen > 0) {
#ifdef A8_PERFT
PerfMonitor::Instance()->server_send_bytes += bufflen;
#endif
a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode));
memset(p, 0, sizeof(a8::SendQueueNode)); memset(p, 0, sizeof(a8::SendQueueNode));
p->buff = (char*)malloc(bufflen); p->buff = (char*)malloc(bufflen);
@ -89,6 +92,9 @@ namespace a8
} }
send_buffer_mutex_.unlock(); send_buffer_mutex_.unlock();
++master->send_node_num; ++master->send_node_num;
#ifdef MMCHK
a8::PerfMonitor::Instance()->AddV("a8.TcpSession.sendBuf", 1);
#endif
} }
} }
@ -269,6 +275,9 @@ namespace a8
} }
free(pdelnode); free(pdelnode);
--master->send_node_num; --master->send_node_num;
#ifdef MMCHK
a8::PerfMonitor::Instance()->SubV("a8.TcpSession.sendBuf", 1);
#endif
} }
} }
@ -282,6 +291,9 @@ namespace a8
} }
free(pdelnode); free(pdelnode);
--master->send_node_num; --master->send_node_num;
#ifdef MMCHK
a8::PerfMonitor::Instance()->SubV("a8.TcpSession.sendBuf", 1);
#endif
} }
} }
@ -302,13 +314,13 @@ namespace a8
} }
} }
work_node_->sent_bytes += sentbytes; work_node_->sent_bytes += sentbytes;
#ifdef A8_PERFT
PerfMonitor::Instance()->server_consume_bytes += sentbytes;
#endif
if (work_node_->sent_bytes >= work_node_->bufflen) { if (work_node_->sent_bytes >= work_node_->bufflen) {
a8::SendQueueNode *pdelnode = work_node_; a8::SendQueueNode *pdelnode = work_node_;
work_node_ = work_node_->next; work_node_ = work_node_->next;
--master->send_node_num; --master->send_node_num;
#ifdef MMCHK
a8::PerfMonitor::Instance()->SubV("a8.TcpSession.sendBuf", 1);
#endif
++master->sent_node_num; ++master->sent_node_num;
master->sent_bytes_num += pdelnode->sent_bytes; master->sent_bytes_num += pdelnode->sent_bytes;
if (!work_node_) { if (!work_node_) {

View File

@ -51,7 +51,7 @@ namespace a8
ClearTimeOutSocket(); ClearTimeOutSocket();
a8::TcpSession* p = nullptr; a8::TcpSession* p = nullptr;
if (top_node_) { if (top_node_) {
if (time(nullptr) - top_node_->addtime >= 30) { if (time(nullptr) - top_node_->addtime >= 20) {
p = top_node_->session; p = top_node_->session;
a8::TcpSessionPool::TcpSessionNode* pdelnode = top_node_; a8::TcpSessionPool::TcpSessionNode* pdelnode = top_node_;
top_node_ = top_node_->next; top_node_ = top_node_->next;
@ -68,12 +68,13 @@ namespace a8
void TcpSessionPool::ClearTimeOutSocket() void TcpSessionPool::ClearTimeOutSocket()
{ {
while (top_node_) { while (top_node_) {
if(time(nullptr) - top_node_->addtime > 60 * 5){ if(time(nullptr) - top_node_->addtime > 30){
a8::TcpSessionPool::TcpSessionNode* pdelnode = top_node_; a8::TcpSessionPool::TcpSessionNode* pdelnode = top_node_;
top_node_ = top_node_->next; top_node_ = top_node_->next;
if (!top_node_) { if (!top_node_) {
bot_node_ = NULL; bot_node_ = NULL;
} }
pdelnode->session->Destory();
delete pdelnode->session; delete pdelnode->session;
delete pdelnode; delete pdelnode;
count_--; count_--;

View File

@ -66,10 +66,10 @@ namespace a8
socket->SendBuff(data.data(), data.size()); socket->SendBuff(data.data(), data.size());
}; };
tcp_client_->on_disconnect = tcp_client_->on_disconnect =
[this] (a8::AsioTcpClient* socket, int err) [this] (a8::AsioTcpClient* socket)
{ {
if (on_disconnect) { if (on_disconnect) {
on_disconnect(this, err); on_disconnect(this);
} }
}; };
tcp_client_->on_socketread = tcp_client_->on_socketread =

View File

@ -19,7 +19,7 @@ namespace a8
std::function<void (a8::WebSocketClient*, int)> on_error; std::function<void (a8::WebSocketClient*, int)> on_error;
std::function<void (a8::WebSocketClient*)> on_connect; std::function<void (a8::WebSocketClient*)> on_connect;
std::function<void (a8::WebSocketClient*, int)> on_disconnect; std::function<void (a8::WebSocketClient*)> on_disconnect;
std::function<void (char*, int&, unsigned int)> on_decode_userpacket; std::function<void (char*, int&, unsigned int)> on_decode_userpacket;
void Open(); void Open();

View File

@ -45,7 +45,6 @@ struct xtimer_list {
TimerType_e timer_type; TimerType_e timer_type;
long long expires; long long expires;
int expire_time; int expire_time;
int deleting;
struct xtvec_base *base; struct xtvec_base *base;
a8::TimerCb cb; a8::TimerCb cb;
@ -221,10 +220,6 @@ namespace a8
if (!timer) { if (!timer) {
abort(); abort();
} }
if (timer->deleting) {
abort();
}
timer->deleting = 1;
if (base_->running_timer == timer) { if (base_->running_timer == timer) {
base_->running_timer = nullptr; base_->running_timer = nullptr;
} }
@ -232,16 +227,6 @@ namespace a8
if (!list_empty(&timer->attach_entry)) { if (!list_empty(&timer->attach_entry)) {
list_del_init(&timer->attach_entry); list_del_init(&timer->attach_entry);
} }
#if 1
while (!list_empty(&timer->destory_handle_list)) {
XTimerDestoryHandleNode* node = list_first_entry(&timer->destory_handle_list,
XTimerDestoryHandleNode,
entry);
list_del_init(&node->entry);
node->cb(timer);
delete node;
}
#endif
if (is_destory) { if (is_destory) {
if (timer->cb) { if (timer->cb) {
timer->cb(TIMER_DESTORY_EVENT, nullptr); timer->cb(TIMER_DESTORY_EVENT, nullptr);
@ -252,7 +237,6 @@ namespace a8
} }
} }
timer->cb = nullptr; timer->cb = nullptr;
#if 0
while (!list_empty(&timer->destory_handle_list)) { while (!list_empty(&timer->destory_handle_list)) {
XTimerDestoryHandleNode* node = list_first_entry(&timer->destory_handle_list, XTimerDestoryHandleNode* node = list_first_entry(&timer->destory_handle_list,
XTimerDestoryHandleNode, XTimerDestoryHandleNode,
@ -261,7 +245,6 @@ namespace a8
node->cb(timer); node->cb(timer);
delete node; delete node;
} }
#endif
if (to_free_list) { if (to_free_list) {
AddToFreeList(timer); AddToFreeList(timer);
} else { } else {
@ -290,36 +273,22 @@ namespace a8
void ClearAttacher(Attacher* attacher) void ClearAttacher(Attacher* attacher)
{ {
#if 1
while (!list_empty(&attacher->timer_list_)) {
xtimer_list* tmp_timer = list_first_entry(&attacher->timer_list_, struct xtimer_list, attach_entry);
InternalDelete(tmp_timer, false, true);
}
#else
struct list_head* pos = nullptr; struct list_head* pos = nullptr;
struct list_head* n = nullptr; struct list_head* n = nullptr;
list_for_each_safe(pos, n, &attacher->timer_list_) { list_for_each_safe(pos, n, &attacher->timer_list_) {
xtimer_list* tmp_timer = list_entry(pos, struct xtimer_list, attach_entry); xtimer_list* tmp_timer = list_entry(pos, struct xtimer_list, attach_entry);
InternalDelete(tmp_timer, false, true); InternalDelete(tmp_timer, false, true);
} }
#endif
} }
void DestoryAttacher(Attacher* attacher) void DestoryAttacher(Attacher* attacher)
{ {
#if 1
while (!list_empty(&attacher->timer_list_)) {
xtimer_list* tmp_timer = list_first_entry(&attacher->timer_list_, struct xtimer_list, attach_entry);
InternalDelete(tmp_timer, true, true);
}
#else
struct list_head* pos = nullptr; struct list_head* pos = nullptr;
struct list_head* n = nullptr; struct list_head* n = nullptr;
list_for_each_safe(pos, n, &attacher->timer_list_) { list_for_each_safe(pos, n, &attacher->timer_list_) {
xtimer_list* tmp_timer = list_entry(pos, struct xtimer_list, attach_entry); xtimer_list* tmp_timer = list_entry(pos, struct xtimer_list, attach_entry);
InternalDelete(tmp_timer, true, true); InternalDelete(tmp_timer, true, true);
} }
#endif
} }
void UpdateTimer() void UpdateTimer()
@ -421,14 +390,11 @@ namespace a8
void ClearTimer() void ClearTimer()
{ {
auto free_timers = auto free_timers =
[this] (list_head* head, bool force_no_deleteing = false) [this] (list_head* head)
{ {
while (!list_empty(head)) { while (!list_empty(head)) {
struct xtimer_list *timer; struct xtimer_list *timer;
timer = list_first_entry(head, struct xtimer_list,entry); timer = list_first_entry(head, struct xtimer_list,entry);
if (force_no_deleteing) {
timer->deleting = 0;
}
InternalDelete(timer, true, false); InternalDelete(timer, true, false);
} }
}; };
@ -441,7 +407,7 @@ namespace a8
for (int j = 0; j < TVR_SIZE; j++) { for (int j = 0; j < TVR_SIZE; j++) {
free_timers(base_->tv1.vec + j); free_timers(base_->tv1.vec + j);
} }
free_timers(&base_->free_timer, true); free_timers(&base_->free_timer);
} }
void AddTimerDestoryHandle(std::weak_ptr<XTimerPtr>& timer_ptr, void AddTimerDestoryHandle(std::weak_ptr<XTimerPtr>& timer_ptr,
@ -465,7 +431,6 @@ namespace a8
timer->expires = expires; timer->expires = expires;
timer->expire_time = expire_time; timer->expire_time = expire_time;
timer->base = base_; timer->base = base_;
timer->deleting = 0;
timer->cb = std::move(cb); timer->cb = std::move(cb);
} }