From 7ca0b598456c392e986fe57fccda7dfca0c5e6ed Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Thu, 1 Nov 2018 13:47:18 +0800 Subject: [PATCH] add dbpool.* --- server/dbproxy/GSListener.cc | 2 +- server/dbproxy/constant.h | 20 +-- server/dbproxy/dbpool.cc | 270 +++++++++++++++++++++++++++++++++++ server/dbproxy/dbpool.h | 29 ++++ third_party/a8engine | 2 +- 5 files changed, 304 insertions(+), 19 deletions(-) create mode 100644 server/dbproxy/dbpool.cc create mode 100644 server/dbproxy/dbpool.h diff --git a/server/dbproxy/GSListener.cc b/server/dbproxy/GSListener.cc index 461c631..d775dc1 100644 --- a/server/dbproxy/GSListener.cc +++ b/server/dbproxy/GSListener.cc @@ -27,7 +27,7 @@ public: if (buflen - offset < sizeof(PackHead) + p->packlen) { break; } - App::Instance()->AddSocketMsg(SF_Client, + App::Instance()->AddSocketMsg(SF_GameServer, socket_handle, saddr, p->msgid, diff --git a/server/dbproxy/constant.h b/server/dbproxy/constant.h index 11552c3..16ebc0a 100644 --- a/server/dbproxy/constant.h +++ b/server/dbproxy/constant.h @@ -2,36 +2,22 @@ enum SocketFrom_e { - SF_Client, - SF_TargetServer, + SF_GameServer, }; enum InnerMesssage_e { IM_ClientSocketDisconnect = 100, - IM_PlayerOffline, IM_ExecGM, - IM_TargetConnDisconnect }; //网络处理对象 enum NetHandler_e { - HID_Player, - HID_PlayerMgr, - HID_RoomSvrMgr, HID_GSListener, + HID_DBPool, }; -enum PlayerState_e -{ - PS_None, - PS_InRoom, - PS_Matching, - PS_WaitingMatch -}; - -const char* const PROJ_NAME = "game1008_wsproxy"; +const char* const PROJ_NAME = "game1008_dbsproxy"; const char* const PROJ_ROOT = "/data/logs/%s"; -const int POSTFIX_LEN = 7; diff --git a/server/dbproxy/dbpool.cc b/server/dbproxy/dbpool.cc new file mode 100644 index 0000000..591ce72 --- /dev/null +++ b/server/dbproxy/dbpool.cc @@ -0,0 +1,270 @@ +#include "precompile.h" +#include +#include +#include +#include +#include +#include "dbpool.h" +#include "GSListener.h" +#include "ss_proto.pb.h" + +enum AsyncQueryError +{ + AQE_NO_ERROR = 0, + AQE_EXEC_TYPE_ERROR = 1, + AQE_QUERY_TYPE_ERROR = 2, + AQE_SYNTAX_ERROR = 3, +}; + +struct AsyncQueryNode +{ + int socket_handle = 0; + int query_type = 0; + long long context_id = 0; + std::string sql; + AsyncQueryNode* nextnode = nullptr; +}; + +class DBThread +{ +public: + + void Init() + { + loop_mutex_ = new std::mutex(); + loop_cond_ = new std::condition_variable(); + + gamedb_ = "gamedb1008_1"; + dbhost_ = "127.0.0.1"; + dbuser_ = "root"; + dbpasswd_ = "keji178"; + last_checkdb_tick_ = a8::XGetTickCount(); + + top_node_ = nullptr; + bot_node_ = nullptr; + work_node_ = nullptr; + msg_mutex_ = new std::mutex(); + work_thread_ = new std::thread(&DBThread::WorkThreadProc, this); + } + + void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql) + { + AsyncQueryNode *p = new AsyncQueryNode(); + p->query_type = query_type; + p->socket_handle = sockhandle; + p->context_id = context_id; + p->sql = sql; + + std::unique_lock lk(*loop_mutex_); + msg_mutex_->lock(); + if (bot_node_) { + bot_node_->nextnode = p; + bot_node_ = p; + } else { + top_node_ = p; + bot_node_ = p; + } + msg_mutex_->unlock(); + loop_cond_->notify_all(); + } + +private: + + void WorkThreadProc() + { + a8::mysql::Connection conn; + a8::mysql::Query* query = conn.CreateQuery(); + conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_); + #if 0 + DumpMysqlInfo(query); + #endif + + while (true) { + CheckDB(conn, *query); + ProcessMsg(*query); + WaitLoopCond(); + } + } + + void CheckDB(a8::mysql::Connection& conn, a8::mysql::Query& query) + { + if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) { + return; + } + last_checkdb_tick_ = a8::XGetTickCount(); + if (query.ExecQuery("SELECT 1;", {}) <= 0) { + a8::UdpLog::Instance()->Warning("mysql disconnect", {}); + if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { + #if 0 + DumpMysqlInfo(query); + #endif + a8::UdpLog::Instance()->Info("mysql reconnect successed", {}); + } else { + a8::UdpLog::Instance()->Info("mysql reconnect failed", {}); + } + } + } + + void ProcessMsg(a8::mysql::Query& query) + { + if (!work_node_ && top_node_) { + msg_mutex_->lock(); + work_node_ = top_node_; + top_node_ = NULL; + bot_node_ = NULL; + msg_mutex_->unlock(); + } + while (work_node_) { + AsyncQueryNode *pdelnode = work_node_; + work_node_ = work_node_->nextnode; + ProcAsyncQuery(query, pdelnode); + delete pdelnode; + } + } + + void WaitLoopCond() + { + std::unique_lock lk(*loop_mutex_); + { + msg_mutex_->lock(); + if (!work_node_ && top_node_) { + work_node_ = top_node_; + top_node_ = NULL; + bot_node_ = NULL; + } + msg_mutex_->unlock(); + } + if (!work_node_) { + loop_cond_->wait_for(lk, std::chrono::seconds(10)); + } + } + + void ProcAsyncQuery(a8::mysql::Query& query, AsyncQueryNode* node) + { + ss::SS_DPM_ExecAsyncSql respmsg; + respmsg.set_context_id(node->context_id); + respmsg.set_sql(node->sql); + switch (node->query_type) { + case 0: + { + int ret = query.ExecQuery(node->sql.c_str(), {}); + if (ret < 0) { + respmsg.set_error_code(AQE_SYNTAX_ERROR); + respmsg.set_error_msg(query.GetError()); + } else { + respmsg.mutable_data_set()->mutable_rows()->Reserve(query.RowsNum()); + while (!query.Eof()) { + auto row = respmsg.mutable_data_set()->add_rows(); + int field_num = query.FieldsNum(); + row->mutable_values()->Reserve(field_num); + for (int i = 0; i < field_num; i++) { + row->add_values(query.GetValue(i).GetString()); + } + query.Next(); + } + } + GSListener::Instance()->SendMsg(node->socket_handle, respmsg); + } + break; + case 1: + { + bool ret = query.ExecScript(node->sql.c_str(), {}); + if (!ret) { + respmsg.set_error_code(AQE_SYNTAX_ERROR); + respmsg.set_error_msg(query.GetError()); + } else { + } + GSListener::Instance()->SendMsg(node->socket_handle, respmsg); + } + break; + default: + { + respmsg.set_error_code(AQE_QUERY_TYPE_ERROR); + respmsg.set_error_msg("不可识别的query类型"); + GSListener::Instance()->SendMsg(node->socket_handle, respmsg); + } + break; + } + } + +private: + std::mutex *loop_mutex_ = nullptr; + std::condition_variable *loop_cond_ = nullptr; + + std::string gamedb_; + std::string dbhost_; + std::string dbuser_; + std::string dbpasswd_; + long long last_checkdb_tick_ = 0; + + std::thread *work_thread_ = nullptr; + AsyncQueryNode *top_node_ = nullptr; + AsyncQueryNode *bot_node_ = nullptr; + AsyncQueryNode *work_node_ = nullptr; + std::mutex *msg_mutex_ = nullptr; +}; + +void DBPool::Init() +{ + #if 1 + /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, + 如果有多线程并发使用mysql_init(),建议在程序初始化时空调一次mysql_init(),他的这点特性很像qsort() + */ + a8::mysql::Connection conn; + #endif + db_single_thread_ = new DBThread(); + db_single_thread_->Init(); + for (int i = 0; i < 10; i++) { + DBThread *db_thread = new DBThread(); + db_thread->Init(); + db_thread_pool_.push_back(db_thread); + } +} + +void DBPool::UnInit() +{ +} + +void DBPool::Update() +{ +} + +void DBPool::_SS_Ping(MsgHdr& hdr, const ss::SS_Ping& msg) +{ + ss::SS_Pong respmsg; + GSListener::Instance()->SendMsg(hdr.socket_handle, respmsg); +} + +void DBPool::_SS_GSM_ExecAsyncSql(MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg) +{ + DBThread *db_thread = db_single_thread_; + switch (msg.exec_type()) { + case 0: + { + db_thread->AddAsyncQuery(hdr.socket_handle, msg.query_type(), msg.context_id(), msg.sql()); + } + break; + case 1: + { + if (db_thread_pool_.size() > 0) { + if (msg.hash_code() != 0) { + db_thread = db_thread_pool_[msg.hash_code() % db_thread_pool_.size()]; + } else { + db_thread = db_thread_pool_[rand() % db_thread_pool_.size()]; + } + } + db_thread->AddAsyncQuery(hdr.socket_handle, msg.query_type(), msg.context_id(), msg.sql()); + } + break; + default: + { + ss::SS_DPM_ExecAsyncSql respmsg; + respmsg.set_context_id(msg.context_id()); + respmsg.set_sql(msg.sql()); + respmsg.set_error_code(AQE_EXEC_TYPE_ERROR); + respmsg.set_error_msg("不可识别的Exec类型"); + GSListener::Instance()->SendMsg(hdr.socket_handle, respmsg); + } + break; + } +} diff --git a/server/dbproxy/dbpool.h b/server/dbproxy/dbpool.h new file mode 100644 index 0000000..17ec458 --- /dev/null +++ b/server/dbproxy/dbpool.h @@ -0,0 +1,29 @@ +#pragma once + +namespace ss +{ + class SS_Ping; + class SS_GSM_ExecAsyncSql; +} + +class DBThread; +class DBPool +{ + public: + enum { HID = HID_DBPool }; + + public: + void Init(); + void UnInit(); + void Update(); + + void _SS_Ping(MsgHdr& hdr, const ss::SS_Ping& msg); + + void _SS_GSM_ExecAsyncSql(MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg); + + private: + + DBThread* db_single_thread_ = nullptr; + std::vector db_thread_pool_; +}; + diff --git a/third_party/a8engine b/third_party/a8engine index 55c5492..5255275 160000 --- a/third_party/a8engine +++ b/third_party/a8engine @@ -1 +1 @@ -Subproject commit 55c5492183c6191dd73be99013b0083be6a1b5be +Subproject commit 52552756404c7047c4438cf11425299c88cb74a7