#include "precompile.h"

// NOTE(review): this copy of the file had lost the header names of its ten
// angle-bracket includes. The standard headers below are the ones this file
// demonstrably needs; the mysql/a8 header names are a best guess based on the
// symbols used here (mysql_thread_init, a8::mysql::*, a8::Timer,
// a8::TimerAttacher, a8::UdpLog) -- confirm against version control.
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <mysql.h>

#include <a8/mysql.h>
#include <a8/timer.h>
#include <a8/timer_attacher.h>
#include <a8/udplog.h>

#include "framework/cpp/dbpool.h"
#include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h"

namespace f8
{

    // Error codes carried in param1 of the async-query result message.
    enum AsyncQueryError
    {
        AQE_NO_ERROR = 0,
        AQE_EXEC_TYPE_ERROR = 1,
        AQE_QUERY_TYPE_ERROR = 2,
        AQE_SYNTAX_ERROR = 3,
        AQE_CONN_ERROR = 4
    };

    // Bookkeeping kept by DBPoolImpl for every query that has been submitted
    // and not yet answered. Looked up by context_id when the result message
    // arrives.
    struct AsyncQueryRequest
    {
        long long context_id = 0;
        std::string sql;
#if 1
        std::string _sql_fmt;
        // NOTE(review): element type reconstructed (template argument was lost
        // in this copy); it must match the `args` parameter declared in
        // dbpool.h -- confirm.
        std::vector<a8::XValue> _sql_params;
        a8::XObject conn_info;
#endif
        a8::XParams param;
        time_t add_time = 0;
        AsyncDBOnOkFunc on_ok = nullptr;
        AsyncDBOnErrorFunc on_error = nullptr;
        a8::TimerAttacher timer_attacher;
    };

    // One queued unit of work for a DBThread; nodes form an intrusive singly
    // linked list through `nextnode`.
    struct AsyncQueryNode
    {
        int socket_handle = 0;
        int query_type = 0;     // 0: ExecQuery (returns rows), 1: ExecScript
        long long context_id = 0;
        std::string sql;
#if 1
        std::string _sql_fmt;
        std::vector<a8::XValue> _sql_params;
        a8::XObject conn_info;
#endif
        AsyncQueryNode* nextnode = nullptr;
    };

    // A worker thread that owns at most one MySQL connection and executes the
    // queued AsyncQueryNode items in FIFO order, posting each result back
    // through f8::MsgQueue under `exec_async_query_msgid`.
    class DBThread
    {
    public:
        // UnInit() is safe to call repeatedly, so this guarantees the worker
        // is joined before the std::thread member is destroyed.
        ~DBThread()
        {
            UnInit();
        }

        // Resets the queue state and starts the worker thread.
        void Init()
        {
            last_checkdb_tick_ = a8::XGetTickCount();
            top_node_ = nullptr;
            bot_node_ = nullptr;
            work_node_ = nullptr;
            terminated_ = false;
            work_thread_ = std::thread(&DBThread::WorkThreadProc, this);
        }

        // Stops the worker thread, waits for it, and frees any nodes that were
        // still queued. Idempotent.
        void UnInit()
        {
            {
                // Set the flag under loop_mutex_ so the worker cannot miss the
                // notification between its check and its wait.
                std::lock_guard<std::mutex> lk(loop_mutex_);
                terminated_ = true;
            }
            // Wake the worker now instead of letting it sit out its 10 s wait.
            loop_cond_.notify_all();
            if (work_thread_.joinable()) {
                work_thread_.join();
            }

            // The worker may exit with unprocessed nodes; release them.
            AsyncQueryNode* pending = nullptr;
            {
                std::lock_guard<std::mutex> lk(msg_mutex_);
                pending = top_node_;
                top_node_ = nullptr;
                bot_node_ = nullptr;
            }
            FreeNodeList(pending);
            FreeNodeList(work_node_);
            work_node_ = nullptr;
        }

