summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp63
1 files changed, 41 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 68590d3331..d61100d255 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -28,7 +28,8 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
-#include "management/ManagementExchange.h"
+#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ArgsBrokerEcho.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
@@ -50,6 +51,11 @@ using qpid::sys::Acceptor;
using qpid::framing::HandlerUpdater;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::management::ArgsBrokerEcho;
namespace qpid {
namespace broker {
@@ -89,7 +95,7 @@ Broker::Options::Options(const std::string& name) :
("store-async", optValue(storeAsync,"yes|no"),
"Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
("store-force", optValue(storeForce,"yes|no"),
- "Force changing modes of store, will delete all existing data if mode is change. Be SHURE you want to do this")
+ "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this")
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
@@ -115,33 +121,23 @@ Broker::Broker(const Broker::Options& conf) :
sessionManager(conf.ack)
{
if(conf.enableMgmt){
- managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
+ managementAgent = ManagementAgent::getAgent ();
+ managementAgent->setInterval (conf.mgmtPubInterval);
- mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf));
- managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtObject));
+ mgmtObject = management::Broker::shared_ptr (new management::Broker (this, conf));
+ managementAgent->addObject (mgmtObject);
- // Since there is currently no support for virtual hosts, a management object
- // representing the implied single virtual host is added here.
- mgmtVhostObject = ManagementObjectVhost::shared_ptr
- (new ManagementObjectVhost (mgmtObject->getObjectId (), conf));
- managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject));
+ // Since there is currently no support for virtual hosts, a placeholder object
+ // representing the implied single virtual host is added here to keep the
+ // management schema correct.
+ Vhost* vhost = new Vhost (this);
+ vhostObject = Vhost::shared_ptr (vhost);
- queues.setManagementAgent (managementAgent);
- queues.setManagementVhost (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject));
+ queues.setParent (vhost);
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- if(conf.enableMgmt) {
- QPID_LOG(info, "Management enabled");
- exchanges.declare(qpid_management, ManagementExchange::typeName);
- Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
- managementAgent->setExchange (mExchange);
- dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
- }
- else
- QPID_LOG(info, "Management not enabled");
-
if(store.get()) {
if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){
throw Exception( "Existing Journal in different mode, backup/move existing data \
@@ -158,6 +154,17 @@ Broker::Broker(const Broker::Options& conf) :
declareStandardExchange(amq_fanout, FanOutExchange::typeName);
declareStandardExchange(amq_match, HeadersExchange::typeName);
+ if(conf.enableMgmt) {
+ QPID_LOG(info, "Management enabled");
+ exchanges.declare(qpid_management, ManagementExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
+ Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+ managementAgent->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ }
+ else
+ QPID_LOG(info, "Management not enabled");
+
// Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
for (Plugin::Plugins::const_iterator i = plugins.begin();
@@ -236,5 +243,17 @@ void Broker::update(ChannelId channel, FrameHandler::Chains& chains) {
channel, boost::ref(chains)));
}
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Broker::ManagementMethod (uint32_t /*methodId*/,
+ Args& /*_args*/)
+{
+ QPID_LOG (debug, "Broker::ManagementMethod");
+ return Manageable::STATUS_OK;
+}
+
}} // namespace qpid::broker