summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-04-02 09:55:27 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-04-02 09:55:27 +0000
commit42dcda5fb197d0fa85788c9aa04d6c1b2ae1822d (patch)
treefb7241e67c2133e4c5f36d9842bdf744370d2c02 /qpid/java
parenteb34891fb4886a975039df3aa58ecbffa299acd6 (diff)
downloadqpid-python-42dcda5fb197d0fa85788c9aa04d6c1b2ae1822d.tar.gz
QPID-829 Remove 0.10 specific URL. The code path is now selected based on broker response. We first try the highest protocol version and update the handler if the broker replies with a different protocol version. NOTE that we need to update the current java broker and 0.8 client for handling protocol headers. This should happen with the M2.1 merge. For the moment we only support an in VM 0.8 broker. Moreover, we'll need to migrate to a 0.10 vs 99.0 protocol version.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@643822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java48
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (renamed from qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java)4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java46
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/ErrorCode.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java36
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java24
-rw-r--r--qpid/java/test-provider.properties2
21 files changed, 222 insertions, 98 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
index cc465e9251..4b86126cf6 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
@@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# Register an AMQP destination in JNDI
# destination.[jniName] = [BindingURL]
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
index 0f1dd43aa9..4b98477a5f 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
@@ -21,7 +21,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# Register an AMQP destination in JNDI
# destination.[jniName] = [BindingURL]
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties
index dc9061866a..675ac7fc0f 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties
@@ -21,7 +21,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties
index e732ce560d..8d6706eeb8 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties
@@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties
index 394d5f9036..601c5a24e2 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties
@@ -20,7 +20,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672
+connectionfactory.qpidConnectionfactory = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index fc11794cba..adbe03e986 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -50,10 +50,7 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.*;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -64,8 +61,8 @@ import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.QpidURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.transport.TransportConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -234,30 +231,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
- /* This JVM arg is only used for test code
- Unless u pass a url it is difficult to determine which version to use
- Most of the test code use an AMQConnection constructor that doesn't use
- the url. So you need this switch to say which code path to test.
-
- Another complication is that when a constructor is called with out a url
- they would construct a 0-8 url and pass into the construtor that takes a url.
-
- In such an instance u need the jvm argument to force an 0-10 connection
- Once the 0-10 code base stabilises, 0-10 will be the default.
- */
-
- if (Boolean.getBoolean("SwitchCon"))
- {
- connectionURL.setURLVersion((Boolean.getBoolean("0-10")? ConnectionURL.URL_0_10:ConnectionURL.URL_0_8));
- }
-
- if (connectionURL.getURLVersion() == ConnectionURL.URL_0_10)
+ _failoverPolicy = new FailoverPolicy(connectionURL);
+ if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM))
{
- _delegate = new AMQConnectionDelegate_0_10(this);
+ _delegate = new AMQConnectionDelegate_0_8(this);
}
else
{
- _delegate = new AMQConnectionDelegate_0_8(this);
+ // We always assume that the broker supports the lates AMQ protocol verions
+ // thie is currently 0.10
+ // TODO: use this code once we have switch to 0.10
+ // getDelegate();
+ _delegate = new AMQConnectionDelegate_0_10(this);
}
if (_logger.isInfoEnabled())
@@ -299,7 +284,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
- _failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
// We are not currently connected
@@ -316,6 +300,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
lastException = null;
_connected = true;
}
+ catch (AMQProtocolException pe)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info(pe.getMessage());
+ _logger.info("Trying broker supported protocol version: " +
+ TransportConstants.getVersionMajor() + "." +
+ TransportConstants.getVersionMinor());
+ }
+ // we need to check whether we have a delegate for the supported protocol
+ getDelegate();
+ }
catch (Exception e)
{
lastException = e;
@@ -383,6 +379,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_connectionMetaData = new QpidConnectionMetaData(this);
}
+ private void getDelegate() throws AMQProtocolException
+ {
+ try
+ {
+ Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
+ TransportConstants.getVersionMajor() + "_" +
+ TransportConstants.getVersionMinor());
+ Class partypes[] = new Class[1];
+ partypes[0] = AMQConnection.class;
+ _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
+ }
+ catch (Exception e)
+ {
+ throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
+ "Protocol: " + TransportConstants.getVersionMajor() + "."
+ + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
+ "currently supported by this client library implementation", e);
+ }
+ }
+
protected AMQConnection(String username, String password, String clientName, String virtualHost)
{
_clientName = clientName;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index bf1ed49492..bde60c433f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -6,6 +6,7 @@ import javax.jms.JMSException;
import javax.jms.XASession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.jms.BrokerDetails;
@@ -14,6 +15,7 @@ import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ProtocolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +115,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
_conn.getUsername(), _conn.getPassword());
_qpidConnection.setClosedListener(this);
}
+ catch(ProtocolException pe)
+ {
+ throw new AMQProtocolException(null, pe.getMessage(), pe);
+ }
catch (QpidException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index dfc87e21b1..3fcec67fe1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -37,6 +37,7 @@ import javax.naming.spi.ObjectFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.transport.TransportConstants;
public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
@@ -429,9 +430,10 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
*/
public XAConnection createXAConnection() throws JMSException
{
- if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8)
+ if (TransportConstants.getVersionMajor() == 0 &&
+ TransportConstants.getVersionMinor() == 8)
{
- throw new UnsupportedOperationException("This version does not support XA operations");
+ throw new UnsupportedOperationException("This protocol version does not support XA operations");
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index ae9a5ff802..770fab7a81 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -24,10 +24,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.net.MalformedURLException;
-import org.apache.qpid.client.url.URLParser_0_8;
-import org.apache.qpid.client.url.URLParser_0_10;
+import org.apache.qpid.client.url.URLParser;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -53,7 +51,6 @@ public class AMQConnectionURL implements ConnectionURL
private AMQShortString _defaultTopicExchangeName;
private AMQShortString _temporaryTopicExchangeName;
private AMQShortString _temporaryQueueExchangeName;
- private byte _urlVersion;
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
@@ -62,48 +59,7 @@ public class AMQConnectionURL implements ConnectionURL
_options = new HashMap<String, String>();
_brokers = new LinkedList<BrokerDetails>();
_failoverOptions = new HashMap<String, String>();
-
- if (!Boolean.getBoolean("SwitchCon"))
- {
- // We need to decided the version based on URL
- if (fullURL.startsWith("qpid"))
- {
- //URLParser
- URLParser_0_10 parser = null;
- try
- {
- parser = new URLParser_0_10(fullURL);
- }
- catch (MalformedURLException e)
- {
- throw new URLSyntaxException(fullURL,e.getMessage(),0,0);
- }
- setBrokerDetails(parser.getAllBrokerDetails());
- // use the first instance username and password
- // This is temporary as the URL must be changed for olding this information as part of the full URL
- BrokerDetails firstBroker = getBrokerDetails(0);
- setUsername(firstBroker.getProperty(BrokerDetails.USERNAME));
- setPassword(firstBroker.getProperty(BrokerDetails.PASSWORD));
- setClientName(firstBroker.getProperty(BrokerDetails.CLIENT_ID));
- setVirtualHost(firstBroker.getProperty(BrokerDetails.VIRTUAL_HOST));
- _urlVersion = URL_0_10;
- }
- else
- {
- new URLParser_0_8(this);
- _urlVersion = URL_0_8;
- }
- }
- }
-
- public byte getURLVersion()
- {
- return _urlVersion;
- }
-
- public void setURLVersion(byte version)
- {
- _urlVersion = version;
+ new URLParser(this);
}
public String getURL()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
index 5cde4d196a..b975713ad7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
@@ -11,11 +11,11 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.url.URLSyntaxException;
-public class URLParser_0_8
+public class URLParser
{
private AMQConnectionURL _url;
- public URLParser_0_8(AMQConnectionURL url)throws URLSyntaxException
+ public URLParser(AMQConnectionURL url)throws URLSyntaxException
{
_url = url;
parseURL(_url.getURL());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 7cae7f8a9f..8ce302564b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -43,11 +43,7 @@ public interface ConnectionURL
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
public static final byte URL_0_8 = 1;
public static final byte URL_0_10 = 2;
-
- byte getURLVersion();
-
- void setURLVersion(byte version);
-
+
String getURL();
String getFailoverMethod();
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index 3944e2c3f3..8b787615e4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -11,6 +11,7 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.url.QpidURL;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ProtocolException;
import org.apache.qpidity.nclient.impl.ClientSession;
import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
import org.apache.qpidity.transport.Channel;
@@ -49,14 +50,14 @@ public class Client implements org.apache.qpidity.nclient.Connection
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
- Condition negotiationComplete = _lock.newCondition();
+ final Condition negotiationComplete = _lock.newCondition();
closeOk = _lock.newCondition();
_lock.lock();
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
{
private boolean receivedClose = false;
-
+ private String _unsupportedProtocol;
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
@@ -115,6 +116,32 @@ public class Client implements org.apache.qpidity.nclient.Connection
this.receivedClose = true;
}
+
+ @Override public void init(Channel ch, ProtocolHeader hdr)
+ {
+ // TODO: once the merge is done we'll need to update this code
+ // for handling 0.8 protocol version type i.e. major=8 and minor=0 :(
+ if (hdr.getMajor() != TransportConstants.getVersionMajor()
+ || hdr.getMinor() != TransportConstants.getVersionMinor())
+ {
+ _unsupportedProtocol = TransportConstants.getVersionMajor() + "." +
+ TransportConstants.getVersionMinor();
+ TransportConstants.setVersionMajor( hdr.getMajor() );
+ TransportConstants.setVersionMinor( hdr.getMinor() );
+ _lock.lock();
+ negotiationComplete.signalAll();
+ _lock.unlock();
+ }
+ else
+ {
+ ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+ }
+ }
+
+ @Override public String getUnsupportedProtocol()
+ {
+ return _unsupportedProtocol;
+ }
};
connectionDelegate.setCondition(_lock,negotiationComplete);
@@ -123,8 +150,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
connectionDelegate.setVirtualHost(virtualHost);
if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
- {
- System.out.println("Using NIO");
+ {
if( _logger.isDebugEnabled())
{
_logger.debug("using NIO");
@@ -142,13 +168,21 @@ public class Client implements org.apache.qpidity.nclient.Connection
}
// XXX: hardcoded version numbers
- _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR)));
+ _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
try
{
negotiationComplete.await();
+ if( connectionDelegate.getUnsupportedProtocol() != null )
+ {
+ _conn.close();
+ throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol()
+ , ErrorCode.UNSUPPORTED_PROTOCOL, null);
+
+ }
}
- catch (Exception e)
+ catch (InterruptedException e)
{
//
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties
index 893958949f..32ed16a392 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties
@@ -23,7 +23,9 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
+#qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java
new file mode 100644
index 0000000000..bbc569839a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java
@@ -0,0 +1,38 @@
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+/* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+public class AMQProtocolException extends AMQException
+{
+ /**
+ * Constructor for a Protocol Exception
+ * <p> This is the only provided constructor and the parameters have to be
+ * set to null when they are unknown.
+ *
+ * @param msg A description of the reason of this exception .
+ * @param errorCode A string specifyin the error code of this exception.
+ * @param cause The linked Execption.
+ */
+ public AMQProtocolException(AMQConstant errorCode, String msg, Throwable cause)
+ {
+ super(errorCode, msg, cause);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index 375df2a45d..8dee790a9e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -153,6 +153,15 @@ public final class AMQConstant
public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
+ /**
+ * The server does not support the protocol version
+ */
+ public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true);
+ /**
+ * The client imp does not support the protocol version
+ */
+ public static final AMQConstant UNSUPPORTED_CLIENT_PROTOCOL_ERROR = new AMQConstant(543, "client unsupported protocol", true);
+
/** The AMQP status code. */
private int _code;
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ErrorCode.java b/qpid/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
index 4ff6939139..4b18c46d16 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
@@ -6,6 +6,7 @@ public enum ErrorCode
UNDEFINED(1,"undefined",true),
MESSAGE_REJECTED(2,"message_rejected",true),
CONNECTION_ERROR(3,"connection was closed",true),
+ UNSUPPORTED_PROTOCOL(4, "protocol version is unsupported", true),
//This might change in the spec, the error class is not applicable
NO_ERROR(200,"reply-success",true),
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java b/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
new file mode 100644
index 0000000000..596143a1b9
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpidity;
+
+public class ProtocolException extends QpidException
+{
+ /**
+ * Constructor for a Ptotocol Exception.
+ * <p> This is the only provided constructor and the parameters have to be set to null when
+ * they are unknown.
+ * @param message A description of the reason of this exception.
+ * @param errorCode A string specifyin the error code of this exception.
+ * @param cause The linked Execption.
+ *
+ */
+ public ProtocolException(String message, ErrorCode errorCode, Throwable cause)
+ {
+ super(message, errorCode, cause);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index e455be0873..10b68bbb20 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -75,7 +75,9 @@ class ToyClient extends SessionDelegate
}
public void closed() {}
});
- conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR)));
+ conn.send(new ConnectionEvent(0, new ProtocolHeader(1,
+ TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
Channel ch = conn.getChannel(0);
Session ssn = new Session();
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
index 21c7b8c16b..4815f1025f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
@@ -83,7 +83,8 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
{
// XXX
- ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR)));
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
ch.getConnection().close();
}
else
@@ -282,4 +283,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
{
_virtualHost = host;
}
+
+ public String getUnsupportedProtocol()
+ {
+ return null;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
index 47f7f17578..54429a1a4f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
@@ -2,6 +2,26 @@ package org.apache.qpidity.transport;
public class TransportConstants
{
- public static final byte CONNECTION_VERSION_MAJOR = 99;
- public static final byte CONNECTION_VERSION_MINOR = 0;
+ private static byte _protocol_version_minor = 0;
+ private static byte _protocol_version_major = 99;
+
+ public static void setVersionMajor(byte value)
+ {
+ _protocol_version_major = value;
+ }
+
+ public static void setVersionMinor(byte value)
+ {
+ _protocol_version_minor = value;
+ }
+
+ public static byte getVersionMajor()
+ {
+ return _protocol_version_major;
+ }
+
+ public static byte getVersionMinor()
+ {
+ return _protocol_version_minor;
+ }
}
diff --git a/qpid/java/test-provider.properties b/qpid/java/test-provider.properties
index 8dcba7230f..38cc146ae6 100644
--- a/qpid/java/test-provider.properties
+++ b/qpid/java/test-provider.properties
@@ -1,4 +1,4 @@
-connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
queue.MyQueue = example.MyQueue
queue.xaQueue = xaQueue