diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-10-13 11:31:09 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-10-13 11:31:09 +0000 |
| commit | 48c60f3b94349d3de7fcbfef7c4318d5ef9d046b (patch) | |
| tree | 4f5f4b0e4a55734e479991520809b6498989cb7c /java | |
| parent | 6e2caff9d614ea4ce10d050bf0dbd7a0e548a671 (diff) | |
| download | qpid-python-48c60f3b94349d3de7fcbfef7c4318d5ef9d046b.tar.gz | |
QPID-1950 : Simplified the connection by using the default URL and configuring retry rather than using the default failover URL that has multiple brokers defined. Whilst this may not stop the test failing it will make the log files simpler.
Updated FailoverHandler to ensure that any pending write is sync'd if possible. Also updated Test to perform a synchronous operation after the ack to ensure it arrives at the broker, QPID-2138 highlights that it doesn't get there due to what appears to be Mina propagating the exception ahead of the data.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@824704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
2 files changed, 28 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 7fa7004a9e..cb6c196a45 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -147,8 +147,16 @@ public class FailoverHandler implements Runnable // So lets make a new one. _amqProtocolHandler.setStateManager(new AMQStateManager()); - // Close the session, false says don't wait for it to close, just close it. - _amqProtocolHandler.getProtocolSession().closeProtocolSession(false); + // Close the session, we need to wait for it to close as there may have + // been data in transit such as an ack that is still valid to send. + // + // While we are allowing data to continue to be written to the + // socket assuming the connection is still valid, we do not consider + // the possibility that the problem that triggered failover was + // entirely client side. In that situation the socket will still be + // open and the we should really send a ConnectionClose to be AMQP + // compliant. + _amqProtocolHandler.getProtocolSession().closeProtocolSession(); // Use a fresh new StateManager for the reconnection attempts _amqProtocolHandler.setStateManager(new AMQStateManager()); diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java index 978b7f1c22..2d688bcc51 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java @@ -179,9 +179,16 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem messages.remove(0).getIntProperty("count"), received.getIntProperty("count")); - // Allow ack to be sent to broker, by performing a synchronous command - // along the session. -// _session.createConsumer(_session.createTemporaryQueue()).close(); + // When the Exception is received by the underlying IO layer it will + // initiate failover. The first step of which is to ensure that the + // existing conection is closed. So in this situation the connection + // will be flushed casuing the above ACK to be sent to the broker. + // + // That said: + // when the socket close is detected on the server it will rise up the + // Mina filter chain and interrupt processing. + // this has been raised as QPID-2138 + _session.createConsumer(_session.createTemporaryQueue()).close(); //Retain IO Layer AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); @@ -261,8 +268,14 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem private void initialiseConnection() throws Exception { - //Create Connection - _connection = (AMQConnection) getConnection(); + //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default + _connection = (AMQConnection) getConnection(getConnectionFactory("default").getConnectionURL()); + // The default connection does not have any retries configured so + // Allow this connection to retry so that we can block on the failover. + // The alternative would be to use the getConnection() default. However, + // this would add additional complexity in the logging as a second + // broker is defined in that url. We do not need it for this test. + _connection.getFailoverPolicy().getCurrentMethod().setRetries(1); _connection.setConnectionListener(this); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
