From c742af513e2afe49de32d90300b0c83ef340569d Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 2 Dec 2009 20:32:12 +0000 Subject: Fix test race condition that was causing the test to hang. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@886297 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/brokertest.py | 48 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'python') diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 39f1e1a410..c3145c06ea 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -352,7 +352,7 @@ class StoppableThread(Thread): self.join() if self.error: raise self.error -class NumberedSender(StoppableThread): +class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ @@ -362,12 +362,14 @@ class NumberedSender(StoppableThread): max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.received(n) to be called each time messages are received. """ - StoppableThread.__init__(self) + Thread.__init__(self) self.sender = broker.test.popen( [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING) self.condition = Condition() self.max = max_depth self.received = 0 + self.stopped = False + self.error = None def run(self): try: @@ -375,7 +377,7 @@ class NumberedSender(StoppableThread): while not self.stopped: if self.max: self.condition.acquire() - while self.sent - self.received > self.max: + while not self.stopped and self.sent - self.received > self.max: self.condition.wait() self.condition.release() self.sender.stdin.write(str(self.sent)+"\n") @@ -389,6 +391,16 @@ class NumberedSender(StoppableThread): self.received = count self.condition.notify() self.condition.release() + + def stop(self): + log.debug("NumberedSender.stop") + self.condition.acquire() + self.stopped = True + self.condition.notify() + self.condition.release() + self.join() + log.debug("NumberedSender.stop - joined") + if self.error: raise self.error class NumberedReceiver(Thread): """ @@ -407,30 +419,36 @@ class NumberedReceiver(Thread): self.lock = Lock() self.error = None self.sender = sender - + + def continue_test(self): + self.lock.acquire() + ret = self.stopat is None or self.received < self.stopat + self.lock.release() + return ret + def run(self): try: self.received = 0 - while self.stopat is None or self.received < self.stopat: - self.lock.acquire() - try: - m = int(self.receiver.stdout.readline()) - assert(m <= self.received) # Allow for duplicates - if (m == self.received): - self.received += 1 - if self.sender: - self.sender.notify_received(self.received) - finally: - self.lock.release() + while self.continue_test(): + m = int(self.receiver.stdout.readline()) + assert(m <= self.received) # Allow for duplicates + if (m == self.received): + self.received += 1 + if self.sender: + self.sender.notify_received(self.received) except Exception, e: + log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02: self.error = RethrownException(e, self.receiver.pname) def stop(self, count): """Returns when received >= count""" + log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02: self.lock.acquire() + log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received)) self.stopat = count self.lock.release() self.join() + log.debug("NumberedReceiver.stop - joined") if self.error: raise self.error class ErrorGenerator(StoppableThread): -- cgit v1.2.1