dbproxy/server/dbproxy/dbpool.cc
aozhiwei 7e07d0641a 1
2019-01-14 18:53:08 +08:00

275 lines
7.8 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "precompile.h"
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <a8/mysql.h>
#include "dbpool.h"
#include "GSListener.h"
#include "ss_proto.pb.h"
/**
 * Error codes placed in the error_code field of SS_DPM_ExecAsyncSql replies.
 *
 * Deliberately an unscoped enum: the values are handed directly to the
 * generated protobuf setter, which takes an integer.
 */
enum AsyncQueryError
{
    AQE_NO_ERROR         = 0,  // request handled without error
    AQE_EXEC_TYPE_ERROR  = 1,  // exec_type of the request was not recognised
    AQE_QUERY_TYPE_ERROR = 2,  // query_type of the request was not recognised
    AQE_SYNTAX_ERROR     = 3,  // the database layer reported a failure for the SQL
};
/**
 * One queued asynchronous SQL request.
 *
 * Nodes form an intrusive singly linked list through `nextnode`.
 */
struct AsyncQueryNode
{
    int socket_handle{0};               // socket the reply is sent back on
    int query_type{0};                  // selects how the SQL is executed
    long long context_id{0};            // echoed back in the reply
    std::string sql;                    // SQL text to execute
    AsyncQueryNode* nextnode{nullptr};  // next request in the list, or nullptr
};
class DBThread
{
public:
void Init()
{
loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable();
gamedb_ = a8::Format("gamedb%d_1", {GAME_ID});
dbhost_ = "127.0.0.1";
dbuser_ = "root";
dbpasswd_ = "keji178";
last_checkdb_tick_ = a8::XGetTickCount();
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
msg_mutex_ = new std::mutex();
work_thread_ = new std::thread(&DBThread::WorkThreadProc, this);
}
void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql)
{
AsyncQueryNode *p = new AsyncQueryNode();
p->query_type = query_type;
p->socket_handle = sockhandle;
p->context_id = context_id;
p->sql = sql;
std::unique_lock<std::mutex> lk(*loop_mutex_);
msg_mutex_->lock();
if (bot_node_) {
bot_node_->nextnode = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
msg_mutex_->unlock();
loop_cond_->notify_all();
}
private:
void WorkThreadProc()
{
a8::mysql::Connection conn;
a8::mysql::Query* query = conn.CreateQuery();
conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_);
#if 0
DumpMysqlInfo(query);
#endif
while (true) {
CheckDB(conn, *query);
ProcessMsg(*query);
WaitLoopCond();
}
}
void CheckDB(a8::mysql::Connection& conn, a8::mysql::Query& query)
{
if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) {
return;
}
last_checkdb_tick_ = a8::XGetTickCount();
if (query.ExecQuery("SELECT 1;", {}) <= 0) {
a8::UdpLog::Instance()->Warning("mysql disconnect", {});
if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) {
#if 0
DumpMysqlInfo(query);
#endif
a8::UdpLog::Instance()->Info("mysql reconnect successed", {});
} else {
a8::UdpLog::Instance()->Info("mysql reconnect failed", {});
}
}
}
void ProcessMsg(a8::mysql::Query& query)
{
if (!work_node_ && top_node_) {
msg_mutex_->lock();
work_node_ = top_node_;
top_node_ = NULL;
bot_node_ = NULL;
msg_mutex_->unlock();
}
while (work_node_) {
AsyncQueryNode *pdelnode = work_node_;
work_node_ = work_node_->nextnode;
ProcAsyncQuery(query, pdelnode);
delete pdelnode;
}
}
void WaitLoopCond()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
{
msg_mutex_->lock();
if (!work_node_ && top_node_) {
work_node_ = top_node_;
top_node_ = NULL;
bot_node_ = NULL;
}
msg_mutex_->unlock();
}
if (!work_node_) {
loop_cond_->wait_for(lk, std::chrono::seconds(10));
}
}
void ProcAsyncQuery(a8::mysql::Query& query, AsyncQueryNode* node)
{
ss::SS_DPM_ExecAsyncSql respmsg;
respmsg.set_context_id(node->context_id);
respmsg.set_sql(node->sql);
switch (node->query_type) {
case 0:
{
int ret = query.ExecQuery(node->sql.c_str(), {});
if (ret < 0) {
respmsg.set_error_code(AQE_SYNTAX_ERROR);
respmsg.set_error_msg(query.GetError());
} else {
respmsg.mutable_data_set()->mutable_rows()->Reserve(query.RowsNum());
while (!query.Eof()) {
auto row = respmsg.mutable_data_set()->add_rows();
int field_num = query.FieldsNum();
row->mutable_values()->Reserve(field_num);
for (int i = 0; i < field_num; i++) {
row->add_values(query.GetValue(i).GetString());
}
query.Next();
}
}
GSListener::Instance()->SendMsg(node->socket_handle, respmsg);
}
break;
case 1:
{
bool ret = query.ExecScript(node->sql.c_str(), {});
if (!ret) {
respmsg.set_error_code(AQE_SYNTAX_ERROR);
respmsg.set_error_msg(query.GetError());
} else {
}
GSListener::Instance()->SendMsg(node->socket_handle, respmsg);
}
break;
default:
{
respmsg.set_error_code(AQE_QUERY_TYPE_ERROR);
respmsg.set_error_msg("不可识别的query类型");
GSListener::Instance()->SendMsg(node->socket_handle, respmsg);
}
break;
}
}
private:
std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr;
std::string gamedb_;
std::string dbhost_;
std::string dbuser_;
std::string dbpasswd_;
long long last_checkdb_tick_ = 0;
std::thread *work_thread_ = nullptr;
AsyncQueryNode *top_node_ = nullptr;
AsyncQueryNode *bot_node_ = nullptr;
AsyncQueryNode *work_node_ = nullptr;
std::mutex *msg_mutex_ = nullptr;
};
/**
 * Pool start-up hook.
 *
 * The whole body is compiled out with `#if 0`, so this function currently
 * does nothing. The disabled code creates one dedicated DBThread plus a
 * pool of ten more.
 */
