summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json8
-rw-r--r--qpid/java/perftests/etc/testdefs/short/MessageSize.json44
-rw-r--r--qpid/java/perftests/etc/testdefs/short/QueueTypes.json12
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java79
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java35
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java49
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java42
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java65
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java61
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java118
10 files changed, 461 insertions, 52 deletions
diff --git a/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json b/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json
index d504c486aa..f910fc894d 100644
--- a/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json
+++ b/qpid/java/perftests/etc/testdefs/short/AcknowledgementModes.json
@@ -5,13 +5,13 @@
"_iterations":[
{
"_messageSize": 1024,
- "_numberOfMessages": 1000000,
+ "_maximumDuration": 30000,
"_acknowledgeMode": 1,
"_deliveryMode": 1
},
{
"_messageSize": 1024,
- "_numberOfMessages": 250000,
+ "_maximumDuration": 30000,
"_acknowledgeMode": 0,
"_deliveryMode": 1
}
@@ -74,13 +74,13 @@
"_iterations":[
{
"_messageSize": 1024,
- "_numberOfMessages": 100000,
+ "_maximumDuration": 30000,
"_acknowledgeMode": 1,
"_deliveryMode": 2
},
{
"_messageSize": 1024,
- "_numberOfMessages": 25000,
+ "_maximumDuration": 30000,
"_acknowledgeMode": 0,
"_deliveryMode": 2
}
diff --git a/qpid/java/perftests/etc/testdefs/short/MessageSize.json b/qpid/java/perftests/etc/testdefs/short/MessageSize.json
index 3354d72d3e..693e418291 100644
--- a/qpid/java/perftests/etc/testdefs/short/MessageSize.json
+++ b/qpid/java/perftests/etc/testdefs/short/MessageSize.json
@@ -5,47 +5,47 @@
"_iterations":[
{
"_messageSize": 256,
- "_numberOfMessages": 1000000
+ "_maximumDuration": 30000
},
{
"_messageSize": 512,
- "_numberOfMessages": 500000
+ "_maximumDuration": 30000
},
{
"_messageSize": 1024,
- "_numberOfMessages": "250000"
+ "_maximumDuration": 30000
},
{
"_messageSize": 2048,
- "_numberOfMessages": "125000"
+ "_maximumDuration": 30000
},
{
"_messageSize": 4096,
- "_numberOfMessages": "62500"
+ "_maximumDuration": 30000
},
{
"_messageSize": 8192,
- "_numberOfMessages": 31250
+ "_maximumDuration": 30000
},
{
"_messageSize": 16384,
- "_numberOfMessages": 15625
+ "_maximumDuration": 30000
},
{
"_messageSize": 32768,
- "_numberOfMessages": 7812
+ "_maximumDuration": 30000
},
{
"_messageSize": 65536,
- "_numberOfMessages": 3906
+ "_maximumDuration": 30000
},
{
"_messageSize": 131072,
- "_numberOfMessages": 195
+ "_maximumDuration": 30000
},
{
"_messageSize": 262144,
- "_numberOfMessages": 97
+ "_maximumDuration": 30000
}
],
"_queues":[
@@ -109,47 +109,47 @@
"_iterations":[
{
"_messageSize": 256,
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
},
{
"_messageSize": 512,
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
},
{
"_messageSize": 1024,
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
},
{
"_messageSize": 2048,
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
},
{
"_messageSize": 4096,
- "_numberOfMessages": 4000
+ "_maximumDuration": 30000
},
{
"_messageSize": 8192,
- "_numberOfMessages": 4000
+ "_maximumDuration": 30000
},
{
"_messageSize": 16384,
- "_numberOfMessages": 4000
+ "_maximumDuration": 30000
},
{
"_messageSize": 32768,
- "_numberOfMessages": 2000
+ "_maximumDuration": 30000
},
{
"_messageSize": 65536,
- "_numberOfMessages": 2000
+ "_maximumDuration": 30000
},
{
"_messageSize": 131072,
- "_numberOfMessages": 2000
+ "_maximumDuration": 30000
},
{
"_messageSize": 262144,
- "_numberOfMessages": 2000
+ "_maximumDuration": 30000
}
],
"_queues":[
diff --git a/qpid/java/perftests/etc/testdefs/short/QueueTypes.json b/qpid/java/perftests/etc/testdefs/short/QueueTypes.json
index 2237e8d8b9..427f3d9795 100644
--- a/qpid/java/perftests/etc/testdefs/short/QueueTypes.json
+++ b/qpid/java/perftests/etc/testdefs/short/QueueTypes.json
@@ -16036,7 +16036,7 @@
"_destinationName": "direct://amq.direct//sorted-queue?durable='true'",
"_deliveryMode": 2,
"_messageSize": 1024,
- "_numberOfMessages": 20000,
+ "_maximumDuration": 30000,
"_messageProviderName": "messageProvider"
}
]
@@ -16059,7 +16059,7 @@
{
"_name": "Consumer1",
"_destinationName": "direct://amq.direct//sorted-queue?durable='true'",
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
}
]
}
@@ -16094,7 +16094,7 @@
"_destinationName": "direct://amq.direct//simple-queue?durable='true'",
"_deliveryMode": 2,
"_messageSize": 1024,
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
}
]
}
@@ -16116,7 +16116,7 @@
{
"_name": "Consumer1",
"_destinationName": "direct://amq.direct//simple-queue?durable='true'",
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
}
]
}
@@ -16161,7 +16161,7 @@
"_destinationName": "direct://amq.direct//priority-queue?durable='true'",
"_deliveryMode": 2,
"_messageSize": 1024,
- "_numberOfMessages": 20000,
+ "_maximumDuration": 30000,
"_messageProviderName": "messageProvider"
}
]
@@ -16184,7 +16184,7 @@
{
"_name": "Consumer1",
"_destinationName": "direct://amq.direct//priority-queue?durable='true'",
- "_numberOfMessages": 20000
+ "_maximumDuration": 30000
}
]
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
index bcad81b4aa..49f6ae103c 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
@@ -22,10 +22,14 @@ package org.apache.qpid.disttest.client;
import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import javax.jms.Message;
import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.client.utils.ExecutorWithLimits;
+import org.apache.qpid.disttest.client.utils.ExecutorWithLimitsFactory;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
import org.apache.qpid.disttest.message.CreateProducerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
@@ -40,7 +44,9 @@ public class ProducerParticipant implements Participant
private final CreateProducerCommand _command;
- private ParticipantResultFactory _resultFactory;
+ private final ParticipantResultFactory _resultFactory;
+
+ private ExecutorWithLimits _limiter;
public ProducerParticipant(final ClientJmsDelegate jmsDelegate, final CreateProducerCommand command)
{
@@ -59,10 +65,10 @@ public class ProducerParticipant implements Participant
int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName());
- long expectedDuration = _command.getMaximumDuration() - _command.getStartDelay();
-
doSleepForStartDelay();
+ final long requiredDuration = _command.getMaximumDuration() - _command.getStartDelay();
+
final long startTime = System.currentTimeMillis();
Message lastPublishedMessage = null;
@@ -70,11 +76,28 @@ public class ProducerParticipant implements Participant
long totalPayloadSizeOfAllMessagesSent = 0;
NavigableSet<Integer> allProducedPayloadSizes = new TreeSet<Integer>();
+ _limiter = ExecutorWithLimitsFactory.createExecutorWithLimit(startTime, requiredDuration);
+
while (true)
{
- numberOfMessagesSent++;
+ try
+ {
+ lastPublishedMessage = _limiter.execute(new Callable<Message>()
+ {
+ @Override
+ public Message call() throws Exception
+ {
+ return _jmsDelegate.sendNextMessage(_command);
+ }
+ });
+ }
+ catch (CancellationException ce)
+ {
+ LOGGER.trace("Producer send was cancelled due to maximum duration {} ms", requiredDuration);
+ break;
+ }
- lastPublishedMessage = _jmsDelegate.sendNextMessage(_command);
+ numberOfMessagesSent++;
int lastPayloadSize = _jmsDelegate.calculatePayloadSizeFrom(lastPublishedMessage);
totalPayloadSizeOfAllMessagesSent += lastPayloadSize;
@@ -96,15 +119,11 @@ public class ProducerParticipant implements Participant
}
_jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
- if (_command.getInterval() > 0)
- {
- // sleep for given time
- Thread.sleep(_command.getInterval());
- }
+ doSleepForInterval();
}
if (_command.getNumberOfMessages() > 0 && numberOfMessagesSent >= _command.getNumberOfMessages()
- || expectedDuration > 0 && System.currentTimeMillis() - startTime >= expectedDuration)
+ || requiredDuration > 0 && System.currentTimeMillis() - startTime >= requiredDuration)
{
break;
}
@@ -140,23 +159,42 @@ public class ProducerParticipant implements Participant
private void doSleepForStartDelay()
{
- if (_command.getStartDelay() > 0)
+ long sleepTime = _command.getStartDelay();
+ if (sleepTime > 0)
{
// start delay is specified. Sleeping...
- try
- {
- Thread.sleep(_command.getStartDelay());
- }
- catch (final InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ doSleep(sleepTime);
+ }
+ }
+
+ private void doSleepForInterval() throws InterruptedException
+ {
+ long sleepTime = _command.getInterval();
+ if (sleepTime > 0)
+ {
+ doSleep(sleepTime);
+ }
+ }
+
+ private void doSleep(long sleepTime)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (final InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
}
}
@Override
public void releaseResources()
{
+ if (_limiter != null)
+ {
+ _limiter.shutdown();
+ }
_jmsDelegate.closeTestProducer(_command.getParticipantName());
}
@@ -171,4 +209,5 @@ public class ProducerParticipant implements Participant
{
return "ProducerParticipant [command=" + _command + "]";
}
+
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java
new file mode 100644
index 0000000000..f64107c125
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+/**
+ * Implementations of this interface execute a {@link Callable} but place some
+ * kind of limit on that execution, such as time.
+ */
+public interface ExecutorWithLimits
+{
+ <T> T execute(Callable<T> callback) throws CancellationException, Exception;
+
+ void shutdown();
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java
new file mode 100644
index 0000000000..4d17d76568
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.concurrent.Callable;
+
+public class ExecutorWithLimitsFactory
+{
+ /**
+ * Creates an {@link ExecutorWithLimits} that will permit the execution of {@link Callable} implementations until
+ * until <code>allowedTimeInMillis</code> milliseconds have elapsed beyond <code>startTime</code>.
+ * If <code>allowedTimeInMillis</code> is less than or equal to zero, a {@link ExecutorWithNoLimits}
+ * is created that enforces no time-limit.
+ *
+ * @param startTime start time (milliseconds)
+ * @param allowedTimeInMillis allowed time (milliseconds)
+ *
+ * @return ExecutionLimiter implementation
+ */
+ public static ExecutorWithLimits createExecutorWithLimit(long startTime, long allowedTimeInMillis)
+ {
+ if (allowedTimeInMillis > 0)
+ {
+ return new ExecutorWithTimeLimit(startTime, allowedTimeInMillis);
+ }
+ else
+ {
+ return new ExecutorWithNoLimits();
+ }
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java
new file mode 100644
index 0000000000..f729a72fa5
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Executes a {@link Callable} without any limits.
+ */
+public class ExecutorWithNoLimits implements ExecutorWithLimits
+{
+
+ @Override
+ public <T> T execute(Callable<T> _callback) throws Exception
+ {
+ return _callback.call();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ // Deliberately blank
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java
new file mode 100644
index 0000000000..4fa3960d92
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Executes a {@link Callable} but limits the execution time. If the execution
+ * time is exceeded the callable will be cancelled.
+ */
+public class ExecutorWithTimeLimit implements ExecutorWithLimits
+{
+ private final long _endTime;
+ private final ExecutorService _singleThreadExecutor = Executors.newSingleThreadExecutor();
+
+ public ExecutorWithTimeLimit(long startTime, long allowedTimeInMillis)
+ {
+ _endTime = startTime + allowedTimeInMillis;
+ }
+
+ @Override
+ public <T> T execute(Callable<T> callback) throws CancellationException, Exception
+ {
+ final long timeRemaining = _endTime - System.currentTimeMillis();
+ if (timeRemaining <= 0)
+ {
+ throw new CancellationException("Too little time remains to schedule callable");
+ }
+
+ List<Future<T>> l = _singleThreadExecutor.invokeAll(Collections.singletonList(callback), timeRemaining, TimeUnit.MILLISECONDS);
+ return l.get(0).get();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ _singleThreadExecutor.shutdown();
+ }
+
+
+}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java
new file mode 100644
index 0000000000..37820d2582
--- /dev/null
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Callable;
+
+import junit.framework.TestCase;
+
+public class ExecutorWithNoLimitsTest extends TestCase
+{
+ private final static Object RESULT = new Object();
+
+ private ExecutorWithLimits _limiter = new ExecutorWithNoLimits();
+ @SuppressWarnings("unchecked")
+ private Callable<Object> _callback = mock(Callable.class);
+
+ public void testNormalExecution() throws Exception
+ {
+ when(_callback.call()).thenReturn(RESULT);
+ final Object actualResult = _limiter.execute(_callback);
+ verify(_callback).call();
+ assertEquals(RESULT, actualResult);
+ }
+
+ public void testCallableThrowsException() throws Exception
+ {
+ when(_callback.call()).thenThrow(new Exception("mocked exception"));
+
+ try
+ {
+ _limiter.execute(_callback);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ // PASS
+ }
+ verify(_callback).call();
+ }
+}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java
new file mode 100644
index 0000000000..7a85de99ee
--- /dev/null
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+import junit.framework.TestCase;
+
+public class ExecutorWithTimeLimitTest extends TestCase
+{
+ private static final int TIMEOUT = 500;
+ private static final Object RESULT = new Object();
+
+ private ExecutorWithLimits _limiter = new ExecutorWithTimeLimit(System.currentTimeMillis(), TIMEOUT);
+ @SuppressWarnings("unchecked")
+ private Callable<Object> _callback = mock(Callable.class);
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ _limiter.shutdown();
+ }
+
+ public void testCallableCompletesNormally() throws Exception
+ {
+ when(_callback.call()).thenReturn(RESULT);
+
+ final Object actualResult = _limiter.execute(_callback);
+
+ verify(_callback).call();
+ assertEquals(RESULT, actualResult);
+ }
+
+ public void testCallableThrowsException() throws Exception
+ {
+ when(_callback.call()).thenThrow(new Exception("mocked exception"));
+
+ try
+ {
+ _limiter.execute(_callback);
+ fail("Exception not thrown");
+ }
+ catch (CancellationException ce)
+ {
+ fail("Wrong exception thrown");
+ }
+ catch (Exception e)
+ {
+ // PASS
+ }
+ verify(_callback).call();
+ }
+
+ public void testCallableNotRunDueToInsufficentTimeRemaining() throws Exception
+ {
+ long now = System.currentTimeMillis();
+ _limiter = new ExecutorWithTimeLimit(now - 100, 100);
+
+ try
+ {
+ _limiter.execute(_callback);
+ fail("Exception not thrown");
+ }
+ catch (CancellationException ca)
+ {
+ // PASS
+ }
+
+ verify(_callback, never()).call();
+ }
+
+ public void testExecutionInterruptedByTimeout() throws Exception
+ {
+ Callable<Void> oversleepingCallback = new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ Thread.sleep(TIMEOUT * 2);
+ return null;
+ }
+ };
+
+ try
+ {
+ _limiter.execute(oversleepingCallback);
+ fail("Exception not thrown");
+ }
+ catch (CancellationException ca)
+ {
+ // PASS
+ }
+ }
+}