From f3dc157e59ec686e42334bb2f6bae3c1f97b2daf Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Tue, 30 Jan 2007 16:40:20 +0000 Subject: (Submitted by Rupert Smith) Ping tests refactored. Unused ping test classes removed. JUnit-toolkit 0.5-SNAPSHOT added to the build. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501455 13f79535-47bb-0310-9956-ffa450edef68 --- java/mvn-repo/README.txt | 1 + ...-toolkit-maven-plugin-0.5-20070130.111904-1.jar | Bin 0 -> 9768 bytes ...lkit-maven-plugin-0.5-20070130.111904-1.jar.md5 | 1 + ...kit-maven-plugin-0.5-20070130.111904-1.jar.sha1 | 1 + ...-toolkit-maven-plugin-0.5-20070130.111904-1.pom | 91 ++ ...lkit-maven-plugin-0.5-20070130.111904-1.pom.md5 | 1 + ...kit-maven-plugin-0.5-20070130.111904-1.pom.sha1 | 1 + .../0.5-SNAPSHOT/maven-metadata.xml | 12 + .../0.5-SNAPSHOT/maven-metadata.xml.md5 | 1 + .../0.5-SNAPSHOT/maven-metadata.xml.sha1 | 1 + .../junit-toolkit-maven-plugin/maven-metadata.xml | 12 + .../maven-metadata.xml.md5 | 1 + .../maven-metadata.xml.sha1 | 1 + .../junit-toolkit-0.5-20070130.111852-1.jar | Bin 0 -> 67495 bytes .../junit-toolkit-0.5-20070130.111852-1.jar.md5 | 1 + .../junit-toolkit-0.5-20070130.111852-1.jar.sha1 | 1 + .../junit-toolkit-0.5-20070130.111852-1.pom | 111 ++ .../junit-toolkit-0.5-20070130.111852-1.pom.md5 | 1 + .../junit-toolkit-0.5-20070130.111852-1.pom.sha1 | 1 + .../junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml | 12 + .../0.5-SNAPSHOT/maven-metadata.xml.md5 | 1 + .../0.5-SNAPSHOT/maven-metadata.xml.sha1 | 1 + .../thebadgerset/junit-toolkit/maven-metadata.xml | 11 + .../junit-toolkit/maven-metadata.xml.md5 | 1 + .../junit-toolkit/maven-metadata.xml.sha1 | 1 + .../mvn-repo/uk/co/thebadgerset/maven-metadata.xml | 9 + .../uk/co/thebadgerset/maven-metadata.xml.md5 | 1 + .../uk/co/thebadgerset/maven-metadata.xml.sha1 | 1 + java/perftests/bin/run_many.sh | 30 - java/perftests/bin/serviceProvidingClient.sh | 33 - .../bin/serviceRequestReply-MultipleClients.sh | 53 - .../perftests/bin/serviceRequestReply-QuickTest.sh | 43 - .../bin/serviceRequestingClient-createLogFile.sh | 38 - java/perftests/bin/serviceRequestingClient.sh | 34 - java/perftests/bin/setupclasspath.sh | 12 - java/perftests/bin/testPingClient.sh | 33 - java/perftests/bin/testPingProducer.sh | 33 - java/perftests/bin/testPingPublisher.sh | 33 - java/perftests/bin/testPingSubscriber.sh | 33 - java/perftests/bin/topic-QuickTest.sh | 55 - java/perftests/bin/topicListener.sh | 26 - java/perftests/bin/topicPublisher.sh | 23 - java/perftests/pom.xml | 261 ++--- .../org/apache/qpid/ping/AbstractPingClient.java | 204 ---- .../org/apache/qpid/ping/AbstractPingProducer.java | 524 --------- .../java/org/apache/qpid/ping/TestPingClient.java | 192 ---- .../java/org/apache/qpid/ping/TestPingItself.java | 223 ---- .../org/apache/qpid/ping/TestPingProducer.java | 249 ----- .../org/apache/qpid/ping/TestPingPublisher.java | 197 ---- .../org/apache/qpid/ping/TestPingSubscriber.java | 134 --- .../main/java/org/apache/qpid/ping/Throttle.java | 67 -- .../apache/qpid/requestreply/PingPongBouncer.java | 206 +++- .../apache/qpid/requestreply/PingPongProducer.java | 1137 +++++++++++++------- .../qpid/requestreply/ServiceProvidingClient.java | 235 ---- .../qpid/requestreply/ServiceRequestingClient.java | 428 -------- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 555 +++++----- .../java/org/apache/qpid/ping/PingTestPerf.java | 332 +++--- .../org/apache/qpid/ping/ThrottleTestPerf.java | 63 -- .../apache/qpid/requestreply/PingPongTestPerf.java | 301 +++--- java/pom.xml | 9 +- 60 files changed, 1813 insertions(+), 4260 deletions(-) create mode 100644 java/mvn-repo/README.txt create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 create mode 100644 java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml create mode 100644 java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 create mode 100644 java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 delete mode 100755 java/perftests/bin/run_many.sh delete mode 100755 java/perftests/bin/serviceProvidingClient.sh delete mode 100755 java/perftests/bin/serviceRequestReply-MultipleClients.sh delete mode 100755 java/perftests/bin/serviceRequestReply-QuickTest.sh delete mode 100755 java/perftests/bin/serviceRequestingClient-createLogFile.sh delete mode 100755 java/perftests/bin/serviceRequestingClient.sh delete mode 100755 java/perftests/bin/setupclasspath.sh delete mode 100755 java/perftests/bin/testPingClient.sh delete mode 100755 java/perftests/bin/testPingProducer.sh delete mode 100755 java/perftests/bin/testPingPublisher.sh delete mode 100755 java/perftests/bin/testPingSubscriber.sh delete mode 100755 java/perftests/bin/topic-QuickTest.sh delete mode 100755 java/perftests/bin/topicListener.sh delete mode 100755 java/perftests/bin/topicPublisher.sh delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java (limited to 'java') diff --git a/java/mvn-repo/README.txt b/java/mvn-repo/README.txt new file mode 100644 index 0000000000..e24cb7a41b --- /dev/null +++ b/java/mvn-repo/README.txt @@ -0,0 +1 @@ +Temporary local repository for jars that are not available in the central repository yet. diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar new file mode 100644 index 0000000000..43c678f547 Binary files /dev/null and b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar differ diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 new file mode 100644 index 0000000000..87820b3b0a --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 @@ -0,0 +1 @@ +8aff63861edb0a6bb47b5fad955a6ba5 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 new file mode 100644 index 0000000000..5a72a41b79 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 @@ -0,0 +1 @@ +600209771b236268f1b939e4a924899875ee8562 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom new file mode 100644 index 0000000000..65587eb683 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom @@ -0,0 +1,91 @@ + + 4.0.0 + uk.co.thebadgerset + junit-toolkit-maven-plugin + maven-plugin + junit-toolkit-maven-plugin + 0.5-20070130.111904-1 + Maven plugin for the JUnit Toolkit to run performance tests with TKTestRunner. + http://www.thebadgerset.co.uk/projects/junit-toolkit-maven-plugin + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + scm:${scm.setup}/junit-toolkit-maven-plugin + + + The Badger Set trading as Liberty Bishop (1151) ltd. + http://www.thebadgerset.co.uk/ + + + src/main + + + + maven-compiler-plugin + 2.0.1 + + 1.5 + 1.5 + false + + + + + + + + uk.co.thebadgerset + junit-toolkit + 0.5-SNAPSHOT + + + org.apache.maven + maven-plugin-api + 2.0.4 + + + + + + maven-pmd-plugin + + true + utf-8 + 20 + 1.5 + + + + maven-jxr-plugin + + + maven-javadoc-plugin + + + maven-checkstyle-plugin + + ../mavenbuild/coding_standards.xml + + + + + + + false + release-repo + The Badger Set Maven2 Repository + file://c:/temp + + + snapshot-repo + The Badger Set Maven2 Snapshot Repository + file://c:/temp + + deployed + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 new file mode 100644 index 0000000000..adf20d93ad --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 @@ -0,0 +1 @@ +4ab65f208ffa4400551233321b90933a \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 new file mode 100644 index 0000000000..aeb3966048 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 @@ -0,0 +1 @@ +84f491024bd60142781ef9035f4394cb1379902d \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml new file mode 100644 index 0000000000..0a46c1d79a --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml @@ -0,0 +1,12 @@ + + uk.co.thebadgerset + junit-toolkit-maven-plugin + 0.5-SNAPSHOT + + + 20070130.111904 + 1 + + 20070130111904 + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 new file mode 100644 index 0000000000..4e7eab390b --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 @@ -0,0 +1 @@ +c619b7ac915b2eba622d556b2d2e0c25 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 new file mode 100644 index 0000000000..83a5267307 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 @@ -0,0 +1 @@ +db7b5d51a53a5018611391ecc3346032a6c20dda \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml new file mode 100644 index 0000000000..a3bff0dde2 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml @@ -0,0 +1,12 @@ + + uk.co.thebadgerset + junit-toolkit-maven-plugin + 0.5-SNAPSHOT + + 0.5-SNAPSHOT + + 0.5-SNAPSHOT + + 20070130111904 + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 new file mode 100644 index 0000000000..395a968533 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 @@ -0,0 +1 @@ +da47ce66de64d4ba056d0a9c901c5676 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 new file mode 100644 index 0000000000..b396785e6c --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 @@ -0,0 +1 @@ +70adf93da1c1757152e954750ceb2477a8659a99 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar new file mode 100644 index 0000000000..db71961ce4 Binary files /dev/null and b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar differ diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.md5 new file mode 100644 index 0000000000..37cccd095d --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.md5 @@ -0,0 +1 @@ +1205e91299592f83deb2c7ec22750a76 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.sha1 new file mode 100644 index 0000000000..f44cc451a2 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.sha1 @@ -0,0 +1 @@ +4e8e71ec99dd11019f1af126ebac7f87a13bfbc4 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom new file mode 100644 index 0000000000..2377459213 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom @@ -0,0 +1,111 @@ + + 4.0.0 + uk.co.thebadgerset + junit-toolkit + junit-toolkit + 0.5-20070130.111852-1 + JUnit Toolkit enhances JUnit with performance testing, asymptotic behaviour analysis, and concurrency testing. + http://www.thebadgerset.co.uk/junit-toolkit + + + rupert + Rupert Smith + rupertlssmith (contactable on g-m-a-i-l) + + + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + scm:svn:http://www.thebadgerset.co.uk/svn/junit-toolkit + + + The Badger Set trading as Liberty Bishop (1151) ltd. + http://www.thebadgerset.co.uk/ + + + src/main + + + + maven-compiler-plugin + 2.0.1 + + 1.5 + 1.5 + false + + + + + + + + + apache.snapshots + Apache SNAPSHOT Repository + http://people.apache.org/repo/m2-snapshot-repository + + + + + junit + junit + 3.8.1 + + + log4j + log4j + 1.2.8 + + + regexp + regexp + 1.3 + + + + + + maven-pmd-plugin + + true + utf-8 + 20 + 1.5 + + + + maven-jxr-plugin + + + maven-javadoc-plugin + + + maven-checkstyle-plugin + + ../mavenbuild/coding_standards.xml + + + + + + + false + release-repo + The Badger Set Maven2 Repository + file://c:/temp + + + snapshot-repo + The Badger Set Maven2 Snapshot Repository + file://c:/temp + + deployed + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.md5 new file mode 100644 index 0000000000..1f7d620826 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.md5 @@ -0,0 +1 @@ +a60cfd37a65a44a06cee8d2e6c2f3dcf \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.sha1 new file mode 100644 index 0000000000..2d9f5181e0 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.sha1 @@ -0,0 +1 @@ +257525cd9afcbf8a202fb5ec83df727b028e0fa4 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml new file mode 100644 index 0000000000..b2560dad04 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml @@ -0,0 +1,12 @@ + + uk.co.thebadgerset + junit-toolkit + 0.5-SNAPSHOT + + + 20070130.111852 + 1 + + 20070130111852 + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 new file mode 100644 index 0000000000..d179176f11 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 @@ -0,0 +1 @@ +ee20ec4f594fd6efa85274c88d5d87ee \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 new file mode 100644 index 0000000000..02f9de8303 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 @@ -0,0 +1 @@ +d8eb4f60a311e78bb6be82426fee0e579176344d \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml new file mode 100644 index 0000000000..8fad4186db --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml @@ -0,0 +1,11 @@ + + uk.co.thebadgerset + junit-toolkit + 0.5-SNAPSHOT + + + 0.5-SNAPSHOT + + 20070130111852 + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 new file mode 100644 index 0000000000..5a4b9b2418 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 @@ -0,0 +1 @@ +4278265ef1e27202cffc5d54d429b510 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 new file mode 100644 index 0000000000..cef38679a5 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 @@ -0,0 +1 @@ +31c796d63b1c0625d26f5df2ecb44220222546bd \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml new file mode 100644 index 0000000000..4c367f55d3 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml @@ -0,0 +1,9 @@ + + + + junit-toolkit-maven-plugin + junit-toolkit + junit-toolkit-maven-plugin + + + \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 new file mode 100644 index 0000000000..da122e37da --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 @@ -0,0 +1 @@ +8eee1c76f27e4d20ffcd48d87897b923 \ No newline at end of file diff --git a/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 new file mode 100644 index 0000000000..fd57ffc943 --- /dev/null +++ b/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 @@ -0,0 +1 @@ +300ebe0bfa8ae2b56d26a9fae9073ef9f902b0e7 \ No newline at end of file diff --git a/java/perftests/bin/run_many.sh b/java/perftests/bin/run_many.sh deleted file mode 100755 index cca2ffec21..0000000000 --- a/java/perftests/bin/run_many.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -# args: -# -# -# - -for i in `seq 1 $1`; do - $3 >$2.$i.out 2>>$2.err & - echo $! > $2.$i.pid -done; diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh deleted file mode 100755 index 0f4264be10..0000000000 --- a/java/perftests/bin/serviceProvidingClient.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: - -if [[ $# != 1 ]] ; then - echo "usage: ./serviceProvidingClient.sh [ ] [selector]" - exit 1 -fi - -thehosts=$1 -shift - -. ./setupclasspath.sh -echo $CP - -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $thehosts guest guest /test serviceQ "$@" diff --git a/java/perftests/bin/serviceRequestReply-MultipleClients.sh b/java/perftests/bin/serviceRequestReply-MultipleClients.sh deleted file mode 100755 index 81558c2c0b..0000000000 --- a/java/perftests/bin/serviceRequestReply-MultipleClients.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: - -if [[ $# < 3 ]] ; then - echo "usage: ./serviceRequestReply-QuickTest.sh [ ]" - exit 1 -fi - -thehosts=$1 -shift - -numberofmessages=$1 -shift - -numberofclients=$1 -shift - -. ./setupclasspath.sh -echo $CP - -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $thehosts guest guest /test serviceQ "$@" & - -providingclient=$! - -./run_many.sh $numberofclients requestClients "./serviceRequestingClient.sh $thehosts $numberofmessages $@" - -sleeping=$(( numberofmessages * 1 / 10 )) - -echo "Sleeping for $sleeping secconds to completion" -sleep $sleeping - -kill $providingclient - -echo "Results" -cat requestClients.*.out \ No newline at end of file diff --git a/java/perftests/bin/serviceRequestReply-QuickTest.sh b/java/perftests/bin/serviceRequestReply-QuickTest.sh deleted file mode 100755 index 31c5e9eb74..0000000000 --- a/java/perftests/bin/serviceRequestReply-QuickTest.sh +++ /dev/null @@ -1,43 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: - -if [[ $# < 2 ]] ; then - echo "usage: ./serviceRequestReply-QuickTest.sh [ ]" - exit 1 -fi - -thehosts=$1 -shift - -numberofmessages=$1 -shift - -. ./setupclasspath.sh -echo $CP - -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $thehosts guest guest /test serviceQ "$@" & - -providingclient=$! - -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ $numberofmessages "$@" - -kill $providingclient - diff --git a/java/perftests/bin/serviceRequestingClient-createLogFile.sh b/java/perftests/bin/serviceRequestingClient-createLogFile.sh deleted file mode 100755 index c078caf7d1..0000000000 --- a/java/perftests/bin/serviceRequestingClient-createLogFile.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -##LOGDIR=$QPID_HOME/logs -LOGDIR=../logs -date=`date +"%y%m%d%H%M%S"` -LOGFILE=$LOGDIR/perftest.log.$date - -## create the log dir -if [ ! -d $LOGDIR ]; then - mkdir $LOGDIR -fi - -echo "********** Running the test **************" -echo "creating logfile $LOGFILE" -echo - -./serviceRequestingClient.sh $@ 2>&1 | tee $LOGFILE - -echo "********** End of test ******************" -echo \ No newline at end of file diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh deleted file mode 100755 index c03cc519c6..0000000000 --- a/java/perftests/bin/serviceRequestingClient.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# usage: ./serviceRequestingClient.sh [] [ ] - -if [[ $# < 2 ]] ; then - echo "usage: ./serviceRequestingClient.sh [] [ ]" - exit 1 -fi - -thehosts=$1 -shift - -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP - -$JAVA_HOME/bin/java -cp $CP -Dlog.dir="$QPID_HOME/logs" -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@" diff --git a/java/perftests/bin/setupclasspath.sh b/java/perftests/bin/setupclasspath.sh deleted file mode 100755 index ef7a037c11..0000000000 --- a/java/perftests/bin/setupclasspath.sh +++ /dev/null @@ -1,12 +0,0 @@ -if [ -z $QPID_HOME ] ; then - echo "QPID_HOME must be set" - exit -fi - -CP=../lib/qpid-performance.jar:$QPID_HOME/lib/qpid-incubating.jar - -if [ `uname -o` == "Cygwin" ] ; then - CP=`cygpath --path --windows $CP` -fi - - diff --git a/java/perftests/bin/testPingClient.sh b/java/perftests/bin/testPingClient.sh deleted file mode 100755 index 4eca4a7999..0000000000 --- a/java/perftests/bin/testPingClient.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: -# -if [[ $# < 1 ]] ; then - echo "usage: ./testPingClient.sh []" - exit 1 -fi - -thehosts=$1 -shift -echo $thehosts -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingClient $thehosts guest guest /test "$@" diff --git a/java/perftests/bin/testPingProducer.sh b/java/perftests/bin/testPingProducer.sh deleted file mode 100755 index 39ab487b60..0000000000 --- a/java/perftests/bin/testPingProducer.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: -# -if [[ $# < 1 ]] ; then - echo "usage: ./testPingProducer.sh []" - exit 1 -fi - -thehosts=$1 -shift -echo $thehosts -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingProducer $thehosts /test diff --git a/java/perftests/bin/testPingPublisher.sh b/java/perftests/bin/testPingPublisher.sh deleted file mode 100755 index e8219e7612..0000000000 --- a/java/perftests/bin/testPingPublisher.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: -# -if [[ $# < 1 ]] ; then - echo "usage: ./testPingPublisher.sh " - exit 1 -fi - -thehosts=$1 -shift -echo $thehosts -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingPublisher $thehosts /test diff --git a/java/perftests/bin/testPingSubscriber.sh b/java/perftests/bin/testPingSubscriber.sh deleted file mode 100755 index a0520be093..0000000000 --- a/java/perftests/bin/testPingSubscriber.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: -# -if [[ $# < 1 ]] ; then - echo "usage: ./testPingSubscriber.sh []" - exit 1 -fi - -thehosts=$1 -shift -echo $thehosts -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="debug" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingSubscriber $thehosts guest guest /test "$@" diff --git a/java/perftests/bin/topic-QuickTest.sh b/java/perftests/bin/topic-QuickTest.sh deleted file mode 100755 index 931f102893..0000000000 --- a/java/perftests/bin/topic-QuickTest.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: - -if [[ $# < 5 ]] ; then - echo "usage: ./topic-QuickTest.sh [other params for both listener and publisher]" - exit 1 -fi - -host=$1 -shift - -port=$1 -shift - -nomessages=$1 -shift - -noclients=$1 -shift - -batches=$1 -shift - -sleeptime=$(( 2 * $noclients )) - -. ./setupclasspath.sh -echo $CP - -./run_many.sh $noclients topic "$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level='warn' -Damqj.test.logging.level='info' -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.topic.Listener -host $host -port $port $@" & - -echo -echo "Pausing for $sleeptime seconds to allow clients to connect" -sleep $sleeptime - -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.topic.Publisher -host $host -port $port -messages $nomessages -clients $noclients -batch $batches $@ - - diff --git a/java/perftests/bin/topicListener.sh b/java/perftests/bin/topicListener.sh deleted file mode 100755 index 757a8c9edb..0000000000 --- a/java/perftests/bin/topicListener.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -# XXX -Xmx512m -Xms512m -XX:NewSize=150m -. ./setupclasspath.sh -echo $CP - -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.topic.Listener $@ diff --git a/java/perftests/bin/topicPublisher.sh b/java/perftests/bin/topicPublisher.sh deleted file mode 100755 index 8bcdaca3c4..0000000000 --- a/java/perftests/bin/topicPublisher.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# XXX -Xmx512m -Xms512m -XX:NewSize=150m -. ./setupclasspath.sh -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.topic.Publisher $@ diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 396701d3ed..712be2e34a 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -19,6 +19,7 @@ + 4.0.0 org.apache.qpid qpid-perftests @@ -35,9 +36,33 @@ .. - perftests.log4j + perftests.log4j + + + + junit-toolkit.snapshots + JUnit Toolkit SNAPSHOT Repository + file://${basedir}/../mvn-repo + + true + + + + + + + + junit-toolkit-plugin.snapshots + JUnit Toolkit SNAPSHOT Repository + file://${basedir}/../mvn-repo + + true + + + + @@ -50,15 +75,15 @@ log4j - - junit - junit + uk.co.thebadgerset + junit-toolkit + - uk.co.thebadgerset - junit-toolkit + junit + junit @@ -72,45 +97,41 @@ org.apache.maven.plugins maven-surefire-plugin - - - - - uk.co.thebadgerset junit-toolkit-maven-plugin - 0.3 - target - ${project.build.finalName}-all-test-deps.jar + target + ${project.build.finalName}-all-test-deps.jar - + log4j.configuration ${log4j.perftests} @@ -119,7 +140,7 @@ amqj.logging.level warn - + badger.level warn @@ -127,148 +148,44 @@ amqj.test.logging.level info - + - - -n PingOnce -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf - - - -n ThrottleTest -r 5 -s [10,10000],samples=100,exp -t testThrottle -o . org.apache.qpid.ping.ThrottleTestPerf - - - -n Skim-Tx -s [1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true - -n Skim-Size -s [1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messagesize=51200 - -n Skim-Many -s [1] -c [4] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf - -n Skim-Queues -s [1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf destinationcount=10 - -n Skim-Duration -s [1000] -d10S -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf - -n Skim-Rate -s [1000] -d10S -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=100 - - - -n VT-Qpid-1 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 CommitBatchSize=20000 transacted=true - -n VT-Qpid-2 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 - - -n VT-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000 CommitBatchSize=40000 transacted=true - -n VT-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000 - - - - -n PT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true CommitBatchSize=20000 transacted=true - -n PT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true - - - -n PT-Qpid-3 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true CommitBatchSize=20000 - -n PT-Qpid-4 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true - - - -n PT-Qpid-5 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1 transacted=true - -n PT-Qpid-6 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1 - - - -n PT-Qpid-7 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10 transacted=true - -n PT-Qpid-8 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10 - - - -n PT-Qpid-9 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=500 - -n PT-Qpid-10 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 - - - -n PT-Qpid-11 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10 transacted=true CommitBatchSize=50 - -n PT-Qpid-12 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10 - - - -n PT-Qpid-13 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=50 - -n PT-Qpid-14 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 - - - -n VQ-Qpid-1 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true CommitBatchSize=40000 - -n VQ-Qpid-2 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 - - -n VQ-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 transacted=true CommitBatchSize=40000 - -n VQ-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 - - - - -n PQ-Qpid-1 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true - -n PQ-Qpid-2 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf - - - -n PQ-Qpid-3 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping transacted=true CommitBatchSize=500 - -n PQ-Qpid-4 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping - - - -n PQ-Qpid-5 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1 transacted=true CommitBatchSize=500 - -n PQ-Qpid-6 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1 - - - -n PQ-Qpid-7 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=500 - -n PQ-Qpid-8 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 - - - -n PQ-Qpid-9 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 transacted=true CommitBatchSize=50 - -n PQ-Qpid-10 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 - - - -n PQ-Qpid-11 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=50 - -n PQ-Qpid-12 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 - - - -n PQ-Qpid-13 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 transacted=true CommitBatchSize=15 - -n PQ-Qpid-14 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 - - - - -n LT-Qpid-1-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512 transacted=true CommitBatchSize=10000 - -n LT-Qpid-2-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512 - - -n LT-Qpid-1-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true CommitBatchSize=10000 - -n LT-Qpid-2-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 - - -n LT-Qpid-1-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120 transacted=true CommitBatchSize=10000 - -n LT-Qpid-2-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120 - - -n LT-Qpid-1-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 transacted=true CommitBatchSize=10000 - -n LT-Qpid-2-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 - - -n LT-Qpid-1-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200 transacted=true CommitBatchSize=10000 - -n LT-Qpid-2-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200 - - -n LT-Qpid-1-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048576 transacted=true CommitBatchSize=10000 - -n LT-Qpid-2-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048476 - - - -n LT-Qpid-3-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512 transacted=true CommitBatchSize=10000 - -n LT-Qpid-4-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512 - - -n LT-Qpid-3-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true CommitBatchSize=10000 - -n LT-Qpid-4-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 - - -n LT-Qpid-3-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120 transacted=true CommitBatchSize=10000 - -n LT-Qpid-4-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120 - - -n LT-Qpid-3-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240 transacted=true CommitBatchSize=10000 - -n LT-Qpid-4-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240 - - -n LT-Qpid-3-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200 transacted=true CommitBatchSize=10000 - -n LT-Qpid-4-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200 - - -n LT-Qpid-3-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 transacted=true CommitBatchSize=10000 - -n LT-Qpid-4-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 - - - - -n FT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true CommitBatchSize=10000 broker="tcp://localhost:5001;tcp://localhost:5002" FailBeforeSend=true - -n FT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true CommitBatchSize=10000 broker="tcp://localhost:5001;tcp://localhost:5002" FailAfterSend=true - -n FT-Qpid-3 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true CommitBatchSize=10000 broker="tcp://localhost:5001;tcp://localhost:5002" FailAfterCommit=true - - - -n FT-Qpid-4 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://localhost:5001;tcp://localhost:5002" transacted=false FailBeforeSend=true - -n FT-Qpid-5 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://localhost:5001;tcp://localhost:5002" transacted=false FailAfterSend=true - + + -n Ping-Once -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf + -n Ping-Once-Async -s [1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf + + + -n Ping-Tx -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true + -n Ping-Size -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messagesize=512 + -n Ping-Concurrent -s [100] -c [4] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf + + -n Ping-Many-Queues -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf destinationcount=4 + + -n Ping-Duration -s [100] -d10S -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf + -n Ping-Rate -s [100] -d10S -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=500 + -n Ping-PubSub -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true + + -n Ping-Many-Topics -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=4 + + + -n Ping-Persistent -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true + + + -n Ping-Batch-Logging -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10 + + + -n Ping-Failover-Before-Send -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailBeforeSend=true + + + -n Ping-Failover-After-Send -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterSend=true + + + -n Ping-Failover-Before-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailBeforeCommit=true + + + -n Ping-Failover-After-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterCommit=true + @@ -276,13 +193,13 @@ test - + ---> + + src/ false @@ -321,7 +238,7 @@ **/*.java - + / false diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java deleted file mode 100644 index 97b411323e..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.io.IOException; -import java.text.SimpleDateFormat; - -import javax.jms.JMSException; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.jms.Session; - -/** - * Provides common functionality that ping clients (the recipients of ping messages) can use. This base class keeps - * track of the connection used to send pings, provides a convenience method to commit a transaction only when a session - * to commit on is transactional, keeps track of whether the ping client is pinging to a queue or a topic, provides - * prompts to the console to terminate brokers before and after commits, in order to test failover functionality, and - * provides a convience formatter for outputing readable timestamps for pings. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Commit the current transcation on a session. - *
Generate failover promts. - *
Keep track the connection. - *
Keep track of p2p or topic ping type. - *
- * - * @todo This base class does not seem particularly usefull and all functionality is duplicated in {@link AbstractPingProducer}. - * Merge it into that class. - */ -public abstract class AbstractPingClient -{ - private static final Logger _logger = Logger.getLogger(TestPingClient.class); - - /** A convenient formatter to use when time stamping output. */ - protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Holds the connection to the broker. */ - private AMQConnection _connection; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - private boolean _isPubSub = false; - - /** - * This flag is used to indicate that the user should be prompted to kill a broker, in order to test - * failover, immediately before committing a transaction. - */ - protected boolean _failBeforeCommit = false; - - /** - * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test - * failover, immediate after committing a transaction. - */ - protected boolean _failAfterCommit = false; - - /** - * Convenience method to commit the transaction on the specified session. If the session to commit on is not - * a transactional session, this method does nothing. - * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the - * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker - * after the commit is applied. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - */ - protected void commitTx(Session session) throws JMSException - { - if (session.getTransacted()) - { - try - { - if (_failBeforeCommit) - { - _logger.trace("Failing Before Commit"); - doFailover(); - } - - session.commit(); - - if (_failAfterCommit) - { - _logger.trace("Failing After Commit"); - doFailover(); - } - - _logger.trace("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - _logger.debug("Message rolled back."); - } - catch (JMSException jmse) - { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - } - - /** - * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - * - * @param broker The name of the broker to terminate. - */ - protected void doFailover(String broker) - { - System.out.println("Kill Broker " + broker + " now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - } - - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - - } - - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public AMQConnection getConnection() - { - return _connection; - } - - /** - * Sets the connection that this ping client is using. - * - * @param connection The ping connection. - */ - public void setConnection(AMQConnection connection) - { - this._connection = connection; - } - - /** - * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. - * - * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. - */ - public void setPubSub(boolean pubsub) - { - _isPubSub = pubsub; - } - - /** - * Checks whether this client is a p2p or pub/sub ping client. - * - * @return true if this client is pinging a topic, false if it is pinging a queue. - */ - public boolean isPubSub() - { - return _isPubSub; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java deleted file mode 100644 index 091a865473..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ /dev/null @@ -1,524 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.*; -import javax.jms.Connection; -import javax.jms.Message; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.Session; - -/** - * Provides common functionality that ping producers (the senders of ping messages) can use. This base class keeps - * track of the connection used to send pings; provides a convenience method to commit a transaction only when a session - * to commit on is transactional; keeps track of whether the ping client is pinging to a queue or a topic; provides - * prompts to the console to terminate brokers before and after commits, in order to test failover functionality; - * requires sub-classes to implement a ping loop, that this provides a run loop to repeatedly call; provides a - * default shutdown hook to cleanly terminate the run loop; keeps track of the destinations to send pings to; - * provides a convenience method to generate short pauses; and provides a convience formatter for outputing readable - * timestamps for pings. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Commit the current transcation on a session. - *
Generate failover promts. - *
Keep track the connection. - *
Keep track of p2p or topic ping type. - *
Call ping loop to repeatedly send pings. - *
Provide a shutdown hook. - *
Generate short pauses. - *
- * - * @todo Destination count versus list of desintations is redundant. Use _destinions.size() to get the count and - * use a list of 1 destination when only 1 is needed. It is only important to distinguish when 1 destination - * is shared between multiple ping producers on the same JVM or if each ping producer has its own single - * destination. - * - * @todo Timestamp messages in nanos, not millis. Millis seems to have bad resolution, at least on windows. - */ -public abstract class AbstractPingProducer implements Runnable, ExceptionListener -{ - private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class); - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - private boolean _isPubSub = false; - - /** A convenient formatter to use when time stamping output. */ - protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** - * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when - * creating multiple ping producers in the same JVM. - */ - private static AtomicInteger _queueSequenceID = new AtomicInteger(); - - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ - protected boolean _publish = true; - - /** Holds the connection to the broker. */ - private Connection _connection; - - /** Holds the producer session, needed to create ping messages. */ - private Session _producerSession; - - /** Holds the number of destinations that this ping producer will send pings to, defaulting to a single destination. */ - protected int _destinationCount = 1; - - /** Holds the set of destiniations that this ping producer pings. */ - private List _destinations = new ArrayList(); - - /** Holds the message producer to send the pings through. */ - protected org.apache.qpid.jms.MessageProducer _producer; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ - protected boolean _failBeforeCommit = false; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ - protected boolean _failAfterCommit = false; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ - protected boolean _failBeforeSend = false; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ - protected boolean _failAfterSend = false; - - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ - protected boolean _failOnce = true; - - /** Holds the number of sends that should be performed in every transaction when using transactions. */ - protected int _txBatchSize = 1; - - /** - * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. - * - * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. - */ - public void setPubSub(boolean pubsub) - { - _isPubSub = pubsub; - } - - /** - * Checks whether this client is a p2p or pub/sub ping client. - * - * @return true if this client is pinging a topic, false if it is pinging a queue. - */ - public boolean isPubSub() - { - return _isPubSub; - } - - /** - * Convenience method for a short pause. - * - * @param sleepTime The time in milliseconds to pause for. - */ - public static void pause(long sleepTime) - { - if (sleepTime > 0) - { - try - { - Thread.sleep(sleepTime); - } - catch (InterruptedException ie) - { } - } - } - - /** - * Implementations should provide this method to perform a single ping cycle (which may send many messages). The - * run loop will repeatedly call this method until the publish flag is set to false. - */ - public abstract void pingLoop(); - - /** - * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. - * - * @param replyQueue The reply-to destination for the message. - * @param messageSize The desired size of the message in bytes. - * @param persistent true if the message should use persistent delivery, false otherwise. - * - * @return A freshly generated test message. - * - * @throws JMSException All underlying JMSException are allowed to fall through. - */ - public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException - { - ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); - // Timestamp the message. - msg.setLongProperty("timestamp", System.currentTimeMillis()); - - return msg; - } - - /** - * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this - * flag has been cleared. - */ - public void stop() - { - _publish = false; - } - - /** - * Implements a ping loop that repeatedly pings until the publish flag becomes false. - */ - public void run() - { - // Keep running until the publish flag is cleared. - while (_publish) - { - pingLoop(); - } - } - - /** - * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the - * connection, this clears the publish flag which in turn will halt the ping loop. - * - * @param e The exception that triggered this callback method. - */ - public void onException(JMSException e) - { - _publish = false; - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - - /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered - * with the runtime system as a shutdown hook. - * - * @return A shutdown hook for the ping loop. - */ - public Thread getShutdownHook() - { - return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); - } - - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public Connection getConnection() - { - return _connection; - } - - /** - * Sets the connection that this ping client is using. - * - * @param connection The ping connection. - */ - public void setConnection(Connection connection) - { - this._connection = connection; - } - - /** - * Gets the producer session that the ping client is using to send pings on. - * - * @return The producer session that the ping client is using to send pings on. - */ - public Session getProducerSession() - { - return _producerSession; - } - - /** - * Keeps track of the producer session that the ping client is using to send pings on. - * - * @param session The producer session that the ping client is using to send pings on. - */ - public void setProducerSession(Session session) - { - this._producerSession = session; - } - - /** - * Gets the number of destinations that this ping client is sending to. - * - * @return The number of destinations that this ping client is sending to. - */ - public int getDestinationsCount() - { - return _destinationCount; - } - - /** - * Sets the number of destination that this ping client should send to. - * - * @param count The number of destination that this ping client should send to. - * - * @deprectaed Use _destinations.size() instead. - */ - public void setDestinationsCount(int count) - { - this._destinationCount = count; - } - - /** - * Commits the transaction on the producer session. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - * - * @deprecated Use the commitTx(Session session) method instead, to explicitly specify which session is being - * committed. This makes it more obvious what is going on. - */ - protected void commitTx() throws JMSException - { - commitTx(getProducerSession()); - } - - /** - * Creates the specified number of destinations to send pings to. Topics or Queues will be created depending on - * the value of the {@link #_isPubSub} flag. - * - * @param count The number of ping destinations to create. - */ - protected void createDestinations(int count) - { - // Create the desired number of ping destinations. - for (int i = 0; i < count; i++) - { - AMQDestination destination = null; - - // Check if this is a pub/sub pinger, in which case create topics. - if (isPubSub()) - { - AMQShortString name = - new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); - destination = new AMQTopic(name); - } - // Otherwise this is a p2p pinger, in which case create queues. - else - { - AMQShortString name = - new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); - destination = new AMQQueue(name, name, false, false, false); - } - - _destinations.add(destination); - } - } - - /** - * Returns the destination from the destinations list with the given index. - * - * @param index The index of the destination to get. - * - * @return Destination with the given index. - */ - protected Destination getDestination(int index) - { - return _destinations.get(index); - } - - /** - * Convenience method to commit the transaction on the specified session. If the session to commit on is not - * a transactional session, this method does nothing (unless the failover after send flag is set). - * - *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit - * is applied. This flag applies whether the pinger is transactional or not. - * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the - * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker - * after the commit is applied. These flags will only apply if using a transactional pinger. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional - * and non-transactional alike. - */ - protected void commitTx(Session session) throws JMSException - { - _logger.trace("Batch time reached"); - if (_failAfterSend) - { - _logger.trace("Batch size reached"); - if (_failOnce) - { - _failAfterSend = false; - } - - _logger.trace("Failing After Send"); - doFailover(); - } - - if (session.getTransacted()) - { - try - { - if (_failBeforeCommit) - { - if (_failOnce) - { - _failBeforeCommit = false; - } - - _logger.trace("Failing Before Commit"); - doFailover(); - } - - session.commit(); - - if (_failAfterCommit) - { - if (_failOnce) - { - _failAfterCommit = false; - } - - _logger.trace("Failing After Commit"); - doFailover(); - } - - _logger.trace("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - // Warn that the bounce back client is not available. - if (e.getLinkedException() instanceof AMQNoConsumersException) - { - _logger.debug("No consumers on queue."); - } - - try - { - session.rollback(); - _logger.trace("Message rolled back."); - } - catch (JMSException jmse) - { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - } - - /** - * Sends the specified message to the default destination of the ping producer. - * - * @param message The message to send. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - protected void sendMessage(Message message) throws JMSException - { - sendMessage(null, message); - } - - /** - * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination - * of the ping producer. If an explicit destination is set, this overrides the default. - * - * @param destination The destination to send to. - * @param message The message to send. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - protected void sendMessage(Destination destination, Message message) throws JMSException - { - if (_failBeforeSend) - { - if (_failOnce) - { - _failBeforeSend = false; - } - - _logger.trace("Failing Before Send"); - doFailover(); - } - - if (destination == null) - { - _producer.send(message); - } - else - { - _producer.send(destination, message); - } - } - - /** - * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - * - * @param broker The name of the broker to terminate. - */ - protected void doFailover(String broker) - { - System.out.println("Kill Broker " + broker + " now then press return"); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - } - - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now then press return"); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java deleted file mode 100644 index 949ace20e1..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.net.InetAddress; - -import javax.jms.*; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.Session; - -/** - * PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took, - * provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm) - * as the ping producer. - *

- *

There is a verbose mode flag which causes information about each ping to be output to the console - * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should - * be disabled for real timing tests as writing to the console will slow things down. - *

- *

- *
CRC Card
Responsibilities Collaborations - *
Provide command line invocation to start the ping consumer on a configurable broker url. - *
- * - * @todo Add a better command line interpreter to the main method. The command line is not very nice... - */ -class TestPingClient extends AbstractPingClient implements MessageListener -{ - private static final Logger _logger = Logger.getLogger(TestPingClient.class); - - /** - * Used to indicate that the reply generator should log timing info to the console (logger info level). - */ - private boolean _verbose = false; - - /** - * The producer session. - */ - private Session _consumerSession; - - /** - * Creates a TestPingClient on the specified session. - * - * @param brokerDetails - * @param username - * @param password - * @param queueName - * @param virtualpath - * @param transacted - * @param selector - * @param verbose - * @param afterCommit - *@param beforeCommit @throws Exception All underlying exceptions allowed to fall through. This is only test code... - */ - public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath, - boolean transacted, String selector, boolean verbose, boolean afterCommit, boolean beforeCommit) throws Exception - { - // Create a connection to the broker. - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - - setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); - - // Create a transactional or non-transactional session depending on the command line parameter. - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - - // Connect a consumer to the ping queue and register this to be called back by it. - Queue q = new AMQQueue(queueName); - MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector); - - consumer.setMessageListener(this); - - // Hang on to the verbose flag setting. - _verbose = verbose; - - // Set failover interrupts - _failAfterCommit = afterCommit; - _failBeforeCommit = beforeCommit; - } - - /** - * Starts a stand alone ping-pong client running in verbose mode. - * - * @param args - */ - public static void main(String[] args) throws Exception - { - _logger.info("Starting..."); - - // Display help on the command line. - if (args.length < 4) - { - System.out.println( - "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector] [failover::commit]"); - System.exit(1); - } - - // Extract all command line parameters. - String brokerDetails = args[0]; - String username = args[1]; - String password = args[2]; - String virtualpath = args[3]; - String queueName = (args.length >= 5) ? args[4] : "ping"; - boolean verbose = (args.length >= 6) ? Boolean.parseBoolean(args[5]) : true; - boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false; - String selector = (args.length == 8) ? args[7] : null; - - boolean afterCommit = false; - boolean beforeCommit = false; - - for (String arg : args) - { - if (arg.startsWith("failover:")) - { - //failover:: - String[] parts = arg.split(":"); - if (parts.length == 3) - { - if (parts[2].equals("commit")) - { - afterCommit = parts[1].equals("after"); - beforeCommit = parts[1].equals("before"); - } - } - else - { - System.out.println("Unrecognized failover request:" + arg); - } - } - } - - // Create the test ping client and set it running. - TestPingClient pingClient = - new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose, afterCommit, beforeCommit); - - pingClient.getConnection().start(); - - System.out.println("Waiting..."); - } - /** - * This is a callback method that is notified of all messages for which this has been registered as a message - * listener on a message consumer. - * - * @param message The message that triggered this callback. - */ - public void onMessage(javax.jms.Message message) - { - try - { - // Spew out some timing information if verbose mode is on. - if (_verbose) - { - Long timestamp = message.getLongProperty("timestamp"); - - if (timestamp != null) - { - long diff = System.currentTimeMillis() - timestamp; - System.out.println("Ping time: " + diff); - } - } - - // Commit the transaction if running in transactional mode. - commitTx(_consumerSession); - } - catch (JMSException e) - { - _logger.error("There was a JMSException: " + e.getMessage(), e); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java deleted file mode 100644 index acb0135b86..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.ping; - -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; -import org.apache.qpid.topic.Config; - -/** - * This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue). - * The producer and consumer created by this test send and receive messages to and from the same Queue. ie. - * pingQueue and replyQueue are same. - * This class extends @see org.apache.qpid.requestreply.PingPongProducer which different ping and reply Queues - */ -public class TestPingItself extends PingPongProducer -{ - private static final Logger _logger = Logger.getLogger(TestPingItself.class); - - /** - * If noOfDestinations is <= 1 : There will be one Queue and one consumer instance for the test - * If noOfDestinations is > 1 : This creats a client for tests with multiple queues. Creates as many consumer instances - * as there are queues, each listening to a Queue. A producer is created which picks up a queue from - * the list of queues to send message - * - * @param brokerDetails - * @param username - * @param password - * @param virtualpath - * @param queueName - * @param selector - * @param transacted - * @param persistent - * @param messageSize - * @param verbose - * @param afterCommit - * @param beforeCommit - * @param afterSend - * @param beforeSend - * @param failOnce - * @param batchSize - * @param noOfDestinations - * @throws Exception - */ - public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName, - String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose, - boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce, - int batchSize, int noOfDestinations, int rate, boolean pubsub) throws Exception - { - super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, - messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, - noOfDestinations, rate, pubsub); - - if (noOfDestinations > DEFAULT_DESTINATION_COUNT) - { - createDestinations(noOfDestinations); - _persistent = persistent; - _messageSize = messageSize; - _verbose = verbose; - - createConsumers(selector); - createProducer(); - } - } - - /** - * Sets the replyQueue to be the same as ping queue. - */ - @Override - public void createConsumer(String selector) throws JMSException - { - // Create a message consumer to get the replies with and register this to be called back by it. - setReplyDestination(getPingDestination()); - MessageConsumer consumer = - getConsumerSession().createConsumer(getReplyDestination(), PREFETCH, false, EXCLUSIVE, selector); - consumer.setMessageListener(this); - } - - /** - * Starts a ping-pong loop running from the command line. - * - * @param args The command line arguments as defined above. - */ - public static void main(String[] args) throws Exception - { - // Extract the command line. - Config config = new Config(); - config.setOptions(args); - if (args.length == 0) - { - _logger.info("Running test with default values..."); - } - - String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = "/test"; - boolean verbose = true; - boolean transacted = config.isTransacted(); - boolean persistent = config.usePersistentMessages(); - int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE; - int messageCount = config.getMessages(); - int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT; - int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE; - int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE; - boolean pubsub = config.isPubSub(); - - String destName = config.getDestination(); - if (destName == null) - { - destName = PING_DESTINATION_NAME; - } - - boolean afterCommit = false; - boolean beforeCommit = false; - boolean afterSend = false; - boolean beforeSend = false; - boolean failOnce = false; - - for (String arg : args) - { - if (arg.startsWith("failover:")) - { - //failover:: - String[] parts = arg.split(":"); - if (parts.length == 3) - { - if (parts[2].equals("commit")) - { - afterCommit = parts[1].equals("after"); - beforeCommit = parts[1].equals("before"); - } - - if (parts[2].equals("send")) - { - afterSend = parts[1].equals("after"); - beforeSend = parts[1].equals("before"); - } - - if (parts[1].equals("once")) - { - failOnce = true; - } - - } - else - { - System.out.println("Unrecognized failover request:" + arg); - } - } - } - - // Create a ping producer to handle the request/wait/reply cycle. - TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, destName, null, - transacted, persistent, messageSize, verbose, afterCommit, - beforeCommit, afterSend, beforeSend, failOnce, batchSize, - destCount, rate, pubsub); - - pingItself.getConnection().start(); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingItself.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingItself.getConnection().setExceptionListener(pingItself); - - if ((destCount > DEFAULT_DESTINATION_COUNT) || (messageCount > 0)) - { - _logger.info("Destinations Count:" + destCount + ", Transacted:" + transacted + ", persistent:" + - persistent + ",Message Size:" + messageSize + " bytes, pubsub:" + pubsub); - pingItself.pingLoop(); - } - else - { - _logger.info("Destination:" + destName + ", Transacted:" + transacted + ", persistent:" + - persistent + ",Message Size:" + messageSize + " bytes, pubsub:" + pubsub); - // set the message count to 0 to run this loop - // Run a few priming pings to remove warm up time from test results. - pingItself.prime(PRIMING_LOOPS); - - _logger.info("Running the infinite loop and pinging the broker..."); - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingItself); - pingThread.run(); - pingThread.join(); - } - - pingItself.getConnection().close(); - } - - private static void usage() - { - System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" + - "-destinationname : queue/topic name\n" + - "-transacted : (true/false). Default is false\n" + - "-persistent : (true/false). Default is false\n" + - "-pubsub : (true/false). Default is false\n" + - "-selector : selector string\n" + - "-payload : paylaod size. Default is 0\n" + - "-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" + - "-destinationscount : no of destinations for multi-destinations test\n" + - "-batchsize : batch size\n" + - "-rate : thruput rate\n"); - System.exit(0); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java deleted file mode 100644 index d9e81d39de..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.*; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.MessageProducer; -import org.apache.qpid.jms.Session; - -/** - * PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line - * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and - * configured message producer. - *

- *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop - * does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so - * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is - * also registered to terminate the ping loop cleanly. - *

- *

- *
CRC Card
Responsibilities Collaborations - *
Provide a ping cycle. - *
Provide command line invocation to loop the ping cycle on a configurable broker url. - *
- */ -class TestPingProducer extends AbstractPingProducer -{ - private static final Logger _logger = Logger.getLogger(TestPingProducer.class); - - /** - * Used to set up a default message size. - */ - private static final int DEFAULT_MESSAGE_SIZE = 0; - - /** - * Used to define how long to wait between pings. - */ - private static final long SLEEP_TIME = 250; - - /** - * Holds the name of the queue to send pings on. - */ - private static final String PING_QUEUE_NAME = "ping"; - - private static TestPingProducer _pingProducer; - - /** - * Determines whether this producer sends persistent messages from the run method. - */ - private boolean _persistent = false; - - /** - * Holds the message size to send, from the run method. - */ - private int _messageSize = DEFAULT_MESSAGE_SIZE; - - /** - * Used to indicate that the ping loop should print out whenever it pings. - */ - private boolean _verbose = false; - - public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName, - boolean transacted, boolean persistent, int messageSize, boolean verbose, boolean afterCommit, - boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize) - throws Exception - { - // Create a connection to the broker. - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - - setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); - - // Create a transactional or non-transactional session, based on the command line arguments. - setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); - - // Create a queue to send the pings on. - Queue pingQueue = new AMQQueue(queueName); - _producer = (MessageProducer) getProducerSession().createProducer(pingQueue); - - _persistent = persistent; - _messageSize = messageSize; - - _verbose = verbose; - - // Set failover interrupts - _failAfterCommit = afterCommit; - _failBeforeCommit = beforeCommit; - _failAfterSend = afterSend; - _failBeforeSend = beforeSend; - _txBatchSize = batchSize; - _failOnce = failOnce; - } - - /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link TestPingClient} also needs - * to be started to bounce the pings back again. - * - * @param args The command line arguments as defined above. - */ - public static void main(String[] args) throws Exception - { - // Extract the command line. - if (args.length < 2) - { - System.err.println( - "Usage: TestPingPublisher " - + "[ "); - System.exit(0); - } - - String brokerDetails = args[0]; - String virtualpath = args[1]; - boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true; - boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; - boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; - int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE; - int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1; - - boolean afterCommit = false; - boolean beforeCommit = false; - boolean afterSend = false; - boolean beforeSend = false; - boolean failOnce = false; - - for (String arg : args) - { - if (arg.startsWith("failover:")) - { - //failover:: - String[] parts = arg.split(":"); - if (parts.length == 3) - { - if (parts[2].equals("commit")) - { - afterCommit = parts[1].equals("after"); - beforeCommit = parts[1].equals("before"); - } - - if (parts[2].equals("send")) - { - afterSend = parts[1].equals("after"); - beforeSend = parts[1].equals("before"); - } - - if (parts[1].equals("once")) - { - failOnce = true; - } - } - else - { - System.out.println("Unrecognized failover request:" + arg); - } - } - } - - // Create a ping producer to generate the pings. - _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted, - persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, - beforeSend, failOnce, batchSize); - - // Start the connection running. - _pingProducer.getConnection().start(); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook()); - - // Ensure the ping loop execption listener is registered on the connection to terminate it on error. - _pingProducer.getConnection().setExceptionListener(_pingProducer); - - // Start the ping loop running until it is interrupted. - Thread pingThread = new Thread(_pingProducer); - pingThread.run(); - pingThread.join(); - } - - /** - * Sends the specified ping message. - * - * @param message The message to send. - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public void ping(Message message) throws JMSException - { - sendMessage(message); - - // Keep the messageId to correlate with the reply. - String messageId = message.getJMSMessageID(); - - // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of - // this method, as the message will not be sent until the transaction is committed. - commitTx(); - } - - /** - * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and - * waits for short pauses in between each. - */ - public void pingLoop() - { - try - { - // Generate a sample message and time stamp it. - ObjectMessage msg = getTestMessage(null, _messageSize, _persistent); - msg.setLongProperty("timestamp", System.currentTimeMillis()); - - // Send the message. - ping(msg); - - if (_verbose) - { - System.out.println("Pinged at: " + timestampFormatter.format(new Date())); //" + " with id: " + msg.getJMSMessageID()); - } - // Introduce a short pause if desired. - pause(SLEEP_TIME); - } - catch (JMSException e) - { - _publish = false; - _logger.error("There was a JMSException: " + e.getMessage(), e); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java deleted file mode 100644 index 3b2dcc4d36..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pingpong; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.AMQException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.jms.MessageProducer; -import org.apache.qpid.jms.Session; - -import javax.jms.*; -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * A client that behaves as follows: - *

  • Connects to a queue, whose name is specified as a cmd-line argument
  • - *
  • Creates a temporary queue
  • - *
  • Creates messages containing a property that is the name of the temporary queue
  • - *
  • Fires off a message on the original queue and waits for a response on the temporary queue
  • - *
- */ -public class TestPingPublisher implements ExceptionListener -{ - private static final Logger _log = Logger.getLogger(TestPingPublisher.class); - - private AMQConnection _connection; - - private boolean _publish; - private static int _messageSize = 0; - private long SLEEP_TIME = 0L; - -// private class CallbackHandler implements MessageListener -// { -// -// private int _actualMessageCount; -// -// -// public void onMessage(Message m) -// { -// if (_log.isDebugEnabled()) -// { -// _log.debug("Message received: " + m); -// } -// _actualMessageCount++; -// if (_actualMessageCount % 1000 == 0) -// { -// _log.info("Received message count: " + _actualMessageCount); -// } -// } -// } - - public TestPingPublisher(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException - { - try - { - createConnection(brokerDetails, clientID, virtualpath); - - Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - //AMQQueue destination = new AMQQueue("ping"); - AMQTopic destination = new AMQTopic("ping"); - MessageProducer producer = (MessageProducer) session.createProducer(destination); - - _connection.setExceptionListener(this); - - _connection.start(); - - int msgCount = 0; - while (_publish) - { -/* - TextMessage msg = session.createTextMessage( - "Presented to in conjunction with Mahnah Mahnah and the Snowths: " + ++messageNumber); -*/ - ObjectMessage msg = null; - if (_messageSize != 0) - { - msg = TestMessageFactory.newObjectMessage(session, _messageSize); - } - else - { - msg = session.createObjectMessage(); - } - - Long time = System.nanoTime(); - msg.setStringProperty("timestampString", Long.toString(time)); - msg.setLongProperty("timestamp", time); - - ((BasicMessageProducer) producer).send(msg, DeliveryMode.PERSISTENT, true); - - _log.info("Message Sent:" + msgCount++); - _log.debug(msg); - - if (msgCount == Integer.MAX_VALUE) - { - _publish = false; - } - - if (SLEEP_TIME > 0) - { - try - { - Thread.sleep(SLEEP_TIME); - } - catch (InterruptedException ie) - { - //do nothing - } - } - } - - } - catch (JMSException e) - { - e.printStackTrace(); - } - } - - private void createConnection(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException - { - _publish = true; - _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath); - _log.info("Connected with URL:" + _connection.toURL()); - } - - /** - * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank - * means the server will allocate a name. - */ - public static void main(String[] args) - { - if (args.length < 2) - { - System.err.println("Usage: TestPingPublisher [message size in bytes]"); - System.exit(0); - } - try - { - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - if (args.length > 2 ) - { - _messageSize = Integer.parseInt(args[2]); - } - new TestPingPublisher(args[0], clientID, args[1]); - } - catch (UnknownHostException e) - { - e.printStackTrace(); - } - catch (AMQException e) - { - System.err.println("Error in client: " + e); - e.printStackTrace(); - } - catch (URLSyntaxException e) - { - System.err.println("Error in connection arguments : " + e); - } - - //System.exit(0); - } - - /** - * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) - */ - public void onException(JMSException e) - { - System.err.println(e.getMessage()); - - _publish = false; - e.printStackTrace(System.err); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java deleted file mode 100644 index b43319744a..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pingpong; - -import org.apache.log4j.Logger; -import org.apache.log4j.Level; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.jms.Session; - -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Topic; -import javax.jms.JMSException; -import java.net.InetAddress; - -public class TestPingSubscriber -{ - private static final Logger _logger = Logger.getLogger(TestPingSubscriber.class); - - private static class TestPingMessageListener implements MessageListener - { - public TestPingMessageListener() - { - } - - long _lastTimestamp = 0L; - long _lastTimestampString = 0L; - - public void onMessage(javax.jms.Message message) - { - Long time = System.nanoTime(); - - if (_logger.isInfoEnabled()) - { - long timestampString = 0L; - - try - { - long timestamp = message.getLongProperty("timestamp"); - timestampString = Long.parseLong(message.getStringProperty("timestampString")); - - if (timestampString != timestamp) - { - _logger.info("Timetamps differ!:\n" + - "timestamp:" + timestamp + "\n" + - "timestampString:" + timestampString); - } - - } - catch (JMSException jmse) - { - _logger.error("JMSException caught:" + jmse.getMessage(), jmse); - } - - - long stringDiff = time - timestampString; - - _logger.info("Ping: TS:" + stringDiff / 1000 + "us"); - - // _logger.info(_name + " got message '" + message + "\n"); - } - } - } - - public static void main(String[] args) - { - _logger.info("Starting..."); - - if (args.length < 4) - { - System.out.println("Usage: brokerdetails username password virtual-path [selector] "); - System.exit(1); - } - try - { - InetAddress address = InetAddress.getLocalHost(); - AMQConnection con1 = new AMQConnection(args[0], args[1], args[2], - address.getHostName(), args[3]); - - _logger.info("Connected with URL:" + con1.toURL()); - - final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session) - con1.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - String selector = null; - - if (args.length == 5) - { - selector = args[4]; - _logger.info("Message selector is <" + selector + ">..."); - } - else - { - _logger.info("Not using message selector "); - } - - Topic t = new AMQTopic("ping"); - - MessageConsumer consumer1 = session1.createConsumer(t, - 1, false, false, selector); - - consumer1.setMessageListener(new TestPingMessageListener()); - con1.start(); - } - catch (Throwable t) - { - System.err.println("Fatal error: " + t); - t.printStackTrace(); - } - - System.out.println("Waiting..."); - } -} - diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java b/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java deleted file mode 100644 index 1e98e45bba..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.apache.qpid.ping; - -/** - * Throttle is a helper class used in situations where a controlled rate of processing is desired. It allows a certain - * number of operations-per-second to be defined and supplies a {@link #throttle} method that can only be called at - * most at that rate. The first call to the throttle method will return immediately, subsequent calls will introduce - * a short pause to fill out the remainder of the current cycle to attain the desired rate. If there is no remainder - * left then it will return immediately. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- * - * @author Rupert Smith - */ -public class Throttle -{ - /** Holds the length of the cycle in nano seconds. */ - long cycleLengthNanos = 0L; - - /** Records the nano time of the last call to the throttle method. */ - long lastTimeNanos = 0L; - - /** - * Sets up the desired rate of operation per second that the throttle method should restrict to. - * - * @param opsPerSecond The maximum number of calls per second that the throttle method will take. - */ - public void setRate(int opsPerSecond) - { - // Calculate the length of a cycle. - cycleLengthNanos = 1000000000 / opsPerSecond; - } - - /** - * Introduces a short pause to fill out any time left in the cycle since this method was last called, of length - * defined by a call to the {@link #setRate} method. - */ - public void throttle() - { - // Record the time now. - long currentTimeNanos = System.nanoTime(); - - // Check if there is any time remaining in the current cycle and introduce a short wait to fill out the - // remainder of the cycle if needed. - long remainingTimeNanos = cycleLengthNanos - (currentTimeNanos - lastTimeNanos); - - if (remainingTimeNanos > 0) - { - long milliWait = remainingTimeNanos / 1000000; - int nanoWait = (int) (remainingTimeNanos % 1000000); - - try - { - Thread.currentThread().sleep(milliWait, nanoWait); - } - catch (InterruptedException e) - { - // Just ignore this? - } - } - - // Keep the time of the last call to this method to calculate the next cycle. - //lastTimeNanos = currentTimeNanos; - lastTimeNanos = System.nanoTime(); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java index bae6aa0dc2..87edd31575 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.requestreply; +import java.io.IOException; import java.net.InetAddress; +import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.*; @@ -32,7 +34,6 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.Session; -import org.apache.qpid.ping.AbstractPingClient; import org.apache.qpid.topic.Config; /** @@ -58,7 +59,7 @@ import org.apache.qpid.topic.Config; * * @todo Make verbose accept a number of messages, only prints to console every X messages. */ -public class PingPongBouncer extends AbstractPingClient implements MessageListener +public class PingPongBouncer implements MessageListener { private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); @@ -73,6 +74,9 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen /** The default exclusive flag for the message consumer. */ private static final boolean EXCLUSIVE = false; + /** A convenient formatter to use when time stamping output. */ + protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ private boolean _verbose = false; @@ -93,6 +97,24 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen /** The producer session. */ private Session _producerSession; + /** Holds the connection to the broker. */ + private AMQConnection _connection; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + private boolean _isPubSub = false; + + /** + * This flag is used to indicate that the user should be prompted to kill a broker, in order to test + * failover, immediately before committing a transaction. + */ + protected boolean _failBeforeCommit = false; + + /** + * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test + * failover, immediate after committing a transaction. + */ + protected boolean _failAfterCommit = false; + /** * Creates a PingPongBouncer on the specified producer and consumer sessions. * @@ -110,8 +132,8 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen * @throws Exception All underlying exceptions allowed to fall through. This is only test code... */ public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, - String destinationName, boolean persistent, boolean transacted, String selector, - boolean verbose, boolean pubsub) throws Exception + String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, + boolean pubsub) throws Exception { // Create a client id to uniquely identify this client. InetAddress address = InetAddress.getLocalHost(); @@ -133,7 +155,8 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen // Create the queue to listen for message on. createConsumerDestination(destinationName); - MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); + MessageConsumer consumer = + _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); // Create a producer for the replies, without a default destination. _replyProducer = _producerSession.createProducer(null); @@ -144,18 +167,6 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen consumer.setMessageListener(this); } - private void createConsumerDestination(String name) - { - if (isPubSub()) - { - _consumerDestination = new AMQTopic(name); - } - else - { - _consumerDestination = new AMQQueue(name); - } - } - /** * Starts a stand alone ping-pong client running in verbose mode. * @@ -177,12 +188,13 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen Config config = new Config(); config.setOptions(args); String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = "/test"; + String virtualpath = "/test"; String destinationName = config.getDestination(); if (destinationName == null) { destinationName = DEFAULT_DESTINATION_NAME; } + String selector = config.getSelector(); boolean transacted = config.isTransacted(); boolean persistent = config.usePersistentMessages(); @@ -192,13 +204,22 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen //String selector = null; // Instantiate the ping pong client with the command line options and start it running. - PingPongBouncer pingBouncer = new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, - destinationName, persistent, transacted, selector, verbose, pubsub); + PingPongBouncer pingBouncer = + new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, + selector, verbose, pubsub); pingBouncer.getConnection().start(); System.out.println("Waiting..."); } + private static void usage() + { + System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + + "-persistent : (true/false). Default is false\n" + + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); + } + /** * This is a callback method that is notified of all messages for which this has been registered as a message * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to @@ -260,14 +281,145 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen } } - private static void usage() + /** + * Gets the underlying connection that this ping client is running on. + * + * @return The underlying connection that this ping client is running on. + */ + public AMQConnection getConnection() + { + return _connection; + } + + /** + * Sets the connection that this ping client is using. + * + * @param connection The ping connection. + */ + public void setConnection(AMQConnection connection) + { + this._connection = connection; + } + + /** + * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. + * + * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. + */ + public void setPubSub(boolean pubsub) + { + _isPubSub = pubsub; + } + + /** + * Checks whether this client is a p2p or pub/sub ping client. + * + * @return true if this client is pinging a topic, false if it is pinging a queue. + */ + public boolean isPubSub() + { + return _isPubSub; + } + + /** + * Convenience method to commit the transaction on the specified session. If the session to commit on is not + * a transactional session, this method does nothing. + * + *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the + * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker + * after the commit is applied. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + */ + protected void commitTx(Session session) throws JMSException + { + if (session.getTransacted()) + { + try + { + if (_failBeforeCommit) + { + _logger.trace("Failing Before Commit"); + doFailover(); + } + + session.commit(); + + if (_failAfterCommit) + { + _logger.trace("Failing After Commit"); + doFailover(); + } + + _logger.trace("Session Commited."); + } + catch (JMSException e) + { + _logger.trace("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + _logger.debug("Message rolled back."); + } + catch (JMSException jmse) + { + _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + } + + /** + * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + * + * @param broker The name of the broker to terminate. + */ + protected void doFailover(String broker) + { + System.out.println("Kill Broker " + broker + " now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + } + + /** + * Prompts the user to terminate the broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + */ + protected void doFailover() { - System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + - "-destinationname : queue/topic name\n" + - "-transacted : (true/false). Default is false\n" + - "-persistent : (true/false). Default is false\n" + - "-pubsub : (true/false). Default is false\n" + - "-selector : selector string\n"); + System.out.println("Kill Broker now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + + } + + private void createConsumerDestination(String name) + { + if (isPubSub()) + { + _consumerDestination = new AMQTopic(name); + } + else + { + _consumerDestination = new AMQQueue(name); + } } /** diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 263e62cf04..310ec5f5e3 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -20,344 +20,353 @@ */ package org.apache.qpid.requestreply; +import java.io.IOException; import java.net.InetAddress; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.jms.*; import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.client.*; +import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; -import org.apache.qpid.ping.AbstractPingProducer; -import org.apache.qpid.ping.Throttle; import org.apache.qpid.topic.Config; +import uk.co.thebadgerset.junit.extensions.BatchedThrottle; +import uk.co.thebadgerset.junit.extensions.Throttle; + /** * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back - * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line - * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session, - * message producer and message consumer to run the ping-pong cycle on. - *

+ * client (see {@link PingPongBouncer} for the bounce back client). + * *

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. * This means that this class has to do some work to correlate pings with pongs; it expectes the original message - * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then - * this correlation would not need to be done. - *

+ * correlation id in the ping to be bounced back in the reply correlation id. + * + *

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. + * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings + * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform + * failover testing. + * *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is * also registered to terminate the ping-pong loop cleanly. - *

+ * *

*
CRC Card
Responsibilities Collaborations - *
Provide a ping and wait for response cycle. + *
Provide a ping and wait for all responses cycle. *
Provide command line invocation to loop the ping cycle on a configurable broker url. *
* - * @todo Make temp queue per ping a command line option. - * @todo Make the queue name a command line option. + * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping. + * Use the rate and throttling only. + * + * @todo Make shared or unique destinations a configurable option, hard coded to false. */ -public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener +public class PingPongProducer implements Runnable, MessageListener, ExceptionListener { private static final Logger _logger = Logger.getLogger(PingPongProducer.class); - /** - * Used to set up a default message size. - */ - protected static final int DEFAULT_MESSAGE_SIZE = 0; + /** Holds the name of the property to get the test message size from. */ + public static final String MESSAGE_SIZE_PROPNAME = "messagesize"; - /** - * This is set and used when the test is for multiple-destinations - */ - protected static final int DEFAULT_DESTINATION_COUNT = 0; + /** Holds the name of the property to get the ping queue name from. */ + public static final String PING_QUEUE_NAME_PROPNAME = "destinationname"; - protected static final int DEFAULT_RATE = 0; + /** Holds the name of the property to get the test delivery mode from. */ + public static final String PERSISTENT_MODE_PROPNAME = "persistent"; - /** - * Used to define how long to wait between pings. - */ - protected static final long SLEEP_TIME = 250; + /** Holds the name of the property to get the test transactional mode from. */ + public static final String TRANSACTED_PROPNAME = "transacted"; - /** - * Used to define how long to wait before assuming that a ping has timed out. - */ - protected static final long TIMEOUT = 9000; + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "broker"; - /** - * Holds the name of the destination to send pings on. - */ - protected static final String PING_DESTINATION_NAME = "ping"; + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - /** - * The batch size. - */ - protected static final int DEFAULT_BATCH_SIZE = 100; + /** Holds the name of the property to get the message rate from. */ + public static final String RATE_PROPNAME = "rate"; - protected static final int PREFETCH = 100; - protected static final boolean NO_LOCAL = true; - protected static final boolean EXCLUSIVE = false; + public static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; - /** - * The number of priming loops to run. - */ - protected static final int PRIMING_LOOPS = 3; + /** Holds the true or false depending on wether it is P2P test or PubSub */ + public static final String IS_PUBSUB_PROPNAME = "pubsub"; - /** - * A source for providing sequential unique correlation ids. - */ + public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit"; + + public static final String FAIL_BEFORE_COMMIT_PROPNAME = "FailBeforeCommit"; + + public static final String FAIL_AFTER_SEND_PROPNAME = "FailAfterSend"; + + public static final String FAIL_BEFORE_SEND_PROPNAME = "FailBeforeSend"; + + public static final String FAIL_ONCE_PROPNAME = "FailOnce"; + + public static final String USERNAME_PROPNAME = "username"; + + public static final String PASSWORD_PROPNAME = "password"; + + public static final String SELECTOR_PROPNAME = "selector"; + + public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount"; + + /** Holds the name of the property to get the waiting timeout for response messages. */ + public static final String TIMEOUT_PROPNAME = "timeout"; + + public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize"; + + /** Used to set up a default message size. */ + public static final int DEFAULT_MESSAGE_SIZE = 0; + + /** Holds the name of the default destination to send pings on. */ + public static final String DEFAULT_PING_DESTINATION_NAME = "ping"; + + /** Defines the default number of destinations to ping. */ + public static final int DEFAULT_DESTINATION_COUNT = 1; + + /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + public static final int DEFAULT_RATE = 0; + + /** Defines the default wait between pings. */ + public static final long DEFAULT_SLEEP_TIME = 250; + + /** Default time to wait before assuming that a ping has timed out. */ + public static final long DEFAULT_TIMEOUT = 9000; + + /** Defines the default number of pings to send in each transaction when running transactionally. */ + public static final int DEFAULT_TX_BATCH_SIZE = 100; + + /** Defines the default prefetch size to use when consuming messages. */ + public static final int DEFAULT_PREFETCH = 100; + + /** Defines the default value of the no local flag to use when consuming messages. */ + public static final boolean DEFAULT_NO_LOCAL = false; + + /** Defines the default value of the exclusive flag to use when consuming messages. */ + public static final boolean DEFAULT_EXCLUSIVE = false; + + /** Holds the message delivery mode to use for the test. */ + public static final boolean DEFAULT_PERSISTENT_MODE = false; + + /** Holds the transactional mode to use for the test. */ + public static final boolean DEFAULT_TRANSACTED = false; + + /** Holds the default broker url for the test. */ + public static final String DEFAULT_BROKER = "tcp://localhost:5672"; + + /** Holds the default virtual path for the test. */ + public static final String DEFAULT_VIRTUAL_PATH = "test"; + + /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + public static final boolean DEFAULT_PUBSUB = false; + + /** Holds the default broker log on username. */ + public static final String DEFAULT_USERNAME = "guest"; + + /** Holds the default broker log on password. */ + public static final String DEFAULT_PASSWORD = "guest"; + + /** Holds the default message selector. */ + public static final String DEFAULT_SELECTOR = null; + + /** Holds the default failover after commit test flag. */ + public static final String DEFAULT_FAIL_AFTER_COMMIT = "false"; + + /** Holds the default failover before commit test flag. */ + public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false"; + + /** Holds the default failover after send test flag. */ + public static final String DEFAULT_FAIL_AFTER_SEND = "false"; + + /** Holds the default failover before send test flag. */ + public static final String DEFAULT_FAIL_BEFORE_SEND = "false"; + + /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */ + public static final String DEFAULT_FAIL_ONCE = "true"; + + /** Holds the default verbose mode. */ + public static final boolean DEFAULT_VERBOSE = false; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong idGenerator = new AtomicLong(0L); /** - * Holds a map from message ids to latches on which threads wait for replies. + * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross + * multiple ping producers on the same JVM. */ - private static Map trafficLights = new HashMap(); + private static Map trafficLights = + Collections.synchronizedMap(new HashMap()); + + /** A convenient formatter to use when time stamping output. */ + protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); /** - * Destination where the responses messages will arrive + * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when + * creating multiple ping producers in the same JVM. */ + protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); + + /** Destination where the response messages will arrive. */ private Destination _replyDestination; - /** - * Destination where the producer will be sending message to - */ - private Destination _pingDestination; + /** Destination where the producer will be sending message to. */ + //private Destination _pingDestination; - /** - * Determines whether this producer sends persistent messages from the run method. - */ + /** Determines whether this producer sends persistent messages. */ protected boolean _persistent; - /** - * Holds the message size to send, from the run method. - */ + /** Determines what size of messages this producer sends. */ protected int _messageSize; - /** - * Used to indicate that the ping loop should print out whenever it pings. - */ + /** Used to indicate that the ping loop should print out whenever it pings. */ protected boolean _verbose = false; + /** Holds the session on which ping replies are received. */ protected Session _consumerSession; - /** - * Used to restrict the sending rate to a specified limit. - */ - private Throttle rateLimiter = null; + /** Used to restrict the sending rate to a specified limit. */ + private Throttle _rateLimiter = null; + + /** Holds a message listener that this message listener chains all its messages to. */ + private ChainedMessageListener _chainedMessageListener = null; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + protected boolean _isPubSub = false; /** - * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used - * to group sends together into batches large enough that the throttler runs slower than that. + * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers + * on the same JVM using this id generator will allow them to ping on the same queues. */ - int _throttleBatchSize; + protected AtomicInteger _queueSharedId = new AtomicInteger(); - private MessageListener _messageListener = null; + /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + protected boolean _publish = true; - private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted, - boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit, - boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate) - throws Exception - { - // Create a connection to the broker. - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); + /** Holds the connection to the broker. */ + private Connection _connection; - setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); + /** Holds the producer session, needed to create ping messages. */ + private Session _producerSession; - // Create transactional or non-transactional sessions, based on the command line arguments. - setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + /** Holds the set of destiniations that this ping producer pings. */ + protected List _pingDestinations = new ArrayList(); - _persistent = persistent; - _messageSize = messageSize; - _verbose = verbose; + /** Holds the message producer to send the pings through. */ + protected MessageProducer _producer; - // Set failover interrupts - _failAfterCommit = afterCommit; - _failBeforeCommit = beforeCommit; - _failAfterSend = afterSend; - _failBeforeSend = beforeSend; - _failOnce = failOnce; - _txBatchSize = batchSize; - - // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second - // and batched sends within each cycle multiply up to give the desired rate. - // - // total rate = throttle rate * batch size. - // 1 < throttle rate < 100 - // 1 < total rate < 20000 - if (rate > DEFAULT_RATE) - { - // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is. - // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the - // throttle rate back into the range 1 to 100. - int x = (int) (Math.log10(rate) / 2); - _throttleBatchSize = (int) Math.pow(100, x); - int throttleRate = rate / _throttleBatchSize; - - _logger.debug("rate = " + rate); - _logger.debug("x = " + x); - _logger.debug("_throttleBatchSize = " + _throttleBatchSize); - _logger.debug("throttleRate = " + throttleRate); - - rateLimiter = new Throttle(); - rateLimiter.setRate(throttleRate); - } - } + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + protected boolean _failBeforeCommit = false; - /** - * Creates a ping pong producer with the specified connection details and type. - * - * @param brokerDetails - * @param username - * @param password - * @param virtualpath - * @param transacted - * @throws Exception All allowed to fall through. This is only test code... - */ - public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, - String destinationName, String selector, boolean transacted, boolean persistent, - int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit, - boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, - int noOfDestinations, int rate, boolean pubsub) throws Exception - { - this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit, - beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate); + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + protected boolean _failAfterCommit = false; - _destinationCount = noOfDestinations; - setPubSub(pubsub); + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + protected boolean _failBeforeSend = false; - if (noOfDestinations == DEFAULT_DESTINATION_COUNT) - { - if (destinationName != null) - { - createPingDestination(destinationName); - // Create producer and the consumer - createProducer(); - createConsumer(selector); - } - else - { - _logger.error("Destination is not specified"); - throw new IllegalArgumentException("Destination is not specified"); - } - } - } + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + protected boolean _failAfterSend = false; - private void createPingDestination(String name) - { - if (isPubSub()) - { - _pingDestination = new AMQTopic(name); - } - else - { - _pingDestination = new AMQQueue(name); - } - } + /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + protected boolean _failOnce = true; + + /** Holds the number of sends that should be performed in every transaction when using transactions. */ + protected int _txBatchSize = 1; /** - * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer - * is created with null destination, so that any destination can be specified while sending + * Creates a ping producer with the specified parameters, of which there are many. See their individual comments + * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, + * to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves. * - * @throws JMSException + * @param brokerDetails The URL of the broker to send pings to. + * @param username The username to log onto the broker with. + * @param password The password to log onto the broker with. + * @param virtualpath The virtual host name to use on the broker. + * @param destinationName The name (or root where multiple destinations are used) of the desitination to send + * pings to. + * @param selector The selector to filter replies with. + * @param transacted Indicates whether or not pings are sent and received in transactions. + * @param persistent Indicates whether pings are sent using peristent delivery. + * @param messageSize Specifies the size of ping messages to send. + * @param verbose Indicates that information should be printed to the console on every ping. + * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover. + * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover. + * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover. + * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover. + * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all. + * @param txBatchSize Specifies the number of pings to send in each transaction. + * @param noOfDestinations The number of destinations to ping. Must be 1 or more. + * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as + * possible, with no rate restriction. + * @param pubsub + * + * @throws Exception Any exceptions are allowed to fall through. */ - public void createProducer() throws JMSException + public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, + String destinationName, String selector, boolean transacted, boolean persistent, int messageSize, + boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend, + boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate, + boolean pubsub) throws Exception { - if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT) - { - // create producer with initial destination as null for test with multiple-destinations - // In this case, a different destination will be used while sending the message - _producer = (MessageProducer) getProducerSession().createProducer(null); - } - else + // Check that one or more destinations were specified. + if (noOfDestinations < 1) { - // Create a producer with known destination to send the pings on. - _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination); - + throw new IllegalArgumentException("There must be at least one destination."); } - _producer.setDisableMessageTimestamp(true); - _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } + // Create a connection to the broker. + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); - /** - * Creates the temporary destination to listen to the responses - * - * @param selector - * @throws JMSException - */ - public void createConsumer(String selector) throws JMSException - { - // Create a temporary destination to get the pongs on. - if (isPubSub()) - { - _replyDestination = _consumerSession.createTemporaryTopic(); - } - else - { - _replyDestination = _consumerSession.createTemporaryQueue(); - } + _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath); - // Create a message consumer to get the replies with and register this to be called back by it. - MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); - consumer.setMessageListener(this); - } + // Create transactional or non-transactional sessions, based on the command line arguments. + _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - /** - * Creates consumer instances for each destination. This is used when test is being done with multiple destinations. - * - * @param selector - * @throws JMSException - */ - public void createConsumers(String selector) throws JMSException - { - for (int i = 0; i < getDestinationsCount(); i++) + // Set up a throttle to control the send rate, if a rate > 0 is specified. + if (rate > 0) { - MessageConsumer consumer = - getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector); - consumer.setMessageListener(this); + _rateLimiter = new BatchedThrottle(); + _rateLimiter.setRate(rate); } - } - - public Session getConsumerSession() - { - return _consumerSession; - } + // Create the temporary queue for replies. + _replyDestination = _consumerSession.createTemporaryQueue(); - public Destination getPingDestination() - { - return _pingDestination; - } + // Create the producer and the consumers for all reply destinations. + createProducer(); + createPingDestinations(noOfDestinations, selector, destinationName, true); + createReplyConsumers(getReplyDestinations(), selector); - protected void setPingDestination(Destination destination) - { - _pingDestination = destination; + // Keep all the remaining options. + _persistent = persistent; + _messageSize = messageSize; + _verbose = verbose; + _failAfterCommit = afterCommit; + _failBeforeCommit = beforeCommit; + _failAfterSend = afterSend; + _failBeforeSend = beforeSend; + _failOnce = failOnce; + _txBatchSize = txBatchSize; + _isPubSub = pubsub; } /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs + * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs * to be started to bounce the pings back again. - *

- *

The command line takes from 2 to 4 arguments: - *

- *
brokerDetails The broker connection string. - *
virtualPath The virtual path. - *
transacted A boolean flag, telling this client whether or not to use transactions. - *
size The size of ping messages to use, in bytes. - *
- * - * @param args The command line arguments as defined above. + * + * @param args The command line arguments. */ public static void main(String[] args) throws Exception { @@ -373,21 +382,21 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, String brokerDetails = config.getHost() + ":" + config.getPort(); String virtualpath = "/test"; - String selector = config.getSelector(); + String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector(); boolean verbose = true; boolean transacted = config.isTransacted(); boolean persistent = config.usePersistentMessages(); int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE; //int messageCount = config.getMessages(); int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT; - int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE; + int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE; int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE; boolean pubsub = config.isPubSub(); String destName = config.getDestination(); if (destName == null) { - destName = PING_DESTINATION_NAME; + destName = DEFAULT_PING_DESTINATION_NAME; } boolean afterCommit = false; @@ -429,15 +438,13 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, } // Create a ping producer to handle the request/wait/reply cycle. - PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, - destName, selector, transacted, persistent, messageSize, verbose, - afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, - destCount, rate, pubsub); + PingPongProducer pingProducer = + new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector, + transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, + beforeSend, failOnce, batchSize, destCount, rate, pubsub); pingProducer.getConnection().start(); - // Run a few priming pings to remove warm up time from test results. - //pingProducer.prime(PRIMING_LOOPS); // Create a shutdown hook to terminate the ping-pong producer. Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); @@ -450,50 +457,107 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, pingThread.join(); } - private static void usage() + /** + * Convenience method for a short pause. + * + * @param sleepTime The time in milliseconds to pause for. + */ + public static void pause(long sleepTime) { - System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" + - "-destinationname : queue/topic name\n" + - "-transacted : (true/false). Default is false\n" + - "-persistent : (true/false). Default is false\n" + - "-pubsub : (true/false). Default is false\n" + - "-selector : selector string\n" + - "-payload : paylaod size. Default is 0\n" + - //"-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" + - "-destinationscount : no of destinations for multi-destinations test\n" + - "-batchsize : batch size\n" + - "-rate : thruput rate\n"); + if (sleepTime > 0) + { + try + { + Thread.sleep(sleepTime); + } + catch (InterruptedException ie) + { } + } } /** - * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client - * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling - * this a few times, in order to prime the JVMs JIT compilation. + * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply + * to destination of this pinger. * - * @param x The number of priming loops to run. - * @throws JMSException All underlying exceptions are allowed to fall through. + * @return The single reply to destination of this pinger, wrapped in a list. */ - public void prime(int x) throws JMSException + public List getReplyDestinations() { - for (int i = 0; i < x; i++) + _logger.debug("public List getReplyDestinations(): called"); + + List replyDestinations = new ArrayList(); + replyDestinations.add(_replyDestination); + + return replyDestinations; + } + + /** + * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery + * flag is set accoring the ping producer creation options. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createProducer() throws JMSException + { + _logger.debug("public void createProducer(): called"); + + _producer = (MessageProducer) _producerSession.createProducer(null); + //_producer.setDisableMessageTimestamp(true); + _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } + + /** + * Creates consumers for the specified number of destinations. The destinations themselves are also created by + * this method. + * + * @param noOfDestinations The number of destinations to create consumers for. + * @param selector The message selector to filter the consumers with. + * @param rootName The root of the name, or actual name if only one is being created. + * @param unique true to make the destinations unique to this pinger, false to share + * the numbering with all pingers on the same JVM. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique) + throws JMSException + { + _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " + + unique + "): called"); + + // Create the desired number of ping destinations and consumers for them. + for (int i = 0; i < noOfDestinations; i++) { - // Create and send a small message. - Message first = getTestMessage(_replyDestination, 0, false); - sendMessage(first); + AMQDestination destination = null; - commitTx(); + int id; - try + // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. + if (unique) { - Thread.sleep(100); + id = _queueJVMSequenceID.incrementAndGet(); } - catch (InterruptedException ignore) + else { - + id = _queueSharedId.incrementAndGet(); } - } + // Check if this is a pub/sub pinger, in which case create topics. + if (_isPubSub) + { + AMQShortString name = new AMQShortString(rootName + id); + destination = new AMQTopic(name); + } + // Otherwise this is a p2p pinger, in which case create queues. + else + { + AMQShortString name = new AMQShortString(rootName + id); + destination = new AMQQueue(name, name, false, false, false); + } + // Keep the destination. + _pingDestinations.add(destination); + } } /** @@ -505,52 +569,64 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, */ public void onMessage(Message message) { + _logger.debug("public void onMessage(Message message): called"); try { - - // Store the reply, if it has a correlation id that is expected. + // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); + _logger.debug("correlationID = " + correlationID); - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID); - //_logger.debug("Received from : " + message.getJMSDestination()); - } - - // Turn the traffic light to green. + // Countdown on the traffic light if there is one for the matching correlation id. CountDownLatch trafficLight = trafficLights.get(correlationID); if (trafficLight != null) { - if (_messageListener != null) - { - synchronized (trafficLight) - { - _messageListener.onMessage(message); - trafficLight.countDown(); - } - } - else + _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount = -1; + long remainingCount = -1; + + synchronized (trafficLight) { trafficLight.countDown(); - } - _logger.trace("Reply was expected, decrementing the latch for the id."); + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; - long remainingCount = trafficLight.getCount(); + _logger.debug("remainingCount = " + remainingCount); + _logger.debug("trueCount = " + trueCount); - if ((remainingCount % _txBatchSize) == 0) - { - commitTx(getConsumerSession()); - } + // Commit on transaction batch size boundaries. At this point in time the waiting producer remains + // blocked, even on the last message. + if ((remainingCount % _txBatchSize) == 0) + { + commitTx(_consumerSession); + } + // Forward the message and remaining count to any interested chained message listener. + if (_chainedMessageListener != null) + { + _chainedMessageListener.onMessage(message, (int) remainingCount); + } + + // Check if this is the last message, in which case release any waiting producers. This is done + // after the transaction has been committed and any listeners notified. + if (trueCount == 1) + { + trafficLight.countDown(); + } + } } else { - _logger.trace("There was no thread waiting for reply: " + correlationID); + _logger.debug("There was no thread waiting for reply: " + correlationID); } + // Print out ping times for every message in verbose mode only. if (_verbose) { Long timestamp = message.getLongProperty("timestamp"); @@ -566,32 +642,70 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, { _logger.warn("There was a JMSException: " + e.getMessage(), e); } + + _logger.debug("public void onMessage(Message message): ending"); } /** * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out - * before a reply arrives, then a null reply is returned from this method. + * before a reply arrives, then a null reply is returned from this method. This method generates a new unqiue + * correlation id for the messages. * * @param message The message to send. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. + * * @return The number of replies received. This may be less than the number sent if the timeout terminated the * wait for all prematurely. + * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException { - String messageCorrelationId = null; + _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + "): called"); + + // Create a unique correlation id to put on the messages before sending them. + String messageCorrelationId = Long.toString(idGenerator.incrementAndGet()); + + return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId); + } + + /** + * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out + * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify + * the correlation id. + * + * @param message The message to send. + * @param numPings The number of ping messages to send. + * @param timeout The timeout in milliseconds. + * @param messageCorrelationId The message correlation id. + * + * @return The number of replies received. This may be less than the number sent if the timeout terminated the + * wait for all prematurely. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); try { - // Put a unique correlation id on the message before sending it. - messageCorrelationId = Long.toString(getNewID()); - + // Create a count down latch to count the number of replies with. This is created before the messages are + // sent so that the replies cannot be received before the count down is created. + // One is added to this, so that the last reply becomes a special case. The special case is that the + // chained message listener must be called before this sender can be unblocked, but that decrementing the + // countdown needs to be done before the chained listener can be called. + CountDownLatch trafficLight = new CountDownLatch(numPings + 1); + trafficLights.put(messageCorrelationId, trafficLight); + + // Send the specifed number of messages. pingNoWaitForReply(message, numPings, messageCorrelationId); - CountDownLatch trafficLight = trafficLights.get(messageCorrelationId); - // Block the current thread until a reply to the message is received, or it times out. + // Block the current thread until replies to all the message are received, or it times out. trafficLight.await(timeout, TimeUnit.MILLISECONDS); // Work out how many replies were receieved. @@ -606,45 +720,37 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _logger.info("Got all replies on id, " + messageCorrelationId); } - commitTx(getConsumerSession()); + commitTx(_consumerSession); + + _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); return numReplies; } + // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, + // so will be a memory leak if this is not done. finally { - removeLock(messageCorrelationId); + trafficLights.remove(messageCorrelationId); } } - public long getNewID() - { - return idGenerator.incrementAndGet(); - } - - public CountDownLatch removeLock(String correlationID) - { - return trafficLights.remove(correlationID); - } - - - /* - * Sends the specified ping message but does not wait for a correlating reply. - * - * @param message The message to send. - * @param numPings The number of pings to send. - * @return The reply, or null if no reply arrives before the timeout. - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException + /** + * Sends the specified number of ping messages and does not wait for correlating replies. + * + * @param message The message to send. + * @param numPings The number of pings to send. + * @param messageCorrelationId A correlation id to place on all messages sent. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { - // Create a count down latch to count the number of replies with. This is created before the message is sent - // so that the message is not received before the count down is created. - CountDownLatch trafficLight = new CountDownLatch(numPings); - trafficLights.put(messageCorrelationId, trafficLight); + _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called"); message.setJMSCorrelationID(messageCorrelationId); - // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the + // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is // needed. boolean committed = false; @@ -652,55 +758,46 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, // Send all of the ping messages. for (int i = 0; i < numPings; i++) { - // Reset the committed flag to indicate that there are uncommitted message. + // Reset the committed flag to indicate that there are uncommitted messages. committed = false; // Re-timestamp the message. message.setLongProperty("timestamp", System.currentTimeMillis()); - // Check if the test is with multiple-destinations, in which case round robin the destinations - // as the messages are sent. - if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT) - { - sendMessage(getDestination(i % getDestinationsCount()), message); - } - else - { - sendMessage(message); - } + // Round robin the destinations as the messages are sent. + //return _destinationCount; + sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message); - // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been - // reached. See the comment on the throttle batch size for information about the use of batches here. - if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0)) + // Apply message rate throttling if a rate limit has been set up. + if (_rateLimiter != null) { - rateLimiter.throttle(); + _rateLimiter.throttle(); } // Call commit every time the commit batch size is reached. if ((i % _txBatchSize) == 0) { - commitTx(); + commitTx(_producerSession); committed = true; } + + // Spew out per message timings on every message sonly in verbose mode. + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + + messageCorrelationId); + } } // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. if (!committed) { - commitTx(); - } - - // Spew out per message timings only in verbose mode. - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); + commitTx(_producerSession); } - } /** - * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and - * waits for replies and inserts short pauses in between each. + * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. */ public void pingLoop() { @@ -711,10 +808,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, msg.setLongProperty("timestamp", System.currentTimeMillis()); // Send the message and wait for a reply. - pingAndWaitForReply(msg, DEFAULT_BATCH_SIZE, TIMEOUT); + pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT); // Introduce a short pause if desired. - pause(SLEEP_TIME); + pause(DEFAULT_SLEEP_TIME); } catch (JMSException e) { @@ -728,79 +825,299 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, } } - public Destination getReplyDestination() + /*public Destination getReplyDestination() { return _replyDestination; + }*/ + + /** + * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set + * here. + * + * @param messageListener The chained message listener. + */ + public void setChainedMessageListener(ChainedMessageListener messageListener) + { + _chainedMessageListener = messageListener; } - protected void setReplyDestination(Destination destination) + /** + * Removes any chained message listeners from this pinger. + */ + public void removeChainedMessageListener() { - _replyDestination = destination; + _chainedMessageListener = null; } - public void setMessageListener(MessageListener messageListener) + /** + * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. + * + * @param replyQueue The reply-to destination for the message. + * @param messageSize The desired size of the message in bytes. + * @param persistent true if the message should use persistent delivery, false otherwise. + * + * @return A freshly generated test message. + * + * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. + */ + public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException { - _messageListener = messageListener; + ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); + // Timestamp the message. + //msg.setLongProperty("timestamp", System.currentTimeMillis()); + + return msg; } - public CountDownLatch getEndLock(String correlationID) + /** + * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this + * flag has been cleared. + */ + public void stop() { - return trafficLights.get(correlationID); + _publish = false; } - /* - * When the test is being performed with multiple queues, then this method will be used, which has a loop to - * pick up the next queue from the queues list and sends message to it. - * - * @param message - * @param numPings - * @throws JMSException - */ - /*private void pingMultipleQueues(Message message, int numPings) throws JMSException + /** + * Implements a ping loop that repeatedly pings until the publish flag becomes false. + */ + public void run() { - int queueIndex = 0; - for (int i = 0; i < numPings; i++) + // Keep running until the publish flag is cleared. + while (_publish) { - // Re-timestamp the message. - message.setLongProperty("timestamp", System.currentTimeMillis()); + pingLoop(); + } + } - sendMessage(getDestination(queueIndex++), message); + /** + * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the + * connection, this clears the publish flag which in turn will halt the ping loop. + * + * @param e The exception that triggered this callback method. + */ + public void onException(JMSException e) + { + _publish = false; + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } - // reset the counter to get the first queue - if (queueIndex == (getDestinationsCount() - 1)) + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered + * with the runtime system as a shutdown hook. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() { - queueIndex = 0; - } + public void run() + { + stop(); + } + }); + } + + /** + * Gets the underlying connection that this ping client is running on. + * + * @return The underlying connection that this ping client is running on. + */ + public Connection getConnection() + { + return _connection; + } + + /** + * Creates consumers for the specified destinations and registers this pinger to listen to their messages. + * + * @param destinations The destinations to listen to. + * @param selector A selector to filter the messages with. + * + * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. + */ + public void createReplyConsumers(Collection destinations, String selector) throws JMSException + { + _logger.debug("public void createReplyConsumers(Collection destinations = " + destinations + + ", String selector = " + selector + "): called"); + + for (Destination destination : destinations) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + MessageConsumer consumer = + _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE, + selector); + consumer.setMessageListener(this); } - }*/ + } /** - * A connection listener that logs out any failover complete events. Could do more interesting things with this - * at some point... + * Closes the pingers connection. + * + * @throws JMSException All JMSException are allowed to fall through. */ - public static class FailoverNotifier implements ConnectionListener + public void close() throws JMSException { - public void bytesSent(long count) + _logger.debug("public void close(): called"); + + if (_connection != null) { + _connection.close(); } + } + + /** + * Convenience method to commit the transaction on the specified session. If the session to commit on is not + * a transactional session, this method does nothing (unless the failover after send flag is set). + * + *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit + * is applied. This flag applies whether the pinger is transactional or not. + * + *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the + * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker + * after the commit is applied. These flags will only apply if using a transactional pinger. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * + * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional + * and non-transactional alike. + */ + protected void commitTx(Session session) throws JMSException + { + _logger.debug("protected void commitTx(Session session): called"); - public void bytesReceived(long count) + _logger.trace("Batch time reached"); + if (_failAfterSend) { + _logger.trace("Batch size reached"); + if (_failOnce) + { + _failAfterSend = false; + } + + _logger.trace("Failing After Send"); + doFailover(); } - public boolean preFailover(boolean redirect) + if (session.getTransacted()) { - return true; //Allow failover + try + { + if (_failBeforeCommit) + { + if (_failOnce) + { + _failBeforeCommit = false; + } + + _logger.trace("Failing Before Commit"); + doFailover(); + } + + session.commit(); + + if (_failAfterCommit) + { + if (_failOnce) + { + _failAfterCommit = false; + } + + _logger.trace("Failing After Commit"); + doFailover(); + } + + _logger.trace("Session Commited."); + } + catch (JMSException e) + { + _logger.trace("JMSException on commit:" + e.getMessage(), e); + + // Warn that the bounce back client is not available. + if (e.getLinkedException() instanceof AMQNoConsumersException) + { + _logger.debug("No consumers on queue."); + } + + try + { + session.rollback(); + _logger.trace("Message rolled back."); + } + catch (JMSException jmse) + { + _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } } + } - public boolean preResubscribe() + /** + * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination + * of the ping producer. If an explicit destination is set, this overrides the default. + * + * @param destination The destination to send to. + * @param message The message to send. + * + * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. + */ + protected void sendMessage(Destination destination, Message message) throws JMSException + { + if (_failBeforeSend) { - return true; // Allow resubscription + if (_failOnce) + { + _failBeforeSend = false; + } + + _logger.trace("Failing Before Send"); + doFailover(); } - public void failoverComplete() + if (destination == null) + { + _producer.send(message); + } + else { - _logger.info("App got failover complete callback."); + _producer.send(destination, message); } } + + /** + * Prompts the user to terminate the broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + */ + protected void doFailover() + { + System.out.println("Kill Broker now then press return"); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + } + + /** + * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's + * {@link PingPongProducer#onMessage} method is called, the chained listener set through the + * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected + * count of messages with that correlation id. + * + * Provided only one pinger is producing messages with that correlation id, the chained listener will always be + * given unique message counts. It will always be called while the producer waiting for all messages to arrive is + * still blocked. + */ + public static interface ChainedMessageListener + { + public void onMessage(Message message, int remainingCount) throws JMSException; + } } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java deleted file mode 100644 index bab732e2a6..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.Session; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.*; -import java.net.InetAddress; -import java.net.UnknownHostException; - -public class ServiceProvidingClient -{ - private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class); - - private MessageProducer _destinationProducer; - - private Destination _responseDest; - - private AMQConnection _connection; - - private Session _session; - private Session _producerSession; - - private boolean _isTransactional; - - public ServiceProvidingClient(String brokerDetails, String username, String password, - String clientName, String virtualPath, String serviceName, - final int deliveryMode, boolean transactedMode, String selector) - throws AMQException, JMSException, URLSyntaxException - { - _isTransactional = transactedMode; - - _logger.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent") - + "\t isTransactional: " + _isTransactional); - - _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath); - _connection.setConnectionListener(new ConnectionListener() - { - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _logger.info("App got failover complete callback"); - } - }); - _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); - _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); - - _logger.info("Service (queue) name is '" + serviceName + "'..."); - - AMQQueue destination = new AMQQueue(serviceName); - - MessageConsumer consumer = _session.createConsumer(destination, - 100, true, false, selector); - - consumer.setMessageListener(new MessageListener() - { - private int _messageCount; - - public void onMessage(Message message) - { - //_logger.info("Got message '" + message + "'"); - TextMessage tm = (TextMessage) message; - try - { - Destination responseDest = tm.getJMSReplyTo(); - if (responseDest == null) - { - _logger.info("Producer not created because the response destination is null."); - return; - } - - if (!responseDest.equals(_responseDest)) - { - _responseDest = responseDest; - - _logger.info("About to create a producer"); - _destinationProducer = _producerSession.createProducer(responseDest); - _destinationProducer.setDisableMessageTimestamp(true); - _destinationProducer.setDeliveryMode(deliveryMode); - _logger.info("After create a producer"); - } - } - catch (JMSException e) - { - _logger.error("Error creating destination"); - } - _messageCount++; - if (_messageCount % 1000 == 0) - { - _logger.info("Received message total: " + _messageCount); - _logger.info("Sending response to '" + _responseDest + "'"); - } - - try - { - String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText(); - TextMessage msg = _producerSession.createTextMessage(payload); - if (tm.propertyExists("timeSent")) - { - _logger.info("timeSent property set on message"); - long timesent = tm.getLongProperty("timeSent"); - _logger.info("timeSent value is: " + timesent); - msg.setLongProperty("timeSent", timesent); - } - - _destinationProducer.send(msg); - - if (_isTransactional) - { - _producerSession.commit(); - } - if (_isTransactional) - { - _session.commit(); - } - if (_messageCount % 1000 == 0) - { - _logger.info("Sent response to '" + _responseDest + "'"); - } - } - catch (JMSException e) - { - _logger.error("Error sending message: " + e, e); - } - } - }); - } - - public void run() throws JMSException - { - _connection.start(); - _logger.info("Waiting..."); - } - - public static void main(String[] args) - { - _logger.info("Starting..."); - - if (args.length < 5) - { - System.out.println("Usage: serviceProvidingClient [ ] [selector]"); - System.exit(1); - } - String clientId = null; - try - { - InetAddress address = InetAddress.getLocalHost(); - clientId = address.getHostName() + System.currentTimeMillis(); - } - catch (UnknownHostException e) - { - _logger.error("Error: " + e, e); - } - - int deliveryMode = DeliveryMode.NON_PERSISTENT; - boolean transactedMode = false; - - if (args.length > 7) - { - deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT - : DeliveryMode.NON_PERSISTENT; - - transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false; - } - - String selector = null; - if ((args.length == 8) || (args.length == 7)) - { - selector = args[args.length - 1]; - } - - try - { - ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2], - clientId, args[3], args[4], - deliveryMode, transactedMode, selector); - client.run(); - } - catch (JMSException e) - { - _logger.error("Error: " + e, e); - } - catch (AMQException e) - { - _logger.error("Error: " + e, e); - } - catch (URLSyntaxException e) - { - _logger.error("Error: " + e, e); - } - } -} - diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java deleted file mode 100644 index 57512929c1..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.jms.MessageConsumer; -import org.apache.qpid.jms.MessageProducer; -import org.apache.qpid.jms.Session; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.*; -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * A client that behaves as follows: - *

  • Connects to a queue, whose name is specified as a cmd-line argument
  • - *
  • Creates a temporary queue
  • - *
  • Creates messages containing a property(reply-to) that is the name of the temporary queue
  • - *
  • Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue
  • - *
  • Start the loop to send all messages
  • - *
  • CallbackHandler keeps listening to the responses and exits if all the messages have been received back or - * if the waiting time for next message is elapsed
  • - *
- */ -public class ServiceRequestingClient implements ExceptionListener -{ - private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class); - - private long _messageIdentifier = 0; - - // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs - private static long _callbackHandlerWaitingTime = 60000; - - private String MESSAGE_DATA; - - private AMQConnection _connection; - - private Session _session; - private Session _producerSession; - - private long _averageLatency; - - private int _messageCount; - private boolean _isTransactional; - - private volatile boolean _completed; - - private AMQDestination _tempDestination; - - private MessageProducer _producer; - - private Object _waiter; - - private class CallbackHandler implements MessageListener - { - private int _actualMessageCount; - - private long _startTime; - // The time when the last message was received by the callbackHandler - private long _messageReceivedTime = 0; - private Object _timerCallbackHandler = new Object(); - - public CallbackHandler(long startTime) - { - _startTime = startTime; - // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed - (new Thread(new TimerThread())).start(); - } - - public void onMessage(Message m) - { - _messageReceivedTime = System.currentTimeMillis(); - if (_log.isDebugEnabled()) - { - _log.debug("Message received: " + m); - } - try - { - m.getPropertyNames(); - if (m.propertyExists("timeSent")) - { - long timeSent = m.getLongProperty("timeSent"); - if (_averageLatency == 0) - { - _averageLatency = _messageReceivedTime - timeSent; - _log.info("Latency " + _averageLatency); - } - else - { - _log.info("Individual latency: " + (_messageReceivedTime - timeSent)); - _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2; - _log.info("Average latency now: " + _averageLatency); - } - } - if(_isTransactional) - { - _session.commit(); - } - } - catch (JMSException e) - { - _log.error("Error getting latency data: " + e, e); - } - _actualMessageCount++; - if (_actualMessageCount % 1000 == 0) - { - _log.info("Received message count: " + _actualMessageCount); - } - - checkForMessageID(m); - - if (_actualMessageCount == _messageCount) - { - finishTesting(_actualMessageCount); - } - } - - /** - * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread, - * so that the callbackHandler can finish listening for messages. This causes the test to finish. - * @param receivedMessageCount - */ - private void finishTesting(int receivedMessageCount) - { - _completed = true; - notifyWaiter(); - notifyTimerThread(); - - long timeTaken = System.currentTimeMillis() - _startTime; - _log.info("***** Result *****"); - _log.info("Total messages received = " + receivedMessageCount); - _log.info("Total time taken to receive " + receivedMessageCount + " messages was " + - timeTaken + "ms, equivalent to " + - (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second"); - - try - { - _connection.close(); - _log.info("Connection closed"); - } - catch (JMSException e) - { - _log.error("Error closing connection"); - } - } - - private void notifyTimerThread() - { - if (_timerCallbackHandler != null) - { - synchronized (_timerCallbackHandler) - { - _timerCallbackHandler.notify(); - } - } - } - - /** - * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time - * has elapsed before next message is received. - */ - private class TimerThread implements Runnable - { - public void run() - { - do - { - try - { - synchronized(_timerCallbackHandler) - { - _timerCallbackHandler.wait(_callbackHandlerWaitingTime); - } - } - catch (InterruptedException ignore) - { - - } - - // exit if callbackHandler has received all messages - if (_completed) - { - return; - } - } - while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime); - - // waiting time has elapsed, so exit the test - _log.info(""); - _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs"); - finishTesting(_actualMessageCount); - } - } - } // end of CallbackHandler class - - /** - * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID - * of previous message. - * @param receivedMsg - */ - private void checkForMessageID(Message receivedMsg) - { - try - { - JMSTextMessage msg = (JMSTextMessage)receivedMsg; - if (! (msg.getDeliveryTag() == _messageIdentifier + 1)) - { - _log.info("Out of sequence message received. Previous AMQ MessageID= " + _messageIdentifier + - ", Received AMQ messageID= " + receivedMsg.getJMSMessageID()); - } - _messageIdentifier = msg.getDeliveryTag(); - } - catch (Exception ex) - { - _log.error("Error in checking messageID ", ex); - } - - } - - private void notifyWaiter() - { - if (_waiter != null) - { - synchronized (_waiter) - { - _waiter.notify(); - } - } - } - - public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password, - String vpath, String commandQueueName, - int deliveryMode, boolean transactedMode, - final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException - { - _isTransactional = transactedMode; - - _log.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent")); - _log.info("isTransactional: " + _isTransactional); - - _messageCount = messageCount; - MESSAGE_DATA = TestMessageFactory.createMessagePayload(messageDataLength); - try - { - createConnection(brokerHosts, clientID, username, password, vpath); - _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); - _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); - - _connection.setExceptionListener(this); - - AMQQueue destination = new AMQQueue(commandQueueName); - _producer = (MessageProducer) _producerSession.createProducer(destination); - _producer.setDisableMessageTimestamp(true); - _producer.setDeliveryMode(deliveryMode); - - _tempDestination = new AMQQueue("TempResponse" + - Long.toString(System.currentTimeMillis()), true); - MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true, - true, null); - - //Send first message, then wait a bit to allow the provider to get initialised - TextMessage first = _session.createTextMessage(MESSAGE_DATA); - first.setJMSReplyTo(_tempDestination); - _producer.send(first); - if (_isTransactional) - { - _producerSession.commit(); - } - try - { - Thread.sleep(1000); - } - catch (InterruptedException ignore) - { - } - - //now start the clock and the test... - final long startTime = System.currentTimeMillis(); - - messageConsumer.setMessageListener(new CallbackHandler(startTime)); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - /** - * Run the test and notify an object upon receipt of all responses. - * - * @param waiter the object that will be notified - * @throws JMSException - */ - public void run(Object waiter) throws JMSException - { - _waiter = waiter; - _connection.start(); - for (int i = 1; i < _messageCount; i++) - { - TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA + i); - msg.setJMSReplyTo(_tempDestination); - if (i % 1000 == 0) - { - long timeNow = System.currentTimeMillis(); - msg.setLongProperty("timeSent", timeNow); - } - _producer.send(msg); - if (_isTransactional) - { - _producerSession.commit(); - } - - } - _log.info("Finished sending " + _messageCount + " messages"); - } - - public boolean isCompleted() - { - return _completed; - } - - private void createConnection(String brokerHosts, String clientID, String username, String password, - String vpath) throws AMQException, URLSyntaxException - { - _connection = new AMQConnection(brokerHosts, username, password, clientID, vpath); - } - - /** - * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank - * means the server will allocate a name. - */ - public static void main(String[] args) - { - if ((args.length < 6) || (args.length == 8)) - { - System.err.println("Usage: ServiceRequestingClient " + - " [] " + - "[ ] " + - "[]"); - System.exit(1); - } - try - { - int messageSize = 4096; - boolean transactedMode = false; - int deliveryMode = DeliveryMode.NON_PERSISTENT; - - if (args.length > 6) - { - messageSize = Integer.parseInt(args[6]); - } - if (args.length > 7) - { - deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT - : DeliveryMode.NON_PERSISTENT; - - transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false; - } - - if (args.length > 9) - { - _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000; - } - - _log.info("Each message size = " + messageSize + " bytes"); - - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3], - args[4], deliveryMode, transactedMode, Integer.parseInt(args[5]), - messageSize); - Object waiter = new Object(); - client.run(waiter); - - // Start a thread to - synchronized (waiter) - { - while (!client.isCompleted()) - { - waiter.wait(); - } - } - } - catch (UnknownHostException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Exception e) - { - System.err.println("Error in client: " + e); - e.printStackTrace(); - } - } - - /** - * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) - */ - public void onException(JMSException e) - { - System.err.println(e.getMessage()); - e.printStackTrace(System.err); - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index bd39ec34a1..e10e6353b7 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -1,311 +1,302 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.ping; -//import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -//import uk.co.thebadgerset.junit.extensions.TimingController; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.ObjectMessage; -import junit.framework.Assert; import junit.framework.Test; import junit.framework.TestSuite; + import org.apache.log4j.Logger; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.CountDownLatch; +import org.apache.qpid.requestreply.PingPongProducer; +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send many ping messages and output timings asynchronously on batches received. + *
+ */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware { -// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; -// private TimingController _timingController; + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; -// private AsyncMessageListener _listener; + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ public PingAsyncTestPerf(String name) { super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testAsyncPingOk(int numPings) throws Exception + { + _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + ObjectMessage msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } -// /** -// * Compile all the tests into a test suite. -// */ -// public static Test suite() -// { -// // Build a new test suite -// TestSuite suite = new TestSuite("Ping Performance Tests"); -// -// // Run performance tests in read committed mode. -// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); -// -// return suite; -// } -// -// protected void setUp() throws Exception -// { -// // Create the test setups on a per thread basis, only if they have not already been created. -// -// if (threadSetup.get() == null) -// { -// PerThreadSetup perThreadSetup = new PerThreadSetup(); -// -// // Extract the test set up paramaeters. -// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); -// String username = "guest"; -// String password = "guest"; -// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); -// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); -// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); -// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); -// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); -// String selector = null; -// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); -// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); -// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); -// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); -// -// -// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); -// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); -// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); -// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); -// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); -// -// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); -// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); -// -// // This is synchronized because there is a race condition, which causes one connection to sleep if -// // all threads try to create connection concurrently -// synchronized (this) -// { -// // Establish a client to ping a Queue and listen the reply back from same Queue -// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, -// destinationname, selector, transacted, persistent, -// messageSize, verbose, -// afterCommit, beforeCommit, afterSend, beforeSend, failOnce, -// commitbatchSize, destinationscount, rate, pubsub); -// } -// -// // Attach the per-thread set to the thread. -// threadSetup.set(perThreadSetup); -// -// _listener = new AsyncMessageListener(batchSize); -// -// perThreadSetup._pingItselfClient.setMessageListener(_listener); -// // Start the client connection -// perThreadSetup._pingItselfClient.getConnection().start(); -// -// } -// } -// -// -// public void testAsyncPingOk(int numPings) -// { -// _timingController = this.getTimingController(); -// -// _listener.setTotalMessages(numPings); -// -// PerThreadSetup perThreadSetup = threadSetup.get(); -// if (numPings == 0) -// { -// _logger.error("Number of pings requested was zero."); -// fail("Number of pings requested was zero."); -// } -// -// // Generate a sample message. This message is already time stamped and has its reply-to destination set. -// ObjectMessage msg = null; -// -// try -// { -// msg = perThreadSetup._pingItselfClient.getTestMessage(null, -// Integer.parseInt(testParameters.getProperty( -// MESSAGE_SIZE_PROPNAME)), -// Boolean.parseBoolean(testParameters.getProperty( -// PERSISTENT_MODE_PROPNAME))); -// } -// catch (JMSException e) -// { -// -// } -// -// // start the test -// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); -// -// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); -// -// try -// { -// _logger.debug("Sending messages"); -// -// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); -// -// _logger.debug("All sent"); -// } -// catch (JMSException e) -// { -// e.printStackTrace(); -// Assert.fail("JMS Exception Received" + e); -// } -// catch (InterruptedException e) -// { -// e.printStackTrace(); -// } -// -// try -// { -// _logger.debug("Awating test finish"); -// -// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); -// -// if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) -// { -// _logger.error("Timeout occured"); -// } -// //Allow the time out to exit the loop. -// } -// catch (InterruptedException e) -// { -// //ignore -// _logger.error("Awaiting test end was interrupted."); -// -// } -// -// // Fail the test if the timeout was exceeded. -// int numReplies = numPings - (int) perThreadSetup._pingItselfClient.removeLock(correlationID).getCount(); -// -// _logger.info("Test Finished"); -// -// if (numReplies != numPings) -// { -// try -// { -// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); -// } -// catch (JMSException e) -// { -// _logger.error("Error commiting received messages", e); -// } -// try -// { -// if (_timingController != null) -// { -// _logger.trace("Logging missing message count"); -// _timingController.completeTest(false, numPings - numReplies); -// } -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); -// } -// } -// -// public void setTimingController(TimingController timingController) -// { -// _timingController = timingController; -// } -// -// public TimingController getTimingController() -// { -// return _timingController; -// } -// -// -// private class AsyncMessageListener implements MessageListener -// { -// private volatile int _totalMessages; -// private int _batchSize; -// PerThreadSetup _perThreadSetup; -// -// public AsyncMessageListener(int batchSize) -// { -// this(batchSize, -1); -// } -// -// public AsyncMessageListener(int batchSize, int totalMessages) -// { -// _batchSize = batchSize; -// _totalMessages = totalMessages; -// _perThreadSetup = threadSetup.get(); -// } -// -// public void setTotalMessages(int newTotal) -// { -// _totalMessages = newTotal; -// } -// -// public void onMessage(Message message) -// { -// try -// { -// _logger.trace("Message Received"); -// -// CountDownLatch count = _perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID()); -// -// if (count != null) -// { -// int messagesLeft = (int) count.getCount() - 1;// minus one as we haven't yet counted the current message -// -// if ((messagesLeft % _batchSize) == 0) -// { -// doDone(_batchSize); -// } -// else if (messagesLeft == 0) -// { -// doDone(_totalMessages % _batchSize); -// } -// } -// -// } -// catch (JMSException e) -// { -// _logger.warn("There was a JMSException", e); -// } -// -// } -// -// private void doDone(int messageCount) -// { -// _logger.trace("Messages received:" + messageCount); -// _logger.trace("Total Messages :" + _totalMessages); -// -// try -// { -// if (_timingController != null) -// { -// _timingController.completeTest(true, messageCount); -// } -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// } + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + + tc.completeTest(true, receivedInBatch); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index fbc67881c2..c4e72f4bb6 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -1,7 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.ping; -import java.util.Properties; - import javax.jms.*; import junit.framework.Assert; @@ -10,169 +28,85 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; +import org.apache.qpid.requestreply.PingPongProducer; + import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.TestThreadAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; /** * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times * simultaneously to simluate many clients/producers/connections. - *

+ * *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - *

+ * *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, * except if the connection is lost in which case an attempt to re-establish the setup is made. - *

+ * *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the * temporary queue. - *

+ * *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - *

+ * *

*
CRC Card
Responsibilities Collaborations *
* * @author Rupert Smith */ -public class PingTestPerf extends AsymptoticTestCase //implements TimingControllerAware +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware { private static Logger _logger = Logger.getLogger(PingTestPerf.class); - /** - * Holds the name of the property to get the test message size from. - */ - protected static final String MESSAGE_SIZE_PROPNAME = "messagesize"; - - /** - * Holds the name of the property to get the ping queue name from. - */ - protected static final String PING_DESTINATION_NAME_PROPNAME = "destinationname"; - - /** - * holds the queue count, if the test is being performed with multiple queues - */ - protected static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount"; - - /** - * Holds the name of the property to get the test delivery mode from. - */ - protected static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** - * Holds the name of the property to get the test transactional mode from. - */ - protected static final String TRANSACTED_PROPNAME = "transacted"; - - /** - * Holds the name of the property to get the test broker url from. - */ - protected static final String BROKER_PROPNAME = "broker"; - - /** - * Holds the name of the property to get the test broker virtual path. - */ - protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - - /** - * Holds the name of the property to get the waiting timeout for response messages. - */ - protected static final String TIMEOUT_PROPNAME = "timeout"; - - /** Holds the name of the property to get the message rate from. */ - protected static final String RATE_PROPNAME = "rate"; - - protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; - - /** Holds the true or false depending on wether it is P2P test or PubSub */ - protected static final String IS_PUBSUB_PROPNAME = "pubsub"; - /** - * Holds the size of message body to attach to the ping messages. - */ - protected static final int MESSAGE_SIZE_DEFAULT = 1024; - - protected static final int BATCH_SIZE_DEFAULT = 1000; - - protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT; - - /** - * Holds the name of the queue to which pings are sent. - */ - private static final String PING_DESTINATION_NAME_DEFAULT = "ping"; - - /** - * Holds the message delivery mode to use for the test. - */ - protected static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** - * Holds the transactional mode to use for the test. - */ - protected static final boolean TRANSACTED_DEFAULT = false; - - /** - * Holds the default broker url for the test. - */ - protected static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** - * Holds the default virtual path for the test. - */ - protected static final String VIRTUAL_PATH_DEFAULT = "/test"; - - /** - * Sets a default ping timeout. - */ - protected static final long TIMEOUT_DEFAULT = 3000; - - /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */ - private static final int RATE_DEFAULT = 0; - - protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; - protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; - protected static final String FAIL_AFTER_SEND = "FailAfterSend"; - protected static final String FAIL_BEFORE_SEND = "FailBeforeSend"; - protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize"; - protected static final String BATCH_SIZE = "BatchSize"; - protected static final String FAIL_ONCE = "FailOnce"; - - /** - * Thread local to hold the per-thread test setup fields. - */ + /** Thread local to hold the per-thread test setup fields. */ ThreadLocal threadSetup = new ThreadLocal(); - Object _lock = new Object(); - - // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in - // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. - protected Properties testParameters = System.getProperties(); - //private Properties testParameters = new ContextualProperties(System.getProperties()); + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = new ParsedProperties(System.getProperties()); public PingTestPerf(String name) { super(name); - // Sets up the test parameters with defaults. - setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false"); - setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false"); - setSystemPropertyIfNull(FAIL_AFTER_SEND, "false"); - setSystemPropertyIfNull(FAIL_BEFORE_SEND, "false"); - setSystemPropertyIfNull(FAIL_ONCE, "true"); - - setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT)); - setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT)); - setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); - setSystemPropertyIfNull(PING_DESTINATION_NAME_PROPNAME, PING_DESTINATION_NAME_DEFAULT); - setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); - setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); - setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); - setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT)); - setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(0)); - setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false)); - setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT)); - setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false)); + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.DEFAULT_PING_DESTINATION_NAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_VERBOSE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_RATE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PUBSUB)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, + Long.toString(PingPongProducer.DEFAULT_TIMEOUT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE); } /** @@ -187,20 +121,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll suite.addTest(new PingTestPerf("testPingOk")); return suite; - //return new junit.framework.TestSuite(PingTestPerf.class); - } - - protected static void setSystemPropertyIfNull(String propName, String propValue) - { - if (System.getProperty(propName) == null) - { - System.setProperty(propName, propValue); - } - } - - public void testPing(int jim) throws Exception - { - testPingOk(1); } public void testPingOk(int numPings) throws Exception @@ -214,15 +134,15 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._pingItselfClient.getTestMessage(null, - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger( + PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean( + PingPongProducer.PERSISTENT_MODE_PROPNAME)); // start the test - long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout); + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout); // Fail the test if the timeout was exceeded. if (numReplies != numPings) @@ -232,75 +152,87 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll } } - - protected void setUp() throws Exception + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() { - // Log4j will propagate the test name as a thread local in all log output. - // Carefull when using this, it can cause memory leaks when not cleaned up properly. - //NDC.push(getName()); + _logger.debug("public void threadSetUp(): called"); - // Create the test setups on a per thread basis, only if they have not already been created. - - if (threadSetup.get() == null) + try { PerThreadSetup perThreadSetup = new PerThreadSetup(); // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); - String username = "guest"; - String password = "guest"; - String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); - int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); - String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); - boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); - boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); - String selector = null; - boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); - int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); - int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); - boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); - - boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); - boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); - boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); - boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); - boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME); + int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME); + int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME); + boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME); + boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME); + boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME); + boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME); + int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME); + Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME); - int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + // Extract the test set up paramaeters. + int destinationscount = + Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME)); // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently - synchronized (_lock) + // all threads try to create connection concurrently. + synchronized (this) { // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, - destinationname, selector, transacted, persistent, - messageSize, verbose, afterCommit, beforeCommit, - afterSend, beforeSend, failOnce, batchSize, destinationscount, - rate, pubsub); + perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName, + selector, transacted, persistent, messageSize, verbose, + failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend, + failOnce, batchSize, destinationscount, rate, pubsub); } // Start the client connection - perThreadSetup._pingItselfClient.getConnection().start(); + perThreadSetup._pingClient.getConnection().start(); // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } - protected void tearDown() throws Exception + /** + * Performs test fixture clean + */ + public void threadTearDown() { + _logger.debug("public void threadTearDown(): called"); + try { - /* - if ((_pingItselfClient != null) && (_pingItselfClient.getConnection() != null)) + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) { - _pingItselfClient.getConnection().close(); + perThreadSetup._pingClient.close(); } - */ + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); } - finally + catch (JMSException e) { - //NDC.pop(); + _logger.warn("There was an exception during per thread tear down."); } } @@ -309,6 +241,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /** * Holds the test ping client. */ - protected TestPingItself _pingItselfClient; + protected PingClient _pingClient; } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java deleted file mode 100644 index 274c8b5fc8..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.qpid.ping; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; - -/** - * Tests the {@link Throttle} implementation. Test timings can be taken using this test class to confirm that the - * throttle works as it should, and what the maximum rate is that it works reliably. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Enable test timings to be taken to confirm that the throttle works at the correct rate. - *
- * - * @author Rupert Smith - */ -public class ThrottleTestPerf extends AsymptoticTestCase -{ - ThreadLocal threadSetup = new ThreadLocal(); - - public ThrottleTestPerf(String name) - { - super(name); - } - - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new ThrottleTestPerf("testThrottle")); - - return suite; - } - - public void testThrottle(int opsPerSecond) - { - Throttle throttle = threadSetup.get(); - - // Setting this on every test call won't cause any harm, convenient to use the size parameter for this. - throttle.setRate(opsPerSecond); - - // Run the test at the throttled rate, do this for the num of opsPerSecond, then every test should take 1 second. - for (int i = 0; i < opsPerSecond; i++) - { - throttle.throttle(); - } - } - - protected void setUp() - { - if (threadSetup.get() == null) - { - threadSetup.set(new Throttle()); - } - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index fca133c425..81967d332a 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -1,7 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.requestreply; -import java.util.Properties; - import javax.jms.*; import junit.framework.Assert; @@ -11,149 +29,87 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; /** * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from * a producer to a conumer, then the consumer replies to the message on a temporary queue. - *

+ * *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled - * up using a suitable JUnit test runner. See {@link TKTestRunner} or {@link PPTestRunner} for more information on how - * to do this. - *

+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more + * information on how to do this. + * *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - *

+ * *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come * back on the temporary queue. - *

+ * *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - *

+ * *

*
CRC Card
Responsibilities Collaborations *
* * @author Rupert Smith */ -public class PingPongTestPerf extends AsymptoticTestCase //implements TimingControllerAware +public class PingPongTestPerf extends AsymptoticTestCase { private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - /** - * Holds the name of the property to get the test message size from. - */ - private static final String MESSAGE_SIZE_PROPNAME = "messagesize"; - - /** - * Holds the name of the property to get the ping queue name from. - */ - private static final String PING_QUEUE_NAME_PROPNAME = "destinationname"; - - /** - * Holds the name of the property to get the test delivery mode from. - */ - private static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** - * Holds the name of the property to get the test transactional mode from. - */ - private static final String TRANSACTED_PROPNAME = "transacted"; - - /** - * Holds the name of the property to get the test broker url from. - */ - private static final String BROKER_PROPNAME = "broker"; - - /** - * Holds the name of the property to get the test broker virtual path. - */ - private static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - - /** - * Holds the size of message body to attach to the ping messages. - */ - private static final int MESSAGE_SIZE_DEFAULT = 0; - - private static final int BATCH_SIZE_DEFAULT = 2; - - /** - * Holds the name of the queue to which pings are sent. - */ - private static final String PING_QUEUE_NAME_DEFAULT = "ping"; - - /** - * Holds the message delivery mode to use for the test. - */ - private static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** - * Holds the transactional mode to use for the test. - */ - private static final boolean TRANSACTED_DEFAULT = false; - - /** - * Holds the default broker url for the test. - */ - private static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** - * Holds the default virtual path for the test. - */ - private static final String VIRTUAL_PATH_DEFAULT = "/test"; - - /** - * Sets a default ping timeout. - */ - private static final long TIMEOUT = 15000; - - /** Holds the name of the property to get the message rate from. */ - private static final String RATE_PROPNAME = "rate"; - - private static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; - - /** Holds the true or false depending on wether it is P2P test or PubSub */ - private static final String IS_PUBSUB_PROPNAME = "pubsub"; - - /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */ - private static final int RATE_DEFAULT = 0; - - private static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; - private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; - private static final String FAIL_AFTER_SEND = "FailAfterSend"; - private static final String FAIL_BEFORE_SEND = "FailBeforeSend"; - private static final String BATCH_SIZE = "BatchSize"; - private static final String FAIL_ONCE = "FailOnce"; - - /** - * Thread local to hold the per-thread test setup fields. - */ + /** Thread local to hold the per-thread test setup fields. */ ThreadLocal threadSetup = new ThreadLocal(); - Object _lock = new Object(); // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. - private Properties testParameters = System.getProperties(); - //private Properties testParameters = new ContextualProperties(System.getProperties()); + //private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = new ParsedProperties(System.getProperties()); public PingPongTestPerf(String name) { super(name); // Sets up the test parameters with defaults. - setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT)); - setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); - setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); - setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); - setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); - setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); - setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false)); - setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT)); - setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.DEFAULT_PING_DESTINATION_NAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_VERBOSE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_RATE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PUBSUB)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, + Long.toString(PingPongProducer.DEFAULT_TIMEOUT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE); } /** @@ -185,19 +141,15 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestination(), - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); - - // Use the test timing controller to reset the test timer now and obtain the current time. - // This can be used to remove the message creation time from the test. - //TestTimingController timingUtils = getTimingController(); - //long startTime = timingUtils.restart(); + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger( + PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean( + PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the message and wait for a reply. - int numReplies = perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, TIMEOUT); + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.DEFAULT_TIMEOUT); // Fail the test if the timeout was exceeded. if (numReplies != numPings) @@ -206,82 +158,93 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont } } - protected void setUp() throws Exception + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() { - // Log4j will propagate the test name as a thread local in all log output. - // Carefull when using this, it can cause memory leaks when not cleaned up properly. - //NDC.push(getName()); - - // Create the test setups on a per thread basis, only if they have not already been created. - if (threadSetup.get() == null) + try { PerThreadSetup perThreadSetup = new PerThreadSetup(); // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); - String username = "guest"; - String password = "guest"; - String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); - String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); - boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); - boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); - String selector = null; - boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); - int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); - int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); - boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); - - boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); - boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); - boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); - boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); - int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); - Boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); - - synchronized(_lock) + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME); + int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME); + int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME); + boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME); + boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME); + boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME); + boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME); + int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME); + Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME); + + synchronized (this) { // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, - queueName, persistent, transacted, selector, verbose, pubsub); + perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath, + destinationName, persistent, transacted, selector, + verbose, pubsub); // Start the connections for client and producer running. perThreadSetup._testPingBouncer.getConnection().start(); // Establish a ping-pong client on the ping queue to send the pings with. - perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath, - queueName, selector, transacted, persistent, messageSize, - verbose, afterCommit, beforeCommit, afterSend, - beforeSend, failOnce, batchSize, 0, rate, pubsub); + perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath, + destinationName, selector, transacted, persistent, + messageSize, verbose, failAfterCommit, + failBeforeCommit, failAfterSend, failBeforeSend, + failOnce, batchSize, 0, rate, pubsub); perThreadSetup._testPingProducer.getConnection().start(); } // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } - protected void tearDown() throws Exception + /** + * Performs test fixture clean + */ + public void threadTearDown() { + _logger.debug("public void threadTearDown(): called"); + try { - /**if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null)) - { - _testPingBouncer.getConnection().close(); - } - - if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null)) - { - _testPingProducer.getConnection().close(); - }*/ + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + //perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); } - finally + catch (JMSException e) { - //NDC.pop(); + _logger.warn("There was an exception during per thread tear down."); } } - private static class PerThreadSetup + protected static class PerThreadSetup { /** * Holds the test ping-pong producer. diff --git a/java/pom.xml b/java/pom.xml index e1d9805bbb..2150e61861 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -348,13 +348,12 @@ - uk.co.thebadgerset junit-toolkit-maven-plugin - 0.3 + 0.5-SNAPSHOT - --> @@ -459,8 +458,8 @@ uk.co.thebadgerset junit-toolkit - 0.3 - test + 0.5-SNAPSHOT + compile -- cgit v1.2.1