This commit is contained in:
aozhiwei 2019-02-11 20:16:31 +08:00
parent 3739692d07
commit 1417be83e0
2 changed files with 101 additions and 11 deletions

View File

@ -4,6 +4,8 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <mysql.h>
#include <a8/mysql.h> #include <a8/mysql.h>
#include <a8/list.h> #include <a8/list.h>
#include <a8/timer.h> #include <a8/timer.h>
@ -82,16 +84,6 @@ namespace f8
delete work_thread_; delete work_thread_;
work_thread_ = nullptr; work_thread_ = nullptr;
if (last_query_) {
delete last_query_;
last_query_ = nullptr;
}
if (last_conn_) {
delete last_conn_;
last_conn_ = nullptr;
}
delete msg_mutex_; delete msg_mutex_;
msg_mutex_ = nullptr; msg_mutex_ = nullptr;
@ -121,11 +113,25 @@ namespace f8
void WorkThreadProc() void WorkThreadProc()
{ {
mysql_thread_init();
while (!terminated_) { while (!terminated_) {
CheckDB(); CheckDB();
ProcessMsg(); ProcessMsg();
WaitLoopCond(); WaitLoopCond();
} }
if (last_query_) {
delete last_query_;
last_query_ = nullptr;
}
if (last_conn_) {
printf("FreeConn\n");
delete last_conn_;
last_conn_ = nullptr;
}
mysql_thread_end();
} }
void CheckDB() void CheckDB()
@ -135,6 +141,7 @@ namespace f8
} }
last_checkdb_tick_ = a8::XGetTickCount(); last_checkdb_tick_ = a8::XGetTickCount();
if (last_conn_ && last_query_) { if (last_conn_ && last_query_) {
#if 0
if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) { if (last_query_->ExecQuery("SELECT 1;", {}) <= 0) {
#if 0 #if 0
a8::UdpLog::Instance()->Warning("mysql disconnect", {}); a8::UdpLog::Instance()->Warning("mysql disconnect", {});
@ -146,6 +153,7 @@ namespace f8
} }
#endif #endif
} }
#endif
} }
} }
@ -193,8 +201,11 @@ namespace f8
delete last_conn_; delete last_conn_;
last_conn_ = nullptr; last_conn_ = nullptr;
} }
printf("CreateConn\n");
last_conn_ = new a8::mysql::Connection(); last_conn_ = new a8::mysql::Connection();
#if 0
last_query_ = last_conn_->CreateQuery(); last_query_ = last_conn_->CreateQuery();
#endif
if (last_conn_->Connect( if (last_conn_->Connect(
conn_info.Get("host"), conn_info.Get("host"),
conn_info.Get("port"), conn_info.Get("port"),
@ -202,7 +213,9 @@ namespace f8
conn_info.Get("passwd"), conn_info.Get("passwd"),
conn_info.Get("database") conn_info.Get("database")
)) { )) {
#if 0
f8::InitMysqlConnection(last_query_); f8::InitMysqlConnection(last_query_);
#endif
} }
return true; return true;
} }
@ -228,6 +241,7 @@ namespace f8
if (NeedReCreateConn(node->conn_info)) { if (NeedReCreateConn(node->conn_info)) {
ReCreateConn(node->conn_info); ReCreateConn(node->conn_info);
} }
#if 0
switch (node->query_type) { switch (node->query_type) {
case 0: case 0:
{ {
@ -287,6 +301,7 @@ namespace f8
} }
break; break;
} }
#endif
} }
public: public:
@ -314,7 +329,7 @@ namespace f8
void Init() void Init()
{ {
#if 1 #if 0
/*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了, /*mysql_init()不是完全线程安全的,但是只要成功调用一次就后就线程安全了,
线使mysql_init()mysql_init()qsort() 线使mysql_init()mysql_init()qsort()
*/ */
@ -330,6 +345,12 @@ namespace f8
db_thread->UnInit(); db_thread->UnInit();
delete db_thread; delete db_thread;
} }
for (auto& pair : async_query_hash) {
delete pair.second;
}
async_query_hash.clear();
} }
void SetThreadNum(int thread_num) void SetThreadNum(int thread_num)

