diff options
Diffstat (limited to 'python/qpid/tests/messaging/endpoints.py')
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 1335 |
1 files changed, 0 insertions, 1335 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py deleted file mode 100644 index db5ec03df2..0000000000 --- a/python/qpid/tests/messaging/endpoints.py +++ /dev/null @@ -1,1335 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# setup, usage, teardown, errors(sync), errors(async), stress, soak, -# boundary-conditions, config - -import errno, os, socket, sys, time -from qpid import compat -from qpid.compat import set -from qpid.messaging import * -from qpid.messaging.transports import TRANSPORTS -from qpid.tests.messaging import Base -from threading import Thread - -class SetupTests(Base): - - def testEstablish(self): - self.conn = Connection.establish(self.broker, **self.connection_options()) - self.ping(self.conn.session()) - - def testOpen(self): - self.conn = Connection(self.broker, **self.connection_options()) - self.conn.open() - self.ping(self.conn.session()) - - def testOpenReconnectURLs(self): - options = self.connection_options() - options["reconnect_urls"] = [self.broker, self.broker] - self.conn = Connection(self.broker, **options) - self.conn.open() - self.ping(self.conn.session()) - - def testTcpNodelay(self): - self.conn = Connection.establish(self.broker, tcp_nodelay=True) - assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) - - def testConnectError(self): - try: - # Specifying port 0 yields a bad address on Windows; port 4 is unassigned - self.conn = Connection.establish("localhost:4") - assert False, "connect succeeded" - except ConnectError, e: - assert "refused" in str(e) - - def testGetError(self): - self.conn = Connection("localhost:0") - try: - self.conn.open() - assert False, "connect succeeded" - except ConnectError, e: - assert self.conn.get_error() == e - - def use_fds(self): - fds = [] - try: - while True: - fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY)) - except OSError, e: - if e.errno != errno.EMFILE: - raise e - else: - return fds - - def testOpenCloseResourceLeaks(self): - fds = self.use_fds() - try: - for i in range(32): - if fds: os.close(fds.pop()) - for i in xrange(64): - conn = Connection.establish(self.broker, **self.connection_options()) - conn.close() - finally: - while fds: - os.close(fds.pop()) - - def testOpenFailResourceLeaks(self): - fds = self.use_fds() - try: - for i in range(32): - if fds: os.close(fds.pop()) - for i in xrange(64): - conn = Connection("localhost:0", **self.connection_options()) - # XXX: we need to force a waiter to be created for this test - # to work - conn._lock.acquire() - conn._wait(lambda: False, timeout=0.001) - conn._lock.release() - try: - conn.open() - except ConnectError, e: - pass - finally: - while fds: - os.close(fds.pop()) - - def testReconnect(self): - options = self.connection_options() - real = TRANSPORTS["tcp"] - - class flaky: - - def __init__(self, conn, host, port): - self.real = real(conn, host, port) - self.sent_count = 0 - self.recv_count = 0 - - def fileno(self): - return self.real.fileno() - - def reading(self, reading): - return self.real.reading(reading) - - def writing(self, writing): - return self.real.writing(writing) - - def send(self, bytes): - if self.sent_count > 2048: - raise socket.error("fake error") - n = self.real.send(bytes) - self.sent_count += n - return n - - def recv(self, n): - if self.recv_count > 2048: - return "" - bytes = self.real.recv(n) - self.recv_count += len(bytes) - return bytes - - def close(self): - self.real.close() - - TRANSPORTS["flaky"] = flaky - - options["reconnect"] = True - options["reconnect_interval"] = 0 - options["reconnect_limit"] = 100 - options["reconnect_log"] = False - options["transport"] = "flaky" - - self.conn = Connection.establish(self.broker, **options) - ssn = self.conn.session() - snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") - rcv = ssn.receiver(snd.target) - - msgs = [self.message("testReconnect", i) for i in range(20)] - for m in msgs: - snd.send(m) - - content = set() - drained = [] - duplicates = [] - try: - while True: - m = rcv.fetch(timeout=0) - if m.content not in content: - content.add(m.content) - drained.append(m) - else: - duplicates.append(m) - ssn.acknowledge(m) - except Empty: - pass - # XXX: apparently we don't always get duplicates, should figure out why - #assert duplicates, "no duplicates" - assert len(drained) == len(msgs) - for m, d in zip(msgs, drained): - # XXX: we should figure out how to provide proper end to end - # redelivered - self.assertEcho(m, d, d.redelivered) - -class ConnectionTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def testCheckClosed(self): - assert not self.conn.check_closed() - - def testSessionAnon(self): - ssn1 = self.conn.session() - ssn2 = self.conn.session() - self.ping(ssn1) - self.ping(ssn2) - assert ssn1 is not ssn2 - - def testSessionNamed(self): - ssn1 = self.conn.session("one") - ssn2 = self.conn.session("two") - self.ping(ssn1) - self.ping(ssn2) - assert ssn1 is not ssn2 - assert ssn1 is self.conn.session("one") - assert ssn2 is self.conn.session("two") - - def testDetach(self): - ssn = self.conn.session() - self.ping(ssn) - self.conn.detach() - try: - self.ping(ssn) - assert False, "ping succeeded" - except Detached: - # this is the expected failure when pinging on a detached - # connection - pass - self.conn.attach() - self.ping(ssn) - - def testClose(self): - self.conn.close() - assert not self.conn.attached() - - def testSimultaneousClose(self): - ssns = [self.conn.session() for i in range(3)] - for s in ssns: - for i in range(3): - s.receiver("amq.topic") - s.sender("amq.topic") - - def closer(errors): - try: - self.conn.close() - except: - _, e, _ = sys.exc_info() - errors.append(compat.format_exc(e)) - - t1_errors = [] - t2_errors = [] - t1 = Thread(target=lambda: closer(t1_errors)) - t2 = Thread(target=lambda: closer(t2_errors)) - t1.start() - t2.start() - t1.join(self.delay()) - t2.join(self.delay()) - - assert not t1_errors, t1_errors[0] - assert not t2_errors, t2_errors[0] - -class hangable: - - def __init__(self, conn, host, port): - self.tcp = TRANSPORTS["tcp"](conn, host, port) - self.hung = False - - def hang(self): - self.hung = True - - def fileno(self): - return self.tcp.fileno() - - def reading(self, reading): - if self.hung: - return True - else: - return self.tcp.reading(reading) - - def writing(self, writing): - if self.hung: - return False - else: - return self.tcp.writing(writing) - - def send(self, bytes): - if self.hung: - return 0 - else: - return self.tcp.send(bytes) - - def recv(self, n): - if self.hung: - return "" - else: - return self.tcp.recv(n) - - def close(self): - self.tcp.close() - -TRANSPORTS["hangable"] = hangable - -class TimeoutTests(Base): - - def setup_connection(self): - options = self.connection_options() - options["transport"] = "hangable" - return Connection.establish(self.broker, **options) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender("amq.topic") - - def setup_receiver(self): - return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}") - - def teardown_connection(self, conn): - try: - conn.detach(timeout=0) - except Timeout: - pass - - def hang(self): - self.conn._driver._transport.hang() - - def timeoutTest(self, method): - self.hang() - try: - method(timeout=self.delay()) - assert False, "did not time out" - except Timeout: - pass - - def testSenderSync(self): - self.snd.send(self.content("testSenderSync"), sync=False) - self.timeoutTest(self.snd.sync) - - def testSenderClose(self): - self.snd.send(self.content("testSenderClose"), sync=False) - self.timeoutTest(self.snd.close) - - def testReceiverClose(self): - self.timeoutTest(self.rcv.close) - - def testSessionSync(self): - self.snd.send(self.content("testSessionSync"), sync=False) - self.timeoutTest(self.ssn.sync) - - def testSessionClose(self): - self.timeoutTest(self.ssn.close) - - def testConnectionDetach(self): - self.timeoutTest(self.conn.detach) - - def testConnectionClose(self): - self.timeoutTest(self.conn.close) - -ACK_QC = 'test-ack-queue; {create: always}' -ACK_QD = 'test-ack-queue; {delete: always}' - -class SessionTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def testSender(self): - snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', - durable=self.durable()) - snd2 = self.ssn.sender(snd.target, durable=self.durable()) - assert snd is not snd2 - snd2.close() - - content = self.content("testSender") - snd.send(content) - rcv = self.ssn.receiver(snd.target) - msg = rcv.fetch(0) - assert msg.content == content - self.ssn.acknowledge(msg) - - def testReceiver(self): - rcv = self.ssn.receiver('test-rcv-queue; {create: always}') - rcv2 = self.ssn.receiver(rcv.source) - assert rcv is not rcv2 - rcv2.close() - - content = self.content("testReceiver") - snd = self.ssn.sender(rcv.source, durable=self.durable()) - snd.send(content) - msg = rcv.fetch(0) - assert msg.content == content - self.ssn.acknowledge(msg) - snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') - - def testDetachedReceiver(self): - self.conn.detach() - rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") - m = self.content("testDetachedReceiver") - self.conn.attach() - snd = self.ssn.sender("test-dis-rcv-queue") - snd.send(m) - self.drain(rcv, expected=[m]) - - def testNextReceiver(self): - ADDR = 'test-next-rcv-queue; {create: always, delete: always}' - rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - - snd = self.ssn.sender(ADDR) - - msgs = [] - for i in range(10): - content = self.content("testNextReceiver", i) - snd.send(content) - msgs.append(content) - - fetched = [] - try: - while True: - rcv = self.ssn.next_receiver(timeout=self.delay()) - assert rcv in (rcv1, rcv2, rcv3) - assert rcv.available() > 0 - fetched.append(rcv.fetch().content) - except Empty: - pass - assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) - self.ssn.acknowledge() - #we set the capacity to 0 to prevent the deletion of the queue - - #triggered the deletion policy when the first receiver is closed - - #resulting in session exceptions being issued for the remaining - #active subscriptions: - for r in [rcv1, rcv2, rcv3]: - r.capacity = 0 - - # XXX, we need a convenient way to assert that required queues are - # empty on setup, and possibly also to drain queues on teardown - def ackTest(self, acker, ack_capacity=None): - # send a bunch of messages - snd = self.ssn.sender(ACK_QC, durable=self.durable()) - contents = [self.content("ackTest", i) for i in range(15)] - for c in contents: - snd.send(c) - - # drain the queue, verify the messages are there and then close - # without acking - rcv = self.ssn.receiver(ACK_QC) - self.drain(rcv, expected=contents) - self.ssn.close() - - # drain the queue again, verify that they are all the messages - # were requeued, and ack this time before closing - self.ssn = self.conn.session() - if ack_capacity is not None: - self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver(ACK_QC) - self.drain(rcv, expected=contents) - acker(self.ssn) - self.ssn.close() - - # drain the queue a final time and verify that the messages were - # dequeued - self.ssn = self.conn.session() - rcv = self.ssn.receiver(ACK_QD) - self.assertEmpty(rcv) - - def testAcknowledge(self): - self.ackTest(lambda ssn: ssn.acknowledge()) - - def testAcknowledgeAsync(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) - - def testAcknowledgeAsyncAckCap0(self): - try: - try: - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) - assert False, "acknowledge shouldn't succeed with ack_capacity of zero" - except InsufficientCapacity: - pass - finally: - self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver(ACK_QD)) - self.ssn.acknowledge() - - def testAcknowledgeAsyncAckCap1(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) - - def testAcknowledgeAsyncAckCap5(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) - - def testAcknowledgeAsyncAckCapUNLIMITED(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - - def testRelease(self): - msgs = [self.message("testRelease", i) for i in range(3)] - snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") - for m in msgs: - snd.send(m) - rcv = self.ssn.receiver(snd.target) - echos = self.drain(rcv, expected=msgs) - self.ssn.acknowledge(echos[0]) - self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) - self.ssn.acknowledge(echos[2], Disposition(RELEASED)) - self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) - self.drain(rcv, expected=msgs[2:3]) - self.ssn.acknowledge() - - def testReject(self): - msgs = [self.message("testReject", i) for i in range(3)] - snd = self.ssn.sender(""" - test-reject-queue; { - create: always, - delete: always, - node: { - x-declare: { - alternate-exchange: 'amq.topic' - } - } - } -""") - for m in msgs: - snd.send(m) - rcv = self.ssn.receiver(snd.target) - rej = self.ssn.receiver("amq.topic") - echos = self.drain(rcv, expected=msgs) - self.ssn.acknowledge(echos[0]) - self.ssn.acknowledge(echos[1], Disposition(REJECTED)) - self.ssn.acknowledge(echos[2], - Disposition(REJECTED, code=3, text="test-reject")) - self.drain(rej, expected=msgs[1:]) - self.ssn.acknowledge() - - def send(self, ssn, target, base, count=1): - snd = ssn.sender(target, durable=self.durable()) - messages = [] - for i in range(count): - c = self.message(base, i) - snd.send(c) - messages.append(c) - snd.close() - return messages - - def txTest(self, commit): - TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' - TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' - txssn = self.conn.session(transactional=True) - messages = self.send(self.ssn, TX_Q, "txTest", 3) - txrcv = txssn.receiver(TX_Q) - txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) - rcv = self.ssn.receiver(txrcv.source) - copy_rcv = self.ssn.receiver(txsnd.target) - self.assertEmpty(copy_rcv) - for i in range(3): - m = txrcv.fetch(0) - txsnd.send(m) - self.assertEmpty(copy_rcv) - txssn.acknowledge() - if commit: - txssn.commit() - self.assertEmpty(rcv) - self.drain(copy_rcv, expected=messages) - else: - txssn.rollback() - self.drain(rcv, expected=messages, redelivered=True) - self.assertEmpty(copy_rcv) - self.ssn.acknowledge() - - def testCommit(self): - self.txTest(True) - - def testRollback(self): - self.txTest(False) - - def txTestSend(self, commit): - TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' - txssn = self.conn.session(transactional=True) - messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) - rcv = self.ssn.receiver(TX_SEND_Q) - self.assertEmpty(rcv) - - if commit: - txssn.commit() - self.drain(rcv, expected=messages) - self.ssn.acknowledge() - else: - txssn.rollback() - self.assertEmpty(rcv) - txssn.commit() - self.assertEmpty(rcv) - - def testCommitSend(self): - self.txTestSend(True) - - def testRollbackSend(self): - self.txTestSend(False) - - def txTestAck(self, commit): - TX_ACK_QC = 'test-tx-ack-queue; {create: always}' - TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' - txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_QC) - self.assertEmpty(txrcv) - messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - self.drain(txrcv, expected=messages) - - if commit: - txssn.acknowledge() - else: - txssn.rollback() - self.drain(txrcv, expected=messages, redelivered=True) - txssn.acknowledge() - txssn.rollback() - self.drain(txrcv, expected=messages, redelivered=True) - txssn.commit() # commit without ack - self.assertEmpty(txrcv) - - txssn.close() - - txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_QC) - self.drain(txrcv, expected=messages, redelivered=True) - txssn.acknowledge() - txssn.commit() - rcv = self.ssn.receiver(TX_ACK_QD) - self.assertEmpty(rcv) - txssn.close() - self.assertEmpty(rcv) - - def testCommitAck(self): - self.txTestAck(True) - - def testRollbackAck(self): - self.txTestAck(False) - - def testDoubleCommit(self): - ssn = self.conn.session(transactional=True) - snd = ssn.sender("amq.direct") - rcv = ssn.receiver("amq.direct") - msgs = [self.message("testDoubleCommit", i) for i in range(3)] - for m in msgs: - snd.send(m) - ssn.commit() - self.drain(rcv, expected=msgs) - ssn.acknowledge() - ssn.commit() - - def testClose(self): - self.ssn.close() - try: - self.ping(self.ssn) - assert False, "ping succeeded" - except Detached: - pass - -RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' - -class ReceiverTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(RECEIVER_Q) - - def setup_receiver(self): - return self.ssn.receiver(RECEIVER_Q) - - def send(self, base, count = None, sync=True): - content = self.content(base, count) - self.snd.send(content, sync=sync) - return content - - def testFetch(self): - try: - msg = self.rcv.fetch(0) - assert False, "unexpected message: %s" % msg - except Empty: - pass - try: - start = time.time() - msg = self.rcv.fetch(self.delay()) - assert False, "unexpected message: %s" % msg - except Empty: - elapsed = time.time() - start - assert elapsed >= self.delay() - - one = self.send("testFetch", 1) - two = self.send("testFetch", 2) - three = self.send("testFetch", 3) - msg = self.rcv.fetch(0) - assert msg.content == one - msg = self.rcv.fetch(self.delay()) - assert msg.content == two - msg = self.rcv.fetch() - assert msg.content == three - self.ssn.acknowledge() - - def fetchFromClosedTest(self, entry): - entry.close() - try: - msg = self.rcv.fetch(0) - assert False, "unexpected result: %s" % msg - except Empty, e: - assert False, "unexpected exception: %s" % e - except LinkClosed, e: - pass - - def testFetchFromClosedReceiver(self): - self.fetchFromClosedTest(self.rcv) - - def testFetchFromClosedSession(self): - self.fetchFromClosedTest(self.ssn) - - def testFetchFromClosedConnection(self): - self.fetchFromClosedTest(self.conn) - - def fetchFromConcurrentCloseTest(self, entry): - def closer(): - self.sleep() - entry.close() - t = Thread(target=closer) - t.start() - try: - msg = self.rcv.fetch() - assert False, "unexpected result: %s" % msg - except Empty, e: - assert False, "unexpected exception: %s" % e - except LinkClosed, e: - pass - t.join() - - def testFetchFromConcurrentCloseReceiver(self): - self.fetchFromConcurrentCloseTest(self.rcv) - - def testFetchFromConcurrentCloseSession(self): - self.fetchFromConcurrentCloseTest(self.ssn) - - def testFetchFromConcurrentCloseConnection(self): - self.fetchFromConcurrentCloseTest(self.conn) - - def testCapacityIncrease(self): - content = self.send("testCapacityIncrease") - self.sleep() - assert self.rcv.available() == 0 - self.rcv.capacity = UNLIMITED - self.sleep() - assert self.rcv.available() == 1 - msg = self.rcv.fetch(0) - assert msg.content == content - assert self.rcv.available() == 0 - self.ssn.acknowledge() - - def testCapacityDecrease(self): - self.rcv.capacity = UNLIMITED - one = self.send("testCapacityDecrease", 1) - self.sleep() - assert self.rcv.available() == 1 - msg = self.rcv.fetch(0) - assert msg.content == one - - self.rcv.capacity = 0 - - two = self.send("testCapacityDecrease", 2) - self.sleep() - assert self.rcv.available() == 0 - msg = self.rcv.fetch(0) - assert msg.content == two - - self.ssn.acknowledge() - - def capacityTest(self, capacity, threshold=None): - if threshold is not None: - self.rcv.threshold = threshold - self.rcv.capacity = capacity - self.assertAvailable(self.rcv, 0) - - for i in range(2*capacity): - self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False) - self.snd.sync() - self.sleep() - self.assertAvailable(self.rcv) - - first = capacity/2 - second = capacity - first - self.drain(self.rcv, limit = first) - self.sleep() - self.assertAvailable(self.rcv) - self.drain(self.rcv, limit = second) - self.sleep() - self.assertAvailable(self.rcv) - - drained = self.drain(self.rcv) - assert len(drained) == capacity, "%s, %s" % (len(drained), drained) - self.assertAvailable(self.rcv, 0) - - self.ssn.acknowledge() - - def testCapacity5(self): - self.capacityTest(5) - - def testCapacity5Threshold1(self): - self.capacityTest(5, 1) - - def testCapacity10(self): - self.capacityTest(10) - - def testCapacity10Threshold1(self): - self.capacityTest(10, 1) - - def testCapacity100(self): - self.capacityTest(100) - - def testCapacity100Threshold1(self): - self.capacityTest(100, 1) - - def testCapacityUNLIMITED(self): - self.rcv.capacity = UNLIMITED - self.assertAvailable(self.rcv, 0) - - for i in range(10): - self.send("testCapacityUNLIMITED", i) - self.sleep() - self.assertAvailable(self.rcv, 10) - - self.drain(self.rcv) - self.assertAvailable(self.rcv, 0) - - self.ssn.acknowledge() - - def testAvailable(self): - self.rcv.capacity = UNLIMITED - assert self.rcv.available() == 0 - - for i in range(3): - self.send("testAvailable", i) - self.sleep() - assert self.rcv.available() == 3 - - for i in range(3, 10): - self.send("testAvailable", i) - self.sleep() - assert self.rcv.available() == 10 - - self.drain(self.rcv, limit=3) - assert self.rcv.available() == 7 - - self.drain(self.rcv) - assert self.rcv.available() == 0 - - self.ssn.acknowledge() - - def testDoubleClose(self): - m1 = self.content("testDoubleClose", 1) - m2 = self.content("testDoubleClose", 2) - - snd = self.ssn.sender("""test-double-close; { - create: always, - delete: sender, - node: { - type: topic - } -} -""") - r1 = self.ssn.receiver(snd.target) - r2 = self.ssn.receiver(snd.target) - snd.send(m1) - self.drain(r1, expected=[m1]) - self.drain(r2, expected=[m1]) - r1.close() - snd.send(m2) - self.drain(r2, expected=[m2]) - r2.close() - - # XXX: need testClose - - def testMode(self): - msgs = [self.content("testMode", 1), - self.content("testMode", 2), - self.content("testMode", 3)] - - for m in msgs: - self.snd.send(m) - - rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') - rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') - self.drain(rb, expected=msgs) - self.drain(rc, expected=msgs) - rb2 = self.ssn.receiver(rb.source) - self.assertEmpty(rb2) - self.drain(self.rcv, expected=[]) - - # XXX: need testUnsettled() - - def unreliabilityTest(self, mode="unreliable"): - msgs = [self.message("testUnreliable", i) for i in range(3)] - snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}") - rcv = self.ssn.receiver(snd.target) - for m in msgs: - snd.send(m) - - # close without ack on reliable receiver, messages should be requeued - ssn = self.conn.session() - rrcv = ssn.receiver("test-unreliability-queue") - self.drain(rrcv, expected=msgs) - ssn.close() - - # close without ack on unreliable receiver, messages should not be requeued - ssn = self.conn.session() - urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode) - self.drain(urcv, expected=msgs, redelivered=True) - ssn.close() - - self.assertEmpty(rcv) - - def testUnreliable(self): - self.unreliabilityTest(mode="unreliable") - - def testAtMostOnce(self): - self.unreliabilityTest(mode="at-most-once") - -class AddressTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def badOption(self, options, error): - try: - self.ssn.sender("test-bad-options-snd; %s" % options) - assert False - except InvalidOption, e: - assert "error in options: %s" % error == str(e), e - - try: - self.ssn.receiver("test-bad-options-rcv; %s" % options) - assert False - except InvalidOption, e: - assert "error in options: %s" % error == str(e), e - - def testIllegalKey(self): - self.badOption("{create: always, node: " - "{this-property-does-not-exist: 3}}", - "node: this-property-does-not-exist: " - "illegal key") - - def testWrongValue(self): - self.badOption("{create: asdf}", "create: asdf not in " - "('always', 'sender', 'receiver', 'never')") - - def testWrongType1(self): - self.badOption("{node: asdf}", - "node: asdf is not a map") - - def testWrongType2(self): - self.badOption("{node: {durable: []}}", - "node: durable: [] is not a bool") - - def testCreateQueue(self): - snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " - "node: {type: queue, durable: False, " - "x-declare: {auto_delete: true}}}") - content = self.content("testCreateQueue") - snd.send(content) - rcv = self.ssn.receiver("test-create-queue") - self.drain(rcv, expected=[content]) - - def createExchangeTest(self, props=""): - addr = """test-create-exchange; { - create: always, - delete: always, - node: { - type: topic, - durable: False, - x-declare: {auto_delete: true, %s} - } - }""" % props - snd = self.ssn.sender(addr) - snd.send("ping") - rcv1 = self.ssn.receiver("test-create-exchange/first") - rcv2 = self.ssn.receiver("test-create-exchange/first") - rcv3 = self.ssn.receiver("test-create-exchange/second") - for r in (rcv1, rcv2, rcv3): - try: - r.fetch(0) - assert False - except Empty: - pass - msg1 = Message(self.content("testCreateExchange", 1), subject="first") - msg2 = Message(self.content("testCreateExchange", 2), subject="second") - snd.send(msg1) - snd.send(msg2) - self.drain(rcv1, expected=[msg1.content]) - self.drain(rcv2, expected=[msg1.content]) - self.drain(rcv3, expected=[msg2.content]) - - def testCreateExchange(self): - self.createExchangeTest() - - def testCreateExchangeDirect(self): - self.createExchangeTest("type: direct") - - def testCreateExchangeTopic(self): - self.createExchangeTest("type: topic") - - def testDeleteBySender(self): - snd = self.ssn.sender("test-delete; {create: always}") - snd.send("ping") - snd.close() - snd = self.ssn.sender("test-delete; {delete: always}") - snd.send("ping") - snd.close() - try: - self.ssn.sender("test-delete") - except NotFound, e: - assert "no such queue" in str(e) - - def testDeleteByReceiver(self): - rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") - try: - rcv.fetch(0) - except Empty: - pass - rcv.close() - - try: - self.ssn.receiver("test-delete") - assert False - except NotFound, e: - assert "no such queue" in str(e) - - def testDeleteSpecial(self): - snd = self.ssn.sender("amq.topic; {delete: always}") - snd.send("asdf") - try: - snd.close() - assert False, "successfully deleted amq.topic" - except SessionError, e: - assert "Cannot delete default exchange" in str(e) - # XXX: need to figure out close after error - self.conn._remove_session(self.ssn) - - def testNodeBindingsQueue(self): - snd = self.ssn.sender(""" -test-node-bindings-queue; { - create: always, - delete: always, - node: { - x-bindings: [{exchange: "amq.topic", key: "a.#"}, - {exchange: "amq.direct", key: "b"}, - {exchange: "amq.topic", key: "c.*"}] - } -} -""") - snd.send("one") - snd_a = self.ssn.sender("amq.topic/a.foo") - snd_b = self.ssn.sender("amq.direct/b") - snd_c = self.ssn.sender("amq.topic/c.bar") - snd_a.send("two") - snd_b.send("three") - snd_c.send("four") - rcv = self.ssn.receiver("test-node-bindings-queue") - self.drain(rcv, expected=["one", "two", "three", "four"]) - - def testNodeBindingsTopic(self): - rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") - rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") - rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") - rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") - snd = self.ssn.sender(""" -test-node-bindings-topic; { - create: always, - delete: always, - node: { - type: topic, - x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, - {queue: test-node-bindings-topic-queue-a, key: "a.#"}, - {queue: test-node-bindings-topic-queue-b, key: "b"}, - {queue: test-node-bindings-topic-queue-c, key: "c.*"}] - } -} -""") - m1 = Message("one") - m2 = Message(subject="a.foo", content="two") - m3 = Message(subject="b", content="three") - m4 = Message(subject="c.bar", content="four") - snd.send(m1) - snd.send(m2) - snd.send(m3) - snd.send(m4) - self.drain(rcv, expected=[m1, m2, m3, m4]) - self.drain(rcv_a, expected=[m2]) - self.drain(rcv_b, expected=[m3]) - self.drain(rcv_c, expected=[m4]) - - def testLinkBindings(self): - m_a = self.message("testLinkBindings", 1, subject="a") - m_b = self.message("testLinkBindings", 2, subject="b") - - self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") - snd = self.ssn.sender("amq.topic") - - snd.send(m_a) - snd.send(m_b) - snd.close() - - rcv = self.ssn.receiver("test-link-bindings-queue") - self.assertEmpty(rcv) - - snd = self.ssn.sender(""" -amq.topic; { - link: { - x-bindings: [{queue: test-link-bindings-queue, key: a}] - } -} -""") - - snd.send(m_a) - snd.send(m_b) - - self.drain(rcv, expected=[m_a]) - rcv.close() - - rcv = self.ssn.receiver(""" -test-link-bindings-queue; { - link: { - x-bindings: [{exchange: "amq.topic", key: b}] - } -} -""") - - snd.send(m_a) - snd.send(m_b) - - self.drain(rcv, expected=[m_a, m_b]) - - def testSubjectOverride(self): - snd = self.ssn.sender("amq.topic/a") - rcv_a = self.ssn.receiver("amq.topic/a") - rcv_b = self.ssn.receiver("amq.topic/b") - m1 = self.content("testSubjectOverride", 1) - m2 = self.content("testSubjectOverride", 2) - snd.send(m1) - snd.send(Message(subject="b", content=m2)) - self.drain(rcv_a, expected=[m1]) - self.drain(rcv_b, expected=[m2]) - - def testSubjectDefault(self): - m1 = self.content("testSubjectDefault", 1) - m2 = self.content("testSubjectDefault", 2) - snd = self.ssn.sender("amq.topic/a") - rcv = self.ssn.receiver("amq.topic") - snd.send(m1) - snd.send(Message(subject="b", content=m2)) - e1 = rcv.fetch(timeout=0) - e2 = rcv.fetch(timeout=0) - assert e1.subject == "a", "subject: %s" % e1.subject - assert e2.subject == "b", "subject: %s" % e2.subject - self.assertEmpty(rcv) - - def doReliabilityTest(self, reliability, messages, expected): - snd = self.ssn.sender("amq.topic") - rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) - for m in messages: - snd.send(m) - self.conn.detach() - self.conn.attach() - self.drain(rcv, expected=expected) - - def testReliabilityUnreliable(self): - msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] - self.doReliabilityTest("unreliable", msgs, []) - - def testReliabilityAtLeastOnce(self): - msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] - self.doReliabilityTest("at-least-once", msgs, msgs) - - def testLinkName(self): - msgs = [self.message("testLinkName", i) for i in range(3)] - snd = self.ssn.sender("amq.topic") - trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") - qrcv = self.ssn.receiver("test-link-name") - for m in msgs: - snd.send(m) - self.drain(qrcv, expected=msgs) - - def testAssert1(self): - try: - snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") - assert 0, "assertion failed to trigger" - except AssertionFailed, e: - pass - - def testAssert2(self): - snd = self.ssn.sender("amq.topic; {assert: always}") - -NOSUCH_Q = "this-queue-should-not-exist" -UNPARSEABLE_ADDR = "name/subject; {bad options" -UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" - -class AddressErrorTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def senderErrorTest(self, addr, exc, check=lambda e: True): - try: - self.ssn.sender(addr, durable=self.durable()) - assert False, "sender creation succeeded" - except exc, e: - assert check(e), "unexpected error: %s" % compat.format_exc(e) - - def receiverErrorTest(self, addr, exc, check=lambda e: True): - try: - self.ssn.receiver(addr) - assert False, "receiver creation succeeded" - except exc, e: - assert check(e), "unexpected error: %s" % compat.format_exc(e) - - def testNoneTarget(self): - self.senderErrorTest(None, MalformedAddress) - - def testNoneSource(self): - self.receiverErrorTest(None, MalformedAddress) - - def testNoTarget(self): - self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) - - def testNoSource(self): - self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) - - def testUnparseableTarget(self): - self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress, - lambda e: "expecting COLON" in str(e)) - - def testUnparseableSource(self): - self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress, - lambda e: "expecting COLON" in str(e)) - - def testUnlexableTarget(self): - self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress, - lambda e: "unrecognized characters" in str(e)) - - def testUnlexableSource(self): - self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress, - lambda e: "unrecognized characters" in str(e)) - - def testInvalidMode(self): - self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', - InvalidOption, - lambda e: "not in ('browse', 'consume')" in str(e)) - -SENDER_Q = 'test-sender-q; {create: always, delete: always}' - -class SenderTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(SENDER_Q) - - def setup_receiver(self): - return self.ssn.receiver(SENDER_Q) - - def checkContent(self, content): - self.snd.send(content) - msg = self.rcv.fetch(0) - assert msg.content == content - - out = Message(content) - self.snd.send(out) - echo = self.rcv.fetch(0) - assert out.content == echo.content - assert echo.content == msg.content - self.ssn.acknowledge() - - def testSendString(self): - self.checkContent(self.content("testSendString")) - - def testSendList(self): - self.checkContent(["testSendList", 1, 3.14, self.test_id]) - - def testSendMap(self): - self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) - - def asyncTest(self, capacity): - self.snd.capacity = capacity - msgs = [self.content("asyncTest", i) for i in range(15)] - for m in msgs: - self.snd.send(m, sync=False) - self.drain(self.rcv, timeout=self.delay(), expected=msgs) - self.ssn.acknowledge() - - def testSendAsyncCapacity0(self): - try: - self.asyncTest(0) - assert False, "send shouldn't succeed with zero capacity" - except InsufficientCapacity: - # this is expected - pass - - def testSendAsyncCapacity1(self): - self.asyncTest(1) - - def testSendAsyncCapacity5(self): - self.asyncTest(5) - - def testSendAsyncCapacityUNLIMITED(self): - self.asyncTest(UNLIMITED) - - def testCapacityTimeout(self): - self.snd.capacity = 1 - msgs = [] - caught = False - while len(msgs) < 100: - m = self.content("testCapacity", len(msgs)) - try: - self.snd.send(m, sync=False, timeout=0) - msgs.append(m) - except InsufficientCapacity: - caught = True - break - self.snd.sync() - self.drain(self.rcv, expected=msgs) - self.ssn.acknowledge() - assert caught, "did not exceed capacity" |