f8/cpp/dbpool.cc
2019-02-14 15:40:45 +08:00

498 lines
18 KiB
C++

#include "precompile.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <mysql.h>
#include <a8/mysql.h>
#include <a8/list.h>
#include <a8/timer.h>
#include <a8/timer_attacher.h>
#include <a8/udplog.h>
#include <a8/mutable_xobject.h>
#include "framework/cpp/dbpool.h"
#include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h"
namespace f8
{
/**
 * Result codes carried in param1 of the async-query completion message.
 *
 * The numeric values are handed to on_error callbacks as plain ints, so they
 * are spelled out explicitly and must stay stable.
 */
enum AsyncQueryError
{
    // Statement executed; param2 of the message carries the result set.
    AQE_NO_ERROR         = 0,
    // Not posted anywhere in this file.
    AQE_EXEC_TYPE_ERROR  = 1,
    // The request's query type was not one the worker recognises.
    AQE_QUERY_TYPE_ERROR = 2,
    // The database layer reported a failure while executing the statement.
    AQE_SYNTAX_ERROR     = 3,
    // Connection-level failure, e.g. no worker thread could be selected
    // or the request deadline expired before a result arrived.
    AQE_CONN_ERROR       = 4
};
/**
 * Caller-side bookkeeping for one in-flight asynchronous query.
 *
 * Instances are stored by context_id until a completion (or error) message
 * for that id is dispatched, at which point the matching callback is invoked
 * and the record is destroyed.
 */
struct AsyncQueryRequest
{
    // Sequence id that ties this record to its completion message.
    long long context_id{0};
    // Legacy pre-formatted SQL text; only ever assigned "" in this file.
    std::string sql;
    // Statement template and the values to substitute into it.
    std::string _sql_fmt;
    std::vector<a8::XValue> _sql_params;
    // Connection settings of the target database.
    a8::XObject conn_info;
    // Opaque user context passed back to on_ok / on_error.
    a8::XParams param;
    // Wall-clock time at which the request was created.
    time_t add_time{0};
    // Completion callbacks; either may be null.
    AsyncDBOnOkFunc on_ok = nullptr;
    AsyncDBOnErrorFunc on_error = nullptr;
    // Holds the deadline timer attached to this request.
    a8::TimerAttacher timer_attacher;
};
/**
 * One unit of work queued for a DBThread.
 *
 * Nodes form an intrusive singly linked list through nextnode; the worker
 * deletes each node after it has been processed.
 */
struct AsyncQueryNode
{
    // Always set to 0 by the code in this file.
    int socket_handle{0};
    // 0 = row-returning query, 1 = script; any other value is rejected.
    int query_type{0};
    // Sequence id echoed back as the sender of the completion message.
    long long context_id{0};
    // Legacy pre-formatted SQL text; only ever assigned "" in this file.
    std::string sql;
    // Statement template and the values to substitute into it.
    std::string _sql_fmt;
    std::vector<a8::XValue> _sql_params;
    // Connection settings of the database this statement must run against.
    a8::XObject conn_info;
    // Next node in the queue, or nullptr for the tail.
    AsyncQueryNode* nextnode{nullptr};
};
class DBThread
{
public:
void Init()
{
loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable();
last_checkdb_tick_ = a8::XGetTickCount();
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
msg_mutex_ = new std::mutex();
work_thread_ = new std::thread(&DBThread::WorkThreadProc, this);
}
void UnInit()
{
terminated_ = true;
work_thread_->join();
delete work_thread_;
work_thread_ = nullptr;
delete msg_mutex_;
msg_mutex_ = nullptr;
delete loop_cond_;
loop_cond_ = nullptr;
delete loop_mutex_;
loop_mutex_ = nullptr;
}
void AddAsyncQuery(AsyncQueryNode* p)
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
msg_mutex_->lock();
if (bot_node_) {
bot_node_->nextnode = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
msg_mutex_->unlock();
loop_cond_->notify_all();
}
private:
void WorkThreadProc()
{
mysql_thread_init();
while (!terminated_) {
CheckDB();
ProcessMsg();
WaitLoopCond();
}
if (last_query_) {
delete last_query_;
last_query_ = nullptr;
}
if (last_conn_) {
delete last_conn_;
last_conn_ = nullptr;
}
mysql_thread_end();
}
void CheckDB()
{
if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) {
return;
}
last_checkdb_tick_ = a8::XGetTickCount();
if (last_conn_ && last_query_) {
if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) {
#if 0
a8::UdpLog::Instance()->Warning("mysql disconnect", {});
if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) {
InitMysqlConnection(&query);
a8::UdpLog::Instance()->Info("mysql reconnect successed", {});
} else {
a8::UdpLog::Instance()->Info("mysql reconnect failed", {});
}
#endif
}
}
}
void ProcessMsg()
{
if (!work_node_ && top_node_) {
msg_mutex_->lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
msg_mutex_->unlock();
}
while (work_node_) {
AsyncQueryNode *pdelnode = work_node_;
work_node_ = work_node_->nextnode;
ProcAsyncQuery(pdelnode);
delete pdelnode;
}
}
void WaitLoopCond()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
{
msg_mutex_->lock();
if (!work_node_ && top_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
msg_mutex_->unlock();
}
if (!work_node_) {
loop_cond_->wait_for(lk, std::chrono::seconds(10));
}
}
bool ReCreateConn(a8::XObject& conn_info)
{
if (last_query_) {
delete last_query_;
last_query_ = nullptr;
}
if (last_conn_) {
delete last_conn_;
last_conn_ = nullptr;
}
last_conn_ = new a8::mysql::Connection();
last_query_ = last_conn_->CreateQuery();
if (last_conn_->Connect(
conn_info.Get("host"),
conn_info.Get("port"),
conn_info.Get("user"),
conn_info.Get("passwd"),
conn_info.Get("database")
)) {
f8::InitMysqlConnection(last_query_);
}
return true;
}
bool NeedReCreateConn(a8::XObject& conn_info)
{
if (!last_conn_) {
return true;
}
if (last_conn_->GetHost() == conn_info.Get("host").GetString() &&
last_conn_->GetPort() == conn_info.Get("port").GetInt() &&
last_conn_->GetUser() == conn_info.Get("user").GetString() &&
last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() &&
last_conn_->GetDataBase() == conn_info.Get("database").GetString()
) {
return false;
}
return true;
}
void ProcAsyncQuery(AsyncQueryNode* node)
{
if (NeedReCreateConn(node->conn_info)) {
ReCreateConn(node->conn_info);
}
switch (node->query_type) {
case 0:
{
int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params);
if (ret < 0) {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(last_query_->GetError()));
} else {
DataSet* data_set = new DataSet();
data_set->reserve(last_query_->RowsNum());
while (!last_query_->Eof()) {
auto& row = a8::FastAppend(*data_set);
int field_num = last_query_->FieldsNum();
row.reserve(field_num);
for (int i = 0; i < field_num; i++) {
row.push_back(last_query_->GetValue(i).GetString());
}
last_query_->Next();
}
f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR)
.SetParam2((void*)data_set));
}
}
break;
case 1:
{
bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params);
if (!ret) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(last_query_->GetError()));
} else {
DataSet* data_set = new DataSet();
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR)
.SetParam2((void*)data_set));
}
}
break;
default:
{
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_QUERY_TYPE_ERROR)
.SetParam2("不可识别的query类型"));
}
break;
}
}
public:
int exec_async_query_msgid = 0;
private:
volatile bool terminated_ = false;
std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr;
a8::mysql::Connection* last_conn_ = nullptr;
a8::mysql::Query* last_query_ = nullptr;
long long last_checkdb_tick_ = 0;
std::thread *work_thread_ = nullptr;
AsyncQueryNode *top_node_ = nullptr;
AsyncQueryNode *bot_node_ = nullptr;
AsyncQueryNode *work_node_ = nullptr;
std::mutex *msg_mutex_ = nullptr;
};
class DBPoolImpl
{
public:
void Init()
{
curr_seqid = 1000001;
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void UnInit()
{
for (auto& db_thread : db_thread_pool) {
db_thread->UnInit();
delete db_thread;
}
for (auto& pair : async_query_hash) {
delete pair.second;
}
async_query_hash.clear();
}
void SetThreadNum(int thread_num)
{
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
DBThread *db_thread = new DBThread();
db_thread->exec_async_query_msgid = exec_async_query_msgid;
db_thread->Init();
db_thread_pool.push_back(db_thread);
}
}
void AsyncSqlOnOk(long long seqid, DataSet* data_set)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_ok) {
request->on_ok(request->param, data_set);
}
async_query_hash.erase(seqid);
delete request;
}
void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_error) {
request->on_error(request->param, errcode, errmsg);
}
async_query_hash.erase(seqid);
delete request;
}
void InternalExecAsyncSql(int exec_type,
a8::XObject& conn_info, const char* querystr, std::vector<a8::XValue>& args,
a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
AsyncQueryRequest* p = new AsyncQueryRequest();
{
p->context_id = ++curr_seqid;
p->param = param;
p->sql = "";
p->_sql_fmt = querystr;
p->_sql_params = args;
conn_info.DeepCopy(p->conn_info);
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
async_query_hash[p->context_id] = p;
}
DBThread* db_thread = GetDBThread(hash_code);
if (!db_thread) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(AQE_CONN_ERROR));
return;
}
{
AsyncQueryNode* node = new AsyncQueryNode();
node->socket_handle = 0;
node->query_type = exec_type;
node->context_id = p->context_id;
node->sql = "";
node->_sql_fmt = querystr;
node->_sql_params = args;
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(exec_async_query_msgid),
[] (const a8::XParams& param)
{
MsgQueue::Instance()->PostMsg_r(param.param1,
a8::XParams()
.SetSender(param.sender)
.SetParam1(AQE_CONN_ERROR));
},
&p->timer_attacher.timer_list_);
}
private:
AsyncQueryRequest* GetAsyncQueryRequest(long long seqid)
{
auto itr = async_query_hash.find(seqid);
return itr != async_query_hash.end() ? itr->second : nullptr;
}
DBThread* GetDBThread(long long hash_code)
{
if (db_thread_pool.empty() || hash_code < 0) {
return nullptr;
}
return db_thread_pool[hash_code % db_thread_pool.size()];
}
public:
unsigned short exec_async_query_msgid = 0;
private:
long long curr_seqid = 0;
std::map<long long, AsyncQueryRequest*> async_query_hash;
std::vector<DBThread*> db_thread_pool;
};
/**
 * Creates the implementation object and registers the handler that turns
 * completion messages into on_ok / on_error callbacks.
 */
