From 1cdff642b61184bdfefa974edbef5fd93436f2ca Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sat, 19 Sep 2020 22:25:46 +0800 Subject: [PATCH] 1 --- server/rankserver/dbengine.cc | 194 +++++++++++++++++++++++++++++++ server/rankserver/dbengine.h | 53 +++++++++ server/rankserver/jsondatamgr.cc | 12 +- server/rankserver/jsondatamgr.h | 2 + 4 files changed, 260 insertions(+), 1 deletion(-) create mode 100644 server/rankserver/dbengine.cc create mode 100644 server/rankserver/dbengine.h diff --git a/server/rankserver/dbengine.cc b/server/rankserver/dbengine.cc new file mode 100644 index 0000000..042f6b3 --- /dev/null +++ b/server/rankserver/dbengine.cc @@ -0,0 +1,194 @@ +#include "precompile.h" + +#include +#include +#include + +#include "dbengine.h" +#include "jsondatamgr.h" + +static void OnDBError(a8::XParams& param, int error_code, const std::string& error_msg) +{ +#ifdef DEBUG + a8::UdpLog::Instance()->Debug + ( + "on dberror %s", + { + error_msg + } + ); +#endif + a8::UdpLog::Instance()->Warning + ( + "on dberror %s", + { + error_msg + } + ); +} + +void DBEngine::Init() +{ + f8::DBPool::Instance()->Init(); + f8::DBPool::Instance()->SetThreadNum(16); + f8::DBPool::Instance()->on_dberror = OnDBError; + a8::Timer::Instance()->AddRepeatTimer + ( + 1000 * 3 + (rand() % 3000), + a8::XParams(), + [] (const a8::XParams& param) + { + DBEngine::Instance()->RefreshIdx(); + } + ); +} + +void DBEngine::UnInit() +{ + f8::DBPool::Instance()->UnInit(); +} + +void DBEngine::ExecAsyncQuery(a8::XObject conn_info, + const char* querystr, + std::vector args, + a8::XParams param, + f8::AsyncDBOnOkFunc on_ok, + f8::AsyncDBOnErrorFunc on_error, + long long hash_code) +{ + f8::DBPool::Instance()->ExecAsyncQuery( + conn_info, + querystr, + args, + param, + on_ok, + on_error, + hash_code + ); +} + +void DBEngine::ExecAsyncScript(a8::XObject conn_info, + const char* querystr, + std::vector args, + a8::XParams param, + f8::AsyncDBOnOkFunc on_ok, + f8::AsyncDBOnErrorFunc on_error, + long long hash_code) +{ + f8::DBPool::Instance()->ExecAsyncScript( + conn_info, + querystr, + args, + param, + on_ok, + on_error, + hash_code + ); +} + +a8::XObject DBEngine::GetConnInfo(const std::string& data) +{ + return GetConnInfo(a8::openssl::Crc32((unsigned char*)data.data(), data.size())); +} + +a8::XObject DBEngine::GetConnInfo(long long data) +{ + std::shared_ptr mysql_cluster_conf = + JsonDataMgr::Instance()->GetMysqlClusterConf(); + if (mysql_cluster_conf->Size() <= 0) { + abort(); + } + a8::XObject conn_info = *(mysql_cluster_conf->At(data % mysql_cluster_conf->Size())); + return conn_info; +} + +void DBEngine::UpdateFriendApplyIdx(long long crc32_code, long long new_idx) +{ + if (GetFriendApplyCurrIdx(crc32_code) < new_idx) { + friend_apply_idx_hash_[GetInstanceId(crc32_code)] = new_idx; + } +} + +long long DBEngine::GetFriendApplyCurrIdx(long long crc32_code) +{ + auto itr = friend_apply_idx_hash_.find(GetInstanceId(crc32_code)); + return itr != friend_apply_idx_hash_.end() ? itr->second : 0; +} + +void DBEngine::UpdateGroupApplyIdx(long long crc32_code, long long new_idx) +{ + if (GetGroupApplyCurrIdx(crc32_code) < new_idx) { + group_apply_idx_hash_[GetInstanceId(crc32_code)] = new_idx; + } +} + +long long DBEngine::GetGroupApplyCurrIdx(long long crc32_code) +{ + auto itr = group_apply_idx_hash_.find(GetInstanceId(crc32_code)); + return itr != group_apply_idx_hash_.end() ? itr->second : 0; +} + +long long DBEngine::GetEventCurrIdx(long long crc32_code) +{ + auto itr = event_idx_hash_.find(GetInstanceId(crc32_code)); + return itr != event_idx_hash_.end() ? itr->second : 0; +} + +void DBEngine::UpdateEventIdx(long long crc32_code, long long new_idx) +{ + if (GetEventCurrIdx(GetInstanceId(crc32_code)) < new_idx) { + event_idx_hash_[GetInstanceId(crc32_code)] = new_idx; + } +} + +int DBEngine::GetInstanceId(long long crc32_code) +{ + std::shared_ptr mysql_cluster_conf = + JsonDataMgr::Instance()->GetMysqlClusterConf(); + if (mysql_cluster_conf->Size() <= 0) { + abort(); + } + return (crc32_code % mysql_cluster_conf->Size()) + 1; +} + +void DBEngine::RefreshIdx() +{ + std::shared_ptr mysql_cluster_conf = + JsonDataMgr::Instance()->GetMysqlClusterConf(); + if (mysql_cluster_conf->Size() <= 0) { + abort(); + } + + auto on_ok = + [] (a8::XParams& param, const f8::DataSet* data_set) + { + long long instance_idx = param.sender.GetInt64(); + const std::vector& row0 = (*data_set)[0]; + const std::vector& row1 = (*data_set)[1]; + long long friend_apply_idx = a8::XValue(row0[0]); + long long event_idx = a8::XValue(row1[0]); + DBEngine::Instance()->UpdateFriendApplyIdx(instance_idx, friend_apply_idx); + DBEngine::Instance()->UpdateEventIdx(instance_idx, event_idx); + }; + auto on_error = + [] (a8::XParams& param, int error_code, const std::string& error_msg) + { + }; + + for (int i = 0; i < mysql_cluster_conf->Size(); ++i) { + a8::XObject conn_info = GetConnInfo(i); + DBEngine::Instance()->ExecAsyncQuery + ( + conn_info, + "SELECT IFNULL(MAX(idx), 0) FROM friend_apply UNION ALL " + "SELECT IFNULL(MAX(idx), 0) FROM event;", + { + }, + a8::XParams() + .SetSender(i), + on_ok, + on_error, + i + ); + } +} diff --git a/server/rankserver/dbengine.h b/server/rankserver/dbengine.h new file mode 100644 index 0000000..5f82a1c --- /dev/null +++ b/server/rankserver/dbengine.h @@ -0,0 +1,53 @@ +#pragma once + +#include "framework/cpp/dbpool.h" + +class DBEngine : public a8::Singleton +{ + private: + DBEngine() {}; + friend class a8::Singleton; + + public: + void Init(); + void UnInit(); + + //执行异步并行查询 + void ExecAsyncQuery(a8::XObject conn_info, + const char* querystr, + std::vector args, + a8::XParams param, + f8::AsyncDBOnOkFunc on_ok, + f8::AsyncDBOnErrorFunc on_error, + long long hash_code = 0); + //执行异步并行sql + void ExecAsyncScript(a8::XObject conn_info, + const char* querystr, + std::vector args, + a8::XParams param, + f8::AsyncDBOnOkFunc on_ok, + f8::AsyncDBOnErrorFunc on_error, + long long hash_code = 0); + + a8::XObject GetConnInfo(const std::string& data); + a8::XObject GetConnInfo(long long data); + + long long GetFriendApplyCurrIdx(long long crc32_code); + long long GetGroupApplyCurrIdx(long long crc32_code); + long long GetEventCurrIdx(long long crc32_code); + +private: + void UpdateFriendApplyIdx(long long crc32_code, long long new_idx); + void UpdateGroupApplyIdx(long long crc32_code, long long new_idx); + void UpdateEventIdx(long long crc32_code, long long new_idx); + +private: + int GetInstanceId(long long crc32_code); + void RefreshIdx(); + +private: + std::map friend_apply_idx_hash_; + std::map group_apply_idx_hash_; + std::map event_idx_hash_; + +}; diff --git a/server/rankserver/jsondatamgr.cc b/server/rankserver/jsondatamgr.cc index 6c81b6a..2027393 100644 --- a/server/rankserver/jsondatamgr.cc +++ b/server/rankserver/jsondatamgr.cc @@ -7,7 +7,6 @@ void JsonDataMgr::Init() { - std::string rankserver_cluster_json_file; if (!f8::IsOnlineEnv()) { if (f8::IsTestEnv()) { work_path_ = a8::Format("/root/pub/%d/%d/conf_test/game%d/rankserver.test", @@ -25,6 +24,13 @@ void JsonDataMgr::Init() }); } } + std::string rankserver_cluster_json_file; + std::string mysql_cluster_json_file; + mysql_cluster_json_file = a8::Format("%s/node1/friend.rankserver.mysql.cluster.json", + { + work_path_, + }); + rankserver_cluster_json_file = a8::Format("%s/node1/game9003.rankserver.cluster.json", { work_path_, @@ -44,3 +50,7 @@ std::shared_ptr JsonDataMgr::GetConf() return rankserver_cluster_json_[App::Instance()->instance_id - 1]; } +std::shared_ptr JsonDataMgr::GetMysqlClusterConf() +{ + return std::make_shared(mysql_cluster_json_); +} diff --git a/server/rankserver/jsondatamgr.h b/server/rankserver/jsondatamgr.h index e9c0b88..34d9b26 100644 --- a/server/rankserver/jsondatamgr.h +++ b/server/rankserver/jsondatamgr.h @@ -10,9 +10,11 @@ class JsonDataMgr : public a8::Singleton void Init(); void UnInit(); + std::shared_ptr GetMysqlClusterConf(); std::shared_ptr GetConf(); private: std::string work_path_ = "../config"; + a8::XObject mysql_cluster_json_; a8::XObject rankserver_cluster_json_; };