diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-10-10 17:15:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-10-10 17:15:31 +0000 |
| commit | ca9e506b1d39ad1959cacb19a64b44c5894ce5f9 (patch) | |
| tree | e1ce264b64635af6c7c64a00c14a501ac72b015f /qpid/python | |
| parent | 12c44544a81d77a80e2e3dcb067b822418880806 (diff) | |
| download | qpid-python-ca9e506b1d39ad1959cacb19a64b44c5894ce5f9.tar.gz | |
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@823890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/address.py | 47 | ||||
| -rw-r--r-- | qpid/python/qpid/driver.py | 176 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging.py | 37 | ||||
| -rw-r--r-- | qpid/python/qpid/ops.py | 2 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging.py | 131 |
5 files changed, 301 insertions, 92 deletions
diff --git a/qpid/python/qpid/address.py b/qpid/python/qpid/address.py index 18e8284e4d..5976d4889b 100644 --- a/qpid/python/qpid/address.py +++ b/qpid/python/qpid/address.py @@ -35,7 +35,8 @@ LBRACE = Type("LBRACE", r"\{") RBRACE = Type("RBRACE", r"\}") COLON = Type("COLON", r":") COMMA = Type("COMMA", r",") -ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*') +SLASH = Type("SLASH", r"/") +ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_.-]*') NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") WSPACE = Type("WSPACE", r"[ \n\r\t]+") @@ -53,12 +54,36 @@ class Token: joined = "|".join(["(%s)" % t.pattern for t in TYPES]) LEXER = re.compile(joined) +class LexError(Exception): + pass + +def line_info(st, pos): + idx = 0 + lineno = 1 + column = 0 + line_pos = 0 + while idx < pos: + if st[idx] == "\n": + lineno += 1 + column = 0 + line_pos = idx + column += 1 + idx += 1 + + end = st.find("\n", line_pos) + if end < 0: + end = len(st) + line = st[line_pos:end] + + return line, lineno, column + def lex(st): pos = 0 while pos < len(st): m = LEXER.match(st, pos) if m is None: - raise ValueError(repr(st[pos:])) + line, ln, col = line_info(st, pos) + raise LexError("unrecognized character in <string>:%s,%s: %s" % (ln, col, line)) else: idx = m.lastindex t = Token(TYPES[idx - 1], m.group(idx)) @@ -68,8 +93,6 @@ def lex(st): class ParseError(Exception): pass -class EOF(Exception): pass - class Parser: def __init__(self, tokens): @@ -97,11 +120,17 @@ class Parser: def address(self): name = self.eat(ID).value - if self.matches(LBRACE): + subject = None + options = None + if self.matches(SLASH): + self.eat(SLASH) + if self.matches(ID): + subject = self.eat(ID).value + else: + subject = "" + elif self.matches(LBRACE): options = self.map() - else: - options = None - return name, options + return name, subject, options def map(self): self.eat(LBRACE) @@ -129,6 +158,8 @@ class Parser: def value(self): if self.matches(NUMBER, STRING): return eval(self.eat().value) + elif self.matches(ID): + return self.eat().value elif self.matches(LBRACE): return self.map() else: diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index a4244b9830..dadf43fc7f 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -17,7 +17,7 @@ # under the License. # -import compat, connection, socket, struct, sys, time +import address, compat, connection, socket, struct, sys, time from concurrency import synchronized from datatypes import RangedSet, Serial from exceptions import Timeout, VersionError @@ -26,18 +26,14 @@ from logging import getLogger from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED from ops import * from selector import Selector -from session import Client, INCOMPLETE, SessionDetached from threading import Condition, Thread from util import connect log = getLogger("qpid.messaging") -def parse_addr(address): - parts = address.split("/", 1) - if len(parts) == 1: - return parts[0], None - else: - return parts[0], parts[i1] +def addr2reply_to(addr): + name, subject, options = address.parse(addr) + return ReplyTo(name, subject) def reply_to2addr(reply_to): if reply_to.routing_key is None: @@ -100,9 +96,12 @@ class SessionState: def write_query(self, query, handler): id = self.sent + query.sync = True self.write_cmd(query, lambda: handler(self.results.pop(id))) def write_cmd(self, cmd, completion=noop): + if self.detached: + raise Exception("detached") cmd.id = self.sent self.sent += 1 self.completions[cmd.id] = completion @@ -187,6 +186,7 @@ class Driver: if data: log.debug("READ: %r", data) else: + log.debug("ABORTED: %s", self._socket.getpeername()) error = ("connection aborted",) recoverable = True except socket.error, e: @@ -285,7 +285,7 @@ class Driver: def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) - if close.reply_ok != close_code.normal: + if close.reply_code != close_code.normal: self.connection.error = (close.reply_code, close.reply_text) # XXX: should we do a half shutdown on the socket here? # XXX: we really need to test this, we may end up reporting a @@ -343,6 +343,10 @@ class Driver: sst = self.get_sst(er) sst.results[er.command_id] = er.value + def do_execution_exception(self, ex): + sst = self.get_sst(ex) + sst.session.error = (ex,) + def dispatch(self): try: if self._socket is None and self.connection._connected and not self._opening: @@ -381,7 +385,7 @@ class Driver: def attach(self, ssn): sst = self._attachments.get(ssn) - if sst is None: + if sst is None and not ssn.closed: for i in xrange(0, self.channel_max): if not self._sessions.has_key(i): ch = i @@ -403,7 +407,7 @@ class Driver: for rcv in ssn.receivers: self.link_in(rcv) - if ssn.closing and not sst.detached: + if sst is not None and ssn.closing and not sst.detached: sst.detached = True sst.write_op(SessionDetach(name=ssn.name)) @@ -416,24 +420,66 @@ class Driver: del self._attachments[ssn] ssn.closed = True + def do_session_detach(self, dtc): + sst = self.get_sst(dtc) + sst.write_op(SessionDetached(name=dtc.name)) + self.do_session_detached(dtc) + def link_out(self, snd): - sst = self._attachments[snd.session] + sst = self._attachments.get(snd.session) _snd = self._attachments.get(snd) - if not snd.closing and _snd is None: + if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) - _snd.linked = False - node, _snd._subject = parse_addr(snd.target) - def do_link_out(result): - if result.not_found: - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) + + try: + _snd.name, _snd.subject, _snd.options = address.parse(snd.target) + except address.LexError, e: + snd.error = e + snd.closed = True + return + except address.ParseError, e: + snd.error = e + snd.closed = True + return + + # XXX: subject + if _snd.options is None: + _snd.options = {} + + def do_link(): + snd.linked = True + + def do_queue_q(result): + if sst.detached: + return + + if result.queue: _snd._exchange = "" - _snd._routing_key = node + _snd._routing_key = _snd.name + do_link() else: - _snd._exchange = node - _snd._routing_key = _snd._subject - _snd.linked = True - sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out) + snd.error = ("no such queue: %s" % _snd.name,) + del self._attachments[snd] + snd.closed = True + + def do_exchange_q(result): + if sst.detached: + return + + if result.not_found: + if _snd.options.get("create") in ("always", "receiver"): + sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) + _snd._exchange = "" + _snd._routing_key = _snd.name + else: + sst.write_query(QueueQuery(queue=_snd.name), do_queue_q) + return + else: + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + do_link() + + sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) self._attachments[snd] = _snd if snd.closing and not snd.closed: @@ -441,41 +487,77 @@ class Driver: snd.closed = True def link_in(self, rcv): - sst = self._attachments[rcv.session] + sst = self._attachments.get(rcv.session) _rcv = self._attachments.get(rcv) - if _rcv is None and not rcv.closing: + if _rcv is None and not rcv.closing and not rcv.closed: _rcv = Attachment(rcv) - _rcv.linked = False _rcv.canceled = False _rcv.draining = False - def do_link_in(result): + try: + _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source) + except address.LexError, e: + rcv.error = e + rcv.closed = True + return + except address.ParseError, e: + rcv.error = e + rcv.closed = True + return + + # XXX: subject + if _rcv.options is None: + _rcv.options = {} + + def do_link(): + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + rcv.linked = True + + def do_queue_q(result): + if sst.detached: + return + if result.queue: + _rcv._queue = _rcv.name + do_link() + else: + rcv.error = ("no such queue: %s" % _rcv.name,) + del self._attachments[rcv] + rcv.closed = True + + def do_exchange_q(result): + if sst.detached: + return if result.not_found: - _rcv._queue = rcv.source - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) + if _rcv.options.get("create") in ("always", "receiver"): + _rcv._queue = _rcv.name + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) + else: + sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q) + return else: _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) # XXX - if rcv.filter is None: + if _rcv.options.get("filter") is None: f = FILTER_DEFAULTS[result.type] else: f = rcv.filter - f._bind(sst, rcv.source, _rcv._queue) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) - _rcv.linked = True - sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in) + f._bind(sst, _rcv.name, _rcv._queue) + do_link() + sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q) self._attachments[rcv] = _rcv if rcv.closing and not rcv.closed: - if not _rcv.canceled: - def close_rcv(): - del self._attachments[rcv] - rcv.closed = True - sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) - _rcv.canceled = True + if rcv.linked: + if not _rcv.canceled: + def close_rcv(): + del self._attachments[rcv] + rcv.closed = True + sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) + _rcv.canceled = True + else: + rcv.closed = True def process(self, ssn): if ssn.closing: return @@ -485,8 +567,9 @@ class Driver: while sst.outgoing_idx < len(ssn.outgoing): msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender + # XXX: should check for sender error here _snd = self._attachments.get(snd) - if _snd and _snd.linked: + if _snd and snd.linked: self.send(snd, msg) sst.outgoing_idx += 1 else: @@ -559,7 +642,7 @@ class Driver: def grant(self, rcv): sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) - if _rcv is None or not _rcv.linked or _rcv.draining: + if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining: return if rcv.granted is UNLIMITED: @@ -606,7 +689,7 @@ class Driver: rk = _snd._routing_key # XXX: do we need to query to figure out how to create the reply-to interoperably? if msg.reply_to: - rt = ReplyTo(*parse_addr(msg.reply_to)) + rt = addr2reply_to(msg.reply_to) else: rt = None dp = DeliveryProperties(routing_key=rk) @@ -650,7 +733,6 @@ class Driver: log.debug("RECV [%s] %s", ssn, msg) ssn.incoming.append(msg) self.connection._waiter.notifyAll() - return INCOMPLETE def _decode(self, xfr): dp = EMPTY_DP diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index f9ca54fe9e..10d9c78396 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -237,9 +237,10 @@ class Pattern: self.value = value # XXX: this should become part of the driver - def _bind(self, ssn, exchange, queue): - ssn.exchange_bind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#")) + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) class SessionError(Exception): pass @@ -289,6 +290,7 @@ class Session: # XXX: I hate this name. self.ack_capacity = UNLIMITED + self.error = None self.closing = False self.closed = False @@ -309,9 +311,13 @@ class Session: def _check_error(self, exc=SessionError): self.connection._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SessionError): - return self.connection._ewait(predicate, timeout, exc) + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def sender(self, target, **options): @@ -520,6 +526,8 @@ class Sender: self.capacity = options.get("capacity", UNLIMITED) self.queued = Serial(0) self.acked = Serial(0) + self.error = None + self.linked = False self.closing = False self.closed = False self._lock = self.session._lock @@ -529,9 +537,13 @@ class Sender: def _check_error(self, exc=SendError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SendError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -567,6 +579,8 @@ class Sender: if not self.session.connection._connected or self.session.closing: raise Disconnected() + self._ewait(lambda: self.linked) + if isinstance(object, Message): message = object else: @@ -637,6 +651,8 @@ class Receiver: self.received = Serial(0) self.returned = Serial(0) + self.error = None + self.linked = False self.closing = False self.closed = False self.listener = None @@ -647,9 +663,13 @@ class Receiver: def _check_error(self, exc=ReceiveError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ReceiveError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -693,6 +713,9 @@ class Receiver: @type timeout: float @param timeout: the time to wait for a message to be available """ + + self._ewait(lambda: self.linked) + if self._capacity() == 0: self.granted = self.returned + 1 self._wakeup() @@ -751,7 +774,7 @@ class Receiver: self.closing = True self._wakeup() try: - self._ewait(lambda: self.closed) + self.session._ewait(lambda: self.closed) finally: self.session.receivers.remove(self) diff --git a/qpid/python/qpid/ops.py b/qpid/python/qpid/ops.py index 422a104466..277d059203 100644 --- a/qpid/python/qpid/ops.py +++ b/qpid/python/qpid/ops.py @@ -80,7 +80,7 @@ class Compound(object): return "%s(%s)" % (self.__class__.__name__, ", ".join(["%s=%r" % (f.name, getattr(self, f.name)) for f in self.ARGS - if getattr(self, f.name) is not f.default])) + if getattr(self, f.name) != f.default])) class Command(Compound): UNENCODED=[Field("channel", "uint16", 0), diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index fd7aca6ec7..f2bbc79c26 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -24,7 +24,7 @@ import time from qpid.tests import Test from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, UNLIMITED, uuid4 + InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -63,11 +63,12 @@ class Base(Test): return "%s[%s, %s]" % (base, count, self.test_id) def ping(self, ssn): + PING_Q = 'ping-queue {create: always}' # send a message - sender = ssn.sender("ping-queue") + sender = ssn.sender(PING_Q) content = self.content("ping") sender.send(content) - receiver = ssn.receiver("ping-queue") + receiver = ssn.receiver(PING_Q) msg = receiver.fetch(0) ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) @@ -174,6 +175,8 @@ class ConnectionTests(Base): self.conn.close() assert not self.conn.connected() +ACK_Q = 'test-ack-queue {create: always}' + class SessionTests(Base): def setup_connection(self): @@ -183,7 +186,7 @@ class SessionTests(Base): return self.conn.session() def testSender(self): - snd = self.ssn.sender("test-snd-queue") + snd = self.ssn.sender('test-snd-queue {create: always}') snd2 = self.ssn.sender(snd.target) assert snd is not snd2 snd2.close() @@ -196,7 +199,7 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testReceiver(self): - rcv = self.ssn.receiver("test-rcv-queue") + rcv = self.ssn.receiver('test-rcv-queue {create: always}') rcv2 = self.ssn.receiver(rcv.source) assert rcv is not rcv2 rcv2.close() @@ -209,34 +212,36 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testStart(self): - rcv = self.ssn.receiver("test-start-queue") + START_Q = 'test-start-queue {create: always}' + rcv = self.ssn.receiver(START_Q) assert not rcv.started self.ssn.start() assert rcv.started - rcv = self.ssn.receiver("test-start-queue") + rcv = self.ssn.receiver(START_Q) assert rcv.started def testStop(self): + STOP_Q = 'test-stop-queue {create: always}' self.ssn.start() - rcv = self.ssn.receiver("test-stop-queue") + rcv = self.ssn.receiver(STOP_Q) assert rcv.started self.ssn.stop() assert not rcv.started - rcv = self.ssn.receiver("test-stop-queue") + rcv = self.ssn.receiver(STOP_Q) assert not rcv.started # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown def ackTest(self, acker, ack_capacity=None): # send a bunch of messages - snd = self.ssn.sender("test-ack-queue") + snd = self.ssn.sender(ACK_Q) contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking - rcv = self.ssn.receiver(snd.target) + rcv = self.ssn.receiver(ACK_Q) self.drain(rcv, expected=contents) self.ssn.close() @@ -245,7 +250,7 @@ class SessionTests(Base): self.ssn = self.conn.session() if ack_capacity is not None: self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver("test-ack-queue") + rcv = self.ssn.receiver(ACK_Q) self.drain(rcv, expected=contents) acker(self.ssn) self.ssn.close() @@ -253,7 +258,7 @@ class SessionTests(Base): # drain the queue a final time and verify that the messages were # dequeued self.ssn = self.conn.session() - rcv = self.ssn.receiver("test-ack-queue") + rcv = self.ssn.receiver(ACK_Q) self.assertEmpty(rcv) def testAcknowledge(self): @@ -271,7 +276,7 @@ class SessionTests(Base): pass finally: self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver("test-ack-queue")) + self.drain(self.ssn.receiver(ACK_Q)) self.ssn.acknowledge() def testAcknowledgeAsyncAckCap1(self): @@ -294,10 +299,12 @@ class SessionTests(Base): return contents def txTest(self, commit): + TX_Q = 'test-tx-queue {create: always}' + TX_Q_COPY = 'test-tx-queue-copy {create: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, "test-tx-queue", "txTest", 3) - txrcv = txssn.receiver("test-tx-queue") - txsnd = txssn.sender("test-tx-queue-copy") + contents = self.send(self.ssn, TX_Q, "txTest", 3) + txrcv = txssn.receiver(TX_Q) + txsnd = txssn.sender(TX_Q_COPY) rcv = self.ssn.receiver(txrcv.source) copy_rcv = self.ssn.receiver(txsnd.target) self.assertEmpty(copy_rcv) @@ -323,9 +330,10 @@ class SessionTests(Base): self.txTest(False) def txTestSend(self, commit): + TX_SEND_Q = 'test-tx-send-queue {create: always}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3) - rcv = self.ssn.receiver("test-tx-send-queue") + contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: @@ -345,10 +353,11 @@ class SessionTests(Base): self.txTestSend(False) def txTestAck(self, commit): + TX_ACK_Q = 'test-tx-ack-queue {create: always}' txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver("test-tx-ack-queue") + txrcv = txssn.receiver(TX_ACK_Q) self.assertEmpty(txrcv) - contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3) + contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3) assert contents == self.drain(txrcv) if commit: @@ -366,11 +375,11 @@ class SessionTests(Base): txssn.close() txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver("test-tx-ack-queue") + txrcv = txssn.receiver(TX_ACK_Q) assert contents == self.drain(txrcv) txssn.acknowledge() txssn.commit() - rcv = self.ssn.receiver("test-tx-ack-queue") + rcv = self.ssn.receiver(TX_ACK_Q) self.assertEmpty(rcv) txssn.close() self.assertEmpty(rcv) @@ -389,6 +398,8 @@ class SessionTests(Base): except Disconnected: pass +RECEIVER_Q = 'test-receiver-queue {create: always}' + class ReceiverTests(Base): def setup_connection(self): @@ -398,10 +409,10 @@ class ReceiverTests(Base): return self.conn.session() def setup_sender(self): - return self.ssn.sender("test-receiver-queue") + return self.ssn.sender(RECEIVER_Q) def setup_receiver(self): - return self.ssn.receiver("test-receiver-queue") + return self.ssn.receiver(RECEIVER_Q) def send(self, base, count = None): content = self.content(base, count) @@ -538,6 +549,66 @@ class ReceiverTests(Base): # XXX: need testClose +NOSUCH_Q = "this-queue-should-not-exist" +UNPARSEABLE_ADDR = "{bad address}" +UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" + +class AddressErrorTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port) + + def setup_session(self): + return self.conn.session() + + def sendErrorTest(self, addr, exc, check=lambda e: True): + snd = self.ssn.sender(addr) + try: + snd.send("hello") + assert False, "send succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % e + snd.close() + + def fetchErrorTest(self, addr, exc, check=lambda e: True): + rcv = self.ssn.receiver(addr) + try: + rcv.fetch(timeout=0) + assert False, "fetch succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % e + rcv.close() + + def testNoTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) + + def testNoSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) + + def testUnparseableTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(UNPARSEABLE_ADDR, SendError, + lambda e: "expecting ID" in str(e)) + + def testUnparseableSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError, + lambda e: "expecting ID" in str(e)) + + def testUnlexableTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(UNLEXABLE_ADDR, SendError, + lambda e: "unrecognized character" in str(e)) + + def testUnlexableSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError, + lambda e: "unrecognized character" in str(e)) + +SENDER_Q = 'test-sender-q {create: always}' + class SenderTests(Base): def setup_connection(self): @@ -547,10 +618,10 @@ class SenderTests(Base): return self.conn.session() def setup_sender(self): - return self.ssn.sender("test-sender-queue") + return self.ssn.sender(SENDER_Q) def setup_receiver(self): - return self.ssn.receiver("test-sender-queue") + return self.ssn.receiver(SENDER_Q) def checkContent(self, content): self.snd.send(content) @@ -644,6 +715,8 @@ class MessageTests(Base): m.content = u"<html/>" assert m.content_type == "text/html; charset=utf8" +ECHO_Q = 'test-message-echo-queue {create: always}' + class MessageEchoTests(Base): def setup_connection(self): @@ -653,10 +726,10 @@ class MessageEchoTests(Base): return self.conn.session() def setup_sender(self): - return self.ssn.sender("test-message-echo-queue") + return self.ssn.sender(ECHO_Q) def setup_receiver(self): - return self.ssn.receiver("test-message-echo-queue") + return self.ssn.receiver(ECHO_Q) def check(self, msg): self.snd.send(msg) |
