summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py29
1 files changed, 22 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 9108832eec..6eb8fa0e9d 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -181,7 +181,14 @@ class Connection:
"""
self._connected = True
self._wakeup()
- self._ewait(lambda: self._driver._connected, exc=ConnectError)
+ self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ exc=ConnectError)
+
+ def _unlinked(self):
+ return [l
+ for ssn in self.sessions.values()
+ for l in ssn.senders + ssn.receivers
+ if not (l.linked or l.error or l.closed)]
@synchronized
def disconnect(self):
@@ -484,11 +491,13 @@ class Session:
sender = Sender(self, self.next_sender_id, target, options)
self.next_sender_id += 1
self.senders.append(sender)
- self._wakeup()
- # XXX: because of the lack of waiting here we can end up getting
- # into the driver loop with messages sent for senders that haven't
- # been linked yet, something similar can probably happen for
- # receivers
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ sender._ewait(lambda: sender.linked)
+ except SendError, e:
+ sender.close()
+ raise e
return sender
@synchronized
@@ -505,7 +514,13 @@ class Session:
receiver = Receiver(self, self.next_receiver_id, source, options)
self.next_receiver_id += 1
self.receivers.append(receiver)
- self._wakeup()
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ receiver._ewait(lambda: receiver.linked)
+ except ReceiveError, e:
+ receiver.close()
+ raise e
return receiver
@synchronized