summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-10-10 17:15:31 +0000
committerRafael H. Schloming <rhs@apache.org>2009-10-10 17:15:31 +0000
commit13e6bd9704643993d95c81f22106dae5b59b3084 (patch)
tree2dbfa0faacecf665170120c61ec6239e5bff5a9c /python/qpid/messaging.py
parentc68d17bf36649f3ba68334c3147e2d0da7246e67 (diff)
downloadqpid-python-13e6bd9704643993d95c81f22106dae5b59b3084.tar.gz
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py37
1 files changed, 30 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index f9ca54fe9e..10d9c78396 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -237,9 +237,10 @@ class Pattern:
self.value = value
# XXX: this should become part of the driver
- def _bind(self, ssn, exchange, queue):
- ssn.exchange_bind(exchange=exchange, queue=queue,
- binding_key=self.value.replace("*", "#"))
+ def _bind(self, sst, exchange, queue):
+ from qpid.ops import ExchangeBind
+ sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#")))
class SessionError(Exception):
pass
@@ -289,6 +290,7 @@ class Session:
# XXX: I hate this name.
self.ack_capacity = UNLIMITED
+ self.error = None
self.closing = False
self.closed = False
@@ -309,9 +311,13 @@ class Session:
def _check_error(self, exc=SessionError):
self.connection._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection._ewait(predicate, timeout, exc)
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def sender(self, target, **options):
@@ -520,6 +526,8 @@ class Sender:
self.capacity = options.get("capacity", UNLIMITED)
self.queued = Serial(0)
self.acked = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -529,9 +537,13 @@ class Sender:
def _check_error(self, exc=SendError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SendError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -567,6 +579,8 @@ class Sender:
if not self.session.connection._connected or self.session.closing:
raise Disconnected()
+ self._ewait(lambda: self.linked)
+
if isinstance(object, Message):
message = object
else:
@@ -637,6 +651,8 @@ class Receiver:
self.received = Serial(0)
self.returned = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self.listener = None
@@ -647,9 +663,13 @@ class Receiver:
def _check_error(self, exc=ReceiveError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -693,6 +713,9 @@ class Receiver:
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
+
+ self._ewait(lambda: self.linked)
+
if self._capacity() == 0:
self.granted = self.returned + 1
self._wakeup()
@@ -751,7 +774,7 @@ class Receiver:
self.closing = True
self._wakeup()
try:
- self._ewait(lambda: self.closed)
+ self.session._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)