diff options
Diffstat (limited to 'qpid/java')
13 files changed, 506 insertions, 1106 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java new file mode 100644 index 0000000000..09402c140d --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; + +/** + * Tests the behaviour of JMS asynchronous message listeners as provided by + * {@link MessageListener#onMessage(Message)}. + * + */ +public class AsynchMessageListenerTest extends QpidBrokerTestCase +{ + private static final int MSG_COUNT = 10; + private static final long AWAIT_MESSAGE_TIMEOUT = 2000; + private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250; + private final String _testQueueName = getTestQueueName(); + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _queue = _consumerSession.createQueue(_testQueueName); + _consumer = _consumerSession.createConsumer(_queue); + + // Populate queue + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(producerSession, _queue, MSG_COUNT); + producerConnection.close(); + + } + + public void testMessageListener() throws Exception + { + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount()); + } + + public void testSynchronousReceiveFollowedByMessageListener() throws Exception + { + // Receive initial message synchronously + assertNotNull("Could not receive first message synchronously", _consumer.receive(AWAIT_MESSAGE_TIMEOUT) != null); + final int numberOfMessagesToReceiveByMessageListener = MSG_COUNT - 1; + + // Consume remainder asynchronously + CountingMessageListener countingMessageListener = new CountingMessageListener(numberOfMessagesToReceiveByMessageListener); + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount()); + } + + public void testMessageListenerSetDisallowsSynchronousReceive() throws Exception + { + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + + try + { + _consumer.receive(); + fail("Exception not thrown"); + } + catch (JMSException e) + { + // PASS + assertEquals("A listener has already been set.", e.getMessage()); + } + } + + + public void testConnectionStopThenStart() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume at least two messages + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerConnection.stop(); + + assertTrue("Too few messages received afer Connection#stop()", countingMessageListener.getReceivedCount() >= messageToReceivedBeforeConnectionStop); + countingMessageListener.resetLatch(); + + // Restart connection + _consumerConnection.start(); + + // Consume the remainder + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount()); + } + + public void testConnectionStopAndMessageListenerChange() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener1 = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume remainder asynchronously + _consumer.setMessageListener(countingMessageListener1); + countingMessageListener1.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerConnection.stop(); + assertTrue("Too few messages received afer Connection#stop()", countingMessageListener1.getReceivedCount() >= messageToReceivedBeforeConnectionStop); + + CountingMessageListener countingMessageListener2 = new CountingMessageListener(countingMessageListener1.getOutstandingCount()); + + // Reset Message Listener + _consumer.setMessageListener(countingMessageListener2); + + _consumerConnection.start(); + + // Consume the remainder + countingMessageListener2.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener2.getOutstandingCount()); + + } + + public void testConnectionStopHaltsDeliveryToListener() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume at least two messages + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerConnection.stop(); + + // Connection should now be stopped and listener should receive no more + final int outstandingCountAtStop = countingMessageListener.getOutstandingCount(); + countingMessageListener.resetLatch(); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT_NEGATIVE); + + assertEquals("Unexpected number of outstanding messages", outstandingCountAtStop, countingMessageListener.getOutstandingCount()); + } + + public void testSessionCloseHaltsDelivery() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume at least two messages + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerSession.close(); + + // Once a session is closed, the listener should receive no more + final int outstandingCountAtClose = countingMessageListener.getOutstandingCount(); + countingMessageListener.resetLatch(); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT_NEGATIVE); + + assertEquals("Unexpected number of outstanding messages", outstandingCountAtClose, countingMessageListener.getOutstandingCount()); + } + + public void testImmediatePrefetchWithMessageListener() throws Exception + { + // Close connection provided by setup so we can set IMMEDIATE_PREFETCH + _consumerConnection.close(); + setTestClientSystemProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _consumerSession.createConsumer(_queue); + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of messages received", MSG_COUNT, countingMessageListener.getReceivedCount()); + } + + public void testReceiveTwoConsumers() throws Exception + { + Session consumerSession2 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer _consumer2 = consumerSession2.createConsumer(_queue); + + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + _consumer2.setMessageListener(countingMessageListener); + + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + assertEquals("Unexpected number of messages received", MSG_COUNT, countingMessageListener.getReceivedCount()); + } + + /** + * Tests the case where the message listener throws an java.lang.Error. + * TODO - a useful test?. + */ + public void testMessageListenerThrowsError() throws Exception + { + int expectedMessages = 1; // The error will kill the dispatcher so only one message will be delivered. + final CountDownLatch awaitMessages = new CountDownLatch(expectedMessages); + final AtomicInteger receivedCount = new AtomicInteger(0); + final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error"; + CountingExceptionListener countingExceptionListener = new CountingExceptionListener(); + _consumerConnection.setExceptionListener(countingExceptionListener); + + _consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + try + { + throw new Error(javaLangErrorMessageText); + } + finally + { + receivedCount.incrementAndGet(); + awaitMessages.countDown(); + } + } + }); + + awaitMessages.await(AWAIT_MESSAGE_TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals("Unexpected number of messages received", expectedMessages, receivedCount.get()); + assertEquals("onException should NOT have been called", 0, countingExceptionListener.getErrorCount()); + + // Check that Error has been written to the application log. + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected message not written to log file.", + _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT)); + + if (_consumerConnection != null) + { + try + { + _consumerConnection.close(); + } + catch (JMSException e) + { + // Ignore connection close errors for this test. + } + finally + { + _consumerConnection = null; + } + } + } + + private final class CountingExceptionListener implements ExceptionListener + { + private final AtomicInteger _errorCount = new AtomicInteger(); + + @Override + public void onException(JMSException arg0) + { + _errorCount.incrementAndGet(); + } + + public int getErrorCount() + { + return _errorCount.intValue(); + } + } + + private final class CountingMessageListener implements MessageListener + { + private volatile CountDownLatch _awaitMessages; + private final AtomicInteger _receivedCount; + private final AtomicInteger _outstandingMessageCount; + + public CountingMessageListener(final int totalExpectedMessageCount) + { + this(totalExpectedMessageCount, totalExpectedMessageCount); + } + + + public CountingMessageListener(int totalExpectedMessageCount, int numberOfMessagesToAwait) + { + _receivedCount = new AtomicInteger(0); + _outstandingMessageCount = new AtomicInteger(totalExpectedMessageCount); + _awaitMessages = new CountDownLatch(numberOfMessagesToAwait); + } + + public int getOutstandingCount() + { + return _outstandingMessageCount.get(); + } + + public int getReceivedCount() + { + return _receivedCount.get(); + } + + public void resetLatch() + { + _awaitMessages = new CountDownLatch(_outstandingMessageCount.get()); + } + + @Override + public void onMessage(Message message) + { + _receivedCount.incrementAndGet(); + _outstandingMessageCount.decrementAndGet(); + _awaitMessages.countDown(); + } + + public boolean awaitMessages(long timeout) + { + try + { + return _awaitMessages.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java deleted file mode 100644 index 3537dd0533..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue - * <p/> - * The message delivery process: - * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s - * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start - * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a - * session can run in any order and a synchronous put/poll will block the dispatcher). - * <p/> - * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered - * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class DispatcherTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(DispatcherTest.class); - - private Context _context; - - private static final int MSG_COUNT = 6; - private int _receivedCount = 0; - private int _receivedCountWhileStopped = 0; - private Connection _clientConnection, _producerConnection; - private MessageConsumer _consumer; - private MessageProducer _producer; - private Session _clientSession, _producerSession; - - private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); // all messages Sent Lock - - private volatile boolean _connectionStopped = false; - - protected void setUp() throws Exception - { - super.setUp(); - - // Create Client 1 - _clientConnection = getConnection(); - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queue = _clientSession.createQueue(this.getClass().getName()); - _consumer = _clientSession.createConsumer(queue); - - // Create Producer - _producerConnection = getConnection(); - - _producerConnection.start(); - - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _producer = _producerSession.createProducer(queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(_producerSession.createTextMessage("Message " + msg)); - } - } - - protected void tearDown() throws Exception - { - - _clientConnection.close(); - - _producerConnection.close(); - super.tearDown(); - } - - public void testAsynchronousRecieve() - { - _logger.info("Test Start"); - - assertTrue(!((AMQConnection) _clientConnection).started()); - - // Set default Message Listener - try - { - _consumer.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message); - - _receivedCount++; - - if (_receivedCount == MSG_COUNT) - { - _allFirstMessagesSent.countDown(); - } - - if (_connectionStopped) - { - _logger.info("Running with Message:" + _receivedCount); - } - - if (_connectionStopped && (_allFirstMessagesSent.getCount() == 0)) - { - _receivedCountWhileStopped++; - } - - if (_allFirstMessagesSent.getCount() == 0) - { - if (_receivedCount == (MSG_COUNT * 2)) - { - _allSecondMessagesSent.countDown(); - } - } - } - }); - - assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started()); - _clientConnection.start(); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer1"); - } - - try - { - _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - - try - { - assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started()); - _clientConnection.stop(); - _connectionStopped = true; - } - catch (JMSException e) - { - _logger.error("Error stopping connection"); - } - - try - { - _logger.error("Send additional messages"); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(_producerSession.createTextMessage("Message " + msg)); - } - } - catch (JMSException e) - { - _logger.error("Unable to send additional messages", e); - } - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { - // ignore - } - - try - { - _logger.info("Restarting connection"); - - _connectionStopped = false; - _clientConnection.start(); - } - catch (JMSException e) - { - _logger.error("Error Setting Better ML on consumer1", e); - } - - _logger.info("Waiting upto 2 seconds for messages"); - - try - { - _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - - assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount()); - assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount()); - assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount); - assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(DispatcherTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java deleted file mode 100644 index 7461f6c200..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest -{ - protected void setUp() throws Exception - { - System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); - super.setUp(); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java deleted file mode 100644 index 4fd10a0134..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class MessageListenerMultiConsumerTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageListenerMultiConsumerTest.class); - - private Context _context; - - private static final int MSG_COUNT = 6; - private int receivedCount1 = 0; - private int receivedCount2 = 0; - private Connection _clientConnection; - private MessageConsumer _consumer1; - private MessageConsumer _consumer2; - private Session _clientSession1; - private Queue _queue; - private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages Sent Lock - private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString(); - - protected void setUp() throws Exception - { - super.setUp(); - - // Create Client 1 - _clientConnection = getConnection("guest", "guest"); - - _clientConnection.start(); - - _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _queue =_clientSession1.createQueue(QUEUE_NAME); - - _consumer1 = _clientSession1.createConsumer(_queue); - - // Create Client 2 - Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _consumer2 = clientSession2.createConsumer(_queue); - - // Create Producer - Connection producerConnection = getConnection("guest", "guest"); - - producerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - producer.send(producerSession.createTextMessage("Message " + msg)); - } - - producerConnection.close(); - - } - - protected void tearDown() throws Exception - { - _clientConnection.close(); - super.tearDown(); - } - - public void testRecieveInterleaved() throws Exception - { - int msg = 0; - int MAX_LOOPS = MSG_COUNT * 2; - for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++) - { - - if (_consumer1.receive(1000) != null) - { - msg++; - } - - if (_consumer2.receive(1000) != null) - { - msg++; - } - } - - assertEquals("Not all messages received.", MSG_COUNT, msg); - } - - public void testAsynchronousRecieve() throws Exception - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message); - - receivedCount1++; - - if (receivedCount1 == (MSG_COUNT / 2)) - { - _allMessagesSent.countDown(); - } - - } - }); - - _consumer2.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); - - receivedCount2++; - if (receivedCount2 == (MSG_COUNT / 2)) - { - _allMessagesSent.countDown(); - } - } - }); - - _logger.info("Waiting upto 2 seconds for messages"); - - try - { - _allMessagesSent.await(4000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - - assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); - } - - public void testRecieveC2Only() throws Exception - { - if ( - !Boolean.parseBoolean( - System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH, - AMQSession.IMMEDIATE_PREFETCH_DEFAULT))) - { - _logger.info("Performing Receive only on C2"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, _consumer2.receive(1000) != null); - } - } - } - - public void testRecieveBoth() throws Exception - { - if ( - !Boolean.parseBoolean( - System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH, - AMQSession.IMMEDIATE_PREFETCH_DEFAULT))) - { - _logger.info("Performing Receive only with two consumers on one session "); - - //Create a new consumer on session one that we don't use - _clientSession1.createConsumer(_queue); - - int msg; - for (msg = 0; msg < (MSG_COUNT / 2); msg++) - { - - // Attempt to receive up to half the messages - // The other half may have gone to the consumer above - final Message message = _consumer1.receive(1000); - if(message == null) - { - break; - } - - } - - _consumer1.close(); - // This will close the unused consumer above. - _clientSession1.close(); - - - // msg will now have recorded the number received on session 1 - // attempt to retrieve the rest on session 2 - for (; msg < MSG_COUNT ; msg++) - { - assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null); - } - - } - else - { - _logger.info("Performing Receive only on both C1 and C2"); - - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) - { - - assertTrue(_consumer1.receive(3000) != null); - } - - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) - { - assertTrue(_consumer2.receive(3000) != null); - } - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java deleted file mode 100644 index 142f301bd0..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.LogMonitor; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); - - private Context _context; - - private static final int MSG_COUNT = 5; - private int _receivedCount = 0; - private int _errorCount = 0; - private MessageConsumer _consumer; - private Connection _clientConnection; - private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT); - - protected void setUp() throws Exception - { - super.setUp(); - - // Create Client - _clientConnection = getConnection("guest", "guest"); - - _clientConnection.start(); - - Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queue =clientSession.createQueue("message-listener-test-queue"); - - _consumer = clientSession.createConsumer(queue); - - // Create Producer - - Connection producerConnection = getConnection("guest", "guest"); - - producerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = producerSession.createProducer(queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - producer.send(producerSession.createTextMessage("Message " + msg)); - } - - producerConnection.close(); - - } - - protected void tearDown() throws Exception - { - if (_clientConnection != null) - { - _clientConnection.close(); - } - super.tearDown(); - } - - public void testSynchronousReceive() throws Exception - { - for (int msg = 0; msg < MSG_COUNT; msg++) - { - assertTrue(_consumer.receive(2000) != null); - } - } - - public void testSynchronousReceiveNoWait() throws Exception - { - for (int msg = 0; msg < MSG_COUNT; msg++) - { - assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null); - } - } - - public void testAsynchronousReceive() throws Exception - { - _consumer.setMessageListener(this); - - _logger.info("Waiting 3 seconds for messages"); - - try - { - _awaitMessages.await(3000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - // Should have received all async messages - assertEquals(MSG_COUNT, _receivedCount); - - } - - public void testReceiveThenUseMessageListener() throws Exception - { - _logger.error("Test disabled as initial receive is not called first"); - // Perform initial receive to start connection - assertTrue(_consumer.receive(2000) != null); - _receivedCount++; - - // Sleep to ensure remaining 4 msgs end up on _synchronousQueue - Thread.sleep(1000); - - // Set the message listener and wait for the messages to come in. - _consumer.setMessageListener(this); - - _logger.info("Waiting 3 seconds for messages"); - - try - { - _awaitMessages.await(3000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - // Should have received all async messages - assertEquals(MSG_COUNT, _receivedCount); - - _clientConnection.close(); - - Connection conn = getConnection("guest", "guest"); - Session clientSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = clientSession.createQueue("message-listener-test-queue"); - MessageConsumer cons = clientSession.createConsumer(queue); - conn.start(); - - // check that the messages were actually dequeued - assertTrue(cons.receive(2000) == null); - } - - /** - * Tests the case where the message listener throws an java.lang.Error. - * - */ - public void testMessageListenerThrowsError() throws Exception - { - final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error"; - _clientConnection.setExceptionListener(this); - - _awaitMessages = new CountDownLatch(1); - - _consumer.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - try - { - _logger.debug("onMessage called"); - _receivedCount++; - - - throw new Error(javaLangErrorMessageText); - } - finally - { - _awaitMessages.countDown(); - } - } - }); - - - _logger.info("Waiting 3 seconds for message"); - _awaitMessages.await(3000, TimeUnit.MILLISECONDS); - - assertEquals("onMessage should have been called", 1, _receivedCount); - assertEquals("onException should NOT have been called", 0, _errorCount); - - // Check that Error has been written to the application log. - - LogMonitor _monitor = new LogMonitor(_outputFile); - assertTrue("The expected message not written to log file.", - _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT)); - - if (_clientConnection != null) - { - try - { - _clientConnection.close(); - } - catch (JMSException e) - { - // Ignore connection close errors for this test. - } - finally - { - _clientConnection = null; - } - } - } - - public void onMessage(Message message) - { - _logger.info("Received Message(" + _receivedCount + "):" + message); - - _receivedCount++; - _awaitMessages.countDown(); - } - - public void onException(JMSException e) - { - _logger.info("Exception received", e); - _errorCount++; - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MessageListenerTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java deleted file mode 100644 index 6ff6681c47..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.Context; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class ResetMessageListenerTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(ResetMessageListenerTest.class); - - private Context _context; - - private static final int MSG_COUNT = 6; - private Connection _clientConnection, _producerConnection; - private MessageConsumer _consumer1; - private MessageProducer _producer; - private Session _clientSession, _producerSession; - - private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - - protected void setUp() throws Exception - { - super.setUp(); - - _clientConnection = getConnection("guest", "guest"); - _clientConnection.start(); - // Create Client 1 - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queue = _clientSession.createQueue("reset-message-listener-test-queue"); - - _consumer1 = _clientSession.createConsumer(queue); - - // Create Producer - _producerConnection = getConnection("guest", "guest"); - - _producerConnection.start(); - - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _producer = _producerSession.createProducer(queue); - - TextMessage m = _producerSession.createTextMessage(); - m.setStringProperty("rank", "first"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - m.setText("Message " + msg); - _producer.send(m); - } - } - - protected void tearDown() throws Exception - { - _clientConnection.close(); - - super.tearDown(); - } - - public void testAsynchronousRecieve() - { - - _logger.info("Test Start"); - - try - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - try - { - if (message.getStringProperty("rank").equals("first")) - { - _allFirstMessagesSent.countDown(); - } - } - catch (JMSException e) - { - e.printStackTrace(); - fail("error receiving message"); - } - } - }); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer1"); - } - try - { - assertTrue("Did not receive all first batch of messages", - _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS)); - _logger.info("Received first batch of messages"); - } - catch (InterruptedException e) - { - // do nothing - } - - try - { - _clientConnection.stop(); - } - catch (JMSException e) - { - _logger.error("Error stopping connection"); - } - - _logger.info("Reset Message Listener "); - try - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - try - { - if (message.getStringProperty("rank").equals("first")) - { - // Something ugly will happen, it'll probably kill the dispatcher - fail("All first set of messages should have been received"); - } - else - { - _allSecondMessagesSent.countDown(); - } - } - catch (JMSException e) - { - e.printStackTrace(); - // Something ugly will happen, it'll probably kill the dispatcher - fail("error receiving message"); - } - } - }); - - _clientConnection.start(); - } - catch (javax.jms.IllegalStateException e) - { - _logger.error("Connection not stopped while setting ML", e); - fail("Unable to change message listener:" + e.getCause()); - } - catch (JMSException e) - { - _logger.error("Error Setting Better ML on consumer1", e); - } - - try - { - _logger.info("Send additional messages"); - TextMessage m = _producerSession.createTextMessage(); - m.setStringProperty("rank", "second"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - m.setText("Message " + msg); - _producer.send(m); - } - } - catch (JMSException e) - { - _logger.error("Unable to send additional messages", e); - } - - _logger.info("Waiting for messages"); - - try - { - assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS)); - } - catch (InterruptedException e) - { - // do nothing - } - assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount()); - assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ResetMessageListenerTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java index d7295b298e..08ed2258b2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java @@ -27,7 +27,6 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Session; -import javax.naming.Context; /** @@ -35,9 +34,7 @@ import javax.naming.Context; */ public class SessionCreateTest extends QpidBrokerTestCase { - private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); - - private Context _context; + private static final Logger LOGGER = LoggerFactory.getLogger(SessionCreateTest.class); private Connection _clientConnection; protected int maxSessions = 65555; @@ -54,7 +51,7 @@ public class SessionCreateTest extends QpidBrokerTestCase Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(sess); sess.close(); - System.out.println("created session: " + i); + LOGGER.debug("created session: " + i); } _clientConnection.close(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java new file mode 100644 index 0000000000..bf147197e4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class SynchReceiveTest extends QpidBrokerTestCase +{ + private static final long AWAIT_MESSAGE_TIMEOUT = 2000; + private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250; + private static final int MSG_COUNT = 10; + private final String _testQueueName = getTestQueueName(); + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _queue = _consumerSession.createQueue(_testQueueName); + _consumer = _consumerSession.createConsumer(_queue); + + // Populate queue + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(producerSession, _queue, MSG_COUNT); + producerConnection.close(); + } + + public void testReceiveWithTimeout() throws Exception + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertNotNull("Expected message number " + msg, _consumer.receive(AWAIT_MESSAGE_TIMEOUT)); + } + + assertNull("Received too many messages", _consumer.receive(500)); + } + + public void testReceiveNoWait() throws Exception + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertNotNull("Expected message number " + msg, _consumer.receiveNoWait()); + } + + assertNull("Received too many messages", _consumer.receive(500)); + } + + public void testTwoConsumersInterleaved() throws Exception + { + //create a new connection with prefetch set to 1 + _consumerConnection.close(); + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + Session consumerSession1 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = consumerSession1.createConsumer(_queue); + + Session consumerSession2 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = consumerSession2.createConsumer(_queue); + + final int maxLoops = MSG_COUNT * 2; + int msg = 0; + int loops = 0; + while(msg < MSG_COUNT && loops < maxLoops) + { + if (consumer1.receive(AWAIT_MESSAGE_TIMEOUT) != null) + { + msg++; + } + + if (consumer2.receive(AWAIT_MESSAGE_TIMEOUT) != null) + { + msg++; + } + + loops++; + } + + assertEquals("Not all messages received.", MSG_COUNT, msg); + assertNull("Received too many messages", consumer1.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE)); + assertNull("Received too many messages", consumer2.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE)); + } + + public void testIdleSecondConsumer() throws Exception + { + Session idleSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + @SuppressWarnings("unused") + MessageConsumer idleConsumerOnSameQueue = idleSession.createConsumer(_queue); + + // Since we don't call receive on the idle consumer, all messages will flow to other + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertNotNull("Expected message number " + msg, _consumer.receive(AWAIT_MESSAGE_TIMEOUT)); + } + + assertNull("Received too many messages", _consumer.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE)); + } + + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java deleted file mode 100644 index c764eda799..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.basic; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Message; -import javax.jms.MessageConsumer; - -public class ReceiveTest extends QpidBrokerTestCase -{ - private AMQConnection _connection; - private AMQDestination _destination; - private AMQSession _session; - private MessageConsumer _consumer; - - protected void setUp() throws Exception - { - super.setUp(); - init((AMQConnection) getConnection("guest", "guest")); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - private void init(AMQConnection connection) throws Exception - { - init(connection, new AMQQueue(connection,"ReceiveTest", true)); - } - - private void init(AMQConnection connection, AMQDestination destination) throws Exception - { - _connection = connection; - _destination = destination; - _session = (AMQSession) connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); - _consumer = _session.createConsumer(_destination); - _connection.start(); - } - - public void test() throws Exception - { - Message m = _consumer.receive(5000); - assertNull("should not have received a message", m); - _connection.close(); - } - - - public static junit.framework.Test suite() - { - // TODO: note that this test doesn't use the VMBrokerSetup - // test helper class to create and tear down its - // VMBroker. This is because the main() above seems to - // indicate that it's also used outside of the surefire test - // framework. If it isn't, then this test should also be - // changed to use VMBrokerSetup here. - return new junit.framework.TestSuite(ReceiveTest.class); - } -} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 14671f97af..74b02153ac 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -24,7 +24,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateEx org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testDeleteOptions org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* -org.apache.qpid.client.ResetMessageListenerTest#* // Those tests are testing 0.8 specific semantics org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#* @@ -169,3 +168,6 @@ org.apache.qpid.server.message.MessageProtocolConversionTest#* // passwd script is a Java Broker specific command line tool org.apache.qpid.scripts.QpidPasswdTest#* + +// QPID-3604: Immediate Prefetch no longer supported by 0-10 +org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener diff --git a/qpid/java/test-profiles/CPPPrefetchExcludes b/qpid/java/test-profiles/CPPPrefetchExcludes index 7ef52f89c7..9b4d69cebd 100644 --- a/qpid/java/test-profiles/CPPPrefetchExcludes +++ b/qpid/java/test-profiles/CPPPrefetchExcludes @@ -18,6 +18,6 @@ // // those tests should be run with prefetch off -org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only -org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth +org.apache.qpid.client.SynchReceiveTest#testTwoConsumersInterleaved +org.apache.qpid.client.SynchReceiveTest#testIdleSecondConsumer org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index ac6ac8ae1c..90df1cee81 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -54,3 +54,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionC org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* +// QPID-3604: Immediate Prefetch no longer supported by 0-10 +org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener + diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 2b77911177..18d2e52736 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -36,7 +36,7 @@ org.apache.qpid.server.queue.AddressBasedSortedQueueTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* -org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait +org.apache.qpid.client.SynchReceiveTest#testReceiveNoWait // Tests 0.10 client feature org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism |
