diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-04-02 09:55:27 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-04-02 09:55:27 +0000 |
| commit | 42dcda5fb197d0fa85788c9aa04d6c1b2ae1822d (patch) | |
| tree | fb7241e67c2133e4c5f36d9842bdf744370d2c02 /qpid/java/client/src | |
| parent | eb34891fb4886a975039df3aa58ecbffa299acd6 (diff) | |
| download | qpid-python-42dcda5fb197d0fa85788c9aa04d6c1b2ae1822d.tar.gz | |
QPID-829 Remove 0.10 specific URL. The code path is now selected based on broker response. We first try the highest protocol version and update the handler if the broker replies with a different protocol version. NOTE that we need to update the current java broker and 0.8 client for handling protocol headers. This should happen with the M2.1 merge. For the moment we only support an in VM 0.8 broker. Moreover, we'll need to migrate to a 0.10 vs 99.0 protocol version.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@643822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 68 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java | 6 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java | 6 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java | 48 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (renamed from qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java) | 4 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java | 6 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java | 46 | ||||
| -rw-r--r-- | qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties | 4 |
8 files changed, 100 insertions, 88 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index fc11794cba..adbe03e986 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -50,10 +50,7 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import org.apache.qpid.AMQConnectionFailureException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.*; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; @@ -64,8 +61,8 @@ import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.QpidURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpidity.transport.TransportConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,30 +231,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - /* This JVM arg is only used for test code - Unless u pass a url it is difficult to determine which version to use - Most of the test code use an AMQConnection constructor that doesn't use - the url. So you need this switch to say which code path to test. - - Another complication is that when a constructor is called with out a url - they would construct a 0-8 url and pass into the construtor that takes a url. - - In such an instance u need the jvm argument to force an 0-10 connection - Once the 0-10 code base stabilises, 0-10 will be the default. - */ - - if (Boolean.getBoolean("SwitchCon")) - { - connectionURL.setURLVersion((Boolean.getBoolean("0-10")? ConnectionURL.URL_0_10:ConnectionURL.URL_0_8)); - } - - if (connectionURL.getURLVersion() == ConnectionURL.URL_0_10) + _failoverPolicy = new FailoverPolicy(connectionURL); + if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM)) { - _delegate = new AMQConnectionDelegate_0_10(this); + _delegate = new AMQConnectionDelegate_0_8(this); } else { - _delegate = new AMQConnectionDelegate_0_8(this); + // We always assume that the broker supports the lates AMQ protocol verions + // thie is currently 0.10 + // TODO: use this code once we have switch to 0.10 + // getDelegate(); + _delegate = new AMQConnectionDelegate_0_10(this); } if (_logger.isInfoEnabled()) @@ -299,7 +284,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } - _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); // We are not currently connected @@ -316,6 +300,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect lastException = null; _connected = true; } + catch (AMQProtocolException pe) + { + if (_logger.isInfoEnabled()) + { + _logger.info(pe.getMessage()); + _logger.info("Trying broker supported protocol version: " + + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor()); + } + // we need to check whether we have a delegate for the supported protocol + getDelegate(); + } catch (Exception e) { lastException = e; @@ -383,6 +379,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _connectionMetaData = new QpidConnectionMetaData(this); } + private void getDelegate() throws AMQProtocolException + { + try + { + Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" + + TransportConstants.getVersionMajor() + "_" + + TransportConstants.getVersionMinor()); + Class partypes[] = new Class[1]; + partypes[0] = AMQConnection.class; + _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + } + catch (Exception e) + { + throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, + "Protocol: " + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + + "currently supported by this client library implementation", e); + } + } + protected AMQConnection(String username, String password, String clientName, String virtualHost) { _clientName = clientName; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index bf1ed49492..bde60c433f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -6,6 +6,7 @@ import javax.jms.JMSException; import javax.jms.XASession; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.jms.BrokerDetails; @@ -14,6 +15,7 @@ import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; +import org.apache.qpidity.ProtocolException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +115,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed _conn.getUsername(), _conn.getPassword()); _qpidConnection.setClosedListener(this); } + catch(ProtocolException pe) + { + throw new AMQProtocolException(null, pe.getMessage(), pe); + } catch (QpidException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index dfc87e21b1..3fcec67fe1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -37,6 +37,7 @@ import javax.naming.spi.ObjectFactory; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpidity.transport.TransportConstants; public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, @@ -429,9 +430,10 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF */ public XAConnection createXAConnection() throws JMSException { - if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8) + if (TransportConstants.getVersionMajor() == 0 && + TransportConstants.getVersionMinor() == 8) { - throw new UnsupportedOperationException("This version does not support XA operations"); + throw new UnsupportedOperationException("This protocol version does not support XA operations"); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index ae9a5ff802..770fab7a81 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -24,10 +24,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.net.MalformedURLException; -import org.apache.qpid.client.url.URLParser_0_8; -import org.apache.qpid.client.url.URLParser_0_10; +import org.apache.qpid.client.url.URLParser; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; @@ -53,7 +51,6 @@ public class AMQConnectionURL implements ConnectionURL private AMQShortString _defaultTopicExchangeName; private AMQShortString _temporaryTopicExchangeName; private AMQShortString _temporaryQueueExchangeName; - private byte _urlVersion; public AMQConnectionURL(String fullURL) throws URLSyntaxException { @@ -62,48 +59,7 @@ public class AMQConnectionURL implements ConnectionURL _options = new HashMap<String, String>(); _brokers = new LinkedList<BrokerDetails>(); _failoverOptions = new HashMap<String, String>(); - - if (!Boolean.getBoolean("SwitchCon")) - { - // We need to decided the version based on URL - if (fullURL.startsWith("qpid")) - { - //URLParser - URLParser_0_10 parser = null; - try - { - parser = new URLParser_0_10(fullURL); - } - catch (MalformedURLException e) - { - throw new URLSyntaxException(fullURL,e.getMessage(),0,0); - } - setBrokerDetails(parser.getAllBrokerDetails()); - // use the first instance username and password - // This is temporary as the URL must be changed for olding this information as part of the full URL - BrokerDetails firstBroker = getBrokerDetails(0); - setUsername(firstBroker.getProperty(BrokerDetails.USERNAME)); - setPassword(firstBroker.getProperty(BrokerDetails.PASSWORD)); - setClientName(firstBroker.getProperty(BrokerDetails.CLIENT_ID)); - setVirtualHost(firstBroker.getProperty(BrokerDetails.VIRTUAL_HOST)); - _urlVersion = URL_0_10; - } - else - { - new URLParser_0_8(this); - _urlVersion = URL_0_8; - } - } - } - - public byte getURLVersion() - { - return _urlVersion; - } - - public void setURLVersion(byte version) - { - _urlVersion = version; + new URLParser(this); } public String getURL() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index 5cde4d196a..b975713ad7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -11,11 +11,11 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.URLHelper; import org.apache.qpid.url.URLSyntaxException; -public class URLParser_0_8 +public class URLParser { private AMQConnectionURL _url; - public URLParser_0_8(AMQConnectionURL url)throws URLSyntaxException + public URLParser(AMQConnectionURL url)throws URLSyntaxException { _url = url; parseURL(_url.getURL()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 7cae7f8a9f..8ce302564b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -43,11 +43,7 @@ public interface ConnectionURL public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; - - byte getURLVersion(); - - void setURLVersion(byte version); - + String getURL(); String getFailoverMethod(); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 3944e2c3f3..8b787615e4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -11,6 +11,7 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.QpidURL; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; +import org.apache.qpidity.ProtocolException; import org.apache.qpidity.nclient.impl.ClientSession; import org.apache.qpidity.nclient.impl.ClientSessionDelegate; import org.apache.qpidity.transport.Channel; @@ -49,14 +50,14 @@ public class Client implements org.apache.qpidity.nclient.Connection public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { - Condition negotiationComplete = _lock.newCondition(); + final Condition negotiationComplete = _lock.newCondition(); closeOk = _lock.newCondition(); _lock.lock(); ConnectionDelegate connectionDelegate = new ConnectionDelegate() { private boolean receivedClose = false; - + private String _unsupportedProtocol; public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); @@ -115,6 +116,32 @@ public class Client implements org.apache.qpidity.nclient.Connection this.receivedClose = true; } + + @Override public void init(Channel ch, ProtocolHeader hdr) + { + // TODO: once the merge is done we'll need to update this code + // for handling 0.8 protocol version type i.e. major=8 and minor=0 :( + if (hdr.getMajor() != TransportConstants.getVersionMajor() + || hdr.getMinor() != TransportConstants.getVersionMinor()) + { + _unsupportedProtocol = TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor(); + TransportConstants.setVersionMajor( hdr.getMajor() ); + TransportConstants.setVersionMinor( hdr.getMinor() ); + _lock.lock(); + negotiationComplete.signalAll(); + _lock.unlock(); + } + else + { + ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + } + } + + @Override public String getUnsupportedProtocol() + { + return _unsupportedProtocol; + } }; connectionDelegate.setCondition(_lock,negotiationComplete); @@ -123,8 +150,7 @@ public class Client implements org.apache.qpidity.nclient.Connection connectionDelegate.setVirtualHost(virtualHost); if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) - { - System.out.println("Using NIO"); + { if( _logger.isDebugEnabled()) { _logger.debug("using NIO"); @@ -142,13 +168,21 @@ public class Client implements org.apache.qpidity.nclient.Connection } // XXX: hardcoded version numbers - _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.CONNECTION_VERSION_MAJOR, TransportConstants.CONNECTION_VERSION_MINOR))); + _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); try { negotiationComplete.await(); + if( connectionDelegate.getUnsupportedProtocol() != null ) + { + _conn.close(); + throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol() + , ErrorCode.UNSUPPORTED_PROTOCOL, null); + + } } - catch (Exception e) + catch (InterruptedException e) { // } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties index 893958949f..32ed16a392 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/tests.properties @@ -23,7 +23,9 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672 +connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' +#qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672 + # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] |
