#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
import os
import imp
from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep


def import_script(path):
    """
    Import executable script at path as a module.

    Requires some trickery as scripts are not in standard module format:
    the module name is the file name with "-" replaced by "_", and the file
    is loaded as plain Python source regardless of its extension.
    """
    f = open(path)
    try:
        name = os.path.split(path)[1].replace("-", "_")
        return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
    finally:
        f.close()


def checkenv(name):
    """
    Return the value of environment variable `name`.

    Raises Exception if the variable is unset or empty.
    """
    value = os.getenv(name)
    if not value:
        raise Exception("Environment variable %s is not set" % name)
    return value


class CliTests(TestBase010):
    """
    Tests for the qpid-config and qpid-route command line tools.

    Each tool is exercised either as a shell command (os.system / os.popen)
    or "via the API", i.e. by importing the script and calling its main().
    self.defines, self.broker, self.broker_access, self.session and
    startBrokerAccess() are not defined in this file; presumably they are
    provided by TestBase010 -- confirm against qpid.testlib.
    """

    def remote_host(self):
        """Return the "remote-host" define, defaulting to "localhost"."""
        return self.defines.get("remote-host", "localhost")

    def remote_port(self):
        """Return the "remote-port" define as an int (KeyError if missing)."""
        return int(self.defines["remote-port"])

    def cli_dir(self):
        """Return the "cli-dir" define: the directory holding the CLI scripts."""
        return self.defines["cli-dir"]

    def makeQueue(self, qname, arguments, api=False):
        """
        Create queue `qname` with qpid-config and return its broker object.

        `arguments` is the option string appended after the queue name.
        When `api` is true the imported script's main() is called, otherwise
        the command is run through the shell.  Fails the test if the command
        returns non-zero or the queue cannot be looked up afterwards.
        """
        command = " add queue " + qname + " " + arguments
        if api:
            ret = self.qpid_config_api(command)
        else:
            ret = os.system(self.qpid_config_command(command))
        self.assertEqual(ret, 0)

        queue = self.broker_access.getQueue(qname)
        if not queue:
            self.fail("Queue %s not found after creation" % qname)
        return queue

    def _check_queue_params(self, prefix, api):
        """
        Shared body of the queue-parameter tests.

        Creates queues named prefix + "1" .. prefix + "6" (through the API
        when `api` is true) and checks the limit-policy and lvq-key options
        show up as the expected queue arguments.
        """
        self.startBrokerAccess()
        queue1 = self.makeQueue(prefix + "1", "--limit-policy none", api)
        queue2 = self.makeQueue(prefix + "2", "--limit-policy reject", api)
        queue3 = self.makeQueue(prefix + "3", "--limit-policy flow-to-disk", api)
        queue4 = self.makeQueue(prefix + "4", "--limit-policy ring", api)
        queue5 = self.makeQueue(prefix + "5", "--limit-policy ring-strict", api)

        LIMIT = "qpid.policy_type"
        assert LIMIT not in queue1.arguments
        self.assertEqual(queue2.arguments[LIMIT], "reject")
        self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk")
        self.assertEqual(queue4.arguments[LIMIT], "ring")
        self.assertEqual(queue5.arguments[LIMIT], "ring_strict")

        queue6 = self.makeQueue(prefix + "6", "--lvq-key lkey", api)

        LVQKEY = "qpid.last_value_queue_key"
        assert LVQKEY not in queue5.arguments
        assert LVQKEY in queue6.arguments
        assert queue6.arguments[LVQKEY] == "lkey"

    def test_queue_params(self):
        """Queue options given on the shell command line reach the broker."""
        self._check_queue_params("test_queue_params", False)

    def test_queue_params_api(self):
        """Queue options given through the imported script reach the broker."""
        # The lvq-key queue previously went through the shell command here;
        # it now uses the API like the other five queues in this test.
        self._check_queue_params("test_queue_params_api", True)

    def test_qpid_config(self):
        """Add then delete a non-durable queue through the shell command."""
        self.startBrokerAccess()
        qname = "test_qpid_config"
        self.helper_create_queue(qname)
        self.helper_destroy_queue(qname)

    def test_qpid_config_api(self):
        """Add then delete a non-durable queue through the imported script."""
        self.startBrokerAccess()
        qname = "test_qpid_config_api"

        ret = self.qpid_config_api(" add queue " + qname)
        self.assertEqual(ret, 0)
        self.helper_find_queue(qname, True)

        ret = self.qpid_config_api(" del queue " + qname)
        self.assertEqual(ret, 0)
        self.helper_find_queue(qname, False)

    def test_qpid_config_sasl_plain_expect_succeed(self):
        """Succeeds because user name and password are supplied"""
        self.startBrokerAccess()
        qname = "test_qpid_config_sasl_plain_expect_succeed"
        cmd = " --sasl-mechanism PLAIN -b guest/guest@localhost:"+str(self.broker.port) + " add queue " + qname
        ret = self.qpid_config_api(cmd)
        self.assertEqual(ret, 0)

    def test_qpid_config_sasl_plain_expect_fail(self):
        """Fails because no user name and password is supplied"""
        self.startBrokerAccess()
        qname = "test_qpid_config_sasl_plain_expect_fail"
        cmd = " --sasl-mechanism PLAIN -b localhost:"+str(self.broker.port) + " add queue " + qname
        ret = self.qpid_config_api(cmd)
        assert ret != 0

    # helpers for some of the test methods

    def helper_find_exchange(self, xchgname, typ, expected=True):
        """
        Assert that an exchange named `xchgname` exists iff `expected`.

        When `typ` is truthy, any exchange with that name must also have
        that type.
        """
        found = False
        for xchg in self.broker_access.getAllExchanges():
            if xchg.name == xchgname:
                if typ:
                    self.assertEqual(xchg.type, typ)
                found = True
        self.assertEqual(found, expected)

    def helper_create_exchange(self, xchgname, typ="direct", opts=""):
        """Add exchange `xchgname` of type `typ` and verify it exists."""
        command = self.qpid_config_command(opts + " add exchange " + typ + " " + xchgname)
        ret = os.system(command)
        self.assertEqual(ret, 0)
        self.helper_find_exchange(xchgname, typ, True)

    def helper_destroy_exchange(self, xchgname):
        """Delete exchange `xchgname` and verify it is gone."""
        command = self.qpid_config_command(" del exchange " + xchgname)
        ret = os.system(command)
        self.assertEqual(ret, 0)
        self.helper_find_exchange(xchgname, False, expected=False)

    def helper_find_queue(self, qname, expected=True, durable=False):
        """
        Assert that a queue named `qname` exists iff `expected`.

        Any queue with that name must also have the given `durable` flag.
        """
        found = False
        for queue in self.broker_access.getAllQueues():
            if queue.name == qname:
                self.assertEqual(queue.durable, durable)
                found = True
        self.assertEqual(found, expected)

    def helper_create_queue(self, qname):
        """Add non-durable queue `qname` and verify it exists."""
        command = self.qpid_config_command(" add queue " + qname)
        ret = os.system(command)
        self.assertEqual(ret, 0)
        self.helper_find_queue(qname, True)

    def helper_destroy_queue(self, qname):
        """Delete queue `qname` and verify it is gone."""
        command = self.qpid_config_command(" del queue " + qname)
        ret = os.system(command)
        self.assertEqual(ret, 0)
        self.helper_find_queue(qname, False)

    def _assert_link_to_remote_port(self):
        """Assert that the broker reports a link whose port is the remote port."""
        remote_port = self.remote_port()
        found = False
        for link in self.broker_access.getAllLinks():
            if link.port == remote_port:
                found = True
        self.assertEqual(found, True)

    # test the bind-queue-to-header-exchange functionality
    def test_qpid_config_headers(self):
        """Bind a queue to a headers exchange, then unbind and clean up."""
        self.startBrokerAccess()
        qname = "test_qpid_config"
        xchgname = "test_xchg"

        # first create a headers exchange
        self.helper_create_exchange(xchgname, typ="headers")

        # create the queue
        self.helper_create_queue(qname)

        # now bind the queue to the exchange
        command = self.qpid_config_command(" bind " + xchgname + " " + qname +
                                           " key all foo=bar baz=quux")
        ret = os.system(command)
        self.assertEqual(ret, 0)

        # The bind worked; now tear it all down.  First the binding
        ret = os.system(self.qpid_config_command(" unbind " + xchgname + " " + qname + " key"))
        self.assertEqual(ret, 0)

        # then the queue
        self.helper_destroy_queue(qname)

        # then the exchange
        self.helper_destroy_exchange(xchgname)

    def test_qpid_config_xml(self):
        """Bind a queue to an xml exchange using -f test.xquery, then clean up."""
        self.startBrokerAccess()
        qname = "test_qpid_config"
        xchgname = "test_xchg"

        # first create an xml exchange
        self.helper_create_exchange(xchgname, typ="xml")

        # create the queue
        self.helper_create_queue(qname)

        # now bind the queue to the exchange
        command = self.qpid_config_command("-f test.xquery bind " + xchgname + " " + qname)
        ret = os.system(command)
        self.assertEqual(ret, 0)

        # The bind worked; now tear it all down.  First the binding
        ret = os.system(self.qpid_config_command(" unbind " + xchgname + " " + qname + " key"))
        self.assertEqual(ret, 0)

        # then the queue
        self.helper_destroy_queue(qname)

        # then the exchange
        self.helper_destroy_exchange(xchgname)

    def test_qpid_config_durable(self):
        """Add a queue with --durable, check the flag, then delete it."""
        self.startBrokerAccess()
        qname = "test_qpid_config"

        ret = os.system(self.qpid_config_command(" add queue --durable " + qname))
        self.assertEqual(ret, 0)
        self.helper_find_queue(qname, True, durable=True)

        self.helper_destroy_queue(qname)

    def test_qpid_config_altex(self):
        """Alternate exchanges can be set on both exchanges and queues."""
        self.startBrokerAccess()
        exName = "testalt"
        qName = "testqalt"
        altName = "amq.direct"

        ret = os.system(self.qpid_config_command(" add exchange topic %s --alternate-exchange=%s" % (exName, altName)))
        self.assertEqual(ret, 0)

        found = False
        for exchange in self.broker_access.getAllExchanges():
            # the alternate itself must not have an alternate of its own
            if exchange.name == altName:
                self.assertEqual(exchange.altExchange, None)

            if exchange.name == exName:
                found = True
                if not exchange.altExchange:
                    self.fail("Alternate exchange not set")
                self.assertEqual(exchange.altExchange, altName)
        self.assertEqual(found, True)

        ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName)))
        self.assertEqual(ret, 0)

        # listing the queues must still succeed with an alternate exchange set
        ret = os.system(self.qpid_config_command(" queues"))
        self.assertEqual(ret, 0)

        found = False
        for queue in self.broker_access.getAllQueues():
            if queue.name == qName:
                found = True
                if not queue.altExchange:
                    self.fail("Alternate exchange not set")
                self.assertEqual(queue.altExchange, altName)
        self.assertEqual(found, True)

    def test_qpid_config_list_queues_arguments(self):
        """
        Test to verify that when the type of a policy limit is
        actually a string (though still a valid value), it does not
        upset qpid-config
        """
        self.startBrokerAccess()

        names = []
        for i in range(1, 6):
            name = "queue_capacity%s" % i
            names.append(name)
            # Limits are deliberately passed as strings.  The index is taken
            # from this loop rather than from a leaked comprehension variable.
            self.session.queue_declare(queue=name, exclusive=True,
                                       arguments={'qpid.max_count': str(i), 'qpid.max_size': '100'})

        pipe = os.popen(self.qpid_config_command(" queues"))
        try:
            output = pipe.readlines()
        finally:
            pipe.close()

        # ignore first two lines (header); skip blank lines so that
        # split()[0] cannot raise IndexError
        queues = [line.split()[0] for line in output[2:] if line.strip()]

        for name in names:
            assert name in queues, "%s not in %s" % (name, queues)

    def test_qpid_route(self):
        """Add a dynamic route on amq.topic through the shell command."""
        self.startBrokerAccess()

        command = self.cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\
            (self.broker.port, self.remote_host(), self.remote_port())
        ret = os.system(command)
        self.assertEqual(ret, 0)

        self._assert_link_to_remote_port()

    def test_qpid_route_api(self):
        """Add a dynamic route on amq.direct through the imported script."""
        self.startBrokerAccess()

        ret = self.qpid_route_api("dynamic add "
                                  + "guest/guest@localhost:"+str(self.broker.port) + " "
                                  + str(self.remote_host())+":"+str(self.remote_port()) + " "
                                  +"amq.direct")
        self.assertEqual(ret, 0)

        self._assert_link_to_remote_port()

    def test_qpid_route_api_sasl_plain(self):
        """
        Add a dynamic route on amq.direct with --client-sasl-mechanism PLAIN.

        This method used to be a second definition of test_qpid_route_api,
        which replaced the plain variant above in the class namespace so
        that only one of the two ever ran.
        NOTE(review): it adds the same route as test_qpid_route_api; confirm
        qpid-route tolerates that when both tests run against one broker.
        """
        self.startBrokerAccess()

        ret = self.qpid_route_api("dynamic add "
                                  + " --client-sasl-mechanism PLAIN "
                                  + "guest/guest@localhost:"+str(self.broker.port) + " "
                                  + str(self.remote_host())+":"+str(self.remote_port()) + " "
                                  +"amq.direct")
        self.assertEqual(ret, 0)

        self._assert_link_to_remote_port()

    def test_qpid_route_api_expect_fail(self):
        """Expect a non-zero result when the local broker address carries no credentials."""
        self.startBrokerAccess()

        ret = self.qpid_route_api("dynamic add "
                                  + " --client-sasl-mechanism PLAIN "
                                  + "localhost:"+str(self.broker.port) + " "
                                  + str(self.remote_host())+":"+str(self.remote_port()) + " "
                                  +"amq.direct")
        assert ret != 0

    def getProperty(self, msg, name):
        """Return attribute `name` from the first header of `msg` that has it, else None."""
        for h in msg.headers:
            if hasattr(h, name):
                return getattr(h, name)
        return None

    def getAppHeader(self, msg, name):
        """
        Return entry `name` of the message's application_headers.

        Returns None when the message has no (or empty) application headers.
        """
        headers = self.getProperty(msg, "application_headers")
        if headers:
            return headers[name]
        return None

    def qpid_config_command(self, arg=""):
        """Return the shell command line running qpid-config against the test broker."""
        return self.cli_dir() + "/qpid-config -b localhost:%d" % self.broker.port + " " + arg

    def qpid_config_api(self, arg=""):
        """
        Run qpid-config in-process and return the result of its main().

        The script path comes from the QPID_CONFIG_EXEC environment variable.
        "-b localhost:<port>" is prepended to the whitespace-split arguments.
        """
        script = import_script(checkenv("QPID_CONFIG_EXEC"))
        broker = ["-b", "localhost:"+str(self.broker.port)]
        return script.main(broker + arg.split())

    def qpid_route_api(self, arg=""):
        """
        Run qpid-route in-process and return the result of its main().

        The script path comes from the QPID_ROUTE_EXEC environment variable.
        """
        script = import_script(checkenv("QPID_ROUTE_EXEC"))
        return script.main(arg.split())