summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp32
-rw-r--r--cpp/src/qpid/broker/Queue.cpp7
2 files changed, 17 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index a183ce9d02..03036fb825 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -29,6 +29,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "qpid/management/PackageQpid.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/management/ArgsBrokerEcho.h"
@@ -112,25 +113,13 @@ Broker::Broker(const Broker::Options& conf) :
sessionManager(conf.ack),
previewSessionManager(conf.ack)
{
- // Early-Initialize plugins
- const Plugin::Plugins& plugins=Plugin::getPlugins();
- for (Plugin::Plugins::const_iterator i = plugins.begin();
- i != plugins.end();
- i++)
- (*i)->earlyInitialize(*this);
-
- // If no plugin store module registered itself, set up the null store.
- if (store == 0)
- setStore (new NullMessageStore (false));
-
- queues.setStore (store);
- dtxManager.setStore (store);
-
if(conf.enableMgmt){
+ QPID_LOG(info, "Management enabled");
ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
conf.mgmtPubInterval);
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
+ qpid::management::PackageQpid packageInitializer (managementAgent);
System* system = new System ();
systemObject = System::shared_ptr (system);
@@ -157,6 +146,20 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.setParent (vhost);
}
+ // Early-Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->earlyInitialize(*this);
+
+ // If no plugin store module registered itself, set up the null store.
+ if (store == 0)
+ setStore (new NullMessageStore (false));
+
+ queues.setStore (store);
+ dtxManager.setStore (store);
+
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if (store != 0) {
@@ -172,7 +175,6 @@ Broker::Broker(const Broker::Options& conf) :
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- QPID_LOG(info, "Management enabled");
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index a405971805..24ed6825b4 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -111,7 +111,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete();
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -119,7 +118,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
}
}else {
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -138,7 +136,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -157,7 +154,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -348,7 +344,6 @@ void Queue::consume(Consumer&, bool requestExclusive){
consumerCount++;
if (mgmtObject.get() != 0){
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_consumers ();
}
}
@@ -359,7 +354,6 @@ void Queue::cancel(Consumer& c){
consumerCount--;
if(exclusive) exclusive = false;
if (mgmtObject.get() != 0){
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->dec_consumers ();
}
}
@@ -390,7 +384,6 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
mgmtObject->dec_msgDepth ();