summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/peer.py32
1 files changed, 20 insertions, 12 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index d7ae85e345..3927f20667 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -351,8 +351,8 @@ class Future:
class ExecutionCompletion:
def __init__(self):
- self.completed = threading.Event()
- self.sequence = Sequence(0)
+ self.condition = threading.Condition()
+ self.sequence = Sequence(1)
self.command_id = 0
self.mark = 0
@@ -362,19 +362,27 @@ class ExecutionCompletion:
self.command_id = self.sequence.next()
def close(self):
- self.completed.set()
+ self.condition.acquire()
+ try:
+ self.condition.notifyAll()
+ finally:
+ self.condition.release()
def complete(self, mark):
- self.mark = mark
- self.completed.set()
- self.completed.clear()
+ self.condition.acquire()
+ try:
+ self.mark = mark
+ self.condition.notifyAll()
+ finally:
+ self.condition.release()
def wait(self, point_of_interest=-1, timeout=None):
- """
- todo: really want to allow different threads to call this with
- different points of interest on the same channel, this is a quick
- hack for now
- """
if point_of_interest == -1: point_of_interest = self.command_id
- self.completed.wait(timeout)
+ self.condition.acquire()
+ try:
+ if point_of_interest > self.mark:
+ self.condition.wait(timeout)
+ finally:
+ self.condition.release()
+ #todo: retry until timed out or closed
return point_of_interest <= self.mark