summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-12-18 16:23:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-12-18 16:23:19 +0000
commit6f5d96325706a81a91e5bdfbdafb37a296478bf0 (patch)
tree67b0a7a2757d1e48397cb6dda41ab397005b9d0b /qpid/java/client/src
parent8cc7c55464edb03d8a45f688a34b85c9a08766de (diff)
downloadqpid-python-6f5d96325706a81a91e5bdfbdafb37a296478bf0.tar.gz
QPID-2273 : Fix Protocol Negotiation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@892301 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java5
-rwxr-xr-xqpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java11
-rwxr-xr-xqpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java13
7 files changed, 52 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6dfb70fe28..0b9be5951f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -308,7 +308,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
- private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
@@ -458,9 +457,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion))
+ if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
+ }
+ else if ("0-9".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_0_9(this);
+ }
+ else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_9_1(this);
}
else
{
@@ -1541,13 +1548,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ProtocolVersion getProtocolVersion()
{
- return _protocolVersion;
- }
-
- public void setProtocolVersion(ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+ return _delegate.getProtocolVersion();
}
public boolean isFailingOver()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index e6c3473cb1..23dc244dee 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -61,4 +61,6 @@ public interface AMQConnectionDelegate
void setIdleTimeout(long l);
int getMaxChannelID();
+
+ ProtocolVersion getProtocolVersion();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 4d10180667..af21eb7ed0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -301,4 +301,9 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return Integer.MAX_VALUE;
}
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_10;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
index d95e2e3dff..70ecedfd8b 100755
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.ProtocolVersion;
+
public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0
{
@@ -28,5 +30,11 @@ public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0
{
super(conn);
}
+
+ @Override
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_9;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index e1d9ae735c..6f44f68b37 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -107,9 +107,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
+ return null;
+ }
+ else
+ {
+ return _conn._protocolHandler.getSuggestedProtocolVersion();
}
- return null;
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
@@ -306,4 +310,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return (int) (Math.pow(2, 16)-1);
}
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v8_0;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
index 1bb93f66a3..442dd7b286 100755
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.ProtocolVersion;
+
public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0
{
@@ -29,4 +31,9 @@ public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0
super(conn);
}
+ @Override
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_91;
+ }
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 505febd42c..a567c2c215 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -171,6 +171,7 @@ public class AMQProtocolHandler implements ProtocolEngine
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private NetworkDriver _networkDriver;
+ private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
@@ -427,6 +428,7 @@ public class AMQProtocolHandler implements ProtocolEngine
Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
+
public void run()
{
// Decode buffer
@@ -467,9 +469,8 @@ public class AMQProtocolHandler implements ProtocolEngine
// suggesting an alternate ProtocolVersion; the server will then close the
// connection.
ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- ProtocolVersion pv = protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
+ _suggestedProtocolVersion = protocolInit.checkVersion();
+
// get round a bug in old versions of qpid whereby the connection is not closed
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
@@ -845,4 +846,10 @@ public class AMQProtocolHandler implements ProtocolEngine
{
return _networkDriver;
}
+
+ public ProtocolVersion getSuggestedProtocolVersion()
+ {
+ return _suggestedProtocolVersion;
+ }
+
}