summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-31 01:08:41 +0000
committerAlan Conway <aconway@apache.org>2012-10-31 01:08:41 +0000
commitf05b6fdf2077ec1ef6efa65f25f750ee101736a4 (patch)
treed39779407fa7009a16bb14ebbf08ab788c58383c /qpid/cpp/src
parent1762961137051d64d56c93876bf675c507781e1b (diff)
downloadqpid-python-f05b6fdf2077ec1ef6efa65f25f750ee101736a4.tar.gz
QPID-4401: HA bindings for QMF exchanges not replicated.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1403946 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp27
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py24
3 files changed, 28 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index ab29356fef..0851236f4a 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -321,9 +321,9 @@ Broker::Broker(const Broker::Options& conf) :
std::string qmfDirect("qmf.default.direct");
std::pair<Exchange::shared_ptr, bool> topicPair(
- exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs()));
+ exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false));
std::pair<Exchange::shared_ptr, bool> directPair(
- exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs()));
+ exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false));
boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index cc1aff798e..5769538982 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -613,14 +613,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
- if (broker.getQueues().find(name)) { // Already exists
- if (findQueueReplicator(name))
- return; // Already replicated
- else {
- QPID_LOG(debug, logPrefix << "Deleting queue to make way for replica: " << name);
- broker.deleteQueue(name, userId, remoteHost);
- }
- }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -642,21 +634,11 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
throw Exception(QPID_MSG("Unexpected exchange response: " << values));
if (!exchangeTracker->response(name)) return; // Response is out of date.
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
- if (broker.getExchanges().find(name)) {
- if (replicatedExchanges.find(name) != replicatedExchanges.end())
- return; // Already replicated
- else {
- QPID_LOG(debug, logPrefix << "Deleting exchange to make way for replica: "
- << name);
- broker.deleteExchange(name, userId, remoteHost);
- }
- }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- assert(result.second);
replicatedExchanges.insert(name);
}
@@ -778,11 +760,9 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
remoteHost);
boost::shared_ptr<QueueReplicator> qr;
if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
- if (result.second) {
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
- }
+ if (result.second && !alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
return qr;
}
@@ -803,7 +783,6 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
args,
userId,
remoteHost);
-
alternates.addExchange(result.first);
if (!alternateExchange.empty()) {
alternates.setAlternate(
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ba7b4cb638..2c7e164c42 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -26,7 +26,7 @@ from brokertest import *
from ha_test import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent
+from qpidtoollibs import BrokerAgent, EventHelper
from uuid import UUID
log = getLogger(__name__)
@@ -801,6 +801,27 @@ acl deny all all
# The backup does not log this as an error so we only check the backup log for errors.
self.assert_log_no_errors(cluster[1])
+ def test_qmf_replication(self):
+ """QPID-4401: Verify that QMF built-in exchanges have default replication"""
+ cluster = HaCluster(self, 2)
+ sn = cluster.connect(0).session()
+ events = sn.receiver("events;{create:always,node:{x-bindings:[{exchange:'qmf.default.topic',queue:'events',key:'agent.ind.event.org_apache_qpid_broker.#'}]}}")
+ def verify_qmf_events(qname):
+ sn.sender("%s;{create:always}"%(qname)).close() # Generate a QMF event
+ found = False
+ try:
+ while not found:
+ m = events.fetch(timeout=1) # Receive
+ def class_name(m): return m.content[0]['_schema_id']['_class_name']
+ def q_name(m): return m.content[0]['_values']['qName']
+ if class_name(m) == 'queueDeclare' and q_name(m) == qname: found = True
+ except Empty: pass
+ assert(found)
+ verify_qmf_events("q1")
+ cluster[1].wait_status("ready")
+ cluster.kill(0)
+ verify_qmf_events("q2")
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -936,6 +957,7 @@ class RecoveryTests(HaBrokerTest):
for b in cluster: b.wait_backup("q1")
for i in xrange(100): s1.send(str(i))
# Kill primary and 2 backups
+ cluster[3].wait_status("ready")
for i in [0,1,2]: cluster.kill(i, False)
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")