Fix the network module.

- This fix must be applied on all cores.
    - Solved a nasty race condition between network threads.
This commit is contained in:
H0zen 2017-02-13 23:59:53 +02:00 committed by Antz
parent fcbc6e77c2
commit 5576da2b9f
4 changed files with 55 additions and 249 deletions

View File

@ -86,9 +86,9 @@ WorldSocket::WorldSocket(void) :
m_RecvWPct(0), m_RecvWPct(0),
m_RecvPct(), m_RecvPct(),
m_Header(sizeof(ClientPktHeader)), m_Header(sizeof(ClientPktHeader)),
m_OutBufferLock(),
m_OutBuffer(0), m_OutBuffer(0),
m_OutBufferSize(65536), m_OutBufferSize(65536),
m_OutActive(false),
m_Seed(static_cast<uint32>(rand32())) m_Seed(static_cast<uint32>(rand32()))
{ {
reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED); reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
@ -107,7 +107,7 @@ WorldSocket::~WorldSocket(void)
WorldPacket* pct; WorldPacket* pct;
while (m_PacketQueue.dequeue_head(pct) == 0) while (m_PacketQueue.dequeue_head(pct) == 0)
{ delete pct; } { delete pct; }
} }
bool WorldSocket::IsClosed(void) const bool WorldSocket::IsClosed(void) const
@ -117,21 +117,15 @@ bool WorldSocket::IsClosed(void) const
void WorldSocket::CloseSocket(void) void WorldSocket::CloseSocket(void)
{ {
{ ACE_GUARD(LockType, Guard, m_OutBufferLock);
ACE_GUARD(LockType, Guard, m_OutBufferLock);
if (closing_) if (closing_)
{ return; } { return; }
closing_ = true; closing_ = true;
peer().close_writer(); peer().close_writer();
}
{ m_Session = NULL;
ACE_GUARD(LockType, Guard, m_SessionLock);
m_Session = NULL;
}
} }
const std::string& WorldSocket::GetRemoteAddress(void) const const std::string& WorldSocket::GetRemoteAddress(void) const
@ -148,14 +142,6 @@ int WorldSocket::SendPacket(const WorldPacket& pkt)
WorldPacket pct = pkt; WorldPacket pct = pkt;
// Dump outgoing packet.
sLog.outWorldPacketDump(uint32(get_handle()), pct.GetOpcode(), pct.GetOpcodeName(), &pct, false);
#ifdef ENABLE_ELUNA
if (!sEluna->OnPacketSend(m_Session, pct))
{ return 0; }
#endif /* ENABLE_ELUNA */
if (iSendPacket(pct) == -1) if (iSendPacket(pct) == -1)
{ {
WorldPacket* npct; WorldPacket* npct;
@ -172,6 +158,8 @@ int WorldSocket::SendPacket(const WorldPacket& pkt)
} }
} }
reactor()->schedule_wakeup(this, ACE_Event_Handler::WRITE_MASK);
return 0; return 0;
} }
@ -193,10 +181,6 @@ int WorldSocket::open(void* a)
if (m_OutBuffer) if (m_OutBuffer)
{ return -1; } { return -1; }
// This will also prevent the socket from being Updated
// while we are initializing it.
m_OutActive = true;
// Hook for the manager. // Hook for the manager.
if (sWorldSocketMgr->OnSocketOpen(this) == -1) if (sWorldSocketMgr->OnSocketOpen(this) == -1)
{ return -1; } { return -1; }
@ -215,15 +199,8 @@ int WorldSocket::open(void* a)
m_Address = remote_addr.get_host_addr(); m_Address = remote_addr.get_host_addr();
// Send startup packet.
WorldPacket packet(SMSG_AUTH_CHALLENGE, 4);
packet << m_Seed;
if (SendPacket(packet) == -1)
{ return -1; }
// Register with ACE Reactor // Register with ACE Reactor
if (reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK) == -1) if (reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1)
{ {
sLog.outError("WorldSocket::open: unable to register client handler errno = %s", ACE_OS::strerror(errno)); sLog.outError("WorldSocket::open: unable to register client handler errno = %s", ACE_OS::strerror(errno));
return -1; return -1;
@ -232,7 +209,11 @@ int WorldSocket::open(void* a)
// reactor takes care of the socket from now on // reactor takes care of the socket from now on
remove_reference(); remove_reference();
return 0; // Send startup packet.
WorldPacket packet(SMSG_AUTH_CHALLENGE, 4);
packet << m_Seed;
return SendPacket(packet);
} }
int WorldSocket::close(int) int WorldSocket::close(int)
@ -255,28 +236,18 @@ int WorldSocket::handle_input(ACE_HANDLE)
{ {
case -1 : case -1 :
{ {
if ((errno == EWOULDBLOCK) || if ((errno == EWOULDBLOCK) || (errno == EAGAIN))
(errno == EAGAIN))
{ {
return Update(); // interesting line ,isn't it ? return 0;
} }
DEBUG_LOG("WorldSocket::handle_input: Peer error closing connection errno = %s", ACE_OS::strerror(errno));
errno = ECONNRESET;
return -1;
} }
case 0: case 0:
{ {
DEBUG_LOG("WorldSocket::handle_input: Peer has closed connection");
errno = ECONNRESET; errno = ECONNRESET;
return -1; return -1;
} }
case 1:
return 1;
default: default:
return Update(); // another interesting line ;) return 0;
} }
ACE_NOTREACHED(return -1); ACE_NOTREACHED(return -1);
@ -292,7 +263,10 @@ int WorldSocket::handle_output(ACE_HANDLE)
const size_t send_len = m_OutBuffer->length(); const size_t send_len = m_OutBuffer->length();
if (send_len == 0) if (send_len == 0)
{ return cancel_wakeup_output(Guard); } {
reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK);
return 0;
}
#ifdef MSG_NOSIGNAL #ifdef MSG_NOSIGNAL
ssize_t n = peer().send(m_OutBuffer->rd_ptr(), send_len, MSG_NOSIGNAL); ssize_t n = peer().send(m_OutBuffer->rd_ptr(), send_len, MSG_NOSIGNAL);
@ -301,11 +275,16 @@ int WorldSocket::handle_output(ACE_HANDLE)
#endif // MSG_NOSIGNAL #endif // MSG_NOSIGNAL
if (n == 0) if (n == 0)
{ return -1; } {
return -1;
}
else if (n == -1) else if (n == -1)
{ {
if (errno == EWOULDBLOCK || errno == EAGAIN) if (errno == EWOULDBLOCK || errno == EAGAIN)
{ return schedule_wakeup_output(Guard); } {
return 0;
}
return -1; return -1;
} }
@ -316,85 +295,24 @@ int WorldSocket::handle_output(ACE_HANDLE)
// move the data to the base of the buffer // move the data to the base of the buffer
m_OutBuffer->crunch(); m_OutBuffer->crunch();
return schedule_wakeup_output(Guard); return 0;
} }
else // now n == send_len else // now n == send_len
{ {
m_OutBuffer->reset(); m_OutBuffer->reset();
if (!iFlushPacketQueue()) if(!iFlushPacketQueue()) //no more packets in queue
{ return cancel_wakeup_output(Guard); } {
else reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK);
{ return schedule_wakeup_output(Guard); } }
return 0;
} }
ACE_NOTREACHED(return 0); ACE_NOTREACHED(return 0);
} }
int WorldSocket::handle_output_queue(GuardType& g)
{
if (msg_queue()->is_empty())
return cancel_wakeup_output(g);
ACE_Message_Block* mblk;
if (msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
{
sLog.outError("WorldSocket::handle_output_queue dequeue_head");
return -1;
}
const size_t send_len = mblk->length();
#ifdef MSG_NOSIGNAL
ssize_t n = peer().send(mblk->rd_ptr(), send_len, MSG_NOSIGNAL);
#else
ssize_t n = peer().send(mblk->rd_ptr(), send_len);
#endif // MSG_NOSIGNAL
if (n == 0)
{
mblk->release();
return -1;
}
else if (n == -1)
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
msg_queue()->enqueue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero);
return schedule_wakeup_output(g);
}
mblk->release();
return -1;
}
else if (n < (ssize_t)send_len) // now n > 0
{
mblk->rd_ptr(static_cast<size_t>(n));
if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
{
sLog.outError("WorldSocket::handle_output_queue enqueue_head");
mblk->release();
return -1;
}
return schedule_wakeup_output(g);
}
else // now n == send_len
{
mblk->release();
return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK;
}
ACE_NOTREACHED(return -1);
}
int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask) int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask)
{ {
// Critical section
{ {
ACE_GUARD_RETURN(LockType, Guard, m_OutBufferLock, -1); ACE_GUARD_RETURN(LockType, Guard, m_OutBufferLock, -1);
@ -404,27 +322,12 @@ int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask)
{ peer().close_writer(); } { peer().close_writer(); }
} }
// Critical section m_Session = NULL;
{
ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1);
m_Session = NULL;
}
reactor()->remove_handler(this, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::ALL_EVENTS_MASK); reactor()->remove_handler(this, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::ALL_EVENTS_MASK);
return 0; return 0;
} }
int WorldSocket::Update(void)
{
if (closing_)
{ return -1; }
if (m_OutActive || m_OutBuffer->length() == 0)
{ return 0; }
return handle_output(get_handle());
}
int WorldSocket::handle_input_header(void) int WorldSocket::handle_input_header(void)
{ {
@ -577,45 +480,6 @@ int WorldSocket::handle_input_missing_data(void)
return size_t(n) == recv_size ? 1 : 2; return size_t(n) == recv_size ? 1 : 2;
} }
int WorldSocket::cancel_wakeup_output(GuardType& g)
{
if (!m_OutActive)
{ return 0; }
m_OutActive = false;
g.release();
if (reactor()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
{
// would be good to store errno from reactor with errno guard
sLog.outError("WorldSocket::cancel_wakeup_output");
return -1;
}
return 0;
}
int WorldSocket::schedule_wakeup_output(GuardType& g)
{
if (m_OutActive)
{ return 0; }
m_OutActive = true;
g.release();
if (reactor()->schedule_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
{
sLog.outError("WorldSocket::schedule_wakeup_output");
return -1;
}
return 0;
}
int WorldSocket::ProcessIncoming(WorldPacket* new_pct) int WorldSocket::ProcessIncoming(WorldPacket* new_pct)
{ {
MANGOS_ASSERT(new_pct); MANGOS_ASSERT(new_pct);
@ -664,14 +528,10 @@ int WorldSocket::ProcessIncoming(WorldPacket* new_pct)
return 0; return 0;
default: default:
{ {
ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1);
if (m_Session != NULL) if (m_Session != NULL)
{ {
// OK ,give the packet to WorldSession // OK ,give the packet to WorldSession
aptr.release(); aptr.release();
// WARNING here we call it with locks held.
// Its possible to cause deadlock if QueuePacket calls back
m_Session->QueuePacket(new_pct); m_Session->QueuePacket(new_pct);
return 0; return 0;
} }
@ -876,7 +736,7 @@ int WorldSocket::HandleAuthSession(WorldPacket& recvPacket)
BASIC_LOG("WorldSocket::HandleAuthSession: Client %s attempted to log in using invalid client OS (%s).", GetRemoteAddress().c_str(), os.c_str()); BASIC_LOG("WorldSocket::HandleAuthSession: Client %s attempted to log in using invalid client OS (%s).", GetRemoteAddress().c_str(), os.c_str());
return -1; return -1;
} }
// Check that Key and account name are the same on client and server // Check that Key and account name are the same on client and server
Sha1Hash sha; Sha1Hash sha;
@ -964,8 +824,6 @@ int WorldSocket::HandlePing(WorldPacket& recvPacket)
if (max_count && m_OverSpeedPings > max_count) if (max_count && m_OverSpeedPings > max_count)
{ {
ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1);
if (m_Session && m_Session->GetSecurity() == SEC_PLAYER) if (m_Session && m_Session->GetSecurity() == SEC_PLAYER)
{ {
sLog.outError("WorldSocket::HandlePing: Player kicked for " sLog.outError("WorldSocket::HandlePing: Player kicked for "
@ -980,23 +838,18 @@ int WorldSocket::HandlePing(WorldPacket& recvPacket)
{ m_OverSpeedPings = 0; } { m_OverSpeedPings = 0; }
} }
// critical section if (m_Session)
{ {
ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1); m_Session->SetLatency(latency);
m_Session->SetClientTimeDelay(0); // recalculated on next movement packet
if (m_Session) }
{ else
m_Session->SetLatency(latency); {
m_Session->SetClientTimeDelay(0); // recalculated on next movement packet sLog.outError("WorldSocket::HandlePing: peer sent CMSG_PING, "
} "but is not authenticated or got recently kicked,"
else " address = %s",
{ GetRemoteAddress().c_str());
sLog.outError("WorldSocket::HandlePing: peer sent CMSG_PING, " return -1;
"but is not authenticated or got recently kicked,"
" address = %s",
GetRemoteAddress().c_str());
return -1;
}
} }
WorldPacket packet(SMSG_PONG, 4); WorldPacket packet(SMSG_PONG, 4);

View File

@ -66,8 +66,8 @@ typedef ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR > WorldAcceptor;
* The class uses reference counting. * The class uses reference counting.
* *
* For output the class uses one buffer (64K usually) and * For output the class uses one buffer (64K usually) and
* a queue where it stores packet if there is no place on * a queue where it stores packet if there is no space left on
* the queue. The reason this is done, is because the server * the buffer. The reason this is done, is because the server
* does really a lot of small-size writes to it, and it doesn't * does really a lot of small-size writes to it, and it doesn't
* scale well to allocate memory for every. When something is * scale well to allocate memory for every. When something is
* written to the output buffer the socket is not immediately * written to the output buffer the socket is not immediately
@ -78,10 +78,9 @@ typedef ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR > WorldAcceptor;
* sending packets from "producer" threads is minimal, * sending packets from "producer" threads is minimal,
* and doing a lot of writes with small size is tolerated. * and doing a lot of writes with small size is tolerated.
* *
* The calls to Update () method are managed by WorldSocketMgr * The calls to Update () method are managed by WorldSocketMgr.
* and ReactorRunnable.
* *
* For input ,the class uses one 1024 bytes buffer on stack * For input, the class uses one 1024 bytes buffer on stack
* to which it does recv() calls. And then received data is * to which it does recv() calls. And then received data is
* distributed where its needed. 1024 matches pretty well the * distributed where its needed. 1024 matches pretty well the
* traffic generated by client for now. * traffic generated by client for now.
@ -145,27 +144,16 @@ class WorldSocket : protected WorldHandler
/// Called when the socket can write. /// Called when the socket can write.
virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE) override; virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE) override;
/// Drain the queue if its not empty.
int handle_output_queue(GuardType& g);
/// Called when connection is closed or error happens. /// Called when connection is closed or error happens.
virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE, virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
/// Called by WorldSocketMgr/ReactorRunnable.
int Update(void);
private: private:
/// Helper functions for processing incoming data. /// Helper functions for processing incoming data.
int handle_input_header(void); int handle_input_header(void);
int handle_input_payload(void); int handle_input_payload(void);
int handle_input_missing_data(void); int handle_input_missing_data(void);
/// Help functions to mark/unmark the socket for output.
/// @param g the guard is for m_OutBufferLock, the function will release it
int cancel_wakeup_output(GuardType& g);
int schedule_wakeup_output(GuardType& g);
/// process one incoming packet. /// process one incoming packet.
/// @param new_pct received packet ,note that you need to delete it. /// @param new_pct received packet ,note that you need to delete it.
int ProcessIncoming(WorldPacket* new_pct); int ProcessIncoming(WorldPacket* new_pct);
@ -199,9 +187,6 @@ class WorldSocket : protected WorldHandler
/// Class used for managing encryption of the headers /// Class used for managing encryption of the headers
AuthCrypt m_Crypt; AuthCrypt m_Crypt;
/// Mutex lock to protect m_Session
LockType m_SessionLock;
/// Session to which received packets are routed /// Session to which received packets are routed
WorldSession* m_Session; WorldSession* m_Session;
@ -229,9 +214,6 @@ class WorldSocket : protected WorldHandler
/// this allows not-to kick player if its buffer is overflowed. /// this allows not-to kick player if its buffer is overflowed.
PacketQueueT m_PacketQueue; PacketQueueT m_PacketQueue;
/// True if the socket is registered with the reactor for output
bool m_OutActive;
uint32 m_Seed; uint32 m_Seed;
}; };

