summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-09-21 20:39:40 +0000
committerGordon Sim <gsim@apache.org>2008-09-21 20:39:40 +0000
commit03fa9018260242f08d2164f06875fc708fdbf4c7 (patch)
tree47bcba0feec9e95ecb06dfb0379b7abdf41c30f8 /cpp/src/qpid/broker/QueuePolicy.cpp
parent558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (diff)
downloadqpid-python-03fa9018260242f08d2164f06875fc708fdbf4c7.tar.gz
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp - moved QueuedMessage definition into its own header file - added checks for requeue and dequeue - split QueuePolicy logic into different sub classes Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697603 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp183
1 files changed, 168 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 08838aac79..8aeaaabd55 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -19,39 +19,78 @@
*
*/
#include "QueuePolicy.h"
+#include "Queue.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) :
- maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
-
-QueuePolicy::QueuePolicy(const FieldTable& settings) :
- maxCount(getInt(settings, maxCountKey, 0)),
- maxSize(getInt(settings, maxSizeKey, defaultMaxSize)), count(0), size(0) {}
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
void QueuePolicy::enqueued(uint64_t _size)
{
- if (maxCount) count++;
+ if (maxCount) ++count;
if (maxSize) size += _size;
}
void QueuePolicy::dequeued(uint64_t _size)
{
- if (maxCount) count--;
+ if (maxCount) --count;
if (maxSize) size -= _size;
}
-bool QueuePolicy::limitExceeded()
+bool QueuePolicy::checkLimit(const QueuedMessage& m)
+{
+ bool exceeded = (maxSize && (size.get() + m.payload->contentSize()) > maxSize) || (maxCount && (count.get() + 1) > maxCount);
+ if (exceeded) {
+ if (!policyExceeded) {
+ policyExceeded = true;
+ QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ }
+ } else {
+ if (policyExceeded) {
+ policyExceeded = false;
+ QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ }
+ }
+ return !exceeded;
+}
+
+void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+{
+ if (checkLimit(m)) {
+ enqueued(m);
+ } else {
+ std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
+ throw ResourceLimitExceededException(
+ QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
+ << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ }
+}
+
+void QueuePolicy::enqueued(const QueuedMessage& m)
+{
+ enqueued(m.payload->contentSize());
+}
+
+void QueuePolicy::dequeued(const QueuedMessage& m)
+{
+ dequeued(m.payload->contentSize());
+}
+
+bool QueuePolicy::isEnqueued(const QueuedMessage&)
{
- return (maxSize && size > maxSize) || (maxCount && count > maxCount);
+ return true;
}
void QueuePolicy::update(FieldTable& settings)
{
if (maxCount) settings.setInt(maxCountKey, maxCount);
- if (maxSize) settings.setInt(maxSizeKey, maxSize);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+ settings.setString(typeKey, type);
}
@@ -62,6 +101,17 @@ int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int
else return defaultValue;
}
+std::string QueuePolicy::getType(const FieldTable& settings)
+{
+ FieldTable::ValuePtr v = settings.get(typeKey);
+ if (v && v->convertsTo<std::string>()) {
+ std::string t = v->get<std::string>();
+ transform(t.begin(), t.end(), t.begin(), tolower);
+ if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
+ }
+ return REJECT;
+}
+
void QueuePolicy::setDefaultMaxSize(uint64_t s)
{
defaultMaxSize = s;
@@ -69,20 +119,123 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
+const std::string QueuePolicy::typeKey("qpid.policy_type");
+const std::string QueuePolicy::REJECT("reject");
+const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
+const std::string QueuePolicy::RING("ring");
+const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
+FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+
+bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+{
+ return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+}
+
+RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+
+void RingQueuePolicy::enqueued(const QueuedMessage& m)
+{
+ QueuePolicy::enqueued(m);
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queue.push_back(m);
+}
+
+void RingQueuePolicy::dequeued(const QueuedMessage& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ QueuePolicy::dequeued(m);
+ //find and remove m from queue
+ for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+ if (i->position == m.position) {
+ queue.erase(i);
+ break;
+ }
+ }
+}
+
+bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ //for non-strict ring policy, a message can be dequeued before acked; need to detect this
+ for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+ if (i->position == m.position) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+{
+ if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
+
+ QueuedMessage oldest;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ oldest = queue.front();
+ }
+ if (oldest.queue->acquire(oldest) || !strict) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (oldest.position == queue.front().position) {
+ queue.pop_front();
+ QPID_LOG(debug, "Ring policy triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": removed message " << oldest.position << " to make way for " << m.position);
+ }
+ return true;
+ } else {
+ QPID_LOG(debug, "Ring policy could not be triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+ //in strict mode, if oldest message has been delivered (hence
+ //cannot be acquired) but not yet acked, it should not be
+ //removed and the attempted enqueue should fail
+ return false;
+ }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
+{
+ uint32_t maxCount = getInt(settings, maxCountKey, 0);
+ uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
+ if (maxCount || maxSize) {
+ return createQueuePolicy(maxCount, maxSize, getType(settings));
+ } else {
+ return std::auto_ptr<QueuePolicy>();
+ }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ if (type == RING || type == RING_STRICT) {
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ } else if (type == FLOW_TO_DISK) {
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ } else {
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ }
+
+}
+
+
namespace qpid {
namespace broker {
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
- else out << "size unlimited, current=" << p.size;
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
- else out << "count unlimited, current=" << p.count;
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ else out << "count: unlimited";
+ out << "; type=" << p.type;
return out;
}
}
}
+