summaryrefslogtreecommitdiff
path: root/java/systests
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-27 15:49:51 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-27 15:49:51 +0000
commit9bac85e36d60df28fb6f86e3bbe9e3f46689aa04 (patch)
tree593c9d3d5cd46191099b558d0ef2d3ef012f968d /java/systests
parentf10117cd6464a107b086e0b7f7ea44a496b04c3d (diff)
downloadqpid-python-9bac85e36d60df28fb6f86e3bbe9e3f46689aa04.tar.gz
Merged revisions 550748-551121 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line Added xml file for logging during sustained tests ........ r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line Immediate and mandatory flag tests added. ........ r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line QPID-509 Mandatory messages not returned outside a transaction. They are now. ........ r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki. http://cwiki.apache.org/qpid/sustained-tests.html ........ r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line Added intelij files to ignore list ........ r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line POM update to add Apache content to built jars ........ r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line Updated default guest password as it was not correct. ........ r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line Added additional information to log message when available to aid the explination of a failed connection ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551207 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java344
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java198
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java27
3 files changed, 507 insertions, 62 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
index 05fbceca20..048fcfb0b3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
@@ -25,10 +25,11 @@ import junit.framework.TestCase;
import org.apache.log4j.NDC;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +42,11 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
/**
* ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS
@@ -58,6 +62,10 @@ import java.util.List;
* connected.
* <tr><td> Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is
* connected.
+ * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when a consumer is
+ * disconnected.
+ * <tr><dt> Check that an immediate message results in no consumers code, in a transaction, when a consumer is
+ * disconnected.
* </table>
*
* @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties,
@@ -73,63 +81,215 @@ public class ImmediateMessageTest extends TestCase
private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class);
/** Used to read the tests configurable properties through. */
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+ ParsedProperties testProps;
- /** All these tests should have the immediate flag on. */
- private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+ /** Used to create unique destination names for each test.
+ * @todo Move into the test framework.
+ */
+ private static AtomicLong uniqueDestsId = new AtomicLong();
/** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_517_ImmediateOkNoTx() throws Exception
+ public void test_QPID_517_ImmediateOkNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_517_ImmediateOkTx() throws Exception
+ public void test_QPID_517_ImmediateOkTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
/** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception
+ public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
{
NDC.push(getName());
+ testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the immediate flag on. */
+ testProps.setProperty(IMMEDIATE_PROPNAME, true);
+
+ /** Bind the receivers consumer by default. */
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true);
+
// Ensure that the in-vm broker is created.
TransportConnection.createVMBroker(1);
}
@@ -220,10 +380,64 @@ public class ImmediateMessageTest extends TestCase
return false;
}
+ /**
+ * Reports the number of exceptions held by this monitor.
+ *
+ * @return The number of exceptions held by this monitor.
+ */
+ public int size()
+ {
+ return exceptions.size();
+ }
+
public void reset()
{
exceptions = new ArrayList();
}
+
+ /**
+ * Provides a dump of the stack traces of all exceptions that this exception monitor was notified of. Mainly
+ * use for debugging/test failure reporting purposes.
+ *
+ * @return A string containing a dump of the stack traces of all exceptions.
+ */
+ public String toString()
+ {
+ String result = "ExceptionMonitor: holds " + exceptions.size() + " exceptions.\n\n";
+
+ for (JMSException ex : exceptions)
+ {
+ result += getStackTrace(ex) + "\n";
+ }
+
+ return result;
+ }
+
+ /**
+ * Prints an exception stack trace into a string.
+ *
+ * @param t The throwable to get the stack trace from.
+ *
+ * @return A string containing the throwables stack trace.
+ */
+ public static String getStackTrace(Throwable t)
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw, true);
+ t.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+
+ return sw.toString();
+ }
+ }
+
+ public static class MessageMonitor implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message): called");
+ }
}
/**
@@ -290,22 +504,30 @@ public class ImmediateMessageTest extends TestCase
{
try
{
- int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
- boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
- String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
- String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
+ // Get a unique offset to append to destination names to make them unique to the connection.
+ long uniqueId = uniqueDestsId.incrementAndGet();
+
+ // Extract the standard test configuration parameters relevant to the connection.
+ String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId;
+ String destinationReceiveRoot =
+ messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId;
boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
+
+ // Check which JMS flags and options are to be set.
+ int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ boolean durableSubscription = messagingProps.getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
// Check if any Qpid/AMQP specific flags or options need to be set.
boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
boolean needsQpidOptions = immediate | mandatory;
- log.debug("ackMode = " + ackMode);
+ /*log.debug("ackMode = " + ackMode);
log.debug("useTopics = " + useTopics);
log.debug("destinationSendRoot = " + destinationSendRoot);
log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
@@ -316,7 +538,7 @@ public class ImmediateMessageTest extends TestCase
log.debug("transactional = " + transactional);
log.debug("immediate = " + immediate);
log.debug("mandatory = " + mandatory);
- log.debug("needsQpidOptions = " + needsQpidOptions);
+ log.debug("needsQpidOptions = " + needsQpidOptions);*/
// Create connection, sessions and producer/consumer pairs on each session.
Connection connection = createConnection(messagingProps);
@@ -329,7 +551,7 @@ public class ImmediateMessageTest extends TestCase
Session receiverSession = connection.createSession(transactional, ackMode);
Destination publisherProducerDestination =
- useTopics ? publisherSession.createTopic(destinationSendRoot)
+ useTopics ? (Destination) publisherSession.createTopic(destinationSendRoot)
: publisherSession.createQueue(destinationSendRoot);
MessageProducer publisherProducer =
@@ -342,13 +564,29 @@ public class ImmediateMessageTest extends TestCase
createPublisherConsumer
? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+ if (publisherConsumer != null)
+ {
+ publisherConsumer.setMessageListener(new MessageMonitor());
+ }
+
MessageProducer receiverProducer =
createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
: null;
+ Destination receiverConsumerDestination =
+ useTopics ? (Destination) receiverSession.createTopic(destinationSendRoot)
+ : receiverSession.createQueue(destinationSendRoot);
+
MessageConsumer receiverConsumer =
- createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
- : null;
+ createReceiverConsumer
+ ? ((durableSubscription && useTopics)
+ ? receiverSession.createDurableSubscriber((Topic) receiverConsumerDestination, "testsub")
+ : receiverSession.createConsumer(receiverConsumerDestination)) : null;
+
+ if (receiverConsumer != null)
+ {
+ receiverConsumer.setMessageListener(new MessageMonitor());
+ }
// Start listening for incoming messages.
connection.start();
@@ -372,7 +610,8 @@ public class ImmediateMessageTest extends TestCase
public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
{
- return client.getSession().createMessage();
+ return client.getSession().createTextMessage("Hello");
+ // return client.getSession().createMessage();
}
/**
@@ -428,12 +667,12 @@ public class ImmediateMessageTest extends TestCase
public MessageProducer getProducer()
{
- return null;
+ return producer;
}
public MessageConsumer getConsumer()
{
- return null;
+ return consumer;
}
public void send(Message message) throws JMSException
@@ -524,6 +763,10 @@ public class ImmediateMessageTest extends TestCase
public ExceptionMonitor getExceptionMonitor();
+ public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */);
+
+ public void testNoExceptions(ParsedProperties testProps);
+
public void close();
}
@@ -603,62 +846,63 @@ public class ImmediateMessageTest extends TestCase
}
}
- public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
+ public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
{
- PublisherReceiver testClients;
-
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- testClients = createPublisherReceiverPairSharedConnection(testProps);
- testClients.start();
-
- testClients.send(testProps, 1);
-
+ start();
+ send(testProps, 1);
pause(1000L);
String errors = "";
- if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass))
+ ExceptionMonitor connectionExceptionMonitor = getConnectionExceptionMonitor();
+ if (!connectionExceptionMonitor.assertOneJMSExceptionWithLinkedCause(aClass))
{
- errors += "Was expecting linked exception type " + aClass.getName() + ".\n";
+ errors += "Was expecting linked exception type " + aClass.getName() + " on the connection.\n";
+ errors +=
+ (connectionExceptionMonitor.size() > 0)
+ ? ("Actually got the following exceptions on the connection, " + connectionExceptionMonitor)
+ : "Got no exceptions on the connection.";
}
// Clean up the publisher/receiver client pair.
- testClients.close();
+ close();
assertEquals(errors, "", errors);
}
/**
*/
- public static void testNoExceptions(ParsedProperties testProps)
+ public void testNoExceptions(ParsedProperties testProps)
{
- PublisherReceiver testClients;
-
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- testClients = createPublisherReceiverPairSharedConnection(testProps);
- testClients.start();
-
- testClients.send(testProps, 1);
-
+ start();
+ send(testProps, 1);
pause(1000L);
String errors = "";
- if (!testClients.getConnectionExceptionMonitor().assertNoExceptions())
+ if (!getConnectionExceptionMonitor().assertNoExceptions())
{
- errors += "There were connection exceptions.\n";
+ errors += "Was expecting no exceptions.\n";
+ errors += "Got the following exceptions on the connection, " + getConnectionExceptionMonitor();
}
- if (!testClients.getExceptionMonitor().assertNoExceptions())
+ if (!getExceptionMonitor().assertNoExceptions())
{
- errors += "There were exceptions on producer.\n";
+ errors += "Was expecting no exceptions.\n";
+ errors += "Got the following exceptions on the producer, " + getExceptionMonitor();
}
// Clean up the publisher/receiver client pair.
- testClients.close();
+ close();
assertEquals(errors, "", errors);
}
+
+ public static PublisherReceiver connectClients(ParsedProperties testProps)
+ {
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ return createPublisherReceiverPairSharedConnection(testProps);
+ }
}
/**
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
index f41acca11b..09a32aa3eb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
@@ -49,6 +49,10 @@ import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
* connected.
* <tr><td> Check that a mandatory message results in no route code, upon transaction commit, when a consumer is
* connected.
+ * <tr><td> Check that a mandatory message is sent succesfully, not using transactions, when a consumer is
+ * disconnected but the route exists.
+ * <tr><dt> Check that a mandatory message is send successfully, in a transactions, when a consumer is
+ * disconnected but when the route exists.
* </table>
*/
public class MandatoryMessageTest extends TestCase
@@ -57,64 +61,234 @@ public class MandatoryMessageTest extends TestCase
private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class);
/** Used to read the tests configurable properties through. */
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+ ParsedProperties testProps;
- /** All these tests should have the mandatory flag on. */
- // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
- private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true);
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
/** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_508_MandatoryOkNoTx() throws Exception
+ public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_508_MandatoryOkTx() throws Exception
+ public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
// Send one message with no errors.
- ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception
+ public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
{
NDC.push(getName());
+ testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the mandatory flag on. */
+ testProps.setProperty(MANDATORY_PROPNAME, true);
+
+ /** Bind the receivers consumer by default. */
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true);
+
// Ensure that the in-vm broker is created.
TransportConnection.createVMBroker(1);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
index 9c8cefc492..b584c8c80b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import org.apache.qpid.jms.Session;
@@ -167,6 +187,12 @@ public class MessagingTestConfigProperties
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. */
+ public static final String DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription";
+
+ /** Defines the default value of the durable subscriptions flag. */
+ public static final boolean DURABLE_SUBSCRIPTION_DEFAULT = false;
+
// ====================== Qpid Options and Flags ================================
/** Holds the name of the property to set the exclusive flag from. */
@@ -272,6 +298,7 @@ public class MessagingTestConfigProperties
defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);