diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
| commit | 5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b (patch) | |
| tree | 6bbffb82ac5a1a2d16a360936201f515dd863c90 /java/client/src/test | |
| parent | 9876d09ea5ec9718cf7c3e994bb4588ce42b7e17 (diff) | |
| download | qpid-python-5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b.tar.gz | |
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
2 files changed, 131 insertions, 14 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 817fcfb9e8..9f31f7f010 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,16 +19,15 @@ */ package org.apache.qpid.test.unit.ack; +import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.log4j.Logger; import javax.jms.*; -import junit.framework.TestCase; - public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); @@ -43,11 +42,9 @@ public class RecoverTest extends TestCase { super.tearDown(); TransportConnection.killAllVMBrokers(); - //Thread.sleep(2000); } - public void testRecoverResendsMsgs() throws Exception { Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); @@ -147,7 +144,7 @@ public class RecoverTest extends TestCase _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); - ((org.apache.qpid.jms.Message)tm3).acknowledgeThis(); + ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered @@ -155,7 +152,7 @@ public class RecoverTest extends TestCase tm4 = (TextMessage) consumer.receive(3000); assertEquals("msg4", tm4.getText()); - ((org.apache.qpid.jms.Message)tm4).acknowledgeThis(); + ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered @@ -178,8 +175,6 @@ public class RecoverTest extends TestCase Queue queue2 = new AMQQueue("Q2", "Q2", false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -196,7 +191,7 @@ public class RecoverTest extends TestCase TextMessage tm2 = (TextMessage) consumer2.receive(); assertNotNull(tm2); - assertEquals("msg2",tm2.getText()); + assertEquals("msg2", tm2.getText()); tm2.acknowledge(); @@ -204,13 +199,51 @@ public class RecoverTest extends TestCase TextMessage tm1 = (TextMessage) consumer.receive(2000); assertNotNull(tm1); - assertEquals("msg1",tm1.getText()); + assertEquals("msg1", tm1.getText()); con.close(); } - + public void testRecoverInAutoAckListener() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + MessageProducer producer = consumerSession.createProducer(queue); + producer.send(consumerSession.createTextMessage("hello")); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + private int count = 0; + + public void onMessage(Message message) + { + try + { + if (count++ == 0) + { + assertFalse(message.getJMSRedelivered()); + consumerSession.recover(); + } + else if (count++ == 1) + { + assertTrue(message.getJMSRedelivered()); + } + else + { + fail("Message delivered too many times!"); + } + } + catch (JMSException e) + { + _logger.error("Error recovering session: " + e, e); + } + } + }); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(RecoverTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java new file mode 100644 index 0000000000..de517459df --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -0,0 +1,84 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.transport.TransportConnection; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageConsumer; +import javax.jms.Message; + +/** + * @author Apache Software Foundation + */ +public class CloseWithBlockingReceiveTest extends TestCase +{ + + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testReceiveReturnsNull() throws Exception + { + final Connection connection = new AMQConnection("vm://:1", "guest", "guest", + "fred", "/test"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new AMQTopic("banana")); + connection.start(); + + Runnable r = new Runnable() + { + + public void run() + { + try + { + Thread.sleep(1000); + connection.close(); + } + catch (Exception e) + { + } + } + }; + long startTime = System.currentTimeMillis(); + new Thread(r).start(); + Message m = consumer.receive(10000); + assertTrue(System.currentTimeMillis() - startTime < 10000); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class); + } + +} |
