summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests/messaging/endpoints.py')
-rw-r--r--python/qpid/tests/messaging/endpoints.py1335
1 files changed, 0 insertions, 1335 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
deleted file mode 100644
index db5ec03df2..0000000000
--- a/python/qpid/tests/messaging/endpoints.py
+++ /dev/null
@@ -1,1335 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# setup, usage, teardown, errors(sync), errors(async), stress, soak,
-# boundary-conditions, config
-
-import errno, os, socket, sys, time
-from qpid import compat
-from qpid.compat import set
-from qpid.messaging import *
-from qpid.messaging.transports import TRANSPORTS
-from qpid.tests.messaging import Base
-from threading import Thread
-
-class SetupTests(Base):
-
- def testEstablish(self):
- self.conn = Connection.establish(self.broker, **self.connection_options())
- self.ping(self.conn.session())
-
- def testOpen(self):
- self.conn = Connection(self.broker, **self.connection_options())
- self.conn.open()
- self.ping(self.conn.session())
-
- def testOpenReconnectURLs(self):
- options = self.connection_options()
- options["reconnect_urls"] = [self.broker, self.broker]
- self.conn = Connection(self.broker, **options)
- self.conn.open()
- self.ping(self.conn.session())
-
- def testTcpNodelay(self):
- self.conn = Connection.establish(self.broker, tcp_nodelay=True)
- assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
-
- def testConnectError(self):
- try:
- # Specifying port 0 yields a bad address on Windows; port 4 is unassigned
- self.conn = Connection.establish("localhost:4")
- assert False, "connect succeeded"
- except ConnectError, e:
- assert "refused" in str(e)
-
- def testGetError(self):
- self.conn = Connection("localhost:0")
- try:
- self.conn.open()
- assert False, "connect succeeded"
- except ConnectError, e:
- assert self.conn.get_error() == e
-
- def use_fds(self):
- fds = []
- try:
- while True:
- fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY))
- except OSError, e:
- if e.errno != errno.EMFILE:
- raise e
- else:
- return fds
-
- def testOpenCloseResourceLeaks(self):
- fds = self.use_fds()
- try:
- for i in range(32):
- if fds: os.close(fds.pop())
- for i in xrange(64):
- conn = Connection.establish(self.broker, **self.connection_options())
- conn.close()
- finally:
- while fds:
- os.close(fds.pop())
-
- def testOpenFailResourceLeaks(self):
- fds = self.use_fds()
- try:
- for i in range(32):
- if fds: os.close(fds.pop())
- for i in xrange(64):
- conn = Connection("localhost:0", **self.connection_options())
- # XXX: we need to force a waiter to be created for this test
- # to work
- conn._lock.acquire()
- conn._wait(lambda: False, timeout=0.001)
- conn._lock.release()
- try:
- conn.open()
- except ConnectError, e:
- pass
- finally:
- while fds:
- os.close(fds.pop())
-
- def testReconnect(self):
- options = self.connection_options()
- real = TRANSPORTS["tcp"]
-
- class flaky:
-
- def __init__(self, conn, host, port):
- self.real = real(conn, host, port)
- self.sent_count = 0
- self.recv_count = 0
-
- def fileno(self):
- return self.real.fileno()
-
- def reading(self, reading):
- return self.real.reading(reading)
-
- def writing(self, writing):
- return self.real.writing(writing)
-
- def send(self, bytes):
- if self.sent_count > 2048:
- raise socket.error("fake error")
- n = self.real.send(bytes)
- self.sent_count += n
- return n
-
- def recv(self, n):
- if self.recv_count > 2048:
- return ""
- bytes = self.real.recv(n)
- self.recv_count += len(bytes)
- return bytes
-
- def close(self):
- self.real.close()
-
- TRANSPORTS["flaky"] = flaky
-
- options["reconnect"] = True
- options["reconnect_interval"] = 0
- options["reconnect_limit"] = 100
- options["reconnect_log"] = False
- options["transport"] = "flaky"
-
- self.conn = Connection.establish(self.broker, **options)
- ssn = self.conn.session()
- snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
- rcv = ssn.receiver(snd.target)
-
- msgs = [self.message("testReconnect", i) for i in range(20)]
- for m in msgs:
- snd.send(m)
-
- content = set()
- drained = []
- duplicates = []
- try:
- while True:
- m = rcv.fetch(timeout=0)
- if m.content not in content:
- content.add(m.content)
- drained.append(m)
- else:
- duplicates.append(m)
- ssn.acknowledge(m)
- except Empty:
- pass
- # XXX: apparently we don't always get duplicates, should figure out why
- #assert duplicates, "no duplicates"
- assert len(drained) == len(msgs)
- for m, d in zip(msgs, drained):
- # XXX: we should figure out how to provide proper end to end
- # redelivered
- self.assertEcho(m, d, d.redelivered)
-
-class ConnectionTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def testCheckClosed(self):
- assert not self.conn.check_closed()
-
- def testSessionAnon(self):
- ssn1 = self.conn.session()
- ssn2 = self.conn.session()
- self.ping(ssn1)
- self.ping(ssn2)
- assert ssn1 is not ssn2
-
- def testSessionNamed(self):
- ssn1 = self.conn.session("one")
- ssn2 = self.conn.session("two")
- self.ping(ssn1)
- self.ping(ssn2)
- assert ssn1 is not ssn2
- assert ssn1 is self.conn.session("one")
- assert ssn2 is self.conn.session("two")
-
- def testDetach(self):
- ssn = self.conn.session()
- self.ping(ssn)
- self.conn.detach()
- try:
- self.ping(ssn)
- assert False, "ping succeeded"
- except Detached:
- # this is the expected failure when pinging on a detached
- # connection
- pass
- self.conn.attach()
- self.ping(ssn)
-
- def testClose(self):
- self.conn.close()
- assert not self.conn.attached()
-
- def testSimultaneousClose(self):
- ssns = [self.conn.session() for i in range(3)]
- for s in ssns:
- for i in range(3):
- s.receiver("amq.topic")
- s.sender("amq.topic")
-
- def closer(errors):
- try:
- self.conn.close()
- except:
- _, e, _ = sys.exc_info()
- errors.append(compat.format_exc(e))
-
- t1_errors = []
- t2_errors = []
- t1 = Thread(target=lambda: closer(t1_errors))
- t2 = Thread(target=lambda: closer(t2_errors))
- t1.start()
- t2.start()
- t1.join(self.delay())
- t2.join(self.delay())
-
- assert not t1_errors, t1_errors[0]
- assert not t2_errors, t2_errors[0]
-
-class hangable:
-
- def __init__(self, conn, host, port):
- self.tcp = TRANSPORTS["tcp"](conn, host, port)
- self.hung = False
-
- def hang(self):
- self.hung = True
-
- def fileno(self):
- return self.tcp.fileno()
-
- def reading(self, reading):
- if self.hung:
- return True
- else:
- return self.tcp.reading(reading)
-
- def writing(self, writing):
- if self.hung:
- return False
- else:
- return self.tcp.writing(writing)
-
- def send(self, bytes):
- if self.hung:
- return 0
- else:
- return self.tcp.send(bytes)
-
- def recv(self, n):
- if self.hung:
- return ""
- else:
- return self.tcp.recv(n)
-
- def close(self):
- self.tcp.close()
-
-TRANSPORTS["hangable"] = hangable
-
-class TimeoutTests(Base):
-
- def setup_connection(self):
- options = self.connection_options()
- options["transport"] = "hangable"
- return Connection.establish(self.broker, **options)
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender("amq.topic")
-
- def setup_receiver(self):
- return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}")
-
- def teardown_connection(self, conn):
- try:
- conn.detach(timeout=0)
- except Timeout:
- pass
-
- def hang(self):
- self.conn._driver._transport.hang()
-
- def timeoutTest(self, method):
- self.hang()
- try:
- method(timeout=self.delay())
- assert False, "did not time out"
- except Timeout:
- pass
-
- def testSenderSync(self):
- self.snd.send(self.content("testSenderSync"), sync=False)
- self.timeoutTest(self.snd.sync)
-
- def testSenderClose(self):
- self.snd.send(self.content("testSenderClose"), sync=False)
- self.timeoutTest(self.snd.close)
-
- def testReceiverClose(self):
- self.timeoutTest(self.rcv.close)
-
- def testSessionSync(self):
- self.snd.send(self.content("testSessionSync"), sync=False)
- self.timeoutTest(self.ssn.sync)
-
- def testSessionClose(self):
- self.timeoutTest(self.ssn.close)
-
- def testConnectionDetach(self):
- self.timeoutTest(self.conn.detach)
-
- def testConnectionClose(self):
- self.timeoutTest(self.conn.close)
-
-ACK_QC = 'test-ack-queue; {create: always}'
-ACK_QD = 'test-ack-queue; {delete: always}'
-
-class SessionTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def testSender(self):
- snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
- durable=self.durable())
- snd2 = self.ssn.sender(snd.target, durable=self.durable())
- assert snd is not snd2
- snd2.close()
-
- content = self.content("testSender")
- snd.send(content)
- rcv = self.ssn.receiver(snd.target)
- msg = rcv.fetch(0)
- assert msg.content == content
- self.ssn.acknowledge(msg)
-
- def testReceiver(self):
- rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
- rcv2 = self.ssn.receiver(rcv.source)
- assert rcv is not rcv2
- rcv2.close()
-
- content = self.content("testReceiver")
- snd = self.ssn.sender(rcv.source, durable=self.durable())
- snd.send(content)
- msg = rcv.fetch(0)
- assert msg.content == content
- self.ssn.acknowledge(msg)
- snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
-
- def testDetachedReceiver(self):
- self.conn.detach()
- rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
- m = self.content("testDetachedReceiver")
- self.conn.attach()
- snd = self.ssn.sender("test-dis-rcv-queue")
- snd.send(m)
- self.drain(rcv, expected=[m])
-
- def testNextReceiver(self):
- ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
- rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
-
- snd = self.ssn.sender(ADDR)
-
- msgs = []
- for i in range(10):
- content = self.content("testNextReceiver", i)
- snd.send(content)
- msgs.append(content)
-
- fetched = []
- try:
- while True:
- rcv = self.ssn.next_receiver(timeout=self.delay())
- assert rcv in (rcv1, rcv2, rcv3)
- assert rcv.available() > 0
- fetched.append(rcv.fetch().content)
- except Empty:
- pass
- assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
- self.ssn.acknowledge()
- #we set the capacity to 0 to prevent the deletion of the queue -
- #triggered the deletion policy when the first receiver is closed -
- #resulting in session exceptions being issued for the remaining
- #active subscriptions:
- for r in [rcv1, rcv2, rcv3]:
- r.capacity = 0
-
- # XXX, we need a convenient way to assert that required queues are
- # empty on setup, and possibly also to drain queues on teardown
- def ackTest(self, acker, ack_capacity=None):
- # send a bunch of messages
- snd = self.ssn.sender(ACK_QC, durable=self.durable())
- contents = [self.content("ackTest", i) for i in range(15)]
- for c in contents:
- snd.send(c)
-
- # drain the queue, verify the messages are there and then close
- # without acking
- rcv = self.ssn.receiver(ACK_QC)
- self.drain(rcv, expected=contents)
- self.ssn.close()
-
- # drain the queue again, verify that they are all the messages
- # were requeued, and ack this time before closing
- self.ssn = self.conn.session()
- if ack_capacity is not None:
- self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver(ACK_QC)
- self.drain(rcv, expected=contents)
- acker(self.ssn)
- self.ssn.close()
-
- # drain the queue a final time and verify that the messages were
- # dequeued
- self.ssn = self.conn.session()
- rcv = self.ssn.receiver(ACK_QD)
- self.assertEmpty(rcv)
-
- def testAcknowledge(self):
- self.ackTest(lambda ssn: ssn.acknowledge())
-
- def testAcknowledgeAsync(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
-
- def testAcknowledgeAsyncAckCap0(self):
- try:
- try:
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
- assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
- except InsufficientCapacity:
- pass
- finally:
- self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver(ACK_QD))
- self.ssn.acknowledge()
-
- def testAcknowledgeAsyncAckCap1(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
-
- def testAcknowledgeAsyncAckCap5(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
-
- def testAcknowledgeAsyncAckCapUNLIMITED(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
-
- def testRelease(self):
- msgs = [self.message("testRelease", i) for i in range(3)]
- snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
- for m in msgs:
- snd.send(m)
- rcv = self.ssn.receiver(snd.target)
- echos = self.drain(rcv, expected=msgs)
- self.ssn.acknowledge(echos[0])
- self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
- self.ssn.acknowledge(echos[2], Disposition(RELEASED))
- self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
- self.drain(rcv, expected=msgs[2:3])
- self.ssn.acknowledge()
-
- def testReject(self):
- msgs = [self.message("testReject", i) for i in range(3)]
- snd = self.ssn.sender("""
- test-reject-queue; {
- create: always,
- delete: always,
- node: {
- x-declare: {
- alternate-exchange: 'amq.topic'
- }
- }
- }
-""")
- for m in msgs:
- snd.send(m)
- rcv = self.ssn.receiver(snd.target)
- rej = self.ssn.receiver("amq.topic")
- echos = self.drain(rcv, expected=msgs)
- self.ssn.acknowledge(echos[0])
- self.ssn.acknowledge(echos[1], Disposition(REJECTED))
- self.ssn.acknowledge(echos[2],
- Disposition(REJECTED, code=3, text="test-reject"))
- self.drain(rej, expected=msgs[1:])
- self.ssn.acknowledge()
-
- def send(self, ssn, target, base, count=1):
- snd = ssn.sender(target, durable=self.durable())
- messages = []
- for i in range(count):
- c = self.message(base, i)
- snd.send(c)
- messages.append(c)
- snd.close()
- return messages
-
- def txTest(self, commit):
- TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
- TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
- txssn = self.conn.session(transactional=True)
- messages = self.send(self.ssn, TX_Q, "txTest", 3)
- txrcv = txssn.receiver(TX_Q)
- txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
- rcv = self.ssn.receiver(txrcv.source)
- copy_rcv = self.ssn.receiver(txsnd.target)
- self.assertEmpty(copy_rcv)
- for i in range(3):
- m = txrcv.fetch(0)
- txsnd.send(m)
- self.assertEmpty(copy_rcv)
- txssn.acknowledge()
- if commit:
- txssn.commit()
- self.assertEmpty(rcv)
- self.drain(copy_rcv, expected=messages)
- else:
- txssn.rollback()
- self.drain(rcv, expected=messages, redelivered=True)
- self.assertEmpty(copy_rcv)
- self.ssn.acknowledge()
-
- def testCommit(self):
- self.txTest(True)
-
- def testRollback(self):
- self.txTest(False)
-
- def txTestSend(self, commit):
- TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
- txssn = self.conn.session(transactional=True)
- messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
- rcv = self.ssn.receiver(TX_SEND_Q)
- self.assertEmpty(rcv)
-
- if commit:
- txssn.commit()
- self.drain(rcv, expected=messages)
- self.ssn.acknowledge()
- else:
- txssn.rollback()
- self.assertEmpty(rcv)
- txssn.commit()
- self.assertEmpty(rcv)
-
- def testCommitSend(self):
- self.txTestSend(True)
-
- def testRollbackSend(self):
- self.txTestSend(False)
-
- def txTestAck(self, commit):
- TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
- TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
- txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_QC)
- self.assertEmpty(txrcv)
- messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- self.drain(txrcv, expected=messages)
-
- if commit:
- txssn.acknowledge()
- else:
- txssn.rollback()
- self.drain(txrcv, expected=messages, redelivered=True)
- txssn.acknowledge()
- txssn.rollback()
- self.drain(txrcv, expected=messages, redelivered=True)
- txssn.commit() # commit without ack
- self.assertEmpty(txrcv)
-
- txssn.close()
-
- txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_QC)
- self.drain(txrcv, expected=messages, redelivered=True)
- txssn.acknowledge()
- txssn.commit()
- rcv = self.ssn.receiver(TX_ACK_QD)
- self.assertEmpty(rcv)
- txssn.close()
- self.assertEmpty(rcv)
-
- def testCommitAck(self):
- self.txTestAck(True)
-
- def testRollbackAck(self):
- self.txTestAck(False)
-
- def testDoubleCommit(self):
- ssn = self.conn.session(transactional=True)
- snd = ssn.sender("amq.direct")
- rcv = ssn.receiver("amq.direct")
- msgs = [self.message("testDoubleCommit", i) for i in range(3)]
- for m in msgs:
- snd.send(m)
- ssn.commit()
- self.drain(rcv, expected=msgs)
- ssn.acknowledge()
- ssn.commit()
-
- def testClose(self):
- self.ssn.close()
- try:
- self.ping(self.ssn)
- assert False, "ping succeeded"
- except Detached:
- pass
-
-RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
-
-class ReceiverTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(RECEIVER_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(RECEIVER_Q)
-
- def send(self, base, count = None, sync=True):
- content = self.content(base, count)
- self.snd.send(content, sync=sync)
- return content
-
- def testFetch(self):
- try:
- msg = self.rcv.fetch(0)
- assert False, "unexpected message: %s" % msg
- except Empty:
- pass
- try:
- start = time.time()
- msg = self.rcv.fetch(self.delay())
- assert False, "unexpected message: %s" % msg
- except Empty:
- elapsed = time.time() - start
- assert elapsed >= self.delay()
-
- one = self.send("testFetch", 1)
- two = self.send("testFetch", 2)
- three = self.send("testFetch", 3)
- msg = self.rcv.fetch(0)
- assert msg.content == one
- msg = self.rcv.fetch(self.delay())
- assert msg.content == two
- msg = self.rcv.fetch()
- assert msg.content == three
- self.ssn.acknowledge()
-
- def fetchFromClosedTest(self, entry):
- entry.close()
- try:
- msg = self.rcv.fetch(0)
- assert False, "unexpected result: %s" % msg
- except Empty, e:
- assert False, "unexpected exception: %s" % e
- except LinkClosed, e:
- pass
-
- def testFetchFromClosedReceiver(self):
- self.fetchFromClosedTest(self.rcv)
-
- def testFetchFromClosedSession(self):
- self.fetchFromClosedTest(self.ssn)
-
- def testFetchFromClosedConnection(self):
- self.fetchFromClosedTest(self.conn)
-
- def fetchFromConcurrentCloseTest(self, entry):
- def closer():
- self.sleep()
- entry.close()
- t = Thread(target=closer)
- t.start()
- try:
- msg = self.rcv.fetch()
- assert False, "unexpected result: %s" % msg
- except Empty, e:
- assert False, "unexpected exception: %s" % e
- except LinkClosed, e:
- pass
- t.join()
-
- def testFetchFromConcurrentCloseReceiver(self):
- self.fetchFromConcurrentCloseTest(self.rcv)
-
- def testFetchFromConcurrentCloseSession(self):
- self.fetchFromConcurrentCloseTest(self.ssn)
-
- def testFetchFromConcurrentCloseConnection(self):
- self.fetchFromConcurrentCloseTest(self.conn)
-
- def testCapacityIncrease(self):
- content = self.send("testCapacityIncrease")
- self.sleep()
- assert self.rcv.available() == 0
- self.rcv.capacity = UNLIMITED
- self.sleep()
- assert self.rcv.available() == 1
- msg = self.rcv.fetch(0)
- assert msg.content == content
- assert self.rcv.available() == 0
- self.ssn.acknowledge()
-
- def testCapacityDecrease(self):
- self.rcv.capacity = UNLIMITED
- one = self.send("testCapacityDecrease", 1)
- self.sleep()
- assert self.rcv.available() == 1
- msg = self.rcv.fetch(0)
- assert msg.content == one
-
- self.rcv.capacity = 0
-
- two = self.send("testCapacityDecrease", 2)
- self.sleep()
- assert self.rcv.available() == 0
- msg = self.rcv.fetch(0)
- assert msg.content == two
-
- self.ssn.acknowledge()
-
- def capacityTest(self, capacity, threshold=None):
- if threshold is not None:
- self.rcv.threshold = threshold
- self.rcv.capacity = capacity
- self.assertAvailable(self.rcv, 0)
-
- for i in range(2*capacity):
- self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
- self.snd.sync()
- self.sleep()
- self.assertAvailable(self.rcv)
-
- first = capacity/2
- second = capacity - first
- self.drain(self.rcv, limit = first)
- self.sleep()
- self.assertAvailable(self.rcv)
- self.drain(self.rcv, limit = second)
- self.sleep()
- self.assertAvailable(self.rcv)
-
- drained = self.drain(self.rcv)
- assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
- self.assertAvailable(self.rcv, 0)
-
- self.ssn.acknowledge()
-
- def testCapacity5(self):
- self.capacityTest(5)
-
- def testCapacity5Threshold1(self):
- self.capacityTest(5, 1)
-
- def testCapacity10(self):
- self.capacityTest(10)
-
- def testCapacity10Threshold1(self):
- self.capacityTest(10, 1)
-
- def testCapacity100(self):
- self.capacityTest(100)
-
- def testCapacity100Threshold1(self):
- self.capacityTest(100, 1)
-
- def testCapacityUNLIMITED(self):
- self.rcv.capacity = UNLIMITED
- self.assertAvailable(self.rcv, 0)
-
- for i in range(10):
- self.send("testCapacityUNLIMITED", i)
- self.sleep()
- self.assertAvailable(self.rcv, 10)
-
- self.drain(self.rcv)
- self.assertAvailable(self.rcv, 0)
-
- self.ssn.acknowledge()
-
- def testAvailable(self):
- self.rcv.capacity = UNLIMITED
- assert self.rcv.available() == 0
-
- for i in range(3):
- self.send("testAvailable", i)
- self.sleep()
- assert self.rcv.available() == 3
-
- for i in range(3, 10):
- self.send("testAvailable", i)
- self.sleep()
- assert self.rcv.available() == 10
-
- self.drain(self.rcv, limit=3)
- assert self.rcv.available() == 7
-
- self.drain(self.rcv)
- assert self.rcv.available() == 0
-
- self.ssn.acknowledge()
-
- def testDoubleClose(self):
- m1 = self.content("testDoubleClose", 1)
- m2 = self.content("testDoubleClose", 2)
-
- snd = self.ssn.sender("""test-double-close; {
- create: always,
- delete: sender,
- node: {
- type: topic
- }
-}
-""")
- r1 = self.ssn.receiver(snd.target)
- r2 = self.ssn.receiver(snd.target)
- snd.send(m1)
- self.drain(r1, expected=[m1])
- self.drain(r2, expected=[m1])
- r1.close()
- snd.send(m2)
- self.drain(r2, expected=[m2])
- r2.close()
-
- # XXX: need testClose
-
- def testMode(self):
- msgs = [self.content("testMode", 1),
- self.content("testMode", 2),
- self.content("testMode", 3)]
-
- for m in msgs:
- self.snd.send(m)
-
- rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
- rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
- self.drain(rb, expected=msgs)
- self.drain(rc, expected=msgs)
- rb2 = self.ssn.receiver(rb.source)
- self.assertEmpty(rb2)
- self.drain(self.rcv, expected=[])
-
- # XXX: need testUnsettled()
-
- def unreliabilityTest(self, mode="unreliable"):
- msgs = [self.message("testUnreliable", i) for i in range(3)]
- snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}")
- rcv = self.ssn.receiver(snd.target)
- for m in msgs:
- snd.send(m)
-
- # close without ack on reliable receiver, messages should be requeued
- ssn = self.conn.session()
- rrcv = ssn.receiver("test-unreliability-queue")
- self.drain(rrcv, expected=msgs)
- ssn.close()
-
- # close without ack on unreliable receiver, messages should not be requeued
- ssn = self.conn.session()
- urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode)
- self.drain(urcv, expected=msgs, redelivered=True)
- ssn.close()
-
- self.assertEmpty(rcv)
-
- def testUnreliable(self):
- self.unreliabilityTest(mode="unreliable")
-
- def testAtMostOnce(self):
- self.unreliabilityTest(mode="at-most-once")
-
-class AddressTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def badOption(self, options, error):
- try:
- self.ssn.sender("test-bad-options-snd; %s" % options)
- assert False
- except InvalidOption, e:
- assert "error in options: %s" % error == str(e), e
-
- try:
- self.ssn.receiver("test-bad-options-rcv; %s" % options)
- assert False
- except InvalidOption, e:
- assert "error in options: %s" % error == str(e), e
-
- def testIllegalKey(self):
- self.badOption("{create: always, node: "
- "{this-property-does-not-exist: 3}}",
- "node: this-property-does-not-exist: "
- "illegal key")
-
- def testWrongValue(self):
- self.badOption("{create: asdf}", "create: asdf not in "
- "('always', 'sender', 'receiver', 'never')")
-
- def testWrongType1(self):
- self.badOption("{node: asdf}",
- "node: asdf is not a map")
-
- def testWrongType2(self):
- self.badOption("{node: {durable: []}}",
- "node: durable: [] is not a bool")
-
- def testCreateQueue(self):
- snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
- "node: {type: queue, durable: False, "
- "x-declare: {auto_delete: true}}}")
- content = self.content("testCreateQueue")
- snd.send(content)
- rcv = self.ssn.receiver("test-create-queue")
- self.drain(rcv, expected=[content])
-
- def createExchangeTest(self, props=""):
- addr = """test-create-exchange; {
- create: always,
- delete: always,
- node: {
- type: topic,
- durable: False,
- x-declare: {auto_delete: true, %s}
- }
- }""" % props
- snd = self.ssn.sender(addr)
- snd.send("ping")
- rcv1 = self.ssn.receiver("test-create-exchange/first")
- rcv2 = self.ssn.receiver("test-create-exchange/first")
- rcv3 = self.ssn.receiver("test-create-exchange/second")
- for r in (rcv1, rcv2, rcv3):
- try:
- r.fetch(0)
- assert False
- except Empty:
- pass
- msg1 = Message(self.content("testCreateExchange", 1), subject="first")
- msg2 = Message(self.content("testCreateExchange", 2), subject="second")
- snd.send(msg1)
- snd.send(msg2)
- self.drain(rcv1, expected=[msg1.content])
- self.drain(rcv2, expected=[msg1.content])
- self.drain(rcv3, expected=[msg2.content])
-
- def testCreateExchange(self):
- self.createExchangeTest()
-
- def testCreateExchangeDirect(self):
- self.createExchangeTest("type: direct")
-
- def testCreateExchangeTopic(self):
- self.createExchangeTest("type: topic")
-
- def testDeleteBySender(self):
- snd = self.ssn.sender("test-delete; {create: always}")
- snd.send("ping")
- snd.close()
- snd = self.ssn.sender("test-delete; {delete: always}")
- snd.send("ping")
- snd.close()
- try:
- self.ssn.sender("test-delete")
- except NotFound, e:
- assert "no such queue" in str(e)
-
- def testDeleteByReceiver(self):
- rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
- try:
- rcv.fetch(0)
- except Empty:
- pass
- rcv.close()
-
- try:
- self.ssn.receiver("test-delete")
- assert False
- except NotFound, e:
- assert "no such queue" in str(e)
-
- def testDeleteSpecial(self):
- snd = self.ssn.sender("amq.topic; {delete: always}")
- snd.send("asdf")
- try:
- snd.close()
- assert False, "successfully deleted amq.topic"
- except SessionError, e:
- assert "Cannot delete default exchange" in str(e)
- # XXX: need to figure out close after error
- self.conn._remove_session(self.ssn)
-
- def testNodeBindingsQueue(self):
- snd = self.ssn.sender("""
-test-node-bindings-queue; {
- create: always,
- delete: always,
- node: {
- x-bindings: [{exchange: "amq.topic", key: "a.#"},
- {exchange: "amq.direct", key: "b"},
- {exchange: "amq.topic", key: "c.*"}]
- }
-}
-""")
- snd.send("one")
- snd_a = self.ssn.sender("amq.topic/a.foo")
- snd_b = self.ssn.sender("amq.direct/b")
- snd_c = self.ssn.sender("amq.topic/c.bar")
- snd_a.send("two")
- snd_b.send("three")
- snd_c.send("four")
- rcv = self.ssn.receiver("test-node-bindings-queue")
- self.drain(rcv, expected=["one", "two", "three", "four"])
-
- def testNodeBindingsTopic(self):
- rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
- rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
- rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
- rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
- snd = self.ssn.sender("""
-test-node-bindings-topic; {
- create: always,
- delete: always,
- node: {
- type: topic,
- x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
- {queue: test-node-bindings-topic-queue-a, key: "a.#"},
- {queue: test-node-bindings-topic-queue-b, key: "b"},
- {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
- }
-}
-""")
- m1 = Message("one")
- m2 = Message(subject="a.foo", content="two")
- m3 = Message(subject="b", content="three")
- m4 = Message(subject="c.bar", content="four")
- snd.send(m1)
- snd.send(m2)
- snd.send(m3)
- snd.send(m4)
- self.drain(rcv, expected=[m1, m2, m3, m4])
- self.drain(rcv_a, expected=[m2])
- self.drain(rcv_b, expected=[m3])
- self.drain(rcv_c, expected=[m4])
-
- def testLinkBindings(self):
- m_a = self.message("testLinkBindings", 1, subject="a")
- m_b = self.message("testLinkBindings", 2, subject="b")
-
- self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
- snd = self.ssn.sender("amq.topic")
-
- snd.send(m_a)
- snd.send(m_b)
- snd.close()
-
- rcv = self.ssn.receiver("test-link-bindings-queue")
- self.assertEmpty(rcv)
-
- snd = self.ssn.sender("""
-amq.topic; {
- link: {
- x-bindings: [{queue: test-link-bindings-queue, key: a}]
- }
-}
-""")
-
- snd.send(m_a)
- snd.send(m_b)
-
- self.drain(rcv, expected=[m_a])
- rcv.close()
-
- rcv = self.ssn.receiver("""
-test-link-bindings-queue; {
- link: {
- x-bindings: [{exchange: "amq.topic", key: b}]
- }
-}
-""")
-
- snd.send(m_a)
- snd.send(m_b)
-
- self.drain(rcv, expected=[m_a, m_b])
-
- def testSubjectOverride(self):
- snd = self.ssn.sender("amq.topic/a")
- rcv_a = self.ssn.receiver("amq.topic/a")
- rcv_b = self.ssn.receiver("amq.topic/b")
- m1 = self.content("testSubjectOverride", 1)
- m2 = self.content("testSubjectOverride", 2)
- snd.send(m1)
- snd.send(Message(subject="b", content=m2))
- self.drain(rcv_a, expected=[m1])
- self.drain(rcv_b, expected=[m2])
-
- def testSubjectDefault(self):
- m1 = self.content("testSubjectDefault", 1)
- m2 = self.content("testSubjectDefault", 2)
- snd = self.ssn.sender("amq.topic/a")
- rcv = self.ssn.receiver("amq.topic")
- snd.send(m1)
- snd.send(Message(subject="b", content=m2))
- e1 = rcv.fetch(timeout=0)
- e2 = rcv.fetch(timeout=0)
- assert e1.subject == "a", "subject: %s" % e1.subject
- assert e2.subject == "b", "subject: %s" % e2.subject
- self.assertEmpty(rcv)
-
- def doReliabilityTest(self, reliability, messages, expected):
- snd = self.ssn.sender("amq.topic")
- rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
- for m in messages:
- snd.send(m)
- self.conn.detach()
- self.conn.attach()
- self.drain(rcv, expected=expected)
-
- def testReliabilityUnreliable(self):
- msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
- self.doReliabilityTest("unreliable", msgs, [])
-
- def testReliabilityAtLeastOnce(self):
- msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
- self.doReliabilityTest("at-least-once", msgs, msgs)
-
- def testLinkName(self):
- msgs = [self.message("testLinkName", i) for i in range(3)]
- snd = self.ssn.sender("amq.topic")
- trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
- qrcv = self.ssn.receiver("test-link-name")
- for m in msgs:
- snd.send(m)
- self.drain(qrcv, expected=msgs)
-
- def testAssert1(self):
- try:
- snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
- assert 0, "assertion failed to trigger"
- except AssertionFailed, e:
- pass
-
- def testAssert2(self):
- snd = self.ssn.sender("amq.topic; {assert: always}")
-
-NOSUCH_Q = "this-queue-should-not-exist"
-UNPARSEABLE_ADDR = "name/subject; {bad options"
-UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
-
-class AddressErrorTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def senderErrorTest(self, addr, exc, check=lambda e: True):
- try:
- self.ssn.sender(addr, durable=self.durable())
- assert False, "sender creation succeeded"
- except exc, e:
- assert check(e), "unexpected error: %s" % compat.format_exc(e)
-
- def receiverErrorTest(self, addr, exc, check=lambda e: True):
- try:
- self.ssn.receiver(addr)
- assert False, "receiver creation succeeded"
- except exc, e:
- assert check(e), "unexpected error: %s" % compat.format_exc(e)
-
- def testNoneTarget(self):
- self.senderErrorTest(None, MalformedAddress)
-
- def testNoneSource(self):
- self.receiverErrorTest(None, MalformedAddress)
-
- def testNoTarget(self):
- self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
-
- def testNoSource(self):
- self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
-
- def testUnparseableTarget(self):
- self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
- lambda e: "expecting COLON" in str(e))
-
- def testUnparseableSource(self):
- self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
- lambda e: "expecting COLON" in str(e))
-
- def testUnlexableTarget(self):
- self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress,
- lambda e: "unrecognized characters" in str(e))
-
- def testUnlexableSource(self):
- self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress,
- lambda e: "unrecognized characters" in str(e))
-
- def testInvalidMode(self):
- self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
- InvalidOption,
- lambda e: "not in ('browse', 'consume')" in str(e))
-
-SENDER_Q = 'test-sender-q; {create: always, delete: always}'
-
-class SenderTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(SENDER_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(SENDER_Q)
-
- def checkContent(self, content):
- self.snd.send(content)
- msg = self.rcv.fetch(0)
- assert msg.content == content
-
- out = Message(content)
- self.snd.send(out)
- echo = self.rcv.fetch(0)
- assert out.content == echo.content
- assert echo.content == msg.content
- self.ssn.acknowledge()
-
- def testSendString(self):
- self.checkContent(self.content("testSendString"))
-
- def testSendList(self):
- self.checkContent(["testSendList", 1, 3.14, self.test_id])
-
- def testSendMap(self):
- self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
-
- def asyncTest(self, capacity):
- self.snd.capacity = capacity
- msgs = [self.content("asyncTest", i) for i in range(15)]
- for m in msgs:
- self.snd.send(m, sync=False)
- self.drain(self.rcv, timeout=self.delay(), expected=msgs)
- self.ssn.acknowledge()
-
- def testSendAsyncCapacity0(self):
- try:
- self.asyncTest(0)
- assert False, "send shouldn't succeed with zero capacity"
- except InsufficientCapacity:
- # this is expected
- pass
-
- def testSendAsyncCapacity1(self):
- self.asyncTest(1)
-
- def testSendAsyncCapacity5(self):
- self.asyncTest(5)
-
- def testSendAsyncCapacityUNLIMITED(self):
- self.asyncTest(UNLIMITED)
-
- def testCapacityTimeout(self):
- self.snd.capacity = 1
- msgs = []
- caught = False
- while len(msgs) < 100:
- m = self.content("testCapacity", len(msgs))
- try:
- self.snd.send(m, sync=False, timeout=0)
- msgs.append(m)
- except InsufficientCapacity:
- caught = True
- break
- self.snd.sync()
- self.drain(self.rcv, expected=msgs)
- self.ssn.acknowledge()
- assert caught, "did not exceed capacity"