summaryrefslogtreecommitdiff
path: root/qpid/cpp/management/python/bin
diff options
context:
space:
mode:
authorJustin Ross <jross@apache.org>2016-04-21 12:31:34 +0000
committerJustin Ross <jross@apache.org>2016-04-21 12:31:34 +0000
commit71149592670f7592886751a9a866459bef0f12cc (patch)
treee4d1fd948055e36d1560112a318e77a210506d06 /qpid/cpp/management/python/bin
parenta835fb2724824dcd8a470fb51424cedeb6b38f62 (diff)
downloadqpid-python-71149592670f7592886751a9a866459bef0f12cc.tar.gz
QPID-7207: Create independent cpp and python subtrees, with content from tools and extras
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1740289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/management/python/bin')
-rw-r--r--qpid/cpp/management/python/bin/.gitignore13
-rwxr-xr-xqpid/cpp/management/python/bin/qmf-tool775
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-config878
-rw-r--r--qpid/cpp/management/python/bin/qpid-config.bat2
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-ha299
-rw-r--r--qpid/cpp/management/python/bin/qpid-ha.bat2
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-printevents191
-rw-r--r--qpid/cpp/management/python/bin/qpid-printevents.bat2
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-qls-analyze114
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-queue-stats159
-rw-r--r--qpid/cpp/management/python/bin/qpid-queue-stats.bat3
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-receive194
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-route635
-rw-r--r--qpid/cpp/management/python/bin/qpid-route.bat2
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-send281
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-stat514
-rw-r--r--qpid/cpp/management/python/bin/qpid-stat.bat2
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-store-chk332
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-store-resize350
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-tool799
-rw-r--r--qpid/cpp/management/python/bin/qpid-tool.bat2
21 files changed, 5549 insertions, 0 deletions
diff --git a/qpid/cpp/management/python/bin/.gitignore b/qpid/cpp/management/python/bin/.gitignore
new file mode 100644
index 0000000000..f99dba8c08
--- /dev/null
+++ b/qpid/cpp/management/python/bin/.gitignore
@@ -0,0 +1,13 @@
+qmf-toolc
+qpid-configc
+qpid-hac
+qpid-printeventsc
+qpid-qls-analyzec
+qpid-queue-statsc
+qpid-receivec
+qpid-routec
+qpid-sendc
+qpid-statc
+qpid-store-chkc
+qpid-store-resizec
+qpid-toolc
diff --git a/qpid/cpp/management/python/bin/qmf-tool b/qpid/cpp/management/python/bin/qmf-tool
new file mode 100755
index 0000000000..407ae74b10
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qmf-tool
@@ -0,0 +1,775 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import socket
+from cmd import Cmd
+from shlex import split
+from threading import Lock
+from time import strftime, gmtime
+from qpid.disp import Display
+import qpid_messaging
+import qmf2
+
+class OptsAndArgs(object):
+
+ def __init__(self, argv):
+ self.argv = argv
+ self.usage = """qmf-tool [OPTIONS] [<broker-host>[:<port>]]"""
+ self.option_parser = optparse.OptionParser(usage=self.usage)
+ self.conn_group = optparse.OptionGroup(self.option_parser, "Connection Options")
+ self.conn_group.add_option("-u", "--user", action="store", type="string", help="User name for authentication")
+ self.conn_group.add_option("-p", "--password", action="store", type="string", help="Password for authentication")
+ self.conn_group.add_option("-t", "--transport", action="store", type="string", help="Transport type (tcp, ssl, rdma)")
+ self.conn_group.add_option("-m", "--mechanism", action="store", type="string", help="SASL Mechanism for security")
+ self.conn_group.add_option("-s", "--service", action="store", type="string", default="qpidd", help="SASL Service name")
+ self.conn_group.add_option("--min-ssf", action="store", type="int", metavar="<n>", help="Minimum acceptable security strength factor")
+ self.conn_group.add_option("--max-ssf", action="store", type="int", metavar="<n>", help="Maximum acceptable security strength factor")
+ self.conn_group.add_option("--conn-option", action="append", default=[], metavar="<NAME=VALUE>", help="Additional connection option(s)")
+ self.option_parser.add_option_group(self.conn_group)
+
+ self.qmf_group = optparse.OptionGroup(self.option_parser, "QMF Session Options")
+ self.qmf_group.add_option("--domain", action="store", type="string", help="QMF Domain")
+ self.qmf_group.add_option("--agent-age", action="store", type="int", metavar="<n>", help="Time, in minutes, to age out non-communicating agents")
+ self.qmf_group.add_option("--qmf-option", action="append", default=[], metavar="<NAME=VALUE>", help="Additional QMF session option(s)")
+ self.option_parser.add_option_group(self.qmf_group)
+
+ def parse(self):
+ host = "localhost"
+ conn_options = {}
+ qmf_options = []
+
+ options, encArgs = self.option_parser.parse_args(args=self.argv)
+ try:
+ encoding = locale.getpreferredencoding()
+ args = [a.decode(encoding) for a in encArgs]
+ except:
+ args = encArgs
+
+ if len(args) > 1:
+ host = args[1]
+
+ if options.user:
+ conn_options["username"] = options.user
+ if options.password:
+ conn_options["password"] = options.password
+ if options.transport:
+ conn_options["transport"] = options.transport
+ if options.mechanism:
+ conn_options["sasl_mechanisms"] = options.mechanism
+ if options.service:
+ conn_options["sasl_service"] = options.service
+ if options.min_ssf:
+ conn_options["sasl_min_ssf"] = options.min_ssf
+ if options.max_ssf:
+ conn_options["sasl_max_ssf"] = options.max_ssf
+ for x in options.conn_option:
+ try:
+ key, val = x.split('=')
+ conn_options[key] = val
+ except:
+ raise Exception("Improperly formatted text for --conn-option: '%s'" % x)
+
+ if options.domain:
+ qmf_options.append("domain:'%s'" % options.domain)
+ if options.agent_age:
+ qmf_options.append("max-agent-age:%d" % options.agent_age)
+ for x in options.qmf_option:
+ try:
+ key, val = x.split('=')
+ qmf_options.append("%s:%s" % (key, val))
+ except:
+ raise Exception("Improperly formatted text for --qmf-option: '%s'" % x)
+
+ qmf_string = '{'
+ first = True
+ for x in qmf_options:
+ if first:
+ first = None
+ else:
+ qmf_string += ','
+ qmf_string += x
+ qmf_string += '}'
+
+ return host, conn_options, qmf_string
+
+
+
+class Mcli(Cmd):
+ """ Management Command Interpreter """
+
+ def __init__(self, dataObject, dispObject):
+ Cmd.__init__(self)
+ self.dataObject = dataObject
+ self.dispObject = dispObject
+ self.dataObject.setCli(self)
+ self.prompt = "qmf: "
+
+ def emptyline(self):
+ pass
+
+ def setPromptMessage(self, p=None):
+ if p == None:
+ self.prompt = "qmf: "
+ else:
+ self.prompt = "qmf[%s]: " % p
+
+ def do_help(self, data):
+ print "Management Tool for QMF"
+ print
+ print "Agent Commands:"
+ print " set filter <filter-string> - Filter the list of agents"
+ print " list agents - Print a list of the known Agents"
+ print " set default <item-number> - Set the default agent for operations"
+ print " show filter - Show the agent filter currently in effect"
+ print " show agent <item-number> - Print detailed information about an Agent"
+ print " show options - Show option strings used in the QMF session"
+ print
+ print "Schema Commands:"
+ print " list packages - Print a list of packages supported by the default agent"
+ print " list classes [<package-name>] - Print all classes supported byt the default agent"
+ print " show class <class-name> [<package-name>] - Show details of a class"
+ print
+ print "Data Commands:"
+ print " query <class-name> [<package-name>] [<predicate>] - Query for data from the agent"
+ print " list - List accumulated query results"
+ print " clear - Clear accumulated query results"
+ print " show <id> - Show details from a data object"
+ print " call <id> <method> [<args>] - Call a method on a data object"
+ print
+ print "General Commands:"
+ print " set time-format short - Select short timestamp format (default)"
+ print " set time-format long - Select long timestamp format"
+ print " quit or ^D - Exit the program"
+ print
+
+ def complete_set(self, text, line, begidx, endidx):
+ """ Command completion for the 'set' command """
+ tokens = split(line[:begidx])
+ if len(tokens) == 1:
+ return [i for i in ('filter ', 'default ', 'time-format ') if i.startswith(text)]
+ if len(tokens) == 2 and tokens[1] == 'time-format':
+ return [i for i in ('long', 'short') if i.startswith(text)]
+ return []
+
+ def do_set(self, data):
+ tokens = split(data)
+ try:
+ if tokens[0] == "time-format":
+ self.dispObject.do_setTimeFormat(tokens[1])
+ else:
+ self.dataObject.do_set(data)
+ except Exception, e:
+ print "Exception in set command:", e
+
+ def complete_list(self, text, line, begidx, endidx):
+ tokens = split(line[:begidx])
+ if len(tokens) == 1:
+ return [i for i in ('agents', 'packages', 'classes ') if i.startswith(text)]
+ return []
+
+ def do_list(self, data):
+ try:
+ self.dataObject.do_list(data)
+ except Exception, e:
+ print "Exception in list command:", e
+
+ def complete_show(self, text, line, begidx, endidx):
+ tokens = split(line[:begidx])
+ if len(tokens) == 1:
+ return [i for i in ('options', 'filter', 'agent ', 'class ') if i.startswith(text)]
+ return []
+
+ def do_show(self, data):
+ try:
+ self.dataObject.do_show(data)
+ except Exception, e:
+ print "Exception in show command:", e
+
+ def complete_query(self, text, line, begidx, endidx):
+ return []
+
+ def do_query(self, data):
+ try:
+ self.dataObject.do_query(data)
+ except Exception, e:
+ if e.message.__class__ == qmf2.Data:
+ e = e.message.getProperties()
+ print "Exception in query command:", e
+
+ def do_call(self, data):
+ try:
+ self.dataObject.do_call(data)
+ except Exception, e:
+ if e.message.__class__ == qmf2.Data:
+ e = e.message.getProperties()
+ print "Exception in call command:", e
+
+ def do_clear(self, data):
+ try:
+ self.dataObject.do_clear(data)
+ except Exception, e:
+ print "Exception in clear command:", e
+
+ def do_EOF(self, data):
+ print "quit"
+ try:
+ self.dataObject.do_exit()
+ except:
+ pass
+ return True
+
+ def do_quit(self, data):
+ try:
+ self.dataObject.do_exit()
+ except:
+ pass
+ return True
+
+ def postcmd(self, stop, line):
+ return stop
+
+ def postloop(self):
+ print "Exiting..."
+ self.dataObject.close()
+
+
+#======================================================================================================
+# QmfData
+#======================================================================================================
+class QmfData:
+ """
+ """
+ def __init__(self, disp, url, conn_options, qmf_options):
+ self.disp = disp
+ self.url = url
+ self.conn_options = conn_options
+ self.qmf_options = qmf_options
+ self.agent_filter = '[]'
+ self.connection = qpid_messaging.Connection(self.url, **self.conn_options)
+ self.connection.open()
+ self.session = qmf2.ConsoleSession(self.connection, self.qmf_options)
+ self.session.setAgentFilter(self.agent_filter)
+ self.session.open()
+ self.lock = Lock()
+ self.cli = None
+ self.agents = {} # Map of number => agent object
+ self.deleted_agents = {} # Map of number => agent object
+ self.agent_numbers = {} # Map of agent name => number
+ self.next_number = 1
+ self.focus_agent = None
+ self.data_list = {}
+ self.next_data_index = 1
+
+ #=======================
+ # Methods to support CLI
+ #=======================
+ def setCli(self, cli):
+ self.cli = cli
+
+ def close(self):
+ try:
+ self.session.close()
+ self.connection.close()
+ except:
+ pass # we're shutting down - ignore any errors
+
+ def do_list(self, data):
+ tokens = data.split()
+ if len(tokens) == 0:
+ self.listData()
+ elif tokens[0] == 'agents' or tokens[0] == 'agent':
+ self.listAgents()
+ elif tokens[0] == 'packages' or tokens[0] == 'package':
+ self.listPackages()
+ elif tokens[0] == 'classes' or tokens[0] == 'class':
+ self.listClasses(tokens[1:])
+
+ def do_set(self, data):
+ tokens = split(data)
+ if len(tokens) == 0:
+ print "What do you want to set? type 'help' for more information."
+ return
+ if tokens[0] == 'filter':
+ if len(tokens) == 2:
+ self.setAgentFilter(tokens[1])
+ elif tokens[0] == 'default':
+ if len(tokens) == 2:
+ self.updateAgents()
+ number = int(tokens[1])
+ self.focus_agent = self.agents[number]
+ print "Default Agent: %s" % self.focus_agent.getName()
+
+ def do_show(self, data):
+ tokens = split(data)
+ if len(tokens) == 0:
+ print "What do you want to show? Type 'help' for more information."
+ return
+
+ if tokens[0] == 'options':
+ print "Options used in this session:"
+ print " Connection Options : %s" % self.scrubConnOptions()
+ print " QMF Session Options: %s" % self.qmf_options
+ return
+
+ if tokens[0] == 'agent':
+ self.showAgent(tokens[1:])
+ return
+
+ if tokens[0] == 'filter':
+ print self.agent_filter
+ return
+
+ if tokens[0] == "default":
+ if not self.focus_agent:
+ self.updateAgents()
+ if self.focus_agent:
+ print "Default Agent: %s" % self.focus_agent.getName()
+ else:
+ print "Default Agent not set"
+ return
+
+ if tokens[0] == "class":
+ self.showClass(tokens[1:])
+ return
+
+ if tokens[0].isdigit():
+ self.showData(tokens[0])
+ return
+
+ print "What do you want to show? Type 'help' for more information."
+ return
+
+ def do_query(self, data):
+ tokens = split(data)
+ if len(tokens) == 0:
+ print "Class name not specified."
+ return
+ cname = tokens[0]
+ pname = None
+ pred = None
+ if len(tokens) >= 2:
+ if tokens[1][0] == '[':
+ pred = tokens[1]
+ else:
+ pname = tokens[1]
+ if len(tokens) >= 3:
+ pred = tokens[2]
+ query = "{class:'%s'" % cname
+ if pname:
+ query += ",package:'%s'" % pname
+ if pred:
+ query += ",where:%s" % pred
+ query += "}"
+ if not self.focus_agent:
+ self.updateAgents()
+ d_list = self.focus_agent.query(query)
+ local_data_list = {}
+ for d in d_list:
+ local_data_list[self.next_data_index] = d
+ self.next_data_index += 1
+ rows = []
+ for index,val in local_data_list.items():
+ rows.append((index, val.getAddr().getName()))
+ self.data_list[index] = val
+ self.disp.table("Data Objects Returned: %d:" % len(d_list), ("Number", "Data Address"), rows)
+
+ def do_call(self, data):
+ tokens = split(data)
+ if len(tokens) < 2:
+ print "Data ID and method-name not specified."
+ return
+ idx = int(tokens[0])
+ methodName = tokens[1]
+ args = []
+ for arg in tokens[2:]:
+ ##
+ ## If the argument is a map, list, boolean, integer, or floating (one decimal point),
+ ## run it through the Python evaluator so it is converted to the correct type.
+ ##
+ ## TODO: use a regex for this instead of this convoluted logic
+ if arg[0] == '{' or arg[0] == '[' or arg == "True" or arg == "False" or \
+ ((arg.count('.') < 2 and (arg.count('-') == 0 or \
+ (arg.count('-') == 1 and arg[0] == '-')) and \
+ arg.replace('.','').replace('-','').isdigit())):
+ args.append(eval(arg))
+ else:
+ args.append(arg)
+
+ if not idx in self.data_list:
+ print "Unknown data index, run 'query' to get a list of data indices"
+ return
+
+ data = self.data_list[idx]
+ data._getSchema()
+ result = data._invoke(methodName, args, {})
+ rows = []
+ for k,v in result.items():
+ rows.append((k,v))
+ self.disp.table("Output Parameters:", ("Name", "Value"), rows)
+
+ def do_clear(self, data):
+ self.data_list = {}
+ self.next_data_index = 1
+ print "Accumulated query results cleared"
+
+ def do_exit(self):
+ pass
+
+ #====================
+ # Sub-Command Methods
+ #====================
+ def setAgentFilter(self, filt):
+ self.agent_filter = filt
+ self.session.setAgentFilter(filt)
+
+ def updateAgents(self):
+ agents = self.session.getAgents()
+ number_list = []
+ for agent in agents:
+ if agent.getName() not in self.agent_numbers:
+ number = self.next_number
+ number_list.append(number)
+ self.next_number += 1
+ self.agent_numbers[agent.getName()] = number
+ self.agents[number] = agent
+ else:
+ ## Track seen agents so we can clean out deleted ones
+ number = self.agent_numbers[agent.getName()]
+ number_list.append(number)
+ if number in self.deleted_agents:
+ self.agents[number] = self.deleted_agents.pop(number)
+ deleted = []
+ for number in self.agents:
+ if number not in number_list:
+ deleted.append(number)
+ for number in deleted:
+ self.deleted_agents[number] = self.agents.pop(number)
+ if not self.focus_agent:
+ self.focus_agent = self.session.getConnectedBrokerAgent()
+
+ def listAgents(self):
+ self.updateAgents()
+ rows = []
+ for number in self.agents:
+ agent = self.agents[number]
+ if self.focus_agent and agent.getName() == self.focus_agent.getName():
+ d = '*'
+ else:
+ d = ''
+ rows.append((d, number, agent.getVendor(), agent.getProduct(), agent.getInstance(), agent.getEpoch()))
+ self.disp.table("QMF Agents:", ("", "Id", "Vendor", "Product", "Instance", "Epoch"), rows)
+
+ def listPackages(self):
+ if not self.focus_agent:
+ raise "Default Agent not set - use 'set default'"
+ self.focus_agent.loadSchemaInfo()
+ packages = self.focus_agent.getPackages()
+ for p in packages:
+ print " %s" % p
+
+ def getClasses(self, tokens):
+ if not self.focus_agent:
+ raise "Default Agent not set - use 'set default'"
+ return
+ self.focus_agent.loadSchemaInfo()
+ if len(tokens) == 1:
+ classes = self.focus_agent.getSchemaIds(tokens[0]);
+ else:
+ packages = self.focus_agent.getPackages()
+ classes = []
+ for p in packages:
+ classes.extend(self.focus_agent.getSchemaIds(p))
+ return classes
+
+ def listClasses(self, tokens):
+ classes = self.getClasses(tokens)
+ rows = []
+ for c in classes:
+ rows.append((c.getPackageName(), c.getName(), self.classTypeName(c.getType())))
+ self.disp.table("Classes:", ("Package", "Class", "Type"), rows)
+
+ def showClass(self, tokens):
+ if len(tokens) < 1:
+ return
+ classes = self.getClasses([])
+ c = tokens[0]
+ p = None
+ if len(tokens) == 2:
+ p = tokens[1]
+ schema = None
+ sid = None
+ for cls in classes:
+ if c == cls.getName():
+ if not p or p == cls.getPackageName():
+ schema = self.focus_agent.getSchema(cls)
+ sid = cls
+ break
+ if not sid:
+ return
+ print "Class: %s:%s (%s) - %s" % \
+ (sid.getPackageName(), sid.getName(), self.classTypeName(sid.getType()), schema.getDesc())
+ print " hash: %r" % sid.getHash()
+ props = schema.getProperties()
+ methods = schema.getMethods()
+ rows = []
+ for prop in props:
+ name = prop.getName()
+ dtype = self.typeName(prop.getType())
+ if len(prop.getSubtype()) > 0:
+ dtype += "(%s)" % prop.getSubtype()
+ access = self.accessName(prop.getAccess())
+ idx = self.yes_blank(prop.isIndex())
+ opt = self.yes_blank(prop.isOptional())
+ unit = prop.getUnit()
+ desc = prop.getDesc()
+ rows.append((name, dtype, idx, access, opt, unit, desc))
+ self.disp.table("Properties:", ("Name", "Type", "Index", "Access", "Optional", "Unit", "Description"), rows)
+ if len(methods) > 0:
+ for meth in methods:
+ name = meth.getName()
+ desc = meth.getDesc()
+ if len(desc) > 0:
+ desc = " - " + desc
+ args = meth.getArguments()
+ rows = []
+ for prop in args:
+ aname = prop.getName()
+ dtype = self.typeName(prop.getType())
+ if len(prop.getSubtype()) > 0:
+ dtype += "(%s)" % prop.getSubtype()
+ unit = prop.getUnit()
+ adesc = prop.getDesc()
+ io = self.dirName(prop.getDirection())
+ rows.append((aname, dtype, io, unit, adesc))
+ print
+ print " Method: %s%s" % (name, desc)
+ self.disp.table("Arguments:", ("Name", "Type", "Dir", "Unit", "Description"), rows)
+
+ def showAgent(self, tokens):
+ self.updateAgents()
+ for token in tokens:
+ number = int(token)
+ agent = self.agents[number]
+ print
+ print " =================================================================================="
+ print " Agent Id: %d" % number
+ print " Agent Name: %s" % agent.getName()
+ print " Epoch: %d" % agent.getEpoch()
+ print " Attributes:"
+ attrs = agent.getAttributes()
+ keys = attrs.keys()
+ keys.sort()
+ pairs = []
+ for key in keys:
+ if key == '_timestamp' or key == '_schema_updated':
+ val = disp.timestamp(attrs[key])
+ else:
+ val = attrs[key]
+ pairs.append((key, val))
+ self.printAlignedPairs(pairs)
+ agent.loadSchemaInfo()
+ print " Packages:"
+ packages = agent.getPackages()
+ for package in packages:
+ print " %s" % package
+
+ def showData(self, idx):
+ num = int(idx)
+ if not num in self.data_list:
+ print "Data ID not known, run 'query' first to get data"
+ return
+ data = self.data_list[num]
+ props = data.getProperties()
+ rows = []
+ for k,v in props.items():
+ rows.append((k, v))
+ self.disp.table("Properties:", ("Name", "Value"), rows)
+
+ def listData(self):
+ if len(self.data_list) == 0:
+ print "No Query Results - Use the 'query' command"
+ return
+ rows = []
+ for index,val in self.data_list.items():
+ rows.append((index, val.getAgent().getName(), val.getAddr().getName()))
+ self.disp.table("Accumulated Query Results:", ('Number', 'Agent', 'Data Address'), rows)
+
+ def printAlignedPairs(self, rows, indent=8):
+ maxlen = 0
+ for first, second in rows:
+ if len(first) > maxlen:
+ maxlen = len(first)
+ maxlen += indent
+ for first, second in rows:
+ for i in range(maxlen - len(first)):
+ print "",
+ print "%s : %s" % (first, second)
+
+ def classTypeName(self, code):
+ if code == qmf2.SCHEMA_TYPE_DATA: return "Data"
+ if code == qmf2.SCHEMA_TYPE_EVENT: return "Event"
+ return "Unknown"
+
+ def typeName (self, typecode):
+ """ Convert type-codes to printable strings """
+ if typecode == qmf2.SCHEMA_DATA_VOID: return "void"
+ elif typecode == qmf2.SCHEMA_DATA_BOOL: return "bool"
+ elif typecode == qmf2.SCHEMA_DATA_INT: return "int"
+ elif typecode == qmf2.SCHEMA_DATA_FLOAT: return "float"
+ elif typecode == qmf2.SCHEMA_DATA_STRING: return "string"
+ elif typecode == qmf2.SCHEMA_DATA_MAP: return "map"
+ elif typecode == qmf2.SCHEMA_DATA_LIST: return "list"
+ elif typecode == qmf2.SCHEMA_DATA_UUID: return "uuid"
+ else:
+ raise ValueError ("Invalid type code: %s" % str(typecode))
+
+ def valueByType(self, typecode, val):
+ if typecode == 1: return "%d" % val
+ elif typecode == 2: return "%d" % val
+ elif typecode == 3: return "%d" % val
+ elif typecode == 4: return "%d" % val
+ elif typecode == 6: return val
+ elif typecode == 7: return val
+ elif typecode == 8: return strftime("%c", gmtime(val / 1000000000))
+ elif typecode == 9:
+ if val < 0: val = 0
+ sec = val / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+
+ elif typecode == 10: return str(self.idRegistry.displayId(val))
+ elif typecode == 11:
+ if val:
+ return "True"
+ else:
+ return "False"
+
+ elif typecode == 12: return "%f" % val
+ elif typecode == 13: return "%f" % val
+ elif typecode == 14: return "%r" % val
+ elif typecode == 15: return "%r" % val
+ elif typecode == 16: return "%d" % val
+ elif typecode == 17: return "%d" % val
+ elif typecode == 18: return "%d" % val
+ elif typecode == 19: return "%d" % val
+ elif typecode == 20: return "%r" % val
+ elif typecode == 21: return "%r" % val
+ elif typecode == 22: return "%r" % val
+ else:
+ raise ValueError ("Invalid type code: %s" % str(typecode))
+
+ def accessName (self, code):
+ """ Convert element access codes to printable strings """
+ if code == qmf2.ACCESS_READ_CREATE: return "ReadCreate"
+ elif code == qmf2.ACCESS_READ_WRITE: return "ReadWrite"
+ elif code == qmf2.ACCESS_READ_ONLY: return "ReadOnly"
+ else:
+ raise ValueError ("Invalid access code: %s" % str(code))
+
+ def dirName(self, io):
+ if io == qmf2.DIR_IN: return "in"
+ elif io == qmf2.DIR_OUT: return "out"
+ elif io == qmf2.DIR_IN_OUT: return "in_out"
+ else:
+ raise ValueError("Invalid direction code: %r" % io)
+
+ def notNone (self, text):
+ if text == None:
+ return ""
+ else:
+ return text
+
+ def yes_blank(self, val):
+ if val:
+ return "Y"
+ return ""
+
+ def objectIndex(self, obj):
+ if obj._objectId.isV2:
+ return obj._objectId.getObject()
+ result = ""
+ first = True
+ props = obj.getProperties()
+ for prop in props:
+ if prop[0].index:
+ if not first:
+ result += "."
+ result += self.valueByType(prop[0].type, prop[1])
+ first = None
+ return result
+
+ def scrubConnOptions(self):
+ scrubbed = {}
+ for key, val in self.conn_options.items():
+ if key == "password":
+ val = "***"
+ scrubbed[key] = val
+ return str(scrubbed)
+
+
+#=========================================================
+# Main Program
+#=========================================================
+try:
+ oa = OptsAndArgs(sys.argv)
+ host, conn_options, qmf_options = oa.parse()
+except Exception, e:
+ print "Parse Error: %s" % e
+ sys.exit(1)
+
+disp = Display()
+
+# Attempt to make a connection to the target broker
+try:
+ data = QmfData(disp, host, conn_options, qmf_options)
+except Exception, e:
+ if str(e).find("Exchange not found") != -1:
+ print "Management not enabled on broker: Use '-m yes' option on broker startup."
+ else:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli(data, disp)
+print("Management Tool for QMF")
+try:
+ cli.cmdloop()
+except KeyboardInterrupt:
+ print
+ print "Exiting..."
+except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# alway attempt to cleanup broker resources
+data.close()
diff --git a/qpid/cpp/management/python/bin/qpid-config b/qpid/cpp/management/python/bin/qpid-config
new file mode 100755
index 0000000000..3d4bb6036a
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-config
@@ -0,0 +1,878 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import pdb
+
+import os
+from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
+import sys
+import locale
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpid.messaging import Connection, ConnectionError
+from qpidtoollibs import BrokerAgent
+from qpidtoollibs import Display, Header
+
# Usage synopsis: printed verbatim by Usage() and passed to optparse
# (JHelpFormatter preserves its embedded newlines).
usage = """
Usage: qpid-config [OPTIONS]
       qpid-config [OPTIONS] exchanges [filter-string]
       qpid-config [OPTIONS] queues [filter-string]
       qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]
       qpid-config [OPTIONS] del exchange <name>
       qpid-config [OPTIONS] add queue <name> [AddQueueOptions]
       qpid-config [OPTIONS] del queue <name> [DelQueueOptions]
       qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]
                 <for type xml> [-f -|filename]
                 <for type header> [all|any] k1=v1 [, k2=v2...]
       qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]
       qpid-config [OPTIONS] reload-acl
       qpid-config [OPTIONS] add <type> <name> [--argument <property-name>=<property-value>]
       qpid-config [OPTIONS] del <type> <name>
       qpid-config [OPTIONS] list <type> [--show-property <property-name>]
       qpid-config [OPTIONS] log [<logstring>]
       qpid-config [OPTIONS] shutdown"""

