summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp42
1 files changed, 28 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e9b1db0413..d80c13f12a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -28,6 +28,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "Link.h"
#include "qpid/management/PackageQpid.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/management/ArgsBrokerEcho.h"
@@ -60,7 +61,7 @@ using qpid::sys::Dispatcher;
using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
-using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) :
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ links(this),
factory(*this),
sessionManager(conf.ack)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
- ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
- conf.mgmtPubInterval);
- managementAgent = ManagementAgent::getAgent ();
- managementAgent->setInterval (conf.mgmtPubInterval);
+ ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval, this);
+ managementAgent = management::ManagementAgent::getAgent ();
+ ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
@@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) :
queues.setParent (vhost);
exchanges.setParent (vhost);
+ links.setParent (vhost);
}
// Early-Initialize plugins
@@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) :
queues.setStore (store);
dtxManager.setStore (store);
+ links.setStore (store);
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if (store != 0) {
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
@@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
- managementAgent->setExchange (mExchange, dExchange);
- dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent
+ ((ManagementBroker*) managementAgent.get());
}
else
QPID_LOG(info, "Management not enabled");
@@ -285,7 +290,7 @@ void Broker::shutdown() {
Broker::~Broker() {
shutdown();
- ManagementAgent::shutdown ();
+ ManagementBroker::shutdown ();
delete store;
if (config.auth) {
#if HAVE_SASL
@@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_CONNECT : {
management::ArgsBrokerConnect& hp=
dynamic_cast<management::ArgsBrokerConnect&>(args);
- connect(hp.i_host, hp.i_port);
+
+ if (hp.i_useSsl)
+ return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
+
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+ if (hp.i_durable && response.second)
+ store->create(*response.first);
+
status = Manageable::STATUS_OK;
break;
}
@@ -355,10 +368,11 @@ void Broker::accept() {
// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
- const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* f)
+ const std::string& host, uint16_t port, bool /*useSsl*/,
+ sys::ConnectionCodec::Factory* f,
+ sys::ProtocolAccess* access)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
}
void Broker::connect(
@@ -366,7 +380,7 @@ void Broker::connect(
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- connect(addr.host, addr.port, f);
+ connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
}
}} // namespace qpid::broker