summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-10-19 18:57:30 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-10-19 18:57:30 +0000
commitb97be677001ec35469d080a98ba88276f2300651 (patch)
tree2d947cee25396e96ad1a49f43d9de82e04f0f96a /cpp/src/qpid/broker/Queue.cpp
parentf25df3a53ca1ef5eec396512fd584823e7f6636d (diff)
downloadqpid-python-b97be677001ec35469d080a98ba88276f2300651.tar.gz
QPID-651 applied patch from Ted
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@586578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp23
1 files changed, 22 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index af248b8fae..456e055c74 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -77,7 +77,11 @@ void Queue::deliver(Message::shared_ptr& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize ());
}else {
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -89,6 +93,8 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -97,8 +103,15 @@ void Queue::recover(Message::shared_ptr& msg){
}
void Queue::process(Message::shared_ptr& msg){
-
+
+ uint32_t mask = MSG_MASK_TX;
+
+ if (msg->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
+
push(msg);
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), mask);
serializer.execute(dispatchCallback);
}
@@ -267,6 +280,14 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
pop();
+ if (mgmtObjectPtr != 0){
+ uint32_t mask = 0;
+
+ if (msg.payload->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
+
+ mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
+ }
}
return msg;
}