# Extended help text shown by --help below the option listing.
description = """
Examples:

$ qpid-config add queue q
$ qpid-config add exchange direct d -a localhost:5672
$ qpid-config exchanges -b 10.1.1.7:10000
$ qpid-config queues -b guest/guest@broker-host:10000

Add Exchange <type> values:

    direct     Direct exchange for point-to-point communication
    fanout     Fanout exchange for broadcast communication
    topic      Topic exchange that routes messages using binding keys with wildcards
    headers    Headers exchange that matches header fields against the binding keys
    xml        XML Exchange - allows content filtering using an XQuery


Queue Limit Actions:

    none (default) - Use broker's default policy
    reject         - Reject enqueued messages
    ring           - Replace oldest unacquired message with new

Replication levels:

    none          - no replication
    configuration - replicate queue and exchange existence and bindings, but not messages.
    all           - replicate configuration and messages

Log <logstring> value:

    Comma separated <module>:<level> pairs, e.g. 'info+,debug+:Broker,trace+:Queue'
"""

# Valid values for the --replicate option.
REPLICATE_LEVELS= ["none", "configuration", "all"]
# Default columns shown by the generic 'list' command, per object type.
DEFAULT_PROPERTIES = {"exchange":["name", "type", "durable"], "queue":["name", "durable", "autoDelete"]}
+
def get_value(r):
    """Return the value half of a NAME=VALUE pair split with split("=", 1).

    If r has a value part it is returned as an int when it parses as one,
    otherwise as the original string.  When no '=' was present (len(r) != 2)
    the result is None.
    """
    if len(r) != 2:
        return None
    try:
        return int(r[1])
    except ValueError:
        # Non-numeric values pass through as strings.  (Narrowed from a
        # bare 'except:': int() on a str can only raise ValueError here.)
        return r[1]
+
class Config:
    """Holder for all qpid-config command-line settings.

    A single module-level instance ("config") is populated by
    OptionsAndArguments() and read everywhere else.  (That function may
    also attach _ha_admin and _sasl_mechanism attributes that are not
    pre-declared here.)
    """
    def __init__(self):
        self._recursive = False            # show bindings in exchange/queue listings
        self._host = "localhost"           # broker address to connect to
        self._connTimeout = 10             # connect timeout in seconds (None = wait forever)
        self._ignoreDefault = False        # skip the nameless default exchange in listings
        self._altern_ex = None             # alternate exchange for new queue/exchange
        self._durable = False              # declare new queue/exchange durable
        self._replicate = None             # HA replication level (see REPLICATE_LEVELS)
        self._if_empty = True              # only delete a queue when empty
        self._if_unused = True             # only delete a queue when unused
        self._fileCount = None             # [legacystore] journal file count
        self._fileSize = None              # [legacystore] journal file size (64KiB pages)
        self._efp_partition_num = None     # [linearstore] EFP partition number
        self._efp_pool_file_size = None    # [linearstore] EFP file size (KiB)
        self._maxQueueSize = None          # queue capacity limit in bytes
        self._maxQueueCount = None         # queue capacity limit in messages
        self._limitPolicy = None           # action when capacity limit reached
        self._msgSequence = False          # exchange inserts qpid.msg_sequence header
        self._lvq_key = None               # last-value-queue key
        self._ive = False                  # initial-value-exchange behavior
        self._eventGeneration = None       # not set by any current option -- presumably legacy
        self._file = None                  # XQuery file for xml-exchange bindings
        self._flowStopCount = None         # flow control: stop at this message count
        self._flowResumeCount = None       # flow control: resume below this message count
        self._flowStopSize = None          # flow control: stop at this byte count
        self._flowResumeSize = None        # flow control: resume below this byte count
        self._msgGroupHeader = None        # header field carrying the message group id
        self._sharedMsgGroup = False       # allow shared message group consumption
        self._extra_arguments = []         # raw NAME=VALUE declare arguments (--argument)
        self._start_replica = None         # broker URL to start queue replication from
        self._returnCode = 0               # process exit status accumulated by commands
        self._list_properties = []         # properties to display for the 'list' command

    def getOptions(self):
        """Return _extra_arguments as a dict, int-converting values via get_value()."""
        options = {}
        for a in self._extra_arguments:
            r = a.split("=", 1)
            options[r[0]] = get_value(r)
        return options
+
+
# Singleton settings object plus the keyword options handed straight to
# Connection.establish(); both are filled in by OptionsAndArguments().
config = Config()
conn_options = {}

# Broker-recognized declare-argument keys for queues and exchanges.
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
EFP_PARTITION_NUM = "qpid.efp_partition_num"
EFP_POOL_FILE_SIZE = "qpid.efp_pool_file_size"
MAX_QUEUE_SIZE = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE = "qpid.policy_type"
LVQ_KEY = "qpid.last_value_queue_key"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
FLOW_STOP_COUNT = "qpid.flow_stop_count"
FLOW_RESUME_COUNT = "qpid.flow_resume_count"
FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
MSG_GROUP_HDR_KEY = "qpid.group_header_key"
SHARED_MSG_GROUP = "qpid.shared_msg_group"
REPLICATE = "qpid.replicate"
# There are various arguments to declare that have specific program
# options in this utility.  However there is now a generic mechanism for
# passing arguments as well.  The SPECIAL_ARGS list contains the
# arguments for which there are specific program options defined,
# i.e. the arguments for which there is special processing on 'add' and
# 'list'.
SPECIAL_ARGS=[
    FILECOUNT,FILESIZE,EFP_PARTITION_NUM,EFP_POOL_FILE_SIZE,
    MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,
    LVQ_KEY,MSG_SEQUENCE,IVE,
    FLOW_STOP_COUNT,FLOW_RESUME_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
    MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
+
class JHelpFormatter(IndentedHelpFormatter):
    """Help formatter that hands usage and description text through
    verbatim instead of re-wrapping it (keeps embedded newlines)."""

    def format_usage(self, usage):
        # No re-flowing: the usage block is pre-formatted.
        return usage

    def format_description(self, description):
        # Pre-formatted description, newline-terminated; empty when absent.
        return description + "\n" if description else ""
+
def Usage():
    """Print the command synopsis and exit with a failure status."""
    print(usage)
    sys.exit(-1)
+
def OptionsAndArguments(argv):
    """Parse argv with optparse, transfer recognized options into the
    global `config` object and `conn_options` dict, and return the
    remaining positional arguments (command words).

    Fixes: the --file-size help text previously said "legactystore".
    """

    global config


    parser = OptionParser(usage=usage,
                          description=description,
                          formatter=JHelpFormatter())

    group1 = OptionGroup(parser, "General Options")
    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
    group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list")
    group1.add_option("-b", "--broker", action="store", type="string", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
    group1.add_option("-a", "--broker-addr", action="store", type="string", metavar="<address>")
    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    group1.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
    group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
    group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
    group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
    parser.add_option_group(group1)

    group_ls = OptionGroup(parser, "Options for Listing Exchanges and Queues")
    group_ls.add_option("--ignore-default", action="store_true", help="Ignore the default exchange in exchange or queue list")
    parser.add_option_group(group_ls)

    group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
    group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
    group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
    group2.add_option("--replicate", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'all').")
    parser.add_option_group(group2)

    group3 = OptionGroup(parser, "Options for Adding Queues")
    group3.add_option("--file-count", action="store", type="int", metavar="<n>", help="[legacystore] Number of files in queue's persistence journal")
    group3.add_option("--file-size", action="store", type="int", metavar="<n>", help="[legacystore] File size in pages (64KiB/page)")
    group3.add_option("--efp-partition-num", action="store", type="int", metavar="<n>", help="[linearstore] EFP partition number")
    group3.add_option("--efp-pool-file-size", action="store", type="int", metavar="<n>", help="[linearstore] EFP file size (KiB)")
    group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
    group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
    group3.add_option("--limit-policy", action="store", choices=["none", "reject", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
    group3.add_option("--lvq-key", action="store", metavar="<key>", help="Last Value Queue key")
    group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
                      help="Turn on sender flow control when the number of queued bytes exceeds this value.")
    group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>",
                      help="Turn off sender flow control when the number of queued bytes drops below this value.")
    group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>",
                      help="Turn on sender flow control when the number of queued messages exceeds this value.")
    group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
                      help="Turn off sender flow control when the number of queued messages drops below this value.")
    group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
                      help="Enable message groups. Specify name of header that holds group identifier.")
    group3.add_option("--shared-groups", action="store_true",
                      help="Allow message group consumption across multiple consumers.")
    group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
                      metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
    group3.add_option("--start-replica", metavar="<broker-url>", help="Start replication from the same-named queue at <broker-url>")
    # no option for declaring an exclusive queue - which can only be used by the session that creates it.
    parser.add_option_group(group3)

    group4 = OptionGroup(parser, "Options for Adding Exchanges")
    group4.add_option("--sequence", action="store_true", help="Exchange will insert a 'qpid.msg_sequence' field in the message header")
    group4.add_option("--ive", action="store_true", help="Exchange will behave as an 'initial-value-exchange', keeping a reference to the last message forwarded and enqueuing that message to newly bound queues.")
    parser.add_option_group(group4)

    group5 = OptionGroup(parser, "Options for Deleting Queues")
    group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty")
    group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty")
    group5.add_option("--force-if-used", action="store_true", help="Force delete of queue even if it's currently used")
    parser.add_option_group(group5)

    group6 = OptionGroup(parser, "Options for Declaring Bindings")
    group6.add_option("-f", "--file", action="store", type="string", metavar="<file.xq>", help="For XML Exchange bindings - specifies the name of a file containing an XQuery.")
    parser.add_option_group(group6)

    group_7 = OptionGroup(parser, "Formatting options for 'list' action")
    group_7.add_option("--show-property", dest="list_properties", action="append", default=[],
                      metavar="<property-name>", help="Specify a property of an object to be included in output")
    parser.add_option_group(group_7)

    opts, encArgs = parser.parse_args(args=argv)

    # Decode positional args to unicode using the locale's encoding; fall
    # back to the raw byte strings if decoding is not possible.
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except Exception:
        args = encArgs

    # Transfer parsed options into the global config object.
    if opts.recursive:
        config._recursive = True
    if opts.broker:
        config._host = opts.broker
    if opts.broker_addr:
        config._host = opts.broker_addr
    if config._host is None: config._host="localhost:5672"
    if opts.timeout is not None:
        config._connTimeout = opts.timeout
        if config._connTimeout == 0:
            config._connTimeout = None
    if opts.ignore_default:
        config._ignoreDefault = True
    if opts.alternate_exchange:
        config._altern_ex = opts.alternate_exchange
    if opts.durable:
        config._durable = True
    if opts.replicate:
        if not opts.replicate in REPLICATE_LEVELS:
            raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS)))
        config._replicate = opts.replicate
    if opts.ha_admin: config._ha_admin = True
    if opts.file:
        config._file = opts.file
    if opts.file_count is not None:
        config._fileCount = opts.file_count
    if opts.file_size is not None:
        config._fileSize = opts.file_size
    if opts.efp_partition_num is not None:
        config._efp_partition_num = opts.efp_partition_num
    if opts.efp_pool_file_size is not None:
        config._efp_pool_file_size = opts.efp_pool_file_size
    if opts.max_queue_size is not None:
        config._maxQueueSize = opts.max_queue_size
    if opts.max_queue_count is not None:
        config._maxQueueCount = opts.max_queue_count
    if opts.limit_policy:
        config._limitPolicy = opts.limit_policy
    if opts.sequence:
        config._msgSequence = True
    if opts.lvq_key:
        config._lvq_key = opts.lvq_key
    if opts.ive:
        config._ive = True
    if opts.force:
        config._if_empty = False
        config._if_unused = False
    if opts.force_if_not_empty:
        config._if_empty = False
    if opts.force_if_used:
        config._if_unused = False
    if opts.sasl_mechanism:
        config._sasl_mechanism = opts.sasl_mechanism
    if opts.flow_stop_size is not None:
        config._flowStopSize = opts.flow_stop_size
    if opts.flow_resume_size is not None:
        config._flowResumeSize = opts.flow_resume_size
    if opts.flow_stop_count is not None:
        config._flowStopCount = opts.flow_stop_count
    if opts.flow_resume_count is not None:
        config._flowResumeCount = opts.flow_resume_count
    if opts.group_header:
        config._msgGroupHeader = opts.group_header
    if opts.shared_groups:
        config._sharedMsgGroup = True
    if opts.extra_arguments:
        config._extra_arguments = opts.extra_arguments
    if opts.start_replica:
        config._start_replica = opts.start_replica
    if opts.list_properties:
        config._list_properties = opts.list_properties

    # Options forwarded directly to Connection.establish().
    if opts.sasl_mechanism:
        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
    if opts.sasl_service_name:
        conn_options['sasl_service'] = opts.sasl_service_name
    if opts.ssl_certificate:
        conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        conn_options['ssl_keyfile'] = opts.ssl_key
    if opts.ha_admin:
        conn_options['client_properties'] = {'qpid.ha-admin' : 1}

    return args
+
+
+#
+# helpers for the arg parsing in bind(). return multiple values; "ok"
+# followed by the resultant args
+
+#
+# accept -f followed by either
+# a filename or "-", for stdin. pull the bits into a string, to be
+# passed to the xml binding.
+#
def snarf_xquery_args():
    """Fetch the XQuery text used when binding to an xml exchange.

    The source is config._file: "-" reads stdin, anything else is treated
    as a filename.  Returns [True, query] on success, or [False] after
    printing a diagnostic when no -f argument was given.
    """
    if not config._file:
        print("Invalid args to bind xml: need an input file or stdin")
        return [False]
    if config._file == "-":
        return [True, sys.stdin.read()]
    # 'with' guarantees the handle is closed even if read() raises;
    # open() itself is allowed to signal if it can't find the file.
    with open(config._file) as f:
        return [True, f.read()]
+
+#
+# look for "any"/"all" and grok the rest of argv into a map
+#
def snarf_header_args(args):
    """Parse arguments for a headers-exchange binding.

    Expects args == ['all'|'any', 'k1=v1', 'k2=v2', ...].  Returns
    [True, op, {key: value}] on success, or [False] after printing a
    diagnostic on malformed input.
    """
    if len(args) < 2:
        print("Invalid args to bind headers: need 'any'/'all' plus conditions")
        return [False]
    op = args[0]
    if op != "all" and op != "any":
        print("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'")
        return [False]
    kv = {}
    for pair in args[1:]:
        # Split only on the first '=' (as get_value's callers do) so that
        # condition values may themselves contain '=' characters.
        k_and_v = pair.split("=", 1)
        kv[k_and_v[0]] = k_and_v[1]
    return [True, op, kv]
