diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 17:02:07 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 17:02:07 +0000 |
| commit | d85f0070fc18312f0aeaf6ee23c89ad0b68dd182 (patch) | |
| tree | 28de0b25026771a89f690e614bcd610b52f3383b /qpid/java/systests | |
| parent | f3c0cdf6f1d482127323eb6b505fe1ca0ed500f1 (diff) | |
| download | qpid-python-d85f0070fc18312f0aeaf6ee23c89ad0b68dd182.tar.gz | |
QPID-1079 : Based on Code Review : Remvoed AutoCreateVMBroker code from QpidTestCase. Removed VMTestCase and all references to it, it was only used in JUnit4 testSuite wrappers. Rather than move QpidTestCase to a new module all client tests have that require this class have been moved to systests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@676971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
16 files changed, 16 insertions, 790 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index 012a983be5..22a1b119fa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.failure; import junit.framework.TestCase; -import org.apache.qpid.testutil.QpidClientConnectionHelper; +import org.apache.qpid.test.utils.QpidClientConnectionHelper; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java index 8f85d4a356..824edd7022 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java @@ -33,7 +33,7 @@ import javax.jms.*; * <tr><td> Provide a message consumer for receiving messages. * </table> * - * @todo Update the {@link org.apache.qpid.util.ConversationFactory} so that it accepts these as the basic conversation + * @todo Update the {@link org.apache.qpid.test.utils.ConversationFactory} so that it accepts these as the basic conversation * connection units. */ public interface CircuitEnd diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java index 41766b9fae..ec70759cf7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java @@ -26,7 +26,7 @@ import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl; import org.apache.qpid.test.framework.localcircuit.LocalPublisherImpl; import org.apache.qpid.test.framework.localcircuit.LocalReceiverImpl; import org.apache.qpid.test.framework.sequencers.CircuitFactory; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.util.ParsedProperties; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java index aefeb17d59..f375eda4d1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java @@ -23,7 +23,7 @@ package org.apache.qpid.test.framework.distributedcircuit; import org.apache.log4j.Logger; import org.apache.qpid.test.framework.*; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.TimingController; import org.apache.qpid.junit.extensions.TimingControllerAware; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java index 55f05ec6f2..d532109dc3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java @@ -37,7 +37,7 @@ import org.apache.qpid.test.framework.MessagingTestConfigProperties; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.test.framework.clocksynch.UDPClockReference; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.TKTestRunner; import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java index d2f8ca896c..bdcfc996d6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java @@ -27,7 +27,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.test.framework.FrameworkBaseCase; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.sequencers.CircuitFactory; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java index d6ae390a4a..eed9b1f290 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java @@ -30,7 +30,7 @@ import org.apache.qpid.test.framework.FrameworkBaseCase; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.framework.sequencers.FanOutCircuitFactory; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java index 38ab66d6ae..413d5558f2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java @@ -29,7 +29,7 @@ import org.apache.qpid.test.framework.FrameworkBaseCase; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.framework.sequencers.InteropCircuitFactory; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; @@ -47,7 +47,7 @@ import java.util.*; * * <p><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Broadcast test invitations and collect enlists. <td> {@link org.apache.qpid.util.ConversationFactory}. + * <tr><td> Broadcast test invitations and collect enlists. <td> {@link org.apache.qpid.test.utils.ConversationFactory}. * <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator} * <tr><td> Execute distributed test cases. <td> {@link FrameworkBaseCase} * <tr><td> Fail non-participating pairings. <td> {@link OptOutTestCase} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/BaseCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/BaseCircuitFactory.java index c7bde1ae03..bd27fc3d90 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/BaseCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/BaseCircuitFactory.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import java.util.LinkedList; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java index 0a48d66981..e69952918d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java @@ -23,7 +23,7 @@ package org.apache.qpid.test.framework.sequencers; import org.apache.qpid.test.framework.Assertion; import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.util.ParsedProperties; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java index 7f8a821c69..8a9c48d8e7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java @@ -27,7 +27,7 @@ import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.test.framework.distributedcircuit.DistributedCircuitImpl; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.util.ParsedProperties; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java index 8604dd7800..7df80bbf10 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java @@ -27,7 +27,7 @@ import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.test.framework.distributedcircuit.DistributedCircuitImpl; -import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.util.ParsedProperties; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 579e3350ff..ec23256f38 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -22,8 +22,8 @@ package org.apache.qpid.test.unit.close; import org.apache.qpid.AMQException; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.QpidClientConnection; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.testutil.QpidClientConnection; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 39655ec845..9b879c14d1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -335,7 +336,7 @@ public class QpidTestCase extends TestCase else if (_broker.equals(VM)) { TransportConnection.killAllVMBrokers(); - //ApplicationRegistry.removeAll(); + ApplicationRegistry.removeAll(); } _brokerStarted = false; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java deleted file mode 100644 index c43b65a805..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.testutil; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.JMSAMQException; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -/** - * @todo This was originally cut and paste from the client module leading to a duplicate class, then altered very - * slightly. To avoid the duplicate class the name was altered slightly to have 'Helper' on the end in order - * to distinguish it from the original. Delete this class and use the original instead, just upgrade it to - * provide the new features needed. - */ -public class QpidClientConnectionHelper implements ExceptionListener -{ - - private static final Logger _logger = Logger.getLogger(QpidClientConnectionHelper.class); - - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnectionHelper(String broker) - { - super(); - setVirtualHost("/test"); - setBrokerList(broker); - setPrefetch(5000); - } - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - sender.setDeliveryMode(deliveryMode); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) - { - connect(); - } - - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; - - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) - { - result = null; - } - else - { - _logger.info("warning: received non-text message"); - result = message; - } - - return result; - } - - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } - - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - - session.commit(); - consumer.close(); - _logger.info("consumed: " + messagesReceived); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java deleted file mode 100644 index 00cb458c86..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import org.apache.log4j.Logger; - -import javax.jms.*; - -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation - * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant - * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. - * - * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a - * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation - * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order - * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded - * conversation (the conversation methods can be called many times in parallel): - * - * <p/><pre> - * class Initiator - * { - * ConversationHelper conversation = new ConversationHelper(connection, null, - * java.util.concurrent.LinkedBlockingQueue.class); - * - * initiateConversation() - * { - * try { - * // Exchange greetings. - * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello.")); - * Message greeting = conversation.receive(); - * - * // Exchange goodbyes. - * conversation.send(conversation.getSession().createTextMessage("Goodbye.")); - * Message goodbye = conversation.receive(); - * } finally { - * conversation.end(); - * } - * } - * } - * - * class Responder - * { - * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination, - * java.util.concurrent.LinkedBlockingQueue.class); - * - * respondToConversation() - * { - * try { - * // Exchange greetings. - * Message greeting = conversation.receive(); - * conversation.send(conversation.getSession().createTextMessage("Hello.")); - * - * // Exchange goodbyes. - * Message goodbye = conversation.receive(); - * conversation.send(conversation.getSession().createTextMessage("Goodbye.")); - * } finally { - * conversation.end(); - * } - * } - * } - * </pre> - * - * <p/>Conversation correlation id's are generated on a per thread basis. - * - * <p/>The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS - * sessions are not multi-threaded. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Associate messages to an ongoing conversation using correlation ids. - * <tr><td> Auto manage sessions for conversations. - * <tr><td> Store messages not in a conversation in dead letter box. - * </table> - */ -public class ConversationFactory -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(ConversationFactory.class); - - /** Holds a map from correlation id's to queues. */ - private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>(); - - /** Holds the connection over which the conversation is conducted. */ - private Connection connection; - - /** Holds the controlSession over which the conversation is conduxted. */ - private Session session; - - /** The message consumer for incoming messages. */ - MessageConsumer consumer; - - /** The message producer for outgoing messages. */ - MessageProducer producer; - - /** The well-known or temporary destination to receive replies on. */ - Destination receiveDestination; - - /** Holds the queue implementation class for the reply queue. */ - Class<? extends BlockingQueue> queueClass; - - /** Used to hold any replies that are received outside of the context of a conversation. */ - BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>(); - - /* Used to hold conversation state on a per thread basis. */ - /* - ThreadLocal<Conversation> threadLocals = - new ThreadLocal<Conversation>() - { - protected Conversation initialValue() - { - Conversation settings = new Conversation(); - settings.conversationId = conversationIdGenerator.getAndIncrement(); - - return settings; - } - }; - */ - - /** Generates new coversation id's as needed. */ - AtomicLong conversationIdGenerator = new AtomicLong(); - - /** - * Creates a conversation helper on the specified connection with the default sending destination, and listening - * to the specified receiving destination. - * - * @param connection The connection to build the conversation helper on. - * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary - * queue. - * @param queueClass The queue implementation class. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public ConversationFactory(Connection connection, Destination receiveDestination, - Class<? extends BlockingQueue> queueClass) throws JMSException - { - log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination - + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called"); - - this.connection = connection; - this.queueClass = queueClass; - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Check if a well-known receive destination has been provided, or use a temporary queue if not. - this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue(); - - consumer = session.createConsumer(receiveDestination); - producer = session.createProducer(null); - - consumer.setMessageListener(new Receiver()); - } - - /** - * Creates a new conversation context. - * - * @return A new conversation context. - */ - public Conversation startConversation() - { - log.debug("public Conversation startConversation(): called"); - - Conversation conversation = new Conversation(); - conversation.conversationId = conversationIdGenerator.getAndIncrement(); - - return conversation; - } - - /** - * Ensures that the reply queue for a conversation exists. - * - * @param conversationId The conversation correlation id. - */ - private void initQueueForId(long conversationId) - { - if (!idsToQueues.containsKey(conversationId)) - { - idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass)); - } - } - - /** - * Clears the dead letter box, returning all messages that were in it. - * - * @return All messages in the dead letter box. - */ - public Collection<Message> emptyDeadLetterBox() - { - log.debug("public Collection<Message> emptyDeadLetterBox(): called"); - - Collection<Message> result = new ArrayList<Message>(); - deadLetterBox.drainTo(result); - - return result; - } - - /** - * Gets the controlSession over which the conversation is conducted. - * - * @return The controlSession over which the conversation is conducted. - */ - public Session getSession() - { - // Conversation settings = threadLocals.get(); - - return session; - } - - /** - * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply - * destination automatically updated to the last received reply-to destination. - */ - public class Conversation - { - /** Holds the correlation id for the context. */ - long conversationId; - - /** - * Holds the send destination for the context. This will automatically be updated to the most recently received - * reply-to destination. - */ - Destination sendDestination; - - /** - * Sends a message to the default sending location. The correlation id of the message will be assigned by this - * method, overriding any previously set value. - * - * @param sendDestination The destination to send to. This may be null to use the last received reply-to - * destination. - * @param message The message to send. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no - * send destination is specified and there is no most recent reply-to destination available - * to use. - */ - public void send(Destination sendDestination, Message message) throws JMSException - { - log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message - + "): called"); - - // Conversation settings = threadLocals.get(); - // long conversationId = conversationId; - message.setJMSCorrelationID(Long.toString(conversationId)); - message.setJMSReplyTo(receiveDestination); - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - // Check if an overriding send to destination has been set or use the last reply-to if not. - Destination sendTo = null; - - if (sendDestination != null) - { - sendTo = sendDestination; - } - else if (sendDestination != null) - { - sendTo = sendDestination; - } - else - { - throw new JMSException("The send destination was specified, and no most recent reply-to available to use."); - } - - // Send the message. - synchronized (this) - { - producer.send(sendTo, message); - } - } - - /** - * Gets the next message in an ongoing conversation. This method may block until such a message is received. - * - * @return The next incoming message in the conversation. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message - * did not have its reply-to destination set up. - */ - public Message receive() throws JMSException - { - log.debug("public Message receive(): called"); - - // Conversation settings = threadLocals.get(); - // long conversationId = settings.conversationId; - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - BlockingQueue<Message> queue = idsToQueues.get(conversationId); - - try - { - Message result = queue.take(); - - // Keep the reply-to destination to send replies to. - sendDestination = result.getJMSReplyTo(); - - return result; - } - catch (InterruptedException e) - { - return null; - } - } - - /** - * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are - * received they will be returned. If a timeout is specified, then all messages up to the limit, received within - * that timespan will be returned. At least one of the message count or timeout should be set to a value of - * 1 or greater. - * - * @param num The number of messages to receive, or all if this is less than 1. - * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. - * - * @return All messages received within the count limit and the timeout. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. - */ - public Collection<Message> receiveAll(int num, long timeout) throws JMSException - { - log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout - + "): called"); - - // Check that a timeout or message count was set. - if ((num < 1) && (timeout < 1)) - { - throw new IllegalArgumentException("At least one of message count (num) or timeout must be set."); - } - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - BlockingQueue<Message> queue = idsToQueues.get(conversationId); - - // Used to collect the received messages in. - Collection<Message> result = new ArrayList<Message>(); - - // Used to indicate when the timeout or message count has expired. - boolean receiveMore = true; - - int messageCount = 0; - - // Receive messages until the timeout or message count expires. - do - { - try - { - Message next = null; - - // Try to receive the message with a timeout if one has been set. - if (timeout > 0) - { - next = queue.poll(timeout, TimeUnit.MILLISECONDS); - - // Check if the timeout expired, and stop receiving if so. - if (next == null) - { - receiveMore = false; - } - } - // Receive the message without a timeout. - else - { - next = queue.take(); - } - - // Increment the message count if a message was received. - messageCount += (next != null) ? 1 : 0; - - // Check if all the requested messages were received, and stop receiving if so. - if ((num > 0) && (messageCount >= num)) - { - receiveMore = false; - } - - // Keep the reply-to destination to send replies to. - sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination; - - if (next != null) - { - result.add(next); - } - } - catch (InterruptedException e) - { - // Restore the threads interrupted status. - Thread.currentThread().interrupt(); - - // Stop receiving but return the messages received so far. - receiveMore = false; - } - } - while (receiveMore); - - return result; - } - - /** - * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any - * incoming messages using them will go to the dead letter box. - */ - public void end() - { - log.debug("public void end(): called"); - - // Ensure that the thread local for the current thread is cleaned up. - // Conversation settings = threadLocals.get(); - // long conversationId = settings.conversationId; - // threadLocals.remove(); - - // Ensure that its queue is removed from the queue map. - BlockingQueue<Message> queue = idsToQueues.remove(conversationId); - - // Move any outstanding messages on the threads conversation id into the dead letter box. - queue.drainTo(deadLetterBox); - } - } - - /** - * Implements the message listener for this conversation handler. - */ - protected class Receiver implements MessageListener - { - /** - * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id - * and placed into queues. - * - * @param message The incoming message. - */ - public void onMessage(Message message) - { - log.debug("public void onMessage(Message message = " + message + "): called"); - - try - { - Long conversationId = Long.parseLong(message.getJMSCorrelationID()); - - // Find the converstaion queue to place the message on. If there is no conversation for the message id, - // the the dead letter box queue is used. - BlockingQueue<Message> queue = idsToQueues.get(conversationId); - queue = (queue == null) ? deadLetterBox : queue; - - queue.put(message); - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } -} |
