summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-12-20 18:28:58 +0000
committerAlan Conway <aconway@apache.org>2012-12-20 18:28:58 +0000
commit3fc13a8214400aa0da17520b93996d04a59a09df (patch)
tree4f52fc75ecb10b1684b14cbd4faa937937e42693 /cpp
parent83cc609d577b76f4994d4c68ac88f462f371fa4d (diff)
downloadqpid-python-3fc13a8214400aa0da17520b93996d04a59a09df.tar.gz
Bug 886656 - HA backup broker does not properly increment the alternate exchange user count
Set alternate exchange in-use counters correctly on backup brokers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp17
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp1
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp5
-rw-r--r--cpp/src/qpid/broker/Queue.cpp1
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp4
-rwxr-xr-xcpp/src/tests/ha_tests.py35
6 files changed, 40 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e5ab970a55..18618ee005 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -1112,6 +1112,10 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
+ QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ );
if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
}
@@ -1126,11 +1130,6 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId
- );
-
}
std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
@@ -1177,6 +1176,9 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
+ QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId);
if (acl) {
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
@@ -1187,13 +1189,10 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
}
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
- if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name, connectionId, userId);
- QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId);
}
void Broker::bind(const std::string& queueName,
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index bc77f53a9a..f71dbc7351 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -223,6 +223,7 @@ Exchange::~Exchange ()
void Exchange::setAlternate(Exchange::shared_ptr _alternate)
{
alternate = _alternate;
+ alternate->incAlternateUsers();
if (mgmtExchange != 0) {
if (alternate.get() != 0)
mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index bc6a20ff9a..46175fd2e4 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -79,10 +79,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
}
exchanges[name] = exchange;
result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
- if (alternate) {
- exchange->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
+ if (alternate) exchange->setAlternate(alternate);
// Call exchangeCreate inside the lock to ensure correct ordering.
if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
} else {
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e43dde386e..5d75583de1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -1149,6 +1149,7 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
alternateExchange = exchange;
+ alternateExchange->incAlternateUsers();
if (mgmtObject) {
if (exchange.get() != 0)
mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index b59eb530f0..576d0f198b 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -60,10 +60,8 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
if (i == queues.end()) {
Queue::shared_ptr queue = create(name, settings);
//Move this to factory also?
- if (alternate) {
+ if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
if (!recovering) {
//create persistent record if required
queue->create();
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index b29ff42627..fdbd8a153b 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
from brokertest import *
from ha_test import *
@@ -613,22 +613,24 @@ acl deny all all
to new members of a cluster. """
cluster = HaCluster(self, 2)
s = cluster[0].connect().session()
+ cluster[0].wait_status("active")
+ cluster[1].wait_status("ready")
# altex exchange: acts as alternate exchange
s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
# altq queue bound to altex, collect re-routed messages.
s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
- # 0ex exchange with alternate-exchange altex and no queues bound
- s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # ex exchange with alternate-exchange altex and no queues bound
+ s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
# create queue q with alternate-exchange altex
s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
# create a bunch of exchanges to ensure we don't clean up prematurely if the
# response comes in multiple fragments.
- for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+ for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
def verify(broker):
s = broker.connect().session()
# Verify unmatched message goes to ex's alternate.
- s.sender("0ex").send("foo")
+ s.sender("ex").send("foo")
altq = s.receiver("altq")
self.assertEqual("foo", altq.fetch(timeout=0).content)
s.acknowledge()
@@ -640,20 +642,39 @@ acl deny all all
self.assertEqual("bar", altq.fetch(timeout=0).content)
s.acknowledge()
+ def ss(n): return cluster[n].connect().session()
+
# Sanity check: alternate exchanges on original broker
verify(cluster[0])
+ # Altex is in use as an alternate exchange.
+ self.assertRaises(SessionError,
+ lambda:ss(0).sender("altex;{delete:always}").close())
# Check backup that was connected during setup.
- cluster[1].wait_backup("0ex")
+ cluster[1].wait_status("ready")
+ cluster[1].wait_backup("ex")
cluster[1].wait_backup("q")
cluster.bounce(0)
verify(cluster[1])
+
# Check a newly started backup.
cluster.start()
- cluster[2].wait_backup("0ex")
+ cluster[2].wait_status("ready")
+ cluster[2].wait_backup("ex")
cluster[2].wait_backup("q")
cluster.bounce(1)
verify(cluster[2])
+ # Check that alt-exchange in-use count is replicated
+ s = cluster[2].connect().session();
+
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("q;{delete:always}").close()
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("ex;{delete:always}").close()
+ s.sender("altex;{delete:always}").close()
+
def test_priority_reroute(self):
"""Regression test for QPID-4262, rerouting messages from a priority queue
to itself causes a crash"""