+
class BrokerManager:
    """Holds the broker connection and implements one method per
    qpid-config command (listing, add/del, bind/unbind, etc.)."""

    def __init__(self):
        # All populated by SetBroker(); None until a connection is made.
        self.brokerName = None
        self.conn = None
        self.broker = None

    def SetBroker(self, brokerUrl):
        """Connect to the broker at brokerUrl using the global conn_options."""
        self.url = brokerUrl
        self.conn = Connection.establish(self.url, **conn_options)
        self.broker = BrokerAgent(self.conn)

    def Disconnect(self, ignore=True):
        """Close the connection; when ignore is True swallow close errors."""
        if self.conn:
            try:
                self.conn.close()
            except Exception, e:
                if ignore:
                    # suppress close errors to avoid
                    # tracebacks when a previous
                    # exception will be printed to stdout
                    pass
                else:
                    # raise last exception so complete
                    # traceback is preserved
                    raise

    def Overview(self):
        """Print summary counts: exchanges by type, queues by durability."""
        exchanges = self.broker.getAllExchanges()
        queues = self.broker.getAllQueues()
        print "Total Exchanges: %d" % len(exchanges)
        etype = {}
        for ex in exchanges:
            if ex.type not in etype:
                etype[ex.type] = 1
            else:
                etype[ex.type] = etype[ex.type] + 1
        for typ in etype:
            print "%15s: %d" % (typ, etype[typ])

        print
        print " Total Queues: %d" % len(queues)
        durable = 0
        for queue in queues:
            if queue.durable:
                durable = durable + 1
        print " durable: %d" % durable
        print " non-durable: %d" % (len(queues) - durable)

    def ExchangeList(self, filter):
        """List matching exchanges, rendering their declare-time
        attributes as the equivalent qpid-config options."""
        exchanges = self.broker.getAllExchanges()
        caption1 = "Type "
        caption2 = "Exchange Name"
        maxNameLen = len(caption2)
        found = False
        # First pass: compute the name column width and whether anything matches.
        for ex in exchanges:
            if self.match(ex.name, filter):
                if len(ex.name) > maxNameLen: maxNameLen = len(ex.name)
                found = True
        if not found:
            global config
            config._returnCode = 1
            return

        print "%s%-*s Attributes" % (caption1, maxNameLen, caption2)
        line = ""
        for i in range(((maxNameLen + len(caption1)) / 5) + 5):
            line += "====="
        print line

        for ex in exchanges:
            if config._ignoreDefault and not ex.name: continue
            if self.match(ex.name, filter):
                print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
                args = ex.arguments
                if not args: args = {}
                if ex.durable: print "--durable",
                if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
                if MSG_SEQUENCE in args and args[MSG_SEQUENCE]: print "--sequence",
                if IVE in args and args[IVE]: print "--ive",
                if ex.altExchange:
                    print "--alternate-exchange=%s" % ex.altExchange,
                print

    def ExchangeListRecurse(self, filter):
        """List matching exchanges together with their bindings."""
        exchanges = self.broker.getAllExchanges()
        bindings = self.broker.getAllBindings()
        queues = self.broker.getAllQueues()
        for ex in exchanges:
            if config._ignoreDefault and not ex.name: continue
            if self.match(ex.name, filter):
                print "Exchange '%s' (%s)" % (ex.name, ex.type)
                for bind in bindings:
                    if bind.exchangeRef == ex.name:
                        qname = "<unknown>"
                        queue = self.findById(queues, bind.queueRef)
                        if queue != None:
                            qname = queue.name
                        if bind.arguments:
                            print "    bind [%s] => %s %s" % (bind.bindingKey, qname, bind.arguments)
                        else:
                            print "    bind [%s] => %s" % (bind.bindingKey, qname)


    def QueueList(self, filter):
        """List matching queues, rendering their declare-time arguments
        as the equivalent qpid-config options."""
        queues = self.broker.getAllQueues()
        caption = "Queue Name"
        maxNameLen = len(caption)
        found = False
        for q in queues:
            if self.match(q.name, filter):
                if len(q.name) > maxNameLen: maxNameLen = len(q.name)
                found = True
        if not found:
            global config
            config._returnCode = 1
            return

        print "%-*s Attributes" % (maxNameLen, caption)
        line = ""
        for i in range((maxNameLen / 5) + 5):
            line += "====="
        print line

        for q in queues:
            if self.match(q.name, filter):
                print "%-*s " % (maxNameLen, q.name),
                args = q.arguments
                if not args: args = {}
                if q.durable: print "--durable",
                if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
                if q.autoDelete: print "auto-del",
                if q.exclusive: print "excl",
                if FILESIZE in args: print "--file-size=%s" % args[FILESIZE],
                if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT],
                if EFP_PARTITION_NUM in args: print "--efp-partition-num=%s" % args[EFP_PARTITION_NUM],
                if EFP_POOL_FILE_SIZE in args: print "--efp-pool-file-size=%s" % args[EFP_POOL_FILE_SIZE],
                if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
                if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
                if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
                if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY],
                if q.altExchange:
                    print "--alternate-exchange=%s" % q.altExchange,
                if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
                if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
                if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
                if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
                if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
                if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
                # any remaining non-special arguments are shown generically
                print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])

    def QueueListRecurse(self, filter):
        """List matching queues together with the bindings that feed them."""
        exchanges = self.broker.getAllExchanges()
        bindings = self.broker.getAllBindings()
        queues = self.broker.getAllQueues()
        for queue in queues:
            if self.match(queue.name, filter):
                print "Queue '%s'" % queue.name
                for bind in bindings:
                    if bind.queueRef == queue.name:
                        ename = "<unknown>"
                        ex = self.findById(exchanges, bind.exchangeRef)
                        if ex != None:
                            ename = ex.name
                        if ename == "":
                            # the default exchange has an empty name
                            if config._ignoreDefault: continue
                            ename = "''"
                        if bind.arguments:
                            print "    bind [%s] => %s %s" % (bind.bindingKey, ename, bind.arguments)
                        else:
                            print "    bind [%s] => %s" % (bind.bindingKey, ename)

    def AddExchange(self, args):
        """'add exchange <type> <name>': declare a new exchange."""
        if len(args) < 2:
            Usage()
        etype = args[0]
        ename = args[1]
        declArgs = {}
        # generic --argument NAME=VALUE pairs first; specific options follow
        for a in config._extra_arguments:
            r = a.split("=", 1)
            declArgs[r[0]] = get_value(r)

        if config._msgSequence:
            declArgs[MSG_SEQUENCE] = 1
        if config._ive:
            declArgs[IVE] = 1
        if config._altern_ex:
            declArgs['alternate-exchange'] = config._altern_ex
        if config._durable:
            declArgs['durable'] = 1
        if config._replicate:
            declArgs[REPLICATE] = config._replicate
        self.broker.addExchange(etype, ename, declArgs)


    def DelExchange(self, args):
        """'del exchange <name>': delete an exchange."""
        if len(args) < 1:
            Usage()
        ename = args[0]
        self.broker.delExchange(ename)


    def AddQueue(self, args):
        """'add queue <name>': declare a new queue."""
        if len(args) < 1:
            Usage()
        qname = args[0]
        declArgs = {}
        for a in config._extra_arguments:
            r = a.split("=", 1)
            declArgs[r[0]] = get_value(r)

        if config._durable:
            # allow the default fileCount and fileSize specified
            # in qpid config file to take precedence
            if config._fileCount:
                declArgs[FILECOUNT] = config._fileCount
            if config._fileSize:
                declArgs[FILESIZE] = config._fileSize
            if config._efp_partition_num:
                declArgs[EFP_PARTITION_NUM] = config._efp_partition_num
            if config._efp_pool_file_size:
                declArgs[EFP_POOL_FILE_SIZE] = config._efp_pool_file_size

        if config._maxQueueSize is not None:
            declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize
        if config._maxQueueCount is not None:
            declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount
        if config._limitPolicy:
            # NOTE(review): the 'ring-strict' choice accepted by the option
            # parser is silently ignored here -- confirm that is intended.
            if config._limitPolicy == "none":
                pass
            elif config._limitPolicy == "reject":
                declArgs[POLICY_TYPE] = "reject"
            elif config._limitPolicy == "ring":
                declArgs[POLICY_TYPE] = "ring"

        if config._lvq_key:
            declArgs[LVQ_KEY] = config._lvq_key

        if config._flowStopSize is not None:
            declArgs[FLOW_STOP_SIZE] = config._flowStopSize
        if config._flowResumeSize is not None:
            declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize
        if config._flowStopCount is not None:
            declArgs[FLOW_STOP_COUNT] = config._flowStopCount
        if config._flowResumeCount is not None:
            declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount

        if config._msgGroupHeader:
            declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
        if config._sharedMsgGroup:
            declArgs[SHARED_MSG_GROUP] = 1

        if config._altern_ex:
            declArgs['alternate-exchange'] = config._altern_ex
        if config._durable:
            declArgs['durable'] = 1
        if config._replicate:
            declArgs[REPLICATE] = config._replicate
        self.broker.addQueue(qname, declArgs)
        if config._start_replica: # Start replication
            self.broker._method("replicate", {"broker":config._start_replica, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker")

    def DelQueue(self, args):
        """'del queue <name>': delete a queue, honoring --force* options."""
        if len(args) < 1:
            Usage()
        qname = args[0]
        self.broker.delQueue(qname, if_empty=config._if_empty, if_unused=config._if_unused)



    def Bind(self, args):
        """'bind <exchange> <queue> [key] ...': create a binding; xml and
        headers exchanges take extra type-specific arguments."""
        if len(args) < 2:
            Usage()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]

        # query the exchange to determine its type.
        res = self.broker.getExchange(ename)

        # type of the xchg determines the processing of the rest of
        # argv. if it's an xml xchg, we want to find a file
        # containing an x-query, and pass that. if it's a headers
        # exchange, we need to pass either "any" or all, followed by a
        # map containing key/value pairs. if neither of those, extra
        # args are ignored.
        ok = True
        _args = {}
        if not res:
            pass
        elif res.type == "xml":
            # this checks/imports the -f arg
            # NOTE(review): a [False] return would raise ValueError on this
            # unpack before 'ok' is ever tested -- confirm intended.
            [ok, xquery] = snarf_xquery_args()
            _args = { "xquery" : xquery }
        else:
            if res.type == "headers":
                # NOTE(review): same unpack-on-failure hazard as above.
                [ok, op, kv] = snarf_header_args(args[3:])
                _args = kv
                _args["x-match"] = op

        if not ok:
            sys.exit(1)

        self.broker.bind(ename, qname, key, _args)

    def Unbind(self, args):
        """'unbind <exchange> <queue> [key]': remove a binding."""
        if len(args) < 2:
            Usage()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]
        self.broker.unbind(ename, qname, key)

    def ReloadAcl(self):
        """'reload-acl': ask the broker's ACL module to re-read its file."""
        try:
            self.broker.reloadAclFile()
        except Exception, e:
            if str(e).find('No object found') != -1:
                print "Failed: ACL Module Not Loaded in Broker"
            else:
                raise

    def findById(self, items, id):
        """Return the item whose name equals id, or None if absent.
        (The parameter intentionally mirrors QMF naming; it shadows the
        id() builtin, which is not used here.)"""
        for item in items:
            if item.name == id:
                return item
        return None

    def match(self, name, filter):
        """Substring filter: the empty filter matches every name."""
        if filter == "":
            return True
        if name.find(filter) == -1:
            return False
        return True
+
def YN(bool):
    """Render a truth value as 'Y' or 'N'."""
    return 'Y' if bool else 'N'
+
+def _clean_ref(o):
+ if isinstance(o, dict) and "_object_name" in o:
+ fqn = o["_object_name"]
+ parts = fqn.split(":",2)
+ return parts[len(parts)-1]
+ else:
+ return o
+
def main(argv=None):
    """Parse the command line, dispatch the requested command against the
    broker, and return the process exit status (0 on success)."""
    args = OptionsAndArguments(argv)
    bm = BrokerManager()

    try:
        bm.SetBroker(config._host)
        if len(args) == 0:
            # no command word: print the overview summary
            bm.Overview()
        else:
            cmd = args[0]
            modifier = ""
            if len(args) > 1:
                modifier = args[1]
            if cmd == "exchanges":
                if config._recursive:
                    bm.ExchangeListRecurse(modifier)
                else:
                    bm.ExchangeList(modifier)
            elif cmd == "queues":
                if config._recursive:
                    bm.QueueListRecurse(modifier)
                else:
                    bm.QueueList(modifier)
            elif cmd == "add":
                if modifier == "exchange":
                    bm.AddExchange(args[2:])
                elif modifier == "queue":
                    bm.AddQueue(args[2:])
                elif len(args) > 2:
                    # generic QMF object creation for other types
                    bm.broker.create(modifier, args[2], config.getOptions())
                else:
                    Usage()
            elif cmd == "del":
                if modifier == "exchange":
                    bm.DelExchange(args[2:])
                elif modifier == "queue":
                    bm.DelQueue(args[2:])
                elif len(args) > 2:
                    bm.broker.delete(modifier, args[2], {})
                else:
                    Usage()
            elif cmd == "bind":
                bm.Bind(args[1:])
            elif cmd == "unbind":
                bm.Unbind(args[1:])
            elif cmd == "reload-acl":
                bm.ReloadAcl()
            elif cmd == "list" and len(args) > 1:
                # fetch objects
                objects = bm.broker.list(modifier)

                # collect available attributes ("name" is forced to the front)
                attributes = []
                for o in objects:
                    for k in o.keys():
                        if k == "name" and k not in attributes:
                            attributes.insert(0, k)
                        elif k not in attributes:
                            attributes.append(k)

                # determine which attributes to display: explicit
                # --show-property wins, then per-type defaults, then
                # the first six discovered attributes
                desired = []
                if len(config._list_properties):
                    for p in config._list_properties:
                        if p not in attributes: print "Warning: No such property '%s' for type '%s'" % (p, modifier)
                        else: desired.append(p)
                elif modifier in DEFAULT_PROPERTIES:
                    desired = DEFAULT_PROPERTIES[modifier]
                else:
                    desired = attributes[:6]

                # display
                display = Display(prefix=" ")
                headers = [Header(a) for a in desired]
                rows = [tuple([_clean_ref(o.get(a, "n/a")) for a in desired]) for o in objects]
                display.formattedTable("Objects of type '%s'" % modifier, headers, rows)
            elif cmd == "log" and len (args) == 1:
                print "Log level:", bm.broker.getLogLevel()["level"]
            elif cmd == "log" and len (args) == 2:
                bm.broker.setLogLevel(args[1])
            elif cmd == "shutdown":
                try:
                    bm.broker._method("shutdown", {})
                except ConnectionError:
                    pass # Normal, the broker has been shut down!
                bm.conn = None # Don't try to close again
            else:
                Usage()
    except KeyboardInterrupt:
        print
    except IOError, e:
        print e
        bm.Disconnect()
        return 1
    except SystemExit, e:
        bm.Disconnect()
        return 1
    except Exception,e:
        if e.__class__.__name__ != "Timeout":
            # ignore Timeout exception, handle in the loop below
            print "Failed: %s: %s" % (e.__class__.__name__, e)
        bm.Disconnect()
        return 1

    while True:
        # some commands take longer than the default amqp timeout to complete,
        # so attempt to disconnect until successful, ignoring Timeouts
        try:
            bm.Disconnect(ignore=False)
            break
        except Exception, e:
            if e.__class__.__name__ != "Timeout":
                print "Failed: %s: %s" % (e.__class__.__name__, e)
                return 1
    return config._returnCode
