From 0370e5550e1d9bc72d742bbbee1f6f0e2835406e Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 5 Apr 2007 13:36:04 +0000 Subject: Merged revisions 525531-525536 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r525531 | rgreig | 2007-04-04 16:18:44 +0100 (Wed, 04 Apr 2007) | 1 line Added standard command line handline ........ r525533 | rgreig | 2007-04-04 16:19:38 +0100 (Wed, 04 Apr 2007) | 1 line Added simeple file copy function. ........ r525535 | rgreig | 2007-04-04 16:20:30 +0100 (Wed, 04 Apr 2007) | 1 line Added comments and logging to track down bug. ........ r525536 | rgreig | 2007-04-04 16:21:43 +0100 (Wed, 04 Apr 2007) | 1 line Fixed dangling transaction problem by correctly binding queue. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@525825 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/store/MessageStore.java | 190 +++++++++++++++++++-- .../org/apache/qpid/server/store/StoreContext.java | 16 +- .../org/apache/qpid/util/CommandLineParser.java | 42 ++++- .../main/java/org/apache/qpid/util/FileUtils.java | 36 ++++ .../org/apache/qpid/ping/PingDurableClient.java | 5 +- .../apache/qpid/requestreply/PingPongProducer.java | 92 +++++----- 6 files changed, 308 insertions(+), 73 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 21988d97a8..2a83d9b649 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -21,73 +21,241 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; +/** + * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues + * and exchanges in a transactional manner. + * + *

All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which + * encapsulates the transactional context they are performed in. Many such operations can be carried out in a single + * transaction. + * + *

The storage and removal of queues and exchanges, are not carried out in a transactional context. + * + *

+ *
CRC Card
Responsibilities + *
Accept transaction boundary demarcations: Begin, Commit, Abort. + *
Store and remove queues. + *
Store and remove exchanges. + *
Store and remove messages. + *
Bind and unbind queues to exchanges. + *
Enqueue and dequeue messages to queues. + *
Generate message identifiers. + *
+ */ public interface MessageStore { /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. - * @param virtualHost the virtual host using by this store - * @param base the base element identifier from which all configuration items are relative. For example, if the base - * element is "store", the all elements used by concrete classes will be "store.foo" etc. - * @param config the apache commons configuration object - * @throws Exception if an error occurs that means the store is unable to configure itself + * + * @param virtualHost The virtual host using by this store + * @param base The base element identifier from which all configuration items are relative. For example, if + * the base element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object. + * + * @throws Exception If any error occurs that means the store is unable to configure itself. */ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. - * @throws Exception if close fails + * + * @throws Exception If the close fails. */ void close() throws Exception; + /** + * Removes the specified message from the store in the given transactional store context. + * + * @param storeContext The transactional context to remove the message in. + * @param messageId Identifies the message to remove. + * + * @throws AMQException If the operation fails for any reason. + */ void removeMessage(StoreContext storeContext, Long messageId) throws AMQException; + /** + * Makes the specified exchange persistent. + * + * @param exchange The exchange to persist. + * + * @throws AMQException If the operation fails for any reason. + */ void createExchange(Exchange exchange) throws AMQException; + /** + * Removes the specified persistent exchange. + * + * @param exchange The exchange to remove. + * + * @throws AMQException If the operation fails for any reason. + */ void removeExchange(Exchange exchange) throws AMQException; + /** + * Binds the specified queue to an exchange with a routing key. + * + * @param exchange The exchange to bind to. + * @param routingKey The routing key to bind by. + * @param queue The queue to bind. + * @param args Additional parameters. + * + * @throws AMQException If the operation fails for any reason. + */ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + /** + * Unbinds the specified from an exchange under a particular routing key. + * + * @param exchange The exchange to unbind from. + * @param routingKey The routing key to unbind. + * @param queue The queue to unbind. + * @param args Additonal parameters. + * + * @throws AMQException If the operation fails for any reason. + */ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - + /** + * Makes the specified queue persistent. + * + * @param queue The queue to store. + * + * @throws AMQException If the operation fails for any reason. + */ void createQueue(AMQQueue queue) throws AMQException; + /** + * Removes the specified queue from the persistent store. + * + * @param name The queue to remove. + * + * @throws AMQException If the operation fails for any reason. + */ void removeQueue(AMQShortString name) throws AMQException; + /** + * Places a message onto a specified queue, in a given transactional context. + * + * @param context The transactional context for the operation. + * @param name The name of the queue to place the message on. + * @param messageId The message to enqueue. + * + * @throws AMQException If the operation fails for any reason. + */ void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; + /** + * Extracts a message from a specified queue, in a given transactional context. + * + * @param context The transactional context for the operation. + * @param name The name of the queue to take the message from. + * @param messageId The message to dequeue. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; + /** + * Begins a transactional context. + * + * @param context The transactional context to begin. + * + * @throws AMQException If the operation fails for any reason. + */ void beginTran(StoreContext context) throws AMQException; + /** + * Commits all operations performed within a given transactional context. + * + * @param context The transactional context to commit all operations for. + * + * @throws AMQException If the operation fails for any reason. + */ void commitTran(StoreContext context) throws AMQException; + /** + * Abandons all operations performed within a given transactional context. + * + * @param context The transactional context to abandon. + * + * @throws AMQException If the operation fails for any reason. + */ void abortTran(StoreContext context) throws AMQException; + /** + * Tests a transactional context to see if it has been begun but not yet committed or aborted. + * + * @param context The transactional context to test. + * + * @return true if the transactional context is live, false otherwise. + */ boolean inTran(StoreContext context); /** * Return a valid, currently unused message id. - * @return a message id + * + * @return A fresh message id. */ Long getNewMessageId(); - void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException; + /** + * Stores a chunk of message data. + * + * @param context The transactional context for the operation. + * @param messageId The message to store the data for. + * @param index The index of the data chunk. + * @param contentBody The content of the data chunk. + * @param lastContentBody Flag to indicate that this is the last such chunk for the message. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ + void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, + boolean lastContentBody) throws AMQException; + /** + * Stores message meta-data. + * + * @param context The transactional context for the operation. + * @param messageId The message to store the data for. + * @param messageMetaData The message meta data to store. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; + /** + * Retrieves message meta-data. + * + * @param context The transactional context for the operation. + * @param messageId The message to get the meta-data for. + * + * @return The message meta data. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; + /** + * Retrieves a chunk of message data. + * + * @param context The transactional context for the operation. + * @param messageId The message to get the data chunk for. + * @param index The offset index of the data chunk within the message. + * + * @return A chunk of message data. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index 2e2f2ba7d6..3ee49d58cf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -22,16 +22,14 @@ package org.apache.qpid.server.store; import org.apache.log4j.Logger; - /** * A context that the store can use to associate with a transactional context. For example, it could store * some kind of txn id. - * + * * @author Apache Software Foundation */ public class StoreContext { - private static final Logger _logger = Logger.getLogger(StoreContext.class); private String _name; @@ -54,7 +52,17 @@ public class StoreContext public void setPayload(Object payload) { - _logger.debug("["+_name+"] Setting payload: " + payload); + _logger.debug("public void setPayload(Object payload = " + payload + "): called"); _payload = payload; } + + /** + * Prints out the transactional context as a string, mainly for debugging purposes. + * + * @return The transactional context as a string. + */ + public String toString() + { + return "<_name = " + _name + ", _payload = " + _payload + ">"; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java index 6173780aa7..9051d6b470 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java @@ -143,8 +143,8 @@ public class CommandLineParser String[] nextOptionSpec = config[i]; addOption(nextOptionSpec[0], nextOptionSpec[1], (nextOptionSpec.length > 2) ? nextOptionSpec[2] : null, - (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false, - (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null); + (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false, + (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null); } } @@ -209,8 +209,9 @@ public class CommandLineParser // Print usage on each of the command line options. for (CommandLineOption optionInfo : optionMap.values()) { - result += optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "") - + optionInfo.comment + "\n"; + result += + optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "") + + optionInfo.comment + "\n"; } return result; @@ -603,6 +604,37 @@ public class CommandLineParser } } + /** + * Extracts all name=value pairs from the command line, sets them all as system properties and also returns + * a map of properties containing them. + * + * @param args The command line. + * + * @return A set of properties containing all name=value pairs from the command line. + */ + public static Properties processCommandLine(String[] args, CommandLineParser commandLine) + { + // Capture the command line arguments or display errors and correct usage and then exit. + Properties options = null; + + try + { + options = commandLine.parseCommandLine(args); + + // Add all the trailing command line options (name=value pairs) to system properties. They may be picked up + // from there. + commandLine.addCommandLineToSysProperties(); + } + catch (IllegalArgumentException e) + { + System.out.println(commandLine.getErrors()); + System.out.println(commandLine.getUsage()); + System.exit(1); + } + + return options; + } + /** * Holds information about a command line options. This includes what its name is, whether or not it is a flag, * whether or not it is mandatory, what its user comment is, what its argument reminder text is and what its @@ -646,7 +678,7 @@ public class CommandLineParser * @param formatRegexp The regular expression that the argument to this option must meet to be valid. */ public CommandLineOption(String option, boolean expectsArgs, String comment, String argument, boolean mandatory, - String formatRegexp) + String formatRegexp) { this.option = option; this.expectsArgs = expectsArgs; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index ba79a6e8d4..3c8d3f916b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -158,4 +158,40 @@ public class FileUtils return is; } + + /** + * Copies the specified source file to the specified destintaion file. If the destinationst file does not exist, + * it is created. + * + * @param src The source file name. + * @param dst The destination file name. + */ + public static void copy(File src, File dst) + { + try + { + InputStream in = new FileInputStream(src); + if (!dst.exists()) + { + dst.createNewFile(); + } + + OutputStream out = new FileOutputStream(dst); + + // Transfer bytes from in to out + byte[] buf = new byte[1024]; + int len; + while ((len = in.read(buf)) > 0) + { + out.write(buf, 0, len); + } + + in.close(); + out.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index 77526141d6..9439604acd 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -35,6 +35,7 @@ import javax.jms.Message; import org.apache.log4j.Logger; import org.apache.qpid.requestreply.PingPongProducer; +import org.apache.qpid.util.CommandLineParser; import uk.co.thebadgerset.junit.extensions.util.MathUtils; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; @@ -71,6 +72,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * uniqueDests false Prevents destination names being timestamped. * transacted true Only makes sense to test with transactions. * persistent true Only makes sense to test persistent. + * durableDests true Should use durable queues with persistent messages. * commitBatchSize 10 * rate 20 Total default test time is 5 seconds. * @@ -108,6 +110,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); defaults.setProperty(RATE_PROPNAME, "20"); + defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); } /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ @@ -150,7 +153,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList try { // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = processCommandLine(args); + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); PingDurableClient pingProducer = new PingDurableClient(options); // Create a shutdown hook to terminate the ping-pong producer. diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 44f7083bb5..913685bca2 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -35,13 +35,10 @@ import javax.jms.*; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.*; import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; import org.apache.qpid.url.URLSyntaxException; @@ -90,6 +87,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * timeout 30000 In milliseconds. The timeout to stop waiting for replies. * commitBatchSize 1 The number of messages per transaction in transactional mode. * uniqueDests true Whether each receiver only listens to one ping destination or all. + * durableDests false Whether or not durable destinations are used. * ackMode AUTO_ACK The message acknowledgement mode. Possible values are: * 0 - SESSION_TRANSACTED * 1 - AUTO_ACKNOWLEDGE @@ -257,6 +255,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Defines the default value for the unique destinations property. */ public static final boolean UNIQUE_DESTS_DEFAULT = true; + public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + public static final boolean DURABLE_DESTS_DEFAULT = false; + /** Holds the name of the proeprty to get the message acknowledgement mode from. */ public static final String ACK_MODE_PROPNAME = "ackMode"; @@ -299,6 +300,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); @@ -337,6 +339,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Flag used to indicate if the destinations should be unique client. */ protected boolean _isUnique; + /** Flag used to indicate that durable destination should be used. */ + protected boolean _isDurable; + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ protected boolean _failBeforeCommit; @@ -424,6 +429,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** The prompt to display when asking the user to kill the broker for failover testing. */ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; + private String _clientID; /** * Creates a ping producer with the specified parameters, of which there are many. See the class level comments @@ -463,6 +469,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _rate = properties.getPropertyAsInteger(RATE_PROPNAME); _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); + _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME); @@ -498,10 +505,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Generate a unique identifying name for this client, based on it ip address and the current time. InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); + _clientID = address.getHostName() + System.currentTimeMillis(); // Create a connection to the broker. - createConnection(clientID); + createConnection(_clientID); // Create transactional or non-transactional sessions, based on the command line arguments. _producerSession = (Session) getConnection().createSession(_transacted, _ackMode); @@ -509,7 +516,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create the destinations to send pings to and receive replies from. _replyDestination = _consumerSession.createTemporaryQueue(); - createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique); + createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); // Create the message producer only if instructed to. if (producer) @@ -548,7 +555,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { try { - Properties options = processCommandLine(args); + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); // Create a ping producer overriding its defaults with all options passed on the command line. PingPongProducer pingProducer = new PingPongProducer(options); @@ -576,43 +583,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } } - /** - * Extracts all name=value pairs from the command line, sets them all as system properties and also returns - * a map of properties containing them. - * - * @param args The command line. - * - * @return A set of properties containing all name=value pairs from the command line. - * - * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the - * CommandLineParser class. - */ - protected static Properties processCommandLine(String[] args) - { - // Use the command line parser to evaluate the command line. - CommandLineParser commandLine = new CommandLineParser(new String[][] {}); - - // Capture the command line arguments or display errors and correct usage and then exit. - Properties options = null; - - try - { - options = commandLine.parseCommandLine(args); - - // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up - // overridden values from there. - commandLine.addCommandLineToSysProperties(); - } - catch (IllegalArgumentException e) - { - System.out.println(commandLine.getErrors()); - System.out.println(commandLine.getUsage()); - System.exit(1); - } - - return options; - } - /** * Convenience method for a short pause. * @@ -677,11 +647,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @throws JMSException Any JMSExceptions are allowed to fall through. */ - public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique) - throws JMSException + public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, + boolean durable) throws JMSException, AMQException { log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called"); + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " + + durable + "): called"); _pingDestinations = new ArrayList(); @@ -709,13 +680,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Check if this is a pub/sub pinger, in which case create topics. if (_isPubSub) { - destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); - log.debug("Created topic " + destination); + if (!durable) + { + destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); + log.debug("Created non-durable topic " + destination); + } + else + { + destination = + AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), + _clientID, (AMQConnection) _connection); + log.debug("Created durable topic " + destination); + } } // Otherwise this is a p2p pinger, in which case create queues. else { - destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id); + AMQShortString destinationName = new AMQShortString(rootName + id); + destination = + new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false, + _isDurable); + ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false); + ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, + ExchangeDefaults.DIRECT_EXCHANGE_NAME); + log.debug("Created queue " + destination); } -- cgit v1.2.1