summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java43
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java96
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java3
4 files changed, 125 insertions, 48 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 11a39df10f..6062528d43 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -42,16 +42,13 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class MessageListenerMultiConsumerTest extends TestCase
{
@@ -66,7 +63,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
- private boolean _testAsync;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
protected void setUp() throws Exception
@@ -116,16 +112,10 @@ public class MessageListenerMultiConsumerTest extends TestCase
producerConnection.close();
- _testAsync = false;
}
protected void tearDown() throws Exception
{
- //Should have recieved all async messages
- if (_testAsync)
- {
- assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
- }
_clientConnection.close();
super.tearDown();
@@ -161,8 +151,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
public void testAsynchronousRecieve() throws Exception
{
- _testAsync = true;
-
_consumer1.setMessageListener(new MessageListener()
{
public void onMessage(Message message)
@@ -173,7 +161,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
if (receivedCount1 == MSG_COUNT / 2)
{
- _allMessagesSent.countDown();
+ _allMessagesSent.countDown();
}
}
@@ -198,13 +186,14 @@ public class MessageListenerMultiConsumerTest extends TestCase
try
{
- _allMessagesSent.await(2000, TimeUnit.MILLISECONDS);
+ _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//do nothing
}
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 3e44888459..dc01005247 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client;
import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -38,18 +40,17 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class MessageListenerTest extends TestCase implements MessageListener
{
@@ -61,7 +62,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
private int receivedCount = 0;
private MessageConsumer _consumer;
private Connection _clientConnection;
- private boolean _testAsync;
+ private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
protected void setUp() throws Exception
{
@@ -71,9 +72,9 @@ public class MessageListenerTest extends TestCase implements MessageListener
InitialContextFactory factory = new PropertiesFileInitialContextFactory();
Hashtable<String, String> env = new Hashtable<String, String>();
-
+
env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "MessageListenerTest");
_context = factory.getInitialContext(env);
@@ -86,7 +87,6 @@ public class MessageListenerTest extends TestCase implements MessageListener
Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
_consumer = clientSession.createConsumer(queue);
//Create Producer
@@ -106,16 +106,10 @@ public class MessageListenerTest extends TestCase implements MessageListener
producerConnection.close();
- _testAsync = false;
}
protected void tearDown() throws Exception
{
- //Should have recieved all async messages
- if (_testAsync)
- {
- assertEquals(MSG_COUNT, receivedCount);
- }
_clientConnection.close();
super.tearDown();
@@ -125,7 +119,6 @@ public class MessageListenerTest extends TestCase implements MessageListener
public void testSynchronousRecieve() throws Exception
{
-
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(_consumer.receive(2000) != null);
@@ -134,21 +127,20 @@ public class MessageListenerTest extends TestCase implements MessageListener
public void testAsynchronousRecieve() throws Exception
{
- _testAsync = true;
-
_consumer.setMessageListener(this);
-
_logger.info("Waiting 3 seconds for messages");
try
{
- Thread.sleep(2000);
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//do nothing
}
+ //Should have recieved all async messages
+ assertEquals(MSG_COUNT, receivedCount);
}
@@ -157,6 +149,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
_logger.info("Received Message(" + receivedCount + "):" + message);
receivedCount++;
+ _awaitMessages.countDown();
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
new file mode 100644
index 0000000000..e70196dff2
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
+
+ private static ChannelCloseMethodHandlerNoCloseOk _handler = new ChannelCloseMethodHandlerNoCloseOk();
+
+ public static ChannelCloseMethodHandlerNoCloseOk getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+ AMQShortString reason = method.replyText;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+
+ // For this test Method Handler .. don't send Close-OK
+// // TODO: Be aware of possible changes to parameter order as versions change.
+// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
+// protocolSession.writeFrame(frame);
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
+ {
+ _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR)
+ {
+ _logger.debug("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(String.valueOf(reason));
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ }
+ else
+ {
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ }
+
+ }
+ protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ }
+} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
index 1ed9750338..d128f30727 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
@@ -27,7 +27,6 @@ import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
import org.apache.qpid.client.handler.BasicReturnMethodHandler;
@@ -91,7 +90,7 @@ public class NoCloseOKStateManager extends AMQStateManager
//
frame2handlerMap = new HashMap();
// Use Test Handler for Close methods to not send Close-OKs
- frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+ frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());