diff options
Diffstat (limited to 'java')
8 files changed, 358 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..fcf3fd4337 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); // throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); - } + } else { if (message.isQueueDeleted()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 5d7adc6371..9918013888 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); + + // Protect the broker against out of order frame request. + if (virtualHost == null) + { + throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); + } + final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()); diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 927f660932..7fa7004a9e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -140,6 +140,17 @@ public class FailoverHandler implements Runnable // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); + + // We are failing over so lets ensure any existing ProtocolSessions + // are closed. Closing them will update the stateManager which we + // probably don't want to record the change to the closed state. + // So lets make a new one. + _amqProtocolHandler.setStateManager(new AMQStateManager()); + + // Close the session, false says don't wait for it to close, just close it. + _amqProtocolHandler.getProtocolSession().closeProtocolSession(false); + + // Use a fresh new StateManager for the reconnection attempts _amqProtocolHandler.setStateManager(new AMQStateManager()); diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java new file mode 100644 index 0000000000..8d707c4c79 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java @@ -0,0 +1,331 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.failover; + +import org.apache.mina.common.WriteTimeoutException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.FailoverBaseCase; +import org.apache.qpid.AMQConnectionClosedException; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test case based on user reported error. + * + * Summary: + * A user has reported message loss from their application. On bouncing of + * the broker the 'lost' messages are delivered to the broker. + * + * Note: + * The client was using Spring so that may influence the situation. + * + * Issue: + * The log files show 7 instances of the following which result in 7 + * missing messages. + * + * The client log files show: + * + * The broker log file show: + * + * + * 7 missing messages have delivery tags 5-11. Which says that they are + * sequentially the next message from the broker. + * + * The only way for the 'without a handler' log to occur is if the consumer + * has been removed from the look up table of the dispatcher. + * And the only way for the 'null message' log to occur on the broker is is + * if the message does not exist in the unacked-map + * + * The consumer is only removed from the list during session + * closure and failover. + * + * If the session was closed then the broker would requeue the unacked + * messages so the potential exists to have an empty map but the broker + * will not send a message out after the unacked map has been cleared. + * + * When failover occurs the _consumer map is cleared and the consumers are + * resubscribed. This is down without first stopping any existing + * dispatcher so there exists the potential to receive a message after + * the _consumer map has been cleared which is how the 'without a handler' + * log statement occurs. + * + * Scenario: + * + * Looking over logs the sequence that best fits the events is as follows: + * - Something causes Mina to be delayed causing the WriteTimoutException. + * - This exception is recevied by AMQProtocolHandler#exceptionCaught + * - As the WriteTimeoutException is an IOException this will cause + * sessionClosed to be called to start failover. + * + This is potentially the issues here. All IOExceptions are treated + * as connection failure events. + * - Failover Runs + * + Failover assumes that the previous connection has been closed. + * + Failover binds the existing objects (AMQConnection/Session) to the + * new connection objects. + * - Everything is reported as being successfully failed over. + * However, what is neglected is that the original connection has not + * been closed. + * + So what occurs is that the broker sends a message to the consumer on + * the original connection, as it was not notified of the client + * failing over. + * As the client failover reuses the original AMQSession and Dispatcher + * the new messages the broker sends to the old consumer arrives at the + * client and is processed by the same AMQSession and Dispatcher. + * However, as the failover process cleared the _consumer map and + * resubscribe the consumers the Dispatcher does not recognise the + * delivery tag and so logs the 'without a handler' message. + * - The Dispatcher then attempts to reject the message, however, + * + The AMQSession/Dispatcher pair have been swapped to using a new Mina + * ProtocolSession as part of the failover process so the reject is + * sent down the second connection. The broker receives the Reject + * request but as the Message was sent on a different connection the + * unacknowledgemap is empty and a 'message is null' log message + * produced. + * + * Test Strategy: + * + * It should be easy to demonstrate if we can send an IOException to + * AMQProtocolHandler#exceptionCaught and then try sending a message. + * + * The current unknowns here are the type of consumers that are in use. + * If it was an exclusive queue(Durable Subscription) then why did the + * resubscribe not fail. + * + * If it was not exclusive then why did the messages not round robin? + */ +public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener +{ + private CountDownLatch _failoverOccured = new CountDownLatch(1); + AMQConnection _connection; + Session _session; + Queue _queue; + MessageConsumer _consumer; + + public void setUp() throws Exception + { + super.setUp(); + stopBroker(getFailingPort()); + + } + + /** + * Test Summary: + * + * Create a queue consumer and send 10 messages to the broker. + * + * Consume the first message. + * This will pull the rest into the prefetch + * + * Send an IOException to the MinaProtocolHandler. + * + * This will force failover to occur. + * + * 9 messages would normally be expected but it is expected that none will + * arrive. As they are still in the prefetch of the first session. + * + * To free the messages we need to close all connections. + * - Simply doing connection.close() and retesting will not be enough as + * the original connection's IO layer will still exist and is nolonger + * connected to the connection object as a result of failover. + * + * - Test will need to retain a reference to the original connection IO so + * that it can be closed releasing the messages to validate that the + * messages have indeed been 'lost' on that sesssion. + */ + public void test() throws Exception + { + initialiseConnection(); + + // Create Producer + // Send 10 messages + List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10); + + // Consume first messasge + Message received = _consumer.receive(2000); + + // Verify received messages + assertNotNull("First message not received.", received); + assertEquals("Incorrect message Received", + messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + + // Allow ack to be sent to broker, by performing a synchronous command + // along the session. +// _session.createConsumer(_session.createTemporaryQueue()).close(); + + //Retain IO Layer + AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); + + // Send IO Exception - causing failover + _connection.getProtocolHandler(). + exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(), + new WriteTimeoutException("WriteTimeoutException to cause failover.")); + + // Verify Failover occured through ConnectionListener + assertTrue("Failover did not occur", + _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); + + //Verify new protocolSession is not the same as the original + assertNotSame("Protocol Session has not changed", + protocolSession, + _connection.getProtocolHandler().getProtocolSession()); + + /***********************************/ + // This verifies that the bug has been resolved + + // Attempt to consume again. Expect 9 messages + for (int count = 1; count < 10; count++) + { + received = _consumer.receive(2000); + assertNotNull("Expected message not received:" + count, received); + assertEquals(messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + } + + //Verify there are no more messages + received = _consumer.receive(1000); + assertNull("Message receieved when there should be none:" + received, + received); + +// /***********************************/ +// // This verifies that the bug exists +// +// // Attempt to consume remaining 9 messages.. Expecting NONE. +// // receiving just one message should fail so no need to fail 9 times +// received = _consumer.receive(1000); +// assertNull("Message receieved when it should be null:" + received, received); +// +//// //Close the Connection which you would assume would free the messages +//// _connection.close(); +//// +//// // Reconnect +//// initialiseConnection(); +//// +//// // We should still be unable to receive messages +//// received = _consumer.receive(1000); +//// assertNull("Message receieved when it should be null:" + received, received); +//// +//// _connection.close(); +// +// // Close original IO layer. Expecting messages to be released +// protocolSession.closeProtocolSession(); +// +// // Reconnect and all should be good. +//// initialiseConnection(); +// +// // Attempt to consume again. Expect 9 messages +// for (int count = 1; count < 10; count++) +// { +// received = _consumer.receive(2000); +// assertNotNull("Expected message not received:" + count, received); +// assertEquals(messages.remove(0).getIntProperty("count"), +// received.getIntProperty("count")); +// } +// +// //Verify there are no more messages +// received = _consumer.receive(1000); +// assertNull("Message receieved when there should be none:" + received, +// received); + } + + private void initialiseConnection() + throws Exception + { + //Create Connection + _connection = (AMQConnection) getConnection(); + _connection.setConnectionListener(this); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _queue = _session.createQueue(getName()); + + // Create Consumer + _consumer = _session.createConsumer(_queue); + + //Start connection + _connection.start(); + } + + /** QpidTestCase back port to this release */ + + // modified from QTC as sendMessage is not testable. + // - should be renamed sendBlankBytesMessage + // - should be renamed sendNumberedBytesMessage + public List<Message> sendNumberedBytesMessage(Session session, Destination destination, + int count) throws Exception + { + List<Message> messages = new ArrayList<Message>(count); + + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < count; i++) + { + Message next = session.createMessage(); + + next.setIntProperty("count", count); + + producer.send(next); + + messages.add(next); + } + + producer.close(); + return messages; + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover to occur + return true; + } + + public boolean preResubscribe() + { + //Allow failover to occur + return true; + } + + public void failoverComplete() + { + _failoverOccured.countDown(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index 30cc48691f..eb36522fac 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -61,7 +61,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con * @param transacted create a transacted session for this test * @param mode if not transacted what ack mode to use for this test * @throws Exception if a problem occured during test setup. - */ + */ @Override protected void init(boolean transacted, int mode) throws Exception { diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 5b5bb4a6a2..0426c4f45f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase super.setUp(); // Set QPID_WORK to $QPID_WORK/<getFailingPort()> // or /tmp/<getFailingPort()> if QPID_WORK not set. - setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort()); startBroker(getFailingPort()); } @@ -95,7 +95,7 @@ public class FailoverBaseCase extends QpidTestCase // Ensure we shutdown any secondary brokers, even if we are unable // to cleanly tearDown the QTC. stopBroker(getFailingPort()); - FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); } } diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes index d14d467b89..863f56ae92 100644 --- a/java/test-profiles/Excludes +++ b/java/test-profiles/Excludes @@ -30,3 +30,7 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck // QPID-143 : Failover can occur between receive and ack but we don't stop the ack. org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk + + +//temp do not commit +org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* diff --git a/java/test-profiles/test-provider.properties b/java/test-profiles/test-provider.properties index 70a2672263..8cea012c1d 100644 --- a/java/test-profiles/test-provider.properties +++ b/java/test-profiles/test-provider.properties @@ -34,7 +34,7 @@ connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' -connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' +connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20'' connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}' |
