summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java331
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java4
-rw-r--r--java/test-profiles/Excludes4
-rw-r--r--java/test-profiles/test-provider.properties2
8 files changed, 358 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..fcf3fd4337 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
- }
+ }
else
{
if (message.isQueueDeleted())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 5d7adc6371..9918013888 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+
final AMQChannel channel = new AMQChannel(session,channelId,
virtualHost.getMessageStore());
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 927f660932..7fa7004a9e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -140,6 +140,17 @@ public class FailoverHandler implements Runnable
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+
+ // We are failing over so lets ensure any existing ProtocolSessions
+ // are closed. Closing them will update the stateManager which we
+ // probably don't want to record the change to the closed state.
+ // So lets make a new one.
+ _amqProtocolHandler.setStateManager(new AMQStateManager());
+
+ // Close the session, false says don't wait for it to close, just close it.
+ _amqProtocolHandler.getProtocolSession().closeProtocolSession(false);
+
+ // Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
new file mode 100644
index 0000000000..8d707c4c79
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -0,0 +1,331 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.failover;
+
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.AMQConnectionClosedException;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test case based on user reported error.
+ *
+ * Summary:
+ * A user has reported message loss from their application. On bouncing of
+ * the broker the 'lost' messages are delivered to the broker.
+ *
+ * Note:
+ * The client was using Spring so that may influence the situation.
+ *
+ * Issue:
+ * The log files show 7 instances of the following which result in 7
+ * missing messages.
+ *
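+ * The client log files show the Dispatcher receiving a message 'without a handler' and rejecting it.
+ *
+ * The broker log file shows a 'Dropping reject request as message is null' warning for the rejected delivery tag.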
+ *
+ *
+ * 7 missing messages have delivery tags 5-11. Which says that they are
+ * sequentially the next message from the broker.
+ *
+ * The only way for the 'without a handler' log to occur is if the consumer
+ * has been removed from the look up table of the dispatcher.
+ * And the only way for the 'null message' log to occur on the broker is
+ * if the message does not exist in the unacked-map.
+ *
+ * The consumer is only removed from the list during session
+ * closure and failover.
+ *
+ * If the session was closed then the broker would requeue the unacked
+ * messages so the potential exists to have an empty map but the broker
+ * will not send a message out after the unacked map has been cleared.
+ *
+ * When failover occurs the _consumer map is cleared and the consumers are
+ * resubscribed. This is done without first stopping any existing
+ * dispatcher so there exists the potential to receive a message after
+ * the _consumer map has been cleared which is how the 'without a handler'
+ * log statement occurs.
+ *
+ * Scenario:
+ *
+ * Looking over logs the sequence that best fits the events is as follows:
+ * - Something causes Mina to be delayed causing the WriteTimeoutException.
+ * - This exception is received by AMQProtocolHandler#exceptionCaught
+ * - As the WriteTimeoutException is an IOException this will cause
+ * sessionClosed to be called to start failover.
+ * + This is potentially the issue here. All IOExceptions are treated
+ * as connection failure events.
+ * - Failover Runs
+ * + Failover assumes that the previous connection has been closed.
+ * + Failover binds the existing objects (AMQConnection/Session) to the
+ * new connection objects.
+ * - Everything is reported as being successfully failed over.
+ * However, what is neglected is that the original connection has not
+ * been closed.
+ * + So what occurs is that the broker sends a message to the consumer on
+ * the original connection, as it was not notified of the client
+ * failing over.
+ * As the client failover reuses the original AMQSession and Dispatcher
+ * the new messages the broker sends to the old consumer arrives at the
+ * client and is processed by the same AMQSession and Dispatcher.
+ * However, as the failover process cleared the _consumer map and
+ * resubscribed the consumers, the Dispatcher does not recognise the
+ * delivery tag and so logs the 'without a handler' message.
+ * - The Dispatcher then attempts to reject the message, however,
+ * + The AMQSession/Dispatcher pair have been swapped to using a new Mina
+ * ProtocolSession as part of the failover process so the reject is
+ * sent down the second connection. The broker receives the Reject
+ * request but as the Message was sent on a different connection the
+ * unacknowledged map is empty and a 'message is null' log message is
+ * produced.
+ *
+ * Test Strategy:
+ *
+ * It should be easy to demonstrate if we can send an IOException to
+ * AMQProtocolHandler#exceptionCaught and then try sending a message.
+ *
+ * The current unknowns here are the type of consumers that are in use.
+ * If it was an exclusive queue(Durable Subscription) then why did the
+ * resubscribe not fail.
+ *
+ * If it was not exclusive then why did the messages not round robin?
+ */
+public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener
+{
+ private CountDownLatch _failoverOccured = new CountDownLatch(1);
+ AMQConnection _connection;
+ Session _session;
+ Queue _queue;
+ MessageConsumer _consumer;
+
+    /**
+     * Runs the base-class set-up and then stops the broker that listens on
+     * the failing port, so the test begins with that broker shut down.
+     *
+     * @throws Exception if the base set-up or stopping the broker fails
+     */
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        stopBroker(getFailingPort());
+    }
+
+ /**
+ * Test Summary:
+ *
+ * Create a queue consumer and send 10 messages to the broker.
+ *
+ * Consume the first message.
+ * This will pull the rest into the prefetch
+ *
+ * Send an IOException to the MinaProtocolHandler.
+ *
+ * This will force failover to occur.
+ *
+ * 9 messages would normally be expected but it is expected that none will
+ * arrive. As they are still in the prefetch of the first session.
+ *
+ * To free the messages we need to close all connections.
+ * - Simply doing connection.close() and retesting will not be enough as
+ * the original connection's IO layer will still exist and is no longer
+ * connected to the connection object as a result of failover.
+ *
+ * - Test will need to retain a reference to the original connection IO so
+ * that it can be closed releasing the messages to validate that the
+ * messages have indeed been 'lost' on that session.
+ *
+ * NOTE: the "none will arrive" expectation above describes the behaviour
+ * while the bug exists; that variant is kept below as commented-out code.
+ * The active assertions verify the resolved behaviour, i.e. that the
+ * remaining 9 messages are received after failover.
+ *
+ * @throws Exception if a connection or JMS operation used by the test fails
+ */
+ public void test() throws Exception
+ {
+ initialiseConnection();
+
+ // Create Producer
+ // Send 10 messages
+ List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10);
+
+ // Consume first message
+ Message received = _consumer.receive(2000);
+
+ // Verify received messages
+ // (compares the "count" property of the first sent message with the one received)
+ assertNotNull("First message not received.", received);
+ assertEquals("Incorrect message Received",
+ messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+
+ // Allow ack to be sent to broker, by performing a synchronous command
+ // along the session.
+ // (currently disabled)
+// _session.createConsumer(_session.createTemporaryQueue()).close();
+
+ //Retain IO Layer
+ // Kept so it can be compared with the protocol session in use after failover.
+ AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
+
+ // Send IO Exception - causing failover
+ _connection.getProtocolHandler().
+ exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(),
+ new WriteTimeoutException("WriteTimeoutException to cause failover."));
+
+ // Verify Failover occurred through ConnectionListener
+ // (failoverComplete() counts the latch down; wait up to 4 seconds for it)
+ assertTrue("Failover did not occur",
+ _failoverOccured.await(4000, TimeUnit.MILLISECONDS));
+
+ //Verify new protocolSession is not the same as the original
+ assertNotSame("Protocol Session has not changed",
+ protocolSession,
+ _connection.getProtocolHandler().getProtocolSession());
+
+ /***********************************/
+ // This verifies that the bug has been resolved
+
+ // Attempt to consume again. Expect 9 messages
+ // (the loop starts at 1 because the first message was consumed above)
+ for (int count = 1; count < 10; count++)
+ {
+ received = _consumer.receive(2000);
+ assertNotNull("Expected message not received:" + count, received);
+ assertEquals(messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+ }
+
+ //Verify there are no more messages
+ received = _consumer.receive(1000);
+ assertNull("Message receieved when there should be none:" + received,
+ received);
+
+ // The commented-out code below is the earlier variant of this test that
+ // demonstrated the bug (no messages arriving until the original IO layer
+ // was closed). It is retained for reference only.
+// /***********************************/
+// // This verifies that the bug exists
+//
+// // Attempt to consume remaining 9 messages.. Expecting NONE.
+// // receiving just one message should fail so no need to fail 9 times
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when it should be null:" + received, received);
+//
+//// //Close the Connection which you would assume would free the messages
+//// _connection.close();
+////
+//// // Reconnect
+//// initialiseConnection();
+////
+//// // We should still be unable to receive messages
+//// received = _consumer.receive(1000);
+//// assertNull("Message receieved when it should be null:" + received, received);
+////
+//// _connection.close();
+//
+// // Close original IO layer. Expecting messages to be released
+// protocolSession.closeProtocolSession();
+//
+// // Reconnect and all should be good.
+//// initialiseConnection();
+//
+// // Attempt to consume again. Expect 9 messages
+// for (int count = 1; count < 10; count++)
+// {
+// received = _consumer.receive(2000);
+// assertNotNull("Expected message not received:" + count, received);
+// assertEquals(messages.remove(0).getIntProperty("count"),
+// received.getIntProperty("count"));
+// }
+//
+// //Verify there are no more messages
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when there should be none:" + received,
+// received);
+ }
+
+    /**
+     * Builds the JMS objects used by the test and stores them in the fixture
+     * fields: a connection with this test registered as its connection
+     * listener, a non-transacted AUTO_ACKNOWLEDGE session, a queue named by
+     * getName() and a consumer on that queue. The connection is started
+     * last, once the consumer exists.
+     *
+     * @throws Exception if any of the connection or JMS calls fail
+     */
+    private void initialiseConnection() throws Exception
+    {
+        // Connection, with this test receiving the failover callbacks.
+        final AMQConnection connection = (AMQConnection) getConnection();
+        _connection = connection;
+        connection.setConnectionListener(this);
+
+        // Session and the queue under test.
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session = session;
+
+        final Queue queue = session.createQueue(getName());
+        _queue = queue;
+
+        // Consumer must exist before delivery is started.
+        _consumer = session.createConsumer(queue);
+
+        connection.start();
+    }
+
+    /**
+     * Sends {@code count} messages to the given destination, tagging each one
+     * with its zero-based position in the "count" int property so that a
+     * receiver can check ordering and detect missing messages.
+     *
+     * QpidTestCase (QTC) back port to this release; modified from QTC as
+     * sendMessage is not testable. The original notes suggested the QTC
+     * variants should be renamed sendBlankBytesMessage and
+     * sendNumberedBytesMessage. Despite the name, the messages are created
+     * with session.createMessage().
+     *
+     * @param session     session used to create the producer and the messages
+     * @param destination where the messages are sent
+     * @param count       how many messages to send
+     * @return the messages that were sent, in send order
+     * @throws Exception if creating the producer or sending a message fails
+     */
+    public List<Message> sendNumberedBytesMessage(Session session, Destination destination,
+                                                  int count) throws Exception
+    {
+        List<Message> messages = new ArrayList<Message>(count);
+
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++)
+        {
+            Message next = session.createMessage();
+
+            // Number the message with its index. Using the total (count) here
+            // would give every message the same value and make any
+            // per-message comparison of the "count" property meaningless.
+            next.setIntProperty("count", i);
+
+            producer.send(next);
+
+            messages.add(next);
+        }
+
+        producer.close();
+        return messages;
+    }
+
+ /**
+ * No-op: this test takes no action on this notification.
+ *
+ * @param count value supplied by the caller; ignored here
+ */
+ public void bytesSent(long count)
+ {
+ // Intentionally empty.
+ }
+
+ /**
+ * No-op: this test takes no action on this notification.
+ *
+ * @param count value supplied by the caller; ignored here
+ */
+ public void bytesReceived(long count)
+ {
+ }
+
+ /**
+ * Always permits failover to proceed.
+ *
+ * @param redirect ignored by this implementation
+ * @return always true
+ */
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ /**
+ * Always answers true so that the failover process is not blocked at the
+ * resubscribe step.
+ *
+ * @return always true
+ */
+ public boolean preResubscribe()
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ /**
+ * Signals that failover has finished by counting down the
+ * _failoverOccured latch, which releases the await in test().
+ */
+ public void failoverComplete()
+ {
+ _failoverOccured.countDown();
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index 30cc48691f..eb36522fac 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -61,7 +61,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
* @param transacted create a transacted session for this test
* @param mode if not transacted what ack mode to use for this test
* @throws Exception if a problem occured during test setup.
- */
+ */
@Override
protected void init(boolean transacted, int mode) throws Exception
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 5b5bb4a6a2..0426c4f45f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase
super.setUp();
// Set QPID_WORK to $QPID_WORK/<getFailingPort()>
// or /tmp/<getFailingPort()> if QPID_WORK not set.
- setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort());
startBroker(getFailingPort());
}
@@ -95,7 +95,7 @@ public class FailoverBaseCase extends QpidTestCase
// Ensure we shutdown any secondary brokers, even if we are unable
// to cleanly tearDown the QTC.
stopBroker(getFailingPort());
- FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
}
}
diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes
index d14d467b89..863f56ae92 100644
--- a/java/test-profiles/Excludes
+++ b/java/test-profiles/Excludes
@@ -30,3 +30,7 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
// QPID-143 : Failover can occur between receive and ack but we don't stop the ack.
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk
+
+
+//temp do not commit
+org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
diff --git a/java/test-profiles/test-provider.properties b/java/test-profiles/test-provider.properties
index 70a2672263..8cea012c1d 100644
--- a/java/test-profiles/test-provider.properties
+++ b/java/test-profiles/test-provider.properties
@@ -34,7 +34,7 @@ connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20''
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'