diff options
Diffstat (limited to 'python/qpid/messaging.py')
| -rw-r--r-- | python/qpid/messaging.py | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 9108832eec..6eb8fa0e9d 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -181,7 +181,14 @@ class Connection: """ self._connected = True self._wakeup() - self._ewait(lambda: self._driver._connected, exc=ConnectError) + self._ewait(lambda: self._driver._connected and not self._unlinked(), + exc=ConnectError) + + def _unlinked(self): + return [l + for ssn in self.sessions.values() + for l in ssn.senders + ssn.receivers + if not (l.linked or l.error or l.closed)] @synchronized def disconnect(self): @@ -484,11 +491,13 @@ class Session: sender = Sender(self, self.next_sender_id, target, options) self.next_sender_id += 1 self.senders.append(sender) - self._wakeup() - # XXX: because of the lack of waiting here we can end up getting - # into the driver loop with messages sent for senders that haven't - # been linked yet, something similar can probably happen for - # receivers + if not self.closed and self.connection._connected: + self._wakeup() + try: + sender._ewait(lambda: sender.linked) + except SendError, e: + sender.close() + raise e return sender @synchronized @@ -505,7 +514,13 @@ class Session: receiver = Receiver(self, self.next_receiver_id, source, options) self.next_receiver_id += 1 self.receivers.append(receiver) - self._wakeup() + if not self.closed and self.connection._connected: + self._wakeup() + try: + receiver._ewait(lambda: receiver.linked) + except ReceiveError, e: + receiver.close() + raise e return receiver @synchronized |
