From 7330fde606adbaf606ae19ea27637affe51d6cf0 Mon Sep 17 00:00:00 2001 From: azw Date: Wed, 27 Sep 2023 16:36:53 +0000 Subject: [PATCH] 1 --- f8/app.cc | 60 ++++++++++++++++++++++++++++++++++++++++++++++++- f8/app.h | 17 ++++++++++++++ f8/protoutils.h | 2 ++ 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/f8/app.cc b/f8/app.cc index d952c92..4b27ba5 100644 --- a/f8/app.cc +++ b/f8/app.cc @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -12,6 +13,7 @@ #include #include #include +#include static const int MAX_ZONE_ID = 100; static const int MAX_NODE_ID = 8; @@ -68,6 +70,7 @@ namespace f8 while (!Terminated()) { f8::Timer::Instance()->Update(); f8::MsgQueue::Instance()->Update(); + DispatchNetMsg(); user_app->Update(); Schedule(); } @@ -192,7 +195,17 @@ namespace f8 void App::Schedule() { std::unique_lock lk(*loop_mutex_); - if (!user_app_->HasTask()) { + bool has_task = false; + { + queue_.Fetch(); + if (list_empty(queue_.GetWorkList())) { + has_task = true; + } + } + if (!has_task) { + has_task = user_app_->HasTask(); + } + if (!has_task) { #if 1 int sleep_time = 1; loop_cond_->wait_for(lk, std::chrono::milliseconds(sleep_time)); @@ -206,4 +219,49 @@ namespace f8 } } + void App::DispatchNetMsg() + { + queue_.Fetch(); + list_head* work_list = queue_.GetWorkList(); + while (!list_empty(work_list)){ + MsgHdr* hdr = list_first_entry(work_list, MsgHdr, entry); + list_del_init(&hdr->entry); + user_app_->DispatchSocketMsg(hdr); + } + } + + void App::AddSocketMsg(int sockfrom, + int sockhandle, + long ip_saddr, + unsigned short msgid, + unsigned int seqid, + const char *msgbody, + int bodylen) + { + char *p = (char*)malloc(sizeof(MsgHdr) + bodylen); + MsgHdr* hdr = (MsgHdr*)p; + hdr->sockfrom = sockfrom; + hdr->seqid = seqid; + hdr->msgid = msgid; + hdr->socket_handle = sockhandle; + hdr->ip_saddr = ip_saddr; + hdr->buf = p + sizeof(MsgHdr); + hdr->buflen = bodylen; + hdr->offset = 0; + hdr->hum = nullptr; + hdr->user_data = nullptr; + if (bodylen > 0) { + memmove((void*)hdr->buf, msgbody, bodylen); + } + ++msgnode_size_; + queue_.Push(&hdr->entry); + NotifyLoopCond(); + } + + void App::FreeSocketMsg(MsgHdr* hdr) + { + --msgnode_size_; + free(hdr); + } + } diff --git a/f8/app.h b/f8/app.h index 6d9e2ae..97d9643 100644 --- a/f8/app.h +++ b/f8/app.h @@ -1,6 +1,9 @@ #pragma once +#include + #include +#include namespace a8 { @@ -21,6 +24,7 @@ namespace f8 virtual void UnInit() = 0; virtual void Update() = 0; virtual bool HasTask() = 0; + virtual void DispatchSocketMsg(MsgHdr* hdr) = 0; }; class App : public a8::Singleton @@ -45,6 +49,14 @@ namespace f8 int GetPid(); auto Terminated() { return terminated_; }; void Terminate() { terminated_ = true; }; + void AddSocketMsg(int sockfrom, + int sockhandle, + long ip_saddr, + unsigned short msgid, + unsigned int seqid, + const char *msgbody, + int bodylen); + void FreeSocketMsg(MsgHdr* hdr); private: bool Init(); @@ -53,6 +65,7 @@ namespace f8 void InitLog(); void UnInitLog(); void Schedule(); + void DispatchNetMsg(); private: UserApp* user_app_ = nullptr; @@ -66,6 +79,10 @@ namespace f8 int instance_id_ = 0; std::set flags_; + a8::Queue queue_; + std::atomic msgnode_size_ = {0}; + std::atomic working_msgnode_size_ = {0}; + std::shared_ptr uuid_; std::mutex *loop_mutex_ = nullptr; std::condition_variable *loop_cond_ = nullptr; diff --git a/f8/protoutils.h b/f8/protoutils.h index 9707aa2..99ec287 100644 --- a/f8/protoutils.h +++ b/f8/protoutils.h @@ -13,6 +13,7 @@ namespace f8 struct MsgHdr { + int sockfrom; unsigned int seqid; unsigned short msgid; int socket_handle; @@ -22,6 +23,7 @@ namespace f8 int offset; Player *hum = nullptr; const void* user_data = nullptr; + list_head entry; MsgHdr* Clone(); static void Destroy(MsgHdr* hdr);