diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
| commit | de248153d311b1e0211dfe3230afcb306f3c0192 (patch) | |
| tree | 30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/client/src/test | |
| parent | f74e4dc27d1655760d0213fd60cc75c272c26f00 (diff) | |
| download | qpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz | |
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELELTED AS NOT USED.
CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java | 375 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java | 22 |
2 files changed, 390 insertions, 7 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java new file mode 100644 index 0000000000..0d75a6b968 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.transacted; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +/** + * This class tests a number of commits and roll back scenarios + * + * Assumptions; - Assumes empty Queue + */ +public class CommitRollbackTest extends TestCase +{ + protected AMQConnection conn; + protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String payload = "xyzzy"; + private Session _session; + private MessageProducer _publisher; + private Session _pubSession; + private MessageConsumer _consumer; + Queue _jmsQueue; + + private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class); + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + newConnection(); + } + + private void newConnection() throws AMQException, URLSyntaxException, JMSException + { + conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"); + + _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + + _jmsQueue = _session.createQueue(queue); + _consumer = _session.createConsumer(_jmsQueue); + + _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + + _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); + + conn.start(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + conn.close(); + TransportConnection.killVMBroker(1); + } + + /** PUT a text message, disconnect before commit, confirm it is gone. */ + public void testPutThenDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("reconnecting without commit"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + //commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + /** PUT a text message, disconnect before commit, confirm it is gone. */ + public void testPutThenCloseDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("closing publisher without commit"); + _publisher.close(); + + _logger.info("reconnecting without commit"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + //commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + /** + * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different + * session as producer + */ + public void testPutThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + assertNull("test message was put and rolled back, but is still present", result); + } + + /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + public void testGetThenDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + assertNotNull("retrieved message is null", msg); + + _logger.info("closing connection"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the + * same connection but different session as producer + */ + public void testGetThenCloseDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("reconnecting without commit"); + _consumer.close(); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt + * session to the producer + */ + public void testGetThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("rolling back"); + + _session.rollback(); + + _logger.info("receiving result"); + + Message result = _consumer.receive(1000); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + + /** + * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same + * connection but different session as producer + */ + public void testGetThenCloseRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("Closing consumer"); + _consumer.close(); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + + _consumer = _session.createConsumer(_jmsQueue); + + Message result = _consumer.receive(1000); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + + + /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + public void testSend2ThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + result = _consumer.receive(1000); + + assertNull("test message should be null", result); + } + + public void testSend2ThenCloseAfter1andTryAgain() throws Exception + { +// assertTrue("session is not transacted", _session.getTransacted()); +// assertTrue("session is not transacted", _pubSession.getTransacted()); +// +// _logger.info("sending two test messages"); +// _publisher.send(_pubSession.createTextMessage("1")); +// _publisher.send(_pubSession.createTextMessage("2")); +// _pubSession.commit(); +// +// _logger.info("getting test message"); +// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); +// +// _consumer.close(); +// +// _consumer = _session.createConsumer(_jmsQueue); +// +// _logger.info("receiving result"); +// Message result = _consumer.receive(1000); +// _logger.error("1:" + result); +//// assertNotNull("test message was consumed and rolled back, but is gone", result); +//// assertEquals("1" , ((TextMessage) result).getText()); +//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// _logger.error("2" + result); +//// assertNotNull("test message was consumed and rolled back, but is gone", result); +//// assertEquals("2", ((TextMessage) result).getText()); +//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// _logger.error("3" + result); +// assertNull("test message should be null:" + result, result); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 4296e43f88..94cbb426e5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -63,12 +63,11 @@ public class TransactedTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); - session = con.createSession(true, 0); + session = con.createSession(true, Session.SESSION_TRANSACTED); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - consumer1 = session.createConsumer(queue1); //Dummy just to create the queue. MessageConsumer consumer2 = session.createConsumer(queue2); @@ -81,7 +80,6 @@ public class TransactedTest extends TestCase prepProducer1 = prepSession.createProducer(queue1); prepCon.start(); - //add some messages prepProducer1.send(prepSession.createTextMessage("A")); prepProducer1.send(prepSession.createTextMessage("B")); @@ -127,24 +125,33 @@ public class TransactedTest extends TestCase public void testRollback() throws Exception { + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); producer2.send(session.createTextMessage("Z")); + _logger.info("Receiving A B"); expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + //Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it. //rollback + _logger.info("rollback"); session.rollback(); + _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); + + _logger.info("Starting new connection"); testCon.start(); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Testing we have no messages left"); assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); + + session.commit(); } public void testResendsMsgsAfterSessionClose() throws Exception @@ -152,7 +159,7 @@ public class TransactedTest extends TestCase AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); @@ -225,8 +232,9 @@ public class TransactedTest extends TestCase private void expect(String text, Message msg) throws JMSException { - assertTrue(msg instanceof TextMessage); - assertEquals(text, ((TextMessage) msg).getText()); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); } public static junit.framework.Test suite() |
