diff options
10 files changed, 461 insertions, 52 deletions
diff --git a/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json b/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json index d504c486aa..f910fc894d 100644 --- a/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json +++ b/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json @@ -5,13 +5,13 @@ "_iterations":[ { "_messageSize": 1024, - "_numberOfMessages": 1000000, + "_maximumDuration": 30000, "_acknowledgeMode": 1, "_deliveryMode": 1 }, { "_messageSize": 1024, - "_numberOfMessages": 250000, + "_maximumDuration": 30000, "_acknowledgeMode": 0, "_deliveryMode": 1 } @@ -74,13 +74,13 @@ "_iterations":[ { "_messageSize": 1024, - "_numberOfMessages": 100000, + "_maximumDuration": 30000, "_acknowledgeMode": 1, "_deliveryMode": 2 }, { "_messageSize": 1024, - "_numberOfMessages": 25000, + "_maximumDuration": 30000, "_acknowledgeMode": 0, "_deliveryMode": 2 } diff --git a/qpid/java/perftests/etc/testdefs/short/MessageSize.json b/qpid/java/perftests/etc/testdefs/short/MessageSize.json index 3354d72d3e..693e418291 100644 --- a/qpid/java/perftests/etc/testdefs/short/MessageSize.json +++ b/qpid/java/perftests/etc/testdefs/short/MessageSize.json @@ -5,47 +5,47 @@ "_iterations":[ { "_messageSize": 256, - "_numberOfMessages": 1000000 + "_maximumDuration": 30000 }, { "_messageSize": 512, - "_numberOfMessages": 500000 + "_maximumDuration": 30000 }, { "_messageSize": 1024, - "_numberOfMessages": "250000" + "_maximumDuration": 30000 }, { "_messageSize": 2048, - "_numberOfMessages": "125000" + "_maximumDuration": 30000 }, { "_messageSize": 4096, - "_numberOfMessages": "62500" + "_maximumDuration": 30000 }, { "_messageSize": 8192, - "_numberOfMessages": 31250 + "_maximumDuration": 30000 }, { "_messageSize": 16384, - "_numberOfMessages": 15625 + "_maximumDuration": 30000 }, { "_messageSize": 32768, - "_numberOfMessages": 7812 + "_maximumDuration": 30000 }, { "_messageSize": 65536, - "_numberOfMessages": 3906 + "_maximumDuration": 30000 }, { "_messageSize": 131072, - "_numberOfMessages": 195 + "_maximumDuration": 30000 }, { "_messageSize": 262144, - "_numberOfMessages": 97 + "_maximumDuration": 30000 } ], "_queues":[ @@ -109,47 +109,47 @@ "_iterations":[ { "_messageSize": 256, - "_numberOfMessages": 20000 + "_maximumDuration": 30000 }, { "_messageSize": 512, - "_numberOfMessages": 20000 + "_maximumDuration": 30000 }, { "_messageSize": 1024, - "_numberOfMessages": 20000 + "_maximumDuration": 30000 }, { "_messageSize": 2048, - "_numberOfMessages": 20000 + "_maximumDuration": 30000 }, { "_messageSize": 4096, - "_numberOfMessages": 4000 + "_maximumDuration": 30000 }, { "_messageSize": 8192, - "_numberOfMessages": 4000 + "_maximumDuration": 30000 }, { "_messageSize": 16384, - "_numberOfMessages": 4000 + "_maximumDuration": 30000 }, { "_messageSize": 32768, - "_numberOfMessages": 2000 + "_maximumDuration": 30000 }, { "_messageSize": 65536, - "_numberOfMessages": 2000 + "_maximumDuration": 30000 }, { "_messageSize": 131072, - "_numberOfMessages": 2000 + "_maximumDuration": 30000 }, { "_messageSize": 262144, - "_numberOfMessages": 2000 + "_maximumDuration": 30000 } ], "_queues":[ diff --git a/qpid/java/perftests/etc/testdefs/short/QueueTypes.json b/qpid/java/perftests/etc/testdefs/short/QueueTypes.json index 2237e8d8b9..427f3d9795 100644 --- a/qpid/java/perftests/etc/testdefs/short/QueueTypes.json +++ b/qpid/java/perftests/etc/testdefs/short/QueueTypes.json @@ -16036,7 +16036,7 @@ "_destinationName": "direct://amq.direct//sorted-queue?durable='true'", "_deliveryMode": 2, "_messageSize": 1024, - "_numberOfMessages": 20000, + "_maximumDuration": 30000, "_messageProviderName": "messageProvider" } ] @@ -16059,7 +16059,7 @@ { "_name": "Consumer1", "_destinationName": "direct://amq.direct//sorted-queue?durable='true'", - "_numberOfMessages": 20000 + "_maximumDuration": 30000 } ] } @@ -16094,7 +16094,7 @@ "_destinationName": "direct://amq.direct//simple-queue?durable='true'", "_deliveryMode": 2, "_messageSize": 1024, - "_numberOfMessages": 20000 + "_maximumDuration": 30000 } ] } @@ -16116,7 +16116,7 @@ { "_name": "Consumer1", "_destinationName": "direct://amq.direct//simple-queue?durable='true'", - "_numberOfMessages": 20000 + "_maximumDuration": 30000 } ] } @@ -16161,7 +16161,7 @@ "_destinationName": "direct://amq.direct//priority-queue?durable='true'", "_deliveryMode": 2, "_messageSize": 1024, - "_numberOfMessages": 20000, + "_maximumDuration": 30000, "_messageProviderName": "messageProvider" } ] @@ -16184,7 +16184,7 @@ { "_name": "Consumer1", "_destinationName": "direct://amq.direct//priority-queue?durable='true'", - "_numberOfMessages": 20000 + "_maximumDuration": 30000 } ] } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java index bcad81b4aa..49f6ae103c 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java @@ -22,10 +22,14 @@ package org.apache.qpid.disttest.client; import java.util.Date; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import javax.jms.Message; import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.client.utils.ExecutorWithLimits; +import org.apache.qpid.disttest.client.utils.ExecutorWithLimitsFactory; import org.apache.qpid.disttest.jms.ClientJmsDelegate; import org.apache.qpid.disttest.message.CreateProducerCommand; import org.apache.qpid.disttest.message.ParticipantResult; @@ -40,7 +44,9 @@ public class ProducerParticipant implements Participant private final CreateProducerCommand _command; - private ParticipantResultFactory _resultFactory; + private final ParticipantResultFactory _resultFactory; + + private ExecutorWithLimits _limiter; public ProducerParticipant(final ClientJmsDelegate jmsDelegate, final CreateProducerCommand command) { @@ -59,10 +65,10 @@ public class ProducerParticipant implements Participant int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName()); - long expectedDuration = _command.getMaximumDuration() - _command.getStartDelay(); - doSleepForStartDelay(); + final long requiredDuration = _command.getMaximumDuration() - _command.getStartDelay(); + final long startTime = System.currentTimeMillis(); Message lastPublishedMessage = null; @@ -70,11 +76,28 @@ public class ProducerParticipant implements Participant long totalPayloadSizeOfAllMessagesSent = 0; NavigableSet<Integer> allProducedPayloadSizes = new TreeSet<Integer>(); + _limiter = ExecutorWithLimitsFactory.createExecutorWithLimit(startTime, requiredDuration); + while (true) { - numberOfMessagesSent++; + try + { + lastPublishedMessage = _limiter.execute(new Callable<Message>() + { + @Override + public Message call() throws Exception + { + return _jmsDelegate.sendNextMessage(_command); + } + }); + } + catch (CancellationException ce) + { + LOGGER.trace("Producer send was cancelled due to maximum duration {} ms", requiredDuration); + break; + } - lastPublishedMessage = _jmsDelegate.sendNextMessage(_command); + numberOfMessagesSent++; int lastPayloadSize = _jmsDelegate.calculatePayloadSizeFrom(lastPublishedMessage); totalPayloadSizeOfAllMessagesSent += lastPayloadSize; @@ -96,15 +119,11 @@ public class ProducerParticipant implements Participant } _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); - if (_command.getInterval() > 0) - { - // sleep for given time - Thread.sleep(_command.getInterval()); - } + doSleepForInterval(); } if (_command.getNumberOfMessages() > 0 && numberOfMessagesSent >= _command.getNumberOfMessages() - || expectedDuration > 0 && System.currentTimeMillis() - startTime >= expectedDuration) + || requiredDuration > 0 && System.currentTimeMillis() - startTime >= requiredDuration) { break; } @@ -140,23 +159,42 @@ public class ProducerParticipant implements Participant private void doSleepForStartDelay() { - if (_command.getStartDelay() > 0) + long sleepTime = _command.getStartDelay(); + if (sleepTime > 0) { // start delay is specified. Sleeping... - try - { - Thread.sleep(_command.getStartDelay()); - } - catch (final InterruptedException e) - { - Thread.currentThread().interrupt(); - } + doSleep(sleepTime); + } + } + + private void doSleepForInterval() throws InterruptedException + { + long sleepTime = _command.getInterval(); + if (sleepTime > 0) + { + doSleep(sleepTime); + } + } + + private void doSleep(long sleepTime) + { + try + { + Thread.sleep(sleepTime); + } + catch (final InterruptedException e) + { + Thread.currentThread().interrupt(); } } @Override public void releaseResources() { + if (_limiter != null) + { + _limiter.shutdown(); + } _jmsDelegate.closeTestProducer(_command.getParticipantName()); } @@ -171,4 +209,5 @@ public class ProducerParticipant implements Participant { return "ProducerParticipant [command=" + _command + "]"; } + } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java new file mode 100644 index 0000000000..f64107c125 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client.utils; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; + +/** + * Implementations of this interface execute a {@link Callable} but place some + * kind of limit on that execution, such as time. + */ +public interface ExecutorWithLimits +{ + <T> T execute(Callable<T> callback) throws CancellationException, Exception; + + void shutdown(); + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java new file mode 100644 index 0000000000..4d17d76568 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client.utils; + +import java.util.concurrent.Callable; + +public class ExecutorWithLimitsFactory +{ + /** + * Creates an {@link ExecutorWithLimits} that will permit the execution of {@link Callable} implementations until + * until <code>allowedTimeInMillis</code> milliseconds have elapsed beyond <code>startTime</code>. + * If <code>allowedTimeInMillis</code> is less than or equal to zero, a {@link ExecutorWithNoLimits} + * is created that enforces no time-limit. + * + * @param startTime start time (milliseconds) + * @param allowedTimeInMillis allowed time (milliseconds) + * + * @return ExecutionLimiter implementation + */ + public static ExecutorWithLimits createExecutorWithLimit(long startTime, long allowedTimeInMillis) + { + if (allowedTimeInMillis > 0) + { + return new ExecutorWithTimeLimit(startTime, allowedTimeInMillis); + } + else + { + return new ExecutorWithNoLimits(); + } + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java new file mode 100644 index 0000000000..f729a72fa5 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client.utils; + +import java.util.concurrent.Callable; + +/** + * Executes a {@link Callable} without any limits. + */ +public class ExecutorWithNoLimits implements ExecutorWithLimits +{ + + @Override + public <T> T execute(Callable<T> _callback) throws Exception + { + return _callback.call(); + } + + @Override + public void shutdown() + { + // Deliberately blank + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java new file mode 100644 index 0000000000..4fa3960d92 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client.utils; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Executes a {@link Callable} but limits the execution time. If the execution + * time is exceeded the callable will be cancelled. + */ +public class ExecutorWithTimeLimit implements ExecutorWithLimits +{ + private final long _endTime; + private final ExecutorService _singleThreadExecutor = Executors.newSingleThreadExecutor(); + + public ExecutorWithTimeLimit(long startTime, long allowedTimeInMillis) + { + _endTime = startTime + allowedTimeInMillis; + } + + @Override + public <T> T execute(Callable<T> callback) throws CancellationException, Exception + { + final long timeRemaining = _endTime - System.currentTimeMillis(); + if (timeRemaining <= 0) + { + throw new CancellationException("Too little time remains to schedule callable"); + } + + List<Future<T>> l = _singleThreadExecutor.invokeAll(Collections.singletonList(callback), timeRemaining, TimeUnit.MILLISECONDS); + return l.get(0).get(); + } + + @Override + public void shutdown() + { + _singleThreadExecutor.shutdown(); + } + + +} diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java new file mode 100644 index 0000000000..37820d2582 --- /dev/null +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client.utils; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Callable; + +import junit.framework.TestCase; + +public class ExecutorWithNoLimitsTest extends TestCase +{ + private final static Object RESULT = new Object(); + + private ExecutorWithLimits _limiter = new ExecutorWithNoLimits(); + @SuppressWarnings("unchecked") + private Callable<Object> _callback = mock(Callable.class); + + public void testNormalExecution() throws Exception + { + when(_callback.call()).thenReturn(RESULT); + final Object actualResult = _limiter.execute(_callback); + verify(_callback).call(); + assertEquals(RESULT, actualResult); + } + + public void testCallableThrowsException() throws Exception + { + when(_callback.call()).thenThrow(new Exception("mocked exception")); + + try + { + _limiter.execute(_callback); + fail("Exception not thrown"); + } + catch (Exception e) + { + // PASS + } + verify(_callback).call(); + } +} diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java new file mode 100644 index 0000000000..7a85de99ee --- /dev/null +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client.utils; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; + +import junit.framework.TestCase; + +public class ExecutorWithTimeLimitTest extends TestCase +{ + private static final int TIMEOUT = 500; + private static final Object RESULT = new Object(); + + private ExecutorWithLimits _limiter = new ExecutorWithTimeLimit(System.currentTimeMillis(), TIMEOUT); + @SuppressWarnings("unchecked") + private Callable<Object> _callback = mock(Callable.class); + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + _limiter.shutdown(); + } + + public void testCallableCompletesNormally() throws Exception + { + when(_callback.call()).thenReturn(RESULT); + + final Object actualResult = _limiter.execute(_callback); + + verify(_callback).call(); + assertEquals(RESULT, actualResult); + } + + public void testCallableThrowsException() throws Exception + { + when(_callback.call()).thenThrow(new Exception("mocked exception")); + + try + { + _limiter.execute(_callback); + fail("Exception not thrown"); + } + catch (CancellationException ce) + { + fail("Wrong exception thrown"); + } + catch (Exception e) + { + // PASS + } + verify(_callback).call(); + } + + public void testCallableNotRunDueToInsufficentTimeRemaining() throws Exception + { + long now = System.currentTimeMillis(); + _limiter = new ExecutorWithTimeLimit(now - 100, 100); + + try + { + _limiter.execute(_callback); + fail("Exception not thrown"); + } + catch (CancellationException ca) + { + // PASS + } + + verify(_callback, never()).call(); + } + + public void testExecutionInterruptedByTimeout() throws Exception + { + Callable<Void> oversleepingCallback = new Callable<Void>() + { + @Override + public Void call() throws Exception + { + Thread.sleep(TIMEOUT * 2); + return null; + } + }; + + try + { + _limiter.execute(oversleepingCallback); + fail("Exception not thrown"); + } + catch (CancellationException ca) + { + // PASS + } + } +} |
