diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 68 |
1 files changed, 38 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index bf64760fc7..d718acff03 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -230,7 +230,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) if (c.filter(msg.payload)) { if (c.accept(msg.payload)) { m = msg; - pop(); + messages.pop_front(); return true; } else { //message(s) are available but consumer hasn't got enough credit @@ -361,13 +361,13 @@ void Queue::cancel(Consumer& c){ mgmtObject->dec_consumerCount (); } -QueuedMessage Queue::dequeue(){ +QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if(!messages.empty()){ msg = messages.front(); - pop(); + messages.pop_front(); } return msg; } @@ -376,35 +376,11 @@ uint32_t Queue::purge(){ Mutex::ScopedLock locker(messageLock); int count = messages.size(); while(!messages.empty()) { - QueuedMessage& msg = messages.front(); - if (store && msg.payload->isPersistent()) { - boost::intrusive_ptr<PersistableMessage> pmsg = - boost::static_pointer_cast<PersistableMessage>(msg.payload); - store->dequeue(0, pmsg, *this); - } - pop(); + popAndDequeue(); } return count; } -/** - * Assumes messageLock is held - */ -void Queue::pop(){ - QueuedMessage& msg = messages.front(); - - if (policy.get()) policy->dequeued(msg.payload->contentSize()); - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - if (msg.payload->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); - } - } - messages.pop_front(); -} - void Queue::push(boost::intrusive_ptr<Message>& msg){ Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); @@ -421,7 +397,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ } else { QPID_LOG(error, "Message " << msg << " on " << name << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); } } else { if (policyExceeded) { @@ -475,6 +451,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { + { + Mutex::ScopedLock locker(messageLock); + dequeued(msg); + } if (msg->isPersistent() && store) { msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); @@ -485,6 +465,34 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) return false; } +/** + * Removes a message from the in-memory delivery queue as well + * dequeing it from the logical (and persistent if applicable) queue + */ +void Queue::popAndDequeue() +{ + boost::intrusive_ptr<Message> msg = messages.front().payload; + messages.pop_front(); + dequeue(0, msg); +} + +/** + * Updates policy and management when a message has been dequeued, + * expects messageLock to be held + */ +void Queue::dequeued(boost::intrusive_ptr<Message>& msg) +{ + if (policy.get()) policy->dequeued(msg->contentSize()); + if (mgmtObject != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + if (msg->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + } + } +} + namespace { @@ -534,7 +542,7 @@ void Queue::destroy() DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); - pop(); + popAndDequeue(); } alternateExchange->decAlternateUsers(); } |