summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-16 03:48:44 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-16 03:48:44 +0000
commita127cedac14989a730c08ffb7660d8566991f6a3 (patch)
treecbbb14d04a6e2a99ff1ebc9eb4e1aab61c282e5c /python/qpid/messaging.py
parent0c3950cee69412bd3b7c1790fd452799af25d5da (diff)
downloadqpid-python-a127cedac14989a730c08ffb7660d8566991f6a3.tar.gz
changed sender/receiver to be synchronous by default when invoked on a connected session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@910388 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py29
1 files changed, 22 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 9108832eec..6eb8fa0e9d 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -181,7 +181,14 @@ class Connection:
"""
self._connected = True
self._wakeup()
- self._ewait(lambda: self._driver._connected, exc=ConnectError)
+ self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ exc=ConnectError)
+
+ def _unlinked(self):
+ return [l
+ for ssn in self.sessions.values()
+ for l in ssn.senders + ssn.receivers
+ if not (l.linked or l.error or l.closed)]
@synchronized
def disconnect(self):
@@ -484,11 +491,13 @@ class Session:
sender = Sender(self, self.next_sender_id, target, options)
self.next_sender_id += 1
self.senders.append(sender)
- self._wakeup()
- # XXX: because of the lack of waiting here we can end up getting
- # into the driver loop with messages sent for senders that haven't
- # been linked yet, something similar can probably happen for
- # receivers
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ sender._ewait(lambda: sender.linked)
+ except SendError, e:
+ sender.close()
+ raise e
return sender
@synchronized
@@ -505,7 +514,13 @@ class Session:
receiver = Receiver(self, self.next_receiver_id, source, options)
self.next_receiver_id += 1
self.receivers.append(receiver)
- self._wakeup()
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ receiver._ewait(lambda: receiver.linked)
+ except ReceiveError, e:
+ receiver.close()
+ raise e
return receiver
@synchronized