diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-11-11 03:28:39 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-11-11 03:28:39 +0000 |
commit | aeec92612398e5d55f42168d54bd0d80a49a134e (patch) | |
tree | 402700c6e760d19650d56075fc6a6274d24dc146 /java/testkit/testkit.py | |
parent | ccc54f3eccfef128ad50a02bbcc56d5bc866d079 (diff) | |
download | qpid-python-aeec92612398e5d55f42168d54bd0d80a49a134e.tar.gz |
testkit.py provides the plumbing for running longer duration tests using the multi-broker framework defined in brokertest.py
For the time being testkit is carrying it's own copy of brokertest.py. The goal is to use the one available under the /python folder asap.
Testkit is intended run as,
1) an ant target via "ant testkit" (to allow automated testing)
2) standalone against a release
If running standalone you need to have the qpid/python files in the python path and $QP_CP should be set to the classpath that contains the qpid jars.Assuming $PYTHON_DIR points to the python folder you could run it as follows.
$PYTHON_DIR/qpid-python-test -m testkit
The ant target is currently not operational as there seems to be a few issues when running under jython.
Tests
=========
Currently only 3 tests are added.
1. test_multiplexing_con
2. test_multiplexing_con_tx
3. test_failover
All tests are using the generic Sender and Receiver via the TestLauncher checked under the testkit module.
Currently there are occasional test failures for test_multiplexing_con_tx
The 'test_failover' test is currently failing due to a known bug.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/testkit/testkit.py')
-rwxr-xr-x | java/testkit/testkit.py | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/java/testkit/testkit.py b/java/testkit/testkit.py new file mode 100755 index 0000000000..c79c7eadcd --- /dev/null +++ b/java/testkit/testkit.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time, string +from brokertest import * +from qpid.messaging import * + + +try: + import java.lang.System + _cp = java.lang.System.getProperty("java.class.path"); +except ImportError: + _cp = checkenv("QP_CP") + +# The base test case has support for launching the genric +# receiver and sender through the TestLauncher with all the options. +# +class JavaClientTest(BrokerTest): + """Base Case for Java Test cases""" + + client_class = "org.apache.qpid.testkit.TestLauncher" + + # currently there is no transparent reconnection. + # temp hack: just creating the queue here and closing it. + def start_error_watcher(self,broker=None): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control {create:always}", capacity=1) + ssn.close() + + def client(self,**options): + cmd = ["java","-cp",_cp] + cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] + cmd += ["-Dhost=" + options.get("host","127.0.0.1")] + cmd += ["-Dport=" + str(options.get("port",5672))] + cmd += ["-Dcon_count=" + str(options.get("con_count",1))] + cmd += ["-Dssn_count=" + str(options.get("ssn_count",1))] + cmd += ["-Dqueue_name=" + options.get("queue_name","queue")] + cmd += ["-Dexchange_name=" + options.get("exchange_name","amq.direct")] + cmd += ["-Drouting_key=" + options.get("routing_key","routing_key")] + cmd += ["-Dunique_dests=" + str(options.get("unique_dests",True))] + cmd += ["-Ddurable=" + str(options.get("durable",False))] + cmd += ["-Dtransacted=" + str(options.get("transacted",False))] + cmd += ["-Dreceiver=" + str(options.get("receiver",False))] + cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] + cmd += ["-Dsender=" + str(options.get("sender",False))] + cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] + cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] + cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))] + cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] + cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] + cmd += ["-Dreliability=" + options.get("reliability", "exactly_once")] + cmd += [self.client_class] + + print str(options.get("port",5672)) + return cmd + + # currently there is no transparent reconnection. + # temp hack: just creating a receiver and closing session soon after. + def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control {create:always}", capacity=1) + i = run_time/error_ck_freq + for j in range(i): + try: + m = err_watcher.fetch(timeout=error_ck_freq) + print self.check_for_error() + except messaging.Empty, e: + pass # do nothing + ssn.close() + + def check_for_error(msg): + raise Exception("Error:%s \ntime:%s\ntrace:%s\n" % + (msg.properties["desc"], + msg.properties["time"], + msg.properties["exception-trace"] + )) + + def terminate_and_capture_logs(self,popen, process_name): + popen.terminate() + log = os.path.join(self.dir, process_name+".out") + f = open(log, 'w') + f.write(popen.stdout.read()) + f.close() + + log = os.path.join(self.dir, process_name+".err") + f = open(log, 'w') + f.write(popen.stderr.read()) + f.close() + + def verify(self, receiver,sender): + sender_running = receiver.is_running() + receiver_running = sender.is_running() + + self.terminate_and_capture_logs(receiver,"receiver") + self.terminate_and_capture_logs(sender,"sender") + + self.assertTrue(receiver_running,"Receiver has exited prematually") + self.assertTrue(sender_running,"Sender has exited prematually") + + +class ConcurrencyTest(JavaClientTest): + """A concurrency test suite for the JMS client""" + + def test_multiplexing_con(self): + """Tests multiple sessions on a single connection""" + + cluster = Cluster(self, 2) + p = cluster[0].port + + self.start_error_watcher(broker=cluster[0]) + + receiver = self.popen(self.client(receiver=True, + ssn_count=25, + port=p, + test_name=self.id()), + expect=EXPECT_EXIT_FAIL) + + sender = self.popen(self.client(sender=True, + ssn_count=25, + port=p, + test_name=self.id()), + expect=EXPECT_EXIT_FAIL) + + self.monitor_clients(broker=cluster[0],run_time=60) + self.verify(receiver,sender) + + + def test_multiplexing_con_tx(self): + """Tests multiple transacted sessions on a single connection""" + + cluster = Cluster(self, 2) + ssn = cluster[0].connect().session() + p = cluster[0].port + + self.start_error_watcher(broker=cluster[0]) + + receiver = self.popen(self.client(receiver=True, + ssn_count=25, + port=p, + transacted=True, + test_name=self.id()), + expect=EXPECT_EXIT_FAIL) + + sender = self.popen(self.client(sender=True, + ssn_count=25, + port=p, + transacted=True, + test_name=self.id()), + expect=EXPECT_EXIT_FAIL) + + self.monitor_clients(broker=cluster[0],run_time=60) + ssn.close(); + self.verify(receiver,sender) + +class SoakTest(JavaClientTest): + """A soak test suite for the JMS client""" + + def test_failover(self): + cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL) + p = cluster[0].port + self.start_error_watcher(broker=cluster[0]) + receiver = self.popen(self.client(receiver=True, + ssn_count=1, + port=p, + reliability="at_least_once", + test_name=self.id()), + expect=EXPECT_EXIT_FAIL) + + sender = self.popen(self.client(sender=True, + ssn_count=1, + port=p, + reliability="at_least_once", + test_name=self.id()), + expect=EXPECT_EXIT_FAIL) + + # grace period for java clients to get the failover properly setup. + time.sleep(30) + error_msg=None + # Kill original brokers, start new ones. + try: + for i in range(4): + cluster[i].kill() + b=cluster.start() + self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) + except ConnectError, e1: + error_msg = "Unable to connect to new cluster node" + except SessionError, e2: + error_msg = "Session error while connected to new cluster node" + + # verify also captures out/err streams + self.verify(receiver,sender) + if error_msg: + raise Exception(error_msg) + +if __name__ == '__main__': + if not test.main(): sys.exit(1) + |