diff options
Diffstat (limited to 'RC9/qpid/python/tests_0-10/management.py')
-rw-r--r-- | RC9/qpid/python/tests_0-10/management.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/RC9/qpid/python/tests_0-10/management.py b/RC9/qpid/python/tests_0-10/management.py new file mode 100644 index 0000000000..0632d85da4 --- /dev/null +++ b/RC9/qpid/python/tests_0-10/management.py @@ -0,0 +1,240 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 +from qpid.management import managementChannel, managementClient + +class ManagementTest (TestBase010): + """ + Tests for the management hooks + """ + + def test_broker_connectivity_oldAPI (self): + """ + Call the "echo" method on the broker to verify it is alive and talking. + """ + session = self.session + + mc = managementClient (session.spec) + mch = mc.addChannel (session) + + mc.syncWaitForStable (mch) + brokers = mc.syncGetObjects (mch, "broker") + self.assertEqual (len (brokers), 1) + broker = brokers[0] + args = {} + body = "Echo Message Body" + args["body"] = body + + for seq in range (1, 5): + args["sequence"] = seq + res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args) + self.assertEqual (res.status, 0) + self.assertEqual (res.statusText, "OK") + self.assertEqual (res.sequence, seq) + self.assertEqual (res.body, body) + mc.removeChannel (mch) + + def test_broker_connectivity (self): + """ + Call the "echo" method on the broker to verify it is alive and talking. + """ + session = self.session + self.startQmf() + + brokers = self.qmf.getObjects(_class="broker") + self.assertEqual (len(brokers), 1) + broker = brokers[0] + + body = "Echo Message Body" + for seq in range (1, 10): + res = broker.echo(seq, body) + self.assertEqual (res.status, 0) + self.assertEqual (res.text, "OK") + self.assertEqual (res.sequence, seq) + self.assertEqual (res.body, body) + + def test_get_objects(self): + self.startQmf() + + # get the package list, verify that the qpid broker package is there + packages = self.qmf.getPackages() + assert 'org.apache.qpid.broker' in packages + + # get the schema class keys for the broker, verify the broker table and link-down event + keys = self.qmf.getClasses('org.apache.qpid.broker') + broker = None + linkDown = None + for key in keys: + if key.getClassName() == "broker": broker = key + if key.getClassName() == "brokerLinkDown" : linkDown = key + assert broker + assert linkDown + + brokerObjs = self.qmf.getObjects(_class="broker") + assert len(brokerObjs) == 1 + brokerObjs = self.qmf.getObjects(_key=broker) + assert len(brokerObjs) == 1 + + def test_self_session_id (self): + self.startQmf() + sessionId = self.qmf_broker.getSessionId() + brokerSessions = self.qmf.getObjects(_class="session") + + found = False + for bs in brokerSessions: + if bs.name == sessionId: + found = True + self.assertEqual (found, True) + + def test_standard_exchanges (self): + self.startQmf() + + exchanges = self.qmf.getObjects(_class="exchange") + exchange = self.findExchange (exchanges, "") + self.assertEqual (exchange.type, "direct") + exchange = self.findExchange (exchanges, "amq.direct") + self.assertEqual (exchange.type, "direct") + exchange = self.findExchange (exchanges, "amq.topic") + self.assertEqual (exchange.type, "topic") + exchange = self.findExchange (exchanges, "amq.fanout") + self.assertEqual (exchange.type, "fanout") + exchange = self.findExchange (exchanges, "amq.match") + self.assertEqual (exchange.type, "headers") + exchange = self.findExchange (exchanges, "qpid.management") + self.assertEqual (exchange.type, "topic") + + def findExchange (self, exchanges, name): + for exchange in exchanges: + if exchange.name == name: + return exchange + return None + + def test_move_queued_messages(self): + """ + Test ability to move messages from the head of one queue to another. + Need to test moveing all and N messages. + """ + self.startQmf() + session = self.session + "Set up source queue" + session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key") + + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Move Message %d" % count + src_msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=src_msg) + + "Set up destination queue" + session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="dest-queue", exchange="amq.direct") + + queues = self.qmf.getObjects(_class="queue") + + "Move 10 messages from src-queue to dest-queue" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + self.assertEqual (result.status, 0) + + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] + + self.assertEqual (sq.msgDepth,10) + self.assertEqual (dq.msgDepth,10) + + "Move all remaining messages to destination" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + self.assertEqual (result.status,0) + + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] + + self.assertEqual (sq.msgDepth,0) + self.assertEqual (dq.msgDepth,20) + + "Use a bad source queue name" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + self.assertEqual (result.status,4) + + "Use a bad destination queue name" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + self.assertEqual (result.status,4) + + " Use a large qty (40) to move from dest-queue back to " + " src-queue- should move all " + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + self.assertEqual (result.status,0) + + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] + + self.assertEqual (sq.msgDepth,20) + self.assertEqual (dq.msgDepth,0) + + "Consume the messages of the queue and check they are all there in order" + session.message_subscribe(queue="src-queue", destination="tag") + session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF) + queue = session.incoming("tag") + for count in twenty: + consumed_msg = queue.get(timeout=1) + body = "Move Message %d" % count + self.assertEqual(body, consumed_msg.body) + + def test_purge_queue(self): + """ + Test ability to purge messages from the head of a queue. + Need to test moveing all, 1 (top message) and N messages. + """ + self.startQmf() + session = self.session + "Set up purge queue" + session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key") + + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Purge Message %d" % count + msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=msg) + + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + + "Purge top message from purge-queue" + result = pq.purge(1) + self.assertEqual (result.status, 0) + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + self.assertEqual (pq.msgDepth,19) + + "Purge top 9 messages from purge-queue" + result = pq.purge(9) + self.assertEqual (result.status, 0) + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + self.assertEqual (pq.msgDepth,10) + + "Purge all messages from purge-queue" + result = pq.purge(0) + self.assertEqual (result.status, 0) + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + self.assertEqual (pq.msgDepth,0) + |