+
+
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
+
diff --git a/qpid/cpp/management/python/bin/qpid-config.bat b/qpid/cpp/management/python/bin/qpid-config.bat
new file mode 100644
index 0000000000..0ab000f5d3
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-config.bat
@@ -0,0 +1,2 @@
+@echo off
+python %~dp0\qpid-config %*
diff --git a/qpid/cpp/management/python/bin/qpid-ha b/qpid/cpp/management/python/bin/qpid-ha
new file mode 100755
index 0000000000..1c07658d34
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-ha
@@ -0,0 +1,299 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, time, os, re, math
+from qpid.messaging import Connection
+from qpid.messaging import Message as QpidMessage
+from qpid.util import URL
+from qpidtoollibs.broker import BrokerAgent
+from qpidtoollibs.config import parse_qpidd_conf
+try:
+ from uuid import uuid4
+except ImportError:
+ from qpid.datatypes import uuid4
+
+# QMF address for the HA broker object.
+HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
+# Define these defaults here rather than in add_option because we want
+# to use qpidd.conf for defaults if --config is specified and
+# these defaults otherwise:
+DEFAULTS = { "broker":"0.0.0.0", "timeout":10.0}
+
+class ExitStatus(Exception):
+    """Raised if a command wants a non-0 exit status from the script"""
+ def __init__(self, status): self.status = status
+
+def find_qpidd_conf():
+ """Return the path to the local qpid.conf file or None if it is not found"""
+ p = os.path
+ prefix, bin = p.split(p.dirname(__file__))
+ if bin == "bin": # Installed in a standard place.
+ conf = p.join(prefix, "etc", "qpid", "qpidd.conf")
+ if p.isfile(conf): return conf
+ return None
+
+class Command(object):
+ """
+ Common options and logic for all commands. Subclasses provide additional
+ options and execution logic.
+ """
+
+ commands = []
+
+ def __init__(self, name, help, arg_names=[], connect_agent=True):
+ """@param connect_agent true if we should establish a QMF agent connection"""
+ Command.commands.append(self)
+ self.name = name
+ self.connect_agent = connect_agent
+ self.arg_names = arg_names
+ usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
+ self.help = help
+ self.op=optparse.OptionParser(usage)
+ common = optparse.OptionGroup(self.op, "Broker connection options")
+ def help_default(what): return " (Default %s)"%DEFAULTS[what]
+ common.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker"))
+ common.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout"))
+ common.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override")
+ common.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
+ common.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ common.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)")
+        common.add_option("--config", metavar="<path/to/qpidd.conf>", help="Read default connection configuration from the qpidd.conf broker configuration file. Defaults are overridden by command-line options.")
+ self.op.add_option_group(common)
+
+ def connect(self, opts):
+ conn_options = {}
+ if not opts.broker:
+ opts.broker = DEFAULTS["broker"]
+ # If we are connecting locally, use local qpidd.conf by default
+ if not opts.config: opts.config = find_qpidd_conf()
+ url = URL(opts.broker)
+ if opts.config: # Use broker config file for defaults
+ config = parse_qpidd_conf(opts.config)
+ if not url.user: url.user = config.get("ha-username")
+ if not url.password: url.password = config.get("ha-password")
+ if not url.port: url.port = config.get("port")
+ opts.broker = str(url)
+ if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism")
+ if not opts.timeout:
+ timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval")
+ if timeout: opts.timeout = float(timeout)
+ else: # Use DEFAULTS
+ if not opts.timeout: opts.timeout = DEFAULTS["timeout"]
+ if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.sasl_service_name:
+ conn_options['sasl_service'] = opts.sasl_service_name
+ if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ if not opts.ssl_certificate:
+ self.op.error("missing '--ssl-certificate' (required by '--ssl-key')")
+ conn_options['ssl_keyfile'] = opts.ssl_key
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+ if opts.timeout:
+ conn_options['timeout'] = opts.timeout
+ conn_options['heartbeat'] = int(math.ceil(opts.timeout/2))
+ connection = Connection.establish(opts.broker, **conn_options)
+ qmf_broker = self.connect_agent and BrokerAgent(connection)
+ ha_broker = self.connect_agent and qmf_broker.getHaBroker()
+ return (connection, qmf_broker, ha_broker)
+
+ def all_brokers(self, ha_broker, opts, func):
+ """@return: List of (broker_addr, ha_broker) for all brokers in the cluster.
+ Returns (broker_addr, Exception) if an exception is raised accessing a broker.
+ """
+ # The brokersUrl setting is not in python URL format, simpler parsing here.
+ result = []
+ brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(","))
+ if brokers and opts.all:
+ if "@" in opts.broker: userpass = opts.broker.split("@")[0]
+ else: userpass = None
+ for b in brokers:
+ if userpass and not "@" in b: opts.broker = userpass+"@"+b
+ else: opts.broker = b
+ try:
+ connection, qmf_broker, ha_broker = self.connect(opts)
+ func(ha_broker, b)
+ except Exception,e:
+ func(ha_broker, b, e)
+ else:
+ func(ha_broker)
+
+ def execute(self, args):
+ opts, args = self.op.parse_args(args)
+ if len(args) != len(self.arg_names)+1:
+ self.op.print_help()
+ raise Exception("Wrong number of arguments")
+ self.connection, qmf_broker, ha_broker = self.connect(opts)
+ if self.connect_agent and not ha_broker:
+ raise Exception("HA module is not loaded on broker at %s" % opts.broker)
+ try: self.do_execute(qmf_broker, ha_broker, opts, args)
+ finally: self.connection.close()
+
+ def do_execute(self, qmf_broker, opts, args):
+ raise Exception("Command '%s' is not yet implemented"%self.name)
+
+class ManagerCommand(Command):
+ """
+ Base for commands that should only be used by a cluster manager tool that ensures
+ cluster consistency.
+ """
+
+ manager_commands = [] # Cluster manager commands
+
+ def __init__(self, name, help, arg_names=[], connect_agent=True):
+ """@param connect_agent true if we should establish a QMF agent connection"""
+ super(ManagerCommand, self).__init__(name, "[Cluster manager only] "+help, arg_names, connect_agent)
+ self.commands.remove(self) # Not a user command
+ self.manager_commands.append(self)
+
+
+class PingCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False)
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ self.connection.session() # Make sure we can establish a session.
+PingCmd()
+
+class PromoteCmd(ManagerCommand):
+ def __init__(self):
+ super(PromoteCmd, self).__init__("promote", "Promote a backup broker to primary. This command should *only* be used by a cluster manager (such as rgmanager) that ensures only one broker is primary at a time. Promoting more than one broker to primary at the same time will make the cluster inconsistent and will cause data loss and unexpected behavior.")
+
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout)
+
+PromoteCmd()
+
+
+class StatusCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "status", "Print HA status")
+ self.op.add_option(
+ "--expect", metavar="<status>",
+ help="Don't print status. Return 0 if it matches <status>, 1 otherwise")
+ self.op.add_option(
+ "--is-primary", action="store_true", default=False,
+ help="Don't print status. Return 0 if the broker is primary, 1 otherwise")
+ self.op.add_option(
+ "--all", action="store_true", default=False,
+ help="Print status for all brokers in the cluster")
+
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ if opts.is_primary:
+ if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1)
+ return
+ if opts.expect:
+ if opts.expect != ha_broker.status: raise ExitStatus(1)
+ return
+
+ def status(hb, b=None, ex=None):
+ if ex: print b, ex
+ elif b: print b, hb.status
+ else: print hb.status
+ self.all_brokers(ha_broker, opts, status)
+
+StatusCmd()
+
+class ReplicateCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout)
+ReplicateCmd()
+
+class QueryCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "query", "Print HA configuration and status")
+ self.op.add_option(
+ "--all", action="store_true", default=False,
+ help="Print configuration and status for all brokers in the cluster")
+
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ def query(hb, b=None, ex=None):
+ if ex:
+ print "%s %s\n" % (b, ex)
+ else:
+ if b:
+ print "%-20s %s"%("Address:", b)
+ for x in [("Status:", hb.status),
+ ("Broker ID:", hb.systemId),
+ ("Brokers URL:", hb.brokersUrl),
+ ("Public URL:", hb.publicUrl),
+ ("Replicate: ", hb.replicateDefault)
+ ]:
+ print "%-20s %s"%x
+ if b: print
+ self.all_brokers(ha_broker, opts, query)
+
+
+QueryCmd()
+
+def print_usage(prog):
+ print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%prog
+ for cmd in Command.commands:
+ print " %-12s %s."%(cmd.name, cmd.help.split(".")[0])
+ print "\nFor help with a command type: %s <command> --help\n"%prog
+
+def find_command(args, commands):
+ """Find a command among the arguments and options"""
+ for arg in args:
+ cmds = [cmd for cmd in commands if cmd.name == arg]
+ if cmds: return cmds[0]
+ return None
+
+def main_except(argv):
+ """This version of main raises exceptions"""
+ args = argv[1:]
+ commands = Command.commands
+ if "--cluster-manager" in args:
+ commands += ManagerCommand.manager_commands
+ args.remove("--cluster-manager")
+ if len(args) and args[0] in ['help', '--help', '-help', '-h', 'help-all', '--help-all']:
+ if 'help-all' in args[0]:
+ for c in commands: c.op.print_help(); print
+ else:
+ print_usage(os.path.basename(argv[0]));
+ else:
+ command = find_command(args, commands)
+ if command:
+ command.execute(args)
+ else:
+ # Check for attempt to use a manager command without --cluster-manager
+ command = find_command(args, ManagerCommand.manager_commands)
+ if command:
+ message="""'%s' should only be called by the cluster manager.
+Incorrect use of '%s' will cause cluster malfunction.
+To call from a cluster manager use '%s --cluster-manager'. """
+ raise Exception(message%((command.name,)*3))
+ else:
+ print_usage(os.path.basename(argv[0]));
+ raise Exception("No valid command")
+
+def main(argv):
+ try:
+ main_except(argv)
+ return 0
+ except ExitStatus, e:
+ return e.status
+ except Exception, e:
+ print "%s: %s"%(type(e).__name__, e)
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv))
diff --git a/qpid/cpp/management/python/bin/qpid-ha.bat b/qpid/cpp/management/python/bin/qpid-ha.bat
new file mode 100644
index 0000000000..29a77a0fb4
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-ha.bat
@@ -0,0 +1,2 @@
+@echo off
+python %~dp0\qpid-ha %*
diff --git a/qpid/cpp/management/python/bin/qpid-printevents b/qpid/cpp/management/python/bin/qpid-printevents
new file mode 100755
index 0000000000..f702ca91e8
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-printevents
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+from optparse import IndentedHelpFormatter
+from time import time, strftime, gmtime, sleep
+from threading import Lock, Condition, Thread
+from qpid.messaging import Connection
+import qpid.messaging.exceptions
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import EventHelper
+
+
+class Printer(object):
+ """
+ This class serializes printed lines so that events coming from different
+ threads don't overlap each other.
+ """
+ def __init__(self):
+ self.lock = Lock()
+
+ def pr(self, text):
+ self.lock.acquire()
+ try:
+ print text
+ finally:
+ self.lock.release()
+ sys.stdout.flush()
+
+
+class EventReceiver(Thread):
+ """
+ One instance of this class is created for each broker that is being monitored.
+ This class does not use the "reconnect" option because it needs to report as
+ events when the connection is established and when it's lost.
+ """
+ def __init__(self, printer, url, options):
+ Thread.__init__(self)
+ self.printer = printer
+ self.url = url
+ self.options = options
+ self.running = True
+ self.helper = EventHelper()
+
+ def cancel(self):
+ self.running = False
+
+ def run(self):
+ isOpen = False
+ while self.running:
+ try:
+ conn = Connection.establish(self.url, **self.options)
+ isOpen = True
+ self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url)
+
+ sess = conn.session()
+ rx = sess.receiver(self.helper.eventAddress())
+
+ while self.running:
+ try:
+ msg = rx.fetch(1)
+ event = self.helper.event(msg)
+ self.printer.pr(event.__repr__())
+ sess.acknowledge()
+ except qpid.messaging.exceptions.Empty:
+ pass
+
+ except Exception, e:
+ if isOpen:
+ self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url)
+ isOpen = False
+ sleep(1)
+
+
+class JHelpFormatter(IndentedHelpFormatter):
+ """
+ Format usage and description without stripping newlines from usage strings
+ """
+ def format_usage(self, usage):
+ return usage
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+_usage = "%prog [options] [broker-addr]..."
+
+_description = \
+"""
+Collect and print events from one or more Qpid message brokers.
+
+If no broker-addr is supplied, %prog connects to 'localhost:5672'.
+
+[broker-addr] syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]
+
+Examples:
+
+$ %prog localhost:5672
+$ %prog 10.1.1.7:10000
+$ %prog guest/guest@broker-host:10000
+"""
+
+def main(argv=None):
+ p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter())
+ p.add_option("--heartbeats", action="store_true", default=False, help="Use heartbeats.")
+ p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ p.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
+ p.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ p.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ p.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
+
+ options, arguments = p.parse_args(args=argv)
+ if len(arguments) == 0:
+ arguments.append("localhost")
+
+ brokers = []
+ conn_options = {}
+ props = {}
+ printer = Printer()
+
+ if options.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = options.sasl_mechanism
+ if options.sasl_service_name:
+ conn_options['sasl_service'] = options.sasl_service_name
+ if options.ssl_certificate:
+ conn_options['ssl_certfile'] = options.ssl_certificate
+ if options.ssl_key:
+ if not options.ssl_certificate:
+ p.error("missing '--ssl-certificate' (required by '--ssl-key')")
+ conn_options['ssl_keyfile'] = options.ssl_key
+ if options.ha_admin:
+ props['qpid.ha-admin'] = 1
+ if options.heartbeats:
+ props['heartbeat'] = 5
+
+ if len(props) > 0:
+ conn_options['client_properties'] = props
+
+ try:
+ try:
+ for host in arguments:
+ er = EventReceiver(printer, host, conn_options)
+ brokers.append(er)
+ er.start()
+
+ while (True):
+ sleep(10)
+
+ except KeyboardInterrupt:
+ print
+ return 0
+
+ except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ return 1
+ finally:
+ for b in brokers:
+ b.cancel()
+ for b in brokers:
+ b.join()
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/qpid/cpp/management/python/bin/qpid-printevents.bat b/qpid/cpp/management/python/bin/qpid-printevents.bat
new file mode 100644
index 0000000000..3486bed39d
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-printevents.bat
@@ -0,0 +1,2 @@
+@echo off
+python %~dp0\qpid-printevents %*
diff --git a/qpid/cpp/management/python/bin/qpid-qls-analyze b/qpid/cpp/management/python/bin/qpid-qls-analyze
new file mode 100755
index 0000000000..7fbf6b1bb2
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-qls-analyze
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+qpid-qls-analyze
+
+Reads and analyzes a Qpid Linear Store (QLS) store directory.
+"""
+
+import os.path
+import sys
+
+default = os.path.normpath('/usr/share/qpid-tools')
+home = os.environ.get('QPID_TOOLS_HOME', default)
+sys.path.append(os.path.join(home, 'python'))
+
+import argparse
+import os
+import qlslibs.analyze
+import qlslibs.efp
+
+class QlsAnalyzerArgParser(argparse.ArgumentParser):
+ """
+ Class to handle command-line arguments.
+ """
+ def __init__(self):
+ argparse.ArgumentParser.__init__(self, description='Qpid Linear Store Analyzer', prog='qpid-qls-analyze')
+ self.add_argument('qls_dir', metavar='DIR',
+ help='Qpid Linear Store (QLS) directory to be analyzed')
+ self.add_argument('--efp', action='store_true',
+                          help='Analyze the Empty File Pool (EFP) and show stats')
+ self.add_argument('--show-recovered-recs', action='store_true',
+ help='Show only recovered records')
+ self.add_argument('--show-recovery-recs', action='store_true',
+ help='Show material records found during recovery')
+ self.add_argument('--show-all-recs', action='store_true',
+ help='Show all records (including fillers) found during recovery')
+ self.add_argument('--show-xids', action='store_true',
+ help='Show xid as hex number, otherwise show only xid length. Only has effect when records are shown')
+# TODO: Add ability to show xid as an index rather than a value, helps analysis when xid is a long value with
+# small differences which cannot easily be seen when looking at an output. Also prints a table of indices vs xid values.
+# self.add_argument('--show-xid-index', action='store_true',
+# help='Show xids by index rather than by their value. Useful for long xids. Prints xid index table')
+ self.add_argument('--show-data', action='store_true',
+ help='Show data, otherwise show only data length. Only has effect when records are shown')
+ self.add_argument('--stats', action='store_true',
+ help='Print journal record stats')
+ self.add_argument('--txtest', action='store_true',
+ help='Show qpid-txtest message number as the message content when viewing records. Only has effect when records are shown')
+ self.add_argument('--txn', action='store_true',
+ help='Reconcile incomplete transactions')
+ self.add_argument('--version', action='version',
+ version='%(prog)s ' + QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION)
+ def parse_args(self, args=None, namespace=None):
+ args = argparse.ArgumentParser.parse_args(self, args, namespace)
+ # If required, perform additional validity checks here, raise errors if req'd
+ return args
+
+class QqpdLinearStoreAnalyzer(object):
+ """
+ Top-level store analyzer. Will analyze the directory in args.qls_dir as the top-level Qpid Linear Store (QLS)
+ directory. The following may be analyzed:
+ * The Empty File Pool (if --efp is specified in the arguments)
+ * The Linear Store
+ * The Transaction Prepared List (TPL)
+ """
+ QLS_ANALYZE_VERSION = '1.0'
+ def __init__(self):
+ self.args = None
+ self._process_args()
+ self.qls_dir = os.path.abspath(self.args.qls_dir)
+ self.efp_manager = qlslibs.efp.EfpManager(self.qls_dir, None)
+ self.jrnl_recovery_mgr = qlslibs.analyze.JournalRecoveryManager(self.qls_dir, self.args)
+ def _process_args(self):
+ """ Create arg parser and process args """
+ parser = QlsAnalyzerArgParser()
+ self.args = parser.parse_args()
+ if not os.path.exists(self.args.qls_dir):
+ parser.error('Journal path "%s" does not exist' % self.args.qls_dir)
+ def report(self):
+ """ Create a report on the linear store previously analyzed using analyze() """
+ if self.args.efp:
+ self.efp_manager.report()
+ self.jrnl_recovery_mgr.report()
+ def run(self):
+ """ Run the analyzer, which reads and analyzes the linear store """
+ if self.args.efp:
+ self.efp_manager.run(None)
+ self.jrnl_recovery_mgr.run()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+ M = QqpdLinearStoreAnalyzer()
+ M.run()
+ M.report()
diff --git a/qpid/cpp/management/python/bin/qpid-queue-stats b/qpid/cpp/management/python/bin/qpid-queue-stats
new file mode 100755
index 0000000000..ca78f9b602
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-queue-stats
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import re
+import socket
+import qpid
+from threading import Condition
+from qmf.console import Session, Console
+from qpid.peer import Closed
+from qpid.connection import Connection, ConnectionFailed
+from time import sleep
+
+class BrokerManager(Console):
+ def __init__(self, host, conn_options):
+ self.url = host
+ self.objects = {}
+ self.filter = None
+ self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
+ userBindings=True, manageConnections=True)
+ self.broker = self.session.addBroker(self.url, **conn_options)
+ self.firstError = True
+
+ def setFilter(self,filter):
+ self.filter = filter
+
+ def brokerConnected(self, broker):
+ if not self.firstError:
+ print "*** Broker connected"
+ self.firstError = False
+
+ def brokerDisconnected(self, broker):
+ print "*** Broker connection lost - %s, retrying..." % broker.getError()
+ self.firstError = False
+ self.objects.clear()
+
+ def objectProps(self, broker, record):
+ className = record.getClassKey().getClassName()
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ self.objects[id] = (record.name, None, None)
+
+ def objectStats(self, broker, record):
+ className = record.getClassKey().getClassName()
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ return
+
+ (name, first, last) = self.objects[id]
+ if first == None:
+ self.objects[id] = (name, record, None)
+ return
+
+ if len(self.filter) > 0 :
+ match = False
+
+ for x in self.filter:
+ if x.match(name):
+ match = True
+ break
+ if match == False:
+ return
+
+ if last == None:
+ lastSample = first
+ else:
+ lastSample = last
+
+ self.objects[id] = (name, first, record)
+
+ deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
+ if deltaTime < 1000000000.0:
+ return
+ enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
+ (deltaTime / 1000000000.0)
+ dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
+ (deltaTime / 1000000000.0)
+ print "%-41s%10.2f%11d%13.2f%13.2f" % \
+ (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
+ sys.stdout.flush()
+
+
+ def Display (self):
+ self.session.bindClass("org.apache.qpid.broker", "queue")
+ print "Queue Name Sec Depth Enq Rate Deq Rate"
+ print "========================================================================================"
+ sys.stdout.flush()
+ try:
+ while True:
+ sleep (1)
+ if self.firstError and self.broker.getError():
+ self.firstError = False
+ print "*** Error: %s, retrying..." % self.broker.getError()
+ except KeyboardInterrupt:
+ print
+ self.session.delBroker(self.broker)
+
+def main(argv=None):
+ p = optparse.OptionParser()
+ p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
+ p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
+ p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ p.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
+ p.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ p.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+
+ options, arguments = p.parse_args(args=argv)
+
+ conn_options = {}
+ if options.sasl_mechanism:
+ conn_options['mechanisms'] = options.sasl_mechanism
+ if options.sasl_service_name:
+ conn_options['service'] = options.sasl_service_name
+ if options.ssl_certificate:
+ conn_options['ssl_certfile'] = options.ssl_certificate
+ if options.ssl_key:
+ if not options.ssl_certificate:
+ p.error("missing '--ssl-certificate' (required by '--ssl-key')")
+ conn_options['ssl_keyfile'] = options.ssl_key
+
+ host = options.broker_address
+ filter = []
+ if options.filter != None:
+ for s in options.filter.split(","):
+ filter.append(re.compile(s))
+
+ bm = BrokerManager(host, conn_options)
+ bm.setFilter(filter)
+ bm.Display()
+
+if __name__ == '__main__':
+ sys.exit(main())
+
diff --git a/qpid/cpp/management/python/bin/qpid-queue-stats.bat b/qpid/cpp/management/python/bin/qpid-queue-stats.bat
new file mode 100644
index 0000000000..24290d46b3
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-queue-stats.bat
@@ -0,0 +1,3 @@
+@echo off
+python %~dp0\qpid-queue-stats %*
+
diff --git a/qpid/cpp/management/python/bin/qpid-receive b/qpid/cpp/management/python/bin/qpid-receive
new file mode 100755
index 0000000000..f14df277ac
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-receive
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
import optparse, sys, time
import statistics
from qpid.messaging import *

# Unit-conversion constants: SECOND scales seconds -> milliseconds,
# TIME_SEC scales seconds -> nanoseconds (used for receive-rate pacing).
SECOND = 1000
TIME_SEC = 1000000000

# Command-line definition for this message-drain tool.
# NOTE(review): --connection-options defaults to a dict, but optparse delivers
# a plain *string* when the option is given on the command line; main() does
# Connection(broker, **opts.connection_options), which only works with the
# dict default — confirm intended usage.
op = optparse.OptionParser(usage="usage: %prog [options]", description="Drains messages from the specified address")
op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to")
op.add_option("-a", "--address", type="str", help="address to receive from")
op.add_option("--connection-options", default={}, help="options for the connection")
op.add_option("-m", "--messages", default=0, type="int", help="stop after N messages have been received, 0 means no limit")
op.add_option("--timeout", default=0, type="int", help="timeout in seconds to wait before exiting")
op.add_option("-f", "--forever", default=False, action="store_true", help="ignore timeout and wait forever")
op.add_option("--ignore-duplicates", default=False, action="store_true", help="Detect and ignore duplicates (by checking 'sn' header)")
op.add_option("--verify-sequence", default=False, action="store_true", help="Verify there are no gaps in the message sequence (by checking 'sn' header)")
op.add_option("--check-redelivered", default=False, action="store_true", help="Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue")
op.add_option("--ack-frequency", default=100, type="int", help="Ack frequency (0 implies none of the messages will get accepted)")
op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)")
op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)")
op.add_option("--print-content", type="str", default="yes", help="print out message content")
op.add_option("--print-headers", type="str", default="no", help="print out message headers")
op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover")
op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics")
op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages")
op.add_option("--report-header", type="str", default="yes", help="Headers on report")
op.add_option("--ready-address", type="str", help="send a message to this address when ready to receive")
op.add_option("--receive-rate", default=0, type="int", help="Receive at rate of N messages/second. 0 means receive as fast as possible")
#op.add_option("--help", default=False, action="store_true", help="print this usage statement")
+
def getTimeout(timeout, forever):
    """Translate the CLI timeout (in seconds) into fetch() units.

    Returns None (block forever) when --forever was given, otherwise the
    timeout scaled by SECOND.
    """
    return None if forever else SECOND * timeout
+
+
EOS = "eos"  # message-content sentinel marking end of the input stream
SN = "sn"    # message property carrying the sender's sequence number


# Check for duplicate or dropped messages by sequence number
class SequenceTracker:
    """Tracks the 'sn' message property to detect duplicate and dropped
    messages, according to the --verify-sequence / --ignore-duplicates /
    --check-redelivered options."""

    def __init__(self, opts):
        self.opts = opts
        self.lastSn = 0  # highest sequence number processed so far

    # Return True if the message should be procesed, false if it should be ignored.
    def track(self, message):
        """Return True if 'message' should be processed, False to ignore it.

        Raises on a sequence gap (--verify-sequence) or on a duplicate that
        is not flagged as redelivered (--check-redelivered).
        """
        # Tracking is only needed when at least one of the options is on.
        # (The original returned early whenever --ignore-duplicates was set,
        # which disabled duplicate detection entirely.)
        if not (self.opts.verify_sequence or self.opts.ignore_duplicates):
            return True
        sn = message.properties[SN]
        # Fixed: these previously read the bare names 'lastSn'/'msg', which
        # are undefined here and raised NameError on first use.
        duplicate = (sn <= self.lastSn)
        dropped = (sn > self.lastSn + 1)
        if self.opts.verify_sequence and dropped:
            raise Exception("Gap in sequence numbers %s-%s" % (self.lastSn, sn))
        ignore = (duplicate and self.opts.ignore_duplicates)
        if ignore and self.opts.check_redelivered and (not message.redelivered):
            raise Exception("duplicate sequence number received, message not marked as redelivered!")
        if not duplicate:
            self.lastSn = sn
        return (not ignore)
+
+
def main():
  """Drain messages from opts.address, optionally verifying sequence numbers,
  echoing to reply-to addresses, pacing the receive rate, and committing or
  acknowledging in batches. Exits when EOS content, the message limit, or a
  fetch timeout is reached."""
  opts, args = op.parse_args()
  if not opts.address:
    raise Exception("Address must be specified!")

  broker = opts.broker
  address = opts.address
  connection = Connection(opts.broker, **opts.connection_options)

  try:
    connection.open()
    if opts.failover_updates:
      auto_fetch_reconnect_urls(connection)
    # opts.tx (an int) is used as a truthy flag: any batch size > 0 makes the
    # session transactional.
    session = connection.session(transactional=(opts.tx))
    receiver = session.receiver(opts.address)
    if opts.capacity > 0:
      receiver.capacity = opts.capacity
    msg = Message()
    count = 0
    txCount = 0
    sequenceTracker = SequenceTracker(opts)
    # NOTE(review): getTimeout returns SECOND*timeout (milliseconds), but
    # qpid.messaging fetch() timeouts are documented in seconds — confirm units.
    timeout = getTimeout(opts.timeout, opts.forever)
    done = False
    stats = statistics.ThroughputAndLatency()
    reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)

    # Signal readiness to a coordinating peer before entering the fetch loop.
    if opts.ready_address is not None:
      session.sender(opts.ready_address).send(msg)
      if opts.tx > 0:
        session.commit()
    # For receive rate calculation
    start = time.time()*TIME_SEC
    interval = 0
    if opts.receive_rate > 0:
      interval = TIME_SEC / opts.receive_rate

    replyTo = {} # a dictionary of reply-to address -> sender mapping

    while (not done):
      try:
        msg = receiver.fetch(timeout=timeout)
        reporter.message(msg)
        if sequenceTracker.track(msg):
          if msg.content == EOS:
            done = True
          else:
            count+=1
            if opts.print_headers == "yes":
              if msg.subject is not None:
                print "Subject: %s" %msg.subject
              if msg.reply_to is not None:
                print "ReplyTo: %s" %msg.reply_to
              if msg.correlation_id is not None:
                print "CorrelationId: %s" %msg.correlation_id
              if msg.user_id is not None:
                print "UserId: %s" %msg.user_id
              if msg.ttl is not None:
                print "TTL: %s" %msg.ttl
              if msg.priority is not None:
                print "Priority: %s" %msg.priority
              if msg.durable:
                print "Durable: true"
              if msg.redelivered:
                print "Redelivered: true"
              print "Properties: %s" %msg.properties
              print
            if opts.print_content == "yes":
              print msg.content
            if (opts.messages > 0) and (count >= opts.messages):
              done = True
          # end of "if sequenceTracker.track(msg):"
        # Commit/rollback per --tx batch, else acknowledge per --ack-frequency.
        if (opts.tx > 0) and (count % opts.tx == 0):
          txCount+=1
          if (opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0):
            session.rollback()
          else:
            session.commit()
        elif (opts.ack_frequency > 0) and (count % opts.ack_frequency == 0):
          session.acknowledge()
        if msg.reply_to is not None: # Echo message back to reply-to address.
          if msg.reply_to not in replyTo:
            replyTo[msg.reply_to] = session.sender(msg.reply_to)
            replyTo[msg.reply_to].capacity = opts.capacity
          replyTo[msg.reply_to].send(msg)
        if opts.receive_rate > 0:
          # NOTE(review): delay is computed in TIME_SEC (nanosecond) units but
          # time.sleep() takes seconds — confirm the pacing arithmetic.
          delay = start + count*interval - time.time()*TIME_SEC
          if delay > 0:
            time.sleep(delay)
        # Clear out message properties & content for next iteration.
        msg = Message()
      except Empty: # no message fetched => break the while cycle
        break
    # end of while cycle
    if opts.report_total:
      reporter.report()
    # Final settlement of any messages not covered by the in-loop batching.
    if opts.tx > 0:
      txCount+=1
      if opts.rollback_frequency and (txCount % opts.rollback_frequency == 0):
        session.rollback()
      else:
        session.commit()
    else:
      session.acknowledge()
    session.close()
    connection.close()
  except Exception,e:
    print e
    connection.close()

if __name__ == "__main__": main()
diff --git a/qpid/cpp/management/python/bin/qpid-route b/qpid/cpp/management/python/bin/qpid-route
new file mode 100755
index 0000000000..f51d2493e9
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-route
@@ -0,0 +1,635 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
+import sys
+import os
+import locale
+from qmf.console import Session, BrokerURL
+from time import sleep
+
# Hand-formatted usage/description text, printed verbatim by JHelpFormatter
# below (optparse's default formatter would re-wrap and destroy the layout).
usage = """
Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism]
       qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>

       qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
       qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>
       qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism]
       qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>
       qpid-route [OPTIONS] route list [<dest-broker>]
       qpid-route [OPTIONS] route flush [<dest-broker>]
       qpid-route [OPTIONS] route map [<broker>]

       qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism]
       qpid-route [OPTIONS] link del <dest-broker> <src-broker>
       qpid-route [OPTIONS] link list [<dest-broker>]"""

description = """
ADDRESS syntax:

      [username/password@] hostname
      ip-address [:<port>]"""
+
def Usage():
    """Print the command-line usage summary to stdout."""
    sys.stdout.write(usage + "\n")
+
class Config:
    """Holds the command-line settings shared across this module; the single
    'config' instance below is populated by OptionsAndArguments()."""
    def __init__(self):
        self._verbose = False        # -v: chatty progress output
        self._quiet = False          # -q: suppress duplicate-route warnings
        self._durable = False        # -d: create durable links/bridges
        self._dellink = False        # -e: delete link when its last route goes
        self._srclocal = False       # -s: connect to the source broker (push route)
        self._transport = "tcp"      # -t: transport used for new links
        self._ack = 0                # --ack: bridge acknowledgement batch size
        self._credit = 0xFFFFFFFF    # --credit: sender credit window; unlimited
        self._connTimeout = 10       # --timeout: broker connect timeout (seconds)
        self._conn_options = {}      # extra kwargs passed to Session.addBroker()

config = Config()
+
class JHelpFormatter(IndentedHelpFormatter):
    """Help formatter that passes usage and description through verbatim.

    optparse's default formatter re-wraps text and strips newlines, which
    would destroy the hand-laid-out usage/description strings above.
    """

    def format_usage(self, usage):
        # Emit the usage text exactly as supplied.
        return usage

    def format_description(self, description):
        # Preserve the description's own layout; just terminate with a newline.
        return description + "\n" if description else ""
+
def OptionsAndArguments(argv):
    """Parse the command line into the global 'config' object.

    Returns the positional arguments, decoded to unicode using the user's
    locale when possible. Exits via parser.error() when --ssl-key is given
    without --ssl-certificate.
    """
    parser = OptionParser(usage=usage,
                          description=description,
                          formatter=JHelpFormatter())

    parser.add_option("--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
    parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings")
    parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable")

    parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link")
    parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")

    parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N")
    parser.add_option("--credit", action="store", type="int", default=0xFFFFFFFF, metavar="<msgs>",
                      help="Maximum number of messages a sender can have outstanding (0=unlimited)")
    parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")

    parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
    parser.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
    parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
    parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
    parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
    opts, encArgs = parser.parse_args(args=argv)

    # Positional args may be byte strings (python 2); decode with the user's
    # locale, falling back to the raw values if decoding is not possible.
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except Exception:  # was a bare 'except:', which also trapped SystemExit
        args = encArgs

    # --timeout 0 means "wait forever" (connTimeout None). The original only
    # assigned when opts.timeout was truthy, so a 0 value could never reach the
    # None-translation below and silently kept the 10s default.
    config._connTimeout = opts.timeout
    if config._connTimeout == 0:
        config._connTimeout = None

    if opts.verbose:
        config._verbose = True

    if opts.quiet:
        config._quiet = True

    if opts.durable:
        config._durable = True

    if opts.del_empty_link:
        config._dellink = True

    if opts.src_local:
        config._srclocal = True

    if opts.transport:
        config._transport = opts.transport

    if opts.ha_admin:
        config._conn_options['client_properties'] = {'qpid.ha-admin' : 1}

    if opts.ack:
        config._ack = opts.ack

    config._credit = opts.credit

    if opts.client_sasl_mechanism:
        config._conn_options['mechanisms'] = opts.client_sasl_mechanism
    if opts.sasl_service_name:
        config._conn_options['service'] = opts.sasl_service_name

    if opts.ssl_certificate:
        config._conn_options['ssl_certfile'] = opts.ssl_certificate

    if opts.ssl_key:
        # --ssl-key is only meaningful alongside a certificate.
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        config._conn_options['ssl_keyfile'] = opts.ssl_key

    return args
+
+
+class RouteManager:
+ def __init__(self, localBroker):
+ self.brokerList = {}
+ self.local = BrokerURL(localBroker)
+ self.remote = None
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options)
+ self.broker._waitForStable()
+ self.agent = self.broker.getBrokerAgent()
+
+ def disconnect(self):
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+ except:
+ pass # ignore errors while shutting down
+
+ def getLink(self):
+ links = self.agent.getObjects(_class="link")
+ for link in links:
+ if self.remote.match(link.host, link.port):
+ return link
+ return None
+
+ def checkLink(self, link):
+ retry = 3
+ while link is None or (link.state in ("Waiting", "Connecting", "Closing") and retry > 0):
+ sleep(1)
+ link = self.getLink()
+ retry -= 1
+
+ if link == None:
+ raise Exception("Link failed to create")
+
+ if link.state == "Failed":
+ raise Exception("Link failed to create %s" % (link.lastError or ""))
+ elif config._verbose:
+ print "Link state is", link.state
+
+ def addLink(self, remoteBroker, interbroker_mechanism=""):
+ self.remote = BrokerURL(remoteBroker)
+ if self.local.match(self.remote.host, self.remote.port):
+ raise Exception("Linking broker to itself is not permitted")
+
+ brokers = self.agent.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ res = broker.connect(self.remote.host, self.remote.port, config._durable,
+ interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "",
+ config._transport)
+
+ def delLink(self, remoteBroker):
+ self.remote = BrokerURL(remoteBroker)
+ brokers = self.agent.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link not found")
+
+ res = link.close()
+ if config._verbose:
+ print "Close method returned:", res.status, res.text
+
+ def listLinks(self):
+ links = self.agent.getObjects(_class="link")
+ if len(links) == 0:
+ print "No Links Found"
+ else:
+ print
+ print "Host Port Transport Durable State Last Error"
+ print "============================================================================="
+ for link in links:
+ print "%-16s%-8d%-13s%c %-18s%s" % \
+ (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
+
+ def mapRoutes(self):
+ print
+ print "Finding Linked Brokers:"
+
+ self.brokerList[self.local.name()] = self.broker
+ print " %s:%s... Ok" % (self.local.host, self.local.port)
+
+ added = True
+ while added:
+ added = False
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ url = BrokerURL(host=link.host, port=link.port, user=self.broker.authUser, password=self.broker.authPass)
+ if url.name() not in self.brokerList:
+ print " %s:%s..." % (link.host, link.port)
+ try:
+ url.authName = self.local.authName
+ url.authPass = self.local.authPass
+ b = self.qmf.addBroker(url, config._connTimeout, **config._conn_options)
+ self.brokerList[url.name()] = b
+ added = True
+ print "Ok"
+ except Exception, e:
+ print e
+
+ print
+ print "Dynamic Routes:"
+ bridges = self.qmf.getObjects(_class="bridge", dynamic=True)
+ fedExchanges = []
+ for bridge in bridges:
+ if bridge.src not in fedExchanges:
+ fedExchanges.append(bridge.src)
+ if len(fedExchanges) == 0:
+ print " none found"
+ print
+
+ for ex in fedExchanges:
+ print " Exchange %s:" % ex
+ pairs = []
+ for bridge in bridges:
+ if bridge.src == ex:
+ link = bridge._linkRef_
+ fromUrl = BrokerURL(host=link.host, port=link.port)
+ toUrl = bridge.getBroker().getUrl()
+ found = False
+ for pair in pairs:
+ if pair.matches(fromUrl, toUrl):
+ found = True
+ if not found:
+ pairs.append(RoutePair(fromUrl, toUrl))
+ for pair in pairs:
+ print " %s" % pair
+ print
+
+ print "Static Routes:"
+ bridges = self.qmf.getObjects(_class="bridge", dynamic=False)
+ if len(bridges) == 0:
+ print " none found"
+ print
+
+ for bridge in bridges:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ leftType = "ex"
+ rightType = "ex"
+ if bridge.srcIsLocal:
+ arrow = "=>"
+ left = bridge.src
+ right = bridge.dest
+ if bridge.srcIsQueue:
+ leftType = "queue"
+ else:
+ arrow = "<="
+ left = bridge.dest
+ right = bridge.src
+ if bridge.srcIsQueue:
+ rightType = "queue"
+
+ if bridge.srcIsQueue:
+ print " %s(%s=%s) %s %s(%s=%s)" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right)
+ else:
+ print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
+ print
+
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+
+ def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False):
+ if dynamic and config._srclocal:
+ raise Exception("--src-local is not permitted on dynamic routes")
+
+ self.addLink(remoteBroker, interbroker_mechanism)
+ link = self.getLink()
+ self.checkLink(link)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
+ if not config._quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
+ sys.exit(0)
+
+ if config._verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(config._durable, exchange, exchange, routingKey, tag,
+ excludes, False, config._srclocal, dynamic,
+ config._ack, credit=config._credit)
+ if res.status != 0:
+ raise Exception(res.text)
+ if config._verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ):
+ self.addLink(remoteBroker, interbroker_mechanism)
+ link = self.getLink()
+ self.checkLink(link)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if not config._quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
+ sys.exit(0)
+
+ if config._verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(config._durable, queue, exchange, "", "", "", True,
+ config._srclocal, False, config._ack, credit=config._credit)
+ if res.status != 0:
+ raise Exception(res.text)
+ if config._verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def delQueueRoute(self, remoteBroker, exchange, queue):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not config._quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if config._verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and config._dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if config._verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ sys.exit(0)
+ if not config._quiet:
+ raise Exception("Route not found")
+
+ def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not config._quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
+ and bridge.dynamic == dynamic:
+ if config._verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and config._dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if config._verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ return
+ if not config._quiet:
+ raise Exception("Route not found")
+
+ def listRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ if bridge.dynamic:
+ keyText = "<dynamic>"
+ else:
+ keyText = bridge.key
+ print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
+
+ def clearAllRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ if config._verbose:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
+ res = bridge.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif config._verbose:
+ print "Ok"
+
+ if config._dellink:
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ if config._verbose:
+ print "Deleting Link: %s:%d... " % (link.host, link.port),
+ res = link.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif config._verbose:
+ print "Ok"
+
class RoutePair:
    """An (origin, destination) pair of broker URLs for a dynamic route.

    A pair becomes bidirectional once a matching reverse pair is observed
    via matches().
    """

    def __init__(self, fromUrl, toUrl):
        self.fromUrl = fromUrl
        self.toUrl = toUrl
        self.bidir = False

    def __repr__(self):
        arrow = "<=>" if self.bidir else " =>"
        return "%s %s %s" % (self.fromUrl, arrow, self.toUrl)

    def matches(self, fromUrl, toUrl):
        """True if (fromUrl, toUrl) equals this pair in either direction;
        a reverse match also marks the pair bidirectional."""
        if (fromUrl, toUrl) == (self.fromUrl, self.toUrl):
            return True
        if (toUrl, fromUrl) == (self.fromUrl, self.toUrl):
            self.bidir = True
            return True
        return False
+
+
def YN(val):
    """Render a truth flag as a single character: 'Y' for 1, else 'N'."""
    return 'Y' if val == 1 else 'N'
+
+
def main(argv=None):
    """Dispatch the (group, command) positional arguments to RouteManager.

    Returns 0 on success, -1 on usage errors, 1 on runtime failures.
    NOTE(review): for some malformed argument counts (e.g. 'link add' with a
    single broker argument) 'remoteBroker' is never assigned and the dispatch
    below fails with UnboundLocalError, reported via the generic handler —
    confirm whether a Usage() message is preferred.
    """
    args = OptionsAndArguments(argv)
    nargs = len(args)
    if nargs < 2:
        Usage()
        return(-1)

    # args[0]=group, args[1]=command, args[2:]=broker addresses and operands.
    # With --src-local the connection target (localBroker) is the *source*.
    if nargs == 2:
        localBroker = "localhost"
    else:
        if config._srclocal:
            localBroker = args[3]
            remoteBroker = args[2]
        else:
            localBroker = args[2]
            if nargs > 3:
                remoteBroker = args[3]

    group = args[0]
    cmd = args[1]

    rm = None
    try:
        rm = RouteManager(localBroker)
        if group == "link":
            if cmd == "add":
                if nargs < 3 or nargs > 5:
                    Usage()
                    return(-1)
                interbroker_mechanism = ""
                if nargs > 4: interbroker_mechanism = args[4]
                rm.addLink(remoteBroker, interbroker_mechanism)
                rm.checkLink(rm.getLink())
            elif cmd == "del":
                if nargs != 4:
                    Usage()
                    return(-1)
                rm.delLink(remoteBroker)
            elif cmd == "list":
                rm.listLinks()

        elif group == "dynamic":
            if cmd == "add":
                if nargs < 5 or nargs > 8:
                    Usage()
                    return(-1)

                # Optional trailing operands: tag, exclude-list, mechanism.
                tag = ""
                excludes = ""
                interbroker_mechanism = ""
                if nargs > 5: tag = args[5]
                if nargs > 6: excludes = args[6]
                if nargs > 7: interbroker_mechanism = args[7]
                rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True)
            elif cmd == "del":
                if nargs != 5:
                    Usage()
                    return(-1)
                else:
                    rm.delRoute(remoteBroker, args[4], "", dynamic=True)

        elif group == "route":
            if cmd == "add":
                if nargs < 6 or nargs > 9:
                    Usage()
                    return(-1)

                # Optional trailing operands: tag, exclude-list, mechanism.
                tag = ""
                excludes = ""
                interbroker_mechanism = ""
                if nargs > 6: tag = args[6]
                if nargs > 7: excludes = args[7]
                if nargs > 8: interbroker_mechanism = args[8]
                rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False)
            elif cmd == "del":
                if nargs != 6:
                    Usage()
                    return(-1)
                rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
            elif cmd == "map":
                rm.mapRoutes()
            else:
                if cmd == "list":
                    rm.listRoutes()
                elif cmd == "flush":
                    rm.clearAllRoutes()
                else:
                    Usage()
                    return(-1)

        elif group == "queue":
            if nargs < 6 or nargs > 7:
                Usage()
                return(-1)
            if cmd == "add":
                interbroker_mechanism = ""
                if nargs > 6: interbroker_mechanism = args[6]
                rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] )
            elif cmd == "del":
                rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
            else:
                Usage()
                return(-1)
        else:
            Usage()
            return(-1)

    except Exception,e:
        if rm:
            rm.disconnect() # try to release broker resources
        print "Failed: %s - %s" % (e.__class__.__name__, e)
        return 1

    rm.disconnect()
    return 0

if __name__ == "__main__":
    sys.exit(main())
diff --git a/qpid/cpp/management/python/bin/qpid-route.bat b/qpid/cpp/management/python/bin/qpid-route.bat
new file mode 100644
index 0000000000..ae8e9fe63c
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-route.bat
@@ -0,0 +1,2 @@
+@echo off
+python %~dp0\qpid-route %*
diff --git a/qpid/cpp/management/python/bin/qpid-send b/qpid/cpp/management/python/bin/qpid-send
new file mode 100755
index 0000000000..b0105e41a6
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-send
@@ -0,0 +1,281 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, random, os, time, uuid
+from qpid.messaging import *
+import statistics
+
# Message-property names shared between qpid-send and qpid-receive.
EOS = "eos"  # marks an end-of-stream message (see --send-eos)
SN = "sn"    # per-message sequence number (duplicate/loss detection)
TS = "ts"    # send timestamp, integer nanoseconds (latency measurement)

TIME_SEC = 1000000000  # nanoseconds per second; time.time() values are scaled by this
SECOND = 1000          # fetch timeout for flow-control replies -- NOTE(review): looks like milliseconds (C++ Duration::SECOND) but qpid.messaging fetch() expects seconds; confirm
+
+def nameval(st):
+ idx = st.find("=")
+ if idx >= 0:
+ name = st[0:idx]
+ value = st[idx+1:]
+ else:
+ name = st
+ value = None
+ return name, value
+
+
# Command-line definition for qpid-send.  optparse derives each opts.<dest>
# attribute from the long option name (e.g. --rollback-frequency ->
# opts.rollback_frequency) -- the code below must use those exact names.
# Fixes: typos in user-visible help text ("Simultaineously", "identifers",
# "transaction are", "rolledback").
op = optparse.OptionParser(usage="usage: %prog [options]", description="Spouts messages to the specified address")
op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to")
op.add_option("-a", "--address", type="str", help="address to send to")
op.add_option("--connection-options", default={}, help="options for the connection")
op.add_option("-m", "--messages", default=1, type="int", help="stop after N messages have been sent, 0 means no limit")
op.add_option("-i", "--id", type="str", help="use the supplied id instead of generating one")
op.add_option("--reply-to", type="str", help="specify reply-to address")
op.add_option("--send-eos", default=0, type="int", help="Send N EOS messages to mark end of input")
op.add_option("--durable", default=False, action="store_true", help="Mark messages as durable")
op.add_option("--ttl", default=0, type="int", help="Time-to-live for messages, in milliseconds")
op.add_option("--priority", default=0, type="int", help="Priority for messages (higher value implies higher priority)")
op.add_option("-P", "--property", default=[], action="append", type="str", help="specify message property")
op.add_option("--correlation-id", type="str", help="correlation-id for message")
op.add_option("--user-id", type="str", help="userid for message")
op.add_option("--content-string", type="str", help="use CONTENT as message content")
op.add_option("--content-size", default=0, type="int", help="create an N-byte message content")
op.add_option("-M", "--content-map", default=[], action="append", type="str", help="specify entry for map content")
op.add_option("--content-stdin", default=False, action="store_true", help="read message content from stdin, one line per message")
op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue")
op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transactions are not used)")
op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolled back)")
op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover")
op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics")
op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages")
op.add_option("--report-header", type="str", default="yes", help="Headers on report")
op.add_option("--send-rate", default=0, type="int", help="Send at rate of N messages/second. 0 means send as fast as possible")
op.add_option("--flow-control", default=0, type="int", help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
op.add_option("--sequence", type="str", default="yes", help="Add a sequence number messages property (required for duplicate/lost message detection)")
op.add_option("--timestamp", type="str", default="yes", help="Add a time stamp messages property (required for latency measurement)")
op.add_option("--group-key", type="str", help="Generate groups of messages using message header 'KEY' to hold the group identifier")
op.add_option("--group-prefix", default="GROUP-", type="str", help="Generate group identifiers with 'STRING' prefix (if group-key specified)")
op.add_option("--group-size", default=10, type="int", help="Number of messages per a group (if group-key specified)")
op.add_option("--group-randomize-size", default=False, action="store_true", help="Randomize the number of messages per group to [1...group-size] (if group-key specified)")
op.add_option("--group-interleave", default=1, type="int", help="Simultaneously interleave messages from N different groups (if group-key specified)")
+
+
class ContentGenerator:
    """Base class for message-content sources.

    Subclasses install the next payload on msg and return a truthy value
    while content remains; the base produces nothing, so the send loop
    terminates immediately.
    """
    def setContent(self, msg):
        # No content available by default.
        return None
+
class GetlineContentGenerator(ContentGenerator):
    """Reads one line of stdin per message; ends the send loop at EOF."""

    def setContent(self, msg):
        """Install the next stdin line as msg.content.

        Returns True when a line was read, False at end of input.
        Fixes the original, which computed "got = (not line)" -- "line" was
        never defined (NameError) and the flag was inverted besides: a
        non-empty read must continue the loop, not stop it.
        """
        import sys  # NOTE(review): the module top-level never imports sys
        content = sys.stdin.readline()
        if not content:
            return False
        msg.content = content
        return True
+
class FixedContentGenerator(ContentGenerator):
    """Supplies the same fixed payload for every message."""

    def __init__(self, payload=None):
        # Kept on self.content, matching the other generators.
        self.content = payload

    def setContent(self, msg):
        """Always succeeds: installs the fixed payload and returns True."""
        msg.content = self.content
        return True
+
class MapContentGenerator(ContentGenerator):
    """Builds a map (dict) payload from the -M/--content-map options."""

    def __init__(self, opts=None):
        self.opts = opts

    def setContent(self, msg):
        """Install a dict built from each "name=value" entry; always True.

        Fixes the original, which iterated "e" but parsed the undefined
        name "p", and assigned into a bare "content" local instead of
        self.content -- both NameErrors at runtime.
        """
        self.content = {}
        for entry in self.opts.content_map:
            name, val = nameval(entry)
            self.content[name] = val
        msg.content = self.content
        return True
+
+
+# tag each generated message with a group identifier
+class GroupGenerator:
+ def __init__(self, key, prefix, size, randomize, interleave):
+ groupKey = key
+ groupPrefix = prefix
+ groupSize = size
+ randomizeSize = randomize
+ groupSuffix = 0
+ if (randomize > 0):
+ random.seed(os.getpid())
+
+ for i in range(0, interleave):
+ self.newGroup()
+ current = 0
+
+ def setGroupInfo(self, msg):
+ if (current == len(groups)):
+ current = 0
+ my_group = groups[current]
+ msg.properties[groupKey] = my_group[id];
+ # print "SENDING GROUPID=[%s]\n" % my_group[id]
+ my_group[count]=my_group[count]+1
+ if (my_group[count] == my_group[size]):
+ self.newGroup()
+ del groups[current]
+ else:
+ current+=1
+
+ def newGroup(self):
+ groupId = "%s%s" % (groupPrefix, groupSuffix)
+ groupSuffix+=1
+ size = groupSize
+ if (randomizeSize == True):
+ size = random.randint(1,groupSize)
+ # print "New group: GROUPID=["%s] size=%s" % (groupId, size)
+ groups.append({'id':groupId, 'size':size, 'count':0})
+
+
+
+def main():
+ opts, args = op.parse_args()
+ if not opts.address:
+ raise Exception("Address must be specified!")
+
+ broker = opts.broker
+ address = opts.address
+ connection = Connection(opts.broker, **opts.connection_options)
+
+ try:
+ connection.open()
+ if (opts.failover_updates):
+ auto_fetch_reconnect_urls(connection)
+ session = connection.session(transactional=(opts.tx))
+ sender = session.sender(opts.address)
+ if (opts.capacity>0):
+ sender.capacity = opts.capacity
+ sent = 0
+ txCount = 0
+ stats = statistics.Throughput()
+ reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)
+
+ contentGen = ContentGenerator()
+ content = "" # auxiliary variable for determining content type of message - needs to be changed to {} for Map message
+ if opts.content_stdin:
+ opts.messages = 0 # Don't limit number of messages sent.
+ contentGen = GetlineContentGenerator()
+ elif opts.content_map is not None:
+ contentGen = MapContentGenerator(opts)
+ content = {}
+ elif opts.content_size is not None:
+ contentGen = FixedContentGenerator('X' * opts.content_size)
+ else:
+ contentGen = FixedContentGenerator(opts.content_string)
+ if opts.group_key is not None:
+ groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size, opts.group_random_size, opts.group_interleave)
+
+ msg = Message(content=content)
+ msg.durable = opts.durable
+ if opts.ttl:
+ msg.ttl = opts.ttl/1000.0
+ if opts.priority:
+ msg.priority = opts.priority
+ if opts.reply_to is not None:
+ if opts.flow_control > 0:
+ raise Exception("Can't use reply-to and flow-control together")
+ msg.reply_to = opts.reply_to
+ if opts.user_id is not None:
+ msg.user_id = opts.user_id
+ if opts.correlation_id is not None:
+ msg.correlation_id = opts.correlation_id
+ for p in opts.property:
+ name, val = nameval(p)
+ msg.properties[name] = val
+
+ start = time.time()*TIME_SEC
+ interval = 0
+ if opts.send_rate > 0:
+ interval = TIME_SEC/opts.send_rate
+
+ flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}"
+ flowSent = 0
+ if opts.flow_control > 0:
+ flowControlReceiver = session.receiver(flowControlAddress)
+ flowControlReceiver.capacity = 2
+
+ while (contentGen.setContent(msg) == True):
+ sent+=1
+ if opts.sequence == "yes":
+ msg.properties[SN] = sent
+
+ if opts.flow_control > 0:
+ if (sent % opts.flow_control == 0):
+ msg.reply_to = flowControlAddress
+ flowSent+=1
+ else:
+ msg.reply_to = "" # Clear the reply address.
+
+ if 'groupGen' in vars():
+ groupGen.setGroupInfo(msg)
+
+ if (opts.timestamp == "yes"):
+ msg.properties[TS] = int(time.time()*TIME_SEC)
+ sender.send(msg)
+ reporter.message(msg)
+
+ if ((opts.tx > 0) and (sent % opts.tx == 0)):
+ txCount+=1
+ if ((opts.rollbackFrequency > 0) and (txCount % opts.rollbackFrequency == 0)):
+ session.rollback()
+ else:
+ session.commit()
+ if ((opts.messages > 0) and (sent >= opts.messages)):
+ break
+
+ if (opts.flow_control > 0) and (flowSent == 2):
+ flowControlReceiver.fetch(timeout=SECOND)
+ flowSent -= 1
+
+ if (opts.send_rate > 0):
+ delay = start + sent*interval - time.time()*TIME_SEC
+ if (delay > 0):
+ time.sleep(delay)
+ #end of while
+
+ while flowSent > 0:
+ flowControlReceiver.fetch(timeout=SECOND)
+ flowSent -= 1
+
+ if (opts.report_total):
+ reporter.report()
+ for i in reversed(range(1,opts.send_eos+1)):
+ if (opts.sequence == "yes"):
+ sent+=1
+ msg.properties[SN] = sent
+ msg.properties[EOS] = True #TODO (also in C++ client): add in ability to send digest or similar
+ sender.send(msg)
+ if ((opts.tx > 0) and (sent % opts.tx == 0)):
+ txCount+=1
+ if ((opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0)):
+ session.rollback()
+ else:
+ session.commit()
+ session.sync()
+ session.close()
+ connection.close()
+ except Exception,e:
+ print e
+ connection.close()
+
+if __name__ == "__main__": main()
diff --git a/qpid/cpp/management/python/bin/qpid-stat b/qpid/cpp/management/python/bin/qpid-stat
new file mode 100755
index 0000000000..1780c4a819
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-stat
@@ -0,0 +1,514 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+import sys
+import locale
+import socket
+import re
+from qpid.messaging import Connection
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs import BrokerAgent
+from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong
+
+
class Config:
    """Holds the qpid-stat settings filled in by OptionsAndArguments()."""
    def __init__(self):
        # Connection settings.
        self._host = "localhost"   # broker URL
        self._connTimeout = 10     # connection timeout, seconds
        # Report selection and presentation.
        self._types = ""           # which report to show (-g/-c/-e/...)
        self._sortcol = None       # column to sort by, if any
        self._increasing = False   # sort direction
        self._limit = 50           # maximum rows to display
+
config = Config()  # global settings, populated by OptionsAndArguments()
conn_options = {}  # keyword options passed to Connection.establish()
+
def OptionsAndArguments(argv):
    """Parse the command line into the global config/conn_options.

    Returns the positional arguments (e.g. an optional queue name for -q).
    Calls parser.error() -- which exits -- when no command option is given.
    """

    global config
    global conn_options

    usage = \
"""%prog -g [options]
    %prog -c [options]
    %prog -e [options]
    %prog -q [options] [queue-name]
    %prog -u [options]
    %prog -m [options]
    %prog --acl [options]"""

    parser = OptionParser(usage=usage)

    group1 = OptionGroup(parser, "General Options")
    group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>",
                      help="URL of the broker to query")
    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
                      help="Maximum time to wait for broker connection (in seconds)")
    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    group1.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
    group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
    group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
    group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
    parser.add_option_group(group1)

    # Mutually-exclusive report selectors: each stores a code into opts.show.
    group2 = OptionGroup(parser, "Command Options")
    group2.add_option("-g", "--general", help="Show General Broker Stats",  action="store_const", const="g",   dest="show")
    group2.add_option("-c", "--connections", help="Show Connections",       action="store_const", const="c",   dest="show")
    group2.add_option("-e", "--exchanges", help="Show Exchanges",           action="store_const", const="e",   dest="show")
    group2.add_option("-q", "--queues", help="Show Queues",                 action="store_const", const="q",   dest="show")
    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",   action="store_const", const="u",   dest="show")
    group2.add_option("-m", "--memory", help="Show Broker Memory Stats",    action="store_const", const="m",   dest="show")
    group2.add_option(      "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
    parser.add_option_group(group2)

    group3 = OptionGroup(parser, "Display Options")
    group3.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
    group3.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
    group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
    parser.add_option_group(group3)

    opts, args = parser.parse_args(args=argv)

    if not opts.show:
        # NOTE(review): message omits the --acl option added above.
        parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help")

    # Copy parsed values into the module-level Config instance.
    config._types = opts.show
    config._sortcol = opts.sort_by
    config._host = opts.broker
    config._connTimeout = opts.timeout
    config._increasing = opts.increasing
    config._limit = opts.limit

    # Translate security options into Connection.establish() keywords.
    if opts.sasl_mechanism:
        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
    if opts.sasl_service_name:
        conn_options['sasl_service'] = opts.sasl_service_name
    if opts.ssl_certificate:
        conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        conn_options['ssl_keyfile'] = opts.ssl_key
    if opts.ha_admin:
        conn_options['client_properties'] = {'qpid.ha-admin' : 1}

    return args
+
class BrokerManager:
    """Connects to a broker via qpidtoollibs.BrokerAgent and renders the
    qpid-stat reports (broker summary, connections, exchanges, queues,
    subscriptions, memory, ACL) as formatted tables."""

    def __init__(self):
        self.brokerName = None
        self.connection = None   # qpid.messaging Connection once SetBroker() runs
        self.broker = None       # BrokerAgent wrapper around the connection
        self.cluster = None

    def SetBroker(self, brokerUrl):
        """Connect to the broker at brokerUrl and attach a BrokerAgent."""
        self.url = brokerUrl
        self.connection = Connection.establish(self.url, **conn_options)
        self.broker = BrokerAgent(self.connection)

    def Disconnect(self):
        """ Release any allocated brokers.  Ignore any failures as the tool is
        shutting down.
        """
        try:
            self.connection.close()
        except:
            pass

    def displayBroker(self):
        """Render the -g report: broker summary plus aggregate statistics."""
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header('uptime', Header.DURATION))
        heads.append(Header('cluster', Header.NONE))
        heads.append(Header('connections', Header.COMMAS))
        heads.append(Header('sessions', Header.COMMAS))
        heads.append(Header('exchanges', Header.COMMAS))
        heads.append(Header('queues', Header.COMMAS))
        rows = []
        broker = self.broker.getBroker()
        cluster = self.broker.getCluster()
        # old-style and/or conditional: "name<status>" when clustered
        clusterInfo = cluster and cluster.clusterName + "<" + cluster.status + ">" or "<standalone>"
        connections = self.getConnectionMap()
        sessions = self.getSessionMap()
        exchanges = self.getExchangeMap()
        queues = self.getQueueMap()
        row = (broker.getUpdateTime() - broker.getCreateTime(),
               clusterInfo,
               len(connections), len(sessions),
               len(exchanges), len(queues))
        rows.append(row)
        disp.formattedTable('Broker Summary:', heads, rows)

        # Older brokers do not publish the aggregate counters; stop here then.
        if 'queueCount' not in broker.values:
            return

        print
        heads = []
        heads.append(Header('Statistic'))
        heads.append(Header('Messages', Header.COMMAS))
        heads.append(Header('Bytes', Header.COMMAS))
        rows = []
        rows.append(['queue-depth', broker.msgDepth, broker.byteDepth])
        rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues])
        rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues])
        rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues])
        rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues])
        rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues])
        rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues])
        rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth])
        rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues])
        rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues])
        rows.append(['acquires', broker.acquires, None])
        rows.append(['releases', broker.releases, None])
        rows.append(['discards-no-route', broker.discardsNoRoute, None])
        rows.append(['discards-ttl-expired', broker.discardsTtl, None])
        rows.append(['discards-limit-overflow', broker.discardsOverflow, None])
        rows.append(['discards-ring-overflow', broker.discardsRing, None])
        rows.append(['discards-lvq-replace', broker.discardsLvq, None])
        rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None])
        rows.append(['discards-purged', broker.discardsPurge, None])
        rows.append(['reroutes', broker.reroutes, None])
        rows.append(['abandoned', broker.abandoned, None])
        rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None])
        disp.formattedTable('Aggregate Broker Statistics:', heads, rows)


    def displayConn(self):
        """Render the -c report: one row per client connection."""
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header('connection'))
        heads.append(Header('cproc'))
        heads.append(Header('cpid'))
        heads.append(Header('mech'))
        heads.append(Header('auth'))
        heads.append(Header('connected', Header.DURATION))
        heads.append(Header('idle', Header.DURATION))
        heads.append(Header('msgIn', Header.KMG))
        heads.append(Header('msgOut', Header.KMG))
        rows = []
        connections = self.broker.getAllConnections()
        broker = self.broker.getBroker()
        for conn in connections:
            row = []
            row.append(conn.address)
            if conn.remoteProcessName: row.append(conn.remoteProcessName)
            else:                      row.append("-")
            row.append(conn.remotePid)
            if conn.saslMechanism: row.append(conn.saslMechanism)
            else:                  row.append("-")
            if conn.authIdentity: row.append(conn.authIdentity)
            else:                 row.append("-")
            # Durations computed against the broker's clock, not ours.
            row.append(broker.getUpdateTime() - conn.getCreateTime())
            row.append(broker.getUpdateTime() - conn.getUpdateTime())
            row.append(conn.msgsFromClient)
            row.append(conn.msgsToClient)
            rows.append(row)
        title = "Connections"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displaySession(self):
        """Placeholder: the session report is not implemented; the 's' code in
        displayMain() reaches here but nothing is rendered."""
        disp = Display(prefix="  ")

    def displayExchange(self):
        """Render the -e report: one row per exchange."""
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header("exchange"))
        heads.append(Header("type"))
        heads.append(Header("dur", Header.Y))
        heads.append(Header("bind", Header.KMG))
        heads.append(Header("msgIn", Header.KMG))
        heads.append(Header("msgOut", Header.KMG))
        heads.append(Header("msgDrop", Header.KMG))
        heads.append(Header("byteIn", Header.KMG))
        heads.append(Header("byteOut", Header.KMG))
        heads.append(Header("byteDrop", Header.KMG))
        rows = []
        exchanges = self.broker.getAllExchanges()
        for ex in exchanges:
            row = []
            row.append(ex.name)
            row.append(ex.type)
            row.append(ex.durable)
            row.append(ex.bindingCount)
            row.append(ex.msgReceives)
            row.append(ex.msgRoutes)
            row.append(ex.msgDrops)
            row.append(ex.byteReceives)
            row.append(ex.byteRoutes)
            row.append(ex.byteDrops)
            rows.append(row)
        title = "Exchanges"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displayQueues(self):
        """Render the -q report (no queue name given): one row per queue."""
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header("queue"))
        heads.append(Header("dur", Header.Y))
        heads.append(Header("autoDel", Header.Y))
        heads.append(Header("excl", Header.Y))
        heads.append(Header("msg", Header.KMG))
        heads.append(Header("msgIn", Header.KMG))
        heads.append(Header("msgOut", Header.KMG))
        heads.append(Header("bytes", Header.KMG))
        heads.append(Header("bytesIn", Header.KMG))
        heads.append(Header("bytesOut", Header.KMG))
        heads.append(Header("cons", Header.KMG))
        heads.append(Header("bind", Header.KMG))
        rows = []
        queues = self.broker.getAllQueues()
        for q in queues:
            row = []
            row.append(q.name)
            row.append(q.durable)
            row.append(q.autoDelete)
            row.append(q.exclusive)
            row.append(q.msgDepth)
            row.append(q.msgTotalEnqueues)
            row.append(q.msgTotalDequeues)
            row.append(q.byteDepth)
            row.append(q.byteTotalEnqueues)
            row.append(q.byteTotalDequeues)
            row.append(q.consumerCount)
            row.append(q.bindingCount)
            rows.append(row)
        title = "Queues"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)


    def displayQueue(self, name):
        """Render the detailed single-queue report: properties, optional
        properties, and per-queue statistics tables."""
        queue = self.broker.getQueue(name)
        if not queue:
            print "Queue '%s' not found" % name
            return

        disp = Display(prefix="  ")
        heads = []
        heads.append(Header('Name'))
        heads.append(Header('Durable', Header.YN))
        heads.append(Header('AutoDelete', Header.YN))
        heads.append(Header('Exclusive', Header.YN))
        heads.append(Header('FlowStopped', Header.YN))
        heads.append(Header('FlowStoppedCount', Header.COMMAS))
        heads.append(Header('Consumers', Header.COMMAS))
        heads.append(Header('Bindings', Header.COMMAS))
        rows = []
        rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive,
                     queue.flowStopped, queue.flowStoppedCount,
                     queue.consumerCount, queue.bindingCount])
        disp.formattedTable("Properties:", heads, rows)
        print

        heads = []
        heads.append(Header('Property'))
        heads.append(Header('Value'))
        rows = []
        rows.append(['arguments', queue.arguments])
        rows.append(['alt-exchange', queue.altExchange])
        disp.formattedTable("Optional Properties:", heads, rows)
        print

        heads = []
        heads.append(Header('Statistic'))
        heads.append(Header('Messages', Header.COMMAS))
        heads.append(Header('Bytes', Header.COMMAS))
        rows = []
        rows.append(['queue-depth', queue.msgDepth, queue.byteDepth])
        rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues])
        rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues])
        rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues])
        rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues])
        rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues])
        rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues])
        rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth])
        rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues])
        rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues])
        rows.append(['acquires', queue.acquires, None])
        rows.append(['releases', queue.releases, None])
        rows.append(['discards-ttl-expired', queue.discardsTtl, None])
        rows.append(['discards-limit-overflow', queue.discardsOverflow, None])
        rows.append(['discards-ring-overflow', queue.discardsRing, None])
        rows.append(['discards-lvq-replace', queue.discardsLvq, None])
        rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None])
        rows.append(['discards-purged', queue.discardsPurge, None])
        rows.append(['reroutes', queue.reroutes, None])
        disp.formattedTable("Statistics:", heads, rows)


    def displaySubscriptions(self):
        """Render the -u report: one row per subscription, joined against its
        session and connection."""
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header("subscr"))
        heads.append(Header("queue"))
        heads.append(Header("conn"))
        heads.append(Header("procName"))
        heads.append(Header("procId"))
        heads.append(Header("browse", Header.Y))
        heads.append(Header("acked", Header.Y))
        heads.append(Header("excl", Header.Y))
        heads.append(Header("creditMode"))
        heads.append(Header("delivered", Header.COMMAS))
        heads.append(Header("sessUnacked", Header.COMMAS))
        rows = []
        subscriptions = self.broker.getAllSubscriptions()
        sessions = self.getSessionMap()
        connections = self.getConnectionMap()
        for s in subscriptions:
            row = []
            try:
                row.append(s.name)
                row.append(s.queueRef)
                session = sessions[s.sessionRef]
                connection = connections[session.connectionRef]
                row.append(connection.address)
                if connection.remoteProcessName: row.append(connection.remoteProcessName)
                else:                            row.append("-")
                row.append(connection.remotePid)
                row.append(s.browsing)
                row.append(s.acknowledged)
                row.append(s.exclusive)
                row.append(s.creditMode)
                row.append(s.delivered)
                row.append(session.unackedMessages)
                rows.append(row)
            except:
                # NOTE(review): bare except silently skips rows whose session
                # or connection vanished mid-query (or any other error).
                pass
        title = "Subscriptions"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displayMemory(self):
        """Render the -m report: every memory counter except the name."""
        disp = Display(prefix="  ")
        heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
        rows = []
        memory = self.broker.getMemory()
        for k,v in memory.values.items():
            if k != 'name':
                rows.append([k, v])
        disp.formattedTable('Broker Memory Statistics:', heads, rows)

    def displayAcl(self):
        """Render the --acl report, or a notice if the ACL module is absent."""
        acl = self.broker.getAcl()
        if not acl:
            print "ACL Policy Module is not installed"
            return
        disp = Display(prefix="  ")
        heads = [Header('Statistic'), Header('Value')]
        rows = []
        rows.append(['policy-file', acl.policyFile])
        rows.append(['enforcing', YN(acl.enforcingAcl)])
        rows.append(['has-transfer-acls', YN(acl.transferAcl)])
        rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)])
        rows.append(['acl-denials', Commas(acl.aclDenyCount)])
        disp.formattedTable('ACL Policy Statistics:', heads, rows)

    def getExchangeMap(self):
        """Return {exchange-name: exchange-object} for all exchanges."""
        exchanges = self.broker.getAllExchanges()
        emap = {}
        for e in exchanges:
            emap[e.name] = e
        return emap

    def getQueueMap(self):
        """Return {queue-name: queue-object} for all queues."""
        queues = self.broker.getAllQueues()
        qmap = {}
        for q in queues:
            qmap[q.name] = q
        return qmap

    def getSessionMap(self):
        """Return {session-name: session-object} for all sessions."""
        sessions = self.broker.getAllSessions()
        smap = {}
        for s in sessions:
            smap[s.name] = s
        return smap

    def getConnectionMap(self):
        """Return {address: connection-object} for all connections."""
        connections = self.broker.getAllConnections()
        cmap = {}
        for c in connections:
            cmap[c.address] = c
        return cmap

    def displayMain(self, names, main):
        """Dispatch to the report selected by the one-letter code in `main`."""
        if   main == 'g': self.displayBroker()
        elif main == 'c': self.displayConn()
        elif main == 's': self.displaySession()
        elif main == 'e': self.displayExchange()
        elif main == 'q':
            if len(names) >= 1:
                self.displayQueue(names[0])
            else:
                self.displayQueues()
        elif main == 'u': self.displaySubscriptions()
        elif main == 'm': self.displayMemory()
        elif main == 'acl': self.displayAcl()

    def display(self, names):
        """Render the report chosen on the command line (config._types)."""
        self.displayMain(names, config._types)
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host)
+ bm.display(args)
+ bm.Disconnect()
+ return 0
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+ bm.Disconnect() # try to deallocate brokers
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/qpid/cpp/management/python/bin/qpid-stat.bat b/qpid/cpp/management/python/bin/qpid-stat.bat
new file mode 100644
index 0000000000..0a03d5177c
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-stat.bat
@@ -0,0 +1,2 @@
+@echo off
+python %~dp0\qpid-stat %*
diff --git a/qpid/cpp/management/python/bin/qpid-store-chk b/qpid/cpp/management/python/bin/qpid-store-chk
new file mode 100755
index 0000000000..f6d70cb3c6
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-store-chk
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpidstore import jerr, jrnl, janal
+import optparse, os, sys
+
+
+#== class StoreChk ============================================================
+
+class StoreChk(object):
+    """
+    This class:
+    1. Reads a journal jinf file, and from its info:
+    2. Analyzes the journal data files to determine which is the last to be written, then
+    3. Reads and analyzes all the records in the journal files.
+    The only public method is run() which kicks off the analysis.
+    """
+
+    def __init__(self):
+        """Constructor: parse command-line args, then build the journal info, analyzer and reader objects."""
+        # params
+        self.opts = None        # optparse options (set by _process_args)
+
+        self._jdir = None       # journal directory (positional argument)
+
+        # recovery analysis objects
+#        self._jrnl_info = None
+#        self.jrnl_rdr = None
+
+        self._process_args()
+        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn)
+        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+        self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info)
+        self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag,
+                                         self.opts.vflag)
+
+    def run(self):
+        """Run the store check"""
+        if not self.opts.qflag:
+            print self._jrnl_info
+            print self.jrnl_anal
+        self.jrnl_rdr.run()
+        self._report()
+
+    def _report(self):
+        """Print the results of the store check (suppressed entirely under -q/--quiet)."""
+        if not self.opts.qflag:
+            print
+            print " === REPORT ===="
+            print
+            print "Records:      %8d non-transactional" % \
+                  (self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt())
+            print "              %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt()
+            print "              %8d total" % self.jrnl_rdr.get_msg_cnt()
+            print
+            print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt()
+            print "              %8d commits" % self.jrnl_rdr.get_commit_cnt()
+            print "              %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt())
+            print
+            # Any record left in the enqueue map means an enqueue without a matching dequeue.
+            if self.jrnl_rdr.emap().size() > 0:
+                print "Remaining enqueued records (sorted by rid): "
+                rid_list = self.jrnl_rdr.emap().rids()
+                rid_list.sort()
+                for rid in rid_list:
+                    l = self.jrnl_rdr.emap().get(rid)
+                    locked = ""
+                    if l[2]:
+                        locked += " (locked)"
+                    print "  fid=%d %s%s" % (l[0], l[1], locked)
+                print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size()
+            else:
+                print "No remaining enqueued records found (emap empty)."
+            print
+            # Any xid left in the transaction map means an open (uncommitted/unaborted) transaction.
+            if self.jrnl_rdr.tmap().size() > 0:
+                txn_rec_cnt = 0
+                print "Incomplete transactions: "
+                for xid in self.jrnl_rdr.tmap().xids():
+                    # NOTE(review): the result of this format_xid() call is discarded; only the
+                    # call in the "Total" line below is used — looks like leftover code, confirm.
+                    jrnl.Utils.format_xid(xid)
+                    recs = self.jrnl_rdr.tmap().get(xid)
+                    for l in recs:
+                        print "  fid=%d %s" % (l[0], l[1])
+                    print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid))
+                    print
+                    txn_rec_cnt += len(recs)
+                print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \
+                      (self.jrnl_rdr.tmap().size(), txn_rec_cnt)
+            else:
+                print "No incomplete transactions found (tmap empty)."
+            print
+            print "%d enqueues, %d journal records processed." % \
+                  (self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt())
+
+
+    def _process_args(self):
+        """Process the command-line arguments.
+        Exactly one positional argument (the journal directory) is required; -q is
+        mutually exclusive with both -r and -v.  opt.error() exits the process."""
+        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        opt.add_option("-b", "--base-filename",
+                       action="store", dest="bfn", default="JournalData",
+                       help="Base filename for old journal files")
+        opt.add_option("-q", "--quiet",
+                       action="store_true", dest="qflag",
+                       help="Quiet (suppress all non-error output)")
+        opt.add_option("-r", "--records",
+                       action="store_true", dest="rflag",
+                       help="Print all records and transactions (including consumed/closed)")
+        opt.add_option("-v", "--verbose",
+                       action="store_true", dest="vflag",
+                       help="Verbose output")
+        (self.opts, args) = opt.parse_args()
+        if len(args) == 0:
+            opt.error("No journal directory argument")
+        elif len(args) > 1:
+            opt.error("Too many positional arguments: %s" % args)
+        if self.opts.qflag and self.opts.rflag:
+            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self.opts.qflag and self.opts.vflag:
+            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            opt.error("Journal path \"%s\" does not exist" % self._jdir)
+
+
+#== class CsvStoreChk =========================================================
+
+class CsvStoreChk(StoreChk):
+    """
+    This class, in addition to analyzing a journal, can compare the journal footprint (ie enqueued/dequeued/transaction
+    record counts) to expected values from a CSV file. This can be used for additional automated testing, and is
+    currently in use in the long store tests for journal encode testing.
+    """
+
+    # CSV file cols (0-based column indexes into each test-parameter row)
+    TEST_NUM_COL = 0
+    NUM_MSGS_COL = 5
+    MIN_MSG_SIZE_COL = 7
+    MAX_MSG_SIZE_COL = 8
+    MIN_XID_SIZE_COL = 9
+    MAX_XID_SIZE_COL = 10
+    AUTO_DEQ_COL = 11
+    TRANSIENT_COL = 12
+    EXTERN_COL = 13
+    COMMENT_COL = 20
+
+    def __init__(self):
+        """Constructor: run the base-class setup, register the CSV check callbacks with the
+        journal reader, then load the expected test parameters (if -c/-t were given)."""
+        StoreChk.__init__(self)
+
+        # csv params (None = no expectation loaded, so the corresponding check is skipped)
+        self.num_msgs = None
+        self.msg_len = None
+        self.auto_deq = None
+        self.xid_len = None
+        self.transient = None
+        self.extern = None
+
+        self._warning = []
+
+        self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk,
+                                    CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk)
+        self._get_csv_test()
+
+    def _get_csv_test(self):
+        """Get a test from the CSV reader.
+        A msg_len/xid_len of 0 means "variable size" (min != max in the CSV row),
+        which disables the exact-length checks in _csv_enq_chk."""
+        if self.opts.csvfn != None and self.opts.tnum != None:
+            tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum)
+            if tparams == None:
+                print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn)
+                sys.exit(1)
+            self.num_msgs = tparams["num_msgs"]
+            if tparams["min_size"] == tparams["max_size"]:
+                self.msg_len = tparams["max_size"]
+            else:
+                self.msg_len = 0
+            self.auto_deq = tparams["auto_deq"]
+            if tparams["xid_min_size"] == tparams["xid_max_size"]:
+                self.xid_len = tparams["xid_max_size"]
+            else:
+                self.xid_len = 0
+            self.transient = tparams["transient"]
+            self.extern = tparams["extern"]
+
+    def _read_csv_file(self, filename, tnum):
+        """Read the CSV test parameter file; return the parameter dict for test number
+        tnum, or None if no matching row exists.  Rows whose first field starts with a
+        double-quote are treated as headers/comments and skipped; rows that fail to
+        parse as integers are silently ignored."""
+        try:
+            csvf = open(filename, "r")
+        except IOError:
+            print "ERROR: Unable to open CSV file \"%s\"" % filename
+            sys.exit(1)
+        for line in csvf:
+            str_list = line.strip().split(",")
+            if len(str_list[0]) > 0 and str_list[0][0] != "\"":
+                try:
+                    if (int(str_list[self.TEST_NUM_COL]) == tnum):
+                        return { "num_msgs": int(str_list[self.NUM_MSGS_COL]),
+                                 "min_size": int(str_list[self.MIN_MSG_SIZE_COL]),
+                                 "max_size": int(str_list[self.MAX_MSG_SIZE_COL]),
+                                 "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or
+                                                  str_list[self.AUTO_DEQ_COL] == "0"),
+                                 "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]),
+                                 "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]),
+                                 "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or
+                                                   str_list[self.TRANSIENT_COL] == "0"),
+                                 "extern": not (str_list[self.EXTERN_COL] == "FALSE" or
+                                                str_list[self.EXTERN_COL] == "0"),
+                                 "comment": str_list[self.COMMENT_COL] }
+                except Exception:
+                    pass
+        return None
+
+    def _process_args(self):
+        """Process command-line arguments.
+        Overrides StoreChk._process_args, adding -c/--csv-filename and -t/--test-num;
+        the shared options and validation mirror the base class."""
+        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        opt.add_option("-b", "--base-filename",
+                       action="store", dest="bfn", default="JournalData",
+                       help="Base filename for old journal files")
+        opt.add_option("-c", "--csv-filename",
+                       action="store", dest="csvfn",
+                       help="CSV filename containing test parameters")
+        opt.add_option("-q", "--quiet",
+                       action="store_true", dest="qflag",
+                       help="Quiet (suppress all non-error output)")
+        opt.add_option("-r", "--records",
+                       action="store_true", dest="rflag",
+                       help="Print all records and transactions (including consumed/closed)")
+        opt.add_option("-t", "--test-num",
+                       action="store", type="int", dest="tnum",
+                       help="Test number from CSV file - only valid if CSV file named")
+        opt.add_option("-v", "--verbose",
+                       action="store_true", dest="vflag",
+                       help="Verbose output")
+        (self.opts, args) = opt.parse_args()
+        if len(args) == 0:
+            opt.error("No journal directory argument")
+        elif len(args) > 1:
+            opt.error("Too many positional arguments: %s" % args)
+        if self.opts.qflag and self.opts.rflag:
+            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self.opts.qflag and self.opts.vflag:
+            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            opt.error("Journal path \"%s\" does not exist" % self._jdir)
+
+    # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
+    # (Pre-2.4 staticmethod style: the "name = staticmethod(name)" line after each def.)
+
+    #@staticmethod
+    def _csv_pre_run_chk(csv_store_chk):
+        """Check performed before a test runs: an all-empty journal is an error when
+        the CSV expects messages."""
+        if csv_store_chk.num_msgs == None:
+            return
+        if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0:
+            # NOTE(review): uses csv_store_chk.get_opts().tnum while every other callback
+            # accesses csv_store_chk.opts.tnum directly — confirm get_opts() exists.
+            raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.get_opts().tnum, csv_store_chk.num_msgs)
+        return False
+    _csv_pre_run_chk = staticmethod(_csv_pre_run_chk)
+
+    #@staticmethod
+    def _csv_enq_chk(csv_store_chk, hdr):
+        """Check performed before each enqueue operation: validate extern flag, message
+        length, xid length and transience against the CSV expectations (each check is
+        skipped when its expectation is None or zero/variable)."""
+        #if csv_store_chk.num_msgs == None: return
+        #
+        if csv_store_chk.extern != None:
+            if csv_store_chk.extern != hdr.extern:
+                raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern)
+            if hdr.extern and hdr.data != None:
+                raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum)
+        if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \
+           len(hdr.data) != csv_store_chk.msg_len:
+            raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data))
+        if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len:
+            raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid))
+        if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient:
+            raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient)
+        return False
+    _csv_enq_chk = staticmethod(_csv_enq_chk)
+
+    #@staticmethod
+    def _csv_deq_chk(csv_store_chk, hdr):
+        """Check performed before each dequeue operation: warn if a dequeue record
+        appears in a test declared non-auto-dequeue."""
+        if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq:
+            raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+                                (csv_store_chk.opts.tnum, hdr.rid))
+            #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+            #                     (csv_store_chk.opts.tnum, hdr.rid))
+        return False
+    _csv_deq_chk = staticmethod(_csv_deq_chk)
+
+    #@staticmethod
+    def _csv_txn_chk(csv_store_chk, hdr):
+        """Check performed before each transaction commit/abort (currently a no-op)."""
+        return False
+    _csv_txn_chk = staticmethod(_csv_txn_chk)
+
+    #@staticmethod
+    def _csv_post_run_chk(csv_store_chk):
+        """Check performed after the completion of the test: verify the message count
+        matches the CSV expectation."""
+        # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because
+        # of journal overwriting
+        if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \
+           csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt():
+            raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs,
+                                       csv_store_chk.jrnl_rdr.get_msg_cnt())
+        return False
+    _csv_post_run_chk = staticmethod(_csv_post_run_chk)
+
+#==============================================================================
+# main program
+#==============================================================================
+
+# Script entry point: run the CSV-aware checker; any exception becomes the exit status/message.
+if __name__ == "__main__":
+    M = CsvStoreChk()
+    try:
+        M.run()
+    except Exception, e:
+        sys.exit(e)
diff --git a/qpid/cpp/management/python/bin/qpid-store-resize b/qpid/cpp/management/python/bin/qpid-store-resize
new file mode 100755
index 0000000000..38d8eaf1ad
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-store-resize
@@ -0,0 +1,350 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpidstore import jerr, jrnl, janal
+import glob, optparse, os, sys, time
+
+
+#== class Resize ==============================================================
+
+class Resize(object):
+    """
+    Creates a new store journal and copies records from old journal to new. The new journal may be of
+    different size from the old one. The records are packed into the new journal (ie only remaining
+    enqueued records and associated transactions - if any - are copied over without spaces between them).
+
+    The default action is to push the old journal down into a 'bak' sub-directory and then create a
+    new journal of the same size and pack it with the records from the old. However, it is possible to
+    suppress the pushdown (using --no-pushdown), in which case either a new journal id (using
+    --new-base-filename) or an old journal id (using --old-base-filename) must be supplied. In the former
+    case, a new journal will be created using the new base file name alongside the old one. In the latter
+    case, the old journal will be renamed to the supplied name, and the new one will take the default.
+    Note that both can be specified together with the --no-pushdown option.
+
+    To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These
+    should be large enough to write all the records or an error will result. If the size is large enough
+    to write all records, but too small to keep below the enqueue threshold, a warning will be printed.
+    Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently
+    big to accept the transferred records.
+    """
+
+    BAK_DIR = "bak"                 # pushdown sub-directory name
+    JFILE_SIZE_PGS_MIN = 1          # journal file size limits, in 64kiB pages
+    JFILE_SIZE_PGS_MAX = 32768
+    NUM_JFILES_MIN = 4              # journal file count limits
+    NUM_JFILES_MAX = 64
+
+    def __init__(self):
+        """Constructor: parse args, then build journal info, analyzer and reader objects."""
+        self._opts = None           # optparse options
+        self._jdir = None           # journal directory (positional argument)
+        self._fname = None          # current output file name
+        self._fnum = None           # current output file number (None until first rotate)
+        self._file = None           # current output file object
+        self._file_rec_wr_cnt = None # records written to current file
+        self._filler_wr_cnt = None  # filler records written to current file
+        self._last_rec_fid = None   # file id of last record written (for _chk_free)
+        self._last_rec_offs = None  # offset of last record written (for _chk_free)
+        self._rec_wr_cnt = None     # total records written
+
+        self._jrnl_info = None
+        self._jrnl_analysis = None
+        self._jrnl_reader = None
+
+        self._process_args()
+        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+        self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info)
+        self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag,
+                                             self._opts.vflag)
+
+    def run(self):
+        """Perform the action of resizing the journal: read the old journal, move the old
+        files aside, write the new journal, then warn if the result is nearly full."""
+        if not self._opts.qflag:
+            print self._jrnl_analysis
+        self._jrnl_reader.run()
+        if self._opts.vflag:
+            print self._jrnl_info
+        if not self._opts.qflag:
+            print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag)
+        self._handle_old_files()
+        self._create_new_files()
+        if not self._opts.qflag:
+            print "Transferred %d records to new journal." % self._rec_wr_cnt
+        self._chk_free()
+
+    def _chk_free(self):
+        """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not."""
+        if self._last_rec_fid == None or self._last_rec_offs == None:
+            return
+        wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs
+        tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes()
+        percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes
+        # 80% is the warning threshold used here; above it, new enqueues will likely be refused.
+        if percent_full > 80.0:
+            raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records"
+                                " until some existing records are dequeued." %
+                                (self._jrnl_info.get_jrnl_id(), percent_full))
+
+    def _create_new_files(self):
+        """Create new journal files: collect all surviving records (enqueue map plus open
+        transactions) into one rid-ordered list, write the new jinf file, then stream the
+        records into the new data files, splitting records across file boundaries as needed."""
+        # Assemble records to be transfered
+        master_record_list = {}
+        txn_record_list = self._jrnl_reader.txn_obj_list()
+        if self._opts.vflag and self._jrnl_reader.emap().size() > 0:
+            print "* Assembling %d records from emap" % self._jrnl_reader.emap().size()
+        for tup in self._jrnl_reader.emap().get_rec_list():
+            hdr = tup[1]
+            hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+            master_record_list[long(hdr.rid)] = hdr
+            # Pull in the transaction record associated with this enqueue (if any).
+            if hdr.xidsize > 0 and hdr.xid in txn_record_list:
+                txn_hdr = txn_record_list[hdr.xid]
+                del(txn_record_list[hdr.xid])
+                txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+                master_record_list[long(txn_hdr.rid)] = txn_hdr
+        if self._opts.vflag and self._jrnl_reader.tmap().size() > 0:
+            print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size()
+        for xid in self._jrnl_reader.tmap().xids():
+            for l in self._jrnl_reader.tmap().get(xid):
+                hdr = l[1]
+                hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+                master_record_list[hdr.rid] = hdr
+        rid_list = master_record_list.keys()
+        rid_list.sort()
+
+        # get base filename
+        bfn = self._opts.bfn
+        if self._opts.nbfn != None:
+            bfn = self._opts.nbfn
+
+        # write jinf file
+        self._jrnl_info.resize(self._opts.njf, self._opts.jfs)
+        self._jrnl_info.write(self._jdir, bfn)
+
+        # write records
+        if self._opts.vflag:
+            print "* Transferring records to new journal files"
+        fro = self._jrnl_info.get_jrnl_sblk_size_bytes()    # first-record offset for next file header
+        while len(rid_list) > 0:
+            hdr = master_record_list[rid_list.pop(0)]
+            rec = hdr.encode()
+            pos = 0
+            while pos < len(rec):
+                # Rotate to a fresh file when there is no file yet or the current one is full.
+                if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes():
+                    if self._file == None:
+                        rid = hdr.rid
+                    elif len(rid_list) == 0:
+                        rid = 0
+                    else:
+                        rid = rid_list[0]
+                    if not self._rotate_file(rid, fro):
+                        raise jerr.JournalSpaceExceededError()
+                if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell():
+                    # Remainder of record fits in the current file; pad to a dblk boundary.
+                    self._file.write(rec[pos:])
+                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
+                    pos = len(rec)
+                    fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
+                else:
+                    # Record spans the file boundary; write what fits and compute the
+                    # first-record offset for the continuation in the next file.
+                    flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell()
+                    self._file.write(rec[pos:pos + flen])
+                    pos += flen
+                    rem = len(rec) - pos
+                    if rem <= self._jrnl_info.get_jrnl_data_size_bytes():
+                        fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem,
+                                                               self._jrnl_info.get_jrnl_dblk_size_bytes()))
+                    else:
+                        fro = 0
+            self._rec_wr_cnt += 1
+            self._file_rec_wr_cnt += 1
+        self._fill_file(add_filler_recs = True)
+        # Create any remaining (empty) journal files so the full file count exists on disk.
+        while self._rotate_file():
+            pass
+
+    def _fill_file(self, to_posn = None, add_filler_recs = False):
+        """Fill a file to a known offset with NUL bytes; optionally write "RHMx" filler
+        records first to complete the current sblk."""
+        if self._file == None:
+            return
+        if add_filler_recs:
+            nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) /
+                      self._jrnl_info.get_jrnl_dblk_size_bytes())
+            if nfr > 0:
+                self._filler_wr_cnt = nfr
+                for i in range(0, nfr):
+                    self._file.write("RHMx")
+                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
+                self._last_rec_fid = self._fnum
+                self._last_rec_offs = self._file.tell()
+        if to_posn == None:
+            to_posn = self._jrnl_info.get_jrnl_file_size_bytes()
+        elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes():
+            raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes())
+        diff = to_posn - self._file.tell()
+        self._file.write(str("\0" * diff))
+        #DEBUG
+        if self._file.tell() != to_posn:
+            raise jerr.FillSizeError(self._file.tell(), to_posn)
+
+    def _rotate_file(self, rid = None, fro = None):
+        """Switch to the next logical file: close the current one (reporting its record
+        counts in verbose mode), and open/initialize the next data file with a file
+        header.  Returns False when all journal files have been written."""
+        if self._file != None:
+            self._file.close()
+            if self._opts.vflag:
+                if self._file_rec_wr_cnt == 0:
+                    print " (empty)"
+                elif self._filler_wr_cnt == None:
+                    print " (%d records)" % self._file_rec_wr_cnt
+                else:
+                    print " (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt)
+        if self._fnum == None:
+            self._fnum = 0
+            self._rec_wr_cnt = 0
+        elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1:
+            return False
+        else:
+            self._fnum += 1
+        self._file_rec_wr_cnt = 0
+        self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" %
+                                   (self._jrnl_info.get_jrnl_base_name(), self._fnum))
+        if self._opts.vflag:
+            print "* Opening file %s" % self._fname,
+        self._file = open(self._fname, "w")
+        if rid == None or fro == None:
+            # No record for this file: leave it fully NUL-filled.
+            self._fill_file()
+        else:
+            now = time.time()
+            fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid)
+            fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now)))
+            self._file.write(fhdr.encode())
+            self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes())
+        return True
+
+    def _handle_old_files(self):
+        """Push old journal down into a backup directory (unless --no-pushdown), and/or
+        rename the old journal files to the --old-base-filename."""
+        target_dir = self._jdir
+        if not self._opts.npd:
+            target_dir = os.path.join(self._jdir, self.BAK_DIR)
+            if os.path.exists(target_dir):
+                if self._opts.vflag:
+                    print "* Pushdown directory %s exists, deleting content" % target_dir
+                for fname in glob.glob(os.path.join(target_dir, "*")):
+                    os.unlink(fname)
+            else:
+                if self._opts.vflag:
+                    print "* Creating new pushdown directory %s" % target_dir
+                os.mkdir(target_dir)
+
+        if not self._opts.npd or self._opts.obfn != None:
+            if self._opts.obfn != None and self._opts.vflag:
+                print "* Renaming old journal files using base name %s" % self._opts.obfn
+            # .jdat files: replace the base-name portion (everything before the last two
+            # dot-separated fields) when an old base filename was supplied.
+            for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
+                tbfn = os.path.basename(fname)
+                if self._opts.obfn != None:
+                    per1 = tbfn.rfind(".")
+                    if per1 >= 0:
+                        per2 = tbfn.rfind(".", 0, per1)
+                        if per2 >= 0:
+                            tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:])
+                os.rename(fname, os.path.join(target_dir, tbfn))
+            # .jinf file
+            self._jrnl_info.write(target_dir, self._opts.obfn)
+            os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))
+
+    def _print_options(self):
+        """Print program options (verbose mode only, so the Verbose flag is always True here)."""
+        if self._opts.vflag:
+            print "Journal dir: %s" % self._jdir
+            print "Options: Base filename: %s" % self._opts.bfn
+            print "         New base filename: %s" % self._opts.nbfn
+            print "         Old base filename: %s" % self._opts.obfn
+            print "         Pushdown: %s" % self._opts.npd
+            print "         No. journal files: %d" % self._opts.njf
+            print "         Journal file size: %d 64kiB blocks" % self._opts.jfs
+            print "         Show records flag: %s" % self._opts.rflag
+            print "         Verbose flag: %s" % True
+            print
+
+    def _process_args(self):
+        """Process the command-line arguments.
+        Validates mutual exclusions (-q vs -r/-v), the njf/jfs ranges, the
+        --no-pushdown naming requirement, and the journal directory argument."""
+        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        opt.add_option("-b", "--base-filename",
+                       action="store", dest="bfn", default="JournalData",
+                       help="Base filename for old journal files")
+        opt.add_option("-B", "--new-base-filename",
+                       action="store", dest="nbfn",
+                       help="Base filename for new journal files")
+        opt.add_option("-n", "--no-pushdown",
+                       action="store_true", dest="npd",
+                       help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
+        opt.add_option("-N", "--num-jfiles",
+                       action="store", type="int", dest="njf", default=8,
+                       help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+        opt.add_option("-o", "--old-base-filename",
+                       action="store", dest="obfn",
+                       help="Base filename for old journal files")
+        opt.add_option("-q", "--quiet",
+                       action="store_true", dest="qflag",
+                       help="Quiet (suppress all non-error output)")
+        opt.add_option("-r", "--records",
+                       action="store_true", dest="rflag",
+                       help="Print remaining records and transactions")
+        opt.add_option("-s", "--jfile-size-pgs",
+                       action="store", type="int", dest="jfs", default=24,
+                       help="Size of each new journal file in 64kiB blocks (%d-%d)" %
+                       (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+        opt.add_option("-v", "--verbose",
+                       action="store_true", dest="vflag",
+                       help="Verbose output")
+        (self._opts, args) = opt.parse_args()
+        if len(args) == 0:
+            opt.error("No journal directory argument")
+        elif len(args) > 1:
+            opt.error("Too many positional arguments: %s" % args)
+        if self._opts.qflag and self._opts.rflag:
+            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self._opts.qflag and self._opts.vflag:
+            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX):
+            opt.error("Number of files (%d) is out of range (%d-%d)" %
+                      (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+        if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or
+                                       self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
+            opt.error("File size (%d) is out of range (%d-%d)" %
+                      (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+        if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None):
+            opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and"
+                      " (-o/--old-base-filename) must be used.")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            opt.error("Journal path \"%s\" does not exist" % self._jdir)
+        self._print_options()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+# Script entry point: run the resize; any exception becomes the exit status/message.
+if __name__ == "__main__":
+    R = Resize()
+    try:
+        R.run()
+    except Exception, e:
+        sys.exit(e)
diff --git a/qpid/cpp/management/python/bin/qpid-tool b/qpid/cpp/management/python/bin/qpid-tool
new file mode 100755
index 0000000000..09ca2b8c13
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-tool
@@ -0,0 +1,799 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import socket
+import locale
+from types import *
+from cmd import Cmd
+from shlex import split
+from threading import Lock
+from time import strftime, gmtime
+from qpidtoollibs import Display
+from qmf.console import Session, Console, SchemaClass, ObjectId
+
+class Mcli(Cmd):
+    """ Management Command Interpreter """
+
+    def __init__(self, dataObject, dispObject):
+        # dataObject: QmfData instance that actually executes each command;
+        # dispObject: Display used for table output (needed by 'set time-format').
+        Cmd.__init__(self)
+        self.dataObject = dataObject
+        self.dispObject = dispObject
+        self.dataObject.setCli(self)
+        self.prompt = "qpid: "
+
+    def emptyline(self):
+        # Override Cmd's default behavior (repeat last command): do nothing.
+        pass
+
+    def setPromptMessage(self, p):
+        # Embed an optional status message into the prompt: "qpid[<p>]: ".
+        if p == None:
+            self.prompt = "qpid: "
+        else:
+            self.prompt = "qpid[%s]: " % p
+
+    def do_help(self, data):
+        # Replace Cmd's per-command help with a single static summary.
+        print "Management Tool for QPID"
+        print
+        print "Commands:"
+        print "    agents                          - Print a list of the known Agents"
+        print "    list                            - Print summary of existing objects by class"
+        print "    list <className>                - Print list of objects of the specified class"
+        print "    list <className> active         - Print list of non-deleted objects of the specified class"
+#        print "    show <className>                - Print contents of all objects of specified class"
+#        print "    show <className> active         - Print contents of all non-deleted objects of specified class"
+        print "    show <ID>                       - Print contents of an object (infer className)"
+#        print "    show <className> <list-of-IDs>  - Print contents of one or more objects"
+#        print "        list is space-separated, ranges may be specified (i.e. 1004-1010)"
+        print "    call <ID> <methodName> [<args>] - Invoke a method on an object"
+        print "    schema                          - Print summary of object classes seen on the target"
+        print "    schema <className>              - Print details of an object class"
+        print "    set time-format short           - Select short timestamp format (default)"
+        print "    set time-format long            - Select long timestamp format"
+        print "    quit or ^D                      - Exit the program"
+        print
+
+    def complete_set(self, text, line, begidx, endidx):
+        """ Command completion for the 'set' command """
+        tokens = split(line)
+        if len(tokens) < 2:
+            return ["time-format "]
+        elif tokens[1] == "time-format":
+            if len(tokens) == 2:
+                return ["long", "short"]
+            elif len(tokens) == 3:
+                if "long".find(text) == 0:
+                    return ["long"]
+                elif "short".find(text) == 0:
+                    return ["short"]
+        elif "time-format".find(text) == 0:
+            return ["time-format "]
+        return []
+
+    def do_set(self, data):
+        # Only 'set time-format <short|long>' is supported; bad input is ignored.
+        tokens = split(data)
+        try:
+            if tokens[0] == "time-format":
+                self.dispObject.do_setTimeFormat(tokens[1])
+        except:
+            pass
+
+    def complete_schema(self, text, line, begidx, endidx):
+        # Complete class names for 'schema <className>' (single argument only).
+        tokens = split(line)
+        if len(tokens) > 2:
+            return []
+        return self.dataObject.classCompletions(text)
+
+    def do_schema(self, data):
+        # Delegate to QmfData; errors are reported rather than killing the loop.
+        try:
+            self.dataObject.do_schema(data)
+        except Exception, e:
+            print "Exception in do_schema: %r" % e
+
+    def do_agents(self, data):
+        try:
+            self.dataObject.do_agents(data)
+        except Exception, e:
+            print "Exception in do_agents: %r" % e
+
+    def do_id(self, data):
+        try:
+            self.dataObject.do_id(data)
+        except Exception, e:
+            print "Exception in do_id: %r" % e
+
+    def complete_list(self, text, line, begidx, endidx):
+        # Complete class names for 'list <className>' (single argument only).
+        tokens = split(line)
+        if len(tokens) > 2:
+            return []
+        return self.dataObject.classCompletions(text)
+
+    def do_list(self, data):
+        try:
+            self.dataObject.do_list(data)
+        except Exception, e:
+            print "Exception in do_list: %r" % e
+
+    def do_show(self, data):
+        try:
+            self.dataObject.do_show(data)
+        except Exception, e:
+            print "Exception in do_show: %r" % e
+
+    def do_call(self, data):
+        try:
+            self.dataObject.do_call(data)
+        except Exception, e:
+            print "Exception in do_call: %r" % e
+
+    def do_EOF(self, data):
+        # Ctrl-D: behave like 'quit' but echo the command first.
+        print "quit"
+        try:
+            self.dataObject.do_exit()
+        except:
+            pass
+        return True
+
+    def do_quit(self, data):
+        try:
+            self.dataObject.do_exit()
+        except:
+            pass
+        return True
+
+    def postcmd(self, stop, line):
+        # Returning the stop flag unchanged lets do_quit/do_EOF end cmdloop().
+        return stop
+
+    def postloop(self):
+        # Runs once when cmdloop() terminates: release broker resources.
+        print "Exiting..."
+        self.dataObject.close()
+
+#======================================================================================================
+# QmfData
+#======================================================================================================
+class QmfData(Console):
+    """
+    QMF Console implementation backing the CLI: owns the broker session,
+    caches objects by display ID, and implements the command handlers that
+    Mcli delegates to.  Session callbacks arrive on another thread, so the
+    object cache is protected by self.lock.
+    """
+    def __init__(self, disp, url, conn_options):
+        # disp: Display for table output; url: broker address string;
+        # conn_options: keyword args (SSL/SASL) forwarded to Session.addBroker.
+        self.disp = disp
+        self.url = url
+        self.session = Session(self, manageConnections=True)
+        self.broker = self.session.addBroker(self.url, **conn_options)
+        # Guards self.objects, which is mutated from session callback threads.
+        self.lock = Lock()
+        self.connected = None
+        self.closing = None
+        self.first_connect = True
+        self.cli = None
+        # Maps opaque QMF object IDs to small human-friendly integer IDs.
+        self.idRegistry = IdRegistry()
+        # displayId -> latest known QMF object state.
+        self.objects = {}
+
+    #=======================
+    # Methods to support CLI
+    #=======================
+    def setCli(self, cli):
+        # Back-reference to the Mcli interpreter.
+        self.cli = cli
+
+    def close(self):
+        # Tear down the broker session; called from Mcli.postloop / quit.
+        try:
+            self.closing = True
+            if self.session and self.broker:
+                self.session.delBroker(self.broker)
+        except:
+            pass # we're shutting down - ignore any errors
+
+    def classCompletions(self, text):
+        # Completion hook for class names; not implemented (returns None).
+        pass
+
+    def do_schema(self, data):
+        # No argument: one-line-per-class summary; argument: full detail table.
+        if data == "":
+            self.schemaSummary()
+        else:
+            self.schemaTable(data)
+
+    def do_agents(self, data):
+        # Tabulate the QMF agents currently known to the session.
+        agents = self.session.getAgents()
+        rows = []
+        for agent in agents:
+            version = 1
+            if agent.isV2:
+                version = 2
+            rows.append(("%d.%s" % (agent.getBrokerBank(), agent.getAgentBank()), agent.label, agent.epoch, version))
+        self.disp.table("QMF Agents:", ("Agent Name", "Label", "Epoch", "QMF Version"), rows)
+
+    def do_id(self, data):
+        # Translate display IDs back to their underlying QMF object identity.
+        tokens = data.split()
+        for token in tokens:
+            if not token.isdigit():
+                print "Value %s is non-numeric" % token
+                return
+        title = "Translation of Display IDs:"
+        heads = ('DisplayID', 'Epoch', 'Agent', 'ObjectName')
+        if len(tokens) == 0:
+            # No arguments: show every ID the registry has handed out so far.
+            tokens = self.idRegistry.getDisplayIds()
+        rows = []
+        for token in tokens:
+            rows.append(self.idRegistry.getIdInfo(int(token)))
+        self.disp.table(title, heads, rows)
+
+    def do_list(self, data):
+        # 'list' -> per-class summary; 'list <class> [active]' -> object list.
+        tokens = data.split()
+        if len(tokens) == 0:
+            self.listClasses()
+        else:
+            self.listObjects(tokens)
+
+    def do_show(self, data):
+        # 'show <class>' is a stub (showObjectsByKey); 'show <ID>' dumps one object.
+        tokens = data.split()
+        if len(tokens) == 0:
+            print "Missing Class or ID"
+            return
+        keys = self.classKeysByToken(tokens[0])
+        if keys:
+            self.showObjectsByKey(keys)
+        elif tokens[0].isdigit():
+            self.showObjectById(int(tokens[0]))
+
+    def _build_object_name(self, obj):
+        # Build a V2-style object name "package:class:key" from the object's
+        # indexed properties; used when addressing a V1 object via a V2 agent.
+        values = []
+        for p,v in obj.getProperties():
+            if p.name != "vhostRef" and p.index == 1:
+                if p.name == "brokerRef": # reference to broker
+                    values.append('org.apache.qpid.broker:broker:amqp-broker')
+                else:
+                    values.append(str(v))
+
+        object_key = ",".join(values)
+        class_key = obj.getClassKey();
+        return class_key.getPackageName() + ":" + class_key.getClassName() + ":" + object_key
+
+
+    def do_call(self, data):
+        # 'call <displayId> <methodName> [args...]': invoke a QMF method on a
+        # cached object.  Response arrives asynchronously via methodResponse().
+        tokens = data.split()
+        if len(tokens) < 2:
+            print "Not enough arguments supplied"
+            return
+        displayId = long(tokens[0])
+        methodName = tokens[1]
+        args = []
+        for arg in tokens[2:]:
+            ##
+            ## If the argument is a map, list, boolean, integer, or floating (one decimal point),
+            ## run it through the Python evaluator so it is converted to the correct type.
+            ##
+            ## TODO: use a regex for this instead of this convoluted logic,
+            ## or even consider passing all args through eval() [which would
+            ## be a minor change to the interface as string args would then
+            ## always need to be quoted as strings within a map/list would
+            ## now]
+            ##
+            ## NOTE(review): eval() here executes operator-typed CLI input as
+            ## Python; acceptable for an interactive admin tool, but do not
+            ## feed it untrusted data.
+            if arg[0] == '{' or arg[0] == '[' or arg[0] == '"' or arg[0] == '\'' or arg == "True" or arg == "False" or \
+               ((arg.count('.') < 2 and (arg.count('-') == 0 or \
+               (arg.count('-') == 1 and  arg[0] == '-')) and \
+               arg.replace('.','').replace('-','').isdigit())):
+                args.append(eval(arg))
+            else:
+                args.append(arg)
+
+        obj = None
+        try:
+            self.lock.acquire()
+            if displayId not in self.objects:
+                print "Unknown ID"
+                return
+            obj = self.objects[displayId]
+        finally:
+            self.lock.release()
+
+        object_id = obj.getObjectId();
+        if not object_id.isV2 and obj.getAgent().isV2:
+            # V1 object id addressed through a V2 agent: synthesize a V2 id.
+            object_name = self._build_object_name(obj)
+            object_id = ObjectId.create(object_id.agentName, object_name)
+
+        self.session._sendMethodRequest(self.broker, obj.getClassKey(), object_id, methodName, args)
+
+
+    def do_exit(self):
+        pass
+
+    #====================
+    # Sub-Command Methods
+    #====================
+    def schemaSummary(self, package_filter=None):
+        # One row per table-kind class; package_filter limits to one package.
+        rows = []
+        packages = self.session.getPackages()
+        for package in packages:
+            if package_filter and package_filter != package:
+                continue
+            keys = self.session.getClasses(package)
+            for key in keys:
+                kind = "object"
+                schema = self.session.getSchema(key)
+                if schema:
+                    if schema.kind == SchemaClass.CLASS_KIND_EVENT:
+                        kind = "event"
+                    if schema.kind == SchemaClass.CLASS_KIND_TABLE:
+                        #
+                        # Don't display event schemata. This will be a future feature.
+                        #
+                        rows.append((package, key.getClassName(), kind))
+        self.disp.table("QMF Classes:", ("Package", "Name", "Kind"), rows)
+
+    def schemaTable(self, text):
+        # 'text' may be a package name (summary), a bare class name, or
+        # "package:class" (detail tables for the matching schema).
+        packages = self.session.getPackages()
+        if text in packages:
+            self.schemaSummary(package_filter=text)
+        for package in packages:
+            keys = self.session.getClasses(package)
+            for key in keys:
+                if text == key.getClassName() or text == package + ":" + key.getClassName():
+                    schema = self.session.getSchema(key)
+                    if schema.kind == SchemaClass.CLASS_KIND_TABLE:
+                        self.schemaObject(schema)
+                    else:
+                        self.schemaEvent(schema)
+
+    def schemaObject(self, schema):
+        # Print an object class: properties and statistics, then one table
+        # of arguments per method.
+        rows = []
+        title = "Object Class: %s" % schema.__repr__()
+        heads = ("Element", "Type", "Access", "Unit", "Notes", "Description")
+        for prop in schema.getProperties():
+            notes = ""
+            if prop.index     : notes += "index "
+            if prop.optional : notes += "optional "
+            row = (prop.name, self.typeName(prop.type), self.accessName(prop.access),
+                   self.notNone(prop.unit), notes, self.notNone(prop.desc))
+            rows.append(row)
+        for stat in schema.getStatistics():
+            row = (stat.name, self.typeName(stat.type), "", self.notNone(stat.unit), "", self.notNone(stat.desc))
+            rows.append(row)
+        self.disp.table(title, heads, rows)
+
+        for method in schema.methods:
+            rows = []
+            heads = ("Argument", "Type", "Direction", "Unit", "Description")
+            title = "  Method: %s" % method.name
+            for arg in method.arguments:
+                row = (arg.name, self.typeName(arg.type), arg.dir, self.notNone(arg.unit), self.notNone(arg.desc))
+                rows.append(row)
+            print
+            self.disp.table(title, heads, rows)
+
+    def schemaEvent(self, schema):
+        # Print an event class: one row per event argument.
+        rows = []
+        title = "Event Class: %s" % schema.__repr__()
+        heads = ("Element", "Type", "Unit", "Description")
+        for arg in schema.arguments:
+            row = (arg.name, self.typeName(arg.type), self.notNone(arg.unit), self.notNone(arg.desc))
+            rows.append(row)
+        self.disp.table(title, heads, rows)
+
+    def listClasses(self):
+        # Count active/deleted cached objects per (package, class).
+        title = "Summary of Objects by Type:"
+        heads = ("Package", "Class", "Active", "Deleted")
+        rows = []
+        totals = {}
+        try:
+            self.lock.acquire()
+            for dispId in self.objects:
+                obj = self.objects[dispId]
+                key = obj.getClassKey()
+                index = (key.getPackageName(), key.getClassName())
+                if index in totals:
+                    stats = totals[index]
+                else:
+                    stats = (0, 0)
+                # stats is (active_count, deleted_count)
+                if obj.isDeleted():
+                    stats = (stats[0], stats[1] + 1)
+                else:
+                    stats = (stats[0] + 1, stats[1])
+                totals[index] = stats
+        finally:
+            self.lock.release()
+
+        for index in totals:
+            stats = totals[index]
+            rows.append((index[0], index[1], stats[0], stats[1]))
+        self.disp.table(title, heads, rows)
+
+    def listObjects(self, tokens):
+        # List cached objects of the class named by tokens[0]; the optional
+        # 'active' token suppresses deleted objects (destroy time > 0).
+        ckeys = self.classKeysByToken(tokens[0])
+        show_deleted = True
+        if len(tokens) > 1 and tokens[1] == 'active':
+            show_deleted = None
+        heads = ("ID", "Created", "Destroyed", "Index")
+        rows = []
+        try:
+            self.lock.acquire()
+            for dispId in self.objects:
+                obj = self.objects[dispId]
+                if obj.getClassKey() in ckeys:
+                    utime, ctime, dtime = obj.getTimestamps()
+                    dtimestr = self.disp.timestamp(dtime)
+                    if dtime == 0:
+                        # dtime 0 means "not destroyed".
+                        dtimestr = "-"
+                    if dtime == 0 or (dtime > 0 and show_deleted):
+                        row = (dispId, self.disp.timestamp(ctime), dtimestr, self.objectIndex(obj))
+                        rows.append(row)
+        finally:
+            self.lock.release()
+        self.disp.table("Object Summary:", heads, rows)
+
+    def showObjectsByKey(self, key):
+        # Not implemented: 'show <className>' is documented as a future feature.
+        pass
+
+    def showObjectById(self, dispId):
+        # Dump all properties and statistics of one cached object.
+        heads = ("Attribute", str(dispId))
+        rows = []
+        try:
+            self.lock.acquire()
+            if dispId in self.objects:
+                obj = self.objects[dispId]
+                caption = "Object of type: %r" % obj.getClassKey()
+                for prop in obj.getProperties():
+                    row = (prop[0].name, self.valueByType(prop[0].type, prop[1]))
+                    rows.append(row)
+                for stat in obj.getStatistics():
+                    row = (stat[0].name, self.valueByType(stat[0].type, stat[1]))
+                    rows.append(row)
+            else:
+                print "No object found with ID %d" % dispId
+                return
+        finally:
+            self.lock.release()
+        self.disp.table(caption, heads, rows)
+
+    def classKeysByToken(self, token):
+        """
+        Given a token, return a list of matching class keys (if found):
+        token formats:  <class-name>
+                        <package-name>:<class-name>
+        """
+        pname = None
+        cname = None
+        parts = token.split(':')
+        if len(parts) == 1:
+            cname = parts[0]
+        elif len(parts) == 2:
+            pname = parts[0]
+            cname = parts[1]
+        else:
+            raise ValueError("Invalid Class Name: %s" % token)
+
+        keys = []
+        packages = self.session.getPackages()
+        for p in packages:
+            if pname == None or pname == p:
+                classes = self.session.getClasses(p)
+                for key in classes:
+                    if key.getClassName() == cname:
+                        keys.append(key)
+        return keys
+
+    def typeName (self, typecode):
+        """ Convert type-codes to printable strings """
+        if   typecode == 1:  return "uint8"
+        elif typecode == 2:  return "uint16"
+        elif typecode == 3:  return "uint32"
+        elif typecode == 4:  return "uint64"
+        elif typecode == 5:  return "bool"
+        elif typecode == 6:  return "short-string"
+        elif typecode == 7:  return "long-string"
+        elif typecode == 8:  return "abs-time"
+        elif typecode == 9:  return "delta-time"
+        elif typecode == 10: return "reference"
+        elif typecode == 11: return "boolean"
+        elif typecode == 12: return "float"
+        elif typecode == 13: return "double"
+        elif typecode == 14: return "uuid"
+        elif typecode == 15: return "field-table"
+        elif typecode == 16: return "int8"
+        elif typecode == 17: return "int16"
+        elif typecode == 18: return "int32"
+        elif typecode == 19: return "int64"
+        elif typecode == 20: return "object"
+        elif typecode == 21: return "list"
+        elif typecode == 22: return "array"
+        else:
+            raise ValueError ("Invalid type code: %s" % str(typecode))
+
+    def valueByType(self, typecode, val):
+        # Render a value for display according to its QMF typecode.
+        # NOTE(review): typecode 5 ("bool" in typeName) has no case here and
+        # falls through to the ValueError below - confirm whether 5 can occur.
+        if type(val) is type(None):
+            return "absent"
+        if   typecode == 1:  return "%d" % val
+        elif typecode == 2:  return "%d" % val
+        elif typecode == 3:  return "%d" % val
+        elif typecode == 4:  return "%d" % val
+        elif typecode == 6:  return val
+        elif typecode == 7:  return val
+        elif typecode == 8:  return strftime("%c", gmtime(val / 1000000000))
+        elif typecode == 9:
+            # delta-time arrives in nanoseconds; render as "NdNhNmNs".
+            # (Python 2 integer division throughout.)
+            if val < 0: val = 0
+            sec = val / 1000000000
+            min = sec / 60
+            hour = min / 60
+            day = hour / 24
+            result = ""
+            if day > 0:
+                result = "%dd " % day
+            if hour > 0 or result != "":
+                result += "%dh " % (hour % 24)
+            if min > 0 or result != "":
+                result += "%dm " % (min % 60)
+            result += "%ds" % (sec % 60)
+            return result
+
+        elif typecode == 10: return str(self.idRegistry.displayId(val))
+        elif typecode == 11:
+            if val:
+                return "True"
+            else:
+                return "False"
+
+        elif typecode == 12: return "%f" % val
+        elif typecode == 13: return "%f" % val
+        elif typecode == 14: return "%r" % val
+        elif typecode == 15: return "%r" % val
+        elif typecode == 16: return "%d" % val
+        elif typecode == 17: return "%d" % val
+        elif typecode == 18: return "%d" % val
+        elif typecode == 19: return "%d" % val
+        elif typecode == 20: return "%r" % val
+        elif typecode == 21: return "%r" % val
+        elif typecode == 22: return "%r" % val
+        else:
+            raise ValueError ("Invalid type code: %s" % str(typecode))
+
+    def accessName (self, code):
+        """ Convert element access codes to printable strings """
+        # Access codes are compared as the strings '1'..'3', not integers.
+        if   code == '1': return "ReadCreate"
+        elif code == '2': return "ReadWrite"
+        elif code == '3': return "ReadOnly"
+        else:
+            raise ValueError ("Invalid access code: %s" % str(code))
+
+    def notNone (self, text):
+        # Render None as the empty string for table cells.
+        if text == None:
+            return ""
+        else:
+            return text
+
+    def objectIndex(self, obj):
+        # Build the object's display index: V2 ids carry their own name;
+        # V1 ids get a dot-joined string of the indexed property values.
+        if obj._objectId.isV2:
+            return obj._objectId.getObject()
+        result = ""
+        first = True
+        props = obj.getProperties()
+        for prop in props:
+            if prop[0].index:
+                if not first:
+                    result += "."
+                result += self.valueByType(prop[0].type, prop[1])
+                first = None
+        return result
+
+
+    #=====================
+    # Methods from Console
+    #=====================
+    def brokerConnectionFailed(self, broker):
+        """ Invoked when a connection to a broker fails """
+        # Only report the very first failure; the session keeps retrying.
+        if self.first_connect:
+            self.first_connect = None
+            print "Failed to connect: ", broker.error
+
+    def brokerConnected(self, broker):
+        """ Invoked when a connection is established to a broker """
+        try:
+            self.lock.acquire()
+            self.connected = True
+        finally:
+            self.lock.release()
+        # Stay quiet on the initial connect; announce only reconnects.
+        if not self.first_connect:
+            print "Broker connected:", broker
+        self.first_connect = None
+
+    def brokerDisconnected(self, broker):
+        """ Invoked when the connection to a broker is lost """
+        try:
+            self.lock.acquire()
+            self.connected = None
+        finally:
+            self.lock.release()
+        # Suppress the message during deliberate shutdown (close()).
+        if not self.closing:
+            print "Broker disconnected:", broker
+
+    def objectProps(self, broker, record):
+        """ Invoked when an object is updated. """
+        # Merge into the cache, or insert if this display ID is new.
+        oid = record.getObjectId()
+        dispId = self.idRegistry.displayId(oid)
+        try:
+            self.lock.acquire()
+            if dispId in self.objects:
+                self.objects[dispId].mergeUpdate(record)
+            else:
+                self.objects[dispId] = record
+        finally:
+            self.lock.release()
+
+    def objectStats(self, broker, record):
+        """ Invoked when an object is updated. """
+        # Statistics-only update: merge only if the object is already cached.
+        oid = record.getObjectId()
+        dispId = self.idRegistry.displayId(oid)
+        try:
+            self.lock.acquire()
+            if dispId in self.objects:
+                self.objects[dispId].mergeUpdate(record)
+        finally:
+            self.lock.release()
+
+    def event(self, broker, event):
+        """ Invoked when an event is raised. """
+        pass
+
+    def methodResponse(self, broker, seq, response):
+        # Asynchronous reply to a do_call invocation; just print it.
+        print response
+
+
+#======================================================================================================
+# IdRegistry
+#======================================================================================================
+class IdRegistry(object):
+    """
+    Thread-safe bidirectional map between opaque QMF object IDs and small
+    integer "display IDs" (starting at 101) that are easier to type at the CLI.
+    """
+    def __init__(self):
+        self.next_display_id = 101
+        self.oid_to_display = {}
+        self.display_to_oid = {}
+        self.lock = Lock()
+
+    def displayId(self, oid):
+        # Return the display ID for oid, allocating a new one on first sight.
+        try:
+            self.lock.acquire()
+            if oid in self.oid_to_display:
+                return self.oid_to_display[oid]
+            newId = self.next_display_id
+            self.next_display_id += 1
+            self.oid_to_display[oid] = newId
+            self.display_to_oid[newId] = oid
+            return newId
+        finally:
+            self.lock.release()
+
+    def objectId(self, displayId):
+        # Reverse lookup; returns None for an unknown display ID.
+        try:
+            self.lock.acquire()
+            if displayId in self.display_to_oid:
+                return self.display_to_oid[displayId]
+            return None
+        finally:
+            self.lock.release()
+
+    def getDisplayIds(self):
+        # All display IDs handed out so far, as strings.
+        result = []
+        for displayId in self.display_to_oid:
+            result.append(str(displayId))
+        return result
+
+    def getIdInfo(self, displayId):
+        """
+        Given a display ID, return a tuple of (displayID, bootSequence/Durable, AgentBank/Name, ObjectName)
+        """
+        oid = self.objectId(displayId)
+        if oid == None:
+            return (displayId, "?", "unknown", "unknown")
+        bootSeq = oid.getSequence()
+        if bootSeq == 0:
+            # Boot sequence 0 marks a durable object.
+            bootSeq = '<durable>'
+        agent = oid.getAgentBank()
+        if agent == '0':
+            agent = 'Broker'
+        return (displayId, bootSeq, agent, oid.getObject())
+
+#=========================================================
+# Option Parsing
+#=========================================================
+
+def parse_options( argv ):
+    """
+    Parse command-line options for qpid-tool.
+
+    Returns (broker_option, conn_options, args): broker_option is the value of
+    -b/--broker or None; conn_options is a dict of keyword arguments for
+    Session.addBroker (SSL certificate/key, SASL mechanism/service); args is
+    the remaining positional arguments, decoded to unicode where possible.
+    """
+    _usage = """qpid-tool [OPTIONS] [[<username>/<password>@]<target-host>[:<tcp-port>]]"""
+
+    parser = optparse.OptionParser(usage=_usage)
+    parser.add_option("-b", "--broker",  action="store", type="string", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
+    parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+    parser.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
+    parser.add_option("--ssl-certificate",
+                      action="store", type="string", metavar="<path>",
+                      help="SSL certificate for client authentication")
+    parser.add_option("--ssl-key",
+                      action="store", type="string", metavar="<path>",
+                      help="Private key (if not contained in certificate)")
+
+    opts, encArgs = parser.parse_args(args=argv)
+    try:
+        # Decode positional args using the locale's preferred encoding;
+        # fall back to the raw byte strings on any failure.
+        encoding = locale.getpreferredencoding()
+        args = [a.decode(encoding) for a in encArgs]
+    except:
+        args = encArgs
+
+    conn_options = {}
+    broker_option = None
+    if opts.broker:
+        broker_option = opts.broker
+    if opts.ssl_certificate:
+        conn_options['ssl_certfile'] = opts.ssl_certificate
+    if opts.ssl_key:
+        # A private key is only meaningful alongside a certificate.
+        if not opts.ssl_certificate:
+            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
+        conn_options['ssl_keyfile'] = opts.ssl_key
+    if opts.sasl_mechanism:
+        conn_options['mechanisms'] = opts.sasl_mechanism
+    if opts.sasl_service_name:
+        conn_options['service'] = opts.sasl_service_name
+    return broker_option, conn_options, args[1:]
+
+#=========================================================
+# Main Program
+#=========================================================
+
+
+# Get options specified on the command line
+broker_option, conn_options, cargs = parse_options(sys.argv)
+
+# Broker address: -b/--broker wins, then the first positional argument,
+# then localhost.
+_host = "localhost"
+if broker_option is not None:
+    _host = broker_option
+elif len(cargs) > 0:
+    _host = cargs[0]
+
+# note: prior to supporting options, qpid-tool assumed positional parameters.
+# the first argument was assumed to be the broker address.  The second argument
+# was optional, and, if supplied, was assumed to be the path to the
+# certificate.  To preserve backward compatibility, accept the certificate if
+# supplied via the second parameter.
+#
+if 'ssl_certfile' not in conn_options:
+    if len(cargs) > 1:
+        conn_options['ssl_certfile'] = cargs[1]
+
+disp = Display()
+
+# Attempt to make a connection to the target broker
+try:
+    data = QmfData(disp, _host, conn_options)
+except Exception, e:
+    # "Exchange not found" indicates the broker was started without management.
+    if str(e).find("Exchange not found") != -1:
+        print "Management not enabled on broker:  Use '-m yes' option on broker startup."
+    else:
+        print "Failed: %s - %s" % (e.__class__.__name__, e)
+    sys.exit(1)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli(data, disp)
+print("Management Tool for QPID")
+try:
+    cli.cmdloop()
+except KeyboardInterrupt:
+    print
+    print "Exiting..."
+except Exception, e:
+    print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# always attempt to cleanup broker resources
+data.close()
diff --git a/qpid/cpp/management/python/bin/qpid-tool.bat b/qpid/cpp/management/python/bin/qpid-tool.bat
new file mode 100644
index 0000000000..7eb0210da2
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-tool.bat
@@ -0,0 +1,2 @@
+@echo off
+rem Delegate to the qpid-tool Python script that lives next to this .bat file,
+rem forwarding all command-line arguments.
+python %~dp0\qpid-tool %*