diff options
| author | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
| commit | 5a848a6a699d5ab8de93a646a44614378e56871f (patch) | |
| tree | e2a15b5ad2dd1a30e206601dc8f6902ea875f2e7 /cpp/src/qpid/broker/Queue.cpp | |
| parent | 258cccda74ffaa478366bfacda07e61bd88b20ec (diff) | |
| download | qpid-python-5a848a6a699d5ab8de93a646a44614378e56871f.tar.gz | |
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@672864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 21 |
1 files changed, 4 insertions, 17 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index becca8dfcf..40f249bc11 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent.get () != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0)); + (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0)); // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. @@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ // if no store then mark as enqueued if (!enqueue(0, msg)){ if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } push(msg); msg->enqueueComplete(); }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } if (store && !msg->isContentLoaded()) { @@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){ } consumerCount++; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->inc_consumerCount (); - } } void Queue::cancel(Consumer& c){ @@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = 0; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->dec_consumerCount (); - } } QueuedMessage Queue::dequeue(){ @@ -413,10 +402,8 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - mgmtObject->dec_msgDepth (); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); |
