diff options
| author | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
| commit | 0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch) | |
| tree | d478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/broker/Broker.cpp | |
| parent | 4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff) | |
| download | qpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz | |
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
a) Links can be created even if the remote broker is not reachable
b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable.
(Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 42 |
1 files changed, 28 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e9b1db0413..d80c13f12a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,6 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "Link.h" #include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -60,7 +61,7 @@ using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; -using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) : config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), + links(this), factory(*this), sessionManager(conf.ack) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); - ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval); - managementAgent = ManagementAgent::getAgent (); - managementAgent->setInterval (conf.mgmtPubInterval); + ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval, this); + managementAgent = management::ManagementAgent::getAgent (); + ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); @@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) : queues.setParent (vhost); exchanges.setParent (vhost); + links.setParent (vhost); } // Early-Initialize plugins @@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) : queues.setStore (store); dtxManager.setStore (store); + links.setStore (store); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { - RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); } @@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - managementAgent->setExchange (mExchange, dExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent + ((ManagementBroker*) managementAgent.get()); } else QPID_LOG(info, "Management not enabled"); @@ -285,7 +290,7 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); - ManagementAgent::shutdown (); + ManagementBroker::shutdown (); delete store; if (config.auth) { #if HAVE_SASL @@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_CONNECT : { management::ArgsBrokerConnect& hp= dynamic_cast<management::ArgsBrokerConnect&>(args); - connect(hp.i_host, hp.i_port); + + if (hp.i_useSsl) + return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED; + + std::pair<Link::shared_ptr, bool> response = + links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable); + if (hp.i_durable && response.second) + store->create(*response.first); + status = Manageable::STATUS_OK; break; } @@ -355,10 +368,11 @@ void Broker::accept() { // TODO: How to chose the protocolFactory to use for the connection void Broker::connect( - const std::string& host, uint16_t port, - sys::ConnectionCodec::Factory* f) + const std::string& host, uint16_t port, bool /*useSsl*/, + sys::ConnectionCodec::Factory* f, + sys::ProtocolAccess* access) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access); } void Broker::connect( @@ -366,7 +380,7 @@ void Broker::connect( { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - connect(addr.host, addr.port, f); + connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0); } }} // namespace qpid::broker |
