summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java60
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java30
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java109
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java76
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/Config.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java3
9 files changed, 259 insertions, 52 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index a406f9f86e..794fd5c8c1 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -65,6 +65,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
+
protected void setUp() throws Exception
{
super.setUp();
@@ -122,30 +123,39 @@ public class MessageListenerMultiConsumerTest extends TestCase
TransportConnection.killAllVMBrokers();
}
+// public void testRecieveC1thenC2() throws Exception
+// {
+//
+// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+// {
+//
+// assertTrue(_consumer1.receive() != null);
+// }
+//
+// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+// {
+// assertTrue(_consumer2.receive() != null);
+// }
+// }
- public void testRecieveC1thenC2() throws Exception
+ public void testRecieveInterleaved() throws Exception
{
-
- for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ int msg = 0;
+ int MAX_LOOPS = MSG_COUNT * 2;
+ for (int loops = 0; msg < MSG_COUNT || loops < MAX_LOOPS; loops++)
{
- assertTrue(_consumer1.receive() != null);
- }
-
- for (int msg = 0; msg < MSG_COUNT / 2; msg++)
- {
- assertTrue(_consumer2.receive() != null);
+ if (_consumer1.receive(100) != null)
+ {
+ msg++;
+ }
+ if (_consumer2.receive(100) != null)
+ {
+ msg++;
+ }
}
- }
-
- public void testRecieveInterleaved() throws Exception
- {
- for (int msg = 0; msg < MSG_COUNT / 2; msg++)
- {
- assertTrue(_consumer1.receive() != null);
- assertTrue(_consumer2.receive() != null);
- }
+ assertEquals("Not all messages received.", MSG_COUNT, msg);
}
@@ -161,7 +171,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
if (receivedCount1 == MSG_COUNT / 2)
{
- _allMessagesSent.countDown();
+ _allMessagesSent.countDown();
}
}
@@ -196,6 +206,18 @@ public class MessageListenerMultiConsumerTest extends TestCase
assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
+ public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+ {
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
+ _consumer2.receive(1000) != null);
+ }
+ }
+ }
+
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 5fb77af4db..7b5957ac8c 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -144,6 +144,36 @@ public class MessageListenerTest extends TestCase implements MessageListener
}
+ public void testRecieveTheUseMessageListener() throws Exception
+ {
+
+ _logger.error("Test disabled as initial receive is not called first");
+ // Perform initial receive to start connection
+// assertTrue(_consumer.receive(2000) != null);
+// receivedCount++;
+
+ // Sleep to ensure remaining 4 msgs end up on _synchronousQueue
+// Thread.sleep(1000);
+
+ // Set the message listener and wait for the messages to come in.
+ _consumer.setMessageListener(this);
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+ //Should have recieved all async messages
+ assertEquals(MSG_COUNT, receivedCount);
+
+ }
+
+
public void onMessage(Message message)
{
_logger.info("Received Message(" + receivedCount + "):" + message);
diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 10bf1a8d6d..42594fff8e 100644
--- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -83,7 +83,7 @@ public class ResetMessageListenerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "direct://amq.direct//ResetMessageListenerTest");
_context = factory.getInitialContext(env);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
new file mode 100644
index 0000000000..1b5da2631d
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
@@ -0,0 +1,109 @@
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import junit.framework.TestCase;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends TestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ private static final String VM_BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ createVMBroker();
+ _connection = new AMQConnection(VM_BROKER, "guest", "guest", "ReceiveTestClient", "test");
+ }
+
+ public void createVMBroker()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ MessageConsumer consumer = queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 7762cb3fe9..62234ad21f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -330,7 +330,7 @@ public class MessageRequeueTest extends TestCase
public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
int run = 0;
- while (run < 10)
+// while (run < 10)
{
run++;
@@ -350,17 +350,10 @@ public class MessageRequeueTest extends TestCase
_logger.debug("Create Consumer");
MessageConsumer consumer = session.createConsumer(q);
- try
- {
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- //
- }
+ conn.start();
_logger.debug("Receiving msg");
- Message msg = consumer.receive(1000);
+ Message msg = consumer.receive(2000);
assertNotNull("Message should not be null", msg);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 0828ab398c..190b3861f0 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -100,7 +100,9 @@ public class DurableSubscriptionTest extends TestCase
AMQTopic topic = new AMQTopic(con,"MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
- MessageProducer producer = session1.createProducer(topic);
+
+ Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageProducer producer = sessionProd.createProducer(topic);
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
@@ -112,12 +114,12 @@ public class DurableSubscriptionTest extends TestCase
Message msg;
msg = consumer1.receive();
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(100);
assertEquals(null, msg);
msg = consumer2.receive();
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(100);
assertEquals(null, msg);
consumer2.close();
@@ -127,14 +129,14 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("B"));
- msg = consumer1.receive();
+ msg = consumer1.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(100);
assertEquals(null, msg);
- msg = consumer3.receive();
+ msg = consumer3.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
- msg = consumer3.receive(1000);
+ msg = consumer3.receive(100);
assertEquals(null, msg);
con.close();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 2abc139ced..685fe20048 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -53,12 +53,15 @@ public class CommitRollbackTest extends TestCase
Queue _jmsQueue;
private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class);
+ private static final String BROKER = "vm://:1";
protected void setUp() throws Exception
{
super.setUp();
- TransportConnection.createVMBroker(1);
-
+ if (BROKER.startsWith("vm"))
+ {
+ TransportConnection.createVMBroker(1);
+ }
testMethod++;
queue += testMethod;
@@ -68,7 +71,7 @@ public class CommitRollbackTest extends TestCase
private void newConnection() throws AMQException, URLSyntaxException, JMSException
{
- conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'");
+ conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'");
_session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -87,7 +90,10 @@ public class CommitRollbackTest extends TestCase
super.tearDown();
conn.close();
- TransportConnection.killVMBroker(1);
+ if (BROKER.startsWith("vm"))
+ {
+ TransportConnection.killVMBroker(1);
+ }
}
/**
@@ -261,7 +267,7 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _pubSession.getTransacted());
_logger.info("sending test message");
- String MESSAGE_TEXT = "testGetThenDisconnect";
+ String MESSAGE_TEXT = "testGetThenRollback";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
_pubSession.commit();
@@ -394,16 +400,60 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
result = _consumer.receive(1000);
assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-
+ if (result.getJMSRedelivered())
+ {
+ assertEquals("1", ((TextMessage) result).getText());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+ }
+ else
+ {
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ }
result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
+
+ }
+
+
+ public void testPutThenRollbackThenGet() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenRollbackThenGet";
+
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+ _pubSession.commit();
+
+ assertNotNull(_consumer.receive(100));
+
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("rolling back");
+ _pubSession.rollback();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+ assertNull("test message was put and rolled back, but is still present", result);
+
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ assertNotNull(_consumer.receive(100));
+
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
index 8109d20a33..b777cf93b6 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
@@ -172,7 +172,7 @@ public class Config
}
catch(NumberFormatException e)
{
- throw new RuntimeException("Bad port number: " + value);
+ throw new RuntimeException("Bad port number: " + value, e);
}
}
else if("-name".equalsIgnoreCase(key))
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index f2afa472ab..195ed79dab 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -3,6 +3,7 @@ package org.apache.qpid.testutil;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.log4j.Logger;
@@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener
}
catch (URLSyntaxException e)
{
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
}
}
}