diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-19 18:57:30 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-19 18:57:30 +0000 |
| commit | b97be677001ec35469d080a98ba88276f2300651 (patch) | |
| tree | 2d947cee25396e96ad1a49f43d9de82e04f0f96a /cpp/src/qpid/broker/Queue.cpp | |
| parent | f25df3a53ca1ef5eec396512fd584823e7f6636d (diff) | |
| download | qpid-python-b97be677001ec35469d080a98ba88276f2300651.tar.gz | |
QPID-651 applied patch from Ted
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@586578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index af248b8fae..456e055c74 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -77,7 +77,11 @@ void Queue::deliver(Message::shared_ptr& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize ()); }else { + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -89,6 +93,8 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -97,8 +103,15 @@ void Queue::recover(Message::shared_ptr& msg){ } void Queue::process(Message::shared_ptr& msg){ - + + uint32_t mask = MSG_MASK_TX; + + if (msg->isPersistent ()) + mask |= MSG_MASK_PERSIST; + push(msg); + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize (), mask); serializer.execute(dispatchCallback); } @@ -267,6 +280,14 @@ QueuedMessage Queue::dequeue(){ if(!messages.empty()){ msg = messages.front(); pop(); + if (mgmtObjectPtr != 0){ + uint32_t mask = 0; + + if (msg.payload->isPersistent ()) + mask |= MSG_MASK_PERSIST; + + mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask); + } } return msg; } |