        // Appends `p` to the pending queue and wakes the worker. The queue
        // takes ownership of the node. A null `p` is ignored.
        void AddAsyncQuery(AsyncQueryNode* p)
        {
            if (!p) {
                return;
            }
            std::unique_lock<std::mutex> lk(loop_mutex_);
            {
                std::lock_guard<std::mutex> msg_lk(msg_mutex_);
                if (bot_node_) {
                    bot_node_->nextnode = p;
                } else {
                    top_node_ = p;
                }
                bot_node_ = p;
            }
            loop_cond_.notify_all();
        }

    private:
        // Deletes every node of the list starting at `head`.
        static void FreeNodeList(AsyncQueryNode* head)
        {
            while (head) {
                AsyncQueryNode* next = head->nextnode;
                delete head;
                head = next;
            }
        }

        // Worker entry point: ping / process / wait until terminated, then
        // tear down the connection on this same thread.
        void WorkThreadProc()
        {
            mysql_thread_init();
            while (!terminated_) {
                CheckDB();
                ProcessMsg();
                ++DBPool::Instance()->run_loop_num;
                WaitLoopCond();
            }
            DestroyConn();
            mysql_thread_end();
        }

        // Deletes the current query and connection objects, if any.
        void DestroyConn()
        {
            if (last_query_) {
                delete last_query_;
                last_query_ = nullptr;
            }
            if (last_conn_) {
                delete last_conn_;
                last_conn_ = nullptr;
            }
            conn_ok_ = false;
        }

