From 5576da2b9fea3b8079bb6ad425364631c8085937 Mon Sep 17 00:00:00 2001 From: H0zen Date: Mon, 13 Feb 2017 23:59:53 +0200 Subject: [PATCH] Fix the network module. - This fix must be applied on all cores. - Solved a nasty race condition between network threads. --- src/game/Server/WorldSocket.cpp | 245 ++++++----------------------- src/game/Server/WorldSocket.h | 26 +-- src/game/Server/WorldSocketMgr.cpp | 29 +--- src/game/Server/WorldSocketMgr.h | 4 - 4 files changed, 55 insertions(+), 249 deletions(-) diff --git a/src/game/Server/WorldSocket.cpp b/src/game/Server/WorldSocket.cpp index 0ef4c129..acaa23c9 100644 --- a/src/game/Server/WorldSocket.cpp +++ b/src/game/Server/WorldSocket.cpp @@ -86,9 +86,9 @@ WorldSocket::WorldSocket(void) : m_RecvWPct(0), m_RecvPct(), m_Header(sizeof(ClientPktHeader)), + m_OutBufferLock(), m_OutBuffer(0), m_OutBufferSize(65536), - m_OutActive(false), m_Seed(static_cast(rand32())) { reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED); @@ -107,7 +107,7 @@ WorldSocket::~WorldSocket(void) WorldPacket* pct; while (m_PacketQueue.dequeue_head(pct) == 0) - { delete pct; } + { delete pct; } } bool WorldSocket::IsClosed(void) const @@ -117,21 +117,15 @@ bool WorldSocket::IsClosed(void) const void WorldSocket::CloseSocket(void) { - { - ACE_GUARD(LockType, Guard, m_OutBufferLock); + ACE_GUARD(LockType, Guard, m_OutBufferLock); - if (closing_) - { return; } + if (closing_) + { return; } - closing_ = true; - peer().close_writer(); - } + closing_ = true; + peer().close_writer(); - { - ACE_GUARD(LockType, Guard, m_SessionLock); - - m_Session = NULL; - } + m_Session = NULL; } const std::string& WorldSocket::GetRemoteAddress(void) const @@ -148,14 +142,6 @@ int WorldSocket::SendPacket(const WorldPacket& pkt) WorldPacket pct = pkt; - // Dump outgoing packet. - sLog.outWorldPacketDump(uint32(get_handle()), pct.GetOpcode(), pct.GetOpcodeName(), &pct, false); - -#ifdef ENABLE_ELUNA - if (!sEluna->OnPacketSend(m_Session, pct)) - { return 0; } -#endif /* ENABLE_ELUNA */ - if (iSendPacket(pct) == -1) { WorldPacket* npct; @@ -172,6 +158,8 @@ int WorldSocket::SendPacket(const WorldPacket& pkt) } } + reactor()->schedule_wakeup(this, ACE_Event_Handler::WRITE_MASK); + return 0; } @@ -193,10 +181,6 @@ int WorldSocket::open(void* a) if (m_OutBuffer) { return -1; } - // This will also prevent the socket from being Updated - // while we are initializing it. - m_OutActive = true; - // Hook for the manager. if (sWorldSocketMgr->OnSocketOpen(this) == -1) { return -1; } @@ -215,15 +199,8 @@ int WorldSocket::open(void* a) m_Address = remote_addr.get_host_addr(); - // Send startup packet. - WorldPacket packet(SMSG_AUTH_CHALLENGE, 4); - packet << m_Seed; - - if (SendPacket(packet) == -1) - { return -1; } - // Register with ACE Reactor - if (reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK) == -1) + if (reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) { sLog.outError("WorldSocket::open: unable to register client handler errno = %s", ACE_OS::strerror(errno)); return -1; @@ -232,7 +209,11 @@ int WorldSocket::open(void* a) // reactor takes care of the socket from now on remove_reference(); - return 0; + // Send startup packet. + WorldPacket packet(SMSG_AUTH_CHALLENGE, 4); + packet << m_Seed; + + return SendPacket(packet); } int WorldSocket::close(int) @@ -255,28 +236,18 @@ int WorldSocket::handle_input(ACE_HANDLE) { case -1 : { - if ((errno == EWOULDBLOCK) || - (errno == EAGAIN)) + if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { - return Update(); // interesting line ,isn't it ? + return 0; } - - DEBUG_LOG("WorldSocket::handle_input: Peer error closing connection errno = %s", ACE_OS::strerror(errno)); - - errno = ECONNRESET; - return -1; } case 0: { - DEBUG_LOG("WorldSocket::handle_input: Peer has closed connection"); - errno = ECONNRESET; return -1; } - case 1: - return 1; default: - return Update(); // another interesting line ;) + return 0; } ACE_NOTREACHED(return -1); @@ -292,7 +263,10 @@ int WorldSocket::handle_output(ACE_HANDLE) const size_t send_len = m_OutBuffer->length(); if (send_len == 0) - { return cancel_wakeup_output(Guard); } + { + reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK); + return 0; + } #ifdef MSG_NOSIGNAL ssize_t n = peer().send(m_OutBuffer->rd_ptr(), send_len, MSG_NOSIGNAL); @@ -301,11 +275,16 @@ int WorldSocket::handle_output(ACE_HANDLE) #endif // MSG_NOSIGNAL if (n == 0) - { return -1; } + { + return -1; + } + else if (n == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) - { return schedule_wakeup_output(Guard); } + { + return 0; + } return -1; } @@ -316,85 +295,24 @@ int WorldSocket::handle_output(ACE_HANDLE) // move the data to the base of the buffer m_OutBuffer->crunch(); - return schedule_wakeup_output(Guard); + return 0; } else // now n == send_len { m_OutBuffer->reset(); - if (!iFlushPacketQueue()) - { return cancel_wakeup_output(Guard); } - else - { return schedule_wakeup_output(Guard); } + if(!iFlushPacketQueue()) //no more packets in queue + { + reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK); + } + return 0; } ACE_NOTREACHED(return 0); } -int WorldSocket::handle_output_queue(GuardType& g) -{ - if (msg_queue()->is_empty()) - return cancel_wakeup_output(g); - - ACE_Message_Block* mblk; - - if (msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) - { - sLog.outError("WorldSocket::handle_output_queue dequeue_head"); - return -1; - } - - const size_t send_len = mblk->length(); - -#ifdef MSG_NOSIGNAL - ssize_t n = peer().send(mblk->rd_ptr(), send_len, MSG_NOSIGNAL); -#else - ssize_t n = peer().send(mblk->rd_ptr(), send_len); -#endif // MSG_NOSIGNAL - - if (n == 0) - { - mblk->release(); - - return -1; - } - else if (n == -1) - { - if (errno == EWOULDBLOCK || errno == EAGAIN) - { - msg_queue()->enqueue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero); - return schedule_wakeup_output(g); - } - - mblk->release(); - return -1; - } - else if (n < (ssize_t)send_len) // now n > 0 - { - mblk->rd_ptr(static_cast(n)); - - if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) - { - sLog.outError("WorldSocket::handle_output_queue enqueue_head"); - mblk->release(); - return -1; - } - - return schedule_wakeup_output(g); - } - else // now n == send_len - { - mblk->release(); - - return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK; - } - - ACE_NOTREACHED(return -1); -} - int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask) { - // Critical section { ACE_GUARD_RETURN(LockType, Guard, m_OutBufferLock, -1); @@ -404,27 +322,12 @@ int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask) { peer().close_writer(); } } - // Critical section - { - ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1); - - m_Session = NULL; - } + m_Session = NULL; reactor()->remove_handler(this, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::ALL_EVENTS_MASK); return 0; } -int WorldSocket::Update(void) -{ - if (closing_) - { return -1; } - - if (m_OutActive || m_OutBuffer->length() == 0) - { return 0; } - - return handle_output(get_handle()); -} int WorldSocket::handle_input_header(void) { @@ -577,45 +480,6 @@ int WorldSocket::handle_input_missing_data(void) return size_t(n) == recv_size ? 1 : 2; } -int WorldSocket::cancel_wakeup_output(GuardType& g) -{ - if (!m_OutActive) - { return 0; } - - m_OutActive = false; - - g.release(); - - if (reactor()->cancel_wakeup - (this, ACE_Event_Handler::WRITE_MASK) == -1) - { - // would be good to store errno from reactor with errno guard - sLog.outError("WorldSocket::cancel_wakeup_output"); - return -1; - } - - return 0; -} - -int WorldSocket::schedule_wakeup_output(GuardType& g) -{ - if (m_OutActive) - { return 0; } - - m_OutActive = true; - - g.release(); - - if (reactor()->schedule_wakeup - (this, ACE_Event_Handler::WRITE_MASK) == -1) - { - sLog.outError("WorldSocket::schedule_wakeup_output"); - return -1; - } - - return 0; -} - int WorldSocket::ProcessIncoming(WorldPacket* new_pct) { MANGOS_ASSERT(new_pct); @@ -664,14 +528,10 @@ int WorldSocket::ProcessIncoming(WorldPacket* new_pct) return 0; default: { - ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1); - if (m_Session != NULL) { // OK ,give the packet to WorldSession aptr.release(); - // WARNING here we call it with locks held. - // Its possible to cause deadlock if QueuePacket calls back m_Session->QueuePacket(new_pct); return 0; } @@ -876,7 +736,7 @@ int WorldSocket::HandleAuthSession(WorldPacket& recvPacket) BASIC_LOG("WorldSocket::HandleAuthSession: Client %s attempted to log in using invalid client OS (%s).", GetRemoteAddress().c_str(), os.c_str()); return -1; } - + // Check that Key and account name are the same on client and server Sha1Hash sha; @@ -964,8 +824,6 @@ int WorldSocket::HandlePing(WorldPacket& recvPacket) if (max_count && m_OverSpeedPings > max_count) { - ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1); - if (m_Session && m_Session->GetSecurity() == SEC_PLAYER) { sLog.outError("WorldSocket::HandlePing: Player kicked for " @@ -980,23 +838,18 @@ int WorldSocket::HandlePing(WorldPacket& recvPacket) { m_OverSpeedPings = 0; } } - // critical section + if (m_Session) { - ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1); - - if (m_Session) - { - m_Session->SetLatency(latency); - m_Session->SetClientTimeDelay(0); // recalculated on next movement packet - } - else - { - sLog.outError("WorldSocket::HandlePing: peer sent CMSG_PING, " - "but is not authenticated or got recently kicked," - " address = %s", - GetRemoteAddress().c_str()); - return -1; - } + m_Session->SetLatency(latency); + m_Session->SetClientTimeDelay(0); // recalculated on next movement packet + } + else + { + sLog.outError("WorldSocket::HandlePing: peer sent CMSG_PING, " + "but is not authenticated or got recently kicked," + " address = %s", + GetRemoteAddress().c_str()); + return -1; } WorldPacket packet(SMSG_PONG, 4); diff --git a/src/game/Server/WorldSocket.h b/src/game/Server/WorldSocket.h index f8b80358..cd51f682 100644 --- a/src/game/Server/WorldSocket.h +++ b/src/game/Server/WorldSocket.h @@ -66,8 +66,8 @@ typedef ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR > WorldAcceptor; * The class uses reference counting. * * For output the class uses one buffer (64K usually) and - * a queue where it stores packet if there is no place on - * the queue. The reason this is done, is because the server + * a queue where it stores packet if there is no space left on + * the buffer. The reason this is done, is because the server * does really a lot of small-size writes to it, and it doesn't * scale well to allocate memory for every. When something is * written to the output buffer the socket is not immediately @@ -78,10 +78,9 @@ typedef ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR > WorldAcceptor; * sending packets from "producer" threads is minimal, * and doing a lot of writes with small size is tolerated. * - * The calls to Update () method are managed by WorldSocketMgr - * and ReactorRunnable. + * The calls to Update () method are managed by WorldSocketMgr. * - * For input ,the class uses one 1024 bytes buffer on stack + * For input, the class uses one 1024 bytes buffer on stack * to which it does recv() calls. And then received data is * distributed where its needed. 1024 matches pretty well the * traffic generated by client for now. @@ -145,27 +144,16 @@ class WorldSocket : protected WorldHandler /// Called when the socket can write. virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE) override; - /// Drain the queue if its not empty. - int handle_output_queue(GuardType& g); - /// Called when connection is closed or error happens. virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); - /// Called by WorldSocketMgr/ReactorRunnable. - int Update(void); - private: /// Helper functions for processing incoming data. int handle_input_header(void); int handle_input_payload(void); int handle_input_missing_data(void); - /// Help functions to mark/unmark the socket for output. - /// @param g the guard is for m_OutBufferLock, the function will release it - int cancel_wakeup_output(GuardType& g); - int schedule_wakeup_output(GuardType& g); - /// process one incoming packet. /// @param new_pct received packet ,note that you need to delete it. int ProcessIncoming(WorldPacket* new_pct); @@ -199,9 +187,6 @@ class WorldSocket : protected WorldHandler /// Class used for managing encryption of the headers AuthCrypt m_Crypt; - /// Mutex lock to protect m_Session - LockType m_SessionLock; - /// Session to which received packets are routed WorldSession* m_Session; @@ -229,9 +214,6 @@ class WorldSocket : protected WorldHandler /// this allows not-to kick player if its buffer is overflowed. PacketQueueT m_PacketQueue; - /// True if the socket is registered with the reactor for output - bool m_OutActive; - uint32 m_Seed; }; diff --git a/src/game/Server/WorldSocketMgr.cpp b/src/game/Server/WorldSocketMgr.cpp index aa1edfdc..353db477 100644 --- a/src/game/Server/WorldSocketMgr.cpp +++ b/src/game/Server/WorldSocketMgr.cpp @@ -44,8 +44,7 @@ WorldSocketMgr::WorldSocketMgr() : m_SockOutKBuff(-1), m_SockOutUBuff(65536), m_UseNoDelay(true), - acceptor_(NULL),reactor_(NULL), - sockets_() + acceptor_(NULL),reactor_(NULL) { } @@ -60,28 +59,7 @@ int WorldSocketMgr::svc() { DEBUG_LOG("Starting Network Thread"); - SocketSet::iterator i, t; - - while (!reactor_->reactor_event_loop_done()) - { - ACE_Time_Value interval(0, 10000); - if (reactor_->run_reactor_event_loop(interval) == -1) - { break; } - - for (i = sockets_->begin(); i != sockets_->end();) - { - if ((*i)->Update() == -1) - { - t = i; - ++i; - (*t)->CloseSocket(); - (*t)->RemoveReference(); - sockets_->erase(t); - } - else - { ++i; } - } - } + reactor_->run_reactor_event_loop(); DEBUG_LOG("Network Thread Exitting"); return 0; @@ -162,10 +140,7 @@ int WorldSocketMgr::OnSocketOpen(WorldSocket* sock) } sock->m_OutBufferSize = static_cast(m_SockOutUBuff); - - sock->AddReference(); sock->reactor(reactor_); - sockets_->insert(sock); //no need for synch here, due to ACE_TSS return 0; } diff --git a/src/game/Server/WorldSocketMgr.h b/src/game/Server/WorldSocketMgr.h index 6b3ee3c3..872edbf9 100644 --- a/src/game/Server/WorldSocketMgr.h +++ b/src/game/Server/WorldSocketMgr.h @@ -65,10 +65,6 @@ class WorldSocketMgr : public ACE_Task_Base ACE_Reactor *reactor_; WorldAcceptor *acceptor_; - - typedef std::set SocketSet; - ACE_TSS sockets_; - }; #define sWorldSocketMgr ACE_Singleton::instance()