diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-07-02 10:05:49 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-07-02 10:05:49 +0000 |
| commit | f305d3d0eaed6b4b9390b86df3fc79013c905a7d (patch) | |
| tree | 6c505cbad3cae9b8bdebd19d69216f916cc4a197 /java/client/src | |
| parent | b010894ebe6c468fef0c14ad869b80ef336ab11f (diff) | |
| download | qpid-python-f305d3d0eaed6b4b9390b86df3fc79013c905a7d.tar.gz | |
QPID-962 Exception handling was... unpleasing... Fix up of patch from rhs
AMQConnection: Refactor listener and remove list, we're only interested in the most recent one anyway. Add get/set for lastException, which can now be any Exception
AMQConnectionDelegate_0_8.java: Stop masking/stackign exceptions, just throw them.
AMQProtocolHandler.java: attainState can now throw any sort of Exception
AMQStateManager.java: attainState can now throw any Exception
ConnectionTest.java: check that exception cause is not null
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673343 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
5 files changed, 62 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index a504fb2c27..cf6ac54e55 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -234,7 +235,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /* * The last error code that occured on the connection. Used to return the correct exception to the client */ - protected AMQException _lastAMQException = null; + protected Exception _lastException = null; /* * The connection meta data @@ -378,13 +379,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - final ArrayList<JMSException> exceptions = new ArrayList<JMSException>(); - class Listener implements ExceptionListener { public void onException(JMSException e) { - exceptions.add(e); + _lastException = e; + try + { + getProtocolHandler().getStateManager().changeState(AMQState.CONNECTION_CLOSED); + + } + catch (AMQException e1) + { + // Wow, badness + } } } @@ -443,9 +451,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // We are not currently connected _connected = false; - Exception lastException = new Exception(); - lastException.initCause(new ConnectException()); - // TMG FIXME this seems... wrong... boolean retryAllowed = true; while (!_connected && retryAllowed ) @@ -453,8 +458,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(brokerDetails); - lastException = null; - _connected = true; } catch (AMQProtocolException pe) { @@ -470,12 +473,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (Exception e) { - lastException = e; - + _lastException = e; + } + if (_lastException != null) + { if (_logger.isInfoEnabled()) { _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), - e.getCause()); + _lastException.getCause()); } retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); @@ -498,31 +503,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { // Eat it, we've hopefully got all the exceptions if this happened } - if (exceptions.size() > 0) - { - JMSException e = exceptions.get(0); - int code = -1; - try - { - code = new Integer(e.getErrorCode()).intValue(); - } - catch (NumberFormatException nfe) - { - // Ignore this, we have some error codes and messages swapped around - } - - throw new AMQConnectionFailureException(AMQConstant.getConstant(code), - e.getMessage(), e); - } - else if (lastException != null) + + if (_lastException != null) { - if (lastException.getCause() != null) + if (_lastException.getCause() != null) { - message = lastException.getCause().getMessage(); + message = _lastException.getCause().getMessage(); } else { - message = lastException.getMessage(); + message = _lastException.getMessage(); } } @@ -534,24 +524,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else // can only be "" if getMessage() returned it therfore lastException != null { - message = "Unable to Connect:" + lastException.getClass(); + message = "Unable to Connect:" + _lastException.getClass(); } } - AMQException e = new AMQConnectionFailureException(message, null); + AMQException e = new AMQConnectionFailureException(message, _lastException); - if (lastException != null) + if (_lastException != null) { - if (lastException instanceof UnresolvedAddressException) + if (_lastException instanceof UnresolvedAddressException) { e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), null); } - - if (e.getCause() != null) - { - e.initCause(lastException); - } } throw e; @@ -1507,4 +1492,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } + + public Exception getLastException() + { + return _lastException; + } + + public void setLastException(Exception exception) + { + _lastException = exception; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java index 637cff5b7e..79f81c4a2d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java @@ -25,7 +25,9 @@ import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Iterator; +import java.util.Set; import javax.jms.JMSException; import javax.jms.XASession; @@ -76,24 +78,23 @@ public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { - try + final Set<AMQState> openOrClosedStates = + EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); + + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + // this blocks until the connection has been set up or when an error + // has prevented the connection being set up + + AMQState state = _conn._protocolHandler.attainState(openOrClosedStates); + if(state == AMQState.CONNECTION_OPEN) { - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); - // this blocks until the connection has been set up or when an error - // has prevented the connection being set up - _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN); _conn._failoverPolicy.attainedConnection(); // Again this should be changed to a suitable notify _conn._connected = true; - } - catch (AMQException e) - { - _conn._lastAMQException = e; - throw e; - } + } } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2d8074eea2..1b75d6e829 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -559,7 +559,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _frameListeners.remove(listener); } */ - public void attainState(AMQState s) throws AMQException + public void attainState(AMQState s) throws Exception { getStateManager().attainState(s); } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index eda1a1f5fd..21f190bd7e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -102,7 +102,7 @@ public class AMQStateManager } - public void attainState(final AMQState s) throws AMQException + public void attainState(final AMQState s) throws Exception { synchronized (_stateLock) { @@ -118,6 +118,11 @@ public class AMQStateManager catch (InterruptedException e) { _logger.warn("Thread interrupted"); + if (_protocolSession.getAMQConnection().getLastException() != null) + { + throw _protocolSession.getAMQConnection().getLastException(); + } + } if (_currentState != s) @@ -169,6 +174,11 @@ public class AMQStateManager catch (InterruptedException e) { _logger.warn("Thread interrupted"); + if (_protocolSession.getAMQConnection().getLastException() != null) + { + throw new AMQException(null, "Could not attain state due to exception", + _protocolSession.getAMQConnection().getLastException()); + } } if (!stateSet.contains(_currentState)) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index f856e8c20b..97eed08ab1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -134,6 +134,7 @@ public class ConnectionTest extends TestCase } catch (AMQException amqe) { + assertNotNull("No cause set", amqe.getCause()); if (amqe.getCause().getClass() == Exception.class) { System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure."); |
