diff options
| author | Gordon Sim <gsim@apache.org> | 2014-11-10 11:48:48 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-11-10 11:48:48 +0000 |
| commit | a82cf9ffd6aa54f1172fd3215fa5307acfbbbf1b (patch) | |
| tree | b68d0027f3df6a2dea7784b8277d1cf6de010357 /qpid/cpp/src | |
| parent | 4864dbb1d20382cd355adc72b70b3181e9b0c5e0 (diff) | |
| download | qpid-python-a82cf9ffd6aa54f1172fd3215fa5307acfbbbf1b.tar.gz | |
QPID-6167: allow 0-10 to be disabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1637822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerOptions.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Protocol.cpp | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Protocol.h | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp | 75 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SecureConnectionFactory.h | 52 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Domain.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/ConnectionCodec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessageTest.cpp | 2 |
12 files changed, 66 insertions, 142 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index dbe58553a3..fe7a809cee 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1099,7 +1099,6 @@ set (qpidbroker_SOURCES qpid/broker/RecoveredDequeue.cpp qpid/broker/RetryList.cpp qpid/broker/SecureConnection.cpp - qpid/broker/SecureConnectionFactory.cpp qpid/broker/Selector.h qpid/broker/Selector.cpp qpid/broker/SelectorExpression.h diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 9e2e12c840..3912dac708 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -32,7 +32,6 @@ #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" -#include "qpid/broker/SecureConnectionFactory.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/PersistableObject.h" @@ -90,6 +89,7 @@ #include <iostream> #include <memory> +#include <set> using qpid::sys::TransportAcceptor; using qpid::sys::TransportConnector; @@ -168,6 +168,7 @@ BrokerOptions::BrokerOptions(const std::string& name) : ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections") ("listen-disable", optValue(listenDisabled, "<transport name>"), "Transports to disable listening") + ("protocols", optValue(protocols, "<protocol name+version>"), "Which protocol versions to allow") ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") @@ -233,7 +234,6 @@ Broker::Broker(const BrokerOptions& conf) : queues(this), exchanges(this), links(this), - factory(new SecureConnectionFactory(*this)), dtxManager(*timer.get(), conf.dtxDefaultTimeout), sessionManager( qpid::SessionState::Configuration( @@ -242,6 +242,7 @@ Broker::Broker(const BrokerOptions& conf) : *this), queueCleaner(queues, poller, timer.get()), recoveryInProgress(false), + protocolRegistry(std::set<std::string>(conf.protocols.begin(), conf.protocols.end()), this), timestampRcvMsgs(conf.timestampRcvMsgs), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { @@ -1327,7 +1328,7 @@ void Broker::accept() { unsigned accepting = 0; for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) { if (i->second.acceptor) { - i->second.acceptor->accept(poller, factory.get()); + i->second.acceptor->accept(poller, &protocolRegistry); ++accepting; } } @@ -1341,7 +1342,7 @@ void Broker::connect( const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed) { - connect(name, host, port, transport, factory.get(), failed); + connect(name, host, port, transport, &protocolRegistry, failed); } void Broker::connect( diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 3dbe407cff..7cd0eda875 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -149,7 +149,6 @@ class Broker : public sys::Runnable, public Plugin::Target, QueueRegistry queues; ExchangeRegistry exchanges; LinkRegistry links; - boost::shared_ptr<sys::ConnectionCodec::Factory> factory; DtxManager dtxManager; SessionManager sessionManager; qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject; diff --git a/qpid/cpp/src/qpid/broker/BrokerOptions.h b/qpid/cpp/src/qpid/broker/BrokerOptions.h index 81844a8631..9284be6a0a 100644 --- a/qpid/cpp/src/qpid/broker/BrokerOptions.h +++ b/qpid/cpp/src/qpid/broker/BrokerOptions.h @@ -48,6 +48,7 @@ struct BrokerOptions : public qpid::Options uint16_t port; std::vector<std::string> listenInterfaces; std::vector<std::string> listenDisabled; + std::vector<std::string> protocols; int workerThreads; int connectionBacklog; bool enableMgmt; diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp index a98160e502..2ef8c66445 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.cpp +++ b/qpid/cpp/src/qpid/broker/Protocol.cpp @@ -19,22 +19,63 @@ * */ #include "Protocol.h" -#include "qpid/sys/ConnectionCodec.h" #include "qpid/broker/RecoverableMessageImpl.h" +#include "qpid/amqp_0_10/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/broker/SecureConnection.h" #include "qpid/log/Statement.h" namespace qpid { namespace broker { +namespace { +const std::string AMQP_0_10("amqp0-10"); +} +ProtocolRegistry::ProtocolRegistry(const std::set<std::string>& e, Broker* b) : enabled(e), broker(b) {} qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s) { + if (v == qpid::framing::ProtocolVersion(0, 10) && isEnabled(AMQP_0_10)) { + return create_0_10(o, id, s, false); + } qpid::sys::ConnectionCodec* codec = 0; for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) { - codec = i->second->create(v, o, id, s); + if (isEnabled(i->first)) { + codec = i->second->create(v, o, id, s); + } } return codec; } +qpid::sys::ConnectionCodec* ProtocolRegistry::create(qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s) +{ + return create_0_10(o, id, s, true); +} + +bool ProtocolRegistry::isEnabled(const std::string& name) +{ + return enabled.empty()/*if nothing is explicitly enabled, assume everything is*/ || enabled.find(name) != enabled.end(); +} + + +typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr; +typedef std::auto_ptr<SecureConnection> SecureConnectionPtr; +typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr; +typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr; + +sys::ConnectionCodec* +ProtocolRegistry::create_0_10(qpid::sys::OutputControl& out, const std::string& id, + const qpid::sys::SecuritySettings& external, bool brokerActsAsClient) +{ + assert(broker); + SecureConnectionPtr sc(new SecureConnection()); + CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient)); + ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), *broker, id, external, brokerActsAsClient)); + i->setSecureConnection(sc.get()); + c->setInputHandler(InputPtr(i.release())); + sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); + return sc.release(); +} + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolRegistry::translate(const Message& m) { boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer; diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h index e736e4b367..59a631848e 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.h +++ b/qpid/cpp/src/qpid/broker/Protocol.h @@ -23,13 +23,14 @@ */ #include <map> #include <string> +#include <set> #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> +#include "qpid/sys/ConnectionCodec.h" #include "qpid/broker/BrokerImportExport.h" namespace qpid { namespace sys { -class ConnectionCodec; class OutputControl; struct SecuritySettings; } @@ -38,6 +39,7 @@ class Buffer; class ProtocolVersion; } namespace broker { +class Broker; class Message; class RecoverableMessage; namespace amqp_0_10 { @@ -63,14 +65,16 @@ class Protocol private: }; -class ProtocolRegistry : public Protocol +class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Factory { public: QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); QPID_BROKER_EXTERN boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&); QPID_BROKER_EXTERN boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); QPID_BROKER_EXTERN Message decode(qpid::framing::Buffer&); + QPID_BROKER_EXTERN ProtocolRegistry(const std::set<std::string>& enabled, Broker* b); QPID_BROKER_EXTERN ~ProtocolRegistry(); QPID_BROKER_EXTERN void add(const std::string&, Protocol*); private: @@ -78,6 +82,12 @@ class ProtocolRegistry : public Protocol //limited manipulation of ordering typedef std::map<std::string, Protocol*> Protocols; Protocols protocols; + const std::set<std::string> enabled; + Broker* broker; + + qpid::sys::ConnectionCodec* create_0_10(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&, bool); + bool isEnabled(const std::string&); + }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp deleted file mode 100644 index 0258350043..0000000000 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/SecureConnectionFactory.h" - -#include "qpid/amqp_0_10/Connection.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/amqp_0_10/Connection.h" -#include "qpid/broker/SecureConnection.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/SecuritySettings.h" - -namespace qpid { -namespace broker { - -using framing::ProtocolVersion; -using qpid::sys::SecuritySettings; -typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr; -typedef std::auto_ptr<SecureConnection> SecureConnectionPtr; -typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr; -typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr; - -SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} - -sys::ConnectionCodec* -SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, - const SecuritySettings& external) { - if (v == ProtocolVersion(0, 10)) { - return create_0_10(out, id, external, false); - } else { - return broker.getProtocolRegistry().create(v, out, id, external); - } - return 0; -} - -sys::ConnectionCodec* -SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, - const SecuritySettings& external) { - // used to create connections from one broker to another - return create_0_10(out, id, external, true); -} - -sys::ConnectionCodec* -SecureConnectionFactory::create_0_10(sys::OutputControl& out, const std::string& id, - const SecuritySettings& external, bool brokerActsAsClient) -{ - SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient)); - ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), broker, id, external, brokerActsAsClient)); - i->setSecureConnection(sc.get()); - c->setInputHandler(InputPtr(i.release())); - sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); - return sc.release(); -} - - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h deleted file mode 100644 index e64776d5ec..0000000000 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SecureConnectionFactory_ -#define _SecureConnectionFactory_ - -#include "qpid/sys/ConnectionCodec.h" - -namespace qpid { -namespace broker { -class Broker; - -class SecureConnectionFactory : public sys::ConnectionCodec::Factory -{ - public: - SecureConnectionFactory(Broker& b); - - sys::ConnectionCodec* - create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id, - const qpid::sys::SecuritySettings&); - - sys::ConnectionCodec* - create(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&); - - private: - Broker& broker; - - sys::ConnectionCodec* - create_0_10(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&, bool brokerActsAsClient); -}; - -}} - - -#endif diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp index 0ae13eb17b..cc714c0730 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -138,7 +138,7 @@ class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCo boost::shared_ptr<Domain>, BrokerContext&); InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target, boost::shared_ptr<Domain>, BrokerContext&, boost::shared_ptr<Relay>); - qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + qpid::sys::ConnectionCodec* create(const framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); bool connect(); void failed(int, std::string); @@ -170,7 +170,7 @@ InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std next = url.begin(); } -qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) +qpid::sys::ConnectionCodec* InterconnectFactory::create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) { throw qpid::Exception("Not implemented!"); } diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index eb0c1c19d5..2ea381e2bc 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -91,7 +91,7 @@ struct ProtocolPlugin : public Plugin if (broker) { policies = new NodePolicyRegistry(); ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), policies, *broker, options.domain); - broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown + broker->getProtocolRegistry().add("amqp1.0", impl);//registry deletes on shutdown } } diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h index c2890f06dc..969a3877e3 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h @@ -52,7 +52,7 @@ class ConnectionCodec : public Codec { /** Return 0 if version unknown */ virtual ConnectionCodec* create( - framing::ProtocolVersion, OutputControl&, const std::string& id, + const framing::ProtocolVersion&, OutputControl&, const std::string& id, const SecuritySettings& ) = 0; diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp index 08a2445522..a6c5157b47 100644 --- a/qpid/cpp/src/tests/MessageTest.cpp +++ b/qpid/cpp/src/tests/MessageTest.cpp @@ -61,7 +61,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode) qpid::framing::Buffer buffer(&bytes[0], bytes.size()); msg.getPersistentContext()->encode(buffer); buffer.reset(); - ProtocolRegistry registry; + ProtocolRegistry registry(std::set<std::string>(), 0); msg = registry.decode(buffer); BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); |
