summaryrefslogtreecommitdiff
path: root/python/qpid/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r--python/qpid/connection.py31
1 files changed, 22 insertions, 9 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 4ed430249b..8d0e115458 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -19,8 +19,7 @@
import datatypes, session
from threading import Thread, Condition, RLock
-from util import wait
-from framer import Closed
+from util import wait, notify
from assembler import Assembler, Segment
from codec010 import StringCodec
from session import Session
@@ -39,11 +38,11 @@ class SessionBusy(Exception): pass
class ConnectionFailed(Exception): pass
-def client(*args):
- return delegates.Client(*args)
+def client(*args, **kwargs):
+ return delegates.Client(*args, **kwargs)
-def server(*args):
- return delegates.Server(*args)
+def server(*args, **kwargs):
+ return delegates.Server(*args, **kwargs)
class Connection(Assembler):
@@ -61,13 +60,14 @@ class Connection(Assembler):
self.condition = Condition()
self.opened = False
self.failed = False
+ self.close_code = (None, "connection aborted")
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
self.channel_max = 65535
- self.delegate = delegate(self, args)
+ self.delegate = delegate(self, **args)
def attach(self, name, ch, delegate, force=False):
self.lock.acquire()
@@ -101,6 +101,8 @@ class Connection(Assembler):
ssn = self.sessions.pop(name, None)
if ssn is not None:
ssn.channel = None
+ ssn.closed()
+ notify(ssn.condition)
return ssn
finally:
self.lock.release()
@@ -127,13 +129,23 @@ class Connection(Assembler):
finally:
self.lock.release()
+ def detach_all(self):
+ self.lock.acquire()
+ try:
+ for ssn in self.attached.values():
+ if self.close_code[0] != 200:
+ ssn.exceptions.append(self.close_code)
+ self.detach(ssn.name, ssn.channel)
+ finally:
+ self.lock.release()
+
def start(self, timeout=None):
self.delegate.start()
self.thread.start()
if not wait(self.condition, lambda: self.opened or self.failed, timeout):
raise Timeout()
- if (self.failed):
- raise ConnectionFailed()
+ if self.failed:
+ raise ConnectionFailed(*self.close_code)
def run(self):
# XXX: we don't really have a good way to exit this loop without
@@ -142,6 +154,7 @@ class Connection(Assembler):
try:
seg = self.read_segment()
except Closed:
+ self.detach_all()
break
self.delegate.received(seg)