Skip to content
Snippets Groups Projects
Commit a07553c2 authored by Nat Goodspeed's avatar Nat Goodspeed
Browse files

DRTVWR-476: Add LLThreadSafeQueue::tryPushFrontFor().

tryPushFrontFor() is pushFront() with a std::chrono::duration timeout.
parent 8c1c1b69
No related branches found
No related tags found
No related merge requests found
...@@ -30,8 +30,11 @@ ...@@ -30,8 +30,11 @@
#include "llexception.h" #include "llexception.h"
#include <deque> #include <deque>
#include <string> #include <string>
#include <chrono>
#include "mutex.h" #include "mutex.h"
#include "llcoros.h" #include "llcoros.h"
#include LLCOROS_MUTEX_HEADER
#include <boost/fiber/timed_mutex.hpp>
#include LLCOROS_CONDVAR_HEADER #include LLCOROS_CONDVAR_HEADER
// //
...@@ -83,10 +86,20 @@ class LLThreadSafeQueue ...@@ -83,10 +86,20 @@ class LLThreadSafeQueue
// the caller is blocked. // the caller is blocked.
void pushFront(ElementT const & element); void pushFront(ElementT const & element);
// Try to add an element to the front ofqueue without blocking. Returns // Try to add an element to the front of queue without blocking. Returns
// true only if the element was actually added. // true only if the element was actually added.
bool tryPushFront(ElementT const & element); bool tryPushFront(ElementT const & element);
// Try to add an element to the front of queue, blocking if full but with
// timeout. Returns true if the element was added.
// There are potentially two different timeouts involved: how long to try
// to lock the mutex, versus how long to wait for the queue to stop being
// full. Careful settings for each timeout might be orders of magnitude
// apart. However, this method conflates them.
template <typename Rep, typename Period>
bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
ElementT const & element);
// Pop the element at the end of the queue (will block if the queue is // Pop the element at the end of the queue (will block if the queue is
// empty). // empty).
// //
...@@ -120,10 +133,10 @@ class LLThreadSafeQueue ...@@ -120,10 +133,10 @@ class LLThreadSafeQueue
U32 mCapacity; U32 mCapacity;
bool mClosed; bool mClosed;
LLCoros::Mutex mLock; boost::fibers::timed_mutex mLock;
typedef LLCoros::LockType lock_t; typedef std::unique_lock<decltype(mLock)> lock_t;
LLCoros::ConditionVariable mCapacityCond; boost::fibers::condition_variable_any mCapacityCond;
LLCoros::ConditionVariable mEmptyCond; boost::fibers::condition_variable_any mEmptyCond;
}; };
// LLThreadSafeQueue // LLThreadSafeQueue
...@@ -162,6 +175,46 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) ...@@ -162,6 +175,46 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
} }
template <typename ElementT>
template <typename Rep, typename Period>
bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
ElementT const & element)
{
// Convert duration to time_point: passing the same timeout duration to
// each of multiple calls is wrong.
auto endpoint = std::chrono::steady_clock::now() + timeout;
lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock_until(endpoint))
return false;
while (true)
{
if (mClosed)
{
return false;
}
if (mStorage.size() < mCapacity)
{
mStorage.push_front(element);
lock1.unlock();
mEmptyCond.notify_one();
return true;
}
// Storage Full. Wait for signal.
if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint))
{
// timed out -- formally we might recheck both conditions above
return false;
}
// If we didn't time out, we were notified for some reason. Loop back
// to check.
}
}
template<typename ElementT> template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment