diff options
| author | Alan Conway <aconway@apache.org> | 2012-10-31 01:08:41 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-10-31 01:08:41 +0000 |
| commit | f05b6fdf2077ec1ef6efa65f25f750ee101736a4 (patch) | |
| tree | d39779407fa7009a16bb14ebbf08ab788c58383c /qpid/cpp/src | |
| parent | 1762961137051d64d56c93876bf675c507781e1b (diff) | |
| download | qpid-python-f05b6fdf2077ec1ef6efa65f25f750ee101736a4.tar.gz | |
QPID-4401: HA bindings for QMF exchanges not replicated.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1403946 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 27 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 24 |
3 files changed, 28 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index ab29356fef..0851236f4a 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -321,9 +321,9 @@ Broker::Broker(const Broker::Options& conf) : std::string qmfDirect("qmf.default.direct"); std::pair<Exchange::shared_ptr, bool> topicPair( - exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs())); + exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false)); std::pair<Exchange::shared_ptr, bool> directPair( - exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs())); + exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false)); boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index cc1aff798e..5769538982 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -613,14 +613,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { throw Exception(QPID_MSG("Unexpected queue response: " << values)); if (!queueTracker->response(name)) return; // Response is out-of-date QPID_LOG(debug, logPrefix << "Queue response: " << name); - if (broker.getQueues().find(name)) { // Already exists - if (findQueueReplicator(name)) - return; // Already replicated - else { - QPID_LOG(debug, logPrefix << "Deleting queue to make way for replica: " << name); - broker.deleteQueue(name, userId, remoteHost); - } - } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); boost::shared_ptr<QueueReplicator> qr = replicateQueue( @@ -642,21 +634,11 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { throw Exception(QPID_MSG("Unexpected exchange response: " << values)); if (!exchangeTracker->response(name)) return; // Response is out of date. QPID_LOG(debug, logPrefix << "Exchange response: " << name); - if (broker.getExchanges().find(name)) { - if (replicatedExchanges.find(name) != replicatedExchanges.end()) - return; // Already replicated - else { - QPID_LOG(debug, logPrefix << "Deleting exchange to make way for replica: " - << name); - broker.deleteExchange(name, userId, remoteHost); - } - } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - assert(result.second); replicatedExchanges.insert(name); } @@ -778,11 +760,9 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue( remoteHost); boost::shared_ptr<QueueReplicator> qr; if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first); - if (result.second) { - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); - } + if (result.second && !alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); } return qr; } @@ -803,7 +783,6 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( args, userId, remoteHost); - alternates.addExchange(result.first); if (!alternateExchange.empty()) { alternates.setAlternate( diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index ba7b4cb638..2c7e164c42 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -26,7 +26,7 @@ from brokertest import * from ha_test import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO -from qpidtoollibs import BrokerAgent +from qpidtoollibs import BrokerAgent, EventHelper from uuid import UUID log = getLogger(__name__) @@ -801,6 +801,27 @@ acl deny all all # The backup does not log this as an error so we only check the backup log for errors. self.assert_log_no_errors(cluster[1]) + def test_qmf_replication(self): + """QPID-4401: Verify that QMF built-in exchanges have default replication""" + cluster = HaCluster(self, 2) + sn = cluster.connect(0).session() + events = sn.receiver("events;{create:always,node:{x-bindings:[{exchange:'qmf.default.topic',queue:'events',key:'agent.ind.event.org_apache_qpid_broker.#'}]}}") + def verify_qmf_events(qname): + sn.sender("%s;{create:always}"%(qname)).close() # Generate a QMF event + found = False + try: + while not found: + m = events.fetch(timeout=1) # Receive + def class_name(m): return m.content[0]['_schema_id']['_class_name'] + def q_name(m): return m.content[0]['_values']['qName'] + if class_name(m) == 'queueDeclare' and q_name(m) == qname: found = True + except Empty: pass + assert(found) + verify_qmf_events("q1") + cluster[1].wait_status("ready") + cluster.kill(0) + verify_qmf_events("q2") + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -936,6 +957,7 @@ class RecoveryTests(HaBrokerTest): for b in cluster: b.wait_backup("q1") for i in xrange(100): s1.send(str(i)) # Kill primary and 2 backups + cluster[3].wait_status("ready") for i in [0,1,2]: cluster.kill(i, False) cluster[3].promote() # New primary, backups will be 1 and 2 cluster[3].wait_status("recovering") |
