summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-08 12:30:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-08 12:30:03 +0000
commite7375322dc1083dbfffe49a903d4737a6943907e (patch)
tree8eda764527588da967dc27ba180b0b3353eea335 /qpid/java
parentbf0089dd71fc9b173f7d8670439672394d73f847 (diff)
downloadqpid-python-e7375322dc1083dbfffe49a903d4737a6943907e.tar.gz
QPID-5978 : [Java Client] fail faster when a TCP connection is established, but the AMQP layer is not - e.g. due to SSL negotiation failure
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616736 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java41
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java13
2 files changed, 36 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index ba5a98411f..15300a5c3b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,8 +20,16 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.qpid.client.HeartbeatListener;
-import org.apache.qpid.util.BytesDataOutput;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +39,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -59,16 +68,7 @@ import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.qpid.util.BytesDataOutput;
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
@@ -182,6 +182,7 @@ public class AMQProtocolHandler implements ProtocolEngine
private long _lastReadTime = System.currentTimeMillis();
private long _lastWriteTime = System.currentTimeMillis();
private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
+ private Throwable _initialConnectionException;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -219,6 +220,8 @@ public class AMQProtocolHandler implements ProtocolEngine
// in order to execute AMQConnection#exceptionRecievedout out of synchronization block,
// otherwise it might deadlock with failover mutex
boolean failoverNotAllowed = false;
+ boolean failedWithoutConnecting = false;
+ Throwable initialConnectionException = null;
synchronized (this)
{
if (_logger.isDebugEnabled())
@@ -256,8 +259,11 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
+ failedWithoutConnecting = true;
+ initialConnectionException = _initialConnectionException;
_logger.debug("We are in process of establishing the initial connection");
}
+ _initialConnectionException = null;
}
else
{
@@ -270,6 +276,16 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection.exceptionReceived(new AMQDisconnectedException(
"Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
}
+ else if(failedWithoutConnecting)
+ {
+ if(initialConnectionException == null)
+ {
+ initialConnectionException = _stateManager.getLastException();
+ }
+ String message = initialConnectionException == null ? "" : initialConnectionException.getMessage();
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Connection could not be established: " + message, initialConnectionException));
+ }
}
if (_logger.isDebugEnabled())
@@ -343,6 +359,7 @@ public class AMQProtocolHandler implements ProtocolEngine
if (causeIsAConnectionProblem)
{
_logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
+ _initialConnectionException = cause;
}
else
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 13a16d07b5..1bbf166d82 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
public class SSLReceiver implements Receiver<ByteBuffer>
{
@@ -192,7 +193,7 @@ public class SSLReceiver implements Receiver<ByteBuffer>
{
_sslStatus.getSslLock().notifyAll();
}
- exception(new TransportException("Error in SSLReceiver",e));
+ exception(new TransportException("Error in SSLReceiver: " + e.getMessage(),e));
}
}