View File

@ -6,9 +6,12 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <mysql.h>
#include <a8/redis.h> #include <a8/redis.h>
#include <a8/timer.h> #include <a8/timer.h>
#include <a8/uuid.h> #include <a8/uuid.h>
#include <a8/mutable_xobject.h>
#include "framework/cpp/netmsghandler.h" #include "framework/cpp/netmsghandler.h"
#include "framework/cpp/msgqueue.h" #include "framework/cpp/msgqueue.h"
@ -33,6 +36,37 @@ static void StopApp(int signo)
exit(0); exit(0);
} }
static void Test()
{
static bool already_test = false;
if (already_test) {
return;
}
already_test = true;
a8::MutableXObject* conn_info = a8::MutableXObject::NewObject();
conn_info->SetVal("host", a8::XValue("127.0.0.1"));
conn_info->SetVal("port", a8::XValue(3306));
conn_info->SetVal("user", a8::XValue("root"));
conn_info->SetVal("passwd", a8::XValue("passwd"));
conn_info->SetVal("database", a8::XValue("gamedb1008_6"));
f8::DBPool::Instance()->ExecAsyncQuery(
*conn_info,
"SELECT 1;",
{},
a8::XParams(),
[] (a8::XParams& param, const f8::DataSet* data_set)
{
},
[] (a8::XParams& param, int error_code, const std::string& error_msg)
{
}
,
rand());
delete conn_info;
}
static MYSQL* mysql_init_conn = nullptr;
void App::Init(int argc, char* argv[]) void App::Init(int argc, char* argv[])
{ {
this->argc = argc; this->argc = argc;
@ -54,12 +88,39 @@ void App::Init(int argc, char* argv[])
InitLog(); InitLog();
a8::Timer::Instance()->Init(); a8::Timer::Instance()->Init();
f8::MsgQueue::Instance()->Init(); f8::MsgQueue::Instance()->Init();
#if 0
#if 1
auto ret = mysql_library_init(0, nullptr, nullptr);
assert(ret == 0);
#endif
mysql_thread_init();
mysql_init_conn = mysql_init(nullptr);
#endif
f8::DBPool::Instance()->Init(); f8::DBPool::Instance()->Init();
f8::DBPool::Instance()->SetThreadNum(10); f8::DBPool::Instance()->SetThreadNum(10);
a8::UdpLog::Instance()->Info("f8test starting instance_id:%d pid:%d", {instance_id, getpid()}); a8::UdpLog::Instance()->Info("f8test starting instance_id:%d pid:%d", {instance_id, getpid()});
signal(SIGINT, StopApp); signal(SIGINT, StopApp);
a8::Timer::Instance()->AddRepeatTimer(1000 * 1,
a8::XParams(),
[] (const a8::XParams& param)
{
#if 1
Test();
#endif
}
);
a8::Timer::Instance()->AddDeadLineTimer(1000 * 10,
a8::XParams(),
[] (const a8::XParams& param)
{
printf("Stop App...\n");
App::Instance()->UnInit();
exit(0);
}
);
} }
void App::UnInit() void App::UnInit()
@ -72,6 +133,14 @@ void App::UnInit()
a8::Timer::Instance()->UnInit(); a8::Timer::Instance()->UnInit();
UnInitLog(); UnInitLog();
#if 0
mysql_close(mysql_init_conn);
mysql_thread_end();
#if 0
mysql_library_end();
#endif
#endif
delete im_msg_mutex_; delete im_msg_mutex_;
im_msg_mutex_ = nullptr; im_msg_mutex_ = nullptr;
delete loop_cond_; delete loop_cond_;