87 lines
2.9 KiB
C++
87 lines
2.9 KiB
C++
#pragma once
|
|
|
|
#include "../CountdownTimer.h"
|
|
#include "../../C/Baselib_SystemFutex.h"
|
|
#include "../../C/Baselib_Thread.h"
|
|
|
|
#if !PLATFORM_FUTEX_NATIVE_SUPPORT
|
|
#error "Only use this implementation on top of a proper futex, in all other situations us ConditionVariable_SemaphoreBased.inl.h"
|
|
#endif
|
|
|
|
namespace baselib
|
|
{
|
|
BASELIB_CPP_INTERFACE
|
|
{
|
|
inline void ConditionVariable::Wait()
|
|
{
|
|
m_Data.waiters.fetch_add(1, memory_order_relaxed);
|
|
m_Lock.Release();
|
|
while (!m_Data.TryConsumeWakeup())
|
|
{
|
|
Baselib_SystemFutex_Wait(&m_Data.wakeups.obj, 0, std::numeric_limits<uint32_t>::max());
|
|
}
|
|
m_Lock.Acquire();
|
|
}
|
|
|
|
inline bool ConditionVariable::TimedWait(const timeout_ms timeoutInMilliseconds)
|
|
{
|
|
m_Data.waiters.fetch_add(1, memory_order_relaxed);
|
|
m_Lock.Release();
|
|
|
|
uint32_t timeLeft = timeoutInMilliseconds.count();
|
|
auto timer = CountdownTimer::StartNew(timeoutInMilliseconds);
|
|
do
|
|
{
|
|
Baselib_SystemFutex_Wait(&m_Data.wakeups.obj, 0, timeLeft);
|
|
if (m_Data.TryConsumeWakeup())
|
|
{
|
|
m_Lock.Acquire();
|
|
return true;
|
|
}
|
|
timeLeft = timer.GetTimeLeftInMilliseconds().count();
|
|
}
|
|
while (timeLeft);
|
|
|
|
do
|
|
{
|
|
int32_t waiters = m_Data.waiters.load(memory_order_relaxed);
|
|
while (waiters > 0)
|
|
{
|
|
if (m_Data.waiters.compare_exchange_weak(waiters, waiters - 1, memory_order_relaxed, memory_order_relaxed))
|
|
{
|
|
m_Lock.Acquire();
|
|
return false;
|
|
}
|
|
}
|
|
Baselib_Thread_YieldExecution();
|
|
}
|
|
while (!m_Data.TryConsumeWakeup());
|
|
|
|
m_Lock.Acquire();
|
|
return true;
|
|
}
|
|
|
|
inline void ConditionVariable::Notify(uint16_t count)
|
|
{
|
|
int32_t waitingThreads = m_Data.waiters.load(memory_order_acquire);
|
|
do
|
|
{
|
|
int32_t threadsToWakeup = count < waitingThreads ? count : waitingThreads;
|
|
if (threadsToWakeup == 0)
|
|
{
|
|
atomic_thread_fence(memory_order_release);
|
|
return;
|
|
}
|
|
|
|
if (m_Data.waiters.compare_exchange_weak(waitingThreads, waitingThreads - threadsToWakeup, memory_order_relaxed, memory_order_relaxed))
|
|
{
|
|
m_Data.wakeups.fetch_add(threadsToWakeup, memory_order_release);
|
|
Baselib_SystemFutex_Notify(&m_Data.wakeups.obj, threadsToWakeup, Baselib_WakeupFallbackStrategy_OneByOne);
|
|
return;
|
|
}
|
|
}
|
|
while (waitingThreads > 0);
|
|
}
|
|
}
|
|
}
|