summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-27 22:21:13 +0000
committerAlan Conway <aconway@apache.org>2010-01-27 22:21:13 +0000
commitacedc31ff40ef337ed7194a0c61dad72db6f5ada (patch)
tree824d9032d69f5da1109cc8c96d769323e2815777 /qpid/python
parent54dec7ec3d992f9d29034a9f9f317337ac8c932d (diff)
downloadqpid-python-acedc31ff40ef337ed7194a0c61dad72db6f5ada.tar.gz
Test for management + cluster: run management tools in parallel with regular clients.
cluster_tests.py: added LongTests.test_management brokertest.py: optionally drain test process output to *.out/*.err files. On by default. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@903868 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/brokertest.py117
1 files changed, 85 insertions, 32 deletions
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index 9f07ffe388..d53a5ae1e2 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -36,6 +36,7 @@ log = getLogger("qpid.brokertest")
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test.
EXPECT_RUNNING=3 # Expect to still be running at end of test
+EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
def is_running(pid):
try:
@@ -62,14 +63,16 @@ class ExceptionWrapper:
return func(*args, **kwargs)
except Exception, e:
raise Exception("%s: %s" %(self.msg, str(e)))
-
+
def error_line(f):
try:
- lines = file(f).readlines()
- if len(lines) > 0: return ": %s" % (lines[-1])
- except: pass
- return ""
-
+ ff = file(f)
+ try:
+ lines = ff.readlines()
+ if len(lines) > 0: return ": %s" % (lines[-1])
+ else: return ""
+ finally: ff.close()
+ except: return ""
class Popen(popen2.Popen3):
"""
@@ -78,7 +81,41 @@ class Popen(popen2.Popen3):
Dumps command line, stdout, stderr to data dir for debugging.
"""
- def __init__(self, cmd, expect=EXPECT_EXIT_OK):
+ class DrainThread(Thread):
+ """Thread to drain a file object and write the data to a file."""
+ def __init__(self, infile, outname):
+ Thread.__init__(self)
+ self.infile, self.outname = infile, outname
+ self.outfile = None
+
+ def run(self):
+ try:
+ for line in self.infile:
+ if self.outfile is None:
+ self.outfile = file(self.outname, "w")
+ self.outfile.write(line)
+ finally:
+ if self.outfile is not None: self.outfile.close
+
+ class OutStream(ExceptionWrapper):
+ """Wrapper for output streams, handles excpetions & draining output"""
+ def __init__(self, infile, outfile, msg):
+ ExceptionWrapper.__init__(self, infile, msg)
+ self.infile, self.outfile = infile, outfile
+ self.thread = None
+
+ def drain(self):
+ if self.thread is None:
+ self.thread = Popen.DrainThread(self.infile, self.outfile)
+ self.thread.start()
+
+ def outfile(self, ext): return "%s.%s" % (self.pname, ext)
+
+ def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+ """Run cmd (should be a list of arguments)
+ expect - if set verify expectation at end of test.
+ drain - if true (default) drain stdout/stderr to files.
+ """
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
popen2.Popen3.__init__(self, self.cmd, True)
@@ -86,25 +123,34 @@ class Popen(popen2.Popen3):
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.tochild, msg)
- self.stdout = ExceptionWrapper(self.fromchild, msg)
- self.stderr = ExceptionWrapper(self.childerr, msg)
- self.dump(self.cmd_str(), "cmd")
+ self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
+ self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg)
+ f = file(self.outfile("cmd"), "w")
+ try: f.write(self.cmd_str())
+ finally: f.close()
log.debug("Started process %s" % self.pname)
+ if drain: self.drain()
+
+ def drain(self):
+ self.stdout.drain()
+ self.stderr.drain()
- def dump(self, str, ext):
- name = "%s.%s" % (self.pname, ext)
- f = file(name, "w")
- f.write(str)
- f.close()
- return name
+ def drain_join(self):
+ self.stdout.thread.join()
+ self.stderr.thread.join()
def unexpected(self,msg):
- self.dump(self.stdout.read(), "out")
- err = self.dump(self.stderr.read(), "err")
- raise BadProcessStatus("%s %s%s" % (self.pname, msg, error_line(err)))
+ self.drain()
+ self.drain_join()
+ raise BadProcessStatus("%s %s%s" % (self.pname, msg,
+ error_line(self.outfile("err"))))
def stop(self): # Clean up at end of test.
- if self.expect == EXPECT_RUNNING:
+ self.drain()
+ if self.expect == EXPECT_UNKNOWN:
+ try: self.kill() # Just make sure its dead
+ except: pass
+ elif self.expect == EXPECT_RUNNING:
try:
self.kill()
except:
@@ -131,7 +177,8 @@ class Popen(popen2.Popen3):
self.wait()
return outerr
- def is_running(self): return self.poll() is None
+ def is_running(self):
+ return self.poll() is None
def assert_running(self):
if not self.is_running(): unexpected("Exit code %d" % self.returncode)
@@ -142,7 +189,9 @@ class Popen(popen2.Popen3):
return self.returncode
def wait(self):
+ self.drain()
self.returncode = popen2.Popen3.wait(self)
+ self.drain_join()
return self.returncode
def send_signal(self, sig):
@@ -186,7 +235,7 @@ class Broker(Popen):
cmd += ["--log-to-stderr=no"]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
- Popen.__init__(self, cmd, expect)
+ Popen.__init__(self, cmd, expect, drain=False)
test.cleanup_stop(self)
self.host = "localhost"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
@@ -336,10 +385,10 @@ class BrokerTest(TestCase):
"""Call thing.stop at end of test"""
self.stopem.append(stopable)
- def popen(self, cmd, expect=EXPECT_EXIT_OK):
+ def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
- p = Popen(cmd, expect)
+ p = Popen(cmd, expect, drain)
self.cleanup_stop(p)
return p
@@ -359,8 +408,8 @@ class BrokerTest(TestCase):
for b in _brokers: b.connect().close()
class RethrownException(Exception):
- """Captures the original stack trace to be thrown later"""
- def __init__(self, e, msg=""):
+ """Captures the stack trace of the current exception to be thrown later"""
+ def __init__(self, msg=""):
Exception.__init__(self, msg+"\n"+format_exc())
class StoppableThread(Thread):
@@ -409,7 +458,7 @@ class NumberedSender(Thread):
self.sender.stdin.write(str(self.sent)+"\n")
self.sender.stdin.flush()
self.sent += 1
- except Exception, e: self.error = RethrownException(e, self.sender.pname)
+ except Exception: self.error = RethrownException(self.sender.pname)
def notify_received(self, count):
"""Called by receiver to enable flow control. count = messages received so far."""
@@ -438,7 +487,8 @@ class NumberedReceiver(Thread):
Thread.__init__(self)
self.test = broker.test
self.receiver = self.test.popen(
- [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
+ [self.test.receiver_exec, "--port", broker.port()],
+ expect=EXPECT_RUNNING, drain=False)
self.stopat = None
self.lock = Lock()
self.error = None
@@ -460,8 +510,8 @@ class NumberedReceiver(Thread):
self.received += 1
if self.sender:
self.sender.notify_received(self.received)
- except Exception, e:
- self.error = RethrownException(e, self.receiver.pname)
+ except Exception:
+ self.error = RethrownException(self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
@@ -500,5 +550,8 @@ def import_script(path):
Import executable script at path as a module.
Requires some trickery as scripts are not in standard module format
"""
- name=os.path.split(path)[1].replace("-","_")
- return imp.load_module(name, file(path), path, ("", "r", imp.PY_SOURCE))
+ f = file(path)
+ try:
+ name=os.path.split(path)[1].replace("-","_")
+ return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
+ finally: f.close()