diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-12-18 16:23:19 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-12-18 16:23:19 +0000 |
| commit | 6f5d96325706a81a91e5bdfbdafb37a296478bf0 (patch) | |
| tree | 67b0a7a2757d1e48397cb6dda41ab397005b9d0b /qpid/java/client/src | |
| parent | 8cc7c55464edb03d8a45f688a34b85c9a08766de (diff) | |
| download | qpid-python-6f5d96325706a81a91e5bdfbdafb37a296478bf0.tar.gz | |
QPID-2273 : Fix Protocol Negotiation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@892301 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
7 files changed, 52 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6dfb70fe28..0b9be5951f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -308,7 +308,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; @@ -458,9 +457,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion)) + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); + } + else if ("0-9".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_0_9(this); + } + else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_9_1(this); } else { @@ -1541,13 +1548,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ProtocolVersion getProtocolVersion() { - return _protocolVersion; - } - - public void setProtocolVersion(ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); + return _delegate.getProtocolVersion(); } public boolean isFailingOver() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e6c3473cb1..23dc244dee 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -61,4 +61,6 @@ public interface AMQConnectionDelegate void setIdleTimeout(long l); int getMaxChannelID(); + + ProtocolVersion getProtocolVersion(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4d10180667..af21eb7ed0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -301,4 +301,9 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return Integer.MAX_VALUE; } + + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_10; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java index d95e2e3dff..70ecedfd8b 100755 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.ProtocolVersion; + public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0 { @@ -28,5 +30,11 @@ public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0 { super(conn); } + + @Override + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_9; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index e1d9ae735c..6f44f68b37 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -107,9 +107,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn._failoverPolicy.attainedConnection(); _conn._connected = true; + return null; + } + else + { + return _conn._protocolHandler.getSuggestedProtocolVersion(); } - return null; } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) @@ -306,4 +310,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return (int) (Math.pow(2, 16)-1); } + + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v8_0; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java index 1bb93f66a3..442dd7b286 100755 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.ProtocolVersion; + public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0 { @@ -29,4 +31,9 @@ public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0 super(conn); } + @Override + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_91; + } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 505febd42c..a567c2c215 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -171,6 +171,7 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private NetworkDriver _networkDriver; + private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; @@ -427,6 +428,7 @@ public class AMQProtocolHandler implements ProtocolEngine Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { + public void run() { // Decode buffer @@ -467,9 +469,8 @@ public class AMQProtocolHandler implements ProtocolEngine // suggesting an alternate ProtocolVersion; the server will then close the // connection. ProtocolInitiation protocolInit = (ProtocolInitiation) message; - ProtocolVersion pv = protocolInit.checkVersion(); - getConnection().setProtocolVersion(pv); - + _suggestedProtocolVersion = protocolInit.checkVersion(); + // get round a bug in old versions of qpid whereby the connection is not closed _stateManager.changeState(AMQState.CONNECTION_CLOSED); } @@ -845,4 +846,10 @@ public class AMQProtocolHandler implements ProtocolEngine { return _networkDriver; } + + public ProtocolVersion getSuggestedProtocolVersion() + { + return _suggestedProtocolVersion; + } + } |
