From 0891ce5580a5a3ee0875d3af9389971aa1d8f663 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Mon, 11 Feb 2019 15:13:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cpp/dbpool.cc | 272 +++++++++++++++++++++++++------------------------- 1 file changed, 136 insertions(+), 136 deletions(-) diff --git a/cpp/dbpool.cc b/cpp/dbpool.cc index a909892..844f69a 100644 --- a/cpp/dbpool.cc +++ b/cpp/dbpool.cc @@ -319,152 +319,152 @@ namespace f8 exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(); } - void SetThreadNum(int thread_num) - { - assert(thread_num > 0); - for (int i = 0; i < thread_num; i++) { - DBThread *db_thread = new DBThread(); - db_thread->exec_async_query_msgid = exec_async_query_msgid; - db_thread->Init(); - db_thread_pool.push_back(db_thread); - } - } - - AsyncQueryRequest* GetAsyncQueryRequest(long long seqid) - { - auto itr = async_query_hash.find(seqid); - return itr != async_query_hash.end() ? itr->second : nullptr; - } - - void AsyncSqlOnOk(long long seqid, DataSet* data_set) - { - AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); - if (!request) { - return; - } - if (request->on_ok) { - request->on_ok(request->param, data_set); - } - async_query_hash.erase(seqid); - delete request; - } - - void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg) - { - AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); - if (!request) { - return; - } - if (request->on_error) { - request->on_error(request->param, errcode, errmsg); - } - async_query_hash.erase(seqid); - delete request; - } - - void InternalExecAsyncSql(int exec_type, - a8::XObject& conn_info, const char* querystr, std::vector& args, - a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) - { - AsyncQueryRequest* p = new AsyncQueryRequest(); + void SetThreadNum(int thread_num) { - p->context_id = ++curr_seqid; - p->param = param; - p->sql = ""; - p->_sql_fmt = querystr; - p->_sql_params = args; - conn_info.DeepCopy(p->conn_info); - p->add_time = time(nullptr); - p->on_ok = on_ok; - p->on_error = on_error; - async_query_hash[p->context_id] = p; - } - DBThread* db_thread = GetDBThread(hash_code); - if (!db_thread) { - MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, - a8::XParams() - .SetSender(p->context_id) - .SetParam1(AQE_CONN_ERROR)); - return; + assert(thread_num > 0); + for (int i = 0; i < thread_num; i++) { + DBThread *db_thread = new DBThread(); + db_thread->exec_async_query_msgid = exec_async_query_msgid; + db_thread->Init(); + db_thread_pool.push_back(db_thread); + } } + + AsyncQueryRequest* GetAsyncQueryRequest(long long seqid) { - AsyncQueryNode* node = new AsyncQueryNode(); - node->socket_handle = 0; - node->query_type = exec_type; - node->context_id = p->context_id; - node->sql = ""; - node->_sql_fmt = querystr; - node->_sql_params = args; - conn_info.DeepCopy(node->conn_info); - db_thread->AddAsyncQuery(node); + auto itr = async_query_hash.find(seqid); + return itr != async_query_hash.end() ? itr->second : nullptr; } - a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, - a8::XParams() - .SetSender(this) - .SetParam1(p->context_id), - [] (const a8::XParams& param) - { - }, - &p->timer_attacher.timer_list_); - } + void AsyncSqlOnOk(long long seqid, DataSet* data_set) + { + AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); + if (!request) { + return; + } + if (request->on_ok) { + request->on_ok(request->param, data_set); + } + async_query_hash.erase(seqid); + delete request; + } - DBThread* GetDBThread(long long hash_code) + void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg) + { + AsyncQueryRequest* request = GetAsyncQueryRequest(seqid); + if (!request) { + return; + } + if (request->on_error) { + request->on_error(request->param, errcode, errmsg); + } + async_query_hash.erase(seqid); + delete request; + } + + void InternalExecAsyncSql(int exec_type, + a8::XObject& conn_info, const char* querystr, std::vector& args, + a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) + { + AsyncQueryRequest* p = new AsyncQueryRequest(); + { + p->context_id = ++curr_seqid; + p->param = param; + p->sql = ""; + p->_sql_fmt = querystr; + p->_sql_params = args; + conn_info.DeepCopy(p->conn_info); + p->add_time = time(nullptr); + p->on_ok = on_ok; + p->on_error = on_error; + async_query_hash[p->context_id] = p; + } + DBThread* db_thread = GetDBThread(hash_code); + if (!db_thread) { + MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, + a8::XParams() + .SetSender(p->context_id) + .SetParam1(AQE_CONN_ERROR)); + return; + } + { + AsyncQueryNode* node = new AsyncQueryNode(); + node->socket_handle = 0; + node->query_type = exec_type; + node->context_id = p->context_id; + node->sql = ""; + node->_sql_fmt = querystr; + node->_sql_params = args; + conn_info.DeepCopy(node->conn_info); + db_thread->AddAsyncQuery(node); + } + a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, + a8::XParams() + .SetSender(this) + .SetParam1(p->context_id), + [] (const a8::XParams& param) + { + + }, + &p->timer_attacher.timer_list_); + } + + DBThread* GetDBThread(long long hash_code) + { + if (db_thread_pool.empty() || hash_code < 0) { + return nullptr; + } + return db_thread_pool[hash_code % db_thread_pool.size()]; + } + + public: + long long curr_seqid = 0; + std::map async_query_hash; + + unsigned short exec_async_query_msgid = 0; + std::vector db_thread_pool; + }; + + void DBPool::Init() { - if (db_thread_pool.empty() || hash_code < 0) { - return nullptr; - } - return db_thread_pool[hash_code % db_thread_pool.size()]; - } - -public: - long long curr_seqid = 0; - std::map async_query_hash; - - unsigned short exec_async_query_msgid = 0; - std::vector db_thread_pool; -}; - -void DBPool::Init() -{ - impl_ = new DBPoolImpl(); - impl_->Init(); - MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid, - [] (const a8::XParams& param) - { - if (param.param1.GetInt() == AQE_NO_ERROR) { - DataSet* data_set = (DataSet*)param.param2.GetUserData(); - DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set); - delete data_set; - } else { - DBPool::Instance()->impl_->AsyncSqlOnError(param.sender, - param.param1, - param.param2); + impl_ = new DBPoolImpl(); + impl_->Init(); + MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid, + [] (const a8::XParams& param) + { + if (param.param1.GetInt() == AQE_NO_ERROR) { + DataSet* data_set = (DataSet*)param.param2.GetUserData(); + DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set); + delete data_set; + } else { + DBPool::Instance()->impl_->AsyncSqlOnError(param.sender, + param.param1, + param.param2); + } } - } - ); -} + ); + } -void DBPool::UnInit() -{ - delete impl_; - impl_ = nullptr; -} + void DBPool::UnInit() + { + delete impl_; + impl_ = nullptr; + } -void DBPool::SetThreadNum(int thread_num) -{ - impl_->SetThreadNum(thread_num); -} + void DBPool::SetThreadNum(int thread_num) + { + impl_->SetThreadNum(thread_num); + } -void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector args, - a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) -{ - impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code); -} + void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector args, + a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) + { + impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code); + } -void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector args, - a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) -{ - impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code); -} + void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector args, + a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) + { + impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code); + } }