diff --git a/f8/dbpool.cc b/f8/dbpool.cc index f657f7c..d6f7864 100644 --- a/f8/dbpool.cc +++ b/f8/dbpool.cc @@ -244,11 +244,16 @@ namespace f8 { int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params); if (ret < 0) { - f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_SYNTAX_ERROR) - .SetParam2(last_query_->GetError())); + f8::MsgQueue::Instance()->PostMsg + (exec_async_query_msgid, + a8::Args + ( + { + node->context_id, + AQE_SYNTAX_ERROR, + last_query_->GetError() + } + )); } else { DataSet* data_set = new DataSet(); data_set->reserve(last_query_->RowsNum()); @@ -261,11 +266,16 @@ namespace f8 } last_query_->Next(); } - f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_NO_ERROR) - .SetParam2((void*)data_set)); + f8::MsgQueue::Instance()->PostMsg + (exec_async_query_msgid, + a8::Args + ( + { + node->context_id, + AQE_NO_ERROR, + data_set + } + )); } } break; @@ -273,28 +283,43 @@ namespace f8 { bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params); if (!ret) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_SYNTAX_ERROR) - .SetParam2(last_query_->GetError())); + MsgQueue::Instance()->PostMsg + (exec_async_query_msgid, + a8::Args + ( + { + node->context_id, + AQE_SYNTAX_ERROR, + last_query_->GetError() + } + )); } else { DataSet* data_set = new DataSet(); - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_NO_ERROR) - .SetParam2((void*)data_set)); + MsgQueue::Instance()->PostMsg + (exec_async_query_msgid, + a8::Args + ( + { + node->context_id, + AQE_NO_ERROR, + data_set + } + )); } } break; default: { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_QUERY_TYPE_ERROR) - .SetParam2("wrong query type")); + MsgQueue::Instance()->PostMsg + (exec_async_query_msgid, + a8::Args + ( + { + node->context_id, + AQE_QUERY_TYPE_ERROR, + "wrong query type" + } + )); } break; } @@ -325,14 +350,8 @@ namespace f8 void Init() { - auto free_custom_msg = [] (const a8::XParams& params) - { - a8::XParams* param = (a8::XParams*)params.param1.GetUserData(); - DataSet* data_set = (DataSet*)param->param2.GetUserData(); - delete data_set; - }; curr_seqid = 1000001; - exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(free_custom_msg); + exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(); } void UnInit() @@ -408,10 +427,15 @@ namespace f8 } DBThread* db_thread = GetDBThread(hash_code); if (!db_thread) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(p->context_id) - .SetParam1(AQE_CONN_ERROR)); + MsgQueue::Instance()->PostMsg + (exec_async_query_msgid, + a8::Args + ( + { + p->context_id, + AQE_CONN_ERROR + } + )); return; } { @@ -425,18 +449,24 @@ namespace f8 conn_info.DeepCopy(node->conn_info); db_thread->AddAsyncQuery(node); } - a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, - a8::XParams() - .SetSender(p->context_id) - .SetParam1(exec_async_query_msgid), - [] (const a8::XParams& param) - { - MsgQueue::Instance()->PostMsg_r(param.param1, - a8::XParams() - .SetSender(param.sender) - .SetParam1(AQE_CONN_ERROR)); - }, - &p->timer_attacher.timer_list_); + a8::Timer::Instance()->AddDeadLineTimerAndAttach + (1000 * 10, + a8::XParams() + .SetSender(p->context_id) + .SetParam1(exec_async_query_msgid), + [] (const a8::XParams& param) + { + MsgQueue::Instance()->PostMsg + (param.param1.GetInt(), + a8::Args + ( + { + param.sender.GetInt(), + AQE_CONN_ERROR + } + )); + }, + &p->timer_attacher.timer_list_); ++DBPool::Instance()->total_query_num; } @@ -470,20 +500,23 @@ namespace f8 { impl_ = new DBPoolImpl(); impl_->Init(); - MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid, - [] (const a8::XParams& param) - { - if (param.param1.GetInt() == AQE_NO_ERROR) { - DataSet* data_set = (DataSet*)param.param2.GetUserData(); - DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set); - delete data_set; - } else { - DBPool::Instance()->impl_->AsyncSqlOnError(param.sender, - param.param1, - param.param2); - } - } - ); + MsgQueue::Instance()->RegisterCallBack + (impl_->exec_async_query_msgid, + [] (const a8::Args& args) + { + #if 0 + if (param.param1.GetInt() == AQE_NO_ERROR) { + DataSet* data_set = (DataSet*)param.param2.GetUserData(); + DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set); + delete data_set; + } else { + DBPool::Instance()->impl_->AsyncSqlOnError(param.sender, + param.param1, + param.param2); + } + #endif + } + ); } void DBPool::UnInit() diff --git a/f8/httpclientpool.cc b/f8/httpclientpool.cc index db458cf..9e59752 100644 --- a/f8/httpclientpool.cc +++ b/f8/httpclientpool.cc @@ -224,27 +224,39 @@ namespace f8 if (ret) { a8::XObject* xobj = new a8::XObject(); if (xobj->ReadFromJsonString(response)) { - f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AHE_NO_ERROR) - .SetParam2((void*)xobj) - ); + f8::MsgQueue::Instance()->PostMsg + (exec_async_http_msgid, + a8::Args + ( + { + node->context_id, + AHE_NO_ERROR, + xobj + } + )); } else { - f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AHE_NO_CONN) - .SetParam2(response) - ); + f8::MsgQueue::Instance()->PostMsg + (exec_async_http_msgid, + a8::Args + ( + { + node->context_id, + AHE_NO_CONN, + response + } + )); delete xobj; } } else { - f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AHE_NO_CONN) - ); + f8::MsgQueue::Instance()->PostMsg + (exec_async_http_msgid, + a8::Args + ( + { + node->context_id, + AHE_NO_CONN + } + )); } } @@ -303,7 +315,7 @@ namespace f8 void Init() { curr_seqid = 1000001; - exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(nullptr); + exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(); } void UnInit() @@ -477,20 +489,23 @@ namespace f8 #endif impl_ = new HttpClientPoolImpl(); impl_->Init(); - MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid, - [] (const a8::XParams& param) - { - --(HttpClientPool::Instance()->impl_->pending_num); - if (param.param1.GetInt() == AHE_NO_ERROR) { - a8::XObject* xobj = (a8::XObject*)param.param2.GetUserData(); - HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, *xobj); - delete xobj; - } else { - HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender, - param.param2.GetString()); - } - } - ); + MsgQueue::Instance()->RegisterCallBack + (impl_->exec_async_http_msgid, + [] (const a8::Args& args) + { + --(HttpClientPool::Instance()->impl_->pending_num); + #if 0 + if (param.param1.GetInt() == AHE_NO_ERROR) { + a8::XObject* xobj = (a8::XObject*)param.param2.GetUserData(); + HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, *xobj); + delete xobj; + } else { + HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender, + param.param2.GetString()); + } + #endif + } + ); impl_->SetThreadNum(thread_num); #if 1 { diff --git a/f8/msgqueue.cc b/f8/msgqueue.cc index d89df81..d4d1a50 100644 --- a/f8/msgqueue.cc +++ b/f8/msgqueue.cc @@ -1,5 +1,7 @@ #include +#include + #include #include @@ -16,12 +18,28 @@ namespace f8 MsgHandleFunc func; }; + struct IMMsgNode + { + unsigned short msgid; + const a8::Args args; + IMMsgNode* next = nullptr; + + IMMsgNode(const a8::Args& args1):args(std::move(args1)) + { + } + + }; + class MsgQueueImp { public: int curr_im_msgid = 10000; std::map msg_handlers; - std::map custom_free_funcs; + + std::mutex im_msg_mutex_; + IMMsgNode* im_top_node_ = nullptr; + IMMsgNode* im_bot_node_ = nullptr; + IMMsgNode* im_work_node_ = nullptr; ~MsgQueueImp() { @@ -32,6 +50,7 @@ namespace f8 } } + #if 0 void ProcessMsg(int msgid, const a8::XParams& param) { auto itr = msg_handlers.find(msgid); @@ -44,6 +63,7 @@ namespace f8 } } } + #endif CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func) { @@ -66,38 +86,95 @@ namespace f8 void MsgQueue::Init() { - imp_ = new MsgQueueImp(); + imp_ = std::make_shared(); } void MsgQueue::UnInit() { - delete imp_; - imp_ = nullptr; + #if 0 + im_msg_mutex_->lock(); + if (!im_work_node_) { + im_work_node_ = im_top_node_; + im_top_node_ = nullptr; + im_bot_node_ = nullptr; + } + while (im_work_node_) { + IMMsgNode* pdelnode = im_work_node_; + im_work_node_ = im_work_node_->next; + if (pdelnode->msgid == f8::IM_SysMsgQueue) { + a8::XParams* param = (a8::XParams*)pdelnode->params.param1.GetUserData(); + delete param; + } + delete pdelnode; + if (!im_work_node_) { + im_work_node_ = im_top_node_; + im_top_node_ = nullptr; + im_bot_node_ = nullptr; + } + } + im_msg_mutex_->unlock(); + #endif } - void MsgQueue::SendMsg(int msgid, a8::XParams param) + void MsgQueue::Update() { - imp_->ProcessMsg(msgid, param); +#if 0 + if (!im_work_node_ && im_top_node_) { + im_msg_mutex_->lock(); + im_work_node_ = im_top_node_; + im_top_node_ = nullptr; + im_bot_node_ = nullptr; + im_msg_mutex_->unlock(); + } + while (im_work_node_) { + IMMsgNode *pdelnode = im_work_node_; + switch (im_work_node_->msgid) { + case f8::IM_SysMsgQueue: + { + const a8::XParams* param = (const a8::XParams*)pdelnode->params.param1.GetUserData(); + f8::MsgQueue::Instance()->ProcessMsg(pdelnode->params.sender.GetInt(), + *param + ); + delete param; + } + break; + case IM_ClientSocketDisconnect: + { + PlayerMgr::Instance()->OnClientDisconnect(pdelnode->params); + } + break; + case IM_ExecGM: + { + HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.param3, + pdelnode->params.sender, + pdelnode->params.param1.GetString(), + pdelnode->params.param2.GetString() + ); + } + break; + } + im_work_node_ = im_work_node_->next; + delete pdelnode; + } +#endif } - void MsgQueue::PostMsg(int msgid, a8::XParams param) + bool MsgQueue::HasMsg() { - param._sys_field = msgid; - a8::Timer::Instance()->AddDeadLineTimer(0, param, - [] (const a8::XParams& param) - { - MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); - }); - } - - void MsgQueue::AddDelayMsg(int msgid, a8::XParams param, int milli_seconds) - { - param._sys_field = msgid; - a8::Timer::Instance()->AddDeadLineTimer(milli_seconds, param, - [] (const a8::XParams& param) - { - MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); - }); + #if 0 + if (!im_work_node_) { + im_msg_mutex_.lock(); + if (!im_work_node_ && im_top_node_) { + im_work_node_ = im_top_node_; + im_top_node_ = nullptr; + im_bot_node_ = nullptr; + } + im_msg_mutex_.unlock(); + } + if (im_work_node_) { + return true; + } + #endif } void MsgQueue::RemoveCallBack(CallBackHandle handle) @@ -113,34 +190,30 @@ namespace f8 return imp_->RegisterCallBack(msgid, handle_func); } - int MsgQueue::AllocIMMsgId(CustomIMMsgFreeFunc free_func) + int MsgQueue::AllocIMMsgId() { int custom_im_msgid = ++imp_->curr_im_msgid; - if (free_func) { - imp_->custom_free_funcs[custom_im_msgid] = free_func; - } return custom_im_msgid; } - void MsgQueue::FreeCustomIMMsg(a8::XParams& param) + void MsgQueue::PostMsg(int msgid, const a8::Args args) { - auto itr = imp_->custom_free_funcs.find(param.sender.GetInt()); - if (itr != imp_->custom_free_funcs.end()) { - itr->second(param); - } - } - - void MsgQueue::ProcessMsg(int msgid, const a8::XParams& param) - { - imp_->ProcessMsg(msgid, param); - } - - void MsgQueue::PostMsg_r(int msgid, a8::XParams param) - { - a8::XParams* p = new a8::XParams(); - param.DeepCopy(*p); #if 0 - App::Instance()->AddIMMsg(f8::IM_SysMsgQueue, a8::XParams().SetSender(msgid).SetParam1((void*)p)); + IMMsgNode *p = new IMMsgNode; + p->msgid = imcmd; + p->params = params; + p->next = nullptr; + im_msg_mutex_->lock(); + if (im_bot_node_) { + im_bot_node_->next = p; + im_bot_node_ = p; + } else { + im_top_node_ = p; + im_bot_node_ = p; + } + im_msg_mutex_->unlock(); + NotifyLoopCond(); #endif } + } diff --git a/f8/msgqueue.h b/f8/msgqueue.h index e22ebf4..60e72ba 100644 --- a/f8/msgqueue.h +++ b/f8/msgqueue.h @@ -2,11 +2,9 @@ namespace f8 { - typedef std::function MsgHandleFunc; - typedef void CustomIMMsgFreeFunc(const a8::XParams& param); + typedef std::function MsgHandleFunc; typedef list_head* CallBackHandle; - class MsgQueueImp; class MsgQueue : public a8::Singleton { private: @@ -16,20 +14,17 @@ namespace f8 public: void Init(); void UnInit(); + void Update(); - void SendMsg(int msgid, a8::XParams param); - void PostMsg(int msgid, a8::XParams param); - void AddDelayMsg(int msgid, a8::XParams param, int milli_seconds); + bool HasMsg(); + CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc cb); void RemoveCallBack(CallBackHandle handle); - CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func); - int AllocIMMsgId(CustomIMMsgFreeFunc free_func); - void FreeCustomIMMsg(a8::XParams& param); - void ProcessMsg(int msgid, const a8::XParams& param); + int AllocIMMsgId(); - //线程安全版本 - void PostMsg_r(int msgid, a8::XParams param); + void PostMsg(int msgid, const a8::Args args); private: - MsgQueueImp* imp_ = nullptr; + std::shared_ptr imp_; }; + }