From 3fc13a8214400aa0da17520b93996d04a59a09df Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 20 Dec 2012 18:28:58 +0000 Subject: Bug 886656 - HA backup broker does not properly increment the alternate exchange user count Set alternate exchange in-use counters correctly on backup brokers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424617 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 17 ++++++++-------- cpp/src/qpid/broker/Exchange.cpp | 1 + cpp/src/qpid/broker/ExchangeRegistry.cpp | 5 +---- cpp/src/qpid/broker/Queue.cpp | 1 + cpp/src/qpid/broker/QueueRegistry.cpp | 4 +--- cpp/src/tests/ha_tests.py | 35 +++++++++++++++++++++++++------- 6 files changed, 40 insertions(+), 23 deletions(-) (limited to 'cpp') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e5ab970a55..18618ee005 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -1112,6 +1112,10 @@ std::pair, bool> Broker::createQueue( void Broker::deleteQueue(const std::string& name, const std::string& userId, const std::string& connectionId, QueueFunctor check) { + QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name + << " user:" << userId + << " rhost:" << connectionId + ); if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); } @@ -1126,11 +1130,6 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, } else { throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); } - QPID_LOG_CAT(debug, model, "Delete queue. name:" << name - << " user:" << userId - << " rhost:" << connectionId - ); - } std::pair Broker::createExchange( @@ -1177,6 +1176,9 @@ std::pair Broker::createExchange( void Broker::deleteExchange(const std::string& name, const std::string& userId, const std::string& connectionId) { + QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name + << " user:" << userId + << " rhost:" << connectionId); if (acl) { if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); @@ -1187,13 +1189,10 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, } Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); - if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange.")); if (exchange->isDurable()) store->destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name, connectionId, userId); - QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name - << " user:" << userId - << " rhost:" << connectionId); } void Broker::bind(const std::string& queueName, diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index bc77f53a9a..f71dbc7351 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -223,6 +223,7 @@ Exchange::~Exchange () void Exchange::setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; + alternate->incAlternateUsers(); if (mgmtExchange != 0) { if (alternate.get() != 0) mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index bc6a20ff9a..46175fd2e4 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -79,10 +79,7 @@ pair ExchangeRegistry::declare( } exchanges[name] = exchange; result = std::pair(exchange, true); - if (alternate) { - exchange->setAlternate(alternate); - alternate->incAlternateUsers(); - } + if (alternate) exchange->setAlternate(alternate); // Call exchangeCreate inside the lock to ensure correct ordering. if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); } else { diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e43dde386e..5d75583de1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -1149,6 +1149,7 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) void Queue::setAlternateExchange(boost::shared_ptr exchange) { alternateExchange = exchange; + alternateExchange->incAlternateUsers(); if (mgmtObject) { if (exchange.get() != 0) mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId()); diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index b59eb530f0..576d0f198b 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -60,10 +60,8 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, if (i == queues.end()) { Queue::shared_ptr queue = create(name, settings); //Move this to factory also? - if (alternate) { + if (alternate) queue->setAlternateExchange(alternate);//need to do this *before* create - alternate->incAlternateUsers(); - } if (!recovering) { //create persistent record if required queue->create(); diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index b29ff42627..fdbd8a153b 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4 from brokertest import * from ha_test import * @@ -613,22 +613,24 @@ acl deny all all to new members of a cluster. """ cluster = HaCluster(self, 2) s = cluster[0].connect().session() + cluster[0].wait_status("active") + cluster[1].wait_status("ready") # altex exchange: acts as alternate exchange s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") # altq queue bound to altex, collect re-routed messages. s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") - # 0ex exchange with alternate-exchange altex and no queues bound - s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # ex exchange with alternate-exchange altex and no queues bound + s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") # create queue q with alternate-exchange altex s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") # create a bunch of exchanges to ensure we don't clean up prematurely if the # response comes in multiple fragments. - for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i) def verify(broker): s = broker.connect().session() # Verify unmatched message goes to ex's alternate. - s.sender("0ex").send("foo") + s.sender("ex").send("foo") altq = s.receiver("altq") self.assertEqual("foo", altq.fetch(timeout=0).content) s.acknowledge() @@ -640,20 +642,39 @@ acl deny all all self.assertEqual("bar", altq.fetch(timeout=0).content) s.acknowledge() + def ss(n): return cluster[n].connect().session() + # Sanity check: alternate exchanges on original broker verify(cluster[0]) + # Altex is in use as an alternate exchange. + self.assertRaises(SessionError, + lambda:ss(0).sender("altex;{delete:always}").close()) # Check backup that was connected during setup. - cluster[1].wait_backup("0ex") + cluster[1].wait_status("ready") + cluster[1].wait_backup("ex") cluster[1].wait_backup("q") cluster.bounce(0) verify(cluster[1]) + # Check a newly started backup. cluster.start() - cluster[2].wait_backup("0ex") + cluster[2].wait_status("ready") + cluster[2].wait_backup("ex") cluster[2].wait_backup("q") cluster.bounce(1) verify(cluster[2]) + # Check that alt-exchange in-use count is replicated + s = cluster[2].connect().session(); + + self.assertRaises(SessionError, + lambda:ss(2).sender("altex;{delete:always}").close()) + s.sender("q;{delete:always}").close() + self.assertRaises(SessionError, + lambda:ss(2).sender("altex;{delete:always}").close()) + s.sender("ex;{delete:always}").close() + s.sender("altex;{delete:always}").close() + def test_priority_reroute(self): """Regression test for QPID-4262, rerouting messages from a priority queue to itself causes a crash""" -- cgit v1.2.1