#include "precompile.h"

// NOTE(review): the names of the three angle-bracket headers and several
// template arguments (e.g. the element type of `args`) were missing from the
// copy of this file that was reviewed. They are restored here from the a8
// facilities this file uses (Crc32, Timer, UdpLog) -- confirm against the
// original file and dbengine.h.
#include <a8/openssl.h>
#include <a8/timer.h>
#include <a8/udplog.h>

#include "dbengine.h"
#include "jsondatamgr.h"

// Error callback installed on the DB pool by DBEngine::Init().
// Logs the error message as a warning; DEBUG builds additionally log it at
// debug level. `param` and `error_code` are not used.
static void OnDBError(a8::XParams& param, int error_code, const std::string& error_msg)
{
#ifdef DEBUG
    a8::UdpLog::Instance()->Debug
        (
         "on dberror %s",
         {
             error_msg
         }
        );
#endif
    a8::UdpLog::Instance()->Warning
        (
         "on dberror %s",
         {
             error_msg
         }
        );
}

// Initializes the DB pool (16 threads), installs the error callback and
// registers a repeating timer that calls RefreshIdx().
void DBEngine::Init()
{
    f8::DBPool::Instance()->Init();
    f8::DBPool::Instance()->SetThreadNum(16);
    f8::DBPool::Instance()->on_dberror = OnDBError;
    // The interval is 3000 plus a random 0..2999 offset, evaluated once here.
    // Presumably milliseconds, with the jitter meant to de-synchronize
    // several processes -- confirm against a8::Timer.
    a8::Timer::Instance()->AddRepeatTimer
        (
         1000 * 3 + (rand() % 3000),
         a8::XParams(),
         [] (const a8::XParams& param)
         {
             DBEngine::Instance()->RefreshIdx();
         }
        );
}

// Shuts down the DB pool.
void DBEngine::UnInit()
{
    f8::DBPool::Instance()->UnInit();
}

// Forwards all arguments unchanged to f8::DBPool::ExecAsyncQuery.
void DBEngine::ExecAsyncQuery(a8::XObject conn_info,
                              const char* querystr,
                              std::vector<a8::XValue> args,
                              a8::XParams param,
                              f8::AsyncDBOnOkFunc on_ok,
                              f8::AsyncDBOnErrorFunc on_error,
                              long long hash_code)
{
    f8::DBPool::Instance()->ExecAsyncQuery
        (
         conn_info,
         querystr,
         args,
         param,
         on_ok,
         on_error,
         hash_code
        );
}

// Forwards all arguments unchanged to f8::DBPool::ExecAsyncScript.
void DBEngine::ExecAsyncScript(a8::XObject conn_info,
                               const char* querystr,
                               std::vector<a8::XValue> args,
                               a8::XParams param,
                               f8::AsyncDBOnOkFunc on_ok,
                               f8::AsyncDBOnErrorFunc on_error,
                               long long hash_code)
{
    f8::DBPool::Instance()->ExecAsyncScript
        (
         conn_info,
         querystr,
         args,
         param,
         on_ok,
         on_error,
         hash_code
        );
}

// Selects a connection for a string key: the CRC32 of the key's bytes is
// passed to the numeric overload.
a8::XObject DBEngine::GetConnInfo(const std::string& data)
{
    return GetConnInfo(a8::openssl::Crc32((unsigned char*)data.data(), data.size()));
}

// Returns a copy of the cluster configuration entry at index
// `data % <number of entries>`. Aborts the process when the cluster
// configuration is empty.
a8::XObject DBEngine::GetConnInfo(long long data)
{
    auto mysql_cluster_conf = JsonDataMgr::Instance()->GetMysqlClusterConf();
    if (mysql_cluster_conf->Size() <= 0) {
        abort();
    }
    a8::XObject conn_info = *(mysql_cluster_conf->At(data % mysql_cluster_conf->Size()));
    return conn_info;
}

// Raises the stored friend-apply index of the instance that `crc32_code`
// maps to; a value that is not larger than the stored one is ignored.
void DBEngine::UpdateFriendApplyIdx(long long crc32_code, long long new_idx)
{
    if (GetFriendApplyCurrIdx(crc32_code) < new_idx) {
        friend_apply_idx_hash_[GetInstanceId(crc32_code)] = new_idx;
    }
}

