diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/ha.mk | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h | 73 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 156 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 17 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 51 |
6 files changed, 244 insertions, 55 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index c06b21029f..42a5ce8316 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -626,6 +626,7 @@ set (ha_default ON) option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default}) if (BUILD_HA) set (ha_SOURCES + qpid/ha/AltExchangeSetter.h qpid/ha/BackupConnectionExcluder.h qpid/ha/BrokerInfo.cpp qpid/ha/BrokerInfo.h diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 906dd775bd..9ec46a5156 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -23,6 +23,7 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ + qpid/ha/AltExchangeSetter.h \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ qpid/ha/BackupConnectionExcluder.h \ diff --git a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h new file mode 100644 index 0000000000..08690e68bc --- /dev/null +++ b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -0,0 +1,73 @@ +#ifndef QPID_HA_ALTERNATEEXCHANGESETTER_H +#define QPID_HA_ALTERNATEEXCHANGESETTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "boost/function.hpp" +#include <map> + +namespace qpid { +namespace ha { + +/** + * Sets the alternate exchange on queues and exchanges. + * Holds onto queues/exchanges if necessary till the alternate exchange is available. + * THREAD UNSAFE + */ +class AlternateExchangeSetter +{ + public: + typedef boost::function<void(boost::shared_ptr<broker::Exchange>)> SetFunction; + + AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {} + + void setAlternate(const std::string& altEx, const SetFunction& setter) { + broker::Exchange::shared_ptr ex = exchanges.find(altEx); + if (ex) setter(ex); // Set immediately. + else setters.insert(Setters::value_type(altEx, setter)); // Save for later. + } + + void addExchange(boost::shared_ptr<broker::Exchange> exchange) { + // Update the setters for this exchange + std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName()); + for (Setters::iterator i = range.first; i != range.second; ++i) + i->second(exchange); + setters.erase(range.first, range.second); + } + + void clear() { + if (!setters.empty()) + QPID_LOG(warning, "Some alternate exchanges were not resolved."); + setters.clear(); + } + + private: + typedef std::multimap<std::string, SetFunction> Setters; + broker::ExchangeRegistry& exchanges; + Setters setters; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_ALTERNATEEXCHANGESETTER_H*/ diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 3eb30a9ec9..4dd9947f90 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -41,6 +41,7 @@ #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include <algorithm> #include <sstream> +#include <iostream> #include <assert.h> namespace qpid { @@ -56,6 +57,7 @@ using qmf::org::apache::qpid::broker::EventSubscribe; using qmf::org::apache::qpid::ha::EventMembersUpdate; using namespace framing; using std::string; +using std::ostream; using types::Variant; using namespace broker; @@ -72,6 +74,7 @@ const string SCHEMA_ID("_schema_id"); const string VALUES("_values"); const string ALTEX("altEx"); +const string ALTEXCHANGE("altExchange"); const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); @@ -93,6 +96,7 @@ const string QNAME("qName"); const string QUEUE("queue"); const string TYPE("type"); const string HA_BROKER("habroker"); +const string PARTIAL("partial"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); @@ -122,7 +126,9 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, + SessionHandler& sessionHandler) +{ framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; @@ -142,6 +148,7 @@ void sendQuery(const string& packageName, const string& className, const string& props->setAppId(QMF2); props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + headerBody.get<qpid::framing::MessageProperties>(true)->setCorrelationId(className); AMQFrame header(headerBody); header.setBof(false); header.setEof(false); @@ -164,14 +171,14 @@ Variant::Map asMapVoid(const Variant& value) { if (!value.isVoid()) return value.asMap(); else return Variant::Map(); } - } // namespace BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l), - initialized(false) + initialized(false), + alternates(hb.getBroker().getExchanges()) {} void BrokerReplicator::initialize() { @@ -261,13 +268,15 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH void BrokerReplicator::route(Deliverable& msg) { const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); + const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>(); Variant::List list; try { - if (!isQMFv2(msg.getMessage()) || !headers) + if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getFrames().getContent(); amqp_0_10::ListCodec::decode(content, list); + QPID_LOG(trace, "Broker replicator received: " << *messageProperties); if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); @@ -295,6 +304,10 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } + if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) { + // We have received all of the exchange response. + alternates.clear(); + } } } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() @@ -321,19 +334,10 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { broker.getQueues().destroy(name); stopQueueReplicator(name); } - std::pair<boost::shared_ptr<Queue>, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - autoDel, - 0, // no owner regardless of exclusivity on primary - // FIXME aconway 2012-07-06: handle alternate exchange - values[ALTEX].asString(), - args, - userId, - remoteHost); - assert(result.second); // Should be true since we destroyed existing queue above - startQueueReplicator(result.first); + boost::shared_ptr<Queue> queue = createQueue( + name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); + assert(queue); // Should be created since we destroed the previous queue above. + if (queue) startQueueReplicator(queue); } } @@ -371,17 +375,9 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { broker.getExchanges().destroy(name); QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); } - std::pair<boost::shared_ptr<Exchange>, bool> result = - broker.createExchange( - name, - values[EXTYPE].asString(), - values[DURABLE].asBool(), - // FIXME aconway 2012-07-06: handle alternate exchanges - values[ALTEX].asString(), - args, - userId, - remoteHost); - assert(result.second); + boost::shared_ptr<Exchange> exchange = + createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); + assert(exchange); } } @@ -444,6 +440,22 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { haBroker.setMembership(members); } +namespace { + +// Get the alternate exchange from the exchange field of a queue or exchange response. +static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:"); + +string getAltExchange(const types::Variant& var) { + if (!var.isVoid()) { + management::ObjectId oid(var); + string key = oid.getV2Key(); + if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key); + return key.substr(EXCHANGE_KEY_PREFIX.size()); + } + else return string(); +} +} + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.isReplicated( @@ -456,22 +468,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Queue response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - std::pair<boost::shared_ptr<Queue>, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - values[AUTODELETE].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - ""/*TODO: need to include alternate-exchange*/, - args, - userId, - remoteHost); - + boost::shared_ptr<Queue> queue = + createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); // It is normal for the queue to already exist if we are failing over. - if (result.second) - startQueueReplicator(result.first); - else - QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + if (queue) startQueueReplicator(queue); + else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -481,16 +483,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - bool created = broker.createExchange( - name, - values[TYPE].asString(), - values[DURABLE].asBool(), - "", // FIXME aconway 2012-07-09: need to include alternate-exchange - args, - userId, - remoteHost - ).second; - QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name); + boost::shared_ptr<Exchange> exchange = createExchange( + name, values[TYPE].asString(), values[DURABLE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name); } namespace { @@ -576,6 +572,60 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) { } } +boost::shared_ptr<Queue> BrokerReplicator::createQueue( + const std::string& name, + bool durable, + bool autodelete, + const qpid::framing::FieldTable& arguments, + const std::string& alternateExchange) +{ + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, + durable, + autodelete, + 0, // no owner regardless of exclusivity on primary + string(), // Set alternate exchange below + arguments, + userId, + remoteHost); + if (result.second) { + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + } + return result.first; + } + else return boost::shared_ptr<Queue>(); +} + +boost::shared_ptr<Exchange> BrokerReplicator::createExchange( + const std::string& name, + const std::string& type, + bool durable, + const qpid::framing::FieldTable& args, + const std::string& alternateExchange) +{ + std::pair<boost::shared_ptr<Exchange>, bool> result = + broker.createExchange( + name, + type, + durable, + string(), // Set alternate exchange below + args, + userId, + remoteHost); + if (result.second) { + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); + } + return result.first; + } + else return boost::shared_ptr<Exchange>(); +} + bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index e2ca8f9e14..dbe4822d74 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -24,8 +24,10 @@ #include "types.h" #include "ReplicationTest.h" +#include "AlternateExchangeSetter.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" +#include "qpid/management/ManagementObject.h" #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> @@ -95,6 +97,20 @@ class BrokerReplicator : public broker::Exchange, void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); void stopQueueReplicator(const std::string& name); + boost::shared_ptr<broker::Queue> createQueue( + const std::string& name, + bool durable, + bool autodelete, + const qpid::framing::FieldTable& arguments, + const std::string& alternateExchange); + + boost::shared_ptr<broker::Exchange> createExchange( + const std::string& name, + const std::string& type, + bool durable, + const qpid::framing::FieldTable& args, + const std::string& alternateExchange); + std::string logPrefix; std::string userId, remoteHost; ReplicationTest replicationTest; @@ -102,6 +118,7 @@ class BrokerReplicator : public broker::Exchange, broker::Broker& broker; boost::shared_ptr<broker::Link> link; bool initialized; + AlternateExchangeSetter alternates; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f900a841d5..6f8fa344d5 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition @@ -63,6 +63,7 @@ class HaBroker(Broker): args = copy(args) args += ["--load-module", BrokerTest.ha_lib, "--log-enable=debug+:ha::", + "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -188,7 +189,7 @@ class HaCluster(object): self.broker_id += 1 return name - def start(self, update_urls=True): + def start(self, update_urls=True, args=[]): """Start a new broker in the cluster""" b = HaBroker(self.test, name=self.next_name(), **self.kwargs) self._brokers.append(b) @@ -758,6 +759,52 @@ acl deny all all s1.sender("ex").send("foo"); self.assertEqual(s1.receiver("q").fetch().content, "foo") + def test_alterante_exchange(self): + """Verify that alternate-exchange on exchanges and queues is propagated + to new members of a cluster. """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + # altex exchange: acts as alternate exchange + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + # altq queue bound to altex, collect re-routed messages. + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + # 0ex exchange with alternate-exchange altex and no queues bound + s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # create queue q with alternate-exchange altex + s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + # create a bunch of exchanges to ensure we don't clean up prematurely if the + # response comes in multiple fragments. + for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("0ex").send("foo") + altq = s.receiver("altq") + self.assertEqual("foo", altq.fetch(timeout=0).content) + s.acknowledge() + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", altq.fetch(timeout=0).content) + s.acknowledge() + + # Sanity check: alternate exchanges on original broker + verify(cluster[0]) + # Check backup that was connected during setup. + cluster[1].wait_backup("0ex") + cluster[1].wait_backup("q") + cluster.bounce(0) + verify(cluster[1]) + # Check a newly started backup. + cluster.start() + cluster[2].wait_backup("0ex") + cluster[2].wait_backup("q") + cluster.bounce(1) + verify(cluster[2]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
