diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-10-05 15:04:15 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-10-05 15:04:15 +0000 |
| commit | e0e91927a9dc94ee2b85e33812cfbc82b71cfc56 (patch) | |
| tree | 181ba683c644f6698ef755e02b78981496d2f49f /java | |
| parent | 199dc525df58cd7793afd3ebd1900de57a63f9c2 (diff) | |
| download | qpid-python-e0e91927a9dc94ee2b85e33812cfbc82b71cfc56.tar.gz | |
QPID-1816 : Add Acknowledge tests and QuickAcking manual test helper.
Updated AcknowldegeAfterFailoverTest to correctly cover the failure cases. Sending messages on a dirty transaction and Receiveing messagges on a dirty session.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@821824 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 426 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fa15df34ec..fc81e32e4d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -60,6 +60,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -777,8 +778,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { + //Check that we are clean to commit. + if (_failedOverDirty) + { + rollback(); + + throw new TransactionRolledBackException("Connection failover has occured since last send. " + + "Forced rollback"); + } + - // TGM FIXME: what about failover? // Acknowledge all delivered messages while (true) { @@ -1509,6 +1518,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic sendRecover(); + markClean(); + if (!isSuspended) { suspendChannel(false); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index be268946cd..cce29f3dbd 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -20,20 +20,28 @@ */ package org.apache.qpid.test.unit.ack; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TransactionRolledBackException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * */ -public class AcknowledgeAfterFailoverTest extends AcknowledgeTest +public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements ConnectionListener { + protected CountDownLatch _failoverCompleted = new CountDownLatch(1); + @Override public void setUp() throws Exception { @@ -46,6 +54,13 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest NUM_MESSAGES = 10; } + @Override + protected void init(boolean transacted, int mode) throws Exception + { + super.init(transacted, mode); + ((AMQConnection) _connection).setConnectionListener(this); + } + protected void prepBroker(int count) throws Exception { if (count % 2 == 1) @@ -107,10 +122,17 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest } /** - * @param transacted - * @param mode + * Test that Acking/Committing a message received before failover causes + * an exception at commit/ack time. + * + * Expected behaviour is that in: + * * tx mode commit() throws a transacted RolledBackException + * * client ack mode throws an IllegalStateException * - * @throws Exception + * @param transacted is this session trasacted + * @param mode What ack mode should be used if not trasacted + * + * @throws Exception if something goes wrong. */ protected void testDirtyAcking(boolean transacted, int mode) throws Exception { @@ -125,27 +147,55 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest int count = 0; assertNotNull("Message " + count + " not correctly received.", msg); assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX)); - count++; - - //Don't acknowledge just prep the next broker + //Don't acknowledge just prep the next broker. Without changing count + // Prep the new broker to have all all the messages so we can validate + // that they can all be correctly received. try { - prepBroker(count); + + //Stop the connection so we can validate the number of message count + // on the queue is correct after failover + _connection.stop(); + failBroker(getFailingPort()); + + //Get the connection to the first (main port) broker. + Connection connection = getConnection();//getConnectionFactory("connection1").getConnectionURL()); + // Use a transaction to send messages so we can be sure they arrive. + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, NUM_MESSAGES); + + assertEquals("Wrong number of messages on queue", NUM_MESSAGES, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + + connection.close(); + + //restart connection + _connection.start(); } catch (Exception e) { fail("Unable to prep new broker," + e.getMessage()); } - // Consume the next message + // Consume the next message - don't check what it is as a normal would + // assume it is msg 1 but as we've fallen over it is msg 0 again. msg = _consumer.receive(1500); - assertNotNull("Message " + count + " not correctly received.", msg); - assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX)); if (_consumerSession.getTransacted()) { - _consumerSession.commit(); + try + { + _consumerSession.commit(); + fail("Session is dirty we should get an TransactionRolledBackException"); + } + catch (TransactionRolledBackException trbe) + { + //expected path + } } else { @@ -154,12 +204,32 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest msg.acknowledge(); fail("Session is dirty we should get an IllegalStateException"); } - catch (IllegalStateException ise) + catch (javax.jms.IllegalStateException ise) { assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage()); + // Recover the sesion and try again. + _consumerSession.recover(); } } + msg = _consumer.receive(1500); + // Validate we now get the first message back + assertEquals(0, msg.getIntProperty(INDEX)); + + msg = _consumer.receive(1500); + // and the second message + assertEquals(1, msg.getIntProperty(INDEX)); + + // And now verify that we can now commit the clean session + if (_consumerSession.getTransacted()) + { + _consumerSession.commit(); + } + else + { + msg.acknowledge(); + } + assertEquals("Wrong number of messages on queue", 0, ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); } @@ -169,9 +239,129 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE); } - public void testDirtyTransacted() throws Exception + public void testDirtyAckingTransacted() throws Exception { testDirtyAcking(true, Session.SESSION_TRANSACTED); } + /** + * If a transacted session has failed over whilst it has uncommitted sent + * data then we need to throw a TransactedRolledbackException on commit() + * + * The alternative would be to maintain a replay buffer so that the message + * could be resent. This is not currently implemented + * + * @throws Exception if something goes wrong. + */ + public void testDirtySendingTransacted() throws Exception + { + Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + + // Ensure we get failover notifications + ((AMQConnection) _connection).setConnectionListener(this); + + MessageProducer producer = producerSession.createProducer(_queue); + + // Create and send message 0 + Message msg = producerSession.createMessage(); + msg.setIntProperty(INDEX, 0); + producer.send(msg); + + // DON'T commit message .. fail connection + + failBroker(getFailingPort()); + + // Ensure destination exists for sending + producerSession.createConsumer(_queue).close(); + + // Send the next message + msg.setIntProperty(INDEX, 1); + try + { + producer.send(msg); + fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); + } + catch (JMSException jmse) + { + assertEquals("Early warning of dirty session not correct", + "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); + } + + // Ignore that the session is dirty and attempt to commit to validate the + // exception is thrown. AND that the above failure notification did NOT + // clean up the session. + + try + { + producerSession.commit(); + fail("Session is dirty we should get an TransactionRolledBackException"); + } + catch (TransactionRolledBackException trbe) + { + // Normal path. + } + + // Resend messages + msg.setIntProperty(INDEX, 0); + producer.send(msg); + msg.setIntProperty(INDEX, 1); + producer.send(msg); + + producerSession.commit(); + + assertEquals("Wrong number of messages on queue", 2, + ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); + } + + // AMQConnectionListener Interface.. used so we can validate that we + // actually failed over. + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover + return true; + } + + public boolean preResubscribe() + { + //Allow failover + return true; + } + + public void failoverComplete() + { + _failoverCompleted.countDown(); + } + + /** + * Override so we can block until failover has completd + * + * @param port + */ + @Override + public void failBroker(int port) + { + super.failBroker(port); + + try + { + if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); + } + } + catch (InterruptedException e) + { + fail("Failover was interuppted"); + } + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java index 6558fb4d09..12ed66b3d7 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -28,10 +28,12 @@ import javax.jms.MessageListener; import javax.jms.Session; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener { private CountDownLatch _receviedAll; + private AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); @Override public void setUp() throws Exception @@ -49,14 +51,21 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message protected void testAcking(boolean transacted, int mode) throws Exception { init(transacted, mode); - _consumer.setMessageListener(this); _connection.start(); - if (!_receviedAll.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS)) { - fail("failover did not complete"); + fail("All messages not received."); + } + + // Check to see if we ended due to an exception in the onMessage handler + Exception cause = _causeOfFailure.get(); + if (cause != null) + { + cause.printStackTrace(); + fail(cause.getMessage()); } _consumer.close(); @@ -91,8 +100,17 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message } } + /** + * Pass the given exception back to the waiting thread to fail the test run. + * @param e The exception that is causing the test to fail. + */ protected void fail(Exception e) { - + _causeOfFailure.set(e); + // End the test. + while (_receviedAll.getCount() != 0) + { + _receviedAll.countDown(); + } } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java new file mode 100644 index 0000000000..834b17430b --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.jms.Session; + +import javax.jms.Message; +import javax.jms.Queue; + +public class FailoverBeforeConsumingRecoverTest extends RecoverTest +{ + + @Override + protected void initTest() throws Exception + { + super.initTest(); + failBroker(getFailingPort()); + + Queue queue = _consumerSession.createQueue(getTestQueueName()); + sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java new file mode 100644 index 0000000000..6c4b7ba01b --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * This is a quick manual test to validate acking after failover with a + * transacted session. + * + * Start an external broker then run this test. Std Err will print. + * Sent Message: 1 + * Received Message: 1 + * + * You can then restart the external broker, which will cause failover, which + * will be complete when the following appears. + * + * Failover Complete + * + * A second message send/receive cycle is then done to validate that the + * connection/session are still working. + * + */ +public class QuickAcking extends QpidTestCase implements ConnectionListener +{ + protected AMQConnection _connection; + protected Queue _queue; + protected Session _session; + protected MessageConsumer _consumer; + private CountDownLatch _failedOver; + private static final String INDEX = "INDEX"; + private int _count = 0; + + public void setUp() + { + // Prevent broker startup. Broker must be run manually. + } + + public void test() throws Exception + { + _failedOver = new CountDownLatch(1); + + _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''"); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _queue = _session.createQueue("QAtest"); + _consumer = _session.createConsumer(_queue); + _connection.setConnectionListener(this); + _connection.start(); + + sendAndReceive(); + + _failedOver.await(); + + sendAndReceive(); + + } + + private void sendAndReceive() + throws Exception + { + sendMessage(); + + Message message = _consumer.receive(); + + if (message.getIntProperty(INDEX) != _count) + { + throw new Exception("Incorrect message recieved:" + _count); + } + + if (_session.getTransacted()) + { + _session.commit(); + } + System.err.println("Recevied Message:" + _count); + } + + private void sendMessage() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + Message message = _session.createMessage(); + _count++; + message.setIntProperty(INDEX, _count); + + producer.send(message); + if (_session.getTransacted()) + { + _session.commit(); + } + producer.close(); + + System.err.println("Sent Message:" + _count); + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + System.err.println("Failover Complete"); + _failedOver.countDown(); + } +} |
