diff options
| author | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
| commit | 03fa9018260242f08d2164f06875fc708fdbf4c7 (patch) | |
| tree | 47bcba0feec9e95ecb06dfb0379b7abdf41c30f8 /cpp/src/qpid/broker/QueuePolicy.h | |
| parent | 558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (diff) | |
| download | qpid-python-03fa9018260242f08d2164f06875fc708fdbf4c7.tar.gz | |
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp
- moved QueuedMessage definition into its own header file
- added checks for requeue and dequeue
- split QueuePolicy logic into different sub classes
Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697603 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.h')
| -rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.h | 101 |
1 files changed, 73 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index 4511a63b64..d39ce7dc11 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -21,40 +21,85 @@ #ifndef _QueuePolicy_ #define _QueuePolicy_ +#include <deque> #include <iostream> +#include <memory> +#include "QueuedMessage.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" namespace qpid { - namespace broker { - class QueuePolicy - { - static const std::string maxCountKey; - static const std::string maxSizeKey; - - static uint64_t defaultMaxSize; +namespace broker { + +class QueuePolicy +{ + static uint64_t defaultMaxSize; - const uint32_t maxCount; - const uint64_t maxSize; - uint32_t count; - uint64_t size; + const uint32_t maxCount; + const uint64_t maxSize; + const std::string type; + qpid::sys::AtomicValue<uint32_t> count; + qpid::sys::AtomicValue<uint64_t> size; + bool policyExceeded; - static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - - public: - QueuePolicy(uint32_t maxCount, uint64_t maxSize); - QueuePolicy(const qpid::framing::FieldTable& settings); - void enqueued(uint64_t size); - void dequeued(uint64_t size); - void update(qpid::framing::FieldTable& settings); - bool limitExceeded(); - uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } - - static void setDefaultMaxSize(uint64_t); - friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); - }; - } -} + static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); + static std::string getType(const qpid::framing::FieldTable& settings); + + public: + static const std::string maxCountKey; + static const std::string maxSizeKey; + static const std::string typeKey; + static const std::string REJECT; + static const std::string FLOW_TO_DISK; + static const std::string RING; + static const std::string RING_STRICT; + + virtual ~QueuePolicy() {} + void tryEnqueue(const QueuedMessage&); + virtual void dequeued(const QueuedMessage&); + virtual bool isEnqueued(const QueuedMessage&); + virtual bool checkLimit(const QueuedMessage&); + void update(qpid::framing::FieldTable& settings); + uint32_t getMaxCount() const { return maxCount; } + uint64_t getMaxSize() const { return maxSize; } + + static std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); + static std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static void setDefaultMaxSize(uint64_t); + friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); + protected: + QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + + virtual void enqueued(const QueuedMessage&); + void enqueued(uint64_t size); + void dequeued(uint64_t size); +}; + + +class FlowToDiskPolicy : public QueuePolicy +{ + public: + FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize); + bool checkLimit(const QueuedMessage&); +}; + +class RingQueuePolicy : public QueuePolicy +{ + public: + RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + bool isEnqueued(const QueuedMessage&); + bool checkLimit(const QueuedMessage&); + private: + typedef std::deque<QueuedMessage> Messages; + qpid::sys::Mutex lock; + Messages queue; + const bool strict; +}; + +}} #endif |
