diff options
| author | Gordon Sim <gsim@apache.org> | 2013-11-12 13:42:50 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-11-12 13:42:50 +0000 |
| commit | 785bfe3e9a9e6afe5494e48d02be2665dc599bb8 (patch) | |
| tree | e42e33549654a7f633d9c38d2f7cbdbba90287a0 /qpid/cpp/src/tests | |
| parent | b955a41e69f69a1ada69c780d9fb7260c0bfc3f2 (diff) | |
| download | qpid-python-785bfe3e9a9e6afe5494e48d02be2665dc599bb8.tar.gz | |
QPID-5251: allow policies to be specified that will create topics or queues on demand if they match the specified pattern
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541059 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/policies.py | 209 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/swig_python_tests | 2 |
2 files changed, 210 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/policies.py b/qpid/cpp/src/tests/policies.py new file mode 100644 index 0000000000..ec0191f91e --- /dev/null +++ b/qpid/cpp/src/tests/policies.py @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.tests.messaging.implementation import * +from qpid.tests.messaging import VersionTest + +class Mgmt: + """ + Simple QMF management utility (qpidtoollibs uses + qpid.messaging.Message rather than swigged version) + """ + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def list(self, class_name): + props = {'method' : 'request', + 'qmf.opcode' : '_query_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name.lower()}} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + + + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + return items + + def do_qmf_method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(timeout) + self.sess.acknowledge() + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def create(self, _type, name, properties={}): + return self.do_qmf_method('create', {'type': _type, 'name': name, 'properties': properties}) + + def delete(self, _type, name): + return self.do_qmf_method('delete', {'type': _type, 'name': name}) + + +class PoliciesTests (VersionTest): + """ + Tests for node policies with qpidd + """ + + def do_simple_queue_test(self, pattern, name, properties={}, autodeleted=True): + mgmt = self.create_connection("amqp0-10", True) + agent = Mgmt(mgmt) + agent.create('QueuePolicy', pattern, properties) + try: + snd = self.ssn.sender(name) + msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']] + for m in msgs: snd.send(m) + snd.close() + + for expected in msgs: + rcv = self.ssn.receiver(name) + msg = rcv.fetch(0) + assert msg.content == expected.content, (msg.content, expected.content) + self.ssn.acknowledge() + rcv.close() #close after each message to ensure queue isn't deleted with messages in it + self.ssn.close() + self.conn.close() + + matched = [q for q in agent.list("Queue") if q['name'] == name] + if autodeleted: + # ensure that queue is no longer there (as empty and unused) + assert len(matched) == 0, (matched) + else: + # ensure that queue is still there though empty and unused + assert len(matched) == 1, (matched) + finally: + agent.delete('QueuePolicy', pattern) + mgmt.close() + + def test_queue(self): + self.do_simple_queue_test("queue-*", "queue-1") + + def test_queue_not_autodeleted(self): + self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'auto-delete':False}, False) + + def test_queue_manual_delete(self): + self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'qpid.lifetime-policy':'manual'}, False) + + def test_queue_delete_if_unused_and_empty(self): + self.do_simple_queue_test("queue-*", "queue-1", {'qpid.lifetime-policy':'delete-if-unused-and-empty'}, True) + + def do_simple_topic_test(self, pattern, name, properties={}, autodeleted=True): + mgmt = self.create_connection("amqp0-10", True) + agent = Mgmt(mgmt) + agent.create('TopicPolicy', pattern, properties) + try: + snd = self.ssn.sender(name) + rcv1 = self.ssn.receiver(name) + rcv2 = self.ssn.receiver(name) + + msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']] + for m in msgs: snd.send(m) + + for rcv in [rcv1, rcv2]: + for expected in msgs: + msg = rcv.fetch(0) + assert msg.content == expected.content, (msg.content, expected.content) + self.ssn.acknowledge() + rcv1.close() + rcv2.close() + snd.close() + + matched = [e for e in agent.list("Exchange") if e['name'] == name] + if autodeleted: + # ensure that exchange is no longer there (as it is now unused) + assert len(matched) == 0, (matched) + else: + # ensure that exchange has not been autodeleted in spite of being unused + assert len(matched) == 1, (matched) + finally: + agent.delete('TopicPolicy', pattern) + mgmt.close() + + def test_topic(self): + self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout'}) + + def test_topic_not_autodelete(self): + self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'auto-delete':False}, False) + + def test_topic_manual_delete(self): + self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'manual'}, False) + + def test_topic_delete_if_unused(self): + self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'delete-if-unused'}, True) + + def test_mgmt(self): + mgmt = self.create_connection("amqp0-10", True) + agent = Mgmt(mgmt) + agent.create('QueuePolicy', 'queue-*') + agent.create('QueuePolicy', 'alt.queue.*') + agent.create('TopicPolicy', 'topic-*') + try: + queues = [q['name'] for q in agent.list("QueuePolicy")] + topics = [t['name'] for t in agent.list("TopicPolicy")] + assert 'queue-*' in queues, (queues) + assert 'alt.queue.*' in queues, (queues) + + try: + agent.delete('TopicPolicy', 'queue-*') + assert False, ('Deletion of policy using wrong type should fail') + except: None + + finally: + agent.delete('QueuePolicy', 'queue-*') + agent.delete('QueuePolicy', 'alt.queue.*') + agent.delete('TopicPolicy', 'topic-*') + mgmt.close() diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests index dd793373d2..2eb8ce67b3 100755 --- a/qpid/cpp/src/tests/swig_python_tests +++ b/qpid/cpp/src/tests/swig_python_tests @@ -53,7 +53,7 @@ export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG export QPID_USE_SWIG_CLIENT=1 $QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1 if [[ -a $AMQP_LIB ]] ; then - $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 + $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 fi stop_broker if [[ $FAILED -eq 1 ]]; then |
