This commit is contained in:
aozhiwei 2018-11-26 21:26:02 +08:00
parent ca5ccaf83c
commit 6f668cf5c5
4 changed files with 400 additions and 387 deletions

View File

@ -15,17 +15,20 @@
#include "framework/cpp/msgqueue.h" #include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h" #include "framework/cpp/utils.h"
enum AsyncQueryError namespace f8
{ {
enum AsyncQueryError
{
AQE_NO_ERROR = 0, AQE_NO_ERROR = 0,
AQE_EXEC_TYPE_ERROR = 1, AQE_EXEC_TYPE_ERROR = 1,
AQE_QUERY_TYPE_ERROR = 2, AQE_QUERY_TYPE_ERROR = 2,
AQE_SYNTAX_ERROR = 3, AQE_SYNTAX_ERROR = 3,
AQE_CONN_ERROR = 4 AQE_CONN_ERROR = 4
}; };
struct AsyncQueryRequest struct AsyncQueryRequest
{ {
long long context_id = 0; long long context_id = 0;
std::string sql; std::string sql;
#if 1 #if 1
@ -38,10 +41,10 @@ struct AsyncQueryRequest
AsyncDBOnOkFunc on_ok = nullptr; AsyncDBOnOkFunc on_ok = nullptr;
AsyncDBOnErrorFunc on_error = nullptr; AsyncDBOnErrorFunc on_error = nullptr;
a8::TimerAttacher timer_attacher; a8::TimerAttacher timer_attacher;
}; };
struct AsyncQueryNode struct AsyncQueryNode
{ {
int socket_handle = 0; int socket_handle = 0;
int query_type = 0; int query_type = 0;
long long context_id = 0; long long context_id = 0;
@ -52,11 +55,11 @@ struct AsyncQueryNode
a8::XObject conn_info; a8::XObject conn_info;
#endif #endif
AsyncQueryNode* nextnode = nullptr; AsyncQueryNode* nextnode = nullptr;
}; };
class DBThread class DBThread
{ {
public: public:
void Init() void Init()
{ {
@ -72,7 +75,7 @@ public:
work_thread_ = new std::thread(&DBThread::WorkThreadProc, this); work_thread_ = new std::thread(&DBThread::WorkThreadProc, this);
} }
#if 0 #if 0
void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql) void AddAsyncQuery(int sockhandle, int query_type, long long context_id, const std::string& sql)
{ {
AsyncQueryNode *p = new AsyncQueryNode(); AsyncQueryNode *p = new AsyncQueryNode();
@ -93,7 +96,7 @@ public:
msg_mutex_->unlock(); msg_mutex_->unlock();
loop_cond_->notify_all(); loop_cond_->notify_all();
} }
#endif #endif
void AddAsyncQuery(AsyncQueryNode* p) void AddAsyncQuery(AsyncQueryNode* p)
{ {
@ -110,7 +113,7 @@ public:
loop_cond_->notify_all(); loop_cond_->notify_all();
} }
private: private:
void WorkThreadProc() void WorkThreadProc()
{ {
@ -129,7 +132,7 @@ private:
last_checkdb_tick_ = a8::XGetTickCount(); last_checkdb_tick_ = a8::XGetTickCount();
if (last_conn_ && last_query_) { if (last_conn_ && last_query_) {
if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) { if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) {
#if 0 #if 0
a8::UdpLog::Instance()->Warning("mysql disconnect", {}); a8::UdpLog::Instance()->Warning("mysql disconnect", {});
if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) { if (conn.Connect(dbhost_, 3306, dbuser_, dbpasswd_, gamedb_)) {
InitMysqlConnection(&query); InitMysqlConnection(&query);
@ -137,7 +140,7 @@ private:
} else { } else {
a8::UdpLog::Instance()->Info("mysql reconnect failed", {}); a8::UdpLog::Instance()->Info("mysql reconnect failed", {});
} }
#endif #endif
} }
} }
} }
@ -226,7 +229,7 @@ private:
{ {
int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params); int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params);
if (ret < 0) { if (ret < 0) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams() a8::XParams()
.SetSender(node->context_id) .SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR) .SetParam1(AQE_SYNTAX_ERROR)
@ -243,7 +246,7 @@ private:
} }
last_query_->Next(); last_query_->Next();
} }
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid, f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams() a8::XParams()
.SetSender(node->context_id) .SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR) .SetParam1(AQE_NO_ERROR)
@ -282,10 +285,10 @@ private:
} }
} }
public: public:
int exec_async_query_msgid = 0; int exec_async_query_msgid = 0;
private: private:
std::mutex *loop_mutex_ = nullptr; std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr; std::condition_variable *loop_cond_ = nullptr;
@ -298,11 +301,11 @@ private:
AsyncQueryNode *bot_node_ = nullptr; AsyncQueryNode *bot_node_ = nullptr;
AsyncQueryNode *work_node_ = nullptr; AsyncQueryNode *work_node_ = nullptr;
std::mutex *msg_mutex_ = nullptr; std::mutex *msg_mutex_ = nullptr;
}; };
class DBPoolImpl class DBPoolImpl
{ {
public: public:
void Init() void Init()
{ {
@ -464,3 +467,4 @@ void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::v
{ {
impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code); impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code);
} }
}

View File

@ -1,12 +1,14 @@
#pragma once #pragma once
typedef std::vector<std::vector<std::string>> DataSet; namespace f8
typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set);
typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg);
class DBPoolImpl;
class DBPool : public a8::Singleton<DBPool>
{ {
typedef std::vector<std::vector<std::string>> DataSet;
typedef void (*AsyncDBOnOkFunc)(a8::XParams& param, const DataSet* data_set);
typedef void (*AsyncDBOnErrorFunc)(a8::XParams& param, int error_code, const std::string& error_msg);
class DBPoolImpl;
class DBPool : public a8::Singleton<DBPool>
{
private: private:
DBPool() {}; DBPool() {};
friend class a8::Singleton<DBPool>; friend class a8::Singleton<DBPool>;
@ -25,4 +27,5 @@ class DBPool : public a8::Singleton<DBPool>
private: private:
DBPoolImpl* impl_ = nullptr; DBPoolImpl* impl_ = nullptr;
}; };
}