void DBPool::Init()
{
    impl_ = new DBPoolImpl();
    impl_->Init();
    MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
                                           [] (const a8::XParams& param)
                                           {
                                               DBPoolImpl* impl = DBPool::Instance()->impl_;
                                               // Anything but AQE_NO_ERROR carries an error text in param2.
                                               if (param.param1.GetInt() != AQE_NO_ERROR) {
                                                   impl->AsyncSqlOnError(param.sender,
                                                                         param.param1,
                                                                         param.param2);
                                                   return;
                                               }
                                               // On success param2 carries the result set, which is
                                               // released here once the callback has run.
                                               DataSet* result = (DataSet*)param.param2.GetUserData();
                                               impl->AsyncSqlOnOk(param.sender, result);
                                               delete result;
                                           }
                                           );
}
/**
 * Shuts down and destroys the implementation object.
 *
 * Calling it again after a completed UnInit is a no-op: impl_ is reset to
 * nullptr below, and the guard keeps that pointer from being dereferenced.
 */
void DBPool::UnInit()
{
    if (!impl_) {
        return;
    }
    impl_->UnInit();
    delete impl_;
    impl_ = nullptr;
}
/**
 * Forwards to DBPoolImpl::SetThreadNum, which creates and starts thread_num
 * worker threads. Requires a prior Init, since impl_ is dereferenced.
 */
