This commit is contained in:
azw 2023-04-13 09:45:11 +00:00
parent 059071f824
commit 00606e0349
2 changed files with 32 additions and 31 deletions

View File

@ -16,6 +16,16 @@
#include "app.h"
#include "downstreammgr.h"
class RequestTarget
{
public:
long long context_id = 0;
std::string account_id;
f8::MsgHdr* hdr_copy = nullptr;
f8::TimerWp timer_wp;
long long req_tick = 0;
};
void MasterMgr::Init()
{
curr_context_id_ = a8::MakeInt64(0, time(nullptr) + 1000 * 60 * 10);
@ -41,8 +51,9 @@ void MasterMgr::UnInit()
void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_ResponseTargetServer& msg)
{
f8::MsgHdr* context_hdr = GetHdrByContextId(msg.context_id());
if (context_hdr) {
auto req = GetRequestByContextId(msg.context_id());
if (req) {
#if 0
int socket_handle = context_hdr->socket_handle;
if (msg.error_code() == 0) {
UpStream* conn = UpStreamMgr::Instance()->RecreateUpStream
@ -59,6 +70,7 @@ void MasterMgr::_SS_MS_ResponseTargetServer(f8::MsgHdr& hdr, const ss::SS_MS_Res
}
}
RemoveRequest(socket_handle, msg.context_id(), true);
#endif
}
}
@ -96,7 +108,9 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
std::shared_ptr<Master> svr = GetConnById(code % mastersvr_hash_.size() + 1);
if (svr) {
++curr_context_id_;
f8::MsgHdr* new_hdr = hdr.Clone();
auto req = std::make_shared<RequestTarget>();
req->context_id = curr_context_id_;
req->hdr_copy = hdr.Clone();
ss::SS_WSP_RequestTargetServer msg;
msg.set_context_id(curr_context_id_);
@ -109,14 +123,16 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
pending_socket_hash_[hdr.socket_handle] = curr_context_id_;
assert(pending_request_hash_.find(curr_context_id_) == pending_request_hash_.end());
pending_request_hash_[curr_context_id_] = new_hdr;
pending_request_hash_[curr_context_id_] = req;
#if 1
f8::Timer::Instance()->SetTimeout
req->timer_wp = f8::Timer::Instance()->SetTimeoutWp
(1000 * 10,
[] (int event, const a8::Args* args)
[req] (int event, const a8::Args* args)
{
if (a8::TIMER_EXEC_EVENT == event) {
}
}
);
#else
auto timer_func =
@ -138,17 +154,6 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
}
DownStreamMgr::Instance()->AddPendingAccount(param.param2.GetString(), param.param1, param.param3);
};
f8::Timer::Instance()->SetTimeout
(1000 * 10,
a8::XParams()
.SetSender(curr_context_id_)
.SetParam1(hdr.socket_handle)
.SetParam2(account_id)
.SetParam3(a8::XGetTickCount()),
timer_func,
&timer_attacher->timer_list_,
timer_after_func
);
#endif
}
}
@ -156,18 +161,13 @@ void MasterMgr::RequestTargetServer(f8::MsgHdr& hdr,
void MasterMgr::RemoveRequest(int socket_handle, long long context_id, bool auto_free)
{
if (context_id == GetContextIdBySocket(socket_handle)) {
f8::MsgHdr* hdr = GetHdrByContextId(context_id);
if (hdr) {
#if 0
a8::TimerAttacher* timer_attacher = (a8::TimerAttacher*)hdr->user_data;
delete timer_attacher;
hdr->user_data = nullptr;
#endif
auto req = GetRequestByContextId(context_id);
if (req) {
if (auto_free) {
if (hdr->buf) {
free((char*)hdr->buf);
if (req->hdr_copy->buf) {
free((char*)req->hdr_copy->buf);
}
free(hdr);
free(req->hdr_copy);
}
}
pending_request_hash_.erase(context_id);
@ -181,7 +181,7 @@ long long MasterMgr::GetContextIdBySocket(int socket_handle)
return itr != pending_socket_hash_.end() ? itr->second : 0;
}
f8::MsgHdr* MasterMgr::GetHdrByContextId(long long context_id)
std::shared_ptr<RequestTarget> MasterMgr::GetRequestByContextId(long long context_id)
{
auto itr = pending_request_hash_.find(context_id);
return itr != pending_request_hash_.end() ? itr->second : nullptr;

View File

@ -10,6 +10,7 @@ namespace ss
class SS_MS_ResponseTargetServer;
}
class RequestTarget;
class Master;
class MasterMgr : public a8::Singleton<MasterMgr>
{
@ -36,13 +37,13 @@ class MasterMgr : public a8::Singleton<MasterMgr>
private:
long long GetContextIdBySocket(int socket_handle);
f8::MsgHdr* GetHdrByContextId(long long context_id);
std::shared_ptr<RequestTarget> GetRequestByContextId(long long context_id);
std::shared_ptr<Master> GetConnById(int instance_id);
private:
long long curr_context_id_ = 0;
std::map<int, std::shared_ptr<Master>> mastersvr_hash_;
std::map<int, long long> pending_socket_hash_;
std::map<long long, f8::MsgHdr*> pending_request_hash_;
std::map<long long, std::shared_ptr<RequestTarget>> pending_request_hash_;
};