diff options
Diffstat (limited to 'java/client/src/test')
4 files changed, 125 insertions, 48 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 11a39df10f..6062528d43 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -42,16 +42,13 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; /** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue - * <p/> - * The message delivery process: - * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s - * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start - * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a - * session can run in any order and a synchronous put/poll will block the dispatcher). - * <p/> - * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered - * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery + * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread + * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at + * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple + * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting + * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining + * messages will be left on the queue and lost, subsequent messages on the session will arrive first. */ public class MessageListenerMultiConsumerTest extends TestCase { @@ -66,7 +63,6 @@ public class MessageListenerMultiConsumerTest extends TestCase private MessageConsumer _consumer1; private MessageConsumer _consumer2; - private boolean _testAsync; private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock protected void setUp() throws Exception @@ -116,16 +112,10 @@ public class MessageListenerMultiConsumerTest extends TestCase producerConnection.close(); - _testAsync = false; } protected void tearDown() throws Exception { - //Should have recieved all async messages - if (_testAsync) - { - assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); - } _clientConnection.close(); super.tearDown(); @@ -161,8 +151,6 @@ public class MessageListenerMultiConsumerTest extends TestCase public void testAsynchronousRecieve() throws Exception { - _testAsync = true; - _consumer1.setMessageListener(new MessageListener() { public void onMessage(Message message) @@ -173,7 +161,7 @@ public class MessageListenerMultiConsumerTest extends TestCase if (receivedCount1 == MSG_COUNT / 2) { - _allMessagesSent.countDown(); + _allMessagesSent.countDown(); } } @@ -198,13 +186,14 @@ public class MessageListenerMultiConsumerTest extends TestCase try { - _allMessagesSent.await(2000, TimeUnit.MILLISECONDS); + _allMessagesSent.await(4000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { //do nothing } + assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); } diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 3e44888459..dc01005247 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.client; import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -38,18 +40,17 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; /** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue - * <p/> - * The message delivery process: - * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s - * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start - * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a - * session can run in any order and a synchronous put/poll will block the dispatcher). - * <p/> - * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered - * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery + * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread + * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at + * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple + * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting + * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining + * messages will be left on the queue and lost, subsequent messages on the session will arrive first. */ public class MessageListenerTest extends TestCase implements MessageListener { @@ -61,7 +62,7 @@ public class MessageListenerTest extends TestCase implements MessageListener private int receivedCount = 0; private MessageConsumer _consumer; private Connection _clientConnection; - private boolean _testAsync; + private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT); protected void setUp() throws Exception { @@ -71,9 +72,9 @@ public class MessageListenerTest extends TestCase implements MessageListener InitialContextFactory factory = new PropertiesFileInitialContextFactory(); Hashtable<String, String> env = new Hashtable<String, String>(); - + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); - env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + env.put("queue.queue", "MessageListenerTest"); _context = factory.getInitialContext(env); @@ -86,7 +87,6 @@ public class MessageListenerTest extends TestCase implements MessageListener Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer = clientSession.createConsumer(queue); //Create Producer @@ -106,16 +106,10 @@ public class MessageListenerTest extends TestCase implements MessageListener producerConnection.close(); - _testAsync = false; } protected void tearDown() throws Exception { - //Should have recieved all async messages - if (_testAsync) - { - assertEquals(MSG_COUNT, receivedCount); - } _clientConnection.close(); super.tearDown(); @@ -125,7 +119,6 @@ public class MessageListenerTest extends TestCase implements MessageListener public void testSynchronousRecieve() throws Exception { - for (int msg = 0; msg < MSG_COUNT; msg++) { assertTrue(_consumer.receive(2000) != null); @@ -134,21 +127,20 @@ public class MessageListenerTest extends TestCase implements MessageListener public void testAsynchronousRecieve() throws Exception { - _testAsync = true; - _consumer.setMessageListener(this); - _logger.info("Waiting 3 seconds for messages"); try { - Thread.sleep(2000); + _awaitMessages.await(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { //do nothing } + //Should have recieved all async messages + assertEquals(MSG_COUNT, receivedCount); } @@ -157,6 +149,7 @@ public class MessageListenerTest extends TestCase implements MessageListener _logger.info("Received Message(" + receivedCount + "):" + message); receivedCount++; + _awaitMessages.countDown(); } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java new file mode 100644 index 0000000000..e70196dff2 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQShortString; + +public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class); + + private static ChannelCloseMethodHandlerNoCloseOk _handler = new ChannelCloseMethodHandlerNoCloseOk(); + + public static ChannelCloseMethodHandlerNoCloseOk getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ChannelClose method received"); + ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); + AMQShortString reason = method.replyText; + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + // For this test Method Handler .. don't send Close-OK +// // TODO: Be aware of possible changes to parameter order as versions change. +// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); +// protocolSession.writeFrame(frame); + if (errorCode != AMQConstant.REPLY_SUCCESS) + { + _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); + if (errorCode == AMQConstant.NO_CONSUMERS) + { + throw new AMQNoConsumersException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.INVALID_SELECTOR) + { + _logger.debug("Broker responded with Invalid Selector."); + + throw new AMQInvalidSelectorException(String.valueOf(reason)); + } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + } + else + { + throw new AMQChannelClosedException(errorCode, "Error: " + reason); + } + + } + protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); + } +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java index 1ed9750338..d128f30727 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseMethodHandler; import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; import org.apache.qpid.client.handler.BasicDeliverMethodHandler; import org.apache.qpid.client.handler.BasicReturnMethodHandler; @@ -91,7 +90,7 @@ public class NoCloseOKStateManager extends AMQStateManager // frame2handlerMap = new HashMap(); // Use Test Handler for Close methods to not send Close-OKs - frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance()); + frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance()); frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); |
