diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
commit | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch) | |
tree | dcfb94e75656c6c239fc3dcb754cd2015126424d /tests | |
parent | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff) | |
download | qpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz |
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/setup.py | 2 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/__init__.py | 1 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py | 125 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/dtx.py | 102 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/exchange.py | 31 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/extensions.py | 58 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/management.py | 154 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/message.py | 165 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/msg_groups.py | 1077 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/priority.py | 20 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/threshold.py | 15 |
11 files changed, 117 insertions, 1633 deletions
diff --git a/tests/setup.py b/tests/setup.py index 2ea7d347e7..5438275b22 100755 --- a/tests/setup.py +++ b/tests/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tests", - version="0.13", + version="0.9", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9", diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py index 7b779df5f4..921786af22 100644 --- a/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -33,4 +33,3 @@ from lvq import * from priority import * from threshold import * from extensions import * -from msg_groups import * diff --git a/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py index 8cbb5793d9..0ffeb57172 100644 --- a/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py +++ b/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py @@ -18,7 +18,7 @@ # import traceback from qpid.queue import Empty -from qpid.datatypes import Message, RangedSet +from qpid.datatypes import Message from qpid.testlib import TestBase010 from qpid.session import SessionException @@ -77,7 +77,13 @@ class AlternateExchangeTests(TestBase010): """ session = self.session #set up a 'dead letter queue': - dlq = self.setup_dlq() + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") #create a queue using the dlq as its alternate exchange: session.queue_declare(queue="delete-me", alternate_exchange="dlq") @@ -230,121 +236,6 @@ class AlternateExchangeTests(TestBase010): self.assertEqual("Three", dlq.get(timeout=1).body) self.assertEmpty(dlq) - def test_queue_delete_loop(self): - """ - Test that if a queue is bound to its own alternate exchange, - then on deletion there is no infinite looping - """ - session = self.session - dlq = self.setup_dlq() - - #create a queue using the dlq as its alternate exchange: - session.queue_declare(queue="delete-me", alternate_exchange="dlq") - #bind that queue to the dlq as well: - session.exchange_bind(exchange="dlq", queue="delete-me") - #send it some messages: - dp=self.session.delivery_properties(routing_key="delete-me") - for m in ["One", "Two", "Three"]: - session.message_transfer(message=Message(dp, m)) - #delete it: - session.queue_delete(queue="delete-me") - #cleanup: - session.exchange_delete(exchange="dlq") - - #check the messages were delivered to the dlq: - for m in ["One", "Two", "Three"]: - self.assertEqual(m, dlq.get(timeout=1).body) - self.assertEmpty(dlq) - - def test_queue_delete_no_match(self): - """ - Test that on queue deletion, if the queues own alternate - exchange cannot find a match for the message, the - alternate-exchange of that exchange will be tried. Note: - though the spec rules out going to the alternate-exchanges - alternate exchange when sending to an exchange, it does not - cover this case. - """ - session = self.session - dlq = self.setup_dlq() - - #setu up an 'intermediary' exchange - session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") - - #create a queue using the intermediary as its alternate exchange: - session.queue_declare(queue="delete-me", alternate_exchange="my-exchange") - #bind that queue to the dlq as well: - session.exchange_bind(exchange="dlq", queue="delete-me") - #send it some messages: - dp=self.session.delivery_properties(routing_key="delete-me") - for m in ["One", "Two", "Three"]: - session.message_transfer(message=Message(dp, m)) - - #delete it: - session.queue_delete(queue="delete-me") - #cleanup: - session.exchange_delete(exchange="my-exchange") - session.exchange_delete(exchange="dlq") - - #check the messages were delivered to the dlq: - for m in ["One", "Two", "Three"]: - self.assertEqual(m, dlq.get(timeout=1).body) - self.assertEmpty(dlq) - - def test_reject_no_match(self): - """ - Test that on rejecting a message, if the queues own alternate - exchange cannot find a match for the message, the - alternate-exchange of that exchange will be tried. Note: - though the spec rules out going to the alternate-exchanges - alternate exchange when sending to an exchange, it does not - cover this case. - """ - session = self.session - dlq = self.setup_dlq() - - #setu up an 'intermediary' exchange - session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") - - #create a queue using the intermediary as its alternate exchange: - session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True) - #bind that queue to the dlq as well: - session.exchange_bind(exchange="dlq", queue="delivery-queue") - #send it some messages: - dp=self.session.delivery_properties(routing_key="delivery-queue") - for m in ["One", "Two", "Three"]: - session.message_transfer(message=Message(dp, m)) - - #get and reject those messages: - session.message_subscribe(destination="a", queue="delivery-queue") - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - incoming = session.incoming("a") - for m in ["One", "Two", "Three"]: - msg = incoming.get(timeout=1) - self.assertEqual(m, msg.body) - session.message_reject(RangedSet(msg.id)) - session.message_cancel(destination="a") - - #check the messages were delivered to the dlq: - for m in ["One", "Two", "Three"]: - self.assertEqual(m, dlq.get(timeout=1).body) - self.assertEmpty(dlq) - #cleanup: - session.exchange_delete(exchange="my-exchange") - session.exchange_delete(exchange="dlq") - - def setup_dlq(self): - session = self.session - #set up 'dead-letter' handling: - session.exchange_declare(exchange="dlq", type="fanout") - session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="dlq", queue="deleted") - session.message_subscribe(destination="dlq", queue="deleted") - session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - dlq = session.incoming("dlq") - return dlq def assertEmpty(self, queue): try: diff --git a/tests/src/py/qpid_tests/broker_0_10/dtx.py b/tests/src/py/qpid_tests/broker_0_10/dtx.py index 19a5c6a8d9..2823385a3b 100644 --- a/tests/src/py/qpid_tests/broker_0_10/dtx.py +++ b/tests/src/py/qpid_tests/broker_0_10/dtx.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,7 +36,7 @@ class DtxTests(TestBase010): and the appropriate result verified. The other tests enforce more specific rules and behaviour on a - per-method or per-field basis. + per-method or per-field basis. """ XA_RBROLLBACK = 1 @@ -49,8 +49,8 @@ class DtxTests(TestBase010): self.session = self.conn.session("dtx-session", 1) def test_simple_commit(self): - """ - Test basic one-phase commit behaviour. + """ + Test basic one-phase commit behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -73,8 +73,8 @@ class DtxTests(TestBase010): self.assertMessageId("commit", "queue-b") def test_simple_prepare_commit(self): - """ - Test basic two-phase commit behaviour. + """ + Test basic two-phase commit behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -100,8 +100,8 @@ class DtxTests(TestBase010): def test_simple_rollback(self): - """ - Test basic rollback behaviour. + """ + Test basic rollback behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -123,8 +123,8 @@ class DtxTests(TestBase010): self.assertMessageId("rollback", "queue-a") def test_simple_prepare_rollback(self): - """ - Test basic rollback behaviour after the transaction has been prepared. + """ + Test basic rollback behaviour after the transaction has been prepared. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -146,18 +146,18 @@ class DtxTests(TestBase010): #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") - self.assertMessageId("prepare-rollback", "queue-a") + self.assertMessageId("prepare-rollback", "queue-a") def test_select_required(self): """ check that an error is flagged if select is not issued before - start or end + start or end """ session = self.session tx = self.xid("dummy") try: session.dtx_start(xid=tx) - + #if we get here we have failed, but need to do some cleanup: session.dtx_end(xid=tx) session.dtx_rollback(xid=tx) @@ -197,10 +197,10 @@ class DtxTests(TestBase010): other.close() session1.dtx_end(xid=tx) session1.dtx_rollback(xid=tx) - + #verification: if failed: self.assertEquals(530, error.args[0].error_code) - else: self.fail("Xid already known, expected exception!") + else: self.fail("Xid already known, expected exception!") def test_forget_xid_on_completion(self): """ @@ -210,8 +210,8 @@ class DtxTests(TestBase010): #do some transactional work & complete the transaction self.test_simple_commit() # session has been reset, so reselect for use with dtx - self.session.dtx_select() - + self.session.dtx_select() + #start association for the same xid as the previously completed txn tx = self.xid("my-xid") self.session.dtx_start(xid=tx) @@ -237,9 +237,9 @@ class DtxTests(TestBase010): self.assertEquals(503, e.args[0].error_code) def test_start_join(self): - """ + """ Verify 'join' behaviour, where a session is associated with a - transaction that is already associated with another session. + transaction that is already associated with another session. """ guard = self.keepQueuesAlive(["one", "two"]) #create two sessions & select them for use with dtx: @@ -269,14 +269,14 @@ class DtxTests(TestBase010): #mark end on both sessions session1.dtx_end(xid=tx) session2.dtx_end(xid=tx) - + #commit and check session1.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") self.assertMessageId("b", "one") - + def test_suspend_resume(self): """ @@ -300,7 +300,7 @@ class DtxTests(TestBase010): session.dtx_start(xid=tx, resume=True) self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' session.dtx_end(xid=tx) - + #commit and check session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") @@ -308,7 +308,7 @@ class DtxTests(TestBase010): self.assertMessageId("a", "two") self.assertMessageId("b", "one") - def test_suspend_start_end_resume(self): + def test_suspend_start_end_resume(self): """ Test suspension and resumption of an association with work done on another transaction when the first transaction is @@ -332,7 +332,7 @@ class DtxTests(TestBase010): session.dtx_start(xid=tx, resume=True) self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' session.dtx_end(xid=tx) - + #commit and check session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") @@ -341,10 +341,10 @@ class DtxTests(TestBase010): self.assertMessageId("b", "one") def test_end_suspend_and_fail(self): - """ + """ Verify that the correct error is signalled if the suspend and fail flag are both set when disassociating a transaction from - the session + the session """ session = self.session session.dtx_select() @@ -356,16 +356,16 @@ class DtxTests(TestBase010): except SessionException, e: self.assertEquals(503, e.args[0].error_code) - #cleanup + #cleanup other = self.connect() session = other.session("cleanup", 1) session.dtx_rollback(xid=tx) session.close() other.close() - + def test_end_unknown_xid(self): - """ + """ Verifies that the correct exception is thrown when an attempt is made to end the association for a xid not previously associated with the session @@ -382,7 +382,7 @@ class DtxTests(TestBase010): def test_end(self): """ Verify that the association is terminated by end and subsequent - operations are non-transactional + operations are non-transactional """ guard = self.keepQueuesAlive(["tx-queue"]) session = self.conn.session("alternate", 1) @@ -408,7 +408,7 @@ class DtxTests(TestBase010): session.message_accept(RangedSet(msg.id)) session.close() - session = self.session + session = self.session #commit the transaction and check that the first message (and #only the first message) is then delivered session.dtx_commit(xid=tx, one_phase=True) @@ -418,7 +418,7 @@ class DtxTests(TestBase010): def test_invalid_commit_one_phase_true(self): """ Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. + transaction in question has already been prepared. """ other = self.connect() tester = other.session("tester", 1) @@ -447,7 +447,7 @@ class DtxTests(TestBase010): def test_invalid_commit_one_phase_false(self): """ Test that a commit with one_phase = False is rejected if the - transaction in question has not yet been prepared. + transaction in question has not yet been prepared. """ other = self.connect() tester = other.session("tester", 1) @@ -474,7 +474,7 @@ class DtxTests(TestBase010): def test_invalid_commit_not_ended(self): """ - Test that a commit fails if the xid is still associated with a session. + Test that a commit fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) @@ -502,7 +502,7 @@ class DtxTests(TestBase010): def test_invalid_rollback_not_ended(self): """ - Test that a rollback fails if the xid is still associated with a session. + Test that a rollback fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) @@ -531,7 +531,7 @@ class DtxTests(TestBase010): def test_invalid_prepare_not_ended(self): """ - Test that a prepare fails if the xid is still associated with a session. + Test that a prepare fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) @@ -586,9 +586,9 @@ class DtxTests(TestBase010): session1.dtx_rollback(xid=tx) def test_get_timeout(self): - """ + """ Check that get-timeout returns the correct value, (and that a - transaction with a timeout can complete normally) + transaction with a timeout can complete normally) """ session = self.session tx = self.xid("dummy") @@ -599,12 +599,12 @@ class DtxTests(TestBase010): session.dtx_set_timeout(xid=tx, timeout=60) self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) - self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) - + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) + def test_set_timeout(self): - """ + """ Test the timeout of a transaction results in the expected - behaviour + behaviour """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) @@ -627,7 +627,7 @@ class DtxTests(TestBase010): self.assertMessageId("timeout", "queue-a") #check the correct codes are returned when we try to complete the txn self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) - self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) @@ -649,20 +649,20 @@ class DtxTests(TestBase010): if i in [2, 5, 6, 8]: session.dtx_prepare(xid=tx) prepared.append(tx) - else: + else: session.dtx_rollback(xid=tx) xids = session.dtx_recover().in_doubt - + #rollback the prepared transactions returned by recover for x in xids: - session.dtx_rollback(xid=x) + session.dtx_rollback(xid=x) #validate against the expected list of prepared transactions actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these expected = set([x.global_id for x in prepared]) intersection = actual.intersection(expected) - + if intersection != expected: missing = expected.difference(actual) extra = actual.difference(expected) @@ -723,7 +723,7 @@ class DtxTests(TestBase010): session.message_transfer(message=Message(dp, mp, "DtxMessage")) #start the transaction: - session.dtx_select() + session.dtx_select() self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) #'swap' the message from one queue to the other, under that transaction: @@ -760,7 +760,7 @@ class DtxTests(TestBase010): def getMessageProperty(self, msg, prop): for h in msg.headers: if hasattr(h, prop): return getattr(h, prop) - return None + return None def keepQueuesAlive(self, names): session = self.conn.session("nasty", 99) @@ -768,7 +768,7 @@ class DtxTests(TestBase010): session.queue_declare(queue=n, auto_delete=True) session.message_subscribe(destination=n, queue=n) return session - + def createMessage(self, session, key, id, body): dp=session.delivery_properties(routing_key=key) mp=session.message_properties(correlation_id=id) diff --git a/tests/src/py/qpid_tests/broker_0_10/exchange.py b/tests/src/py/qpid_tests/broker_0_10/exchange.py index db52b36754..9a4cfd37d6 100644 --- a/tests/src/py/qpid_tests/broker_0_10/exchange.py +++ b/tests/src/py/qpid_tests/broker_0_10/exchange.py @@ -226,6 +226,8 @@ class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier): # Test automatic binding by queue name. self.queue_declare(queue="d") self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default queue + self.verifyDirectExchange("") # TODO aconway 2006-09-27: Fill in empty tests: @@ -446,9 +448,9 @@ class MiscellaneousErrorsTests(TestHelper): def testTypeNotKnown(self): try: self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 404 for declaration of unknown exchange type.") + self.fail("Expected 503 for declaration of unknown exchange type.") except SessionException, e: - self.assertEquals(404, e.args[0].error_code) + self.assertEquals(503, e.args[0].error_code) def testDifferentDeclaredType(self): self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") @@ -458,30 +460,7 @@ class MiscellaneousErrorsTests(TestHelper): self.fail("Expected 530 for redeclaration of exchange with different type.") except SessionException, e: self.assertEquals(530, e.args[0].error_code) - - def testDefaultAccessBind(self): - try: - self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True) - self.session.exchange_bind(exchange="", queue="my-queue", binding_key="another-key") - self.fail("Expected 542 (invalid-argument) code for bind to default exchange.") - except SessionException, e: - self.assertEquals(542, e.args[0].error_code) - - def testDefaultAccessUnbind(self): - try: - self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True) - self.session.exchange_unbind(exchange="", queue="my-queue", binding_key="my-queue") - self.fail("Expected 542 (invalid-argument) code for unbind from default exchange.") - except SessionException, e: - self.assertEquals(542, e.args[0].error_code) - - def testDefaultAccessDelete(self): - try: - self.session.exchange_delete(exchange="") - self.fail("Expected 542 (invalid-argument) code for delete of default exchange.") - except SessionException, e: - self.assertEquals(542, e.args[0].error_code) - + class ExchangeTests(TestHelper): def testHeadersBindNoMatchArg(self): self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) diff --git a/tests/src/py/qpid_tests/broker_0_10/extensions.py b/tests/src/py/qpid_tests/broker_0_10/extensions.py index 50c0aa3dd1..26ea3cb0e9 100644 --- a/tests/src/py/qpid_tests/broker_0_10/extensions.py +++ b/tests/src/py/qpid_tests/broker_0_10/extensions.py @@ -20,8 +20,6 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content from qpid.testlib import TestBase010 -from qpid.session import SessionException -from qpid.datatypes import uuid4 from time import sleep class ExtensionTests(TestBase010): @@ -30,58 +28,10 @@ class ExtensionTests(TestBase010): def test_timed_autodelete(self): session = self.session session2 = self.conn.session("another-session") - name=str(uuid4()) - session2.queue_declare(queue=name, exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":3}) + session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5}) session2.close() - result = session.queue_query(queue=name) - self.assertEqual(name, result.queue) + result = session.queue_query(queue="my-queue") + self.assertEqual("my-queue", result.queue) sleep(5) - result = session.queue_query(queue=name) + result = session.queue_query(queue="my-queue") self.assert_(not result.queue) - - def valid_policy_args(self, args, name="test-queue"): - try: - self.session.queue_declare(queue=name, arguments=args) - self.session.queue_delete(queue=name) # cleanup - except SessionException, e: - self.fail("declare with valid policy args failed: %s" % (args)) - self.session = self.conn.session("replacement", 2) - - def invalid_policy_args(self, args, name="test-queue"): - # go through invalid declare attempts twice to make sure that - # the queue doesn't actually get created first time around - # even if exception is thrown - for i in range(1, 3): - try: - self.session.queue_declare(queue=name, arguments=args) - self.session.queue_delete(queue=name) # cleanup - self.fail("declare with invalid policy args suceeded: %s (iteration %d)" % (args, i)) - except SessionException, e: - self.session = self.conn.session(str(uuid4())) - - def test_policy_max_size_as_valid_string(self): - self.valid_policy_args({"qpid.max_size":"3"}) - - def test_policy_max_count_as_valid_string(self): - self.valid_policy_args({"qpid.max_count":"3"}) - - def test_policy_max_count_and_size_as_valid_strings(self): - self.valid_policy_args({"qpid.max_count":"3","qpid.max_size":"0"}) - - def test_policy_negative_count(self): - self.invalid_policy_args({"qpid.max_count":-1}) - - def test_policy_negative_size(self): - self.invalid_policy_args({"qpid.max_size":-1}) - - def test_policy_size_as_invalid_string(self): - self.invalid_policy_args({"qpid.max_size":"foo"}) - - def test_policy_count_as_invalid_string(self): - self.invalid_policy_args({"qpid.max_count":"foo"}) - - def test_policy_size_as_float(self): - self.invalid_policy_args({"qpid.max_size":3.14159}) - - def test_policy_count_as_float(self): - self.invalid_policy_args({"qpid.max_count":"2222222.22222"}) diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py index ac6d7578da..06f3212a6f 100644 --- a/tests/src/py/qpid_tests/broker_0_10/management.py +++ b/tests/src/py/qpid_tests/broker_0_10/management.py @@ -156,7 +156,7 @@ class ManagementTest (TestBase010): queues = self.qmf.getObjects(_class="queue") "Move 10 messages from src-queue to dest-queue" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {}) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) self.assertEqual (result.status, 0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -166,7 +166,7 @@ class ManagementTest (TestBase010): self.assertEqual (dq.msgDepth,10) "Move all remaining messages to destination" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {}) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -176,16 +176,16 @@ class ManagementTest (TestBase010): self.assertEqual (dq.msgDepth,20) "Use a bad source queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {}) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) self.assertEqual (result.status,4) "Use a bad destination queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {}) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) self.assertEqual (result.status,4) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {}) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -225,55 +225,23 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] "Purge top message from purge-queue" - result = pq.purge(1, {}) + result = pq.purge(1) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,19) "Purge top 9 messages from purge-queue" - result = pq.purge(9, {}) + result = pq.purge(9) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,10) "Purge all messages from purge-queue" - result = pq.purge(0, {}) + result = pq.purge(0) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) - def test_reroute_priority_queue(self): - self.startQmf() - session = self.session - - #setup test queue supporting multiple priority levels - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10}) - - #send some messages of varying priority to that queue: - for i in range(0, 5): - deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5) - session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1))) - - - #declare and bind a queue to amq.fanout through which rerouted - #messages can be verified: - session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10}) - session.exchange_bind(queue="rerouted", exchange="amq.fanout") - - #reroute messages from test queue to amq.fanout (and hence to - #rerouted queue): - pq = self.qmf.getObjects(_class="queue", name="test-queue")[0] - result = pq.reroute(0, False, "amq.fanout", {}) - self.assertEqual(result.status, 0) - - #verify messages are all rerouted: - self.subscribe(destination="incoming", queue="rerouted") - incoming = session.incoming("incoming") - for i in range(0, 5): - msg = incoming.get(timeout=1) - self.assertEqual("Message %d" % (5-i), msg.body) - - def test_reroute_queue(self): """ Test ability to reroute messages from the head of a queue. @@ -301,7 +269,7 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] "Reroute top message from reroute-queue to alternate exchange" - result = pq.reroute(1, True, "", {}) + result = pq.reroute(1, True, "") self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] @@ -309,7 +277,7 @@ class ManagementTest (TestBase010): self.assertEqual(aq.msgDepth,1) "Reroute top 9 messages from reroute-queue to alt.direct2" - result = pq.reroute(9, False, "alt.direct2", {}) + result = pq.reroute(9, False, "alt.direct2") self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] @@ -317,11 +285,11 @@ class ManagementTest (TestBase010): self.assertEqual(aq.msgDepth,9) "Reroute using a non-existent exchange" - result = pq.reroute(0, False, "amq.nosuchexchange", {}) + result = pq.reroute(0, False, "amq.nosuchexchange") self.assertEqual(result.status, 4) "Reroute all messages from reroute-queue" - result = pq.reroute(0, False, "alt.direct2", {}) + result = pq.reroute(0, False, "alt.direct2") self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] @@ -337,44 +305,11 @@ class ManagementTest (TestBase010): session.message_transfer(destination="amq.direct", message=msg) "Reroute onto the same queue" - result = pq.reroute(0, False, "amq.direct", {}) + result = pq.reroute(0, False, "amq.direct") self.assertEqual(result.status, 0) pq.update() self.assertEqual(pq.msgDepth,20) - - def test_reroute_alternate_exchange(self): - """ - Test that when rerouting, the alternate-exchange is considered if relevant - """ - self.startQmf() - session = self.session - # 1. Create 2 exchanges A and B (fanout) where B is the - # alternate exchange for A - session.exchange_declare(exchange="B", type="fanout") - session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B") - - # 2. Bind queue X to B - session.queue_declare(queue="X", exclusive=True, auto_delete=True) - session.exchange_bind(queue="X", exchange="B") - - # 3. Send 1 message to queue Y - session.queue_declare(queue="Y", exclusive=True, auto_delete=True) - props = session.delivery_properties(routing_key="Y") - session.message_transfer(message=Message(props, "reroute me!")) - - # 4. Call reroute on queue Y and specify that messages should - # be sent to exchange A - y = self.qmf.getObjects(_class="queue", name="Y")[0] - result = y.reroute(1, False, "A", {}) - self.assertEqual(result.status, 0) - - # 5. verify that the message is rerouted through B (as A has - # no matching bindings) to X - self.subscribe(destination="x", queue="X") - self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body) - - # Cleanup - for e in ["A", "B"]: session.exchange_delete(exchange=e) + def test_methods_async (self): """ @@ -584,63 +519,4 @@ class ManagementTest (TestBase010): conn_qmf.update() self.assertEqual(conn_qmf.msgsToClient, 1) - def test_timestamp_config(self): - """ - Test message timestamping control. - """ - self.startQmf() - conn = self.connect() - session = conn.session("timestamp-session") - - #verify that receive message timestamping is OFF by default - broker = self.qmf.getObjects(_class="broker")[0] - rc = broker.getTimestampConfig() - self.assertEqual(rc.status, 0) - self.assertEqual(rc.text, "OK") - #self.assertEqual(rc.receive, False) - - #try to enable it - rc = broker.setTimestampConfig(True) - self.assertEqual(rc.status, 0) - self.assertEqual(rc.text, "OK") - - rc = broker.getTimestampConfig() - self.assertEqual(rc.status, 0) - self.assertEqual(rc.text, "OK") - self.assertEqual(rc.receive, True) - - #send a message to a queue - session.queue_declare(queue="ts-q", exclusive=True, auto_delete=True) - session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "abc")) - - #receive message from queue, and verify timestamp is present - session.message_subscribe(destination="d", queue="ts-q") - session.message_flow(destination="d", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="d", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - incoming = session.incoming("d") - msg = incoming.get(timeout=1) - self.assertEqual("abc", msg.body) - self.assertEqual(msg.has("delivery_properties"), True) - dp = msg.get("delivery_properties") - assert(dp.timestamp) - - #try to disable it - rc = broker.setTimestampConfig(False) - self.assertEqual(rc.status, 0) - self.assertEqual(rc.text, "OK") - - rc = broker.getTimestampConfig() - self.assertEqual(rc.status, 0) - self.assertEqual(rc.text, "OK") - self.assertEqual(rc.receive, False) - - #send another message to the queue - session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "def")) - - #receive message from queue, and verify timestamp is NOT PRESENT - msg = incoming.get(timeout=1) - self.assertEqual("def", msg.body) - self.assertEqual(msg.has("delivery_properties"), True) - dp = msg.get("delivery_properties") - self.assertEqual(dp.timestamp, None) - + diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py index 6c864bcd13..e80333a1e6 100644 --- a/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/tests/src/py/qpid_tests/broker_0_10/message.py @@ -245,46 +245,26 @@ class MessageTests(TestBase010): self.fail("Got message after cancellation: " + msg) except Empty: None - #cancellation of non-existant consumers should be result in 404s - try: - session.message_cancel(destination="my-consumer") - self.fail("Expected 404 for recancellation of subscription.") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - session = self.conn.session("alternate-session", timeout=10) - try: - session.message_cancel(destination="this-never-existed") - self.fail("Expected 404 for cancellation of unknown subscription.") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) + #cancellation of non-existant consumers should be handled without error + session.message_cancel(destination="my-consumer") + session.message_cancel(destination="this-never-existed") def test_ack(self): """ - Test basic ack/recover behaviour using a combination of implicit and - explicit accept subscriptions. + Test basic ack/recover behaviour """ - self.startQmf() - session1 = self.conn.session("alternate-session", timeout=10) - session1.queue_declare(queue="test-ack-queue", auto_delete=True) - - delivery_properties = session1.delivery_properties(routing_key="test-ack-queue") - for i in ["One", "Two", "Three", "Four", "Five"]: - session1.message_transfer(message=Message(delivery_properties, i)) + session = self.conn.session("alternate-session", timeout=10) + session.queue_declare(queue="test-ack-queue", auto_delete=True) - # verify enqueued message count, use both QMF and session query to verify consistency - self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count) - queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0] - self.assertEquals(queueObj.msgDepth, 5) - self.assertEquals(queueObj.msgTotalEnqueues, 5) - self.assertEquals(queueObj.msgTotalDequeues, 0) + session.message_subscribe(queue = "test-ack-queue", destination = "consumer") + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("consumer") - # subscribe with implied acquire, explicit accept: - session1.message_subscribe(queue = "test-ack-queue", destination = "consumer") - session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFFL) - session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFFL) - queue = session1.incoming("consumer") + delivery_properties = session.delivery_properties(routing_key="test-ack-queue") + for i in ["One", "Two", "Three", "Four", "Five"]: + session.message_transfer(message=Message(delivery_properties, i)) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -298,46 +278,20 @@ class MessageTests(TestBase010): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - # messages should not be on the queue: - self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count) - # QMF shows the dequeues as not having happened yet, since they are have - # not been accepted - queueObj.update() - self.assertEquals(queueObj.msgDepth, 5) - self.assertEquals(queueObj.msgTotalEnqueues, 5) - self.assertEquals(queueObj.msgTotalDequeues, 0) - - session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four - - # QMF should now reflect the accepted messages as being dequeued - self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count) - queueObj.update() - self.assertEquals(queueObj.msgDepth, 2) - self.assertEquals(queueObj.msgTotalEnqueues, 5) - self.assertEquals(queueObj.msgTotalDequeues, 3) - - #subscribe from second session here to ensure queue is not auto-deleted - #when alternate session closes. Use implicit accept mode to test that - #we don't need to explicitly accept - session2 = self.conn.session("alternate-session-2", timeout=10) - session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) - - #now close the first session, and see that the unaccepted messages are - #then redelivered to another subscriber: - session1.close(timeout=10) + session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four - # check the statistics - the queue_query will show the non-accepted - # messages have been released. QMF never considered them dequeued, so - # those counts won't change - self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count) - queueObj.update() - self.assertEquals(queueObj.msgDepth, 2) - self.assertEquals(queueObj.msgTotalEnqueues, 5) - self.assertEquals(queueObj.msgTotalDequeues, 3) + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) - session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFFL) - session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFFL) - queue = session2.incoming("checker") + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("checker") msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) @@ -350,33 +304,6 @@ class MessageTests(TestBase010): self.fail("Got unexpected message: " + extra.body) except Empty: None - self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count) - queueObj.update() - self.assertEquals(queueObj.msgDepth, 0) - self.assertEquals(queueObj.msgTotalEnqueues, 5) - self.assertEquals(queueObj.msgTotalDequeues, 5) - - # Subscribe one last time to keep the queue available, and to verify - # that the implied accept worked by verifying no messages have been - # returned when session2 is closed. - self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker") - - session2.close(timeout=10) - - # check the statistics - they should not have changed - self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count) - queueObj.update() - self.assertEquals(queueObj.msgDepth, 0) - self.assertEquals(queueObj.msgTotalEnqueues, 5) - self.assertEquals(queueObj.msgTotalDequeues, 5) - - self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFFL) - self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) - try: - extra = self.session.incoming("final-checker").get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - def test_reject(self): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") @@ -571,47 +498,6 @@ class MessageTests(TestBase010): msgB = q.get(timeout=10) - def test_window_stop(self): - """ - Ensure window based flow control reacts to stop correctly - """ - session = self.session - #setup subscriber on a test queue - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 1, destination = "c") - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") - - - #send batch of messages to queue - for i in range(0, 10): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1))) - - #retrieve all delivered messages - q = session.incoming("c") - for i in range(0, 5): - msg = q.get(timeout = 1) - session.receiver._completed.add(msg.id)#TODO: this may be done automatically - self.assertDataEquals(session, msg, "Message %d" % (i+1)) - - session.message_stop(destination = "c") - - #now send completions, normally used to move window forward, - #but after a stop should not do so - session.channel.session_completed(session.receiver._completed) - - #check no more messages are sent - self.assertEmpty(q) - - #re-establish window and check remaining messages - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") - for i in range(0, 5): - msg = q.get(timeout = 1) - self.assertDataEquals(session, msg, "Message %d" % (i+6)) - - def test_subscribe_not_acquired(self): """ Test the not-acquired modes works as expected for a simple case @@ -1016,6 +902,7 @@ class MessageTests(TestBase010): assert messages.get(timeout=1).body == "second" self.assertEmpty(messages) + def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py deleted file mode 100644 index 99d11151e8..0000000000 --- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ /dev/null @@ -1,1077 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.messaging import * -from qpid.tests.messaging import Base -import qmf.console - -from time import sleep -# -# Tests the Broker's support for message groups -# - -class MultiConsumerMsgGroupTests(Base): - """ - Tests for the behavior of multi-consumer message groups. These tests allow - a messages from the same group be consumed by multiple different clients as - long as each message is processed "in sequence". See QPID-3346 for - details. - """ - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def test_simple(self): - """ Verify simple acquire/accept actions on a set of grouped - messages shared between two receivers. - """ - ## Create a msg group queue - - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","A","A","B","B","B","C","C","C"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, - - # create consumers on separate sessions: C1,C2 - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - # C1 should acquire A-0, then C2 should acquire B-3 - - m1 = c1.fetch(0); - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - m2 = c2.fetch(0); - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 3 - - # C1 Acknowledge A-0 - c1.session.acknowledge(m1); - - # C2 should next acquire A-1 - m3 = c2.fetch(0); - assert m3.properties['THE-GROUP'] == 'A' - assert m3.content['index'] == 1 - - # C1 should next acquire C-6, since groups A&B are held by c2 - m4 = c1.fetch(0); - assert m4.properties['THE-GROUP'] == 'C' - assert m4.content['index'] == 6 - - ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1, - - # C2 Acknowledge B-3, freeing up the rest of B group - c2.session.acknowledge(m2); - - ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8... - ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1, - - # C1 should now acquire B-4, since it is next "free" - m5 = c1.fetch(0); - assert m5.properties['THE-GROUP'] == 'B' - assert m5.content['index'] == 4 - - ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8... - ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1, - - # C1 acknowledges C-6, freeing the C group - c1.session.acknowledge(m4) - - ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8... - ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, --- - - # C2 should next fetch A-2, followed by C-7 - m7 = c2.fetch(0); - assert m7.properties['THE-GROUP'] == 'A' - assert m7.content['index'] == 2 - - m8 = c2.fetch(0); - assert m8.properties['THE-GROUP'] == 'C' - assert m8.content['index'] == 7 - - ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8... - ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2 - - # have C2 ack all fetched messages, freeing C-8 - c2.session.acknowledge() - - ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8... - ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, --- - - # the next fetch of C2 would get C-8, since B-5 is "owned" - m9 = c2.fetch(0); - assert m9.properties['THE-GROUP'] == 'C' - assert m9.content['index'] == 8 - - ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8... - ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2 - - # C1 acks B-4, freeing B-5 for consumption - c1.session.acknowledge(m5) - - ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8... - ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2 - - # the next fetch of C2 would get B-5 - m10 = c2.fetch(0); - assert m10.properties['THE-GROUP'] == 'B' - assert m10.content['index'] == 5 - - # there should be no more left for C1: - try: - mx = c1.fetch(0) - assert False # should never get here - except Empty: - pass - - c1.session.acknowledge() - c2.session.acknowledge() - c1.close() - c2.close() - snd.close() - - def test_simple_browse(self): - """ Test the behavior of a browsing subscription on a message grouping - queue. - """ - - ## Create a msg group queue - - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - ## Queue = A-0, B-1, A-2, b-3, C-4 - ## Owners= ---, ---, ---, ---, --- - - # create consumer and browser - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - - # C1 should acquire A-0 - - m1 = c1.fetch(0); - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - ## Queue = A-0, B-1, A-2, b-3, C-4 - ## Owners= ^C1, ---, +C1, ---, --- - - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 1 - - # verify that the browser may see A-2, even though its group is owned - # by C1 - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 2 - - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 3 - - # verify the consumer can own groups currently seen by the browser - m3 = c1.fetch(0); - assert m3.properties['THE-GROUP'] == 'B' - assert m3.content['index'] == 1 - - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] == 'C' - assert m2.content['index'] == 4 - - def test_release(self): - """ Verify releasing a message can free its assocated group - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","A","B","B"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 2 - - # C1 release m1, and the first group - - s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True)) - - # C2 should be able to get group 'A', msg 'A-0' now - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 0 - - def test_reject(self): - """ Verify rejecting a message can free its associated group - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","A","B","B"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 2 - - # C1 rejects m1, and the first group is released - s1.acknowledge(m1, Disposition(REJECTED)) - - # C2 should be able to get group 'A', msg 'A-1' now - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 1 - - def test_close(self): - """ Verify behavior when a consumer that 'owns' a group closes. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","A","B","B"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - # C1 will own group A - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # C2 will own group B - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 2 - - # C1 shuffles off the mortal coil... - c1.close(); - - # but the session (s1) remains active, so "A" remains blocked - # from c2, c2 should fetch the next B-3 - - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 3 - - # and there should be no more messages available for C2 - try: - m2 = c2.fetch(0) - assert False # should never get here - except Empty: - pass - - # close session s1, releasing the A group - s1.close() - - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 0 - - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 1 - - # and there should be no more messages now - try: - m2 = c2.fetch(0) - assert False # should never get here - except Empty: - pass - - def test_transaction(self): - """ Verify behavior when using transactions. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","A","B","B","A","B"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - s1 = self.conn.session(transactional=True) - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.conn.session(transactional=True) - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - # C1 gets group A - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # C2 gets group B - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 2 - - s1.acknowledge(m1) # A-0 consumed, A group freed - s2.acknowledge(m2) # B-2 consumed, B group freed - - s1.commit() # A-0 consumption done, A group now free - s2.rollback() # releases B-2, and group B - - ## Q: ["A1","B2","B3","A4","B5"] - - # C2 should be able to get the next A - m3 = c2.fetch(0) - assert m3.properties['THE-GROUP'] == 'A' - assert m3.content['index'] == 1 - - # C1 should be able to get B-2 - m4 = c1.fetch(0) - assert m4.properties['THE-GROUP'] == 'B' - assert m4.content['index'] == 2 - - s2.acknowledge(m3) # C2 consumes A-1 - s1.acknowledge(m4) # C1 consumes B-2 - s1.commit() # C1 consume B-2 occurs, free group B - - ## Q: [["A1",]"B3","A4","B5"] - - # A-1 is still considered owned by C2, since the commit has yet to - # occur, so the next available to C1 would be B-3 - m5 = c1.fetch(0) # B-3 - assert m5.properties['THE-GROUP'] == 'B' - assert m5.content['index'] == 3 - - # and C2 should find A-4 available, since it owns the A group - m6 = c2.fetch(0) # A-4 - assert m6.properties['THE-GROUP'] == 'A' - assert m6.content['index'] == 4 - - s2.acknowledge(m6) # C2 consumes A-4 - - # uh-oh, A-1 and A-4 released, along with A group - s2.rollback() - - ## Q: ["A1",["B3"],"A4","B5"] - m7 = c1.fetch(0) # A-1 is found - assert m7.properties['THE-GROUP'] == 'A' - assert m7.content['index'] == 1 - - ## Q: [["A1"],["B3"],"A4","B5"] - # since C1 "owns" both A and B group, C2 should find nothing available - try: - m8 = c2.fetch(0) - assert False # should not get here - except Empty: - pass - - # C1 next gets A4 - m9 = c1.fetch(0) - assert m9.properties['THE-GROUP'] == 'A' - assert m9.content['index'] == 4 - - s1.acknowledge() - - ## Q: [["A1"],["B3"],["A4"],"B5"] - # even though C1 acknowledges A1,B3, and A4, B5 is still considered - # owned as the commit has yet to take place - try: - m10 = c2.fetch(0) - assert False # should not get here - except Empty: - pass - - # now A1,B3,A4 dequeued, B5 should be free - s1.commit() - - ## Q: ["B5"] - m11 = c2.fetch(0) - assert m11.properties['THE-GROUP'] == 'B' - assert m11.content['index'] == 5 - - s2.acknowledge() - s2.commit() - - def test_send_transaction(self): - """ Verify behavior when sender is using transactions. - """ - ssn = self.conn.session(transactional=True) - snd = ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - msg = Message(content={'index':0}, properties={"THE-GROUP": "A"}) - snd.send(msg) - msg = Message(content={'index':1}, properties={"THE-GROUP": "B"}) - snd.send(msg) - snd.session.commit() - msg = Message(content={'index':2}, properties={"THE-GROUP": "A"}) - snd.send(msg) - - # Queue: [A0,B1, (uncommitted: A2) ] - - s1 = self.conn.session(transactional=True) - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.conn.session(transactional=True) - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - # C1 gets A0, group A - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # C2 gets B2, group B - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 1 - - # Since A2 uncommitted, there should be nothing left to fetch - try: - mX = c1.fetch(0) - assert False # should not get here - except Empty: - pass - try: - mX = c2.fetch(0) - assert False # should not get here - except Empty: - pass - - snd.session.commit() - msg = Message(content={'index':3}, properties={"THE-GROUP": "B"}) - snd.send(msg) - - # Queue: [A2, (uncommitted: B3) ] - - # B3 has yet to be committed, so C2 should see nothing available: - try: - mX = c2.fetch(0) - assert False # should not get here - except Empty: - pass - - # but A2 should be available to C1 - m3 = c1.fetch(0) - assert m3.properties['THE-GROUP'] == 'A' - assert m3.content['index'] == 2 - - # now make B3 available - snd.session.commit() - - # C1 should still be done: - try: - mX = c1.fetch(0) - assert False # should not get here - except Empty: - pass - - # but C2 should find the new B - m4 = c2.fetch(0) - assert m4.properties['THE-GROUP'] == 'B' - assert m4.content['index'] == 3 - - # extra: have C1 rollback, verify C2 finds the released 'A' messages - c1.session.rollback() - - ## Q: ["A0","A2"] - - # C2 should be able to get the next A - m5 = c2.fetch(0) - assert m5.properties['THE-GROUP'] == 'A' - assert m5.content['index'] == 0 - - m6 = c2.fetch(0) - assert m6.properties['THE-GROUP'] == 'A' - assert m6.content['index'] == 2 - - c2.session.acknowledge() - c2.session.commit() - - def test_query(self): - """ Verify the queue query method against message groups - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","C","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - m1 = c1.fetch(0) - m2 = c2.fetch(0) - - # at this point, group A should be owned by C1, group B by C2, and - # group C should be available - - # now setup a QMF session, so we can call methods - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - brokers = self.qmf_session.getObjects(_class="broker") - assert len(brokers) == 1 - broker = brokers[0] - - # verify the query method call's group information - rc = broker.query("queue", "msg-group-q") - assert rc.status == 0 - assert rc.text == "OK" - results = rc.outArgs['results'] - assert 'qpid.message_group_queue' in results - q_info = results['qpid.message_group_queue'] - assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP" - assert 'group_state' in q_info and len(q_info['group_state']) == 3 - for g_info in q_info['group_state']: - assert 'group_id' in g_info - if g_info['group_id'] == "A": - assert g_info['msg_count'] == 3 - assert g_info['consumer'] != "" - elif g_info['group_id'] == "B": - assert g_info['msg_count'] == 2 - assert g_info['consumer'] != "" - elif g_info['group_id'] == "C": - assert g_info['msg_count'] == 2 - assert g_info['consumer'] == "" - else: - assert(False) # should never get here - self.qmf_session.delBroker(self.qmf_broker) - - def test_purge_free(self): - """ Verify we can purge a queue of all messages of a given "unowned" - group. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - # now setup a QMF session, so we can call methods - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] - assert queue - msg_filter = { 'filter_type' : 'header_match_str', - 'filter_params' : { 'header_key' : "THE-GROUP", - 'header_value' : "B" }} - assert queue.msgDepth == 6 - rc = queue.purge(0, msg_filter) - assert rc.status == 0 - queue.update() - assert queue.msgDepth == 4 - - # verify all B's removed.... - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] != 'B' - count += 1 - except Empty: - pass - assert count == 4 - - self.qmf_session.delBroker(self.qmf_broker) - - def test_purge_acquired(self): - """ Verify we can purge messages from an acquired group. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - # acquire group "A" - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # now setup a QMF session, so we can purge group A - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] - assert queue - msg_filter = { 'filter_type' : 'header_match_str', - 'filter_params' : { 'header_key' : "THE-GROUP", - 'header_value' : "A" }} - assert queue.msgDepth == 6 - rc = queue.purge(0, msg_filter) - assert rc.status == 0 - queue.update() - queue.msgDepth == 4 # the pending acquired A still counts! - - # verify all other A's removed.... - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] != 'A' - count += 1 - except Empty: - pass - assert count == 3 # only 3 really available - s1.acknowledge() # ack the consumed A-0 - self.qmf_session.delBroker(self.qmf_broker) - - def test_purge_count(self): - """ Verify we can purge a fixed number of messages from an acquired - group. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - # acquire group "A" - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # now setup a QMF session, so we can purge group A - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] - assert queue - msg_filter = { 'filter_type' : 'header_match_str', - 'filter_params' : { 'header_key' : "THE-GROUP", - 'header_value' : "A" }} - assert queue.msgDepth == 6 - rc = queue.purge(1, msg_filter) - assert rc.status == 0 - queue.update() - queue.msgDepth == 5 # the pending acquired A still counts! - - # verify all other A's removed.... - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - count = 0 - a_count = 0 - try: - while True: - m2 = b1.fetch(0) - if m2.properties['THE-GROUP'] != 'A': - count += 1 - else: - a_count += 1 - except Empty: - pass - assert count == 3 # non-A's - assert a_count == 1 # and one is an A - s1.acknowledge() # ack the consumed A-0 - self.qmf_session.delBroker(self.qmf_broker) - - def test_move_all(self): - """ Verify we can move messages from an acquired group. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - # set up destination queue - rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - # acquire group "A" - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # now setup a QMF session, so we can move what's left of group A - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - brokers = self.qmf_session.getObjects(_class="broker") - assert len(brokers) == 1 - broker = brokers[0] - msg_filter = { 'filter_type' : 'header_match_str', - 'filter_params' : { 'header_key' : "THE-GROUP", - 'header_value' : "A" }} - rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter) - assert rc.status == 0 - - # verify all other A's removed from msg-group-q - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] != 'A' - count += 1 - except Empty: - pass - assert count == 3 # only 3 really available - - # verify the moved A's are at the dest-q - s2 = self.setup_session() - b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 2 or m2.content['index'] == 5 - count += 1 - except Empty: - pass - assert count == 2 # two A's moved - - s1.acknowledge() # ack the consumed A-0 - self.qmf_session.delBroker(self.qmf_broker) - - def test_move_count(self): - """ Verify we can move a fixed number of messages from an acquired group. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - # set up destination queue - rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - # now setup a QMF session, so we can move group B - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - brokers = self.qmf_session.getObjects(_class="broker") - assert len(brokers) == 1 - broker = brokers[0] - msg_filter = { 'filter_type' : 'header_match_str', - 'filter_params' : { 'header_key' : "THE-GROUP", - 'header_value' : "B" }} - rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter) - assert rc.status == 0 - - # verify all B's removed from msg-group-q - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] != 'B' - count += 1 - except Empty: - pass - assert count == 4 - - # verify the moved B's are at the dest-q - s2 = self.setup_session() - b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 1 or m2.content['index'] == 3 - count += 1 - except Empty: - pass - assert count == 2 - - self.qmf_session.delBroker(self.qmf_broker) - - def test_reroute(self): - """ Verify we can reroute messages from an acquired group. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C","A"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - # create a topic exchange for the reroute - rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," + - " node: {type: topic}}") - - # acquire group "A" - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - m1 = c1.fetch(0) - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # now setup a QMF session, so we can reroute group A - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] - assert queue - msg_filter = { 'filter_type' : 'header_match_str', - 'filter_params' : { 'header_key' : "THE-GROUP", - 'header_value' : "A" }} - assert queue.msgDepth == 6 - rc = queue.reroute(0, False, "reroute-q", msg_filter) - assert rc.status == 0 - queue.update() - queue.msgDepth == 4 # the pending acquired A still counts! - - # verify all other A's removed.... - s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) - count = 0 - try: - while True: - m2 = b1.fetch(0) - assert m2.properties['THE-GROUP'] != 'A' - count += 1 - except Empty: - pass - assert count == 3 # only 3 really available - - # and what of reroute-q? - count = 0 - try: - while True: - m2 = rcvr.fetch(0) - assert m2.properties['THE-GROUP'] == 'A' - assert m2.content['index'] == 2 or m2.content['index'] == 5 - count += 1 - except Empty: - pass - assert count == 2 - - s1.acknowledge() # ack the consumed A-0 - self.qmf_session.delBroker(self.qmf_broker) - - def test_queue_delete(self): - """ Test deleting a queue while consumers are active. - """ - - ## Create a msg group queue - - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - groups = ["A","B","A","B","C"] - messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] - index = 0 - for m in messages: - m.content['index'] = index - index += 1 - snd.send(m) - - ## Queue = A-0, B-1, A-2, b-3, C-4 - ## Owners= ---, ---, ---, ---, --- - - # create consumers - s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":0}) - s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":0}) - - # C1 should acquire A-0 - m1 = c1.fetch(0); - assert m1.properties['THE-GROUP'] == 'A' - assert m1.content['index'] == 0 - - # c2 acquires B-1 - m2 = c2.fetch(0) - assert m2.properties['THE-GROUP'] == 'B' - assert m2.content['index'] == 1 - - # with group A and B owned, and C free, delete the - # queue - snd.close() - self.ssn.close() - - def test_default_group_id(self): - """ Verify the queue assigns the default group id should a message - arrive without a group identifier. - """ - snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + - " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'," + - "'qpid.shared_msg_group':1}}}}") - - m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"}) - snd.send(m) - - # now setup a QMF session, so we can call methods - self.qmf_session = qmf.console.Session() - self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) - brokers = self.qmf_session.getObjects(_class="broker") - assert len(brokers) == 1 - broker = brokers[0] - - # grab the group state off the queue, and verify the default group is - # present ("qpid.no-group" is the broker default) - rc = broker.query("queue", "msg-group-q") - assert rc.status == 0 - assert rc.text == "OK" - results = rc.outArgs['results'] - assert 'qpid.message_group_queue' in results - q_info = results['qpid.message_group_queue'] - assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP" - assert 'group_state' in q_info and len(q_info['group_state']) == 1 - g_info = q_info['group_state'][0] - assert 'group_id' in g_info - assert g_info['group_id'] == 'qpid.no-group' - - self.qmf_session.delBroker(self.qmf_broker) - - -class StickyConsumerMsgGroupTests(Base): - """ - Tests for the behavior of sticky-consumer message groups. These tests - expect all messages from the same group be consumed by the same clients. - See QPID-3347 for details. - """ - pass # TBD diff --git a/tests/src/py/qpid_tests/broker_0_10/priority.py b/tests/src/py/qpid_tests/broker_0_10/priority.py index 6a60add97e..3651a1218b 100644 --- a/tests/src/py/qpid_tests/broker_0_10/priority.py +++ b/tests/src/py/qpid_tests/broker_0_10/priority.py @@ -33,13 +33,13 @@ class PriorityTests (Base): def setup_session(self): return self.conn.session() - def prioritised_delivery(self, priorities, levels=10, key="x-qpid-priorities"): + def prioritised_delivery(self, priorities, levels=10): """ Test that message on a queue are delivered in priority order. """ msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] - snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s}}}}" % (key, levels), + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels, durable=self.durable()) for m in msgs: snd.send(m) @@ -50,16 +50,16 @@ class PriorityTests (Base): assert msg.content == expected.content self.ssn.acknowledge(msg) - def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10, level_key="x-qpid-priorities", fairshare_key="x-qpid-fairshare"): + def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10): msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] - limit_policy = "'%s':%s" % (fairshare_key, default_limit) + limit_policy = "x-qpid-fairshare:%s" % default_limit if limits: for k, v in limits.items(): - limit_policy += ", '%s-%s':%s" % (fairshare_key, k, v) + limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v) - snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s, %s}}}}" - % (level_key, levels, limit_policy), + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}" + % (levels, limit_policy), durable=self.durable()) for m in msgs: snd.send(m) @@ -79,18 +79,12 @@ class PriorityTests (Base): def test_prioritised_delivery_1(self): self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10) - def test_prioritised_delivery_with_alias(self): - self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10, key="qpid.priorities") - def test_prioritised_delivery_2(self): self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5) def test_fairshare_1(self): self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]) - def test_fairshare_with_alias(self): - self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,2,3], level_key="qpid.priorities", fairshare_key="qpid.fairshare") - def test_fairshare_2(self): self.fairshare_delivery(priorities = [10 for i in range(30)]) diff --git a/tests/src/py/qpid_tests/broker_0_10/threshold.py b/tests/src/py/qpid_tests/broker_0_10/threshold.py index 6628ae8424..bcd3c507e2 100644 --- a/tests/src/py/qpid_tests/broker_0_10/threshold.py +++ b/tests/src/py/qpid_tests/broker_0_10/threshold.py @@ -60,18 +60,3 @@ class ThresholdTests (Base): def test_alert_size_alias(self): self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)]) - - def test_alert_on_alert_queue(self): - rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}") - rcvQMFv1 = self.ssn.receiver("qpid.management/console.event.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}") - snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'qpid.alert_count':1}}}}") - snd.send(Message("my-message")) - queues = [] - for i in range(2): - event = rcv.fetch() - schema = event.content[0]["_schema_id"] - assert schema["_class_name"] == "queueThresholdExceeded" - values = event.content[0]["_values"] - queues.append(values["qName"]) - assert "ttq" in queues, "expected event for ttq (%s)" % (queues) - |