From c3f975ff714ab0077e9946c29556edc3ed5db476 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 8 Oct 2009 08:17:33 +0000 Subject: QPID-1950 : Problem is that the thrown exception whilst an IOException does not signify that the socket has closed. So the broker had two open connections to send messages on. Change was to ensure that the previous Socket/IOSession has been closed before failover starts. Also added protected to ChannelOpenHandler to guard against out of order frames causing a NPE. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823087 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/handler/BasicRejectMethodHandler.java | 2 +- .../qpid/server/handler/ChannelOpenHandler.java | 7 + .../qpid/client/failover/FailoverHandler.java | 11 + .../MessageDisappearWithIOExceptionTest.java | 331 +++++++++++++++++++++ .../unit/ack/AcknowledgeAfterFailoverTest.java | 2 +- .../apache/qpid/test/utils/FailoverBaseCase.java | 4 +- java/test-profiles/Excludes | 4 + java/test-profiles/test-provider.properties | 2 +- 8 files changed, 358 insertions(+), 5 deletions(-) create mode 100644 java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..fcf3fd4337 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener messages = sendNumberedBytesMessage(_session, _queue, 10); + + // Consume first messasge + Message received = _consumer.receive(2000); + + // Verify received messages + assertNotNull("First message not received.", received); + assertEquals("Incorrect message Received", + messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + + // Allow ack to be sent to broker, by performing a synchronous command + // along the session. +// _session.createConsumer(_session.createTemporaryQueue()).close(); + + //Retain IO Layer + AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); + + // Send IO Exception - causing failover + _connection.getProtocolHandler(). + exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(), + new WriteTimeoutException("WriteTimeoutException to cause failover.")); + + // Verify Failover occured through ConnectionListener + assertTrue("Failover did not occur", + _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); + + //Verify new protocolSession is not the same as the original + assertNotSame("Protocol Session has not changed", + protocolSession, + _connection.getProtocolHandler().getProtocolSession()); + + /***********************************/ + // This verifies that the bug has been resolved + + // Attempt to consume again. Expect 9 messages + for (int count = 1; count < 10; count++) + { + received = _consumer.receive(2000); + assertNotNull("Expected message not received:" + count, received); + assertEquals(messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + } + + //Verify there are no more messages + received = _consumer.receive(1000); + assertNull("Message receieved when there should be none:" + received, + received); + +// /***********************************/ +// // This verifies that the bug exists +// +// // Attempt to consume remaining 9 messages.. Expecting NONE. +// // receiving just one message should fail so no need to fail 9 times +// received = _consumer.receive(1000); +// assertNull("Message receieved when it should be null:" + received, received); +// +//// //Close the Connection which you would assume would free the messages +//// _connection.close(); +//// +//// // Reconnect +//// initialiseConnection(); +//// +//// // We should still be unable to receive messages +//// received = _consumer.receive(1000); +//// assertNull("Message receieved when it should be null:" + received, received); +//// +//// _connection.close(); +// +// // Close original IO layer. Expecting messages to be released +// protocolSession.closeProtocolSession(); +// +// // Reconnect and all should be good. +//// initialiseConnection(); +// +// // Attempt to consume again. Expect 9 messages +// for (int count = 1; count < 10; count++) +// { +// received = _consumer.receive(2000); +// assertNotNull("Expected message not received:" + count, received); +// assertEquals(messages.remove(0).getIntProperty("count"), +// received.getIntProperty("count")); +// } +// +// //Verify there are no more messages +// received = _consumer.receive(1000); +// assertNull("Message receieved when there should be none:" + received, +// received); + } + + private void initialiseConnection() + throws Exception + { + //Create Connection + _connection = (AMQConnection) getConnection(); + _connection.setConnectionListener(this); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _queue = _session.createQueue(getName()); + + // Create Consumer + _consumer = _session.createConsumer(_queue); + + //Start connection + _connection.start(); + } + + /** QpidTestCase back port to this release */ + + // modified from QTC as sendMessage is not testable. + // - should be renamed sendBlankBytesMessage + // - should be renamed sendNumberedBytesMessage + public List sendNumberedBytesMessage(Session session, Destination destination, + int count) throws Exception + { + List messages = new ArrayList(count); + + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < count; i++) + { + Message next = session.createMessage(); + + next.setIntProperty("count", count); + + producer.send(next); + + messages.add(next); + } + + producer.close(); + return messages; + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover to occur + return true; + } + + public boolean preResubscribe() + { + //Allow failover to occur + return true; + } + + public void failoverComplete() + { + _failoverOccured.countDown(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index 30cc48691f..eb36522fac 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -61,7 +61,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con * @param transacted create a transacted session for this test * @param mode if not transacted what ack mode to use for this test * @throws Exception if a problem occured during test setup. - */ + */ @Override protected void init(boolean transacted, int mode) throws Exception { diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 5b5bb4a6a2..0426c4f45f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase super.setUp(); // Set QPID_WORK to $QPID_WORK/ // or /tmp/ if QPID_WORK not set. - setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort()); startBroker(getFailingPort()); } @@ -95,7 +95,7 @@ public class FailoverBaseCase extends QpidTestCase // Ensure we shutdown any secondary brokers, even if we are unable // to cleanly tearDown the QTC. stopBroker(getFailingPort()); - FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); } } diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes index d14d467b89..863f56ae92 100644 --- a/java/test-profiles/Excludes +++ b/java/test-profiles/Excludes @@ -30,3 +30,7 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck // QPID-143 : Failover can occur between receive and ack but we don't stop the ack. org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk + + +//temp do not commit +org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* diff --git a/java/test-profiles/test-provider.properties b/java/test-profiles/test-provider.properties index 70a2672263..8cea012c1d 100644 --- a/java/test-profiles/test-provider.properties +++ b/java/test-profiles/test-provider.properties @@ -34,7 +34,7 @@ connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' -connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' +connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20'' connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}' -- cgit v1.2.1