This commit is contained in:
aozhiwei 2024-12-24 18:41:50 +08:00
parent 8d7062488f
commit fb6dd34caa
11 changed files with 103 additions and 20 deletions

View File

@ -42,7 +42,6 @@ void App::DispatchSocketMsg(f8::MsgHdr* hdr)
}
bool App::PreprocessSocketMsg(f8::MsgHdr* hdr)
void App::PreprocessSocketMsg(f8::MsgHdr* hdr)
{
return false;
}

View File

@ -14,6 +14,6 @@ public:
virtual void Update(int delta_time) override;
virtual bool HasTask() override;
virtual void DispatchSocketMsg(f8::MsgHdr* hdr) override;
virtual bool PreprocessSocketMsg(f8::MsgHdr* hdr) override;
virtual void PreprocessSocketMsg(f8::MsgHdr* hdr) override;
};

View File

@ -14,6 +14,9 @@ namespace a8
void Push(list_head* node);
void Fetch();
Queue(const Queue&) = delete;
Queue& operator=(const Queue&) = delete;
private:
list_head msg_list_;
list_head work_list_;

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <a8/uuid.h>
@ -19,6 +20,7 @@
#include <f8/timer.h>
#include <f8/userapp.h>
#include <f8/iomgr.h>
#include <f8/threadpool.h>
static const int MAX_ZONE_ID = 100;
static const int MAX_NODE_ID = 8;
@ -35,7 +37,7 @@ static const char* const PROJ_LOG_FILENAME_FMT = "log_$pid_%Y%m%d.log";
namespace f8
{
class AppImpl
class App::Impl
{
private:
UserApp* user_app_ = nullptr;
@ -51,7 +53,7 @@ namespace f8
int instance_id_ = 0;
std::set<int> flags_;
a8::Queue net_data_queue_;
f8::ThreadPool preprocess_thread_pool_;
a8::Queue net_msg_queue_;
std::atomic<long long> msgnode_size_ = {0};
std::atomic<long long> working_msgnode_size_ = {0};
@ -85,6 +87,22 @@ namespace f8
f8::TGLog::Instance()->Init(user_app_->GetPkgName(), false, 0);
f8::HttpClientPool::Instance()->Init(MAX_ALL_HTTP_NUM, MAX_SYS_HTTP_NUM, MAX_USER_HTTP_NUM);
user_app_->Init();
preprocess_thread_pool_.Init
(std::max(1, user_app_->GetPreprocessThreadNum()),
[this] (f8::ThreadPool::Context* ctx)
{
while (!ctx->Terminated()) {
list_head* work_list = ctx->GetWorkList();
while (!list_empty(work_list)){
MsgHdr* hdr = list_first_entry(work_list, MsgHdr, entry);
list_del_init(&hdr->entry);
user_app_->PreprocessSocketMsg(hdr);
net_msg_queue_.Push(&hdr->entry);
NotifyLoopCond();
}
ctx->Wait(1000);
}
});
return true;
}
@ -223,8 +241,7 @@ namespace f8
memmove(p + sizeof(MsgHdr), msgbody, bodylen);
}
++msgnode_size_;
net_data_queue_.Push(&hdr->entry);
NotifyLoopCond();
preprocess_thread_pool_.PostMsg(socket_handle, &hdr->entry);
}
bool ParseOpt()
@ -342,14 +359,14 @@ namespace f8
};
App::App():impl_(std::make_shared<AppImpl>())
App::App():impl_(std::make_unique<Impl>())
{
}
int App::Run(int argc, char* argv[], UserApp* user_app)
{
impl_ = std::make_unique<AppImpl>();
impl_ = std::make_unique<Impl>();
return impl_->Run(argc, argv, user_app);
}

View File

@ -40,7 +40,8 @@ namespace f8
void ResetMaxRunDelayTime();
private:
std::shared_ptr<class AppImpl> impl_;
class Impl;
std::unique_ptr<Impl> impl_;
};
}

Binary file not shown.

View File

@ -327,18 +327,18 @@ namespace f8
{
MsgHdr* hdr = (MsgHdr*)malloc(sizeof(MsgHdr) + buflen);
memmove((void*)hdr, (void*)this, sizeof(MsgHdr) + buflen);
if (user_data) {
hdr->user_data = new(std::any);
*hdr->user_data = *user_data;
if (processed_data) {
hdr->processed_data = new(std::any);
*hdr->processed_data = *processed_data;
}
return hdr;
}
void MsgHdr::Destroy(MsgHdr* hdr)
{
if (hdr->user_data) {
delete hdr->user_data;
hdr->user_data = nullptr;
if (hdr->processed_data) {
delete hdr->processed_data;
hdr->processed_data = nullptr;
}
free((void*)hdr);
}

View File

@ -7,10 +7,14 @@ namespace a8
class AsioTcpClient;
}
namespace f8
{
class App;
}
namespace f8
{
class AppImpl;
struct MsgHdr
{
public:
@ -32,8 +36,8 @@ namespace f8
int buflen;
int payload_buflen;
std::any* user_data;
friend class f8::AppImpl;
std::any* processed_data;
friend class f8::App;
};
//普通消息头部

32
third_party/f8/f8/threadpool.cc vendored Normal file
View File

@ -0,0 +1,32 @@
#include <f8/internal/pch.h>
#include <f8/threadpool.h>
namespace f8
{
class ThreadPool::WorkerThread
{
};
void ThreadPool::Init(int thread_num, std::function<void(Context*)> cb)
{
}
void ThreadPool::UnInit()
{
}
void ThreadPool::PostMsg(int key, list_head* node)
{
}
void ThreadPool::Start()
{
}
}

27
third_party/f8/f8/threadpool.h vendored Normal file
View File

@ -0,0 +1,27 @@
#pragma once
namespace f8
{
class ThreadPool
{
public:
class Context
{
public:
virtual bool* Terminated() = 0;
virtual list_head* GetWorkList() = 0;
virtual void Wait(int ms) = 0;
};
void Init(int thread_num, std::function<void(Context*)> cb);
void UnInit();
void Start();
void PostMsg(int key, list_head* node);
private:
class WorkerThread;
std::vector<std::shared_ptr<WorkerThread>> threads_;
};
}

View File

@ -14,7 +14,7 @@ namespace f8
virtual void Update(int delta_time) = 0;
virtual bool HasTask() = 0;
virtual void DispatchSocketMsg(f8::MsgHdr* hdr) = 0;
virtual bool PreprocessSocketMsg(f8::MsgHdr* hdr) = 0;
virtual void PreprocessSocketMsg(f8::MsgHdr* hdr) = 0;
};
}