diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-07-29 02:07:20 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-29 02:07:20 +0000 |
| commit | 404c17dcd4357950a5707595df5446891786eaf8 (patch) | |
| tree | 2a692547da2c75c8e7bf9dc5b2e6cdf963b401ad /qpid/java | |
| parent | b55b4aaa985dfc6c67923d972a5cfb48e1443174 (diff) | |
| download | qpid-python-404c17dcd4357950a5707595df5446891786eaf8.tar.gz | |
QPID-1201: fixed up version of aidan's patch, there are still failures when running against an external java broker, however we seem to get past basic connection negotiation now
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@680602 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
11 files changed, 106 insertions, 147 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b3b3cc1ffd..bb28e70d76 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -36,7 +36,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.transport.TransportConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +57,7 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; @@ -364,10 +364,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // We always assume that the broker supports the lates AMQ protocol verions - // thie is currently 0.10 - // TODO: use this code once we have switch to 0.10 - // getDelegate(); _delegate = new AMQConnectionDelegate_0_10(this); } @@ -420,21 +416,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Exception connectionException = null; while (!_connected && retryAllowed) { + ProtocolVersion pe = null; try { - makeBrokerConnection(brokerDetails); - } - catch (AMQProtocolException pe) - { - if (_logger.isInfoEnabled()) - { - _logger.info(pe.getMessage()); - _logger.info("Trying broker supported protocol version: " + - TransportConstants.getVersionMajor() + "." + - TransportConstants.getVersionMinor()); - } - // we need to check whether we have a delegate for the supported protocol - getDelegate(); + pe = makeBrokerConnection(brokerDetails); } catch (Exception e) { @@ -447,6 +432,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect connectionException = e; } + if (pe != null) + { + // reset the delegate to the version returned by the + // broker + initDelegate(pe); + } + if (!_connected) { retryAllowed = _failoverPolicy.failoverAllowed(); @@ -518,23 +510,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } - private void getDelegate() throws AMQProtocolException + private void initDelegate(ProtocolVersion pe) throws AMQProtocolException { try { - Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" + - TransportConstants.getVersionMajor() + "_" + - TransportConstants.getVersionMinor()); + Class c = Class.forName(String.format + ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", + pe.getMajorVersion(), pe.getMinorVersion())); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); } - catch (Exception e) + catch (ClassNotFoundException e) + { + throw new AMQProtocolException + (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, + String.format("Protocol: %s.%s is rquired by the broker but is not " + + "currently supported by this client library implementation", + pe.getMajorVersion(), pe.getMinorVersion()), + e); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException("unable to locate constructor for delegate", e); + } + catch (InstantiationException e) + { + throw new RuntimeException("error instantiating delegate", e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException("error accessing delegate", e); + } + catch (InvocationTargetException e) { - throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, - "Protocol: " + TransportConstants.getVersionMajor() + "." - + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + - "currently supported by this client library implementation", e); + throw new RuntimeException("error invoking delegate", e); } } @@ -615,9 +625,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return false; } - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _delegate.makeBrokerConnection(brokerDetail); + return _delegate.makeBrokerConnection(brokerDetail); } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 07bd7ea0ae..7f36ec6e99 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -27,12 +27,13 @@ import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; public interface AMQConnectionDelegate { - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; public Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 825a52c5cb..ce10553210 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -9,13 +9,14 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; -import org.apache.qpidity.ProtocolException; +import org.apache.qpidity.transport.ProtocolVersionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +102,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed * @throws IOException * @throws AMQException */ - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { _qpidConnection = Client.createConnection(); try @@ -117,14 +118,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed _qpidConnection.setClosedListener(this); _conn._connected = true; } - catch(ProtocolException pe) + catch(ProtocolVersionException pe) { - throw new AMQProtocolException(null, pe.getMessage(), pe); + return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } catch (QpidException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } + + return null; } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index aab094ca7d..1e65c50304 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; @@ -79,7 +80,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } - public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); @@ -98,6 +99,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _conn._failoverPolicy.attainedConnection(); _conn._connected = true; } + + return null; } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index fd8063e99b..090620a560 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -38,7 +38,6 @@ import javax.naming.spi.ObjectFactory; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.transport.TransportConstants; public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, @@ -434,23 +433,15 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF */ public XAConnection createXAConnection() throws JMSException { - if (TransportConstants.getVersionMajor() == 0 && - TransportConstants.getVersionMinor() == 8) + try { - throw new UnsupportedOperationException("This protocol version does not support XA operations"); + return new XAConnectionImpl(_connectionDetails, _sslConfig); } - else + catch (Exception e) { - try - { - return new XAConnectionImpl(_connectionDetails, _sslConfig); - } - catch (Exception e) - { - JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); - jmse.setLinkedException(e); - throw jmse; - } + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + throw jmse; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index f8d5bbcb1c..eb0e370560 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -31,7 +31,6 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.QpidURL; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; -import org.apache.qpidity.ProtocolException; import org.apache.qpidity.nclient.impl.ClientSession; import org.apache.qpidity.nclient.impl.ClientSessionDelegate; import org.apache.qpidity.transport.Channel; @@ -40,8 +39,8 @@ import org.apache.qpidity.transport.Connection; import org.apache.qpidity.transport.ConnectionClose; import org.apache.qpidity.transport.ConnectionCloseCode; import org.apache.qpidity.transport.ConnectionCloseOk; -import org.apache.qpidity.transport.TransportConstants; import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.ProtocolVersionException; import org.apache.qpidity.transport.SessionDelegate; import org.apache.qpidity.transport.network.io.IoTransport; import org.apache.qpidity.transport.network.mina.MinaHandler; @@ -60,6 +59,8 @@ public class Client implements org.apache.qpidity.nclient.Connection private boolean closed = false; private long timeout = 60000; + private ProtocolHeader header = null; + /** * * @return returns a new connection to the broker. @@ -79,7 +80,6 @@ public class Client implements org.apache.qpidity.nclient.Connection ClientDelegate connectionDelegate = new ClientDelegate() { private boolean receivedClose = false; - private String _unsupportedProtocol; public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); @@ -138,28 +138,18 @@ public class Client implements org.apache.qpidity.nclient.Connection this.receivedClose = true; } - @Override public void init(Channel ch, ProtocolHeader hdr) { // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and minor=0 :( - if (hdr.getMajor() != TransportConstants.getVersionMajor() - || hdr.getMinor() != TransportConstants.getVersionMinor()) + // for handling 0.8 protocol version type i.e. major=8 and mino + if (hdr.getMajor() != 0 || hdr.getMinor() != 10) { - _unsupportedProtocol = TransportConstants.getVersionMajor() + "." + - TransportConstants.getVersionMinor(); - TransportConstants.setVersionMajor( hdr.getMajor() ); - TransportConstants.setVersionMinor( hdr.getMinor() ); + Client.this.header = hdr; _lock.lock(); negotiationComplete.signalAll(); _lock.unlock(); } } - - @Override public String getUnsupportedProtocol() - { - return _unsupportedProtocol; - } }; connectionDelegate.setCondition(_lock,negotiationComplete); @@ -186,18 +176,15 @@ public class Client implements org.apache.qpidity.nclient.Connection } // XXX: hardcoded version numbers - _conn.send(new ProtocolHeader(1, TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor())); + _conn.send(new ProtocolHeader(1, 0, 10)); try { negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); - if( connectionDelegate.getUnsupportedProtocol() != null ) + if (header != null) { _conn.close(); - throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol() - , ErrorCode.UNSUPPORTED_PROTOCOL, null); - + throw new ProtocolVersionException(header.getMajor(), header.getMinor()); } } catch (InterruptedException e) diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java index f6d2829504..0912f3bdd9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -76,9 +76,7 @@ class ToyClient extends SessionDelegate public void closed() {} }); conn.send(new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor())); + (1, 0, 10)); Channel ch = conn.getChannel(0); Session ssn = new Session("my-session".getBytes()); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java index 699854fb3b..e770157af4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java @@ -31,10 +31,9 @@ public abstract class ClientDelegate extends ConnectionDelegate public void init(Channel ch, ProtocolHeader hdr) { - if (hdr.getMajor() != TransportConstants.getVersionMajor() && - hdr.getMinor() != TransportConstants.getVersionMinor()) + if (hdr.getMajor() != 0 && hdr.getMinor() != 10) { - throw new RuntimeException("version missmatch: " + hdr); + throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index 962dd9a5da..14bde3f18d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -82,28 +82,12 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> public void init(Channel ch, ProtocolHeader hdr) { - ch.getConnection().send(new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor())); - if (hdr.getMajor() != TransportConstants.getVersionMajor() && - hdr.getMinor() != TransportConstants.getVersionMinor()) - { - // XXX - ch.getConnection().send(new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor())); - ch.getConnection().close(); - } - else - { - List<Object> plain = new ArrayList<Object>(); - plain.add("PLAIN"); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - ch.connectionStart(null, plain, utf8); - } + ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor())); + List<Object> plain = new ArrayList<Object>(); + plain.add("PLAIN"); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, plain, utf8); } // ---------------------------------------------- @@ -294,8 +278,4 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> _virtualHost = host; } - public String getUnsupportedProtocol() - { - return null; - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java index 596143a1b9..86fe7a3f3f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java @@ -1,4 +1,6 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -14,23 +16,37 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. + * */ +package org.apache.qpidity.transport; -package org.apache.qpidity; -public class ProtocolException extends QpidException +/** + * ProtocolVersionException + * + */ + +public final class ProtocolVersionException extends TransportException { - /** - * Constructor for a Ptotocol Exception. - * <p> This is the only provided constructor and the parameters have to be set to null when - * they are unknown. - * @param message A description of the reason of this exception. - * @param errorCode A string specifyin the error code of this exception. - * @param cause The linked Execption. - * - */ - public ProtocolException(String message, ErrorCode errorCode, Throwable cause) + + private final byte major; + private final byte minor; + + public ProtocolVersionException(byte major, byte minor) { - super(message, errorCode, cause); + super(String.format("version missmatch: %s-%s", major, minor)); + this.major = major; + this.minor = minor; } + + public byte getMajor() + { + return this.major; + } + + public byte getMinor() + { + return this.minor; + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java deleted file mode 100644 index e9a0705de0..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.qpidity.transport; - -public class TransportConstants -{ - - private static byte _protocol_version_minor = 10; - private static byte _protocol_version_major = 0; - - public static void setVersionMajor(byte value) - { - _protocol_version_major = value; - } - - public static void setVersionMinor(byte value) - { - _protocol_version_minor = value; - } - - public static byte getVersionMajor() - { - return _protocol_version_major; - } - - public static byte getVersionMinor() - { - return _protocol_version_minor; - } - -} |
