summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp23
1 files changed, 22 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index af248b8fae..456e055c74 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -77,7 +77,11 @@ void Queue::deliver(Message::shared_ptr& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize ());
}else {
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -89,6 +93,8 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -97,8 +103,15 @@ void Queue::recover(Message::shared_ptr& msg){
}
void Queue::process(Message::shared_ptr& msg){
-
+
+ uint32_t mask = MSG_MASK_TX;
+
+ if (msg->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
+
push(msg);
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), mask);
serializer.execute(dispatchCallback);
}
@@ -267,6 +280,14 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
pop();
+ if (mgmtObjectPtr != 0){
+ uint32_t mask = 0;
+
+ if (msg.payload->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
+
+ mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
+ }
}
return msg;
}