summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rwxr-xr-xtests/setup.py2
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/management.py43
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/msg_groups.py72
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/new_api.py5
4 files changed, 113 insertions, 9 deletions
diff --git a/tests/setup.py b/tests/setup.py
index b8e952998d..36e7a0531a 100755
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -20,7 +20,7 @@
from distutils.core import setup
setup(name="qpid-tests",
- version="0.15",
+ version="0.17",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9",
diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py
index 867210b11d..2dd2291b2e 100644
--- a/tests/src/py/qpid_tests/broker_0_10/management.py
+++ b/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -24,8 +24,17 @@ from threading import Condition
from time import sleep
import qmf.console
import qpid.messaging
+from qpidtoollibs import BrokerAgent
class ManagementTest (TestBase010):
+
+ def setup_access(self):
+ if 'broker_agent' not in self.__dict__:
+ self.conn2 = qpid.messaging.Connection(self.broker)
+ self.conn2.open()
+ self.broker_agent = BrokerAgent(self.conn2)
+ return self.broker_agent
+
"""
Tests for the management hooks
"""
@@ -376,6 +385,30 @@ class ManagementTest (TestBase010):
# Cleanup
for e in ["A", "B"]: session.exchange_delete(exchange=e)
+ def test_reroute_invalid_alt_exchange(self):
+ """
+ Test that an error is returned for an attempt to reroute to
+ alternate exchange on a queue for which no such exchange has
+ been defined.
+ """
+ self.startQmf()
+ session = self.session
+ # create queue with no alt-exchange, and send a message to it
+ session.queue_declare(queue="q", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="q")
+ session.message_transfer(message=Message(props, "don't reroute me!"))
+
+ # attempt to reroute the message to alt-exchange
+ q = self.qmf.getObjects(_class="queue", name="q")[0]
+ result = q.reroute(1, True, "", {})
+ # verify the attempt fails...
+ self.assertEqual(result.status, 4) #invalid parameter
+
+ # ...and message is still on the queue
+ self.subscribe(destination="d", queue="q")
+ self.assertEqual("don't reroute me!", session.incoming("d").get(timeout=1).body)
+
+
def test_methods_async (self):
"""
"""
@@ -559,12 +592,18 @@ class ManagementTest (TestBase010):
"""
Test message in/out stats for connection
"""
- self.startQmf()
+ agent = self.setup_access()
conn = self.connect()
session = conn.session("stats-session")
#using qmf find named session and the corresponding connection:
- conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_
+ conn_qmf = None
+ sessions = agent.getAllSessions()
+ for s in sessions:
+ if s.name == "stats-session":
+ conn_qmf = agent.getConnection(s.connectionRef)
+
+ assert(conn_qmf)
#send a message to a queue
session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True)
diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 938d3b3ee2..ace7611a2f 100644
--- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -193,6 +193,10 @@ class MultiConsumerMsgGroupTests(Base):
s2 = self.setup_session()
b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
+ m2 = b1.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 0
+
# C1 should acquire A-0
m1 = c1.fetch(0);
@@ -202,10 +206,6 @@ class MultiConsumerMsgGroupTests(Base):
## Queue = A-0, B-1, A-2, b-3, C-4
## Owners= ^C1, ---, +C1, ---, ---
- m2 = b1.fetch(0);
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 0
-
m2 = b1.fetch(0)
assert m2.properties['THE-GROUP'] == 'B'
assert m2.content['index'] == 1
@@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base):
snd.close()
+ def test_ttl_expire(self):
+ """ Verify that expired (TTL) group messages are skipped correctly
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","C","A","B","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ if m.properties['THE-GROUP'] == 'B':
+ m.ttl = 1;
+ snd.send(m)
+
+ sleep(2) # let all B's expire
+
+ # create consumers on separate sessions: C1,C2
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+ # C1 should acquire A-0, then C2 should acquire C-2, Group B should
+ # expire and never be fetched
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'C'
+ assert m2.content['index'] == 2
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 3
+
+ m2 = c2.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'C'
+ assert m2.content['index'] == 5
+
+ # there should be no more left for either consumer
+ try:
+ mx = c1.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+ try:
+ mx = c2.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ c1.session.acknowledge()
+ c2.session.acknowledge()
+ c1.close()
+ c2.close()
+ snd.close()
+
+
class StickyConsumerMsgGroupTests(Base):
"""
Tests for the behavior of sticky-consumer message groups. These tests
diff --git a/tests/src/py/qpid_tests/broker_0_10/new_api.py b/tests/src/py/qpid_tests/broker_0_10/new_api.py
index 0bbcf342bc..05c4815e57 100644
--- a/tests/src/py/qpid_tests/broker_0_10/new_api.py
+++ b/tests/src/py/qpid_tests/broker_0_10/new_api.py
@@ -57,7 +57,7 @@ class GeneralTests(Base):
sess2 = self.setup_session()
tx = sess1.sender("amq.direct/key")
- rx_main = sess1.receiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx_main = sess1.receiver("amq.direct/key;{link:{reliability:at-least-once,x-declare:{alternate-exchange:'amq.fanout'}}}")
rx_alt = sess2.receiver("amq.fanout")
rx_alt.capacity = 10
@@ -74,7 +74,7 @@ class GeneralTests(Base):
self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange")
sess1.close()
-
+ sleep(1)
self.assertEqual(rx_alt.available(), 5, "All 5 messages should have been routed to the alt_exchange")
sess2.close()
@@ -108,6 +108,7 @@ class GeneralTests(Base):
# Close sess1; This will cause the queue to be deleted
sess1.close()
+ sleep(1)
self.assertEqual(rx_alt.available(), 2, "2 of the messages should have been routed to the alt_exchange")
# Close sess2; This will cause the acquired messages to be requeued and routed to the alternate