From 07ee3bc84aa1edd801266dbd6b86cfa06a2ea9cc Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 3 May 2007 15:09:18 +0000 Subject: More interop test stuff. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@534903 13f79535-47bb-0310-9956-ffa450edef68 --- .../interop/coordinator/CoordinatingTestCase.java | 53 ++- .../qpid/interop/coordinator/Coordinator.java | 17 +- .../interop/coordinator/InvitingTestDecorator.java | 29 +- .../org/apache/qpid/util/ConversationFactory.java | 390 +++++++++++++++++++++ .../org/apache/qpid/util/ConversationHelper.java | 306 ---------------- 5 files changed, 457 insertions(+), 338 deletions(-) create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java (limited to 'java/integrationtests/src') diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 36c270ef11..01d4874c4e 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -23,13 +23,13 @@ package org.apache.qpid.interop.coordinator; import java.util.Collection; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; -import javax.jms.JMSException; -import javax.jms.Message; +import javax.jms.*; import junit.framework.TestCase; -import org.apache.qpid.util.ConversationHelper; +import org.apache.qpid.util.ConversationFactory; /** * An CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a @@ -60,7 +60,8 @@ import org.apache.qpid.util.ConversationHelper; *

*
CRC Card
Responsibilities Collaborations *
Accept notification of test case participants. {@link InvitingTestDecorator} - *
Coordinate the test sequence amongst participants. {@link ConversationHelper} + *
Accpet JMS Connection to carry out the coordination over. + *
Coordinate the test sequence amongst participants. {@link ConversationFactory} *
Supply test properties *
*/ @@ -72,7 +73,8 @@ public abstract class CoordinatingTestCase extends TestCase /** Holds the contact details for the receving test client. */ TestClientDetails receiver; - ConversationHelper conversation; + /** Holds the conversation factory over which to coordinate the test. */ + ConversationFactory conversationFactory; /** * Creates a new coordinating test case with the specified name. @@ -124,6 +126,16 @@ public abstract class CoordinatingTestCase extends TestCase return receiver; } + /** + * Accepts the conversation factory over which to hold the test coordinating conversation. + * + * @param conversationFactory The conversation factory to coordinate the test over. + */ + public void setConversationFactory(ConversationFactory conversationFactory) + { + this.conversationFactory = conversationFactory; + } + /** * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner * loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports @@ -137,41 +149,48 @@ public abstract class CoordinatingTestCase extends TestCase */ protected Message[] sequenceTest(Properties testProperties) throws JMSException { + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + // Assign the sender role to the sending test client. - Message assignSender = conversation.getSession().createMessage(); + Message assignSender = conversationFactory.getSession().createMessage(); assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignSender.setStringProperty("ROLE", "SENDER"); - conversation.send(assignSender); + senderConversation.send(senderControlTopic, assignSender); // Assign the receiver role the receiving client. - Message assignReceiver = conversation.getSession().createMessage(); + Message assignReceiver = session.createMessage(); assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignReceiver.setStringProperty("ROLE", "RECEIVER"); - conversation.send(assignReceiver); + receiverConversation.send(receiverControlTopic, assignReceiver); // Wait for the senders and receivers to confirm their roles. - conversation.receive(); - conversation.receive(); + senderConversation.receive(); + receiverConversation.receive(); // Start the test. - Message start = conversation.getSession().createMessage(); + Message start = session.createMessage(); start.setStringProperty("CONTROL_TYPE", "START"); - conversation.send(start); + senderConversation.send(senderControlTopic, start); // Wait for the test sender to return its report. - Message senderReport = conversation.receive(); + Message senderReport = senderConversation.receive(); // Ask the receiver for its report. - Message statusRequest = conversation.getSession().createMessage(); + Message statusRequest = session.createMessage(); statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); - conversation.send(statusRequest); + receiverConversation.send(receiverControlTopic, statusRequest); // Wait for the receiver to send its report. - Message receiverReport = conversation.receive(); + Message receiverReport = receiverConversation.receive(); return new Message[] { senderReport, receiverReport }; } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java index 5e0f5b4941..3469090369 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java @@ -38,7 +38,7 @@ import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun; import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P; import org.apache.qpid.util.ClasspathScanner; import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.util.ConversationHelper; +import org.apache.qpid.util.ConversationFactory; import org.apache.qpid.util.PrettyPrintingUtils; import uk.co.thebadgerset.junit.extensions.TestRunnerImprovedErrorHandling; @@ -51,7 +51,7 @@ import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; * *

*
CRC Card
Responsibilities Collaborations - *
Find out what test clients are available. {@link ConversationHelper} + *
Find out what test clients are available. {@link ConversationFactory} *
Decorate available tests to run all available clients. {@link InvitingTestDecorator} *
Attach XML test result logger. *
Terminate the interop testing framework. @@ -73,7 +73,9 @@ public class Coordinator extends TestRunnerImprovedErrorHandling Set enlistedClients = new HashSet(); /** Holds the conversation helper for the control conversation. */ - private ConversationHelper conversation; + private ConversationFactory conversationFactory; + + /** Holds the connection that the coordinating messages are sent over. */ private Connection connection; /** @@ -185,14 +187,15 @@ public class Coordinator extends TestRunnerImprovedErrorHandling Destination controlTopic = session.createTopic("iop.control"); Destination responseQueue = session.createQueue("coordinator"); - conversation = new ConversationHelper(connection, controlTopic, responseQueue, LinkedBlockingQueue.class); + conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); // Broadcast the compulsory invitation to find out what clients are available to test. Message invite = session.createMessage(); invite.setStringProperty("CONTROL_TYPE", "INVITE"); invite.setJMSReplyTo(responseQueue); - conversation.send(invite); + conversation.send(controlTopic, invite); // Wait for a short time, to give test clients an opportunity to reply to the invitation. Collection enlists = conversation.receiveAll(0, 10000); @@ -206,7 +209,7 @@ public class Coordinator extends TestRunnerImprovedErrorHandling Message terminate = session.createMessage(); terminate.setStringProperty("CONTROL_TYPE", "TERMINATE"); - conversation.send(terminate); + conversation.send(controlTopic, terminate); return result; } @@ -283,7 +286,7 @@ public class Coordinator extends TestRunnerImprovedErrorHandling } // Wrap the tests in an inviting test decorator, to perform the invite/test cycle. - targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversation); + targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection); return super.doRun(targetTest, wait); } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java index 2975082631..f6991ef0ef 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java @@ -23,6 +23,8 @@ package org.apache.qpid.interop.coordinator; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -32,14 +34,14 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; -import org.apache.qpid.util.ConversationHelper; +import org.apache.qpid.util.ConversationFactory; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; /** *

*
CRC Card
Responsibilities Collaborations - *
Broadcast test invitations and collect enlists. {@link ConversationHelper}. + *
Broadcast test invitations and collect enlists. {@link ConversationFactory}. *
Output test failures for clients unwilling to run the test case. {@link Coordinator} *
Execute coordinated test cases. {@link CoordinatingTestCase} *
@@ -52,7 +54,10 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator Set allClients; /** Holds the conversation helper for the control level conversation for coordinating the test through. */ - ConversationHelper conversation; + ConversationFactory conversationFactory; + + /** Holds the connection that the control conversation is held over. */ + Connection connection; /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ WrappedSuiteTestDecorator testSuite; @@ -61,11 +66,12 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator * Creates a wrapped suite test decorator from another one. * * @param suite The test suite. - * @param availableClients The list of all clients that responded to the compulsory invite. + * @param availableClients The list of all clients that responded to the compulsory invite. * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. */ public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, - ConversationHelper controlConversation) + ConversationFactory controlConversation, Connection controlConnection) { super(suite); @@ -74,7 +80,8 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator testSuite = suite; allClients = availableClients; - conversation = controlConversation; + conversationFactory = controlConversation; + connection = controlConnection; } /** @@ -103,11 +110,14 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator Set enlists = null; try { - Message invite = conversation.getSession().createMessage(); + Message invite = conversationFactory.getSession().createMessage(); + Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + invite.setStringProperty("CONTROL_TYPE", "INVITE"); invite.setStringProperty("TEST_NAME", coordTest.getName()); - conversation.send(invite); + conversation.send(controlTopic, invite); // Wait for a short time, to give test clients an opportunity to reply to the invitation. Collection replies = conversation.receiveAll(allClients.size(), 10000); @@ -143,6 +153,9 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator coordTest.setSender(enlistedPair.get(0)); coordTest.setReceiver(enlistedPair.get(1)); + // Pass down the connection to hold the coordination conversation over. + coordTest.setConversationFactory(conversationFactory); + // Execute the test case. coordTest.run(testResult); } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java new file mode 100644 index 0000000000..204369b5b9 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java @@ -0,0 +1,390 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.*; + +/** + * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation + * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant + * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. + * + *

One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a + * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation + * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order + * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded + * conversation (the conversation methods can be called many times in parallel): + * + *

+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ *                                                          java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ *  try {
+ *   // Exchange greetings.
+ *   conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ *   Message greeting = conversation.receive();
+ *
+ *   // Exchange goodbyes.
+ *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ *   Message goodbye = conversation.receive();
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ *                                                          java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ *   try {
+ *   // Exchange greetings.
+ *   Message greeting = conversation.receive();
+ *   conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ *   // Exchange goodbyes.
+ *   Message goodbye = conversation.receive();
+ *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ * 
+ * + *

Conversation correlation id's are generated on a per thread basis. + * + *

The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS + * sessions are not multi-threaded. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Associate messages to an ongoing conversation using correlation ids. + *
Auto manage sessions for conversations. + *
Store messages not in a conversation in dead letter box. + *
+ */ +public class ConversationFactory +{ + /** Holds a map from correlation id's to queues. */ + private Map> idsToQueues = new HashMap>(); + + /** Holds the connection over which the conversation is conducted. */ + private Connection connection; + + /** Holds the session over which the conversation is conduxted. */ + private Session session; + + /** The message consumer for incoming messages. */ + MessageConsumer consumer; + + /** The message producer for outgoing messages. */ + MessageProducer producer; + + /** The well-known or temporary destination to receive replies on. */ + Destination receiveDestination; + + /** Holds the queue implementation class for the reply queue. */ + Class queueClass; + + /** Used to hold any replies that are received outside of the context of a conversation. */ + BlockingQueue deadLetterBox = new LinkedBlockingQueue(); + + /* Used to hold conversation state on a per thread basis. */ + /* + ThreadLocal threadLocals = + new ThreadLocal() + { + protected Conversation initialValue() + { + Conversation settings = new Conversation(); + settings.conversationId = conversationIdGenerator.getAndIncrement(); + + return settings; + } + }; + */ + + /** Generates new coversation id's as needed. */ + AtomicLong conversationIdGenerator = new AtomicLong(); + + /** + * Creates a conversation helper on the specified connection with the default sending destination, and listening + * to the specified receiving destination. + * + * @param connection The connection to build the conversation helper on. + * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary + * queue. + * @param queueClass The queue implementation class. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public ConversationFactory(Connection connection, Destination receiveDestination, + Class queueClass) throws JMSException + { + this.connection = connection; + this.queueClass = queueClass; + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Check if a well-known receive destination has been provided, or use a temporary queue if not. + this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue(); + + consumer = session.createConsumer(receiveDestination); + producer = session.createProducer(null); + + consumer.setMessageListener(new Receiver()); + } + + /** + * Creates a new conversation context. + * + * @return A new conversation context. + */ + public Conversation startConversation() + { + Conversation conversation = new Conversation(); + conversation.conversationId = conversationIdGenerator.getAndIncrement(); + + return conversation; + } + + /** + * Ensures that the reply queue for a conversation exists. + * + * @param conversationId The conversation correlation id. + */ + private void initQueueForId(long conversationId) + { + if (!idsToQueues.containsKey(conversationId)) + { + idsToQueues.put(conversationId, ReflectionUtils.newInstance(queueClass)); + } + } + + /** + * Clears the dead letter box, returning all messages that were in it. + * + * @return All messages in the dead letter box. + */ + public Collection emptyDeadLetterBox() + { + Collection result = new ArrayList(); + deadLetterBox.drainTo(result); + + return result; + } + + /** + * Gets the session over which the conversation is conducted. + * + * @return The session over which the conversation is conducted. + */ + public Session getSession() + { + // Conversation settings = threadLocals.get(); + + return session; + } + + /** + * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply + * destination automatically updated to the last received reply-to destination. + */ + public class Conversation + { + /** Holds the correlation id for the context. */ + long conversationId; + + /** + * Holds the send destination for the context. This will automatically be updated to the most recently received + * reply-to destination. + */ + Destination sendDestination; + + /** + * Sends a message to the default sending location. The correlation id of the message will be assigned by this + * method, overriding any previously set value. + * + * @param sendDestination The destination to send to. This may be null to use the last received reply-to + * destination. + * @param message The message to send. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no + * send destination is specified and there is no most recent reply-to destination available + * to use. + */ + public void send(Destination sendDestination, Message message) throws JMSException + { + // Conversation settings = threadLocals.get(); + // long conversationId = conversationId; + message.setJMSCorrelationID(Long.toString(conversationId)); + message.setJMSReplyTo(receiveDestination); + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + // Check if an overriding send to destination has been set or use the last reply-to if not. + Destination sendTo = null; + + if (sendDestination != null) + { + sendTo = sendDestination; + } + else if (sendDestination != null) + { + sendTo = sendDestination; + } + else + { + throw new JMSException("The send destination was specified, and no most recent reply-to available to use."); + } + + // Send the message. + synchronized (this) + { + producer.send(sendTo, message); + } + } + + /** + * Gets the next message in an ongoing conversation. This method may block until such a message is received. + * + * @return The next incoming message in the conversation. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message + * did not have its reply-to destination set up. + */ + public Message receive() throws JMSException + { + // Conversation settings = threadLocals.get(); + // long conversationId = settings.conversationId; + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + BlockingQueue queue = idsToQueues.get(conversationId); + + try + { + Message result = queue.take(); + + // Keep the reply-to destination to send replies to. + sendDestination = result.getJMSReplyTo(); + + return result; + } + catch (InterruptedException e) + { + return null; + } + } + + /** + * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are + * received they will be returned. If a timeout is specified, then all messages up to the limit, received within + * that timespan will be returned. + * + * @param num The number of messages to receive, or all if this is less than 1. + * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. + * + * @return All messages received within the count limit and the timeout. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public Collection receiveAll(int num, long timeout) throws JMSException + { + Collection result = new ArrayList(); + + for (int i = 0; i < num; i++) + { + result.add(receive()); + } + + return result; + } + + /** + * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any + * incoming messages using them will go to the dead letter box. + */ + public void end() + { + // Ensure that the thread local for the current thread is cleaned up. + // Conversation settings = threadLocals.get(); + // long conversationId = settings.conversationId; + // threadLocals.remove(); + + // Ensure that its queue is removed from the queue map. + BlockingQueue queue = idsToQueues.remove(conversationId); + + // Move any outstanding messages on the threads conversation id into the dead letter box. + queue.drainTo(deadLetterBox); + } + } + + /** + * Implements the message listener for this conversation handler. + */ + protected class Receiver implements MessageListener + { + /** + * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id + * and placed into queues. + * + * @param message The incoming message. + */ + public void onMessage(Message message) + { + try + { + Long conversationId = Long.parseLong(message.getJMSCorrelationID()); + + // Find the converstaion queue to place the message on. If there is no conversation for the message id, + // the the dead letter box queue is used. + BlockingQueue queue = idsToQueues.get(conversationId); + queue = (queue == null) ? deadLetterBox : queue; + + queue.put(message); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java deleted file mode 100644 index 1fd1fee377..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.*; - -/** - * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation - * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant - * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. - * - *

One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a - * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation - * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order - * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded - * conversation (the conversation methods can be called many times in parallel): - * - *

- * ConversationHelper conversation = new ConversationHelper(connection, sendDesitination, replyDestination,
- *                                                          java.util.concurrent.LinkedBlockingQueue.class);
- *
- * initiateConversation()
- * {
- *  try {
- *   // Exchange greetings.
- *   conversation.send(conversation.getSession().createTextMessage("Hello."));
- *   Message greeting = conversation.receive();
- *
- *   // Exchange goodbyes.
- *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- *   Message goodbye = conversation.receive();
- *  } finally {
- *   conversation.end();
- *  }
- * }
- *
- * respondToConversation()
- * {
- *   try {
- *   // Exchange greetings.
- *   Message greeting = conversation.receive();
- *   conversation.send(conversation.getSession().createTextMessage("Hello."));
- *
- *   // Exchange goodbyes.
- *   Message goodbye = conversation.receive();
- *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- *  } finally {
- *   conversation.end();
- *  }
- * }
- *
- * 
- * - *

- *
CRC Card
Responsibilities Collaborations - *
Associate messages to an ongoing conversation using correlation ids. - *
Auto manage sessions for conversations. - *
Store messages not in a conversation in dead letter box. - *
- * - * @todo Non-transactional, can use shared session. Transactional, must have session per-thread. Session pool? In - * transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some - * pending on the queue?). Also, having received on a particular session, must ensure that session is used for all - * subsequent sends and receive at least until the transaction is committed. So a message selector must be used - * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use - * a temporary response queue, with only that session listening to it. - * - * @todo Want something convenient that hides details. Write out some example use cases to get the best feel for - * it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive - * methods. Bind corrId, session etc. on thread locals? Clean on endConvo. Provide deadLetter box, that - * uncorrelated or late messages go in. Provide time-out on wait methods, and global time-out. - * PingPongProducer provides a good use-case example (sends messages, waits for replies). - * - * @todo New correlationId on every send? or correlation id per conversation? or callers choice. - */ -public class ConversationHelper -{ - /** Holds a map from correlation id's to queues. */ - private Map> idsToQueues = new HashMap>(); - - private Session session; - private MessageProducer producer; - private MessageConsumer consumer; - - Class queueClass; - - BlockingQueue deadLetterBox = new LinkedBlockingQueue(); - - ThreadLocal threadLocals = - new ThreadLocal() - { - protected PerThreadSettings initialValue() - { - PerThreadSettings settings = new PerThreadSettings(); - settings.conversationId = conversationIdGenerator.getAndIncrement(); - - return settings; - } - }; - - /** Generates new coversation id's as needed. */ - AtomicLong conversationIdGenerator = new AtomicLong(); - - /** - * Creates a conversation helper on the specified connection with the default sending destination, and listening - * to the specified receiving destination. - * - * @param connection The connection to build the conversation helper on. - * @param sendDestination The default sending destiation for all messages. - * @param receiveDestination The destination to listen to for incoming messages. - * @param queueClass The queue implementation class. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. - */ - public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination, - Class queueClass) throws JMSException - { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(sendDestination); - consumer = session.createConsumer(receiveDestination); - - consumer.setMessageListener(new Receiver()); - - this.queueClass = queueClass; - } - - /** - * Sends a message to the default sending location. The correlation id of the message will be assigned by this - * method, overriding any previously set value. - * - * @param message The message to send. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. - */ - public void send(Message message) throws JMSException - { - PerThreadSettings settings = threadLocals.get(); - long conversationId = settings.conversationId; - message.setJMSCorrelationID(Long.toString(conversationId)); - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - producer.send(message); - } - - /** - * Ensures that the reply queue for a conversation exists. - * - * @param conversationId The conversation correlation id. - */ - private void initQueueForId(long conversationId) - { - if (!idsToQueues.containsKey(conversationId)) - { - idsToQueues.put(conversationId, ReflectionUtils.newInstance(queueClass)); - } - } - - /** - * Gets the next message in an ongoing conversation. This method may block until such a message is received. - * - * @return The next incoming message in the conversation. - */ - public Message receive() - { - PerThreadSettings settings = threadLocals.get(); - long conversationId = settings.conversationId; - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - BlockingQueue queue = idsToQueues.get(conversationId); - - try - { - return queue.take(); - } - catch (InterruptedException e) - { - return null; - } - } - - /** - * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are - * received they will be returned. If a timeout is specified, then all messages up to the limit, received within - * that timespan will be returned. - * - * @param num The number of messages to receive, or all if this is less than 1. - * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. - * - * @return All messages received within the count limit and the timeout. - */ - public Collection receiveAll(int num, long timeout) - { - Collection result = new ArrayList(); - - for (int i = 0; i < num; i++) - { - result.add(receive()); - } - - return result; - } - - /** - * Completes the conversation. Any open transactions are committed. Any correlation id's pertaining to the - * conversation are no longer valid, and any incoming messages using them will go to the dead letter box. - */ - public void end() - { - // Ensure that the thread local for the current thread is cleaned up. - PerThreadSettings settings = threadLocals.get(); - long conversationId = settings.conversationId; - threadLocals.remove(); - - // Ensure that its queue is removed from the queue map. - BlockingQueue queue = idsToQueues.remove(conversationId); - - // Move any outstanding messages on the threads conversation id into the dead letter box. - queue.drainTo(deadLetterBox); - } - - /** - * Clears the dead letter box, returning all messages that were in it. - * - * @return All messages in the dead letter box. - */ - public Collection emptyDeadLetterBox() - { - Collection result = new ArrayList(); - deadLetterBox.drainTo(result); - - return result; - } - - /** - * Implements the message listener for this conversation handler. - */ - protected class Receiver implements MessageListener - { - /** - * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id - * and placed into queues. - * - * @param message The incoming message. - */ - public void onMessage(Message message) - { - try - { - Long conversationId = Long.parseLong(message.getJMSCorrelationID()); - - // Find the converstaion queue to place the message on. If there is no conversation for the message id, - // the the dead letter box queue is used. - BlockingQueue queue = idsToQueues.get(conversationId); - queue = (queue == null) ? deadLetterBox : queue; - - queue.put(message); - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - protected class PerThreadSettings - { - /** Holds the correlation id for the current threads conversation. */ - long conversationId; - } - - public Session getSession() - { - return session; - } -} -- cgit v1.2.1