diff options
Diffstat (limited to 'java/client/src/test')
9 files changed, 259 insertions, 52 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index a406f9f86e..794fd5c8c1 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -65,6 +65,7 @@ public class MessageListenerMultiConsumerTest extends TestCase private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock + protected void setUp() throws Exception { super.setUp(); @@ -122,30 +123,39 @@ public class MessageListenerMultiConsumerTest extends TestCase TransportConnection.killAllVMBrokers(); } +// public void testRecieveC1thenC2() throws Exception +// { +// +// for (int msg = 0; msg < MSG_COUNT / 2; msg++) +// { +// +// assertTrue(_consumer1.receive() != null); +// } +// +// for (int msg = 0; msg < MSG_COUNT / 2; msg++) +// { +// assertTrue(_consumer2.receive() != null); +// } +// } - public void testRecieveC1thenC2() throws Exception + public void testRecieveInterleaved() throws Exception { - - for (int msg = 0; msg < MSG_COUNT / 2; msg++) + int msg = 0; + int MAX_LOOPS = MSG_COUNT * 2; + for (int loops = 0; msg < MSG_COUNT || loops < MAX_LOOPS; loops++) { - assertTrue(_consumer1.receive() != null); - } - - for (int msg = 0; msg < MSG_COUNT / 2; msg++) - { - assertTrue(_consumer2.receive() != null); + if (_consumer1.receive(100) != null) + { + msg++; + } + if (_consumer2.receive(100) != null) + { + msg++; + } } - } - - public void testRecieveInterleaved() throws Exception - { - for (int msg = 0; msg < MSG_COUNT / 2; msg++) - { - assertTrue(_consumer1.receive() != null); - assertTrue(_consumer2.receive() != null); - } + assertEquals("Not all messages received.", MSG_COUNT, msg); } @@ -161,7 +171,7 @@ public class MessageListenerMultiConsumerTest extends TestCase if (receivedCount1 == MSG_COUNT / 2) { - _allMessagesSent.countDown(); + _allMessagesSent.countDown(); } } @@ -196,6 +206,18 @@ public class MessageListenerMultiConsumerTest extends TestCase assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); } + public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception + { + if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, + _consumer2.receive(1000) != null); + } + } + } + public static junit.framework.Test suite() { diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 5fb77af4db..7b5957ac8c 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -144,6 +144,36 @@ public class MessageListenerTest extends TestCase implements MessageListener } + public void testRecieveTheUseMessageListener() throws Exception + { + + _logger.error("Test disabled as initial receive is not called first"); + // Perform initial receive to start connection +// assertTrue(_consumer.receive(2000) != null); +// receivedCount++; + + // Sleep to ensure remaining 4 msgs end up on _synchronousQueue +// Thread.sleep(1000); + + // Set the message listener and wait for the messages to come in. + _consumer.setMessageListener(this); + + _logger.info("Waiting 3 seconds for messages"); + + try + { + _awaitMessages.await(3000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + //Should have recieved all async messages + assertEquals(MSG_COUNT, receivedCount); + + } + + public void onMessage(Message message) { _logger.info("Received Message(" + receivedCount + "):" + message); diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java index 10bf1a8d6d..42594fff8e 100644 --- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -83,7 +83,7 @@ public class ResetMessageListenerTest extends TestCase Hashtable<String, String> env = new Hashtable<String, String>(); env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'"); - env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + env.put("queue.queue", "direct://amq.direct//ResetMessageListenerTest"); _context = factory.getInitialContext(env); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java new file mode 100644 index 0000000000..1b5da2631d --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -0,0 +1,109 @@ +package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import junit.framework.TestCase;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends TestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ private static final String VM_BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ createVMBroker();
+ _connection = new AMQConnection(VM_BROKER, "guest", "guest", "ReceiveTestClient", "test");
+ }
+
+ public void createVMBroker()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ MessageConsumer consumer = queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 7762cb3fe9..62234ad21f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -330,7 +330,7 @@ public class MessageRequeueTest extends TestCase public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; - while (run < 10) +// while (run < 10) { run++; @@ -350,17 +350,10 @@ public class MessageRequeueTest extends TestCase _logger.debug("Create Consumer"); MessageConsumer consumer = session.createConsumer(q); - try - { - Thread.sleep(2000); - } - catch (InterruptedException e) - { - // - } + conn.start(); _logger.debug("Receiving msg"); - Message msg = consumer.receive(1000); + Message msg = consumer.receive(2000); assertNotNull("Message should not be null", msg); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 0828ab398c..190b3861f0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -100,7 +100,9 @@ public class DurableSubscriptionTest extends TestCase AMQTopic topic = new AMQTopic(con,"MyTopic"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); - MessageProducer producer = session1.createProducer(topic); + + Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + MessageProducer producer = sessionProd.createProducer(topic); Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); @@ -112,12 +114,12 @@ public class DurableSubscriptionTest extends TestCase Message msg; msg = consumer1.receive(); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer1.receive(1000); + msg = consumer1.receive(100); assertEquals(null, msg); msg = consumer2.receive(); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer2.receive(1000); + msg = consumer2.receive(100); assertEquals(null, msg); consumer2.close(); @@ -127,14 +129,14 @@ public class DurableSubscriptionTest extends TestCase producer.send(session1.createTextMessage("B")); - msg = consumer1.receive(); + msg = consumer1.receive(100); assertEquals("B", ((TextMessage) msg).getText()); - msg = consumer1.receive(1000); + msg = consumer1.receive(100); assertEquals(null, msg); - msg = consumer3.receive(); + msg = consumer3.receive(100); assertEquals("B", ((TextMessage) msg).getText()); - msg = consumer3.receive(1000); + msg = consumer3.receive(100); assertEquals(null, msg); con.close(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 2abc139ced..685fe20048 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -53,12 +53,15 @@ public class CommitRollbackTest extends TestCase Queue _jmsQueue; private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class); + private static final String BROKER = "vm://:1"; protected void setUp() throws Exception { super.setUp(); - TransportConnection.createVMBroker(1); - + if (BROKER.startsWith("vm")) + { + TransportConnection.createVMBroker(1); + } testMethod++; queue += testMethod; @@ -68,7 +71,7 @@ public class CommitRollbackTest extends TestCase private void newConnection() throws AMQException, URLSyntaxException, JMSException { - conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"); + conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'"); _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); @@ -87,7 +90,10 @@ public class CommitRollbackTest extends TestCase super.tearDown(); conn.close(); - TransportConnection.killVMBroker(1); + if (BROKER.startsWith("vm")) + { + TransportConnection.killVMBroker(1); + } } /** @@ -261,7 +267,7 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _pubSession.getTransacted()); _logger.info("sending test message"); - String MESSAGE_TEXT = "testGetThenDisconnect"; + String MESSAGE_TEXT = "testGetThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); _pubSession.commit(); @@ -394,16 +400,60 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - + if (result.getJMSRedelivered()) + { + assertEquals("1", ((TextMessage) result).getText()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + } + else + { + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + } result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); + + } + + + public void testPutThenRollbackThenGet() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollbackThenGet"; + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + _pubSession.commit(); + + assertNotNull(_consumer.receive(100)); + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + assertNull("test message was put and rolled back, but is still present", result); + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + assertNotNull(_consumer.receive(100)); + } } diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java index 8109d20a33..b777cf93b6 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java @@ -172,7 +172,7 @@ public class Config } catch(NumberFormatException e) { - throw new RuntimeException("Bad port number: " + value); + throw new RuntimeException("Bad port number: " + value, e); } } else if("-name".equalsIgnoreCase(key)) diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index f2afa472ab..195ed79dab 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -3,6 +3,7 @@ package org.apache.qpid.testutil; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; import org.apache.log4j.Logger; @@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener } catch (URLSyntaxException e) { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); } } } |
