From e06aa805cfe24b8edf619a6a535883f94589ac35 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 22 Apr 2008 16:11:34 +0000 Subject: QPID-947: update cpp and python management to 0-10 final git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650565 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/session.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) (limited to 'python/qpid/session.py') diff --git a/python/qpid/session.py b/python/qpid/session.py index 427a403b90..f649b95a2c 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,7 +17,7 @@ # under the License. # -from threading import Condition, RLock, currentThread +from threading import Condition, RLock, Lock, currentThread from invoker import Invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec @@ -29,8 +29,11 @@ from exceptions import * from logging import getLogger log = getLogger("qpid.io.cmd") +msg = getLogger("qpid.io.msg") -class SessionDetached(Exception): pass +class SessionException(Exception): pass +class SessionClosed(SessionException): pass +class SessionDetached(SessionException): pass def client(*args): return Client(*args) @@ -38,8 +41,6 @@ def client(*args): def server(*args): return Server(*args) -class SessionException(Exception): pass - INCOMPLETE = object() class Session(Invoker): @@ -50,6 +51,8 @@ class Session(Invoker): self.auto_sync = auto_sync self.timeout = timeout self.channel = None + self.invoke_lock = Lock() + self.closed = False self.condition = Condition() @@ -97,7 +100,12 @@ class Session(Invoker): raise SessionException(self.error()) def close(self, timeout=None): - self.channel.session_detach(self.name) + self.invoke_lock.acquire() + try: + self.closed = True + self.channel.session_detach(self.name) + finally: + self.invoke_lock.release() if not wait(self.condition, lambda: self.channel is None, timeout): raise Timeout() @@ -119,6 +127,16 @@ class Session(Invoker): if not hasattr(type, "track"): return type.new(args, kwargs) + self.invoke_lock.acquire() + try: + return self.do_invoke(type, args, kwargs) + finally: + self.invoke_lock.release() + + def do_invoke(self, type, args, kwargs): + if self.closed: + raise SessionClosed() + if self.channel == None: raise SessionDetached() @@ -160,6 +178,7 @@ class Session(Invoker): seg = Segment(False, True, self.spec["segment_type.body"].value, type.track, self.channel.id, message.body) self.send(seg) + msg.debug("SENT %s", message) if type.result: if self.auto_sync: @@ -304,8 +323,6 @@ class Delegate: finally: self.session.lock.release() -msg = getLogger("qpid.io.msg") - class Client(Delegate): def message_transfer(self, cmd, headers, body): -- cgit v1.2.1