summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-07-02 10:05:49 +0000
committerAidan Skinner <aidan@apache.org>2008-07-02 10:05:49 +0000
commitf305d3d0eaed6b4b9390b86df3fc79013c905a7d (patch)
tree6c505cbad3cae9b8bdebd19d69216f916cc4a197 /java/client/src
parentb010894ebe6c468fef0c14ad869b80ef336ab11f (diff)
downloadqpid-python-f305d3d0eaed6b4b9390b86df3fc79013c905a7d.tar.gz
QPID-962 Exception handling was... unpleasing... Fix up of patch from rhs
AMQConnection: Refactor listener and remove list, we're only interested in the most recent one anyway. Add get/set for lastException, which can now be any Exception AMQConnectionDelegate_0_8.java: Stop masking/stackign exceptions, just throw them. AMQProtocolHandler.java: attainState can now throw any sort of Exception AMQStateManager.java: attainState can now throw any Exception ConnectionTest.java: check that exception cause is not null git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673343 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java77
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java12
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java1
5 files changed, 62 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index a504fb2c27..cf6ac54e55 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -234,7 +235,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/*
* The last error code that occured on the connection. Used to return the correct exception to the client
*/
- protected AMQException _lastAMQException = null;
+ protected Exception _lastException = null;
/*
* The connection meta data
@@ -378,13 +379,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_delegate = new AMQConnectionDelegate_0_10(this);
}
- final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
-
class Listener implements ExceptionListener
{
public void onException(JMSException e)
{
- exceptions.add(e);
+ _lastException = e;
+ try
+ {
+ getProtocolHandler().getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+
+ }
+ catch (AMQException e1)
+ {
+ // Wow, badness
+ }
}
}
@@ -443,9 +451,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// We are not currently connected
_connected = false;
- Exception lastException = new Exception();
- lastException.initCause(new ConnectException());
-
// TMG FIXME this seems... wrong...
boolean retryAllowed = true;
while (!_connected && retryAllowed )
@@ -453,8 +458,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(brokerDetails);
- lastException = null;
- _connected = true;
}
catch (AMQProtocolException pe)
{
@@ -470,12 +473,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (Exception e)
{
- lastException = e;
-
+ _lastException = e;
+ }
+ if (_lastException != null)
+ {
if (_logger.isInfoEnabled())
{
_logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
- e.getCause());
+ _lastException.getCause());
}
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
@@ -498,31 +503,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
// Eat it, we've hopefully got all the exceptions if this happened
}
- if (exceptions.size() > 0)
- {
- JMSException e = exceptions.get(0);
- int code = -1;
- try
- {
- code = new Integer(e.getErrorCode()).intValue();
- }
- catch (NumberFormatException nfe)
- {
- // Ignore this, we have some error codes and messages swapped around
- }
-
- throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
- e.getMessage(), e);
- }
- else if (lastException != null)
+
+ if (_lastException != null)
{
- if (lastException.getCause() != null)
+ if (_lastException.getCause() != null)
{
- message = lastException.getCause().getMessage();
+ message = _lastException.getCause().getMessage();
}
else
{
- message = lastException.getMessage();
+ message = _lastException.getMessage();
}
}
@@ -534,24 +524,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else // can only be "" if getMessage() returned it therfore lastException != null
{
- message = "Unable to Connect:" + lastException.getClass();
+ message = "Unable to Connect:" + _lastException.getClass();
}
}
- AMQException e = new AMQConnectionFailureException(message, null);
+ AMQException e = new AMQConnectionFailureException(message, _lastException);
- if (lastException != null)
+ if (_lastException != null)
{
- if (lastException instanceof UnresolvedAddressException)
+ if (_lastException instanceof UnresolvedAddressException)
{
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
null);
}
-
- if (e.getCause() != null)
- {
- e.initCause(lastException);
- }
}
throw e;
@@ -1507,4 +1492,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
+
+ public Exception getLastException()
+ {
+ return _lastException;
+ }
+
+ public void setLastException(Exception exception)
+ {
+ _lastException = exception;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
index 637cff5b7e..79f81c4a2d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
@@ -25,7 +25,9 @@ import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Iterator;
+import java.util.Set;
import javax.jms.JMSException;
import javax.jms.XASession;
@@ -76,24 +78,23 @@ public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
- try
+ final Set<AMQState> openOrClosedStates =
+ EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
+
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ // this blocks until the connection has been set up or when an error
+ // has prevented the connection being set up
+
+ AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
+ if(state == AMQState.CONNECTION_OPEN)
{
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- // this blocks until the connection has been set up or when an error
- // has prevented the connection being set up
- _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
_conn._failoverPolicy.attainedConnection();
// Again this should be changed to a suitable notify
_conn._connected = true;
- }
- catch (AMQException e)
- {
- _conn._lastAMQException = e;
- throw e;
- }
+ }
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 2d8074eea2..1b75d6e829 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -559,7 +559,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_frameListeners.remove(listener);
}
*/
- public void attainState(AMQState s) throws AMQException
+ public void attainState(AMQState s) throws Exception
{
getStateManager().attainState(s);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index eda1a1f5fd..21f190bd7e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -102,7 +102,7 @@ public class AMQStateManager
}
- public void attainState(final AMQState s) throws AMQException
+ public void attainState(final AMQState s) throws Exception
{
synchronized (_stateLock)
{
@@ -118,6 +118,11 @@ public class AMQStateManager
catch (InterruptedException e)
{
_logger.warn("Thread interrupted");
+ if (_protocolSession.getAMQConnection().getLastException() != null)
+ {
+ throw _protocolSession.getAMQConnection().getLastException();
+ }
+
}
if (_currentState != s)
@@ -169,6 +174,11 @@ public class AMQStateManager
catch (InterruptedException e)
{
_logger.warn("Thread interrupted");
+ if (_protocolSession.getAMQConnection().getLastException() != null)
+ {
+ throw new AMQException(null, "Could not attain state due to exception",
+ _protocolSession.getAMQConnection().getLastException());
+ }
}
if (!stateSet.contains(_currentState))
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index f856e8c20b..97eed08ab1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -134,6 +134,7 @@ public class ConnectionTest extends TestCase
}
catch (AMQException amqe)
{
+ assertNotNull("No cause set", amqe.getCause());
if (amqe.getCause().getClass() == Exception.class)
{
System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");