From 198b43a1d3d58356949029eb64995711a1026c9b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 18 Dec 2009 16:23:19 +0000 Subject: QPID-2273 : Fix Protocol Negotiation git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@892301 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/Main.java | 15 ++++ .../server/configuration/ServerConfiguration.java | 5 ++ .../output/ProtocolOutputConverterRegistry.java | 1 - .../protocol/MultiVersionProtocolEngine.java | 83 ++++++++++++++++++---- .../java/org/apache/qpid/client/AMQConnection.java | 19 ++--- .../apache/qpid/client/AMQConnectionDelegate.java | 2 + .../qpid/client/AMQConnectionDelegate_0_10.java | 5 ++ .../qpid/client/AMQConnectionDelegate_0_9.java | 8 +++ .../qpid/client/AMQConnectionDelegate_8_0.java | 11 ++- .../qpid/client/AMQConnectionDelegate_9_1.java | 7 ++ .../qpid/client/protocol/AMQProtocolHandler.java | 13 +++- .../org/apache/qpid/transport/ProtocolHeader.java | 8 ++- .../qpid/transport/network/InputHandler.java | 3 +- .../templates/model/ProtocolVersionListClass.vm | 7 +- 14 files changed, 156 insertions(+), 31 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 845983857c..90afd2e4ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -140,6 +140,12 @@ public class Main .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line") .withLongOpt("exclude-0-10").create(); + Option exclude0_9_1 = + OptionBuilder.withArgName("exclude-0-9-1").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-9-1").create(); + + Option exclude0_9 = OptionBuilder.withArgName("exclude-0-9").hasArg() .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line") @@ -179,6 +185,7 @@ public class Main options.addOption(logwatchconfig); options.addOption(port); options.addOption(exclude0_10); + options.addOption(exclude0_9_1); options.addOption(exclude0_9); options.addOption(exclude0_8); options.addOption(mport); @@ -335,6 +342,7 @@ public class Main Set ports = new HashSet(); Set exclude_0_10 = new HashSet(); + Set exclude_0_9_1 = new HashSet(); Set exclude_0_9 = new HashSet(); Set exclude_0_8 = new HashSet(); @@ -343,6 +351,7 @@ public class Main parsePortList(ports, serverConfig.getPorts()); parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); parsePortList(exclude_0_9, serverConfig.getPortExclude09()); parsePortList(exclude_0_8, serverConfig.getPortExclude08()); @@ -351,6 +360,7 @@ public class Main { parsePortArray(ports, portStr); parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); @@ -399,6 +409,11 @@ public class Main { supported.remove(VERSION.v0_10); } + + if(exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } if(exclude_0_9.contains(port)) { supported.remove(VERSION.v0_9); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 66a7279134..879eb7c9e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -538,6 +538,11 @@ public class ServerConfiguration implements SignalHandler return getConfig().getList("connector.non010port", Collections.EMPTY_LIST); } + public List getPortExclude091() + { + return getConfig().getList("connector.non091port", Collections.EMPTY_LIST); + } + public List getPortExclude09() { return getConfig().getList("connector.non09port", Collections.EMPTY_LIST); diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 3a94160e22..dbefeb61f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -45,7 +45,6 @@ public class ProtocolOutputConverterRegistry register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory()); register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory()); register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory()); - } private static void register(ProtocolVersion version, Factory converter) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 78e21a8f14..9a1c6c9418 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 'M', (byte) 'Q', (byte) 'P', - (byte) 1, + (byte) 0, (byte) 0, (byte) 9, (byte) 1 @@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + private class ClosedDelegateProtocolEngine implements ProtocolEngine + { + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadBytes() + { + return 0; + } + + public void received(ByteBuffer msg) + { + _logger.error("Error processing incoming data, could not negotiate a common protocol"); + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + + public void writerIdle() + { + + } + + public void readerIdle() + { + + } + } + private class SelfDelegateProtocolEngine implements ProtocolEngine { @@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER = ProtocolEngine newDelegate = null; + byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) { if(_supported.contains(_creators[i].getVersion())) { + newestSupported = _creators[i].getHeaderIdentifier(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j return ERROR; } + byte protoClass = input.get(pos + 4); byte instance = input.get(pos + 5); byte major = input.get(pos + 6); byte minor = input.get(pos + 7); - receiver.received(new ProtocolHeader(instance, major, minor)); + receiver.received(new ProtocolHeader(protoClass, instance, major, minor)); needed = Frame.HEADER_SIZE; return FRAME_HDR; case FRAME_HDR: diff --git a/java/common/templates/model/ProtocolVersionListClass.vm b/java/common/templates/model/ProtocolVersionListClass.vm index 110342647e..78605c70ff 100644 --- a/java/common/templates/model/ProtocolVersionListClass.vm +++ b/java/common/templates/model/ProtocolVersionListClass.vm @@ -149,15 +149,20 @@ public class ProtocolVersion implements Comparable private static final ProtocolVersion _defaultVersion; + public static final ProtocolVersion v0_10 = new ProtocolVersion((byte)0,(byte)10); + #foreach( $version in $model.getVersionSet() ) #set( $versionId = "v$version.getMajor()_$version.getMinor()" ) - public static final ProtocolVersion $versionId = new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor()); + public static final ProtocolVersion $versionId = new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor()); #end + static { SortedSet versions = new TreeSet(); + versions.add(v0_10); + _nameToVersionMap.put("0-10", v0_10); #foreach( $version in $model.getVersionSet() ) #set( $versionId = "v$version.getMajor()_$version.getMinor()" ) versions.add($versionId); -- cgit v1.2.1