summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-05-03 15:09:18 +0000
committerRobert Greig <rgreig@apache.org>2007-05-03 15:09:18 +0000
commit07ee3bc84aa1edd801266dbd6b86cfa06a2ea9cc (patch)
tree3fcd2d11322ecf1aa5dc75c57162ca116f5265c9 /java
parent6a499e94e51e5dc975fe59f3746fac594694ea71 (diff)
downloadqpid-python-07ee3bc84aa1edd801266dbd6b86cfa06a2ea9cc.tar.gz
More interop test stuff.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@534903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java53
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java17
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java29
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java390
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java306
5 files changed, 457 insertions, 338 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
index 36c270ef11..01d4874c4e 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
@@ -23,13 +23,13 @@ package org.apache.qpid.interop.coordinator;
import java.util.Collection;
import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
import junit.framework.TestCase;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
/**
* An CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
@@ -60,7 +60,8 @@ import org.apache.qpid.util.ConversationHelper;
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Accept notification of test case participants. <td> {@link InvitingTestDecorator}
- * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationHelper}
+ * <tr><td> Accpet JMS Connection to carry out the coordination over.
+ * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationFactory}
* <tr><td> Supply test properties
* </table>
*/
@@ -72,7 +73,8 @@ public abstract class CoordinatingTestCase extends TestCase
/** Holds the contact details for the receving test client. */
TestClientDetails receiver;
- ConversationHelper conversation;
+ /** Holds the conversation factory over which to coordinate the test. */
+ ConversationFactory conversationFactory;
/**
* Creates a new coordinating test case with the specified name.
@@ -125,6 +127,16 @@ public abstract class CoordinatingTestCase extends TestCase
}
/**
+ * Accepts the conversation factory over which to hold the test coordinating conversation.
+ *
+ * @param conversationFactory The conversation factory to coordinate the test over.
+ */
+ public void setConversationFactory(ConversationFactory conversationFactory)
+ {
+ this.conversationFactory = conversationFactory;
+ }
+
+ /**
* Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner
* loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports
* from the participants.
@@ -137,41 +149,48 @@ public abstract class CoordinatingTestCase extends TestCase
*/
protected Message[] sequenceTest(Properties testProperties) throws JMSException
{
+ Session session = conversationFactory.getSession();
+ Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+ Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+
+ ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+ ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
// Assign the sender role to the sending test client.
- Message assignSender = conversation.getSession().createMessage();
+ Message assignSender = conversationFactory.getSession().createMessage();
assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignSender.setStringProperty("ROLE", "SENDER");
- conversation.send(assignSender);
+ senderConversation.send(senderControlTopic, assignSender);
// Assign the receiver role the receiving client.
- Message assignReceiver = conversation.getSession().createMessage();
+ Message assignReceiver = session.createMessage();
assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignReceiver.setStringProperty("ROLE", "RECEIVER");
- conversation.send(assignReceiver);
+ receiverConversation.send(receiverControlTopic, assignReceiver);
// Wait for the senders and receivers to confirm their roles.
- conversation.receive();
- conversation.receive();
+ senderConversation.receive();
+ receiverConversation.receive();
// Start the test.
- Message start = conversation.getSession().createMessage();
+ Message start = session.createMessage();
start.setStringProperty("CONTROL_TYPE", "START");
- conversation.send(start);
+ senderConversation.send(senderControlTopic, start);
// Wait for the test sender to return its report.
- Message senderReport = conversation.receive();
+ Message senderReport = senderConversation.receive();
// Ask the receiver for its report.
- Message statusRequest = conversation.getSession().createMessage();
+ Message statusRequest = session.createMessage();
statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
- conversation.send(statusRequest);
+ receiverConversation.send(receiverControlTopic, statusRequest);
// Wait for the receiver to send its report.
- Message receiverReport = conversation.receive();
+ Message receiverReport = receiverConversation.receive();
return new Message[] { senderReport, receiverReport };
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
index 5e0f5b4941..3469090369 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
@@ -38,7 +38,7 @@ import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
import org.apache.qpid.util.PrettyPrintingUtils;
import uk.co.thebadgerset.junit.extensions.TestRunnerImprovedErrorHandling;
@@ -51,7 +51,7 @@ import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
*
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Find out what test clients are available. <td> {@link ConversationHelper}
+ * <tr><td> Find out what test clients are available. <td> {@link ConversationFactory}
* <tr><td> Decorate available tests to run all available clients. <td> {@link InvitingTestDecorator}
* <tr><td> Attach XML test result logger.
* <tr><td> Terminate the interop testing framework.
@@ -73,7 +73,9 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
/** Holds the conversation helper for the control conversation. */
- private ConversationHelper conversation;
+ private ConversationFactory conversationFactory;
+
+ /** Holds the connection that the coordinating messages are sent over. */
private Connection connection;
/**
@@ -185,14 +187,15 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
Destination controlTopic = session.createTopic("iop.control");
Destination responseQueue = session.createQueue("coordinator");
- conversation = new ConversationHelper(connection, controlTopic, responseQueue, LinkedBlockingQueue.class);
+ conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class);
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
// Broadcast the compulsory invitation to find out what clients are available to test.
Message invite = session.createMessage();
invite.setStringProperty("CONTROL_TYPE", "INVITE");
invite.setJMSReplyTo(responseQueue);
- conversation.send(invite);
+ conversation.send(controlTopic, invite);
// Wait for a short time, to give test clients an opportunity to reply to the invitation.
Collection<Message> enlists = conversation.receiveAll(0, 10000);
@@ -206,7 +209,7 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
Message terminate = session.createMessage();
terminate.setStringProperty("CONTROL_TYPE", "TERMINATE");
- conversation.send(terminate);
+ conversation.send(controlTopic, terminate);
return result;
}
@@ -283,7 +286,7 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
}
// Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
- targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversation);
+ targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
return super.doRun(targetTest, wait);
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
index 2975082631..f6991ef0ef 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
@@ -23,6 +23,8 @@ package org.apache.qpid.interop.coordinator;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -32,14 +34,14 @@ import junit.framework.TestSuite;
import org.apache.log4j.Logger;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
/**
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationHelper}.
+ * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationFactory}.
* <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
* <tr><td> Execute coordinated test cases. <td> {@link CoordinatingTestCase}
* </table>
@@ -52,7 +54,10 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
Set<TestClientDetails> allClients;
/** Holds the conversation helper for the control level conversation for coordinating the test through. */
- ConversationHelper conversation;
+ ConversationFactory conversationFactory;
+
+ /** Holds the connection that the control conversation is held over. */
+ Connection connection;
/** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
WrappedSuiteTestDecorator testSuite;
@@ -61,11 +66,12 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
* Creates a wrapped suite test decorator from another one.
*
* @param suite The test suite.
- * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
* @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
*/
public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
- ConversationHelper controlConversation)
+ ConversationFactory controlConversation, Connection controlConnection)
{
super(suite);
@@ -74,7 +80,8 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
testSuite = suite;
allClients = availableClients;
- conversation = controlConversation;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
}
/**
@@ -103,11 +110,14 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
Set<TestClientDetails> enlists = null;
try
{
- Message invite = conversation.getSession().createMessage();
+ Message invite = conversationFactory.getSession().createMessage();
+ Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
invite.setStringProperty("CONTROL_TYPE", "INVITE");
invite.setStringProperty("TEST_NAME", coordTest.getName());
- conversation.send(invite);
+ conversation.send(controlTopic, invite);
// Wait for a short time, to give test clients an opportunity to reply to the invitation.
Collection<Message> replies = conversation.receiveAll(allClients.size(), 10000);
@@ -143,6 +153,9 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
coordTest.setSender(enlistedPair.get(0));
coordTest.setReceiver(enlistedPair.get(1));
+ // Pass down the connection to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
// Execute the test case.
coordTest.run(testResult);
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
new file mode 100644
index 0000000000..204369b5b9
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
@@ -0,0 +1,390 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation id's are generated on a per thread basis.
+ *
+ * <p/>The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><th> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+ /** Holds a map from correlation id's to queues. */
+ private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
+
+ /** Holds the connection over which the conversation is conducted. */
+ private Connection connection;
+
+ /** Holds the session over which the conversation is conduxted. */
+ private Session session;
+
+ /** The message consumer for incoming messages. */
+ MessageConsumer consumer;
+
+ /** The message producer for outgoing messages. */
+ MessageProducer producer;
+
+ /** The well-known or temporary destination to receive replies on. */
+ Destination receiveDestination;
+
+ /** Holds the queue implementation class for the reply queue. */
+ Class<? extends BlockingQueue> queueClass;
+
+ /** Used to hold any replies that are received outside of the context of a conversation. */
+ BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ /* Used to hold conversation state on a per thread basis. */
+ /*
+ ThreadLocal<Conversation> threadLocals =
+ new ThreadLocal<Conversation>()
+ {
+ protected Conversation initialValue()
+ {
+ Conversation settings = new Conversation();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+ */
+
+ /** Generates new coversation id's as needed. */
+ AtomicLong conversationIdGenerator = new AtomicLong();
+
+ /**
+ * Creates a conversation helper on the specified connection with the default sending destination, and listening
+ * to the specified receiving destination.
+ *
+ * @param connection The connection to build the conversation helper on.
+ * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+ * queue.
+ * @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public ConversationFactory(Connection connection, Destination receiveDestination,
+ Class<? extends BlockingQueue> queueClass) throws JMSException
+ {
+ this.connection = connection;
+ this.queueClass = queueClass;
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+ this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+ consumer = session.createConsumer(receiveDestination);
+ producer = session.createProducer(null);
+
+ consumer.setMessageListener(new Receiver());
+ }
+
+ /**
+ * Creates a new conversation context.
+ *
+ * @return A new conversation context.
+ */
+ public Conversation startConversation()
+ {
+ Conversation conversation = new Conversation();
+ conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return conversation;
+ }
+
+ /**
+ * Ensures that the reply queue for a conversation exists.
+ *
+ * @param conversationId The conversation correlation id.
+ */
+ private void initQueueForId(long conversationId)
+ {
+ if (!idsToQueues.containsKey(conversationId))
+ {
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+ }
+ }
+
+ /**
+ * Clears the dead letter box, returning all messages that were in it.
+ *
+ * @return All messages in the dead letter box.
+ */
+ public Collection<Message> emptyDeadLetterBox()
+ {
+ Collection<Message> result = new ArrayList<Message>();
+ deadLetterBox.drainTo(result);
+
+ return result;
+ }
+
+ /**
+ * Gets the session over which the conversation is conducted.
+ *
+ * @return The session over which the conversation is conducted.
+ */
+ public Session getSession()
+ {
+ // Conversation settings = threadLocals.get();
+
+ return session;
+ }
+
+ /**
+ * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+ * destination automatically updated to the last received reply-to destination.
+ */
+ public class Conversation
+ {
+ /** Holds the correlation id for the context. */
+ long conversationId;
+
+ /**
+ * Holds the send destination for the context. This will automatically be updated to the most recently received
+ * reply-to destination.
+ */
+ Destination sendDestination;
+
+ /**
+ * Sends a message to the default sending location. The correlation id of the message will be assigned by this
+ * method, overriding any previously set value.
+ *
+ * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+ * destination.
+ * @param message The message to send.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no
+ * send destination is specified and there is no most recent reply-to destination available
+ * to use.
+ */
+ public void send(Destination sendDestination, Message message) throws JMSException
+ {
+ // Conversation settings = threadLocals.get();
+ // long conversationId = conversationId;
+ message.setJMSCorrelationID(Long.toString(conversationId));
+ message.setJMSReplyTo(receiveDestination);
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ // Check if an overriding send to destination has been set or use the last reply-to if not.
+ Destination sendTo = null;
+
+ if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else
+ {
+ throw new JMSException("The send destination was specified, and no most recent reply-to available to use.");
+ }
+
+ // Send the message.
+ synchronized (this)
+ {
+ producer.send(sendTo, message);
+ }
+ }
+
+ /**
+ * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+ *
+ * @return The next incoming message in the conversation.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message
+ * did not have its reply-to destination set up.
+ */
+ public Message receive() throws JMSException
+ {
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ Message result = queue.take();
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = result.getJMSReplyTo();
+
+ return result;
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+ * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+ * that timespan will be returned.
+ *
+ * @param num The number of messages to receive, or all if this is less than 1.
+ * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+ *
+ * @return All messages received within the count limit and the timeout.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+ {
+ Collection<Message> result = new ArrayList<Message>();
+
+ for (int i = 0; i < num; i++)
+ {
+ result.add(receive());
+ }
+
+ return result;
+ }
+
+ /**
+ * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+ * incoming messages using them will go to the dead letter box.
+ */
+ public void end()
+ {
+ // Ensure that the thread local for the current thread is cleaned up.
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+ // threadLocals.remove();
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // Move any outstanding messages on the threads conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
+ }
+
+ /**
+ * Implements the message listener for this conversation handler.
+ */
+ protected class Receiver implements MessageListener
+ {
+ /**
+ * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
+ * and placed into queues.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+
+ // Find the converstaion queue to place the message on. If there is no conversation for the message id,
+ // the the dead letter box queue is used.
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
deleted file mode 100644
index 1fd1fee377..0000000000
--- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
-/**
- * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
- * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
- * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
- *
- * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
- * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
- * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
- * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
- * conversation (the conversation methods can be called many times in parallel):
- *
- * <p/><pre>
- * ConversationHelper conversation = new ConversationHelper(connection, sendDesitination, replyDestination,
- * java.util.concurrent.LinkedBlockingQueue.class);
- *
- * initiateConversation()
- * {
- * try {
- * // Exchange greetings.
- * conversation.send(conversation.getSession().createTextMessage("Hello."));
- * Message greeting = conversation.receive();
- *
- * // Exchange goodbyes.
- * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- * Message goodbye = conversation.receive();
- * } finally {
- * conversation.end();
- * }
- * }
- *
- * respondToConversation()
- * {
- * try {
- * // Exchange greetings.
- * Message greeting = conversation.receive();
- * conversation.send(conversation.getSession().createTextMessage("Hello."));
- *
- * // Exchange goodbyes.
- * Message goodbye = conversation.receive();
- * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- * } finally {
- * conversation.end();
- * }
- * }
- *
- * </pre>
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><th> Associate messages to an ongoing conversation using correlation ids.
- * <tr><td> Auto manage sessions for conversations.
- * <tr><td> Store messages not in a conversation in dead letter box.
- * </table>
- *
- * @todo Non-transactional, can use shared session. Transactional, must have session per-thread. Session pool? In
- * transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some
- * pending on the queue?). Also, having received on a particular session, must ensure that session is used for all
- * subsequent sends and receive at least until the transaction is committed. So a message selector must be used
- * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use
- * a temporary response queue, with only that session listening to it.
- *
- * @todo Want something convenient that hides details. Write out some example use cases to get the best feel for
- * it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive
- * methods. Bind corrId, session etc. on thread locals? Clean on endConvo. Provide deadLetter box, that
- * uncorrelated or late messages go in. Provide time-out on wait methods, and global time-out.
- * PingPongProducer provides a good use-case example (sends messages, waits for replies).
- *
- * @todo New correlationId on every send? or correlation id per conversation? or callers choice.
- */
-public class ConversationHelper
-{
- /** Holds a map from correlation id's to queues. */
- private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
-
- private Session session;
- private MessageProducer producer;
- private MessageConsumer consumer;
-
- Class<? extends BlockingQueue> queueClass;
-
- BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
-
- ThreadLocal<PerThreadSettings> threadLocals =
- new ThreadLocal<PerThreadSettings>()
- {
- protected PerThreadSettings initialValue()
- {
- PerThreadSettings settings = new PerThreadSettings();
- settings.conversationId = conversationIdGenerator.getAndIncrement();
-
- return settings;
- }
- };
-
- /** Generates new coversation id's as needed. */
- AtomicLong conversationIdGenerator = new AtomicLong();
-
- /**
- * Creates a conversation helper on the specified connection with the default sending destination, and listening
- * to the specified receiving destination.
- *
- * @param connection The connection to build the conversation helper on.
- * @param sendDestination The default sending destiation for all messages.
- * @param receiveDestination The destination to listen to for incoming messages.
- * @param queueClass The queue implementation class.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
- */
- public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination,
- Class<? extends BlockingQueue> queueClass) throws JMSException
- {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(sendDestination);
- consumer = session.createConsumer(receiveDestination);
-
- consumer.setMessageListener(new Receiver());
-
- this.queueClass = queueClass;
- }
-
- /**
- * Sends a message to the default sending location. The correlation id of the message will be assigned by this
- * method, overriding any previously set value.
- *
- * @param message The message to send.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
- */
- public void send(Message message) throws JMSException
- {
- PerThreadSettings settings = threadLocals.get();
- long conversationId = settings.conversationId;
- message.setJMSCorrelationID(Long.toString(conversationId));
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
-
- producer.send(message);
- }
-
- /**
- * Ensures that the reply queue for a conversation exists.
- *
- * @param conversationId The conversation correlation id.
- */
- private void initQueueForId(long conversationId)
- {
- if (!idsToQueues.containsKey(conversationId))
- {
- idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
- }
- }
-
- /**
- * Gets the next message in an ongoing conversation. This method may block until such a message is received.
- *
- * @return The next incoming message in the conversation.
- */
- public Message receive()
- {
- PerThreadSettings settings = threadLocals.get();
- long conversationId = settings.conversationId;
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
-
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
-
- try
- {
- return queue.take();
- }
- catch (InterruptedException e)
- {
- return null;
- }
- }
-
- /**
- * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
- * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
- * that timespan will be returned.
- *
- * @param num The number of messages to receive, or all if this is less than 1.
- * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
- *
- * @return All messages received within the count limit and the timeout.
- */
- public Collection<Message> receiveAll(int num, long timeout)
- {
- Collection<Message> result = new ArrayList<Message>();
-
- for (int i = 0; i < num; i++)
- {
- result.add(receive());
- }
-
- return result;
- }
-
- /**
- * Completes the conversation. Any open transactions are committed. Any correlation id's pertaining to the
- * conversation are no longer valid, and any incoming messages using them will go to the dead letter box.
- */
- public void end()
- {
- // Ensure that the thread local for the current thread is cleaned up.
- PerThreadSettings settings = threadLocals.get();
- long conversationId = settings.conversationId;
- threadLocals.remove();
-
- // Ensure that its queue is removed from the queue map.
- BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
-
- // Move any outstanding messages on the threads conversation id into the dead letter box.
- queue.drainTo(deadLetterBox);
- }
-
- /**
- * Clears the dead letter box, returning all messages that were in it.
- *
- * @return All messages in the dead letter box.
- */
- public Collection<Message> emptyDeadLetterBox()
- {
- Collection<Message> result = new ArrayList<Message>();
- deadLetterBox.drainTo(result);
-
- return result;
- }
-
- /**
- * Implements the message listener for this conversation handler.
- */
- protected class Receiver implements MessageListener
- {
- /**
- * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
- * and placed into queues.
- *
- * @param message The incoming message.
- */
- public void onMessage(Message message)
- {
- try
- {
- Long conversationId = Long.parseLong(message.getJMSCorrelationID());
-
- // Find the converstaion queue to place the message on. If there is no conversation for the message id,
- // the the dead letter box queue is used.
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
- queue = (queue == null) ? deadLetterBox : queue;
-
- queue.put(message);
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- protected class PerThreadSettings
- {
- /** Holds the correlation id for the current threads conversation. */
- long conversationId;
- }
-
- public Session getSession()
- {
- return session;
- }
-}