diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-02-16 03:48:44 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-16 03:48:44 +0000 |
| commit | a127cedac14989a730c08ffb7660d8566991f6a3 (patch) | |
| tree | cbbb14d04a6e2a99ff1ebc9eb4e1aab61c282e5c /python/qpid/messaging.py | |
| parent | 0c3950cee69412bd3b7c1790fd452799af25d5da (diff) | |
| download | qpid-python-a127cedac14989a730c08ffb7660d8566991f6a3.tar.gz | |
changed sender/receiver to be synchronous by default when invoked on a connected session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@910388 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
| -rw-r--r-- | python/qpid/messaging.py | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 9108832eec..6eb8fa0e9d 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -181,7 +181,14 @@ class Connection: """ self._connected = True self._wakeup() - self._ewait(lambda: self._driver._connected, exc=ConnectError) + self._ewait(lambda: self._driver._connected and not self._unlinked(), + exc=ConnectError) + + def _unlinked(self): + return [l + for ssn in self.sessions.values() + for l in ssn.senders + ssn.receivers + if not (l.linked or l.error or l.closed)] @synchronized def disconnect(self): @@ -484,11 +491,13 @@ class Session: sender = Sender(self, self.next_sender_id, target, options) self.next_sender_id += 1 self.senders.append(sender) - self._wakeup() - # XXX: because of the lack of waiting here we can end up getting - # into the driver loop with messages sent for senders that haven't - # been linked yet, something similar can probably happen for - # receivers + if not self.closed and self.connection._connected: + self._wakeup() + try: + sender._ewait(lambda: sender.linked) + except SendError, e: + sender.close() + raise e return sender @synchronized @@ -505,7 +514,13 @@ class Session: receiver = Receiver(self, self.next_receiver_id, source, options) self.next_receiver_id += 1 self.receivers.append(receiver) - self._wakeup() + if not self.closed and self.connection._connected: + self._wakeup() + try: + receiver._ewait(lambda: receiver.linked) + except ReceiveError, e: + receiver.close() + raise e return receiver @synchronized |
