diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 |
2 files changed, 17 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index a183ce9d02..03036fb825 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -29,6 +29,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -112,25 +113,13 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack), previewSessionManager(conf.ack) { - // Early-Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->earlyInitialize(*this); - - // If no plugin store module registered itself, set up the null store. - if (store == 0) - setStore (new NullMessageStore (false)); - - queues.setStore (store); - dtxManager.setStore (store); - if(conf.enableMgmt){ + QPID_LOG(info, "Management enabled"); ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), conf.mgmtPubInterval); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); + qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (); systemObject = System::shared_ptr (system); @@ -157,6 +146,20 @@ Broker::Broker(const Broker::Options& conf) : exchanges.setParent (vhost); } + // Early-Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->earlyInitialize(*this); + + // If no plugin store module registered itself, set up the null store. + if (store == 0) + setStore (new NullMessageStore (false)); + + queues.setStore (store); + dtxManager.setStore (store); + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { @@ -172,7 +175,6 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index a405971805..24ed6825b4 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -111,7 +111,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -119,7 +118,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -138,7 +136,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -157,7 +154,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -348,7 +344,6 @@ void Queue::consume(Consumer&, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_consumers (); } } @@ -359,7 +354,6 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = false; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->dec_consumers (); } } @@ -390,7 +384,6 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); |
