diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Observers.h | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 23 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/IdSetter.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 40 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueSnapshot.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueSnapshots.h | 70 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 53 |
13 files changed, 89 insertions, 158 deletions
diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h index b7b26a0d38..5357938c77 100644 --- a/qpid/cpp/src/qpid/broker/Observers.h +++ b/qpid/cpp/src/qpid/broker/Observers.h @@ -59,6 +59,14 @@ class Observers std::for_each(copy.begin(), copy.end(), f); } + template <class T> boost::shared_ptr<T> findType() const { + sys::Mutex::ScopedLock l(lock); + typename Set::const_iterator i = + std::find_if(observers.begin(), observers.end(), &isA<T>); + return i == observers.end() ? + boost::shared_ptr<T>() : boost::dynamic_pointer_cast<T>(*i); + } + protected: typedef std::set<ObserverPtr> Set; Observers() : lock(myLock) {} @@ -71,6 +79,10 @@ class Observers std::for_each(observers.begin(), observers.end(), f); } + template <class T> static bool isA(const ObserverPtr&o) { + return boost::dynamic_pointer_cast<T>(o); + } + mutable sys::Mutex myLock; mutable sys::Mutex& lock; Set observers; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 1587b5b33f..0737701431 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -892,7 +892,7 @@ void BrokerReplicator::disconnected() { // Make copy of exchanges so we can work outside the registry lock. ExchangeVector exs; - exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1)); + exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, boost::ref(exs), _1)); for_each(exs.begin(), exs.end(), boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1)); } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 61561b3af6..50e99ef527 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -22,17 +22,18 @@ #include "BackupConnectionExcluder.h" #include "ConnectionObserver.h" #include "HaBroker.h" +#include "IdSetter.h" #include "Primary.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Settings.h" #include "StandAlone.h" #include "QueueSnapshot.h" -#include "QueueSnapshots.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/assert.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SignalHandler.h" @@ -60,6 +61,20 @@ using sys::Mutex; using boost::shared_ptr; using boost::dynamic_pointer_cast; +// In a HaBroker we always need to add QueueSnapshot and IdSetter to each queue +// because we don't know in advance which queues might be used for stand-alone +// replication. +// +// TODO aconway 2013-12-13: Can we restrict this to queues identified as replicated? +// +class HaBroker::BrokerObserver : public broker::BrokerObserver { + public: + void queueCreate(const boost::shared_ptr<broker::Queue>& q) { + q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot)); + q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter)); + } +}; + // Called in Plugin::earlyInitialize HaBroker::HaBroker(broker::Broker& b, const Settings& s) : systemId(b.getSystem()->getSystemId().data()), @@ -69,8 +84,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) observer(new ConnectionObserver(*this, systemId)), role(new StandAlone), membership(BrokerInfo(systemId, STANDALONE), *this), - failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)), - queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots)) + failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)) { // If we are joining a cluster we must start excluding clients now, // otherwise there's a window for a client to connect before we get to @@ -82,8 +96,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) broker.getConnectionObservers().add(observer); broker.getExchanges().registerExchange(failoverExchange); } - // QueueSnapshots are needed for standalone replication as well as cluster. - broker.getBrokerObservers().add(queueSnapshots); + broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver())); } namespace { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index d10014846c..9fadd4f35c 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -54,8 +54,6 @@ class Backup; class ConnectionObserver; class Primary; class Role; -class QueueSnapshot; -class QueueSnapshots; class QueueReplicator; /** @@ -98,14 +96,14 @@ class HaBroker : public management::Manageable void setAddress(const Address&); // set self address from a self-connection - boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; } - boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName); /** Authenticated user ID for queue create/delete */ std::string getUserId() const { return userId; } private: + class BrokerObserver; + void setPublicUrl(const Url&); void setBrokerUrl(const Url&); void updateClientUrl(sys::Mutex::ScopedLock&); @@ -129,7 +127,6 @@ class HaBroker : public management::Manageable boost::shared_ptr<Role> role; Membership membership; boost::shared_ptr<FailoverExchange> failoverExchange; - boost::shared_ptr<QueueSnapshots> queueSnapshots; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h index dc9fa90af9..67da62ef48 100644 --- a/qpid/cpp/src/qpid/ha/IdSetter.h +++ b/qpid/cpp/src/qpid/ha/IdSetter.h @@ -43,15 +43,11 @@ namespace ha { class IdSetter : public broker::MessageInterceptor { public: - IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) { - QPID_LOG(trace, "Initial replication ID for " << name << " =" << nextId.get()); - } - + IdSetter(ReplicationId firstId=1) : nextId(firstId) {} void record(broker::Message& m) { m.setReplicationId(nextId++); } private: sys::AtomicValue<uint32_t> nextId; - std::string name; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0c0fe983bb..b437190004 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -104,8 +104,6 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : QueueReplicator::copy(hb.getBroker().getExchanges(), qrs); std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1)); - broker::QueueRegistry& queues = hb.getBroker().getQueues(); - queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1)); if (expect.empty()) { QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups."); } @@ -140,15 +138,6 @@ Primary::~Primary() { haBroker.getObserver()->reset(); } -void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) { - if (replicationTest.useLevel(*q) == ALL) { - boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName()); - ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1); - q->getMessageInterceptors().add( - boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId))); - } -} - void Primary::checkReady() { bool activate = false; { @@ -261,7 +250,6 @@ void Primary::queueCreate(const QueuePtr& q) { if (level) { QPID_LOG(debug, logPrefix << "Created queue " << q->getName() << " replication: " << printable(level)); - initializeQueue(q); // Give each queue a unique id. Used by backups to avoid confusion of // same-named queues. q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 7f98f06fec..e0a7065e2c 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -125,7 +125,6 @@ class Primary : public Role RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&); void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&); - void initializeQueue(boost::shared_ptr<broker::Queue>); void checkReady(); void checkReady(RemoteBackupPtr); void setCatchupQueues(const RemoteBackupPtr&, bool createGuards); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index b43658365c..eda3f96180 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -21,8 +21,9 @@ #include "Event.h" #include "HaBroker.h" +#include "IdSetter.h" #include "QueueReplicator.h" -#include "QueueSnapshots.h" +#include "QueueSnapshot.h" #include "ReplicatingSubscription.h" #include "Settings.h" #include "types.h" @@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroker& hb, settings(hb.getSettings()), nextId(0), maxId(0) { + // The QueueReplicator will take over setting replication IDs. + boost::shared_ptr<IdSetter> setter = + q->getMessageInterceptors().findType<IdSetter>(); + if (setter) q->getMessageInterceptors().remove(setter); + args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType()); arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize? arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); - arguments.setString(ReplicatingSubscription::QPID_ID_SET, - encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot())); + boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>(); + if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot())); + try { peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, @@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) { } // Called in connection thread of the queues bridge to primary. + void QueueReplicator::route(Deliverable& deliverable) { try { @@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { nextId = decodeStr<IdEvent>(data).id; } -ReplicationId QueueReplicator::getMaxId() { - Mutex::ScopedLock l(lock); - return maxId; -} - void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { // If the queue is destroyed at the same time we are subscribing, we may @@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { return false; } std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } void QueueReplicator::promoted() { - // Promoted to primary, deal with auto-delete now. - if (queue && queue->isAutoDelete() && subscribed) { - // Make a temporary shared_ptr to prevent premature deletion of queue. - // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue - // which could delete the queue while it's still running it's destroyed logic. - boost::shared_ptr<Queue> q(queue); - q->releaseFromUse(); - q->scheduleAutoDelete(); + if (queue) { + // On primary QueueReplicator no longer sets IDs, start an IdSetter. + queue->getMessageInterceptors().add( + boost::shared_ptr<IdSetter>(new IdSetter(maxId+1))); + // Process auto-deletes + if (queue->isAutoDelete() && subscribed) { + // Make a temporary shared_ptr to prevent premature deletion of queue. + // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue + // which could delete the queue while it's still running it's destroyed logic. + boost::shared_ptr<Queue> q(queue); + q->releaseFromUse(); + q->scheduleAutoDelete(); + } } } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 8938285fe3..a86355f194 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -85,8 +85,6 @@ class QueueReplicator : public broker::Exchange, boost::shared_ptr<broker::Queue> getQueue() const { return queue; } - ReplicationId getMaxId(); - // No-op unused Exchange virtual functions. bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshot.h b/qpid/cpp/src/qpid/ha/QueueSnapshot.h index 5b1054d934..577bd96ef7 100644 --- a/qpid/cpp/src/qpid/ha/QueueSnapshot.h +++ b/qpid/cpp/src/qpid/ha/QueueSnapshot.h @@ -53,7 +53,7 @@ class QueueSnapshot : public broker::QueueObserver void requeued(const broker::Message&) {} - ReplicationIdSet snapshot() { + ReplicationIdSet getSnapshot() { sys::Mutex::ScopedLock l(lock); return set; } diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h deleted file mode 100644 index 6612c71f6a..0000000000 --- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef QPID_HA_QUEUESNAPSHOTS_H -#define QPID_HA_QUEUESNAPSHOTS_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include "QueueSnapshot.h" -#include "hash.h" - -#include "qpid/assert.h" -#include "qpid/broker/BrokerObserver.h" -#include "qpid/broker/Queue.h" -#include "qpid/sys/Mutex.h" - -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace ha { - -/** - * BrokerObserver that maintains a map of the QueueSnapshot for each queue. - * THREAD SAFE. - */ -class QueueSnapshots : public broker::BrokerObserver -{ - public: - boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const { - boost::shared_ptr<QueueSnapshot> qs; - q->getObservers().each( - boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs))); - return qs; - } - - // BrokerObserver overrides. - void queueCreate(const boost::shared_ptr<broker::Queue>& q) { - q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot)); - } - - private: - static void saveQueueSnapshot( - const boost::shared_ptr<broker::QueueObserver>& observer, - boost::shared_ptr<QueueSnapshot>& out) - { - if (!out) out = boost::dynamic_pointer_cast<QueueSnapshot>(observer); - } -}; - - -}} // namespace qpid::ha - -#endif /*!QPID_HA_QUEUESNAPSHOTS_H*/ diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 2db7845067..635d5047bd 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -22,7 +22,7 @@ #include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" -#include "QueueSnapshots.h" +#include "QueueSnapshot.h" #include "ReplicatingSubscription.h" #include "TxReplicatingSubscription.h" #include "Primary.h" @@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize() { info.printId(os) << ": "; logPrefix = os.str(); - // If this is a non-cluster standalone replication then we need to - // set up an IdSetter if there is not already one. - boost::shared_ptr<IdSetter> idSetter; - queue->getMessageInterceptors().each( - boost::bind(©If, _1, boost::ref(idSetter))); - if (!idSetter) { - QPID_LOG(debug, logPrefix << "Standalone replication"); - queue->getMessageInterceptors().add( - boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1))); - } - // If there's already a guard (we are in failover) use it, else create one. if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); @@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize() { // between the snapshot and attaching the observer. queue->getObservers().add( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); - boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue); + boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>(); // There may be no snapshot if the queue is being deleted concurrently. if (!snapshot) { queue->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); } - ReplicationIdSet primaryIds = snapshot->snapshot(); + ReplicationIdSet primaryIds = snapshot->getSnapshot(); std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); ReplicationIdSet backupIds; if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 1a5d6ddff8..7db24810bf 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -272,33 +272,34 @@ class ReplicationTests(HaBrokerTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - primary = HaBroker(self, name="primary", ha_cluster=False, - args=["--ha-queue-replication=yes"]); - pc = primary.connect() - ps = pc.session().sender("q;{create:always}") - pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False, - args=["--ha-queue-replication=yes"]) - br = backup.connect().session().receiver("q;{create:always}") + primary = HaBroker(self, name="primary", ha_cluster=False, + args=["--ha-queue-replication=yes"]); + pc = primary.connect() + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False, + args=["--ha-queue-replication=yes"]) + bs = backup.connect().session() + br = bs.receiver("q;{create:always}") + + def srange(*args): return [str(i) for i in xrange(*args)] + + for m in srange(3): ps.send(m) + # Set up replication with qpid-ha + backup.replicate(primary.host_port(), "q") + backup.assert_browse_backup("q", srange(3)) + for m in srange(3,6): ps.send(str(m)) + backup.assert_browse_backup("q", srange(6)) + self.assertEqual("0", pr.fetch().content) + pr.session.acknowledge() + backup.assert_browse_backup("q", srange(1,6)) + + # Set up replication with qpid-config + ps2 = pc.session().sender("q2;{create:always}") + backup.config_replicate(primary.host_port(), "q2"); + ps2.send("x", timeout=1) + backup.assert_browse_backup("q2", ["x"]) - # Set up replication with qpid-ha - backup.replicate(primary.host_port(), "q") - ps.send("a", timeout=1) - backup.assert_browse_backup("q", ["a"]) - ps.send("b", timeout=1) - backup.assert_browse_backup("q", ["a", "b"]) - self.assertEqual("a", pr.fetch().content) - pr.session.acknowledge() - backup.assert_browse_backup("q", ["b"]) - - # Set up replication with qpid-config - ps2 = pc.session().sender("q2;{create:always}") - backup.config_replicate(primary.host_port(), "q2"); - ps2.send("x", timeout=1) - backup.assert_browse_backup("q2", ["x"]) - finally: l.restore() def test_standalone_queue_replica_failover(self): """Test individual queue replication from a cluster to a standalone |
