This commit is contained in:
aozhiwei 2022-12-17 11:00:31 +08:00
parent 5d7153a5e5
commit db10c37c68
4 changed files with 267 additions and 151 deletions

View File

@ -244,11 +244,16 @@ namespace f8
{
int ret = last_query_->ExecQuery(node->_sql_fmt.c_str(), node->_sql_params);
if (ret < 0) {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(last_query_->GetError()));
f8::MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
a8::Args
(
{
node->context_id,
AQE_SYNTAX_ERROR,
last_query_->GetError()
}
));
} else {
DataSet* data_set = new DataSet();
data_set->reserve(last_query_->RowsNum());
@ -261,11 +266,16 @@ namespace f8
}
last_query_->Next();
}
f8::MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR)
.SetParam2((void*)data_set));
f8::MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
a8::Args
(
{
node->context_id,
AQE_NO_ERROR,
data_set
}
));
}
}
break;
@ -273,28 +283,43 @@ namespace f8
{
bool ret = last_query_->ExecScript(node->_sql_fmt.c_str(), node->_sql_params);
if (!ret) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_SYNTAX_ERROR)
.SetParam2(last_query_->GetError()));
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
a8::Args
(
{
node->context_id,
AQE_SYNTAX_ERROR,
last_query_->GetError()
}
));
} else {
DataSet* data_set = new DataSet();
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_NO_ERROR)
.SetParam2((void*)data_set));
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
a8::Args
(
{
node->context_id,
AQE_NO_ERROR,
data_set
}
));
}
}
break;
default:
{
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AQE_QUERY_TYPE_ERROR)
.SetParam2("wrong query type"));
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
a8::Args
(
{
node->context_id,
AQE_QUERY_TYPE_ERROR,
"wrong query type"
}
));
}
break;
}
@ -325,14 +350,8 @@ namespace f8
void Init()
{
auto free_custom_msg = [] (const a8::XParams& params)
{
a8::XParams* param = (a8::XParams*)params.param1.GetUserData();
DataSet* data_set = (DataSet*)param->param2.GetUserData();
delete data_set;
};
curr_seqid = 1000001;
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId(free_custom_msg);
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void UnInit()
@ -408,10 +427,15 @@ namespace f8
}
DBThread* db_thread = GetDBThread(hash_code);
if (!db_thread) {
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(AQE_CONN_ERROR));
MsgQueue::Instance()->PostMsg
(exec_async_query_msgid,
a8::Args
(
{
p->context_id,
AQE_CONN_ERROR
}
));
return;
}
{
@ -425,16 +449,22 @@ namespace f8
conn_info.DeepCopy(node->conn_info);
db_thread->AddAsyncQuery(node);
}
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::Timer::Instance()->AddDeadLineTimerAndAttach
(1000 * 10,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(exec_async_query_msgid),
[] (const a8::XParams& param)
{
MsgQueue::Instance()->PostMsg_r(param.param1,
a8::XParams()
.SetSender(param.sender)
.SetParam1(AQE_CONN_ERROR));
MsgQueue::Instance()->PostMsg
(param.param1.GetInt(),
a8::Args
(
{
param.sender.GetInt(),
AQE_CONN_ERROR
}
));
},
&p->timer_attacher.timer_list_);
++DBPool::Instance()->total_query_num;
@ -470,9 +500,11 @@ namespace f8
{
impl_ = new DBPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
[] (const a8::XParams& param)
MsgQueue::Instance()->RegisterCallBack
(impl_->exec_async_query_msgid,
[] (const a8::Args& args)
{
#if 0
if (param.param1.GetInt() == AQE_NO_ERROR) {
DataSet* data_set = (DataSet*)param.param2.GetUserData();
DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set);
@ -482,6 +514,7 @@ namespace f8
param.param1,
param.param2);
}
#endif
}
);
}

View File

