summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-10-08 08:17:33 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-10-08 08:17:33 +0000
commitc3f975ff714ab0077e9946c29556edc3ed5db476 (patch)
treeb1183df76defe3fae7db2acd2b26eb524357a1eb /java
parentd5c872e6f0e840de21cb47f882f45c9412873061 (diff)
downloadqpid-python-c3f975ff714ab0077e9946c29556edc3ed5db476.tar.gz
QPID-1950 : Problem is that the thrown exception whilst an IOException does not signify that the socket has closed. So the broker had two open connections to send messages on. Change was to ensure that the previous Socket/IOSession has been closed before failover starts. Also added protected to ChannelOpenHandler to guard against out of order frames causing a NPE.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java331
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java4
-rw-r--r--java/test-profiles/Excludes4
-rw-r--r--java/test-profiles/test-provider.properties2
8 files changed, 358 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..fcf3fd4337 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
- }
+ }
else
{
if (message.isQueueDeleted())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 5d7adc6371..9918013888 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+
final AMQChannel channel = new AMQChannel(session,channelId,
virtualHost.getMessageStore());
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 927f660932..7fa7004a9e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -140,6 +140,17 @@ public class FailoverHandler implements Runnable
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+
+ // We are failing over so lets ensure any existing ProtocolSessions
+ // are closed. Closing them will update the stateManager which we
+ // probably don't want to record the change to the closed state.
+ // So lets make a new one.
+ _amqProtocolHandler.setStateManager(new AMQStateManager());
+
+ // Close the session, false says don't wait for it to close, just close it.
+ _amqProtocolHandler.getProtocolSession().closeProtocolSession(false);
+
+ // Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
new file mode 100644
index 0000000000..8d707c4c79
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -0,0 +1,331 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.failover;
+
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.AMQConnectionClosedException;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test case based on user reported error.
+ *
+ * Summary:
+ * A user has reported message loss from their application. On bouncing of
+ * the broker the 'lost' messages are delivered to the broker.
+ *
+ * Note:
+ * The client was using Spring so that may influence the situation.
+ *
+ * Issue:
+ * The log files show 7 instances of the following which result in 7
+ * missing messages.
+ *
+ * The client log files show:
+ *
+ * The broker log file show:
+ *
+ *
+ * 7 missing messages have delivery tags 5-11. Which says that they are
+ * sequentially the next message from the broker.
+ *
+ * The only way for the 'without a handler' log to occur is if the consumer
+ * has been removed from the look up table of the dispatcher.
+ * And the only way for the 'null message' log to occur on the broker is is
+ * if the message does not exist in the unacked-map
+ *
+ * The consumer is only removed from the list during session
+ * closure and failover.
+ *
+ * If the session was closed then the broker would requeue the unacked
+ * messages so the potential exists to have an empty map but the broker
+ * will not send a message out after the unacked map has been cleared.
+ *
+ * When failover occurs the _consumer map is cleared and the consumers are
+ * resubscribed. This is down without first stopping any existing
+ * dispatcher so there exists the potential to receive a message after
+ * the _consumer map has been cleared which is how the 'without a handler'
+ * log statement occurs.
+ *
+ * Scenario:
+ *
+ * Looking over logs the sequence that best fits the events is as follows:
+ * - Something causes Mina to be delayed causing the WriteTimoutException.
+ * - This exception is recevied by AMQProtocolHandler#exceptionCaught
+ * - As the WriteTimeoutException is an IOException this will cause
+ * sessionClosed to be called to start failover.
+ * + This is potentially the issues here. All IOExceptions are treated
+ * as connection failure events.
+ * - Failover Runs
+ * + Failover assumes that the previous connection has been closed.
+ * + Failover binds the existing objects (AMQConnection/Session) to the
+ * new connection objects.
+ * - Everything is reported as being successfully failed over.
+ * However, what is neglected is that the original connection has not
+ * been closed.
+ * + So what occurs is that the broker sends a message to the consumer on
+ * the original connection, as it was not notified of the client
+ * failing over.
+ * As the client failover reuses the original AMQSession and Dispatcher
+ * the new messages the broker sends to the old consumer arrives at the
+ * client and is processed by the same AMQSession and Dispatcher.
+ * However, as the failover process cleared the _consumer map and
+ * resubscribe the consumers the Dispatcher does not recognise the
+ * delivery tag and so logs the 'without a handler' message.
+ * - The Dispatcher then attempts to reject the message, however,
+ * + The AMQSession/Dispatcher pair have been swapped to using a new Mina
+ * ProtocolSession as part of the failover process so the reject is
+ * sent down the second connection. The broker receives the Reject
+ * request but as the Message was sent on a different connection the
+ * unacknowledgemap is empty and a 'message is null' log message
+ * produced.
+ *
+ * Test Strategy:
+ *
+ * It should be easy to demonstrate if we can send an IOException to
+ * AMQProtocolHandler#exceptionCaught and then try sending a message.
+ *
+ * The current unknowns here are the type of consumers that are in use.
+ * If it was an exclusive queue(Durable Subscription) then why did the
+ * resubscribe not fail.
+ *
+ * If it was not exclusive then why did the messages not round robin?
+ */
+public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener
+{
+ private CountDownLatch _failoverOccured = new CountDownLatch(1);
+ AMQConnection _connection;
+ Session _session;
+ Queue _queue;
+ MessageConsumer _consumer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ stopBroker(getFailingPort());
+
+ }
+
+ /**
+ * Test Summary:
+ *
+ * Create a queue consumer and send 10 messages to the broker.
+ *
+ * Consume the first message.
+ * This will pull the rest into the prefetch
+ *
+ * Send an IOException to the MinaProtocolHandler.
+ *
+ * This will force failover to occur.
+ *
+ * 9 messages would normally be expected but it is expected that none will
+ * arrive. As they are still in the prefetch of the first session.
+ *
+ * To free the messages we need to close all connections.
+ * - Simply doing connection.close() and retesting will not be enough as
+ * the original connection's IO layer will still exist and is nolonger
+ * connected to the connection object as a result of failover.
+ *
+ * - Test will need to retain a reference to the original connection IO so
+ * that it can be closed releasing the messages to validate that the
+ * messages have indeed been 'lost' on that sesssion.
+ */
+ public void test() throws Exception
+ {
+ initialiseConnection();
+
+ // Create Producer
+ // Send 10 messages
+ List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10);
+
+ // Consume first messasge
+ Message received = _consumer.receive(2000);
+
+ // Verify received messages
+ assertNotNull("First message not received.", received);
+ assertEquals("Incorrect message Received",
+ messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+
+ // Allow ack to be sent to broker, by performing a synchronous command
+ // along the session.
+// _session.createConsumer(_session.createTemporaryQueue()).close();
+
+ //Retain IO Layer
+ AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
+
+ // Send IO Exception - causing failover
+ _connection.getProtocolHandler().
+ exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(),
+ new WriteTimeoutException("WriteTimeoutException to cause failover."));
+
+ // Verify Failover occured through ConnectionListener
+ assertTrue("Failover did not occur",
+ _failoverOccured.await(4000, TimeUnit.MILLISECONDS));
+
+ //Verify new protocolSession is not the same as the original
+ assertNotSame("Protocol Session has not changed",
+ protocolSession,
+ _connection.getProtocolHandler().getProtocolSession());
+
+ /***********************************/
+ // This verifies that the bug has been resolved
+
+ // Attempt to consume again. Expect 9 messages
+ for (int count = 1; count < 10; count++)
+ {
+ received = _consumer.receive(2000);
+ assertNotNull("Expected message not received:" + count, received);
+ assertEquals(messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+ }
+
+ //Verify there are no more messages
+ received = _consumer.receive(1000);
+ assertNull("Message receieved when there should be none:" + received,
+ received);
+
+// /***********************************/
+// // This verifies that the bug exists
+//
+// // Attempt to consume remaining 9 messages.. Expecting NONE.
+// // receiving just one message should fail so no need to fail 9 times
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when it should be null:" + received, received);
+//
+//// //Close the Connection which you would assume would free the messages
+//// _connection.close();
+////
+//// // Reconnect
+//// initialiseConnection();
+////
+//// // We should still be unable to receive messages
+//// received = _consumer.receive(1000);
+//// assertNull("Message receieved when it should be null:" + received, received);
+////
+//// _connection.close();
+//
+// // Close original IO layer. Expecting messages to be released
+// protocolSession.closeProtocolSession();
+//
+// // Reconnect and all should be good.
+//// initialiseConnection();
+//
+// // Attempt to consume again. Expect 9 messages
+// for (int count = 1; count < 10; count++)
+// {
+// received = _consumer.receive(2000);
+// assertNotNull("Expected message not received:" + count, received);
+// assertEquals(messages.remove(0).getIntProperty("count"),
+// received.getIntProperty("count"));
+// }
+//
+// //Verify there are no more messages
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when there should be none:" + received,
+// received);
+ }
+
+ private void initialiseConnection()
+ throws Exception
+ {
+ //Create Connection
+ _connection = (AMQConnection) getConnection();
+ _connection.setConnectionListener(this);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _queue = _session.createQueue(getName());
+
+ // Create Consumer
+ _consumer = _session.createConsumer(_queue);
+
+ //Start connection
+ _connection.start();
+ }
+
+ /** QpidTestCase back port to this release */
+
+ // modified from QTC as sendMessage is not testable.
+ // - should be renamed sendBlankBytesMessage
+ // - should be renamed sendNumberedBytesMessage
+ public List<Message> sendNumberedBytesMessage(Session session, Destination destination,
+ int count) throws Exception
+ {
+ List<Message> messages = new ArrayList<Message>(count);
+
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++)
+ {
+ Message next = session.createMessage();
+
+ next.setIntProperty("count", count);
+
+ producer.send(next);
+
+ messages.add(next);
+ }
+
+ producer.close();
+ return messages;
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverOccured.countDown();
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index 30cc48691f..eb36522fac 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -61,7 +61,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
* @param transacted create a transacted session for this test
* @param mode if not transacted what ack mode to use for this test
* @throws Exception if a problem occured during test setup.
- */
+ */
@Override
protected void init(boolean transacted, int mode) throws Exception
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 5b5bb4a6a2..0426c4f45f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase
super.setUp();
// Set QPID_WORK to $QPID_WORK/<getFailingPort()>
// or /tmp/<getFailingPort()> if QPID_WORK not set.
- setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort());
startBroker(getFailingPort());
}
@@ -95,7 +95,7 @@ public class FailoverBaseCase extends QpidTestCase
// Ensure we shutdown any secondary brokers, even if we are unable
// to cleanly tearDown the QTC.
stopBroker(getFailingPort());
- FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
}
}
diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes
index d14d467b89..863f56ae92 100644
--- a/java/test-profiles/Excludes
+++ b/java/test-profiles/Excludes
@@ -30,3 +30,7 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
// QPID-143 : Failover can occur between receive and ack but we don't stop the ack.
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk
+
+
+//temp do not commit
+org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
diff --git a/java/test-profiles/test-provider.properties b/java/test-profiles/test-provider.properties
index 70a2672263..8cea012c1d 100644
--- a/java/test-profiles/test-provider.properties
+++ b/java/test-profiles/test-provider.properties
@@ -34,7 +34,7 @@ connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20''
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'