summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-25 01:55:06 +0000
committerAlan Conway <aconway@apache.org>2008-10-25 01:55:06 +0000
commit57bd5193208b228c1088586917d7f43f13e0dd9a (patch)
tree564d1aa0d13da985bd2159bbdd8d4b92be4016fb /cpp/src/qpid
parentd1239516d2cd33ceb90be7a74bd5ea73825c577e (diff)
downloadqpid-python-57bd5193208b228c1088586917d7f43f13e0dd9a.tar.gz
Client API change: Centralize access to subscription status, better control of acquire/accept.
client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe. client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools. Issues addressed by the change: - old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing. - old AckPolicy was broken - not possible to access the instance associated with an active subscription - old AckPolicy did not provide a way to do manual acquire, only accept. - setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually. - a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707808 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/RangeSet.h8
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp35
-rw-r--r--cpp/src/qpid/client/Dispatcher.h25
-rw-r--r--cpp/src/qpid/client/FailoverListener.cpp2
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.cpp117
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.h32
-rw-r--r--cpp/src/qpid/client/FlowControl.h7
-rw-r--r--cpp/src/qpid/client/Handle.h (renamed from cpp/src/qpid/client/AckPolicy.h)61
-rw-r--r--cpp/src/qpid/client/HandleAccess.h (renamed from cpp/src/qpid/client/AckPolicy.cpp)39
-rw-r--r--cpp/src/qpid/client/HandlePrivate.h61
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp11
-rw-r--r--cpp/src/qpid/client/LocalQueue.h18
-rw-r--r--cpp/src/qpid/client/Subscription.cpp47
-rw-r--r--cpp/src/qpid/client/Subscription.h99
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.cpp116
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.h99
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp94
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h128
-rw-r--r--cpp/src/qpid/client/SubscriptionSettings.h62
19 files changed, 673 insertions, 388 deletions
diff --git a/cpp/src/qpid/RangeSet.h b/cpp/src/qpid/RangeSet.h
index 2a88426f17..1ba4fbbcef 100644
--- a/cpp/src/qpid/RangeSet.h
+++ b/cpp/src/qpid/RangeSet.h
@@ -27,6 +27,7 @@
#include <boost/operators.hpp>
#include <boost/bind.hpp>
#include <algorithm>
+#include <numeric>
namespace qpid {
@@ -53,7 +54,7 @@ class Range {
void begin(const T& t) { begin_ = t; }
void end(const T& t) { end_ = t; }
-
+ size_t size() const { return end_ - begin_; }
bool empty() const { return begin_ == end_; }
bool contains(const T& x) const { return begin_ <= x && x < end_; }
@@ -172,6 +173,7 @@ class RangeSet
// The difference between the start and end of this range set
uint32_t span() const;
+ size_t size() const;
bool empty() const { return ranges.empty(); }
void clear() { ranges.clear(); }
@@ -185,6 +187,7 @@ class RangeSet
template <class S> void decode(S& s) { uint16_t sz; s(sz); ranges.resize(sz/sizeof(Range<T>)); }
private:
+ static size_t accumulateSize(size_t s, const Range<T>& r) { return s+r.size(); }
Ranges ranges;
template <class U> friend std::ostream& operator<<(std::ostream& o, const RangeSet<U>& r);
@@ -317,6 +320,9 @@ template <class T> uint32_t RangeSet<T>::span() const {
return ranges.back().last() - ranges.front().first();
}
+template <class T> size_t RangeSet<T>::size() const {
+ return std::accumulate(rangesBegin(), rangesEnd(), 0, &RangeSet<T>::accumulateSize);
+}
} // namespace qpid
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index c26dba188d..0b7618eb4c 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Dispatcher.h"
+#include "SubscriptionImpl.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -37,18 +38,6 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
-Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
- : session(s), listener(l), autoAck(a) {}
-
-void Subscriber::received(Message& msg)
-{
-
- if (listener) {
- listener->received(msg);
- autoAck.ack(msg, session);
- }
-}
-
Dispatcher::Dispatcher(const Session& s, const std::string& q)
: session(s),
running(false),
@@ -78,7 +67,7 @@ void Dispatcher::run()
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
Message msg(*content);
- Subscriber::shared_ptr listener = find(msg.getDestination());
+ boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " << msg.getDestination());
} else {
@@ -121,7 +110,7 @@ void Dispatcher::setAutoStop(bool b)
autoStop = b;
}
-Subscriber::shared_ptr Dispatcher::find(const std::string& name)
+boost::intrusive_ptr<SubscriptionImpl> Dispatcher::find(const std::string& name)
{
ScopedLock<Mutex> l(lock);
Listeners::iterator i = listeners.find(name);
@@ -131,24 +120,12 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name)
return i->second;
}
-void Dispatcher::listen(
- MessageListener* listener, AckPolicy autoAck
-)
-{
- ScopedLock<Mutex> l(lock);
- defaultListener = Subscriber::shared_ptr(
- new Subscriber(session, listener, autoAck));
-}
-
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck)
-{
+void Dispatcher::listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription) {
ScopedLock<Mutex> l(lock);
- listeners[destination] = Subscriber::shared_ptr(
- new Subscriber(session, listener, autoAck));
+ listeners[subscription->getName()] = subscription;
}
-void Dispatcher::cancel(const std::string& destination)
-{
+void Dispatcher::cancel(const std::string& destination) {
ScopedLock<Mutex> l(lock);
listeners.erase(destination);
if (autoStop && listeners.empty())
diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h
index d85785ed2c..921c6449a3 100644
--- a/cpp/src/qpid/client/Dispatcher.h
+++ b/cpp/src/qpid/client/Dispatcher.h
@@ -30,24 +30,12 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "MessageListener.h"
-#include "AckPolicy.h"
+#include "SubscriptionImpl.h"
namespace qpid {
namespace client {
-///@internal
-class Subscriber : public MessageListener
-{
- AsyncSession session;
- MessageListener* const listener;
- AckPolicy autoAck;
-
-public:
- typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(const Session& session, MessageListener* listener, AckPolicy);
- void received(Message& msg);
-
-};
+class SubscriptionImpl;
///@internal
typedef framing::Handler<framing::FrameSet> FrameSetHandler;
@@ -55,7 +43,7 @@ typedef framing::Handler<framing::FrameSet> FrameSetHandler;
///@internal
class Dispatcher : public sys::Runnable
{
- typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
+ typedef std::map<std::string, boost::intrusive_ptr<SubscriptionImpl> >Listeners;
sys::Mutex lock;
sys::Thread worker;
Session session;
@@ -63,10 +51,10 @@ class Dispatcher : public sys::Runnable
bool running;
bool autoStop;
Listeners listeners;
- Subscriber::shared_ptr defaultListener;
+ boost::intrusive_ptr<SubscriptionImpl> defaultListener;
std::auto_ptr<FrameSetHandler> handler;
- Subscriber::shared_ptr find(const std::string& name);
+ boost::intrusive_ptr<SubscriptionImpl> find(const std::string& name);
bool isStopped();
boost::function<void ()> failoverHandler;
@@ -84,8 +72,7 @@ public:
failoverHandler = fh;
}
- void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
- void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
+ void listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription);
void cancel(const std::string& destination);
};
diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp
index 98df12fc57..8311e713a4 100644
--- a/cpp/src/qpid/client/FailoverListener.cpp
+++ b/cpp/src/qpid/client/FailoverListener.cpp
@@ -60,7 +60,7 @@ FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, c
std::string qname=session.getId().getName();
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
- subscriptions->subscribe(*this, qname, FlowControl::unlimited());
+ subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
thread = sys::Thread(*subscriptions);
}
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index 0331cbeb9e..5fa4cb2800 100644
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -68,7 +68,7 @@ FailoverSubscriptionManager::failover ( )
void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & settings,
const std::string & tag,
bool record_this
)
@@ -77,11 +77,11 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
subscriptionManager->subscribe ( listener,
queue,
- flow,
+ settings,
tag
);
if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, settings, tag, false ) );
}
@@ -89,7 +89,7 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & settings,
const std::string & tag,
bool record_this
)
@@ -98,12 +98,12 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
subscriptionManager->subscribe ( localQueue,
queue,
- flow,
+ settings,
tag
);
if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, settings, tag, false ) );
}
@@ -245,109 +245,4 @@ FailoverSubscriptionManager::stop ( )
lock.notifyAll();
}
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
- const FlowControl & flow
-)
-{
-
- subscriptionManager->setFlowControl ( destination, flow );
-}
-
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
-{
-
- subscriptionManager->setFlowControl ( flow );
-}
-
-
-
-const FlowControl &
-FailoverSubscriptionManager::getFlowControl ( ) const
-{
-
- return subscriptionManager->getFlowControl ( );
-}
-
-
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
- uint32_t messages,
- uint32_t bytes,
- bool window
-)
-{
-
- subscriptionManager->setFlowControl ( tag,
- messages,
- bytes,
- window
- );
-}
-
-
-
-void
-FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
- uint32_t bytes,
- bool window
-)
-{
-
- subscriptionManager->setFlowControl ( messages,
- bytes,
- window
- );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAcceptMode ( bool required )
-{
-
- subscriptionManager->setAcceptMode ( required );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAcquireMode ( bool acquire )
-{
-
- subscriptionManager->setAcquireMode ( acquire );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
-{
-
- subscriptionManager->setAckPolicy ( autoAck );
-}
-
-
-
-AckPolicy &
-FailoverSubscriptionManager::getAckPolicy()
-{
-
- return subscriptionManager->getAckPolicy ( );
-}
-
-
-
-
-
-
-
-
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h
index b7631d3a98..0556ba15ec 100644
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ b/cpp/src/qpid/client/FailoverSubscriptionManager.h
@@ -31,7 +31,7 @@
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/SubscriptionSettings.h>
#include <qpid/sys/Runnable.h>
#include <qpid/sys/Monitor.h>
@@ -50,13 +50,13 @@ class FailoverSubscriptionManager
void subscribe ( MessageListener & listener,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & ,
const std::string & tag = std::string(),
bool record_this = true );
void subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const FlowControl & flow,
+ const SubscriptionSettings & ,
const std::string & tag=std::string(),
bool record_this = true );
@@ -84,32 +84,6 @@ class FailoverSubscriptionManager
void stop ( );
- void setFlowControl ( const std::string & destintion,
- const FlowControl & flow );
-
- void setFlowControl ( const FlowControl & flow );
-
- const FlowControl & getFlowControl ( ) const;
-
- void setFlowControl ( const std::string & tag,
- uint32_t messages,
- uint32_t bytes,
- bool window=true );
-
- void setFlowControl ( uint32_t messages,
- uint32_t bytes,
- bool window = true
- );
-
- void setAcceptMode ( bool required );
-
- void setAcquireMode ( bool acquire );
-
- void setAckPolicy ( const AckPolicy & autoAck );
-
- AckPolicy & getAckPolicy();
-
-
// Get ready for a failover.
void prepareForFailover ( Session newSession );
void failover ( );
diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h
index 081061ac02..0f5f8596ec 100644
--- a/cpp/src/qpid/client/FlowControl.h
+++ b/cpp/src/qpid/client/FlowControl.h
@@ -22,6 +22,8 @@
*
*/
+#include <qpid/sys/IntegerTypes.h>
+
namespace qpid {
namespace client {
@@ -40,9 +42,8 @@ namespace client {
* is renewed.
*
* In "window mode" credit is automatically renewed when a message is
- * acknowledged (@see AckPolicy) In non-window mode credit is not
- * automatically renewed, it must be explicitly re-set (@see
- * SubscriptionManager)
+ * accepted. In non-window mode credit is not automatically renewed,
+ * it must be explicitly re-set (@see Subscription)
*/
struct FlowControl {
static const uint32_t UNLIMITED=0xFFFFFFFF;
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/Handle.h
index 84bfb6a46a..4fd82b7646 100644
--- a/cpp/src/qpid/client/AckPolicy.h
+++ b/cpp/src/qpid/client/Handle.h
@@ -1,7 +1,8 @@
-#ifndef QPID_CLIENT_ACKPOLICY_H
-#define QPID_CLIENT_ACKPOLICY_H
+#ifndef QPID_CLIENT_HANDLE_H
+#define QPID_CLIENT_HANDLE_H
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,38 +22,40 @@
*
*/
-#include "qpid/framing/SequenceSet.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/client/Message.h"
-
namespace qpid {
namespace client {
+template <class T> class HandlePrivate;
+
/**
- * Policy for automatic acknowledgement of messages.
- *
- *
- * \ingroup clientapi
+ * A handle is like a pointer: it points to some underlying object.
+ * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the
+ * implicit conversion to bool to test for a null handle.
*/
-class AckPolicy
-{
- framing::SequenceSet accepted;
- size_t interval;
- size_t count;
-
+template <class T> class Handle {
public:
- /**
- * Sends accepts and marks completion of received transfers.
- *
- *@param n: send an accept for every n messages received.
- *n==0 means no automatic acknowledgement.
- */
- AckPolicy(size_t n=1);
- void ack(const Message& msg, AsyncSession session);
- void ackOutstanding(AsyncSession session);};
+ ~Handle();
+ Handle(const Handle&);
+ Handle& operator=(const Handle&);
-}} // namespace qpid::client
+ /**@return true if handle is valid, i.e. not null. */
+ bool isValid() const { return impl; }
+
+ /**@return true if handle is null. It is an error to call any function on a null handle. */
+ bool isNull() const { return !impl; }
+ operator bool() const { return impl; }
+ bool operator !() const { return impl; }
+ void swap(Handle<T>&);
+
+ protected:
+ Handle(T* =0);
+ T* impl;
+
+ friend class HandlePrivate<T>;
+};
+
+}} // namespace qpid::client
-#endif /*!QPID_CLIENT_ACKPOLICY_H*/
+#endif /*!QPID_CLIENT_HANDLE_H*/
diff --git a/cpp/src/qpid/client/AckPolicy.cpp b/cpp/src/qpid/client/HandleAccess.h
index 7956ebad0f..f1747db638 100644
--- a/cpp/src/qpid/client/AckPolicy.cpp
+++ b/cpp/src/qpid/client/HandleAccess.h
@@ -1,3 +1,6 @@
+#ifndef QPID_CLIENT_HANDLEACCESS_H
+#define QPID_CLIENT_HANDLEACCESS_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,33 +21,21 @@
* under the License.
*
*/
-#include "AckPolicy.h"
+
+#include <Handle.h>
namespace qpid {
namespace client {
-AckPolicy::AckPolicy(size_t n) : interval(n), count(n) {}
-
-void AckPolicy::ack(const Message& msg, AsyncSession session)
-{
- accepted.add(msg.getId());
- if (interval && --count==0) {
- session.markCompleted(msg.getId(), false, true);
- session.messageAccept(accepted);
- accepted.clear();
- count = interval;
- } else {
- session.markCompleted(msg.getId(), false, false);
- }
-}
-
-void AckPolicy::ackOutstanding(AsyncSession session)
+/**
+ * Provide access to the private impl member of a Handle.
+ */
+template <class T>
+class HandleAccess
{
- if (!accepted.empty()) {
- session.messageAccept(accepted);
- accepted.clear();
- session.sendCompletion();
- }
-}
-
+ public:
+ static boost::shared_ptr<T> getImpl(Handle<T>& h) { return h.impl; }
+};
}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_HANDLEACCESS_H*/
diff --git a/cpp/src/qpid/client/HandlePrivate.h b/cpp/src/qpid/client/HandlePrivate.h
new file mode 100644
index 0000000000..488ce48075
--- /dev/null
+++ b/cpp/src/qpid/client/HandlePrivate.h
@@ -0,0 +1,61 @@
+#ifndef QPID_CLIENT_HANDLEPRIVATE_H
+#define QPID_CLIENT_HANDLEPRIVATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+
+/** @file
+ * Private implementation of handle, include in .cpp file of handle
+ * subclasses _after_ including the declaration of class T.
+ * T can be any class that can be used with boost::intrusive_ptr.
+ */
+
+template <class T>
+Handle<T>::Handle(T* p) : impl(p) { if (impl) boost::intrusive_ptr_add_ref(impl); }
+
+template <class T>
+Handle<T>::~Handle() { if(impl) boost::intrusive_ptr_release(impl); }
+
+template <class T>
+Handle<T>::Handle(const Handle& h) : impl(h.impl) { if(impl) boost::intrusive_ptr_add_ref(impl); }
+
+template <class T>
+Handle<T>& Handle<T>::operator=(const Handle<T>& h) { Handle<T>(h).swap(*this); return *this; }
+
+template <class T>
+void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); }
+
+
+/** Access to private impl of a Handle */
+template <class T>
+class HandlePrivate {
+ public:
+ static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+};
+
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_HANDLEPRIVATE_H*/
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
index 99ab6f0133..229d3766ef 100644
--- a/cpp/src/qpid/client/LocalQueue.cpp
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -22,13 +22,15 @@
#include "qpid/Exception.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/reply_exceptions.h"
+#include "HandlePrivate.h"
+#include "SubscriptionImpl.h"
namespace qpid {
namespace client {
using namespace framing;
-LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
+LocalQueue::LocalQueue() {}
LocalQueue::~LocalQueue() {}
Message LocalQueue::pop() { return get(); }
@@ -48,7 +50,9 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) {
if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
result = Message(*content);
- autoAck.ack(result, session);
+ boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription);
+ assert(si);
+ if (si) si->received(result);
return true;
}
else
@@ -56,9 +60,6 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) {
QPID_MSG("Unexpected method: " << content->getMethod()));
}
-void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
-AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
-
bool LocalQueue::empty() const
{
if (!queue)
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index f81065ef3c..9fe72762c3 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -23,8 +23,8 @@
*/
#include "qpid/client/Message.h"
+#include "qpid/client/Subscription.h"
#include "qpid/client/Demux.h"
-#include "qpid/client/AckPolicy.h"
#include "qpid/sys/Time.h"
namespace qpid {
@@ -38,17 +38,14 @@ namespace client {
*
* \ingroup clientapi
*/
-class LocalQueue
-{
+class LocalQueue {
public:
/** Create a local queue. Subscribe the local queue to a remote broker
* queue with a SubscriptionManager.
*
* LocalQueue is an alternative to implementing a MessageListener.
- *
- *@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
*/
- LocalQueue(AckPolicy ackPolicy=AckPolicy());
+ LocalQueue();
~LocalQueue();
@@ -74,16 +71,9 @@ class LocalQueue
/** Number of messages on the local queue */
size_t size() const;
- /** Set the message acknowledgement policy. @see AckPolicy. */
- void setAckPolicy(AckPolicy);
-
- /** Get the message acknowledgement policy. @see AckPolicy. */
- AckPolicy& getAckPolicy();
-
private:
- Session session;
Demux::QueuePtr queue;
- AckPolicy autoAck;
+ Subscription subscription;
friend class SubscriptionManager;
};
diff --git a/cpp/src/qpid/client/Subscription.cpp b/cpp/src/qpid/client/Subscription.cpp
new file mode 100644
index 0000000000..449c7a736c
--- /dev/null
+++ b/cpp/src/qpid/client/Subscription.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Subscription.h"
+#include "SubscriptionImpl.h"
+#include "HandlePrivate.h"
+
+namespace qpid {
+namespace client {
+
+template class Handle<SubscriptionImpl>;
+
+
+std::string Subscription::getName() const { return impl->getName(); }
+std::string Subscription::getQueue() const { return impl->getQueue(); }
+const SubscriptionSettings& Subscription::getSettings() const { return impl->getSettings(); }
+void Subscription::setFlowControl(const FlowControl& f) { impl->setFlowControl(f); }
+void Subscription::setAutoAck(size_t n) { impl->setAutoAck(n); }
+SequenceSet Subscription::getUnacquired() const { return impl->getUnacquired(); }
+SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); }
+void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); }
+void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
+Session Subscription::getSession() const { return impl->getSession(); }
+SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
+void Subscription::cancel() { impl->cancel(); }
+
+}} // namespace qpid::client
+
+
diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h
new file mode 100644
index 0000000000..2ed56d4e8a
--- /dev/null
+++ b/cpp/src/qpid/client/Subscription.h
@@ -0,0 +1,99 @@
+#ifndef QPID_CLIENT_SUBSCRIPTION_H
+#define QPID_CLIENT_SUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Handle.h"
+#include "qpid/client/Message.h"
+
+namespace qpid {
+namespace client {
+
+class SubscriptionImpl;
+class SubscriptionManager;
+
+/**
+ * A handle to an active subscription. Provides methods to query the subscription status
+ * and control acknowledgement (acquire and accept) of messages.
+ */
+class Subscription : public Handle<SubscriptionImpl> {
+ public:
+ Subscription(SubscriptionImpl* si=0) : Handle<SubscriptionImpl>(si) {}
+
+ /** The name of the subsctription, used as the "destination" for messages from the broker.
+ * Usually the same as the queue name but can be set differently.
+ */
+ std::string getName() const;
+
+ /** Name of the queue this subscription subscribes to */
+ std::string getQueue() const;
+
+ /** Get the flow control and acknowledgement settings for this subscription */
+ const SubscriptionSettings& getSettings() const;
+
+ /** Set the flow control parameters */
+ void setFlowControl(const FlowControl&);
+
+ /** Automatically acknowledge (acquire and accept) batches of n messages.
+ * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+ * to manually acquire and accept messages.
+ */
+ void setAutoAck(unsigned int n);
+
+ /** Get the set of ID's for messages received by this subscription but not yet acquired.
+ * This will always be empty if getSettings().acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+ */
+ SequenceSet getUnacquired() const;
+
+ /** Get the set of ID's for messages received by this subscription but not yet accepted. */
+ SequenceSet getUnaccepted() const;
+
+ /** Acquire messageIds and remove them from the unacquired set.
+ * oAdd them to the unaccepted set if getSettings().acceptMode == ACCEPT_MODE_EXPLICIT.
+ */
+ void acquire(const SequenceSet& messageIds);
+
+ /** Accept messageIds and remove them from the unaccepted set.
+ *@pre messageIds is a subset of getUnaccepted()
+ */
+ void accept(const SequenceSet& messageIds);
+
+ /* Acquire a single message */
+ void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
+
+ /* Accept a single message */
+ void accept(const Message& m) { accept(SequenceSet(m.getId())); }
+
+ /** Get the session associated with this subscription */
+ Session getSession() const;
+
+ /** Get the subscription manager associated with this subscription */
+ SubscriptionManager& getSubscriptionManager() const;
+
+ /** Cancel the subscription. */
+ void cancel();
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTION_H*/
diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp
new file mode 100644
index 0000000000..3363dda11f
--- /dev/null
+++ b/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SubscriptionImpl.h"
+#include "SubscriptionManager.h"
+#include "SubscriptionSettings.h"
+
+namespace qpid {
+namespace client {
+
+using sys::Mutex;
+
+SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
+ : manager(m), name(n), queue(q), settings(s), listener(l)
+{
+ async(manager.getSession()).messageSubscribe(
+ arg::queue=queue,
+ arg::destination=name,
+ arg::acceptMode=settings.acceptMode,
+ arg::acquireMode=settings.acquireMode);
+ setFlowControl(settings.flowControl);
+}
+
+std::string SubscriptionImpl::getName() const { return name; }
+
+std::string SubscriptionImpl::getQueue() const { return queue; }
+
+const SubscriptionSettings& SubscriptionImpl::getSettings() const {
+ Mutex::ScopedLock l(lock);
+ return settings;
+}
+
+void SubscriptionImpl::setFlowControl(const FlowControl& f) {
+ Mutex::ScopedLock l(lock);
+ AsyncSession s=manager.getSession();
+ if (&settings.flowControl != &f) settings.flowControl = f;
+ s.messageSetFlowMode(name, f.window);
+ s.messageFlow(name, CREDIT_UNIT_MESSAGE, f.messages);
+ s.messageFlow(name, CREDIT_UNIT_BYTE, f.bytes);
+ s.sync();
+}
+
+void SubscriptionImpl::setAutoAck(size_t n) {
+ Mutex::ScopedLock l(lock);
+ settings.autoAck = n;
+}
+
+SequenceSet SubscriptionImpl::getUnacquired() const { Mutex::ScopedLock l(lock); return unacquired; }
+SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); return unaccepted; }
+
+void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().messageAcquire(messageIds);
+ unacquired.remove(messageIds);
+ if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
+ unaccepted.add(messageIds);
+}
+
+void SubscriptionImpl::accept(const SequenceSet& messageIds) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().messageAccept(messageIds);
+ unaccepted.remove(messageIds);
+}
+
+Session SubscriptionImpl::getSession() const { return manager.getSession(); }
+
+SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
+
+void SubscriptionImpl::cancel() { manager.cancel(name); }
+
+void SubscriptionImpl::received(Message& m) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().markCompleted(m.getId(), false, false);
+ if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
+ unacquired.add(m.getId());
+ else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
+ unaccepted.add(m.getId());
+
+ if (listener) {
+ Mutex::ScopedUnlock u(lock);
+ listener->received(m);
+ }
+
+ if (settings.autoAck) {
+ if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
+ if (unacquired.size()) {
+ async(manager.getSession()).messageAcquire(unacquired);
+ unaccepted.add(unacquired);
+ unaccepted.clear();
+ }
+ async(manager.getSession()).messageAccept(unaccepted);
+ unaccepted.clear();
+ }
+ }
+}
+
+}} // namespace qpid::client
+
diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h
new file mode 100644
index 0000000000..44fd1a7d6c
--- /dev/null
+++ b/cpp/src/qpid/client/SubscriptionImpl.h
@@ -0,0 +1,99 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONIMPL_H
+#define QPID_CLIENT_SUBSCRIPTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace client {
+
+class SubscriptionManager;
+
+class SubscriptionImpl : public RefCounted, public MessageListener {
+ public:
+ SubscriptionImpl(SubscriptionManager&, const std::string& queue,
+ const SubscriptionSettings&, const std::string& name, MessageListener* =0);
+
+ /** The name of the subsctription, used as the "destination" for messages from the broker.
+ * Usually the same as the queue name but can be set differently.
+ */
+ std::string getName() const;
+
+ /** Name of the queue this subscription subscribes to */
+ std::string getQueue() const;
+
+ /** Get the flow control and acknowledgement settings for this subscription */
+ const SubscriptionSettings& getSettings() const;
+
+ /** Set the flow control parameters */
+ void setFlowControl(const FlowControl&);
+
+ /** Automatically acknowledge (acquire and accept) batches of n messages.
+ * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+ * to manually acquire and accept messages.
+ */
+ void setAutoAck(size_t n);
+
+ /** Get the set of ID's for messages received by this subscription but not yet acquired.
+ * This will always be empty if acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+ */
+ SequenceSet getUnacquired() const;
+
+ /** Get the set of ID's for messages acquired by this subscription but not yet accepted. */
+ SequenceSet getUnaccepted() const;
+
+ /** Acquire messageIds and remove them from the un-acquired set for the session. */
+ void acquire(const SequenceSet& messageIds);
+
+ /** Accept messageIds and remove them from the un-acceptd set for the session. */
+ void accept(const SequenceSet& messageIds);
+
+ /** Get the session associated with this subscription */
+ Session getSession() const;
+
+ /** Get the subscription manager associated with this subscription */
+ SubscriptionManager& getSubscriptionManager() const;
+
+ /** Cancel the subscription. */
+ void cancel();
+
+ void received(Message&);
+
+ private:
+
+ mutable sys::Mutex lock;
+ SubscriptionManager& manager;
+ std::string name, queue;
+ SubscriptionSettings settings;
+ framing::SequenceSet unacquired, unaccepted;
+ MessageListener* listener;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONIMPL_H*/
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index dde93635c8..7e2f2f8595 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -22,6 +22,7 @@
#define _Subscription_
#include "SubscriptionManager.h"
+#include "SubscriptionImpl.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
@@ -34,83 +35,41 @@ namespace qpid {
namespace client {
SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s),
- flowControl(UNLIMITED, UNLIMITED, false),
- acceptMode(0), acquireMode(0),
- autoStop(true)
+ : dispatcher(s), session(s), autoStop(true)
{}
-void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest, const FlowControl& fc)
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
- session.messageSubscribe(
- arg::queue=q, arg::destination=dest,
- arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- if (fc.messages || fc.bytes) // No need to set if all 0.
- setFlowControl(dest, fc);
+ std::string name=n.empty() ? q:n;
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
+ dispatcher.listen(si);
+ return subscriptions[name] = Subscription(si.get());
}
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
- subscribe(listener, q, getFlowControl(), d);
+ std::string name=n.empty() ? q:n;
+ lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+ lq.subscription = Subscription(si.get());
+ return subscriptions[name] = lq.subscription;
}
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const std::string& n)
{
- std::string dest=d.empty() ? q:d;
- dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest, fc);
+ return subscribe(listener, q, defaultSettings, n);
}
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& n)
{
- subscribe(lq, q, getFlowControl(), d);
+ return subscribe(lq, q, defaultSettings, n);
}
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
-{
- std::string dest=d.empty() ? q:d;
- lq.session=session;
- lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest, fc);
-}
-
-void SubscriptionManager::setFlowControl(
- const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
-{
- session.messageSetFlowMode(dest, window);
- session.messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes);
- session.sync();
-}
-
-void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
- setFlowControl(dest, fc.messages, fc.bytes, fc.window);
-}
-
-void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
-
-void SubscriptionManager::setFlowControl(
- uint32_t messages_, uint32_t bytes_, bool window_)
-{
- setFlowControl(FlowControl(messages_, bytes_, window_));
-}
-
-const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
-
-void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
-
-void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
-
-void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
-
-AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
-
-void SubscriptionManager::cancel(const std::string dest)
+void SubscriptionManager::cancel(const std::string& dest)
{
sync(session).messageCancel(dest);
dispatcher.cancel(dest);
@@ -138,10 +97,11 @@ void SubscriptionManager::stop()
bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
LocalQueue lq;
std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+ subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
AutoCancel ac(*this, unique);
//first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout)) return true;
+ if (timeout && lq.get(result, timeout))
+ return true;
//make sure message is not on queue before final check
sync(session).messageFlush(unique);
return lq.get(result, 0);
@@ -149,6 +109,10 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du
Session SubscriptionManager::getSession() const { return session; }
+Subscription SubscriptionManager::getSubscription(const std::string& name) const {
+ return subscriptions.at(name);
+}
+
void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
dispatcher.registerFailoverHandler(fh);
}
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 07faa48fee..8b27a2c9b9 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -25,9 +25,10 @@
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Completion.h>
#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/Subscription.h>
#include <qpid/sys/Runnable.h>
#include <set>
#include <sstream>
@@ -48,15 +49,10 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
-
qpid::client::Dispatcher dispatcher;
qpid::client::AsyncSession session;
- FlowControl flowControl;
- AckPolicy autoAck;
- bool acceptMode;
- bool acquireMode;
bool autoStop;
+ SubscriptionSettings defaultSettings;
public:
/** Create a new SubscriptionManager associated with a session */
@@ -70,14 +66,13 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
- *@param flow initial FlowControl for the subscription.
- *@param tag Unique destination tag for the listener.
- * If not specified, the queue name is used.
+ *@param settings settings for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const FlowControl& flow,
- const std::string& tag=std::string());
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -86,13 +81,13 @@ class SubscriptionManager : public sys::Runnable
*
*@param queue Name of the queue to subscribe to.
*@param flow initial FlowControl for the subscription.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
- const std::string& queue,
- const FlowControl& flow,
- const std::string& tag=std::string());
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
/**
* Subscribe a MessagesListener to receive messages from queue.
@@ -102,12 +97,12 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& name=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -115,12 +110,12 @@ class SubscriptionManager : public sys::Runnable
* Incoming messages are stored in the queue for you to retrieve.
*
*@param queue Name of the queue to subscribe to.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
- const std::string& queue,
- const std::string& tag=std::string());
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& name=std::string());
/** Get a single message from a queue.
@@ -131,8 +126,13 @@ class SubscriptionManager : public sys::Runnable
*/
bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
- /** Cancel a subscription. */
- void cancel(const std::string tag);
+ /** Get a subscription by name, returns a null Subscription handle
+ * if not found.
+ */
+ Subscription getSubscription(const std::string& name) const;
+
+ /** Cancel a subscription. See also: Subscription.cancel() */
+ void cancel(const std::string& name);
/** Deliver messages in the current thread until stop() is called.
* Only one thread may be running in a SubscriptionManager at a time.
@@ -157,53 +157,65 @@ class SubscriptionManager : public sys::Runnable
static const uint32_t UNLIMITED=0xFFFFFFFF;
- /** Set the flow control for destination. */
- void setFlowControl(const std::string& destintion, const FlowControl& flow);
-
- /** Set the default initial flow control for subscriptions that do not specify it. */
- void setFlowControl(const FlowControl& flow);
+ /** Set the flow control for a subscription. */
+ void setFlowControl(const std::string& name, const FlowControl& flow) {
+ getSubscription(name).setFlowControl(flow);
+ }
- /** Get the default flow control for new subscriptions that do not specify it. */
- const FlowControl& getFlowControl() const;
-
- /** Set the flow control for destination tag.
- *@param tag: name of the destination.
+ /** Set the flow control for a subscription.
+ *@param name: name of the subscription.
*@param messages: message credit.
*@param bytes: byte credit.
*@param window: if true use window-based flow control.
*/
- void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true);
+ void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true) {
+ setFlowControl(name, messages, bytes, window);
+ }
- /** Set the initial flow control settings to be applied to each new subscribtion.
- *@param messages: message credit.
- *@param bytes: byte credit.
- *@param window: if true use window-based flow control.
+ /** Set the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; }
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
*/
- void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
+ const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; }
- /** Set the accept-mode for new subscriptions. Defaults to true.
- *@param required: if true messages must be confirmed by calling
- *Message::acknowledge() or automatically via an AckPolicy, see setAckPolicy()
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
*/
- void setAcceptMode(bool required);
+ SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
- /** Set the acquire-mode for new subscriptions. Defaults to false.
- *@param acquire: if false messages pre-acquired, if true
- * messages are dequed on acknowledgement or on transfer
- * depending on acceptMode.
+ /**
+ * Set the default flow control settings for subscribe() calls
+ * that don't include a SubscriptionSettings parameter.
+ *
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
*/
- void setAcquireMode(bool acquire);
+ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) {
+ defaultSettings.flowControl = FlowControl(messages, bytes, window);
+ }
- /** Set the acknowledgement policy for new subscriptions.
- * Default is to acknowledge every message automatically.
+ /**
+ *Set the default accept-mode for subscribe() calls that don't
+ *include a SubscriptionSettings parameter.
*/
- void setAckPolicy(const AckPolicy& autoAck);
+ void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
- AckPolicy& getAckPolicy();
+ /**
+ * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
+ */
+ void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; }
void registerFailoverHandler ( boost::function<void ()> fh );
Session getSession() const;
+
+ private:
+ std::map<std::string, Subscription> subscriptions;
};
/** AutoCancel cancels a subscription in its destructor */
diff --git a/cpp/src/qpid/client/SubscriptionSettings.h b/cpp/src/qpid/client/SubscriptionSettings.h
new file mode 100644
index 0000000000..924814c809
--- /dev/null
+++ b/cpp/src/qpid/client/SubscriptionSettings.h
@@ -0,0 +1,62 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+#define QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/FlowControl.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+
+/** Bring AMQP enum definitions for message class into this namespace. */
+using namespace qpid::framing::message;
+
+/**
+ * Settings for a subscription.
+ */
+struct SubscriptionSettings
+{
+ SubscriptionSettings(
+ FlowControl flow=FlowControl::unlimited(),
+ AcceptMode accept=ACCEPT_MODE_EXPLICIT,
+ AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
+ unsigned int autoAck_=1
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_) {}
+
+ FlowControl flowControl; ///@< Flow control settings. @see FlowControl
+ AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
+ AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED
+
+ /** Automatically acknowledge (acquire and accept) batches of autoAck messages.
+ * 0 means no automatic acknowledgement. What it means to "acknowledge" a message depends on
+ * acceptMode and acquireMode:
+ * - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing
+ * - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" command
+ * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" command
+ * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" and "accept" commands
+ */
+ unsigned int autoAck;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONSETTINGS_H*/