summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-10-05 15:04:15 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-10-05 15:04:15 +0000
commite0e91927a9dc94ee2b85e33812cfbc82b71cfc56 (patch)
tree181ba683c644f6698ef755e02b78981496d2f49f /java
parent199dc525df58cd7793afd3ebd1900de57a63f9c2 (diff)
downloadqpid-python-e0e91927a9dc94ee2b85e33812cfbc82b71cfc56.tar.gz
QPID-1816 : Add Acknowledge tests and QuickAcking manual test helper.
Updated AcknowldegeAfterFailoverTest to correctly cover the failure cases. Sending messages on a dirty transaction and Receiveing messagges on a dirty session. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@821824 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java218
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java26
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java40
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java148
5 files changed, 426 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fa15df34ec..fc81e32e4d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -60,6 +60,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -777,8 +778,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
+ {
+ rollback();
+
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
+
- // TGM FIXME: what about failover?
// Acknowledge all delivered messages
while (true)
{
@@ -1509,6 +1518,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index be268946cd..cce29f3dbd 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -20,20 +20,28 @@
*/
package org.apache.qpid.test.unit.ack;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
*
*/
-public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
+public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements ConnectionListener
{
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
@Override
public void setUp() throws Exception
{
@@ -46,6 +54,13 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
NUM_MESSAGES = 10;
}
+ @Override
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ super.init(transacted, mode);
+ ((AMQConnection) _connection).setConnectionListener(this);
+ }
+
protected void prepBroker(int count) throws Exception
{
if (count % 2 == 1)
@@ -107,10 +122,17 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
}
/**
- * @param transacted
- * @param mode
+ * Test that Acking/Committing a message received before failover causes
+ * an exception at commit/ack time.
+ *
+ * Expected behaviour is that in:
+ * * tx mode commit() throws a transacted RolledBackException
+ * * client ack mode throws an IllegalStateException
*
- * @throws Exception
+ * @param transacted is this session trasacted
+ * @param mode What ack mode should be used if not trasacted
+ *
+ * @throws Exception if something goes wrong.
*/
protected void testDirtyAcking(boolean transacted, int mode) throws Exception
{
@@ -125,27 +147,55 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
int count = 0;
assertNotNull("Message " + count + " not correctly received.", msg);
assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
- count++;
-
- //Don't acknowledge just prep the next broker
+ //Don't acknowledge just prep the next broker. Without changing count
+ // Prep the new broker to have all all the messages so we can validate
+ // that they can all be correctly received.
try
{
- prepBroker(count);
+
+ //Stop the connection so we can validate the number of message count
+ // on the queue is correct after failover
+ _connection.stop();
+ failBroker(getFailingPort());
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();//getConnectionFactory("connection1").getConnectionURL());
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+
+ //restart connection
+ _connection.start();
}
catch (Exception e)
{
fail("Unable to prep new broker," + e.getMessage());
}
- // Consume the next message
+ // Consume the next message - don't check what it is as a normal would
+ // assume it is msg 1 but as we've fallen over it is msg 0 again.
msg = _consumer.receive(1500);
- assertNotNull("Message " + count + " not correctly received.", msg);
- assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
if (_consumerSession.getTransacted())
{
- _consumerSession.commit();
+ try
+ {
+ _consumerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ //expected path
+ }
}
else
{
@@ -154,12 +204,32 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
msg.acknowledge();
fail("Session is dirty we should get an IllegalStateException");
}
- catch (IllegalStateException ise)
+ catch (javax.jms.IllegalStateException ise)
{
assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+ // Recover the sesion and try again.
+ _consumerSession.recover();
}
}
+ msg = _consumer.receive(1500);
+ // Validate we now get the first message back
+ assertEquals(0, msg.getIntProperty(INDEX));
+
+ msg = _consumer.receive(1500);
+ // and the second message
+ assertEquals(1, msg.getIntProperty(INDEX));
+
+ // And now verify that we can now commit the clean session
+ if (_consumerSession.getTransacted())
+ {
+ _consumerSession.commit();
+ }
+ else
+ {
+ msg.acknowledge();
+ }
+
assertEquals("Wrong number of messages on queue", 0,
((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
}
@@ -169,9 +239,129 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
}
- public void testDirtyTransacted() throws Exception
+ public void testDirtyAckingTransacted() throws Exception
{
testDirtyAcking(true, Session.SESSION_TRANSACTED);
}
+ /**
+ * If a transacted session has failed over whilst it has uncommitted sent
+ * data then we need to throw a TransactedRolledbackException on commit()
+ *
+ * The alternative would be to maintain a replay buffer so that the message
+ * could be resent. This is not currently implemented
+ *
+ * @throws Exception if something goes wrong.
+ */
+ public void testDirtySendingTransacted() throws Exception
+ {
+ Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ // Ensure we get failover notifications
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Create and send message 0
+ Message msg = producerSession.createMessage();
+ msg.setIntProperty(INDEX, 0);
+ producer.send(msg);
+
+ // DON'T commit message .. fail connection
+
+ failBroker(getFailingPort());
+
+ // Ensure destination exists for sending
+ producerSession.createConsumer(_queue).close();
+
+ // Send the next message
+ msg.setIntProperty(INDEX, 1);
+ try
+ {
+ producer.send(msg);
+ fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+ }
+ catch (JMSException jmse)
+ {
+ assertEquals("Early warning of dirty session not correct",
+ "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+ }
+
+ // Ignore that the session is dirty and attempt to commit to validate the
+ // exception is thrown. AND that the above failure notification did NOT
+ // clean up the session.
+
+ try
+ {
+ producerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Normal path.
+ }
+
+ // Resend messages
+ msg.setIntProperty(INDEX, 0);
+ producer.send(msg);
+ msg.setIntProperty(INDEX, 1);
+ producer.send(msg);
+
+ producerSession.commit();
+
+ assertEquals("Wrong number of messages on queue", 2,
+ ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Failover was interuppted");
+ }
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
index 6558fb4d09..12ed66b3d7 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -28,10 +28,12 @@ import javax.jms.MessageListener;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
{
private CountDownLatch _receviedAll;
+ private AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
@Override
public void setUp() throws Exception
@@ -49,14 +51,21 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
protected void testAcking(boolean transacted, int mode) throws Exception
{
init(transacted, mode);
-
_consumer.setMessageListener(this);
_connection.start();
- if (!_receviedAll.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
{
- fail("failover did not complete");
+ fail("All messages not received.");
+ }
+
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
}
_consumer.close();
@@ -91,8 +100,17 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
}
}
+ /**
+ * Pass the given exception back to the waiting thread to fail the test run.
+ * @param e The exception that is causing the test to fail.
+ */
protected void fail(Exception e)
{
-
+ _causeOfFailure.set(e);
+ // End the test.
+ while (_receviedAll.getCount() != 0)
+ {
+ _receviedAll.countDown();
+ }
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
new file mode 100644
index 0000000000..834b17430b
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.jms.Session;
+
+import javax.jms.Message;
+import javax.jms.Queue;
+
+public class FailoverBeforeConsumingRecoverTest extends RecoverTest
+{
+
+ @Override
+ protected void initTest() throws Exception
+ {
+ super.initTest();
+ failBroker(getFailingPort());
+
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
new file mode 100644
index 0000000000..6c4b7ba01b
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * This is a quick manual test to validate acking after failover with a
+ * transacted session.
+ *
+ * Start an external broker then run this test. Std Err will print.
+ * Sent Message: 1
+ * Received Message: 1
+ *
+ * You can then restart the external broker, which will cause failover, which
+ * will be complete when the following appears.
+ *
+ * Failover Complete
+ *
+ * A second message send/receive cycle is then done to validate that the
+ * connection/session are still working.
+ *
+ */
+public class QuickAcking extends QpidTestCase implements ConnectionListener
+{
+ protected AMQConnection _connection;
+ protected Queue _queue;
+ protected Session _session;
+ protected MessageConsumer _consumer;
+ private CountDownLatch _failedOver;
+ private static final String INDEX = "INDEX";
+ private int _count = 0;
+
+ public void setUp()
+ {
+ // Prevent broker startup. Broker must be run manually.
+ }
+
+ public void test() throws Exception
+ {
+ _failedOver = new CountDownLatch(1);
+
+ _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''");
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue("QAtest");
+ _consumer = _session.createConsumer(_queue);
+ _connection.setConnectionListener(this);
+ _connection.start();
+
+ sendAndReceive();
+
+ _failedOver.await();
+
+ sendAndReceive();
+
+ }
+
+ private void sendAndReceive()
+ throws Exception
+ {
+ sendMessage();
+
+ Message message = _consumer.receive();
+
+ if (message.getIntProperty(INDEX) != _count)
+ {
+ throw new Exception("Incorrect message recieved:" + _count);
+ }
+
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ System.err.println("Recevied Message:" + _count);
+ }
+
+ private void sendMessage() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+ Message message = _session.createMessage();
+ _count++;
+ message.setIntProperty(INDEX, _count);
+
+ producer.send(message);
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ producer.close();
+
+ System.err.println("Sent Message:" + _count);
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ System.err.println("Failover Complete");
+ _failedOver.countDown();
+ }
+}