diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-07-29 02:07:20 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-29 02:07:20 +0000 |
| commit | 188748bd3e3fba0e16d3d3d4bc7b1de72e285d09 (patch) | |
| tree | 73e7eccc4529e9453bd4038410d2f1bff2611658 /java/client | |
| parent | 3d5456aa0817248733d721050d6f3bdb9f8782da (diff) | |
| download | qpid-python-188748bd3e3fba0e16d3d3d4bc7b1de72e285d09.tar.gz | |
QPID-1201: fixed up version of aidan's patch, there are still failures when running against an external java broker, however we seem to get past basic connection negotiation now
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@680602 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
6 files changed, 67 insertions, 72 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b3b3cc1ffd..bb28e70d76 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -36,7 +36,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.transport.TransportConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +57,7 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; @@ -364,10 +364,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // We always assume that the broker supports the lates AMQ protocol verions - // thie is currently 0.10 - // TODO: use this code once we have switch to 0.10 - // getDelegate(); _delegate = new AMQConnectionDelegate_0_10(this); } @@ -420,21 +416,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Exception connectionException = null; while (!_connected && retryAllowed) { + ProtocolVersion pe = null; try { - makeBrokerConnection(brokerDetails); - } - catch (AMQProtocolException pe) - { - if (_logger.isInfoEnabled()) - { - _logger.info(pe.getMessage()); - _logger.info("Trying broker supported protocol version: " + - TransportConstants.getVersionMajor() + "." + - TransportConstants.getVersionMinor()); - } - // we need to check whether we have a delegate for the supported protocol - getDelegate(); + pe = makeBrokerConnection(brokerDetails); } catch (Exception e) { @@ -447,6 +432,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect connectionException = e; } + if (pe != null) + { + // reset the delegate to the version returned by the + // broker + initDelegate(pe); + } + if (!_connected) { retryAllowed = _failoverPolicy.failoverAllowed(); @@ -518,23 +510,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } - private void getDelegate() throws AMQProtocolException + private void initDelegate(ProtocolVersion pe) throws AMQProtocolException { try { - Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" + - TransportConstants.getVersionMajor() + "_" + - TransportConstants.getVersionMinor()); + Class c = Class.forName(String.format + ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", + pe.getMajorVersion(), pe.getMinorVersion())); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); } - catch (Exception e) + catch (ClassNotFoundException e) + { + throw new AMQProtocolException + (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, + String.format("Protocol: %s.%s is rquired by the broker but is not " + + "currently supported by this client library implementation", + pe.getMajorVersion(), pe.getMinorVersion()), + e); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException("unable to locate constructor for delegate", e); + } + catch (InstantiationException e) + { + throw new RuntimeException("error instantiating delegate", e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException("error accessing delegate", e); + } + catch (InvocationTargetException e) { - throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, - "Protocol: " + TransportConstants.getVersionMajor() + "." - + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + - "currently supported by this client library implementation", e); + throw new RuntimeException("error invoking delegate", e); } } @@ -615,9 +625,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return false; } - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _delegate.makeBrokerConnection(brokerDetail); + return _delegate.makeBrokerConnection(brokerDetail); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 07bd7ea0ae..7f36ec6e99 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -27,12 +27,13 @@ import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; public interface AMQConnectionDelegate { - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; public Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 825a52c5cb..ce10553210 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -9,13 +9,14 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; -import org.apache.qpidity.ProtocolException; +import org.apache.qpidity.transport.ProtocolVersionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +102,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed * @throws IOException * @throws AMQException */ - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { _qpidConnection = Client.createConnection(); try @@ -117,14 +118,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed _qpidConnection.setClosedListener(this); _conn._connected = true; } - catch(ProtocolException pe) + catch(ProtocolVersionException pe) { - throw new AMQProtocolException(null, pe.getMessage(), pe); + return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } catch (QpidException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } + + return null; } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index aab094ca7d..1e65c50304 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; @@ -79,7 +80,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } - public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); @@ -98,6 +99,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _conn._failoverPolicy.attainedConnection(); _conn._connected = true; } + + return null; } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index fd8063e99b..090620a560 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -38,7 +38,6 @@ import javax.naming.spi.ObjectFactory; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.transport.TransportConstants; public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, @@ -434,23 +433,15 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF */ public XAConnection createXAConnection() throws JMSException { - if (TransportConstants.getVersionMajor() == 0 && - TransportConstants.getVersionMinor() == 8) + try { - throw new UnsupportedOperationException("This protocol version does not support XA operations"); + return new XAConnectionImpl(_connectionDetails, _sslConfig); } - else + catch (Exception e) { - try - { - return new XAConnectionImpl(_connectionDetails, _sslConfig); - } - catch (Exception e) - { - JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); - jmse.setLinkedException(e); - throw jmse; - } + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + throw jmse; } } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index f8d5bbcb1c..eb0e370560 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -31,7 +31,6 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.QpidURL; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; -import org.apache.qpidity.ProtocolException; import org.apache.qpidity.nclient.impl.ClientSession; import org.apache.qpidity.nclient.impl.ClientSessionDelegate; import org.apache.qpidity.transport.Channel; @@ -40,8 +39,8 @@ import org.apache.qpidity.transport.Connection; import org.apache.qpidity.transport.ConnectionClose; import org.apache.qpidity.transport.ConnectionCloseCode; import org.apache.qpidity.transport.ConnectionCloseOk; -import org.apache.qpidity.transport.TransportConstants; import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.ProtocolVersionException; import org.apache.qpidity.transport.SessionDelegate; import org.apache.qpidity.transport.network.io.IoTransport; import org.apache.qpidity.transport.network.mina.MinaHandler; @@ -60,6 +59,8 @@ public class Client implements org.apache.qpidity.nclient.Connection private boolean closed = false; private long timeout = 60000; + private ProtocolHeader header = null; + /** * * @return returns a new connection to the broker. @@ -79,7 +80,6 @@ public class Client implements org.apache.qpidity.nclient.Connection ClientDelegate connectionDelegate = new ClientDelegate() { private boolean receivedClose = false; - private String _unsupportedProtocol; public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); @@ -138,28 +138,18 @@ public class Client implements org.apache.qpidity.nclient.Connection this.receivedClose = true; } - @Override public void init(Channel ch, ProtocolHeader hdr) { // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and minor=0 :( - if (hdr.getMajor() != TransportConstants.getVersionMajor() - || hdr.getMinor() != TransportConstants.getVersionMinor()) + // for handling 0.8 protocol version type i.e. major=8 and mino + if (hdr.getMajor() != 0 || hdr.getMinor() != 10) { - _unsupportedProtocol = TransportConstants.getVersionMajor() + "." + - TransportConstants.getVersionMinor(); - TransportConstants.setVersionMajor( hdr.getMajor() ); - TransportConstants.setVersionMinor( hdr.getMinor() ); + Client.this.header = hdr; _lock.lock(); negotiationComplete.signalAll(); _lock.unlock(); } } - - @Override public String getUnsupportedProtocol() - { - return _unsupportedProtocol; - } }; connectionDelegate.setCondition(_lock,negotiationComplete); @@ -186,18 +176,15 @@ public class Client implements org.apache.qpidity.nclient.Connection } // XXX: hardcoded version numbers - _conn.send(new ProtocolHeader(1, TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor())); + _conn.send(new ProtocolHeader(1, 0, 10)); try { negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); - if( connectionDelegate.getUnsupportedProtocol() != null ) + if (header != null) { _conn.close(); - throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol() - , ErrorCode.UNSUPPORTED_PROTOCOL, null); - + throw new ProtocolVersionException(header.getMajor(), header.getMinor()); } } catch (InterruptedException e) |
