diff options
| author | Gordon Sim <gsim@apache.org> | 2006-12-05 13:14:38 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2006-12-05 13:14:38 +0000 |
| commit | 0c3d1cba3cbb8c5656bb2fbc8de393a5801fabdb (patch) | |
| tree | 05cdf5b65d52e796e4ce5570aa07129bf69b2ca0 /qpid/cpp/lib/broker/BrokerQueue.cpp | |
| parent | 25e43ff200982d174632fda44a7f20240e758250 (diff) | |
| download | qpid-python-0c3d1cba3cbb8c5656bb2fbc8de393a5801fabdb.tar.gz | |
Added queue policy class for controlling when message content should be released from memory.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@482639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/lib/broker/BrokerQueue.cpp')
| -rw-r--r-- | qpid/cpp/lib/broker/BrokerQueue.cpp | 51 |
1 files changed, 43 insertions, 8 deletions
diff --git a/qpid/cpp/lib/broker/BrokerQueue.cpp b/qpid/cpp/lib/broker/BrokerQueue.cpp index 4eabfdec50..26857b6d31 100644 --- a/qpid/cpp/lib/broker/BrokerQueue.cpp +++ b/qpid/cpp/lib/broker/BrokerQueue.cpp @@ -26,6 +26,7 @@ using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid::framing; Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, @@ -62,8 +63,7 @@ void Queue::deliver(Message::shared_ptr& msg){ } void Queue::recover(Message::shared_ptr& msg){ - queueing = true; - messages.push(msg); + push(msg); if (store && msg->expectedContentSize() != msg->encodedContentSize()) { msg->releaseContent(store); } @@ -72,8 +72,7 @@ void Queue::recover(Message::shared_ptr& msg){ void Queue::process(Message::shared_ptr& msg){ Mutex::ScopedLock locker(lock); if(queueing || !dispatch(msg)){ - queueing = true; - messages.push(msg); + push(msg); } } @@ -116,7 +115,7 @@ void Queue::dispatch(){ while(proceed){ Mutex::ScopedLock locker(lock); if(!messages.empty() && dispatch(messages.front())){ - messages.pop(); + pop(); }else{ dispatching = false; proceed = false; @@ -149,7 +148,7 @@ Message::shared_ptr Queue::dequeue(){ Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); - messages.pop(); + pop(); } return msg; } @@ -157,10 +156,19 @@ Message::shared_ptr Queue::dequeue(){ u_int32_t Queue::purge(){ Mutex::ScopedLock locker(lock); int count = messages.size(); - while(!messages.empty()) messages.pop(); + while(!messages.empty()) pop(); return count; } +void Queue::pop(){ + messages.pop(); +} + +void Queue::push(Message::shared_ptr& msg){ + queueing = true; + messages.push(msg); +} + u_int32_t Queue::getMessageCount() const{ Mutex::ScopedLock locker(lock); return messages.size(); @@ -190,8 +198,30 @@ void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const st } } -void Queue::create() +namespace { + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); +} + +void Queue::create(const FieldTable& settings) +{ + //Note: currently field table only contain signed 32 bit ints, which + // restricts the values that can be set on the queue policy. + u_int32_t maxCount(0); + try { + maxCount = settings.getInt(qpidMaxSize); + } catch (FieldNotFoundException& ignore) { + } + u_int32_t maxSize(0); + try { + maxSize = settings.getInt(qpidMaxCount); + } catch (FieldNotFoundException& ignore) { + } + if (maxCount || maxSize) { + setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize))); + } + if (store) { store->create(*this); } @@ -203,3 +233,8 @@ void Queue::destroy() store->destroy(*this); } } + +void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) +{ + policy = _policy; +} |
