summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Observers.h12
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h7
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h6
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp40
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshot.h2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshots.h70
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py53
13 files changed, 89 insertions, 158 deletions
diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h
index b7b26a0d38..5357938c77 100644
--- a/qpid/cpp/src/qpid/broker/Observers.h
+++ b/qpid/cpp/src/qpid/broker/Observers.h
@@ -59,6 +59,14 @@ class Observers
std::for_each(copy.begin(), copy.end(), f);
}
+ template <class T> boost::shared_ptr<T> findType() const {
+ sys::Mutex::ScopedLock l(lock);
+ typename Set::const_iterator i =
+ std::find_if(observers.begin(), observers.end(), &isA<T>);
+ return i == observers.end() ?
+ boost::shared_ptr<T>() : boost::dynamic_pointer_cast<T>(*i);
+ }
+
protected:
typedef std::set<ObserverPtr> Set;
Observers() : lock(myLock) {}
@@ -71,6 +79,10 @@ class Observers
std::for_each(observers.begin(), observers.end(), f);
}
+ template <class T> static bool isA(const ObserverPtr&o) {
+ return boost::dynamic_pointer_cast<T>(o);
+ }
+
mutable sys::Mutex myLock;
mutable sys::Mutex& lock;
Set observers;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1587b5b33f..0737701431 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -892,7 +892,7 @@ void BrokerReplicator::disconnected() {
// Make copy of exchanges so we can work outside the registry lock.
ExchangeVector exs;
- exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
+ exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, boost::ref(exs), _1));
for_each(exs.begin(), exs.end(),
boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 61561b3af6..50e99ef527 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -22,17 +22,18 @@
#include "BackupConnectionExcluder.h"
#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "Primary.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "StandAlone.h"
#include "QueueSnapshot.h"
-#include "QueueSnapshots.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/assert.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SignalHandler.h"
@@ -60,6 +61,20 @@ using sys::Mutex;
using boost::shared_ptr;
using boost::dynamic_pointer_cast;
+// In a HaBroker we always need to add QueueSnapshot and IdSetter to each queue
+// because we don't know in advance which queues might be used for stand-alone
+// replication.
+//
+// TODO aconway 2013-12-13: Can we restrict this to queues identified as replicated?
+//
+class HaBroker::BrokerObserver : public broker::BrokerObserver {
+ public:
+ void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+ q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
+ q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter));
+ }
+};
+
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
@@ -69,8 +84,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
membership(BrokerInfo(systemId, STANDALONE), *this),
- failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)),
- queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots))
+ failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
@@ -82,8 +96,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
- // QueueSnapshots are needed for standalone replication as well as cluster.
- broker.getBrokerObservers().add(queueSnapshots);
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index d10014846c..9fadd4f35c 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -54,8 +54,6 @@ class Backup;
class ConnectionObserver;
class Primary;
class Role;
-class QueueSnapshot;
-class QueueSnapshots;
class QueueReplicator;
/**
@@ -98,14 +96,14 @@ class HaBroker : public management::Manageable
void setAddress(const Address&); // set self address from a self-connection
- boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; }
-
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
/** Authenticated user ID for queue create/delete */
std::string getUserId() const { return userId; }
private:
+ class BrokerObserver;
+
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -129,7 +127,6 @@ class HaBroker : public management::Manageable
boost::shared_ptr<Role> role;
Membership membership;
boost::shared_ptr<FailoverExchange> failoverExchange;
- boost::shared_ptr<QueueSnapshots> queueSnapshots;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
index dc9fa90af9..67da62ef48 100644
--- a/qpid/cpp/src/qpid/ha/IdSetter.h
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -43,15 +43,11 @@ namespace ha {
class IdSetter : public broker::MessageInterceptor
{
public:
- IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
- QPID_LOG(trace, "Initial replication ID for " << name << " =" << nextId.get());
- }
-
+ IdSetter(ReplicationId firstId=1) : nextId(firstId) {}
void record(broker::Message& m) { m.setReplicationId(nextId++); }
private:
sys::AtomicValue<uint32_t> nextId;
- std::string name;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 0c0fe983bb..b437190004 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -104,8 +104,6 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
- broker::QueueRegistry& queues = hb.getBroker().getQueues();
- queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
if (expect.empty()) {
QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
}
@@ -140,15 +138,6 @@ Primary::~Primary() {
haBroker.getObserver()->reset();
}
-void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) {
- if (replicationTest.useLevel(*q) == ALL) {
- boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
- ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1);
- q->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
- }
-}
-
void Primary::checkReady() {
bool activate = false;
{
@@ -261,7 +250,6 @@ void Primary::queueCreate(const QueuePtr& q) {
if (level) {
QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
<< " replication: " << printable(level));
- initializeQueue(q);
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 7f98f06fec..e0a7065e2c 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -125,7 +125,6 @@ class Primary : public Role
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
- void initializeQueue(boost::shared_ptr<broker::Queue>);
void checkReady();
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index b43658365c..eda3f96180 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,8 +21,9 @@
#include "Event.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "QueueReplicator.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "types.h"
@@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
settings(hb.getSettings()),
nextId(0), maxId(0)
{
+ // The QueueReplicator will take over setting replication IDs.
+ boost::shared_ptr<IdSetter> setter =
+ q->getMessageInterceptors().findType<IdSetter>();
+ if (setter) q->getMessageInterceptors().remove(setter);
+
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
- arguments.setString(ReplicatingSubscription::QPID_ID_SET,
- encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
+ boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
+ if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot()));
+
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
}
// Called in connection thread of the queues bridge to primary.
+
void QueueReplicator::route(Deliverable& deliverable)
{
try {
@@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-ReplicationId QueueReplicator::getMaxId() {
- Mutex::ScopedLock l(lock);
- return maxId;
-}
-
void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
@@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
void QueueReplicator::promoted() {
- // Promoted to primary, deal with auto-delete now.
- if (queue && queue->isAutoDelete() && subscribed) {
- // Make a temporary shared_ptr to prevent premature deletion of queue.
- // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
- // which could delete the queue while it's still running it's destroyed logic.
- boost::shared_ptr<Queue> q(queue);
- q->releaseFromUse();
- q->scheduleAutoDelete();
+ if (queue) {
+ // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+ // Process auto-deletes
+ if (queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it's still running it's destroyed logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 8938285fe3..a86355f194 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -85,8 +85,6 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
- ReplicationId getMaxId();
-
// No-op unused Exchange virtual functions.
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshot.h b/qpid/cpp/src/qpid/ha/QueueSnapshot.h
index 5b1054d934..577bd96ef7 100644
--- a/qpid/cpp/src/qpid/ha/QueueSnapshot.h
+++ b/qpid/cpp/src/qpid/ha/QueueSnapshot.h
@@ -53,7 +53,7 @@ class QueueSnapshot : public broker::QueueObserver
void requeued(const broker::Message&) {}
- ReplicationIdSet snapshot() {
+ ReplicationIdSet getSnapshot() {
sys::Mutex::ScopedLock l(lock);
return set;
}
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
deleted file mode 100644
index 6612c71f6a..0000000000
--- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef QPID_HA_QUEUESNAPSHOTS_H
-#define QPID_HA_QUEUESNAPSHOTS_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-#include "QueueSnapshot.h"
-#include "hash.h"
-
-#include "qpid/assert.h"
-#include "qpid/broker/BrokerObserver.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/sys/Mutex.h"
-
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace ha {
-
-/**
- * BrokerObserver that maintains a map of the QueueSnapshot for each queue.
- * THREAD SAFE.
- */
-class QueueSnapshots : public broker::BrokerObserver
-{
- public:
- boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const {
- boost::shared_ptr<QueueSnapshot> qs;
- q->getObservers().each(
- boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs)));
- return qs;
- }
-
- // BrokerObserver overrides.
- void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
- q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- }
-
- private:
- static void saveQueueSnapshot(
- const boost::shared_ptr<broker::QueueObserver>& observer,
- boost::shared_ptr<QueueSnapshot>& out)
- {
- if (!out) out = boost::dynamic_pointer_cast<QueueSnapshot>(observer);
- }
-};
-
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_QUEUESNAPSHOTS_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 2db7845067..635d5047bd 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -22,7 +22,7 @@
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "TxReplicatingSubscription.h"
#include "Primary.h"
@@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize() {
info.printId(os) << ": ";
logPrefix = os.str();
- // If this is a non-cluster standalone replication then we need to
- // set up an IdSetter if there is not already one.
- boost::shared_ptr<IdSetter> idSetter;
- queue->getMessageInterceptors().each(
- boost::bind(&copyIf, _1, boost::ref(idSetter)));
- if (!idSetter) {
- QPID_LOG(debug, logPrefix << "Standalone replication");
- queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
- }
-
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize() {
// between the snapshot and attaching the observer.
queue->getObservers().add(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+ boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
// There may be no snapshot if the queue is being deleted concurrently.
if (!snapshot) {
queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
}
- ReplicationIdSet primaryIds = snapshot->snapshot();
+ ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 1a5d6ddff8..7db24810bf 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -272,33 +272,34 @@ class ReplicationTests(HaBrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- primary = HaBroker(self, name="primary", ha_cluster=False,
- args=["--ha-queue-replication=yes"]);
- pc = primary.connect()
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False,
- args=["--ha-queue-replication=yes"])
- br = backup.connect().session().receiver("q;{create:always}")
+ primary = HaBroker(self, name="primary", ha_cluster=False,
+ args=["--ha-queue-replication=yes"]);
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
+ bs = backup.connect().session()
+ br = bs.receiver("q;{create:always}")
+
+ def srange(*args): return [str(i) for i in xrange(*args)]
+
+ for m in srange(3): ps.send(m)
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ backup.assert_browse_backup("q", srange(3))
+ for m in srange(3,6): ps.send(str(m))
+ backup.assert_browse_backup("q", srange(6))
+ self.assertEqual("0", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", srange(1,6))
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x", timeout=1)
+ backup.assert_browse_backup("q2", ["x"])
- # Set up replication with qpid-ha
- backup.replicate(primary.host_port(), "q")
- ps.send("a", timeout=1)
- backup.assert_browse_backup("q", ["a"])
- ps.send("b", timeout=1)
- backup.assert_browse_backup("q", ["a", "b"])
- self.assertEqual("a", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", ["b"])
-
- # Set up replication with qpid-config
- ps2 = pc.session().sender("q2;{create:always}")
- backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x", timeout=1)
- backup.assert_browse_backup("q2", ["x"])
- finally: l.restore()
def test_standalone_queue_replica_failover(self):
"""Test individual queue replication from a cluster to a standalone