void DBPool::SetThreadNum(int thread_num)
{
impl_->SetThreadNum(thread_num);
}
/**
 * Queues a row-returning statement for asynchronous execution.
 *
 * @param conn_info  connection settings of the target database.
 * @param querystr   statement template.
 * @param args       values for the template.
 * @param param      user context passed back to the callbacks.
 * @param on_ok      called with the result set on success.
 * @param on_error   called with an error code and message on failure.
 * @param hash_code  selects the worker thread that runs the statement.
 */
void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
                            a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
    // Exec type 0 is the row-returning path of the worker.
    const int exec_type_query = 0;
    impl_->InternalExecAsyncSql(exec_type_query,
                                conn_info,
                                querystr,
                                args,
                                param,
                                on_ok,
                                on_error,
                                hash_code);
}
/**
 * Queues a statement that returns no rows for asynchronous execution.
 *
 * @param conn_info  connection settings of the target database.
 * @param querystr   statement template.
 * @param args       values for the template.
 * @param param      user context passed back to the callbacks.
 * @param on_ok      called (with an empty result set) on success.
 * @param on_error   called with an error code and message on failure.
 * @param hash_code  selects the worker thread that runs the statement.
 */
void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
                             a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
    // Exec type 1 is the script path of the worker.
    const int exec_type_script = 1;
    impl_->InternalExecAsyncSql(exec_type_script,
                                conn_info,
                                querystr,
                                args,
                                param,
                                on_ok,
                                on_error,
                                hash_code);
}
}