From 22912b1f7e82542b66f53be02ea1b8be3402e728 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 7 Mar 2008 13:55:00 +0000 Subject: added timeouts to hello-010-world; switched to conditions rather than events for handling connection/session state; handle session exceptions git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634678 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/session.py | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) (limited to 'python/qpid/session.py') diff --git a/python/qpid/session.py b/python/qpid/session.py index c628762dac..b83bd1637f 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,13 +17,14 @@ # under the License. # -from threading import Event, RLock +from threading import Condition, RLock from invoker import Invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec from assembler import Segment from queue import Queue from datatypes import Message +from util import wait from logging import getLogger class SessionDetached(Exception): pass @@ -34,6 +35,8 @@ def client(*args): def server(*args): return Server(*args) +class SessionException(Exception): pass + class Session(Invoker): def __init__(self, name, spec, sync=True, timeout=10, delegate=client): @@ -42,17 +45,22 @@ class Session(Invoker): self.sync = sync self.timeout = timeout self.channel = None - self.opened = Event() - self.closed = Event() + + self.condition = Condition() + + self.send_id = True self.receiver = Receiver(self) self.sender = Sender(self) - self.delegate = delegate(self) - self.send_id = True - self.results = {} + self.lock = RLock() self._incoming = {} + self.results = {} + self.exceptions = [] + self.assembly = None + self.delegate = delegate(self) + def incoming(self, destination): self.lock.acquire() try: @@ -66,7 +74,7 @@ class Session(Invoker): def close(self, timeout=None): self.channel.session_detach(self.name) - self.closed.wait(timeout=timeout) + wait(self.condition, lambda: self.channel is None, timeout) def resolve_method(self, name): cmd = self.spec.instructions.get(name) @@ -105,7 +113,7 @@ class Session(Invoker): type.segment_type, type.track, self.channel.id, sc.encoded) if type.result: - result = Future() + result = Future(exception=SessionException) self.results[self.sender.next_id] = result self.send(seg) @@ -234,9 +242,27 @@ class Delegate: self.session = session def execution_result(self, er): - future = self.session.results[er.command_id] + future = self.session.results.pop(er.command_id) future.set(er.value) + def execution_exception(self, ex): + self.session.lock.acquire() + try: + self.session.exceptions.append(ex) + excs = self.session.exceptions[:] + if len(excs) == 1: + error = excs[0] + else: + error = tuple(excs) + for id in self.session.results: + f = self.session.results.pop(id) + f.error(error) + + for q in self.session._incoming.values(): + q.close(error) + finally: + self.session.lock.release() + msg = getLogger("qpid.ssn.msg") class Client(Delegate): -- cgit v1.2.1