add dbpool.*
This commit is contained in:
parent
8532cdc6e6
commit
7ca0b59845
@ -27,7 +27,7 @@ public:
|
|||||||
if (buflen - offset < sizeof(PackHead) + p->packlen) {
|
if (buflen - offset < sizeof(PackHead) + p->packlen) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
App::Instance()->AddSocketMsg(SF_Client,
|
App::Instance()->AddSocketMsg(SF_GameServer,
|
||||||
socket_handle,
|
socket_handle,
|
||||||
saddr,
|
saddr,
|
||||||
p->msgid,
|
p->msgid,
|
||||||
|
@ -2,36 +2,22 @@
|
|||||||
|
|
||||||
enum SocketFrom_e
|
enum SocketFrom_e
|
||||||
{
|
{
|
||||||
SF_Client,
|
SF_GameServer,
|
||||||
SF_TargetServer,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
enum InnerMesssage_e
|
enum InnerMesssage_e
|
||||||
{
|
{
|
||||||
IM_ClientSocketDisconnect = 100,
|
IM_ClientSocketDisconnect = 100,
|
||||||
IM_PlayerOffline,
|
|
||||||
IM_ExecGM,
|
IM_ExecGM,
|
||||||
IM_TargetConnDisconnect
|
|
||||||
};
|
};
|
||||||
|
|
||||||
//网络处理对象
|
//网络处理对象
|
||||||
enum NetHandler_e
|
enum NetHandler_e
|
||||||
{
|
{
|
||||||
HID_Player,
|
|
||||||
HID_PlayerMgr,
|
|
||||||
HID_RoomSvrMgr,
|
|
||||||
HID_GSListener,
|
HID_GSListener,
|
||||||
|
HID_DBPool,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum PlayerState_e
|
const char* const PROJ_NAME = "game1008_dbsproxy";
|
||||||
{
|
|
||||||
PS_None,
|
|
||||||
PS_InRoom,
|
|
||||||
PS_Matching,
|
|
||||||
PS_WaitingMatch
|
|
||||||
};
|
|
||||||
|
|
||||||
const char* const PROJ_NAME = "game1008_wsproxy";
|
|
||||||
const char* const PROJ_ROOT = "/data/logs/%s";
|
const char* const PROJ_ROOT = "/data/logs/%s";
|
||||||
|
|
||||||
const int POSTFIX_LEN = 7;
|
|
||||||
|
270
server/dbproxy/dbpool.cc
Normal file
270
server/dbproxy/dbpool.cc
Normal file
@ -0,0 +1,270 @@
|
|||||||
|
#include "precompile.h"
|
||||||
|
#include <thread>
|
||||||
|
#include <chrono>
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <a8/mysql.h>
|
||||||
|
#include "dbpool.h"
|
||||||
|
#include "GSListener.h"
|
||||||
|
#include "ss_proto.pb.h"
|
||||||
|
|
||||||
|
enum AsyncQueryError
|
||||||
|
{
|
||||||
|
AQE_NO_ERROR = 0,
|
||||||
|
AQE_EXEC_TYPE_ERROR = 1,
|
||||||
|
AQE_QUERY_TYPE_ERROR = 2,
|
||||||
|
AQE_SYNTAX_ERROR = 3,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct AsyncQueryNode
|
||||||
|
{
|
||||||
|
int socket_handle = 0;
|
||||||
|
int query_type = 0;
|
||||||
|
long long context_id = 0;
|
||||||
|
std::string sql;
|
||||||
|
AsyncQueryNode* nextnode = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DBThread
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
void Init()
|
||||||
|
{
|
||||||
|
loop_mutex_ = new std::mutex();
|
||||||
|
loop_cond_ = new std::condition_variable();
|
||||||
|
|
||||||
|
gamedb_ = "gamedb1008_1";
|
||||||
|
dbhost_ = "127.0.0.1";
|
||||||
|
dbuser_ = "root";
|
||||||
|
dbpasswd_ = "keji178";
|
||||||
|
last_checkdb_tick_ = a8::XGetTickCount();
|
||||||
|
|
||||||
|
top_node_ = nullptr;
|
||||||
|
bot_node_ = nullptr;
|
||||||
|
work_node_ = nullptr;
|
||||||
|
msg_mutex_ = new std::mutex();
|
||||||
|
work_thread_ = new std::thread(&DBThread::WorkThreadProc, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql)
|
||||||
|
{
|
||||||
|
AsyncQueryNode *p = new AsyncQueryNode();
|
||||||
|
p->query_type = query_type;
|
||||||
|
p->socket_handle = sockhandle;
|
||||||
|
p->context_id = context_id;
|
||||||
|
p->sql = sql;
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lk(*loop_mutex_);
|
||||||
|
msg_mutex_->lock();
|
||||||
|
if (bot_node_) {
|
||||||
|
bot_node_->nextnode = p;
|
||||||
|
bot_node_ = p;
|
||||||
|
} else {
|
||||||
|
top_node_ = p;
|
||||||
|
bot_node_ = p;
|
||||||
|
}
|
||||||
|
msg_mutex_->unlock();
|
||||||
|
loop_cond_->notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
void WorkThreadProc()
|
||||||
|
{
|
||||||
|
a8::mysql::Connection conn;
|
||||||
|
a8::mysql::Query* query = conn.CreateQuery();
|
||||||
|
conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_);
|
||||||
|
#if 0
|
||||||
|
DumpMysqlInfo(query);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
CheckDB(conn, *query);
|
||||||
|
ProcessMsg(*query);
|
||||||
|
WaitLoopCond();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void CheckDB(a8::mysql::Connection& conn, a8::mysql::Query& query)
|
||||||
|
{
|
||||||
|
if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
last_checkdb_tick_ = a8::XGetTickCount();
|
||||||
|
if (query.ExecQuery("SELECT 1;", {}) <= 0) {
|
||||||
|
a8::UdpLog::Instance()->Warning("mysql disconnect", {});
|
||||||
|
if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) {
|
||||||
|
#if 0
|
||||||
|
DumpMysqlInfo(query);
|
||||||
|
#endif
|
||||||
|
a8::UdpLog::Instance()->Info("mysql reconnect successed", {});
|
||||||
|
} else {
|
||||||
|
a8::UdpLog::Instance()->Info("mysql reconnect failed", {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessMsg(a8::mysql::Query& query)
|
||||||
|
{
|
||||||
|
if (!work_node_ && top_node_) {
|
||||||
|
msg_mutex_->lock();
|
||||||
|
work_node_ = top_node_;
|
||||||
|
top_node_ = NULL;
|
||||||
|
bot_node_ = NULL;
|
||||||
|
msg_mutex_->unlock();
|
||||||
|
}
|
||||||
|
while (work_node_) {
|
||||||
|
AsyncQueryNode *pdelnode = work_node_;
|
||||||
|
work_node_ = work_node_->nextnode;
|
||||||
|
ProcAsyncQuery(query, pdelnode);
|
||||||
|
delete pdelnode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void WaitLoopCond()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(*loop_mutex_);
|
||||||
|
{
|
||||||
|
msg_mutex_->lock();
|
||||||
|
if (!work_node_ && top_node_) {
|
||||||
|
work_node_ = top_node_;
|
||||||
|
top_node_ = NULL;
|
||||||
|
bot_node_ = NULL;
|
||||||
|
}
|
||||||
|
msg_mutex_->unlock();
|
||||||
|
}
|
||||||
|
if (!work_node_) {
|
||||||
|
loop_cond_->wait_for(lk, std::chrono::seconds(10));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcAsyncQuery(a8::mysql::Query& query, AsyncQueryNode* node)
|
||||||
|
{
|
||||||
|
ss::SS_DPM_ExecAsyncSql respmsg;
|
||||||
|
respmsg.set_context_id(node->context_id);
|
||||||
|
respmsg.set_sql(node->sql);
|
||||||
|
switch (node->query_type) {
|
||||||
|
case 0:
|
||||||
|
{
|
||||||
|
int ret = query.ExecQuery(node->sql.c_str(), {});
|
||||||
|
if (ret < 0) {
|
||||||
|
respmsg.set_error_code(AQE_SYNTAX_ERROR);
|
||||||
|
respmsg.set_error_msg(query.GetError());
|
||||||
|
} else {
|
||||||
|
respmsg.mutable_data_set()->mutable_rows()->Reserve(query.RowsNum());
|
||||||
|
while (!query.Eof()) {
|
||||||
|
auto row = respmsg.mutable_data_set()->add_rows();
|
||||||
|
int field_num = query.FieldsNum();
|
||||||
|
row->mutable_values()->Reserve(field_num);
|
||||||
|
for (int i = 0; i < field_num; i++) {
|
||||||
|
row->add_values(query.GetValue(i).GetString());
|
||||||
|
}
|
||||||
|
query.Next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GSListener::Instance()->SendMsg(node->socket_handle, respmsg);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
{
|
||||||
|
bool ret = query.ExecScript(node->sql.c_str(), {});
|
||||||
|
if (!ret) {
|
||||||
|
respmsg.set_error_code(AQE_SYNTAX_ERROR);
|
||||||
|
respmsg.set_error_msg(query.GetError());
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
GSListener::Instance()->SendMsg(node->socket_handle, respmsg);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
respmsg.set_error_code(AQE_QUERY_TYPE_ERROR);
|
||||||
|
respmsg.set_error_msg("不可识别的query类型");
|
||||||
|
GSListener::Instance()->SendMsg(node->socket_handle, respmsg);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::mutex *loop_mutex_ = nullptr;
|
||||||
|
std::condition_variable *loop_cond_ = nullptr;
|
||||||
|
|
||||||
|
std::string gamedb_;
|
||||||
|
std::string dbhost_;
|
||||||
|
std::string dbuser_;
|
||||||
|
std::string dbpasswd_;
|
||||||
|
long long last_checkdb_tick_ = 0;
|
||||||
|
|
||||||
|
std::thread *work_thread_ = nullptr;
|
||||||
|
AsyncQueryNode *top_node_ = nullptr;
|
||||||
|
AsyncQueryNode *bot_node_ = nullptr;
|
||||||
|
AsyncQueryNode *work_node_ = nullptr;
|
||||||
|
std::mutex *msg_mutex_ = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
void DBPool::Init()
|
||||||
|
{
|
||||||
|
#if 1
|
||||||
|
/*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了,
|
||||||
|
如果有多线程并发使用mysql_init(),建议在程序初始化时空调一次mysql_init(),他的这点特性很像qsort()
|
||||||
|
*/
|
||||||
|
a8::mysql::Connection conn;
|
||||||
|
#endif
|
||||||
|
db_single_thread_ = new DBThread();
|
||||||
|
db_single_thread_->Init();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
DBThread *db_thread = new DBThread();
|
||||||
|
db_thread->Init();
|
||||||
|
db_thread_pool_.push_back(db_thread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBPool::UnInit()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBPool::Update()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBPool::_SS_Ping(MsgHdr& hdr, const ss::SS_Ping& msg)
|
||||||
|
{
|
||||||
|
ss::SS_Pong respmsg;
|
||||||
|
GSListener::Instance()->SendMsg(hdr.socket_handle, respmsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBPool::_SS_GSM_ExecAsyncSql(MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg)
|
||||||
|
{
|
||||||
|
DBThread *db_thread = db_single_thread_;
|
||||||
|
switch (msg.exec_type()) {
|
||||||
|
case 0:
|
||||||
|
{
|
||||||
|
db_thread->AddAsyncQuery(hdr.socket_handle, msg.query_type(), msg.context_id(), msg.sql());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
{
|
||||||
|
if (db_thread_pool_.size() > 0) {
|
||||||
|
if (msg.hash_code() != 0) {
|
||||||
|
db_thread = db_thread_pool_[msg.hash_code() % db_thread_pool_.size()];
|
||||||
|
} else {
|
||||||
|
db_thread = db_thread_pool_[rand() % db_thread_pool_.size()];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db_thread->AddAsyncQuery(hdr.socket_handle, msg.query_type(), msg.context_id(), msg.sql());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
ss::SS_DPM_ExecAsyncSql respmsg;
|
||||||
|
respmsg.set_context_id(msg.context_id());
|
||||||
|
respmsg.set_sql(msg.sql());
|
||||||
|
respmsg.set_error_code(AQE_EXEC_TYPE_ERROR);
|
||||||
|
respmsg.set_error_msg("不可识别的Exec类型");
|
||||||
|
GSListener::Instance()->SendMsg(hdr.socket_handle, respmsg);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
29
server/dbproxy/dbpool.h
Normal file
29
server/dbproxy/dbpool.h
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace ss
|
||||||
|
{
|
||||||
|
class SS_Ping;
|
||||||
|
class SS_GSM_ExecAsyncSql;
|
||||||
|
}
|
||||||
|
|
||||||
|
class DBThread;
|
||||||
|
class DBPool
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
enum { HID = HID_DBPool };
|
||||||
|
|
||||||
|
public:
|
||||||
|
void Init();
|
||||||
|
void UnInit();
|
||||||
|
void Update();
|
||||||
|
|
||||||
|
void _SS_Ping(MsgHdr& hdr, const ss::SS_Ping& msg);
|
||||||
|
|
||||||
|
void _SS_GSM_ExecAsyncSql(MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg);
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
DBThread* db_single_thread_ = nullptr;
|
||||||
|
std::vector<DBThread*> db_thread_pool_;
|
||||||
|
};
|
||||||
|
|
2
third_party/a8engine
vendored
2
third_party/a8engine
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 55c5492183c6191dd73be99013b0083be6a1b5be
|
Subproject commit 52552756404c7047c4438cf11425299c88cb74a7
|
Loading…
x
Reference in New Issue
Block a user