f8/cpp/dbpool.cc
aozhiwei 6f668cf5c5 1
2018-11-26 21:26:02 +08:00

471 lines
16 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "precompile.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <a8/mysql.h>
#include <a8/list.h>
#include <a8/timer.h>
#include <a8/timer_attacher.h>
#include <a8/udplog.h>
#include <a8/mutable_xobject.h>
#include "framework/cpp/dbpool.h"
#include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h"
namespace f8
{
enum AsyncQueryError
{
AQE_NO_ERROR = 0,
AQE_EXEC_TYPE_ERROR = 1,
AQE_QUERY_TYPE_ERROR = 2,
AQE_SYNTAX_ERROR = 3,
AQE_CONN_ERROR = 4
};
struct AsyncQueryRequest
{
long long context_id = 0;
std::string sql;
#if 1
std::string _sql_fmt;
std::vector<a8::XValue> _sql_params;
a8::XObject conn_info;
#endif
a8::XParams param;
time_t add_time = 0;
AsyncDBOnOkFunc on_ok = nullptr;
AsyncDBOnErrorFunc on_error = nullptr;
a8::TimerAttacher timer_attacher;
};
struct AsyncQueryNode
{
int socket_handle = 0;
int query_type = 0;
long long context_id = 0;
std::string sql;
#if 1
std::string _sql_fmt;
std::vector<a8::XValue> _sql_params;
a8::XObject conn_info;
#endif
AsyncQueryNode* nextnode = nullptr;
};
class DBThread
{
public:
void Init()
{
loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable();
last_checkdb_tick_ = a8::XGetTickCount();
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
msg_mutex_ = new std::mutex();
work_thread_ = new std::thread(&DBThread::WorkThreadProc, this);
}
#if 0
void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql)
{
AsyncQueryNode *p = new AsyncQueryNode();
p->query_type = query_type;
p->socket_handle = sockhandle;
p->context_id = context_id;
p->sql = sql;
std::unique_lock<std::mutex> lk(*loop_mutex_);
msg_mutex_->lock();
if (bot_node_) {
bot_node_->nextnode = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
msg_mutex_->unlock();
loop_cond_->notify_all();
}
#endif
void AddAsyncQuery(AsyncQueryNode* p)
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
msg_mutex_->lock();
if (bot_node_) {
bot_node_->nextnode = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
msg_mutex_->unlock();
loop_cond_->notify_all();
}
private:
void WorkThreadProc()
{
while (true) {
CheckDB();
ProcessMsg();
WaitLoopCond();
}
}
void CheckDB()
{
if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) {
return;
}
last_checkdb_tick_ = a8::XGetTickCount();
if (last_conn_ && last_query_) {
if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) {
#if 0
a8::UdpLog::Instance()->Warning("mysql disconnect", {});
if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) {
InitMysqlConnection(&query);
a8::UdpLog::Instance()->Info("mysql reconnect successed", {});
} else {
a8::UdpLog::Instance()->Info("mysql reconnect failed", {});
}
#endif
}
}
}
void ProcessMsg()
{
if (!work_node_ && top_node_) {
msg_mutex_->lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
msg_mutex_->unlock();
}
while (work_node_) {
AsyncQueryNode *pdelnode = work_node_;
work_node_ = work_node_->nextnode;
ProcAsyncQuery(pdelnode);
delete pdelnode;
}
}
void WaitLoopCond()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
{
msg_mutex_->lock();
if (!work_node_ && top_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
msg_mutex_->unlock();
}
if (!work_node_) {
loop_cond_->wait_for(lk, std::chrono::seconds(10));
}
}
bool ReCreateConn(a8::XObject& conn_info)
{
if (last_query_) {
delete last_query_;
last_query_ = nullptr;
}
if (last_conn_) {
delete last_conn_;
last_conn_ = nullptr;
}
last_conn_ = new a8::mysql::Connection();
last_query_ = last_conn_->CreateQuery();
if (last_conn_->Connect(
conn_info.Get("host"),
conn_info.Get("port"),
conn_info.Get("user"),
conn_info.Get("passwd"),
conn_info.Get("database")
)) {
f8::InitMysqlConnection(last_query_);
}
return true;
}
bool NeedReCreateConn(a8::XObject& conn_info)
{
if (!last_conn_) {
return true;
}
if (last_conn_->GetHost() == conn_info.Get("host").GetString() &&
last_conn_->GetPort() == conn_info.Get("port").GetInt() &&
last_conn_->GetUser() == conn_info.Get("user").GetString() &&
last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() &&
last_conn_->GetDataBase() == conn_info.Get("database").GetString()
) {
return false;
}
return true;
}
void ProcAsyncQuery(AsyncQueryNode* node)
{
if (NeedReCreateConn(node->conn_info)) {
ReCreateConn(node->conn_info);
}
switch (node->query_type) {
case 0:
{
int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params);
if (ret < 0) {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(last_query_->GetError()));
} else {
DataSet* data_set = new DataSet();
data_set->reserve(last_query_->RowsNum());
while (!last_query_->Eof()) {
auto& row = a8::FastAppend(*data_set);
int field_num = last_query_->FieldsNum();
row.reserve(field_num);
for (int i = 0; i < field_num; i++) {
row.push_back(last_query_->GetValue(i).GetString());
}
last_query_->Next();
}
f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR)
.SetParam2((void*)data_set));
}
}
break;
case 1:
{
bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params);
if (!ret) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(last_query_->GetError()));
} else {
DataSet* data_set = new DataSet();
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR)
.SetParam2((void*)data_set));
}
}
break;
default:
{
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_QUERY_TYPE_ERROR)
.SetParam2("不可识别的query类型"));
}
break;
}
}
public:
int exec_async_query_msgid = 0;
private:
std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr;
a8::mysql::Connection* last_conn_ = nullptr;
a8::mysql::Query* last_query_ = nullptr;
long long last_checkdb_tick_ = 0;
std::thread *work_thread_ = nullptr;
AsyncQueryNode *top_node_ = nullptr;
AsyncQueryNode *bot_node_ = nullptr;
AsyncQueryNode *work_node_ = nullptr;
std::mutex *msg_mutex_ = nullptr;
};
class DBPoolImpl
{
public:
void Init()
{
#if 1
/*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了,
如果有多线程并发使用mysql_init()建议在程序初始化时空调一次mysql_init()他的这点特性很像qsort()
*/
a8::mysql::Connection conn;
#endif
curr_seqid = 1000001;
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void SetThreadNum(int thread_num)
{
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
DBThread *db_thread = new DBThread();
db_thread->exec_async_query_msgid = exec_async_query_msgid;
db_thread->Init();
db_thread_pool.push_back(db_thread);
}
}
AsyncQueryRequest* GetAsyncQueryRequest(long long seqid)
{
auto itr = async_query_hash.find(seqid);
return itr != async_query_hash.end() ? itr->second : nullptr;
}
void AsyncSqlOnOk(long long seqid, DataSet* data_set)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_ok) {
request->on_ok(request->param, data_set);
}
async_query_hash.erase(seqid);
delete request;
}
void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_error) {
request->on_error(request->param, errcode, errmsg);
}
async_query_hash.erase(seqid);
delete request;
}
void InternalExecAsyncSql(int exec_type,
a8::XObject& conn_info, const char* querystr, std::vector<a8::XValue>& args,
a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
AsyncQueryRequest* p = new AsyncQueryRequest();
{
p->context_id = ++curr_seqid;
p->param = param;
p->sql = "";
p->_sql_fmt = querystr;
p->_sql_params = args;
conn_info.DeepCopy(p->conn_info);
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
async_query_hash[p->context_id] = p;
}
DBThread* db_thread = GetDBThread(hash_code);
if (!db_thread) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(AQE_CONN_ERROR));
return;
}
{
AsyncQueryNode* node = new AsyncQueryNode();
node->socket_handle = 0;
node->query_type = exec_type;
node->context_id = p->context_id;
node->sql = "";
node->_sql_fmt = querystr;
node->_sql_params = args;
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams()
.SetSender(this)
.SetParam1(p->context_id),
[] (const a8::XParams& param)
{
},
&p->timer_attacher.timer_list_);
}
DBThread* GetDBThread(long long hash_code)
{
if (db_thread_pool.empty() || hash_code < 0) {
return nullptr;
}
return db_thread_pool[hash_code % db_thread_pool.size()];
}
public:
long long curr_seqid = 0;
std::map<long long, AsyncQueryRequest*> async_query_hash;
unsigned short exec_async_query_msgid = 0;
std::vector<DBThread*> db_thread_pool;
};
void DBPool::Init()
{
impl_ = new DBPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
[] (const a8::XParams& param)
{
if (param.param1.GetInt() == AQE_NO_ERROR) {
DataSet* data_set = (DataSet*)param.param2.GetUserData();
DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set);
delete data_set;
} else {
DBPool::Instance()->impl_->AsyncSqlOnError(param.sender,
param.param1,
param.param2);
}
}
);
}
void DBPool::UnInit()
{
delete impl_;
impl_ = nullptr;
}
void DBPool::SetThreadNum(int thread_num)
{
impl_->SetThreadNum(thread_num);
}
void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code);
}
void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code);
}
}