summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-11-12 13:42:50 +0000
committerGordon Sim <gsim@apache.org>2013-11-12 13:42:50 +0000
commit785bfe3e9a9e6afe5494e48d02be2665dc599bb8 (patch)
treee42e33549654a7f633d9c38d2f7cbdbba90287a0 /qpid/cpp/src/tests
parentb955a41e69f69a1ada69c780d9fb7260c0bfc3f2 (diff)
downloadqpid-python-785bfe3e9a9e6afe5494e48d02be2665dc599bb8.tar.gz
QPID-5251: allow policies to be specified that will create topics or queues on demand if they match the specified pattern
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541059 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/policies.py209
-rwxr-xr-xqpid/cpp/src/tests/swig_python_tests2
2 files changed, 210 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/policies.py b/qpid/cpp/src/tests/policies.py
new file mode 100644
index 0000000000..ec0191f91e
--- /dev/null
+++ b/qpid/cpp/src/tests/policies.py
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests.messaging.implementation import *
+from qpid.tests.messaging import VersionTest
+
+class Mgmt:
+ """
+ Simple QMF management utility (qpidtoollibs uses
+ qpid.messaging.Message rather than swigged version)
+ """
+ def __init__(self, conn):
+ self.conn = conn
+ self.sess = self.conn.session()
+ self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+ str(uuid4())
+ self.reply_rx = self.sess.receiver(self.reply_to)
+ self.reply_rx.capacity = 10
+ self.tx = self.sess.sender("qmf.default.direct/broker")
+ self.next_correlator = 1
+
+ def list(self, class_name):
+ props = {'method' : 'request',
+ 'qmf.opcode' : '_query_request',
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+
+ content = {'_what' : 'OBJECT',
+ '_schema_id' : {'_class_name' : class_name.lower()}}
+
+ message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+
+
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item['_values'])
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ self.sess.acknowledge()
+ return items
+
+ def do_qmf_method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
+ props = {'method' : 'request',
+ 'qmf.opcode' : '_method_request',
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+
+ content = {'_object_id' : {'_object_name' : addr},
+ '_method_name' : method,
+ '_arguments' : arguments}
+
+ message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ response = self.reply_rx.fetch(timeout)
+ self.sess.acknowledge()
+ if response.properties['qmf.opcode'] == '_exception':
+ raise Exception("Exception from Agent: %r" % response.content['_values'])
+ if response.properties['qmf.opcode'] != '_method_response':
+ raise Exception("bad response: %r" % response.properties)
+ return response.content['_arguments']
+
+ def create(self, _type, name, properties={}):
+ return self.do_qmf_method('create', {'type': _type, 'name': name, 'properties': properties})
+
+ def delete(self, _type, name):
+ return self.do_qmf_method('delete', {'type': _type, 'name': name})
+
+
+class PoliciesTests (VersionTest):
+ """
+ Tests for node policies with qpidd
+ """
+
+ def do_simple_queue_test(self, pattern, name, properties={}, autodeleted=True):
+ mgmt = self.create_connection("amqp0-10", True)
+ agent = Mgmt(mgmt)
+ agent.create('QueuePolicy', pattern, properties)
+ try:
+ snd = self.ssn.sender(name)
+ msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
+ for m in msgs: snd.send(m)
+ snd.close()
+
+ for expected in msgs:
+ rcv = self.ssn.receiver(name)
+ msg = rcv.fetch(0)
+ assert msg.content == expected.content, (msg.content, expected.content)
+ self.ssn.acknowledge()
+ rcv.close() #close after each message to ensure queue isn't deleted with messages in it
+ self.ssn.close()
+ self.conn.close()
+
+ matched = [q for q in agent.list("Queue") if q['name'] == name]
+ if autodeleted:
+ # ensure that queue is no longer there (as empty and unused)
+ assert len(matched) == 0, (matched)
+ else:
+ # ensure that queue is still there though empty and unused
+ assert len(matched) == 1, (matched)
+ finally:
+ agent.delete('QueuePolicy', pattern)
+ mgmt.close()
+
+ def test_queue(self):
+ self.do_simple_queue_test("queue-*", "queue-1")
+
+ def test_queue_not_autodeleted(self):
+ self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'auto-delete':False}, False)
+
+ def test_queue_manual_delete(self):
+ self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'qpid.lifetime-policy':'manual'}, False)
+
+ def test_queue_delete_if_unused_and_empty(self):
+ self.do_simple_queue_test("queue-*", "queue-1", {'qpid.lifetime-policy':'delete-if-unused-and-empty'}, True)
+
+ def do_simple_topic_test(self, pattern, name, properties={}, autodeleted=True):
+ mgmt = self.create_connection("amqp0-10", True)
+ agent = Mgmt(mgmt)
+ agent.create('TopicPolicy', pattern, properties)
+ try:
+ snd = self.ssn.sender(name)
+ rcv1 = self.ssn.receiver(name)
+ rcv2 = self.ssn.receiver(name)
+
+ msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
+ for m in msgs: snd.send(m)
+
+ for rcv in [rcv1, rcv2]:
+ for expected in msgs:
+ msg = rcv.fetch(0)
+ assert msg.content == expected.content, (msg.content, expected.content)
+ self.ssn.acknowledge()
+ rcv1.close()
+ rcv2.close()
+ snd.close()
+
+ matched = [e for e in agent.list("Exchange") if e['name'] == name]
+ if autodeleted:
+ # ensure that exchange is no longer there (as it is now unused)
+ assert len(matched) == 0, (matched)
+ else:
+ # ensure that exchange has not been autodeleted in spite of being unused
+ assert len(matched) == 1, (matched)
+ finally:
+ agent.delete('TopicPolicy', pattern)
+ mgmt.close()
+
+ def test_topic(self):
+ self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout'})
+
+ def test_topic_not_autodelete(self):
+ self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'auto-delete':False}, False)
+
+ def test_topic_manual_delete(self):
+ self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'manual'}, False)
+
+ def test_topic_delete_if_unused(self):
+ self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'delete-if-unused'}, True)
+
+ def test_mgmt(self):
+ mgmt = self.create_connection("amqp0-10", True)
+ agent = Mgmt(mgmt)
+ agent.create('QueuePolicy', 'queue-*')
+ agent.create('QueuePolicy', 'alt.queue.*')
+ agent.create('TopicPolicy', 'topic-*')
+ try:
+ queues = [q['name'] for q in agent.list("QueuePolicy")]
+ topics = [t['name'] for t in agent.list("TopicPolicy")]
+ assert 'queue-*' in queues, (queues)
+ assert 'alt.queue.*' in queues, (queues)
+
+ try:
+ agent.delete('TopicPolicy', 'queue-*')
+ assert False, ('Deletion of policy using wrong type should fail')
+ except: None
+
+ finally:
+ agent.delete('QueuePolicy', 'queue-*')
+ agent.delete('QueuePolicy', 'alt.queue.*')
+ agent.delete('TopicPolicy', 'topic-*')
+ mgmt.close()
diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests
index dd793373d2..2eb8ce67b3 100755
--- a/qpid/cpp/src/tests/swig_python_tests
+++ b/qpid/cpp/src/tests/swig_python_tests
@@ -53,7 +53,7 @@ export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG
export QPID_USE_SWIG_CLIENT=1
$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1
if [[ -a $AMQP_LIB ]] ; then
- $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
+ $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
fi
stop_broker
if [[ $FAILED -eq 1 ]]; then