From d4b3e508a31400eab400eb00958c02da4b54c645 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 26 Mar 2010 13:40:02 +0000 Subject: Fix race condition causing deadlock in cluster_tests.py, failover_test. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@927847 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/brokertest.py | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) (limited to 'python') diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index f18b75bb0d..2f064f59b6 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -497,7 +497,7 @@ class NumberedSender(Thread): def __init__(self, broker, max_depth=None): """ max_depth: enable flow control, ensure sent - received <= max_depth. - Requires self.received(n) to be called each time messages are received. + Requires self.notify_received(n) to be called each time messages are received. """ Thread.__init__(self) self.sender = broker.test.popen( @@ -508,6 +508,10 @@ class NumberedSender(Thread): self.stopped = False self.error = None + def write_message(self, n): + self.sender.stdin.write(str(n)+"\n") + self.sender.stdin.flush() + def run(self): try: self.sent = 0 @@ -517,8 +521,7 @@ class NumberedSender(Thread): while not self.stopped and self.sent - self.received > self.max: self.condition.wait() self.condition.release() - self.sender.stdin.write(str(self.sent)+"\n") - self.sender.stdin.flush() + self.write_message(self.sent) self.sent += 1 except Exception: self.error = RethrownException(self.sender.pname) @@ -531,10 +534,12 @@ class NumberedSender(Thread): def stop(self): self.condition.acquire() - self.stopped = True - self.condition.notify() - self.condition.release() + try: + self.stopped = True + self.condition.notify() + finally: self.condition.release() self.join() + self.write_message(-1) # end-of-messages marker. if self.error: raise self.error class NumberedReceiver(Thread): @@ -551,35 +556,29 @@ class NumberedReceiver(Thread): self.receiver = self.test.popen( [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING, drain=False) - self.stopat = None self.lock = Lock() self.error = None self.sender = sender - def continue_test(self): - self.lock.acquire() - ret = self.stopat is None or self.received < self.stopat - self.lock.release() - return ret + def read_message(self): + return int(self.receiver.stdout.readline()) def run(self): try: self.received = 0 - while self.continue_test(): - m = int(self.receiver.stdout.readline()) - assert(m <= self.received) # Allow for duplicates - if (m == self.received): + m = self.read_message() + while m != -1: + assert(m <= self.received) # Check for missing messages + if (m == self.received): # Ignore duplicates self.received += 1 if self.sender: self.sender.notify_received(self.received) + m = self.read_message() except Exception: self.error = RethrownException(self.receiver.pname) - def stop(self, count): - """Returns when received >= count""" - self.lock.acquire() - self.stopat = count - self.lock.release() + def stop(self): + """Returns when termination message is received""" self.join() if self.error: raise self.error -- cgit v1.2.1