From 4e1da8706dc8302caa7f97e29ac7cc33a60b5bb3 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 25 Jan 2007 12:46:22 +0000 Subject: (Submitted by Rupert Smith) Class has been documented. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499764 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/ping/AbstractPingClient.java | 10 +- .../org/apache/qpid/ping/AbstractPingProducer.java | 256 +++++++++++++++------ 2 files changed, 191 insertions(+), 75 deletions(-) (limited to 'qpid/java/perftests/src') diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java index c1dd5b18ad..97b411323e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java @@ -45,8 +45,8 @@ import org.apache.qpid.jms.Session; * Keep track of p2p or topic ping type. * * - * @todo This base class does not seem particularly usefull and some methods are duplicated in {@link AbstractPingProducer}, - * consider merging it into that class. + * @todo This base class does not seem particularly usefull and all functionality is duplicated in {@link AbstractPingProducer}. + * Merge it into that class. */ public abstract class AbstractPingClient { @@ -175,11 +175,11 @@ public abstract class AbstractPingClient /** * Sets the connection that this ping client is using. * - * @param _connection The ping connection. + * @param connection The ping connection. */ - public void setConnection(AMQConnection _connection) + public void setConnection(AMQConnection connection) { - this._connection = _connection; + this._connection = connection; } /** diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java index debaa0d785..091a865473 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.ping; import java.io.IOException; @@ -13,6 +33,7 @@ import javax.jms.Message; import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; @@ -21,84 +42,105 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; /** - * This abstract class captures functionality that is common to all ping producers. It provides functionality to - * manage a session, and a convenience method to commit a transaction on the session. It also provides a framework - * for running a ping loop, and terminating that loop on exceptions or a shutdown handler. - *

+ * Provides common functionality that ping producers (the senders of ping messages) can use. This base class keeps + * track of the connection used to send pings; provides a convenience method to commit a transaction only when a session + * to commit on is transactional; keeps track of whether the ping client is pinging to a queue or a topic; provides + * prompts to the console to terminate brokers before and after commits, in order to test failover functionality; + * requires sub-classes to implement a ping loop, that this provides a run loop to repeatedly call; provides a + * default shutdown hook to cleanly terminate the run loop; keeps track of the destinations to send pings to; + * provides a convenience method to generate short pauses; and provides a convience formatter for outputing readable + * timestamps for pings. + * *

*
CRC Card
Responsibilities Collaborations - *
Manage the connection. - *
Provide clean shutdown on exception or shutdown hook. - *
Provide useable shutdown hook implementation. - *
Run a ping loop. + *
Commit the current transcation on a session. + *
Generate failover promts. + *
Keep track the connection. + *
Keep track of p2p or topic ping type. + *
Call ping loop to repeatedly send pings. + *
Provide a shutdown hook. + *
Generate short pauses. *
* - * @author Rupert Smith + * @todo Destination count versus list of desintations is redundant. Use _destinions.size() to get the count and + * use a list of 1 destination when only 1 is needed. It is only important to distinguish when 1 destination + * is shared between multiple ping producers on the same JVM or if each ping producer has its own single + * destination. + * + * @todo Timestamp messages in nanos, not millis. Millis seems to have bad resolution, at least on windows. */ public abstract class AbstractPingProducer implements Runnable, ExceptionListener { private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class); - /** tells if the test is being done for pubsub or p2p */ + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ private boolean _isPubSub = false; - /** - * Used to format time stamping output. - */ - protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - /** This id generator is used to generate ids to append to the queue name to ensure that queues are unique. */ - private static AtomicInteger _queueSequenceID = new AtomicInteger(); + /** A convenient formatter to use when time stamping output. */ + protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); /** - * Used to tell the ping loop when to terminate, it only runs while this is true. + * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when + * creating multiple ping producers in the same JVM. */ + private static AtomicInteger _queueSequenceID = new AtomicInteger(); + + /** Used to tell the ping loop when to terminate, it only runs while this is true. */ protected boolean _publish = true; - /** - * Holds the connection handle to the broker. - */ + /** Holds the connection to the broker. */ private Connection _connection; - /** - * Holds the producer session, need to create test messages. - */ + /** Holds the producer session, needed to create ping messages. */ private Session _producerSession; - /** - * Holds the number of destinations for multiple-destination test. By default it will be 1 - */ + /** Holds the number of destinations that this ping producer will send pings to, defaulting to a single destination. */ protected int _destinationCount = 1; - /** list of all the destinations for multiple-destinations test */ + /** Holds the set of destiniations that this ping producer pings. */ private List _destinations = new ArrayList(); - /** - * Holds the message producer to send the pings through. - */ + /** Holds the message producer to send the pings through. */ protected org.apache.qpid.jms.MessageProducer _producer; + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ protected boolean _failBeforeCommit = false; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ protected boolean _failAfterCommit = false; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ protected boolean _failBeforeSend = false; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ protected boolean _failAfterSend = false; + + /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ protected boolean _failOnce = true; /** Holds the number of sends that should be performed in every transaction when using transactions. */ protected int _txBatchSize = 1; /** - * Sets the test for pubsub or p2p. - * @param value + * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. + * + * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. */ - public void setPubSub(boolean value) + public void setPubSub(boolean pubsub) { - _isPubSub = value; + _isPubSub = pubsub; } + /** + * Checks whether this client is a p2p or pub/sub ping client. + * + * @return true if this client is pinging a topic, false if it is pinging a queue. + */ public boolean isPubSub() { return _isPubSub; } + /** * Convenience method for a short pause. * @@ -124,10 +166,11 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene public abstract void pingLoop(); /** - * Generates a test message of the specified size. + * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. * * @param replyQueue The reply-to destination for the message. * @param messageSize The desired size of the message in bytes. + * @param persistent true if the message should use persistent delivery, false otherwise. * * @return A freshly generated test message. * @@ -138,6 +181,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); // Timestamp the message. msg.setLongProperty("timestamp", System.currentTimeMillis()); + return msg; } @@ -191,85 +235,119 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene }); } + /** + * Gets the underlying connection that this ping client is running on. + * + * @return The underlying connection that this ping client is running on. + */ public Connection getConnection() { return _connection; } + /** + * Sets the connection that this ping client is using. + * + * @param connection The ping connection. + */ public void setConnection(Connection connection) { this._connection = connection; } + /** + * Gets the producer session that the ping client is using to send pings on. + * + * @return The producer session that the ping client is using to send pings on. + */ public Session getProducerSession() { return _producerSession; } + /** + * Keeps track of the producer session that the ping client is using to send pings on. + * + * @param session The producer session that the ping client is using to send pings on. + */ public void setProducerSession(Session session) { this._producerSession = session; } + /** + * Gets the number of destinations that this ping client is sending to. + * + * @return The number of destinations that this ping client is sending to. + */ public int getDestinationsCount() { return _destinationCount; } + /** + * Sets the number of destination that this ping client should send to. + * + * @param count The number of destination that this ping client should send to. + * + * @deprectaed Use _destinations.size() instead. + */ public void setDestinationsCount(int count) { this._destinationCount = count; } + /** + * Commits the transaction on the producer session. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * + * @deprecated Use the commitTx(Session session) method instead, to explicitly specify which session is being + * committed. This makes it more obvious what is going on. + */ protected void commitTx() throws JMSException { commitTx(getProducerSession()); } /** - * Creates destinations dynamically and adds to the destinations list for multiple-destinations test - * @param count + * Creates the specified number of destinations to send pings to. Topics or Queues will be created depending on + * the value of the {@link #_isPubSub} flag. + * + * @param count The number of ping destinations to create. */ protected void createDestinations(int count) { - if (isPubSub()) - { - createTopics(count); - } - else - { - createQueues(count); - } - } - - private void createQueues(int count) - { + // Create the desired number of ping destinations. for (int i = 0; i < count; i++) { - AMQShortString name = - new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); - AMQQueue queue = new AMQQueue(name, name, false, false, false); - - _destinations.add(queue); - } - } + AMQDestination destination = null; - private void createTopics(int count) - { - for (int i = 0; i < count; i++) - { - AMQShortString name = - new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); - AMQTopic topic = new AMQTopic(name); + // Check if this is a pub/sub pinger, in which case create topics. + if (isPubSub()) + { + AMQShortString name = + new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); + destination = new AMQTopic(name); + } + // Otherwise this is a p2p pinger, in which case create queues. + else + { + AMQShortString name = + new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); + destination = new AMQQueue(name, name, false, false, false); + } - _destinations.add(topic); + _destinations.add(destination); } } /** - * Returns the destination from the destinations list with given index. This is for multiple-destinations test - * @param index - * @return Destination with given index + * Returns the destination from the destinations list with the given index. + * + * @param index The index of the destination to get. + * + * @return Destination with the given index. */ protected Destination getDestination(int index) { @@ -277,8 +355,21 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene } /** - * Convenience method to commit the transaction on the session associated with this pinger. + * Convenience method to commit the transaction on the specified session. If the session to commit on is not + * a transactional session, this method does nothing (unless the failover after send flag is set). + * + *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit + * is applied. This flag applies whether the pinger is transactional or not. + * + *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the + * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker + * after the commit is applied. These flags will only apply if using a transactional pinger. + * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * + * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional + * and non-transactional alike. */ protected void commitTx(Session session) throws JMSException { @@ -351,11 +442,27 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene } } + /** + * Sends the specified message to the default destination of the ping producer. + * + * @param message The message to send. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ protected void sendMessage(Message message) throws JMSException { sendMessage(null, message); } + /** + * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination + * of the ping producer. If an explicit destination is set, this overrides the default. + * + * @param destination The destination to send to. + * @param message The message to send. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ protected void sendMessage(Destination destination, Message message) throws JMSException { if (_failBeforeSend) @@ -376,9 +483,15 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene else { _producer.send(destination, message); - } + } } + /** + * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + * + * @param broker The name of the broker to terminate. + */ protected void doFailover(String broker) { System.out.println("Kill Broker " + broker + " now then press return"); @@ -392,6 +505,10 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene System.out.println("Continuing."); } + /** + * Prompts the user to terminate the broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + */ protected void doFailover() { System.out.println("Kill Broker now then press return"); @@ -403,6 +520,5 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene { } System.out.println("Continuing."); - } } -- cgit v1.2.1