summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/client.py3
-rw-r--r--python/qpid/peer.py30
2 files changed, 33 insertions, 0 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index cdceb87bdf..f1800204db 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -140,6 +140,9 @@ class ClientDelegate(Delegate):
def connection_close(self, ch, msg):
self.client.peer.close(msg)
+ def execution_complete(self, ch, msg):
+ ch.completion.complete(msg.cumulative_execution_mark)
+
def close(self, reason):
self.client.closed = True
self.client.reason = reason
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 72e6a19bc7..9880eea19b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -186,6 +186,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
+ self.completion = ExecutionCompletion()
+
# Use reliable framing if version == 0-9.
self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
@@ -247,6 +249,7 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
@@ -337,3 +340,30 @@ class Future:
def is_complete(self):
return self.completed.isSet()
+
+class ExecutionCompletion:
+ def __init__(self):
+ self.completed = threading.Event()
+ self.sequence = Sequence(0)
+ self.command_id = 0
+ self.mark = 0
+
+ def next_command(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ self.command_id = self.sequence.next()
+
+ def complete(self, mark):
+ self.mark = mark
+ self.completed.set()
+ self.completed.clear()
+
+ def wait(self, point_of_interest=-1, timeout=None):
+ """
+ todo: really want to allow different threads to call this with
+ different points of interest on the same channel, this is a quick
+ hack for now
+ """
+ if point_of_interest == -1: point_of_interest = self.command_id
+ self.completed.wait(timeout)
+ return point_of_interest <= self.mark