        // Pings the current connection with "SELECT 1;" at most once every
        // 1000 * 60 * 5 ticks (five minutes if XGetTickCount is in
        // milliseconds -- TODO confirm). A failed ping marks the connection so
        // that the next query re-creates it.
        void CheckDB()
        {
            if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) {
                return;
            }
            last_checkdb_tick_ = a8::XGetTickCount();
            if (last_conn_ && last_query_) {
                if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) {
                    conn_ok_ = false;
                }
            }
        }

        // Takes over the pending queue (if the work list is empty) and runs
        // every node in it.
        void ProcessMsg()
        {
            if (!work_node_) {
                // top_node_ is shared with producers: only touch it under lock.
                std::lock_guard<std::mutex> lk(msg_mutex_);
                work_node_ = top_node_;
                top_node_ = nullptr;
                bot_node_ = nullptr;
            }
            while (work_node_) {
                AsyncQueryNode* pdelnode = work_node_;
                work_node_ = work_node_->nextnode;
                ProcAsyncQuery(pdelnode);
                ++DBPool::Instance()->exec_query_num;
                delete pdelnode;
            }
        }

        // Sleeps for up to 10 seconds unless there is work queued or shutdown
        // has been requested. Producers and UnInit() notify under loop_mutex_,
        // so a wake-up cannot slip in between the checks and the wait.
        void WaitLoopCond()
        {
            std::unique_lock<std::mutex> lk(loop_mutex_);
            {
                std::lock_guard<std::mutex> msg_lk(msg_mutex_);
                if (!work_node_ && top_node_) {
                    work_node_ = top_node_;
                    top_node_ = nullptr;
                    bot_node_ = nullptr;
                }
            }
            if (!work_node_ && !terminated_) {
                loop_cond_.wait_for(lk, std::chrono::seconds(10));
            }
        }

        // Drops the current connection and opens a new one described by
        // `conn_info` (keys: host, port, user, passwd, database).
        // Returns true only when the connect call succeeded.
        bool ReCreateConn(a8::XObject& conn_info)
        {
            DestroyConn();
            last_conn_ = new a8::mysql::Connection();
            last_query_ = last_conn_->CreateQuery();
            if (last_conn_->Connect(
                                    conn_info.Get("host"),
                                    conn_info.Get("port"),
                                    conn_info.Get("user"),
                                    conn_info.Get("passwd"),
                                    conn_info.Get("database")
                                    )) {
                f8::InitMysqlConnection(last_query_);
                conn_ok_ = true;
            }
            return conn_ok_;
        }

        // True when there is no usable connection or the current one was
        // opened with different parameters than `conn_info`.
        bool NeedReCreateConn(a8::XObject& conn_info)
        {
            if (!last_conn_ || !conn_ok_) {
                return true;
            }
            const bool same_target =
                last_conn_->GetHost() == conn_info.Get("host").GetString() &&
                last_conn_->GetPort() == conn_info.Get("port").GetInt() &&
                last_conn_->GetUser() == conn_info.Get("user").GetString() &&
                last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() &&
                last_conn_->GetDataBase() == conn_info.Get("database").GetString();
            return !same_target;
        }

        // Executes one node and posts exactly one result message:
        //  - AQE_CONN_ERROR       the connection could not be (re)established
        //  - AQE_SYNTAX_ERROR     the query/script failed; param2 = error text
        //  - AQE_QUERY_TYPE_ERROR unknown node->query_type
        //  - AQE_NO_ERROR         param2 = heap-allocated DataSet*, handed to
        //                         the message consumer
        void ProcAsyncQuery(AsyncQueryNode* node)
        {
            if (NeedReCreateConn(node->conn_info) && !ReCreateConn(node->conn_info)) {
                // Do not run a query on a connection that failed to open.
                f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                    a8::XParams()
                                                    .SetSender(node->context_id)
                                                    .SetParam1(AQE_CONN_ERROR)
                                                    .SetParam2("connect to database failed"));
                return;
            }
            switch (node->query_type) {
            case 0:
                {
                    int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params);
                    if (ret < 0) {
                        f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                            a8::XParams()
                                                            .SetSender(node->context_id)
                                                            .SetParam1(AQE_SYNTAX_ERROR)
                                                            .SetParam2(last_query_->GetError()));
                    } else {
                        // Copy the whole result set as strings, row by row.
                        DataSet* data_set = new DataSet();
                        data_set->reserve(last_query_->RowsNum());
                        while (!last_query_->Eof()) {
                            auto& row = a8::FastAppend(*data_set);
                            int field_num = last_query_->FieldsNum();
                            row.reserve(field_num);
                            for (int i = 0; i < field_num; i++) {
                                row.push_back(last_query_->GetValue(i).GetString());
                            }
                            last_query_->Next();
                        }
                        f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                            a8::XParams()
                                                            .SetSender(node->context_id)
                                                            .SetParam1(AQE_NO_ERROR)
                                                            .SetParam2((void*)data_set));
                    }
                }
                break;
            case 1:
                {
                    bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params);
                    if (!ret) {
                        MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                        a8::XParams()
                                                        .SetSender(node->context_id)
                                                        .SetParam1(AQE_SYNTAX_ERROR)
                                                        .SetParam2(last_query_->GetError()));
                    } else {
                        // Scripts produce no rows; an empty DataSet is posted.
                        DataSet* data_set = new DataSet();
                        MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                        a8::XParams()
                                                        .SetSender(node->context_id)
                                                        .SetParam1(AQE_NO_ERROR)
                                                        .SetParam2((void*)data_set));
                    }
                }
                break;
            default:
                {
                    MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                    a8::XParams()
                                                    .SetSender(node->context_id)
                                                    .SetParam1(AQE_QUERY_TYPE_ERROR)
                                                    .SetParam2("wrong query type"));
                }
                break;
            }
        }

    public:
        // Message id used for every result posted by this thread.
        int exec_async_query_msgid = 0;

    private:
        // Stop flag written by UnInit() and read by the worker thread.
        std::atomic<bool> terminated_{false};

        // Lock order is always loop_mutex_ -> msg_mutex_.
        std::mutex loop_mutex_;             // pairs with loop_cond_
        std::condition_variable loop_cond_;
        std::mutex msg_mutex_;              // guards top_node_ / bot_node_
        std::thread work_thread_;

        // Connection state; created, used and destroyed on the worker thread.
        a8::mysql::Connection* last_conn_ = nullptr;
        a8::mysql::Query* last_query_ = nullptr;
        bool conn_ok_ = false;              // last connect succeeded, last ping did not fail
        long long last_checkdb_tick_ = 0;

        AsyncQueryNode* top_node_ = nullptr;    // head of the pending queue
        AsyncQueryNode* bot_node_ = nullptr;    // tail of the pending queue
        AsyncQueryNode* work_node_ = nullptr;   // list currently being processed by the worker
    };

    // Private implementation of DBPool: owns the worker threads and the table
    // of outstanding requests.
    class DBPoolImpl
    {
    public:
        // Allocates the message id used for query results and resets the
        // sequence counter.
        void Init()
        {
            // Registered with AllocIMMsgId below; frees the DataSet reached
            // through the message parameters.
            // NOTE(review): this assumes param1 carries a pointer to the posted
            // a8::XParams -- confirm against MsgQueue::AllocIMMsgId.
            auto free_custom_msg =
                [] (const a8::XParams& params)
                {
                    a8::XParams* param = (a8::XParams*)params.param1.GetUserData();
                    DataSet* data_set = (DataSet*)param->param2.GetUserData();
                    delete data_set;
                };
            curr_seqid = 1000001;
            exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(free_custom_msg);
        }

        // Stops and destroys every worker thread, then drops all requests
        // that never received an answer (their callbacks are not invoked).
        void UnInit()
        {
            for (auto& db_thread : db_thread_pool) {
                db_thread->UnInit();
                delete db_thread;
            }
            // Do not keep dangling pointers around.
            db_thread_pool.clear();
            for (auto& pair : async_query_hash) {
                delete pair.second;
            }
            async_query_hash.clear();
        }

        // Creates and starts `thread_num` additional worker threads.
        void SetThreadNum(int thread_num)
        {
            assert(thread_num > 0);
            for (int i = 0; i < thread_num; i++) {
                DBThread* db_thread = new DBThread();
                db_thread->exec_async_query_msgid = exec_async_query_msgid;
                db_thread->Init();
                db_thread_pool.push_back(db_thread);
            }
        }

        // Completes request `seqid` successfully: invokes on_ok (if set) and
        // destroys the request. Unknown ids are ignored. `data_set` stays
        // owned by the caller.
        void AsyncSqlOnOk(long long seqid, DataSet* data_set)
        {
            AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
            if (!request) {
                return;
            }
            if (request->on_ok) {
                request->on_ok(request->param, data_set);
            }
            async_query_hash.erase(seqid);
            delete request;
        }

        // Completes request `seqid` with an error: invokes the pool-wide
        // on_dberror hook and then the request's on_error (each if set), and
        // destroys the request. Unknown ids are ignored.
        void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
        {
            AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
            if (!request) {
                return;
            }
            if (DBPool::Instance()->on_dberror) {
                DBPool::Instance()->on_dberror(request->param, errcode, errmsg);
            }
            if (request->on_error) {
                request->on_error(request->param, errcode, errmsg);
            }
            async_query_hash.erase(seqid);
            delete request;
        }

        // Registers a new request and queues it on the worker selected by
        // `hash_code`.
        //  exec_type  0 = query, 1 = script (see DBThread::ProcAsyncQuery)
        //  conn_info  connection parameters, deep-copied for the worker
        //  querystr   SQL format string
        //  args       parameters for the format string
        //  param      user context handed back to the callbacks
        //  hash_code  selects the worker; negative values (or an empty pool)
        //             make the request fail with AQE_CONN_ERROR
        // A deadline timer of 1000 * 10 (presumably milliseconds) posts
        // AQE_CONN_ERROR for the request; it is attached to the request's
        // timer_attacher.
        void InternalExecAsyncSql(int exec_type,
                                  a8::XObject& conn_info,
                                  const char* querystr,
                                  std::vector<a8::XValue>& args,
                                  a8::XParams& param,
                                  AsyncDBOnOkFunc on_ok,
                                  AsyncDBOnErrorFunc on_error,
                                  long long hash_code)
        {
            AsyncQueryRequest* p = new AsyncQueryRequest();
            {
                p->context_id = ++curr_seqid;
                p->param = param;
                p->sql = "";
                p->_sql_fmt = querystr;
                p->_sql_params = args;
                conn_info.DeepCopy(p->conn_info);
                p->add_time = time(nullptr);
                p->on_ok = on_ok;
                p->on_error = on_error;
                async_query_hash[p->context_id] = p;
            }
            DBThread* db_thread = GetDBThread(hash_code);
            if (!db_thread) {
                // The request is already registered; the posted error message
                // releases it through AsyncSqlOnError.
                MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
                                                a8::XParams()
                                                .SetSender(p->context_id)
                                                .SetParam1(AQE_CONN_ERROR));
                return;
            }
            {
                AsyncQueryNode* node = new AsyncQueryNode();
                node->socket_handle = 0;
                node->query_type = exec_type;
                node->context_id = p->context_id;
                node->sql = "";
                node->_sql_fmt = querystr;
                node->_sql_params = args;
                conn_info.DeepCopy(node->conn_info);
                db_thread->AddAsyncQuery(node);
            }
            a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
                                                             a8::XParams()
                                                             .SetSender(p->context_id)
                                                             .SetParam1(exec_async_query_msgid),
                                                             [] (const a8::XParams& param)
                                                             {
                                                                 MsgQueue::Instance()->PostMsg_r(param.param1,
                                                                                                 a8::XParams()
                                                                                                 .SetSender(param.sender)
                                                                                                 .SetParam1(AQE_CONN_ERROR));
                                                             },
                                                             &p->timer_attacher.timer_list_);
            ++DBPool::Instance()->total_query_num;
        }

    private:
        // Returns the outstanding request with id `seqid`, or nullptr.
        AsyncQueryRequest* GetAsyncQueryRequest(long long seqid)
        {
            auto itr = async_query_hash.find(seqid);
            return itr != async_query_hash.end() ? itr->second : nullptr;
        }

        // Maps `hash_code` onto a worker (hash_code modulo pool size).
        // Returns nullptr for an empty pool or a negative hash_code.
        DBThread* GetDBThread(long long hash_code)
        {
            if (db_thread_pool.empty() || hash_code < 0) {
                return nullptr;
            }
            return db_thread_pool[hash_code % db_thread_pool.size()];
        }

    public:
        unsigned short exec_async_query_msgid = 0;

    private:
        long long curr_seqid = 0;
        std::map<long long, AsyncQueryRequest*> async_query_hash;   // context_id -> owned request
        std::vector<DBThread*> db_thread_pool;                       // owned worker threads
    };

    // Creates the implementation and registers the handler that dispatches
    // query results to AsyncSqlOnOk / AsyncSqlOnError. On success the DataSet
    // is deleted right after the on_ok callback returns.
    void DBPool::Init()
    {
        impl_ = new DBPoolImpl();
        impl_->Init();
        MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
                                               [] (const a8::XParams& param)
                                               {
                                                   if (param.param1.GetInt() == AQE_NO_ERROR) {
                                                       DataSet* data_set = (DataSet*)param.param2.GetUserData();
                                                       DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set);
                                                       delete data_set;
                                                   } else {
                                                       DBPool::Instance()->impl_->AsyncSqlOnError(param.sender,
                                                                                                  param.param1,
                                                                                                  param.param2);
                                                   }
                                               }
                                               );
    }

    // Shuts the pool down and releases the implementation.
    void DBPool::UnInit()
    {
        impl_->UnInit();
        delete impl_;
        impl_ = nullptr;
    }

    // Starts `thread_num` worker threads (must be > 0).
    void DBPool::SetThreadNum(int thread_num)
    {
        impl_->SetThreadNum(thread_num);
    }

    // Queues a row-returning query; see DBPoolImpl::InternalExecAsyncSql.
    void DBPool::ExecAsyncQuery(a8::XObject conn_info,
                                const char* querystr,
                                std::vector<a8::XValue> args,
                                a8::XParams param,
                                AsyncDBOnOkFunc on_ok,
                                AsyncDBOnErrorFunc on_error,
                                long long hash_code)
    {
        impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code);
    }

    // Queues a script (no result rows); see DBPoolImpl::InternalExecAsyncSql.
    void DBPool::ExecAsyncScript(a8::XObject conn_info,
                                 const char* querystr,
                                 std::vector<a8::XValue> args,
                                 a8::XParams param,
                                 AsyncDBOnOkFunc on_ok,
                                 AsyncDBOnErrorFunc on_error,
                                 long long hash_code)
    {
        impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code);
    }

}