summaryrefslogtreecommitdiff
path: root/java/testkit/testkit.py
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-11-11 03:28:39 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-11-11 03:28:39 +0000
commitaeec92612398e5d55f42168d54bd0d80a49a134e (patch)
tree402700c6e760d19650d56075fc6a6274d24dc146 /java/testkit/testkit.py
parentccc54f3eccfef128ad50a02bbcc56d5bc866d079 (diff)
downloadqpid-python-aeec92612398e5d55f42168d54bd0d80a49a134e.tar.gz
testkit.py provides the plumbing for running longer duration tests using the multi-broker framework defined in brokertest.py
For the time being testkit is carrying it's own copy of brokertest.py. The goal is to use the one available under the /python folder asap. Testkit is intended run as, 1) an ant target via "ant testkit" (to allow automated testing) 2) standalone against a release If running standalone you need to have the qpid/python files in the python path and $QP_CP should be set to the classpath that contains the qpid jars.Assuming $PYTHON_DIR points to the python folder you could run it as follows. $PYTHON_DIR/qpid-python-test -m testkit The ant target is currently not operational as there seems to be a few issues when running under jython. Tests ========= Currently only 3 tests are added. 1. test_multiplexing_con 2. test_multiplexing_con_tx 3. test_failover All tests are using the generic Sender and Receiver via the TestLauncher checked under the testkit module. Currently there are occasional test failures for test_multiplexing_con_tx The 'test_failover' test is currently failing due to a known bug. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/testkit/testkit.py')
-rwxr-xr-xjava/testkit/testkit.py215
1 files changed, 215 insertions, 0 deletions
diff --git a/java/testkit/testkit.py b/java/testkit/testkit.py
new file mode 100755
index 0000000000..c79c7eadcd
--- /dev/null
+++ b/java/testkit/testkit.py
@@ -0,0 +1,215 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time, string
+from brokertest import *
+from qpid.messaging import *
+
+
+try:
+ import java.lang.System
+ _cp = java.lang.System.getProperty("java.class.path");
+except ImportError:
+ _cp = checkenv("QP_CP")
+
+# The base test case has support for launching the genric
+# receiver and sender through the TestLauncher with all the options.
+#
+class JavaClientTest(BrokerTest):
+ """Base Case for Java Test cases"""
+
+ client_class = "org.apache.qpid.testkit.TestLauncher"
+
+ # currently there is no transparent reconnection.
+ # temp hack: just creating the queue here and closing it.
+ def start_error_watcher(self,broker=None):
+ ssn = broker.connect().session()
+ err_watcher = ssn.receiver("control {create:always}", capacity=1)
+ ssn.close()
+
+ def client(self,**options):
+ cmd = ["java","-cp",_cp]
+ cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
+ cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
+ cmd += ["-Dport=" + str(options.get("port",5672))]
+ cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
+ cmd += ["-Dssn_count=" + str(options.get("ssn_count",1))]
+ cmd += ["-Dqueue_name=" + options.get("queue_name","queue")]
+ cmd += ["-Dexchange_name=" + options.get("exchange_name","amq.direct")]
+ cmd += ["-Drouting_key=" + options.get("routing_key","routing_key")]
+ cmd += ["-Dunique_dests=" + str(options.get("unique_dests",True))]
+ cmd += ["-Ddurable=" + str(options.get("durable",False))]
+ cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
+ cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
+ cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
+ cmd += ["-Dsender=" + str(options.get("sender",False))]
+ cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
+ cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
+ cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))]
+ cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
+ cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
+ cmd += ["-Dreliability=" + options.get("reliability", "exactly_once")]
+ cmd += [self.client_class]
+
+ print str(options.get("port",5672))
+ return cmd
+
+ # currently there is no transparent reconnection.
+ # temp hack: just creating a receiver and closing session soon after.
+ def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
+ ssn = broker.connect().session()
+ err_watcher = ssn.receiver("control {create:always}", capacity=1)
+ i = run_time/error_ck_freq
+ for j in range(i):
+ try:
+ m = err_watcher.fetch(timeout=error_ck_freq)
+ print self.check_for_error()
+ except messaging.Empty, e:
+ pass # do nothing
+ ssn.close()
+
+ def check_for_error(msg):
+ raise Exception("Error:%s \ntime:%s\ntrace:%s\n" %
+ (msg.properties["desc"],
+ msg.properties["time"],
+ msg.properties["exception-trace"]
+ ))
+
+ def terminate_and_capture_logs(self,popen, process_name):
+ popen.terminate()
+ log = os.path.join(self.dir, process_name+".out")
+ f = open(log, 'w')
+ f.write(popen.stdout.read())
+ f.close()
+
+ log = os.path.join(self.dir, process_name+".err")
+ f = open(log, 'w')
+ f.write(popen.stderr.read())
+ f.close()
+
+ def verify(self, receiver,sender):
+ sender_running = receiver.is_running()
+ receiver_running = sender.is_running()
+
+ self.terminate_and_capture_logs(receiver,"receiver")
+ self.terminate_and_capture_logs(sender,"sender")
+
+ self.assertTrue(receiver_running,"Receiver has exited prematually")
+ self.assertTrue(sender_running,"Sender has exited prematually")
+
+
+class ConcurrencyTest(JavaClientTest):
+ """A concurrency test suite for the JMS client"""
+
+ def test_multiplexing_con(self):
+ """Tests multiple sessions on a single connection"""
+
+ cluster = Cluster(self, 2)
+ p = cluster[0].port
+
+ self.start_error_watcher(broker=cluster[0])
+
+ receiver = self.popen(self.client(receiver=True,
+ ssn_count=25,
+ port=p,
+ test_name=self.id()),
+ expect=EXPECT_EXIT_FAIL)
+
+ sender = self.popen(self.client(sender=True,
+ ssn_count=25,
+ port=p,
+ test_name=self.id()),
+ expect=EXPECT_EXIT_FAIL)
+
+ self.monitor_clients(broker=cluster[0],run_time=60)
+ self.verify(receiver,sender)
+
+
+ def test_multiplexing_con_tx(self):
+ """Tests multiple transacted sessions on a single connection"""
+
+ cluster = Cluster(self, 2)
+ ssn = cluster[0].connect().session()
+ p = cluster[0].port
+
+ self.start_error_watcher(broker=cluster[0])
+
+ receiver = self.popen(self.client(receiver=True,
+ ssn_count=25,
+ port=p,
+ transacted=True,
+ test_name=self.id()),
+ expect=EXPECT_EXIT_FAIL)
+
+ sender = self.popen(self.client(sender=True,
+ ssn_count=25,
+ port=p,
+ transacted=True,
+ test_name=self.id()),
+ expect=EXPECT_EXIT_FAIL)
+
+ self.monitor_clients(broker=cluster[0],run_time=60)
+ ssn.close();
+ self.verify(receiver,sender)
+
+class SoakTest(JavaClientTest):
+ """A soak test suite for the JMS client"""
+
+ def test_failover(self):
+ cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
+ p = cluster[0].port
+ self.start_error_watcher(broker=cluster[0])
+ receiver = self.popen(self.client(receiver=True,
+ ssn_count=1,
+ port=p,
+ reliability="at_least_once",
+ test_name=self.id()),
+ expect=EXPECT_EXIT_FAIL)
+
+ sender = self.popen(self.client(sender=True,
+ ssn_count=1,
+ port=p,
+ reliability="at_least_once",
+ test_name=self.id()),
+ expect=EXPECT_EXIT_FAIL)
+
+ # grace period for java clients to get the failover properly setup.
+ time.sleep(30)
+ error_msg=None
+ # Kill original brokers, start new ones.
+ try:
+ for i in range(4):
+ cluster[i].kill()
+ b=cluster.start()
+ self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+ except ConnectError, e1:
+ error_msg = "Unable to connect to new cluster node"
+ except SessionError, e2:
+ error_msg = "Session error while connected to new cluster node"
+
+ # verify also captures out/err streams
+ self.verify(receiver,sender)
+ if error_msg:
+ raise Exception(error_msg)
+
+if __name__ == '__main__':
+ if not test.main(): sys.exit(1)
+