diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 42 |
1 files changed, 28 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e9b1db0413..d80c13f12a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,6 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "Link.h" #include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -60,7 +61,7 @@ using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; -using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) : config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), + links(this), factory(*this), sessionManager(conf.ack) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); - ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval); - managementAgent = ManagementAgent::getAgent (); - managementAgent->setInterval (conf.mgmtPubInterval); + ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval, this); + managementAgent = management::ManagementAgent::getAgent (); + ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); @@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) : queues.setParent (vhost); exchanges.setParent (vhost); + links.setParent (vhost); } // Early-Initialize plugins @@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) : queues.setStore (store); dtxManager.setStore (store); + links.setStore (store); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { - RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); } @@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - managementAgent->setExchange (mExchange, dExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent + ((ManagementBroker*) managementAgent.get()); } else QPID_LOG(info, "Management not enabled"); @@ -285,7 +290,7 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); - ManagementAgent::shutdown (); + ManagementBroker::shutdown (); delete store; if (config.auth) { #if HAVE_SASL @@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_CONNECT : { management::ArgsBrokerConnect& hp= dynamic_cast<management::ArgsBrokerConnect&>(args); - connect(hp.i_host, hp.i_port); + + if (hp.i_useSsl) + return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED; + + std::pair<Link::shared_ptr, bool> response = + links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable); + if (hp.i_durable && response.second) + store->create(*response.first); + status = Manageable::STATUS_OK; break; } @@ -355,10 +368,11 @@ void Broker::accept() { // TODO: How to chose the protocolFactory to use for the connection void Broker::connect( - const std::string& host, uint16_t port, - sys::ConnectionCodec::Factory* f) + const std::string& host, uint16_t port, bool /*useSsl*/, + sys::ConnectionCodec::Factory* f, + sys::ProtocolAccess* access) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access); } void Broker::connect( @@ -366,7 +380,7 @@ void Broker::connect( { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - connect(addr.host, addr.port, f); + connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0); } }} // namespace qpid::broker |
