diff options
Diffstat (limited to 'java')
3 files changed, 282 insertions, 79 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java index 66ebec76e7..f22a405fc3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java @@ -20,17 +20,26 @@ */ package org.apache.qpid.test.unit.ack; -import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; import javax.jms.Connection; -import javax.jms.Session; -import javax.jms.Message; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TransactionRolledBackException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest +public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener { + protected CountDownLatch _failoverCompleted = new CountDownLatch(1); + private MessageListener _listener = null; + @Override public void setUp() throws Exception { @@ -38,6 +47,27 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessage NUM_MESSAGES = 10; } + /** + * Override default init to add connectionListener so we can verify that + * failover took place + * + * @param transacted create a transacted session for this test + * @param mode if not transacted what ack mode to use for this test + * + * @throws Exception if a problem occured during test setup. + */ + @Override + public void init(boolean transacted, int mode) throws Exception + { + super.init(transacted, mode); + ((AMQConnection) _connection).setConnectionListener(this); + // Override the listener for the dirtyAck testing. + if (_listener != null) + { + _consumer.setMessageListener(_listener); + } + } + protected void prepBroker(int count) throws Exception { //Stop the connection whilst we repopulate the broker, or the no_ack @@ -107,4 +137,220 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessage } } + + int msgCount = 0; + boolean cleaned = false; + + class DirtyAckingHandler implements MessageListener + { + /** + * Validate first message but do nothing with it. + * + * Failover + * + * The receive the message again + * + * @param message + */ + public void onMessage(Message message) + { + // Stop processing if we have an error and had to stop running. + if (_receviedAll.getCount() == 0) + { + _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message); + return; + } + + try + { + // Check we have the next message as expected + assertNotNull("Message " + msgCount + " not correctly received.", message); + assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX)); + + if (msgCount == 0 && _failoverCompleted.getCount() != 0) + { + // This is the first message we've received so lets fail the broker + + failBroker(getFailingPort()); + + repopulateBroker(); + + _logger.error("Received first msg so failing over"); + + return; + } + + msgCount++; + + // Don't acknowlege the first message after failover so we can commit + // them together + if (msgCount == 1) + { + _logger.error("Received first msg after failover ignoring:" + msgCount); + + // Acknowledge the first message if we are now on the cleaned pass + if (cleaned) + { + _receviedAll.countDown(); + } + + return; + } + + if (_consumerSession.getTransacted()) + { + try + { + _consumerSession.commit(); + if (!cleaned) + { + fail("Session is dirty we should get an TransactionRolledBackException"); + } + } + catch (TransactionRolledBackException trbe) + { + //expected path + } + } + else + { + try + { + message.acknowledge(); + if (!cleaned) + { + fail("Session is dirty we should get an IllegalStateException"); + } + } + catch (javax.jms.IllegalStateException ise) + { + assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage()); + // Recover the sesion and try again. + _consumerSession.recover(); + } + } + + // Acknowledge the last message if we are in a clean state + // this will then trigger test teardown. + if (cleaned) + { + _receviedAll.countDown(); + } + + //Reset message count so we can try again. + msgCount = 0; + cleaned = true; + } + catch (Exception e) + { + // If something goes wrong stop and notifiy main thread. + fail(e); + } + } + } + + /** + * Test that Acking/Committing a message received before failover causes + * an exception at commit/ack time. + * + * Expected behaviour is that in: + * * tx mode commit() throws a transacted RolledBackException + * * client ack mode throws an IllegalStateException + * + * @param transacted is this session trasacted + * @param mode What ack mode should be used if not trasacted + * + * @throws Exception if something goes wrong. + */ + protected void testDirtyAcking(boolean transacted, int mode) throws Exception + { + NUM_MESSAGES = 2; + _listener = new DirtyAckingHandler(); + + super.testAcking(transacted, mode); + } + + public void testDirtyClientAck() throws Exception + { + testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE); + } + + public void testDirtyAckingTransacted() throws Exception + { + testDirtyAcking(true, Session.SESSION_TRANSACTED); + } + + private void repopulateBroker() throws Exception + { + // Repopulate this new broker so we can test what happends after failover + + //Get the connection to the first (main port) broker. + Connection connection = getConnection(); + // Use a transaction to send messages so we can be sure they arrive. + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, NUM_MESSAGES); + + assertEquals("Wrong number of messages on queue", NUM_MESSAGES, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + + connection.close(); + } + + // AMQConnectionListener Interface.. used so we can validate that we + // actually failed over. + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover + return true; + } + + public boolean preResubscribe() + { + //Allow failover + return true; + } + + public void failoverComplete() + { + _failoverCompleted.countDown(); + } + + /** + * Override so we can block until failover has completd + * + * @param port + */ + @Override + public void failBroker(int port) + { + super.failBroker(port); + + try + { + if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + // Use an exception so that we use our local fail() that notifies the main thread of failure + throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); + } + + } + catch (Exception e) + { + // Use an exception so that we use our local fail() that notifies the main thread of failure + fail(e); + } + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index cce29f3dbd..7e3f00d4c5 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -54,6 +54,14 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con NUM_MESSAGES = 10; } + /** + * Override default init to add connectionListener so we can verify that + * failover took place + * + * @param transacted create a transacted session for this test + * @param mode if not transacted what ack mode to use for this test + * @throws Exception if a problem occured during test setup. + */ @Override protected void init(boolean transacted, int mode) throws Exception { @@ -244,75 +252,6 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con testDirtyAcking(true, Session.SESSION_TRANSACTED); } - /** - * If a transacted session has failed over whilst it has uncommitted sent - * data then we need to throw a TransactedRolledbackException on commit() - * - * The alternative would be to maintain a replay buffer so that the message - * could be resent. This is not currently implemented - * - * @throws Exception if something goes wrong. - */ - public void testDirtySendingTransacted() throws Exception - { - Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - - // Ensure we get failover notifications - ((AMQConnection) _connection).setConnectionListener(this); - - MessageProducer producer = producerSession.createProducer(_queue); - - // Create and send message 0 - Message msg = producerSession.createMessage(); - msg.setIntProperty(INDEX, 0); - producer.send(msg); - - // DON'T commit message .. fail connection - - failBroker(getFailingPort()); - - // Ensure destination exists for sending - producerSession.createConsumer(_queue).close(); - - // Send the next message - msg.setIntProperty(INDEX, 1); - try - { - producer.send(msg); - fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); - } - catch (JMSException jmse) - { - assertEquals("Early warning of dirty session not correct", - "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); - } - - // Ignore that the session is dirty and attempt to commit to validate the - // exception is thrown. AND that the above failure notification did NOT - // clean up the session. - - try - { - producerSession.commit(); - fail("Session is dirty we should get an TransactionRolledBackException"); - } - catch (TransactionRolledBackException trbe) - { - // Normal path. - } - - // Resend messages - msg.setIntProperty(INDEX, 0); - producer.send(msg); - msg.setIntProperty(INDEX, 1); - producer.send(msg); - - producerSession.commit(); - - assertEquals("Wrong number of messages on queue", 2, - ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); - } - // AMQConnectionListener Interface.. used so we can validate that we // actually failed over. diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java index 12ed66b3d7..db50444d4a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -32,14 +32,22 @@ import java.util.concurrent.atomic.AtomicReference; public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener { - private CountDownLatch _receviedAll; - private AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); + protected CountDownLatch _receviedAll; + protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); @Override public void setUp() throws Exception { super.setUp(); + } + + @Override + public void init(boolean transacted, int mode) throws Exception + { _receviedAll = new CountDownLatch(NUM_MESSAGES); + + super.init(transacted, mode); + _consumer.setMessageListener(this); } /** @@ -51,13 +59,22 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message protected void testAcking(boolean transacted, int mode) throws Exception { init(transacted, mode); - _consumer.setMessageListener(this); _connection.start(); if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS)) { - fail("All messages not received."); + // Check to see if we ended due to an exception in the onMessage handler + Exception cause = _causeOfFailure.get(); + if (cause != null) + { + cause.printStackTrace(); + fail(cause.getMessage()); + } + else + { + fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); + } } // Check to see if we ended due to an exception in the onMessage handler @@ -102,11 +119,12 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message /** * Pass the given exception back to the waiting thread to fail the test run. - * @param e The exception that is causing the test to fail. + * + * @param e The exception that is causing the test to fail. */ protected void fail(Exception e) { - _causeOfFailure.set(e); + _causeOfFailure.set(e); // End the test. while (_receviedAll.getCount() != 0) { |
