diff options
Diffstat (limited to 'java/client/src/test')
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java | 89 |
1 files changed, 79 insertions, 10 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 27a2b6a5e9..295bb80306 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -36,18 +36,21 @@ public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); - static + protected void setUp() throws Exception { - String workdir = System.getProperty("QPID_WORK"); - if (workdir == null || workdir.equals("")) - { - String tempdir = System.getProperty("java.io.tmpdir"); - System.out.println("QPID_WORK not set using tmp directory: " + tempdir); - System.setProperty("QPID_WORK", tempdir); - } - DOMConfigurator.configure("../broker/etc/log4j.xml"); + super.setUp(); + TransportConnection.createVMBroker(1); } + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + //Thread.sleep(2000); + } + + + public void testRecoverResendsMsgs() throws Exception { Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); @@ -104,8 +107,74 @@ public class RecoverTest extends TestCase con.close(); } + + public void testRecoverResendsMsgsAckOnEarlier() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("someQ", "someQ", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + TextMessage tm2 = (TextMessage) consumer.receive(); + tm.acknowledge(); + _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); + + consumer.receive(); + consumer.receive(); + _logger.info("Received all four messages. Calling recover with two outstanding messages"); + // no ack for last three messages so when I call recover I expect to get three messages back + consumerSession.recover(); + TextMessage tm3 = (TextMessage) consumer.receive(3000); + assertEquals("msg3", tm3.getText()); + + TextMessage tm4 = (TextMessage) consumer.receive(3000); + assertEquals("msg4", tm4.getText()); + + + _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); + ((org.apache.qpid.jms.Message)tm3).acknowledgeThis(); + + _logger.info("Calling recover"); + // all acked so no messages to be delivered + consumerSession.recover(); + + tm4 = (TextMessage) consumer.receive(3000); + assertEquals("msg4", tm4.getText()); + ((org.apache.qpid.jms.Message)tm4).acknowledgeThis(); + + _logger.info("Calling recover"); + // all acked so no messages to be delivered + consumerSession.recover(); + + + tm = (TextMessage) consumer.receiveNoWait(); + assertNull(tm); + _logger.info("No messages redelivered as is expected"); + + con.close(); + } + + public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(RecoverTest.class)); + return new junit.framework.TestSuite(RecoverTest.class); } } |
