summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-03-26 13:40:02 +0000
committerAlan Conway <aconway@apache.org>2010-03-26 13:40:02 +0000
commitd4b3e508a31400eab400eb00958c02da4b54c645 (patch)
tree83cde2a4479ec58414fd9956c98e258226053c38 /python
parent6e9d609b0edea3daab64fcd8fbf7a1bb0f0b3146 (diff)
downloadqpid-python-d4b3e508a31400eab400eb00958c02da4b54c645.tar.gz
Fix race condition causing deadlock in cluster_tests.py, failover_test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@927847 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/brokertest.py41
1 files changed, 20 insertions, 21 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index f18b75bb0d..2f064f59b6 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -497,7 +497,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
- Requires self.received(n) to be called each time messages are received.
+ Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
self.sender = broker.test.popen(
@@ -508,6 +508,10 @@ class NumberedSender(Thread):
self.stopped = False
self.error = None
+ def write_message(self, n):
+ self.sender.stdin.write(str(n)+"\n")
+ self.sender.stdin.flush()
+
def run(self):
try:
self.sent = 0
@@ -517,8 +521,7 @@ class NumberedSender(Thread):
while not self.stopped and self.sent - self.received > self.max:
self.condition.wait()
self.condition.release()
- self.sender.stdin.write(str(self.sent)+"\n")
- self.sender.stdin.flush()
+ self.write_message(self.sent)
self.sent += 1
except Exception: self.error = RethrownException(self.sender.pname)
@@ -531,10 +534,12 @@ class NumberedSender(Thread):
def stop(self):
self.condition.acquire()
- self.stopped = True
- self.condition.notify()
- self.condition.release()
+ try:
+ self.stopped = True
+ self.condition.notify()
+ finally: self.condition.release()
self.join()
+ self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
class NumberedReceiver(Thread):
@@ -551,35 +556,29 @@ class NumberedReceiver(Thread):
self.receiver = self.test.popen(
[self.test.receiver_exec, "--port", broker.port()],
expect=EXPECT_RUNNING, drain=False)
- self.stopat = None
self.lock = Lock()
self.error = None
self.sender = sender
- def continue_test(self):
- self.lock.acquire()
- ret = self.stopat is None or self.received < self.stopat
- self.lock.release()
- return ret
+ def read_message(self):
+ return int(self.receiver.stdout.readline())
def run(self):
try:
self.received = 0
- while self.continue_test():
- m = int(self.receiver.stdout.readline())
- assert(m <= self.received) # Allow for duplicates
- if (m == self.received):
+ m = self.read_message()
+ while m != -1:
+ assert(m <= self.received) # Check for missing messages
+ if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
self.sender.notify_received(self.received)
+ m = self.read_message()
except Exception:
self.error = RethrownException(self.receiver.pname)
- def stop(self, count):
- """Returns when received >= count"""
- self.lock.acquire()
- self.stopat = count
- self.lock.release()
+ def stop(self):
+ """Returns when termination message is received"""
self.join()
if self.error: raise self.error