diff --git a/third_party/f8/f8/app.cc b/third_party/f8/f8/app.cc index e98e6c8..3bf30e0 100644 --- a/third_party/f8/f8/app.cc +++ b/third_party/f8/f8/app.cc @@ -18,6 +18,7 @@ #include #include #include +#include static const int MAX_ZONE_ID = 100; static const int MAX_NODE_ID = 8; @@ -77,6 +78,7 @@ namespace f8 uuid_->SetMachineId((node_id_ - 1) * MAX_NODE_ID + instance_id_); InitLog(); + f8::IoMgr::Instance()->Init(); f8::MsgQueue::Instance()->Init(); f8::Timer::Instance()->Init(); f8::TGLog::Instance()->Init(user_app_->GetPkgName(), false, 0); @@ -92,6 +94,7 @@ namespace f8 f8::HttpClientPool::Instance()->UnInit(); f8::Timer::Instance()->UnInit(); f8::MsgQueue::Instance()->UnInit(); + f8::IoMgr::Instance()->UnInit(); UnInitLog(); delete loop_cond_; @@ -115,6 +118,7 @@ namespace f8 f8::MsgQueue::Instance()->Update(); DispatchNetMsg(); user_app->Update(delta_time); + f8::IoMgr::Instance()->Update(); a8::tick_t end_tick = a8::XGetTickCount(); if (end_tick - begin_tick > max_run_delay_time_) { max_run_delay_time_ = end_tick - begin_tick; diff --git a/third_party/f8/f8/iomgr.cc b/third_party/f8/f8/iomgr.cc new file mode 100644 index 0000000..a28ce69 --- /dev/null +++ b/third_party/f8/f8/iomgr.cc @@ -0,0 +1,63 @@ +#include + +#include + +static const int IC_Max = 1; + +namespace f8 +{ + + IoMgr::IoMgr() + { + + } + + void IoMgr::Init() + { + for (int i = 0; i < IC_Max; ++i) { + io_contexts_.push_back(std::vector>()); + io_works_.push_back(std::vector>()); + auto& contexts = io_contexts_.at(i); + auto& works = io_works_.at(i); + auto c = std::make_shared(); + auto w = std::make_shared(*c); + works.push_back(w); + contexts.push_back(c); + //new std::thread(&IoMgr::WorkerThreadProc, this, contexts.at(0)); + } + } + + void IoMgr::UnInit() + { + + } + + void IoMgr::Update() + { + for (auto type_io_contexts : io_contexts_) { + for (auto io_context : type_io_contexts) { + io_context->poll(); + } + } + } + + std::shared_ptr IoMgr::GetIoContext(int type) + { + if (type >= 0 && type < (int)io_contexts_.size()) { + auto& contexts = io_contexts_.at(type); + if (!contexts.empty()) { + return contexts.at(rand() % contexts.size()); + } + } + return nullptr; + } + + void IoMgr::WorkerThreadProc(std::shared_ptr io_context) + { + while (true) { + io_context->run(); + io_context->reset(); + } + } + +} diff --git a/third_party/f8/f8/iomgr.h b/third_party/f8/f8/iomgr.h new file mode 100644 index 0000000..d5603b7 --- /dev/null +++ b/third_party/f8/f8/iomgr.h @@ -0,0 +1,26 @@ +#pragma once + +#include + +namespace f8 +{ + + class IoMgr + { + A8_DECLARE_SINGLETON(IoMgr); + public: + + void Init(); + void UnInit(); + void Update(); + + private: + std::shared_ptr GetIoContext(int type); + void WorkerThreadProc(std::shared_ptr io_context); + + private: + std::vector>> io_works_; + std::vector>> io_contexts_; + }; + +} diff --git a/third_party/f8/f8/libf8.a b/third_party/f8/f8/libf8.a new file mode 100644 index 0000000..32b00d7 Binary files /dev/null and b/third_party/f8/f8/libf8.a differ diff --git a/third_party/f8/f8/tcpclient.cc b/third_party/f8/f8/tcpclient.cc new file mode 100644 index 0000000..f939109 --- /dev/null +++ b/third_party/f8/f8/tcpclient.cc @@ -0,0 +1,198 @@ +#include + +#include +#include + +#include + +const int MAX_RECV_BUFFERSIZE = 1024 * 64; + +namespace f8 +{ + + TcpClient::TcpClient(std::shared_ptr io_context, const std::string& remote_ip, int remote_port) + { + io_context_ = io_context; + remote_address_ = remote_ip; + remote_port_ = remote_port; + endpoint_ = std::make_shared + ( + asio::ip::address::from_string(remote_address_), + remote_port_ + ); + send_buffer_mutex_ = std::make_shared(); + socket_ = std::make_shared(*io_context); + } + + TcpClient::~TcpClient() + { + Close(); + } + + void TcpClient::Open() + { + if (!IsActive()) { + SetActive(true); + } + } + + void TcpClient::Close() + { + if (IsActive()) { + SetActive(false); + } + } + + bool TcpClient::IsActive() + { + return actived_; + } + + bool TcpClient::Connected() + { + return connected_; + } + + void TcpClient::SendBuff(const char* buff, unsigned int bufflen) + { + //a8::XPrintf("SendBuff bufflen:%d\n", {bufflen}); + if (bufflen > 0) { + a8::SendQueueNode* p = (a8::SendQueueNode*)malloc(sizeof(a8::SendQueueNode)); + memset(p, 0, sizeof(a8::SendQueueNode)); + p->buff = (char*)malloc(bufflen); + memmove(p->buff, buff, bufflen); + p->bufflen = bufflen; + send_buffer_mutex_->lock(); + if (bot_node_) { + bot_node_->next = p; + bot_node_ = p; + }else{ + top_node_ = p; + bot_node_ = p; + } + send_buffer_mutex_->unlock(); + DoSend(); + } + } + + void TcpClient::SetActive(bool active) + { + if (active) { + if (!IsActive()) { + ActiveStart(); + } + } else { + if (IsActive()) { + ActiveStop(); + } + } + } + + void TcpClient::ActiveStart() + { + actived_ = true; + connected_ = false; + socket_->async_connect + (*endpoint_, + [this] (const asio::error_code& ec) + { + HandleConnect(ec); + }); + } + + void TcpClient::ActiveStop() + { + actived_ = true; + connected_ = false; + } + + void TcpClient::HandleConnect(const asio::error_code& err) + { + if (err) { + actived_ = false; + connected_ = false; + if (on_error) { + on_error(this, err.value()); + } + return; + } else { + connected_ = true; + if (on_connect) { + on_connect(this); + } + DoRead(); + } + } + + void TcpClient::DoRead() + { + socket_->async_read_some + (asio::buffer(buffer_), + [this](std::error_code ec, std::size_t bytes_transferred) + { + if (!ec) { + if (on_socketread) { + on_socketread(this, buffer_.data(), bytes_transferred); + } + DoRead(); + } else { + a8::XPrintf("DoRead error %s\n", {ec.message()}); + actived_ = false; + connected_ = false; + if (on_disconnect) { + on_disconnect(this, ec.value()); + } + } + }); + } + + void TcpClient::DoSend() + { + if (!work_node_) { + send_buffer_mutex_->lock(); + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + send_buffer_mutex_->unlock(); + } + if (work_node_ && !sending_) { + sending_ = true; + char* buf = work_node_->buff + work_node_->sent_bytes; + int buf_len = work_node_->bufflen - work_node_->sent_bytes; + asio::async_write + (*socket_, + asio::buffer(buf, buf_len), + [this] (const asio::error_code& ec, std::size_t bytes_transferred) + { + if (!ec) { + send_buffer_mutex_->lock(); + if (work_node_) { + work_node_->sent_bytes += bytes_transferred; + if (work_node_->sent_bytes >= work_node_->bufflen) { + auto pdelnode = work_node_; + work_node_ = work_node_->next; + free(pdelnode->buff); + free((void*)pdelnode); + } + if (!work_node_) { + sending_ = false; + } + send_buffer_mutex_->unlock(); + DoSend(); + return; + } + send_buffer_mutex_->unlock(); + } else { + a8::XPrintf("DoSend error %s\n", {ec.message()}); + actived_ = false; + connected_ = false; + if (on_disconnect) { + on_disconnect(this, ec.value()); + } + } + }); + + } + } + +} diff --git a/third_party/f8/f8/tcpclient.h b/third_party/f8/f8/tcpclient.h new file mode 100644 index 0000000..c7bcf29 --- /dev/null +++ b/third_party/f8/f8/tcpclient.h @@ -0,0 +1,56 @@ +#pragma once + +#include + +using asio::ip::tcp; + +namespace f8 +{ + + class TcpClient + { + public: + std::function on_error; + std::function on_connect; + std::function on_disconnect; + std::function on_socketread; + TcpClient(std::shared_ptr io_context, + const std::string& remote_ip, + int remote_port); + virtual ~TcpClient(); + const std::string& GetRemoteAddress() { return remote_address_; } + int GetRemotePort() { return remote_port_; } + + void Open(); + void Close(); + bool IsActive(); + bool Connected(); + void SendBuff(const char* buff, unsigned int bufflen); + + private: + void HandleConnect(const asio::error_code& err); + void DoRead(); + void DoSend(); + + private: + std::shared_ptr io_context_; + std::string remote_address_; + int remote_port_ = 0; + + std::shared_ptr endpoint_; + std::shared_ptr socket_; + volatile bool actived_ = false; + volatile bool connected_ = false; + std::shared_ptr send_buffer_mutex_; + a8::SendQueueNode *top_node_ = nullptr; + a8::SendQueueNode *bot_node_ = nullptr; + volatile a8::SendQueueNode *work_node_ = nullptr; + volatile bool sending_ = false; + std::array buffer_; + + void SetActive(bool active); + void ActiveStart(); + void ActiveStop(); + }; + +}