diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2008-04-04 18:14:42 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2008-04-04 18:14:42 +0000 |
| commit | a2ea9d432dc5713dadd4c710a982cc466de3ea8b (patch) | |
| tree | d7c955359c88b80b24c4f70146309a806511adda /cpp/src/qpid/broker | |
| parent | 2193d76646028d97b7bfff69335d4239954adbe5 (diff) | |
| download | qpid-python-a2ea9d432dc5713dadd4c710a982cc466de3ea8b.tar.gz | |
Patch from Ted Ross (see QPID-902): This patch contains the following improvements for management:\n1) Schema display cleaned up in the python mgmt-cli\n2) Locking added automatically to management object accessors (manual locking removed from broker/Queue.cpp)\n3) Schemas are now pre-registered with the management agent using a package initializer. This allows management consoles to get schema information for a class even if no instances of the class exist.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644806 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 |
2 files changed, 17 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index a183ce9d02..03036fb825 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -29,6 +29,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -112,25 +113,13 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack), previewSessionManager(conf.ack) { - // Early-Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->earlyInitialize(*this); - - // If no plugin store module registered itself, set up the null store. - if (store == 0) - setStore (new NullMessageStore (false)); - - queues.setStore (store); - dtxManager.setStore (store); - if(conf.enableMgmt){ + QPID_LOG(info, "Management enabled"); ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), conf.mgmtPubInterval); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); + qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (); systemObject = System::shared_ptr (system); @@ -157,6 +146,20 @@ Broker::Broker(const Broker::Options& conf) : exchanges.setParent (vhost); } + // Early-Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->earlyInitialize(*this); + + // If no plugin store module registered itself, set up the null store. + if (store == 0) + setStore (new NullMessageStore (false)); + + queues.setStore (store); + dtxManager.setStore (store); + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { @@ -172,7 +175,6 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index a405971805..24ed6825b4 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -111,7 +111,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -119,7 +118,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -138,7 +136,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -157,7 +154,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -348,7 +344,6 @@ void Queue::consume(Consumer&, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_consumers (); } } @@ -359,7 +354,6 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = false; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->dec_consumers (); } } @@ -390,7 +384,6 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); |
