diff options
author | Alan Conway <aconway@apache.org> | 2014-04-18 13:49:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-04-18 13:49:28 +0000 |
commit | 52e5feddc5ac7c2d7402a8071847acc09753efb0 (patch) | |
tree | e61f9662a4d29773318a3997dfa9ec30eb26698b | |
parent | 9ed3cad52a411a1386c4ae378392f5027def2b37 (diff) | |
download | qpid-python-52e5feddc5ac7c2d7402a8071847acc09753efb0.tar.gz |
QPID-5666: HA fails with resource-limit-exceeded: Exceeded replicated queue limit
This is regression introduced in r1561206: CommitDate: Fri Jan 24 21:54:59 2014 +0000
QPID-5513: HA backup fails if number of replicated queues exceeds number of channels.
Fixed by the current commit. PrimaryQueueLimits was not taking account of queues already
on the broker prior to promotion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.28@1588471 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.h | 18 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 13 |
7 files changed, 55 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index b4d50d1652..af4ae12177 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -136,7 +136,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()), sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), - queueLimits(logPrefix) + queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest) { // Note that at this point, we are still rejecting client connections. // So we are safe from client interference while we set up the primary. diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h index a2322f1545..d614a48099 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h +++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h @@ -22,9 +22,12 @@ * */ +#include "ReplicationTest.h" #include <qpid/broker/Queue.h> +#include <qpid/broker/QueueRegistry.h> #include <qpid/framing/amqp_types.h> #include <boost/shared_ptr.hpp> +#include <boost/bind.hpp> #include <string> namespace qpid { @@ -45,8 +48,15 @@ class PrimaryQueueLimits { public: // FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max - PrimaryQueueLimits(const std::string& lp) : - logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {} + PrimaryQueueLimits(const std::string& lp, + broker::QueueRegistry& qr, + const ReplicationTest& rt + ) : + logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) + { + // Get initial count of replicated queues + qr.eachQueue(boost::bind(&PrimaryQueueLimits::addQueueIfReplicated, this, _1, rt)); + } /** Add a replicated queue *@exception ResourceLimitExceededException if this would exceed the limit. @@ -57,15 +67,22 @@ class PrimaryQueueLimits << " exceeds limit of " << maxQueues << " replicated queues."); throw framing::ResourceLimitExceededException( - "Exceeded replicated queue limit."); + Msg() << "Exceeded replicated queue limit " << queues << " >= " << maxQueues); } else ++queues; } + void addQueueIfReplicated(const boost::shared_ptr<broker::Queue>& q, const ReplicationTest& rt) { + if(rt.useLevel(*q)) addQueue(q); + } + /** Remove a replicated queue. * @pre Was previously added with addQueue */ - void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; } + void removeQueue(const boost::shared_ptr<broker::Queue>&) { + assert(queues != 0); + --queues; + } // TODO aconway 2014-01-24: Currently replication links always use the // hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0 @@ -83,7 +100,7 @@ class PrimaryQueueLimits std::string logPrefix; uint64_t maxQueues; uint64_t queues; -}; +}; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 0993c6ea39..bf3d779151 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -41,7 +41,7 @@ RemoteBackup::RemoteBackup( std::ostringstream oss; oss << "Remote backup at " << info << ": "; logPrefix = oss.str(); - QPID_LOG(debug, logPrefix << "Connected"); + QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected")); } RemoteBackup::~RemoteBackup() { diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp index 647523ef2c..d2152363fe 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp @@ -29,20 +29,20 @@ namespace ha { using types::Variant; -ReplicateLevel ReplicationTest::getLevel(const std::string& str) { +ReplicateLevel ReplicationTest::getLevel(const std::string& str) const { Enum<ReplicateLevel> rl(replicateDefault); if (!str.empty()) rl.parse(str); return rl.get(); } -ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) { +ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) const { if (f.isSet(QPID_REPLICATE)) return getLevel(f.getAsString(QPID_REPLICATE)); else return replicateDefault; } -ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) { +ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) const { Variant::Map::const_iterator i = m.find(QPID_REPLICATE); if (i != m.end()) return getLevel(i->second.asString()); @@ -50,7 +50,7 @@ ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) { return replicateDefault; } -ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) { +ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) const { const Variant::Map& qmap(q.getSettings().original); Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE); if (i != qmap.end()) @@ -59,16 +59,15 @@ ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) { return getLevel(q.getSettings().storeSettings); } -ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) { +ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) const { return getLevel(ex.getArgs()); } -ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) -{ +ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) const { return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q); } -ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) { +ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) const { return ReplicationTest::getLevel(ex); } diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index c157385ce6..8fe74ee959 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -56,18 +56,18 @@ class ReplicationTest replicateDefault(replicateDefault_) {} // Get the replication level set on an object, or default if not set. - ReplicateLevel getLevel(const std::string& str); - ReplicateLevel getLevel(const framing::FieldTable& f); - ReplicateLevel getLevel(const types::Variant::Map& m); - ReplicateLevel getLevel(const broker::Queue&); - ReplicateLevel getLevel(const broker::Exchange&); + ReplicateLevel getLevel(const std::string& str) const; + ReplicateLevel getLevel(const framing::FieldTable& f) const; + ReplicateLevel getLevel(const types::Variant::Map& m) const; + ReplicateLevel getLevel(const broker::Queue&) const; + ReplicateLevel getLevel(const broker::Exchange&) const; // Calculate level for objects that may not have replication set, // including auto-delete/exclusive settings. - ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive); - ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive); - ReplicateLevel useLevel(const broker::Queue&); - ReplicateLevel useLevel(const broker::Exchange&); + ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const; + ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const; + ReplicateLevel useLevel(const broker::Queue&) const; + ReplicateLevel useLevel(const broker::Exchange&) const; private: ReplicateLevel replicateDefault; diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 748c8ef0c1..0f92f7dbcc 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -196,7 +196,7 @@ acl allow all all def ha_status(self): return self.qmf().status - def wait_status(self, status): + def wait_status(self, status, timeout=5): def try_get_status(): self._status = "<unknown>" # Ignore ConnectionError, the broker may not be up yet. @@ -204,7 +204,7 @@ acl allow all all self._status = self.ha_status() return self._status == status; except ConnectionError: return False - assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%( + assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( self, status, self._status) def wait_queue(self, queue, timeout=1): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index abc62b643e..f22e12a355 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -885,6 +885,19 @@ acl deny all all old_sess.exchange_declare(exchange='ex1', type='fanout') cluster[1].wait_backup("ex1") + def test_resource_limit_bug(self): + """QPID-5666 Regression test: Incorrect resource limit exception for queue creation.""" + cluster = HaCluster(self, 3) + qs = ["q%s"%i for i in xrange(10)] + s = cluster[0].connect().session() + s.sender("q;{create:always}").close() + cluster.kill(0) + cluster[1].promote() + cluster[1].wait_status("active") + s = cluster[1].connect().session() + s.receiver("q;{delete:always}").close() + s.sender("qq;{create:always}").close() + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |