diff --git a/f8/dbpool.cc b/f8/dbpool.cc index ba59601..87d8cb2 100644 --- a/f8/dbpool.cc +++ b/f8/dbpool.cc @@ -246,7 +246,7 @@ namespace f8 int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params); if (ret < 0) { f8::MsgQueue::Instance()->PostMsg - (exec_async_query_msgid, + (IM_DbPool, a8::Args ( { @@ -268,7 +268,7 @@ namespace f8 last_query_->Next(); } f8::MsgQueue::Instance()->PostMsg - (exec_async_query_msgid, + (IM_DbPool, a8::Args ( { @@ -285,7 +285,7 @@ namespace f8 bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params); if (!ret) { MsgQueue::Instance()->PostMsg - (exec_async_query_msgid, + (IM_DbPool, a8::Args ( { @@ -297,7 +297,7 @@ namespace f8 } else { DataSet* data_set = new DataSet(); MsgQueue::Instance()->PostMsg - (exec_async_query_msgid, + (IM_DbPool, a8::Args ( { @@ -312,7 +312,7 @@ namespace f8 default: { MsgQueue::Instance()->PostMsg - (exec_async_query_msgid, + (IM_DbPool, a8::Args ( { @@ -326,9 +326,6 @@ namespace f8 } } - public: - int exec_async_query_msgid = 0; - private: volatile bool terminated_ = false; std::mutex *loop_mutex_ = nullptr; @@ -352,7 +349,6 @@ namespace f8 void Init() { curr_seqid = 1000001; - exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(); } void UnInit() @@ -374,7 +370,6 @@ namespace f8 assert(thread_num > 0); for (int i = 0; i < thread_num; i++) { DBThread *db_thread = new DBThread(); - db_thread->exec_async_query_msgid = exec_async_query_msgid; db_thread->Init(); db_thread_pool.push_back(db_thread); } @@ -429,7 +424,7 @@ namespace f8 DBThread* db_thread = GetDBThread(hash_code); if (!db_thread) { MsgQueue::Instance()->PostMsg - (exec_async_query_msgid, + (IM_DbPool, a8::Args ( { @@ -450,14 +445,13 @@ namespace f8 conn_info.DeepCopy(node->conn_info); db_thread->AddAsyncQuery(node); } - int msgid = exec_async_query_msgid; f8::Timer::Instance()->SetTimeoutEx (1000 * 10, - [p, msgid] (int event, const a8::Args* args) + [p] (int event, const a8::Args* args) { if (event == a8::TIMER_EXEC_EVENT) { MsgQueue::Instance()->PostMsg - (msgid, + (IM_DbPool, a8::Args ( { @@ -487,9 +481,6 @@ namespace f8 return db_thread_pool[hash_code % db_thread_pool.size()]; } - public: - unsigned short exec_async_query_msgid = 0; - private: long long curr_seqid = 0; std::map async_query_hash; @@ -502,7 +493,7 @@ namespace f8 impl_ = new DBPoolImpl(); impl_->Init(); MsgQueue::Instance()->RegisterCallBack - (impl_->exec_async_query_msgid, + (IM_DbPool, [] (const a8::Args& args) { #if 0 diff --git a/f8/httpclientpool.cc b/f8/httpclientpool.cc index f178519..44f6e43 100644 --- a/f8/httpclientpool.cc +++ b/f8/httpclientpool.cc @@ -218,7 +218,7 @@ namespace f8 std::shared_ptr xobj = std::make_shared(); if (xobj->ReadFromJsonString(response)) { f8::MsgQueue::Instance()->PostMsg - (exec_async_http_msgid, + (IM_HttpClientPool, a8::Args ( { @@ -229,7 +229,7 @@ namespace f8 )); } else { f8::MsgQueue::Instance()->PostMsg - (exec_async_http_msgid, + (IM_HttpClientPool, a8::Args ( { @@ -242,7 +242,7 @@ namespace f8 } } else { f8::MsgQueue::Instance()->PostMsg - (exec_async_http_msgid, + (IM_HttpClientPool, a8::Args ( { @@ -254,7 +254,6 @@ namespace f8 } public: - int exec_async_http_msgid = 0; int thread_id = 0; private: @@ -308,7 +307,6 @@ namespace f8 void Init() { curr_seqid = 1000001; - exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(); } void UnInit() @@ -326,7 +324,6 @@ namespace f8 assert(thread_num > 0); for (int i = 0; i < thread_num; i++) { HttpThread *http_thread = new HttpThread(); - http_thread->exec_async_http_msgid = exec_async_http_msgid; http_thread->thread_id = i; http_thread->Init(); http_thread_pool.push_back(http_thread); @@ -426,7 +423,6 @@ namespace f8 std::atomic pending_num = {0}; std::map async_http_hash; - unsigned short exec_async_http_msgid = 0; std::vector http_thread_pool; pthread_mutex_t* mutex_buf = nullptr; size_t mutex_buf_size = 0; @@ -449,7 +445,7 @@ namespace f8 impl_ = new HttpClientPoolImpl(); impl_->Init(); MsgQueue::Instance()->RegisterCallBack - (impl_->exec_async_http_msgid, + (IM_HttpClientPool, [] (const a8::Args& args) { --(HttpClientPool::Instance()->impl_->pending_num); diff --git a/f8/msgqueue.cc b/f8/msgqueue.cc index fb65e7a..d399a6c 100644 --- a/f8/msgqueue.cc +++ b/f8/msgqueue.cc @@ -33,7 +33,6 @@ namespace f8 class MsgQueueImp { public: - int curr_im_msgid = 10000; std::map msg_handlers; std::mutex im_msg_mutex_; @@ -184,12 +183,6 @@ namespace f8 return imp_->RegisterCallBack(msgid, handle_func); } - int MsgQueue::AllocIMMsgId() - { - int custom_im_msgid = ++imp_->curr_im_msgid; - return custom_im_msgid; - } - void MsgQueue::PostMsg(int msgid, const a8::Args args) { imp_->PostMsg(msgid, std::move(args)); diff --git a/f8/msgqueue.h b/f8/msgqueue.h index 60e72ba..bbea8f9 100644 --- a/f8/msgqueue.h +++ b/f8/msgqueue.h @@ -19,7 +19,6 @@ namespace f8 bool HasMsg(); CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc cb); void RemoveCallBack(CallBackHandle handle); - int AllocIMMsgId(); void PostMsg(int msgid, const a8::Args args); diff --git a/f8/types.h b/f8/types.h index 88261e4..956ac39 100644 --- a/f8/types.h +++ b/f8/types.h @@ -31,7 +31,8 @@ namespace f8 enum SysInnerMesssage_e { IM_SysBegin = 1, - IM_SysMsgQueue = 2, + IM_HttpClientPool = 2, + IM_DbPool = 3, IM_SysEnd = 99, };