diff options
| author | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
| commit | 03fa9018260242f08d2164f06875fc708fdbf4c7 (patch) | |
| tree | 47bcba0feec9e95ecb06dfb0379b7abdf41c30f8 /cpp/src/qpid/broker/QueuePolicy.cpp | |
| parent | 558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (diff) | |
| download | qpid-python-03fa9018260242f08d2164f06875fc708fdbf4c7.tar.gz | |
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp
- moved QueuedMessage definition into its own header file
- added checks for requeue and dequeue
- split QueuePolicy logic into different sub classes
Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697603 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 183 |
1 files changed, 168 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 08838aac79..8aeaaabd55 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -19,39 +19,78 @@ * */ #include "QueuePolicy.h" +#include "Queue.h" #include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) : - maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {} - -QueuePolicy::QueuePolicy(const FieldTable& settings) : - maxCount(getInt(settings, maxCountKey, 0)), - maxSize(getInt(settings, maxSizeKey, defaultMaxSize)), count(0), size(0) {} +QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} void QueuePolicy::enqueued(uint64_t _size) { - if (maxCount) count++; + if (maxCount) ++count; if (maxSize) size += _size; } void QueuePolicy::dequeued(uint64_t _size) { - if (maxCount) count--; + if (maxCount) --count; if (maxSize) size -= _size; } -bool QueuePolicy::limitExceeded() +bool QueuePolicy::checkLimit(const QueuedMessage& m) +{ + bool exceeded = (maxSize && (size.get() + m.payload->contentSize()) > maxSize) || (maxCount && (count.get() + 1) > maxCount); + if (exceeded) { + if (!policyExceeded) { + policyExceeded = true; + QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName()); + } + } else { + if (policyExceeded) { + policyExceeded = false; + QPID_LOG(info, "Queue size within policy for " << m.queue->getName()); + } + } + return !exceeded; +} + +void QueuePolicy::tryEnqueue(const QueuedMessage& m) +{ + if (checkLimit(m)) { + enqueued(m); + } else { + std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); + throw ResourceLimitExceededException( + QPID_MSG("Policy exceeded on " << queue << " by message " << m.position + << " of size " << m.payload->contentSize() << " , policy: " << *this)); + } +} + +void QueuePolicy::enqueued(const QueuedMessage& m) +{ + enqueued(m.payload->contentSize()); +} + +void QueuePolicy::dequeued(const QueuedMessage& m) +{ + dequeued(m.payload->contentSize()); +} + +bool QueuePolicy::isEnqueued(const QueuedMessage&) { - return (maxSize && size > maxSize) || (maxCount && count > maxCount); + return true; } void QueuePolicy::update(FieldTable& settings) { if (maxCount) settings.setInt(maxCountKey, maxCount); - if (maxSize) settings.setInt(maxSizeKey, maxSize); + if (maxSize) settings.setInt(maxSizeKey, maxSize); + settings.setString(typeKey, type); } @@ -62,6 +101,17 @@ int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int else return defaultValue; } +std::string QueuePolicy::getType(const FieldTable& settings) +{ + FieldTable::ValuePtr v = settings.get(typeKey); + if (v && v->convertsTo<std::string>()) { + std::string t = v->get<std::string>(); + transform(t.begin(), t.end(), t.begin(), tolower); + if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; + } + return REJECT; +} + void QueuePolicy::setDefaultMaxSize(uint64_t s) { defaultMaxSize = s; @@ -69,20 +119,123 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); +const std::string QueuePolicy::typeKey("qpid.policy_type"); +const std::string QueuePolicy::REJECT("reject"); +const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk"); +const std::string QueuePolicy::RING("ring"); +const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); +FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} + +bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) +{ + return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); +} + +RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} + +void RingQueuePolicy::enqueued(const QueuedMessage& m) +{ + QueuePolicy::enqueued(m); + qpid::sys::Mutex::ScopedLock l(lock); + queue.push_back(m); +} + +void RingQueuePolicy::dequeued(const QueuedMessage& m) +{ + qpid::sys::Mutex::ScopedLock l(lock); + QueuePolicy::dequeued(m); + //find and remove m from queue + for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { + if (i->position == m.position) { + queue.erase(i); + break; + } + } +} + +bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) +{ + qpid::sys::Mutex::ScopedLock l(lock); + //for non-strict ring policy, a message can be dequeued before acked; need to detect this + for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { + if (i->position == m.position) { + return true; + } + } + return false; +} + +bool RingQueuePolicy::checkLimit(const QueuedMessage& m) +{ + if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept + + QueuedMessage oldest; + { + qpid::sys::Mutex::ScopedLock l(lock); + oldest = queue.front(); + } + if (oldest.queue->acquire(oldest) || !strict) { + qpid::sys::Mutex::ScopedLock l(lock); + if (oldest.position == queue.front().position) { + queue.pop_front(); + QPID_LOG(debug, "Ring policy triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": removed message " << oldest.position << " to make way for " << m.position); + } + return true; + } else { + QPID_LOG(debug, "Ring policy could not be triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); + //in strict mode, if oldest message has been delivered (hence + //cannot be acquired) but not yet acked, it should not be + //removed and the attempted enqueue should fail + return false; + } +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) +{ + uint32_t maxCount = getInt(settings, maxCountKey, 0); + uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize); + if (maxCount || maxSize) { + return createQueuePolicy(maxCount, maxSize, getType(settings)); + } else { + return std::auto_ptr<QueuePolicy>(); + } +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +{ + if (type == RING || type == RING_STRICT) { + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type)); + } else if (type == FLOW_TO_DISK) { + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize)); + } else { + return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type)); + } + +} + + namespace qpid { namespace broker { std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) { - if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; - else out << "size unlimited, current=" << p.size; + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get(); + else out << "size: unlimited"; out << "; "; - if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; - else out << "count unlimited, current=" << p.count; + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); + else out << "count: unlimited"; + out << "; type=" << p.type; return out; } } } + |
