diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-05-09 18:40:13 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-05-09 18:40:13 +0000 |
| commit | 2ebccc4f3ab6e7813ac2179c8318163ffdd22cff (patch) | |
| tree | 423197bb243ca909e90637fdc24c3a5a9565ad81 /python/qpid/session.py | |
| parent | 7f0c95b0e94c964a92c77c7c8c59035ffff35f34 (diff) | |
| download | qpid-python-2ebccc4f3ab6e7813ac2179c8318163ffdd22cff.tar.gz | |
QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/session.py')
| -rw-r--r-- | python/qpid/session.py | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py index f8ac98b96e..a1103e0428 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -52,7 +52,8 @@ class Session(Invoker): self.timeout = timeout self.channel = None self.invoke_lock = Lock() - self.closed = False + self._closing = False + self._closed = False self.condition = Condition() @@ -82,7 +83,9 @@ class Session(Invoker): def error(self): exc = self.exceptions[:] - if len(exc) == 1: + if len(exc) == 0: + return None + elif len(exc) == 1: return exc[0] else: return tuple(exc) @@ -102,13 +105,31 @@ class Session(Invoker): def close(self, timeout=None): self.invoke_lock.acquire() try: - self.closed = True + self._closing = True self.channel.session_detach(self.name) finally: self.invoke_lock.release() if not wait(self.condition, lambda: self.channel is None, timeout): raise Timeout() + def closed(self): + self.lock.acquire() + try: + if self._closed: return + self._closed = True + + error = self.error() + for id in self.results: + f = self.results[id] + f.error(error) + self.results.clear() + + for q in self._incoming.values(): + q.close(error) + notify(self.condition) + finally: + self.lock.release() + def resolve_method(self, name): cmd = self.spec.instructions.get(name) if cmd is not None and cmd.track == self.spec["track.command"].value: @@ -136,7 +157,7 @@ class Session(Invoker): self.invoke_lock.release() def do_invoke(self, type, args, kwargs): - if self.closed: + if self._closing: raise SessionClosed() if self.channel == None: @@ -311,20 +332,7 @@ class Delegate: future.set(er.value) def execution_exception(self, ex): - self.session.lock.acquire() - try: - self.session.exceptions.append(ex) - error = self.session.error() - for id in self.session.results: - f = self.session.results[id] - f.error(error) - self.session.results.clear() - - for q in self.session._incoming.values(): - q.close(error) - notify(self.session.condition) - finally: - self.session.lock.release() + self.session.exceptions.append(ex) class Client(Delegate): |
