summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-11-10 11:48:48 +0000
committerGordon Sim <gsim@apache.org>2014-11-10 11:48:48 +0000
commita82cf9ffd6aa54f1172fd3215fa5307acfbbbf1b (patch)
treeb68d0027f3df6a2dea7784b8277d1cf6de010357 /qpid/cpp/src
parent4864dbb1d20382cd355adc72b70b3181e9b0c5e0 (diff)
downloadqpid-python-a82cf9ffd6aa54f1172fd3215fa5307acfbbbf1b.tar.gz
QPID-6167: allow 0-10 to be disabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1637822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerOptions.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Protocol.cpp45
-rw-r--r--qpid/cpp/src/qpid/broker/Protocol.h14
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp75
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.h52
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionCodec.h2
-rw-r--r--qpid/cpp/src/tests/MessageTest.cpp2
12 files changed, 66 insertions, 142 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index dbe58553a3..fe7a809cee 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1099,7 +1099,6 @@ set (qpidbroker_SOURCES
qpid/broker/RecoveredDequeue.cpp
qpid/broker/RetryList.cpp
qpid/broker/SecureConnection.cpp
- qpid/broker/SecureConnectionFactory.cpp
qpid/broker/Selector.h
qpid/broker/Selector.cpp
qpid/broker/SelectorExpression.h
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 9e2e12c840..3912dac708 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -32,7 +32,6 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
-#include "qpid/broker/SecureConnectionFactory.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/PersistableObject.h"
@@ -90,6 +89,7 @@
#include <iostream>
#include <memory>
+#include <set>
using qpid::sys::TransportAcceptor;
using qpid::sys::TransportConnector;
@@ -168,6 +168,7 @@ BrokerOptions::BrokerOptions(const std::string& name) :
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
("listen-disable", optValue(listenDisabled, "<transport name>"), "Transports to disable listening")
+ ("protocols", optValue(protocols, "<protocol name+version>"), "Which protocol versions to allow")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -233,7 +234,6 @@ Broker::Broker(const BrokerOptions& conf) :
queues(this),
exchanges(this),
links(this),
- factory(new SecureConnectionFactory(*this)),
dtxManager(*timer.get(), conf.dtxDefaultTimeout),
sessionManager(
qpid::SessionState::Configuration(
@@ -242,6 +242,7 @@ Broker::Broker(const BrokerOptions& conf) :
*this),
queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
+ protocolRegistry(std::set<std::string>(conf.protocols.begin(), conf.protocols.end()), this),
timestampRcvMsgs(conf.timestampRcvMsgs),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
@@ -1327,7 +1328,7 @@ void Broker::accept() {
unsigned accepting = 0;
for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) {
if (i->second.acceptor) {
- i->second.acceptor->accept(poller, factory.get());
+ i->second.acceptor->accept(poller, &protocolRegistry);
++accepting;
}
}
@@ -1341,7 +1342,7 @@ void Broker::connect(
const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed)
{
- connect(name, host, port, transport, factory.get(), failed);
+ connect(name, host, port, transport, &protocolRegistry, failed);
}
void Broker::connect(
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 3dbe407cff..7cd0eda875 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -149,7 +149,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
QueueRegistry queues;
ExchangeRegistry exchanges;
LinkRegistry links;
- boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
DtxManager dtxManager;
SessionManager sessionManager;
qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject;
diff --git a/qpid/cpp/src/qpid/broker/BrokerOptions.h b/qpid/cpp/src/qpid/broker/BrokerOptions.h
index 81844a8631..9284be6a0a 100644
--- a/qpid/cpp/src/qpid/broker/BrokerOptions.h
+++ b/qpid/cpp/src/qpid/broker/BrokerOptions.h
@@ -48,6 +48,7 @@ struct BrokerOptions : public qpid::Options
uint16_t port;
std::vector<std::string> listenInterfaces;
std::vector<std::string> listenDisabled;
+ std::vector<std::string> protocols;
int workerThreads;
int connectionBacklog;
bool enableMgmt;
diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp
index a98160e502..2ef8c66445 100644
--- a/qpid/cpp/src/qpid/broker/Protocol.cpp
+++ b/qpid/cpp/src/qpid/broker/Protocol.cpp
@@ -19,22 +19,63 @@
*
*/
#include "Protocol.h"
-#include "qpid/sys/ConnectionCodec.h"
#include "qpid/broker/RecoverableMessageImpl.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/broker/SecureConnection.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
+namespace {
+const std::string AMQP_0_10("amqp0-10");
+}
+ProtocolRegistry::ProtocolRegistry(const std::set<std::string>& e, Broker* b) : enabled(e), broker(b) {}
qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s)
{
+ if (v == qpid::framing::ProtocolVersion(0, 10) && isEnabled(AMQP_0_10)) {
+ return create_0_10(o, id, s, false);
+ }
qpid::sys::ConnectionCodec* codec = 0;
for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) {
- codec = i->second->create(v, o, id, s);
+ if (isEnabled(i->first)) {
+ codec = i->second->create(v, o, id, s);
+ }
}
return codec;
}
+qpid::sys::ConnectionCodec* ProtocolRegistry::create(qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s)
+{
+ return create_0_10(o, id, s, true);
+}
+
+bool ProtocolRegistry::isEnabled(const std::string& name)
+{
+ return enabled.empty()/*if nothing is explicitly enabled, assume everything is*/ || enabled.find(name) != enabled.end();
+}
+
+
+typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
+typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
+typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr;
+typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
+
+sys::ConnectionCodec*
+ProtocolRegistry::create_0_10(qpid::sys::OutputControl& out, const std::string& id,
+ const qpid::sys::SecuritySettings& external, bool brokerActsAsClient)
+{
+ assert(broker);
+ SecureConnectionPtr sc(new SecureConnection());
+ CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient));
+ ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), *broker, id, external, brokerActsAsClient));
+ i->setSecureConnection(sc.get());
+ c->setInputHandler(InputPtr(i.release()));
+ sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
+ return sc.release();
+}
+
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolRegistry::translate(const Message& m)
{
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer;
diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h
index e736e4b367..59a631848e 100644
--- a/qpid/cpp/src/qpid/broker/Protocol.h
+++ b/qpid/cpp/src/qpid/broker/Protocol.h
@@ -23,13 +23,14 @@
*/
#include <map>
#include <string>
+#include <set>
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
+#include "qpid/sys/ConnectionCodec.h"
#include "qpid/broker/BrokerImportExport.h"
namespace qpid {
namespace sys {
-class ConnectionCodec;
class OutputControl;
struct SecuritySettings;
}
@@ -38,6 +39,7 @@ class Buffer;
class ProtocolVersion;
}
namespace broker {
+class Broker;
class Message;
class RecoverableMessage;
namespace amqp_0_10 {
@@ -63,14 +65,16 @@ class Protocol
private:
};
-class ProtocolRegistry : public Protocol
+class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Factory
{
public:
QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
QPID_BROKER_EXTERN boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&);
QPID_BROKER_EXTERN boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
QPID_BROKER_EXTERN Message decode(qpid::framing::Buffer&);
+ QPID_BROKER_EXTERN ProtocolRegistry(const std::set<std::string>& enabled, Broker* b);
QPID_BROKER_EXTERN ~ProtocolRegistry();
QPID_BROKER_EXTERN void add(const std::string&, Protocol*);
private:
@@ -78,6 +82,12 @@ class ProtocolRegistry : public Protocol
//limited manipulation of ordering
typedef std::map<std::string, Protocol*> Protocols;
Protocols protocols;
+ const std::set<std::string> enabled;
+ Broker* broker;
+
+ qpid::sys::ConnectionCodec* create_0_10(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&, bool);
+ bool isEnabled(const std::string&);
+
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
deleted file mode 100644
index 0258350043..0000000000
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/SecureConnectionFactory.h"
-
-#include "qpid/amqp_0_10/Connection.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/amqp_0_10/Connection.h"
-#include "qpid/broker/SecureConnection.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/SecuritySettings.h"
-
-namespace qpid {
-namespace broker {
-
-using framing::ProtocolVersion;
-using qpid::sys::SecuritySettings;
-typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
-typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
-typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr;
-typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
-
-SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
-
-sys::ConnectionCodec*
-SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
- const SecuritySettings& external) {
- if (v == ProtocolVersion(0, 10)) {
- return create_0_10(out, id, external, false);
- } else {
- return broker.getProtocolRegistry().create(v, out, id, external);
- }
- return 0;
-}
-
-sys::ConnectionCodec*
-SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
- const SecuritySettings& external) {
- // used to create connections from one broker to another
- return create_0_10(out, id, external, true);
-}
-
-sys::ConnectionCodec*
-SecureConnectionFactory::create_0_10(sys::OutputControl& out, const std::string& id,
- const SecuritySettings& external, bool brokerActsAsClient)
-{
- SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient));
- ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), broker, id, external, brokerActsAsClient));
- i->setSecureConnection(sc.get());
- c->setInputHandler(InputPtr(i.release()));
- sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
- return sc.release();
-}
-
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
deleted file mode 100644
index e64776d5ec..0000000000
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _SecureConnectionFactory_
-#define _SecureConnectionFactory_
-
-#include "qpid/sys/ConnectionCodec.h"
-
-namespace qpid {
-namespace broker {
-class Broker;
-
-class SecureConnectionFactory : public sys::ConnectionCodec::Factory
-{
- public:
- SecureConnectionFactory(Broker& b);
-
- sys::ConnectionCodec*
- create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
- const qpid::sys::SecuritySettings&);
-
- sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&);
-
- private:
- Broker& broker;
-
- sys::ConnectionCodec*
- create_0_10(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&, bool brokerActsAsClient);
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
index 0ae13eb17b..cc714c0730 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
@@ -138,7 +138,7 @@ class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCo
boost::shared_ptr<Domain>, BrokerContext&);
InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target,
boost::shared_ptr<Domain>, BrokerContext&, boost::shared_ptr<Relay>);
- qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ qpid::sys::ConnectionCodec* create(const framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
bool connect();
void failed(int, std::string);
@@ -170,7 +170,7 @@ InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std
next = url.begin();
}
-qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&)
+qpid::sys::ConnectionCodec* InterconnectFactory::create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&)
{
throw qpid::Exception("Not implemented!");
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
index eb0c1c19d5..2ea381e2bc 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -91,7 +91,7 @@ struct ProtocolPlugin : public Plugin
if (broker) {
policies = new NodePolicyRegistry();
ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), policies, *broker, options.domain);
- broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown
+ broker->getProtocolRegistry().add("amqp1.0", impl);//registry deletes on shutdown
}
}
diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
index c2890f06dc..969a3877e3 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
@@ -52,7 +52,7 @@ class ConnectionCodec : public Codec {
/** Return 0 if version unknown */
virtual ConnectionCodec* create(
- framing::ProtocolVersion, OutputControl&, const std::string& id,
+ const framing::ProtocolVersion&, OutputControl&, const std::string& id,
const SecuritySettings&
) = 0;
diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp
index 08a2445522..a6c5157b47 100644
--- a/qpid/cpp/src/tests/MessageTest.cpp
+++ b/qpid/cpp/src/tests/MessageTest.cpp
@@ -61,7 +61,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
qpid::framing::Buffer buffer(&bytes[0], bytes.size());
msg.getPersistentContext()->encode(buffer);
buffer.reset();
- ProtocolRegistry registry;
+ ProtocolRegistry registry(std::set<std::string>(), 0);
msg = registry.decode(buffer);
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());