diff --git a/server/mymangosd/app.cc b/server/mymangosd/app.cc index aab4a98..acdd1b8 100644 --- a/server/mymangosd/app.cc +++ b/server/mymangosd/app.cc @@ -42,7 +42,6 @@ void App::DispatchSocketMsg(f8::MsgHdr* hdr) } -bool App::PreprocessSocketMsg(f8::MsgHdr* hdr) +void App::PreprocessSocketMsg(f8::MsgHdr* hdr) { - return false; } diff --git a/server/mymangosd/app.h b/server/mymangosd/app.h index 0672b68..b387132 100644 --- a/server/mymangosd/app.h +++ b/server/mymangosd/app.h @@ -14,6 +14,6 @@ public: virtual void Update(int delta_time) override; virtual bool HasTask() override; virtual void DispatchSocketMsg(f8::MsgHdr* hdr) override; - virtual bool PreprocessSocketMsg(f8::MsgHdr* hdr) override; + virtual void PreprocessSocketMsg(f8::MsgHdr* hdr) override; }; diff --git a/third_party/a8/a8/queue.h b/third_party/a8/a8/queue.h index f0b16b4..bba8ffb 100644 --- a/third_party/a8/a8/queue.h +++ b/third_party/a8/a8/queue.h @@ -14,6 +14,9 @@ namespace a8 void Push(list_head* node); void Fetch(); + Queue(const Queue&) = delete; + Queue& operator=(const Queue&) = delete; + private: list_head msg_list_; list_head work_list_; diff --git a/third_party/f8/f8/app.cc b/third_party/f8/f8/app.cc index 4d7ee27..25f1abe 100644 --- a/third_party/f8/f8/app.cc +++ b/third_party/f8/f8/app.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -19,6 +20,7 @@ #include #include #include +#include static const int MAX_ZONE_ID = 100; static const int MAX_NODE_ID = 8; @@ -35,7 +37,7 @@ static const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log"; namespace f8 { - class AppImpl + class App::Impl { private: UserApp* user_app_ = nullptr; @@ -51,7 +53,7 @@ namespace f8 int instance_id_ = 0; std::set flags_; - a8::Queue net_data_queue_; + f8::ThreadPool preprocess_thread_pool_; a8::Queue net_msg_queue_; std::atomic msgnode_size_ = {0}; std::atomic working_msgnode_size_ = {0}; @@ -85,6 +87,22 @@ namespace f8 f8::TGLog::Instance()->Init(user_app_->GetPkgName(), false, 0); f8::HttpClientPool::Instance()->Init(MAX_ALL_HTTP_NUM, MAX_SYS_HTTP_NUM, MAX_USER_HTTP_NUM); user_app_->Init(); + preprocess_thread_pool_.Init + (std::max(1, user_app_->GetPreprocessThreadNum()), + [this] (f8::ThreadPool::Context* ctx) + { + while (!ctx->Terminated()) { + list_head* work_list = ctx->GetWorkList(); + while (!list_empty(work_list)){ + MsgHdr* hdr = list_first_entry(work_list, MsgHdr, entry); + list_del_init(&hdr->entry); + user_app_->PreprocessSocketMsg(hdr); + net_msg_queue_.Push(&hdr->entry); + NotifyLoopCond(); + } + ctx->Wait(1000); + } + }); return true; } @@ -223,8 +241,7 @@ namespace f8 memmove(p + sizeof(MsgHdr), msgbody, bodylen); } ++msgnode_size_; - net_data_queue_.Push(&hdr->entry); - NotifyLoopCond(); + preprocess_thread_pool_.PostMsg(socket_handle, &hdr->entry); } bool ParseOpt() @@ -342,14 +359,14 @@ namespace f8 }; - App::App():impl_(std::make_shared()) + App::App():impl_(std::make_unique()) { } int App::Run(int argc, char* argv[], UserApp* user_app) { - impl_ = std::make_unique(); + impl_ = std::make_unique(); return impl_->Run(argc, argv, user_app); } diff --git a/third_party/f8/f8/app.h b/third_party/f8/f8/app.h index 88a1459..5de88d6 100644 --- a/third_party/f8/f8/app.h +++ b/third_party/f8/f8/app.h @@ -40,7 +40,8 @@ namespace f8 void ResetMaxRunDelayTime(); private: - std::shared_ptr impl_; + class Impl; + std::unique_ptr impl_; }; } diff --git a/third_party/f8/f8/libf8.a b/third_party/f8/f8/libf8.a deleted file mode 100644 index 34f9d7c..0000000 Binary files a/third_party/f8/f8/libf8.a and /dev/null differ diff --git a/third_party/f8/f8/protoutils.cc b/third_party/f8/f8/protoutils.cc index ac11510..9ee2563 100644 --- a/third_party/f8/f8/protoutils.cc +++ b/third_party/f8/f8/protoutils.cc @@ -327,18 +327,18 @@ namespace f8 { MsgHdr* hdr = (MsgHdr*)malloc(sizeof(MsgHdr) + buflen); memmove((void*)hdr, (void*)this, sizeof(MsgHdr) + buflen); - if (user_data) { - hdr->user_data = new(std::any); - *hdr->user_data = *user_data; + if (processed_data) { + hdr->processed_data = new(std::any); + *hdr->processed_data = *processed_data; } return hdr; } void MsgHdr::Destroy(MsgHdr* hdr) { - if (hdr->user_data) { - delete hdr->user_data; - hdr->user_data = nullptr; + if (hdr->processed_data) { + delete hdr->processed_data; + hdr->processed_data = nullptr; } free((void*)hdr); } diff --git a/third_party/f8/f8/protoutils.h b/third_party/f8/f8/protoutils.h index fc7e511..85fe2d3 100644 --- a/third_party/f8/f8/protoutils.h +++ b/third_party/f8/f8/protoutils.h @@ -7,10 +7,14 @@ namespace a8 class AsioTcpClient; } +namespace f8 +{ + class App; +} + namespace f8 { - class AppImpl; struct MsgHdr { public: @@ -32,8 +36,8 @@ namespace f8 int buflen; int payload_buflen; - std::any* user_data; - friend class f8::AppImpl; + std::any* processed_data; + friend class f8::App; }; //普通消息头部 diff --git a/third_party/f8/f8/threadpool.cc b/third_party/f8/f8/threadpool.cc new file mode 100644 index 0000000..5393351 --- /dev/null +++ b/third_party/f8/f8/threadpool.cc @@ -0,0 +1,32 @@ +#include + +#include + +namespace f8 +{ + + class ThreadPool::WorkerThread + { + + }; + + void ThreadPool::Init(int thread_num, std::function cb) + { + + } + + void ThreadPool::UnInit() + { + + } + + void ThreadPool::PostMsg(int key, list_head* node) + { + + } + + void ThreadPool::Start() + { + } + +} diff --git a/third_party/f8/f8/threadpool.h b/third_party/f8/f8/threadpool.h new file mode 100644 index 0000000..557d8a2 --- /dev/null +++ b/third_party/f8/f8/threadpool.h @@ -0,0 +1,27 @@ +#pragma once + +namespace f8 +{ + + class ThreadPool + { + public: + class Context + { + public: + virtual bool* Terminated() = 0; + virtual list_head* GetWorkList() = 0; + virtual void Wait(int ms) = 0; + }; + + void Init(int thread_num, std::function cb); + void UnInit(); + void Start(); + void PostMsg(int key, list_head* node); + + private: + class WorkerThread; + std::vector> threads_; + }; + +} diff --git a/third_party/f8/f8/userapp.h b/third_party/f8/f8/userapp.h index 96cb04a..43190f4 100644 --- a/third_party/f8/f8/userapp.h +++ b/third_party/f8/f8/userapp.h @@ -14,7 +14,7 @@ namespace f8 virtual void Update(int delta_time) = 0; virtual bool HasTask() = 0; virtual void DispatchSocketMsg(f8::MsgHdr* hdr) = 0; - virtual bool PreprocessSocketMsg(f8::MsgHdr* hdr) = 0; + virtual void PreprocessSocketMsg(f8::MsgHdr* hdr) = 0; }; }