View File

@ -5,15 +5,17 @@
#include "framework/cpp/msgqueue.h" #include "framework/cpp/msgqueue.h"
#include "app.h" #include "app.h"
struct MsgQueueNode namespace f8
{ {
struct MsgQueueNode
{
struct list_head entry; struct list_head entry;
MsgHandleFunc func; MsgHandleFunc func;
}; };
class MsgQueueImp class MsgQueueImp
{ {
public: public:
int curr_im_msgid = 10000; int curr_im_msgid = 10000;
std::map<int, list_head> msg_handlers; std::map<int, list_head> msg_handlers;
@ -47,70 +49,71 @@ public:
return &node->entry; return &node->entry;
} }
}; };
void MsgQueue::Init() void MsgQueue::Init()
{ {
imp_ = new MsgQueueImp(); imp_ = new MsgQueueImp();
} }
void MsgQueue::UnInit() void MsgQueue::UnInit()
{ {
delete imp_; delete imp_;
imp_ = nullptr; imp_ = nullptr;
} }
void MsgQueue::SendMsg(int msgid, a8::XParams param) void MsgQueue::SendMsg(int msgid, a8::XParams param)
{ {
imp_->ProcessMsg(msgid, param); imp_->ProcessMsg(msgid, param);
} }
void MsgQueue::PostMsg(int msgid, a8::XParams param) void MsgQueue::PostMsg(int msgid, a8::XParams param)
{ {
param._sys_field = msgid; param._sys_field = msgid;
a8::Timer::Instance()->AddDeadLineTimer(0, param, a8::Timer::Instance()->AddDeadLineTimer(0, param,
[] (const a8::XParams& param) [] (const a8::XParams& param)
{ {
MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param);
}); });
} }
void MsgQueue::AddDelayMsg(int msgid, a8::XParams param, int milli_seconds) void MsgQueue::AddDelayMsg(int msgid, a8::XParams param, int milli_seconds)
{ {
param._sys_field = msgid; param._sys_field = msgid;
a8::Timer::Instance()->AddDeadLineTimer(milli_seconds, param, a8::Timer::Instance()->AddDeadLineTimer(milli_seconds, param,
[] (const a8::XParams& param) [] (const a8::XParams& param)
{ {
MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param); MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param);
}); });
} }
void MsgQueue::RemoveCallBack(CallBackHandle handle) void MsgQueue::RemoveCallBack(CallBackHandle handle)
{ {
list_head* head = handle; list_head* head = handle;
MsgQueueNode* node = list_entry(head, struct MsgQueueNode, entry); MsgQueueNode* node = list_entry(head, struct MsgQueueNode, entry);
list_del_init(&node->entry); list_del_init(&node->entry);
delete node; delete node;
} }
CallBackHandle MsgQueue::RegisterCallBack(int msgid, MsgHandleFunc handle_func) CallBackHandle MsgQueue::RegisterCallBack(int msgid, MsgHandleFunc handle_func)
{ {
return imp_->RegisterCallBack(msgid, handle_func); return imp_->RegisterCallBack(msgid, handle_func);
} }
int MsgQueue::AllocIMMsgId() int MsgQueue::AllocIMMsgId()
{ {
return ++imp_->curr_im_msgid; return ++imp_->curr_im_msgid;
} }
void MsgQueue::ProcessMsg(int msgid, const a8::XParams& param) void MsgQueue::ProcessMsg(int msgid, const a8::XParams& param)
{ {
imp_->ProcessMsg(msgid, param); imp_->ProcessMsg(msgid, param);
} }
void MsgQueue::PostMsg_r(int msgid, a8::XParams param) void MsgQueue::PostMsg_r(int msgid, a8::XParams param)
{ {
a8::XParams* p = new a8::XParams(); a8::XParams* p = new a8::XParams();
param.DeepCopy(*p); param.DeepCopy(*p);
App::Instance()->AddIMMsg(f8::IM_SysMsgQueue, a8::XParams().SetSender(msgid).SetParam1((void*)p)); App::Instance()->AddIMMsg(f8::IM_SysMsgQueue, a8::XParams().SetSender(msgid).SetParam1((void*)p));
}
} }

View File

@ -1,11 +1,13 @@
#pragma once #pragma once
typedef std::function<void (const a8::XParams& param)> MsgHandleFunc; namespace f8
typedef list_head* CallBackHandle;
class MsgQueueImp;
class MsgQueue : public a8::Singleton<MsgQueue>
{ {
typedef std::function<void (const a8::XParams& param)> MsgHandleFunc;
typedef list_head* CallBackHandle;
class MsgQueueImp;
class MsgQueue : public a8::Singleton<MsgQueue>
{
private: private:
MsgQueue() {}; MsgQueue() {};
friend class a8::Singleton<MsgQueue>; friend class a8::Singleton<MsgQueue>;
@ -27,4 +29,5 @@ class MsgQueue : public a8::Singleton<MsgQueue>
private: private:
MsgQueueImp* imp_ = nullptr; MsgQueueImp* imp_ = nullptr;
}; };
}