diff options
| author | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
| commit | 5a848a6a699d5ab8de93a646a44614378e56871f (patch) | |
| tree | e2a15b5ad2dd1a30e206601dc8f6902ea875f2e7 /cpp/src/qpid/broker | |
| parent | 258cccda74ffaa478366bfacda07e61bd88b20ec (diff) | |
| download | qpid-python-5a848a6a699d5ab8de93a646a44614378e56871f.tar.gz | |
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@672864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/System.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Vhost.cpp | 2 |
9 files changed, 30 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 18b2c52dad..9274de0555 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -38,13 +38,17 @@ namespace broker { Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), - mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes)), listener(l), name(Uuid(true).str()), persistenceId(0) { - if (!args.i_durable) - management::ManagementAgent::getAgent()->addObject(mgmtObject); + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + if (agent.get() != 0) { + mgmtObject = management::Bridge::shared_ptr + (new management::Bridge(agent.get(), this, link, id, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, + args.i_tag, args.i_excludes)); + if (!args.i_durable) + agent->addObject(mgmtObject); + } } Bridge::~Bridge() diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index a3dd93899a..0b7886b3ba 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -135,7 +135,7 @@ Broker::Broker(const Broker::Options& conf) : if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval, this); + conf.mgmtPubInterval, this, conf.workerThreads + 3); managementAgent = management::ManagementAgent::getAgent (); ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); @@ -143,7 +143,7 @@ Broker::Broker(const Broker::Options& conf) : System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); systemObject = System::shared_ptr (system); - mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (managementAgent.get(), this, system, conf.port)); mgmtObject->set_workerThreads (conf.workerThreads); mgmtObject->set_maxConns (conf.maxConnections); mgmtObject->set_connBacklog (conf.connectionBacklog); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index b6f6b9cee9..9e763f6775 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -65,7 +65,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); if (agent.get() != 0) - mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink)); + mgmtObject = management::Connection::shared_ptr + (new management::Connection(agent.get(), this, parent, mgmtId, !isLink)); agent->addObject(mgmtObject); } } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 30a93e338c..c72b148338 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name, durable)); + (new management::Exchange (agent.get(), this, parent, _name, durable)); agent->addObject (mgmtExchange); } } @@ -56,7 +56,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name, durable)); + (new management::Exchange (agent.get(), this, parent, _name, durable)); if (!durable) { if (name == "") agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID @@ -134,7 +134,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { uint64_t queueId = mo->getObjectId(); mgmtBinding = management::Binding::shared_ptr - (new management::Binding (this, (Manageable*) parent, queueId, key, args)); + (new management::Binding (agent.get(), this, (Manageable*) parent, queueId, key, args)); agent->addObject (mgmtBinding); } } diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 630ce68150..87c0020dcb 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -63,7 +63,7 @@ Link::Link(LinkRegistry* _links, if (agent.get() != 0) { mgmtObject = management::Link::shared_ptr - (new management::Link(this, parent, _host, _port, _useSsl, _durable)); + (new management::Link(agent.get(), this, parent, _host, _port, _useSsl, _durable)); if (!durable) agent->addObject(mgmtObject); } @@ -109,7 +109,8 @@ void Link::startConnectionLH () boost::bind (&Link::closed, this, _1, _2)); } catch(std::exception& e) { setStateLH(STATE_WAITING); - mgmtObject->set_lastError (e.what()); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError (e.what()); } } @@ -141,7 +142,8 @@ void Link::closed (int, std::string text) if (state != STATE_FAILED) { setStateLH(STATE_WAITING); - mgmtObject->set_lastError (text); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError (text); } if (closing) @@ -257,7 +259,8 @@ void Link::notifyConnectionForced(const string text) Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - mgmtObject->set_lastError(text); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError(text); } void Link::setPersistenceId(uint64_t id) const diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index becca8dfcf..40f249bc11 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent.get () != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0)); + (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0)); // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. @@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ // if no store then mark as enqueued if (!enqueue(0, msg)){ if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } push(msg); msg->enqueueComplete(); }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } if (store && !msg->isContentLoaded()) { @@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){ } consumerCount++; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->inc_consumerCount (); - } } void Queue::cancel(Consumer& c){ @@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = 0; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->dec_consumerCount (); - } } QueuedMessage Queue::dequeue(){ @@ -413,10 +402,8 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - mgmtObject->dec_msgDepth (); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index d7089424a5..95145e5d0e 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -64,7 +64,7 @@ SessionState::SessionState( ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, getId().getName())); + (new management::Session (agent.get(), this, parent, getId().getName())); mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); agent->addObject (mgmtObject); diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index da886710ac..107942fab5 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -63,7 +63,7 @@ System::System (string _dataDir) } mgmtObject = management::System::shared_ptr - (new management::System (this, systemId)); + (new management::System (agent.get(), this, systemId)); struct utsname _uname; if (uname (&_uname) == 0) { diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index a809679d57..cfe497c788 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker) if (agent.get () != 0) { mgmtObject = management::Vhost::shared_ptr - (new management::Vhost (this, parentBroker, "/")); + (new management::Vhost (agent.get(), this, parentBroker, "/")); agent->addObject (mgmtObject, 3, 1); } } |
