diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-05-29 22:50:35 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-05-29 22:50:35 +0000 |
| commit | 4e82caaece6d7b1626edc0b0913d45cad8771596 (patch) | |
| tree | a08c7faba239fcc8bd9ab73c9ccd9ca7a7ca02ce /qpid/java/broker/src/main | |
| parent | 3f64f140c95f54adfd5d698765f01d04670a0af0 (diff) | |
| download | qpid-python-4e82caaece6d7b1626edc0b0913d45cad8771596.tar.gz | |
QPID-4029: add ability to selectively include a protocol version on a given port, overriding an exclusion on the same port or it being disabled on all ports.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1344040 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
5 files changed, 233 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index ac1fcf05db..2b43d41c7a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -53,23 +53,15 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.EnumSet; -import java.util.Formatter; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; -import java.util.logging.ConsoleHandler; -import java.util.logging.FileHandler; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; public class Broker { private static final Logger LOGGER = Logger.getLogger(Broker.class); - private static final int IPV4_ADDRESS_LENGTH = 4; - private static final char IPV4_LITERAL_SEPARATOR = '.'; private volatile Thread _shutdownHookThread; protected static class InitException extends RuntimeException @@ -165,36 +157,71 @@ public class Broker parsePortList(sslPorts, serverConfig.getSSLPorts()); } + //1-0 excludes and includes Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0)); if(exclude_1_0.isEmpty()) { parsePortList(exclude_1_0, serverConfig.getPortExclude10()); } + Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0)); + if(include_1_0.isEmpty()) + { + parsePortList(include_1_0, serverConfig.getPortInclude10()); + } + + //0-10 excludes and includes Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10)); if(exclude_0_10.isEmpty()) { parsePortList(exclude_0_10, serverConfig.getPortExclude010()); } + Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10)); + if(include_0_10.isEmpty()) + { + parsePortList(include_0_10, serverConfig.getPortInclude010()); + } + + //0-9-1 excludes and includes Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1)); if(exclude_0_9_1.isEmpty()) { parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); } + Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1)); + if(include_0_9_1.isEmpty()) + { + parsePortList(include_0_9_1, serverConfig.getPortInclude091()); + } + + //0-9 excludes and includes Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9)); if(exclude_0_9.isEmpty()) { parsePortList(exclude_0_9, serverConfig.getPortExclude09()); } + Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9)); + if(include_0_9.isEmpty()) + { + parsePortList(include_0_9, serverConfig.getPortInclude09()); + } + + //0-8 excludes and includes Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8)); if(exclude_0_8.isEmpty()) { parsePortList(exclude_0_8, serverConfig.getPortExclude08()); } + Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8)); + if(include_0_8.isEmpty()) + { + parsePortList(include_0_8, serverConfig.getPortInclude08()); + } + String bindAddr = options.getBind(); if (bindAddr == null) { @@ -220,8 +247,8 @@ public class Broker final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port); final Set<AmqpProtocolVersion> supported = - getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, - exclude_0_8, serverConfig); + getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, + include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig); final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); @@ -251,8 +278,8 @@ public class Broker final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort); final Set<AmqpProtocolVersion> supported = - getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, - exclude_0_9, exclude_0_8, serverConfig); + getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, + include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig); final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); @@ -283,27 +310,36 @@ public class Broker final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, final Set<Integer> exclude_0_8, + final Set<Integer> include_1_0, + final Set<Integer> include_0_10, + final Set<Integer> include_0_9_1, + final Set<Integer> include_0_9, + final Set<Integer> include_0_8, final ServerConfiguration serverConfig) { final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); - if(exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) + if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port)) { supported.remove(AmqpProtocolVersion.v1_0_0); } - if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) + + if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port)) { supported.remove(AmqpProtocolVersion.v0_10); } - if(exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) + + if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port)) { supported.remove(AmqpProtocolVersion.v0_9_1); } - if(exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) + + if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port)) { supported.remove(AmqpProtocolVersion.v0_9); } - if(exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) + + if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port)) { supported.remove(AmqpProtocolVersion.v0_8); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java index d871c724fd..cec614881d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java @@ -37,6 +37,7 @@ public class BrokerOptions private final Set<Integer> _ports = new HashSet<Integer>(); private final Set<Integer> _sslPorts = new HashSet<Integer>(); private final Map<ProtocolExclusion,Set<Integer>> _exclusionMap = new HashMap<ProtocolExclusion, Set<Integer>>(); + private final Map<ProtocolInclusion,Set<Integer>> _inclusionMap = new HashMap<ProtocolInclusion, Set<Integer>>(); private String _configFile; private String _logConfigFile; @@ -161,4 +162,21 @@ public class BrokerOptions { _bundleContext = bundleContext; } + + public Set<Integer> getIncludedPorts(final ProtocolInclusion includeProtocol) + { + final Set<Integer> includedPorts = _inclusionMap.get(includeProtocol); + return includedPorts == null ? Collections.<Integer>emptySet() : includedPorts; + } + + public void addIncludedPort(final ProtocolInclusion includeProtocol, final int port) + { + if (!_inclusionMap.containsKey(includeProtocol)) + { + _inclusionMap.put(includeProtocol, new HashSet<Integer>()); + } + + Set<Integer> ports = _inclusionMap.get(includeProtocol); + ports.add(port); + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 70fa414e3c..9fe7a6619f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -85,6 +85,32 @@ public class Main .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line") .withLongOpt("exclude-0-8").create(); + private static final Option OPTION_INCLUDE_1_0 = + OptionBuilder.withArgName("port").hasArg() + .withDescription("accept AMQP1-0 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line") + .withLongOpt("include-1-0").create(); + +private static final Option OPTION_INCLUDE_0_10 = + OptionBuilder.withArgName("port").hasArg() + .withDescription("accept AMQP0-10 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line") + .withLongOpt("include-0-10").create(); + +private static final Option OPTION_INCLUDE_0_9_1 = + OptionBuilder.withArgName("port").hasArg() + .withDescription("accept AMQP0-9-1 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line") + .withLongOpt("include-0-9-1").create(); + +private static final Option OPTION_INCLUDE_0_9 = + OptionBuilder.withArgName("port").hasArg() + .withDescription("accept AMQP0-9 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line") + .withLongOpt("include-0-9").create(); + +private static final Option OPTION_INCLUDE_0_8 = + OptionBuilder.withArgName("port").hasArg() + .withDescription("accept AMQP0-8 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line") + .withLongOpt("include-0-8").create(); + + private static final Option OPTION_JMX_PORT_REGISTRY_SERVER = OptionBuilder.withArgName("port").hasArg() .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file") @@ -127,6 +153,11 @@ public class Main OPTIONS.addOption(OPTION_EXCLUDE_0_9_1); OPTIONS.addOption(OPTION_EXCLUDE_0_9); OPTIONS.addOption(OPTION_EXCLUDE_0_8); + OPTIONS.addOption(OPTION_INCLUDE_1_0); + OPTIONS.addOption(OPTION_INCLUDE_0_10); + OPTIONS.addOption(OPTION_INCLUDE_0_9_1); + OPTIONS.addOption(OPTION_INCLUDE_0_9); + OPTIONS.addOption(OPTION_INCLUDE_0_8); OPTIONS.addOption(OPTION_BIND); OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER); @@ -256,6 +287,10 @@ public class Main { parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe); } + for(ProtocolInclusion pe : ProtocolInclusion.values()) + { + parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe); + } } String[] sslPortStr = _commandLine.getOptionValues(OPTION_SSLPORT.getOpt()); @@ -266,6 +301,10 @@ public class Main { parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe); } + for(ProtocolInclusion pe : ProtocolInclusion.values()) + { + parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe); + } } setExceptionHandler(); @@ -399,4 +438,23 @@ public class Main } } } + + private static void parseProtocolInclusions(final BrokerOptions options, final Object[] ports, + final ProtocolInclusion includedProtocol) throws InitException + { + if(ports != null) + { + for(int i = 0; i < ports.length; i++) + { + try + { + options.addIncludedPort(includedProtocol, Integer.parseInt(String.valueOf(ports[i]))); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid port for inclusion: " + ports[i], e); + } + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolInclusion.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolInclusion.java new file mode 100644 index 0000000000..85fbe2e02e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolInclusion.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import java.util.HashMap; +import java.util.Map; + +public enum ProtocolInclusion +{ + v0_8("include-0-8","--include-0-8"), + v0_9("include-0-9", "--include-0-9"), + v0_9_1("include-0-9-1", "--include-0-9-1"), + v0_10("include-0-10", "--include-0-10"), + v1_0("include-1-0", "--include-1-0"); + + private static final Map<String, ProtocolInclusion> MAP = new HashMap<String,ProtocolInclusion>(); + + static + { + for(ProtocolInclusion pe : ProtocolInclusion.values()) + { + MAP.put(pe.getArg(), pe); + } + } + + private String _arg; + private String _includeName; + + private ProtocolInclusion(final String includeName, final String arg) + { + _includeName = includeName; + _arg = arg; + } + + public String getArg() + { + return _arg; + } + + public String getIncludeName() + { + return _includeName; + } + + public static ProtocolInclusion lookup(final String arg) + { + ProtocolInclusion ex = MAP.get(arg); + + if(ex == null) + { + throw new IllegalArgumentException(arg + " is not a valid protocol inclusion"); + } + + return ex; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index baf6d5e6ad..651fd26059 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -94,6 +94,11 @@ public class ServerConfiguration extends ConfigurationPlugin public static final String CONNECTOR_AMQP09ENABLED = "connector.amqp09enabled"; public static final String CONNECTOR_AMQP08ENABLED = "connector.amqp08enabled"; public static final String CONNECTOR_AMQP_SUPPORTED_REPLY = "connector.amqpDefaultSupportedProtocolReply"; + public static final String CONNECTOR_INCLUDE_10 = "connector.include10"; + public static final String CONNECTOR_INCLUDE_010 = "connector.include010"; + public static final String CONNECTOR_INCLUDE_091 = "connector.include091"; + public static final String CONNECTOR_INCLUDE_09 = "connector.include09"; + public static final String CONNECTOR_INCLUDE_08 = "connector.include08"; { envVarMap.put("QPID_PORT", "connector.port"); @@ -714,6 +719,31 @@ public class ServerConfiguration extends ConfigurationPlugin return getListValue("connector.non08port"); } + public List getPortInclude08() + { + return getListValue(CONNECTOR_INCLUDE_08); + } + + public List getPortInclude09() + { + return getListValue(CONNECTOR_INCLUDE_09); + } + + public List getPortInclude091() + { + return getListValue(CONNECTOR_INCLUDE_091); + } + + public List getPortInclude010() + { + return getListValue(CONNECTOR_INCLUDE_010); + } + + public List getPortInclude10() + { + return getListValue(CONNECTOR_INCLUDE_10); + } + public String getBind() { return getStringValue("connector.bind", WILDCARD_ADDRESS); |