@ -224,27 +224,39 @@ namespace f8
if (ret) {
a8::XObject* xobj = new a8::XObject();
if (xobj->ReadFromJsonString(response)) {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_ERROR)
.SetParam2((void*)xobj)
);
f8::MsgQueue::Instance()->PostMsg
(exec_async_http_msgid,
a8::Args
(
{
node->context_id,
AHE_NO_ERROR,
xobj
}
));
} else {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_CONN)
.SetParam2(response)
);
f8::MsgQueue::Instance()->PostMsg
(exec_async_http_msgid,
a8::Args
(
{
node->context_id,
AHE_NO_CONN,
response
}
));
delete xobj;
}
} else {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_CONN)
);
f8::MsgQueue::Instance()->PostMsg
(exec_async_http_msgid,
a8::Args
(
{
node->context_id,
AHE_NO_CONN
}
));
}
}
@ -303,7 +315,7 @@ namespace f8
void Init()
{
curr_seqid = 1000001;
exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(nullptr);
exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void UnInit()
@ -477,10 +489,12 @@ namespace f8
#endif
impl_ = new HttpClientPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid,
[] (const a8::XParams& param)
MsgQueue::Instance()->RegisterCallBack
(impl_->exec_async_http_msgid,
[] (const a8::Args& args)
{
--(HttpClientPool::Instance()->impl_->pending_num);
#if 0
if (param.param1.GetInt() == AHE_NO_ERROR) {
a8::XObject* xobj = (a8::XObject*)param.param2.GetUserData();
HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, *xobj);
@ -489,6 +503,7 @@ namespace f8
HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender,
param.param2.GetString());
}
#endif
}
);
impl_->SetThreadNum(thread_num);

View File

@ -1,5 +1,7 @@
#include <assert.h>
#include <mutex>
#include <a8/a8.h>
#include <f8/f8.h>
@ -16,12 +18,28 @@ namespace f8
MsgHandleFunc func;
};
struct IMMsgNode
{
unsigned short msgid;
const a8::Args args;
IMMsgNode* next = nullptr;
IMMsgNode(const a8::Args& args1):args(std::move(args1))
{
}
};
class MsgQueueImp
{
public:
int curr_im_msgid = 10000;
std::map<int, list_head> msg_handlers;
std::map<int, CustomIMMsgFreeFunc*> custom_free_funcs;
std::mutex im_msg_mutex_;
IMMsgNode* im_top_node_ = nullptr;
IMMsgNode* im_bot_node_ = nullptr;
IMMsgNode* im_work_node_ = nullptr;
~MsgQueueImp()
{
@ -32,6 +50,7 @@ namespace f8
}
}
#if 0
void ProcessMsg(int msgid, const a8::XParams& param)
{
auto itr = msg_handlers.find(msgid);
@ -44,6 +63,7 @@ namespace f8
}
}
}
#endif
CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func)
{
@ -66,38 +86,95 @@ namespace f8
void MsgQueue::Init()
{
imp_ = new MsgQueueImp();
imp_ = std::make_shared<MsgQueueImp>();
}
void MsgQueue::UnInit()
{
delete imp_;
imp_ = nullptr;
#if 0
im_msg_mutex_->lock();
if (!im_work_node_) {
im_work_node_ = im_top_node_;
im_top_node_ = nullptr;
im_bot_node_ = nullptr;
}
while (im_work_node_) {
IMMsgNode* pdelnode = im_work_node_;
im_work_node_ = im_work_node_->next;
if (pdelnode->msgid == f8::IM_SysMsgQueue) {
a8::XParams* param = (a8::XParams*)pdelnode->params.param1.GetUserData();
delete param;
}
delete pdelnode;
if (!im_work_node_) {
im_work_node_ = im_top_node_;
im_top_node_ = nullptr;
im_bot_node_ = nullptr;
}
}
im_msg_mutex_->unlock();
#endif
}
void MsgQueue::SendMsg(int msgid, a8::XParams param)
void MsgQueue::Update()
{
imp_->ProcessMsg(msgid, param);
#if 0
if (!im_work_node_ && im_top_node_) {
im_msg_mutex_->lock();
im_work_node_ = im_top_node_;
im_top_node_ = nullptr;
im_bot_node_ = nullptr;
im_msg_mutex_->unlock();
}
while (im_work_node_) {
IMMsgNode *pdelnode = im_work_node_;
switch (im_work_node_->msgid) {
case f8::IM_SysMsgQueue:
{
const a8::XParams* param = (const a8::XParams*)pdelnode->params.param1.GetUserData();
f8::MsgQueue::Instance()->ProcessMsg(pdelnode->params.sender.GetInt(),
*param
);
delete param;
}
break;
case IM_ClientSocketDisconnect:
{
PlayerMgr::Instance()->OnClientDisconnect(pdelnode->params);
}
break;
case IM_ExecGM:
{
HandlerMgr::Instance()->ProcGMMsg(pdelnode->params.param3,
pdelnode->params.sender,
pdelnode->params.param1.GetString(),
pdelnode->params.param2.GetString()
);
}
break;
}
im_work_node_ = im_work_node_->next;
delete pdelnode;
}
#endif
}
void MsgQueue::PostMsg(int msgid, a8::XParams param)
bool MsgQueue::HasMsg()
{
param._sys_field = msgid;
a8::Timer::Instance()->AddDeadLineTimer(0, param,
[] (const a8::XParams& param)
{
MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param);
});
#if 0
if (!im_work_node_) {
im_msg_mutex_.lock();
if (!im_work_node_ && im_top_node_) {
im_work_node_ = im_top_node_;
im_top_node_ = nullptr;
im_bot_node_ = nullptr;
}
void MsgQueue::AddDelayMsg(int msgid, a8::XParams param, int milli_seconds)
{
param._sys_field = msgid;
a8::Timer::Instance()->AddDeadLineTimer(milli_seconds, param,
[] (const a8::XParams& param)
{
MsgQueue::Instance()->imp_->ProcessMsg(param._sys_field, param);
});
im_msg_mutex_.unlock();
}
if (im_work_node_) {
return true;
}
#endif
}
void MsgQueue::RemoveCallBack(CallBackHandle handle)
@ -113,34 +190,30 @@ namespace f8
return imp_->RegisterCallBack(msgid, handle_func);
}
int MsgQueue::AllocIMMsgId(CustomIMMsgFreeFunc free_func)
int MsgQueue::AllocIMMsgId()
{
int custom_im_msgid = ++imp_->curr_im_msgid;
if (free_func) {
imp_->custom_free_funcs[custom_im_msgid] = free_func;
}
return custom_im_msgid;
}
void MsgQueue::FreeCustomIMMsg(a8::XParams& param)
void MsgQueue::PostMsg(int msgid, const a8::Args args)
{
auto itr = imp_->custom_free_funcs.find(param.sender.GetInt());
if (itr != imp_->custom_free_funcs.end()) {
itr->second(param);
}
}
void MsgQueue::ProcessMsg(int msgid, const a8::XParams& param)
{
imp_->ProcessMsg(msgid, param);
}
void MsgQueue::PostMsg_r(int msgid, a8::XParams param)
{
a8::XParams* p = new a8::XParams();
param.DeepCopy(*p);
#if 0
App::Instance()->AddIMMsg(f8::IM_SysMsgQueue, a8::XParams().SetSender(msgid).SetParam1((void*)p));
IMMsgNode *p = new IMMsgNode;
p->msgid = imcmd;
p->params = params;
p->next = nullptr;
im_msg_mutex_->lock();
if (im_bot_node_) {
im_bot_node_->next = p;
im_bot_node_ = p;
} else {
im_top_node_ = p;
im_bot_node_ = p;
}
im_msg_mutex_->unlock();
NotifyLoopCond();
#endif
}
}