void DBPool::Init()
{
#if 0
#if 1
    /* mysql_init() is not completely thread-safe, but it becomes thread-safe
       once it has been called successfully one time. If several threads are
       going to call mysql_init() concurrently, make one throwaway call to it
       during program start-up; in this respect it behaves much like qsort().
       (Presumably constructing a Connection here performs that call.)
    */
    a8::mysql::Connection conn;
#endif
    db_single_thread_ = new DBThread();
    db_single_thread_->Init();
    for (int created = 0; created < 10; ++created) {
        DBThread* worker = new DBThread();
        worker->Init();
        db_thread_pool_.push_back(worker);
    }
#endif
}
/**
 * Pool shutdown hook. Currently a no-op: nothing is released or stopped here.
 */
void DBPool::UnInit()
{
}
/**
 * Update hook. Currently a no-op.
 */
void DBPool::Update()
{
}
/**
 * Handles SS_Ping by sending a default-constructed SS_Pong back on the
 * socket the request arrived on. The ping's contents are not inspected.
 */
void DBPool::_SS_Ping(f8::MsgHdr& hdr, const ss::SS_Ping& msg)
{
    ss::SS_Pong pong;
    GSListener::Instance()->SendMsg(hdr.socket_handle, pong);
}
/**
 * Handles an asynchronous SQL request from a game server.
 *
 * The whole body is compiled out with `#if 0`, so requests are currently
 * ignored. The disabled code dispatches on exec_type:
 *  - 0: queue on the dedicated single thread;
 *  - 1: queue on a pool thread chosen by hash_code, or at random when
 *       hash_code is 0, falling back to the single thread if the pool is empty;
 *  - otherwise: reply immediately with AQE_EXEC_TYPE_ERROR.
 */
void DBPool::_SS_GSM_ExecAsyncSql(f8::MsgHdr& hdr, const ss::SS_GSM_ExecAsyncSql& msg)
{
#if 0
    const auto exec_type = msg.exec_type();
    if (exec_type != 0 && exec_type != 1) {
        // Unknown exec type: answer right away without touching the database.
        ss::SS_DPM_ExecAsyncSql reply;
        reply.set_context_id(msg.context_id());
        reply.set_sql(msg.sql());
        reply.set_error_code(AQE_EXEC_TYPE_ERROR);
        reply.set_error_msg("不可识别的Exec类型");
        GSListener::Instance()->SendMsg(hdr.socket_handle, reply);
        return;
    }

    DBThread* target = db_single_thread_;
    if (exec_type == 1 && db_thread_pool_.size() > 0) {
        if (msg.hash_code() != 0) {
            target = db_thread_pool_[msg.hash_code() % db_thread_pool_.size()];
        } else {
            target = db_thread_pool_[rand() % db_thread_pool_.size()];
        }
    }
    target->AddAsyncQuery(hdr.socket_handle, msg.query_type(), msg.context_id(), msg.sql());
#endif
}