This commit is contained in:
aozhiwei 2018-11-24 11:42:53 +08:00
parent 0d0645b296
commit e90d758f5e
2 changed files with 217 additions and 182 deletions

View File

@ -7,6 +7,7 @@
#include <a8/mysql.h> #include <a8/mysql.h>
#include <a8/list.h> #include <a8/list.h>
#include <a8/timer.h> #include <a8/timer.h>
#include <a8/timer_attacher.h>
#include <a8/udplog.h> #include <a8/udplog.h>
#include <a8/mutable_xobject.h> #include <a8/mutable_xobject.h>
@ -25,13 +26,18 @@ enum AsyncQueryError
struct AsyncQueryRequest struct AsyncQueryRequest
{ {
list_head entry;
long long context_id = 0; long long context_id = 0;
std::string sql; std::string sql;
#if 1
std::string _sql_fmt;
std::initializer_list<a8::XValue> _sql_params;
a8::XObject conn_info;
#endif
a8::XParams param; a8::XParams param;
time_t add_time = 0; time_t add_time = 0;
AsyncDBOnOkFunc on_ok = nullptr; AsyncDBOnOkFunc on_ok = nullptr;
AsyncDBOnErrorFunc on_error = nullptr; AsyncDBOnErrorFunc on_error = nullptr;
a8::TimerAttacher timer_attacher;
}; };
struct AsyncQueryNode struct AsyncQueryNode
@ -66,6 +72,7 @@ public:
work_thread_ = new std::thread(&DBThread::WorkThreadProc, this); work_thread_ = new std::thread(&DBThread::WorkThreadProc, this);
} }
#if 0
void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql) void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql)
{ {
AsyncQueryNode *p = new AsyncQueryNode(); AsyncQueryNode *p = new AsyncQueryNode();
@ -86,6 +93,7 @@ public:
msg_mutex_->unlock(); msg_mutex_->unlock();
loop_cond_->notify_all(); loop_cond_->notify_all();
} }
#endif
void AddAsyncQuery(AsyncQueryNode* p) void AddAsyncQuery(AsyncQueryNode* p)
{ {
@ -107,26 +115,20 @@ private:
void WorkThreadProc() void WorkThreadProc()
{ {
while (true) { while (true) {
#if 0 CheckDB();
a8::mysql::Connection conn; ProcessMsg();
a8::mysql::Query* query = conn.CreateQuery();
conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_);
InitMysqlConnection(query);
CheckDB(conn, *query);
ProcessMsg(*query);
#endif
WaitLoopCond(); WaitLoopCond();
} }
} }
void CheckDB(a8::mysql::Connection& conn, a8::mysql::Query& query) void CheckDB()
{ {
if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) { if (a8::XGetTickCount() - last_checkdb_tick_ < 1000 * 60 * 5) {
return; return;
} }
last_checkdb_tick_ = a8::XGetTickCount(); last_checkdb_tick_ = a8::XGetTickCount();
if (query.ExecQuery("SELECT 1;", {}) <= 0) { if (last_conn_ && last_query_) {
if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) {
#if 0 #if 0
a8::UdpLog::Instance()->Warning("mysql disconnect", {}); a8::UdpLog::Instance()->Warning("mysql disconnect", {});
if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) {
@ -138,8 +140,9 @@ private:
#endif #endif
} }
} }
}
void ProcessMsg(a8::mysql::Query& query) void ProcessMsg()
{ {
if (!work_node_ && top_node_) { if (!work_node_ && top_node_) {
msg_mutex_->lock(); msg_mutex_->lock();
@ -151,7 +154,7 @@ private:
while (work_node_) { while (work_node_) {
AsyncQueryNode *pdelnode = work_node_; AsyncQueryNode *pdelnode = work_node_;
work_node_ = work_node_->nextnode; work_node_ = work_node_->nextnode;
ProcAsyncQuery(query, pdelnode); ProcAsyncQuery(pdelnode);
delete pdelnode; delete pdelnode;
} }
} }
@ -173,29 +176,72 @@ private:
} }
} }
void ProcAsyncQuery(a8::mysql::Query& query, AsyncQueryNode* node) bool ReCreateConn(a8::XObject& conn_info)
{ {
if (last_query_) {
delete last_query_;
last_query_ = nullptr;
}
if (last_conn_) {
delete last_conn_;
last_conn_ = nullptr;
}
last_conn_ = new a8::mysql::Connection();
last_query_ = last_conn_->CreateQuery();
if (last_conn_->Connect(
conn_info.Get("host"),
conn_info.Get("port"),
conn_info.Get("user"),
conn_info.Get("passwd"),
conn_info.Get("database")
)) {
InitMysqlConnection(last_query_);
}
return true;
}
bool NeedReCreateConn(a8::XObject& conn_info)
{
if (!last_conn_) {
return true;
}
if (last_conn_->GetHost() == conn_info.Get("host").GetString() &&
last_conn_->GetPort() == conn_info.Get("port").GetInt() &&
last_conn_->GetUser() == conn_info.Get("user").GetString() &&
last_conn_->GetPasswd() == conn_info.Get("passwd").GetString() &&
last_conn_->GetDataBase() == conn_info.Get("database").GetString()
) {
return false;
}
return true;
}
void ProcAsyncQuery(AsyncQueryNode* node)
{
if (NeedReCreateConn(node->conn_info)) {
ReCreateConn(node->conn_info);
}
switch (node->query_type) { switch (node->query_type) {
case 0: case 0:
{ {
int ret = query.ExecQuery(node->sql.c_str(), {}); int ret = last_query_->ExecQuery(node->sql.c_str(), {});
if (ret < 0) { if (ret < 0) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams() a8::XParams()
.SetSender(node->context_id) .SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR) .SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(query.GetError())); .SetParam2(last_query_->GetError()));
} else { } else {
DataSet* data_set = new DataSet(); DataSet* data_set = new DataSet();
data_set->reserve(query.RowsNum()); data_set->reserve(last_query_->RowsNum());
while (!query.Eof()) { while (!last_query_->Eof()) {
auto& row = a8::FastAppend(*data_set); auto& row = a8::FastAppend(*data_set);
int field_num = query.FieldsNum(); int field_num = last_query_->FieldsNum();
row.reserve(field_num); row.reserve(field_num);
for (int i = 0; i < field_num; i++) { for (int i = 0; i < field_num; i++) {
row.push_back(query.GetValue(i).GetString()); row.push_back(last_query_->GetValue(i).GetString());
} }
query.Next(); last_query_->Next();
} }
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams() a8::XParams()
@ -207,13 +253,13 @@ private:
break; break;
case 1: case 1:
{ {
bool ret = query.ExecScript(node->sql.c_str(), {}); bool ret = last_query_->ExecScript(node->sql.c_str(), {});
if (!ret) { if (!ret) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams() a8::XParams()
.SetSender(node->context_id) .SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR) .SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(query.GetError())); .SetParam2(last_query_->GetError()));
} else { } else {
DataSet* data_set = new DataSet(); DataSet* data_set = new DataSet();
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
@ -238,16 +284,13 @@ private:
public: public:
int exec_async_query_msgid = 0; int exec_async_query_msgid = 0;
private: private:
std::mutex *loop_mutex_ = nullptr; std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr; std::condition_variable *loop_cond_ = nullptr;
#if 0 a8::mysql::Connection* last_conn_ = nullptr;
std::string gamedb_; a8::mysql::Query* last_query_ = nullptr;
std::string dbhost_;
std::string dbuser_;
std::string dbpasswd_;
#endif
long long last_checkdb_tick_ = 0; long long last_checkdb_tick_ = 0;
std::thread *work_thread_ = nullptr; std::thread *work_thread_ = nullptr;
@ -257,28 +300,141 @@ private:
std::mutex *msg_mutex_ = nullptr; std::mutex *msg_mutex_ = nullptr;
}; };
void DBPool::Init() class DBPoolImpl
{
public:
void Init()
{ {
curr_seqid_ = 1000001;
#if 0
INIT_LIST_HEAD(&query_list_);
#endif
#if 1 #if 1
/*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了,
线使mysql_init()mysql_init()qsort() 线使mysql_init()mysql_init()qsort()
*/ */
a8::mysql::Connection conn; a8::mysql::Connection conn;
#endif #endif
exec_async_query_msgid_ = MsgQueue::Instance()->AllocIMMsgId(); curr_seqid = 1000001;
MsgQueue::Instance()->RegisterCallBack(exec_async_query_msgid_, exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void SetThreadNum(int thread_num)
{
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
DBThread *db_thread = new DBThread();
db_thread->exec_async_query_msgid = exec_async_query_msgid;
db_thread->Init();
db_thread_pool.push_back(db_thread);
}
}
AsyncQueryRequest* GetAsyncQueryRequest(long long seqid)
{
auto itr = async_query_hash.find(seqid);
return itr != async_query_hash.end() ? itr->second : nullptr;
}
void AsyncSqlOnOk(long long seqid, DataSet* data_set)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_ok) {
request->on_ok(request->param, data_set);
}
async_query_hash.erase(seqid);
delete request;
}
void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_error) {
request->on_error(request->param, errcode, errmsg);
}
async_query_hash.erase(seqid);
delete request;
}
void InternalExecAsyncSql(int exec_type,
a8::XObject& conn_info, const char* querystr, std::initializer_list<a8::XValue>& args,
a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{
AsyncQueryRequest* p = new AsyncQueryRequest();
{
p->context_id = ++curr_seqid;
p->param = param;
p->sql = "";
p->_sql_fmt = querystr;
p->_sql_params = args;
conn_info.DeepCopy(p->conn_info);
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
async_query_hash[p->context_id] = p;
}
DBThread* db_thread = GetDBThread(hash_code);
if (!db_thread) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(AQE_CONN_ERROR));
return;
}
{
AsyncQueryNode* node = new AsyncQueryNode();
node->socket_handle = 0;
node->query_type = exec_type;
node->context_id = p->context_id;
node->sql = "";
node->_sql_fmt = querystr;
node->_sql_params = args;
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams()
.SetSender(this)
.SetParam1(p->context_id),
[] (const a8::XParams& param)
{
},
&p->timer_attacher.timer_list_);
}
DBThread* GetDBThread(long long hash_code)
{
if (db_thread_pool.empty() || hash_code < 0) {
return nullptr;
}
return db_thread_pool[hash_code % db_thread_pool.size()];
}
public:
long long curr_seqid = 0;
std::map<long long, AsyncQueryRequest*> async_query_hash;
unsigned short exec_async_query_msgid = 0;
std::vector<DBThread*> db_thread_pool;
};
void DBPool::Init()
{
impl_ = new DBPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
[] (const a8::XParams& param) [] (const a8::XParams& param)
{ {
if (param.param1.GetInt() == AQE_NO_ERROR) { if (param.param1.GetInt() == AQE_NO_ERROR) {
DataSet* data_set = (DataSet*)param.param2.GetUserData(); DataSet* data_set = (DataSet*)param.param2.GetUserData();
DBPool::Instance()->AsyncSqlOnOk(param.sender, data_set); DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set);
delete data_set; delete data_set;
} else { } else {
DBPool::Instance()->AsyncSqlOnError(param.sender, DBPool::Instance()->impl_->AsyncSqlOnError(param.sender,
param.param1, param.param1,
param.param2); param.param2);
} }
@ -288,134 +444,23 @@ void DBPool::Init()
void DBPool::UnInit() void DBPool::UnInit()
{ {
delete impl_;
impl_ = nullptr;
} }
void DBPool::SetThreadNum(int thread_num) void DBPool::SetThreadNum(int thread_num)
{ {
assert(thread_num > 0); impl_->SetThreadNum(thread_num);
for (int i = 0; i < thread_num; i++) {
DBThread *db_thread = new DBThread();
db_thread->exec_async_query_msgid = exec_async_query_msgid_;
db_thread->Init();
db_thread_pool_.push_back(db_thread);
}
} }
void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::initializer_list<a8::XValue> args, void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::initializer_list<a8::XValue> args,
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{ {
long long context_id = ++curr_seqid_; impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code);
{
AsyncQueryRequest* p = new AsyncQueryRequest();
p->context_id = context_id;
p->param = param;
p->sql = "";
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
#if 0
list_add_tail(&p->entry, &query_list_);
#endif
async_query_hash_[p->context_id] = p;
}
if (db_thread_pool_.empty()) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid_,
a8::XParams()
.SetSender(context_id)
.SetParam1(AQE_CONN_ERROR));
return;
}
DBThread *db_thread = nullptr;
if (hash_code != 0) {
db_thread = db_thread_pool_[hash_code % db_thread_pool_.size()];
} else {
db_thread = db_thread_pool_[rand() % db_thread_pool_.size()];
}
{
AsyncQueryNode* node = new AsyncQueryNode();
node->socket_handle = 0;
node->query_type = 0;
node->context_id = context_id;
node->sql = "";
node->_sql_fmt = querystr;
node->_sql_params = args;
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
} }
void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::initializer_list<a8::XValue> args, void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::initializer_list<a8::XValue> args,
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code) a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
{ {
long long context_id = ++curr_seqid_; impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code);
{
AsyncQueryRequest* p = new AsyncQueryRequest();
p->context_id = context_id;
p->param = param;
p->sql = "";
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
#if 0
list_add_tail(&p->entry, &query_list_);
#endif
async_query_hash_[p->context_id] = p;
}
if (db_thread_pool_.empty()) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid_,
a8::XParams()
.SetSender(context_id)
.SetParam1(AQE_CONN_ERROR));
return;
}
DBThread *db_thread = nullptr;
if (hash_code != 0) {
db_thread = db_thread_pool_[hash_code % db_thread_pool_.size()];
} else {
db_thread = db_thread_pool_[rand() % db_thread_pool_.size()];
}
{
AsyncQueryNode* node = new AsyncQueryNode();
node->socket_handle = 0;
node->query_type = 1;
node->context_id = context_id;
node->sql = "";
node->_sql_fmt = querystr;
node->_sql_params = args;
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
}
AsyncQueryRequest* DBPool::GetAsyncQueryRequest(long long seqid)
{
auto itr = async_query_hash_.find(seqid);
return itr != async_query_hash_.end() ? itr->second : nullptr;
}
void DBPool::AsyncSqlOnOk(long long seqid, DataSet* data_set)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_ok) {
request->on_ok(request->param, data_set);
}
async_query_hash_.erase(seqid);
delete request;
}
void DBPool::AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
{
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
if (!request) {
return;
}
if (request->on_error) {
request->on_error(request->param, errcode, errmsg);
}
async_query_hash_.erase(seqid);
delete request;
} }

View File

@ -4,8 +4,7 @@ typedef std::vector<std::vector<std::string>> DataSet;
typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set); typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set);
typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg); typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg);
struct AsyncQueryRequest; class DBPoolImpl;
class DBThread;
class DBPool : public a8::Singleton<DBPool> class DBPool : public a8::Singleton<DBPool>
{ {
private: private:
@ -25,14 +24,5 @@ class DBPool : public a8::Singleton<DBPool>
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code); a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code);
private: private:
AsyncQueryRequest* GetAsyncQueryRequest(long long seqid); DBPoolImpl* impl_ = nullptr;
void AsyncSqlOnOk(long long seqid, DataSet* data_set);
void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg);
private:
long long curr_seqid_ = 0;
std::map<long long, AsyncQueryRequest*> async_query_hash_;
unsigned short exec_async_query_msgid_ = 0;
std::vector<DBThread*> db_thread_pool_;
}; };