diff options
| author | Alan Conway <aconway@apache.org> | 2010-03-26 13:40:02 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-03-26 13:40:02 +0000 |
| commit | d4b3e508a31400eab400eb00958c02da4b54c645 (patch) | |
| tree | 83cde2a4479ec58414fd9956c98e258226053c38 /python | |
| parent | 6e9d609b0edea3daab64fcd8fbf7a1bb0f0b3146 (diff) | |
| download | qpid-python-d4b3e508a31400eab400eb00958c02da4b54c645.tar.gz | |
Fix race condition causing deadlock in cluster_tests.py, failover_test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@927847 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/brokertest.py | 41 |
1 files changed, 20 insertions, 21 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index f18b75bb0d..2f064f59b6 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -497,7 +497,7 @@ class NumberedSender(Thread): def __init__(self, broker, max_depth=None): """ max_depth: enable flow control, ensure sent - received <= max_depth. - Requires self.received(n) to be called each time messages are received. + Requires self.notify_received(n) to be called each time messages are received. """ Thread.__init__(self) self.sender = broker.test.popen( @@ -508,6 +508,10 @@ class NumberedSender(Thread): self.stopped = False self.error = None + def write_message(self, n): + self.sender.stdin.write(str(n)+"\n") + self.sender.stdin.flush() + def run(self): try: self.sent = 0 @@ -517,8 +521,7 @@ class NumberedSender(Thread): while not self.stopped and self.sent - self.received > self.max: self.condition.wait() self.condition.release() - self.sender.stdin.write(str(self.sent)+"\n") - self.sender.stdin.flush() + self.write_message(self.sent) self.sent += 1 except Exception: self.error = RethrownException(self.sender.pname) @@ -531,10 +534,12 @@ class NumberedSender(Thread): def stop(self): self.condition.acquire() - self.stopped = True - self.condition.notify() - self.condition.release() + try: + self.stopped = True + self.condition.notify() + finally: self.condition.release() self.join() + self.write_message(-1) # end-of-messages marker. if self.error: raise self.error class NumberedReceiver(Thread): @@ -551,35 +556,29 @@ class NumberedReceiver(Thread): self.receiver = self.test.popen( [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING, drain=False) - self.stopat = None self.lock = Lock() self.error = None self.sender = sender - def continue_test(self): - self.lock.acquire() - ret = self.stopat is None or self.received < self.stopat - self.lock.release() - return ret + def read_message(self): + return int(self.receiver.stdout.readline()) def run(self): try: self.received = 0 - while self.continue_test(): - m = int(self.receiver.stdout.readline()) - assert(m <= self.received) # Allow for duplicates - if (m == self.received): + m = self.read_message() + while m != -1: + assert(m <= self.received) # Check for missing messages + if (m == self.received): # Ignore duplicates self.received += 1 if self.sender: self.sender.notify_received(self.received) + m = self.read_message() except Exception: self.error = RethrownException(self.receiver.pname) - def stop(self, count): - """Returns when received >= count""" - self.lock.acquire() - self.stopat = count - self.lock.release() + def stop(self): + """Returns when termination message is received""" self.join() if self.error: raise self.error |
