summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerObservers.h18
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObservers.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Observers.h32
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp78
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h16
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFactory.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueObservers.h77
-rw-r--r--qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshots.h8
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp6
13 files changed, 138 insertions, 116 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerObservers.h b/qpid/cpp/src/qpid/broker/BrokerObservers.h
index 8a9bdd4fd4..67427adcb1 100644
--- a/qpid/cpp/src/qpid/broker/BrokerObservers.h
+++ b/qpid/cpp/src/qpid/broker/BrokerObservers.h
@@ -24,19 +24,14 @@
#include "BrokerObserver.h"
#include "Observers.h"
-#include "qpid/sys/Mutex.h"
namespace qpid {
namespace broker {
/**
- * A broker observer that delegates to a collection of broker observers.
- *
- * THREAD SAFE
+ * Collection of BrokerObserver.
*/
-class BrokerObservers : public BrokerObserver,
- public Observers<BrokerObserver>
-{
+class BrokerObservers : public Observers<BrokerObserver> {
public:
void queueCreate(const boost::shared_ptr<Queue>& q) {
each(boost::bind(&BrokerObserver::queueCreate, _1, q));
@@ -54,15 +49,13 @@ class BrokerObservers : public BrokerObserver,
const boost::shared_ptr<Queue>& queue,
const std::string& key,
const framing::FieldTable& args) {
- each(boost::bind(
- &BrokerObserver::bind, _1, exchange, queue, key, args));
+ each(boost::bind(&BrokerObserver::bind, _1, exchange, queue, key, args));
}
void unbind(const boost::shared_ptr<Exchange>& exchange,
const boost::shared_ptr<Queue>& queue,
const std::string& key,
const framing::FieldTable& args) {
- each(boost::bind(
- &BrokerObserver::unbind, _1, exchange, queue, key, args));
+ each(boost::bind(&BrokerObserver::unbind, _1, exchange, queue, key, args));
}
void startTx(const boost::intrusive_ptr<TxBuffer>& tx) {
each(boost::bind(&BrokerObserver::startTx, _1, tx));
@@ -70,6 +63,9 @@ class BrokerObservers : public BrokerObserver,
void startDtx(const boost::intrusive_ptr<DtxBuffer>& dtx) {
each(boost::bind(&BrokerObserver::startDtx, _1, dtx));
}
+
+ private:
+ template <class F> void each(F f) { Observers<BrokerObserver>::each(f); }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
index e9014c80c3..8b9fb67aa5 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
@@ -30,11 +30,8 @@ namespace broker {
/**
* A collection of connection observers.
- * Calling a ConnectionObserver function will call that function on each observer.
- * THREAD SAFE.
*/
-class ConnectionObservers : public ConnectionObserver,
- public Observers<ConnectionObserver>
+class ConnectionObservers : public Observers<ConnectionObserver>
{
public:
void connection(Connection& c) {
diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h
index d50c21e559..b7b26a0d38 100644
--- a/qpid/cpp/src/qpid/broker/Observers.h
+++ b/qpid/cpp/src/qpid/broker/Observers.h
@@ -24,7 +24,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
-#include <vector>
+#include <set>
#include <algorithm>
namespace qpid {
@@ -37,19 +37,21 @@ template <class Observer>
class Observers
{
public:
- void add(boost::shared_ptr<Observer> observer) {
+ typedef boost::shared_ptr<Observer> ObserverPtr;
+
+ void add(const ObserverPtr& observer) {
sys::Mutex::ScopedLock l(lock);
- observers.push_back(observer);
+ observers.insert(observer);
}
- void remove(boost::shared_ptr<Observer> observer) {
+ void remove(const ObserverPtr& observer) {
sys::Mutex::ScopedLock l(lock);
- typename List::iterator i = std::find(observers.begin(), observers.end(), observer);
- observers.erase(i);
+ observers.erase(observer) ;
}
+ /** Iterate over the observers. */
template <class F> void each(F f) {
- List copy;
+ Set copy; // Make a copy and iterate outside the lock.
{
sys::Mutex::ScopedLock l(lock);
copy = observers;
@@ -58,10 +60,20 @@ class Observers
}
protected:
- typedef std::vector<boost::shared_ptr<Observer> > List;
+ typedef std::set<ObserverPtr> Set;
+ Observers() : lock(myLock) {}
+
+ /** Specify a lock for the Observers to use */
+ Observers(sys::Mutex& l) : lock(l) {}
+
+ /** Iterate over the observers without taking the lock, caller must hold the lock */
+ template <class F> void each(F f, const sys::Mutex::ScopedLock&) {
+ std::for_each(observers.begin(), observers.end(), f);
+ }
- sys::Mutex lock;
- List observers;
+ mutable sys::Mutex myLock;
+ mutable sys::Mutex& lock;
+ Set observers;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3eaab30394..74d23fdabf 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -195,6 +195,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
persistenceId(0),
settings(b ? merge(_settings, b->getOptions()) : _settings),
eventMode(0),
+ observers(name, messageLock),
broker(b),
deleted(false),
barrier(*this),
@@ -988,68 +989,38 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, Sc
{
current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->dequeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.dequeued(msg, lock);
if (autodelete && isEmpty(lock)) autodelete->check(lock);
}
/** updates queue observers when a message has become unavailable for transfer.
* Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->acquired(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
- }
- }
+ observers.acquired(msg, l);
}
/** updates queue observers when a message has become re-available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->requeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.requeued(msg, l);
}
/** updates queue observers when a new consumer has subscribed to this queue.
*/
-void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerAdded(c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
- }
- }
+ observers.consumerAdded(c, l);
}
/** updates queue observers when a consumer has unsubscribed from this queue.
*/
-void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerRemoved(c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
- }
- }
+ observers.consumerRemoved(c, l);
}
@@ -1133,12 +1104,9 @@ void Queue::destroyed()
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
{
- Mutex::ScopedLock lock(messageLock);
- for_each(observers.begin(), observers.end(),
- boost::bind(&QueueObserver::destroy, _1));
- observers.clear();
+ Mutex::ScopedLock l(messageLock);
+ observers.destroy(l);
}
-
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
if (brokerMgmtObject)
@@ -1513,15 +1481,9 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
/** updates queue observers and state when a message has become available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l)
{
- for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
- try {
- (*i)->enqueued(m);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.enqueued(m, l);
mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
@@ -1538,18 +1500,6 @@ bool Queue::isDeleted() const
return deleted;
}
-void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
-{
- Mutex::ScopedLock lock(messageLock);
- observers.insert(observer);
-}
-
-void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
-{
- Mutex::ScopedLock lock(messageLock);
- observers.erase(observer);
-}
-
void Queue::flush()
{
ScopedUse u(barrier);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 3622b06dbd..6e7b17e1c5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -31,7 +31,7 @@
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/QueueObservers.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/TxOp.h"
@@ -169,7 +169,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool eligible;
};
- typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
typedef boost::function1<void, Message&> MessageFunctor;
@@ -211,7 +210,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
- Observers observers;
+ QueueObservers observers;
MessageInterceptors interceptors;
std::string seqNoKey;
Broker* broker;
@@ -446,12 +445,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bindings.eachBinding(f);
}
- /** Apply f to each Observer on the queue */
- template <class F> void eachObserver(F f) {
- sys::Mutex::ScopedLock l(messageLock);
- std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
- }
-
/**
* Set the sequence number for the back of the queue, the
* next message enqueued will be pos+1.
@@ -484,11 +477,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
SubscriptionType type=CONSUMER
);
- QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
- QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
+
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
QPID_BROKER_EXTERN MessageInterceptors& getMessageInterceptors() { return interceptors; }
+ QPID_BROKER_EXTERN QueueObservers& getObservers() { return observers; }
+
/**
* Notify queue that recovery has completed.
*/
diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
index e5d9431555..d31b120cae 100644
--- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
@@ -90,7 +90,7 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
if (settings.groupKey.size()) {
boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( name, *(queue->messages), settings));
queue->allocator = mgm;
- queue->addObserver(mgm);
+ queue->getObservers().add(mgm);
} else {
queue->allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *(queue->messages) ));
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 3a609d67f1..9c215d197f 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -244,7 +244,7 @@ void QueueFlowLimit::observe(Queue& queue)
}
/* set up the observer */
- queue.addObserver(shared_from_this());
+ queue.getObservers().add(shared_from_this());
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
diff --git a/qpid/cpp/src/qpid/broker/QueueObservers.h b/qpid/cpp/src/qpid/broker/QueueObservers.h
new file mode 100644
index 0000000000..1bf5f1f696
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueObservers.h
@@ -0,0 +1,77 @@
+#ifndef QPID_BROKER_QUEUEOBSERVERS_H
+#define QPID_BROKER_QUEUEOBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Observers.h"
+#include "QueueObserver.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A collection of queue observers.
+ */
+class QueueObservers : public Observers<QueueObserver> {
+ public:
+ typedef Observers<QueueObserver> Base;
+
+ // The only public functions are inherited from Observers<QueueObserver>
+ using Base::each; // Avoid function hiding.
+
+ friend class Queue;
+
+ typedef const sys::Mutex::ScopedLock& Lock;
+
+ QueueObservers(const std::string& q, sys::Mutex& lock) : Base(lock), qname(q) {}
+
+ template <class T> void each(void (QueueObserver::*f)(const T&), const T& arg, const char* fname, Lock l) {
+ Base::each(boost::bind(&QueueObservers::wrap<T>, this, f, boost::cref(arg), fname, _1), l);
+ }
+
+ template <class T> void wrap(void (QueueObserver::*f)(const T&), const T& arg, const char* fname, const ObserverPtr& o) {
+ try { (o.get()->*f)(arg); }
+ catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on " << fname << " for queue " << qname << ": " << e.what());
+ }
+ }
+
+
+ // Calls are locked externally by caller.
+ void enqueued(const Message& m, Lock l) { each(&QueueObserver::enqueued, m, "enqueue", l); }
+ void dequeued(const Message& m, Lock l) { each(&QueueObserver::dequeued, m, "dequeue", l); }
+ void acquired(const Message& m, Lock l) { each(&QueueObserver::acquired, m, "acquire", l); }
+ void requeued(const Message& m, Lock l) { each(&QueueObserver::requeued, m, "requeue", l); }
+ void consumerAdded(const Consumer& c, Lock l) { each(&QueueObserver::consumerAdded, c, "consumer added", l); }
+ void consumerRemoved(const Consumer& c, Lock l) { each(&QueueObserver::consumerRemoved, c, "consumer removed", l); }
+ void destroy(Lock l) {
+ Base::each(boost::bind(&QueueObserver::destroy, _1), l);
+ observers.clear();
+ }
+
+ std::string qname;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEOBSERVERS_H*/
diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
index afb9d9ff4e..41d20342c8 100644
--- a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
+++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
@@ -93,7 +93,7 @@ void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& a
boost::shared_ptr<QueueObserver> observer(
new ThresholdAlerts(queue.getName(), agent, ctu, ctd, stu, std, (_ctd == 0 && _std == 0))
);
- queue.addObserver(observer);
+ queue.getObservers().add(observer);
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index d2792e5e17..6ffd53ff21 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -55,8 +55,8 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
info.printId(os) << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
- queue.addObserver(observer);
- // Set first after calling addObserver so we know that the back of the
+ queue.getObservers().add(observer);
+ // Set first after adding the observer so we know that the back of the
// queue+1 is (or will be) a guarded position.
QueuePosition front, back;
q.getRange(front, back, broker::REPLICATOR);
@@ -86,7 +86,7 @@ void QueueGuard::dequeued(const Message& m) {
}
void QueueGuard::cancel() {
- queue.removeObserver(observer);
+ queue.getObservers().remove(observer);
Mutex::ScopedLock l(lock);
if (cancelled) return;
QPID_LOG(debug, logPrefix << "Cancelled");
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 50f2ececdb..b43658365c 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -175,7 +175,7 @@ void QueueReplicator::activate() {
boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
// Enable callback to destroy()
- queue->addObserver(
+ queue->getObservers().add(
boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
index 8711f9234c..6612c71f6a 100644
--- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h
+++ b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
@@ -45,18 +45,14 @@ class QueueSnapshots : public broker::BrokerObserver
public:
boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
boost::shared_ptr<QueueSnapshot> qs;
- q->eachObserver(
+ q->getObservers().each(
boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs)));
return qs;
}
// BrokerObserver overrides.
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
- q->addObserver(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- }
-
- void queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
- q->removeObserver(get(q));
+ q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
}
private:
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 95215e1e59..2db7845067 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -150,12 +150,12 @@ void ReplicatingSubscription::initialize() {
// However we must attach the observer _before_ we snapshot for
// initial dequeues to be sure we don't miss any dequeues
// between the snapshot and attaching the observer.
- queue->addObserver(
+ queue->getObservers().add(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
// There may be no snapshot if the queue is being deleted concurrently.
if (!snapshot) {
- queue->removeObserver(
+ queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
}
@@ -254,7 +254,7 @@ void ReplicatingSubscription::cancel()
}
QPID_LOG(debug, logPrefix << "Cancelled");
if (primary) primary->removeReplica(*this);
- getQueue()->removeObserver(
+ getQueue()->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
guard->cancel();
ConsumerImpl::cancel();