diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 63 |
1 files changed, 41 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 68590d3331..d61100d255 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,7 +28,8 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" -#include "management/ManagementExchange.h" +#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ArgsBrokerEcho.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" @@ -50,6 +51,11 @@ using qpid::sys::Acceptor; using qpid::framing::HandlerUpdater; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::management::ArgsBrokerEcho; namespace qpid { namespace broker { @@ -89,7 +95,7 @@ Broker::Options::Options(const std::string& name) : ("store-async", optValue(storeAsync,"yes|no"), "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.") ("store-force", optValue(storeForce,"yes|no"), - "Force changing modes of store, will delete all existing data if mode is change. Be SHURE you want to do this") + "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this") ("mgmt,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), @@ -115,33 +121,23 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack) { if(conf.enableMgmt){ - managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); + managementAgent = ManagementAgent::getAgent (); + managementAgent->setInterval (conf.mgmtPubInterval); - mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf)); - managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtObject)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, conf)); + managementAgent->addObject (mgmtObject); - // Since there is currently no support for virtual hosts, a management object - // representing the implied single virtual host is added here. - mgmtVhostObject = ManagementObjectVhost::shared_ptr - (new ManagementObjectVhost (mgmtObject->getObjectId (), conf)); - managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject)); + // Since there is currently no support for virtual hosts, a placeholder object + // representing the implied single virtual host is added here to keep the + // management schema correct. + Vhost* vhost = new Vhost (this); + vhostObject = Vhost::shared_ptr (vhost); - queues.setManagementAgent (managementAgent); - queues.setManagementVhost (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject)); + queues.setParent (vhost); } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); - exchanges.declare(qpid_management, ManagementExchange::typeName); - Exchange::shared_ptr mExchange = exchanges.get (qpid_management); - managementAgent->setExchange (mExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); - } - else - QPID_LOG(info, "Management not enabled"); - if(store.get()) { if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){ throw Exception( "Existing Journal in different mode, backup/move existing data \ @@ -158,6 +154,17 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_fanout, FanOutExchange::typeName); declareStandardExchange(amq_match, HeadersExchange::typeName); + if(conf.enableMgmt) { + QPID_LOG(info, "Management enabled"); + exchanges.declare(qpid_management, ManagementExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get (qpid_management); + Exchange::shared_ptr dExchange = exchanges.get (amq_direct); + managementAgent->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + } + else + QPID_LOG(info, "Management not enabled"); + // Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); for (Plugin::Plugins::const_iterator i = plugins.begin(); @@ -236,5 +243,17 @@ void Broker::update(ChannelId channel, FrameHandler::Chains& chains) { channel, boost::ref(chains))); } +ManagementObject::shared_ptr Broker::GetManagementObject(void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Broker::ManagementMethod (uint32_t /*methodId*/, + Args& /*_args*/) +{ + QPID_LOG (debug, "Broker::ManagementMethod"); + return Manageable::STATUS_OK; +} + }} // namespace qpid::broker |
