From e90d758f5ed2fb1392efe70f1960e4dd23c908ed Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sat, 24 Nov 2018 11:42:53 +0800 Subject: [PATCH] 1 --- cpp/dbpool.cc | 385 ++++++++++++++++++++++++++++---------------------- cpp/dbpool.h | 14 +- 2 files changed, 217 insertions(+), 182 deletions(-) diff --git a/cpp/dbpool.cc b/cpp/dbpool.cc index 2447a3d..947ddfb 100644 --- a/cpp/dbpool.cc +++ b/cpp/dbpool.cc @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -25,13 +26,18 @@ enum AsyncQueryError struct AsyncQueryRequest { - list_head entry; long long context_id = 0; std::string sql; +#if 1 + std::string _sql_fmt; + std::initializer_list _sql_params; + a8::XObject conn_info; +#endif a8::XParams param; time_t add_time = 0; AsyncDBOnOkFunc on_ok = nullptr; AsyncDBOnErrorFunc on_error = nullptr; + a8::TimerAttacher timer_attacher; }; struct AsyncQueryNode @@ -66,6 +72,7 @@ public: work_thread_ = new std::thread(&DBThread::WorkThreadProc, this); } + #if 0 void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql) { AsyncQueryNode *p = new AsyncQueryNode(); @@ -86,6 +93,7 @@ public: msg_mutex_->unlock(); loop_cond_->notify_all(); } + #endif void AddAsyncQuery(AsyncQueryNode* p) { @@ -107,39 +115,34 @@ private: void WorkThreadProc() { while (true) { - #if 0 - a8::mysql::Connection conn; - a8::mysql::Query* query = conn.CreateQuery(); - conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_); - InitMysqlConnection(query); - - CheckDB(conn, *query); - ProcessMsg(*query); - #endif + CheckDB(); + ProcessMsg(); WaitLoopCond(); } } - void CheckDB(a8::mysql::Connection& conn, a8::mysql::Query& query) + void CheckDB() { if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) { return; } last_checkdb_tick_ = a8::XGetTickCount(); - if (query.ExecQuery("SELECT 1;", {}) <= 0) { - #if 0 - a8::UdpLog::Instance()->Warning("mysql disconnect", {}); - if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { - InitMysqlConnection(&query); - a8::UdpLog::Instance()->Info("mysql reconnect successed", {}); - } else { - a8::UdpLog::Instance()->Info("mysql reconnect failed", {}); + if (last_conn_ && last_query_) { + if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) { + #if 0 + a8::UdpLog::Instance()->Warning("mysql disconnect", {}); + if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { + InitMysqlConnection(&query); + a8::UdpLog::Instance()->Info("mysql reconnect successed", {}); + } else { + a8::UdpLog::Instance()->Info("mysql reconnect failed", {}); + } + #endif } - #endif } } - void ProcessMsg(a8::mysql::Query& query) + void ProcessMsg() { if (!work_node_ && top_node_) { msg_mutex_->lock(); @@ -151,7 +154,7 @@ private: while (work_node_) { AsyncQueryNode *pdelnode = work_node_; work_node_ = work_node_->nextnode; - ProcAsyncQuery(query, pdelnode); + ProcAsyncQuery(pdelnode); delete pdelnode; } } @@ -173,29 +176,72 @@ private: } } - void ProcAsyncQuery(a8::mysql::Query& query, AsyncQueryNode* node) + bool ReCreateConn(a8::XObject& conn_info) { + if (last_query_) { + delete last_query_; + last_query_ = nullptr; + } + if (last_conn_) { + delete last_conn_; + last_conn_ = nullptr; + } + last_conn_ = new a8::mysql::Connection(); + last_query_ = last_conn_->CreateQuery(); + if (last_conn_->Connect( + conn_info.Get("host"), + conn_info.Get("port"), + conn_info.Get("user"), + conn_info.Get("passwd"), + conn_info.Get("database") + )) { + InitMysqlConnection(last_query_); + } + return true; + } + + bool NeedReCreateConn(a8::XObject& conn_info) + { + if (!last_conn_) { + return true; + } + if (last_conn_->GetHost() == conn_info.Get("host").GetString() && + last_conn_->GetPort() == conn_info.Get("port").GetInt() && + last_conn_->GetUser() == conn_info.Get("user").GetString() && + last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() && + last_conn_->GetDataBase() == conn_info.Get("database").GetString() + ) { + return false; + } + return true; + } + + void ProcAsyncQuery(AsyncQueryNode* node) + { + if (NeedReCreateConn(node->conn_info)) { + ReCreateConn(node->conn_info); + } switch (node->query_type) { case 0: { - int ret = query.ExecQuery(node->sql.c_str(), {}); + int ret = last_query_->ExecQuery(node->sql.c_str(), {}); if (ret < 0) { MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, a8::XParams() .SetSender(node->context_id) .SetParam1(AQE_SYNTAX_ERROR) - .SetParam2(query.GetError())); + .SetParam2(last_query_->GetError())); } else { DataSet* data_set = new DataSet(); - data_set->reserve(query.RowsNum()); - while (!query.Eof()) { + data_set->reserve(last_query_->RowsNum()); + while (!last_query_->Eof()) { auto& row = a8::FastAppend(*data_set); - int field_num = query.FieldsNum(); + int field_num = last_query_->FieldsNum(); row.reserve(field_num); for (int i = 0; i < field_num; i++) { - row.push_back(query.GetValue(i).GetString()); + row.push_back(last_query_->GetValue(i).GetString()); } - query.Next(); + last_query_->Next(); } MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, a8::XParams() @@ -207,13 +253,13 @@ private: break; case 1: { - bool ret = query.ExecScript(node->sql.c_str(), {}); + bool ret = last_query_->ExecScript(node->sql.c_str(), {}); if (!ret) { MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, a8::XParams() .SetSender(node->context_id) .SetParam1(AQE_SYNTAX_ERROR) - .SetParam2(query.GetError())); + .SetParam2(last_query_->GetError())); } else { DataSet* data_set = new DataSet(); MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, @@ -238,16 +284,13 @@ private: public: int exec_async_query_msgid = 0; + private: std::mutex *loop_mutex_ = nullptr; std::condition_variable *loop_cond_ = nullptr; - #if 0 - std::string gamedb_; - std::string dbhost_; - std::string dbuser_; - std::string dbpasswd_; - #endif + a8::mysql::Connection* last_conn_ = nullptr; + a8::mysql::Query* last_query_ = nullptr; long long last_checkdb_tick_ = 0; std::thread *work_thread_ = nullptr; @@ -257,30 +300,143 @@ private: std::mutex *msg_mutex_ = nullptr; }; +class DBPoolImpl +{ +public: + + void Init() + { +#if 1 + /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, + 如果有多线程并发使用mysql_init(),建议在程序初始化时空调一次mysql_init(),他的这点特性很像qsort() + */ + a8::mysql::Connection conn; +#endif + curr_seqid = 1000001; + exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(); + } + + void SetThreadNum(int thread_num) + { + assert(thread_num > 0); + for (int i = 0; i < thread_num; i++) { + DBThread *db_thread = new DBThread(); + db_thread->exec_async_query_msgid = exec_async_query_msgid; + db_thread->Init(); + db_thread_pool.push_back(db_thread); + } + } + + AsyncQueryRequest* GetAsyncQueryRequest(long long seqid) + { + auto itr = async_query_hash.find(seqid); + return itr != async_query_hash.end() ? itr->second : nullptr; + } + + void AsyncSqlOnOk(long long seqid, DataSet* data_set) + { + AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); + if (!request) { + return; + } + if (request->on_ok) { + request->on_ok(request->param, data_set); + } + async_query_hash.erase(seqid); + delete request; + } + + void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg) + { + AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); + if (!request) { + return; + } + if (request->on_error) { + request->on_error(request->param, errcode, errmsg); + } + async_query_hash.erase(seqid); + delete request; + } + + void InternalExecAsyncSql(int exec_type, + a8::XObject& conn_info, const char* querystr, std::initializer_list& args, + a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) + { + AsyncQueryRequest* p = new AsyncQueryRequest(); + { + p->context_id = ++curr_seqid; + p->param = param; + p->sql = ""; + p->_sql_fmt = querystr; + p->_sql_params = args; + conn_info.DeepCopy(p->conn_info); + p->add_time = time(nullptr); + p->on_ok = on_ok; + p->on_error = on_error; + async_query_hash[p->context_id] = p; + } + DBThread* db_thread = GetDBThread(hash_code); + if (!db_thread) { + MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(p->context_id) + .SetParam1(AQE_CONN_ERROR)); + return; + } + { + AsyncQueryNode* node = new AsyncQueryNode(); + node->socket_handle = 0; + node->query_type = exec_type; + node->context_id = p->context_id; + node->sql = ""; + node->_sql_fmt = querystr; + node->_sql_params = args; + conn_info.DeepCopy(node->conn_info); + db_thread->AddAsyncQuery(node); + } + a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, + a8::XParams() + .SetSender(this) + .SetParam1(p->context_id), + [] (const a8::XParams& param) + { + + }, + &p->timer_attacher.timer_list_); + } + + DBThread* GetDBThread(long long hash_code) + { + if (db_thread_pool.empty() || hash_code < 0) { + return nullptr; + } + return db_thread_pool[hash_code % db_thread_pool.size()]; + } + +public: + long long curr_seqid = 0; + std::map async_query_hash; + + unsigned short exec_async_query_msgid = 0; + std::vector db_thread_pool; +}; + void DBPool::Init() { - curr_seqid_ = 1000001; - #if 0 - INIT_LIST_HEAD(&query_list_); - #endif - #if 1 - /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, - 如果有多线程并发使用mysql_init(),建议在程序初始化时空调一次mysql_init(),他的这点特性很像qsort() - */ - a8::mysql::Connection conn; - #endif - exec_async_query_msgid_ = MsgQueue::Instance()->AllocIMMsgId(); - MsgQueue::Instance()->RegisterCallBack(exec_async_query_msgid_, + impl_ = new DBPoolImpl(); + impl_->Init(); + MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid, [] (const a8::XParams& param) { if (param.param1.GetInt() == AQE_NO_ERROR) { DataSet* data_set = (DataSet*)param.param2.GetUserData(); - DBPool::Instance()->AsyncSqlOnOk(param.sender, data_set); + DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set); delete data_set; } else { - DBPool::Instance()->AsyncSqlOnError(param.sender, - param.param1, - param.param2); + DBPool::Instance()->impl_->AsyncSqlOnError(param.sender, + param.param1, + param.param2); } } ); @@ -288,134 +444,23 @@ void DBPool::Init() void DBPool::UnInit() { - + delete impl_; + impl_ = nullptr; } void DBPool::SetThreadNum(int thread_num) { - assert(thread_num > 0); - for (int i = 0; i < thread_num; i++) { - DBThread *db_thread = new DBThread(); - db_thread->exec_async_query_msgid = exec_async_query_msgid_; - db_thread->Init(); - db_thread_pool_.push_back(db_thread); - } + impl_->SetThreadNum(thread_num); } void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::initializer_list args, a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) { - long long context_id = ++curr_seqid_; - { - AsyncQueryRequest* p = new AsyncQueryRequest(); - p->context_id = context_id; - p->param = param; - p->sql = ""; - p->add_time = time(nullptr); - p->on_ok = on_ok; - p->on_error = on_error; - #if 0 - list_add_tail(&p->entry, &query_list_); - #endif - async_query_hash_[p->context_id] = p; - } - if (db_thread_pool_.empty()) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid_, - a8::XParams() - .SetSender(context_id) - .SetParam1(AQE_CONN_ERROR)); - return; - } - DBThread *db_thread = nullptr; - if (hash_code != 0) { - db_thread = db_thread_pool_[hash_code % db_thread_pool_.size()]; - } else { - db_thread = db_thread_pool_[rand() % db_thread_pool_.size()]; - } - { - AsyncQueryNode* node = new AsyncQueryNode(); - node->socket_handle = 0; - node->query_type = 0; - node->context_id = context_id; - node->sql = ""; - node->_sql_fmt = querystr; - node->_sql_params = args; - conn_info.DeepCopy(node->conn_info); - db_thread->AddAsyncQuery(node); - } + impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code); } void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::initializer_list args, a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) { - long long context_id = ++curr_seqid_; - { - AsyncQueryRequest* p = new AsyncQueryRequest(); - p->context_id = context_id; - p->param = param; - p->sql = ""; - p->add_time = time(nullptr); - p->on_ok = on_ok; - p->on_error = on_error; - #if 0 - list_add_tail(&p->entry, &query_list_); - #endif - async_query_hash_[p->context_id] = p; - } - if (db_thread_pool_.empty()) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid_, - a8::XParams() - .SetSender(context_id) - .SetParam1(AQE_CONN_ERROR)); - return; - } - DBThread *db_thread = nullptr; - if (hash_code != 0) { - db_thread = db_thread_pool_[hash_code % db_thread_pool_.size()]; - } else { - db_thread = db_thread_pool_[rand() % db_thread_pool_.size()]; - } - { - AsyncQueryNode* node = new AsyncQueryNode(); - node->socket_handle = 0; - node->query_type = 1; - node->context_id = context_id; - node->sql = ""; - node->_sql_fmt = querystr; - node->_sql_params = args; - conn_info.DeepCopy(node->conn_info); - db_thread->AddAsyncQuery(node); - } -} - -AsyncQueryRequest* DBPool::GetAsyncQueryRequest(long long seqid) -{ - auto itr = async_query_hash_.find(seqid); - return itr != async_query_hash_.end() ? itr->second : nullptr; -} - -void DBPool::AsyncSqlOnOk(long long seqid, DataSet* data_set) -{ - AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); - if (!request) { - return; - } - if (request->on_ok) { - request->on_ok(request->param, data_set); - } - async_query_hash_.erase(seqid); - delete request; -} - -void DBPool::AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg) -{ - AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); - if (!request) { - return; - } - if (request->on_error) { - request->on_error(request->param, errcode, errmsg); - } - async_query_hash_.erase(seqid); - delete request; + impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code); } diff --git a/cpp/dbpool.h b/cpp/dbpool.h index df4cb3e..850ca87 100644 --- a/cpp/dbpool.h +++ b/cpp/dbpool.h @@ -4,8 +4,7 @@ typedef std::vector> DataSet; typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set); typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg); -struct AsyncQueryRequest; -class DBThread; +class DBPoolImpl; class DBPool : public a8::Singleton { private: @@ -25,14 +24,5 @@ class DBPool : public a8::Singleton a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code); private: - AsyncQueryRequest* GetAsyncQueryRequest(long long seqid); - void AsyncSqlOnOk(long long seqid, DataSet* data_set); - void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg); - - private: - long long curr_seqid_ = 0; - std::map async_query_hash_; - - unsigned short exec_async_query_msgid_ = 0; - std::vector db_thread_pool_; + DBPoolImpl* impl_ = nullptr; };