diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-10-10 17:15:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-10-10 17:15:31 +0000 |
| commit | 13e6bd9704643993d95c81f22106dae5b59b3084 (patch) | |
| tree | 2dbfa0faacecf665170120c61ec6239e5bff5a9c /python/qpid/messaging.py | |
| parent | c68d17bf36649f3ba68334c3147e2d0da7246e67 (diff) | |
| download | qpid-python-13e6bd9704643993d95c81f22106dae5b59b3084.tar.gz | |
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
| -rw-r--r-- | python/qpid/messaging.py | 37 |
1 files changed, 30 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index f9ca54fe9e..10d9c78396 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -237,9 +237,10 @@ class Pattern: self.value = value # XXX: this should become part of the driver - def _bind(self, ssn, exchange, queue): - ssn.exchange_bind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#")) + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) class SessionError(Exception): pass @@ -289,6 +290,7 @@ class Session: # XXX: I hate this name. self.ack_capacity = UNLIMITED + self.error = None self.closing = False self.closed = False @@ -309,9 +311,13 @@ class Session: def _check_error(self, exc=SessionError): self.connection._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SessionError): - return self.connection._ewait(predicate, timeout, exc) + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def sender(self, target, **options): @@ -520,6 +526,8 @@ class Sender: self.capacity = options.get("capacity", UNLIMITED) self.queued = Serial(0) self.acked = Serial(0) + self.error = None + self.linked = False self.closing = False self.closed = False self._lock = self.session._lock @@ -529,9 +537,13 @@ class Sender: def _check_error(self, exc=SendError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SendError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -567,6 +579,8 @@ class Sender: if not self.session.connection._connected or self.session.closing: raise Disconnected() + self._ewait(lambda: self.linked) + if isinstance(object, Message): message = object else: @@ -637,6 +651,8 @@ class Receiver: self.received = Serial(0) self.returned = Serial(0) + self.error = None + self.linked = False self.closing = False self.closed = False self.listener = None @@ -647,9 +663,13 @@ class Receiver: def _check_error(self, exc=ReceiveError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ReceiveError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -693,6 +713,9 @@ class Receiver: @type timeout: float @param timeout: the time to wait for a message to be available """ + + self._ewait(lambda: self.linked) + if self._capacity() == 0: self.granted = self.returned + 1 self._wakeup() @@ -751,7 +774,7 @@ class Receiver: self.closing = True self._wakeup() try: - self._ewait(lambda: self.closed) + self.session._ewait(lambda: self.closed) finally: self.session.receivers.remove(self) |
