summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-06-30 19:00:49 +0000
committerTed Ross <tross@apache.org>2008-06-30 19:00:49 +0000
commit5a848a6a699d5ab8de93a646a44614378e56871f (patch)
treee2a15b5ad2dd1a30e206601dc8f6902ea875f2e7 /cpp/src/qpid/broker/Queue.cpp
parent258cccda74ffaa478366bfacda07e61bd88b20ec (diff)
downloadqpid-python-5a848a6a699d5ab8de93a646a44614378e56871f.tar.gz
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@672864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp21
1 files changed, 4 insertions, 17 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index becca8dfcf..40f249bc11 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent.get () != 0)
{
mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0));
+ (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0));
// Add the object to the management agent only if this queue is not durable.
// If it's durable, we will add it later when the queue is assigned a persistenceId.
@@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
// if no store then mark as enqueued
if (!enqueue(0, msg)){
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
}
push(msg);
msg->enqueueComplete();
}else {
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
@@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
}
if (store && !msg->isContentLoaded()) {
@@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
@@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){
}
consumerCount++;
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
+ if (mgmtObject.get() != 0)
mgmtObject->inc_consumerCount ();
- }
}
void Queue::cancel(Consumer& c){
@@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = 0;
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
+ if (mgmtObject.get() != 0)
mgmtObject->dec_consumerCount ();
- }
}
QueuedMessage Queue::dequeue(){
@@ -413,10 +402,8 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- mgmtObject->dec_msgDepth ();
if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());