diff options
| author | Alan Conway <aconway@apache.org> | 2013-05-22 23:52:17 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-05-22 23:52:17 +0000 |
| commit | 4e57cc3019fbb8cdf27f253d9c1fe1c4321f189b (patch) | |
| tree | 21e369badfe1caad8b56e8c266ae4f7d28660e93 /qpid/cpp | |
| parent | 197e06a18ac0da4e34c29bf13f0290a1137f21df (diff) | |
| download | qpid-python-4e57cc3019fbb8cdf27f253d9c1fe1c4321f189b.tar.gz | |
QPID-4866: HA support for failover exchange
Add support for the "amq.failover" exchange with new HA, to support migration of
clients that used this facility with the old cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1485511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/ha.mk | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/FailoverExchange.cpp | 129 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/FailoverExchange.h | 71 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 44 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/makeMessage.cpp | 57 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/makeMessage.h | 43 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 14 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 23 |
12 files changed, 358 insertions, 42 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 66c9e6284c..8ceeb824c4 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -642,9 +642,13 @@ if (BUILD_HA) qpid/ha/BrokerReplicator.h qpid/ha/ConnectionObserver.cpp qpid/ha/ConnectionObserver.h + qpid/ha/FailoverExchange.cpp + qpid/ha/FailoverExchange.h qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/HaPlugin.cpp + qpid/ha/makeMessage.cpp + qpid/ha/makeMessage.h qpid/ha/Membership.cpp qpid/ha/Membership.h qpid/ha/Primary.cpp diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 85c41355df..327003fc4d 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -36,6 +36,8 @@ ha_la_SOURCES = \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ + qpid/ha/makeMessage.cpp \ + qpid/ha/makeMessage.h \ qpid/ha/Membership.cpp \ qpid/ha/Membership.h \ qpid/ha/Primary.cpp \ diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp new file mode 100644 index 0000000000..556c7458b6 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FailoverExchange.h" +#include "makeMessage.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Array.h" +#include "qpid/RefCounted.h" +#include "qpid/UrlArray.h" +#include <boost/bind.hpp> +#include <algorithm> + + +namespace qpid { +namespace ha { + +using namespace std; + +using namespace broker; +using namespace framing; +using broker::amqp_0_10::MessageTransfer; + +const string FailoverExchange::typeName("amq.failover"); + +namespace { +struct OstreamUrls { + OstreamUrls(const FailoverExchange::Urls& u) : urls(u) {} + FailoverExchange::Urls urls; +}; + +ostream& operator<<(ostream& o, const OstreamUrls& urls) { + ostream_iterator<qpid::Url> out(o, " "); + copy(urls.urls.begin(), urls.urls.end(), out); + return o; +} +} + +FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b) + : Exchange(typeName, &parent, &b) +{ + QPID_LOG(debug, typeName << " created."); + if (mgmtExchange != 0) + mgmtExchange->set_type(typeName); +} + +void FailoverExchange::setUrls(const vector<Url>& u) { + QPID_LOG(debug, typeName << " URLs set to " << OstreamUrls(u)); + Lock l(lock); + urls = u; +} + +void FailoverExchange::updateUrls(const vector<Url>& u) { + QPID_LOG(debug, typeName << " Updating URLs " << OstreamUrls(u) << " to " + << queues.size() << " subscribers."); + Lock l(lock); + urls=u; + if (!urls.empty() && !queues.empty()) { + for (Queues::const_iterator i = queues.begin(); i != queues.end(); ++i) + sendUpdate(*i, l); + } +} + +string FailoverExchange::getType() const { return typeName; } + +bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, + const framing::FieldTable*) { + QPID_LOG(debug, typeName << " binding " << queue->getName()); + Lock l(lock); + sendUpdate(queue, l); + return queues.insert(queue).second; +} + +bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, + const framing::FieldTable*) { + QPID_LOG(debug, typeName << " un-binding " << queue->getName()); + Lock l(lock); + return queues.erase(queue); +} + +bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, + const framing::FieldTable*) { + Lock l(lock); + return queues.find(queue) != queues.end(); +} + +void FailoverExchange::route(Deliverable&) { + QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); +} + +void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) { + QPID_LOG(debug, typeName << " sending " << OstreamUrls(urls) << " to " << queue->getName()); + if (urls.empty()) return; + framing::Array array = vectorToUrlArray(urls); + const ProtocolVersion v; + broker::Message message(makeMessage(Buffer(), typeName)); + MessageTransfer& transfer = MessageTransfer::get(message); + MessageProperties* props = + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(true); + props->setContentLength(0); + props->getApplicationHeaders().setArray(typeName, array); + DeliverableMessage(message, 0).deliverTo(queue); +} + +}} // namespace ha diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.h b/qpid/cpp/src/qpid/ha/FailoverExchange.h new file mode 100644 index 0000000000..6ec1d0f152 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/FailoverExchange.h @@ -0,0 +1,71 @@ +#ifndef QPID_HA_FAILOVEREXCHANGE_H +#define QPID_HA_FAILOVEREXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *ls + */ + +#include "qpid/broker/Exchange.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/Url.h" + +#include <vector> +#include <set> + +namespace qpid { +namespace ha { + +/** + * Failover exchange provides failover host list, as specified in AMQP 0-10. + */ +class FailoverExchange : public broker::Exchange +{ + public: + typedef std::vector<Url> Urls; + + static const std::string typeName; + + FailoverExchange(management::Manageable& parent, broker::Broker& b); + + /** Set the URLs but don't send an update.*/ + void setUrls(const Urls&); + /** Set the URLs and send an update.*/ + void updateUrls(const Urls&); + + // Exchange overrides + std::string getType() const; + bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); + bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); + bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); + void route(broker::Deliverable& msg); + + private: + void sendUpdate(const boost::shared_ptr<broker::Queue>&, sys::Mutex::ScopedLock&); + + typedef sys::Mutex::ScopedLock Lock; + typedef std::set<boost::shared_ptr<broker::Queue> > Queues; + + sys::Mutex lock; + Urls urls; + Queues queues; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_FAILOVEREXCHANGE_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 52a47380dd..3f29949aca 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -64,7 +64,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) broker(b), observer(new ConnectionObserver(*this, systemId)), role(new StandAlone), - membership(BrokerInfo(systemId, STANDALONE), *this) + membership(BrokerInfo(systemId, STANDALONE), *this), + failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)) { // If we are joining a cluster we must start excluding clients now, // otherwise there's a window for a client to connect before we get to @@ -74,6 +75,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder); observer->setObserver(excluder, "Backup: "); broker.getConnectionObservers().add(observer); + broker.getExchanges().registerExchange(failoverExchange); } } @@ -171,7 +173,9 @@ void HaBroker::setPublicUrl(const Url& url) { mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url); + vector<Url> urls(1, url); + failoverExchange->updateUrls(urls); + QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 1b3666362a..a214d2acd3 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -27,6 +27,7 @@ #include "types.h" #include "Settings.h" #include "qpid/Url.h" +#include "FailoverExchange.h" #include "qpid/sys/Mutex.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qpid/management/Manageable.h" @@ -115,6 +116,7 @@ class HaBroker : public management::Manageable boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary boost::shared_ptr<Role> role; Membership membership; + boost::shared_ptr<FailoverExchange> failoverExchange; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 933716e8fa..a9bd7b49f8 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -19,6 +19,7 @@ * */ +#include "makeMessage.h" #include "QueueGuard.h" #include "QueueRange.h" #include "QueueReplicator.h" @@ -41,6 +42,7 @@ using namespace framing; using namespace broker; using namespace std; using sys::Mutex; +using broker::amqp_0_10::MessageTransfer; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); @@ -289,7 +291,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) +void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); @@ -300,7 +302,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) buffer.reset(); { Mutex::ScopedUnlock u(lock); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } } @@ -328,7 +330,7 @@ void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l) { if (pos == backupPosition) return; // No need to send. QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition); @@ -338,39 +340,25 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::Scope buffer.reset(); { Mutex::ScopedUnlock u(lock); - sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); } } -void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer) +void ReplicatingSubscription::sendEvent(const std::string& key, + const framing::Buffer& buffer, + Mutex::ScopedLock&) { - //generate event message - boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - event->getFrames().append(method); - event->getFrames().append(header); - event->getFrames().append(content); - + broker::Message message = makeMessage(buffer); + MessageTransfer& transfer = MessageTransfer::get(message); DeliveryProperties* props = - event->getFrames().getHeaders()->get<DeliveryProperties>(true); + transfer.getFrames().getHeaders()->get<DeliveryProperties>(true); props->setRoutingKey(key); - // Send the event directly to the base consumer implementation. - //dummy consumer prevents acknowledgements being handled, which is what we want for events - ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>()); + // Send the event directly to the base consumer implementation. The dummy + // consumer prevents acknowledgements being handled, which is what we want + // for events + ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>()); } - // Called in subscription's connection thread. bool ReplicatingSubscription::doDispatch() { diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 2780f4fd00..05584a2e37 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -132,7 +132,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl void sendDequeueEvent(sys::Mutex::ScopedLock&); void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&); void setReady(); - void sendEvent(const std::string& key, framing::Buffer&); + void sendEvent(const std::string& key, const framing::Buffer&, + sys::Mutex::ScopedLock&); friend struct Factory; }; diff --git a/qpid/cpp/src/qpid/ha/makeMessage.cpp b/qpid/cpp/src/qpid/ha/makeMessage.cpp new file mode 100644 index 0000000000..ca0e48f13d --- /dev/null +++ b/qpid/cpp/src/qpid/ha/makeMessage.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "makeMessage.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" + +namespace qpid { +namespace ha { + +broker::Message makeMessage(const framing::Buffer& buffer, + const std::string& destination) +{ + using namespace framing; + using broker::amqp_0_10::MessageTransfer; + + boost::intrusive_ptr<MessageTransfer> transfer( + new qpid::broker::amqp_0_10::MessageTransfer()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), destination, 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + // AMQContentBody::decode is missing a const declaration, so cast it here. + content.castBody<AMQContentBody>()->decode( + const_cast<Buffer&>(buffer), buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + transfer->getFrames().append(method); + transfer->getFrames().append(header); + transfer->getFrames().append(content); + return broker::Message(transfer, 0); +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/makeMessage.h b/qpid/cpp/src/qpid/ha/makeMessage.h new file mode 100644 index 0000000000..283b415791 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/makeMessage.h @@ -0,0 +1,43 @@ +#ifndef QPID_HA_MAKEMESSAGE_H +#define QPID_HA_MAKEMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Message.h" + +namespace qpid { +namespace framing { +class Buffer; +} +namespace ha { + +/** + * Create internal messages used by HA components. + */ +broker::Message makeMessage( + const framing::Buffer& content, + const std::string& destination=std::string() +); + +}} // namespace qpid::ha + +#endif /*!QPID_HA_MAKEMESSAGE_H*/ diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 75393fbdf1..7b0d88a27c 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -151,9 +151,8 @@ acl allow all all def promote(self): self.ready(); self.qpid_ha(["promote"]) def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url]) - def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) + def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]); def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - def agent(self): if not self._agent: cred = self.client_credentials @@ -191,7 +190,7 @@ acl allow all all agent = self.agent() assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout) - # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + # TODO aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): assert subprocess.call( [self.qpid_config_path, "--broker", self.host_port()]+args, @@ -299,7 +298,7 @@ class HaCluster(object): """Start a new broker in the cluster""" i = len(self) assert i <= len(self._ports) - if i == len(self._ports): + if i == len(self._ports): # Adding new broker after cluster init self._ports.append(HaPort(self.test)) self._set_url() self._update_urls() @@ -311,10 +310,9 @@ class HaCluster(object): self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports) def _update_urls(self): - if len(self) > 1: # No failover addresses on a 1 cluster. - for b in self: - b.set_brokers_url(self.url) - b.set_public_url(self.url) + for b in self: + b.set_brokers_url(self.url) + b.set_public_url(self.url) def connect(self, i): """Connect with reconnect_urls""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index b6642e4508..3836381ed2 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -549,7 +549,7 @@ class ReplicationTests(HaBrokerTest): def test_auth(self): """Verify that authentication does not interfere with replication.""" - # FIXME aconway 2012-07-09: generate test sasl config portably for cmake + # TODO aconway 2012-07-09: generate test sasl config portably for cmake sasl_config=os.path.join(self.rootdir, "sasl_config") if not os.path.exists(sasl_config): print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config @@ -1183,6 +1183,24 @@ class ConfigurationTests(HaBrokerTest): b = start("none", "none") check(b, "", "") + def test_failover_exchange(self): + """Verify that the failover exchange correctly reports cluster membership""" + + def strip_url(url): return re.sub('amqp:|tcp:', '', url) + + def assert_url(m, url): + urls = m.properties['amq.failover'] + self.assertEqual(1, len(urls)) + self.assertEqual(strip_url(urls[0]), url) + + cluster = HaCluster(self, 1, args=["--ha-public-url=foo:1234"]) + r = cluster[0].connect().session().receiver("amq.failover") + assert_url(r.fetch(1), "foo:1234") + cluster[0].set_public_url("bar:1234") + assert_url(r.fetch(1), "bar:1234") + cluster[0].set_brokers_url(cluster.url+",xxx:1234") + self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL + class StoreTests(BrokerTest): """Test for HA with persistence.""" @@ -1203,8 +1221,7 @@ class StoreTests(BrokerTest): r = cluster[0].connect().session().receiver("qq") self.assertEqual(r.fetch().content, "foo") r.session.acknowledge() - # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush - # the dequeue operation on qq. + # Sending this message is a hack to flush the dequeue operation on qq. s.send(Message("flush", durable=True)) def verify(broker, x_count): |
