diff options
| author | Alan Conway <aconway@apache.org> | 2008-07-01 18:01:11 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-07-01 18:01:11 +0000 |
| commit | b010894ebe6c468fef0c14ad869b80ef336ab11f (patch) | |
| tree | 87fd021e862ad21abffc9457711f066651e67418 /cpp/src/qpid | |
| parent | 4db79de7e806ceba3a243abef9847f15fc41cc40 (diff) | |
| download | qpid-python-b010894ebe6c468fef0c14ad869b80ef336ab11f.tar.gz | |
Added timeout to SubscriptionManager::get(), LocalQueue::get() and BlockingQueue::get()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673158 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 97 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Waitable.h | 10 |
6 files changed, 98 insertions, 80 deletions
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 04cee40a37..99ab6f0133 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -31,14 +31,25 @@ using namespace framing; LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} LocalQueue::~LocalQueue() {} -Message LocalQueue::pop() { +Message LocalQueue::pop() { return get(); } + +Message LocalQueue::get() { + Message result; + bool ok = get(result, sys::TIME_INFINITE); + assert(ok); (void) ok; + return result; +} + +bool LocalQueue::get(Message& result, sys::Duration timeout) { if (!queue) throw ClosedException(); - FrameSet::shared_ptr content = queue->pop(); + FrameSet::shared_ptr content; + bool ok = queue->pop(content, timeout); + if (!ok) return false; if (content->isA<MessageTransferBody>()) { - Message m(*content); - autoAck.ack(m, session); - return m; + result = Message(*content); + autoAck.ack(result, session); + return true; } else throw CommandInvalidException( diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 273814f179..f81065ef3c 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -25,6 +25,7 @@ #include "qpid/client/Message.h" #include "qpid/client/Demux.h" #include "qpid/client/AckPolicy.h" +#include "qpid/sys/Time.h" namespace qpid { namespace client { @@ -42,7 +43,7 @@ class LocalQueue public: /** Create a local queue. Subscribe the local queue to a remote broker * queue with a SubscriptionManager. - * + * * LocalQueue is an alternative to implementing a MessageListener. * *@param ackPolicy Policy for acknowledging messages. @see AckPolicy. @@ -51,14 +52,22 @@ class LocalQueue ~LocalQueue(); - /** Pop the next message off the local queue. + /** Wait up to timeout for the next message from the local queue. + *@param result Set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if queue was empty after timeout. + */ + bool get(Message& result, sys::Duration timeout=0); + + /** Get the next message off the local queue, or wait for a + * message from the broker queue. *@exception ClosedException if subscription has been closed. */ + Message get(); + + /** Synonym for get(). */ Message pop(); - /** Synonym for pop(). */ - Message get() { return pop(); } - /** Return true if local queue is empty. */ bool empty() const; @@ -72,10 +81,11 @@ class LocalQueue AckPolicy& getAckPolicy(); private: - friend class SubscriptionManager; Session session; Demux::QueuePtr queue; AckPolicy autoAck; + + friend class SubscriptionManager; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 324b11e1df..9bb75f9a49 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -129,10 +129,13 @@ void SubscriptionManager::stop() dispatcher.stop(); } -Message SubscriptionManager::get(const std::string& queue) { +bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; - subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str()); - return lq.get(); + std::string unique = framing::Uuid(true).str(); + subscribe(lq, queue, FlowControl::messageCredit(1), unique); + AutoCancel ac(*this, unique); + sync(session).messageFlush(unique); + return lq.get(result, timeout); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 0aa55099f5..3dad15fd29 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -122,10 +122,14 @@ class SubscriptionManager : public sys::Runnable const std::string& queue, const std::string& tag=std::string()); - /** - * Get a single message from a queue. + + /** Get a single message from a queue. + *@param result is set to the message from the queue. + *@ + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if no message available after timeout. */ - Message get(const std::string& queue); + bool get(Message& result, const std::string& queue, sys::Duration timeout=0); /** Cancel a subscription. */ void cancel(const std::string tag); @@ -191,6 +195,15 @@ class SubscriptionManager : public sys::Runnable AckPolicy& getAckPolicy(); }; +/** AutoCancel cancels a subscription in its destructor */ +class AutoCancel { + public: + AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {} + ~AutoCancel() { sm.cancel(tag); } + private: + SubscriptionManager& sm; + std::string tag; +}; }} // namespace qpid::client diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 86020fad81..9bb215ff7f 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -35,47 +35,45 @@ namespace sys { template <class T> class BlockingQueue { - mutable sys::Waitable lock; + mutable sys::Waitable waitable; std::queue<T> queue; - bool closed; public: - BlockingQueue() : closed(false) {} + BlockingQueue() {} ~BlockingQueue() { close(); } - /** Block until there is a value to pop */ - T pop() - { - Waitable::ScopedLock l(lock); - if (!queueWait()) throw ClosedException(); - return popInternal(); - } - - /** Non-blocking pop. If there is a value set outValue and return - * true, else return false; + /** Pop from the queue, block up to timeout if empty. + *@param result Set to value popped from queue. + *@param timeout Defaults to infinite. + *@return true if result was set, false if queue empty after timeout. */ - bool tryPop(T& outValue) { - Waitable::ScopedLock l(lock); + bool pop(T& result, Duration timeout=TIME_INFINITE) { + Mutex::ScopedLock l(waitable); + { + Waitable::ScopedWait w(waitable); + AbsTime deadline(now(),timeout); + while (queue.empty()) waitable.wait(deadline); + } if (queue.empty()) return false; - outValue = popInternal(); + result = queue.front(); + queue.pop(); + if (!queue.empty()) + waitable.notify(); // Notify another waiter. return true; } - /** Non-blocking pop. If there is a value return it, else return - * valueIfEmpty. - */ - T tryPop(const T& valueIfEmpty=T()) { - T result=valueIfEmpty; - tryPop(result); + T pop() { + T result; + bool ok = pop(result); + assert(ok); (void) ok; // Infinite wait. return result; } - + /** Push a value onto the queue */ - void push(const T& t) - { - Waitable::ScopedLock l(lock); + void push(const T& t) { + Mutex::ScopedLock l(waitable); queue.push(t); - queueNotify(0); + waitable.notify(); // Notify a waiter. } /** @@ -84,56 +82,33 @@ public: */ void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException())) { - Waitable::ScopedLock l(lock); - if (!closed) { - lock.setException(ex); - closed = true; - lock.notifyAll(); - lock.waitWaiters(); // Ensure no threads are still waiting. + Mutex::ScopedLock l(waitable); + if (!waitable.hasException()) { + waitable.setException(ex); + waitable.notifyAll(); + waitable.waitWaiters(); // Ensure no threads are still waiting. } } /** Open a closed queue. */ void open() { - Waitable::ScopedLock l(lock); - closed=false; + Mutex::ScopedLock l(waitable); + waitable.resetException(); } bool isClosed() const { - Waitable::ScopedLock l(lock); - return closed; + Mutex::ScopedLock l(waitable); + return waitable.hasException(); } bool empty() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.empty(); } size_t size() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.size(); } - - private: - - void queueNotify(size_t ignore) { - if (!queue.empty() && lock.hasWaiters()>ignore) - lock.notify(); // Notify another waiter. - } - - bool queueWait() { - Waitable::ScopedWait w(lock); - while (!closed && queue.empty()) - lock.wait(); - return !queue.empty(); - } - - T popInternal() { - T t=queue.front(); - queue.pop(); - queueNotify(1); - return t; - } - }; }} diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h index 61b7e7d82b..7701b6f97d 100644 --- a/cpp/src/qpid/sys/Waitable.h +++ b/cpp/src/qpid/sys/Waitable.h @@ -76,6 +76,12 @@ class Waitable : public Monitor { } + /** True if the waitable has an exception */ + bool hasException() const { return exception; } + + /** Clear the exception if any */ + void resetException() { exception.reset(); } + /** Throws an exception if one is set before or during the wait. */ void wait() { ExCheck e(exception); @@ -88,8 +94,6 @@ class Waitable : public Monitor { return Monitor::wait(absoluteTime); } - ExceptionHolder exception; - private: struct ExCheck { const ExceptionHolder& exception; @@ -98,6 +102,8 @@ class Waitable : public Monitor { }; size_t waiters; + ExceptionHolder exception; + friend struct ScopedWait; }; |
