diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 12:30:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 12:30:03 +0000 |
| commit | e7375322dc1083dbfffe49a903d4737a6943907e (patch) | |
| tree | 8eda764527588da967dc27ba180b0b3353eea335 /qpid/java | |
| parent | bf0089dd71fc9b173f7d8670439672394d73f847 (diff) | |
| download | qpid-python-e7375322dc1083dbfffe49a903d4737a6943907e.tar.gz | |
QPID-5978 : [Java Client] fail faster when a TCP connection is established, but the AMQP layer is not - e.g. due to SSL negotiation failure
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616736 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 36 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ba5a98411f..15300a5c3b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,8 +20,16 @@ */ package org.apache.qpid.client.protocol; -import org.apache.qpid.client.HeartbeatListener; -import org.apache.qpid.util.BytesDataOutput; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +39,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; @@ -59,16 +68,7 @@ import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.qpid.util.BytesDataOutput; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -182,6 +182,7 @@ public class AMQProtocolHandler implements ProtocolEngine private long _lastReadTime = System.currentTimeMillis(); private long _lastWriteTime = System.currentTimeMillis(); private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; + private Throwable _initialConnectionException; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -219,6 +220,8 @@ public class AMQProtocolHandler implements ProtocolEngine // in order to execute AMQConnection#exceptionRecievedout out of synchronization block, // otherwise it might deadlock with failover mutex boolean failoverNotAllowed = false; + boolean failedWithoutConnecting = false; + Throwable initialConnectionException = null; synchronized (this) { if (_logger.isDebugEnabled()) @@ -256,8 +259,11 @@ public class AMQProtocolHandler implements ProtocolEngine } else { + failedWithoutConnecting = true; + initialConnectionException = _initialConnectionException; _logger.debug("We are in process of establishing the initial connection"); } + _initialConnectionException = null; } else { @@ -270,6 +276,16 @@ public class AMQProtocolHandler implements ProtocolEngine _connection.exceptionReceived(new AMQDisconnectedException( "Server closed connection and reconnection not permitted.", _stateManager.getLastException())); } + else if(failedWithoutConnecting) + { + if(initialConnectionException == null) + { + initialConnectionException = _stateManager.getLastException(); + } + String message = initialConnectionException == null ? "" : initialConnectionException.getMessage(); + _connection.exceptionReceived(new AMQDisconnectedException( + "Connection could not be established: " + message, initialConnectionException)); + } } if (_logger.isDebugEnabled()) @@ -343,6 +359,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (causeIsAConnectionProblem) { _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause); + _initialConnectionException = cause; } else { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 13a16d07b5..1bbf166d82 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.transport.network.security.ssl; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; public class SSLReceiver implements Receiver<ByteBuffer> { @@ -192,7 +193,7 @@ public class SSLReceiver implements Receiver<ByteBuffer> { _sslStatus.getSslLock().notifyAll(); } - exception(new TransportException("Error in SSLReceiver",e)); + exception(new TransportException("Error in SSLReceiver: " + e.getMessage(),e)); } } |
