summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java362
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java233
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java44
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java252
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java258
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java228
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java133
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java82
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes4
-rw-r--r--qpid/java/test-profiles/CPPPrefetchExcludes4
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes3
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes2
13 files changed, 506 insertions, 1106 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java
new file mode 100644
index 0000000000..09402c140d
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * Tests the behaviour of JMS asynchronous message listeners as provided by
+ * {@link MessageListener#onMessage(Message)}.
+ *
+ */
+public class AsynchMessageListenerTest extends QpidBrokerTestCase
+{
+ private static final int MSG_COUNT = 10;
+ private static final long AWAIT_MESSAGE_TIMEOUT = 2000;
+ private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250;
+ private final String _testQueueName = getTestQueueName();
+ private Connection _consumerConnection;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
+ private Queue _queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _consumerConnection = getConnection();
+ _consumerConnection.start();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _queue = _consumerSession.createQueue(_testQueueName);
+ _consumer = _consumerSession.createConsumer(_queue);
+
+ // Populate queue
+ Connection producerConnection = getConnection();
+ Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(producerSession, _queue, MSG_COUNT);
+ producerConnection.close();
+
+ }
+
+ public void testMessageListener() throws Exception
+ {
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT);
+ _consumer.setMessageListener(countingMessageListener);
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount());
+ }
+
+ public void testSynchronousReceiveFollowedByMessageListener() throws Exception
+ {
+ // Receive initial message synchronously
+ assertNotNull("Could not receive first message synchronously", _consumer.receive(AWAIT_MESSAGE_TIMEOUT) != null);
+ final int numberOfMessagesToReceiveByMessageListener = MSG_COUNT - 1;
+
+ // Consume remainder asynchronously
+ CountingMessageListener countingMessageListener = new CountingMessageListener(numberOfMessagesToReceiveByMessageListener);
+ _consumer.setMessageListener(countingMessageListener);
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount());
+ }
+
+ public void testMessageListenerSetDisallowsSynchronousReceive() throws Exception
+ {
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT);
+ _consumer.setMessageListener(countingMessageListener);
+
+ try
+ {
+ _consumer.receive();
+ fail("Exception not thrown");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ assertEquals("A listener has already been set.", e.getMessage());
+ }
+ }
+
+
+ public void testConnectionStopThenStart() throws Exception
+ {
+ int messageToReceivedBeforeConnectionStop = 2;
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop);
+
+ // Consume at least two messages
+ _consumer.setMessageListener(countingMessageListener);
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ _consumerConnection.stop();
+
+ assertTrue("Too few messages received afer Connection#stop()", countingMessageListener.getReceivedCount() >= messageToReceivedBeforeConnectionStop);
+ countingMessageListener.resetLatch();
+
+ // Restart connection
+ _consumerConnection.start();
+
+ // Consume the remainder
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount());
+ }
+
+ public void testConnectionStopAndMessageListenerChange() throws Exception
+ {
+ int messageToReceivedBeforeConnectionStop = 2;
+ CountingMessageListener countingMessageListener1 = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop);
+
+ // Consume remainder asynchronously
+ _consumer.setMessageListener(countingMessageListener1);
+ countingMessageListener1.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ _consumerConnection.stop();
+ assertTrue("Too few messages received afer Connection#stop()", countingMessageListener1.getReceivedCount() >= messageToReceivedBeforeConnectionStop);
+
+ CountingMessageListener countingMessageListener2 = new CountingMessageListener(countingMessageListener1.getOutstandingCount());
+
+ // Reset Message Listener
+ _consumer.setMessageListener(countingMessageListener2);
+
+ _consumerConnection.start();
+
+ // Consume the remainder
+ countingMessageListener2.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener2.getOutstandingCount());
+
+ }
+
+ public void testConnectionStopHaltsDeliveryToListener() throws Exception
+ {
+ int messageToReceivedBeforeConnectionStop = 2;
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop);
+
+ // Consume at least two messages
+ _consumer.setMessageListener(countingMessageListener);
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ _consumerConnection.stop();
+
+ // Connection should now be stopped and listener should receive no more
+ final int outstandingCountAtStop = countingMessageListener.getOutstandingCount();
+ countingMessageListener.resetLatch();
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT_NEGATIVE);
+
+ assertEquals("Unexpected number of outstanding messages", outstandingCountAtStop, countingMessageListener.getOutstandingCount());
+ }
+
+ public void testSessionCloseHaltsDelivery() throws Exception
+ {
+ int messageToReceivedBeforeConnectionStop = 2;
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop);
+
+ // Consume at least two messages
+ _consumer.setMessageListener(countingMessageListener);
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ _consumerSession.close();
+
+ // Once a session is closed, the listener should receive no more
+ final int outstandingCountAtClose = countingMessageListener.getOutstandingCount();
+ countingMessageListener.resetLatch();
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT_NEGATIVE);
+
+ assertEquals("Unexpected number of outstanding messages", outstandingCountAtClose, countingMessageListener.getOutstandingCount());
+ }
+
+ public void testImmediatePrefetchWithMessageListener() throws Exception
+ {
+ // Close connection provided by setup so we can set IMMEDIATE_PREFETCH
+ _consumerConnection.close();
+ setTestClientSystemProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+
+ _consumerConnection = getConnection();
+ _consumerConnection.start();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumer = _consumerSession.createConsumer(_queue);
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT);
+ _consumer.setMessageListener(countingMessageListener);
+
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+
+ assertEquals("Unexpected number of messages received", MSG_COUNT, countingMessageListener.getReceivedCount());
+ }
+
+ public void testReceiveTwoConsumers() throws Exception
+ {
+ Session consumerSession2 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer _consumer2 = consumerSession2.createConsumer(_queue);
+
+ CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT);
+ _consumer.setMessageListener(countingMessageListener);
+ _consumer2.setMessageListener(countingMessageListener);
+
+ countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT);
+ assertEquals("Unexpected number of messages received", MSG_COUNT, countingMessageListener.getReceivedCount());
+ }
+
+ /**
+ * Tests the case where the message listener throws an java.lang.Error.
+ * TODO - a useful test?.
+ */
+ public void testMessageListenerThrowsError() throws Exception
+ {
+ int expectedMessages = 1; // The error will kill the dispatcher so only one message will be delivered.
+ final CountDownLatch awaitMessages = new CountDownLatch(expectedMessages);
+ final AtomicInteger receivedCount = new AtomicInteger(0);
+ final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error";
+ CountingExceptionListener countingExceptionListener = new CountingExceptionListener();
+ _consumerConnection.setExceptionListener(countingExceptionListener);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(Message message)
+ {
+ try
+ {
+ throw new Error(javaLangErrorMessageText);
+ }
+ finally
+ {
+ receivedCount.incrementAndGet();
+ awaitMessages.countDown();
+ }
+ }
+ });
+
+ awaitMessages.await(AWAIT_MESSAGE_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ assertEquals("Unexpected number of messages received", expectedMessages, receivedCount.get());
+ assertEquals("onException should NOT have been called", 0, countingExceptionListener.getErrorCount());
+
+ // Check that Error has been written to the application log.
+
+ LogMonitor _monitor = new LogMonitor(_outputFile);
+ assertTrue("The expected message not written to log file.",
+ _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT));
+
+ if (_consumerConnection != null)
+ {
+ try
+ {
+ _consumerConnection.close();
+ }
+ catch (JMSException e)
+ {
+ // Ignore connection close errors for this test.
+ }
+ finally
+ {
+ _consumerConnection = null;
+ }
+ }
+ }
+
+ private final class CountingExceptionListener implements ExceptionListener
+ {
+ private final AtomicInteger _errorCount = new AtomicInteger();
+
+ @Override
+ public void onException(JMSException arg0)
+ {
+ _errorCount.incrementAndGet();
+ }
+
+ public int getErrorCount()
+ {
+ return _errorCount.intValue();
+ }
+ }
+
+ private final class CountingMessageListener implements MessageListener
+ {
+ private volatile CountDownLatch _awaitMessages;
+ private final AtomicInteger _receivedCount;
+ private final AtomicInteger _outstandingMessageCount;
+
+ public CountingMessageListener(final int totalExpectedMessageCount)
+ {
+ this(totalExpectedMessageCount, totalExpectedMessageCount);
+ }
+
+
+ public CountingMessageListener(int totalExpectedMessageCount, int numberOfMessagesToAwait)
+ {
+ _receivedCount = new AtomicInteger(0);
+ _outstandingMessageCount = new AtomicInteger(totalExpectedMessageCount);
+ _awaitMessages = new CountDownLatch(numberOfMessagesToAwait);
+ }
+
+ public int getOutstandingCount()
+ {
+ return _outstandingMessageCount.get();
+ }
+
+ public int getReceivedCount()
+ {
+ return _receivedCount.get();
+ }
+
+ public void resetLatch()
+ {
+ _awaitMessages = new CountDownLatch(_outstandingMessageCount.get());
+ }
+
+ @Override
+ public void onMessage(Message message)
+ {
+ _receivedCount.incrementAndGet();
+ _outstandingMessageCount.decrementAndGet();
+ _awaitMessages.countDown();
+ }
+
+ public boolean awaitMessages(long timeout)
+ {
+ try
+ {
+ return _awaitMessages.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java
deleted file mode 100644
index 3537dd0533..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.Context;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
- */
-public class DispatcherTest extends QpidBrokerTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(DispatcherTest.class);
-
- private Context _context;
-
- private static final int MSG_COUNT = 6;
- private int _receivedCount = 0;
- private int _receivedCountWhileStopped = 0;
- private Connection _clientConnection, _producerConnection;
- private MessageConsumer _consumer;
- private MessageProducer _producer;
- private Session _clientSession, _producerSession;
-
- private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); // all messages Sent Lock
- private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); // all messages Sent Lock
-
- private volatile boolean _connectionStopped = false;
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- // Create Client 1
- _clientConnection = getConnection();
-
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = _clientSession.createQueue(this.getClass().getName());
- _consumer = _clientSession.createConsumer(queue);
-
- // Create Producer
- _producerConnection = getConnection();
-
- _producerConnection.start();
-
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _producer = _producerSession.createProducer(queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- _producer.send(_producerSession.createTextMessage("Message " + msg));
- }
- }
-
- protected void tearDown() throws Exception
- {
-
- _clientConnection.close();
-
- _producerConnection.close();
- super.tearDown();
- }
-
- public void testAsynchronousRecieve()
- {
- _logger.info("Test Start");
-
- assertTrue(!((AMQConnection) _clientConnection).started());
-
- // Set default Message Listener
- try
- {
- _consumer.setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
-
- _receivedCount++;
-
- if (_receivedCount == MSG_COUNT)
- {
- _allFirstMessagesSent.countDown();
- }
-
- if (_connectionStopped)
- {
- _logger.info("Running with Message:" + _receivedCount);
- }
-
- if (_connectionStopped && (_allFirstMessagesSent.getCount() == 0))
- {
- _receivedCountWhileStopped++;
- }
-
- if (_allFirstMessagesSent.getCount() == 0)
- {
- if (_receivedCount == (MSG_COUNT * 2))
- {
- _allSecondMessagesSent.countDown();
- }
- }
- }
- });
-
- assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started());
- _clientConnection.start();
- }
- catch (JMSException e)
- {
- _logger.error("Error Setting Default ML on consumer1");
- }
-
- try
- {
- _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
-
- try
- {
- assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started());
- _clientConnection.stop();
- _connectionStopped = true;
- }
- catch (JMSException e)
- {
- _logger.error("Error stopping connection");
- }
-
- try
- {
- _logger.error("Send additional messages");
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- _producer.send(_producerSession.createTextMessage("Message " + msg));
- }
- }
- catch (JMSException e)
- {
- _logger.error("Unable to send additional messages", e);
- }
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- // ignore
- }
-
- try
- {
- _logger.info("Restarting connection");
-
- _connectionStopped = false;
- _clientConnection.start();
- }
- catch (JMSException e)
- {
- _logger.error("Error Setting Better ML on consumer1", e);
- }
-
- _logger.info("Waiting upto 2 seconds for messages");
-
- try
- {
- _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
-
- assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount());
- assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount());
- assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
- assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(DispatcherTest.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
deleted file mode 100644
index 7461f6c200..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.client;
-
-/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
- * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
- * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
- * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
- * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
- * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
- * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
- */
-public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
-{
- protected void setUp() throws Exception
- {
- System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
- super.setUp();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
deleted file mode 100644
index 4fd10a0134..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.Context;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
- * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
- * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
- * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
- * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
- * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
- * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
- */
-public class MessageListenerMultiConsumerTest extends QpidBrokerTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MessageListenerMultiConsumerTest.class);
-
- private Context _context;
-
- private static final int MSG_COUNT = 6;
- private int receivedCount1 = 0;
- private int receivedCount2 = 0;
- private Connection _clientConnection;
- private MessageConsumer _consumer1;
- private MessageConsumer _consumer2;
- private Session _clientSession1;
- private Queue _queue;
- private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages Sent Lock
- private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString();
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- // Create Client 1
- _clientConnection = getConnection("guest", "guest");
-
- _clientConnection.start();
-
- _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _queue =_clientSession1.createQueue(QUEUE_NAME);
-
- _consumer1 = _clientSession1.createConsumer(_queue);
-
- // Create Client 2
- Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _consumer2 = clientSession2.createConsumer(_queue);
-
- // Create Producer
- Connection producerConnection = getConnection("guest", "guest");
-
- producerConnection.start();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- producer.send(producerSession.createTextMessage("Message " + msg));
- }
-
- producerConnection.close();
-
- }
-
- protected void tearDown() throws Exception
- {
- _clientConnection.close();
- super.tearDown();
- }
-
- public void testRecieveInterleaved() throws Exception
- {
- int msg = 0;
- int MAX_LOOPS = MSG_COUNT * 2;
- for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++)
- {
-
- if (_consumer1.receive(1000) != null)
- {
- msg++;
- }
-
- if (_consumer2.receive(1000) != null)
- {
- msg++;
- }
- }
-
- assertEquals("Not all messages received.", MSG_COUNT, msg);
- }
-
- public void testAsynchronousRecieve() throws Exception
- {
- _consumer1.setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message);
-
- receivedCount1++;
-
- if (receivedCount1 == (MSG_COUNT / 2))
- {
- _allMessagesSent.countDown();
- }
-
- }
- });
-
- _consumer2.setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
-
- receivedCount2++;
- if (receivedCount2 == (MSG_COUNT / 2))
- {
- _allMessagesSent.countDown();
- }
- }
- });
-
- _logger.info("Waiting upto 2 seconds for messages");
-
- try
- {
- _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
-
- assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
- }
-
- public void testRecieveC2Only() throws Exception
- {
- if (
- !Boolean.parseBoolean(
- System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH,
- AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
- {
- _logger.info("Performing Receive only on C2");
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, _consumer2.receive(1000) != null);
- }
- }
- }
-
- public void testRecieveBoth() throws Exception
- {
- if (
- !Boolean.parseBoolean(
- System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH,
- AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
- {
- _logger.info("Performing Receive only with two consumers on one session ");
-
- //Create a new consumer on session one that we don't use
- _clientSession1.createConsumer(_queue);
-
- int msg;
- for (msg = 0; msg < (MSG_COUNT / 2); msg++)
- {
-
- // Attempt to receive up to half the messages
- // The other half may have gone to the consumer above
- final Message message = _consumer1.receive(1000);
- if(message == null)
- {
- break;
- }
-
- }
-
- _consumer1.close();
- // This will close the unused consumer above.
- _clientSession1.close();
-
-
- // msg will now have recorded the number received on session 1
- // attempt to retrieve the rest on session 2
- for (; msg < MSG_COUNT ; msg++)
- {
- assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null);
- }
-
- }
- else
- {
- _logger.info("Performing Receive only on both C1 and C2");
-
- for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
- {
-
- assertTrue(_consumer1.receive(3000) != null);
- }
-
- for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
- {
- assertTrue(_consumer2.receive(3000) != null);
- }
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
deleted file mode 100644
index 142f301bd0..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.LogMonitor;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.Context;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
- * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
- * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at
- * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
- * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
- * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
- * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
- */
-public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
-
- private Context _context;
-
- private static final int MSG_COUNT = 5;
- private int _receivedCount = 0;
- private int _errorCount = 0;
- private MessageConsumer _consumer;
- private Connection _clientConnection;
- private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- // Create Client
- _clientConnection = getConnection("guest", "guest");
-
- _clientConnection.start();
-
- Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue =clientSession.createQueue("message-listener-test-queue");
-
- _consumer = clientSession.createConsumer(queue);
-
- // Create Producer
-
- Connection producerConnection = getConnection("guest", "guest");
-
- producerConnection.start();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = producerSession.createProducer(queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- producer.send(producerSession.createTextMessage("Message " + msg));
- }
-
- producerConnection.close();
-
- }
-
- protected void tearDown() throws Exception
- {
- if (_clientConnection != null)
- {
- _clientConnection.close();
- }
- super.tearDown();
- }
-
- public void testSynchronousReceive() throws Exception
- {
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- assertTrue(_consumer.receive(2000) != null);
- }
- }
-
- public void testSynchronousReceiveNoWait() throws Exception
- {
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null);
- }
- }
-
- public void testAsynchronousReceive() throws Exception
- {
- _consumer.setMessageListener(this);
-
- _logger.info("Waiting 3 seconds for messages");
-
- try
- {
- _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
- // Should have received all async messages
- assertEquals(MSG_COUNT, _receivedCount);
-
- }
-
- public void testReceiveThenUseMessageListener() throws Exception
- {
- _logger.error("Test disabled as initial receive is not called first");
- // Perform initial receive to start connection
- assertTrue(_consumer.receive(2000) != null);
- _receivedCount++;
-
- // Sleep to ensure remaining 4 msgs end up on _synchronousQueue
- Thread.sleep(1000);
-
- // Set the message listener and wait for the messages to come in.
- _consumer.setMessageListener(this);
-
- _logger.info("Waiting 3 seconds for messages");
-
- try
- {
- _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
- // Should have received all async messages
- assertEquals(MSG_COUNT, _receivedCount);
-
- _clientConnection.close();
-
- Connection conn = getConnection("guest", "guest");
- Session clientSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = clientSession.createQueue("message-listener-test-queue");
- MessageConsumer cons = clientSession.createConsumer(queue);
- conn.start();
-
- // check that the messages were actually dequeued
- assertTrue(cons.receive(2000) == null);
- }
-
- /**
- * Tests the case where the message listener throws an java.lang.Error.
- *
- */
- public void testMessageListenerThrowsError() throws Exception
- {
- final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error";
- _clientConnection.setExceptionListener(this);
-
- _awaitMessages = new CountDownLatch(1);
-
- _consumer.setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- try
- {
- _logger.debug("onMessage called");
- _receivedCount++;
-
-
- throw new Error(javaLangErrorMessageText);
- }
- finally
- {
- _awaitMessages.countDown();
- }
- }
- });
-
-
- _logger.info("Waiting 3 seconds for message");
- _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
-
- assertEquals("onMessage should have been called", 1, _receivedCount);
- assertEquals("onException should NOT have been called", 0, _errorCount);
-
- // Check that Error has been written to the application log.
-
- LogMonitor _monitor = new LogMonitor(_outputFile);
- assertTrue("The expected message not written to log file.",
- _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT));
-
- if (_clientConnection != null)
- {
- try
- {
- _clientConnection.close();
- }
- catch (JMSException e)
- {
- // Ignore connection close errors for this test.
- }
- finally
- {
- _clientConnection = null;
- }
- }
- }
-
- public void onMessage(Message message)
- {
- _logger.info("Received Message(" + _receivedCount + "):" + message);
-
- _receivedCount++;
- _awaitMessages.countDown();
- }
-
- public void onException(JMSException e)
- {
- _logger.info("Exception received", e);
- _errorCount++;
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(MessageListenerTest.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
deleted file mode 100644
index 6ff6681c47..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
- * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
- * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
- * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
- * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
- * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
- * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
- */
-public class ResetMessageListenerTest extends QpidBrokerTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(ResetMessageListenerTest.class);
-
- private Context _context;
-
- private static final int MSG_COUNT = 6;
- private Connection _clientConnection, _producerConnection;
- private MessageConsumer _consumer1;
- private MessageProducer _producer;
- private Session _clientSession, _producerSession;
-
- private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
- private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- _clientConnection = getConnection("guest", "guest");
- _clientConnection.start();
- // Create Client 1
-
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = _clientSession.createQueue("reset-message-listener-test-queue");
-
- _consumer1 = _clientSession.createConsumer(queue);
-
- // Create Producer
- _producerConnection = getConnection("guest", "guest");
-
- _producerConnection.start();
-
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _producer = _producerSession.createProducer(queue);
-
- TextMessage m = _producerSession.createTextMessage();
- m.setStringProperty("rank", "first");
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- m.setText("Message " + msg);
- _producer.send(m);
- }
- }
-
- protected void tearDown() throws Exception
- {
- _clientConnection.close();
-
- super.tearDown();
- }
-
- public void testAsynchronousRecieve()
- {
-
- _logger.info("Test Start");
-
- try
- {
- _consumer1.setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- try
- {
- if (message.getStringProperty("rank").equals("first"))
- {
- _allFirstMessagesSent.countDown();
- }
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- fail("error receiving message");
- }
- }
- });
- }
- catch (JMSException e)
- {
- _logger.error("Error Setting Default ML on consumer1");
- }
- try
- {
- assertTrue("Did not receive all first batch of messages",
- _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
- _logger.info("Received first batch of messages");
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
-
- try
- {
- _clientConnection.stop();
- }
- catch (JMSException e)
- {
- _logger.error("Error stopping connection");
- }
-
- _logger.info("Reset Message Listener ");
- try
- {
- _consumer1.setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- try
- {
- if (message.getStringProperty("rank").equals("first"))
- {
- // Something ugly will happen, it'll probably kill the dispatcher
- fail("All first set of messages should have been received");
- }
- else
- {
- _allSecondMessagesSent.countDown();
- }
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- // Something ugly will happen, it'll probably kill the dispatcher
- fail("error receiving message");
- }
- }
- });
-
- _clientConnection.start();
- }
- catch (javax.jms.IllegalStateException e)
- {
- _logger.error("Connection not stopped while setting ML", e);
- fail("Unable to change message listener:" + e.getCause());
- }
- catch (JMSException e)
- {
- _logger.error("Error Setting Better ML on consumer1", e);
- }
-
- try
- {
- _logger.info("Send additional messages");
- TextMessage m = _producerSession.createTextMessage();
- m.setStringProperty("rank", "second");
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- m.setText("Message " + msg);
- _producer.send(m);
- }
- }
- catch (JMSException e)
- {
- _logger.error("Unable to send additional messages", e);
- }
-
- _logger.info("Waiting for messages");
-
- try
- {
- assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
- assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount());
- assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount());
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ResetMessageListenerTest.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
index d7295b298e..08ed2258b2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
@@ -27,7 +27,6 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.Session;
-import javax.naming.Context;
/**
@@ -35,9 +34,7 @@ import javax.naming.Context;
*/
public class SessionCreateTest extends QpidBrokerTestCase
{
- private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
-
- private Context _context;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionCreateTest.class);
private Connection _clientConnection;
protected int maxSessions = 65555;
@@ -54,7 +51,7 @@ public class SessionCreateTest extends QpidBrokerTestCase
Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(sess);
sess.close();
- System.out.println("created session: " + i);
+ LOGGER.debug("created session: " + i);
}
_clientConnection.close();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java
new file mode 100644
index 0000000000..bf147197e4
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class SynchReceiveTest extends QpidBrokerTestCase
+{
+ private static final long AWAIT_MESSAGE_TIMEOUT = 2000;
+ private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250;
+ private static final int MSG_COUNT = 10;
+ private final String _testQueueName = getTestQueueName();
+ private Connection _consumerConnection;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
+ private Queue _queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _consumerConnection = getConnection();
+ _consumerConnection.start();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _queue = _consumerSession.createQueue(_testQueueName);
+ _consumer = _consumerSession.createConsumer(_queue);
+
+ // Populate queue
+ Connection producerConnection = getConnection();
+ Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(producerSession, _queue, MSG_COUNT);
+ producerConnection.close();
+ }
+
+ public void testReceiveWithTimeout() throws Exception
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertNotNull("Expected message number " + msg, _consumer.receive(AWAIT_MESSAGE_TIMEOUT));
+ }
+
+ assertNull("Received too many messages", _consumer.receive(500));
+ }
+
+ public void testReceiveNoWait() throws Exception
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertNotNull("Expected message number " + msg, _consumer.receiveNoWait());
+ }
+
+ assertNull("Received too many messages", _consumer.receive(500));
+ }
+
+ public void testTwoConsumersInterleaved() throws Exception
+ {
+ //create a new connection with prefetch set to 1
+ _consumerConnection.close();
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
+
+ _consumerConnection = getConnection();
+ _consumerConnection.start();
+ Session consumerSession1 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer1 = consumerSession1.createConsumer(_queue);
+
+ Session consumerSession2 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = consumerSession2.createConsumer(_queue);
+
+ final int maxLoops = MSG_COUNT * 2;
+ int msg = 0;
+ int loops = 0;
+ while(msg < MSG_COUNT && loops < maxLoops)
+ {
+ if (consumer1.receive(AWAIT_MESSAGE_TIMEOUT) != null)
+ {
+ msg++;
+ }
+
+ if (consumer2.receive(AWAIT_MESSAGE_TIMEOUT) != null)
+ {
+ msg++;
+ }
+
+ loops++;
+ }
+
+ assertEquals("Not all messages received.", MSG_COUNT, msg);
+ assertNull("Received too many messages", consumer1.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE));
+ assertNull("Received too many messages", consumer2.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE));
+ }
+
+ public void testIdleSecondConsumer() throws Exception
+ {
+ Session idleSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ @SuppressWarnings("unused")
+ MessageConsumer idleConsumerOnSameQueue = idleSession.createConsumer(_queue);
+
+ // Since we don't call receive on the idle consumer, all messages will flow to other
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertNotNull("Expected message number " + msg, _consumer.receive(AWAIT_MESSAGE_TIMEOUT));
+ }
+
+ assertNull("Received too many messages", _consumer.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE));
+ }
+
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
deleted file mode 100644
index c764eda799..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.basic;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-
-public class ReceiveTest extends QpidBrokerTestCase
-{
- private AMQConnection _connection;
- private AMQDestination _destination;
- private AMQSession _session;
- private MessageConsumer _consumer;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- init((AMQConnection) getConnection("guest", "guest"));
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- private void init(AMQConnection connection) throws Exception
- {
- init(connection, new AMQQueue(connection,"ReceiveTest", true));
- }
-
- private void init(AMQConnection connection, AMQDestination destination) throws Exception
- {
- _connection = connection;
- _destination = destination;
- _session = (AMQSession) connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
- _consumer = _session.createConsumer(_destination);
- _connection.start();
- }
-
- public void test() throws Exception
- {
- Message m = _consumer.receive(5000);
- assertNull("should not have received a message", m);
- _connection.close();
- }
-
-
- public static junit.framework.Test suite()
- {
- // TODO: note that this test doesn't use the VMBrokerSetup
- // test helper class to create and tear down its
- // VMBroker. This is because the main() above seems to
- // indicate that it's also used outside of the surefire test
- // framework. If it isn't, then this test should also be
- // changed to use VMBrokerSetup here.
- return new junit.framework.TestSuite(ReceiveTest.class);
- }
-}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 14671f97af..74b02153ac 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -24,7 +24,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateEx
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testDeleteOptions
org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#*
-org.apache.qpid.client.ResetMessageListenerTest#*
// Those tests are testing 0.8 specific semantics
org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#*
@@ -169,3 +168,6 @@ org.apache.qpid.server.message.MessageProtocolConversionTest#*
// passwd script is a Java Broker specific command line tool
org.apache.qpid.scripts.QpidPasswdTest#*
+
+// QPID-3604: Immediate Prefetch no longer supported by 0-10
+org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
diff --git a/qpid/java/test-profiles/CPPPrefetchExcludes b/qpid/java/test-profiles/CPPPrefetchExcludes
index 7ef52f89c7..9b4d69cebd 100644
--- a/qpid/java/test-profiles/CPPPrefetchExcludes
+++ b/qpid/java/test-profiles/CPPPrefetchExcludes
@@ -18,6 +18,6 @@
//
// those tests should be run with prefetch off
-org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only
-org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
+org.apache.qpid.client.SynchReceiveTest#testTwoConsumersInterleaved
+org.apache.qpid.client.SynchReceiveTest#testIdleSecondConsumer
org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index ac6ac8ae1c..90df1cee81 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -54,3 +54,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionC
org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
+// QPID-3604: Immediate Prefetch no longer supported by 0-10
+org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
+
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index 2b77911177..18d2e52736 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -36,7 +36,7 @@ org.apache.qpid.server.queue.AddressBasedSortedQueueTest#*
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*
-org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait
+org.apache.qpid.client.SynchReceiveTest#testReceiveNoWait
// Tests 0.10 client feature
org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism