summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-16 21:55:31 +0000
committerRobert Greig <rgreig@apache.org>2006-12-16 21:55:31 +0000
commitc95df4c1fdccd685442599c1aaa780c8dd7424b9 (patch)
tree81ed196b5c01a28ac07e684657c8b63218fb9fba /java/client/src/test
parentbcf477d8c2a3e30e5ec109d7af5861d7f344eff1 (diff)
downloadqpid-python-c95df4c1fdccd685442599c1aaa780c8dd7424b9.tar.gz
QPID-207 : Patch supplied by Rob Godfrey - fix implementation of acknowledge as per JMS spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java89
1 files changed, 79 insertions, 10 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 27a2b6a5e9..295bb80306 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -36,18 +36,21 @@ public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
- static
+ protected void setUp() throws Exception
{
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
- DOMConfigurator.configure("../broker/etc/log4j.xml");
+ super.setUp();
+ TransportConnection.createVMBroker(1);
}
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ //Thread.sleep(2000);
+ }
+
+
+
public void testRecoverResendsMsgs() throws Exception
{
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
@@ -104,8 +107,74 @@ public class RecoverTest extends TestCase
con.close();
}
+
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ TextMessage tm2 = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
+
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. Calling recover with two outstanding messages");
+ // no ack for last three messages so when I call recover I expect to get three messages back
+ consumerSession.recover();
+ TextMessage tm3 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg3", tm3.getText());
+
+ TextMessage tm4 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm4.getText());
+
+
+ _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
+ ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+
+ _logger.info("Calling recover");
+ // all acked so no messages to be delivered
+ consumerSession.recover();
+
+ tm4 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm4.getText());
+ ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+
+ _logger.info("Calling recover");
+ // all acked so no messages to be delivered
+ consumerSession.recover();
+
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ con.close();
+ }
+
+
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(RecoverTest.class));
+ return new junit.framework.TestSuite(RecoverTest.class);
}
}