summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp68
1 files changed, 38 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index bf64760fc7..d718acff03 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -230,7 +230,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
if (c.filter(msg.payload)) {
if (c.accept(msg.payload)) {
m = msg;
- pop();
+ messages.pop_front();
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -361,13 +361,13 @@ void Queue::cancel(Consumer& c){
mgmtObject->dec_consumerCount ();
}
-QueuedMessage Queue::dequeue(){
+QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = messages.front();
- pop();
+ messages.pop_front();
}
return msg;
}
@@ -376,35 +376,11 @@ uint32_t Queue::purge(){
Mutex::ScopedLock locker(messageLock);
int count = messages.size();
while(!messages.empty()) {
- QueuedMessage& msg = messages.front();
- if (store && msg.payload->isPersistent()) {
- boost::intrusive_ptr<PersistableMessage> pmsg =
- boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(0, pmsg, *this);
- }
- pop();
+ popAndDequeue();
}
return count;
}
-/**
- * Assumes messageLock is held
- */
-void Queue::pop(){
- QueuedMessage& msg = messages.front();
-
- if (policy.get()) policy->dequeued(msg.payload->contentSize());
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- if (msg.payload->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
- }
- }
- messages.pop_front();
-}
-
void Queue::push(boost::intrusive_ptr<Message>& msg){
Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
@@ -421,7 +397,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
} else {
QPID_LOG(error, "Message " << msg << " on " << name
<< " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
}
} else {
if (policyExceeded) {
@@ -475,6 +451,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ dequeued(msg);
+ }
if (msg->isPersistent() && store) {
msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -485,6 +465,34 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
return false;
}
+/**
+ * Removes a message from the in-memory delivery queue as well
+ * dequeing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+ boost::intrusive_ptr<Message> msg = messages.front().payload;
+ messages.pop_front();
+ dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+{
+ if (policy.get()) policy->dequeued(msg->contentSize());
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ }
+ }
+}
+
namespace
{
@@ -534,7 +542,7 @@ void Queue::destroy()
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
- pop();
+ popAndDequeue();
}
alternateExchange->decAlternateUsers();
}