// Returns the stored friend-apply index of the instance that `crc32_code`
// maps to, or 0 when none has been recorded.
long long DBEngine::GetFriendApplyCurrIdx(long long crc32_code)
{
    auto itr = friend_apply_idx_hash_.find(GetInstanceId(crc32_code));
    return itr != friend_apply_idx_hash_.end() ? itr->second : 0;
}

// Raises the stored group-apply index of the instance that `crc32_code`
// maps to; a value that is not larger than the stored one is ignored.
void DBEngine::UpdateGroupApplyIdx(long long crc32_code, long long new_idx)
{
    if (GetGroupApplyCurrIdx(crc32_code) < new_idx) {
        group_apply_idx_hash_[GetInstanceId(crc32_code)] = new_idx;
    }
}

// Returns the stored group-apply index of the instance that `crc32_code`
// maps to, or 0 when none has been recorded.
long long DBEngine::GetGroupApplyCurrIdx(long long crc32_code)
{
    auto itr = group_apply_idx_hash_.find(GetInstanceId(crc32_code));
    return itr != group_apply_idx_hash_.end() ? itr->second : 0;
}

// Returns the stored event index of the instance that `crc32_code` maps to,
// or 0 when none has been recorded.
long long DBEngine::GetEventCurrIdx(long long crc32_code)
{
    auto itr = event_idx_hash_.find(GetInstanceId(crc32_code));
    return itr != event_idx_hash_.end() ? itr->second : 0;
}

// Raises the stored event index of the instance that `crc32_code` maps to;
// a value that is not larger than the stored one is ignored.
void DBEngine::UpdateEventIdx(long long crc32_code, long long new_idx)
{
    if (GetEventCurrIdx(crc32_code) < new_idx) {
        // Key by instance id, exactly as GetEventCurrIdx() looks it up and as
        // the friend/group updaters store it. Keying by the raw code stored
        // the value under a key the getter does not read.
        event_idx_hash_[GetInstanceId(crc32_code)] = new_idx;
    }
}

// Maps a code to an instance id: `crc32_code % <number of cluster entries>`
// plus one. Aborts the process when the cluster configuration is empty.
int DBEngine::GetInstanceId(long long crc32_code)
{
    auto mysql_cluster_conf = JsonDataMgr::Instance()->GetMysqlClusterConf();
    if (mysql_cluster_conf->Size() <= 0) {
        abort();
    }
    return (crc32_code % mysql_cluster_conf->Size()) + 1;
}

// Queries every configured cluster entry for its current maximum
// friend_apply and event idx and feeds the results into
// UpdateFriendApplyIdx() / UpdateEventIdx(). The group-apply index is not
// refreshed here. Aborts the process when the cluster configuration is empty.
void DBEngine::RefreshIdx()
{
    auto mysql_cluster_conf = JsonDataMgr::Instance()->GetMysqlClusterConf();
    if (mysql_cluster_conf->Size() <= 0) {
        abort();
    }

    auto on_ok =
        [] (a8::XParams& param, const f8::DataSet* data_set)
        {
            // The query below yields one row with two columns; skip anything
            // else instead of indexing out of range.
            if (!data_set || data_set->empty()) {
                return;
            }
            const auto& row = (*data_set)[0];
            if (row.size() < 2) {
                return;
            }
            // sender carries the loop index of the entry that was queried.
            long long instance_idx = param.sender.GetInt64();
            long long friend_apply_idx = a8::XValue(row[0]);
            long long event_idx = a8::XValue(row[1]);
            DBEngine::Instance()->UpdateFriendApplyIdx(instance_idx, friend_apply_idx);
            DBEngine::Instance()->UpdateEventIdx(instance_idx, event_idx);
        };
    // Failures are ignored here; presumably the pool-wide on_dberror handler
    // installed in Init() reports them -- confirm against f8::DBPool.
    auto on_error =
        [] (a8::XParams& param, int error_code, const std::string& error_msg)
        {
        };

    for (int i = 0; i < mysql_cluster_conf->Size(); ++i) {
        a8::XObject conn_info = GetConnInfo(i);
        // Two scalar subqueries return both maxima as two columns of a single
        // row. The previous "... UNION ..." form returned one column per row
        // (and a single row when both maxima were equal), which did not match
        // the row[0]/row[1] access in on_ok.
        DBEngine::Instance()->ExecAsyncQuery
            (
             conn_info,
             "SELECT (SELECT MAX(idx) FROM friend_apply), "
             "(SELECT MAX(idx) FROM event);",
             {
             },
             a8::XParams()
             .SetSender(i),
             on_ok,
             on_error,
             i
            );
    }
}