diff options
| author | Keith Wall <kwall@apache.org> | 2012-07-05 14:56:45 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-07-05 14:56:45 +0000 |
| commit | c47834db296c0fda1bc97c662412a6310d788a5a (patch) | |
| tree | 7dddf13443321d755e44fa7e2e4f6dd706ec51e0 /qpid/java/perftests/src | |
| parent | 8b555d057f483874d9384c15bd989c975d18e0b0 (diff) | |
| download | qpid-python-c47834db296c0fda1bc97c662412a6310d788a5a.tar.gz | |
QPID-4110 added topic support to performance tests.
Specifically:
- Added support for creating and tearing down durable subscriptions
- Improved IterationValueTest so that we test the format for boolean properties (such as durableSubscription)
- Added test and chart definitions
Applied patch from Philip Harvey <phil@philharveyonline.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1357650 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src')
6 files changed, 119 insertions, 36 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java index 2478b49cfd..0d5457c992 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java @@ -176,7 +176,7 @@ public class Client { LOGGER.debug("Tearing down test on client: " + _clientJmsDelegate.getClientName()); - _clientJmsDelegate.closeTestConnections(); + _clientJmsDelegate.tearDownTest(); } else { diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java index cffc2b7c50..45a4551cbc 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java @@ -38,7 +38,6 @@ public class QueueConfig public QueueConfig(String name, boolean durable, Map<String, Object> attributes) { - super(); this._name = name; this._durable = durable; this._attributes = attributes; @@ -49,8 +48,6 @@ public class QueueConfig return _name; } - // TODO x-qpid-capacity and x-qpid-flow-resume-capacity need to be typed as numeric but we currrently - // pass these as a string. public Map<String, Object> getAttributes() { return _attributes; diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java index f1dd911f0b..15c4f5f70f 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java @@ -35,6 +35,7 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.naming.Context; import javax.naming.NamingException; @@ -72,6 +73,7 @@ public class ClientJmsDelegate private Map<String, Session> _testSessions; private Map<String, MessageProducer> _testProducers; private Map<String, MessageConsumer> _testConsumers; + private Map<String, Session> _testSubscriptions; private Map<String, MessageProvider> _testMessageProviders; private final MessageProvider _defaultMessageProvider; @@ -92,6 +94,7 @@ public class ClientJmsDelegate _testSessions = new HashMap<String, Session>(); _testProducers = new HashMap<String, MessageProducer>(); _testConsumers = new HashMap<String, MessageConsumer>(); + _testSubscriptions = new HashMap<String, Session>(); _testMessageProviders = new HashMap<String, MessageProvider>(); _defaultMessageProvider = new MessageProvider(null); } @@ -255,9 +258,31 @@ public class ClientJmsDelegate synchronized(session) { - final Destination destination = command.isTopic() ? session.createTopic(command.getDestinationName()) - : session.createQueue(command.getDestinationName()); - final MessageConsumer jmsConsumer = session.createConsumer(destination, command.getSelector()); + Destination destination; + MessageConsumer jmsConsumer; + if(command.isTopic()) + { + Topic topic = session.createTopic(command.getDestinationName()); + if(command.isDurableSubscription()) + { + String subscription = "subscription-" + command.getParticipantName() + System.currentTimeMillis(); + jmsConsumer = session.createDurableSubscriber(topic, subscription); + + _testSubscriptions.put(subscription, session); + LOGGER.debug("created durable suscription " + subscription + " to topic " + topic); + } + else + { + jmsConsumer = session.createConsumer(topic, command.getSelector()); + } + + destination = topic; + } + else + { + destination = session.createQueue(command.getDestinationName()); + jmsConsumer = session.createConsumer(destination, command.getSelector()); + } _testConsumers.put(command.getParticipantName(), jmsConsumer); } @@ -521,36 +546,59 @@ public class ClientJmsDelegate return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("clientName", _clientName).toString(); } - public void closeTestConnections() + public void tearDownTest() { StringBuilder jmsErrorMessages = new StringBuilder(); - int failedCloseCounter = 0; - for (final Map.Entry<String, Connection> entry : _testConnections.entrySet()) + int failureCounter = 0; + + for(String subscription : _testSubscriptions.keySet()) { - final Connection connection = entry.getValue(); + Session session = _testSubscriptions.get(subscription); + try + { + session.unsubscribe(subscription); + } + catch (JMSException e) + { + LOGGER.error("Failed to unsubscribe '" + subscription + "' :" + e.getLocalizedMessage(), e); + failureCounter++; + appendErrorMessage(jmsErrorMessages, e); + } + } + + for (Map.Entry<String, Connection> entry : _testConnections.entrySet()) + { + Connection connection = entry.getValue(); try { connection.close(); } - catch (final JMSException e) + catch (JMSException e) { LOGGER.error("Failed to close connection '" + entry.getKey() + "' :" + e.getLocalizedMessage(), e); - failedCloseCounter++; - if (jmsErrorMessages.length() > 0) - { - jmsErrorMessages.append('\n'); - } - jmsErrorMessages.append(e.getMessage()); + failureCounter++; + appendErrorMessage(jmsErrorMessages, e); } } + _testConnections.clear(); _testSessions.clear(); _testProducers.clear(); _testConsumers.clear(); - if (failedCloseCounter > 0) + + if (failureCounter > 0) + { + throw new DistributedTestException("Tear down test encountered " + failureCounter + " failures with the following errors: " + jmsErrorMessages.toString()); + } + } + + private void appendErrorMessage(StringBuilder errorMessages, JMSException e) + { + if (errorMessages.length() > 0) { - throw new DistributedTestException("Close failed for " + failedCloseCounter + " connection(s) with the following errors: " + jmsErrorMessages.toString()); + errorMessages.append('\n'); } + errorMessages.append(e.getMessage()); } public void closeTestConsumer(String consumerName) diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java index dde717c71b..4dcabe6c7b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java @@ -25,6 +25,7 @@ import java.util.TreeSet; import org.apache.qpid.disttest.message.ConsumerParticipantResult; import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.ProducerParticipantResult; public class ParticipantResultAggregator { @@ -43,6 +44,9 @@ public class ParticipantResultAggregator private NavigableSet<Integer> _encounteredIterationNumbers = new TreeSet<Integer>(); private NavigableSet<Integer> _encounteredBatchSizes = new TreeSet<Integer>(); private NavigableSet<Integer> _encounteredAcknowledgeMode = new TreeSet<Integer>(); + private NavigableSet<Integer> _encounteredDeliveryModes = new TreeSet<Integer>(); + private NavigableSet<Boolean> _encounteredDurableSubscriptions = new TreeSet<Boolean>(); + private NavigableSet<Boolean> _encounteredTopics = new TreeSet<Boolean>(); private NavigableSet<String> _encountedTestNames = new TreeSet<String>(); private SeriesStatistics _latencyStatistics = new SeriesStatistics(); @@ -116,6 +120,17 @@ public class ParticipantResultAggregator _encounteredIterationNumbers.add(result.getIterationNumber()); _encounteredBatchSizes.add(result.getBatchSize()); _encounteredAcknowledgeMode.add(result.getAcknowledgeMode()); + if (result instanceof ProducerParticipantResult) + { + ProducerParticipantResult producerParticipantResult = (ProducerParticipantResult) result; + _encounteredDeliveryModes.add(producerParticipantResult.getDeliveryMode()); + } + else if(result instanceof ConsumerParticipantResult) + { + ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult)result; + _encounteredDurableSubscriptions.add(consumerParticipantResult.isDurableSubscription()); + _encounteredTopics.add(consumerParticipantResult.isTopic()); + } } private void setComputedVariableAttributes(ParticipantResult aggregatedResult) @@ -151,6 +166,26 @@ public class ParticipantResultAggregator { aggregatedResult.setAcknowledgeMode(_encounteredAcknowledgeMode.first()); } + if (aggregatedResult instanceof ProducerParticipantResult) + { + ProducerParticipantResult producerParticipantResult = (ProducerParticipantResult) aggregatedResult; + if(_encounteredDeliveryModes.size() == 1) + { + producerParticipantResult.setDeliveryMode(_encounteredDeliveryModes.first()); + } + } + if (aggregatedResult instanceof ConsumerParticipantResult) + { + ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult) aggregatedResult; + if(_encounteredDurableSubscriptions.size() == 1) + { + consumerParticipantResult.setDurableSubscription(_encounteredDurableSubscriptions.first()); + } + if(_encounteredTopics.size() == 1) + { + consumerParticipantResult.setTopic(_encounteredTopics.first()); + } + } } private double calculateThroughputInKiloBytesPerSecond() diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java index 198baa6ef4..dd50766918 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java @@ -125,7 +125,7 @@ public class ClientTest extends TestCase _client.tearDownTest(); - verify(_delegate).closeTestConnections(); + verify(_delegate).tearDownTest(); verify(_participantRegistry).clear(); } diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java index 7998eae37e..860f6af565 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java @@ -22,19 +22,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; -import junit.framework.TestCase; - +import org.apache.qpid.disttest.message.CreateConnectionCommand; import org.apache.qpid.disttest.message.CreateConsumerCommand; -import org.apache.qpid.disttest.message.CreateProducerCommand; +import org.apache.qpid.test.utils.QpidTestCase; -public class IterationValueTest extends TestCase +public class IterationValueTest extends QpidTestCase { - private static final int MESSAGE_SIZE = 10; + private static final int MAXIMUM_DURATION = 10; + + private static final boolean IS_DURABLE_SUBSCRIPTION = true; - private CreateProducerCommand _createProducerCommand; private CreateConsumerCommand _createConsumerCommand; private Map<String, String> _iterationValueMap; @@ -42,37 +42,40 @@ public class IterationValueTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _createProducerCommand = mock(CreateProducerCommand.class); _createConsumerCommand = mock(CreateConsumerCommand.class); - _iterationValueMap = Collections.singletonMap("_messageSize", String.valueOf(MESSAGE_SIZE)); + _iterationValueMap = new HashMap<String, String>(); + _iterationValueMap.put("_maximumDuration", String.valueOf(MAXIMUM_DURATION)); + _iterationValueMap.put("_durableSubscription", String.valueOf(IS_DURABLE_SUBSCRIPTION)); } public void testApplyPopulatedIterationValueToCommandWithMatchingProperties() throws Exception { IterationValue iterationValue = new IterationValue(_iterationValueMap); - iterationValue.applyToCommand(_createProducerCommand); + iterationValue.applyToCommand(_createConsumerCommand); - verify(_createProducerCommand).setMessageSize(MESSAGE_SIZE); + verify(_createConsumerCommand).setMaximumDuration(MAXIMUM_DURATION); + verify(_createConsumerCommand).setDurableSubscription(IS_DURABLE_SUBSCRIPTION); } public void testApplyPopulatedIterationValueToCommandWithoutMatchingProperties() throws Exception { IterationValue iterationValue = new IterationValue(_iterationValueMap); - iterationValue.applyToCommand(_createConsumerCommand); + CreateConnectionCommand createConnectionCommand = mock(CreateConnectionCommand.class); + iterationValue.applyToCommand(createConnectionCommand); - verifyZeroInteractions(_createConsumerCommand); + verifyZeroInteractions(createConnectionCommand); } public void testApplyUnpopulatedIterationValueToCommand() throws Exception { IterationValue iterationValue = new IterationValue(); - iterationValue.applyToCommand(_createProducerCommand); + iterationValue.applyToCommand(_createConsumerCommand); - verifyZeroInteractions(_createProducerCommand); + verifyZeroInteractions(_createConsumerCommand); } } |