View File

@ -44,8 +44,7 @@
WorldSocketMgr::WorldSocketMgr() WorldSocketMgr::WorldSocketMgr()
: m_SockOutKBuff(-1), m_SockOutUBuff(65536), m_UseNoDelay(true), : m_SockOutKBuff(-1), m_SockOutUBuff(65536), m_UseNoDelay(true),
acceptor_(NULL),reactor_(NULL), acceptor_(NULL),reactor_(NULL)
sockets_()
{ {
} }
@ -60,28 +59,7 @@ int WorldSocketMgr::svc()
{ {
DEBUG_LOG("Starting Network Thread"); DEBUG_LOG("Starting Network Thread");
SocketSet::iterator i, t; reactor_->run_reactor_event_loop();
while (!reactor_->reactor_event_loop_done())
{
ACE_Time_Value interval(0, 10000);
if (reactor_->run_reactor_event_loop(interval) == -1)
{ break; }
for (i = sockets_->begin(); i != sockets_->end();)
{
if ((*i)->Update() == -1)
{
t = i;
++i;
(*t)->CloseSocket();
(*t)->RemoveReference();
sockets_->erase(t);
}
else
{ ++i; }
}
}
DEBUG_LOG("Network Thread Exitting"); DEBUG_LOG("Network Thread Exitting");
return 0; return 0;
@ -162,10 +140,7 @@ int WorldSocketMgr::OnSocketOpen(WorldSocket* sock)
} }
sock->m_OutBufferSize = static_cast<size_t>(m_SockOutUBuff); sock->m_OutBufferSize = static_cast<size_t>(m_SockOutUBuff);
sock->AddReference();
sock->reactor(reactor_); sock->reactor(reactor_);
sockets_->insert(sock); //no need for synch here, due to ACE_TSS
return 0; return 0;
} }

View File

@ -65,10 +65,6 @@ class WorldSocketMgr : public ACE_Task_Base
ACE_Reactor *reactor_; ACE_Reactor *reactor_;
WorldAcceptor *acceptor_; WorldAcceptor *acceptor_;
typedef std::set<WorldSocket*> SocketSet;
ACE_TSS<SocketSet> sockets_;
}; };
#define sWorldSocketMgr ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>::instance() #define sWorldSocketMgr ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>::instance()