summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-09-07 14:31:40 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-09-07 14:31:40 +0000
commit4a5fb79921be484f6547faebc76b7c57e944048e (patch)
tree6d910e6284d75d2cac5243b8254ebcb07e8e62bc /qpid/java
parent18dd75f093b430e2d34450fd967af431dfd4a4b8 (diff)
downloadqpid-python-4a5fb79921be484f6547faebc76b7c57e944048e.tar.gz
QPID-1809, QPID-2081 : Corrected ChannelClose logic. Removed an unnecessary sync on the failoverMutex in AMQSession that was causing the notification of the close to be blocked until a TimeOutException occured.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@812153 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java91
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java63
3 files changed, 111 insertions, 72 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index dad656bd50..2e3e417c95 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -721,25 +721,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!_closed.getAndSet(true))
{
- synchronized (getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
-
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
}
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 2b6745ebe4..2cf19bf391 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -32,7 +32,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
}
public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
- throws AMQException
+ throws AMQException
{
_logger.debug("ChannelClose method received");
@@ -59,52 +58,62 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
-
-
ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
AMQFrame frame = body.generateFrame(channelId);
session.writeFrame(frame);
-
- if (errorCode != AMQConstant.REPLY_SUCCESS)
+ try
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
- }
-
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- throw new AMQNoConsumersException("Error: " + reason, null, null);
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- throw new AMQNoRouteException("Error: " + reason, null, null);
- }
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- _logger.debug("Broker responded with Invalid Argument.");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ }
+
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ {
+ _logger.debug("Broker responded with Invalid Argument.");
+
+ throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
+ }
+ else
+ {
+
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
+ }
- throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
- }
- else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
- {
- _logger.debug("Broker responded with Invalid Routing Key.");
-
- throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
}
-
}
- // fixme why is this only done when the close is expected...
- // should the above forced closes not also cause a close?
- // ----------
- // Closing the session only when it is expected allows the errors to be processed
- // Calling this here will prevent failover. So we should do this for all exceptions
- // that should never cause failover. Such as authentication errors.
-
- session.channelClosed(channelId, errorCode, String.valueOf(reason));
+ finally
+ {
+ // fixme why is this only done when the close is expected...
+ // should the above forced closes not also cause a close?
+ // ----------
+ // Closing the session only when it is expected allows the errors to be processed
+ // Calling this here will prevent failover. So we should do this for all exceptions
+ // that should never cause failover. Such as authentication errors.
+ // ----
+ // 2009-09-07 - ritchiem
+ // calling channelClosed will only close this session and will not
+ // prevent failover. If we don't close the session here then we will
+ // have problems during the session close as it will attempt to
+ // close the session that the broker has closed,
+
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
+ }
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
index 3eddc8d7a5..3fb6cd3526 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.test.unit.close;
-import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
@@ -32,9 +31,52 @@ import org.apache.qpid.test.utils.QpidTestCase;
import javax.jms.Session;
-/** QPID-1085 */
+/** QPID-1809
+ *
+ * Race condition on error handling and close logic.
+ *
+ * See most often with SimpleACLTest as this test is the expects the server to
+ * shut the connection/channels. This sort of testing is not performed by many,
+ * if any, of the other system tests.
+ *
+ * The problem is that we have two threads
+ *
+ * MainThread Exception(Mina)Thread
+ * | |
+ * Performs |
+ * ACtion |
+ * | Receives Server
+ * | Close
+ * Blocks for |
+ * Response |
+ * | Starts To Notify
+ * | client
+ * | |
+ * | <----- Notify Main Thread
+ * Notification |
+ * wakes client |
+ * | |
+ * Client then |
+ * processes Error. |
+ * | |
+ * Potentially Attempting Close Channel/Connection
+ * Connection Close
+ *
+ * The two threads both attempt to close the connection but the main thread does
+ * so assuming that the connection is open and valid.
+ *
+ * The Exception thread must modify the connection so that no furter syncWait
+ * commands are performed.
+ *
+ * This test sends an ExchangeDeclare that is Asynchronous and will fail and
+ * so cause a ChannelClose error but we perform a syncWait so that we can be
+ * sure to test that the BlockingWaiter is correctly awoken.
+ *
+ */
public class JavaServerCloseRaceConditionTest extends QpidTestCase
{
+ private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup";
+
public void test() throws Exception
{
@@ -42,14 +84,11 @@ public class JavaServerCloseRaceConditionTest extends QpidTestCase
AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQDestination destination = (AMQDestination) session.createQueue(getTestQueueName());
-
// Set no wait true so that we block the connection
// Also set a different exchange class string so the attempt to declare
// the exchange causes an exchange.
- ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), destination.getExchangeName(), new AMQShortString("NewTypeForException"),
- destination.getExchangeName().toString().startsWith("amq."),
- false, false, false, true, null);
+ ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
+ true, false, false, false, true, null);
AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
@@ -60,10 +99,7 @@ public class JavaServerCloseRaceConditionTest extends QpidTestCase
}
catch (Exception e)
{
- if (!(e instanceof AMQAuthenticationException))
- {
- fail("Cause was not AMQAuthenticationException. Was " + e.getClass() + ":" + e.getMessage());
- }
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
}
try
@@ -76,10 +112,7 @@ public class JavaServerCloseRaceConditionTest extends QpidTestCase
}
catch (Exception e)
{
- if (!(e instanceof AMQAuthenticationException))
- {
- fail("Cause was not AMQAuthenticationException. Was " + e.getClass() + ":" + e.getMessage());
- }
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
}
}