From 0f0bb4d46355a335ddd4c97bf2b6d7af09fd8ca2 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Fri, 22 Jun 2007 15:39:27 +0000 Subject: Added Immediate and Mandatory message tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@549849 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/ImmediateMessageTest.java | 686 +++++++++++++++++++++ .../qpid/server/exchange/MandatoryMessageTest.java | 135 ++++ .../exchange/MessagingTestConfigProperties.java | 282 +++++++++ 3 files changed, 1103 insertions(+) create mode 100644 java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java create mode 100644 java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java create mode 100644 java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java (limited to 'java/systests/src') diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java new file mode 100644 index 0000000000..05fbceca20 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java @@ -0,0 +1,686 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; + +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import java.util.ArrayList; +import java.util.List; + +/** + * ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS + * feature. A message may be marked with an immediate delivery flag, which means that a consumer must be connected + * to receive the message, through a valid route, when it is sent, or when its transaction is committed in the case + * of transactional messaging. If this is not the case, the broker should return the message with a NO_CONSUMERS code. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Check that an immediate message is sent succesfully not using transactions when a consumer is connected. + *
Check that an immediate message is committed succesfully in a transaction when a consumer is connected. + *
Check that an immediate message results in no consumers code, not using transactions, when no consumer is + * connected. + *
Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is + * connected. + *
+ * + * @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties, + * from trailing prop=value pairs on the command line, from test properties files or other sources. This should + * run through stanard JUnit without the JUnit toolkit extensions, and through Maven surefire, and also through + * the JUnit toolkit extended test runners. + * + * @todo Veto test topologies using bounce back. Or else the bounce back client will act as an immediate consumer. + */ +public class ImmediateMessageTest extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class); + + /** Used to read the tests configurable properties through. */ + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /** All these tests should have the immediate flag on. */ + private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true); + + /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */ + public void test_QPID_517_ImmediateOkNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Send one message with no errors. + PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */ + public void test_QPID_517_ImmediateOkTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Send one message with no errors. + PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */ + public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */ + public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + protected void setUp() throws Exception + { + NDC.push(getName()); + + // Ensure that the in-vm broker is created. + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + try + { + // Ensure that the in-vm broker is cleaned up so that the next test starts afresh. + TransportConnection.killVMBroker(1); + ApplicationRegistry.remove(1); + } + finally + { + NDC.pop(); + } + } + + /* + * Stuff below: + * + * This will get tidied into some sort on JMS convenience framework, through which practically any usefull test + * topology can be created. This will become a replacement for PingPongProducer. + * + * Base everything on standard connection properties defined in PingPongProducer. Split JMS and AMQP-only properties. + * + * Integrate with ConversationFactory, so that it will work with prod/con pairs. + * + * Support pub/rec pairs. + * Support m*n pub/rec setups. All pubs/recs on one machine. + * + * Support bounce back clients, with configurable bounce back behavior. All, one in X, round robin one in m, etc. + * + * Support pairing of m*n pub/rec setups with bounce back clients. JVM running a test, can simulate m publishers, + * will receive (a known subset of) all messages sent, bounced back to n receivers. Co-location of pub/rec will be + * the normal model to allow accurate timings to be taken. + * + * Support creation of pub or rec only. + * Support clock synching of pub/rec on different JVMs, by calculating clock offsets. Must also provide an accuracy + * estimate to +- the results. + * + * Augment the interop Coordinator, to become a full distributed test coordinator. Capable of querying available + * tests machines, looking at test parameters and farming out tests onto the test machines, passing all test + * parameters, standard naming of pub/rec config parameters used to set up m*n test topologies, run test cases, + * report results, tear down m*n topologies. Need to split the re-usable general purpose distributed test coordinator + * from the Qpid specific test framework for creating test-topoloigies and passing Qpid specific parameters. + * + * Write all tests against pub/rec pairs, without coding to the fact that the topology may be anything from 1:1 in + * JVM to m*n with bounce back clients accross many machines. That is, make the test topology orthogonal to the test + * case. + */ + + private static class ExceptionMonitor implements ExceptionListener + { + List exceptions = new ArrayList(); + + public void onException(JMSException e) + { + log.debug("ExceptionMonitor got JMSException: ", e); + + exceptions.add(e); + } + + public boolean assertNoExceptions() + { + return exceptions.isEmpty(); + } + + public boolean assertOneJMSException() + { + return exceptions.size() == 1; + } + + public boolean assertOneJMSExceptionWithLinkedCause(Class aClass) + { + if (exceptions.size() == 1) + { + JMSException e = exceptions.get(0); + + Exception linkedCause = e.getLinkedException(); + + if ((linkedCause != null) && aClass.isInstance(linkedCause)) + { + return true; + } + } + + return false; + } + + public void reset() + { + exceptions = new ArrayList(); + } + } + + /** + * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple + * convenience method for code that does anticipate handling connection failures. All exceptions that indicate + * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure + * handler. + * + * @param messagingProps Any additional connection properties. + * + * @return A JMS conneciton. + * + * @todo Move this to a Utils library class or base test class. Also move the copy in interop.TestClient too. + * + * @todo Make in VM broker creation step optional on whether one is to be used or not. + */ + public static Connection createConnection(ParsedProperties messagingProps) + { + log.debug("public static Connection createConnection(Properties messagingProps = " + messagingProps + "): called"); + + try + { + // Extract the configured connection properties from the test configuration. + String conUsername = messagingProps.getProperty(USERNAME_PROPNAME); + String conPassword = messagingProps.getProperty(PASSWORD_PROPNAME); + String virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME); + String brokerUrl = messagingProps.getProperty(BROKER_PROPNAME); + + // Set up the broker connection url. + String connectionString = + "amqp://" + conUsername + ":" + conPassword + "/" + ((virtualHost != null) ? virtualHost : "") + + "?brokerlist='" + brokerUrl + "'"; + + // messagingProps.setProperty(CONNECTION_PROPNAME, connectionString); + + Context ctx = new InitialContext(messagingProps); + + ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); + Connection connection = cf.createConnection(); + + return connection; + } + catch (NamingException e) + { + log.debug("Got NamingException: ", e); + throw new RuntimeException("Got JNDI NamingException whilst looking up the connection factory.", e); + } + catch (JMSException e) + { + log.debug("Got JMSException: ", e); + throw new RuntimeException("Could not establish connection due to JMSException.", e); + } + } + + /** + * Creates a publisher and a receiver on the same connection, configured according the to specified standard + * properties. + * + * @param messagingProps The connection properties. + * + * @return A publisher/receiver client pair. + */ + public static PublisherReceiver createPublisherReceiverPairSharedConnection(ParsedProperties messagingProps) + { + try + { + int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME); + boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME); + String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME); + String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME); + boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME); + boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME); + boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME); + boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME); + boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME); + + // Check if any Qpid/AMQP specific flags or options need to be set. + boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME); + boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME); + boolean needsQpidOptions = immediate | mandatory; + + log.debug("ackMode = " + ackMode); + log.debug("useTopics = " + useTopics); + log.debug("destinationSendRoot = " + destinationSendRoot); + log.debug("destinationReceiveRoot = " + destinationReceiveRoot); + log.debug("createPublisherProducer = " + createPublisherProducer); + log.debug("createPublisherConsumer = " + createPublisherConsumer); + log.debug("createReceiverProducer = " + createReceiverProducer); + log.debug("createReceiverConsumer = " + createReceiverConsumer); + log.debug("transactional = " + transactional); + log.debug("immediate = " + immediate); + log.debug("mandatory = " + mandatory); + log.debug("needsQpidOptions = " + needsQpidOptions); + + // Create connection, sessions and producer/consumer pairs on each session. + Connection connection = createConnection(messagingProps); + + // Add the connection exception listener to assert on exception conditions with. + ExceptionMonitor exceptionMonitor = new ExceptionMonitor(); + connection.setExceptionListener(exceptionMonitor); + + Session publisherSession = connection.createSession(transactional, ackMode); + Session receiverSession = connection.createSession(transactional, ackMode); + + Destination publisherProducerDestination = + useTopics ? publisherSession.createTopic(destinationSendRoot) + : publisherSession.createQueue(destinationSendRoot); + + MessageProducer publisherProducer = + createPublisherProducer + ? (needsQpidOptions + ? ((AMQSession) publisherSession).createProducer(publisherProducerDestination, mandatory, immediate) + : publisherSession.createProducer(publisherProducerDestination)) : null; + + MessageConsumer publisherConsumer = + createPublisherConsumer + ? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null; + + MessageProducer receiverProducer = + createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot)) + : null; + + MessageConsumer receiverConsumer = + createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot)) + : null; + + // Start listening for incoming messages. + connection.start(); + + // Package everything up. + ProducerConsumerPair publisher = + new ProducerConsumerPairImpl(publisherProducer, publisherConsumer, publisherSession); + ProducerConsumerPair receiver = + new ProducerConsumerPairImpl(receiverProducer, receiverConsumer, receiverSession); + + PublisherReceiver result = new PublisherReceiverImpl(publisher, receiver, connection, exceptionMonitor); + + return result; + } + catch (JMSException e) + { + log.debug("Got JMSException: ", e); + throw new RuntimeException("Could not create publisher/receiver pair due to a JMSException.", e); + } + } + + public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException + { + return client.getSession().createMessage(); + } + + /** + * A ProducerConsumerPair is a pair consisting of one message producer and one message consumer. It is a standard + * unit of connectivity allowing a full-duplex conversation to be held, provided both the consumer and producer + * are instantiated and configured. + * + * In some situations a test, or piece of application code will be written with differing numbers of publishers + * and receivers in different roles, where one role produces only and one consumes only. This messaging topology + * can still make use of producer/consumer pairs as standard building blocks, combined into publisher/receiver + * units to fulfill the different messaging roles, with the publishers consumer uninstantiated and the receivers + * producer uninstantiated. Use a {@link PublisherReceiver} for this. + * + *

+ *
CRC Card
Responsibilities + *
Provide a message producer for sending messages. + *
Provide a message consumer for receiving messages. + *
+ * + * @todo Update the {@link org.apache.qpid.util.ConversationFactory} so that it accepts these as the basic + * conversation connection units. + */ + public static interface ProducerConsumerPair + { + public MessageProducer getProducer(); + + public MessageConsumer getConsumer(); + + public void send(Message message) throws JMSException; + + public Session getSession(); + + public void close() throws JMSException; + } + + /** + * A single producer and consumer. + */ + public static class ProducerConsumerPairImpl implements ProducerConsumerPair + { + MessageProducer producer; + + MessageConsumer consumer; + + Session session; + + public ProducerConsumerPairImpl(MessageProducer producer, MessageConsumer consumer, Session session) + { + this.producer = producer; + this.consumer = consumer; + this.session = session; + } + + public MessageProducer getProducer() + { + return null; + } + + public MessageConsumer getConsumer() + { + return null; + } + + public void send(Message message) throws JMSException + { + producer.send(message); + } + + public Session getSession() + { + return session; + } + + public void close() throws JMSException + { + if (producer != null) + { + producer.close(); + } + + if (consumer != null) + { + consumer.close(); + } + } + } + + /** + * Multiple producers and consumers made to look like a single producer and consumer. All methods repeated accross + * all producers and consumers. + */ + public static class MultiProducerConsumerPairImpl implements ProducerConsumerPair + { + public MessageProducer getProducer() + { + throw new RuntimeException("Not implemented."); + } + + public MessageConsumer getConsumer() + { + throw new RuntimeException("Not implemented."); + } + + public void send(Message message) throws JMSException + { + throw new RuntimeException("Not implemented."); + } + + public Session getSession() + { + throw new RuntimeException("Not implemented."); + } + + public void close() + { + throw new RuntimeException("Not implemented."); + } + } + + /** + * A PublisherReceiver consists of two sets of producer/consumer pairs, one for an 'instigating' publisher + * role, and one for a more 'passive' receiver role. + * + *

A set of publishers and receivers forms a typical test configuration where both roles are to be controlled + * from within a single JVM. This is not a particularly usefull arrangement for applications which want to place + * these roles on physically seperate machines and pass messages between them. It is a faily normal arrangement for + * test code though, either to publish and receive messages through an in-VM message broker in order to test its + * expected behaviour, or to publish and receive (possibly bounced back) messages through a seperate broker instance + * in order to take performance timings. In the case of performance timings, the co-location of the publisher and + * receiver means that the timings are taken on the same machine for accurate timing without the need for clock + * synchronization. + * + *

+ *
CRC Card
Responsibilities + *
Manage an m*n array of publisher and recievers. + *
+ */ + public static interface PublisherReceiver + { + public ProducerConsumerPair getPublisher(); + + public ProducerConsumerPair getReceiver(); + + public void start(); + + public void send(ParsedProperties testProps, int numMessages); + + public ExceptionMonitor getConnectionExceptionMonitor(); + + public ExceptionMonitor getExceptionMonitor(); + + public void close(); + } + + public static class PublisherReceiverImpl implements PublisherReceiver + { + private ProducerConsumerPair publisher; + private ProducerConsumerPair receiver; + private Connection connection; + private ExceptionMonitor connectionExceptionMonitor; + private ExceptionMonitor exceptionMonitor; + + public PublisherReceiverImpl(ProducerConsumerPair publisher, ProducerConsumerPair receiver, Connection connection, + ExceptionMonitor connectionExceptionMonitor) + { + this.publisher = publisher; + this.receiver = receiver; + this.connection = connection; + this.connectionExceptionMonitor = connectionExceptionMonitor; + this.exceptionMonitor = new ExceptionMonitor(); + } + + public ProducerConsumerPair getPublisher() + { + return publisher; + } + + public ProducerConsumerPair getReceiver() + { + return receiver; + } + + public void start() + { } + + public void close() + { + try + { + publisher.close(); + receiver.close(); + connection.close(); + } + catch (JMSException e) + { + throw new RuntimeException("Got JMSException during close.", e); + } + } + + public ExceptionMonitor getConnectionExceptionMonitor() + { + return connectionExceptionMonitor; + } + + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + + public void send(ParsedProperties testProps, int numMessages) + { + boolean transactional = testProps.getPropertyAsBoolean(TRANSACTED_PROPNAME); + + // Send an immediate message through the publisher and ensure that it results in a JMSException. + try + { + getPublisher().send(createTestMessage(getPublisher(), testProps)); + + if (transactional) + { + getPublisher().getSession().commit(); + } + } + catch (JMSException e) + { + log.debug("Got JMSException: ", e); + exceptionMonitor.onException(e); + } + } + + public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */) + { + PublisherReceiver testClients; + + // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. + testClients = createPublisherReceiverPairSharedConnection(testProps); + testClients.start(); + + testClients.send(testProps, 1); + + pause(1000L); + + String errors = ""; + + if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass)) + { + errors += "Was expecting linked exception type " + aClass.getName() + ".\n"; + } + + // Clean up the publisher/receiver client pair. + testClients.close(); + + assertEquals(errors, "", errors); + } + + /** + */ + public static void testNoExceptions(ParsedProperties testProps) + { + PublisherReceiver testClients; + + // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. + testClients = createPublisherReceiverPairSharedConnection(testProps); + testClients.start(); + + testClients.send(testProps, 1); + + pause(1000L); + + String errors = ""; + + if (!testClients.getConnectionExceptionMonitor().assertNoExceptions()) + { + errors += "There were connection exceptions.\n"; + } + + if (!testClients.getExceptionMonitor().assertNoExceptions()) + { + errors += "There were exceptions on producer.\n"; + } + + // Clean up the publisher/receiver client pair. + testClients.close(); + + assertEquals(errors, "", errors); + } + } + + /** + * Pauses for the specified length of time. In the event of failing to pause for at least that length of time + * due to interuption of the thread, a RutimeException is raised to indicate the failure. The interupted status + * of the thread is restores in that case. This method should only be used when it is expected that the pause + * will be succesfull, for example in test code that relies on inejecting a pause. + * + * @param t The minimum time to pause for in milliseconds. + */ + public static void pause(long t) + { + try + { + Thread.sleep(t); + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + + throw new RuntimeException("Failed to generate the requested pause length.", e); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java new file mode 100644 index 0000000000..f41acca11b --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; + +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.transport.TransportConnection; +import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +/** + * MandatoryMessageTest tests for the desired behaviour of mandatory messages. Mandatory messages are a non-JMS + * feature. A message may be marked with a mandatory delivery flag, which means that a valid route for the message + * must exist, when it is sent, or when its transaction is committed in the case of transactional messaging. If this + * is not the case, the broker should return the message with a NO_CONSUMERS code. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Check that a mandatory message is sent succesfully not using transactions when a consumer is connected. + *
Check that a mandatory message is committed succesfully in a transaction when a consumer is connected. + *
Check that a mandatory message results in no route code, not using transactions, when no consumer is + * connected. + *
Check that a mandatory message results in no route code, upon transaction commit, when a consumer is + * connected. + *
+ */ +public class MandatoryMessageTest extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class); + + /** Used to read the tests configurable properties through. */ + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /** All these tests should have the mandatory flag on. */ + // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true); + private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true); + + /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */ + public void test_QPID_508_MandatoryOkNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Send one message with no errors. + ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */ + public void test_QPID_508_MandatoryOkTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Send one message with no errors. + ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */ + public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class); + } + + /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */ + public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class); + } + + protected void setUp() throws Exception + { + NDC.push(getName()); + + // Ensure that the in-vm broker is created. + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + try + { + // Ensure that the in-vm broker is cleaned up so that the next test starts afresh. + TransportConnection.killVMBroker(1); + ApplicationRegistry.remove(1); + } + finally + { + NDC.pop(); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java new file mode 100644 index 0000000000..9c8cefc492 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java @@ -0,0 +1,282 @@ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.jms.Session; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology, + * and test parameters for running a messaging test over that topology. A Properties object holding some of these + * properties, superimposed onto the defaults, is used to establish test topologies and control test behaviour. + * + *

A complete list of the parameters, default values and comments on their usage is provided here: + * + *

+ *
Parameters
Parameter Default Comments + *
messageSize 0 Message size in bytes. Not including any headers. + *
destinationName ping The root name to use to generate destination names to ping. + *
persistent false Determines whether peristent delivery is used. + *
transacted false Determines whether messages are sent/received in transactions. + *
broker tcp://localhost:5672 Determines the broker to connect to. + *
virtualHost test Determines the virtual host to send all ping over. + *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. + *
verbose false The verbose flag for debugging. Prints to console on every message. + *
pubsub false Whether to ping topics or queues. Uses p2p by default. + *
username guest The username to access the broker with. + *
password guest The password to access the broker with. + *
selector null Not used. Defines a message selector to filter pings with. + *
destinationCount 1 The number of receivers listening to the pings. + *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. + *
commitBatchSize 1 The number of messages per transaction in transactional mode. + *
uniqueDests true Whether each receiver only listens to one ping destination or all. + *
durableDests false Whether or not durable destinations are used. + *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE + *
maxPending 0 The maximum size in bytes, of messages sent but not yet received. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. + *
+ * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide the names and defaults of all test parameters. + *
+ */ +public class MessagingTestConfigProperties +{ + // ====================== Connection Properties ================================== + + /** Holds the name of the default connection configuration. */ + public static final String CONNECTION_NAME = "broker"; + + /** Holds the name of the property to get the initial context factory name from. */ + public static final String INITIAL_CONTEXT_FACTORY_PROPNAME = "java.naming.factory.initial"; + + /** Defines the class to use as the initial context factory by default. */ + public static final String INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + /** Holds the name of the default connection factory configuration property. */ + public static final String CONNECTION_PROPNAME = "connectionfactory.broker"; + + /** Defeins the default connection configuration. */ + public static final String CONNECTION_DEFAULT = "amqp://guest:guest@clientid/?brokerlist='vm://:1'"; + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "qpid.test.broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "vm://:1"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + // ====================== Messaging Topology Properties ========================== + + /** Holds the name of the property to get the bind publisher procuder flag from. */ + public static final String PUBLISHER_PRODUCER_BIND_PROPNAME = "publisherProducerBind"; + + /** Holds the default value of the publisher producer flag. */ + public static final boolean PUBLISHER_PRODUCER_BIND_DEFAULT = true; + + /** Holds the name of the property to get the bind publisher procuder flag from. */ + public static final String PUBLISHER_CONSUMER_BIND_PROPNAME = "publisherConsumerBind"; + + /** Holds the default value of the publisher consumer flag. */ + public static final boolean PUBLISHER_CONSUMER_BIND_DEFAULT = false; + + /** Holds the name of the property to get the bind receiver procuder flag from. */ + public static final String RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind"; + + /** Holds the default value of the receiver producer flag. */ + public static final boolean RECEIVER_PRODUCER_BIND_DEFAULT = false; + + /** Holds the name of the property to get the bind receiver procuder flag from. */ + public static final String RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind"; + + /** Holds the default value of the receiver consumer flag. */ + public static final boolean RECEIVER_CONSUMER_BIND_DEFAULT = true; + + /** Holds the name of the property to get the destination name root from. */ + public static final String SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot"; + + /** Holds the root of the name of the default destination to send to. */ + public static final String SEND_DESTINATION_NAME_ROOT_DEFAULT = "sendTo"; + + /** Holds the name of the property to get the destination name root from. */ + public static final String RECEIVE_DESTINATION_NAME_ROOT_PROPNAME = "receiveDestinationRoot"; + + /** Holds the root of the name of the default destination to send to. */ + public static final String RECEIVE_DESTINATION_NAME_ROOT_DEFAULT = "receiveFrom"; + + /** Holds the name of the proeprty to get the destination count from. */ + public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; + + /** Defines the default number of destinations to ping. */ + public static final int DESTINATION_COUNT_DEFAULT = 1; + + /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ + public static final String PUBSUB_PROPNAME = "pubsub"; + + /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + public static final boolean PUBSUB_DEFAULT = false; + + // ====================== JMS Options and Flags ================================= + + /** Holds the name of the property to get the test delivery mode from. */ + public static final String PERSISTENT_MODE_PROPNAME = "persistent"; + + /** Holds the message delivery mode to use for the test. */ + public static final boolean PERSISTENT_MODE_DEFAULT = false; + + /** Holds the name of the property to get the test transactional mode from. */ + public static final String TRANSACTED_PROPNAME = "transacted"; + + /** Holds the transactional mode to use for the test. */ + public static final boolean TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to set the no local flag from. */ + public static final String NO_LOCAL_PROPNAME = "noLocal"; + + /** Defines the default value of the no local flag to use when consuming messages. */ + public static final boolean NO_LOCAL_DEFAULT = false; + + /** Holds the name of the property to get the message acknowledgement mode from. */ + public static final String ACK_MODE_PROPNAME = "ackMode"; + + /** Defines the default message acknowledgement mode. */ + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + // ====================== Qpid Options and Flags ================================ + + /** Holds the name of the property to set the exclusive flag from. */ + public static final String EXCLUSIVE_PROPNAME = "exclusive"; + + /** Defines the default value of the exclusive flag to use when consuming messages. */ + public static final boolean EXCLUSIVE_DEFAULT = false; + + /** Holds the name of the property to set the immediate flag from. */ + public static final String IMMEDIATE_PROPNAME = "immediate"; + + /** Defines the default value of the immediate flag to use when sending messages. */ + public static final boolean IMMEDIATE_DEFAULT = false; + + /** Holds the name of the property to set the mandatory flag from. */ + public static final String MANDATORY_PROPNAME = "mandatory"; + + /** Defines the default value of the mandatory flag to use when sending messages. */ + public static final boolean MANDATORY_DEFAULT = false; + + /** Holds the name of the property to get the durable destinations flag from. */ + public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + + /** Default value for the durable destinations flag. */ + public static final boolean DURABLE_DESTS_DEFAULT = false; + + /** Holds the name of the proeprty to set the prefetch size from. */ + public static final String PREFECTH_PROPNAME = "prefetch"; + + /** Defines the default prefetch size to use when consuming messages. */ + public static final int PREFETCH_DEFAULT = 100; + + // ====================== Common Test Parameters ================================ + + /** Holds the name of the property to get the test message size from. */ + public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + + /** Used to set up a default message size. */ + public static final int MESSAGE_SIZE_DEAFULT = 0; + + /** Holds the name of the property to get the message rate from. */ + public static final String RATE_PROPNAME = "rate"; + + /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + public static final int RATE_DEFAULT = 0; + + /** Holds the name of the proeprty to get the. */ + public static final String SELECTOR_PROPNAME = "selector"; + + /** Holds the default message selector. */ + public static final String SELECTOR_DEFAULT = ""; + + /** Holds the name of the property to get the waiting timeout for response messages. */ + public static final String TIMEOUT_PROPNAME = "timeout"; + + /** Default time to wait before assuming that a ping has timed out. */ + public static final long TIMEOUT_DEFAULT = 30000; + + /** Holds the name of the property to get the commit batch size from. */ + public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; + + /** Defines the default number of pings to send in each transaction when running transactionally. */ + public static final int TX_BATCH_SIZE_DEFAULT = 1; + + /** Holds the name of the property to set the maximum amount of pending message data for a producer to hold. */ + public static final String MAX_PENDING_PROPNAME = "maxPending"; + + /** Defines the default maximum quantity of pending message data to allow producers to hold. */ + public static final int MAX_PENDING_DEFAULT = 0; + + /** Holds the name of the property to get the verbose mode proeprty from. */ + public static final String VERBOSE_PROPNAME = "verbose"; + + /** Holds the default verbose mode. */ + public static final boolean VERBOSE_DEFAULT = false; + + /** Holds the default configuration properties. */ + public static ParsedProperties defaults = new ParsedProperties(); + + static + { + defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT); + defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT); + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT); + defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT); + defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT); + defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT); + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT); + defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT); + defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT); + defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT); + defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT); + } +} -- cgit v1.2.1