diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 68 |
1 files changed, 31 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index befc5c4eff..8bbccda844 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -186,6 +186,8 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ + if (policy.get() && !policy->isEnqueued(msg)) return; + Listeners copy; { Mutex::ScopedLock locker(messageLock); @@ -415,29 +417,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ Listeners copy; { Mutex::ScopedLock locker(messageLock); - messages.push_back(QueuedMessage(this, msg, ++sequence)); - if (policy.get()) { - policy->enqueued(msg->contentSize()); - if (policy->limitExceeded()) { - if (!policyExceeded) { - policyExceeded = true; - QPID_LOG(info, "Queue size exceeded policy for " << name); - } - if (store) { - QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory"); - msg->releaseContent(store); - } else { - QPID_LOG(error, "Message " << msg << " on " << name - << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); - } - } else { - if (policyExceeded) { - policyExceeded = false; - QPID_LOG(info, "Queue size within policy for " << name); - } - } - } + QueuedMessage qm(this, msg, ++sequence); + if (policy.get()) policy->tryEnqueue(qm); + + messages.push_back(qm); listeners.swap(copy); } for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); @@ -486,15 +469,16 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) } // return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + if (policy.get() && !policy->isEnqueued(msg)) return false; { Mutex::ScopedLock locker(messageLock); dequeued(msg); } - if (msg->isPersistent() && store) { - msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + if (msg.payload->isPersistent() && store) { + msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); return true; } @@ -508,7 +492,7 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) */ void Queue::popAndDequeue() { - boost::intrusive_ptr<Message> msg = messages.front().payload; + QueuedMessage msg = messages.front(); messages.pop_front(); dequeue(0, msg); } @@ -517,15 +501,15 @@ void Queue::popAndDequeue() * Updates policy and management when a message has been dequeued, * expects messageLock to be held */ -void Queue::dequeued(boost::intrusive_ptr<Message>& msg) +void Queue::dequeued(const QueuedMessage& msg) { - if (policy.get()) policy->dequeued(msg->contentSize()); + if (policy.get()) policy->dequeued(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg->contentSize()); - if (msg->isPersistent ()){ + mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); + if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); } } } @@ -551,10 +535,7 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings) { - std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings)); - if (_policy->getMaxCount() || _policy->getMaxSize()) { - setPolicy(_policy); - } + setPolicy(QueuePolicy::createQueuePolicy(_settings)); //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); @@ -720,6 +701,19 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +bool Queue::releaseMessageContent(const QueuedMessage& m) +{ + if (store) { + QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); + m.payload->releaseContent(store); + return true; + } else { + QPID_LOG(warning, "Message " << m.position << " on " << name + << " cannot be released from memory as the queue is not durable"); + return false; + } +} + ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; |
