diff options
| author | Alan Conway <aconway@apache.org> | 2007-11-23 13:37:42 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-11-23 13:37:42 +0000 |
| commit | a3aaa263858f07d37e5860136300f76fab8d7ecd (patch) | |
| tree | f188402580f36e06113a92c3c74575d13040c1d0 /cpp/src/qpid/broker/Queue.cpp | |
| parent | cb070d9813e4232b4ec8409ca555b529ee5cee4b (diff) | |
| download | qpid-python-a3aaa263858f07d37e5860136300f76fab8d7ecd.tar.gz | |
QPID-689 from tross@redhat.com.
This patch introduces formal schema specification for management and
code generation for management classes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@597662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 69 |
1 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 41a5767457..376b9367d0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (parent != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete)); + (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject); @@ -92,11 +92,21 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize ()); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + } }else { - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -108,8 +118,15 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ void Queue::recover(intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + } + if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -118,15 +135,19 @@ void Queue::recover(intrusive_ptr<Message>& msg){ } void Queue::process(intrusive_ptr<Message>& msg){ - - uint32_t mask = management::MSG_MASK_TX; - - if (msg->isPersistent ()) - mask |= management::MSG_MASK_PERSIST; - push(msg); - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), mask); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgTxnEnqueues (); + mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + if (msg->isPersistent ()) { + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } + } serializer.execute(dispatchCallback); } @@ -309,7 +330,7 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){ } if (mgmtObject != 0){ - mgmtObject->incConsumers (); + mgmtObject->inc_consumers (); } } @@ -321,7 +342,7 @@ void Queue::cancel(Consumer::ptr c){ cancel(c, browsers); } if (mgmtObject != 0){ - mgmtObject->decConsumers (); + mgmtObject->dec_consumers (); } if(exclusive == c) exclusive.reset(); } @@ -341,12 +362,14 @@ QueuedMessage Queue::dequeue(){ msg = messages.front(); pop(); if (mgmtObject != 0){ - uint32_t mask = 0; - - if (msg.payload->isPersistent ()) - mask |= management::MSG_MASK_PERSIST; - - mgmtObject->dequeue (msg.payload->contentSize (), mask); + mgmtObject->inc_msgTotalDequeues (); + //mgmtObject->inc_byteTotalDequeues (msg->contentSize ()); + mgmtObject->dec_msgDepth (); + //mgmtObject->dec_byteDepth (msg->contentSize ()); + if (0){//msg->isPersistent ()) { + mgmtObject->inc_msgPersistDequeues (); + //mgmtObject->inc_bytePersistDequeues (msg->contentSize ()); + } } } return msg; |
