summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-04-18 13:49:28 +0000
committerAlan Conway <aconway@apache.org>2014-04-18 13:49:28 +0000
commit52e5feddc5ac7c2d7402a8071847acc09753efb0 (patch)
treee61f9662a4d29773318a3997dfa9ec30eb26698b
parent9ed3cad52a411a1386c4ae378392f5027def2b37 (diff)
downloadqpid-python-52e5feddc5ac7c2d7402a8071847acc09753efb0.tar.gz
QPID-5666: HA fails with resource-limit-exceeded: Exceeded replicated queue limit
This is regression introduced in r1561206: CommitDate: Fri Jan 24 21:54:59 2014 +0000 QPID-5513: HA backup fails if number of replicated queues exceeds number of channels. Fixed by the current commit. PrimaryQueueLimits was not taking account of queues already on the broker prior to promotion. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.28@1588471 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h27
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp15
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h18
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py13
7 files changed, 55 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index b4d50d1652..af4ae12177 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -136,7 +136,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
- queueLimits(logPrefix)
+ queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
{
// Note that at this point, we are still rejecting client connections.
// So we are safe from client interference while we set up the primary.
diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
index a2322f1545..d614a48099 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
@@ -22,9 +22,12 @@
*
*/
+#include "ReplicationTest.h"
#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueRegistry.h>
#include <qpid/framing/amqp_types.h>
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
#include <string>
namespace qpid {
@@ -45,8 +48,15 @@ class PrimaryQueueLimits
{
public:
// FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max
- PrimaryQueueLimits(const std::string& lp) :
- logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {}
+ PrimaryQueueLimits(const std::string& lp,
+ broker::QueueRegistry& qr,
+ const ReplicationTest& rt
+ ) :
+ logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0)
+ {
+ // Get initial count of replicated queues
+ qr.eachQueue(boost::bind(&PrimaryQueueLimits::addQueueIfReplicated, this, _1, rt));
+ }
/** Add a replicated queue
*@exception ResourceLimitExceededException if this would exceed the limit.
@@ -57,15 +67,22 @@ class PrimaryQueueLimits
<< " exceeds limit of " << maxQueues
<< " replicated queues.");
throw framing::ResourceLimitExceededException(
- "Exceeded replicated queue limit.");
+ Msg() << "Exceeded replicated queue limit " << queues << " >= " << maxQueues);
}
else ++queues;
}
+ void addQueueIfReplicated(const boost::shared_ptr<broker::Queue>& q, const ReplicationTest& rt) {
+ if(rt.useLevel(*q)) addQueue(q);
+ }
+
/** Remove a replicated queue.
* @pre Was previously added with addQueue
*/
- void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; }
+ void removeQueue(const boost::shared_ptr<broker::Queue>&) {
+ assert(queues != 0);
+ --queues;
+ }
// TODO aconway 2014-01-24: Currently replication links always use the
// hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0
@@ -83,7 +100,7 @@ class PrimaryQueueLimits
std::string logPrefix;
uint64_t maxQueues;
uint64_t queues;
-};
+};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 0993c6ea39..bf3d779151 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -41,7 +41,7 @@ RemoteBackup::RemoteBackup(
std::ostringstream oss;
oss << "Remote backup at " << info << ": ";
logPrefix = oss.str();
- QPID_LOG(debug, logPrefix << "Connected");
+ QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
}
RemoteBackup::~RemoteBackup() {
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
index 647523ef2c..d2152363fe 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -29,20 +29,20 @@ namespace ha {
using types::Variant;
-ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) const {
Enum<ReplicateLevel> rl(replicateDefault);
if (!str.empty()) rl.parse(str);
return rl.get();
}
-ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) const {
if (f.isSet(QPID_REPLICATE))
return getLevel(f.getAsString(QPID_REPLICATE));
else
return replicateDefault;
}
-ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) const {
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end())
return getLevel(i->second.asString());
@@ -50,7 +50,7 @@ ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
return replicateDefault;
}
-ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) const {
const Variant::Map& qmap(q.getSettings().original);
Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
if (i != qmap.end())
@@ -59,16 +59,15 @@ ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
return getLevel(q.getSettings().storeSettings);
}
-ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) const {
return getLevel(ex.getArgs());
}
-ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
-{
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) const {
return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
}
-ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) const {
return ReplicationTest::getLevel(ex);
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index c157385ce6..8fe74ee959 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -56,18 +56,18 @@ class ReplicationTest
replicateDefault(replicateDefault_) {}
// Get the replication level set on an object, or default if not set.
- ReplicateLevel getLevel(const std::string& str);
- ReplicateLevel getLevel(const framing::FieldTable& f);
- ReplicateLevel getLevel(const types::Variant::Map& m);
- ReplicateLevel getLevel(const broker::Queue&);
- ReplicateLevel getLevel(const broker::Exchange&);
+ ReplicateLevel getLevel(const std::string& str) const;
+ ReplicateLevel getLevel(const framing::FieldTable& f) const;
+ ReplicateLevel getLevel(const types::Variant::Map& m) const;
+ ReplicateLevel getLevel(const broker::Queue&) const;
+ ReplicateLevel getLevel(const broker::Exchange&) const;
// Calculate level for objects that may not have replication set,
// including auto-delete/exclusive settings.
- ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
- ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
- ReplicateLevel useLevel(const broker::Queue&);
- ReplicateLevel useLevel(const broker::Exchange&);
+ ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const;
+ ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const;
+ ReplicateLevel useLevel(const broker::Queue&) const;
+ ReplicateLevel useLevel(const broker::Exchange&) const;
private:
ReplicateLevel replicateDefault;
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 748c8ef0c1..0f92f7dbcc 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -196,7 +196,7 @@ acl allow all all
def ha_status(self): return self.qmf().status
- def wait_status(self, status):
+ def wait_status(self, status, timeout=5):
def try_get_status():
self._status = "<unknown>"
# Ignore ConnectionError, the broker may not be up yet.
@@ -204,7 +204,7 @@ acl allow all all
self._status = self.ha_status()
return self._status == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
+ assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
self, status, self._status)
def wait_queue(self, queue, timeout=1):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index abc62b643e..f22e12a355 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -885,6 +885,19 @@ acl deny all all
old_sess.exchange_declare(exchange='ex1', type='fanout')
cluster[1].wait_backup("ex1")
+ def test_resource_limit_bug(self):
+ """QPID-5666 Regression test: Incorrect resource limit exception for queue creation."""
+ cluster = HaCluster(self, 3)
+ qs = ["q%s"%i for i in xrange(10)]
+ s = cluster[0].connect().session()
+ s.sender("q;{create:always}").close()
+ cluster.kill(0)
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ s = cluster[1].connect().session()
+ s.receiver("q;{delete:always}").close()
+ s.sender("qq;{create:always}").close()
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit