summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-20 08:11:05 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-20 08:11:05 +0000
commit5bb25f3a489e69aea6a95a80d35f8f8c1a9e6e1d (patch)
tree48439f9af0bda4a7294f46db3d39aec31b34c304 /java/client
parentd71103f736b1ec28550729ba22ffec6e5bd3979b (diff)
downloadqpid-python-5bb25f3a489e69aea6a95a80d35f8f8c1a9e6e1d.tar.gz
Reinstated the two consumer receive test.
Added additional test class to cover the IMMEDIATE_PREFETCHs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530683 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java70
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java72
2 files changed, 119 insertions, 23 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
new file mode 100644
index 0000000000..9e48914431
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
+{
+
+
+ protected void setUp() throws Exception
+ {
+
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+ super.setUp();
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 794fd5c8c1..c9407d8ff6 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -62,7 +62,8 @@ public class MessageListenerMultiConsumerTest extends TestCase
private Connection _clientConnection;
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
-
+ private Session _clientSession1;
+ private Queue _queue;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
@@ -76,25 +77,25 @@ public class MessageListenerMultiConsumerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "direct://amq.direct//"+this.getClass().getName());
_context = factory.getInitialContext(env);
- Queue queue = (Queue) _context.lookup("queue");
+ _queue = (Queue) _context.lookup("queue");
//Create Client 1
_clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
_clientConnection.start();
- Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer1 = clientSession1.createConsumer(queue);
+ _consumer1 = _clientSession1.createConsumer(_queue);
//Create Client 2
Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer2 = clientSession2.createConsumer(queue);
+ _consumer2 = clientSession2.createConsumer(_queue);
//Create Producer
Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
@@ -104,7 +105,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer = producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
@@ -123,20 +124,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
TransportConnection.killAllVMBrokers();
}
-// public void testRecieveC1thenC2() throws Exception
-// {
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-//
-// assertTrue(_consumer1.receive() != null);
-// }
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-// assertTrue(_consumer2.receive() != null);
-// }
-// }
public void testRecieveInterleaved() throws Exception
{
@@ -206,10 +193,12 @@ public class MessageListenerMultiConsumerTest extends TestCase
assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
- public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+ public void testRecieveC2Only() throws Exception
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
{
+ _logger.info("Performing Receive only on C2");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
@@ -218,6 +207,43 @@ public class MessageListenerMultiConsumerTest extends TestCase
}
}
+ public void testRecieveBoth() throws Exception
+ {
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+ {
+ _logger.info("Performing Receive only with two consumers on one session ");
+
+ MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(consumer2.receive() != null);
+ }
+ }
+ else
+ {
+ _logger.info("Performing Receive only on both C1 and C2");
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+ }
+
public static junit.framework.Test suite()
{