From f61e1ef7589da893b9b54448224dc0961515eb40 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 26 Oct 2007 19:48:31 +0000 Subject: Session resume support in client & broker: Client can resume a session after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/ConcurrentQueue.h | 90 +++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 45 deletions(-) (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h') diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index 917afc5704..cf8199954e 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,7 @@ * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" #include "qpid/sys/ScopedIncrement.h" #include @@ -39,73 +39,73 @@ namespace sys { * * Also allows consuming threads to wait until an item is available. */ -template class ConcurrentQueue { +template class ConcurrentQueue : public Waitable { public: - ConcurrentQueue() : waiters(0), shutdown(false) {} + struct ShutdownException {}; + + ConcurrentQueue() : shutdownFlag(false) {} - /** Threads in wait() are woken with ShutdownException before - * destroying the queue. - */ - ~ConcurrentQueue() { - Mutex::ScopedLock l(lock); - shutdown = true; - lock.notifyAll(); - while (waiters > 0) - lock.wait(); + /** Waiting threads are notified by ~Waitable */ + ~ConcurrentQueue() { shutdown(); } + + bool shutdown(bool wait=true) { + ScopedLock l(lock); + if (!shutdownFlag) { + shutdownFlag=true; + lock.notifyAll(); + if (wait) lock.waitAll(); + shutdownFlag=true; + return true; + } + return false; } - + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); queue.push_back(data); + lock.notify(); } /** If the queue is non-empty, pop the front item into data and * return true. If the queue is empty, return false */ - bool pop(T& data) { + bool tryPop(T& data) { Mutex::ScopedLock l(lock); - return popInternal(data); + if (shutdownFlag || queue.empty()) + return false; + data = queue.front(); + queue.pop_front(); + return true; } - /** Wait up to deadline for a data item to be available. - *@return true if data was available, false if timed out. + /** Wait up to a timeout for a data item to be available. + *@return true if data was available, false if timed out or shut down. *@throws ShutdownException if the queue is destroyed. */ - bool waitPop(T& data, Duration timeout) { - Mutex::ScopedLock l(lock); - ScopedIncrement w( - waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + bool waitPop(T& data, Duration timeout=TIME_INFINITE) { + ScopedLock l(lock); AbsTime deadline(now(), timeout); - while (queue.empty() && lock.wait(deadline)) - ; - return popInternal(data); - } - - private: - - bool popInternal(T& data) { - if (shutdown) - throw ShutdownException(); + { + ScopedWait(*this); + while (!shutdownFlag && queue.empty()) + if (!lock.wait(deadline)) + return false; + } if (queue.empty()) return false; - else { - data = queue.front(); - queue.pop_front(); - return true; - } + data = queue.front(); + queue.pop_front(); + return true; } + + bool isShutdown() { ScopedLock l(lock); return shutdownFlag; } - void noWaiters() { - assert(waiters == 0); - if (shutdown) - lock.notify(); // Notify dtor thread. - } - - Monitor lock; + protected: + Waitable lock; + private: std::deque queue; - size_t waiters; - bool shutdown; + bool shutdownFlag; }; }} // namespace qpid::sys -- cgit v1.2.1