This commit is contained in:
aozhiwei 2022-12-19 13:15:05 +08:00
parent 6bc22929e5
commit 801ee99ec3
5 changed files with 15 additions and 35 deletions

View File

@ -246,7 +246,7 @@ namespace f8
int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params);
if (ret < 0) {
f8::MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
(IM_DbPool,
a8::Args
(
{
@ -268,7 +268,7 @@ namespace f8
last_query_->Next();
}
f8::MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
(IM_DbPool,
a8::Args
(
{
@ -285,7 +285,7 @@ namespace f8
bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params);
if (!ret) {
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
(IM_DbPool,
a8::Args
(
{
@ -297,7 +297,7 @@ namespace f8
} else {
DataSet* data_set = new DataSet();
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
(IM_DbPool,
a8::Args
(
{
@ -312,7 +312,7 @@ namespace f8
default:
{
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
(IM_DbPool,
a8::Args
(
{
@ -326,9 +326,6 @@ namespace f8
}
}
public:
int exec_async_query_msgid = 0;
private:
volatile bool terminated_ = false;
std::mutex *loop_mutex_ = nullptr;
@ -352,7 +349,6 @@ namespace f8
void Init()
{
curr_seqid = 1000001;
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void UnInit()
@ -374,7 +370,6 @@ namespace f8
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
DBThread *db_thread = new DBThread();
db_thread->exec_async_query_msgid = exec_async_query_msgid;
db_thread->Init();
db_thread_pool.push_back(db_thread);
}
@ -429,7 +424,7 @@ namespace f8
DBThread* db_thread = GetDBThread(hash_code);
if (!db_thread) {
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
(IM_DbPool,
a8::Args
(
{
@ -450,14 +445,13 @@ namespace f8
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
int msgid = exec_async_query_msgid;
f8::Timer::Instance()->SetTimeoutEx
(1000 * 10,
[p, msgid] (int event, const a8::Args* args)
[p] (int event, const a8::Args* args)
{
if (event == a8::TIMER_EXEC_EVENT) {
MsgQueue::Instance()->PostMsg
(msgid,
(IM_DbPool,
a8::Args
(
{
@ -487,9 +481,6 @@ namespace f8
return db_thread_pool[hash_code % db_thread_pool.size()];
}
public:
unsigned short exec_async_query_msgid = 0;
private:
long long curr_seqid = 0;
std::map<long long, AsyncQueryRequest*> async_query_hash;
@ -502,7 +493,7 @@ namespace f8
impl_ = new DBPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack
(impl_->exec_async_query_msgid,
(IM_DbPool,
[] (const a8::Args& args)
{
#if 0

View File

@ -218,7 +218,7 @@ namespace f8
std::shared_ptr<a8::XObject> xobj = std::make_shared<a8::XObject>();
if (xobj->ReadFromJsonString(response)) {
f8::MsgQueue::Instance()->PostMsg
(exec_async_http_msgid,
(IM_HttpClientPool,
a8::Args
(
{
@ -229,7 +229,7 @@ namespace f8
));
} else {
f8::MsgQueue::Instance()->PostMsg
(exec_async_http_msgid,
(IM_HttpClientPool,
a8::Args
(
{
@ -242,7 +242,7 @@ namespace f8
}
} else {
f8::MsgQueue::Instance()->PostMsg
(exec_async_http_msgid,
(IM_HttpClientPool,
a8::Args
(
{
@ -254,7 +254,6 @@ namespace f8
}
public:
int exec_async_http_msgid = 0;
int thread_id = 0;
private:
@ -308,7 +307,6 @@ namespace f8
void Init()
{
curr_seqid = 1000001;
exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void UnInit()
@ -326,7 +324,6 @@ namespace f8
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
HttpThread *http_thread = new HttpThread();
http_thread->exec_async_http_msgid = exec_async_http_msgid;
http_thread->thread_id = i;
http_thread->Init();
http_thread_pool.push_back(http_thread);
@ -426,7 +423,6 @@ namespace f8
std::atomic<long long> pending_num = {0};
std::map<long long, AsyncHttpRequest*> async_http_hash;
unsigned short exec_async_http_msgid = 0;
std::vector<HttpThread*> http_thread_pool;
pthread_mutex_t* mutex_buf = nullptr;
size_t mutex_buf_size = 0;
@ -449,7 +445,7 @@ namespace f8
impl_ = new HttpClientPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack
(impl_->exec_async_http_msgid,
(IM_HttpClientPool,
[] (const a8::Args& args)
{
--(HttpClientPool::Instance()->impl_->pending_num);

View File

@ -33,7 +33,6 @@ namespace f8
class MsgQueueImp
{
public:
int curr_im_msgid = 10000;
std::map<int, list_head> msg_handlers;
std::mutex im_msg_mutex_;
@ -184,12 +183,6 @@ namespace f8
return imp_->RegisterCallBack(msgid, handle_func);
}
int MsgQueue::AllocIMMsgId()
{
int custom_im_msgid = ++imp_->curr_im_msgid;
return custom_im_msgid;
}
void MsgQueue::PostMsg(int msgid, const a8::Args args)
{
imp_->PostMsg(msgid, std::move(args));

View File

@ -19,7 +19,6 @@ namespace f8
bool HasMsg();
CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc cb);
void RemoveCallBack(CallBackHandle handle);
int AllocIMMsgId();
void PostMsg(int msgid, const a8::Args args);

View File

@ -31,7 +31,8 @@ namespace f8
enum SysInnerMesssage_e
{
IM_SysBegin = 1,
IM_SysMsgQueue = 2,
IM_HttpClientPool = 2,
IM_DbPool = 3,
IM_SysEnd = 99,
};