summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-18 18:05:25 +0000
committerRobert Greig <rgreig@apache.org>2006-12-18 18:05:25 +0000
commit5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b (patch)
tree6bbffb82ac5a1a2d16a360936201f515dd863c90 /java/client/src/test
parent9876d09ea5ec9718cf7c3e994bb4588ce42b7e17 (diff)
downloadqpid-python-5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b.tar.gz
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java61
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java84
2 files changed, 131 insertions, 14 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 817fcfb9e8..9f31f7f010 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,16 +19,15 @@
*/
package org.apache.qpid.test.unit.ack;
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.log4j.Logger;
import javax.jms.*;
-import junit.framework.TestCase;
-
public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
@@ -43,11 +42,9 @@ public class RecoverTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
- //Thread.sleep(2000);
}
-
public void testRecoverResendsMsgs() throws Exception
{
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
@@ -147,7 +144,7 @@ public class RecoverTest extends TestCase
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
@@ -155,7 +152,7 @@ public class RecoverTest extends TestCase
tm4 = (TextMessage) consumer.receive(3000);
assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
@@ -178,8 +175,6 @@ public class RecoverTest extends TestCase
Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -196,7 +191,7 @@ public class RecoverTest extends TestCase
TextMessage tm2 = (TextMessage) consumer2.receive();
assertNotNull(tm2);
- assertEquals("msg2",tm2.getText());
+ assertEquals("msg2", tm2.getText());
tm2.acknowledge();
@@ -204,13 +199,51 @@ public class RecoverTest extends TestCase
TextMessage tm1 = (TextMessage) consumer.receive(2000);
assertNotNull(tm1);
- assertEquals("msg1",tm1.getText());
+ assertEquals("msg1", tm1.getText());
con.close();
}
-
+ public void testRecoverInAutoAckListener() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ MessageProducer producer = consumerSession.createProducer(queue);
+ producer.send(consumerSession.createTextMessage("hello"));
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int count = 0;
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (count++ == 0)
+ {
+ assertFalse(message.getJMSRedelivered());
+ consumerSession.recover();
+ }
+ else if (count++ == 1)
+ {
+ assertTrue(message.getJMSRedelivered());
+ }
+ else
+ {
+ fail("Message delivered too many times!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ }
+ }
+ });
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(RecoverTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
new file mode 100644
index 0000000000..de517459df
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CloseWithBlockingReceiveTest extends TestCase
+{
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testReceiveReturnsNull() throws Exception
+ {
+ final Connection connection = new AMQConnection("vm://:1", "guest", "guest",
+ "fred", "/test");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new AMQTopic("banana"));
+ connection.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(1000);
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ };
+ long startTime = System.currentTimeMillis();
+ new Thread(r).start();
+ Message m = consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() - startTime < 10000);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+ }
+
+}