diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-20 18:28:58 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-20 18:28:58 +0000 |
| commit | 3fc13a8214400aa0da17520b93996d04a59a09df (patch) | |
| tree | 4f52fc75ecb10b1684b14cbd4faa937937e42693 /cpp | |
| parent | 83cc609d577b76f4994d4c68ac88f462f371fa4d (diff) | |
| download | qpid-python-3fc13a8214400aa0da17520b93996d04a59a09df.tar.gz | |
Bug 886656 - HA backup broker does not properly increment the alternate exchange user count
Set alternate exchange in-use counters correctly on backup brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 4 | ||||
| -rwxr-xr-x | cpp/src/tests/ha_tests.py | 35 |
6 files changed, 40 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e5ab970a55..18618ee005 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -1112,6 +1112,10 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( void Broker::deleteQueue(const std::string& name, const std::string& userId, const std::string& connectionId, QueueFunctor check) { + QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name + << " user:" << userId + << " rhost:" << connectionId + ); if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); } @@ -1126,11 +1130,6 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, } else { throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); } - QPID_LOG_CAT(debug, model, "Delete queue. name:" << name - << " user:" << userId - << " rhost:" << connectionId - ); - } std::pair<Exchange::shared_ptr, bool> Broker::createExchange( @@ -1177,6 +1176,9 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( void Broker::deleteExchange(const std::string& name, const std::string& userId, const std::string& connectionId) { + QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name + << " user:" << userId + << " rhost:" << connectionId); if (acl) { if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); @@ -1187,13 +1189,10 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, } Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); - if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange.")); if (exchange->isDurable()) store->destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name, connectionId, userId); - QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name - << " user:" << userId - << " rhost:" << connectionId); } void Broker::bind(const std::string& queueName, diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index bc77f53a9a..f71dbc7351 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -223,6 +223,7 @@ Exchange::~Exchange () void Exchange::setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; + alternate->incAlternateUsers(); if (mgmtExchange != 0) { if (alternate.get() != 0) mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index bc6a20ff9a..46175fd2e4 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -79,10 +79,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare( } exchanges[name] = exchange; result = std::pair<Exchange::shared_ptr, bool>(exchange, true); - if (alternate) { - exchange->setAlternate(alternate); - alternate->incAlternateUsers(); - } + if (alternate) exchange->setAlternate(alternate); // Call exchangeCreate inside the lock to ensure correct ordering. if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); } else { diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e43dde386e..5d75583de1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -1149,6 +1149,7 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) { alternateExchange = exchange; + alternateExchange->incAlternateUsers(); if (mgmtObject) { if (exchange.get() != 0) mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId()); diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index b59eb530f0..576d0f198b 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -60,10 +60,8 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, if (i == queues.end()) { Queue::shared_ptr queue = create(name, settings); //Move this to factory also? - if (alternate) { + if (alternate) queue->setAlternateExchange(alternate);//need to do this *before* create - alternate->incAlternateUsers(); - } if (!recovering) { //create persistent record if required queue->create(); diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index b29ff42627..fdbd8a153b 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4 from brokertest import * from ha_test import * @@ -613,22 +613,24 @@ acl deny all all to new members of a cluster. """ cluster = HaCluster(self, 2) s = cluster[0].connect().session() + cluster[0].wait_status("active") + cluster[1].wait_status("ready") # altex exchange: acts as alternate exchange s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") # altq queue bound to altex, collect re-routed messages. s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") - # 0ex exchange with alternate-exchange altex and no queues bound - s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # ex exchange with alternate-exchange altex and no queues bound + s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") # create queue q with alternate-exchange altex s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") # create a bunch of exchanges to ensure we don't clean up prematurely if the # response comes in multiple fragments. - for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i) def verify(broker): s = broker.connect().session() # Verify unmatched message goes to ex's alternate. - s.sender("0ex").send("foo") + s.sender("ex").send("foo") altq = s.receiver("altq") self.assertEqual("foo", altq.fetch(timeout=0).content) s.acknowledge() @@ -640,20 +642,39 @@ acl deny all all self.assertEqual("bar", altq.fetch(timeout=0).content) s.acknowledge() + def ss(n): return cluster[n].connect().session() + # Sanity check: alternate exchanges on original broker verify(cluster[0]) + # Altex is in use as an alternate exchange. + self.assertRaises(SessionError, + lambda:ss(0).sender("altex;{delete:always}").close()) # Check backup that was connected during setup. - cluster[1].wait_backup("0ex") + cluster[1].wait_status("ready") + cluster[1].wait_backup("ex") cluster[1].wait_backup("q") cluster.bounce(0) verify(cluster[1]) + # Check a newly started backup. cluster.start() - cluster[2].wait_backup("0ex") + cluster[2].wait_status("ready") + cluster[2].wait_backup("ex") cluster[2].wait_backup("q") cluster.bounce(1) verify(cluster[2]) + # Check that alt-exchange in-use count is replicated + s = cluster[2].connect().session(); + + self.assertRaises(SessionError, + lambda:ss(2).sender("altex;{delete:always}").close()) + s.sender("q;{delete:always}").close() + self.assertRaises(SessionError, + lambda:ss(2).sender("altex;{delete:always}").close()) + s.sender("ex;{delete:always}").close() + s.sender("altex;{delete:always}").close() + def test_priority_reroute(self): """Regression test for QPID-4262, rerouting messages from a priority queue to itself causes a crash""" |
