summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcpp/src/tests/cluster_tests.py5
-rw-r--r--python/qpid/brokertest.py16
2 files changed, 12 insertions, 9 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 57f3727a13..00cd98a20a 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -23,7 +23,7 @@ from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
from qpid.messaging import Message
-from threading import Thread
+from threading import Thread, Lock
from logging import getLogger
from itertools import chain
@@ -251,7 +251,6 @@ class LongTests(BrokerTest):
def test_management(self):
"""Stress test: Run management clients and other clients concurrently."""
- # TODO aconway 2010-03-03: move to brokertest framework
class ClientLoop(StoppableThread):
"""Run a client executable in a loop."""
def __init__(self, broker, cmd):
@@ -301,8 +300,8 @@ class LongTests(BrokerTest):
def stop(self):
"""Stop the running client and wait for it to exit"""
self.lock.acquire()
- if self.stopped: return
try:
+ if self.stopped: return
self.stopped = True
if self.process:
try: self.process.kill() # Kill the client.
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index a241c8b528..19acfd76ca 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -148,6 +148,7 @@ class Popen(popen2.Popen3):
drain - if true (default) drain stdout/stderr to files.
"""
self._clean = False
+ self._clean_lock = Lock()
assert find_exe(cmd[0]), "executable not found: "+cmd[0]
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
@@ -174,12 +175,15 @@ class Popen(popen2.Popen3):
def _cleanup(self):
"""Close pipes to sub-process"""
- if self._clean: return
- self._clean = True
- self.stdin.close()
- self.drain() # Drain output pipes.
- self.stdout.thread.join() # Drain thread closes pipe.
- self.stderr.thread.join()
+ self._clean_lock.acquire()
+ try:
+ if self._clean: return
+ self._clean = True
+ self.stdin.close()
+ self.drain() # Drain output pipes.
+ self.stdout.thread.join() # Drain thread closes pipe.
+ self.stderr.thread.join()
+ finally: self._clean_lock.release()
def unexpected(self,msg):
self._cleanup()