summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt4
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/ha/FailoverExchange.cpp129
-rw-r--r--qpid/cpp/src/qpid/ha/FailoverExchange.h71
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp44
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h3
-rw-r--r--qpid/cpp/src/qpid/ha/makeMessage.cpp57
-rw-r--r--qpid/cpp/src/qpid/ha/makeMessage.h43
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py14
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py23
12 files changed, 358 insertions, 42 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 66c9e6284c..8ceeb824c4 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -642,9 +642,13 @@ if (BUILD_HA)
qpid/ha/BrokerReplicator.h
qpid/ha/ConnectionObserver.cpp
qpid/ha/ConnectionObserver.h
+ qpid/ha/FailoverExchange.cpp
+ qpid/ha/FailoverExchange.h
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
+ qpid/ha/makeMessage.cpp
+ qpid/ha/makeMessage.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 85c41355df..327003fc4d 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -36,6 +36,8 @@ ha_la_SOURCES = \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
+ qpid/ha/makeMessage.cpp \
+ qpid/ha/makeMessage.h \
qpid/ha/Membership.cpp \
qpid/ha/Membership.h \
qpid/ha/Primary.cpp \
diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
new file mode 100644
index 0000000000..556c7458b6
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverExchange.h"
+#include "makeMessage.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Array.h"
+#include "qpid/RefCounted.h"
+#include "qpid/UrlArray.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+using namespace broker;
+using namespace framing;
+using broker::amqp_0_10::MessageTransfer;
+
+const string FailoverExchange::typeName("amq.failover");
+
+namespace {
+struct OstreamUrls {
+ OstreamUrls(const FailoverExchange::Urls& u) : urls(u) {}
+ FailoverExchange::Urls urls;
+};
+
+ostream& operator<<(ostream& o, const OstreamUrls& urls) {
+ ostream_iterator<qpid::Url> out(o, " ");
+ copy(urls.urls.begin(), urls.urls.end(), out);
+ return o;
+}
+}
+
+FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b)
+ : Exchange(typeName, &parent, &b)
+{
+ QPID_LOG(debug, typeName << " created.");
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type(typeName);
+}
+
+void FailoverExchange::setUrls(const vector<Url>& u) {
+ QPID_LOG(debug, typeName << " URLs set to " << OstreamUrls(u));
+ Lock l(lock);
+ urls = u;
+}
+
+void FailoverExchange::updateUrls(const vector<Url>& u) {
+ QPID_LOG(debug, typeName << " Updating URLs " << OstreamUrls(u) << " to "
+ << queues.size() << " subscribers.");
+ Lock l(lock);
+ urls=u;
+ if (!urls.empty() && !queues.empty()) {
+ for (Queues::const_iterator i = queues.begin(); i != queues.end(); ++i)
+ sendUpdate(*i, l);
+ }
+}
+
+string FailoverExchange::getType() const { return typeName; }
+
+bool FailoverExchange::bind(Queue::shared_ptr queue, const string&,
+ const framing::FieldTable*) {
+ QPID_LOG(debug, typeName << " binding " << queue->getName());
+ Lock l(lock);
+ sendUpdate(queue, l);
+ return queues.insert(queue).second;
+}
+
+bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&,
+ const framing::FieldTable*) {
+ QPID_LOG(debug, typeName << " un-binding " << queue->getName());
+ Lock l(lock);
+ return queues.erase(queue);
+}
+
+bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const,
+ const framing::FieldTable*) {
+ Lock l(lock);
+ return queues.find(queue) != queues.end();
+}
+
+void FailoverExchange::route(Deliverable&) {
+ QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
+}
+
+void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) {
+ QPID_LOG(debug, typeName << " sending " << OstreamUrls(urls) << " to " << queue->getName());
+ if (urls.empty()) return;
+ framing::Array array = vectorToUrlArray(urls);
+ const ProtocolVersion v;
+ broker::Message message(makeMessage(Buffer(), typeName));
+ MessageTransfer& transfer = MessageTransfer::get(message);
+ MessageProperties* props =
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true);
+ props->setContentLength(0);
+ props->getApplicationHeaders().setArray(typeName, array);
+ DeliverableMessage(message, 0).deliverTo(queue);
+}
+
+}} // namespace ha
diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.h b/qpid/cpp/src/qpid/ha/FailoverExchange.h
new file mode 100644
index 0000000000..6ec1d0f152
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/FailoverExchange.h
@@ -0,0 +1,71 @@
+#ifndef QPID_HA_FAILOVEREXCHANGE_H
+#define QPID_HA_FAILOVEREXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *ls
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/Url.h"
+
+#include <vector>
+#include <set>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Failover exchange provides failover host list, as specified in AMQP 0-10.
+ */
+class FailoverExchange : public broker::Exchange
+{
+ public:
+ typedef std::vector<Url> Urls;
+
+ static const std::string typeName;
+
+ FailoverExchange(management::Manageable& parent, broker::Broker& b);
+
+ /** Set the URLs but don't send an update.*/
+ void setUrls(const Urls&);
+ /** Set the URLs and send an update.*/
+ void updateUrls(const Urls&);
+
+ // Exchange overrides
+ std::string getType() const;
+ bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
+ void route(broker::Deliverable& msg);
+
+ private:
+ void sendUpdate(const boost::shared_ptr<broker::Queue>&, sys::Mutex::ScopedLock&);
+
+ typedef sys::Mutex::ScopedLock Lock;
+ typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
+
+ sys::Mutex lock;
+ Urls urls;
+ Queues queues;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_FAILOVEREXCHANGE_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 52a47380dd..3f29949aca 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -64,7 +64,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
- membership(BrokerInfo(systemId, STANDALONE), *this)
+ membership(BrokerInfo(systemId, STANDALONE), *this),
+ failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
@@ -74,6 +75,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
+ broker.getExchanges().registerExchange(failoverExchange);
}
}
@@ -171,7 +173,9 @@ void HaBroker::setPublicUrl(const Url& url) {
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
+ vector<Url> urls(1, url);
+ failoverExchange->updateUrls(urls);
+ QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 1b3666362a..a214d2acd3 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -27,6 +27,7 @@
#include "types.h"
#include "Settings.h"
#include "qpid/Url.h"
+#include "FailoverExchange.h"
#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qpid/management/Manageable.h"
@@ -115,6 +116,7 @@ class HaBroker : public management::Manageable
boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
boost::shared_ptr<Role> role;
Membership membership;
+ boost::shared_ptr<FailoverExchange> failoverExchange;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 933716e8fa..a9bd7b49f8 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "makeMessage.h"
#include "QueueGuard.h"
#include "QueueRange.h"
#include "QueueReplicator.h"
@@ -41,6 +42,7 @@ using namespace framing;
using namespace broker;
using namespace std;
using sys::Mutex;
+using broker::amqp_0_10::MessageTransfer;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
@@ -289,7 +291,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
@@ -300,7 +302,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
buffer.reset();
{
Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
}
@@ -328,7 +330,7 @@ void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l)
{
if (pos == backupPosition) return; // No need to send.
QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
@@ -338,39 +340,25 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::Scope
buffer.reset();
{
Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
}
}
-void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
+void ReplicatingSubscription::sendEvent(const std::string& key,
+ const framing::Buffer& buffer,
+ Mutex::ScopedLock&)
{
- //generate event message
- boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody()));
- content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- event->getFrames().append(method);
- event->getFrames().append(header);
- event->getFrames().append(content);
-
+ broker::Message message = makeMessage(buffer);
+ MessageTransfer& transfer = MessageTransfer::get(message);
DeliveryProperties* props =
- event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
- // Send the event directly to the base consumer implementation.
- //dummy consumer prevents acknowledgements being handled, which is what we want for events
- ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
+ // Send the event directly to the base consumer implementation. The dummy
+ // consumer prevents acknowledgements being handled, which is what we want
+ // for events
+ ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
}
-
// Called in subscription's connection thread.
bool ReplicatingSubscription::doDispatch()
{
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 2780f4fd00..05584a2e37 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -132,7 +132,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
void sendDequeueEvent(sys::Mutex::ScopedLock&);
void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
void setReady();
- void sendEvent(const std::string& key, framing::Buffer&);
+ void sendEvent(const std::string& key, const framing::Buffer&,
+ sys::Mutex::ScopedLock&);
friend struct Factory;
};
diff --git a/qpid/cpp/src/qpid/ha/makeMessage.cpp b/qpid/cpp/src/qpid/ha/makeMessage.cpp
new file mode 100644
index 0000000000..ca0e48f13d
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/makeMessage.cpp
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "makeMessage.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+namespace qpid {
+namespace ha {
+
+broker::Message makeMessage(const framing::Buffer& buffer,
+ const std::string& destination)
+{
+ using namespace framing;
+ using broker::amqp_0_10::MessageTransfer;
+
+ boost::intrusive_ptr<MessageTransfer> transfer(
+ new qpid::broker::amqp_0_10::MessageTransfer());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ // AMQContentBody::decode is missing a const declaration, so cast it here.
+ content.castBody<AMQContentBody>()->decode(
+ const_cast<Buffer&>(buffer), buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
+ transfer->getFrames().append(content);
+ return broker::Message(transfer, 0);
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/makeMessage.h b/qpid/cpp/src/qpid/ha/makeMessage.h
new file mode 100644
index 0000000000..283b415791
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/makeMessage.h
@@ -0,0 +1,43 @@
+#ifndef QPID_HA_MAKEMESSAGE_H
+#define QPID_HA_MAKEMESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace framing {
+class Buffer;
+}
+namespace ha {
+
+/**
+ * Create internal messages used by HA components.
+ */
+broker::Message makeMessage(
+ const framing::Buffer& content,
+ const std::string& destination=std::string()
+);
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_MAKEMESSAGE_H*/
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 75393fbdf1..7b0d88a27c 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -151,9 +151,8 @@ acl allow all all
def promote(self): self.ready(); self.qpid_ha(["promote"])
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
- def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+ def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
def agent(self):
if not self._agent:
cred = self.client_credentials
@@ -191,7 +190,7 @@ acl allow all all
agent = self.agent()
assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
- # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ # TODO aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
assert subprocess.call(
[self.qpid_config_path, "--broker", self.host_port()]+args,
@@ -299,7 +298,7 @@ class HaCluster(object):
"""Start a new broker in the cluster"""
i = len(self)
assert i <= len(self._ports)
- if i == len(self._ports):
+ if i == len(self._ports): # Adding new broker after cluster init
self._ports.append(HaPort(self.test))
self._set_url()
self._update_urls()
@@ -311,10 +310,9 @@ class HaCluster(object):
self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)
def _update_urls(self):
- if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self:
- b.set_brokers_url(self.url)
- b.set_public_url(self.url)
+ for b in self:
+ b.set_brokers_url(self.url)
+ b.set_public_url(self.url)
def connect(self, i):
"""Connect with reconnect_urls"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index b6642e4508..3836381ed2 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -549,7 +549,7 @@ class ReplicationTests(HaBrokerTest):
def test_auth(self):
"""Verify that authentication does not interfere with replication."""
- # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+ # TODO aconway 2012-07-09: generate test sasl config portably for cmake
sasl_config=os.path.join(self.rootdir, "sasl_config")
if not os.path.exists(sasl_config):
print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
@@ -1183,6 +1183,24 @@ class ConfigurationTests(HaBrokerTest):
b = start("none", "none")
check(b, "", "")
+ def test_failover_exchange(self):
+ """Verify that the failover exchange correctly reports cluster membership"""
+
+ def strip_url(url): return re.sub('amqp:|tcp:', '', url)
+
+ def assert_url(m, url):
+ urls = m.properties['amq.failover']
+ self.assertEqual(1, len(urls))
+ self.assertEqual(strip_url(urls[0]), url)
+
+ cluster = HaCluster(self, 1, args=["--ha-public-url=foo:1234"])
+ r = cluster[0].connect().session().receiver("amq.failover")
+ assert_url(r.fetch(1), "foo:1234")
+ cluster[0].set_public_url("bar:1234")
+ assert_url(r.fetch(1), "bar:1234")
+ cluster[0].set_brokers_url(cluster.url+",xxx:1234")
+ self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
+
class StoreTests(BrokerTest):
"""Test for HA with persistence."""
@@ -1203,8 +1221,7 @@ class StoreTests(BrokerTest):
r = cluster[0].connect().session().receiver("qq")
self.assertEqual(r.fetch().content, "foo")
r.session.acknowledge()
- # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
- # the dequeue operation on qq.
+ # Sending this message is a hack to flush the dequeue operation on qq.
s.send(Message("flush", durable=True))
def verify(broker, x_count):