代码格式化
This commit is contained in:
parent
fa71782ccc
commit
0891ce5580
272
cpp/dbpool.cc
272
cpp/dbpool.cc
@ -319,152 +319,152 @@ namespace f8
|
|||||||
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
|
exec_async_query_msgid = MsgQueue::Instance()->AllocIMMsgId();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SetThreadNum(int thread_num)
|
void SetThreadNum(int thread_num)
|
||||||
{
|
|
||||||
assert(thread_num > 0);
|
|
||||||
for (int i = 0; i < thread_num; i++) {
|
|
||||||
DBThread *db_thread = new DBThread();
|
|
||||||
db_thread->exec_async_query_msgid = exec_async_query_msgid;
|
|
||||||
db_thread->Init();
|
|
||||||
db_thread_pool.push_back(db_thread);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncQueryRequest* GetAsyncQueryRequest(long long seqid)
|
|
||||||
{
|
|
||||||
auto itr = async_query_hash.find(seqid);
|
|
||||||
return itr != async_query_hash.end() ? itr->second : nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncSqlOnOk(long long seqid, DataSet* data_set)
|
|
||||||
{
|
|
||||||
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
|
|
||||||
if (!request) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (request->on_ok) {
|
|
||||||
request->on_ok(request->param, data_set);
|
|
||||||
}
|
|
||||||
async_query_hash.erase(seqid);
|
|
||||||
delete request;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
|
|
||||||
{
|
|
||||||
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
|
|
||||||
if (!request) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (request->on_error) {
|
|
||||||
request->on_error(request->param, errcode, errmsg);
|
|
||||||
}
|
|
||||||
async_query_hash.erase(seqid);
|
|
||||||
delete request;
|
|
||||||
}
|
|
||||||
|
|
||||||
void InternalExecAsyncSql(int exec_type,
|
|
||||||
a8::XObject& conn_info, const char* querystr, std::vector<a8::XValue>& args,
|
|
||||||
a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
|
|
||||||
{
|
|
||||||
AsyncQueryRequest* p = new AsyncQueryRequest();
|
|
||||||
{
|
{
|
||||||
p->context_id = ++curr_seqid;
|
assert(thread_num > 0);
|
||||||
p->param = param;
|
for (int i = 0; i < thread_num; i++) {
|
||||||
p->sql = "";
|
DBThread *db_thread = new DBThread();
|
||||||
p->_sql_fmt = querystr;
|
db_thread->exec_async_query_msgid = exec_async_query_msgid;
|
||||||
p->_sql_params = args;
|
db_thread->Init();
|
||||||
conn_info.DeepCopy(p->conn_info);
|
db_thread_pool.push_back(db_thread);
|
||||||
p->add_time = time(nullptr);
|
}
|
||||||
p->on_ok = on_ok;
|
|
||||||
p->on_error = on_error;
|
|
||||||
async_query_hash[p->context_id] = p;
|
|
||||||
}
|
|
||||||
DBThread* db_thread = GetDBThread(hash_code);
|
|
||||||
if (!db_thread) {
|
|
||||||
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
|
|
||||||
a8::XParams()
|
|
||||||
.SetSender(p->context_id)
|
|
||||||
.SetParam1(AQE_CONN_ERROR));
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AsyncQueryRequest* GetAsyncQueryRequest(long long seqid)
|
||||||
{
|
{
|
||||||
AsyncQueryNode* node = new AsyncQueryNode();
|
auto itr = async_query_hash.find(seqid);
|
||||||
node->socket_handle = 0;
|
return itr != async_query_hash.end() ? itr->second : nullptr;
|
||||||
node->query_type = exec_type;
|
|
||||||
node->context_id = p->context_id;
|
|
||||||
node->sql = "";
|
|
||||||
node->_sql_fmt = querystr;
|
|
||||||
node->_sql_params = args;
|
|
||||||
conn_info.DeepCopy(node->conn_info);
|
|
||||||
db_thread->AddAsyncQuery(node);
|
|
||||||
}
|
}
|
||||||
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
|
|
||||||
a8::XParams()
|
|
||||||
.SetSender(this)
|
|
||||||
.SetParam1(p->context_id),
|
|
||||||
[] (const a8::XParams& param)
|
|
||||||
{
|
|
||||||
|
|
||||||
},
|
void AsyncSqlOnOk(long long seqid, DataSet* data_set)
|
||||||
&p->timer_attacher.timer_list_);
|
{
|
||||||
}
|
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
|
||||||
|
if (!request) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (request->on_ok) {
|
||||||
|
request->on_ok(request->param, data_set);
|
||||||
|
}
|
||||||
|
async_query_hash.erase(seqid);
|
||||||
|
delete request;
|
||||||
|
}
|
||||||
|
|
||||||
DBThread* GetDBThread(long long hash_code)
|
void AsyncSqlOnError(long long seqid, int errcode, const std::string& errmsg)
|
||||||
|
{
|
||||||
|
AsyncQueryRequest* request = GetAsyncQueryRequest(seqid);
|
||||||
|
if (!request) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (request->on_error) {
|
||||||
|
request->on_error(request->param, errcode, errmsg);
|
||||||
|
}
|
||||||
|
async_query_hash.erase(seqid);
|
||||||
|
delete request;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InternalExecAsyncSql(int exec_type,
|
||||||
|
a8::XObject& conn_info, const char* querystr, std::vector<a8::XValue>& args,
|
||||||
|
a8::XParams& param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
|
||||||
|
{
|
||||||
|
AsyncQueryRequest* p = new AsyncQueryRequest();
|
||||||
|
{
|
||||||
|
p->context_id = ++curr_seqid;
|
||||||
|
p->param = param;
|
||||||
|
p->sql = "";
|
||||||
|
p->_sql_fmt = querystr;
|
||||||
|
p->_sql_params = args;
|
||||||
|
conn_info.DeepCopy(p->conn_info);
|
||||||
|
p->add_time = time(nullptr);
|
||||||
|
p->on_ok = on_ok;
|
||||||
|
p->on_error = on_error;
|
||||||
|
async_query_hash[p->context_id] = p;
|
||||||
|
}
|
||||||
|
DBThread* db_thread = GetDBThread(hash_code);
|
||||||
|
if (!db_thread) {
|
||||||
|
MsgQueue::Instance()->PostMsg_r(exec_async_query_msgid,
|
||||||
|
a8::XParams()
|
||||||
|
.SetSender(p->context_id)
|
||||||
|
.SetParam1(AQE_CONN_ERROR));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
AsyncQueryNode* node = new AsyncQueryNode();
|
||||||
|
node->socket_handle = 0;
|
||||||
|
node->query_type = exec_type;
|
||||||
|
node->context_id = p->context_id;
|
||||||
|
node->sql = "";
|
||||||
|
node->_sql_fmt = querystr;
|
||||||
|
node->_sql_params = args;
|
||||||
|
conn_info.DeepCopy(node->conn_info);
|
||||||
|
db_thread->AddAsyncQuery(node);
|
||||||
|
}
|
||||||
|
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
|
||||||
|
a8::XParams()
|
||||||
|
.SetSender(this)
|
||||||
|
.SetParam1(p->context_id),
|
||||||
|
[] (const a8::XParams& param)
|
||||||
|
{
|
||||||
|
|
||||||
|
},
|
||||||
|
&p->timer_attacher.timer_list_);
|
||||||
|
}
|
||||||
|
|
||||||
|
DBThread* GetDBThread(long long hash_code)
|
||||||
|
{
|
||||||
|
if (db_thread_pool.empty() || hash_code < 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
return db_thread_pool[hash_code % db_thread_pool.size()];
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
long long curr_seqid = 0;
|
||||||
|
std::map<long long, AsyncQueryRequest*> async_query_hash;
|
||||||
|
|
||||||
|
unsigned short exec_async_query_msgid = 0;
|
||||||
|
std::vector<DBThread*> db_thread_pool;
|
||||||
|
};
|
||||||
|
|
||||||
|
void DBPool::Init()
|
||||||
{
|
{
|
||||||
if (db_thread_pool.empty() || hash_code < 0) {
|
impl_ = new DBPoolImpl();
|
||||||
return nullptr;
|
impl_->Init();
|
||||||
}
|
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
|
||||||
return db_thread_pool[hash_code % db_thread_pool.size()];
|
[] (const a8::XParams& param)
|
||||||
}
|
{
|
||||||
|
if (param.param1.GetInt() == AQE_NO_ERROR) {
|
||||||
public:
|
DataSet* data_set = (DataSet*)param.param2.GetUserData();
|
||||||
long long curr_seqid = 0;
|
DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set);
|
||||||
std::map<long long, AsyncQueryRequest*> async_query_hash;
|
delete data_set;
|
||||||
|
} else {
|
||||||
unsigned short exec_async_query_msgid = 0;
|
DBPool::Instance()->impl_->AsyncSqlOnError(param.sender,
|
||||||
std::vector<DBThread*> db_thread_pool;
|
param.param1,
|
||||||
};
|
param.param2);
|
||||||
|
}
|
||||||
void DBPool::Init()
|
|
||||||
{
|
|
||||||
impl_ = new DBPoolImpl();
|
|
||||||
impl_->Init();
|
|
||||||
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_query_msgid,
|
|
||||||
[] (const a8::XParams& param)
|
|
||||||
{
|
|
||||||
if (param.param1.GetInt() == AQE_NO_ERROR) {
|
|
||||||
DataSet* data_set = (DataSet*)param.param2.GetUserData();
|
|
||||||
DBPool::Instance()->impl_->AsyncSqlOnOk(param.sender, data_set);
|
|
||||||
delete data_set;
|
|
||||||
} else {
|
|
||||||
DBPool::Instance()->impl_->AsyncSqlOnError(param.sender,
|
|
||||||
param.param1,
|
|
||||||
param.param2);
|
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void DBPool::UnInit()
|
void DBPool::UnInit()
|
||||||
{
|
{
|
||||||
delete impl_;
|
delete impl_;
|
||||||
impl_ = nullptr;
|
impl_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBPool::SetThreadNum(int thread_num)
|
void DBPool::SetThreadNum(int thread_num)
|
||||||
{
|
{
|
||||||
impl_->SetThreadNum(thread_num);
|
impl_->SetThreadNum(thread_num);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
|
void DBPool::ExecAsyncQuery(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
|
||||||
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
|
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
|
||||||
{
|
{
|
||||||
impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code);
|
impl_->InternalExecAsyncSql(0, conn_info, querystr, args, param, on_ok, on_error, hash_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
|
void DBPool::ExecAsyncScript(a8::XObject conn_info, const char* querystr, std::vector<a8::XValue> args,
|
||||||
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
|
a8::XParams param, AsyncDBOnOkFunc on_ok, AsyncDBOnErrorFunc on_error, long long hash_code)
|
||||||
{
|
{
|
||||||
impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code);
|
impl_->InternalExecAsyncSql(1, conn_info, querystr, args, param, on_ok, on_error, hash_code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user