summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/ha.mk1
-rw-r--r--qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h73
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp156
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py51
6 files changed, 244 insertions, 55 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index c06b21029f..42a5ce8316 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -626,6 +626,7 @@ set (ha_default ON)
option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
+ qpid/ha/AltExchangeSetter.h
qpid/ha/BackupConnectionExcluder.h
qpid/ha/BrokerInfo.cpp
qpid/ha/BrokerInfo.h
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 906dd775bd..9ec46a5156 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -23,6 +23,7 @@
dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
+ qpid/ha/AltExchangeSetter.h \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
qpid/ha/BackupConnectionExcluder.h \
diff --git a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
new file mode 100644
index 0000000000..08690e68bc
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
@@ -0,0 +1,73 @@
+#ifndef QPID_HA_ALTERNATEEXCHANGESETTER_H
+#define QPID_HA_ALTERNATEEXCHANGESETTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "boost/function.hpp"
+#include <map>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Sets the alternate exchange on queues and exchanges.
+ * Holds onto queues/exchanges if necessary till the alternate exchange is available.
+ * THREAD UNSAFE
+ */
+class AlternateExchangeSetter
+{
+ public:
+ typedef boost::function<void(boost::shared_ptr<broker::Exchange>)> SetFunction;
+
+ AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {}
+
+ void setAlternate(const std::string& altEx, const SetFunction& setter) {
+ broker::Exchange::shared_ptr ex = exchanges.find(altEx);
+ if (ex) setter(ex); // Set immediately.
+ else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
+ }
+
+ void addExchange(boost::shared_ptr<broker::Exchange> exchange) {
+ // Update the setters for this exchange
+ std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName());
+ for (Setters::iterator i = range.first; i != range.second; ++i)
+ i->second(exchange);
+ setters.erase(range.first, range.second);
+ }
+
+ void clear() {
+ if (!setters.empty())
+ QPID_LOG(warning, "Some alternate exchanges were not resolved.");
+ setters.clear();
+ }
+
+ private:
+ typedef std::multimap<std::string, SetFunction> Setters;
+ broker::ExchangeRegistry& exchanges;
+ Setters setters;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ALTERNATEEXCHANGESETTER_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 3eb30a9ec9..4dd9947f90 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -41,6 +41,7 @@
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include <algorithm>
#include <sstream>
+#include <iostream>
#include <assert.h>
namespace qpid {
@@ -56,6 +57,7 @@ using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
using namespace framing;
using std::string;
+using std::ostream;
using types::Variant;
using namespace broker;
@@ -72,6 +74,7 @@ const string SCHEMA_ID("_schema_id");
const string VALUES("_values");
const string ALTEX("altEx");
+const string ALTEXCHANGE("altExchange");
const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
@@ -93,6 +96,7 @@ const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
const string HA_BROKER("habroker");
+const string PARTIAL("partial");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -122,7 +126,9 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName,
+ SessionHandler& sessionHandler)
+{
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[_WHAT] = OBJECT;
@@ -142,6 +148,7 @@ void sendQuery(const string& packageName, const string& className, const string&
props->setAppId(QMF2);
props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ headerBody.get<qpid::framing::MessageProperties>(true)->setCorrelationId(className);
AMQFrame header(headerBody);
header.setBof(false);
header.setEof(false);
@@ -164,14 +171,14 @@ Variant::Map asMapVoid(const Variant& value) {
if (!value.isVoid()) return value.asMap();
else return Variant::Map();
}
-
} // namespace
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l),
- initialized(false)
+ initialized(false),
+ alternates(hb.getBroker().getExchanges())
{}
void BrokerReplicator::initialize() {
@@ -261,13 +268,15 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
void BrokerReplicator::route(Deliverable& msg) {
const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+ const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
Variant::List list;
try {
- if (!isQMFv2(msg.getMessage()) || !headers)
+ if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
string content = msg.getMessage().getFrames().getContent();
amqp_0_10::ListCodec::decode(content, list);
+ QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
@@ -295,6 +304,10 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
+ if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+ // We have received all of the exchange response.
+ alternates.clear();
+ }
}
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
@@ -321,19 +334,10 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
broker.getQueues().destroy(name);
stopQueueReplicator(name);
}
- std::pair<boost::shared_ptr<Queue>, bool> result =
- broker.createQueue(
- name,
- values[DURABLE].asBool(),
- autoDel,
- 0, // no owner regardless of exclusivity on primary
- // FIXME aconway 2012-07-06: handle alternate exchange
- values[ALTEX].asString(),
- args,
- userId,
- remoteHost);
- assert(result.second); // Should be true since we destroyed existing queue above
- startQueueReplicator(result.first);
+ boost::shared_ptr<Queue> queue = createQueue(
+ name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
+ assert(queue); // Should be created since we destroed the previous queue above.
+ if (queue) startQueueReplicator(queue);
}
}
@@ -371,17 +375,9 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
broker.getExchanges().destroy(name);
QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
}
- std::pair<boost::shared_ptr<Exchange>, bool> result =
- broker.createExchange(
- name,
- values[EXTYPE].asString(),
- values[DURABLE].asBool(),
- // FIXME aconway 2012-07-06: handle alternate exchanges
- values[ALTEX].asString(),
- args,
- userId,
- remoteHost);
- assert(result.second);
+ boost::shared_ptr<Exchange> exchange =
+ createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
+ assert(exchange);
}
}
@@ -444,6 +440,22 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
haBroker.setMembership(members);
}
+namespace {
+
+// Get the alternate exchange from the exchange field of a queue or exchange response.
+static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:");
+
+string getAltExchange(const types::Variant& var) {
+ if (!var.isVoid()) {
+ management::ObjectId oid(var);
+ string key = oid.getV2Key();
+ if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key);
+ return key.substr(EXCHANGE_KEY_PREFIX.size());
+ }
+ else return string();
+}
+}
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.isReplicated(
@@ -456,22 +468,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Queue response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- std::pair<boost::shared_ptr<Queue>, bool> result =
- broker.createQueue(
- name,
- values[DURABLE].asBool(),
- values[AUTODELETE].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- ""/*TODO: need to include alternate-exchange*/,
- args,
- userId,
- remoteHost);
-
+ boost::shared_ptr<Queue> queue =
+ createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
// It is normal for the queue to already exist if we are failing over.
- if (result.second)
- startQueueReplicator(result.first);
- else
- QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ if (queue) startQueueReplicator(queue);
+ else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -481,16 +483,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- bool created = broker.createExchange(
- name,
- values[TYPE].asString(),
- values[DURABLE].asBool(),
- "", // FIXME aconway 2012-07-09: need to include alternate-exchange
- args,
- userId,
- remoteHost
- ).second;
- QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
+ boost::shared_ptr<Exchange> exchange = createExchange(
+ name, values[TYPE].asString(), values[DURABLE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
+ QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
}
namespace {
@@ -576,6 +572,60 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) {
}
}
+boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& alternateExchange)
+{
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ durable,
+ autodelete,
+ 0, // no owner regardless of exclusivity on primary
+ string(), // Set alternate exchange below
+ arguments,
+ userId,
+ remoteHost);
+ if (result.second) {
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ }
+ return result.first;
+ }
+ else return boost::shared_ptr<Queue>();
+}
+
+boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args,
+ const std::string& alternateExchange)
+{
+ std::pair<boost::shared_ptr<Exchange>, bool> result =
+ broker.createExchange(
+ name,
+ type,
+ durable,
+ string(), // Set alternate exchange below
+ args,
+ userId,
+ remoteHost);
+ if (result.second) {
+ alternates.addExchange(result.first);
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
+ }
+ return result.first;
+ }
+ else return boost::shared_ptr<Exchange>();
+}
+
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index e2ca8f9e14..dbe4822d74 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -24,8 +24,10 @@
#include "types.h"
#include "ReplicationTest.h"
+#include "AlternateExchangeSetter.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
+#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -95,6 +97,20 @@ class BrokerReplicator : public broker::Exchange,
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
void stopQueueReplicator(const std::string& name);
+ boost::shared_ptr<broker::Queue> createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& alternateExchange);
+
+ boost::shared_ptr<broker::Exchange> createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args,
+ const std::string& alternateExchange);
+
std::string logPrefix;
std::string userId, remoteHost;
ReplicationTest replicationTest;
@@ -102,6 +118,7 @@ class BrokerReplicator : public broker::Exchange,
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
bool initialized;
+ AlternateExchangeSetter alternates;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f900a841d5..6f8fa344d5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
@@ -63,6 +63,7 @@ class HaBroker(Broker):
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
"--log-enable=debug+:ha::",
+ "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12:
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -188,7 +189,7 @@ class HaCluster(object):
self.broker_id += 1
return name
- def start(self, update_urls=True):
+ def start(self, update_urls=True, args=[]):
"""Start a new broker in the cluster"""
b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
self._brokers.append(b)
@@ -758,6 +759,52 @@ acl deny all all
s1.sender("ex").send("foo");
self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ def test_alterante_exchange(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated
+ to new members of a cluster. """
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ # altex exchange: acts as alternate exchange
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ # altq queue bound to altex, collect re-routed messages.
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ # 0ex exchange with alternate-exchange altex and no queues bound
+ s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # create queue q with alternate-exchange altex
+ s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+ # create a bunch of exchanges to ensure we don't clean up prematurely if the
+ # response comes in multiple fragments.
+ for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("0ex").send("foo")
+ altq = s.receiver("altq")
+ self.assertEqual("foo", altq.fetch(timeout=0).content)
+ s.acknowledge()
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", altq.fetch(timeout=0).content)
+ s.acknowledge()
+
+ # Sanity check: alternate exchanges on original broker
+ verify(cluster[0])
+ # Check backup that was connected during setup.
+ cluster[1].wait_backup("0ex")
+ cluster[1].wait_backup("q")
+ cluster.bounce(0)
+ verify(cluster[1])
+ # Check a newly started backup.
+ cluster.start()
+ cluster[2].wait_backup("0ex")
+ cluster[2].wait_backup("q")
+ cluster.bounce(1)
+ verify(cluster[2])
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit