summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-01 18:01:11 +0000
committerAlan Conway <aconway@apache.org>2008-07-01 18:01:11 +0000
commitb010894ebe6c468fef0c14ad869b80ef336ab11f (patch)
tree87fd021e862ad21abffc9457711f066651e67418 /cpp/src/qpid/client
parent4db79de7e806ceba3a243abef9847f15fc41cc40 (diff)
downloadqpid-python-b010894ebe6c468fef0c14ad869b80ef336ab11f.tar.gz
Added timeout to SubscriptionManager::get(), LocalQueue::get() and BlockingQueue::get()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673158 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp21
-rw-r--r--cpp/src/qpid/client/LocalQueue.h22
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp9
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h19
4 files changed, 54 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
index 04cee40a37..99ab6f0133 100644
--- a/cpp/src/qpid/client/LocalQueue.cpp
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -31,14 +31,25 @@ using namespace framing;
LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
LocalQueue::~LocalQueue() {}
-Message LocalQueue::pop() {
+Message LocalQueue::pop() { return get(); }
+
+Message LocalQueue::get() {
+ Message result;
+ bool ok = get(result, sys::TIME_INFINITE);
+ assert(ok); (void) ok;
+ return result;
+}
+
+bool LocalQueue::get(Message& result, sys::Duration timeout) {
if (!queue)
throw ClosedException();
- FrameSet::shared_ptr content = queue->pop();
+ FrameSet::shared_ptr content;
+ bool ok = queue->pop(content, timeout);
+ if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
- Message m(*content);
- autoAck.ack(m, session);
- return m;
+ result = Message(*content);
+ autoAck.ack(result, session);
+ return true;
}
else
throw CommandInvalidException(
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index 273814f179..f81065ef3c 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -25,6 +25,7 @@
#include "qpid/client/Message.h"
#include "qpid/client/Demux.h"
#include "qpid/client/AckPolicy.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace client {
@@ -42,7 +43,7 @@ class LocalQueue
public:
/** Create a local queue. Subscribe the local queue to a remote broker
* queue with a SubscriptionManager.
- *
+ *
* LocalQueue is an alternative to implementing a MessageListener.
*
*@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
@@ -51,14 +52,22 @@ class LocalQueue
~LocalQueue();
- /** Pop the next message off the local queue.
+ /** Wait up to timeout for the next message from the local queue.
+ *@param result Set to the message from the queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if queue was empty after timeout.
+ */
+ bool get(Message& result, sys::Duration timeout=0);
+
+ /** Get the next message off the local queue, or wait for a
+ * message from the broker queue.
*@exception ClosedException if subscription has been closed.
*/
+ Message get();
+
+ /** Synonym for get(). */
Message pop();
- /** Synonym for pop(). */
- Message get() { return pop(); }
-
/** Return true if local queue is empty. */
bool empty() const;
@@ -72,10 +81,11 @@ class LocalQueue
AckPolicy& getAckPolicy();
private:
- friend class SubscriptionManager;
Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
+
+ friend class SubscriptionManager;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 324b11e1df..9bb75f9a49 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -129,10 +129,13 @@ void SubscriptionManager::stop()
dispatcher.stop();
}
-Message SubscriptionManager::get(const std::string& queue) {
+bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
LocalQueue lq;
- subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str());
- return lq.get();
+ std::string unique = framing::Uuid(true).str();
+ subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+ AutoCancel ac(*this, unique);
+ sync(session).messageFlush(unique);
+ return lq.get(result, timeout);
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 0aa55099f5..3dad15fd29 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -122,10 +122,14 @@ class SubscriptionManager : public sys::Runnable
const std::string& queue,
const std::string& tag=std::string());
- /**
- * Get a single message from a queue.
+
+ /** Get a single message from a queue.
+ *@param result is set to the message from the queue.
+ *@
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if no message available after timeout.
*/
- Message get(const std::string& queue);
+ bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
/** Cancel a subscription. */
void cancel(const std::string tag);
@@ -191,6 +195,15 @@ class SubscriptionManager : public sys::Runnable
AckPolicy& getAckPolicy();
};
+/** AutoCancel cancels a subscription in its destructor */
+class AutoCancel {
+ public:
+ AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+ ~AutoCancel() { sm.cancel(tag); }
+ private:
+ SubscriptionManager& sm;
+ std::string tag;
+};
}} // namespace qpid::client