diff options
| author | Alan Conway <aconway@apache.org> | 2008-07-01 18:01:11 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-07-01 18:01:11 +0000 |
| commit | b010894ebe6c468fef0c14ad869b80ef336ab11f (patch) | |
| tree | 87fd021e862ad21abffc9457711f066651e67418 /cpp/src/qpid/client | |
| parent | 4db79de7e806ceba3a243abef9847f15fc41cc40 (diff) | |
| download | qpid-python-b010894ebe6c468fef0c14ad869b80ef336ab11f.tar.gz | |
Added timeout to SubscriptionManager::get(), LocalQueue::get() and BlockingQueue::get()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673158 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 19 |
4 files changed, 54 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 04cee40a37..99ab6f0133 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -31,14 +31,25 @@ using namespace framing; LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} LocalQueue::~LocalQueue() {} -Message LocalQueue::pop() { +Message LocalQueue::pop() { return get(); } + +Message LocalQueue::get() { + Message result; + bool ok = get(result, sys::TIME_INFINITE); + assert(ok); (void) ok; + return result; +} + +bool LocalQueue::get(Message& result, sys::Duration timeout) { if (!queue) throw ClosedException(); - FrameSet::shared_ptr content = queue->pop(); + FrameSet::shared_ptr content; + bool ok = queue->pop(content, timeout); + if (!ok) return false; if (content->isA<MessageTransferBody>()) { - Message m(*content); - autoAck.ack(m, session); - return m; + result = Message(*content); + autoAck.ack(result, session); + return true; } else throw CommandInvalidException( diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 273814f179..f81065ef3c 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -25,6 +25,7 @@ #include "qpid/client/Message.h" #include "qpid/client/Demux.h" #include "qpid/client/AckPolicy.h" +#include "qpid/sys/Time.h" namespace qpid { namespace client { @@ -42,7 +43,7 @@ class LocalQueue public: /** Create a local queue. Subscribe the local queue to a remote broker * queue with a SubscriptionManager. - * + * * LocalQueue is an alternative to implementing a MessageListener. * *@param ackPolicy Policy for acknowledging messages. @see AckPolicy. @@ -51,14 +52,22 @@ class LocalQueue ~LocalQueue(); - /** Pop the next message off the local queue. + /** Wait up to timeout for the next message from the local queue. + *@param result Set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if queue was empty after timeout. + */ + bool get(Message& result, sys::Duration timeout=0); + + /** Get the next message off the local queue, or wait for a + * message from the broker queue. *@exception ClosedException if subscription has been closed. */ + Message get(); + + /** Synonym for get(). */ Message pop(); - /** Synonym for pop(). */ - Message get() { return pop(); } - /** Return true if local queue is empty. */ bool empty() const; @@ -72,10 +81,11 @@ class LocalQueue AckPolicy& getAckPolicy(); private: - friend class SubscriptionManager; Session session; Demux::QueuePtr queue; AckPolicy autoAck; + + friend class SubscriptionManager; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 324b11e1df..9bb75f9a49 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -129,10 +129,13 @@ void SubscriptionManager::stop() dispatcher.stop(); } -Message SubscriptionManager::get(const std::string& queue) { +bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; - subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str()); - return lq.get(); + std::string unique = framing::Uuid(true).str(); + subscribe(lq, queue, FlowControl::messageCredit(1), unique); + AutoCancel ac(*this, unique); + sync(session).messageFlush(unique); + return lq.get(result, timeout); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 0aa55099f5..3dad15fd29 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -122,10 +122,14 @@ class SubscriptionManager : public sys::Runnable const std::string& queue, const std::string& tag=std::string()); - /** - * Get a single message from a queue. + + /** Get a single message from a queue. + *@param result is set to the message from the queue. + *@ + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if no message available after timeout. */ - Message get(const std::string& queue); + bool get(Message& result, const std::string& queue, sys::Duration timeout=0); /** Cancel a subscription. */ void cancel(const std::string tag); @@ -191,6 +195,15 @@ class SubscriptionManager : public sys::Runnable AckPolicy& getAckPolicy(); }; +/** AutoCancel cancels a subscription in its destructor */ +class AutoCancel { + public: + AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {} + ~AutoCancel() { sm.cancel(tag); } + private: + SubscriptionManager& sm; + std::string tag; +}; }} // namespace qpid::client |
