diff options
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 5 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 16 |
2 files changed, 12 insertions, 9 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 57f3727a13..00cd98a20a 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -23,7 +23,7 @@ from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped from qpid.messaging import Message -from threading import Thread +from threading import Thread, Lock from logging import getLogger from itertools import chain @@ -251,7 +251,6 @@ class LongTests(BrokerTest): def test_management(self): """Stress test: Run management clients and other clients concurrently.""" - # TODO aconway 2010-03-03: move to brokertest framework class ClientLoop(StoppableThread): """Run a client executable in a loop.""" def __init__(self, broker, cmd): @@ -301,8 +300,8 @@ class LongTests(BrokerTest): def stop(self): """Stop the running client and wait for it to exit""" self.lock.acquire() - if self.stopped: return try: + if self.stopped: return self.stopped = True if self.process: try: self.process.kill() # Kill the client. diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index a241c8b528..19acfd76ca 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -148,6 +148,7 @@ class Popen(popen2.Popen3): drain - if true (default) drain stdout/stderr to files. """ self._clean = False + self._clean_lock = Lock() assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] @@ -174,12 +175,15 @@ class Popen(popen2.Popen3): def _cleanup(self): """Close pipes to sub-process""" - if self._clean: return - self._clean = True - self.stdin.close() - self.drain() # Drain output pipes. - self.stdout.thread.join() # Drain thread closes pipe. - self.stderr.thread.join() + self._clean_lock.acquire() + try: + if self._clean: return + self._clean = True + self.stdin.close() + self.drain() # Drain output pipes. + self.stdout.thread.join() # Drain thread closes pipe. + self.stderr.thread.join() + finally: self._clean_lock.release() def unexpected(self,msg): self._cleanup() |