diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-09-04 16:27:54 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-09-04 16:27:54 +0000 |
| commit | e5521a71e0171567e29395b5ba555004635beae1 (patch) | |
| tree | 7cc1d5eeeb5c13b49c85426c97d819b67570d89f /qpid/java/client/src | |
| parent | f60d221977ee66d6921e6744446a3f3aff5d5a7e (diff) | |
| download | qpid-python-e5521a71e0171567e29395b5ba555004635beae1.tar.gz | |
QPID-1809 - The incorrect expcetions were due to a race condition between the mina exception notification thread and the clients main thread blocking for a frame. Occasionally, the client will start blocking just after the notification and so will Timeout. This update ensures that blocking does not occur if the connection has been marked closing or is closed. The lastException set on the StateManager is thrown instead.
The connection close also needed to take into consideration this fact. The syncWrite on for ChannelClose and ConnectionClose are now only down if we are not in a closing situation. As the 0-10 code path does not use the StateManager the changes were applied to the 0-8 Session. Further testing may be needed to validate that the 0-10 client code path does not also have this race condition.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@811471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
10 files changed, 64 insertions, 59 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 05ac3dca9e..6bae0166d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException { super(error, msg, cause); } - public boolean isHardError() - { - return true; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index fa4e08f62b..d7196c0abb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; @@ -116,12 +117,23 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void sendClose(long timeout) throws AMQException, FailoverException { - getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), - ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. + // we also need to check the state manager for 08/09 as the + // _connection variable may not be updated in time by the error receiving + // thread. + // We can't close the session if we are alreadying in the process of + // closing/closed the connection. + + if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) + || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING))) + { + + getProtocolHandler().closeSession(this); + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), + ChannelCloseOkBody.class, timeout); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } } public void sendCommit() throws AMQException, FailoverException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index e639a33450..e40cafd72f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -40,7 +40,6 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C } public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId) - throws AMQException { session.getStateManager().changeState(AMQState.CONNECTION_OPEN); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index e4e58c317d..287b5957a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -45,7 +45,6 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con { } public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId) - throws AMQException { _logger.debug("ConnectionTune frame received"); final MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java index f2aca58deb..8782e00a12 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -52,7 +52,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession } @Override - public void closeProtocolSession(boolean waitLast) throws AMQException + public void closeProtocolSession(boolean waitLast) { _ioSender.close(); _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2389c9e2da..6f62070fd0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -259,7 +259,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** * Called when we want to create a new IoTransport session - * @param brokerDetail + * @param brokerDetail */ public void createIoTransportSession(BrokerDetails brokerDetail) { @@ -271,7 +271,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter brokerDetail.useSSL()); _protocolSession.init(); } - + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case @@ -433,12 +433,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param e the exception to propagate * * @see #propagateExceptionToFrameListeners - * @see #propagateExceptionToStateWaiters */ public void propagateExceptionToAllWaiters(Exception e) { + getStateManager().error(e); propagateExceptionToFrameListeners(e); - propagateExceptionToStateWaiters(e); } /** @@ -469,22 +468,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } - /** - * This caters for the case where we only need to propogate an exception to the the state manager to interupt any - * thing waiting for a state change. - * - * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement. - * - * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal - * cases {@link #propagateExceptionToAllWaiters} would be the correct choice. - * - * @param e the exception to propagate - */ - public void propagateExceptionToStateWaiters(Exception e) - { - getStateManager().error(e); - } - public void notifyFailoverStarting() { // Set the last exception in the sync block to ensure the ordering with add. @@ -601,7 +584,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); } - + final long sentMessages = _messagesOut++; final boolean debug = _logger.isDebugEnabled(); @@ -667,7 +650,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter throw _lastFailoverException; } - if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED) + if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED || + _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING) { Exception e = _stateManager.getLastException(); if (e != null) @@ -733,25 +717,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void closeConnection(long timeout) throws AMQException { - getStateManager().changeState(AMQState.CONNECTION_CLOSING); - ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode new AMQShortString("JMS client is closing the connection."), 0, 0); final AMQFrame frame = body.generateFrame(0); - try - { - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); - } - catch (AMQTimeoutException e) + //If the connection is already closed then don't do a syncWrite + if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { _protocolSession.closeProtocolSession(false); } - catch (FailoverException e) + else { - _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + try + { + syncWrite(frame, ConnectionCloseOkBody.class, timeout); + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 5e12a5e6f8..0e872170aa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -410,12 +410,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); } - public void closeProtocolSession() throws AMQException + public void closeProtocolSession() { closeProtocolSession(true); } - public void closeProtocolSession(boolean waitLast) throws AMQException + public void closeProtocolSession(boolean waitLast) { _logger.debug("Waiting for last write to join."); if (waitLast && (_lastWriteFuture != null)) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index f8645139f2..70d4697f2c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.Set; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.io.IOException; /** * The state manager is responsible for managing the state of the protocol session. <p/> @@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener return _currentState; } - public void changeState(AMQState newState) throws AMQException + public void changeState(AMQState newState) { _logger.debug("State changing to " + newState + " from old state " + _currentState); @@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener */ public void error(Exception error) { + if (error instanceof AMQException) + { + // AMQException should be being notified before closing the + // ProtocolSession. Which will change the State to CLOSED. + // if we have a hard error. + if (((AMQException)error).isHardError()) + { + changeState(AMQState.CONNECTION_CLOSING); + } + } + else + { + // Be on the safe side here and mark the connection closed + changeState(AMQState.CONNECTION_CLOSED); + } + if (_waiters.size() == 0) { _logger.error("No Waiters for error saving as last error:" + error.getMessage()); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index ce79080e97..da44822ec3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -85,7 +85,7 @@ public class MockAMQConnection extends AMQConnection } @Override - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException { _connected = true; _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 10ec220d9e..fc7f8148f0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -200,15 +200,8 @@ public class AMQProtocolHandlerTest extends TestCase _handler.getStateManager().error(trigger); _logger.info("Setting state to be CONNECTION_CLOSED."); - try - { - _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - } - catch (AMQException e) - { - _logger.error("Unable to change the state to closed.", e); - fail("Unable to change the state to closed due to :"+e.getMessage()); - } + + _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); _logger.info("Firing exception"); _handler.propagateExceptionToFrameListeners(trigger); |
