From 248f1fe188fe2307b9dcf2c87a83b653eaa1920c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Sat, 26 Dec 2009 12:42:57 +0000 Subject: synchronized with trunk except for ruby dir git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68 --- python/LICENSE.txt | 0 python/Makefile | 98 ++ python/README.txt | 58 +- python/RELEASE_NOTES | 32 +- python/amqp-doc | 80 - python/commands/qpid-cluster | 328 ++++ python/commands/qpid-config | 474 +++--- python/commands/qpid-printevents | 74 + python/commands/qpid-queue-stats | 203 ++- python/commands/qpid-route | 593 ++++--- python/commands/qpid-stat | 460 ++++++ python/commands/qpid-tool | 6 +- python/cpp_failing_0-10.txt | 0 python/cpp_failing_0-8.txt | 0 python/cpp_failing_0-9.txt | 4 - python/doc/test-requirements.txt | 19 + python/examples/README | 319 ++++ python/examples/api/drain | 62 + python/examples/api/server | 87 ++ python/examples/api/spout | 103 ++ python/examples/datatypes/client.py | 122 ++ python/examples/datatypes/server.py | 124 ++ python/examples/datatypes/testdata.py | 180 +++ python/examples/direct/declare_queues.py | 20 +- python/examples/direct/direct_consumer.py | 20 +- python/examples/direct/direct_producer.py | 20 +- python/examples/direct/listener.py | 20 +- python/examples/direct/verify | 19 + python/examples/fanout/fanout_consumer.py | 20 +- python/examples/fanout/fanout_producer.py | 20 +- python/examples/fanout/listener.py | 20 +- python/examples/fanout/verify | 19 + python/examples/headers/declare_queues.py | 77 + python/examples/headers/headers_consumer.py | 107 ++ python/examples/headers/headers_producer.py | 79 + python/examples/headers/verify | 22 + python/examples/headers/verify.in | 25 + python/examples/pubsub/topic_publisher.py | 18 + python/examples/pubsub/topic_subscriber.py | 20 +- python/examples/pubsub/verify | 19 + python/examples/request-response/client.py | 20 +- python/examples/request-response/server.py | 20 +- python/examples/request-response/verify | 19 + python/examples/xml-exchange/declare_queues.py | 20 +- python/examples/xml-exchange/listener.py | 20 +- python/examples/xml-exchange/verify | 19 + python/examples/xml-exchange/xml_consumer.py | 20 +- python/examples/xml-exchange/xml_producer.py | 20 +- python/hello-world | 31 +- python/java_failing_0-8.txt | 2 - python/java_failing_0-9.txt | 18 - python/mllib/__init__.py | 30 +- python/mllib/dom.py | 15 + python/models/fedsim/__init__.py | 19 + python/models/fedsim/fedsim.py | 434 ++++++ python/models/fedsim/testBig.py | 88 ++ python/models/fedsim/testRing.py | 48 + python/models/fedsim/testStar.py | 65 + python/models/fedsim/testStarAdd.py | 56 + python/pal2py | 274 ---- python/perftest | 95 -- python/preppy | 67 + python/qmf/__init__.py | 18 + python/qmf/console.py | 1970 ++++++++++++++++++++++++ python/qpid-python-test | 575 +++++++ python/qpid/address.py | 161 ++ python/qpid/assembler.py | 118 -- python/qpid/brokertest.py | 480 ++++++ python/qpid/client.py | 7 +- python/qpid/codec010.py | 255 ++- python/qpid/compat.py | 94 ++ python/qpid/concurrency.py | 100 ++ python/qpid/connection.py | 96 +- python/qpid/connection08.py | 21 +- python/qpid/datatypes.py | 107 +- python/qpid/debug.py | 55 + python/qpid/delegates.py | 128 +- python/qpid/disp.py | 171 +- python/qpid/driver.py | 859 +++++++++++ python/qpid/exceptions.py | 1 + python/qpid/framer.py | 107 +- python/qpid/framing.py | 310 ++++ python/qpid/generator.py | 56 + python/qpid/harness.py | 20 + python/qpid/invoker.py | 48 - python/qpid/lexer.py | 112 ++ python/qpid/management.py | 300 ++-- python/qpid/managementdata.py | 170 +- python/qpid/message.py | 1 - python/qpid/messaging.py | 822 ++++++++++ python/qpid/mimetype.py | 106 ++ python/qpid/ops.py | 280 ++++ python/qpid/parser.py | 68 + python/qpid/peer.py | 12 +- python/qpid/queue.py | 4 +- python/qpid/selector.py | 139 ++ python/qpid/session.py | 227 +-- python/qpid/spec.py | 6 +- python/qpid/spec010.py | 691 --------- python/qpid/testlib.py | 300 +--- python/qpid/tests/__init__.py | 28 + python/qpid/tests/address.py | 199 +++ python/qpid/tests/framing.py | 289 ++++ python/qpid/tests/messaging.py | 929 +++++++++++ python/qpid/tests/mimetype.py | 56 + python/qpid/tests/parser.py | 37 + python/qpid/util.py | 70 +- python/qpid_config.py | 6 +- python/rule2test | 108 -- python/run-tests | 35 - python/server | 18 + python/server010 | 18 + python/setup.py | 4 +- python/tests/__init__.py | 10 +- python/tests/assembler.py | 77 - python/tests/codec.py | 14 +- python/tests/codec010.py | 79 +- python/tests/connection.py | 44 +- python/tests/datatypes.py | 95 +- python/tests/framer.py | 94 -- python/tests/spec.py | 56 - python/tests/spec010.py | 70 +- python/tests_0-10/__init__.py | 1 + python/tests_0-10/alternate_exchange.py | 68 +- python/tests_0-10/broker.py | 16 +- python/tests_0-10/dtx.py | 6 +- python/tests_0-10/example.py | 4 +- python/tests_0-10/exchange.py | 47 +- python/tests_0-10/management.py | 339 +++- python/tests_0-10/message.py | 171 +- python/tests_0-10/persistence.py | 5 +- python/tests_0-10/query.py | 16 +- python/tests_0-10/queue.py | 34 +- python/tests_0-10/tx.py | 10 +- python/tests_0-8/__init__.py | 2 + python/tests_0-8/basic.py | 7 +- python/tests_0-8/broker.py | 24 +- python/tests_0-8/example.py | 2 +- python/tests_0-8/queue.py | 2 +- python/tests_0-8/testlib.py | 2 +- python/tests_0-8/tx.py | 2 +- python/tests_0-9/__init__.py | 2 + python/tests_0-9/basic.py | 396 ----- python/tests_0-9/broker.py | 133 -- python/tests_0-9/dtx.py | 587 ------- python/tests_0-9/example.py | 94 -- python/tests_0-9/exchange.py | 327 ---- python/tests_0-9/execution.py | 29 - python/tests_0-9/message.py | 657 -------- python/tests_0-9/query.py | 2 +- python/tests_0-9/queue.py | 261 +--- python/tests_0-9/testlib.py | 66 - python/tests_0-9/tx.py | 188 --- python/todo.txt | 188 +++ 154 files changed, 14384 insertions(+), 6006 deletions(-) mode change 100755 => 100644 python/LICENSE.txt create mode 100644 python/Makefile delete mode 100755 python/amqp-doc create mode 100755 python/commands/qpid-cluster create mode 100755 python/commands/qpid-printevents create mode 100755 python/commands/qpid-stat delete mode 100644 python/cpp_failing_0-10.txt delete mode 100644 python/cpp_failing_0-8.txt delete mode 100644 python/cpp_failing_0-9.txt create mode 100644 python/examples/README create mode 100755 python/examples/api/drain create mode 100755 python/examples/api/server create mode 100755 python/examples/api/spout create mode 100755 python/examples/datatypes/client.py create mode 100755 python/examples/datatypes/server.py create mode 100644 python/examples/datatypes/testdata.py create mode 100755 python/examples/headers/declare_queues.py create mode 100755 python/examples/headers/headers_consumer.py create mode 100755 python/examples/headers/headers_producer.py create mode 100644 python/examples/headers/verify create mode 100644 python/examples/headers/verify.in delete mode 100644 python/java_failing_0-8.txt delete mode 100644 python/java_failing_0-9.txt create mode 100644 python/models/fedsim/__init__.py create mode 100644 python/models/fedsim/fedsim.py create mode 100644 python/models/fedsim/testBig.py create mode 100644 python/models/fedsim/testRing.py create mode 100644 python/models/fedsim/testStar.py create mode 100644 python/models/fedsim/testStarAdd.py delete mode 100755 python/pal2py delete mode 100755 python/perftest create mode 100755 python/preppy create mode 100644 python/qmf/__init__.py create mode 100644 python/qmf/console.py create mode 100755 python/qpid-python-test create mode 100644 python/qpid/address.py delete mode 100644 python/qpid/assembler.py create mode 100644 python/qpid/brokertest.py create mode 100644 python/qpid/concurrency.py create mode 100644 python/qpid/debug.py create mode 100644 python/qpid/driver.py create mode 100644 python/qpid/framing.py create mode 100644 python/qpid/generator.py create mode 100644 python/qpid/harness.py delete mode 100644 python/qpid/invoker.py create mode 100644 python/qpid/lexer.py create mode 100644 python/qpid/messaging.py create mode 100644 python/qpid/mimetype.py create mode 100644 python/qpid/ops.py create mode 100644 python/qpid/parser.py create mode 100644 python/qpid/selector.py delete mode 100644 python/qpid/spec010.py create mode 100644 python/qpid/tests/__init__.py create mode 100644 python/qpid/tests/address.py create mode 100644 python/qpid/tests/framing.py create mode 100644 python/qpid/tests/messaging.py create mode 100644 python/qpid/tests/mimetype.py create mode 100644 python/qpid/tests/parser.py delete mode 100755 python/rule2test delete mode 100755 python/run-tests delete mode 100644 python/tests/assembler.py delete mode 100644 python/tests/framer.py delete mode 100644 python/tests/spec.py delete mode 100644 python/tests_0-9/basic.py delete mode 100644 python/tests_0-9/broker.py delete mode 100644 python/tests_0-9/dtx.py delete mode 100644 python/tests_0-9/example.py delete mode 100644 python/tests_0-9/exchange.py delete mode 100644 python/tests_0-9/execution.py delete mode 100644 python/tests_0-9/message.py delete mode 100644 python/tests_0-9/testlib.py delete mode 100644 python/tests_0-9/tx.py create mode 100644 python/todo.txt (limited to 'python') diff --git a/python/LICENSE.txt b/python/LICENSE.txt old mode 100755 new mode 100644 diff --git a/python/Makefile b/python/Makefile new file mode 100644 index 0000000000..7f475adc09 --- /dev/null +++ b/python/Makefile @@ -0,0 +1,98 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +PREFIX=/usr/local +EXEC_PREFIX=$(PREFIX)/bin +DATA_DIR=$(PREFIX)/share + +PYTHON_LIB=$(shell python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(prefix='$(PREFIX)')") +PYTHON_VERSION=$(shell python -c "from distutils.sysconfig import get_python_version; print get_python_version()") + +ddfirst=$(shell ddir=$(DATA_DIR) && echo $${ddir:0:1}) +ifeq ($(ddfirst),/) +AMQP_SPEC_DIR=$(DATA_DIR)/amqp +else +AMQP_SPEC_DIR=$(PWD)/$(DATA_DIR)/amqp +endif + +DIRS=qmf qpid mllib models examples tests tests_0-8 tests_0-9 tests_0-10 +SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py +BUILD=build +TARGETS=$(SRCS:%.py=$(BUILD)/%.py) + +PYCC=python -O -c "import compileall; compileall.main()" + +all: build + +$(BUILD)/%.py: %.py + @mkdir -p $(shell dirname $@) + ./preppy $(PYTHON_VERSION) < $< > $@ + +build: $(TARGETS) + +.PHONY: doc + +doc: + @mkdir -p $(BUILD) + PYTHONPATH=. epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log + +install: build + install -d $(PYTHON_LIB) + + install -d $(PYTHON_LIB)/mllib + install -pm 0644 LICENSE.txt NOTICE.txt $(BUILD)/mllib/*.* $(PYTHON_LIB)/mllib + $(PYCC) $(PYTHON_LIB)/mllib + + install -d $(PYTHON_LIB)/qpid + install -pm 0644 LICENSE.txt NOTICE.txt README.txt $(BUILD)/qpid/*.* $(PYTHON_LIB)/qpid + TDIR=$(shell mktemp -d) && \ + sed s@AMQP_SPEC_DIR=.*@AMQP_SPEC_DIR='"$(AMQP_SPEC_DIR)"'@ \ + $(BUILD)/qpid_config.py > $${TDIR}/qpid_config.py && \ + install -pm 0644 $${TDIR}/qpid_config.py $(PYTHON_LIB) && \ + rm -rf $${TDIR} + + install -d $(PYTHON_LIB)/qpid/tests + install -pm 0644 $(BUILD)/qpid/tests/*.* $(PYTHON_LIB)/qpid/tests + $(PYCC) $(PYTHON_LIB)/qpid + + install -d $(PYTHON_LIB)/qmf + install -pm 0644 LICENSE.txt NOTICE.txt qmf/*.* $(PYTHON_LIB)/qmf + $(PYCC) $(PYTHON_LIB)/qmf + + install -d $(PYTHON_LIB)/tests + install -pm 0644 $(BUILD)/tests/*.* $(PYTHON_LIB)/tests + $(PYCC) $(PYTHON_LIB)/tests + + install -d $(PYTHON_LIB)/tests_0-8 + install -pm 0644 $(BUILD)/tests_0-8/*.* $(PYTHON_LIB)/tests_0-8 + $(PYCC) $(PYTHON_LIB)/tests_0-8 + + install -d $(PYTHON_LIB)/tests_0-9 + install -pm 0644 $(BUILD)/tests_0-9/*.* $(PYTHON_LIB)/tests_0-9 + $(PYCC) $(PYTHON_LIB)/tests_0-9 + + install -d $(PYTHON_LIB)/tests_0-10 + install -pm 0644 $(BUILD)/tests_0-10/*.* $(PYTHON_LIB)/tests_0-10 + $(PYCC) $(PYTHON_LIB)/tests_0-10 + + install -d $(EXEC_PREFIX) + install -pm 0755 qpid-python-test commands/* $(EXEC_PREFIX) + +clean: + rm -rf $(BUILD) diff --git a/python/README.txt b/python/README.txt index e7bb5af408..772271cffe 100644 --- a/python/README.txt +++ b/python/README.txt @@ -1,32 +1,50 @@ -= RUNNING THE PYTHON TESTS = += INSTALLATION = -The tests/ directory contains a collection of python unit tests to -exercise functions of a broker. +Extract the release archive into a directory of your choice and set +your PYTHONPATH accordingly: -Simplest way to run the tests: + tar -xzf qpid-python-.tar.gz -C + export PYTHONPATH=/qpid-/python - * Run a broker on the default port += GETTING STARTED = - * ./run-tests +The python client includes a simple hello-world example that publishes +and consumes a message: -For additional options: ./run-tests --help + cp /qpid-/python/hello-world . + ./hello-world += EXAMPLES = -== Expected failures == +More comprehensive examples can be found here: -Until we complete functionality, tests may fail because the tested -functionality is missing in the broker. To skip expected failures -in the C++ or Java brokers: + cd /qpid-/python/examples - ./run-tests -I += RUNNING THE TESTS = -=== File List === +The "tests" directory contains a collection of unit tests for the +python client. The "tests_0-10", "tests_0-9", and "tests_0-8" +directories contain protocol level conformance tests for AMQP brokers +of the specified version. -1. cpp_failing_0-10.txt -2. cpp_failing_0-9.txt -3. cpp_failing_0-8.txt -4. java_failing_0-9.txt -5. java_failing_0-8.txt -6. cpp_failing_0-10_preview.txt -- will be depricated soon. +The qpid-python-test script may be used to run these tests. It will by +default run the python unit tests and the 0-10 conformance tests: -If you fix a failure, please remove it from the corresponding list. + 1. Run a broker on the default port + + 2. ./qpid-python-test + +If you wish to run the 0-8 or 0-9 conformence tests, they may be +selected as follows: + + 1. Run a broker on the default port + + 2. ./qpid-python-test tests_0-8.* + + -- or -- + + ./qpid-python-test tests_0-9.* + +See the qpid-python-test usage for for additional options: + + ./qpid-python-test -h diff --git a/python/RELEASE_NOTES b/python/RELEASE_NOTES index 7005aa83cb..c0903df38e 100644 --- a/python/RELEASE_NOTES +++ b/python/RELEASE_NOTES @@ -1,25 +1,17 @@ -Apache Incubator Qpid Python M2 Release Notes -------------------------------------------- +Apache Python M4 Release Notes +------------------------------ -The Qpid M2 release contains support the for AMQP 0-8 specification. -You can access the 0-8 specification using the following link. -http://www.amqp.org/tikiwiki/tiki-index.php?page=Download - -For full details of Qpid capabilities, as they currently stand, see our -detailed project documentation at: - -http://cwiki.apache.org/confluence/pages/viewpage.action?pageId=28284 - -Please take time to go through the README file provided with the distro. +The Qpid M4 release of the python client contains support the for both + 0-8 and 0-10 of the AMQP specification as well as support for the +non-WIP portion of the 0-9 specification. You can access these +specifications from: +http://jira.amqp.org/confluence/display/AMQP/Download -Known Issues/Outstanding Work ------------------------------ - -There are no known issues for the Phyton client. - +For full details of Qpid capabilities, as they currently stand, see our +project page at: -M2 Tasks Completed -------------------- +http://cwiki.apache.org/confluence/display/qpid/Index -Bug QPID-467 Complete Interop Testing +The README file provided contains some details on installing and using +the python client that is included with this distribution. diff --git a/python/amqp-doc b/python/amqp-doc deleted file mode 100755 index 1f5910f942..0000000000 --- a/python/amqp-doc +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import sys, re -from qpid.spec import load, pythonize -from getopt import gnu_getopt as getopt, GetoptError -from fnmatch import fnmatchcase as fnmatch - -def die(msg): - print >> sys.stderr, msg - sys.exit(1) - -def usage(msg = ""): - return ("""%s - -Usage %s [] [ ... ] - -Options: - -e, --regexp use regex instead of glob when matching - -s, --spec location of amqp.xml -""" % (msg, sys.argv[0])).strip() - -try: - opts, args = getopt(sys.argv[1:], "s:ea:", ["regexp", "spec=", "additional="]) -except GetoptError, e: - die(str(e)) - -regexp = False -spec = "../specs/amqp.0-9.xml" -errata = [] -for k, v in opts: - if k == "-e" or k == "--regexp": regexp = True - if k == "-s" or k == "--spec": spec = v - if k == "-a" or k == "--additional": errata.append(v) - -if regexp: - def match(pattern, value): - try: - return re.match(pattern, value) - except Exception, e: - die("error: '%s': %s" % (pattern, e)) -else: - def match(pattern, value): - return fnmatch(value, pattern) - -spec = load(spec, *errata) -methods = {} -patterns = args -for pattern in patterns: - for c in spec.classes: - for m in c.methods: - name = pythonize("%s_%s" % (c.name, m.name)) - if match(pattern, name): - methods[name] = m.define_method(name) - -if patterns: - if methods: - AMQP = type("AMQP[%s]" % ", ".join(patterns), (), methods) - else: - die("no matches") -else: - AMQP = spec.define_class("AMQP") - -help(AMQP) diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster new file mode 100755 index 0000000000..7afb7671b8 --- /dev/null +++ b/python/commands/qpid-cluster @@ -0,0 +1,328 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session + +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + self._stopId = None + self._stopAll = False + self._force = False + self._numeric = False + self._showConn = False + self._delConn = None + +def usage (): + print "Usage: qpid-cluster [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" + print " -C [--all-connections] View client connections to all cluster members" + print " -c [--connections] ID View client connections to specified member" + print " -d [--del-connection] HOST:PORT" + print " Disconnect a client connection" + print " -s [--stop] ID Stop one member of the cluster by its ID" + print " -k [--all-stop] Shut down the whole cluster" + print " -f [--force] Suppress the 'are-you-sure?' prompt" + print " -n [--numeric] Don't resolve names" + print + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class BrokerManager: + def __init__(self, config): + self.config = config + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getClusters(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + raise Exception("Clustering is not installed on the broker.") + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + raise Exception("Clustering is installed but not enabled on the broker.") + + return clusters + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(self.config._host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def overview(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + + print " Cluster Name: %s" % cluster.clusterName + print "Cluster Status: %s" % cluster.status + print " Cluster Size: %d" % cluster.clusterSize + print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) + for idx in range(1,len(idList)): + print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) + + def stopMember(self, id): + clusters = self._getClusters() + cluster = clusters[0] + idList = cluster.memberIDs.split(";") + if id not in idList: + raise Exception("No member with matching ID found") + + if not self.config._force: + prompt = "Warning: " + if len(idList) == 1: + prompt += "This command will shut down the last running cluster member." + else: + prompt += "This command will shut down a cluster member." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + raise Exception("Operation canceled") + + cluster.stopClusterNode(id) + + def stopAll(self): + clusters = self._getClusters() + if not self.config._force: + prompt = "Warning: This command will shut down the entire cluster." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + raise Exception("Operation canceled") + + cluster = clusters[0] + cluster.stopFullCluster() + + def showConnections(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + displayList = [] + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + self.brokers = [] + pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + + idx = 0 + for host in hostList: + if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: + self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) + displayList.append(idList[idx]) + idx += 1 + + idx = 0 + found = False + for broker in self.brokers: + if not self.config._delConn: + print "Clients on Member: ID=%s:" % displayList[idx] + connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) + for conn in connList: + if pattern.match(conn.address): + if self.config._numeric or self.config._delConn: + a = conn.address + else: + tokens = conn.address.split(":") + try: + hostList = socket.gethostbyaddr(tokens[0]) + host = hostList[0] + except: + host = tokens[0] + a = host + ":" + tokens[1] + if self.config._delConn: + tokens = self.config._delConn.split(":") + ip = socket.gethostbyname(tokens[0]) + toDelete = ip + ":" + tokens[1] + if a == toDelete: + print "Closing connection from client: %s" % a + conn.close() + found = True + else: + print " %s" % a + idx += 1 + if not self.config._delConn: + print + if self.config._delConn and not found: + print "Client connection '%s' not found" % self.config._delConn + + for broker in self.brokers: + self.qmf.delBroker(broker) + + +def main(argv=None): + if argv is None: argv = sys.argv + try: + config = Config() + try: + longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts) + except: + usage() + return 1 + + try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] + except: + cargs = encArgs + + count = 0 + for opt in optlist: + if opt[0] == "--timeout": + config._connTimeout = int(opt[1]) + if config._connTimeout == 0: + config._connTimeout = None + if opt[0] == "-s" or opt[0] == "--stop": + config._stopId = opt[1] + if len(config._stopId.split(":")) != 2: + raise Exception("Member ID must be of form: :") + count += 1 + if opt[0] == "-k" or opt[0] == "--all-stop": + config._stopAll = True + count += 1 + if opt[0] == "-f" or opt[0] == "--force": + config._force = True + if opt[0] == "-n" or opt[0] == "--numeric": + config._numeric = True + if opt[0] == "-C" or opt[0] == "--all-connections": + config._showConn = "all" + count += 1 + if opt[0] == "-c" or opt[0] == "--connections": + config._showConn = opt[1] + if len(config._showConn.split(":")) != 2: + raise Exception("Member ID must be of form: :") + count += 1 + if opt[0] == "-d" or opt[0] == "--del-connection": + config._delConn = opt[1] + if len(config._delConn.split(":")) != 2: + raise Exception("Connection must be of form: :") + count += 1 + + if count > 1: + print "Only one command option may be supplied" + print + usage() + return 1 + + nargs = len(cargs) + bm = BrokerManager(config) + + if nargs == 1: + config._host = cargs[0] + + try: + bm.SetBroker(config._host) + if config._stopId: + bm.stopMember(config._stopId) + elif config._stopAll: + bm.stopAll() + elif config._showConn or config._delConn: + bm.showConnections() + else: + bm.overview() + except KeyboardInterrupt: + print + except Exception,e: + if str(e).find("connection aborted") > 0: + # we expect this when asking the connected broker to shut down + return 0 + raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) + + bm.Disconnect() + except Exception, e: + print str(e) + return 1 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/commands/qpid-config b/python/commands/qpid-config index cc9315f7ea..39af67f39c 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -22,30 +22,39 @@ import os import getopt import sys -import socket -import qpid -from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import uuid4 -from qpid.util import connect -from time import sleep - -_recursive = False -_host = "localhost" -_durable = False -_fileCount = 8 -_fileSize = 24 -_maxQueueSize = None -_maxQueueCount= None - +import locale +from qmf.console import Session + +_recursive = False +_host = "localhost" +_connTimeout = 10 +_altern_ex = None +_passive = False +_durable = False +_clusterDurable = False +_if_empty = True +_if_unused = True +_fileCount = 8 +_fileSize = 24 +_maxQueueSize = None +_maxQueueCount = None +_limitPolicy = None +_order = None +_msgSequence = False +_ive = False +_eventGeneration = None FILECOUNT = "qpid.file_count" FILESIZE = "qpid.file_size" MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" +POLICY_TYPE = "qpid.policy_type" +CLUSTER_DURABLE = "qpid.persist_last_node" +LVQ = "qpid.last_value_queue" +LVQNB = "qpid.last_value_queue_no_browse" +MSG_SEQUENCE = "qpid.msg_sequence" +IVE = "qpid.ive" +QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" def Usage (): print "Usage: qpid-config [OPTIONS]" @@ -54,68 +63,99 @@ def Usage (): print " qpid-config [OPTIONS] add exchange [AddExchangeOptions]" print " qpid-config [OPTIONS] del exchange " print " qpid-config [OPTIONS] add queue [AddQueueOptions]" - print " qpid-config [OPTIONS] del queue " + print " qpid-config [OPTIONS] del queue [DelQueueOptions]" print " qpid-config [OPTIONS] bind [binding-key]" print " qpid-config [OPTIONS] unbind [binding-key]" print print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" print " -b [ --bindings ] Show bindings in queue or exchange list" print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print print "Add Queue Options:" - print " --durable Queue is durable" - print " --file-count N (8) Number of files in queue's persistence journal" - print " --file-size N (24) File size in pages (64Kib/page)" - print " --max-queue-size N Maximum in-memory queue size as bytes" - print " --max-queue-count N Maximum in-memory queue size as a number of messages" + print " --alternate-exchange [name of the alternate exchange]" + print " The alternate-exchange field specifies how messages on this queue should" + print " be treated when they are rejected by a subscriber, or when they are" + print " orphaned by queue deletion. When present, rejected or orphaned messages" + print " MUST be routed to the alternate-exchange. In all cases the messages MUST" + print " be removed from the queue." + print " --passive Do not actually change the broker state (queue will not be created)" + print " --durable Queue is durable" + print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" + print " --file-count N (8) Number of files in queue's persistence journal" + print " --file-size N (24) File size in pages (64Kib/page)" + print " --max-queue-size N Maximum in-memory queue size as bytes" + print " --max-queue-count N Maximum in-memory queue size as a number of messages" + print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]" + print " Action taken when queue limit is reached:" + print " none (default) - Use broker's default policy" + print " reject - Reject enqueued messages" + print " flow-to-disk - Page messages to disk" + print " ring - Replace oldest unacquired message with new" + print " ring-strict - Replace oldest message, reject if oldest is acquired" + print " --order [fifo | lvq | lvq-no-browse]" + print " Set queue ordering policy:" + print " fifo (default) - First in, first out" + print " lvq - Last Value Queue ordering, allows queue browsing" + print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data" + print " --generate-queue-events N" + print " If set to 1, every enqueue will generate an event that can be processed by" + print " registered listeners (e.g. for replication). If set to 2, events will be" + print " generated for enqueues and dequeues" + print + print "Del Queue Options:" + print " --force Force delete of queue even if it's currently used or it's not empty" + print " --force-if-not-empty Force delete of queue even if it's not empty" + print " --force-if-used Force delete of queue even if it's currently used" + print + print "Add Exchange values:" + print " direct Direct exchange for point-to-point communication" + print " fanout Fanout exchange for broadcast communication" + print " topic Topic exchange that routes messages using binding keys with wildcards" + print " headers Headers exchange that matches header fields against the binding keys" print print "Add Exchange Options:" - print " --durable Exchange is durable" + print " --alternate-exchange [name of the alternate exchange]" + print " In the event that a message cannot be routed, this is the name of the exchange to" + print " which the message will be sent. Messages transferred using message.transfer will" + print " be routed to the alternate-exchange only if they are sent with the \"none\"" + print " accept-mode, and the discard-unroutable delivery property is set to false, and" + print " there is no queue to route to for the given message according to the bindings" + print " on this exchange." + print " --passive Do not actually change the broker state (exchange will not be created)" + print " --durable Exchange is durable" + print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" + print " with a value that increments for each message forwarded." + print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" + print " to the last message forwarded and enqueuing that message to newly bound" + print " queues." print sys.exit (1) class BrokerManager: def __init__ (self): - self.dest = None - self.src = None - self.broker = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session (self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) def Overview (self): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -136,30 +176,39 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - print "Durable Type Bindings Exchange Name" - print "=======================================================" + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + caption1 = "Type " + caption2 = "Exchange Name" + maxNameLen = len(caption2) + for ex in exchanges: + if self.match(ex.name, filter): + if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) + print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) + line = "" + for i in range(((maxNameLen + len(caption1)) / 5) + 5): + line += "=====" + print line + for ex in exchanges: if self.match (ex.name, filter): - print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) + print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), + args = ex.arguments + if ex.durable: print "--durable", + if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", + if IVE in args and args[IVE] == 1: print "--ive", + if ex.altExchange: + print "--alternate-exchange=%s" % ex._altExchange_.name, + print def ExchangeListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.id: + if bind.exchangeRef == ex.getObjectId(): qname = "" queue = self.findById (queues, bind.queueRef) if queue != None: @@ -168,43 +217,48 @@ class BrokerManager: def QueueList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - queues = mc.syncGetObjects (mch, "queue") - journals = mc.syncGetObjects (mch, "journal") - print " Store Size" - print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" - print "===========================================================================================" + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + + caption = "Queue Name" + maxNameLen = len(caption) + for q in queues: + if self.match (q.name, filter): + if len(q.name) > maxNameLen: maxNameLen = len(q.name) + print "%-*s Attributes" % (maxNameLen, caption) + line = "" + for i in range((maxNameLen / 5) + 5): + line += "=====" + print line + for q in queues: if self.match (q.name, filter): + print "%-*s " % (maxNameLen, q.name), args = q.arguments - if q.durable and FILESIZE in args and FILECOUNT in args: - fs = int (args[FILESIZE]) - fc = int (args[FILECOUNT]) - print "%4c%9c%7c%10d%11dx%-14d%s" % \ - (YN (q.durable), YN (q.autoDelete), - YN (q.exclusive), q.bindingCount, fc, fs, q.name) - else: - if not _durable: - print "%4c%9c%7c%10d %s" % \ - (YN (q.durable), YN (q.autoDelete), - YN (q.exclusive), q.bindingCount, q.name) + if q.durable: print "--durable", + if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", + if q.autoDelete: print "auto-del", + if q.exclusive: print "excl", + if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], + if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], + if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], + if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], + if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), + if LVQ in args and args[LVQ] == 1: print "--order lvq", + if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", + if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], + if q.altExchange: + print "--alternate-exchange=%s" % q._altExchange_.name, + print def QueueListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.id: + if bind.queueRef == queue.getObjectId(): ename = "" ex = self.findById (exchanges, bind.exchangeRef) if ex != None: @@ -216,30 +270,27 @@ class BrokerManager: def AddExchange (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () etype = args[0] ename = args[1] - - try: - self.session.exchange_declare (exchange=ename, type=etype, durable=_durable) - except Closed, e: - print "Failed:", e + declArgs = {} + if _msgSequence: + declArgs[MSG_SEQUENCE] = 1 + if _ive: + declArgs[IVE] = 1 + if _altern_ex != None: + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) + else: + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs) def DelExchange (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () ename = args[0] - - try: - self.session.exchange_delete (exchange=ename) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] declArgs = {} if _durable: @@ -250,56 +301,64 @@ class BrokerManager: declArgs[MAX_QUEUE_SIZE] = _maxQueueSize if _maxQueueCount: declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - - try: - self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs) - except Closed, e: - print "Failed:", e + if _limitPolicy: + if _limitPolicy == "none": + pass + elif _limitPolicy == "reject": + declArgs[POLICY_TYPE] = "reject" + elif _limitPolicy == "flow-to-disk": + declArgs[POLICY_TYPE] = "flow_to_disk" + elif _limitPolicy == "ring": + declArgs[POLICY_TYPE] = "ring" + elif _limitPolicy == "ring-strict": + declArgs[POLICY_TYPE] = "ring_strict" + + if _clusterDurable: + declArgs[CLUSTER_DURABLE] = 1 + if _order: + if _order == "fifo": + pass + elif _order == "lvq": + declArgs[LVQ] = 1 + elif _order == "lvq-no-browse": + declArgs[LVQNB] = 1 + if _eventGeneration: + declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration + + if _altern_ex != None: + self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) + else: + self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] - - try: - self.session.queue_delete (queue=qname) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused) def Bind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) def Unbind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) def findById (self, items, id): for item in items: - if item.id == id: + if item.getObjectId() == id: return item return None @@ -315,23 +374,43 @@ def YN (bool): return 'Y' return 'N' + ## ## Main Program ## try: - longOpts = ("durable", "bindings", "broker-addr=", "file-count=", "file-size=", "max-queue-size=", "max-queue-count=") - (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) + longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", + "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=", + "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty", + "force_if_used", "alternate-exchange=", "passive", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) except: Usage () +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + for opt in optlist: if opt[0] == "-b" or opt[0] == "--bindings": _recursive = True if opt[0] == "-a" or opt[0] == "--broker-addr": _host = opt[1] + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None + if opt[0] == "--alternate-exchange": + _altern_ex = opt[1] + if opt[0] == "--passive": + _passive = True if opt[0] == "--durable": _durable = True + if opt[0] == "--cluster-durable": + _clusterDurable = True if opt[0] == "--file-count": _fileCount = int (opt[1]) if opt[0] == "--file-size": @@ -340,46 +419,77 @@ for opt in optlist: _maxQueueSize = int (opt[1]) if opt[0] == "--max-queue-count": _maxQueueCount = int (opt[1]) + if opt[0] == "--limit-policy": + _limitPolicy = opt[1] + if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"): + print "Error: Invalid --limit-policy argument" + sys.exit(1) + if opt[0] == "--order": + _order = opt[1] + if _order not in ("fifo", "lvq", "lvq-no-browse"): + print "Error: Invalid --order argument" + sys.exit(1) + if opt[0] == "--sequence": + _msgSequence = True + if opt[0] == "--ive": + _ive = True + if opt[0] == "--generate-queue-events": + _eventGeneration = int (opt[1]) + if opt[0] == "--force": + _if_empty = False + _if_unused = False + if opt[0] == "--force-if-not-empty": + _if_empty = False + if opt[0] == "--force-if-used": + _if_unused = False + nargs = len (cargs) bm = BrokerManager () -bm.SetBroker (Broker (_host)) - -if nargs == 0: - bm.Overview () -else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd[0] == 'e': - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd[0] == 'q': - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) + +try: + bm.SetBroker(_host) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd == "exchanges": + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd == "queues": + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) else: Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () +except KeyboardInterrupt: + print +except Exception,e: + print "Failed: %s: %s" % (e.__class__.__name__, e) + sys.exit(1) + bm.Disconnect() diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents new file mode 100755 index 0000000000..0c1b618a1f --- /dev/null +++ b/python/commands/qpid-printevents @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import optparse +import sys +import socket +from time import time, strftime, gmtime, sleep +from qmf.console import Console, Session + +class EventConsole(Console): + def event(self, broker, event): + print event + + def brokerConnected(self, broker): + print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl() + + def brokerDisconnected(self, broker): + print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() + + +## +## Main Program +## +def main(): + _usage = "%prog [options] [broker-addr]..." + _description = \ +"""Collect and print events from one or more Qpid message brokers. If no broker-addr is +supplied, %prog will connect to 'localhost:5672'. +broker-addr is of the form: [username/password@] hostname | ip-address [:] +ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost +""" + p = optparse.OptionParser(usage=_usage, description=_description) + + options, arguments = p.parse_args() + if len(arguments) == 0: + arguments.append("localhost") + + console = EventConsole() + session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True) + brokers = [] + for host in arguments: + brokers.append(session.addBroker(host)) + + try: + while (True): + sleep(10) + except KeyboardInterrupt: + for broker in brokers: + session.delBroker(broker) + print + sys.exit(0) + +if __name__ == '__main__': + main() + diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats index 98dfa7580a..3b8a0dcb19 100755 --- a/python/commands/qpid-queue-stats +++ b/python/commands/qpid-queue-stats @@ -26,120 +26,100 @@ import re import socket import qpid from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker +from qmf.console import Session, Console from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect from time import sleep -class mgmtObject (object): - """ Generic object that holds the contents of a management object with its - attributes set as object attributes. """ - - def __init__ (self, classKey, timestamps, row): - self.classKey = classKey - self.timestamps = timestamps - for cell in row: - setattr (self, cell[0], cell[1]) - - - -class BrokerManager: - def __init__ (self): - self.dest = None - self.src = None - self.broker = None - self.objects = {} - self.filter = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec, None, self.configCb, self.instCb) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def setFilter(self,filter): - self.filter = filter - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) - - def configCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - self.objects[obj.id] = (obj.name, None, None) - - def instCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - return - - (name, first, last) = self.objects[obj.id] - if first == None: - self.objects[obj.id] = (name, obj, None) - return - - if len(self.filter) > 0 : - match = False - - for x in self.filter: - if x.match(name): - match = True - break - if match == False: - return - - if last == None: - lastSample = first - else: - lastSample = last - - self.objects[obj.id] = (name, first, obj) - - deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0]) - enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0) - dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0) - print "%-41s%10.2f%11d%13.2f%13.2f" % \ - (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate) - - - def Display (self): - self.ConnectToBroker () - print "Queue Name Sec Depth Enq Rate Deq Rate" - print "========================================================================================" - try: - while True: - sleep (1) - except KeyboardInterrupt: - pass - self.Disconnect () +class BrokerManager(Console): + def __init__(self, host): + self.url = host + self.objects = {} + self.filter = None + self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, + userBindings=True, manageConnections=True) + self.broker = self.session.addBroker(self.url) + self.firstError = True + + def setFilter(self,filter): + self.filter = filter + + def brokerConnected(self, broker): + if not self.firstError: + print "*** Broker connected" + self.firstError = False + + def brokerDisconnected(self, broker): + print "*** Broker connection lost - %s, retrying..." % broker.getError() + self.firstError = False + self.objects.clear() + + def objectProps(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + self.objects[id] = (record.name, None, None) + + def objectStats(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + return + + (name, first, last) = self.objects[id] + if first == None: + self.objects[id] = (name, record, None) + return + + if len(self.filter) > 0 : + match = False + + for x in self.filter: + if x.match(name): + match = True + break + if match == False: + return + + if last == None: + lastSample = first + else: + lastSample = last + + self.objects[id] = (name, first, record) + + deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) + if deltaTime < 1000000000.0: + return + enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ + (deltaTime / 1000000000.0) + dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ + (deltaTime / 1000000000.0) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) + sys.stdout.flush() + + + def Display (self): + self.session.bindClass("org.apache.qpid.broker", "queue") + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" + sys.stdout.flush() + try: + while True: + sleep (1) + if self.firstError and self.broker.getError(): + self.firstError = False + print "*** Error: %s, retrying..." % self.broker.getError() + except KeyboardInterrupt: + print + self.session.delBroker(self.broker) ## ## Main Program @@ -157,8 +137,7 @@ def main(): for s in options.filter.split(","): filter.append(re.compile(s)) - bm = BrokerManager () - bm.SetBroker (Broker (host)) + bm = BrokerManager(host) bm.setFilter(filter) bm.Display() diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 3cd9109a6a..9965047000 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -22,280 +22,379 @@ import getopt import sys import socket -import qpid import os -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect - -def Usage (): - print "Usage: qpid-route [OPTIONS] link add " - print " qpid-route [OPTIONS] link del " - print " qpid-route [OPTIONS] link list []" +import locale +from qmf.console import Session, BrokerURL + +def Usage(): + print "Usage: qpid-route [OPTIONS] dynamic add [tag] [exclude-list]" + print " qpid-route [OPTIONS] dynamic del " print print " qpid-route [OPTIONS] route add [tag] [exclude-list]" print " qpid-route [OPTIONS] route del " + print " qpid-route [OPTIONS] queue add " + print " qpid-route [OPTIONS] queue del " print " qpid-route [OPTIONS] route list []" print " qpid-route [OPTIONS] route flush []" + print " qpid-route [OPTIONS] route map []" + print + print " qpid-route [OPTIONS] link add " + print " qpid-route [OPTIONS] link del " + print " qpid-route [OPTIONS] link list []" print print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" print " -v [ --verbose ] Verbose output" print " -q [ --quiet ] Quiet output, don't print duplicate warnings" print " -d [ --durable ] Added configuration shall be durable" print " -e [ --del-empty-link ] Delete link after deleting last route on the link" + print " -s [ --src-local ] Make connection to source broker (push route)" + print " --ack N Acknowledge transfers over the bridge in batches of N" + print " -t [ --transport ]" + print " Specify transport to use for links, defaults to tcp" print print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print - sys.exit (1) + sys.exit(1) -_verbose = False -_quiet = False -_durable = False -_dellink = False +_verbose = False +_quiet = False +_durable = False +_dellink = False +_srclocal = False +_transport = "tcp" +_ack = 0 +_connTimeout = 10 class RouteManager: - def __init__ (self, destBroker): - self.dest = Broker (destBroker) - self.src = None + def __init__(self, localBroker): + self.local = BrokerURL(localBroker) + self.remote = None + self.qmf = Session() + self.broker = self.qmf.addBroker(localBroker, _connTimeout) - def ConnectToBroker (self): - broker = self.dest - if _verbose: - print "Connecting to broker: %s:%d" % (broker.host, broker.port) - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (broker.host, broker.port), \ - username=broker.username, password=broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mch = self.mclient.addChannel (self.session) - self.mclient.syncWaitForStable (self.mch) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mch) - self.session.close(timeout=10) - self.conn.close(timeout=10) - - def getLink (self): - links = self.mclient.syncGetObjects (self.mch, "link") + def disconnect(self): + self.qmf.delBroker(self.broker) + + def getLink(self): + links = self.qmf.getObjects(_class="link") for link in links: - if "%s:%d" % (link.host, link.port) == self.src.name (): + if self.remote.match(link.host, link.port): return link return None - def AddLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - - if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + def addLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + if self.local.match(self.remote.host, self.remote.port): + raise Exception("Linking broker to itself is not permitted") - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() - if link != None: - print "Link already exists" - sys.exit(1) - - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" - else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) - if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () - - def DelLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient + if link == None: + if not self.remote.authName or self.remote.authName == "anonymous": + mech = "ANONYMOUS" + else: + mech = "PLAIN" + res = broker.connect(self.remote.host, self.remote.port, _durable, + mech, self.remote.authName or "", self.remote.authPass or "", + _transport) + if _verbose: + print "Connect method returned:", res.status, res.text - brokers = mc.syncGetObjects (self.mch, "broker") + def delLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() if link == None: - print "Link not found" - sys.exit(1) + raise Exception("Link not found") - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if _verbose: - print "Close method returned:", res.status, res.statusText + print "Close method returned:", res.status, res.text - def ListLinks (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") + def listLinks(self): + links = self.qmf.getObjects(_class="link") if len(links) == 0: print "No Links Found" else: print - print "Host Port Durable State Last Error" - print "===================================================================" + print "Host Port Transport Durable State Last Error" + print "=============================================================================" + for link in links: + print "%-16s%-8d%-13s%c %-18s%s" % \ + (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError) + + def mapRoutes(self): + qmf = self.qmf + print + print "Finding Linked Brokers:" + + brokerList = {} + brokerList[self.local.name()] = self.broker + print " %s... Ok" % self.local + + added = True + while added: + added = False + links = qmf.getObjects(_class="link") for link in links: - print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError) + url = BrokerURL("%s:%d" % (link.host, link.port)) + if url.name() not in brokerList: + print " %s..." % url.name(), + try: + b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout) + brokerList[url.name()] = b + added = True + print "Ok" + except Exception, e: + print e + + print + print "Dynamic Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=True) + fedExchanges = [] + for bridge in bridges: + if bridge.src not in fedExchanges: + fedExchanges.append(bridge.src) + if len(fedExchanges) == 0: + print " none found" + print + + for ex in fedExchanges: + print " Exchange %s:" % ex + pairs = [] + for bridge in bridges: + if bridge.src == ex: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + found = False + for pair in pairs: + if pair.matches(fromUrl, toUrl): + found = True + if not found: + pairs.append(RoutePair(fromUrl, toUrl)) + for pair in pairs: + print " %s" % pair + print - def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes): - self.src = Broker (srcBroker) - mc = self.mclient + print "Static Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=False) + if len(bridges) == 0: + print " none found" + print - if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + for bridge in bridges: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + leftType = "ex" + rightType = "ex" + if bridge.srcIsLocal: + arrow = "=>" + left = bridge.src + right = bridge.dest + if bridge.srcIsQueue: + leftType = "queue" + else: + arrow = "<=" + left = bridge.dest + right = bridge.src + if bridge.srcIsQueue: + rightType = "queue" + + if bridge.srcIsQueue: + print " %s(%s=%s) %s %s(%s=%s)" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right) + else: + print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) + print + + for broker in brokerList: + if broker != self.local.name(): + qmf.delBroker(brokerList[broker]) - brokers = mc.syncGetObjects (self.mch, "broker") - broker = brokers[0] - link = self.getLink () + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False): + if dynamic and _srclocal: + raise Exception("--src-local is not permitted on dynamic routes") + + self.addLink(remoteBroker) + link = self.getLink() if link == None: - if _verbose: - print "Inter-broker link not found, creating..." - - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" - else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) - if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + raise Exception("Link failed to create") + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: + if not _quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) + sys.exit(0) + + if _verbose: + print "Creating inter-broker binding..." + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack) + if res.status != 0: + raise Exception(res.text) + if _verbose: + print "Bridge method returned:", res.status, res.text + + def addQueueRoute(self, remoteBroker, exchange, queue): + self.addLink(remoteBroker) + link = self.getLink() if link == None: - print "Protocol Error - Missing link ID" - sys.exit (1) + raise Exception("Link failed to create") - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: if not _quiet: - print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) - sys.exit (1) - sys.exit (0) + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) + sys.exit(0) if _verbose: print "Creating inter-broker binding..." - bridgeArgs = {} - bridgeArgs["durable"] = _durable - bridgeArgs["src"] = exchange - bridgeArgs["dest"] = exchange - bridgeArgs["key"] = routingKey - bridgeArgs["tag"] = tag - bridgeArgs["excludes"] = excludes - bridgeArgs["srcIsQueue"] = 0 - bridgeArgs["srcIsLocal"] = 0 - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) - if res.status == 4: - print "Can't create a durable route on a non-durable link" - sys.exit(1) + res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack) + if res.status != 0: + raise Exception(res.text) if _verbose: - print "Bridge method returned:", res.status, res.statusText + print "Bridge method returned:", res.status, res.text - def DelRoute (self, srcBroker, exchange, routingKey): - self.src = Broker (srcBroker) - mc = self.mclient + def delQueueRoute(self, remoteBroker, exchange, queue): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not _quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if _verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if _verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) + if not _quiet: + raise Exception("Route not found") - link = self.getLink () + def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() if link == None: if not _quiet: - print "No link found from %s to %s" % (self.src.name(), self.dest.name()) - sys.exit (1) - sys.exit (0) + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ + and bridge.dynamic == dynamic: if _verbose: print "Closing bridge..." - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error closing bridge: %d - %s" % (res.status, res.statusText) - sys.exit (1) - if len (bridges) == 1 and _dellink: - link = self.getLink () + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() if link == None: - sys.exit (0) + sys.exit(0) if _verbose: print "Last bridge on link, closing link..." - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error closing link: %d - %s" % (res.status, res.statusText) - sys.exit (1) - sys.exit (0) + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) if not _quiet: - print "Route not found" - sys.exit (1) + raise Exception("Route not found") - def ListRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + def listRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: - print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) + if bridge.dynamic: + keyText = "" + else: + keyText = bridge.key + print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) - def ClearAllRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + def clearAllRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if _verbose: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error: %d - %s" % (res.status, res.statusText) + print "Error: %d - %s" % (res.status, res.text) elif _verbose: print "Ok" if _dellink: - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(_class="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error: %d - %s" % (res.status, res.statusText) + print "Error: %d - %s" % (res.status, res.text) elif _verbose: print "Ok" +class RoutePair: + def __init__(self, fromUrl, toUrl): + self.fromUrl = fromUrl + self.toUrl = toUrl + self.bidir = False + + def __repr__(self): + if self.bidir: + delimit = "<=>" + else: + delimit = " =>" + return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) + + def matches(self, fromUrl, toUrl): + if fromUrl == self.fromUrl and toUrl == self.toUrl: + return True + if toUrl == self.fromUrl and fromUrl == self.toUrl: + self.bidir = True + return True + return False + + def YN(val): if val == 1: return 'Y' @@ -306,12 +405,22 @@ def YN(val): ## try: - longOpts = ("verbose", "quiet", "durable", "del-empty-link") - (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "vqde", longOpts) + longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts) except: - Usage () + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs for opt in optlist: + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None if opt[0] == "-v" or opt[0] == "--verbose": _verbose = True if opt[0] == "-q" or opt[0] == "--quiet": @@ -320,52 +429,96 @@ for opt in optlist: _durable = True if opt[0] == "-e" or opt[0] == "--del-empty-link": _dellink = True - -nargs = len (cargs) + if opt[0] == "-s" or opt[0] == "--src-local": + _srclocal = True + if opt[0] == "-t" or opt[0] == "--transport": + _transport = opt[1] + if opt[0] == "--ack": + _ack = int(opt[1]) + +nargs = len(cargs) if nargs < 2: - Usage () + Usage() if nargs == 2: - destBroker = "localhost" + localBroker = "localhost" else: - destBroker = cargs[2] + if _srclocal: + localBroker = cargs[3] + remoteBroker = cargs[2] + else: + localBroker = cargs[2] + if nargs > 3: + remoteBroker = cargs[3] group = cargs[0] cmd = cargs[1] -rm = RouteManager (destBroker) -rm.ConnectToBroker () -if group == "link": - if cmd == "add": - if nargs != 4: - Usage() - rm.AddLink (cargs[3]) - elif cmd == "del": - if nargs != 4: - Usage() - rm.DelLink (cargs[3]) - elif cmd == "list": - rm.ListLinks () - -elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 8: - Usage () - - tag = "" - excludes = "" - if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) - elif cmd == "del": - if nargs != 6: - Usage () +try: + rm = RouteManager(localBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + Usage() + rm.addLink(remoteBroker) + elif cmd == "del": + if nargs != 4: + Usage() + rm.delLink(remoteBroker) + elif cmd == "list": + rm.listLinks() + + elif group == "dynamic": + if cmd == "add": + if nargs < 5 or nargs > 7: + Usage() + + tag = "" + excludes = "" + if nargs > 5: tag = cargs[5] + if nargs > 6: excludes = cargs[6] + rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True) + elif cmd == "del": + if nargs != 5: + Usage() + else: + rm.delRoute(remoteBroker, cargs[4], "", dynamic=True) + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 8: + Usage() + + tag = "" + excludes = "" + if nargs > 6: tag = cargs[6] + if nargs > 7: excludes = cargs[7] + rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False) + elif cmd == "del": + if nargs != 6: + Usage() + rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False) + elif cmd == "map": + rm.mapRoutes() else: - rm.DelRoute (cargs[3], cargs[4], cargs[5]) - else: - if cmd == "list": - rm.ListRoutes () - elif cmd == "flush": - rm.ClearAllRoutes () + if cmd == "list": + rm.listRoutes() + elif cmd == "flush": + rm.clearAllRoutes() + else: + Usage() + + elif group == "queue": + if nargs != 6: + Usage() + if cmd == "add": + rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) + elif cmd == "del": + rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) else: - Usage () -rm.Disconnect () + Usage() + +except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + sys.exit(1) + +rm.disconnect() diff --git a/python/commands/qpid-stat b/python/commands/qpid-stat new file mode 100755 index 0000000000..29deeb2342 --- /dev/null +++ b/python/commands/qpid-stat @@ -0,0 +1,460 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session, Console +from qpid.disp import Display, Header, Sorter + +_host = "localhost" +_connTimeout = 10 +_types = "" +_limit = 50 +_increasing = False +_sortcol = None +pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + +def Usage (): + print "Usage: qpid-stat [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "General Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" +# print " -n [--numeric] Don't resolve names" + print + print "Display Options:" + print + print " -b Show Brokers" + print " -c Show Connections" +# print " -s Show Sessions" + print " -e Show Exchanges" + print " -q Show Queues" + print + print " -S [--sort-by] COLNAME Sort by column name" + print " -I [--increasing] Sort by increasing value (default = decreasing)" + print " -L [--limit] NUM Limit output to NUM rows (default = 50)" + print + sys.exit (1) + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class Broker(object): + def __init__(self, qmf, broker): + self.broker = broker + + agents = qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] + self.currentTime = bobj.getTimestamps()[0] + try: + self.uptime = bobj.uptime + except: + self.uptime = 0 + self.connections = {} + self.sessions = {} + self.exchanges = {} + self.queues = {} + package = "org.apache.qpid.broker" + + list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) + for conn in list: + if pattern.match(conn.address): + self.connections[conn.getObjectId()] = conn + + list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) + for sess in list: + if sess.connectionRef in self.connections: + self.sessions[sess.getObjectId()] = sess + + list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) + for exchange in list: + self.exchanges[exchange.getObjectId()] = exchange + + list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) + for queue in list: + self.queues[queue.getObjectId()] = queue + + def getName(self): + return self.broker.getUrl() + + def getCurrentTime(self): + return self.currentTime + + def getUptime(self): + return self.uptime + +class BrokerManager(Console): + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + self.brokers = [] + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getCluster(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + return None + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + return None + + self.cluster = clusters[0] + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(_host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): + if len(subs) == 0: + return + this = subs[0] + remaining = subs[1:] + newindent = indent + " " + if this == 'b': + pass + elif this == 'c': + if broker: + for oid in broker.connections: + iconn = broker.connections[oid] + self.printConnSub(indent, broker.getName(), iconn) + self.displaySubs(remaining, newindent, broker=broker, conn=iconn, + sess=sess, exchange=exchange, queue=queue) + elif this == 's': + pass + elif this == 'e': + pass + elif this == 'q': + pass + print + + def displayBroker(self, subs): + disp = Display(prefix=" ") + heads = [] + heads.append(Header('broker')) + heads.append(Header('cluster')) + heads.append(Header('uptime', Header.DURATION)) + heads.append(Header('conn', Header.KMG)) + heads.append(Header('sess', Header.KMG)) + heads.append(Header('exch', Header.KMG)) + heads.append(Header('queue', Header.KMG)) + rows = [] + for broker in self.brokers: + if self.cluster: + ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) + else: + ctext = "" + row = (broker.getName(), ctext, broker.getUptime(), + len(broker.connections), len(broker.sessions), + len(broker.exchanges), len(broker.queues)) + rows.append(row) + title = "Brokers" + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayConn(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header('client-addr')) + heads.append(Header('cproc')) + heads.append(Header('cpid')) + heads.append(Header('auth')) + heads.append(Header('connected', Header.DURATION)) + heads.append(Header('idle', Header.DURATION)) + heads.append(Header('msgIn', Header.KMG)) + heads.append(Header('msgOut', Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.connections: + conn = broker.connections[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) + idle = broker.getCurrentTime() - conn.getTimestamps()[0] + row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) + row.append(conn.framesFromClient) + row.append(conn.framesToClient) + rows.append(row) + title = "Connections" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displaySession(self, subs): + disp = Display(prefix=" ") + + def displayExchange(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("exchange")) + heads.append(Header("type")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("bind", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("msgDrop", Header.KMG)) + heads.append(Header("byteIn", Header.KMG)) + heads.append(Header("byteOut", Header.KMG)) + heads.append(Header("byteDrop", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.exchanges: + ex = broker.exchanges[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) + title = "Exchanges" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayQueue(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("queue")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("autoDel", Header.Y)) + heads.append(Header("excl", Header.Y)) + heads.append(Header("msg", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("bytes", Header.KMG)) + heads.append(Header("bytesIn", Header.KMG)) + heads.append(Header("bytesOut", Header.KMG)) + heads.append(Header("cons", Header.KMG)) + heads.append(Header("bind", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.queues: + q = broker.queues[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) + title = "Queues" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayMain(self, main, subs): + if main == 'b': self.displayBroker(subs) + elif main == 'c': self.displayConn(subs) + elif main == 's': self.displaySession(subs) + elif main == 'e': self.displayExchange(subs) + elif main == 'q': self.displayQueue(subs) + + def display(self): + self._getCluster() + if self.cluster: + memberList = self.cluster.members.split(";") + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + if _host.find("@") > 0: + authString = _host.split("@")[0] + "@" + else: + authString = "" + for host in hostList: + b = self.qmf.addBroker(authString + host, _connTimeout) + self.brokers.append(Broker(self.qmf, b)) + else: + self.brokers.append(Broker(self.qmf, self.broker)) + + self.displayMain(_types[0], _types[1:]) + + +## +## Main Program +## + +try: + longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts) +except: + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None + elif opt[0] == "-n" or opt[0] == "--numeric": + _numeric = True + elif opt[0] == "-S" or opt[0] == "--sort-by": + _sortcol = opt[1] + elif opt[0] == "-I" or opt[0] == "--increasing": + _increasing = True + elif opt[0] == "-L" or opt[0] == "--limit": + _limit = int(opt[1]) + elif len(opt[0]) == 2: + char = opt[0][1] + if "bcseq".find(char) != -1: + _types += char + else: + Usage() + else: + Usage() + +if len(_types) == 0: + Usage() + +nargs = len(cargs) +bm = BrokerManager() + +if nargs == 1: + _host = cargs[0] + +try: + bm.SetBroker(_host) + bm.display() +except KeyboardInterrupt: + print +except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + sys.exit(1) + +bm.Disconnect() diff --git a/python/commands/qpid-tool b/python/commands/qpid-tool index 60535c253b..05afcc9732 100755 --- a/python/commands/qpid-tool +++ b/python/commands/qpid-tool @@ -24,7 +24,7 @@ import getopt import sys import socket from cmd import Cmd -from qpid.connection import ConnectionFailed +from qpid.connection import ConnectionFailed, Timeout from qpid.managementdata import ManagementData from shlex import split from qpid.disp import Display @@ -148,7 +148,7 @@ class Mcli (Cmd): self.dataObject.close () def Usage (): - print "Usage: qpid-tool []]" + print "Usage: qpid-tool [[/@][:]]" print sys.exit (1) @@ -183,6 +183,8 @@ except ConnectionFailed, e: except Exception, e: if str(e).find ("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." + else: + print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) # Instantiate the CLI interpreter and launch it. diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/python/cpp_failing_0-8.txt b/python/cpp_failing_0-8.txt deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/python/cpp_failing_0-9.txt b/python/cpp_failing_0-9.txt deleted file mode 100644 index 06c31080fb..0000000000 --- a/python/cpp_failing_0-9.txt +++ /dev/null @@ -1,4 +0,0 @@ -tests_0-9.message.MessageTests.test_checkpoint -tests_0-9.message.MessageTests.test_reject -tests_0-9.basic.BasicTests.test_get - diff --git a/python/doc/test-requirements.txt b/python/doc/test-requirements.txt index a1ba414eb2..5089b49dbe 100644 --- a/python/doc/test-requirements.txt +++ b/python/doc/test-requirements.txt @@ -1,3 +1,22 @@ +############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +############################################################################### + * start and stop server, possibly in different configurations, should at least be able to specify host and port diff --git a/python/examples/README b/python/examples/README new file mode 100644 index 0000000000..bd30b2a6f4 --- /dev/null +++ b/python/examples/README @@ -0,0 +1,319 @@ +Running the Python Examples +============================ + + +Running the Direct Examples +---------------------------- + +To run the direct examples, do the following: + +1. Make sure that a qpidd broker is running: + + $ ps -eaf | grep qpidd + + If a broker is running, you should see the qpidd process in the output of the above command. + +2.Declare a message queue and bind it to an exchange by running declare_queues.py, as follows: + + $ python declare_queues.py + + This program has no output. After this program has been run, all messages sent to the amq.direct exchange using the routing key routing_key are sent to the queue named message_queue. + +3.Publish a series of messages to the amq.direct exchange by running direct_producer.py, as follows: + + $ python direct_producer.py + +This program has no output; the messages are routed to the message queue, as instructed by the binding. + +4. Read the messages from the message queue using direct_consumer.py or listener.py, as follows: + + $ python direct_consumer.py + + or + + $ python listener.py + +You should see the following output: + +message 0 +message 1 +message 2 +message 3 +message 4 +message 5 +message 6 +message 7 +message 8 +message 9 +That's all, folks! + + + +Running the Fanout Examples +---------------------------- + +To run the programs for the Fanout example, do the following: + +1. Make sure that a qpidd broker is running: + + $ ps -eaf | grep qpidd + +If a broker is running, you should see the qpidd process in the output of the above command. + +2. In separate windows, start two or more fanout consumers or fanout listeners as follows: + + $ python fanout_consumer.py + + or + + $ python listener.py + +These programs each create a private queue, bind it to the amq.fanout exchange, and wait for messages to arrive on their queue. + +3. In a separate window, publish a series of messages to the amq.fanout exchange by running fanout_producer.py, as follows: + + $ python fanout_producer.py + +This program has no output; the messages are routed to the message queue, as instructed by the binding. + +4. Go to the windows where you are running consumers or listeners. You should see the following output for each listener or consumer: + + message 0 + message 1 + message 2 + message 3 + message 4 + message 5 + message 6 + message 7 + message 8 + message 9 + That's all, folks! + + + +Running the Publish-Subscribe Examples +--------------------------------------- + +To run the programs for the Publish-Subscribe example, do the following: + +1. Make sure that a qpidd broker is running: + + $ ps -eaf | grep qpidd + +If a broker is running, you should see the qpidd process in the output of the above command. + +2. In separate windows, start one or more topic subscribers by running topic_subscriber.py, as follows: + + $ python topic_subscriber.py + +You will see output similar to this: + + Queues created - please start the topic producer + Subscribing local queue 'local_news' to news-53408183-fcee-4b92-950b-90abb297e739' + Subscribing local queue 'local_weather' to weather-53408183-fcee-4b92-950b-90abb297e739' + Subscribing local queue 'local_usa' to usa-53408183-fcee-4b92-950b-90abb297e739' + Subscribing local queue 'local_europe' to europe-53408183-fcee-4b92-950b-90abb297e739' + Messages on 'news' queue: + +Each topic consumer creates a set of private queues, and binds each queue to the amq.topic exchange together with a binding that indicates which messages should be routed to the queue. + +3.In another window, start the topic publisher, which publishes messages to the amq.topic exchange, as follows: + + $ python topic_publisher.py + +This program has no output; the messages are routed to the message queues for each topic_consumer as specified by the bindings the consumer created. + +4. Go back to the window for each topic consumer. You should see output like this: + + Messages on 'news' queue: + usa.news 0 + usa.news 1 + usa.news 2 + usa.news 3 + usa.news 4 + europe.news 0 + europe.news 1 + europe.news 2 + europe.news 3 + europe.news 4 + That's all, folks! + Messages on 'weather' queue: + usa.weather 0 + usa.weather 1 + usa.weather 2 + usa.weather 3 + usa.weather 4 + europe.weather 0 + europe.weather 1 + europe.weather 2 + europe.weather 3 + europe.weather 4 + That's all, folks! + Messages on 'usa' queue: + usa.news 0 + usa.news 1 + usa.news 2 + usa.news 3 + usa.news 4 + usa.weather 0 + usa.weather 1 + usa.weather 2 + usa.weather 3 + usa.weather 4 + That's all, folks! + Messages on 'europe' queue: + europe.news 0 + europe.news 1 + europe.news 2 + europe.news 3 + europe.news 4 + europe.weather 0 + europe.weather 1 + europe.weather 2 + europe.weather 3 + europe.weather 4 + That's all, folks! + + +Running the Request/Response Examples +-------------------------------------- + +To run the programs for the Request/Response example, do the following: + +1. Make sure that a qpidd broker is running: + + $ ps -eaf | grep qpidd + +If a broker is running, you should see the qpidd process in the output of the above command. + +2. Run the server. + + $ python server.py + +You should see the following output: + + Request server running - run your client now. + (Times out after 100 seconds ...) + +3. In a separate window, start a client: + + $ python client.py + +You should see the following output: + + Request: Twas brillig, and the slithy toves + Request: Did gyre and gimble in the wabe. + Request: All mimsy were the borogroves, + Request: And the mome raths outgrabe. + Messages on queue: reply_to:db0f862e-6b36-4e0f-a4b2-ad049eb435ce + Response: TWAS BRILLIG, AND THE SLITHY TOVES + Response: DID GYRE AND GIMBLE IN THE WABE. + Response: ALL MIMSY WERE THE BOROGROVES, + Response: AND THE MOME RATHS OUTGRABE. + No more messages! + + +Running the XML-based Routing Examples +--------------------------------------- + +To run the programs for the XML-based Routing example, do the following: + +1. Make sure that a qpidd broker is running: + + $ ps -eaf | grep qpidd + +If a broker is running, you should see the qpidd process in the output of the above command. + +2. Declare an XML exchange and a message queue, then bind the queue to the exchange by running declare_queues.py, as follows: + + $ python declare_queues.py + +This program has no output. After this program has been run, all messages sent to the xml exchange using the routing key weather are sent to the queue named message_queue if they satisfy the conditions specified in the following XQuery, which is used in the binding: + + let $w := ./weather + return $w/station = 'Raleigh-Durham International Airport (KRDU)' + and $w/temperature_f > 50 + and $w/temperature_f - $w/dewpoint > 5 + and $w/wind_speed_mph > 7 + and $w/wind_speed_mph < 20 + +3. Publish a series of messages to the xml exchange by running xml_producer.py, as follows: + + $ python xml_producer.py + +The messages are routed to the message queue, as prescribed by the binding. Each message represents a weather report, such as this one: + + + Raleigh-Durham International Airport (KRDU) + 16 + 70 + 35 + + +4. Read the messages from the message queue using direct_consumer.py or listener.py, as follows: + + $ python xml_consumer.py + + or + + $ python listener.py + +You should see the following output: + +Raleigh-Durham International Airport (KRDU) +1670 +35 + + +Running the Headers Examples +----------------------------- + +To run the headers examples, do the following: + +1. Make sure that a qpidd broker is running: + + $ ps -eaf | grep qpidd + + If a broker is running, you should see the qpidd process in the output of the above command. + +2.Declare a message queues and bind them to an exchange by running declare_queues.py, as follows: + + $ python declare_queues.py + + This program has no output. After this program has been run, all messages sent to the amq.match exchange with an application-header of {'class': 'first'} will be routed to the queue named "first" and messages with an application-header of {'class': 'second'} will be routed to the queue named "second". + +3.Publish a series of messages to the amq.match exchange by running headers_producer.py, as follows: + + $ python headers_producer.py + +This program has no output; the messages are routed to the message queues, as instructed by the bindings. + +4. Read the messages from the message queues using headers_consumer.py as follows: + + $ python headers_consumer.py + +You should see the following output: + +message(first) 0 +message(first) 1 +message(first) 2 +message(first) 3 +message(first) 4 +message(first) 5 +message(first) 6 +message(first) 7 +message(first) 8 +message(first) 9 +That's all, folks! +message(second) 0 +message(second) 1 +message(second) 2 +message(second) 3 +message(second) 4 +message(second) 5 +message(second) 6 +message(second) 7 +message(second) 8 +message(second) 9 +That's all, folks! diff --git a/python/examples/api/drain b/python/examples/api/drain new file mode 100755 index 0000000000..485985f16d --- /dev/null +++ b/python/examples/api/drain @@ -0,0 +1,62 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse +from qpid.messaging import * +from qpid.util import URL + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="Drain messages from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-t", "--timeout", type=float, default=0, + help="timeout in seconds to wait before exiting (default %default)") +parser.add_option("-f", "--forever", action="store_true", + help="ignore timeout and wait forever") + +opts, args = parser.parse_args() + +url = URL(opts.broker) +if args: + addr = args.pop(0) +else: + parser.error("address is required") +if opts.forever: + timeout = None +else: + timeout = opts.timeout + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +ssn = conn.session() +rcv = ssn.receiver(addr) + +while True: + try: + print rcv.fetch(timeout=timeout) + ssn.acknowledge() + except Empty: + break + except ReceiveError, e: + print e + break + +conn.close() diff --git a/python/examples/api/server b/python/examples/api/server new file mode 100755 index 0000000000..adb2dcf792 --- /dev/null +++ b/python/examples/api/server @@ -0,0 +1,87 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, traceback +from qpid.messaging import * +from qpid.util import URL +from subprocess import Popen, STDOUT, PIPE +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="handle requests from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +url = URL(opts.broker) +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +conn.reconnect = True +ssn = conn.session() +rcv = ssn.receiver(addr) + +def dispatch(msg): + msg_type = msg.properties.get("type") + if msg_type == "shell": + proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE) + output, _ = proc.communicate() + result = Message(output) + result.properties["exit"] = proc.returncode + elif msg_type == "eval": + try: + content = eval(msg.content) + except: + content = traceback.format_exc() + result = Message(content) + else: + result = Message("unrecognized message type: %s" % msg_type) + return result + +while True: + try: + msg = rcv.fetch() + response = dispatch(msg) + snd = ssn.sender(msg.reply_to) + try: + snd.send(response) + except SendError, e: + print e + snd.close() + ssn.acknowledge() + except Empty: + break + except ReceiveError, e: + print e + break + +conn.close() diff --git a/python/examples/api/spout b/python/examples/api/spout new file mode 100755 index 0000000000..6a9b2b6e3d --- /dev/null +++ b/python/examples/api/spout @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time +from qpid.messaging import * +from qpid.util import URL + +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", + description="Send messages to the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-c", "--count", type=int, default=1, + help="stop after count messages have been sent, zero disables (default %default)") +parser.add_option("-t", "--timeout", type=float, default=None, + help="exit after the specified time") +parser.add_option("-i", "--id", help="use the supplied id instead of generating one") +parser.add_option("-r", "--reply-to", help="specify reply-to address") +parser.add_option("-P", "--property", dest="properties", action="append", default=[], + help="specify message property") +parser.add_option("-M", "--map", dest="entries", action="append", default=[], + help="specify map entry for message body") + +opts, args = parser.parse_args() + +url = URL(opts.broker) +if opts.id is None: + spout_id = str(uuid4()) +else: + spout_id = opts.id +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +content = None + +if args: + text = " ".join(args) +else: + text = None + +if opts.entries: + content = {} + if text: + content["text"] = text + for e in opts.entries: + name, val = nameval(e) + content[name] = val +else: + content = text + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +ssn = conn.session() +snd = ssn.sender(addr) + +count = 0 +start = time.time() +while (opts.count == 0 or count < opts.count) and \ + (opts.timeout is None or time.time() - start < opts.timeout): + msg = Message(content, reply_to=opts.reply_to) + msg.properties["spout-id"] = "%s:%s" % (spout_id, count) + for p in opts.properties: + name, val = nameval(p) + msg.properties[name] = val + + try: + snd.send(msg) + count += 1 + print msg + except SendError, e: + print e + break + +conn.close() diff --git a/python/examples/datatypes/client.py b/python/examples/datatypes/client.py new file mode 100755 index 0000000000..088e529909 --- /dev/null +++ b/python/examples/datatypes/client.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + client.py + + Client for testing use of Unicode and datatypes. + + Both client and server will be written in C++ and Python. + Tests can run clients and servers written in different + languages, and they can be run on 32-bit and 64-bit architectures. + +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +import testdata + +#----- Initialization -------------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + + +#----- Main Body -- ---------------------------------------- + +# Create a response queue for the server to send responses to. Use the +# same string as the name of the queue and the name of the routing +# key. + +reply_to = "reply_to:" + session.name +session.queue_declare(queue=reply_to, exclusive=True) +session.exchange_bind(exchange="amq.direct", queue=reply_to, binding_key=reply_to) + +# Create a local queue and subscribe it to the response queue + +local_queue_name = "local_queue" +queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages from +# the server's reply_to queue to our local client queue. The server +# will start delivering messages as soon as message credit is +# available. + +session.message_subscribe(queue=reply_to, destination=local_queue_name) +queue.start() + +# Set up the properties. Perhaps a few application headers? + +delivery_properties = session.delivery_properties(routing_key="request") + +message_properties = session.message_properties() + +message_properties.content_encoding="text/plain; charset='utf-8'" + +testdata.set_application_headers(message_properties) +message_properties.reply_to = session.reply_to("amq.direct", reply_to) + +# deliver the message - remember to encode the Unicode string! +request = Message(message_properties, delivery_properties, testdata.String_Greek.encode("utf8")) +session.message_transfer(destination="amq.direct", message=request) + +# Now see what messages the server sent to our reply_to queue + +try: + response = queue.get(timeout=10) + content = response.body + session.message_accept(RangedSet(response.id)) + testdata.check_message(response) + print "Response: " + content +except Empty: + print "No more messages!" + exit(1) +except: + print "Unexpected exception!" + exit(1) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/python/examples/datatypes/server.py b/python/examples/datatypes/server.py new file mode 100755 index 0000000000..18e6fa4ad7 --- /dev/null +++ b/python/examples/datatypes/server.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + server.py + + Server for testing use of Unicode and datatypes. + + Both client and server will be written in C++ and Python. + Tests can run clients and servers written in different + languages, and they can be run on 32-bit and 64-bit architectures. +""" + +import testdata + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ------------------------------------------- +def respond(session, request): + + # The routing key for the response is the request's reply-to + # property. The body for the response is the request's body, + # converted to upper case. + + testdata.check_message(request) + + message_properties = request.get("message_properties") + reply_to = message_properties.reply_to + + testdata.set_application_headers(message_properties) + + if reply_to == None: + raise Exception("This message is missing the 'reply_to' property, which is required") + + delivery_properties = session.delivery_properties(routing_key=reply_to["routing_key"]) + response = Message(delivery_properties, message_properties, testdata.String_Greek.encode("utf8")) + print "Sending response ..." + session.message_transfer(destination=reply_to["exchange"], message=response) + +#----- Initialization -------------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Main Body -- ---------------------------------------- + +# Create a request queue and subscribe to it + +session.queue_declare(queue="request", exclusive=True) +session.exchange_bind(exchange="amq.direct", queue="request", binding_key="request") + +local_queue_name = "local_queue" + +session.message_subscribe(queue="request", destination=local_queue_name) + +queue = session.incoming(local_queue_name) +queue.start() + +# Remind the user to start the client program + +print "Request server running - run your client now." +print "(Times out after 100 seconds ...)" +sys.stdout.flush() + +# Respond to each request + +# If we get a message, send it back to the user (as indicated in the +# ReplyTo property) + +while True: + try: + request = queue.get(timeout=100) + session.message_accept(RangedSet(request.id)) + + respond(session, request) + except Empty: + print "No more messages!" + break; + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/python/examples/datatypes/testdata.py b/python/examples/datatypes/testdata.py new file mode 100644 index 0000000000..cdf140d400 --- /dev/null +++ b/python/examples/datatypes/testdata.py @@ -0,0 +1,180 @@ +# -*- encoding: utf-8 -*- + +from qpid.datatypes import uuid4, timestamp + +#----- Some variables to test boundary conditions on various data types + +void = None +boolean_true = True +boolean_false = False +Uint8_0 = 0 +Uint8_max = 255 +Uint16_0 = 0 +Uint16_max = 65535 +Uint32_0 = 0 +Uint32_max = 4294967295 +Uint64_0 = 0 +Uint64_max = 18446744073709551615 +Int8_min = -128 +Int8_0 = 0 +Int8_max = 127 +Int16_min = -32768 +Int16_0 = 0 +Int16_max = 32767 +Int32_min = -2147483648 +Int32_0 = 0 +Int32_max = 2147483647 +Int64_min = -9223372036854775808 +Int64_0 = 0 +Int64_max = 9223372036854775807 + +Float_pi = 3.14159265 +Float_neg = -1E4 +Float_big = 1267.43233E12 +Float_small = 12.78e-12 +Float_neg0 = -0 +Float_pos0 = 0 +Float_INF = float('inf') +Float_Negative_INF = float('-inf') + +Double_pi = 3.1415926535897932384626433832795 +Double_neg = -1E4 +Double_big = 1267.43233E12 +Double_small = 12.78e-2 +Double_neg0 = -0 +Double_pos0 = 0 +Double_INF = float('inf') +Double_Negative_INF = float('-inf') + +char_1byte = u'0024' # $ +char_2byte = u'00A2' # ¢ +char_3byte = u'20AC' # € +char_4byte = u'10ABCD' + +timestamp = timestamp() + +UUID = uuid4() + +String_Greek = u"ἐξίσταντο δὲ πάντες καὶ διηπόρουν, ἄλλος πρὸς ἄλλον λέγοντες, Τί θέλει τοῦτο εἶναι;" + +String_Empty = "" + +#----- A few functions ---------------------------------------------------------- + +def near_enough(float1, float2, delta): + return abs(float1-float2) < delta + +def set_application_headers(message_properties): + + message_properties.application_headers = {} + message_properties.application_headers["void"] = None + message_properties.application_headers["boolean_true"] = boolean_true + message_properties.application_headers["boolean_false"] = boolean_false + message_properties.application_headers["Uint8_0"] = Uint8_0 + message_properties.application_headers["Uint8_max"] = Uint8_max + message_properties.application_headers["Uint16_0"] = Uint16_0 + message_properties.application_headers["Uint16_max"] = Uint16_max + message_properties.application_headers["Uint32_0"] = Uint32_0 + message_properties.application_headers["Uint32_max"] = Uint32_max + message_properties.application_headers["Uint64_0"] = Uint64_0 +# message_properties.application_headers["Uint64_max"] = Uint64_max + message_properties.application_headers["Int8_min"] = Int8_min + message_properties.application_headers["Int8_0"] = Int8_0 + message_properties.application_headers["Int8_max"] = Int8_max + message_properties.application_headers["Int16_min"] = Int16_min + message_properties.application_headers["Int16_0"] = Int16_0 + message_properties.application_headers["Int16_max"] = Int16_max + message_properties.application_headers["Int32_min"] = Int32_min + message_properties.application_headers["Int32_0"] = Int32_0 + message_properties.application_headers["Int32_max"] = Int32_max + message_properties.application_headers["Int64_min"] = Int64_min + message_properties.application_headers["Int64_0"] = Int64_0 + message_properties.application_headers["Int64_max"] = Int64_max + + message_properties.application_headers["Float_pi"] = Float_pi + message_properties.application_headers["Float_neg"] = Float_neg + message_properties.application_headers["Float_big"] = Float_big + message_properties.application_headers["Float_small"] = Float_small + message_properties.application_headers["Float_neg0"] = Float_neg0 + message_properties.application_headers["Float_pos0"] = Float_pos0 + message_properties.application_headers["Float_INF"] = Float_INF + message_properties.application_headers["Float_Negative_INF"] = Float_Negative_INF + + message_properties.application_headers["Double_pi"] = Double_pi + message_properties.application_headers["Double_neg"] = Double_neg + message_properties.application_headers["Double_big"] = Double_big + message_properties.application_headers["Double_small"] = Double_small + message_properties.application_headers["Double_neg0"] = Double_neg0 + message_properties.application_headers["Double_pos0"] = Double_pos0 + message_properties.application_headers["Double_INF"] = Double_INF + message_properties.application_headers["Double_Negative_INF"] = Double_Negative_INF + + message_properties.application_headers["char_1byte"] = char_1byte + message_properties.application_headers["char_2byte"] = char_2byte + message_properties.application_headers["char_3byte"] = char_3byte + message_properties.application_headers["char_4byte"] = char_4byte + + message_properties.application_headers["timestamp"] = timestamp + message_properties.application_headers["UUID"] = uuid4() + message_properties.application_headers["String_Greek"] = String_Greek + message_properties.application_headers["String_Empty"] = String_Empty + +def check_message(message): + +# message_properties = message.message_properties() + message_properties = message.get("message_properties") + assert message_properties.application_headers["void"] == None + assert message_properties.application_headers["boolean_true"] == boolean_true + assert message_properties.application_headers["boolean_false"] == boolean_false + assert message_properties.application_headers["Uint8_0"] == Uint8_0 + assert message_properties.application_headers["Uint8_max"] == Uint8_max + assert message_properties.application_headers["Uint16_0"] == Uint16_0 + assert message_properties.application_headers["Uint16_max"] == Uint16_max + assert message_properties.application_headers["Uint32_0"] == Uint32_0 + assert message_properties.application_headers["Uint32_max"] == Uint32_max + assert message_properties.application_headers["Uint64_0"] == Uint64_0 +# assert message_properties.application_headers["Uint64_max"] == Uint64_max + assert message_properties.application_headers["Int8_min"] == Int8_min + assert message_properties.application_headers["Int8_0"] == Int8_0 + assert message_properties.application_headers["Int8_max"] == Int8_max + assert message_properties.application_headers["Int16_min"] == Int16_min + assert message_properties.application_headers["Int16_0"] == Int16_0 + assert message_properties.application_headers["Int16_max"] == Int16_max + assert message_properties.application_headers["Int32_min"] == Int32_min + assert message_properties.application_headers["Int32_0"] == Int32_0 + assert message_properties.application_headers["Int32_max"] == Int32_max + assert message_properties.application_headers["Int64_min"] == Int64_min + assert message_properties.application_headers["Int64_0"] == Int64_0 + assert message_properties.application_headers["Int64_max"] == Int64_max + +# Change floating point comparisons to allow inexactness + + assert near_enough(message_properties.application_headers["Float_pi"], Float_pi, 0.00001) + assert near_enough(message_properties.application_headers["Float_neg"], Float_neg, 0.00001) + assert near_enough(message_properties.application_headers["Float_big"], Float_big, Float_big/1000000) + assert near_enough(message_properties.application_headers["Float_small"], Float_small, 0.00001) + assert message_properties.application_headers["Float_neg0"] == Float_neg0 + assert message_properties.application_headers["Float_pos0"] == Float_pos0 + assert message_properties.application_headers["Float_INF"] == Float_INF + assert message_properties.application_headers["Float_Negative_INF"] == Float_Negative_INF + + assert near_enough(message_properties.application_headers["Double_pi"], Double_pi, 0.00001) + assert near_enough(message_properties.application_headers["Double_neg"], Double_neg, 0.00001) + assert near_enough(message_properties.application_headers["Double_big"], Double_big, Double_big/1000000) + assert near_enough(message_properties.application_headers["Double_small"], Double_small, 0.00001) + assert message_properties.application_headers["Double_neg0"] == Double_neg0 + assert message_properties.application_headers["Double_pos0"] == Double_pos0 + assert message_properties.application_headers["Double_INF"] == Double_INF + assert message_properties.application_headers["Double_Negative_INF"] == Double_Negative_INF + + assert message_properties.application_headers["char_1byte"] == char_1byte + assert message_properties.application_headers["char_2byte"] == char_2byte + assert message_properties.application_headers["char_3byte"] == char_3byte + assert message_properties.application_headers["char_4byte"] == char_4byte + +# assert message_properties.application_headers["timestamp"] == timestamp +# assert message_properties.application_headers["UUID"] == UUID + assert message_properties.application_headers["String_Greek"] == String_Greek + assert message_properties.application_headers["String_Empty"] == String_Empty + + diff --git a/python/examples/direct/declare_queues.py b/python/examples/direct/declare_queues.py index f0c34fa8c9..13818ee9d7 100755 --- a/python/examples/direct/declare_queues.py +++ b/python/examples/direct/declare_queues.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ declare_queues.py @@ -36,7 +54,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/direct/direct_consumer.py b/python/examples/direct/direct_consumer.py index 23577e9f53..b07e53c5c7 100755 --- a/python/examples/direct/direct_consumer.py +++ b/python/examples/direct/direct_consumer.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ direct_consumer.py @@ -34,7 +52,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/direct/direct_producer.py b/python/examples/direct/direct_producer.py index 870ce66e78..fcbb4675e4 100755 --- a/python/examples/direct/direct_producer.py +++ b/python/examples/direct/direct_producer.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ direct_producer.py @@ -34,7 +52,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/direct/listener.py b/python/examples/direct/listener.py index 66927eca4b..9d06bd3929 100755 --- a/python/examples/direct/listener.py +++ b/python/examples/direct/listener.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ listener.py @@ -55,7 +73,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/direct/verify b/python/examples/direct/verify index 01d81a18a1..92f87bf827 100644 --- a/python/examples/direct/verify +++ b/python/examples/direct/verify @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify clients ./declare_queues.py ./direct_producer.py ./direct_consumer.py outputs ./declare_queues.py.out ./direct_producer.py.out ./direct_consumer.py.out diff --git a/python/examples/fanout/fanout_consumer.py b/python/examples/fanout/fanout_consumer.py index a2b1b30141..0452baa8da 100755 --- a/python/examples/fanout/fanout_consumer.py +++ b/python/examples/fanout/fanout_consumer.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ fanout_consumer.py @@ -32,7 +50,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/fanout/fanout_producer.py b/python/examples/fanout/fanout_producer.py index 3950ca6d2e..c4df252c70 100755 --- a/python/examples/fanout/fanout_producer.py +++ b/python/examples/fanout/fanout_producer.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ fanout_producer.py @@ -31,7 +49,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/fanout/listener.py b/python/examples/fanout/listener.py index 74ae858127..29db402e9d 100755 --- a/python/examples/fanout/listener.py +++ b/python/examples/fanout/listener.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ listener.py @@ -52,7 +70,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/fanout/verify b/python/examples/fanout/verify index 6a3132a94f..9e5c364bfa 100644 --- a/python/examples/fanout/verify +++ b/python/examples/fanout/verify @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify background "Subscribed" ./fanout_consumer.py background "Subscribed" ./fanout_consumer.py diff --git a/python/examples/headers/declare_queues.py b/python/examples/headers/declare_queues.py new file mode 100755 index 0000000000..b3d5c43fe5 --- /dev/null +++ b/python/examples/headers/declare_queues.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + declare_queues.py + + Creates and binds a queue on an AMQP headers exchange. + + All messages with an application header of {'class': 'first'} are sent to queue "first". + All messages with an application header of {'class': 'second'} are sent to queue "second". +""" + +# Common includes + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Create queues ------------------------------------- + +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# exchange_bind() determines which messages are routed to a queue. + +session.queue_declare(queue="first") +session.exchange_bind(exchange="amq.match", queue="first", arguments={'x-match':'any', 'class':'first'}) + +session.queue_declare(queue="second") +session.exchange_bind(exchange="amq.match", queue="second", arguments={'x-match':'any', 'class':'second'}) + +#----- Cleanup --------------------------------------------- + +session.close(timeout=10) diff --git a/python/examples/headers/headers_consumer.py b/python/examples/headers/headers_consumer.py new file mode 100755 index 0000000000..8f5ce3c5ff --- /dev/null +++ b/python/examples/headers/headers_consumer.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + headers_consumer.py + + This AMQP client reads messages from two message + queues named "first" and "second". +""" + +import qpid +import sys +import os +from random import randint +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Read from queue -------------------------------------------- + +# Now let's create two local client queues and tell them to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +local_queue_name_first = "local_queue_first" +local_queue_name_second = "local_queue_second" + +queue_first = session.incoming(local_queue_name_first) +queue_second = session.incoming(local_queue_name_second) + +# Call message_subscribe() to tell the broker to deliver messages +# from the AMQP queue to these local client queues. The broker will +# start delivering messages as soon as credit is allocated using +# queue.start(). + +session.message_subscribe(queue="first", destination=local_queue_name_first) +session.message_subscribe(queue="second", destination=local_queue_name_second) + +queue_first.start() +queue_second.start() + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue_first.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print content + +content = "" +while content != final: + message = queue_second.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print content + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close(timeout=10) diff --git a/python/examples/headers/headers_producer.py b/python/examples/headers/headers_producer.py new file mode 100755 index 0000000000..43130d5993 --- /dev/null +++ b/python/examples/headers/headers_producer.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + headers_producer.py + + Publishes messages to an AMQP headers exchange, using + various application header values. +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message +from qpid.datatypes import uuid4 +from qpid.queue import Empty + + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. +props_first = session.message_properties(application_headers={'class':'first'}) +props_second = session.message_properties(application_headers={'class':'second'}) +props_third = session.message_properties(application_headers={'class':'third'}) + +for i in range(10): + session.message_transfer(destination="amq.match", message=Message(props_first,"message(first) " + str(i))) + session.message_transfer(destination="amq.match", message=Message(props_second,"message(second) " + str(i))) + session.message_transfer(destination="amq.match", message=Message(props_third,"message(third) " + str(i))) + +session.message_transfer(destination="amq.match", message=Message(props_first,"That's all, folks!")) +session.message_transfer(destination="amq.match", message=Message(props_second,"That's all, folks!")) +session.message_transfer(destination="amq.match", message=Message(props_third,"That's all, folks!")) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/python/examples/headers/verify b/python/examples/headers/verify new file mode 100644 index 0000000000..5fe96c5c23 --- /dev/null +++ b/python/examples/headers/verify @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +clients ./declare_queues.py ./headers_producer.py ./headers_consumer.py +outputs ./declare_queues.py.out ./headers_producer.py.out ./headers_consumer.py.out diff --git a/python/examples/headers/verify.in b/python/examples/headers/verify.in new file mode 100644 index 0000000000..90ffd0a071 --- /dev/null +++ b/python/examples/headers/verify.in @@ -0,0 +1,25 @@ +==== declare_queues.py.out +==== headers_producer.py.out +==== headers_consumer.py.out +message(first) 0 +message(first) 1 +message(first) 2 +message(first) 3 +message(first) 4 +message(first) 5 +message(first) 6 +message(first) 7 +message(first) 8 +message(first) 9 +That's all, folks! +message(second) 0 +message(second) 1 +message(second) 2 +message(second) 3 +message(second) 4 +message(second) 5 +message(second) 6 +message(second) 7 +message(second) 8 +message(second) 9 +That's all, folks! diff --git a/python/examples/pubsub/topic_publisher.py b/python/examples/pubsub/topic_publisher.py index 8cf1b08644..b50d5fa8ca 100755 --- a/python/examples/pubsub/topic_publisher.py +++ b/python/examples/pubsub/topic_publisher.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ topic_publisher.py diff --git a/python/examples/pubsub/topic_subscriber.py b/python/examples/pubsub/topic_subscriber.py index 039cc0c55b..489c7cbb19 100755 --- a/python/examples/pubsub/topic_subscriber.py +++ b/python/examples/pubsub/topic_subscriber.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ topic_subscriber.py @@ -63,7 +81,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/pubsub/verify b/python/examples/pubsub/verify index 963d2e32e1..cf1bade62e 100644 --- a/python/examples/pubsub/verify +++ b/python/examples/pubsub/verify @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify background "Queues created" ./topic_subscriber.py clients ./topic_publisher.py diff --git a/python/examples/request-response/client.py b/python/examples/request-response/client.py index a9ecd5c78f..b29fcf3ea7 100755 --- a/python/examples/request-response/client.py +++ b/python/examples/request-response/client.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ client.py @@ -55,7 +73,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/request-response/server.py b/python/examples/request-response/server.py index 05ee051c57..a80c4541e4 100755 --- a/python/examples/request-response/server.py +++ b/python/examples/request-response/server.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ server.py @@ -46,7 +64,7 @@ if len(sys.argv) > 2 : port=int(sys.argv[2]) socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/request-response/verify b/python/examples/request-response/verify index cf8151d4e4..3c058febb2 100644 --- a/python/examples/request-response/verify +++ b/python/examples/request-response/verify @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify background "Request server running" ./server.py clients ./client.py diff --git a/python/examples/xml-exchange/declare_queues.py b/python/examples/xml-exchange/declare_queues.py index bd17da5013..ca40af5dc5 100755 --- a/python/examples/xml-exchange/declare_queues.py +++ b/python/examples/xml-exchange/declare_queues.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ declare_queues.py @@ -35,7 +53,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/xml-exchange/listener.py b/python/examples/xml-exchange/listener.py index dec824dddf..a56f5d6018 100755 --- a/python/examples/xml-exchange/listener.py +++ b/python/examples/xml-exchange/listener.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ listener.py @@ -52,7 +70,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/xml-exchange/verify b/python/examples/xml-exchange/verify index bf05463f1d..a93a32dc90 100644 --- a/python/examples/xml-exchange/verify +++ b/python/examples/xml-exchange/verify @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + # See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify clients ./declare_queues.py ./xml_producer.py ./xml_consumer.py outputs ./declare_queues.py.out ./xml_producer.py.out ./xml_consumer.py.out diff --git a/python/examples/xml-exchange/xml_consumer.py b/python/examples/xml-exchange/xml_consumer.py index 0ab079e7a6..cd89110b05 100755 --- a/python/examples/xml-exchange/xml_consumer.py +++ b/python/examples/xml-exchange/xml_consumer.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ direct_consumer.py @@ -34,7 +52,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/examples/xml-exchange/xml_producer.py b/python/examples/xml-exchange/xml_producer.py index 72c5bdb53a..fa97cab4e1 100755 --- a/python/examples/xml-exchange/xml_producer.py +++ b/python/examples/xml-exchange/xml_producer.py @@ -1,4 +1,22 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# """ xml_producer.py @@ -52,7 +70,7 @@ if len(sys.argv) > 2 : # Create a connection. socket = connect(host, port) -connection = Connection (sock=socket) +connection = Connection (sock=socket, username=user, password=password) connection.start() session = connection.session(str(uuid4())) diff --git a/python/hello-world b/python/hello-world index 5d513cc57b..efee84059c 100755 --- a/python/hello-world +++ b/python/hello-world @@ -1,10 +1,39 @@ #!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys from qpid.connection import Connection from qpid.util import connect from qpid.datatypes import uuid4, Message +broker = "127.0.0.1" +port = 5672 + +if len(sys.argv) > 1: broker = sys.argv[1] +if len(sys.argv) > 2: port = int(sys.argv[2]) + +if len(sys.argv) > 3: + print >> sys.stderr, "usage: hello-world [ [ ] ]" + sys.exit(1) + # connect to the server and start a session -conn = Connection(connect("127.0.0.1", 5672)) +conn = Connection(connect(broker, port)) conn.start() ssn = conn.session(str(uuid4())) diff --git a/python/java_failing_0-8.txt b/python/java_failing_0-8.txt deleted file mode 100644 index c13b40a42c..0000000000 --- a/python/java_failing_0-8.txt +++ /dev/null @@ -1,2 +0,0 @@ -tests_0-8.exchange.RecommendedTypesRuleTests.testTopic -tests_0-8.exchange.RequiredInstancesRuleTests.testAmqTopic diff --git a/python/java_failing_0-9.txt b/python/java_failing_0-9.txt deleted file mode 100644 index 7252d0f496..0000000000 --- a/python/java_failing_0-9.txt +++ /dev/null @@ -1,18 +0,0 @@ -ntests.basic.BasicTests.test_qos_prefetch_count -tests.basic.BasicTests.test_ack -tests.basic.BasicTests.test_cancel -tests.basic.BasicTests.test_consume_exclusive -tests.basic.BasicTests.test_consume_no_local -tests.basic.BasicTests.test_consume_queue_errors -tests.basic.BasicTests.test_consume_unique_consumers -tests.basic.BasicTests.test_get -tests.basic.BasicTests.test_qos_prefetch_size -tests.basic.BasicTests.test_recover_requeue - -tests.exchange.RecommendedTypesRuleTests.testTopic -tests.exchange.RequiredInstancesRuleTests.testAmqTopic - -tests.message.MessageTests.test_checkpoint -tests.message.MessageTests.test_reject - -tests.broker.BrokerTests.test_ping_pong diff --git a/python/mllib/__init__.py b/python/mllib/__init__.py index 39e9363614..9aa1e56e66 100644 --- a/python/mllib/__init__.py +++ b/python/mllib/__init__.py @@ -24,6 +24,8 @@ both SGML and XML. import os, dom, transforms, parsers, sys import xml.sax, types +from xml.sax.handler import ErrorHandler +from xml.sax.xmlreader import InputSource from cStringIO import StringIO def transform(node, *args): @@ -49,15 +51,33 @@ def sgml_parse(source): p.close() return p.parser.tree -def xml_parse(filename): +class Resolver: + + def __init__(self, path): + self.path = path + + def resolveEntity(self, publicId, systemId): + for p in self.path: + fname = os.path.join(p, systemId) + if os.path.exists(fname): + source = InputSource(systemId) + source.setByteStream(open(fname)) + return source + return InputSource(systemId) + +def xml_parse(filename, path=()): if sys.version_info[0:2] == (2,3): # XXX: this is for older versions of python - source = "file://%s" % os.path.abspath(filename) + source = "file://%s" % os.path.abspath(filename) else: source = filename - p = parsers.XMLParser() - xml.sax.parse(source, p) - return p.parser.tree + h = parsers.XMLParser() + p = xml.sax.make_parser() + p.setContentHandler(h) + p.setErrorHandler(ErrorHandler()) + p.setEntityResolver(Resolver(path)) + p.parse(source) + return h.parser.tree def sexp(node): s = transforms.Sexp() diff --git a/python/mllib/dom.py b/python/mllib/dom.py index df2b88322a..486f7082e1 100644 --- a/python/mllib/dom.py +++ b/python/mllib/dom.py @@ -148,6 +148,21 @@ class Tag(Node): if name == k: return v + def _idx(self, attr): + idx = 0 + for k, v in self.attrs: + if k == attr: + return idx + idx += 1 + return None + + def set_attr(self, name, value): + idx = self._idx(name) + if idx is None: + self.attrs.append((name, value)) + else: + self.attrs[idx] = (name, value) + def dispatch(self, f): try: attr = "do_" + self.name diff --git a/python/models/fedsim/__init__.py b/python/models/fedsim/__init__.py new file mode 100644 index 0000000000..63a3f41f28 --- /dev/null +++ b/python/models/fedsim/__init__.py @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + diff --git a/python/models/fedsim/fedsim.py b/python/models/fedsim/fedsim.py new file mode 100644 index 0000000000..edb6c4c8ed --- /dev/null +++ b/python/models/fedsim/fedsim.py @@ -0,0 +1,434 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Sim: + def __init__(self): + self.brokers = {} + self.clients = {} + self.errors = 0 + self.warnings = 0 + + def error(self, text): + self.errors += 1 + print "###### Error:", text + + def warning(self, text): + self.warnings += 1 + print "###### Warning:", text + + def end(self): + print "========================" + print "Errors: %d, Warnings: %d" % (self.errors, self.warnings) + print "========================" + + def dumpState(self): + print "============================" + print "===== Federation State =====" + print "============================" + for broker in self.brokers: + for exchange in self.brokers[broker].exchanges: + print "Exchange %s.%s" % (broker, exchange) + for key in self.brokers[broker].exchanges[exchange].keys: + print " Key %s" % key + for queue in self.brokers[broker].exchanges[exchange].keys[key]: + print " Queue %s origins=%s" % \ + (queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList) + + def addBroker(self, name): + if name in self.brokers: + raise Exception("Broker of same name already exists") + broker = Broker(self, name) + self.brokers[name] = broker + return broker + + def addClient(self, name, broker): + if name in self.clients: + raise Exception("Client of same name already exists") + client = Client(self, name, broker) + self.clients[name] = client + return client + + def link(self, left, right, bidir=True): + print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir) + l1 = left.createLink(right) + l1.bridge("amq.direct") + if bidir: + l2 = right.createLink(left) + l2.bridge("amq.direct") + + def bind(self, client, key): + print "====== bind Client(%s): k=%s" % (client.name, key) + client.bind(key) + + def unbind(self, client, key): + print "====== unbind Client(%s): k=%s" % (client.name, key) + client.unbind(key) + + def sendMessage(self, key, broker, body="Message Body"): + print "====== sendMessage: broker=%s k=%s" % (broker.tag, key) + msg = Message(key, body) + exchange = broker.exchanges["amq.direct"] + for client in self.clients: + self.clients[client].expect(key); + exchange.receive(key, msg, True) + for client in self.clients: + self.clients[client].checkReception() + + +class Destination: + def receive(self, key, msg, fromUser=False): + pass + + +class Client(Destination): + def __init__(self, sim, name, broker): + self.sim = sim + self.name = name + self.broker = broker + self.broker.connect(self) + self.queue = self.broker.declare_queue(name) + self.subscription = self.broker.subscribe(self, name) + self.expected = None + self.boundKeys = [] + + def bind(self, key): + self.boundKeys.append(key) + self.broker.bind("amq.direct", self.name, key) + + def unbind(self, key): + self.boundKeys.remove(key) + self.broker.unbind("amq.direct", self.name, key) + + def receive(self, key, msg, fromUser=False): + print "Client(%s) received [%s]: %s" % (self.name, key, msg.body) + if self.expected == key: + self.expected = None + else: + self.sim.error("Client(%s) received unexpected message with key [%s]" % \ + (self.name, self.expected)) + + def expect(self, key): + if key in self.boundKeys: + self.expected = key + + def checkReception(self): + if self.expected: + self.sim.error("Client(%s) never received message with key [%s]" % \ + (self.name, self.expected)) + +class Broker(Client): + def __init__(self, sim, tag): + self.sim = sim + self.tag = tag + self.connections = {} + self.exchanges = {} + self.queues = {} + self.subscriptions = {} + self.links = {} + self.directExchange = self.declare_exchange("amq.direct") + + def connect(self, client): + if client in self.connections: + raise Exception("Client already connected") + self.connections[client] = Connection(client) + + def declare_queue(self, name, tag=None, exclude=None): + if name in self.queues: + raise Exception("Queue already exists") + self.queues[name] = Queue(self, name, tag, exclude) + + def subscribe(self, dest, queueName): + if queueName not in self.queues: + raise Exception("Queue does not exist") + self.queues[queueName].setDest(dest) + + def declare_exchange(self, name): + if name in self.exchanges: + return + exchange = Exchange(self, name) + self.exchanges[name] = exchange + return exchange + + def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None): + if exchangeName not in self.exchanges: + raise Exception("Exchange not found") + if queueName not in self.queues: + raise Exception("Queue not found") + exchange = self.exchanges[exchangeName] + queue = self.queues[queueName] + exchange.bind(queue, key, tagList, fedOp, origin) + + def unbind(self, exchangeName, queueName, key): + if exchangeName not in self.exchanges: + raise Exception("Exchange not found") + if queueName not in self.queues: + raise Exception("Queue not found") + exchange = self.exchanges[exchangeName] + queue = self.queues[queueName] + exchange.unbind(queue, key) + + def createLink(self, other): + if other in self.links: + raise Exception("Peer broker already linked") + link = Link(self, other) + self.links[other] = link + return link + + +class Connection: + def __init__(self, client): + self.client = client + + +class Exchange(Destination): + def __init__(self, broker, name): + self.broker = broker + self.sim = broker.sim + self.name = name + self.keys = {} + self.bridges = [] + + def bind(self, queue, key, tagList, fedOp, origin): + if not fedOp: fedOp = "bind" + print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \ + (self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin), + + if self.broker.tag in tagList: + print "(tag ignored)" + return + + if fedOp == "bind" or fedOp == "unbind": + if key not in self.keys: + self.keys[key] = {} + queueMap = self.keys[key] + + if fedOp == "bind": + ## + ## Add local or federation binding case + ## + if queue in queueMap: + if origin and origin in queueMap[queue].originList: + print "(dup ignored)" + elif origin: + queueMap[queue].originList.append(origin) + print "(origin added)" + else: + binding = Binding(origin) + queueMap[queue] = binding + print "(binding added)" + + elif fedOp == "unbind": + ## + ## Delete federation binding case + ## + if queue in queueMap: + binding = queueMap[queue] + if origin and origin in binding.originList: + binding.originList.remove(origin) + if len(binding.originList) == 0: + queueMap.pop(queue) + if len(queueMap) == 0: + self.keys.pop(key) + print "(last origin del)" + else: + print "(removed origin)" + else: + print "(origin not found)" + else: + print "(queue not found)" + + elif fedOp == "reorigin": + print "(ok)" + self.reorigin() + + elif fedOp == "hello": + print "(ok)" + + else: + raise Exception("Unknown fed-opcode '%s'" % fedOp) + + newTagList = [] + newTagList.append(self.broker.tag) + for tag in tagList: + newTagList.append(tag) + if origin: + propOrigin = origin + else: + propOrigin = self.broker.tag + + for bridge in self.bridges: + if bridge.isDynamic(): + bridge.propagate(key, newTagList, fedOp, propOrigin) + + def reorigin(self): + myTag = [] + myTag.append(self.broker.tag) + for key in self.keys: + queueMap = self.keys[key] + found = False + for queue in queueMap: + binding = queueMap[queue] + if binding.isLocal(): + found = True + if found: + for bridge in self.bridges: + if bridge.isDynamic(): + bridge.propagate(key, myTag, "bind", self.broker.tag) + + def unbind(self, queue, key): + print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key), + if key not in self.keys: + print "(key not known)" + return + queueMap = self.keys[key] + if queue not in queueMap: + print "(queue not bound)" + return + queueMap.pop(queue) + if len(queueMap) == 0: + self.keys.pop(key) + print "(ok, remove bound-key)" + else: + print "(ok)" + + count = 0 + for queue in queueMap: + if len(queueMap[queue].originList) == 0: + count += 1 + + if count == 0: + myTag = [] + myTag.append(self.broker.tag) + for bridge in self.bridges: + if bridge.isDynamic(): + bridge.propagate(key, myTag, "unbind", self.broker.tag) + + def receive(self, key, msg, fromUser=False): + sent = False + if key in self.keys: + queueMap = self.keys[key] + for queue in queueMap: + if queue.enqueue(msg): + sent = True + if not sent and not fromUser: + self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \ + (self.broker.tag, self.name, key)) + + def addDynamicBridge(self, bridge): + if bridge in self.bridges: + raise Exception("Dynamic bridge already added to exchange") + self.bridges.append(bridge) + + for b in self.bridges: + if b != bridge: + b.sendReorigin() + self.reorigin() + +class Queue: + def __init__(self, broker, name, tag=None, exclude=None): + self.broker = broker + self.name = name + self.tag = tag + self.exclude = exclude + self.dest = None + + def setDest(self, dest): + self.dest = dest + + def enqueue(self, msg): + print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags), + if self.dest == None: + print "(dropped, no dest)" + return False + if self.exclude and msg.tagFound(self.exclude): + print "(dropped, tag)" + return False + if self.tag: + msg.appendTag(self.tag) + print "(ok)" + self.dest.receive(msg.key, msg) + return True + + +class Binding: + def __init__(self, origin): + self.originList = [] + if origin: + self.originList.append(origin) + + def isLocal(self): + return len(self.originList) == 0 + + +class Link: + def __init__(self, local, remote): + self.local = local + self.remote = remote + self.remote.connect(self) + self.bridges = [] + + def bridge(self, exchangeName): + bridge = Bridge(self, exchangeName) + + +class Bridge: + def __init__(self, link, exchangeName): + self.link = link + self.exchangeName = exchangeName + if self.exchangeName not in link.local.exchanges: + raise Exception("Exchange not found") + self.exchange = link.local.exchanges[self.exchangeName] + self.queueName = "bridge." + link.local.tag + self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag) + self.link.remote.subscribe(self.exchange, self.queueName) + self.exchange.addDynamicBridge(self) + + def isDynamic(self): + return True + + def localTag(self): + return self.link.local.tag + + def remoteTag(self): + return self.link.remote.tag + + def propagate(self, key, tagList, fedOp, origin): + if self.link.remote.tag not in tagList: + self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin) + + def sendReorigin(self): + myTag = [] + myTag.append(self.link.local.tag) + self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "") + + +class Message: + def __init__(self, key, body): + self.key = key + self.body = body + self.tags = [] + + def appendTag(self, tag): + if tag not in self.tags: + self.tags.append(tag) + + def tagFound(self, tag): + return tag in self.tags + + diff --git a/python/models/fedsim/testBig.py b/python/models/fedsim/testBig.py new file mode 100644 index 0000000000..416a086983 --- /dev/null +++ b/python/models/fedsim/testBig.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from fedsim import Sim + +sim = Sim() +b1 = sim.addBroker("B1") +b2 = sim.addBroker("B2") +b3 = sim.addBroker("B3") +b4 = sim.addBroker("B4") +b5 = sim.addBroker("B5") +b6 = sim.addBroker("B6") +b7 = sim.addBroker("B7") +b8 = sim.addBroker("B8") + +c1 = sim.addClient("C1", b1) +c3 = sim.addClient("C3", b3) +c4 = sim.addClient("C4", b4) +c5 = sim.addClient("C5", b5) +c8 = sim.addClient("C8", b8) + +sim.link(b1, b2) +sim.link(b3, b2) +sim.link(b4, b2) +sim.link(b5, b2) + +sim.link(b6, b7) +sim.link(b6, b8) + +sim.bind(c1, "A") +sim.bind(c3, "B") +sim.bind(c8, "A") + +sim.link(b5, b6) + +sim.bind(c4, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) +sim.sendMessage("A", b4) +sim.sendMessage("A", b5) +sim.sendMessage("A", b6) +sim.sendMessage("A", b7) +sim.sendMessage("A", b8) + +sim.sendMessage("B", b1) +sim.sendMessage("B", b2) +sim.sendMessage("B", b3) +sim.sendMessage("B", b4) +sim.sendMessage("B", b5) +sim.sendMessage("B", b6) +sim.sendMessage("B", b7) +sim.sendMessage("B", b8) + +sim.unbind(c1, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) +sim.sendMessage("A", b4) +sim.sendMessage("A", b5) +sim.sendMessage("A", b6) +sim.sendMessage("A", b7) +sim.sendMessage("A", b8) + +sim.unbind(c4, "A") +sim.unbind(c3, "B") +sim.unbind(c8, "A") + +sim.dumpState() +sim.end() diff --git a/python/models/fedsim/testRing.py b/python/models/fedsim/testRing.py new file mode 100644 index 0000000000..c883b54993 --- /dev/null +++ b/python/models/fedsim/testRing.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from fedsim import Sim + +sim = Sim() +b1 = sim.addBroker("B1") +b2 = sim.addBroker("B2") +b3 = sim.addBroker("B3") + +sim.link(b1, b2, False) +sim.link(b2, b3, False) +sim.link(b3, b1, False) + +c1 = sim.addClient("C1", b1) +c2 = sim.addClient("C2", b2) +c3 = sim.addClient("C3", b3) + +sim.bind(c1, "A") +sim.bind(c2, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) + +sim.unbind(c2, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) + +sim.end() diff --git a/python/models/fedsim/testStar.py b/python/models/fedsim/testStar.py new file mode 100644 index 0000000000..e6b801446f --- /dev/null +++ b/python/models/fedsim/testStar.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from fedsim import Sim + +sim = Sim() +b1 = sim.addBroker("B1") +b2 = sim.addBroker("B2") +b3 = sim.addBroker("B3") +bc = sim.addBroker("BC") + +sim.link(b1, bc) +sim.link(b2, bc) +sim.link(b3, bc) + +c1 = sim.addClient("C1", b1) +c2 = sim.addClient("C2", b2) +c3 = sim.addClient("C3", b3) +cc = sim.addClient("CC", bc) + +sim.bind(c1, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) +sim.sendMessage("A", bc) + +sim.bind(c2, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) +sim.sendMessage("A", bc) + +sim.unbind(c1, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) +sim.sendMessage("A", bc) + +sim.unbind(c2, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", b3) +sim.sendMessage("A", bc) + +sim.end() diff --git a/python/models/fedsim/testStarAdd.py b/python/models/fedsim/testStarAdd.py new file mode 100644 index 0000000000..e0eb44952a --- /dev/null +++ b/python/models/fedsim/testStarAdd.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from fedsim import Sim + +sim = Sim() +b1 = sim.addBroker("B1") +b2 = sim.addBroker("B2") +b3 = sim.addBroker("B3") +bc = sim.addBroker("BC") + +sim.link(b1, bc) +sim.link(b2, bc) + +c1 = sim.addClient("C1", b1) +c2 = sim.addClient("C2", b2) +c3 = sim.addClient("C3", b3) +cc = sim.addClient("CC", bc) + +sim.bind(c1, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", bc) + +sim.bind(c2, "A") + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", bc) + +sim.bind(c3, "A") +sim.link(b3, bc) + +sim.sendMessage("A", b1) +sim.sendMessage("A", b2) +sim.sendMessage("A", bc) + +sim.end() + diff --git a/python/pal2py b/python/pal2py deleted file mode 100755 index 544151bf76..0000000000 --- a/python/pal2py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import sys, os, xml - -from qpid.spec import load, pythonize -from textwrap import TextWrapper -from xml.sax.handler import ContentHandler - -class Block: - - def __init__(self, children): - self.children = children - - def emit(self, out): - for child in self.children: - if not hasattr(child, "emit"): - raise ValueError(child) - child.emit(out) - - if not self.children: - out.line("pass") - -class If: - - def __init__(self, expr, cons, alt = None): - self.expr = expr - self.cons = cons - self.alt = alt - - def emit(self, out): - out.line("if ") - self.expr.emit(out) - out.write(":") - out.level += 1 - self.cons.emit(out) - out.level -= 1 - if self.alt: - out.line("else:") - out.level += 1 - self.alt.emit(out) - out.level -= 1 - -class Stmt: - - def __init__(self, code): - self.code = code - - def emit(self, out): - out.line(self.code) - -class Expr: - - def __init__(self, code): - self.code = code - - def emit(self, out): - out.write(self.code) - -class Abort: - - def __init__(self, expr): - self.expr = expr - - def emit(self, out): - out.line("assert False, ") - self.expr.emit(out) - -WRAPPER = TextWrapper() - -def wrap(text): - return WRAPPER.wrap(" ".join(text.split())) - -class Doc: - - def __init__(self, text): - self.text = text - - def emit(self, out): - out.line('"""') - for line in wrap(self.text): - out.line(line) - out.line('"""') - -class Frame: - - def __init__(self, attrs): - self.attrs = attrs - self.children = [] - self.text = None - - def __getattr__(self, attr): - return self.attrs[attr] - -def isunicode(s): - if isinstance(s, str): - return False - for ch in s: - if ord(ch) > 127: - return True - return False - -def string_literal(s): - if s == None: - return None - if isunicode(s): - return "%r" % s - else: - return "%r" % str(s) - -TRUTH = { - "1": True, - "0": False, - "true": True, - "false": False - } - -LITERAL = { - "shortstr": string_literal, - "longstr": string_literal, - "bit": lambda s: TRUTH[s.lower()], - "longlong": lambda s: "%r" % long(s) - } - -def literal(s, field): - return LITERAL[field.type](s) - -def palexpr(s, field): - if s.startswith("$"): - return "msg.%s" % s[1:] - else: - return literal(s, field) - -class Translator(ContentHandler): - - def __init__(self, spec): - self.spec = spec - self.stack = [] - self.content = None - self.root = Frame(None) - self.push(self.root) - - def emit(self, out): - blk = Block(self.root.children) - blk.emit(out) - out.write("\n") - - def peek(self): - return self.stack[-1] - - def pop(self): - return self.stack.pop() - - def push(self, frame): - self.stack.append(frame) - - def startElement(self, name, attrs): - self.push(Frame(attrs)) - - def endElement(self, name): - frame = self.pop() - if hasattr(self, name): - child = getattr(self, name)(frame) - else: - child = self.handle(name, frame) - - if child: - self.peek().children.append(child) - - def characters(self, text): - frame = self.peek() - if frame.text: - frame.text += text - else: - frame.text = text - - def handle(self, name, frame): - for klass in self.spec.classes: - pyklass = pythonize(klass.name) - if name.startswith(pyklass): - name = name[len(pyklass) + 1:] - break - else: - raise ValueError("unknown class: %s" % name) - - for method in klass.methods: - pymethod = pythonize(method.name) - if name == pymethod: - break - else: - raise ValueError("unknown method: %s" % name) - - args = ["%s = %s" % (key, palexpr(val, method.fields.bypyname[key])) - for key, val in frame.attrs.items()] - if method.content and self.content: - args.append("content = %r" % string_literal(self.content)) - code = "ssn.%s_%s(%s)" % (pyklass, pymethod, ", ".join(args)) - if pymethod == "consume": - code = "consumer_tag = %s.consumer_tag" % code - return Stmt(code) - - def pal(self, frame): - return Block([Doc(frame.text)] + frame.children) - - def include(self, frame): - base, ext = os.path.splitext(frame.filename) - return Stmt("from %s import *" % base) - - def session(self, frame): - return Block([Stmt("cli = open()"), Stmt("ssn = cli.channel(0)"), - Stmt("ssn.channel_open()")] + frame.children) - - def empty(self, frame): - return If(Expr("msg == None"), Block(frame.children)) - - def abort(self, frame): - return Abort(Expr(string_literal(frame.text))) - - def wait(self, frame): - return Stmt("msg = ssn.queue(consumer_tag).get(timeout=%r)" % - (int(frame.timeout)/1000)) - - def basic_arrived(self, frame): - if frame.children: - return If(Expr("msg != None"), Block(frame.children)) - - def basic_content(self, frame): - self.content = frame.text - -class Emitter: - - def __init__(self, out): - self.out = out - self.level = 0 - - def write(self, code): - self.out.write(code) - - def line(self, code): - self.write("\n%s%s" % (" "*self.level, code)) - - def flush(self): - self.out.flush() - - def close(self): - self.out.close() - - -for f in sys.argv[2:]: - base, ext = os.path.splitext(f) - spec = load(sys.argv[1]) - t = Translator(spec) - xml.sax.parse(f, t) -# out = Emitter(open("%s.py" % base)) - out = Emitter(sys.stdout) - t.emit(out) - out.close() diff --git a/python/perftest b/python/perftest deleted file mode 100755 index 2e9148ce50..0000000000 --- a/python/perftest +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/env python - -def publisher(n): - import qpid - import sys - from qpid.client import Client - from qpid.content import Content - if len(sys.argv) >= 3: - n = int(sys.argv[2]) - client = Client("127.0.0.1", 5672) - client.start({"LOGIN": "guest", "PASSWORD": "guest"}) - channel = client.channel(1) - channel.session_open() - message = Content("message") - message["routing_key"] = "message_queue" - print "producing ", n, " messages" - for i in range(n): - channel.message_transfer(destination="amq.direct", content=message) - - print "producing final message" - message = Content("That's done") - message["routing_key"] = "message_queue" - channel.message_transfer(destination="amq.direct", content=message) - - print "consuming sync message" - consumer = "consumer" - queue = client.queue(consumer) - channel.message_subscribe(queue="sync_queue", destination=consumer) - channel.message_flow(consumer, 0, 0xFFFFFFFF) - channel.message_flow(consumer, 1, 0xFFFFFFFF) - queue.get(block = True) - print "done" - channel.session_close() - -def consumer(): - import sys - import qpid - from qpid.client import Client - from qpid.content import Content - client = Client("127.0.0.1", 5672) - client.start({"LOGIN": "guest", "PASSWORD": "guest"}) - channel = client.channel(1) - channel.session_open() - consumer = "consumer" - queue = client.queue(consumer) - channel.message_subscribe(queue="message_queue", destination=consumer) - channel.message_flow(consumer, 0, 0xFFFFFFFF) - channel.message_flow(consumer, 1, 0xFFFFFFFF) - final = "That's done" - content = "" - message = None - print "getting messages" - while content != final: - message = queue.get(block = True) - content = message.content.body - message.complete(cumulative=True) - - print "consumed all messages" - message = Content("message") - message["routing_key"] = "sync_queue" - channel.message_transfer(destination="amq.direct", content=message) - print "done" - channel.session_close() - -if __name__=='__main__': - import sys - import qpid - from timeit import Timer - from qpid.client import Client - from qpid.content import Content - client = Client("127.0.0.1", 5672) - client.start({"LOGIN": "guest", "PASSWORD": "guest"}) - channel = client.channel(1) - channel.session_open() - channel.queue_declare(queue="message_queue") - channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue") - channel.queue_declare(queue="sync_queue") - channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue") - channel.session_close() - - numMess = 100 - if len(sys.argv) >= 3: - numMess = int(sys.argv[2]) - if len(sys.argv) == 1: - print "error: please specify prod or cons" - elif sys.argv[1] == 'prod': - tprod = Timer("publisher(100)", "from __main__ import publisher") - tp = tprod.timeit(1) - print "produced and consumed" , numMess + 2 ,"messages in: ", tp - elif sys.argv[1] == 'cons': - tcons = Timer("consumer()", "from __main__ import consumer") - tc = tcons.timeit(1) - print "consumed " , numMess ," in: ", tc - else: - print "please specify prod or cons" diff --git a/python/preppy b/python/preppy new file mode 100755 index 0000000000..22893dad03 --- /dev/null +++ b/python/preppy @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, re, sys + +ann = re.compile(r"([ \t]*)@([_a-zA-Z][_a-zA-Z0-9]*)([ \t\n\r]+def[ \t]+)([_a-zA-Z][_a-zA-Z0-9]*)") +line = re.compile(r"\n([ \t]*)[^ \t\n#]+") + +if len(sys.argv) == 2: + major, minor = [int(p) for p in sys.argv[1].split(".")] +elif len(sys.argv) == 1: + major, minor = sys.version_info[0:2] +else: + print "usage: %s [ version ] < input.py > output.py" % sys.argv[0] + sys.exit(-1) + +if major <= 2 and minor <= 3: + def process(input): + output = "" + pos = 0 + while True: + m = ann.search(input, pos) + if m: + indent, decorator, idef, function = m.groups() + output += input[pos:m.start()] + output += "%s#@%s%s%s" % (indent, decorator, idef, function) + pos = m.end() + + subst = "\n%s%s = %s(%s)\n" % (indent, function, decorator, function) + npos = pos + while True: + n = line.search(input, npos) + if not n: + input += subst + break + if len(n.group(1)) <= len(indent): + idx = n.start() + input = input[:idx] + subst + input[idx:] + break + npos = n.end() + else: + break + + output += input[pos:] + return output +else: + def process(input): + return input + +sys.stdout.write(process(sys.stdin.read())) diff --git a/python/qmf/__init__.py b/python/qmf/__init__.py new file mode 100644 index 0000000000..31d5a2ef58 --- /dev/null +++ b/python/qmf/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/python/qmf/console.py b/python/qmf/console.py new file mode 100644 index 0000000000..5348904097 --- /dev/null +++ b/python/qmf/console.py @@ -0,0 +1,1970 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" Console API for Qpid Management Framework """ + +import os +import platform +import qpid +import struct +import socket +import re +from qpid.datatypes import UUID +from qpid.datatypes import timestamp +from qpid.datatypes import datetime +from qpid.peer import Closed +from qpid.session import SessionDetached +from qpid.connection import Connection, ConnectionFailed, Timeout +from qpid.datatypes import Message, RangedSet, UUID +from qpid.util import connect, ssl, URL +from qpid.codec010 import StringCodec as Codec +from threading import Lock, Condition, Thread +from time import time, strftime, gmtime +from cStringIO import StringIO + +#import qpid.log +#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) + +class Console: + """ To access the asynchronous operations, a class must be derived from + Console with overrides of any combination of the available methods. """ + + def brokerConnected(self, broker): + """ Invoked when a connection is established to a broker """ + pass + + def brokerDisconnected(self, broker): + """ Invoked when the connection to a broker is lost """ + pass + + def newPackage(self, name): + """ Invoked when a QMF package is discovered. """ + pass + + def newClass(self, kind, classKey): + """ Invoked when a new class is discovered. Session.getSchema can be + used to obtain details about the class.""" + pass + + def newAgent(self, agent): + """ Invoked when a QMF agent is discovered. """ + pass + + def delAgent(self, agent): + """ Invoked when a QMF agent disconects. """ + pass + + def objectProps(self, broker, record): + """ Invoked when an object is updated. """ + pass + + def objectStats(self, broker, record): + """ Invoked when an object is updated. """ + pass + + def event(self, broker, event): + """ Invoked when an event is raised. """ + pass + + def heartbeat(self, agent, timestamp): + """ Invoked when an agent heartbeat is received. """ + pass + + def brokerInfo(self, broker): + """ Invoked when the connection sequence reaches the point where broker information is available. """ + pass + + def methodResponse(self, broker, seq, response): + """ Invoked when a method response from an asynchronous method call is received. """ + pass + +class BrokerURL(URL): + def __init__(self, text): + URL.__init__(self, text) + if self.port is None: + if self.scheme == URL.AMQPS: + self.port = 5671 + else: + self.port = 5672 + self.authName = None + self.authPass = None + if self.user: + self.authName = str(self.user) + if self.password: + self.authPass = str(self.password) + + def name(self): + return self.host + ":" + str(self.port) + + def match(self, host, port): + return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] + +class Object(object): + """ This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}): + self._session = session + self._broker = broker + self._schema = schema + self._managed = managed + if self._managed: + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + else: + self._currentTime = None + self._createTime = None + self._deleteTime = None + self._objectId = None + self._properties = [] + self._statistics = [] + if codec: + if prop: + notPresent = self._parsePresenceMasks(codec, schema) + for property in schema.getProperties(): + if property.name in notPresent: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._decodeValue(codec, property.type, broker))) + if stat: + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker))) + else: + for property in schema.getProperties(): + if property.optional: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._defaultValue(property, broker, kwargs))) + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) + + def getBroker(self): + """ Return the broker from which this object was sent """ + return self._broker + + def getObjectId(self): + """ Return the object identifier for this object """ + return self._objectId + + def getClassKey(self): + """ Return the class-key that references the schema describing this object. """ + return self._schema.getKey() + + def getSchema(self): + """ Return the schema that describes this object. """ + return self._schema + + def getMethods(self): + """ Return a list of methods available for this object. """ + return self._schema.getMethods() + + def getTimestamps(self): + """ Return the current, creation, and deletion times for this object. """ + return self._currentTime, self._createTime, self._deleteTime + + def isDeleted(self): + """ Return True iff this object has been deleted. """ + return self._deleteTime != 0 + + def isManaged(self): + """ Return True iff this object is a proxy for a managed object on an agent. """ + return self._managed + + def getIndex(self): + """ Return a string describing this object's primary key. """ + result = u"" + for property, value in self._properties: + if property.index: + if result != u"": + result += u":" + try: + valstr = unicode(self._session._displayValue(value, property.type)) + except: + valstr = u"" + result += valstr + return result + + def getProperties(self): + """ Return a list of object properties """ + return self._properties + + def getStatistics(self): + """ Return a list of object statistics """ + return self._statistics + + def mergeUpdate(self, newer): + """ Replace properties and/or statistics with a newly received update """ + if not self.isManaged(): + raise Exception("Object is not managed") + if self._objectId != newer._objectId: + raise Exception("Objects with different object-ids") + if len(newer.getProperties()) > 0: + self._properties = newer.getProperties() + if len(newer.getStatistics()) > 0: + self._statistics = newer.getStatistics() + + def update(self): + """ Contact the agent and retrieve the lastest property and statistic values for this object. """ + if not self.isManaged(): + raise Exception("Object is not managed") + obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) + if obj: + self.mergeUpdate(obj[0]) + else: + raise Exception("Underlying object no longer exists") + + def __repr__(self): + if self.isManaged(): + id = self.getObjectId().__repr__() + else: + id = "unmanaged" + key = self.getClassKey() + return key.getPackageName() + ":" + key.getClassName() +\ + "[" + id + "] " + self.getIndex().encode("utf8") + + def __getattr__(self, name): + for method in self._schema.getMethods(): + if name == method.name: + return lambda *args, **kwargs : self._invoke(name, args, kwargs) + for property, value in self._properties: + if name == property.name: + return value + if name == "_" + property.name + "_" and property.type == 10: # Dereference references + deref = self._session.getObjects(_objectId=value, _broker=self._broker) + if len(deref) != 1: + return None + else: + return deref[0] + for statistic, value in self._statistics: + if name == statistic.name: + return value + raise Exception("Type Object has no attribute '%s'" % name) + + def __setattr__(self, name, value): + if name[0] == '_': + super.__setattr__(self, name, value) + return + + for prop, unusedValue in self._properties: + if name == prop.name: + newprop = (prop, value) + newlist = [] + for old, val in self._properties: + if name == old.name: + newlist.append(newprop) + else: + newlist.append((old, val)) + self._properties = newlist + return + super.__setattr__(self, name, value) + + def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): + for method in self._schema.getMethods(): + if name == method.name: + aIdx = 0 + sendCodec = Codec() + seq = self._session.seqMgr._reserve((method, synchronous)) + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + self._schema.getKey().encode(sendCodec) + sendCodec.write_str8(name) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(args): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + if timeWait: + ttl = timeWait * 1000 + else: + ttl = None + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), + ttl=ttl) + if synchronous: + try: + self._broker.cv.acquire() + self._broker.syncInFlight = True + finally: + self._broker.cv.release() + self._broker._send(smsg) + return seq + return None + + def _invoke(self, name, args, kwargs): + if not self.isManaged(): + raise Exception("Object is not managed") + if "_timeout" in kwargs: + timeout = kwargs["_timeout"] + else: + timeout = self._broker.SYNC_TIME + + if "_async" in kwargs and kwargs["_async"]: + sync = False + if "_timeout" not in kwargs: + timeout = None + else: + sync = True + + seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) + if seq: + if not sync: + return seq + try: + self._broker.cv.acquire() + starttime = time() + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(timeout) + if time() - starttime > timeout: + self._session.seqMgr._release(seq) + raise RuntimeError("Timed out waiting for method to respond") + finally: + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None + raise Exception(errorText) + return self._broker.syncResult + raise Exception("Invalid Method (software defect) [%s]" % name) + + def _encodeUnmanaged(self, codec): + + codec.write_uint8(20) + codec.write_str8(self._schema.getKey().getPackageName()) + codec.write_str8(self._schema.getKey().getClassName()) + codec.write_bin128(self._schema.getKey().getHash()) + + # emit presence masks for optional properties + mask = 0 + bit = 0 + for prop, value in self._properties: + if prop.optional: + if bit == 0: + bit = 1 + if value: + mask |= bit + bit = bit << 1 + if bit == 256: + bit = 0 + codec.write_uint8(mask) + mask = 0 + if bit != 0: + codec.write_uint8(mask) + + # encode properties + for prop, value in self._properties: + if value != None: + self._session._encodeValue(codec, value, prop.type) + + # encode statistics + for stat, value in self._statistics: + self._session._encodeValue(codec, value, stat.type) + + def _parsePresenceMasks(self, codec, schema): + excludeList = [] + bit = 0 + for property in schema.getProperties(): + if property.optional: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(property.name) + bit *= 2 + if bit == 256: + bit = 0 + return excludeList + +class Session: + """ + An instance of the Session class represents a console session running + against one or more QMF brokers. A single instance of Session is needed + to interact with the management framework as a console. + """ + _CONTEXT_SYNC = 1 + _CONTEXT_STARTUP = 2 + _CONTEXT_MULTIGET = 3 + + DEFAULT_GET_WAIT_TIME = 60 + + ENCODINGS = { + str: 7, + timestamp: 8, + datetime: 8, + int: 9, + long: 9, + float: 13, + UUID: 14, + Object: 20, + list: 21 + } + + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, + manageConnections=False, userBindings=False): + """ + Initialize a session. If the console argument is provided, the + more advanced asynchronous features are available. If console is + defaulted, the session will operate in a simpler, synchronous manner. + + The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' + is provided. They control whether object updates, events, and agent-heartbeats are + subscribed to. If the console is not interested in receiving one or more of the above, + setting the argument to False will reduce tha bandwidth used by the API. + + If manageConnections is set to True, the Session object will manage connections to + the brokers. This means that if a broker is unreachable, it will retry until a connection + can be established. If a connection is lost, the Session will attempt to reconnect. + + If manageConnections is set to False, the user is responsible for handing failures. In + this case, an unreachable broker will cause addBroker to raise an exception. + + If userBindings is set to False (the default) and rcvObjects is True, the console will + receive data for all object classes. If userBindings is set to True, the user must select + which classes the console shall receive by invoking the bindPackage or bindClass methods. + This allows the console to be configured to receive only information that is relavant to + a particular application. If rcvObjects id False, userBindings has no meaning. + """ + self.console = console + self.brokers = [] + self.packages = {} + self.seqMgr = SequenceManager() + self.cv = Condition() + self.syncSequenceList = [] + self.getResult = [] + self.getSelect = [] + self.error = None + self.rcvObjects = rcvObjects + self.rcvEvents = rcvEvents + self.rcvHeartbeats = rcvHeartbeats + self.userBindings = userBindings + if self.console == None: + self.rcvObjects = False + self.rcvEvents = False + self.rcvHeartbeats = False + self.bindingKeyList = self._bindingKeys() + self.manageConnections = manageConnections + + if self.userBindings and not self.rcvObjects: + raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + + def __repr__(self): + return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) + + def addBroker(self, target="localhost", timeout=None, mechanisms=None): + """ Connect to a Qpid broker. Returns an object of type Broker. """ + url = BrokerURL(target) + broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass, + ssl = url.scheme == URL.AMQPS, connTimeout=timeout) + + self.brokers.append(broker) + if not self.manageConnections: + self.getObjects(broker=broker, _class="agent") + return broker + + def delBroker(self, broker): + """ Disconnect from a broker. The 'broker' argument is the object + returned from the addBroker call """ + if self.console: + for agent in broker.getAgents(): + self.console.delAgent(agent) + broker._shutdown() + self.brokers.remove(broker) + del broker + + def getPackages(self): + """ Get the list of known QMF packages """ + for broker in self.brokers: + broker._waitForStable() + list = [] + for package in self.packages: + list.append(package) + return list + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + for broker in self.brokers: + broker._waitForStable() + list = [] + if packageName in self.packages: + for pkey in self.packages[packageName]: + list.append(self.packages[packageName][pkey].getKey()) + return list + + def getSchema(self, classKey): + """ Get the schema for a QMF class """ + for broker in self.brokers: + broker._waitForStable() + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + if pname in self.packages: + if pkey in self.packages[pname]: + return self.packages[pname][pkey] + + def bindPackage(self, packageName): + """ Request object updates for all table classes within a package. """ + if not self.userBindings or not self.rcvObjects: + raise Exception("userBindings option not set for Session") + key = "console.obj.*.*.%s.#" % packageName + self.bindingKeyList.append(key) + for broker in self.brokers: + if broker.isConnected(): + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key=key) + + def bindClass(self, pname, cname): + """ Request object updates for a particular table class by package and class name. """ + if not self.userBindings or not self.rcvObjects: + raise Exception("userBindings option not set for Session") + key = "console.obj.*.*.%s.%s.#" % (pname, cname) + self.bindingKeyList.append(key) + for broker in self.brokers: + if broker.isConnected(): + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key=key) + + def bindClassKey(self, classKey): + """ Request object updates for a particular table class by class key. """ + pname = classKey.getPackageName() + cname = classKey.getClassName() + self.bindClass(pname, cname) + + def getAgents(self, broker=None): + """ Get a list of currently known agents """ + brokerList = [] + if broker == None: + for b in self.brokers: + brokerList.append(b) + else: + brokerList.append(broker) + + for b in brokerList: + b._waitForStable() + agentList = [] + for b in brokerList: + for a in b.getAgents(): + agentList.append(a) + return agentList + + def makeObject(self, classKey, broker=None, **kwargs): + """ Create a new, unmanaged object of the schema indicated by classKey """ + schema = self.getSchema(classKey) + if schema == None: + raise Exception("Schema not found for classKey") + return Object(self, broker, schema, None, True, True, False, kwargs) + + def getObjects(self, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + The class for queried objects may be specified in one of the following ways: + + _schema = - supply a schema object returned from getSchema. + _key = - supply a classKey from the list returned by getClasses. + _class = - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = - get the object referenced by the object-id + + If objects should be obtained from only one agent, use the following argument. + Otherwise, the query will go to all agents. + + _agent = - supply an agent from the list returned by getAgents. + + If the get query is to be restricted to one broker (as opposed to all connected brokers), + add the following argument: + + _broker = - supply a broker as returned by addBroker. + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout =