summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 01:19:00 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 01:19:00 +0000
commitebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch)
treedcfb94e75656c6c239fc3dcb754cd2015126424d /tests
parent5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff)
downloadqpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tests')
-rwxr-xr-xtests/setup.py2
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/__init__.py1
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py125
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/dtx.py102
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/exchange.py31
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/extensions.py58
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/management.py154
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/message.py165
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/msg_groups.py1077
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/priority.py20
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/threshold.py15
11 files changed, 117 insertions, 1633 deletions
diff --git a/tests/setup.py b/tests/setup.py
index 2ea7d347e7..5438275b22 100755
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -20,7 +20,7 @@
from distutils.core import setup
setup(name="qpid-tests",
- version="0.13",
+ version="0.9",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9",
diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py
index 7b779df5f4..921786af22 100644
--- a/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -33,4 +33,3 @@ from lvq import *
from priority import *
from threshold import *
from extensions import *
-from msg_groups import *
diff --git a/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
index 8cbb5793d9..0ffeb57172 100644
--- a/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
+++ b/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
@@ -18,7 +18,7 @@
#
import traceback
from qpid.queue import Empty
-from qpid.datatypes import Message, RangedSet
+from qpid.datatypes import Message
from qpid.testlib import TestBase010
from qpid.session import SessionException
@@ -77,7 +77,13 @@ class AlternateExchangeTests(TestBase010):
"""
session = self.session
#set up a 'dead letter queue':
- dlq = self.setup_dlq()
+ session.exchange_declare(exchange="dlq", type="fanout")
+ session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="dlq", queue="deleted")
+ session.message_subscribe(destination="dlq", queue="deleted")
+ session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ dlq = session.incoming("dlq")
#create a queue using the dlq as its alternate exchange:
session.queue_declare(queue="delete-me", alternate_exchange="dlq")
@@ -230,121 +236,6 @@ class AlternateExchangeTests(TestBase010):
self.assertEqual("Three", dlq.get(timeout=1).body)
self.assertEmpty(dlq)
- def test_queue_delete_loop(self):
- """
- Test that if a queue is bound to its own alternate exchange,
- then on deletion there is no infinite looping
- """
- session = self.session
- dlq = self.setup_dlq()
-
- #create a queue using the dlq as its alternate exchange:
- session.queue_declare(queue="delete-me", alternate_exchange="dlq")
- #bind that queue to the dlq as well:
- session.exchange_bind(exchange="dlq", queue="delete-me")
- #send it some messages:
- dp=self.session.delivery_properties(routing_key="delete-me")
- for m in ["One", "Two", "Three"]:
- session.message_transfer(message=Message(dp, m))
- #delete it:
- session.queue_delete(queue="delete-me")
- #cleanup:
- session.exchange_delete(exchange="dlq")
-
- #check the messages were delivered to the dlq:
- for m in ["One", "Two", "Three"]:
- self.assertEqual(m, dlq.get(timeout=1).body)
- self.assertEmpty(dlq)
-
- def test_queue_delete_no_match(self):
- """
- Test that on queue deletion, if the queues own alternate
- exchange cannot find a match for the message, the
- alternate-exchange of that exchange will be tried. Note:
- though the spec rules out going to the alternate-exchanges
- alternate exchange when sending to an exchange, it does not
- cover this case.
- """
- session = self.session
- dlq = self.setup_dlq()
-
- #setu up an 'intermediary' exchange
- session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
-
- #create a queue using the intermediary as its alternate exchange:
- session.queue_declare(queue="delete-me", alternate_exchange="my-exchange")
- #bind that queue to the dlq as well:
- session.exchange_bind(exchange="dlq", queue="delete-me")
- #send it some messages:
- dp=self.session.delivery_properties(routing_key="delete-me")
- for m in ["One", "Two", "Three"]:
- session.message_transfer(message=Message(dp, m))
-
- #delete it:
- session.queue_delete(queue="delete-me")
- #cleanup:
- session.exchange_delete(exchange="my-exchange")
- session.exchange_delete(exchange="dlq")
-
- #check the messages were delivered to the dlq:
- for m in ["One", "Two", "Three"]:
- self.assertEqual(m, dlq.get(timeout=1).body)
- self.assertEmpty(dlq)
-
- def test_reject_no_match(self):
- """
- Test that on rejecting a message, if the queues own alternate
- exchange cannot find a match for the message, the
- alternate-exchange of that exchange will be tried. Note:
- though the spec rules out going to the alternate-exchanges
- alternate exchange when sending to an exchange, it does not
- cover this case.
- """
- session = self.session
- dlq = self.setup_dlq()
-
- #setu up an 'intermediary' exchange
- session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
-
- #create a queue using the intermediary as its alternate exchange:
- session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
- #bind that queue to the dlq as well:
- session.exchange_bind(exchange="dlq", queue="delivery-queue")
- #send it some messages:
- dp=self.session.delivery_properties(routing_key="delivery-queue")
- for m in ["One", "Two", "Three"]:
- session.message_transfer(message=Message(dp, m))
-
- #get and reject those messages:
- session.message_subscribe(destination="a", queue="delivery-queue")
- session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- incoming = session.incoming("a")
- for m in ["One", "Two", "Three"]:
- msg = incoming.get(timeout=1)
- self.assertEqual(m, msg.body)
- session.message_reject(RangedSet(msg.id))
- session.message_cancel(destination="a")
-
- #check the messages were delivered to the dlq:
- for m in ["One", "Two", "Three"]:
- self.assertEqual(m, dlq.get(timeout=1).body)
- self.assertEmpty(dlq)
- #cleanup:
- session.exchange_delete(exchange="my-exchange")
- session.exchange_delete(exchange="dlq")
-
- def setup_dlq(self):
- session = self.session
- #set up 'dead-letter' handling:
- session.exchange_declare(exchange="dlq", type="fanout")
- session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="dlq", queue="deleted")
- session.message_subscribe(destination="dlq", queue="deleted")
- session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- dlq = session.incoming("dlq")
- return dlq
def assertEmpty(self, queue):
try:
diff --git a/tests/src/py/qpid_tests/broker_0_10/dtx.py b/tests/src/py/qpid_tests/broker_0_10/dtx.py
index 19a5c6a8d9..2823385a3b 100644
--- a/tests/src/py/qpid_tests/broker_0_10/dtx.py
+++ b/tests/src/py/qpid_tests/broker_0_10/dtx.py
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,7 +36,7 @@ class DtxTests(TestBase010):
and the appropriate result verified.
The other tests enforce more specific rules and behaviour on a
- per-method or per-field basis.
+ per-method or per-field basis.
"""
XA_RBROLLBACK = 1
@@ -49,8 +49,8 @@ class DtxTests(TestBase010):
self.session = self.conn.session("dtx-session", 1)
def test_simple_commit(self):
- """
- Test basic one-phase commit behaviour.
+ """
+ Test basic one-phase commit behaviour.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -73,8 +73,8 @@ class DtxTests(TestBase010):
self.assertMessageId("commit", "queue-b")
def test_simple_prepare_commit(self):
- """
- Test basic two-phase commit behaviour.
+ """
+ Test basic two-phase commit behaviour.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -100,8 +100,8 @@ class DtxTests(TestBase010):
def test_simple_rollback(self):
- """
- Test basic rollback behaviour.
+ """
+ Test basic rollback behaviour.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -123,8 +123,8 @@ class DtxTests(TestBase010):
self.assertMessageId("rollback", "queue-a")
def test_simple_prepare_rollback(self):
- """
- Test basic rollback behaviour after the transaction has been prepared.
+ """
+ Test basic rollback behaviour after the transaction has been prepared.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -146,18 +146,18 @@ class DtxTests(TestBase010):
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
- self.assertMessageId("prepare-rollback", "queue-a")
+ self.assertMessageId("prepare-rollback", "queue-a")
def test_select_required(self):
"""
check that an error is flagged if select is not issued before
- start or end
+ start or end
"""
session = self.session
tx = self.xid("dummy")
try:
session.dtx_start(xid=tx)
-
+
#if we get here we have failed, but need to do some cleanup:
session.dtx_end(xid=tx)
session.dtx_rollback(xid=tx)
@@ -197,10 +197,10 @@ class DtxTests(TestBase010):
other.close()
session1.dtx_end(xid=tx)
session1.dtx_rollback(xid=tx)
-
+
#verification:
if failed: self.assertEquals(530, error.args[0].error_code)
- else: self.fail("Xid already known, expected exception!")
+ else: self.fail("Xid already known, expected exception!")
def test_forget_xid_on_completion(self):
"""
@@ -210,8 +210,8 @@ class DtxTests(TestBase010):
#do some transactional work & complete the transaction
self.test_simple_commit()
# session has been reset, so reselect for use with dtx
- self.session.dtx_select()
-
+ self.session.dtx_select()
+
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
self.session.dtx_start(xid=tx)
@@ -237,9 +237,9 @@ class DtxTests(TestBase010):
self.assertEquals(503, e.args[0].error_code)
def test_start_join(self):
- """
+ """
Verify 'join' behaviour, where a session is associated with a
- transaction that is already associated with another session.
+ transaction that is already associated with another session.
"""
guard = self.keepQueuesAlive(["one", "two"])
#create two sessions & select them for use with dtx:
@@ -269,14 +269,14 @@ class DtxTests(TestBase010):
#mark end on both sessions
session1.dtx_end(xid=tx)
session2.dtx_end(xid=tx)
-
+
#commit and check
session1.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
self.assertMessageId("b", "one")
-
+
def test_suspend_resume(self):
"""
@@ -300,7 +300,7 @@ class DtxTests(TestBase010):
session.dtx_start(xid=tx, resume=True)
self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
session.dtx_end(xid=tx)
-
+
#commit and check
session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
@@ -308,7 +308,7 @@ class DtxTests(TestBase010):
self.assertMessageId("a", "two")
self.assertMessageId("b", "one")
- def test_suspend_start_end_resume(self):
+ def test_suspend_start_end_resume(self):
"""
Test suspension and resumption of an association with work
done on another transaction when the first transaction is
@@ -332,7 +332,7 @@ class DtxTests(TestBase010):
session.dtx_start(xid=tx, resume=True)
self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
session.dtx_end(xid=tx)
-
+
#commit and check
session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
@@ -341,10 +341,10 @@ class DtxTests(TestBase010):
self.assertMessageId("b", "one")
def test_end_suspend_and_fail(self):
- """
+ """
Verify that the correct error is signalled if the suspend and
fail flag are both set when disassociating a transaction from
- the session
+ the session
"""
session = self.session
session.dtx_select()
@@ -356,16 +356,16 @@ class DtxTests(TestBase010):
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
- #cleanup
+ #cleanup
other = self.connect()
session = other.session("cleanup", 1)
session.dtx_rollback(xid=tx)
session.close()
other.close()
-
+
def test_end_unknown_xid(self):
- """
+ """
Verifies that the correct exception is thrown when an attempt
is made to end the association for a xid not previously
associated with the session
@@ -382,7 +382,7 @@ class DtxTests(TestBase010):
def test_end(self):
"""
Verify that the association is terminated by end and subsequent
- operations are non-transactional
+ operations are non-transactional
"""
guard = self.keepQueuesAlive(["tx-queue"])
session = self.conn.session("alternate", 1)
@@ -408,7 +408,7 @@ class DtxTests(TestBase010):
session.message_accept(RangedSet(msg.id))
session.close()
- session = self.session
+ session = self.session
#commit the transaction and check that the first message (and
#only the first message) is then delivered
session.dtx_commit(xid=tx, one_phase=True)
@@ -418,7 +418,7 @@ class DtxTests(TestBase010):
def test_invalid_commit_one_phase_true(self):
"""
Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
+ transaction in question has already been prepared.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -447,7 +447,7 @@ class DtxTests(TestBase010):
def test_invalid_commit_one_phase_false(self):
"""
Test that a commit with one_phase = False is rejected if the
- transaction in question has not yet been prepared.
+ transaction in question has not yet been prepared.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -474,7 +474,7 @@ class DtxTests(TestBase010):
def test_invalid_commit_not_ended(self):
"""
- Test that a commit fails if the xid is still associated with a session.
+ Test that a commit fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -502,7 +502,7 @@ class DtxTests(TestBase010):
def test_invalid_rollback_not_ended(self):
"""
- Test that a rollback fails if the xid is still associated with a session.
+ Test that a rollback fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -531,7 +531,7 @@ class DtxTests(TestBase010):
def test_invalid_prepare_not_ended(self):
"""
- Test that a prepare fails if the xid is still associated with a session.
+ Test that a prepare fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -586,9 +586,9 @@ class DtxTests(TestBase010):
session1.dtx_rollback(xid=tx)
def test_get_timeout(self):
- """
+ """
Check that get-timeout returns the correct value, (and that a
- transaction with a timeout can complete normally)
+ transaction with a timeout can complete normally)
"""
session = self.session
tx = self.xid("dummy")
@@ -599,12 +599,12 @@ class DtxTests(TestBase010):
session.dtx_set_timeout(xid=tx, timeout=60)
self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
- self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
-
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
def test_set_timeout(self):
- """
+ """
Test the timeout of a transaction results in the expected
- behaviour
+ behaviour
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
@@ -627,7 +627,7 @@ class DtxTests(TestBase010):
self.assertMessageId("timeout", "queue-a")
#check the correct codes are returned when we try to complete the txn
self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
- self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
@@ -649,20 +649,20 @@ class DtxTests(TestBase010):
if i in [2, 5, 6, 8]:
session.dtx_prepare(xid=tx)
prepared.append(tx)
- else:
+ else:
session.dtx_rollback(xid=tx)
xids = session.dtx_recover().in_doubt
-
+
#rollback the prepared transactions returned by recover
for x in xids:
- session.dtx_rollback(xid=x)
+ session.dtx_rollback(xid=x)
#validate against the expected list of prepared transactions
actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
expected = set([x.global_id for x in prepared])
intersection = actual.intersection(expected)
-
+
if intersection != expected:
missing = expected.difference(actual)
extra = actual.difference(expected)
@@ -723,7 +723,7 @@ class DtxTests(TestBase010):
session.message_transfer(message=Message(dp, mp, "DtxMessage"))
#start the transaction:
- session.dtx_select()
+ session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
#'swap' the message from one queue to the other, under that transaction:
@@ -760,7 +760,7 @@ class DtxTests(TestBase010):
def getMessageProperty(self, msg, prop):
for h in msg.headers:
if hasattr(h, prop): return getattr(h, prop)
- return None
+ return None
def keepQueuesAlive(self, names):
session = self.conn.session("nasty", 99)
@@ -768,7 +768,7 @@ class DtxTests(TestBase010):
session.queue_declare(queue=n, auto_delete=True)
session.message_subscribe(destination=n, queue=n)
return session
-
+
def createMessage(self, session, key, id, body):
dp=session.delivery_properties(routing_key=key)
mp=session.message_properties(correlation_id=id)
diff --git a/tests/src/py/qpid_tests/broker_0_10/exchange.py b/tests/src/py/qpid_tests/broker_0_10/exchange.py
index db52b36754..9a4cfd37d6 100644
--- a/tests/src/py/qpid_tests/broker_0_10/exchange.py
+++ b/tests/src/py/qpid_tests/broker_0_10/exchange.py
@@ -226,6 +226,8 @@ class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier):
# Test automatic binding by queue name.
self.queue_declare(queue="d")
self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
# TODO aconway 2006-09-27: Fill in empty tests:
@@ -446,9 +448,9 @@ class MiscellaneousErrorsTests(TestHelper):
def testTypeNotKnown(self):
try:
self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
- self.fail("Expected 404 for declaration of unknown exchange type.")
+ self.fail("Expected 503 for declaration of unknown exchange type.")
except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
+ self.assertEquals(503, e.args[0].error_code)
def testDifferentDeclaredType(self):
self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
@@ -458,30 +460,7 @@ class MiscellaneousErrorsTests(TestHelper):
self.fail("Expected 530 for redeclaration of exchange with different type.")
except SessionException, e:
self.assertEquals(530, e.args[0].error_code)
-
- def testDefaultAccessBind(self):
- try:
- self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True)
- self.session.exchange_bind(exchange="", queue="my-queue", binding_key="another-key")
- self.fail("Expected 542 (invalid-argument) code for bind to default exchange.")
- except SessionException, e:
- self.assertEquals(542, e.args[0].error_code)
-
- def testDefaultAccessUnbind(self):
- try:
- self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True)
- self.session.exchange_unbind(exchange="", queue="my-queue", binding_key="my-queue")
- self.fail("Expected 542 (invalid-argument) code for unbind from default exchange.")
- except SessionException, e:
- self.assertEquals(542, e.args[0].error_code)
-
- def testDefaultAccessDelete(self):
- try:
- self.session.exchange_delete(exchange="")
- self.fail("Expected 542 (invalid-argument) code for delete of default exchange.")
- except SessionException, e:
- self.assertEquals(542, e.args[0].error_code)
-
+
class ExchangeTests(TestHelper):
def testHeadersBindNoMatchArg(self):
self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
diff --git a/tests/src/py/qpid_tests/broker_0_10/extensions.py b/tests/src/py/qpid_tests/broker_0_10/extensions.py
index 50c0aa3dd1..26ea3cb0e9 100644
--- a/tests/src/py/qpid_tests/broker_0_10/extensions.py
+++ b/tests/src/py/qpid_tests/broker_0_10/extensions.py
@@ -20,8 +20,6 @@ from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import TestBase010
-from qpid.session import SessionException
-from qpid.datatypes import uuid4
from time import sleep
class ExtensionTests(TestBase010):
@@ -30,58 +28,10 @@ class ExtensionTests(TestBase010):
def test_timed_autodelete(self):
session = self.session
session2 = self.conn.session("another-session")
- name=str(uuid4())
- session2.queue_declare(queue=name, exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":3})
+ session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5})
session2.close()
- result = session.queue_query(queue=name)
- self.assertEqual(name, result.queue)
+ result = session.queue_query(queue="my-queue")
+ self.assertEqual("my-queue", result.queue)
sleep(5)
- result = session.queue_query(queue=name)
+ result = session.queue_query(queue="my-queue")
self.assert_(not result.queue)
-
- def valid_policy_args(self, args, name="test-queue"):
- try:
- self.session.queue_declare(queue=name, arguments=args)
- self.session.queue_delete(queue=name) # cleanup
- except SessionException, e:
- self.fail("declare with valid policy args failed: %s" % (args))
- self.session = self.conn.session("replacement", 2)
-
- def invalid_policy_args(self, args, name="test-queue"):
- # go through invalid declare attempts twice to make sure that
- # the queue doesn't actually get created first time around
- # even if exception is thrown
- for i in range(1, 3):
- try:
- self.session.queue_declare(queue=name, arguments=args)
- self.session.queue_delete(queue=name) # cleanup
- self.fail("declare with invalid policy args suceeded: %s (iteration %d)" % (args, i))
- except SessionException, e:
- self.session = self.conn.session(str(uuid4()))
-
- def test_policy_max_size_as_valid_string(self):
- self.valid_policy_args({"qpid.max_size":"3"})
-
- def test_policy_max_count_as_valid_string(self):
- self.valid_policy_args({"qpid.max_count":"3"})
-
- def test_policy_max_count_and_size_as_valid_strings(self):
- self.valid_policy_args({"qpid.max_count":"3","qpid.max_size":"0"})
-
- def test_policy_negative_count(self):
- self.invalid_policy_args({"qpid.max_count":-1})
-
- def test_policy_negative_size(self):
- self.invalid_policy_args({"qpid.max_size":-1})
-
- def test_policy_size_as_invalid_string(self):
- self.invalid_policy_args({"qpid.max_size":"foo"})
-
- def test_policy_count_as_invalid_string(self):
- self.invalid_policy_args({"qpid.max_count":"foo"})
-
- def test_policy_size_as_float(self):
- self.invalid_policy_args({"qpid.max_size":3.14159})
-
- def test_policy_count_as_float(self):
- self.invalid_policy_args({"qpid.max_count":"2222222.22222"})
diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py
index ac6d7578da..06f3212a6f 100644
--- a/tests/src/py/qpid_tests/broker_0_10/management.py
+++ b/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -156,7 +156,7 @@ class ManagementTest (TestBase010):
queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {})
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
self.assertEqual (result.status, 0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -166,7 +166,7 @@ class ManagementTest (TestBase010):
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {})
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -176,16 +176,16 @@ class ManagementTest (TestBase010):
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {})
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
self.assertEqual (result.status,4)
"Use a bad destination queue name"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {})
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {})
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -225,55 +225,23 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
- result = pq.purge(1, {})
+ result = pq.purge(1)
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
- result = pq.purge(9, {})
+ result = pq.purge(9)
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
- result = pq.purge(0, {})
+ result = pq.purge(0)
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
- def test_reroute_priority_queue(self):
- self.startQmf()
- session = self.session
-
- #setup test queue supporting multiple priority levels
- session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
-
- #send some messages of varying priority to that queue:
- for i in range(0, 5):
- deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5)
- session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1)))
-
-
- #declare and bind a queue to amq.fanout through which rerouted
- #messages can be verified:
- session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
- session.exchange_bind(queue="rerouted", exchange="amq.fanout")
-
- #reroute messages from test queue to amq.fanout (and hence to
- #rerouted queue):
- pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
- result = pq.reroute(0, False, "amq.fanout", {})
- self.assertEqual(result.status, 0)
-
- #verify messages are all rerouted:
- self.subscribe(destination="incoming", queue="rerouted")
- incoming = session.incoming("incoming")
- for i in range(0, 5):
- msg = incoming.get(timeout=1)
- self.assertEqual("Message %d" % (5-i), msg.body)
-
-
def test_reroute_queue(self):
"""
Test ability to reroute messages from the head of a queue.
@@ -301,7 +269,7 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
"Reroute top message from reroute-queue to alternate exchange"
- result = pq.reroute(1, True, "", {})
+ result = pq.reroute(1, True, "")
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
@@ -309,7 +277,7 @@ class ManagementTest (TestBase010):
self.assertEqual(aq.msgDepth,1)
"Reroute top 9 messages from reroute-queue to alt.direct2"
- result = pq.reroute(9, False, "alt.direct2", {})
+ result = pq.reroute(9, False, "alt.direct2")
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -317,11 +285,11 @@ class ManagementTest (TestBase010):
self.assertEqual(aq.msgDepth,9)
"Reroute using a non-existent exchange"
- result = pq.reroute(0, False, "amq.nosuchexchange", {})
+ result = pq.reroute(0, False, "amq.nosuchexchange")
self.assertEqual(result.status, 4)
"Reroute all messages from reroute-queue"
- result = pq.reroute(0, False, "alt.direct2", {})
+ result = pq.reroute(0, False, "alt.direct2")
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -337,44 +305,11 @@ class ManagementTest (TestBase010):
session.message_transfer(destination="amq.direct", message=msg)
"Reroute onto the same queue"
- result = pq.reroute(0, False, "amq.direct", {})
+ result = pq.reroute(0, False, "amq.direct")
self.assertEqual(result.status, 0)
pq.update()
self.assertEqual(pq.msgDepth,20)
-
- def test_reroute_alternate_exchange(self):
- """
- Test that when rerouting, the alternate-exchange is considered if relevant
- """
- self.startQmf()
- session = self.session
- # 1. Create 2 exchanges A and B (fanout) where B is the
- # alternate exchange for A
- session.exchange_declare(exchange="B", type="fanout")
- session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B")
-
- # 2. Bind queue X to B
- session.queue_declare(queue="X", exclusive=True, auto_delete=True)
- session.exchange_bind(queue="X", exchange="B")
-
- # 3. Send 1 message to queue Y
- session.queue_declare(queue="Y", exclusive=True, auto_delete=True)
- props = session.delivery_properties(routing_key="Y")
- session.message_transfer(message=Message(props, "reroute me!"))
-
- # 4. Call reroute on queue Y and specify that messages should
- # be sent to exchange A
- y = self.qmf.getObjects(_class="queue", name="Y")[0]
- result = y.reroute(1, False, "A", {})
- self.assertEqual(result.status, 0)
-
- # 5. verify that the message is rerouted through B (as A has
- # no matching bindings) to X
- self.subscribe(destination="x", queue="X")
- self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body)
-
- # Cleanup
- for e in ["A", "B"]: session.exchange_delete(exchange=e)
+
def test_methods_async (self):
"""
@@ -584,63 +519,4 @@ class ManagementTest (TestBase010):
conn_qmf.update()
self.assertEqual(conn_qmf.msgsToClient, 1)
- def test_timestamp_config(self):
- """
- Test message timestamping control.
- """
- self.startQmf()
- conn = self.connect()
- session = conn.session("timestamp-session")
-
- #verify that receive message timestamping is OFF by default
- broker = self.qmf.getObjects(_class="broker")[0]
- rc = broker.getTimestampConfig()
- self.assertEqual(rc.status, 0)
- self.assertEqual(rc.text, "OK")
- #self.assertEqual(rc.receive, False)
-
- #try to enable it
- rc = broker.setTimestampConfig(True)
- self.assertEqual(rc.status, 0)
- self.assertEqual(rc.text, "OK")
-
- rc = broker.getTimestampConfig()
- self.assertEqual(rc.status, 0)
- self.assertEqual(rc.text, "OK")
- self.assertEqual(rc.receive, True)
-
- #send a message to a queue
- session.queue_declare(queue="ts-q", exclusive=True, auto_delete=True)
- session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "abc"))
-
- #receive message from queue, and verify timestamp is present
- session.message_subscribe(destination="d", queue="ts-q")
- session.message_flow(destination="d", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="d", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- incoming = session.incoming("d")
- msg = incoming.get(timeout=1)
- self.assertEqual("abc", msg.body)
- self.assertEqual(msg.has("delivery_properties"), True)
- dp = msg.get("delivery_properties")
- assert(dp.timestamp)
-
- #try to disable it
- rc = broker.setTimestampConfig(False)
- self.assertEqual(rc.status, 0)
- self.assertEqual(rc.text, "OK")
-
- rc = broker.getTimestampConfig()
- self.assertEqual(rc.status, 0)
- self.assertEqual(rc.text, "OK")
- self.assertEqual(rc.receive, False)
-
- #send another message to the queue
- session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "def"))
-
- #receive message from queue, and verify timestamp is NOT PRESENT
- msg = incoming.get(timeout=1)
- self.assertEqual("def", msg.body)
- self.assertEqual(msg.has("delivery_properties"), True)
- dp = msg.get("delivery_properties")
- self.assertEqual(dp.timestamp, None)
-
+
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py
index 6c864bcd13..e80333a1e6 100644
--- a/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -245,46 +245,26 @@ class MessageTests(TestBase010):
self.fail("Got message after cancellation: " + msg)
except Empty: None
- #cancellation of non-existant consumers should be result in 404s
- try:
- session.message_cancel(destination="my-consumer")
- self.fail("Expected 404 for recancellation of subscription.")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
- session = self.conn.session("alternate-session", timeout=10)
- try:
- session.message_cancel(destination="this-never-existed")
- self.fail("Expected 404 for cancellation of unknown subscription.")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
+ #cancellation of non-existant consumers should be handled without error
+ session.message_cancel(destination="my-consumer")
+ session.message_cancel(destination="this-never-existed")
def test_ack(self):
"""
- Test basic ack/recover behaviour using a combination of implicit and
- explicit accept subscriptions.
+ Test basic ack/recover behaviour
"""
- self.startQmf()
- session1 = self.conn.session("alternate-session", timeout=10)
- session1.queue_declare(queue="test-ack-queue", auto_delete=True)
-
- delivery_properties = session1.delivery_properties(routing_key="test-ack-queue")
- for i in ["One", "Two", "Three", "Four", "Five"]:
- session1.message_transfer(message=Message(delivery_properties, i))
+ session = self.conn.session("alternate-session", timeout=10)
+ session.queue_declare(queue="test-ack-queue", auto_delete=True)
- # verify enqueued message count, use both QMF and session query to verify consistency
- self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count)
- queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0]
- self.assertEquals(queueObj.msgDepth, 5)
- self.assertEquals(queueObj.msgTotalEnqueues, 5)
- self.assertEquals(queueObj.msgTotalDequeues, 0)
+ session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("consumer")
- # subscribe with implied acquire, explicit accept:
- session1.message_subscribe(queue = "test-ack-queue", destination = "consumer")
- session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFFL)
- session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFFL)
- queue = session1.incoming("consumer")
+ delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+ for i in ["One", "Two", "Three", "Four", "Five"]:
+ session.message_transfer(message=Message(delivery_properties, i))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -298,46 +278,20 @@ class MessageTests(TestBase010):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- # messages should not be on the queue:
- self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
- # QMF shows the dequeues as not having happened yet, since they are have
- # not been accepted
- queueObj.update()
- self.assertEquals(queueObj.msgDepth, 5)
- self.assertEquals(queueObj.msgTotalEnqueues, 5)
- self.assertEquals(queueObj.msgTotalDequeues, 0)
-
- session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
-
- # QMF should now reflect the accepted messages as being dequeued
- self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
- queueObj.update()
- self.assertEquals(queueObj.msgDepth, 2)
- self.assertEquals(queueObj.msgTotalEnqueues, 5)
- self.assertEquals(queueObj.msgTotalDequeues, 3)
-
- #subscribe from second session here to ensure queue is not auto-deleted
- #when alternate session closes. Use implicit accept mode to test that
- #we don't need to explicitly accept
- session2 = self.conn.session("alternate-session-2", timeout=10)
- session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
-
- #now close the first session, and see that the unaccepted messages are
- #then redelivered to another subscriber:
- session1.close(timeout=10)
+ session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
- # check the statistics - the queue_query will show the non-accepted
- # messages have been released. QMF never considered them dequeued, so
- # those counts won't change
- self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count)
- queueObj.update()
- self.assertEquals(queueObj.msgDepth, 2)
- self.assertEquals(queueObj.msgTotalEnqueues, 5)
- self.assertEquals(queueObj.msgTotalDequeues, 3)
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
- session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFFL)
- session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
- queue = session2.incoming("checker")
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("checker")
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
@@ -350,33 +304,6 @@ class MessageTests(TestBase010):
self.fail("Got unexpected message: " + extra.body)
except Empty: None
- self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count)
- queueObj.update()
- self.assertEquals(queueObj.msgDepth, 0)
- self.assertEquals(queueObj.msgTotalEnqueues, 5)
- self.assertEquals(queueObj.msgTotalDequeues, 5)
-
- # Subscribe one last time to keep the queue available, and to verify
- # that the implied accept worked by verifying no messages have been
- # returned when session2 is closed.
- self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker")
-
- session2.close(timeout=10)
-
- # check the statistics - they should not have changed
- self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count)
- queueObj.update()
- self.assertEquals(queueObj.msgDepth, 0)
- self.assertEquals(queueObj.msgTotalEnqueues, 5)
- self.assertEquals(queueObj.msgTotalDequeues, 5)
-
- self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
- self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
- try:
- extra = self.session.incoming("final-checker").get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
def test_reject(self):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
@@ -571,47 +498,6 @@ class MessageTests(TestBase010):
msgB = q.get(timeout=10)
- def test_window_stop(self):
- """
- Ensure window based flow control reacts to stop correctly
- """
- session = self.session
- #setup subscriber on a test queue
- session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- session.message_subscribe(queue = "q", destination = "c")
- session.message_set_flow_mode(flow_mode = 1, destination = "c")
- session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
-
-
- #send batch of messages to queue
- for i in range(0, 10):
- session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1)))
-
- #retrieve all delivered messages
- q = session.incoming("c")
- for i in range(0, 5):
- msg = q.get(timeout = 1)
- session.receiver._completed.add(msg.id)#TODO: this may be done automatically
- self.assertDataEquals(session, msg, "Message %d" % (i+1))
-
- session.message_stop(destination = "c")
-
- #now send completions, normally used to move window forward,
- #but after a stop should not do so
- session.channel.session_completed(session.receiver._completed)
-
- #check no more messages are sent
- self.assertEmpty(q)
-
- #re-establish window and check remaining messages
- session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
- for i in range(0, 5):
- msg = q.get(timeout = 1)
- self.assertDataEquals(session, msg, "Message %d" % (i+6))
-
-
def test_subscribe_not_acquired(self):
"""
Test the not-acquired modes works as expected for a simple case
@@ -1016,6 +902,7 @@ class MessageTests(TestBase010):
assert messages.get(timeout=1).body == "second"
self.assertEmpty(messages)
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)
diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
deleted file mode 100644
index 99d11151e8..0000000000
--- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ /dev/null
@@ -1,1077 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.messaging import *
-from qpid.tests.messaging import Base
-import qmf.console
-
-from time import sleep
-#
-# Tests the Broker's support for message groups
-#
-
-class MultiConsumerMsgGroupTests(Base):
- """
- Tests for the behavior of multi-consumer message groups. These tests allow
- a messages from the same group be consumed by multiple different clients as
- long as each message is processed "in sequence". See QPID-3346 for
- details.
- """
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def test_simple(self):
- """ Verify simple acquire/accept actions on a set of grouped
- messages shared between two receivers.
- """
- ## Create a msg group queue
-
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","A","B","B","B","C","C","C"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
-
- # create consumers on separate sessions: C1,C2
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 should acquire A-0, then C2 should acquire B-3
-
- m1 = c1.fetch(0);
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- m2 = c2.fetch(0);
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 3
-
- # C1 Acknowledge A-0
- c1.session.acknowledge(m1);
-
- # C2 should next acquire A-1
- m3 = c2.fetch(0);
- assert m3.properties['THE-GROUP'] == 'A'
- assert m3.content['index'] == 1
-
- # C1 should next acquire C-6, since groups A&B are held by c2
- m4 = c1.fetch(0);
- assert m4.properties['THE-GROUP'] == 'C'
- assert m4.content['index'] == 6
-
- ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1,
-
- # C2 Acknowledge B-3, freeing up the rest of B group
- c2.session.acknowledge(m2);
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1,
-
- # C1 should now acquire B-4, since it is next "free"
- m5 = c1.fetch(0);
- assert m5.properties['THE-GROUP'] == 'B'
- assert m5.content['index'] == 4
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1,
-
- # C1 acknowledges C-6, freeing the C group
- c1.session.acknowledge(m4)
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, ---
-
- # C2 should next fetch A-2, followed by C-7
- m7 = c2.fetch(0);
- assert m7.properties['THE-GROUP'] == 'A'
- assert m7.content['index'] == 2
-
- m8 = c2.fetch(0);
- assert m8.properties['THE-GROUP'] == 'C'
- assert m8.content['index'] == 7
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
- ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2
-
- # have C2 ack all fetched messages, freeing C-8
- c2.session.acknowledge()
-
- ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
- ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ---
-
- # the next fetch of C2 would get C-8, since B-5 is "owned"
- m9 = c2.fetch(0);
- assert m9.properties['THE-GROUP'] == 'C'
- assert m9.content['index'] == 8
-
- ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
- ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2
-
- # C1 acks B-4, freeing B-5 for consumption
- c1.session.acknowledge(m5)
-
- ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8...
- ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2
-
- # the next fetch of C2 would get B-5
- m10 = c2.fetch(0);
- assert m10.properties['THE-GROUP'] == 'B'
- assert m10.content['index'] == 5
-
- # there should be no more left for C1:
- try:
- mx = c1.fetch(0)
- assert False # should never get here
- except Empty:
- pass
-
- c1.session.acknowledge()
- c2.session.acknowledge()
- c1.close()
- c2.close()
- snd.close()
-
- def test_simple_browse(self):
- """ Test the behavior of a browsing subscription on a message grouping
- queue.
- """
-
- ## Create a msg group queue
-
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- ## Queue = A-0, B-1, A-2, b-3, C-4
- ## Owners= ---, ---, ---, ---, ---
-
- # create consumer and browser
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
-
- # C1 should acquire A-0
-
- m1 = c1.fetch(0);
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- ## Queue = A-0, B-1, A-2, b-3, C-4
- ## Owners= ^C1, ---, +C1, ---, ---
-
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1
-
- # verify that the browser may see A-2, even though its group is owned
- # by C1
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 2
-
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 3
-
- # verify the consumer can own groups currently seen by the browser
- m3 = c1.fetch(0);
- assert m3.properties['THE-GROUP'] == 'B'
- assert m3.content['index'] == 1
-
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'C'
- assert m2.content['index'] == 4
-
- def test_release(self):
- """ Verify releasing a message can free its assocated group
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- # C1 release m1, and the first group
-
- s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True))
-
- # C2 should be able to get group 'A', msg 'A-0' now
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 0
-
- def test_reject(self):
- """ Verify rejecting a message can free its associated group
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- # C1 rejects m1, and the first group is released
- s1.acknowledge(m1, Disposition(REJECTED))
-
- # C2 should be able to get group 'A', msg 'A-1' now
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 1
-
- def test_close(self):
- """ Verify behavior when a consumer that 'owns' a group closes.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 will own group A
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # C2 will own group B
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- # C1 shuffles off the mortal coil...
- c1.close();
-
- # but the session (s1) remains active, so "A" remains blocked
- # from c2, c2 should fetch the next B-3
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 3
-
- # and there should be no more messages available for C2
- try:
- m2 = c2.fetch(0)
- assert False # should never get here
- except Empty:
- pass
-
- # close session s1, releasing the A group
- s1.close()
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 0
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 1
-
- # and there should be no more messages now
- try:
- m2 = c2.fetch(0)
- assert False # should never get here
- except Empty:
- pass
-
- def test_transaction(self):
- """ Verify behavior when using transactions.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B","A","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.conn.session(transactional=True)
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.conn.session(transactional=True)
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 gets group A
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # C2 gets group B
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- s1.acknowledge(m1) # A-0 consumed, A group freed
- s2.acknowledge(m2) # B-2 consumed, B group freed
-
- s1.commit() # A-0 consumption done, A group now free
- s2.rollback() # releases B-2, and group B
-
- ## Q: ["A1","B2","B3","A4","B5"]
-
- # C2 should be able to get the next A
- m3 = c2.fetch(0)
- assert m3.properties['THE-GROUP'] == 'A'
- assert m3.content['index'] == 1
-
- # C1 should be able to get B-2
- m4 = c1.fetch(0)
- assert m4.properties['THE-GROUP'] == 'B'
- assert m4.content['index'] == 2
-
- s2.acknowledge(m3) # C2 consumes A-1
- s1.acknowledge(m4) # C1 consumes B-2
- s1.commit() # C1 consume B-2 occurs, free group B
-
- ## Q: [["A1",]"B3","A4","B5"]
-
- # A-1 is still considered owned by C2, since the commit has yet to
- # occur, so the next available to C1 would be B-3
- m5 = c1.fetch(0) # B-3
- assert m5.properties['THE-GROUP'] == 'B'
- assert m5.content['index'] == 3
-
- # and C2 should find A-4 available, since it owns the A group
- m6 = c2.fetch(0) # A-4
- assert m6.properties['THE-GROUP'] == 'A'
- assert m6.content['index'] == 4
-
- s2.acknowledge(m6) # C2 consumes A-4
-
- # uh-oh, A-1 and A-4 released, along with A group
- s2.rollback()
-
- ## Q: ["A1",["B3"],"A4","B5"]
- m7 = c1.fetch(0) # A-1 is found
- assert m7.properties['THE-GROUP'] == 'A'
- assert m7.content['index'] == 1
-
- ## Q: [["A1"],["B3"],"A4","B5"]
- # since C1 "owns" both A and B group, C2 should find nothing available
- try:
- m8 = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # C1 next gets A4
- m9 = c1.fetch(0)
- assert m9.properties['THE-GROUP'] == 'A'
- assert m9.content['index'] == 4
-
- s1.acknowledge()
-
- ## Q: [["A1"],["B3"],["A4"],"B5"]
- # even though C1 acknowledges A1,B3, and A4, B5 is still considered
- # owned as the commit has yet to take place
- try:
- m10 = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # now A1,B3,A4 dequeued, B5 should be free
- s1.commit()
-
- ## Q: ["B5"]
- m11 = c2.fetch(0)
- assert m11.properties['THE-GROUP'] == 'B'
- assert m11.content['index'] == 5
-
- s2.acknowledge()
- s2.commit()
-
- def test_send_transaction(self):
- """ Verify behavior when sender is using transactions.
- """
- ssn = self.conn.session(transactional=True)
- snd = ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- msg = Message(content={'index':0}, properties={"THE-GROUP": "A"})
- snd.send(msg)
- msg = Message(content={'index':1}, properties={"THE-GROUP": "B"})
- snd.send(msg)
- snd.session.commit()
- msg = Message(content={'index':2}, properties={"THE-GROUP": "A"})
- snd.send(msg)
-
- # Queue: [A0,B1, (uncommitted: A2) ]
-
- s1 = self.conn.session(transactional=True)
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.conn.session(transactional=True)
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 gets A0, group A
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # C2 gets B2, group B
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1
-
- # Since A2 uncommitted, there should be nothing left to fetch
- try:
- mX = c1.fetch(0)
- assert False # should not get here
- except Empty:
- pass
- try:
- mX = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- snd.session.commit()
- msg = Message(content={'index':3}, properties={"THE-GROUP": "B"})
- snd.send(msg)
-
- # Queue: [A2, (uncommitted: B3) ]
-
- # B3 has yet to be committed, so C2 should see nothing available:
- try:
- mX = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # but A2 should be available to C1
- m3 = c1.fetch(0)
- assert m3.properties['THE-GROUP'] == 'A'
- assert m3.content['index'] == 2
-
- # now make B3 available
- snd.session.commit()
-
- # C1 should still be done:
- try:
- mX = c1.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # but C2 should find the new B
- m4 = c2.fetch(0)
- assert m4.properties['THE-GROUP'] == 'B'
- assert m4.content['index'] == 3
-
- # extra: have C1 rollback, verify C2 finds the released 'A' messages
- c1.session.rollback()
-
- ## Q: ["A0","A2"]
-
- # C2 should be able to get the next A
- m5 = c2.fetch(0)
- assert m5.properties['THE-GROUP'] == 'A'
- assert m5.content['index'] == 0
-
- m6 = c2.fetch(0)
- assert m6.properties['THE-GROUP'] == 'A'
- assert m6.content['index'] == 2
-
- c2.session.acknowledge()
- c2.session.commit()
-
- def test_query(self):
- """ Verify the queue query method against message groups
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","C","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- m1 = c1.fetch(0)
- m2 = c2.fetch(0)
-
- # at this point, group A should be owned by C1, group B by C2, and
- # group C should be available
-
- # now setup a QMF session, so we can call methods
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
-
- # verify the query method call's group information
- rc = broker.query("queue", "msg-group-q")
- assert rc.status == 0
- assert rc.text == "OK"
- results = rc.outArgs['results']
- assert 'qpid.message_group_queue' in results
- q_info = results['qpid.message_group_queue']
- assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
- assert 'group_state' in q_info and len(q_info['group_state']) == 3
- for g_info in q_info['group_state']:
- assert 'group_id' in g_info
- if g_info['group_id'] == "A":
- assert g_info['msg_count'] == 3
- assert g_info['consumer'] != ""
- elif g_info['group_id'] == "B":
- assert g_info['msg_count'] == 2
- assert g_info['consumer'] != ""
- elif g_info['group_id'] == "C":
- assert g_info['msg_count'] == 2
- assert g_info['consumer'] == ""
- else:
- assert(False) # should never get here
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_purge_free(self):
- """ Verify we can purge a queue of all messages of a given "unowned"
- group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # now setup a QMF session, so we can call methods
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "B" }}
- assert queue.msgDepth == 6
- rc = queue.purge(0, msg_filter)
- assert rc.status == 0
- queue.update()
- assert queue.msgDepth == 4
-
- # verify all B's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'B'
- count += 1
- except Empty:
- pass
- assert count == 4
-
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_purge_acquired(self):
- """ Verify we can purge messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can purge group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- assert queue.msgDepth == 6
- rc = queue.purge(0, msg_filter)
- assert rc.status == 0
- queue.update()
- queue.msgDepth == 4 # the pending acquired A still counts!
-
- # verify all other A's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'A'
- count += 1
- except Empty:
- pass
- assert count == 3 # only 3 really available
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_purge_count(self):
- """ Verify we can purge a fixed number of messages from an acquired
- group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can purge group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- assert queue.msgDepth == 6
- rc = queue.purge(1, msg_filter)
- assert rc.status == 0
- queue.update()
- queue.msgDepth == 5 # the pending acquired A still counts!
-
- # verify all other A's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- a_count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- if m2.properties['THE-GROUP'] != 'A':
- count += 1
- else:
- a_count += 1
- except Empty:
- pass
- assert count == 3 # non-A's
- assert a_count == 1 # and one is an A
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_move_all(self):
- """ Verify we can move messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # set up destination queue
- rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can move what's left of group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
- assert rc.status == 0
-
- # verify all other A's removed from msg-group-q
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'A'
- count += 1
- except Empty:
- pass
- assert count == 3 # only 3 really available
-
- # verify the moved A's are at the dest-q
- s2 = self.setup_session()
- b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 2 or m2.content['index'] == 5
- count += 1
- except Empty:
- pass
- assert count == 2 # two A's moved
-
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_move_count(self):
- """ Verify we can move a fixed number of messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # set up destination queue
- rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- # now setup a QMF session, so we can move group B
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "B" }}
- rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
- assert rc.status == 0
-
- # verify all B's removed from msg-group-q
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'B'
- count += 1
- except Empty:
- pass
- assert count == 4
-
- # verify the moved B's are at the dest-q
- s2 = self.setup_session()
- b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1 or m2.content['index'] == 3
- count += 1
- except Empty:
- pass
- assert count == 2
-
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_reroute(self):
- """ Verify we can reroute messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # create a topic exchange for the reroute
- rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
- " node: {type: topic}}")
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can reroute group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- assert queue.msgDepth == 6
- rc = queue.reroute(0, False, "reroute-q", msg_filter)
- assert rc.status == 0
- queue.update()
- queue.msgDepth == 4 # the pending acquired A still counts!
-
- # verify all other A's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'A'
- count += 1
- except Empty:
- pass
- assert count == 3 # only 3 really available
-
- # and what of reroute-q?
- count = 0
- try:
- while True:
- m2 = rcvr.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 2 or m2.content['index'] == 5
- count += 1
- except Empty:
- pass
- assert count == 2
-
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_queue_delete(self):
- """ Test deleting a queue while consumers are active.
- """
-
- ## Create a msg group queue
-
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- ## Queue = A-0, B-1, A-2, b-3, C-4
- ## Owners= ---, ---, ---, ---, ---
-
- # create consumers
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 should acquire A-0
- m1 = c1.fetch(0);
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # c2 acquires B-1
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1
-
- # with group A and B owned, and C free, delete the
- # queue
- snd.close()
- self.ssn.close()
-
- def test_default_group_id(self):
- """ Verify the queue assigns the default group id should a message
- arrive without a group identifier.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"})
- snd.send(m)
-
- # now setup a QMF session, so we can call methods
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
-
- # grab the group state off the queue, and verify the default group is
- # present ("qpid.no-group" is the broker default)
- rc = broker.query("queue", "msg-group-q")
- assert rc.status == 0
- assert rc.text == "OK"
- results = rc.outArgs['results']
- assert 'qpid.message_group_queue' in results
- q_info = results['qpid.message_group_queue']
- assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
- assert 'group_state' in q_info and len(q_info['group_state']) == 1
- g_info = q_info['group_state'][0]
- assert 'group_id' in g_info
- assert g_info['group_id'] == 'qpid.no-group'
-
- self.qmf_session.delBroker(self.qmf_broker)
-
-
-class StickyConsumerMsgGroupTests(Base):
- """
- Tests for the behavior of sticky-consumer message groups. These tests
- expect all messages from the same group be consumed by the same clients.
- See QPID-3347 for details.
- """
- pass # TBD
diff --git a/tests/src/py/qpid_tests/broker_0_10/priority.py b/tests/src/py/qpid_tests/broker_0_10/priority.py
index 6a60add97e..3651a1218b 100644
--- a/tests/src/py/qpid_tests/broker_0_10/priority.py
+++ b/tests/src/py/qpid_tests/broker_0_10/priority.py
@@ -33,13 +33,13 @@ class PriorityTests (Base):
def setup_session(self):
return self.conn.session()
- def prioritised_delivery(self, priorities, levels=10, key="x-qpid-priorities"):
+ def prioritised_delivery(self, priorities, levels=10):
"""
Test that message on a queue are delivered in priority order.
"""
msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
- snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s}}}}" % (key, levels),
+ snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels,
durable=self.durable())
for m in msgs: snd.send(m)
@@ -50,16 +50,16 @@ class PriorityTests (Base):
assert msg.content == expected.content
self.ssn.acknowledge(msg)
- def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10, level_key="x-qpid-priorities", fairshare_key="x-qpid-fairshare"):
+ def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10):
msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
- limit_policy = "'%s':%s" % (fairshare_key, default_limit)
+ limit_policy = "x-qpid-fairshare:%s" % default_limit
if limits:
for k, v in limits.items():
- limit_policy += ", '%s-%s':%s" % (fairshare_key, k, v)
+ limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v)
- snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s, %s}}}}"
- % (level_key, levels, limit_policy),
+ snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}"
+ % (levels, limit_policy),
durable=self.durable())
for m in msgs: snd.send(m)
@@ -79,18 +79,12 @@ class PriorityTests (Base):
def test_prioritised_delivery_1(self):
self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10)
- def test_prioritised_delivery_with_alias(self):
- self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10, key="qpid.priorities")
-
def test_prioritised_delivery_2(self):
self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5)
def test_fairshare_1(self):
self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3])
- def test_fairshare_with_alias(self):
- self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,2,3], level_key="qpid.priorities", fairshare_key="qpid.fairshare")
-
def test_fairshare_2(self):
self.fairshare_delivery(priorities = [10 for i in range(30)])
diff --git a/tests/src/py/qpid_tests/broker_0_10/threshold.py b/tests/src/py/qpid_tests/broker_0_10/threshold.py
index 6628ae8424..bcd3c507e2 100644
--- a/tests/src/py/qpid_tests/broker_0_10/threshold.py
+++ b/tests/src/py/qpid_tests/broker_0_10/threshold.py
@@ -60,18 +60,3 @@ class ThresholdTests (Base):
def test_alert_size_alias(self):
self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)])
-
- def test_alert_on_alert_queue(self):
- rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}")
- rcvQMFv1 = self.ssn.receiver("qpid.management/console.event.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}")
- snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'qpid.alert_count':1}}}}")
- snd.send(Message("my-message"))
- queues = []
- for i in range(2):
- event = rcv.fetch()
- schema = event.content[0]["_schema_id"]
- assert schema["_class_name"] == "queueThresholdExceeded"
- values = event.content[0]["_values"]
- queues.append(values["qName"])
- assert "ttq" in queues, "expected event for ttq (%s)" % (queues)
-