diff options
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/setup.py | 2 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/management.py | 43 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/msg_groups.py | 72 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/new_api.py | 5 |
4 files changed, 113 insertions, 9 deletions
diff --git a/tests/setup.py b/tests/setup.py index b8e952998d..36e7a0531a 100755 --- a/tests/setup.py +++ b/tests/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tests", - version="0.15", + version="0.17", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9", diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py index 867210b11d..2dd2291b2e 100644 --- a/tests/src/py/qpid_tests/broker_0_10/management.py +++ b/tests/src/py/qpid_tests/broker_0_10/management.py @@ -24,8 +24,17 @@ from threading import Condition from time import sleep import qmf.console import qpid.messaging +from qpidtoollibs import BrokerAgent class ManagementTest (TestBase010): + + def setup_access(self): + if 'broker_agent' not in self.__dict__: + self.conn2 = qpid.messaging.Connection(self.broker) + self.conn2.open() + self.broker_agent = BrokerAgent(self.conn2) + return self.broker_agent + """ Tests for the management hooks """ @@ -376,6 +385,30 @@ class ManagementTest (TestBase010): # Cleanup for e in ["A", "B"]: session.exchange_delete(exchange=e) + def test_reroute_invalid_alt_exchange(self): + """ + Test that an error is returned for an attempt to reroute to + alternate exchange on a queue for which no such exchange has + been defined. + """ + self.startQmf() + session = self.session + # create queue with no alt-exchange, and send a message to it + session.queue_declare(queue="q", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="q") + session.message_transfer(message=Message(props, "don't reroute me!")) + + # attempt to reroute the message to alt-exchange + q = self.qmf.getObjects(_class="queue", name="q")[0] + result = q.reroute(1, True, "", {}) + # verify the attempt fails... + self.assertEqual(result.status, 4) #invalid parameter + + # ...and message is still on the queue + self.subscribe(destination="d", queue="q") + self.assertEqual("don't reroute me!", session.incoming("d").get(timeout=1).body) + + def test_methods_async (self): """ """ @@ -559,12 +592,18 @@ class ManagementTest (TestBase010): """ Test message in/out stats for connection """ - self.startQmf() + agent = self.setup_access() conn = self.connect() session = conn.session("stats-session") #using qmf find named session and the corresponding connection: - conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_ + conn_qmf = None + sessions = agent.getAllSessions() + for s in sessions: + if s.name == "stats-session": + conn_qmf = agent.getConnection(s.connectionRef) + + assert(conn_qmf) #send a message to a queue session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True) diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index 938d3b3ee2..ace7611a2f 100644 --- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -193,6 +193,10 @@ class MultiConsumerMsgGroupTests(Base): s2 = self.setup_session() b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) + m2 = b1.fetch(0); + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 0 + # C1 should acquire A-0 m1 = c1.fetch(0); @@ -202,10 +206,6 @@ class MultiConsumerMsgGroupTests(Base): ## Queue = A-0, B-1, A-2, b-3, C-4 ## Owners= ^C1, ---, +C1, ---, --- - m2 = b1.fetch(0); - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 0 - m2 = b1.fetch(0) assert m2.properties['THE-GROUP'] == 'B' assert m2.content['index'] == 1 @@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base): snd.close() + def test_ttl_expire(self): + """ Verify that expired (TTL) group messages are skipped correctly + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") + + groups = ["A","B","C","A","B","C"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + if m.properties['THE-GROUP'] == 'B': + m.ttl = 1; + snd.send(m) + + sleep(2) # let all B's expire + + # create consumers on separate sessions: C1,C2 + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":0}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":0}) + + # C1 should acquire A-0, then C2 should acquire C-2, Group B should + # expire and never be fetched + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 2 + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 3 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 5 + + # there should be no more left for either consumer + try: + mx = c1.fetch(0) + assert False # should never get here + except Empty: + pass + try: + mx = c2.fetch(0) + assert False # should never get here + except Empty: + pass + + c1.session.acknowledge() + c2.session.acknowledge() + c1.close() + c2.close() + snd.close() + + class StickyConsumerMsgGroupTests(Base): """ Tests for the behavior of sticky-consumer message groups. These tests diff --git a/tests/src/py/qpid_tests/broker_0_10/new_api.py b/tests/src/py/qpid_tests/broker_0_10/new_api.py index 0bbcf342bc..05c4815e57 100644 --- a/tests/src/py/qpid_tests/broker_0_10/new_api.py +++ b/tests/src/py/qpid_tests/broker_0_10/new_api.py @@ -57,7 +57,7 @@ class GeneralTests(Base): sess2 = self.setup_session() tx = sess1.sender("amq.direct/key") - rx_main = sess1.receiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}") + rx_main = sess1.receiver("amq.direct/key;{link:{reliability:at-least-once,x-declare:{alternate-exchange:'amq.fanout'}}}") rx_alt = sess2.receiver("amq.fanout") rx_alt.capacity = 10 @@ -74,7 +74,7 @@ class GeneralTests(Base): self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange") sess1.close() - + sleep(1) self.assertEqual(rx_alt.available(), 5, "All 5 messages should have been routed to the alt_exchange") sess2.close() @@ -108,6 +108,7 @@ class GeneralTests(Base): # Close sess1; This will cause the queue to be deleted sess1.close() + sleep(1) self.assertEqual(rx_alt.available(), 2, "2 of the messages should have been routed to the alt_exchange") # Close sess2; This will cause the acquired messages to be requeued and routed to the alternate |