summaryrefslogtreecommitdiff
path: root/java/perftests/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /java/perftests/src
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src')
-rw-r--r--java/perftests/src/main/java/json2.js487
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java74
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java54
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java6
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java10
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java59
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java8
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java6
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java20
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java82
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java35
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java49
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java42
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java65
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java13
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java17
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java22
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java22
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluator.java64
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java39
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java3
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java168
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java38
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java37
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java8
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java67
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java12
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java9
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java59
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java111
-rw-r--r--java/perftests/src/main/java/test-utils.js85
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelperTest.java79
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileTestHelper.java (renamed from java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelper.java)2
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java2
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java23
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/MessageProviderTest.java1
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java2
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/property/ListPropertyValueTest.java1
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/property/PropertyValueFactoryTest.java1
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RandomPropertyValueTest.java1
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RangePropertyValueTest.java1
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java61
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java131
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js34
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest.java40
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java33
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js23
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest.java81
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ParticipantConfigTest.java43
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/message/ParticipantResultTest.java4
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java43
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java52
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java10
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv4
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java22
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java18
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java22
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json3
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java4
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java6
61 files changed, 2295 insertions, 225 deletions
diff --git a/java/perftests/src/main/java/json2.js b/java/perftests/src/main/java/json2.js
new file mode 100644
index 0000000000..2dbf60d398
--- /dev/null
+++ b/java/perftests/src/main/java/json2.js
@@ -0,0 +1,487 @@
+/*
+ http://www.JSON.org/json2.js
+ 2011-10-19
+
+ Public Domain.
+
+ NO WARRANTY EXPRESSED OR IMPLIED. USE AT YOUR OWN RISK.
+
+ See http://www.JSON.org/js.html
+
+
+ This code should be minified before deployment.
+ See http://javascript.crockford.com/jsmin.html
+
+ USE YOUR OWN COPY. IT IS EXTREMELY UNWISE TO LOAD CODE FROM SERVERS YOU DO
+ NOT CONTROL.
+
+
+ This file creates a global JSON object containing two methods: stringify
+ and parse.
+
+ JSON.stringify(value, replacer, space)
+ value any JavaScript value, usually an object or array.
+
+ replacer an optional parameter that determines how object
+ values are stringified for objects. It can be a
+ function or an array of strings.
+
+ space an optional parameter that specifies the indentation
+ of nested structures. If it is omitted, the text will
+ be packed without extra whitespace. If it is a number,
+ it will specify the number of spaces to indent at each
+ level. If it is a string (such as '\t' or '&nbsp;'),
+ it contains the characters used to indent at each level.
+
+ This method produces a JSON text from a JavaScript value.
+
+ When an object value is found, if the object contains a toJSON
+ method, its toJSON method will be called and the result will be
+ stringified. A toJSON method does not serialize: it returns the
+ value represented by the name/value pair that should be serialized,
+ or undefined if nothing should be serialized. The toJSON method
+ will be passed the key associated with the value, and this will be
+ bound to the value
+
+ For example, this would serialize Dates as ISO strings.
+
+ Date.prototype.toJSON = function (key) {
+ function f(n) {
+ // Format integers to have at least two digits.
+ return n < 10 ? '0' + n : n;
+ }
+
+ return this.getUTCFullYear() + '-' +
+ f(this.getUTCMonth() + 1) + '-' +
+ f(this.getUTCDate()) + 'T' +
+ f(this.getUTCHours()) + ':' +
+ f(this.getUTCMinutes()) + ':' +
+ f(this.getUTCSeconds()) + 'Z';
+ };
+
+ You can provide an optional replacer method. It will be passed the
+ key and value of each member, with this bound to the containing
+ object. The value that is returned from your method will be
+ serialized. If your method returns undefined, then the member will
+ be excluded from the serialization.
+
+ If the replacer parameter is an array of strings, then it will be
+ used to select the members to be serialized. It filters the results
+ such that only members with keys listed in the replacer array are
+ stringified.
+
+ Values that do not have JSON representations, such as undefined or
+ functions, will not be serialized. Such values in objects will be
+ dropped; in arrays they will be replaced with null. You can use
+ a replacer function to replace those with JSON values.
+ JSON.stringify(undefined) returns undefined.
+
+ The optional space parameter produces a stringification of the
+ value that is filled with line breaks and indentation to make it
+ easier to read.
+
+ If the space parameter is a non-empty string, then that string will
+ be used for indentation. If the space parameter is a number, then
+ the indentation will be that many spaces.
+
+ Example:
+
+ text = JSON.stringify(['e', {pluribus: 'unum'}]);
+ // text is '["e",{"pluribus":"unum"}]'
+
+
+ text = JSON.stringify(['e', {pluribus: 'unum'}], null, '\t');
+ // text is '[\n\t"e",\n\t{\n\t\t"pluribus": "unum"\n\t}\n]'
+
+ text = JSON.stringify([new Date()], function (key, value) {
+ return this[key] instanceof Date ?
+ 'Date(' + this[key] + ')' : value;
+ });
+ // text is '["Date(---current time---)"]'
+
+
+ JSON.parse(text, reviver)
+ This method parses a JSON text to produce an object or array.
+ It can throw a SyntaxError exception.
+
+ The optional reviver parameter is a function that can filter and
+ transform the results. It receives each of the keys and values,
+ and its return value is used instead of the original value.
+ If it returns what it received, then the structure is not modified.
+ If it returns undefined then the member is deleted.
+
+ Example:
+
+ // Parse the text. Values that look like ISO date strings will
+ // be converted to Date objects.
+
+ myData = JSON.parse(text, function (key, value) {
+ var a;
+ if (typeof value === 'string') {
+ a =
+/^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)Z$/.exec(value);
+ if (a) {
+ return new Date(Date.UTC(+a[1], +a[2] - 1, +a[3], +a[4],
+ +a[5], +a[6]));
+ }
+ }
+ return value;
+ });
+
+ myData = JSON.parse('["Date(09/09/2001)"]', function (key, value) {
+ var d;
+ if (typeof value === 'string' &&
+ value.slice(0, 5) === 'Date(' &&
+ value.slice(-1) === ')') {
+ d = new Date(value.slice(5, -1));
+ if (d) {
+ return d;
+ }
+ }
+ return value;
+ });
+
+
+ This is a reference implementation. You are free to copy, modify, or
+ redistribute.
+*/
+
+/*jslint evil: true, regexp: true */
+
+/*members "", "\b", "\t", "\n", "\f", "\r", "\"", JSON, "\\", apply,
+ call, charCodeAt, getUTCDate, getUTCFullYear, getUTCHours,
+ getUTCMinutes, getUTCMonth, getUTCSeconds, hasOwnProperty, join,
+ lastIndex, length, parse, prototype, push, replace, slice, stringify,
+ test, toJSON, toString, valueOf
+*/
+
+
+// Create a JSON object only if one does not already exist. We create the
+// methods in a closure to avoid creating global variables.
+
+var JSON;
+if (!JSON) {
+ JSON = {};
+}
+
+(function () {
+ 'use strict';
+
+ function f(n) {
+ // Format integers to have at least two digits.
+ return n < 10 ? '0' + n : n;
+ }
+
+ if (typeof Date.prototype.toJSON !== 'function') {
+
+ Date.prototype.toJSON = function (key) {
+
+ return isFinite(this.valueOf())
+ ? this.getUTCFullYear() + '-' +
+ f(this.getUTCMonth() + 1) + '-' +
+ f(this.getUTCDate()) + 'T' +
+ f(this.getUTCHours()) + ':' +
+ f(this.getUTCMinutes()) + ':' +
+ f(this.getUTCSeconds()) + 'Z'
+ : null;
+ };
+
+ String.prototype.toJSON =
+ Number.prototype.toJSON =
+ Boolean.prototype.toJSON = function (key) {
+ return this.valueOf();
+ };
+ }
+
+ var cx = /[\u0000\u00ad\u0600-\u0604\u070f\u17b4\u17b5\u200c-\u200f\u2028-\u202f\u2060-\u206f\ufeff\ufff0-\uffff]/g,
+ escapable = /[\\\"\x00-\x1f\x7f-\x9f\u00ad\u0600-\u0604\u070f\u17b4\u17b5\u200c-\u200f\u2028-\u202f\u2060-\u206f\ufeff\ufff0-\uffff]/g,
+ gap,
+ indent,
+ meta = { // table of character substitutions
+ '\b': '\\b',
+ '\t': '\\t',
+ '\n': '\\n',
+ '\f': '\\f',
+ '\r': '\\r',
+ '"' : '\\"',
+ '\\': '\\\\'
+ },
+ rep;
+
+
+ function quote(string) {
+
+// If the string contains no control characters, no quote characters, and no
+// backslash characters, then we can safely slap some quotes around it.
+// Otherwise we must also replace the offending characters with safe escape
+// sequences.
+
+ escapable.lastIndex = 0;
+ return escapable.test(string) ? '"' + string.replace(escapable, function (a) {
+ var c = meta[a];
+ return typeof c === 'string'
+ ? c
+ : '\\u' + ('0000' + a.charCodeAt(0).toString(16)).slice(-4);
+ }) + '"' : '"' + string + '"';
+ }
+
+
+ function str(key, holder) {
+
+// Produce a string from holder[key].
+
+ var i, // The loop counter.
+ k, // The member key.
+ v, // The member value.
+ length,
+ mind = gap,
+ partial,
+ value = holder[key];
+
+// If the value has a toJSON method, call it to obtain a replacement value.
+
+ if (value && typeof value === 'object' &&
+ typeof value.toJSON === 'function') {
+ value = value.toJSON(key);
+ }
+
+// If we were called with a replacer function, then call the replacer to
+// obtain a replacement value.
+
+ if (typeof rep === 'function') {
+ value = rep.call(holder, key, value);
+ }
+
+// What happens next depends on the value's type.
+
+ switch (typeof value) {
+ case 'string':
+ return quote(value);
+
+ case 'number':
+
+// JSON numbers must be finite. Encode non-finite numbers as null.
+
+ return isFinite(value) ? String(value) : 'null';
+
+ case 'boolean':
+ case 'null':
+
+// If the value is a boolean or null, convert it to a string. Note:
+// typeof null does not produce 'null'. The case is included here in
+// the remote chance that this gets fixed someday.
+
+ return String(value);
+
+// If the type is 'object', we might be dealing with an object or an array or
+// null.
+
+ case 'object':
+
+// Due to a specification blunder in ECMAScript, typeof null is 'object',
+// so watch out for that case.
+
+ if (!value) {
+ return 'null';
+ }
+
+// Make an array to hold the partial results of stringifying this object value.
+
+ gap += indent;
+ partial = [];
+
+// Is the value an array?
+
+ if (Object.prototype.toString.apply(value) === '[object Array]') {
+
+// The value is an array. Stringify every element. Use null as a placeholder
+// for non-JSON values.
+
+ length = value.length;
+ for (i = 0; i < length; i += 1) {
+ partial[i] = str(i, value) || 'null';
+ }
+
+// Join all of the elements together, separated with commas, and wrap them in
+// brackets.
+
+ v = partial.length === 0
+ ? '[]'
+ : gap
+ ? '[\n' + gap + partial.join(',\n' + gap) + '\n' + mind + ']'
+ : '[' + partial.join(',') + ']';
+ gap = mind;
+ return v;
+ }
+
+// If the replacer is an array, use it to select the members to be stringified.
+
+ if (rep && typeof rep === 'object') {
+ length = rep.length;
+ for (i = 0; i < length; i += 1) {
+ if (typeof rep[i] === 'string') {
+ k = rep[i];
+ v = str(k, value);
+ if (v) {
+ partial.push(quote(k) + (gap ? ': ' : ':') + v);
+ }
+ }
+ }
+ } else {
+
+// Otherwise, iterate through all of the keys in the object.
+
+ for (k in value) {
+ if (Object.prototype.hasOwnProperty.call(value, k)) {
+ v = str(k, value);
+ if (v) {
+ partial.push(quote(k) + (gap ? ': ' : ':') + v);
+ }
+ }
+ }
+ }
+
+// Join all of the member texts together, separated with commas,
+// and wrap them in braces.
+
+ v = partial.length === 0
+ ? '{}'
+ : gap
+ ? '{\n' + gap + partial.join(',\n' + gap) + '\n' + mind + '}'
+ : '{' + partial.join(',') + '}';
+ gap = mind;
+ return v;
+ }
+ }
+
+// If the JSON object does not yet have a stringify method, give it one.
+
+ if (typeof JSON.stringify !== 'function') {
+ JSON.stringify = function (value, replacer, space) {
+
+// The stringify method takes a value and an optional replacer, and an optional
+// space parameter, and returns a JSON text. The replacer can be a function
+// that can replace values, or an array of strings that will select the keys.
+// A default replacer method can be provided. Use of the space parameter can
+// produce text that is more easily readable.
+
+ var i;
+ gap = '';
+ indent = '';
+
+// If the space parameter is a number, make an indent string containing that
+// many spaces.
+
+ if (typeof space === 'number') {
+ for (i = 0; i < space; i += 1) {
+ indent += ' ';
+ }
+
+// If the space parameter is a string, it will be used as the indent string.
+
+ } else if (typeof space === 'string') {
+ indent = space;
+ }
+
+// If there is a replacer, it must be a function or an array.
+// Otherwise, throw an error.
+
+ rep = replacer;
+ if (replacer && typeof replacer !== 'function' &&
+ (typeof replacer !== 'object' ||
+ typeof replacer.length !== 'number')) {
+ throw new Error('JSON.stringify');
+ }
+
+// Make a fake root object containing our value under the key of ''.
+// Return the result of stringifying the value.
+
+ return str('', {'': value});
+ };
+ }
+
+
+// If the JSON object does not yet have a parse method, give it one.
+
+ if (typeof JSON.parse !== 'function') {
+ JSON.parse = function (text, reviver) {
+
+// The parse method takes a text and an optional reviver function, and returns
+// a JavaScript value if the text is a valid JSON text.
+
+ var j;
+
+ function walk(holder, key) {
+
+// The walk method is used to recursively walk the resulting structure so
+// that modifications can be made.
+
+ var k, v, value = holder[key];
+ if (value && typeof value === 'object') {
+ for (k in value) {
+ if (Object.prototype.hasOwnProperty.call(value, k)) {
+ v = walk(value, k);
+ if (v !== undefined) {
+ value[k] = v;
+ } else {
+ delete value[k];
+ }
+ }
+ }
+ }
+ return reviver.call(holder, key, value);
+ }
+
+
+// Parsing happens in four stages. In the first stage, we replace certain
+// Unicode characters with escape sequences. JavaScript handles many characters
+// incorrectly, either silently deleting them, or treating them as line endings.
+
+ text = String(text);
+ cx.lastIndex = 0;
+ if (cx.test(text)) {
+ text = text.replace(cx, function (a) {
+ return '\\u' +
+ ('0000' + a.charCodeAt(0).toString(16)).slice(-4);
+ });
+ }
+
+// In the second stage, we run the text against regular expressions that look
+// for non-JSON patterns. We are especially concerned with '()' and 'new'
+// because they can cause invocation, and '=' because it can cause mutation.
+// But just to be safe, we want to reject all unexpected forms.
+
+// We split the second stage into 4 regexp operations in order to work around
+// crippling inefficiencies in IE's and Safari's regexp engines. First we
+// replace the JSON backslash pairs with '@' (a non-JSON character). Second, we
+// replace all simple value tokens with ']' characters. Third, we delete all
+// open brackets that follow a colon or comma or that begin the text. Finally,
+// we look to see that the remaining characters are only whitespace or ']' or
+// ',' or ':' or '{' or '}'. If that is so, then the text is safe for eval.
+
+ if (/^[\],:{}\s]*$/
+ .test(text.replace(/\\(?:["\\\/bfnrt]|u[0-9a-fA-F]{4})/g, '@')
+ .replace(/"[^"\\\n\r]*"|true|false|null|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?/g, ']')
+ .replace(/(?:^|:|,)(?:\s*\[)+/g, ''))) {
+
+// In the third stage we use the eval function to compile the text into a
+// JavaScript structure. The '{' operator is subject to a syntactic ambiguity
+// in JavaScript: it can begin a block or an object literal. We wrap the text
+// in parens to eliminate the ambiguity.
+
+ j = eval('(' + text + ')');
+
+// In the optional fourth stage, we recursively walk the new structure, passing
+// each name/value pair to a reviver function for possible transformation.
+
+ return typeof reviver === 'function'
+ ? walk({'': j}, '')
+ : j;
+ }
+
+// If the text is not JSON parseable, then a SyntaxError is thrown.
+
+ throw new SyntaxError('JSON.parse');
+ };
+ }
+}());
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java b/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java
new file mode 100644
index 0000000000..fb4c1b700b
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ConfigFileHelper
+{
+ /**
+ * Returns absolute paths to the config file(s).
+ * <p/>
+ * If testConfigPath is a directory, its .js and .json files are returned.
+ * Otherwise, the returned list just contains testConfigPath.
+ */
+ public List<String> getTestConfigFiles(String testConfigPath)
+ {
+ final List<String> testConfigFile = new ArrayList<String>();
+ final File configFileOrDirectory = new File(testConfigPath);
+
+ if (configFileOrDirectory.isDirectory())
+ {
+ final String[] configFiles = configFileOrDirectory.list(new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ boolean suffixOk = name.endsWith(".json") || name.endsWith(".js");
+ return new File(dir, name).isFile() && suffixOk;
+ }
+ });
+
+ for (String configFile : configFiles)
+ {
+ testConfigFile.add(new File(configFileOrDirectory, configFile).getAbsolutePath());
+ }
+ }
+ else
+ {
+ testConfigFile.add(configFileOrDirectory.getAbsolutePath());
+ }
+
+ return testConfigFile;
+ }
+
+ /**
+ * generateOutputCsvNameFrom("/config/testConfigFile.js", "/output") returns /output/testConfigFile.csv
+ */
+ public String generateOutputCsvNameFrom(String testConfigFile, String outputDir)
+ {
+ final String filenameOnlyWithExtension = new File(testConfigFile).getName();
+ final String cvsFile = filenameOnlyWithExtension.replaceFirst(".?\\w*$", ".csv");
+
+ return new File(outputDir, cvsFile).getAbsolutePath();
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java b/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
index aa9c582bf8..aea0ea301a 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
@@ -19,13 +19,9 @@
*/
package org.apache.qpid.disttest;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
-import java.io.FilenameFilter;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import javax.naming.Context;
@@ -54,6 +50,7 @@ public class ControllerRunner extends AbstractRunner
private final Aggregator _aggregator = new Aggregator();
+ private final ConfigFileHelper _configFileHelper = new ConfigFileHelper();
public ControllerRunner()
{
@@ -89,7 +86,8 @@ public class ControllerRunner extends AbstractRunner
{
Controller controller = new Controller(jmsDelegate, DistributedTestConstants.REGISTRATION_TIMEOUT, DistributedTestConstants.COMMAND_RESPONSE_TIMEOUT);
- final List<String> testConfigFiles = getTestConfigFiles();
+ String testConfigPath = getCliOptions().get(ControllerRunner.TEST_CONFIG_PROP);
+ List<String> testConfigFiles = _configFileHelper.getTestConfigFiles(testConfigPath);
createClientsIfNotDistributed(testConfigFiles);
try
@@ -105,11 +103,14 @@ public class ControllerRunner extends AbstractRunner
runTest(controller, testConfigFile);
}
}
+ catch(Exception e)
+ {
+ LOGGER.error("Problem running test", e);
+ }
finally
{
controller.stopAllRegisteredClients();
}
-
}
private void runTest(Controller controller, String testConfigFile)
@@ -120,7 +121,8 @@ public class ControllerRunner extends AbstractRunner
ResultsForAllTests rawResultsForAllTests = controller.runAllTests();
ResultsForAllTests resultsForAllTests = _aggregator.aggregateResults(rawResultsForAllTests);
- final String outputFile = generateOutputCsvNameFrom(testConfigFile);
+ String outputDir = getCliOptions().get(ControllerRunner.OUTPUT_DIR_PROP);
+ final String outputFile = _configFileHelper.generateOutputCsvNameFrom(testConfigFile, outputDir);
writeResultsToFile(resultsForAllTests, outputFile);
}
@@ -176,44 +178,6 @@ public class ControllerRunner extends AbstractRunner
}
}
- private String generateOutputCsvNameFrom(String testConfigFile)
- {
- final String filenameOnlyWithExtension = new File(testConfigFile).getName();
- final String cvsFile = filenameOnlyWithExtension.replaceFirst(".?\\w*$", ".csv");
- final String outputDir = String.valueOf(getCliOptions().get(ControllerRunner.OUTPUT_DIR_PROP));
-
- return new File(outputDir, cvsFile).getAbsolutePath();
- }
-
- private List<String> getTestConfigFiles()
- {
- final List<String> testConfigFile = new ArrayList<String>();
- final File configFileOrDirectory = new File(getCliOptions().get(ControllerRunner.TEST_CONFIG_PROP));
-
- if (configFileOrDirectory.isDirectory())
- {
- final String[] configFiles = configFileOrDirectory.list(new FilenameFilter()
- {
- @Override
- public boolean accept(File dir, String name)
- {
- return new File(dir, name).isFile() && name.endsWith(".json");
- }
- });
-
- for (String configFile : configFiles)
- {
- testConfigFile.add(new File(configFileOrDirectory, configFile).getAbsolutePath());
- }
- }
- else
- {
- testConfigFile.add(configFileOrDirectory.getAbsolutePath());
- }
-
- return testConfigFile;
- }
-
private Config buildTestConfigFrom(String testConfigFile)
{
ConfigReader configReader = new ConfigReader();
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java b/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java
index c4892edca9..a6f872e841 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java
@@ -26,8 +26,10 @@ public abstract class DistributedTestConstants
public static final String MSG_COMMAND_PROPERTY = "COMMAND";
public static final String MSG_JSON_PROPERTY = "JSON";
- public static final long REGISTRATION_TIMEOUT = 60000;
- public static final long COMMAND_RESPONSE_TIMEOUT = 10000;
+ public static final long REGISTRATION_TIMEOUT = 60 * 1000;
+
+ /** set to a long time out because stopping clients can take a long time */
+ public static final long COMMAND_RESPONSE_TIMEOUT = 120 * 1000;
public static final String CONTROLLER_QUEUE_JNDI_NAME = "controllerqueue";
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
index 1d2d862301..0d5457c992 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
@@ -103,9 +103,9 @@ public class Client
public void processInstruction(final Command command)
{
- if (LOGGER.isInfoEnabled())
+ if (LOGGER.isDebugEnabled())
{
- LOGGER.info("Client " + getClientName() + " received command: " + command);
+ LOGGER.debug("Client " + getClientName() + " received command: " + command);
}
String responseMessage = null;
try
@@ -174,9 +174,9 @@ public class Client
{
if (_state.compareAndSet(ClientState.RUNNING_TEST, ClientState.READY))
{
- LOGGER.info("Tearing down test on client: " + _clientJmsDelegate.getClientName());
+ LOGGER.debug("Tearing down test on client: " + _clientJmsDelegate.getClientName());
- _clientJmsDelegate.closeTestConnections();
+ _clientJmsDelegate.tearDownTest();
}
else
{
@@ -190,7 +190,7 @@ public class Client
public void sendResults(ParticipantResult testResult)
{
_clientJmsDelegate.sendResponseMessage(testResult);
- LOGGER.info("Sent test results " + testResult);
+ LOGGER.debug("Sent test results " + testResult);
}
@Override
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
index 1b5e8276c2..f9d50e8e64 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -20,13 +20,16 @@
package org.apache.qpid.disttest.client;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -53,12 +56,17 @@ public class ConsumerParticipant implements Participant
private long _startTime;
private volatile Exception _asyncMessageListenerException;
+ private List<Long> _messageLatencies;
public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command)
{
_jmsDelegate = delegate;
_command = command;
_resultFactory = new ParticipantResultFactory();
+ if (command.isEvaluateLatency())
+ {
+ _messageLatencies = new ArrayList<Long>();
+ }
}
@Override
@@ -78,6 +86,8 @@ public class ConsumerParticipant implements Participant
}
else
{
+ LOGGER.info("Consumer {} registering listener", getName());
+
_jmsDelegate.registerListener(_command.getParticipantName(), new MessageListener(){
@Override
@@ -105,14 +115,14 @@ public class ConsumerParticipant implements Participant
numberOfMessagesSent,
payloadSize,
totalPayloadSize,
- start, end);
+ start, end, _messageLatencies);
return result;
}
private void synchronousRun()
{
- LOGGER.debug("entered synchronousRun: " + this);
+ LOGGER.info("Consumer {} about to consume messages", getName());
_startTime = System.currentTimeMillis();
@@ -130,25 +140,42 @@ public class ConsumerParticipant implements Participant
*/
private boolean processMessage(Message message)
{
- int messageCount = _totalNumberOfMessagesReceived.incrementAndGet();
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("message " + messageCount + " received by " + this);
- }
- int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
- _allConsumedPayloadSizes.add(messagePayloadSize);
- _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
-
+ int messageCount = message == null? _totalNumberOfMessagesReceived.get() : _totalNumberOfMessagesReceived.incrementAndGet() ;
boolean batchEnabled = _command.getBatchSize() > 0;
boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0;
-
- if (!batchEnabled || batchComplete)
+ if (message != null)
{
- if (LOGGER.isTraceEnabled() && batchEnabled)
+ if (LOGGER.isTraceEnabled())
{
- LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ LOGGER.trace("message " + messageCount + " received by " + this);
+ }
+ int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
+ _allConsumedPayloadSizes.add(messagePayloadSize);
+ _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
+
+ if (_command.isEvaluateLatency())
+ {
+ long mesageTimestamp;
+ try
+ {
+ mesageTimestamp = message.getJMSTimestamp();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Cannot get message timestamp!", e);
+ }
+ long latency = System.currentTimeMillis() - mesageTimestamp;
+ _messageLatencies.add(latency);
+ }
+
+ if (!batchEnabled || batchComplete)
+ {
+ if (LOGGER.isTraceEnabled() && batchEnabled)
+ {
+ LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ }
+ _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
}
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
}
boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages();
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java
index 2dcf8940b6..6af1e1316a 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java
@@ -169,8 +169,14 @@ public class MessageProvider
protected Message createTextMessage(Session ssn, final CreateProducerCommand command) throws JMSException
{
String payload = getMessagePayload(command);
- TextMessage msg = ssn.createTextMessage();
+
+ TextMessage msg = null;
+ synchronized(ssn)
+ {
+ msg = ssn.createTextMessage();
+ }
msg.setText(payload);
+
return msg;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
index d5e307d50f..bb9ce26f7e 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
@@ -56,7 +56,7 @@ public class ParticipantExecutor
{
_client = client;
- LOGGER.info("Starting test participant in background thread: " + this);
+ LOGGER.debug("Starting test participant in background thread: " + this);
_executor.execute(new ParticipantRunnable());
}
@@ -94,9 +94,9 @@ public class ParticipantExecutor
ParticipantResult result = null;
try
{
- if (LOGGER.isInfoEnabled())
+ if (LOGGER.isDebugEnabled())
{
- LOGGER.info("About to run participant " + _participant);
+ LOGGER.debug("About to run participant " + _participant);
}
result = _participant.doIt(_client.getClientName());
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
index 7f6b96b87c..50c0a74ccd 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.disttest.client;
+import java.util.Collection;
import java.util.Date;
import org.apache.qpid.disttest.message.ConsumerParticipantResult;
@@ -26,12 +27,24 @@ import org.apache.qpid.disttest.message.CreateParticpantCommand;
import org.apache.qpid.disttest.message.CreateProducerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.ProducerParticipantResult;
+import org.apache.qpid.disttest.results.aggregation.SeriesStatistics;
public class ParticipantResultFactory
{
- public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize, long totalPayloadReceived, Date start, Date end)
+ public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName,
+ CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize,
+ long totalPayloadReceived, Date start, Date end)
+ {
+ return createForConsumer(participantName, clientRegisteredName, command, acknowledgeMode, numberOfMessagesReceived,
+ payloadSize, totalPayloadReceived, start, end, null);
+ }
+
+ public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName,
+ CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize,
+ long totalPayloadReceived, Date start, Date end, Collection<Long> messageLatencies)
{
ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult();
+ consumerParticipantResult.setMessageLatencies(messageLatencies);
setTestProperties(consumerParticipantResult, command, participantName, clientRegisteredName, acknowledgeMode);
setTestResultProperties(consumerParticipantResult, numberOfMessagesReceived, payloadSize, totalPayloadReceived, start, end);
@@ -45,6 +58,11 @@ public class ParticipantResultFactory
consumerParticipantResult.setTotalNumberOfConsumers(1);
consumerParticipantResult.setTotalNumberOfProducers(0);
+ SeriesStatistics statistics = new SeriesStatistics(messageLatencies);
+ consumerParticipantResult.setAverageLatency(statistics.getAverage());
+ consumerParticipantResult.setMinLatency(statistics.getMinimum());
+ consumerParticipantResult.setMaxLatency(statistics.getMaximum());
+ consumerParticipantResult.setLatencyStandardDeviation(statistics.getStandardDeviation());
return consumerParticipantResult;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
index bcad81b4aa..63cbe98b5c 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
@@ -22,10 +22,14 @@ package org.apache.qpid.disttest.client;
import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import javax.jms.Message;
import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.client.utils.ExecutorWithLimits;
+import org.apache.qpid.disttest.client.utils.ExecutorWithLimitsFactory;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
import org.apache.qpid.disttest.message.CreateProducerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
@@ -40,7 +44,9 @@ public class ProducerParticipant implements Participant
private final CreateProducerCommand _command;
- private ParticipantResultFactory _resultFactory;
+ private final ParticipantResultFactory _resultFactory;
+
+ private ExecutorWithLimits _limiter;
public ProducerParticipant(final ClientJmsDelegate jmsDelegate, final CreateProducerCommand command)
{
@@ -59,10 +65,10 @@ public class ProducerParticipant implements Participant
int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName());
- long expectedDuration = _command.getMaximumDuration() - _command.getStartDelay();
-
doSleepForStartDelay();
+ final long requiredDuration = _command.getMaximumDuration() - _command.getStartDelay();
+
final long startTime = System.currentTimeMillis();
Message lastPublishedMessage = null;
@@ -70,11 +76,30 @@ public class ProducerParticipant implements Participant
long totalPayloadSizeOfAllMessagesSent = 0;
NavigableSet<Integer> allProducedPayloadSizes = new TreeSet<Integer>();
+ _limiter = ExecutorWithLimitsFactory.createExecutorWithLimit(startTime, requiredDuration);
+
+ LOGGER.info("Producer {} about to send messages", getName());
+
while (true)
{
- numberOfMessagesSent++;
+ try
+ {
+ lastPublishedMessage = _limiter.execute(new Callable<Message>()
+ {
+ @Override
+ public Message call() throws Exception
+ {
+ return _jmsDelegate.sendNextMessage(_command);
+ }
+ });
+ }
+ catch (CancellationException ce)
+ {
+ LOGGER.debug("Producer send was cancelled due to maximum duration {} ms", requiredDuration);
+ break;
+ }
- lastPublishedMessage = _jmsDelegate.sendNextMessage(_command);
+ numberOfMessagesSent++;
int lastPayloadSize = _jmsDelegate.calculatePayloadSizeFrom(lastPublishedMessage);
totalPayloadSizeOfAllMessagesSent += lastPayloadSize;
@@ -96,15 +121,11 @@ public class ProducerParticipant implements Participant
}
_jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
- if (_command.getInterval() > 0)
- {
- // sleep for given time
- Thread.sleep(_command.getInterval());
- }
+ doSleepForInterval();
}
if (_command.getNumberOfMessages() > 0 && numberOfMessagesSent >= _command.getNumberOfMessages()
- || expectedDuration > 0 && System.currentTimeMillis() - startTime >= expectedDuration)
+ || requiredDuration > 0 && System.currentTimeMillis() - startTime >= requiredDuration)
{
break;
}
@@ -140,23 +161,43 @@ public class ProducerParticipant implements Participant
private void doSleepForStartDelay()
{
- if (_command.getStartDelay() > 0)
+ long sleepTime = _command.getStartDelay();
+ if (sleepTime > 0)
{
+ LOGGER.debug("{} sleeping for {} milliseconds before starting", getName(), sleepTime);
// start delay is specified. Sleeping...
- try
- {
- Thread.sleep(_command.getStartDelay());
- }
- catch (final InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ doSleep(sleepTime);
+ }
+ }
+
+ private void doSleepForInterval() throws InterruptedException
+ {
+ long sleepTime = _command.getInterval();
+ if (sleepTime > 0)
+ {
+ doSleep(sleepTime);
+ }
+ }
+
+ private void doSleep(long sleepTime)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (final InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
}
}
@Override
public void releaseResources()
{
+ if (_limiter != null)
+ {
+ _limiter.shutdown();
+ }
_jmsDelegate.closeTestProducer(_command.getParticipantName());
}
@@ -171,4 +212,5 @@ public class ProducerParticipant implements Participant
{
return "ProducerParticipant [command=" + _command + "]";
}
+
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java
new file mode 100644
index 0000000000..f64107c125
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimits.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+/**
+ * Implementations of this interface execute a {@link Callable} but place some
+ * kind of limit on that execution, such as time.
+ */
+public interface ExecutorWithLimits
+{
+ <T> T execute(Callable<T> callback) throws CancellationException, Exception;
+
+ void shutdown();
+
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java
new file mode 100644
index 0000000000..4d17d76568
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithLimitsFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.concurrent.Callable;
+
+public class ExecutorWithLimitsFactory
+{
+ /**
+ * Creates an {@link ExecutorWithLimits} that will permit the execution of {@link Callable} implementations until
+ * until <code>allowedTimeInMillis</code> milliseconds have elapsed beyond <code>startTime</code>.
+ * If <code>allowedTimeInMillis</code> is less than or equal to zero, a {@link ExecutorWithNoLimits}
+ * is created that enforces no time-limit.
+ *
+ * @param startTime start time (milliseconds)
+ * @param allowedTimeInMillis allowed time (milliseconds)
+ *
+ * @return ExecutionLimiter implementation
+ */
+ public static ExecutorWithLimits createExecutorWithLimit(long startTime, long allowedTimeInMillis)
+ {
+ if (allowedTimeInMillis > 0)
+ {
+ return new ExecutorWithTimeLimit(startTime, allowedTimeInMillis);
+ }
+ else
+ {
+ return new ExecutorWithNoLimits();
+ }
+ }
+
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java
new file mode 100644
index 0000000000..f729a72fa5
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimits.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Executes a {@link Callable} without any limits.
+ */
+public class ExecutorWithNoLimits implements ExecutorWithLimits
+{
+
+ @Override
+ public <T> T execute(Callable<T> _callback) throws Exception
+ {
+ return _callback.call();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ // Deliberately blank
+ }
+
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java
new file mode 100644
index 0000000000..4fa3960d92
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimit.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Executes a {@link Callable} but limits the execution time. If the execution
+ * time is exceeded the callable will be cancelled.
+ */
+public class ExecutorWithTimeLimit implements ExecutorWithLimits
+{
+ private final long _endTime;
+ private final ExecutorService _singleThreadExecutor = Executors.newSingleThreadExecutor();
+
+ public ExecutorWithTimeLimit(long startTime, long allowedTimeInMillis)
+ {
+ _endTime = startTime + allowedTimeInMillis;
+ }
+
+ @Override
+ public <T> T execute(Callable<T> callback) throws CancellationException, Exception
+ {
+ final long timeRemaining = _endTime - System.currentTimeMillis();
+ if (timeRemaining <= 0)
+ {
+ throw new CancellationException("Too little time remains to schedule callable");
+ }
+
+ List<Future<T>> l = _singleThreadExecutor.invokeAll(Collections.singletonList(callback), timeRemaining, TimeUnit.MILLISECONDS);
+ return l.get(0).get();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ _singleThreadExecutor.shutdown();
+ }
+
+
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java
index 7c935065f0..513e633566 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java
@@ -92,15 +92,18 @@ public class Controller
}
}
- private void awaitLatch(CountDownLatch latch, long timeout, String messageWithOneDecimalPlaceholder)
+ private void awaitStopResponses(CountDownLatch latch, long timeout)
{
+ String message = "Timed out after %d waiting for stop command responses. Expecting %d more responses.";
+
try
{
- final boolean countedDownOK = latch.await(timeout, TimeUnit.MILLISECONDS);
+ boolean countedDownOK = latch.await(timeout, TimeUnit.MILLISECONDS);
if (!countedDownOK)
{
- final long latchCount = latch.getCount();
- String formattedMessage = String.format(messageWithOneDecimalPlaceholder, latchCount);
+ long latchCount = latch.getCount();
+ String formattedMessage = String.format(message, timeout, latchCount);
+ LOGGER.error(formattedMessage);
throw new DistributedTestException(formattedMessage);
}
}
@@ -141,7 +144,7 @@ public class Controller
_jmsDelegate.sendCommandToClient(clientName, command);
}
- awaitLatch(_stopClientsResponseLatch, _commandResponseTimeout, "Timed out waiting for stop command responses. Expecting %d more responses.");
+ awaitStopResponses(_stopClientsResponseLatch, _commandResponseTimeout);
LOGGER.info("Stopped all clients");
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java
index 30595269b3..e973f07c12 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java
@@ -104,6 +104,11 @@ public class TestRunner
return _testResult;
}
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Couldn't run test", e);
+ throw e;
+ }
finally
{
_jmsDelegate.removeCommandListener(participantResultListener);
@@ -140,7 +145,6 @@ public class TestRunner
}
Runtime.getRuntime().removeShutdownHook(_removeQueuesShutdownHook);
-
}
}
@@ -176,7 +180,7 @@ public class TestRunner
void awaitCommandResponses()
{
- awaitLatch(_commandResponseLatch, _commandResponseTimeout, "Timed out waiting for command responses. Expecting %d more responses.");
+ awaitLatch(_commandResponseLatch, _commandResponseTimeout, "Timed out waiting for command responses");
}
@@ -204,7 +208,7 @@ public class TestRunner
{
try
{
- awaitLatch(_testResultsLatch, interval, "Waiting for participant results... Expecting %d more responses.");
+ awaitLatch(_testResultsLatch, interval, "still waiting for participant results");
}
catch (DistributedTestException e)
{
@@ -253,7 +257,7 @@ public class TestRunner
setOriginalTestDetailsOn(result);
_testResult.addParticipantResult(result);
- LOGGER.info("Received result " + result);
+ LOGGER.debug("Received result " + result);
_testResultsLatch.countDown();
checkForResponseError(result);
@@ -276,7 +280,7 @@ public class TestRunner
_jmsDelegate.sendCommandToClient(registeredClientName, command);
}
- private void awaitLatch(CountDownLatch latch, long timeout, String messageWithOneDecimalPlaceholder)
+ private void awaitLatch(CountDownLatch latch, long timeout, String message)
{
try
{
@@ -284,7 +288,8 @@ public class TestRunner
if (!countedDownOK)
{
final long latchCount = latch.getCount();
- String formattedMessage = String.format(messageWithOneDecimalPlaceholder, latchCount);
+ String formattedMessage = "After " + timeout + "ms ... " + message + " ... Expecting " + latchCount + " more responses.";
+ LOGGER.info(formattedMessage); // info rather than error because we time out periodically so we can log progress
throw new DistributedTestException(formattedMessage);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java
index 6288b42eac..bd7d239a90 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java
@@ -22,24 +22,44 @@ package org.apache.qpid.disttest.controller.config;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.Reader;
+import java.io.StringReader;
import org.apache.qpid.disttest.client.property.PropertyValue;
import org.apache.qpid.disttest.json.PropertyValueAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public class ConfigReader
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigReader.class);
public Config getConfigFromFile(String fileName) throws FileNotFoundException
{
- FileReader reader = new FileReader(fileName);
+ Reader reader = getConfigReader(fileName);
Config config = readConfig(reader);
return config;
}
+ protected Reader getConfigReader(String fileName) throws FileNotFoundException
+ {
+ Reader reader = null;
+ if (fileName.endsWith(".js"))
+ {
+ LOGGER.info("Evaluating javascript:" + fileName);
+ reader = new StringReader(new JavaScriptConfigEvaluator().evaluateJavaScript(fileName));
+ }
+ else
+ {
+ LOGGER.info("Loading JSON:" + fileName);
+ reader = new FileReader(fileName);
+ }
+ return reader;
+ }
+
public Config readConfig(Reader reader)
{
Gson gson = new GsonBuilder()
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
index ed47e02667..110de8a4ea 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.disttest.controller.config;
import org.apache.qpid.disttest.message.CreateConsumerCommand;
@@ -10,6 +30,7 @@ public class ConsumerConfig extends ParticipantConfig
private String _selector;
private boolean _noLocal;
private boolean _synchronous;
+ private boolean _evaluateLatency;
// For Gson
public ConsumerConfig()
@@ -58,6 +79,7 @@ public class ConsumerConfig extends ParticipantConfig
createConsumerCommand.setSelector(_selector);
createConsumerCommand.setNoLocal(_noLocal);
createConsumerCommand.setSynchronous(_synchronous);
+ createConsumerCommand.setEvaluateLatency(_evaluateLatency);
return createConsumerCommand;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluator.java
new file mode 100644
index 0000000000..bbc22af00f
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluator.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.InputStreamReader;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class to load and evaluate JavaScript configuration, producing a JSON string.
+ */
+public class JavaScriptConfigEvaluator
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(JavaScriptConfigEvaluator.class);
+
+ public static final String TEST_CONFIG_VARIABLE_NAME = "jsonObject";
+
+ public String evaluateJavaScript(String fileName) throws FileNotFoundException
+ {
+ ScriptEngineManager mgr = new ScriptEngineManager();
+ ScriptEngine engine = mgr.getEngineByName("JavaScript");
+ try
+ {
+ engine.eval(new InputStreamReader(getClass().getClassLoader().getResourceAsStream("json2.js")));
+ engine.eval(new InputStreamReader(getClass().getClassLoader().getResourceAsStream("test-utils.js")));
+ engine.eval(new FileReader(fileName));
+ engine.eval("jsonString = JSON.stringify(" + TEST_CONFIG_VARIABLE_NAME + ")");
+ }
+ catch (ScriptException e)
+ {
+ throw new DistributedTestException("Exception while evaluating test config", e);
+ }
+ String result = (String) engine.get("jsonString");
+
+ LOGGER.debug("Evaluated javascript file " + fileName + ". Generated the following JSON: " + result);
+ return result;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
index 31037a3038..16f7b0d18d 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
@@ -18,10 +18,20 @@
*/
package org.apache.qpid.disttest.controller.config;
+import org.apache.commons.lang.ObjectUtils;
import org.apache.qpid.disttest.message.CreateParticpantCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class ParticipantConfig
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantConfig.class);
+
+ public static final String DURATION_OVERRIDE_SYSTEM_PROPERTY = "qpid.disttest.duration";
+
+ /** used to ensure we only log about the overridden duration once */
+ private boolean _alreadyLoggedAboutOverriddenDuration;
+
private String _destinationName;
private long _numberOfMessages;
private String _name;
@@ -58,7 +68,34 @@ public abstract class ParticipantConfig
createParticipantCommand.setDestinationName(_destinationName);
createParticipantCommand.setNumberOfMessages(_numberOfMessages);
createParticipantCommand.setBatchSize(_batchSize);
- createParticipantCommand.setMaximumDuration(_maximumDuration);
+
+ Long maximumDuration = (Long)ObjectUtils.defaultIfNull(getOverriddenDuration(), _maximumDuration);
+ createParticipantCommand.setMaximumDuration(maximumDuration);
}
+ private Long getOverriddenDuration()
+ {
+ String overriddenDurationString = System.getProperty(DURATION_OVERRIDE_SYSTEM_PROPERTY);
+ if(overriddenDurationString != null)
+ {
+ try
+ {
+ long overriddenDuration = Long.valueOf(overriddenDurationString);
+
+ if(!_alreadyLoggedAboutOverriddenDuration)
+ {
+ LOGGER.info("Applied overridden maximum duration " + overriddenDuration);
+ _alreadyLoggedAboutOverriddenDuration = true;
+ }
+
+ return overriddenDuration;
+ }
+ catch (NumberFormatException e)
+ {
+ LOGGER.error("Couldn't parse overridden duration " + overriddenDurationString, e);
+ }
+ }
+
+ return null;
+ }
} \ No newline at end of file
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
index 7806528a8c..f2369ed671 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
@@ -37,7 +37,7 @@ public class ProducerConfig extends ParticipantConfig
public ProducerConfig()
{
_deliveryMode = Message.DEFAULT_DELIVERY_MODE;
- _messageSize = 0;
+ _messageSize = 1024;
_priority = Message.DEFAULT_PRIORITY;
_timeToLive = Message.DEFAULT_TIME_TO_LIVE;
_interval = 0;
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java
index cffc2b7c50..45a4551cbc 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java
@@ -38,7 +38,6 @@ public class QueueConfig
public QueueConfig(String name, boolean durable, Map<String, Object> attributes)
{
- super();
this._name = name;
this._durable = durable;
this._attributes = attributes;
@@ -49,8 +48,6 @@ public class QueueConfig
return _name;
}
- // TODO x-qpid-capacity and x-qpid-flow-resume-capacity need to be typed as numeric but we currrently
- // pass these as a string.
public Map<String, Object> getAttributes()
{
return _attributes;
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
index d68fc86a0e..3f8afc9a9a 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
@@ -35,6 +35,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -72,6 +73,7 @@ public class ClientJmsDelegate
private Map<String, Session> _testSessions;
private Map<String, MessageProducer> _testProducers;
private Map<String, MessageConsumer> _testConsumers;
+ private Map<String, Session> _testSubscriptions;
private Map<String, MessageProvider> _testMessageProviders;
private final MessageProvider _defaultMessageProvider;
@@ -92,6 +94,7 @@ public class ClientJmsDelegate
_testSessions = new HashMap<String, Session>();
_testProducers = new HashMap<String, MessageProducer>();
_testConsumers = new HashMap<String, MessageConsumer>();
+ _testSubscriptions = new HashMap<String, Session>();
_testMessageProviders = new HashMap<String, MessageProvider>();
_defaultMessageProvider = new MessageProvider(null);
}
@@ -193,7 +196,7 @@ public class ClientJmsDelegate
final boolean transacted = command.getAcknowledgeMode() == Session.SESSION_TRANSACTED;
final Session newSession = connection.createSession(transacted, command.getAcknowledgeMode());
- LOGGER.info("Created session " + command.getSessionName() + " with transacted = " + newSession.getTransacted() + " and acknowledgeMode = " + newSession.getAcknowledgeMode());
+ LOGGER.debug("Created session " + command.getSessionName() + " with transacted = " + newSession.getTransacted() + " and acknowledgeMode = " + newSession.getAcknowledgeMode());
addSession(command.getSessionName(), newSession);
}
@@ -212,30 +215,35 @@ public class ClientJmsDelegate
{
throw new DistributedTestException("No test session found called: " + command.getSessionName(), command);
}
- final Destination destination = session.createQueue(command.getDestinationName());
- final MessageProducer jmsProducer = session.createProducer(destination);
- if (command.getPriority() != -1)
- {
- jmsProducer.setPriority(command.getPriority());
- }
- if (command.getTimeToLive() > 0)
- {
- jmsProducer.setTimeToLive(command.getTimeToLive());
- }
- if (command.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
- || command.getDeliveryMode() == DeliveryMode.PERSISTENT)
+ synchronized(session)
{
- jmsProducer.setDeliveryMode(command.getDeliveryMode());
- }
+ final Destination destination = session.createQueue(command.getDestinationName());
+
+ final MessageProducer jmsProducer = session.createProducer(destination);
- addProducer(command.getParticipantName(), jmsProducer);
+ if (command.getPriority() != -1)
+ {
+ jmsProducer.setPriority(command.getPriority());
+ }
+ if (command.getTimeToLive() > 0)
+ {
+ jmsProducer.setTimeToLive(command.getTimeToLive());
+ }
+
+ if (command.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
+ || command.getDeliveryMode() == DeliveryMode.PERSISTENT)
+ {
+ jmsProducer.setDeliveryMode(command.getDeliveryMode());
+ }
+
+ addProducer(command.getParticipantName(), jmsProducer);
+ }
}
catch (final JMSException jmse)
{
throw new DistributedTestException("Unable to create new producer: " + command, jmse);
}
-
}
public void createConsumer(final CreateConsumerCommand command)
@@ -247,11 +255,37 @@ public class ClientJmsDelegate
{
throw new DistributedTestException("No test session found called: " + command.getSessionName(), command);
}
- final Destination destination = command.isTopic() ? session.createTopic(command.getDestinationName())
- : session.createQueue(command.getDestinationName());
- final MessageConsumer jmsConsumer = session.createConsumer(destination, command.getSelector());
- _testConsumers.put(command.getParticipantName(), jmsConsumer);
+ synchronized(session)
+ {
+ Destination destination;
+ MessageConsumer jmsConsumer;
+ if(command.isTopic())
+ {
+ Topic topic = session.createTopic(command.getDestinationName());
+ if(command.isDurableSubscription())
+ {
+ String subscription = "subscription-" + command.getParticipantName() + System.currentTimeMillis();
+ jmsConsumer = session.createDurableSubscriber(topic, subscription);
+
+ _testSubscriptions.put(subscription, session);
+ LOGGER.debug("created durable suscription " + subscription + " to topic " + topic);
+ }
+ else
+ {
+ jmsConsumer = session.createConsumer(topic, command.getSelector());
+ }
+
+ destination = topic;
+ }
+ else
+ {
+ destination = session.createQueue(command.getDestinationName());
+ jmsConsumer = session.createConsumer(destination, command.getSelector());
+ }
+
+ _testConsumers.put(command.getParticipantName(), jmsConsumer);
+ }
}
catch (final JMSException jmse)
{
@@ -346,7 +380,10 @@ public class ClientJmsDelegate
final Session session = _testSessions.get(sessionName);
if (session.getTransacted())
{
- session.commit();
+ synchronized(session)
+ {
+ session.commit();
+ }
}
else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
@@ -461,13 +498,16 @@ public class ClientJmsDelegate
try
{
final Session session = _testSessions.get(sessionName);
- if (session.getTransacted())
+ synchronized(session)
{
- session.rollback();
- }
- else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- session.recover();
+ if (session.getTransacted())
+ {
+ session.rollback();
+ }
+ else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ session.recover();
+ }
}
}
catch (final JMSException jmse)
@@ -482,13 +522,16 @@ public class ClientJmsDelegate
try
{
final Session session = _testSessions.get(sessionName);
- if (session.getTransacted())
- {
- session.rollback();
- }
- else
+ synchronized(session)
{
- session.recover();
+ if (session.getTransacted())
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.recover();
+ }
}
}
catch (final JMSException jmse)
@@ -503,38 +546,62 @@ public class ClientJmsDelegate
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("clientName", _clientName).toString();
}
- public void closeTestConnections()
+ public void tearDownTest()
{
StringBuilder jmsErrorMessages = new StringBuilder();
- int failedCloseCounter = 0;
- for (final Map.Entry<String, Connection> entry : _testConnections.entrySet())
+ int failureCounter = 0;
+
+ for(String subscription : _testSubscriptions.keySet())
+ {
+ Session session = _testSubscriptions.get(subscription);
+ try
+ {
+ session.unsubscribe(subscription);
+ }
+ catch (JMSException e)
+ {
+ LOGGER.error("Failed to unsubscribe '" + subscription + "' :" + e.getLocalizedMessage(), e);
+ failureCounter++;
+ appendErrorMessage(jmsErrorMessages, e);
+ }
+ }
+
+ for (Map.Entry<String, Connection> entry : _testConnections.entrySet())
{
- final Connection connection = entry.getValue();
+ Connection connection = entry.getValue();
try
{
connection.close();
}
- catch (final JMSException e)
+ catch (JMSException e)
{
LOGGER.error("Failed to close connection '" + entry.getKey() + "' :" + e.getLocalizedMessage(), e);
- failedCloseCounter++;
- if (jmsErrorMessages.length() > 0)
- {
- jmsErrorMessages.append('\n');
- }
- jmsErrorMessages.append(e.getMessage());
+ failureCounter++;
+ appendErrorMessage(jmsErrorMessages, e);
}
}
+
_testConnections.clear();
+ _testSubscriptions.clear();
_testSessions.clear();
_testProducers.clear();
_testConsumers.clear();
- if (failedCloseCounter > 0)
+
+ if (failureCounter > 0)
{
- throw new DistributedTestException("Close failed for " + failedCloseCounter + " connection(s) with the following errors: " + jmsErrorMessages.toString());
+ throw new DistributedTestException("Tear down test encountered " + failureCounter + " failures with the following errors: " + jmsErrorMessages.toString());
}
}
+ private void appendErrorMessage(StringBuilder errorMessages, JMSException e)
+ {
+ if (errorMessages.length() > 0)
+ {
+ errorMessages.append('\n');
+ }
+ errorMessages.append(e.getMessage());
+ }
+
public void closeTestConsumer(String consumerName)
{
MessageConsumer consumer = _testConsumers.get(consumerName);
@@ -543,7 +610,7 @@ public class ClientJmsDelegate
try
{
consumer.close();
- LOGGER.info("Closed test consumer " + consumerName);
+ LOGGER.debug("Closed test consumer " + consumerName);
}
catch (JMSException e)
{
@@ -568,15 +635,16 @@ public class ClientJmsDelegate
}
}
+ /** only supports text messages - returns 0 for other message types */
public int calculatePayloadSizeFrom(Message message)
{
try
{
if (message != null && message instanceof TextMessage)
{
- return ((TextMessage) message).getText().getBytes().length;
+ return ((TextMessage) message).getText().getBytes().length;
}
- // TODO support other message types
+
return 0;
}
catch (JMSException e)
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
index 69da409be5..c80e641e5c 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
@@ -48,11 +48,13 @@ public class ControllerJmsDelegate
{
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJmsDelegate.class);
+ private static final String QUEUE_CREATOR_CLASS_NAME_SYSTEM_PROPERTY = "qpid.disttest.queue.creator.class";
+
private final Map<String, Destination> _clientNameToQueueMap = new ConcurrentHashMap<String, Destination>();
private final Connection _connection;
private final Destination _controllerQueue;
private final Session _session;
- private final QueueCreator _queueCreator;
+ private QueueCreator _queueCreator;
private List<CommandListener> _commandListeners = new CopyOnWriteArrayList<CommandListener>();
@@ -63,7 +65,39 @@ public class ControllerJmsDelegate
_connection.start();
_controllerQueue = (Destination) context.lookup("controllerqueue");
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _queueCreator = new QpidQueueCreator();
+
+ createVendorSpecificQueueCreator();
+ }
+
+ private void createVendorSpecificQueueCreator()
+ {
+ String queueCreatorClassName = System.getProperty(QUEUE_CREATOR_CLASS_NAME_SYSTEM_PROPERTY);
+ if(queueCreatorClassName == null)
+ {
+ queueCreatorClassName = QpidQueueCreator.class.getName();
+ }
+ else
+ {
+ LOGGER.info("Using overridden queue creator class " + queueCreatorClassName);
+ }
+
+ try
+ {
+ Class<? extends QueueCreator> queueCreatorClass = (Class<? extends QueueCreator>) Class.forName(queueCreatorClassName);
+ _queueCreator = queueCreatorClass.newInstance();
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e);
+ }
}
public void start()
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
new file mode 100644
index 0000000000..4d4850eccf
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.jms;
+
+import java.util.List;
+
+import javax.jms.Session;
+
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+public class NoOpQueueCreator implements QueueCreator
+{
+ @Override
+ public void createQueues(Session session, List<QueueConfig> configs)
+ {
+ }
+
+ @Override
+ public void deleteQueues(Session session, List<QueueConfig> configs)
+ {
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index 912ce54495..6874abe7d4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -21,7 +21,6 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
import javax.jms.Session;
-
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
@@ -29,11 +28,9 @@ import org.apache.qpid.disttest.controller.config.QueueConfig;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
-
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
@Override
@@ -69,7 +66,7 @@ public class QpidQueueCreator implements QueueCreator
EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(),
destination, autoDelete);
- LOGGER.info("Created queue " + queueConfig);
+ LOGGER.debug("Created queue " + queueConfig);
}
catch (Exception e)
{
@@ -86,12 +83,11 @@ public class QpidQueueCreator implements QueueCreator
// use #deleteQueue.
AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
session.sendQueueDelete(destination.getAMQQueueName());
- LOGGER.info("Deleted queue " + queueConfig.getName());
+ LOGGER.debug("Deleted queue " + queueConfig.getName());
}
catch (Exception e)
{
throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
}
}
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
index f92e3ea538..ad9aa31472 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
@@ -18,13 +18,15 @@
*/
package org.apache.qpid.disttest.message;
-import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSIING_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSING_SUBSCRIPTION;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_DURABLE_SUBSCRIPTION;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_NO_LOCAL;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SYNCHRONOUS_CONSUMER;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_TOPIC;
+import java.util.Collection;
+
public class ConsumerParticipantResult extends ParticipantResult
{
private boolean _topic;
@@ -34,6 +36,12 @@ public class ConsumerParticipantResult extends ParticipantResult
private boolean _noLocal;
private boolean _synchronousConsumer;
+ private Collection<Long> _messageLatencies;
+ private long _minLatency;
+ private long _maxLatency;
+ private double _averageLatency;
+ private double _latencyStandardDeviation;
+
public ConsumerParticipantResult()
{
super(CommandType.CONSUMER_PARTICIPANT_RESULT);
@@ -57,7 +65,7 @@ public class ConsumerParticipantResult extends ParticipantResult
}
- @OutputAttribute(attribute=IS_BROWSIING_SUBSCRIPTION)
+ @OutputAttribute(attribute=IS_BROWSING_SUBSCRIPTION)
public boolean isBrowsingSubscription()
{
return _browsingSubscription;
@@ -115,4 +123,59 @@ public class ConsumerParticipantResult extends ParticipantResult
{
return _topic;
}
+
+ public Collection<Long> getMessageLatencies()
+ {
+ return _messageLatencies;
+ }
+
+ public void setMessageLatencies(Collection<Long> messageLatencies)
+ {
+ _messageLatencies = messageLatencies;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.MIN_LATENCY)
+ public long getMinLatency()
+ {
+ return _minLatency;
+ }
+
+ public void setMinLatency(long minLatency)
+ {
+ _minLatency = minLatency;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.MAX_LATENCY)
+ public long getMaxLatency()
+ {
+ return _maxLatency;
+ }
+
+ public void setMaxLatency(long maxLatency)
+ {
+ _maxLatency = maxLatency;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.AVERAGE_LATENCY)
+ public double getAverageLatency()
+ {
+ return _averageLatency;
+ }
+
+ public void setAverageLatency(double averageLatency)
+ {
+ _averageLatency = averageLatency;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.LATENCY_STANDARD_DEVIATION)
+ public double getLatencyStandardDeviation()
+ {
+ return _latencyStandardDeviation;
+ }
+
+ public void setLatencyStandardDeviation(double latencyStandardDeviation)
+ {
+ _latencyStandardDeviation = latencyStandardDeviation;
+ }
+
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
index 678e428f94..68c21fbf83 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
@@ -28,7 +28,7 @@ public class CreateConsumerCommand extends CreateParticpantCommand
private boolean _noLocal;
private boolean _synchronous;
private long _receiveTimeout = 5000;
-
+ private boolean _evaluateLatency;
public CreateConsumerCommand()
{
@@ -105,4 +105,14 @@ public class CreateConsumerCommand extends CreateParticpantCommand
{
return _receiveTimeout;
}
+
+ public boolean isEvaluateLatency()
+ {
+ return _evaluateLatency;
+ }
+
+ public void setEvaluateLatency(boolean evaluateLatency)
+ {
+ _evaluateLatency = evaluateLatency;
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
index ccc7c0d9fb..0418562a2d 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
@@ -45,7 +45,7 @@ public enum ParticipantAttribute
PRODUCER_INTERVAL("producerIntervalMs"),
IS_TOPIC("isTopic"),
IS_DURABLE_SUBSCRIPTION("isDurableSubscription"),
- IS_BROWSIING_SUBSCRIPTION("isBrowsingSubscription"),
+ IS_BROWSING_SUBSCRIPTION("isBrowsingSubscription"),
IS_SELECTOR("isSelector"),
IS_NO_LOCAL("isNoLocal"),
IS_SYNCHRONOUS_CONSUMER("isSynchronousConsumer"),
@@ -54,7 +54,12 @@ public enum ParticipantAttribute
TOTAL_PAYLOAD_PROCESSED("totalPayloadProcessedB"),
THROUGHPUT("throughputKbPerS"),
TIME_TAKEN("timeTakenMs"),
- ERROR_MESSAGE("errorMessage");
+ ERROR_MESSAGE("errorMessage"),
+ MIN_LATENCY("minLatency"),
+ MAX_LATENCY("maxLatency"),
+ AVERAGE_LATENCY("averageLatency"),
+ LATENCY_STANDARD_DEVIATION("latencyStandardDeviation")
+ ;
private String _displayName;
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
index 207d0131eb..4dcabe6c7b 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
@@ -23,7 +23,9 @@ import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.ProducerParticipantResult;
public class ParticipantResultAggregator
{
@@ -42,8 +44,13 @@ public class ParticipantResultAggregator
private NavigableSet<Integer> _encounteredIterationNumbers = new TreeSet<Integer>();
private NavigableSet<Integer> _encounteredBatchSizes = new TreeSet<Integer>();
private NavigableSet<Integer> _encounteredAcknowledgeMode = new TreeSet<Integer>();
+ private NavigableSet<Integer> _encounteredDeliveryModes = new TreeSet<Integer>();
+ private NavigableSet<Boolean> _encounteredDurableSubscriptions = new TreeSet<Boolean>();
+ private NavigableSet<Boolean> _encounteredTopics = new TreeSet<Boolean>();
private NavigableSet<String> _encountedTestNames = new TreeSet<String>();
+ private SeriesStatistics _latencyStatistics = new SeriesStatistics();
+
public ParticipantResultAggregator(Class<? extends ParticipantResult> targetClass, String aggregateResultName)
{
_aggregatedResultName = aggregateResultName;
@@ -56,12 +63,31 @@ public class ParticipantResultAggregator
{
rollupConstantAttributes(result);
computeVariableAttributes(result);
+ if (result instanceof ConsumerParticipantResult)
+ {
+ ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult)result;
+ _latencyStatistics.addMessageLatencies(consumerParticipantResult.getMessageLatencies());
+ _latencyStatistics.aggregate();
+ }
}
}
public ParticipantResult getAggregatedResult()
{
- ParticipantResult aggregatedResult = new ParticipantResult(_aggregatedResultName);
+ ParticipantResult aggregatedResult;
+ if (_targetClass == ConsumerParticipantResult.class)
+ {
+ ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult(_aggregatedResultName);
+ consumerParticipantResult.setAverageLatency(_latencyStatistics.getAverage());
+ consumerParticipantResult.setMinLatency(_latencyStatistics.getMinimum());
+ consumerParticipantResult.setMaxLatency(_latencyStatistics.getMaximum());
+ consumerParticipantResult.setLatencyStandardDeviation(_latencyStatistics.getStandardDeviation());
+ aggregatedResult = consumerParticipantResult;
+ }
+ else
+ {
+ aggregatedResult = new ParticipantResult(_aggregatedResultName);
+ }
setRolledUpConstantAttributes(aggregatedResult);
setComputedVariableAttributes(aggregatedResult);
@@ -94,6 +120,17 @@ public class ParticipantResultAggregator
_encounteredIterationNumbers.add(result.getIterationNumber());
_encounteredBatchSizes.add(result.getBatchSize());
_encounteredAcknowledgeMode.add(result.getAcknowledgeMode());
+ if (result instanceof ProducerParticipantResult)
+ {
+ ProducerParticipantResult producerParticipantResult = (ProducerParticipantResult) result;
+ _encounteredDeliveryModes.add(producerParticipantResult.getDeliveryMode());
+ }
+ else if(result instanceof ConsumerParticipantResult)
+ {
+ ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult)result;
+ _encounteredDurableSubscriptions.add(consumerParticipantResult.isDurableSubscription());
+ _encounteredTopics.add(consumerParticipantResult.isTopic());
+ }
}
private void setComputedVariableAttributes(ParticipantResult aggregatedResult)
@@ -129,6 +166,26 @@ public class ParticipantResultAggregator
{
aggregatedResult.setAcknowledgeMode(_encounteredAcknowledgeMode.first());
}
+ if (aggregatedResult instanceof ProducerParticipantResult)
+ {
+ ProducerParticipantResult producerParticipantResult = (ProducerParticipantResult) aggregatedResult;
+ if(_encounteredDeliveryModes.size() == 1)
+ {
+ producerParticipantResult.setDeliveryMode(_encounteredDeliveryModes.first());
+ }
+ }
+ if (aggregatedResult instanceof ConsumerParticipantResult)
+ {
+ ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult) aggregatedResult;
+ if(_encounteredDurableSubscriptions.size() == 1)
+ {
+ consumerParticipantResult.setDurableSubscription(_encounteredDurableSubscriptions.first());
+ }
+ if(_encounteredTopics.size() == 1)
+ {
+ consumerParticipantResult.setTopic(_encounteredTopics.first());
+ }
+ }
}
private double calculateThroughputInKiloBytesPerSecond()
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java
new file mode 100644
index 0000000000..b93c210473
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class SeriesStatistics
+{
+ private long _minValue;
+ private long _maxValue;
+ private double _mean;
+ private double _standardDeviation;
+ private Collection<Long> _series = new CopyOnWriteArrayList<Long>();
+
+ public SeriesStatistics()
+ {
+ super();
+ }
+
+ public SeriesStatistics(Collection<Long> messageLatencies)
+ {
+ setMessageLatencies(messageLatencies);
+ }
+
+ public void addMessageLatencies(Collection<Long> messageLatencies)
+ {
+ if (messageLatencies != null)
+ {
+ _series.addAll(messageLatencies);
+ }
+ }
+
+ public void setMessageLatencies(Collection<Long> messageLatencies)
+ {
+ _series = messageLatencies;
+ aggregate();
+ }
+
+ public void aggregate()
+ {
+ if (_series != null && _series.size() > 0)
+ {
+ long minLatency = Long.MAX_VALUE;
+ long maxLatency = Long.MIN_VALUE;
+ long totalLatency = 0;
+ for (Long latency : _series)
+ {
+ totalLatency += latency;
+ minLatency = Math.min(minLatency, latency);
+ maxLatency = Math.max(maxLatency, latency);
+ }
+ _mean = ((double) totalLatency) / (double) _series.size();
+ _minValue = minLatency;
+ _maxValue = maxLatency;
+ double sum = 0;
+ for (Long latency : _series)
+ {
+ double diff = latency - _mean;
+ sum += diff * diff;
+ }
+ long size = _series.size() == 1 ? 1: _series.size() - 1;
+ _standardDeviation = Math.sqrt(sum / (double) size);
+ }
+ else
+ {
+ _mean = 0;
+ _minValue = 0;
+ _maxValue = 0;
+ _standardDeviation = 0;
+ }
+ }
+
+ public long getMinimum()
+ {
+ return _minValue;
+ }
+
+ public long getMaximum()
+ {
+ return _maxValue;
+ }
+
+ public double getAverage()
+ {
+ return _mean;
+ }
+
+ public double getStandardDeviation()
+ {
+ return _standardDeviation;
+ }
+}
diff --git a/java/perftests/src/main/java/test-utils.js b/java/perftests/src/main/java/test-utils.js
new file mode 100644
index 0000000000..7bfe233266
--- /dev/null
+++ b/java/perftests/src/main/java/test-utils.js
@@ -0,0 +1,85 @@
+var QPID;
+if (!QPID) {
+ QPID = {};
+}
+(function () {
+ 'use strict';
+
+ if (typeof QPID.times !== 'function') {
+ QPID.times = function (multiplicity, template, timeIndexName)
+ {
+ var retVal = new Array();
+ for (var i = 0; i < multiplicity; i++)
+ {
+ var templateName = template._name;
+ var teamplateAsString = JSON.stringify(template);
+ if (timeIndexName)
+ {
+ teamplateAsString = teamplateAsString.replace(new RegExp(timeIndexName, "g"), i);
+ }
+ var expandedObject = JSON.parse(teamplateAsString);
+ if (!(timeIndexName))
+ {
+ expandedObject._name = templateName + "_" + i;
+ }
+ retVal[i] = expandedObject;
+ }
+ return retVal;
+ }
+ }
+
+ if (typeof QPID.iterations !== 'function') {
+ QPID.iterations = function (values, template)
+ {
+ var retVal = new Array()
+
+ var iterationNumber = 0;
+
+ for (variableName in values)
+ {
+ var variableValues = values[variableName]
+ for (i in variableValues)
+ {
+ var variableValue = variableValues[i]
+ var templateTestString = JSON.stringify(template)
+ var actualString = templateTestString.replace(new RegExp(variableName, "g"), variableValue)
+ var iteration = JSON.parse(actualString)
+ iteration._iterationNumber = iterationNumber
+ retVal[iterationNumber] = iteration
+ iterationNumber++
+ }
+ }
+
+ return retVal
+ }
+ }
+
+ if (typeof QPID.transform !== 'function') {
+
+ /**
+ * Function to transform JSON using specified transformation function.
+ * Any number of transformation function could be passed after the template argument.
+ * Each function should return a transformed JSON object.
+ * Example
+ * var json = transform({"name": "Test1"}, function(json){json.name="Test"; return json;});
+ */
+ QPID.transform = function (template)
+ {
+ var json = template;
+ for (var i=1, len=arguments.length; i<len; i++)
+ {
+ json = arguments[i](json);
+ }
+ return json;
+ }
+ }
+
+ if (typeof QPID.cloneJSON !== 'function') {
+ QPID.cloneJSON = function (json)
+ {
+ return JSON.parse( JSON.stringify( json ));
+ }
+ }
+
+}());
+
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelperTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelperTest.java
new file mode 100644
index 0000000000..a10b3b359e
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelperTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+
+public class ConfigFileHelperTest extends QpidTestCase
+{
+ private File _testDir;
+ private ConfigFileHelper _configFileHelper = new ConfigFileHelper();
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _testDir = TestFileUtils.createTestDirectory();
+ }
+
+ public void testGenerateOutputCsvNameFrom()
+ {
+ String outputDir = "/tmp/outputDir";
+
+ assertEquals("/tmp/outputDir/my.json.file.csv", _configFileHelper.generateOutputCsvNameFrom("/tmp/my.json.file.json", outputDir));
+ assertEquals("/tmp/outputDir/my.js.file.csv", _configFileHelper.generateOutputCsvNameFrom("/tmp/my.js.file.js", outputDir));
+ }
+
+ public void testGetTestConfigFilesForDirectory() throws Exception
+ {
+ String jsFile = createFile("file1.js");
+ String jsonFile = createFile("file2.json");
+ createFile("file.txt");
+ createDir("dir.js");
+
+ String testConfigPath = _testDir.getAbsolutePath();
+
+ List<String> configFiles = _configFileHelper.getTestConfigFiles(testConfigPath);
+
+ Set<String> expectedFiles = new HashSet<String>(Arrays.asList(jsFile, jsonFile));
+ Set<String> actualFiles = new HashSet<String>(configFiles);
+
+ assertEquals(expectedFiles, actualFiles);
+ }
+
+ private void createDir(String dirName)
+ {
+ File dir = new File(_testDir, dirName);
+ dir.mkdir();
+ }
+
+ private String createFile(String fileName) throws IOException
+ {
+ File file = new File(_testDir, fileName);
+ file.createNewFile();
+ return file.getAbsolutePath();
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelper.java b/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileTestHelper.java
index 12ba3b56ad..71cd61db82 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileHelper.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/ConfigFileTestHelper.java
@@ -26,7 +26,7 @@ import java.io.Reader;
import org.apache.qpid.disttest.controller.config.Config;
import org.apache.qpid.disttest.controller.config.ConfigReader;
-public class ConfigFileHelper
+public class ConfigFileTestHelper
{
public static Reader getConfigFileReader(Class<?> testClass, String resourceName)
{
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java
index 198baa6ef4..dd50766918 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ClientTest.java
@@ -125,7 +125,7 @@ public class ClientTest extends TestCase
_client.tearDownTest();
- verify(_delegate).closeTestConnections();
+ verify(_delegate).tearDownTest();
verify(_participantRegistry).clear();
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
index ff7cfd2b41..58589d36f4 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
@@ -29,6 +29,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collection;
+
import javax.jms.Message;
import javax.jms.Session;
@@ -36,6 +38,7 @@ import junit.framework.TestCase;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.CreateConsumerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.mockito.InOrder;
@@ -177,4 +180,24 @@ public class ConsumerParticipantTest extends TestCase
verify(_delegate).closeTestConsumer(PARTICIPANT_NAME1);
}
+ public void testLatency() throws Exception
+ {
+ int numberOfMessages = 1;
+ long totalPayloadSize = PAYLOAD_SIZE_PER_MESSAGE * numberOfMessages;
+ _command.setNumberOfMessages(numberOfMessages);
+ _command.setEvaluateLatency(true);
+ _consumerParticipant = new ConsumerParticipant(_delegate, _command);
+ ParticipantResult result = _consumerParticipant.doIt(CLIENT_NAME);
+
+ assertExpectedConsumerResults(result, PARTICIPANT_NAME1, CLIENT_NAME, _testStartTime,
+ Session.CLIENT_ACKNOWLEDGE, null, numberOfMessages, PAYLOAD_SIZE_PER_MESSAGE, totalPayloadSize, null);
+
+ _inOrder.verify(_delegate).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
+ _inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage);
+ _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ assertTrue("Unexpected consuemr results", result instanceof ConsumerParticipantResult);
+ Collection<Long> latencies = ((ConsumerParticipantResult)result).getMessageLatencies();
+ assertNotNull("Message latency is not cllected", latencies);
+ assertEquals("Unexpected message latency results", 1, latencies.size());
+ }
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/MessageProviderTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/MessageProviderTest.java
index ffc3733eb7..1ff8d3e5d7 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/MessageProviderTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/MessageProviderTest.java
@@ -59,6 +59,7 @@ public class MessageProviderTest extends TestCase
{
MessageProvider messageProvider = new MessageProvider(null)
{
+ @Override
public String getMessagePayload(CreateProducerCommand command)
{
return super.getMessagePayload(command);
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java
index cf05623e8f..a3ac11b756 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java
@@ -121,7 +121,7 @@ public class ProducerParticipantTest extends TestCase
_command.setBatchSize(batchSize);
_command.setDeliveryMode(deliveryMode);
- ParticipantResult result = (ParticipantResult) _producer.doIt(CLIENT_NAME);
+ ParticipantResult result = _producer.doIt(CLIENT_NAME);
assertExpectedProducerResults(result, PARTICIPANT_NAME1, CLIENT_NAME, _testStartTime,
Session.AUTO_ACKNOWLEDGE, null, numberOfMessages, PAYLOAD_SIZE_PER_MESSAGE, totalPayloadSize, null);
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/ListPropertyValueTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/ListPropertyValueTest.java
index 75a634ba54..c54355bc76 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/ListPropertyValueTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/ListPropertyValueTest.java
@@ -32,6 +32,7 @@ public class ListPropertyValueTest extends TestCase
private ListPropertyValue _generator;
private List<PropertyValue> _items;
+ @Override
public void setUp() throws Exception
{
super.setUp();
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/PropertyValueFactoryTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/PropertyValueFactoryTest.java
index 2d560163c2..17397db5b8 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/PropertyValueFactoryTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/PropertyValueFactoryTest.java
@@ -24,6 +24,7 @@ public class PropertyValueFactoryTest extends TestCase
{
private PropertyValueFactory _factory;
+ @Override
public void setUp() throws Exception
{
super.setUp();
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RandomPropertyValueTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RandomPropertyValueTest.java
index bd5de3e370..878141895c 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RandomPropertyValueTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RandomPropertyValueTest.java
@@ -26,6 +26,7 @@ public class RandomPropertyValueTest extends TestCase
{
private RandomPropertyValue _generator;
+ @Override
public void setUp() throws Exception
{
super.setUp();
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RangePropertyValueTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RangePropertyValueTest.java
index 91791c9d55..6932919bed 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RangePropertyValueTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/property/RangePropertyValueTest.java
@@ -26,6 +26,7 @@ public class RangePropertyValueTest extends TestCase
{
private RangePropertyValue _generator;
+ @Override
public void setUp() throws Exception
{
super.setUp();
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java
new file mode 100644
index 0000000000..37820d2582
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithNoLimitsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Callable;
+
+import junit.framework.TestCase;
+
+public class ExecutorWithNoLimitsTest extends TestCase
+{
+ private final static Object RESULT = new Object();
+
+ private ExecutorWithLimits _limiter = new ExecutorWithNoLimits();
+ @SuppressWarnings("unchecked")
+ private Callable<Object> _callback = mock(Callable.class);
+
+ public void testNormalExecution() throws Exception
+ {
+ when(_callback.call()).thenReturn(RESULT);
+ final Object actualResult = _limiter.execute(_callback);
+ verify(_callback).call();
+ assertEquals(RESULT, actualResult);
+ }
+
+ public void testCallableThrowsException() throws Exception
+ {
+ when(_callback.call()).thenThrow(new Exception("mocked exception"));
+
+ try
+ {
+ _limiter.execute(_callback);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ // PASS
+ }
+ verify(_callback).call();
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java
new file mode 100644
index 0000000000..a201a7bacf
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/utils/ExecutorWithTimeLimitTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client.utils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+import junit.framework.TestCase;
+
+public class ExecutorWithTimeLimitTest extends TestCase
+{
+ private static final int TIMEOUT = 500;
+ private static final Object RESULT = new Object();
+
+ private ExecutorWithLimits _limiter;
+ @SuppressWarnings("unchecked")
+ private Callable<Object> _callback = mock(Callable.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _limiter = new ExecutorWithTimeLimit(System.currentTimeMillis(), TIMEOUT);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ if (_limiter != null)
+ {
+ _limiter.shutdown();
+ }
+ }
+
+ public void testCallableCompletesNormally() throws Exception
+ {
+ when(_callback.call()).thenReturn(RESULT);
+
+ final Object actualResult = _limiter.execute(_callback);
+
+ verify(_callback).call();
+ assertEquals(RESULT, actualResult);
+ }
+
+ public void testCallableThrowsException() throws Exception
+ {
+ when(_callback.call()).thenThrow(new Exception("mocked exception"));
+
+ try
+ {
+ _limiter.execute(_callback);
+ fail("Exception not thrown");
+ }
+ catch (CancellationException ce)
+ {
+ fail("Wrong exception thrown");
+ }
+ catch (Exception e)
+ {
+ // PASS
+ }
+ verify(_callback).call();
+ }
+
+ public void testCallableNotRunDueToInsufficentTimeRemaining() throws Exception
+ {
+ long now = System.currentTimeMillis();
+ ExecutorWithLimits shortTimeLimiter = new ExecutorWithTimeLimit(now - 100, 100);
+ try
+ {
+ shortTimeLimiter.execute(_callback);
+ fail("Exception not thrown");
+ }
+ catch (CancellationException ca)
+ {
+ // PASS
+ }
+ finally
+ {
+ shortTimeLimiter.shutdown();
+ }
+
+ verify(_callback, never()).call();
+ }
+
+ public void testExecutionInterruptedByTimeout() throws Exception
+ {
+ Callable<Void> oversleepingCallback = new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ Thread.sleep(TIMEOUT * 2);
+ return null;
+ }
+ };
+
+ try
+ {
+ _limiter.execute(oversleepingCallback);
+ fail("Exception not thrown");
+ }
+ catch (CancellationException ca)
+ {
+ // PASS
+ }
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js
new file mode 100644
index 0000000000..07f8bf9d92
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js
@@ -0,0 +1,34 @@
+jsonObject = {
+ "_tests":
+ QPID.iterations( { "__ACK_MODE": [ 0, 1 ] },
+ {
+ // this is a comment - it wouldn't be allowed if this were pure JSON
+
+ "_name": "Test 1",
+ "_queues": [
+ {
+ "_name": "Json-Queue-Name"
+ }
+ ],
+
+ "_clients": QPID.times(2,
+ {
+ "_name": "repeatingClient__CLIENT_INDEX",
+ "_connections": [
+ {
+ "_name": "connection1",
+ "_sessions": [
+ {
+ "_sessionName": "session1",
+ "_acknowledgeMode": "__ACK_MODE",
+ "_consumers": []
+ }
+ ]
+ }
+ ]
+ },
+ "__CLIENT_INDEX"
+ )
+ })
+
+} \ No newline at end of file
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest.java
index af9ec28db0..257f139849 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest.java
@@ -25,10 +25,8 @@ import java.util.Map;
import junit.framework.TestCase;
-import org.apache.qpid.disttest.ConfigFileHelper;
-import org.apache.qpid.disttest.client.MessageProvider;
+import org.apache.qpid.disttest.ConfigFileTestHelper;
import org.apache.qpid.disttest.client.property.PropertyValue;
-import org.apache.qpid.disttest.controller.CommandForClient;
public class ConfigReaderTest extends TestCase
{
@@ -38,7 +36,7 @@ public class ConfigReaderTest extends TestCase
protected void setUp()
{
ConfigReader configReader = new ConfigReader();
- Reader reader = ConfigFileHelper.getConfigFileReader(getClass(), "sampleConfig.json");
+ Reader reader = ConfigFileTestHelper.getConfigFileReader(getClass(), "sampleConfig.json");
_config = configReader.readConfig(reader);
}
@@ -110,4 +108,38 @@ public class ConfigReaderTest extends TestCase
assertNotNull("id property is not found", properties.get("id"));
}
+ public void testReadsJS() throws Exception
+ {
+ ConfigReader configReader = new ConfigReader();
+ String path = getClass().getResource("ConfigReaderTest-test-config.js").toURI().getPath();
+ _config = configReader.getConfigFromFile(path);
+ List<TestConfig> testConfigs = _config.getTestConfigs();
+ assertEquals("Unexpected number of tests", 2, testConfigs.size());
+ TestConfig testConfig1 = _config.getTestConfigs().get(0);
+ List<ClientConfig> cleintConfigs = testConfig1.getClients();
+ assertEquals("Unexpected number of test 1 clients", 2, cleintConfigs.size());
+ List<QueueConfig> queueConfigs = testConfig1.getQueues();
+ assertEquals("Unexpected number of test 1 queue", 1, queueConfigs.size());
+ assertEquals("Unexpected queue name", "Json-Queue-Name", queueConfigs.get(0).getName());
+ ClientConfig cleintConfig = cleintConfigs.get(0);
+ List<ConnectionConfig> connectionConfigs = cleintConfig.getConnections();
+ assertEquals("Unexpected number of connections", 1, connectionConfigs.size());
+ List<SessionConfig> sessionConfigs = connectionConfigs.get(0).getSessions();
+ assertEquals("Unexpected number of sessions", 1, sessionConfigs.size());
+ assertEquals("Unexpected ack mode", 0, sessionConfigs.get(0).getAcknowledgeMode());
+
+ TestConfig testConfig2 = _config.getTestConfigs().get(1);
+ List<ClientConfig> cleintConfigs2 = testConfig2.getClients();
+ assertEquals("Unexpected number of test 1 clients", 2, cleintConfigs2.size());
+ List<QueueConfig> queueConfigs2 = testConfig2.getQueues();
+ assertEquals("Unexpected number of test 1 queue", 1, queueConfigs2.size());
+ assertEquals("Unexpected queue name", "Json-Queue-Name", queueConfigs2.get(0).getName());
+ ClientConfig cleintConfig2 = cleintConfigs2.get(0);
+ List<ConnectionConfig> connectionConfigs2 = cleintConfig2.getConnections();
+ assertEquals("Unexpected number of connections", 1, connectionConfigs2.size());
+ List<SessionConfig> sessionConfigs2 = connectionConfigs2.get(0).getSessions();
+ assertEquals("Unexpected number of sessions", 1, sessionConfigs2.size());
+ assertEquals("Unexpected ack mode", 1, sessionConfigs2.get(0).getAcknowledgeMode());
+ }
+
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java
index 7998eae37e..860f6af565 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/IterationValueTest.java
@@ -22,19 +22,19 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-import junit.framework.TestCase;
-
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
import org.apache.qpid.disttest.message.CreateConsumerCommand;
-import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class IterationValueTest extends TestCase
+public class IterationValueTest extends QpidTestCase
{
- private static final int MESSAGE_SIZE = 10;
+ private static final int MAXIMUM_DURATION = 10;
+
+ private static final boolean IS_DURABLE_SUBSCRIPTION = true;
- private CreateProducerCommand _createProducerCommand;
private CreateConsumerCommand _createConsumerCommand;
private Map<String, String> _iterationValueMap;
@@ -42,37 +42,40 @@ public class IterationValueTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _createProducerCommand = mock(CreateProducerCommand.class);
_createConsumerCommand = mock(CreateConsumerCommand.class);
- _iterationValueMap = Collections.singletonMap("_messageSize", String.valueOf(MESSAGE_SIZE));
+ _iterationValueMap = new HashMap<String, String>();
+ _iterationValueMap.put("_maximumDuration", String.valueOf(MAXIMUM_DURATION));
+ _iterationValueMap.put("_durableSubscription", String.valueOf(IS_DURABLE_SUBSCRIPTION));
}
public void testApplyPopulatedIterationValueToCommandWithMatchingProperties() throws Exception
{
IterationValue iterationValue = new IterationValue(_iterationValueMap);
- iterationValue.applyToCommand(_createProducerCommand);
+ iterationValue.applyToCommand(_createConsumerCommand);
- verify(_createProducerCommand).setMessageSize(MESSAGE_SIZE);
+ verify(_createConsumerCommand).setMaximumDuration(MAXIMUM_DURATION);
+ verify(_createConsumerCommand).setDurableSubscription(IS_DURABLE_SUBSCRIPTION);
}
public void testApplyPopulatedIterationValueToCommandWithoutMatchingProperties() throws Exception
{
IterationValue iterationValue = new IterationValue(_iterationValueMap);
- iterationValue.applyToCommand(_createConsumerCommand);
+ CreateConnectionCommand createConnectionCommand = mock(CreateConnectionCommand.class);
+ iterationValue.applyToCommand(createConnectionCommand);
- verifyZeroInteractions(_createConsumerCommand);
+ verifyZeroInteractions(createConnectionCommand);
}
public void testApplyUnpopulatedIterationValueToCommand() throws Exception
{
IterationValue iterationValue = new IterationValue();
- iterationValue.applyToCommand(_createProducerCommand);
+ iterationValue.applyToCommand(_createConsumerCommand);
- verifyZeroInteractions(_createProducerCommand);
+ verifyZeroInteractions(_createConsumerCommand);
}
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js
new file mode 100644
index 0000000000..f64af82feb
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js
@@ -0,0 +1,23 @@
+jsonObject = {
+ "_countries":
+ QPID.iterations( { "__ITERATING_VALUE": [ 0, 1 ] },
+ {
+ // this is a comment - it wouldn't be allowed if this were pure JSON
+
+ "_name": "Country",
+ "_regions": QPID.times(2,
+ {
+ "_name": "repeatingRegion__REGION_INDEX",
+ "_towns": [
+ {
+ "_name": "town1",
+ "_iteratingAttribute": "__ITERATING_VALUE",
+ "_consumers": []
+ }
+ ]
+ },
+ "__REGION_INDEX"
+ )
+ })
+
+} \ No newline at end of file
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest.java
new file mode 100644
index 0000000000..eb4063888b
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import static org.apache.commons.beanutils.PropertyUtils.getProperty;
+
+import java.util.List;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import com.google.gson.Gson;
+
+public class JavaScriptConfigEvaluatorTest extends TestCase
+{
+ public void testEvaluateJavaScript() throws Exception
+ {
+ String jsFilePath = getClass().getResource("JavaScriptConfigEvaluatorTest-test-config.js").toURI().getPath();
+
+ String rawConfig = new JavaScriptConfigEvaluator().evaluateJavaScript(jsFilePath);
+
+ Object configAsObject = getObject(rawConfig);
+
+ // Tests are produced by the QPID.iterations js function
+ assertEquals("Unexpected number of countries", 2, getPropertyAsList(configAsObject, "_countries").size());
+
+ Object country0 = getProperty(configAsObject, "_countries.[0]");
+ assertEquals("Unexpected country name", "Country", getProperty(country0, "_name"));
+ assertEquals("Unexpected country iteration number", 0, getPropertyAsInt(country0, "_iterationNumber"));
+
+ assertEquals("Unexpected number of regions", 2, getPropertyAsList(country0, "_regions").size());
+ // Region names are produced by the QPID.times js function
+ assertEquals("Unexpected region name", "repeatingRegion0", getProperty(country0, "_regions.[0]._name"));
+ assertEquals("Unexpected region name", "repeatingRegion1", getProperty(country0, "_regions.[1]._name"));
+ // Iterating attribute are produced by the QPID.iterations js function
+ assertEquals("Unexpected iterating attribute", "0", getProperty(country0, "_regions.[0]._towns.[0]._iteratingAttribute"));
+
+ Object country1 = getProperty(configAsObject, "_countries.[1]");
+ assertEquals("Unexpected country iteration number", 1, getPropertyAsInt(country1, "_iterationNumber"));
+ assertEquals("Unexpected iterating attribute", "1", getProperty(country1, "_regions.[0]._towns.[0]._iteratingAttribute"));
+ }
+
+ private int getPropertyAsInt(Object configAsObject, String property) throws Exception
+ {
+ Number propertyValue = (Number) getProperty(configAsObject, property);
+
+ return propertyValue.intValue();
+ }
+
+ private List<?> getPropertyAsList(Object configAsObject, String property)
+ throws Exception
+ {
+ return (List<?>)getProperty(configAsObject, property);
+ }
+
+ private Object getObject(String jsonStringIn)
+ {
+ Gson gson = new Gson();
+ @SuppressWarnings("rawtypes")
+ TreeMap object = gson.fromJson(jsonStringIn, TreeMap.class);
+ return object;
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ParticipantConfigTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ParticipantConfigTest.java
new file mode 100644
index 0000000000..f58cc628a4
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ParticipantConfigTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.qpid.disttest.message.CreateParticpantCommand;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class ParticipantConfigTest extends QpidTestCase
+{
+ public void testCreateProducerCommandAppliesDurationOverride()
+ {
+ long overriddenDuration = 123;
+ setTestSystemProperty(ParticipantConfig.DURATION_OVERRIDE_SYSTEM_PROPERTY, String.valueOf(overriddenDuration));
+
+ CreateParticpantCommand createParticipantCommand = mock(CreateParticpantCommand.class);
+ ParticipantConfig participantConfig = new ParticipantConfig("name", "destinationName", 1, 2, 5000)
+ {
+ };
+
+ participantConfig.setParticipantProperties(createParticipantCommand);
+
+ verify(createParticipantCommand).setMaximumDuration(overriddenDuration);
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/message/ParticipantResultTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/message/ParticipantResultTest.java
index 12731c06f4..34727a7b8d 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/message/ParticipantResultTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/message/ParticipantResultTest.java
@@ -22,7 +22,7 @@ import static org.apache.qpid.disttest.message.ParticipantAttribute.*;
import static org.apache.qpid.disttest.message.ParticipantAttribute.CONFIGURED_CLIENT_NAME;
import static org.apache.qpid.disttest.message.ParticipantAttribute.DELIVERY_MODE;
import static org.apache.qpid.disttest.message.ParticipantAttribute.ERROR_MESSAGE;
-import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSIING_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSING_SUBSCRIPTION;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_DURABLE_SUBSCRIPTION;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_NO_LOCAL;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR;
@@ -127,7 +127,7 @@ public class ParticipantResultTest extends TestCase
assertEquals(topic, result.getAttributes().get(IS_TOPIC));
assertEquals(durable, result.getAttributes().get(IS_DURABLE_SUBSCRIPTION));
- assertEquals(browsingSubscription, result.getAttributes().get(IS_BROWSIING_SUBSCRIPTION));
+ assertEquals(browsingSubscription, result.getAttributes().get(IS_BROWSING_SUBSCRIPTION));
assertEquals(selector, result.getAttributes().get(IS_SELECTOR));
assertEquals(noLocal, result.getAttributes().get(IS_NO_LOCAL));
assertEquals(synchronousConsumer, result.getAttributes().get(IS_SYNCHRONOUS_CONSUMER));
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java
new file mode 100644
index 0000000000..ec8da8418f
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import junit.framework.TestCase;
+
+public class SeriesStatisticsTest extends TestCase
+{
+ public static Collection<Long> SERIES = Arrays.asList(new Long[] { 2l, 4l, 4l, 4l, 5l, 5l, 7l, 9l, 5l });
+
+ public void testAggregate()
+ {
+ SeriesStatistics results = new SeriesStatistics();
+ results.addMessageLatencies(SERIES);
+ results.aggregate();
+ assertEquals("Unexpected average", 5.0, results.getAverage(), 0.01);
+ assertEquals("Unexpected min", 2, results.getMinimum());
+ assertEquals("Unexpected max", 9, results.getMaximum());
+ assertEquals("Unexpected standard deviation", 2.0, results.getStandardDeviation(), 0.01);
+ }
+
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java
index a803120cc6..9c00e7cf1c 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.disttest.results.aggregation;
import java.util.Date;
+import java.util.List;
import junit.framework.TestCase;
@@ -26,8 +27,6 @@ import org.apache.qpid.disttest.controller.TestResult;
import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.ProducerParticipantResult;
-import org.apache.qpid.disttest.results.aggregation.AggregatedTestResult;
-import org.apache.qpid.disttest.results.aggregation.TestResultAggregator;
public class TestResultAggregatorTest extends TestCase
{
@@ -105,6 +104,55 @@ public class TestResultAggregatorTest extends TestCase
assertEquals(TestResultAggregator.AGGREGATED_ERROR_MESSAGE, aggregatedTestResult.getAllParticipantResult().getErrorMessage());
}
+ public void testAggregateResultsForConsumerWithLatencyResults() throws Exception
+ {
+ TestResult originalTestResult = createResultsFromTest();
+ List<ParticipantResult> results = originalTestResult.getParticipantResults();
+ for (ParticipantResult participantResult : results)
+ {
+ if (participantResult instanceof ConsumerParticipantResult)
+ {
+ ((ConsumerParticipantResult)participantResult).setMessageLatencies(SeriesStatisticsTest.SERIES);
+ break;
+ }
+ }
+
+ int numberOfOriginalParticipantResults = originalTestResult.getParticipantResults().size();
+ int expectedNumberOfResults = numberOfOriginalParticipantResults + EXPECTED_NUMBER_OF_AGGREGATED_RESULTS;
+
+ AggregatedTestResult aggregatedTestResult = _aggregator.aggregateTestResult(originalTestResult);
+
+ aggregatedTestResult.getAllConsumerParticipantResult().getTotalPayloadProcessed();
+ assertEquals(expectedNumberOfResults, aggregatedTestResult.getParticipantResults().size());
+
+ assertMinimalAggregatedResults(
+ aggregatedTestResult.getAllConsumerParticipantResult(),
+ TEST1_NAME, TEST1_ITERATION_NUMBER,
+ BATCH_SIZE, NUMBER_OF_MESSAGES_CONSUMED_IN_TOTAL, 2, 0);
+
+ assertLatencyAggregatedResults(aggregatedTestResult.getAllConsumerParticipantResult());
+
+ assertMinimalAggregatedResults(
+ aggregatedTestResult.getAllProducerParticipantResult(),
+ TEST1_NAME, TEST1_ITERATION_NUMBER,
+ BATCH_SIZE, NUMBER_OF_MESSAGES_PRODUCED, 0, 1);
+
+ assertMinimalAggregatedResults(
+ aggregatedTestResult.getAllParticipantResult(),
+ TEST1_NAME, TEST1_ITERATION_NUMBER,
+ BATCH_SIZE, NUMBER_OF_MESSAGES_CONSUMED_IN_TOTAL, 2, 1);
+ }
+
+ private void assertLatencyAggregatedResults(ParticipantResult allConsumerParticipantResult)
+ {
+ assertTrue("Unexpected result", allConsumerParticipantResult instanceof ConsumerParticipantResult);
+ ConsumerParticipantResult results = (ConsumerParticipantResult)allConsumerParticipantResult;
+ assertEquals("Unexpected average", 5.0, results.getAverageLatency(), 0.01);
+ assertEquals("Unexpected min", 2, results.getMinLatency());
+ assertEquals("Unexpected max", 9, results.getMaxLatency());
+ assertEquals("Unexpected standard deviation", 2.0, results.getLatencyStandardDeviation(), 0.01);
+ }
+
private void assertMinimalAggregatedResults(ParticipantResult result, String expectedTestName, int expectedIterationNumber, int expectedBatchSize, long expectedNumberOfMessagesProcessed, int expectedTotalNumberOfConsumers, int expectedTotalNumberOfProducers)
{
assertEquals("Unexpected test name in " + result.getParticipantName(), expectedTestName, result.getTestName());
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java
index 088746d8cd..565f59d25b 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java
@@ -22,7 +22,7 @@ import static org.apache.qpid.disttest.message.ParticipantAttribute.BATCH_SIZE;
import static org.apache.qpid.disttest.message.ParticipantAttribute.CONFIGURED_CLIENT_NAME;
import static org.apache.qpid.disttest.message.ParticipantAttribute.*;
import static org.apache.qpid.disttest.message.ParticipantAttribute.ERROR_MESSAGE;
-import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSIING_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSING_SUBSCRIPTION;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_DURABLE_SUBSCRIPTION;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_NO_LOCAL;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR;
@@ -58,7 +58,6 @@ import org.apache.qpid.disttest.controller.ResultsForAllTests;
import org.apache.qpid.disttest.controller.TestResult;
import org.apache.qpid.disttest.message.ParticipantAttribute;
import org.apache.qpid.disttest.message.ParticipantResult;
-import org.apache.qpid.qmf.QMFProperty.AccessCode;
public class CSVFormaterTest extends TestCase
{
@@ -109,7 +108,7 @@ public class CSVFormaterTest extends TestCase
participantAttributes.put(PRODUCER_INTERVAL, 9);
participantAttributes.put(IS_TOPIC, true);
participantAttributes.put(IS_DURABLE_SUBSCRIPTION, false);
- participantAttributes.put(IS_BROWSIING_SUBSCRIPTION, true);
+ participantAttributes.put(IS_BROWSING_SUBSCRIPTION, true);
participantAttributes.put(IS_SELECTOR, false);
participantAttributes.put(IS_NO_LOCAL, true);
participantAttributes.put(IS_SYNCHRONOUS_CONSUMER, false);
@@ -119,7 +118,10 @@ public class CSVFormaterTest extends TestCase
participantAttributes.put(THROUGHPUT, 2048);
participantAttributes.put(TIME_TAKEN, 1000);
participantAttributes.put(ERROR_MESSAGE, "error");
-
+ participantAttributes.put(MIN_LATENCY, 2l);
+ participantAttributes.put(MAX_LATENCY, 9l);
+ participantAttributes.put(AVERAGE_LATENCY, 5.0f);
+ participantAttributes.put(LATENCY_STANDARD_DEVIATION, 2.0f);
return participantAttributes;
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv b/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv
index cfffb1e549..ada2303d46 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv
@@ -1,2 +1,2 @@
-testName,iterationNumber,clientName,participantName,numberOfMessages,payloadSizeB,priority,timeToLiveMs,acknowledgeMode,deliveryMode,batchSize,maximumDurationMs,producerStartDelayMs,producerIntervalMs,isTopic,isDurableSubscription,isBrowsingSubscription,isSelector,isNoLocal,isSynchronousConsumer,totalNumberOfConsumers,totalNumberOfProducers,totalPayloadProcessedB,throughputKbPerS,timeTakenMs,errorMessage
-TEST1,0,CONFIGURED_CLIENT1,PARTICIPANT,0,1,2,3,4,5,6,7,8,9,true,false,true,false,true,false,1,2,1024,2048,1000,error \ No newline at end of file
+testName,iterationNumber,clientName,participantName,numberOfMessages,payloadSizeB,priority,timeToLiveMs,acknowledgeMode,deliveryMode,batchSize,maximumDurationMs,producerStartDelayMs,producerIntervalMs,isTopic,isDurableSubscription,isBrowsingSubscription,isSelector,isNoLocal,isSynchronousConsumer,totalNumberOfConsumers,totalNumberOfProducers,totalPayloadProcessedB,throughputKbPerS,timeTakenMs,errorMessage,minLatency,maxLatency,averageLatency,latencyStandardDeviation
+TEST1,0,CONFIGURED_CLIENT1,PARTICIPANT,0,1,2,3,4,5,6,7,8,9,true,false,true,false,true,false,1,2,1024,2048,1000,error,2,9,5.0,2.0
diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java
index 7f0c23eb38..75783eef4b 100644
--- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java
+++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.systest.disttest.clientonly;
import java.util.Map;
@@ -66,7 +86,7 @@ public class ControllerQueue
public <T extends Command> T getNext(boolean assertMessageExists) throws JMSException
{
- final Message message = _controllerQueueMessageConsumer.receive(1000);
+ final Message message = _controllerQueueMessageConsumer.receive(2000);
if(assertMessageExists)
{
Assert.assertNotNull("No message received from control queue", message);
diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java
index 4a872a7ee2..5b5a60ac43 100644
--- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java
@@ -32,6 +32,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.client.Client;
import org.apache.qpid.disttest.client.ClientState;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
@@ -158,7 +159,7 @@ public class DistributedClientTest extends DistributedTestSystemTestBase
assertState(_client, RUNNING_TEST);
}
- public void testParticipantsSendResults() throws JMSException
+ public void testParticipantsSendResults() throws Exception
{
createTestProducer(TEST_SESSION_NAME, TEST_PRODUCER_NAME, TEST_DESTINATION);
@@ -204,20 +205,21 @@ public class DistributedClientTest extends DistributedTestSystemTestBase
assertState(_client, READY);
}
- private void sendCommandToClient(final Command command) throws JMSException
+ private void sendCommandToClient(final Command command) throws Exception
{
final Message message = JmsMessageAdaptor.commandToMessage(_session, command);
_clientQueueProducer.send(message);
+ ((AMQSession<?, ?>)_session).sync();
}
- private void sendCommandAndValidateResponse(final Command command, boolean shouldSucceed) throws JMSException
+ private void sendCommandAndValidateResponse(final Command command, boolean shouldSucceed) throws Exception
{
sendCommandToClient(command);
Response response = _controllerQueue.getNext();
validateResponse(command.getType(), response, shouldSucceed);
}
- private void sendCommandAndValidateResponse(final Command command) throws JMSException
+ private void sendCommandAndValidateResponse(final Command command) throws Exception
{
sendCommandAndValidateResponse(command, true);
}
@@ -258,7 +260,7 @@ public class DistributedClientTest extends DistributedTestSystemTestBase
createTestSession(connectionName, sessionName, true);
}
- private void createTestProducer(String sessionName, String producerName, String destinationName, boolean shouldSucceed) throws JMSException
+ private void createTestProducer(String sessionName, String producerName, String destinationName, boolean shouldSucceed) throws Exception
{
final CreateProducerCommand createProducerCommand = new CreateProducerCommand();
createProducerCommand.setParticipantName(producerName);
@@ -269,12 +271,12 @@ public class DistributedClientTest extends DistributedTestSystemTestBase
sendCommandAndValidateResponse(createProducerCommand, shouldSucceed);
}
- private void createTestProducer(String sessionName, String producerName, String destinationName) throws JMSException
+ private void createTestProducer(String sessionName, String producerName, String destinationName) throws Exception
{
createTestProducer(sessionName, producerName, destinationName, true);
}
- private void createTestConsumer(String sessionName, String consumerName, String destinationName, boolean shouldSucceed) throws JMSException
+ private void createTestConsumer(String sessionName, String consumerName, String destinationName, boolean shouldSucceed) throws Exception
{
final CreateConsumerCommand createConsumerCommand = new CreateConsumerCommand();
createConsumerCommand.setSessionName(sessionName);
@@ -285,7 +287,7 @@ public class DistributedClientTest extends DistributedTestSystemTestBase
sendCommandAndValidateResponse(createConsumerCommand, shouldSucceed);
}
- private void createTestConsumer(String sessionName, String consumerName, String destinationName) throws JMSException
+ private void createTestConsumer(String sessionName, String consumerName, String destinationName) throws Exception
{
createTestConsumer(sessionName, consumerName, destinationName, true);
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
index 9fd90d3215..ddb4cb7e51 100644
--- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
@@ -23,6 +23,7 @@ import static org.apache.qpid.systest.disttest.SystemTestConstants.COMMAND_RESPO
import static org.apache.qpid.systest.disttest.SystemTestConstants.REGISTRATION_TIMEOUT;
import static org.apache.qpid.systest.disttest.SystemTestConstants.TEST_RESULT_TIMEOUT;
+import java.util.Collection;
import java.util.List;
import javax.jms.Message;
@@ -31,7 +32,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
-import org.apache.qpid.disttest.ConfigFileHelper;
+import org.apache.qpid.disttest.ConfigFileTestHelper;
import org.apache.qpid.disttest.client.Client;
import org.apache.qpid.disttest.client.ClientState;
import org.apache.qpid.disttest.controller.Controller;
@@ -73,6 +74,19 @@ public class ControllerAndClientTest extends DistributedTestSystemTestBase
List<ParticipantResult> test1ParticipantResults = testResult1.getParticipantResults();
assertEquals("Unexpected number of participant results for test 1", 2, test1ParticipantResults.size());
assertParticipantNames(test1ParticipantResults, "participantConsumer1", "participantProducer1");
+ ConsumerParticipantResult result = null;
+ for (ParticipantResult participantResult : test1ParticipantResults)
+ {
+ if (participantResult instanceof ConsumerParticipantResult)
+ {
+ result = (ConsumerParticipantResult)participantResult;
+ break;
+ }
+ }
+ assertNotNull("Consumer results not recived", result);
+ Collection<Long> latencies = result.getMessageLatencies();
+ assertNotNull("Latency results are not collected", latencies);
+ assertEquals("Unexpected latency results", 1, latencies.size());
}
public void testProducerClient() throws Exception
@@ -86,7 +100,7 @@ public class ControllerAndClientTest extends DistributedTestSystemTestBase
// cleaning manually
while(consumer.receive(1000l) != null);
- final Config config = ConfigFileHelper.getConfigFromResource(getClass(), "produceClient.json");
+ final Config config = ConfigFileTestHelper.getConfigFromResource(getClass(), "produceClient.json");
_controller.setConfig(config);
final Client client1 = new Client(new ClientJmsDelegate(_context));
final Thread client1Thread = createBackgroundClientThread(client1);
@@ -151,7 +165,7 @@ public class ControllerAndClientTest extends DistributedTestSystemTestBase
List<ParticipantResult> test1ParticipantResults = testResult.getParticipantResults();
assertEquals("Unexpected number of participant results for test", 2, test1ParticipantResults.size());
- ParticipantResult producer1 = (ParticipantResult) test1ParticipantResults.get(1);
+ ParticipantResult producer1 = test1ParticipantResults.get(1);
assertEquals(expectedMessageSize, producer1.getPayloadSize());
assertEquals(iterationNumber, producer1.getIterationNumber());
@@ -167,7 +181,7 @@ public class ControllerAndClientTest extends DistributedTestSystemTestBase
private List<TestResult> runTestsForTwoClients(String jsonConfigFile, int expectedNumberOfTests) throws NamingException, InterruptedException
{
- final Config config = ConfigFileHelper.getConfigFromResource(getClass(), jsonConfigFile);
+ final Config config = ConfigFileTestHelper.getConfigFromResource(getClass(), jsonConfigFile);
_controller.setConfig(config);
final Client client1 = new Client(new ClientJmsDelegate(_context));
diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json
index 8d210dce84..a008dc40d8 100644
--- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json
+++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json
@@ -42,7 +42,8 @@
{
"_name": "participantConsumer1",
"_destinationName": "direct://amq.direct//testQueue",
- "_numberOfMessages": 1
+ "_numberOfMessages": 1,
+ "_evaluateLatency": true
}
]
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java
index ad7f0e0682..74c4724901 100644
--- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java
@@ -38,7 +38,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
-import org.apache.qpid.disttest.ConfigFileHelper;
+import org.apache.qpid.disttest.ConfigFileTestHelper;
import org.apache.qpid.disttest.controller.Controller;
import org.apache.qpid.disttest.controller.config.Config;
import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
@@ -96,7 +96,7 @@ public class DistributedControllerTest extends DistributedTestSystemTestBase
public void testControllerSendsOneCommandToSingleClient() throws Exception
{
- Config config = ConfigFileHelper.getConfigFromResource(getClass(), "distributedControllerTest.json");
+ Config config = ConfigFileTestHelper.getConfigFromResource(getClass(), "distributedControllerTest.json");
_controller.setConfig(config);
sendRegistration(CLIENT1);
diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
index 63c9b42858..7e58e1b5b1 100644
--- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
@@ -62,10 +62,10 @@ public class EndToEndTest extends QpidBrokerTestCase
assertEquals("Unexpected number of lines in CSV", numberOfExpectedRows, csvLines.length);
assertDataRowsHaveCorrectTestAndClientName("End To End 1", "producingClient", "participantProducer1", csvLines[1], 1);
- assertDataRowsHaveCorrectTestAndClientName("End To End 1", "consumingClient", "participantConsumer1", csvLines[2], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "consumingClient", "participantConsumer1", csvLines[3], 1);
- assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PARTICIPANTS_NAME, csvLines[3], 1);
- assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_CONSUMER_PARTICIPANTS_NAME, csvLines[4], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PARTICIPANTS_NAME, csvLines[4], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_CONSUMER_PARTICIPANTS_NAME, csvLines[2], 1);
assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PRODUCER_PARTICIPANTS_NAME, csvLines[5], 1);
}