View File

@ -2,11 +2,9 @@
namespace f8
{
typedef std::function<void (const a8::XParams& param)> MsgHandleFunc;
typedef void CustomIMMsgFreeFunc(const a8::XParams& param);
typedef std::function<void (const a8::Args&)> MsgHandleFunc;
typedef list_head* CallBackHandle;
class MsgQueueImp;
class MsgQueue : public a8::Singleton<MsgQueue>
{
private:
@ -16,20 +14,17 @@ namespace f8
public:
void Init();
void UnInit();
void Update();
void SendMsg(int msgid, a8::XParams param);
void PostMsg(int msgid, a8::XParams param);
void AddDelayMsg(int msgid, a8::XParams param, int milli_seconds);
bool HasMsg();
CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc cb);
void RemoveCallBack(CallBackHandle handle);
CallBackHandle RegisterCallBack(int msgid, MsgHandleFunc handle_func);
int AllocIMMsgId(CustomIMMsgFreeFunc free_func);
void FreeCustomIMMsg(a8::XParams& param);
void ProcessMsg(int msgid, const a8::XParams& param);
int AllocIMMsgId();
//线程安全版本
void PostMsg_r(int msgid, a8::XParams param);
void PostMsg(int msgid, const a8::Args args);
private:
MsgQueueImp* imp_ = nullptr;
std::shared_ptr<class MsgQueueImp> imp_;
};
}