summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-09-04 16:27:54 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-09-04 16:27:54 +0000
commit61f551d4782033e4ed40661f2cccc04502190925 (patch)
treec3f8b5662679618ae6ea6ac6f62903aadf7939dd /java/client/src/main
parent067053041000736b8c47469e4d6746551551d5ba (diff)
downloadqpid-python-61f551d4782033e4ed40661f2cccc04502190925.tar.gz
QPID-1809 - The incorrect expcetions were due to a race condition between the mina exception notification thread and the clients main thread blocking for a frame. Occasionally, the client will start blocking just after the notification and so will Timeout. This update ensures that blocking does not occur if the connection has been marked closing or is closed. The lastException set on the StateManager is thrown instead.
The connection close also needed to take into consideration this fact. The syncWrite on for ChannelClose and ConnectionClose are now only down if we are not in a closing situation. As the 0-10 code path does not use the StateManager the changes were applied to the 0-8 Session. Further testing may be needed to validate that the 0-10 client code path does not also have this race condition. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@811471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java19
8 files changed, 61 insertions, 49 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
index 05ac3dca9e..6bae0166d1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
@@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException
{
super(error, msg, cause);
}
- public boolean isHardError()
- {
- return true;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index fa4e08f62b..d7196c0abb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -33,6 +33,7 @@ import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -116,12 +117,23 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void sendClose(long timeout) throws AMQException, FailoverException
{
- getProtocolHandler().closeSession(this);
- getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
- ChannelCloseOkBody.class, timeout);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully.
+ // we also need to check the state manager for 08/09 as the
+ // _connection variable may not be updated in time by the error receiving
+ // thread.
+ // We can't close the session if we are alreadying in the process of
+ // closing/closed the connection.
+
+ if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING)))
+ {
+
+ getProtocolHandler().closeSession(this);
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+ ChannelCloseOkBody.class, timeout);
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
}
public void sendCommit() throws AMQException, FailoverException
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index e639a33450..e40cafd72f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -40,7 +40,6 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C
}
public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId)
- throws AMQException
{
session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index e4e58c317d..287b5957a1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -45,7 +45,6 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con
{ }
public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId)
- throws AMQException
{
_logger.debug("ConnectionTune frame received");
final MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
index f2aca58deb..8782e00a12 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
@@ -52,7 +52,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession
}
@Override
- public void closeProtocolSession(boolean waitLast) throws AMQException
+ public void closeProtocolSession(boolean waitLast)
{
_ioSender.close();
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 2389c9e2da..6f62070fd0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -259,7 +259,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/**
* Called when we want to create a new IoTransport session
- * @param brokerDetail
+ * @param brokerDetail
*/
public void createIoTransportSession(BrokerDetails brokerDetail)
{
@@ -271,7 +271,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
brokerDetail.useSSL());
_protocolSession.init();
}
-
+
/**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
@@ -433,12 +433,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param e the exception to propagate
*
* @see #propagateExceptionToFrameListeners
- * @see #propagateExceptionToStateWaiters
*/
public void propagateExceptionToAllWaiters(Exception e)
{
+ getStateManager().error(e);
propagateExceptionToFrameListeners(e);
- propagateExceptionToStateWaiters(e);
}
/**
@@ -469,22 +468,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
- /**
- * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
- * thing waiting for a state change.
- *
- * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
- *
- * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
- * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
- *
- * @param e the exception to propagate
- */
- public void propagateExceptionToStateWaiters(Exception e)
- {
- getStateManager().error(e);
- }
-
public void notifyFailoverStarting()
{
// Set the last exception in the sync block to ensure the ordering with add.
@@ -601,7 +584,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
}
-
+
final long sentMessages = _messagesOut++;
final boolean debug = _logger.isDebugEnabled();
@@ -667,7 +650,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
throw _lastFailoverException;
}
- if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED)
+ if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED ||
+ _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING)
{
Exception e = _stateManager.getLastException();
if (e != null)
@@ -733,25 +717,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void closeConnection(long timeout) throws AMQException
{
- getStateManager().changeState(AMQState.CONNECTION_CLOSING);
-
ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
new AMQShortString("JMS client is closing the connection."), 0, 0);
final AMQFrame frame = body.generateFrame(0);
- try
- {
- syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
- }
- catch (AMQTimeoutException e)
+ //If the connection is already closed then don't do a syncWrite
+ if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
_protocolSession.closeProtocolSession(false);
}
- catch (FailoverException e)
+ else
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ try
+ {
+ syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+ _protocolSession.closeProtocolSession();
+ }
+ catch (AMQTimeoutException e)
+ {
+ _protocolSession.closeProtocolSession(false);
+ }
+ catch (FailoverException e)
+ {
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 5e12a5e6f8..0e872170aa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -410,12 +410,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
}
- public void closeProtocolSession() throws AMQException
+ public void closeProtocolSession()
{
closeProtocolSession(true);
}
- public void closeProtocolSession(boolean waitLast) throws AMQException
+ public void closeProtocolSession(boolean waitLast)
{
_logger.debug("Waiting for last write to join.");
if (waitLast && (_lastWriteFuture != null))
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index f8645139f2..70d4697f2c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
/**
* The state manager is responsible for managing the state of the protocol session. <p/>
@@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener
return _currentState;
}
- public void changeState(AMQState newState) throws AMQException
+ public void changeState(AMQState newState)
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
@@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener
*/
public void error(Exception error)
{
+ if (error instanceof AMQException)
+ {
+ // AMQException should be being notified before closing the
+ // ProtocolSession. Which will change the State to CLOSED.
+ // if we have a hard error.
+ if (((AMQException)error).isHardError())
+ {
+ changeState(AMQState.CONNECTION_CLOSING);
+ }
+ }
+ else
+ {
+ // Be on the safe side here and mark the connection closed
+ changeState(AMQState.CONNECTION_CLOSED);
+ }
+
if (_waiters.size() == 0)
{
_logger.error("No Waiters for error saving as last error:" + error.getMessage());