diff --git a/cpp/dbpool.cc b/cpp/dbpool.cc index 0712ce5..ad7e2b2 100644 --- a/cpp/dbpool.cc +++ b/cpp/dbpool.cc @@ -15,301 +15,304 @@ #include "framework/cpp/msgqueue.h" #include "framework/cpp/utils.h" -enum AsyncQueryError +namespace f8 { - AQE_NO_ERROR = 0, - AQE_EXEC_TYPE_ERROR = 1, - AQE_QUERY_TYPE_ERROR = 2, - AQE_SYNTAX_ERROR = 3, - AQE_CONN_ERROR = 4 -}; -struct AsyncQueryRequest -{ - long long context_id = 0; - std::string sql; + enum AsyncQueryError + { + AQE_NO_ERROR = 0, + AQE_EXEC_TYPE_ERROR = 1, + AQE_QUERY_TYPE_ERROR = 2, + AQE_SYNTAX_ERROR = 3, + AQE_CONN_ERROR = 4 + }; + + struct AsyncQueryRequest + { + long long context_id = 0; + std::string sql; #if 1 - std::string _sql_fmt; - std::vector _sql_params; - a8::XObject conn_info; + std::string _sql_fmt; + std::vector _sql_params; + a8::XObject conn_info; #endif - a8::XParams param; - time_t add_time = 0; - AsyncDBOnOkFunc on_ok = nullptr; - AsyncDBOnErrorFunc on_error = nullptr; - a8::TimerAttacher timer_attacher; -}; + a8::XParams param; + time_t add_time = 0; + AsyncDBOnOkFunc on_ok = nullptr; + AsyncDBOnErrorFunc on_error = nullptr; + a8::TimerAttacher timer_attacher; + }; -struct AsyncQueryNode -{ - int socket_handle = 0; - int query_type = 0; - long long context_id = 0; - std::string sql; + struct AsyncQueryNode + { + int socket_handle = 0; + int query_type = 0; + long long context_id = 0; + std::string sql; #if 1 - std::string _sql_fmt; - std::vector _sql_params; - a8::XObject conn_info; + std::string _sql_fmt; + std::vector _sql_params; + a8::XObject conn_info; #endif - AsyncQueryNode* nextnode = nullptr; -}; + AsyncQueryNode* nextnode = nullptr; + }; -class DBThread -{ -public: - - void Init() + class DBThread { - loop_mutex_ = new std::mutex(); - loop_cond_ = new std::condition_variable(); + public: - last_checkdb_tick_ = a8::XGetTickCount(); + void Init() + { + loop_mutex_ = new std::mutex(); + loop_cond_ = new std::condition_variable(); - top_node_ = nullptr; - bot_node_ = nullptr; - work_node_ = nullptr; - msg_mutex_ = new std::mutex(); - work_thread_ = new std::thread(&DBThread::WorkThreadProc, this); - } + last_checkdb_tick_ = a8::XGetTickCount(); - #if 0 - void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql) - { - AsyncQueryNode *p = new AsyncQueryNode(); - p->query_type = query_type; - p->socket_handle = sockhandle; - p->context_id = context_id; - p->sql = sql; - - std::unique_lock lk(*loop_mutex_); - msg_mutex_->lock(); - if (bot_node_) { - bot_node_->nextnode = p; - bot_node_ = p; - } else { - top_node_ = p; - bot_node_ = p; - } - msg_mutex_->unlock(); - loop_cond_->notify_all(); - } - #endif - - void AddAsyncQuery(AsyncQueryNode* p) - { - std::unique_lock lk(*loop_mutex_); - msg_mutex_->lock(); - if (bot_node_) { - bot_node_->nextnode = p; - bot_node_ = p; - } else { - top_node_ = p; - bot_node_ = p; - } - msg_mutex_->unlock(); - loop_cond_->notify_all(); - } - -private: - - void WorkThreadProc() - { - while (true) { - CheckDB(); - ProcessMsg(); - WaitLoopCond(); - } - } - - void CheckDB() - { - if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) { - return; - } - last_checkdb_tick_ = a8::XGetTickCount(); - if (last_conn_ && last_query_) { - if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) { - #if 0 - a8::UdpLog::Instance()->Warning("mysql disconnect", {}); - if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { - InitMysqlConnection(&query); - a8::UdpLog::Instance()->Info("mysql reconnect successed", {}); - } else { - a8::UdpLog::Instance()->Info("mysql reconnect failed", {}); - } - #endif - } - } - } - - void ProcessMsg() - { - if (!work_node_ && top_node_) { - msg_mutex_->lock(); - work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; - msg_mutex_->unlock(); + work_node_ = nullptr; + msg_mutex_ = new std::mutex(); + work_thread_ = new std::thread(&DBThread::WorkThreadProc, this); } - while (work_node_) { - AsyncQueryNode *pdelnode = work_node_; - work_node_ = work_node_->nextnode; - ProcAsyncQuery(pdelnode); - delete pdelnode; - } - } - void WaitLoopCond() - { - std::unique_lock lk(*loop_mutex_); +#if 0 + void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql) { + AsyncQueryNode *p = new AsyncQueryNode(); + p->query_type = query_type; + p->socket_handle = sockhandle; + p->context_id = context_id; + p->sql = sql; + + std::unique_lock lk(*loop_mutex_); msg_mutex_->lock(); + if (bot_node_) { + bot_node_->nextnode = p; + bot_node_ = p; + } else { + top_node_ = p; + bot_node_ = p; + } + msg_mutex_->unlock(); + loop_cond_->notify_all(); + } +#endif + + void AddAsyncQuery(AsyncQueryNode* p) + { + std::unique_lock lk(*loop_mutex_); + msg_mutex_->lock(); + if (bot_node_) { + bot_node_->nextnode = p; + bot_node_ = p; + } else { + top_node_ = p; + bot_node_ = p; + } + msg_mutex_->unlock(); + loop_cond_->notify_all(); + } + + private: + + void WorkThreadProc() + { + while (true) { + CheckDB(); + ProcessMsg(); + WaitLoopCond(); + } + } + + void CheckDB() + { + if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) { + return; + } + last_checkdb_tick_ = a8::XGetTickCount(); + if (last_conn_ && last_query_) { + if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) { +#if 0 + a8::UdpLog::Instance()->Warning("mysql disconnect", {}); + if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { + InitMysqlConnection(&query); + a8::UdpLog::Instance()->Info("mysql reconnect successed", {}); + } else { + a8::UdpLog::Instance()->Info("mysql reconnect failed", {}); + } +#endif + } + } + } + + void ProcessMsg() + { if (!work_node_ && top_node_) { + msg_mutex_->lock(); work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; + msg_mutex_->unlock(); + } + while (work_node_) { + AsyncQueryNode *pdelnode = work_node_; + work_node_ = work_node_->nextnode; + ProcAsyncQuery(pdelnode); + delete pdelnode; } - msg_mutex_->unlock(); } - if (!work_node_) { - loop_cond_->wait_for(lk, std::chrono::seconds(10)); - } - } - bool ReCreateConn(a8::XObject& conn_info) - { - if (last_query_) { - delete last_query_; - last_query_ = nullptr; + void WaitLoopCond() + { + std::unique_lock lk(*loop_mutex_); + { + msg_mutex_->lock(); + if (!work_node_ && top_node_) { + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + } + msg_mutex_->unlock(); + } + if (!work_node_) { + loop_cond_->wait_for(lk, std::chrono::seconds(10)); + } } - if (last_conn_) { - delete last_conn_; - last_conn_ = nullptr; - } - last_conn_ = new a8::mysql::Connection(); - last_query_ = last_conn_->CreateQuery(); - if (last_conn_->Connect( - conn_info.Get("host"), - conn_info.Get("port"), - conn_info.Get("user"), - conn_info.Get("passwd"), - conn_info.Get("database") - )) { - f8::InitMysqlConnection(last_query_); - } - return true; - } - bool NeedReCreateConn(a8::XObject& conn_info) - { - if (!last_conn_) { + bool ReCreateConn(a8::XObject& conn_info) + { + if (last_query_) { + delete last_query_; + last_query_ = nullptr; + } + if (last_conn_) { + delete last_conn_; + last_conn_ = nullptr; + } + last_conn_ = new a8::mysql::Connection(); + last_query_ = last_conn_->CreateQuery(); + if (last_conn_->Connect( + conn_info.Get("host"), + conn_info.Get("port"), + conn_info.Get("user"), + conn_info.Get("passwd"), + conn_info.Get("database") + )) { + f8::InitMysqlConnection(last_query_); + } return true; } - if (last_conn_->GetHost() == conn_info.Get("host").GetString() && - last_conn_->GetPort() == conn_info.Get("port").GetInt() && - last_conn_->GetUser() == conn_info.Get("user").GetString() && - last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() && - last_conn_->GetDataBase() == conn_info.Get("database").GetString() - ) { - return false; - } - return true; - } - void ProcAsyncQuery(AsyncQueryNode* node) - { - if (NeedReCreateConn(node->conn_info)) { - ReCreateConn(node->conn_info); + bool NeedReCreateConn(a8::XObject& conn_info) + { + if (!last_conn_) { + return true; + } + if (last_conn_->GetHost() == conn_info.Get("host").GetString() && + last_conn_->GetPort() == conn_info.Get("port").GetInt() && + last_conn_->GetUser() == conn_info.Get("user").GetString() && + last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() && + last_conn_->GetDataBase() == conn_info.Get("database").GetString() + ) { + return false; + } + return true; } - switch (node->query_type) { - case 0: - { - int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params); - if (ret < 0) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_SYNTAX_ERROR) - .SetParam2(last_query_->GetError())); - } else { - DataSet* data_set = new DataSet(); - data_set->reserve(last_query_->RowsNum()); - while (!last_query_->Eof()) { - auto& row = a8::FastAppend(*data_set); - int field_num = last_query_->FieldsNum(); - row.reserve(field_num); - for (int i = 0; i < field_num; i++) { - row.push_back(last_query_->GetValue(i).GetString()); + + void ProcAsyncQuery(AsyncQueryNode* node) + { + if (NeedReCreateConn(node->conn_info)) { + ReCreateConn(node->conn_info); + } + switch (node->query_type) { + case 0: + { + int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params); + if (ret < 0) { + f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(node->context_id) + .SetParam1(AQE_SYNTAX_ERROR) + .SetParam2(last_query_->GetError())); + } else { + DataSet* data_set = new DataSet(); + data_set->reserve(last_query_->RowsNum()); + while (!last_query_->Eof()) { + auto& row = a8::FastAppend(*data_set); + int field_num = last_query_->FieldsNum(); + row.reserve(field_num); + for (int i = 0; i < field_num; i++) { + row.push_back(last_query_->GetValue(i).GetString()); + } + last_query_->Next(); } - last_query_->Next(); + f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(node->context_id) + .SetParam1(AQE_NO_ERROR) + .SetParam2((void*)data_set)); } - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_NO_ERROR) - .SetParam2((void*)data_set)); } - } - break; - case 1: - { - bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params); - if (!ret) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_SYNTAX_ERROR) - .SetParam2(last_query_->GetError())); - } else { - DataSet* data_set = new DataSet(); - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_NO_ERROR) - .SetParam2((void*)data_set)); + break; + case 1: + { + bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params); + if (!ret) { + MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(node->context_id) + .SetParam1(AQE_SYNTAX_ERROR) + .SetParam2(last_query_->GetError())); + } else { + DataSet* data_set = new DataSet(); + MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(node->context_id) + .SetParam1(AQE_NO_ERROR) + .SetParam2((void*)data_set)); + } } + break; + default: + { + MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(node->context_id) + .SetParam1(AQE_QUERY_TYPE_ERROR) + .SetParam2("不可识别的query类型")); + } + break; } - break; - default: - { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(node->context_id) - .SetParam1(AQE_QUERY_TYPE_ERROR) - .SetParam2("不可识别的query类型")); - } - break; } - } -public: - int exec_async_query_msgid = 0; + public: + int exec_async_query_msgid = 0; -private: - std::mutex *loop_mutex_ = nullptr; - std::condition_variable *loop_cond_ = nullptr; + private: + std::mutex *loop_mutex_ = nullptr; + std::condition_variable *loop_cond_ = nullptr; - a8::mysql::Connection* last_conn_ = nullptr; - a8::mysql::Query* last_query_ = nullptr; - long long last_checkdb_tick_ = 0; + a8::mysql::Connection* last_conn_ = nullptr; + a8::mysql::Query* last_query_ = nullptr; + long long last_checkdb_tick_ = 0; - std::thread *work_thread_ = nullptr; - AsyncQueryNode *top_node_ = nullptr; - AsyncQueryNode *bot_node_ = nullptr; - AsyncQueryNode *work_node_ = nullptr; - std::mutex *msg_mutex_ = nullptr; -}; + std::thread *work_thread_ = nullptr; + AsyncQueryNode *top_node_ = nullptr; + AsyncQueryNode *bot_node_ = nullptr; + AsyncQueryNode *work_node_ = nullptr; + std::mutex *msg_mutex_ = nullptr; + }; -class DBPoolImpl -{ -public: - - void Init() + class DBPoolImpl { + public: + + void Init() + { #if 1 - /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, - 如果有多线程并发使用mysql_init(),建议在程序初始化时空调一次mysql_init(),他的这点特性很像qsort() - */ + /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, + 如果有多线程并发使用mysql_init(),建议在程序初始化时空调一次mysql_init(),他的这点特性很像qsort() + */ a8::mysql::Connection conn; #endif curr_seqid = 1000001; @@ -464,3 +467,4 @@ void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::v { impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code); } +} diff --git a/cpp/dbpool.h b/cpp/dbpool.h index 1fe1db7..8a415c9 100644 --- a/cpp/dbpool.h +++ b/cpp/dbpool.h @@ -1,28 +1,31 @@ #pragma once -typedef std::vector> DataSet; -typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set); -typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg); - -class DBPoolImpl; -class DBPool : public a8::Singleton +namespace f8 { - private: - DBPool() {}; - friend class a8::Singleton; + typedef std::vector> DataSet; + typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set); + typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg); - public: - void Init(); - void UnInit(); - void SetThreadNum(int thread_num); + class DBPoolImpl; + class DBPool : public a8::Singleton + { + private: + DBPool() {}; + friend class a8::Singleton; - //执行异步并行查询 - void ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector args, - a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code); - //执行异步并行sql - void ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector args, - a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code); + public: + void Init(); + void UnInit(); + void SetThreadNum(int thread_num); - private: - DBPoolImpl* impl_ = nullptr; -}; + //执行异步并行查询 + void ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector args, + a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code); + //执行异步并行sql + void ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector args, + a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code); + + private: + DBPoolImpl* impl_ = nullptr; + }; +} diff --git a/cpp/msgqueue.cc b/cpp/msgqueue.cc index fe8ac0e..7ab5660 100644 --- a/cpp/msgqueue.cc +++ b/cpp/msgqueue.cc @@ -5,112 +5,115 @@ #include "framework/cpp/msgqueue.h" #include "app.h" -struct MsgQueueNode +namespace f8 { - struct list_head entry; - MsgHandleFunc func; -}; - -class MsgQueueImp -{ -public: - int curr_im_msgid = 10000; - std::map msg_handlers; - - void ProcessMsg(int msgid, const a8::XParams& param) + struct MsgQueueNode { - auto itr = msg_handlers.find(msgid); - if (itr != msg_handlers.end()) { - list_head* head = &itr->second; - struct MsgQueueNode *node = nullptr; - struct MsgQueueNode *tmp = nullptr; - list_for_each_entry_safe(node, tmp, head, entry) { - node->func(param); + struct list_head entry; + MsgHandleFunc func; + }; + + class MsgQueueImp + { + public: + int curr_im_msgid = 10000; + std::map msg_handlers; + + void ProcessMsg(int msgid, const a8::XParams& param) + { + auto itr = msg_handlers.find(msgid); + if (itr != msg_handlers.end()) { + list_head* head = &itr->second; + struct MsgQueueNode *node = nullptr; + struct MsgQueueNode *tmp = nullptr; + list_for_each_entry_safe(node, tmp, head, entry) { + node->func(param); + } } } - } - CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func) - { - MsgQueueNode* node = new MsgQueueNode(); - INIT_LIST_HEAD(&node->entry); - node->func = handle_func; + CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func) + { + MsgQueueNode* node = new MsgQueueNode(); + INIT_LIST_HEAD(&node->entry); + node->func = handle_func; - auto itr = msg_handlers.find(msgid); - if (itr == msg_handlers.end()) { - msg_handlers[msgid] = list_head(); - itr = msg_handlers.find(msgid); - assert(itr != msg_handlers.end()); - INIT_LIST_HEAD(&itr->second); + auto itr = msg_handlers.find(msgid); + if (itr == msg_handlers.end()) { + msg_handlers[msgid] = list_head(); + itr = msg_handlers.find(msgid); + assert(itr != msg_handlers.end()); + INIT_LIST_HEAD(&itr->second); + } + list_add_tail(&node->entry, &itr->second); + return &node->entry; } - list_add_tail(&node->entry, &itr->second); - return &node->entry; + + }; + + void MsgQueue::Init() + { + imp_ = new MsgQueueImp(); } -}; + void MsgQueue::UnInit() + { + delete imp_; + imp_ = nullptr; + } -void MsgQueue::Init() -{ - imp_ = new MsgQueueImp(); -} + void MsgQueue::SendMsg(int msgid, a8::XParams param) + { + imp_->ProcessMsg(msgid, param); + } -void MsgQueue::UnInit() -{ - delete imp_; - imp_ = nullptr; -} + void MsgQueue::PostMsg(int msgid, a8::XParams param) + { + param._sys_field = msgid; + a8::Timer::Instance()->AddDeadLineTimer(0, param, + [] (const a8::XParams& param) + { + MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); + }); + } -void MsgQueue::SendMsg(int msgid, a8::XParams param) -{ - imp_->ProcessMsg(msgid, param); -} + void MsgQueue::AddDelayMsg(int msgid, a8::XParams param, int milli_seconds) + { + param._sys_field = msgid; + a8::Timer::Instance()->AddDeadLineTimer(milli_seconds, param, + [] (const a8::XParams& param) + { + MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); + }); + } -void MsgQueue::PostMsg(int msgid, a8::XParams param) -{ - param._sys_field = msgid; - a8::Timer::Instance()->AddDeadLineTimer(0, param, - [] (const a8::XParams& param) - { - MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); - }); -} + void MsgQueue::RemoveCallBack(CallBackHandle handle) + { + list_head* head = handle; + MsgQueueNode* node = list_entry(head, struct MsgQueueNode, entry); + list_del_init(&node->entry); + delete node; + } -void MsgQueue::AddDelayMsg(int msgid, a8::XParams param, int milli_seconds) -{ - param._sys_field = msgid; - a8::Timer::Instance()->AddDeadLineTimer(milli_seconds, param, - [] (const a8::XParams& param) - { - MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); - }); -} + CallBackHandle MsgQueue::RegisterCallBack(int msgid, MsgHandleFunc handle_func) + { + return imp_->RegisterCallBack(msgid, handle_func); + } -void MsgQueue::RemoveCallBack(CallBackHandle handle) -{ - list_head* head = handle; - MsgQueueNode* node = list_entry(head, struct MsgQueueNode, entry); - list_del_init(&node->entry); - delete node; -} + int MsgQueue::AllocIMMsgId() + { + return ++imp_->curr_im_msgid; + } -CallBackHandle MsgQueue::RegisterCallBack(int msgid, MsgHandleFunc handle_func) -{ - return imp_->RegisterCallBack(msgid, handle_func); -} + void MsgQueue::ProcessMsg(int msgid, const a8::XParams& param) + { + imp_->ProcessMsg(msgid, param); + } -int MsgQueue::AllocIMMsgId() -{ - return ++imp_->curr_im_msgid; -} - -void MsgQueue::ProcessMsg(int msgid, const a8::XParams& param) -{ - imp_->ProcessMsg(msgid, param); -} - -void MsgQueue::PostMsg_r(int msgid, a8::XParams param) -{ - a8::XParams* p = new a8::XParams(); - param.DeepCopy(*p); - App::Instance()->AddIMMsg(f8::IM_SysMsgQueue, a8::XParams().SetSender(msgid).SetParam1((void*)p)); + void MsgQueue::PostMsg_r(int msgid, a8::XParams param) + { + a8::XParams* p = new a8::XParams(); + param.DeepCopy(*p); + App::Instance()->AddIMMsg(f8::IM_SysMsgQueue, a8::XParams().SetSender(msgid).SetParam1((void*)p)); + } } diff --git a/cpp/msgqueue.h b/cpp/msgqueue.h index 5e23737..b4d39d6 100644 --- a/cpp/msgqueue.h +++ b/cpp/msgqueue.h @@ -1,30 +1,33 @@ #pragma once -typedef std::function MsgHandleFunc; -typedef list_head* CallBackHandle; - -class MsgQueueImp; -class MsgQueue : public a8::Singleton +namespace f8 { - private: - MsgQueue() {}; - friend class a8::Singleton; + typedef std::function MsgHandleFunc; + typedef list_head* CallBackHandle; - public: - void Init(); - void UnInit(); + class MsgQueueImp; + class MsgQueue : public a8::Singleton + { + private: + MsgQueue() {}; + friend class a8::Singleton; - void SendMsg(int msgid, a8::XParams param); - void PostMsg(int msgid, a8::XParams param); - void AddDelayMsg(int msgid, a8::XParams param, int milli_seconds); - void RemoveCallBack(CallBackHandle handle); - CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func); - int AllocIMMsgId(); - void ProcessMsg(int msgid, const a8::XParams& param); + public: + void Init(); + void UnInit(); - //线程安全版本 - void PostMsg_r(int msgid, a8::XParams param); + void SendMsg(int msgid, a8::XParams param); + void PostMsg(int msgid, a8::XParams param); + void AddDelayMsg(int msgid, a8::XParams param, int milli_seconds); + void RemoveCallBack(CallBackHandle handle); + CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func); + int AllocIMMsgId(); + void ProcessMsg(int msgid, const a8::XParams& param); - private: - MsgQueueImp* imp_ = nullptr; -}; + //线程安全版本 + void PostMsg_r(int msgid, a8::XParams param); + + private: + MsgQueueImp* imp_ = nullptr; + }; +}