diff options
Diffstat (limited to 'qpid/cpp/management/python/lib/qpidtoollibs')
-rw-r--r-- | qpid/cpp/management/python/lib/qpidtoollibs/__init__.py | 22 | ||||
-rw-r--r-- | qpid/cpp/management/python/lib/qpidtoollibs/broker.py | 486 | ||||
-rw-r--r-- | qpid/cpp/management/python/lib/qpidtoollibs/config.py | 36 | ||||
-rw-r--r-- | qpid/cpp/management/python/lib/qpidtoollibs/disp.py | 270 |
4 files changed, 814 insertions, 0 deletions
diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/__init__.py b/qpid/cpp/management/python/lib/qpidtoollibs/__init__.py new file mode 100644 index 0000000000..2815bac22f --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidtoollibs/__init__.py @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpidtoollibs.broker import * +from qpidtoollibs.disp import * + diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/broker.py b/qpid/cpp/management/python/lib/qpidtoollibs/broker.py new file mode 100644 index 0000000000..fca6680067 --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidtoollibs/broker.py @@ -0,0 +1,486 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from qpidtoollibs.disp import TimeLong +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection + or qpid_messaging.Connection + """ + def __init__(self, conn): + # Use the Message class from the same module as conn which could be qpid.messaging + # or qpid_messaging + self.message_class = sys.modules[conn.__class__.__module__].Message + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ + self.sess.close() + + def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments or {}} + + message = self.message_class( + content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(timeout) + self.sess.acknowledge() + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = self.message_class( + content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + return items + + def _doNameQuery(self, object_id): + query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, oid): + obj = self._doNameQuery(oid) + if obj: + return cls(self, obj) + return None + + def _getSingleObject(self, cls): + # + # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because + # of a bug that used to be in the broker whereby by-name queries did not return the + # object timestamps. + # + objects = self._getAllBrokerObjects(cls) + if objects: return objects[0] + return None + + def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ + return self._getSingleObject(Broker) + + + def getCluster(self): + return self._getSingleObject(Cluster) + + def getHaBroker(self): + return self._getSingleObject(HaBroker) + + def getAllConnections(self): + return self._getAllBrokerObjects(Connection) + + def getConnection(self, oid): + return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) + + def getAllSessions(self): + return self._getAllBrokerObjects(Session) + + def getSession(self, oid): + return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) + + def getAllSubscriptions(self): + return self._getAllBrokerObjects(Subscription) + + def getSubscription(self, oid): + return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) + + def getAllExchanges(self): + return self._getAllBrokerObjects(Exchange) + + def getExchange(self, name): + return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) + + def getAllQueues(self): + return self._getAllBrokerObjects(Queue) + + def getQueue(self, name): + return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) + + def getAllBindings(self): + return self._getAllBrokerObjects(Binding) + + def getAllLinks(self): + return self._getAllBrokerObjects(Link) + + def getAcl(self): + return self._getSingleObject(Acl) + + def getMemory(self): + return self._getSingleObject(Memory) + + def echo(self, sequence = 1, body = "Body"): + """Request a response to test the path to the management broker""" + args = {'sequence' : sequence, 'body' : body} + return self._method('echo', args) + + def connect(self, host, port, durable, authMechanism, username, password, transport): + """Establish a connection to another broker""" + pass + + def queueMoveMessages(self, srcQueue, destQueue, qty): + """Move messages from one queue to another""" + self._method("queueMoveMessages", {'srcQueue':srcQueue,'destQueue':destQueue,'qty':qty}) + + def queueRedirect(self, sourceQueue, targetQueue): + """Enable/disable delivery redirect for indicated queues""" + self._method("queueRedirect", {'sourceQueue':sourceQueue,'targetQueue':targetQueue}) + + def setLogLevel(self, level): + """Set the log level""" + self._method("setLogLevel", {'level':level}) + + def getLogLevel(self): + """Get the log level""" + return self._method('getLogLevel') + + def setTimestampConfig(self, receive): + """Set the message timestamping configuration""" + self._method("setTimestampConfig", {'receive':receive}) + + def getTimestampConfig(self): + """Get the message timestamping configuration""" + return self._method('getTimestampConfig') + + def setLogHiresTimestamp(self, logHires): + """Set the high resolution timestamp in logs""" + self._method("setLogHiresTimestamp", {'logHires':logHires}) + + def getLogHiresTimestamp(self): + """Get the high resolution timestamp in logs""" + return self._method('getLogHiresTimestamp') + + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name, if_empty=True, if_unused=True): + options = {'if_empty': if_empty, + 'if_unused': if_unused} + + args = {'type': 'queue', + 'name': name, + 'options': options} + self._method('delete', args) + + def bind(self, exchange, queue, key="", options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) + + def reloadAclFile(self): + self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): + args = {'userId': userName, + 'action': action, + 'object': aclObj, + 'objectName': aclObjName, + 'propertyMap': propMap} + return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookupPublish(self, userName, exchange, key): + args = {'userId': userName, + 'exchangeName': exchange, + 'routingKey': key} + return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def Redirect(self, sourceQueue, targetQueue): + args = {'sourceQueue': sourceQueue, + 'targetQueue': targetQueue} + return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker") + + def create(self, _type, name, properties={}, strict=False): + """Create an object of the specified type""" + args = {'type': _type, + 'name': name, + 'properties': properties, + 'strict': strict} + return self._method('create', args) + + def delete(self, _type, name, options): + """Delete an object of the specified type""" + args = {'type': _type, + 'name': name, + 'options': options} + return self._method('delete', args) + + def list(self, _type): + """List objects of the specified type""" + return [i["_values"] for i in self._doClassQuery(_type.lower())] + + def query(self, _type, oid): + """Query the current state of an object""" + return self._getBrokerObject(self, _type, oid) + + +class EventHelper(object): + def eventAddress(self, pkg='*', cls='*', sev='*'): + return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev) + + def event(self, msg): + return BrokerEvent(msg) + + +class BrokerEvent(object): + def __init__(self, msg): + self.msg = msg + self.content = msg.content[0] + self.values = self.content['_values'] + self.schema_id = self.content['_schema_id'] + self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name']) + + def __repr__(self): + rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name) + for k,v in self.values.items(): + rep = rep + " %s=%s" % (k, v) + return rep + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + return value + + def getAttributes(self): + return self.values + + def getTimestamp(self): + return self.content['_timestamp'] + + +class BrokerObject(object): + def __init__(self, broker, content): + self.broker = broker + self.content = content + self.values = content['_values'] + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + if value.__class__ == dict and '_object_name' in value: + full_name = value['_object_name'] + colon = full_name.find(':') + if colon > 0: + full_name = full_name[colon+1:] + colon = full_name.find(':') + if colon > 0: + return full_name[colon+1:] + return value + + def getObjectId(self): + return self.content['_object_id']['_object_name'] + + def getAttributes(self): + return self.values + + def getCreateTime(self): + return self.content['_create_ts'] + + def getDeleteTime(self): + return self.content['_delete_ts'] + + def getUpdateTime(self): + return self.content['_update_ts'] + + def update(self): + """ + Reload the property values from the agent. + """ + refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) + if refreshed: + self.content = refreshed.content + self.values = self.content['_values'] + else: + raise Exception("No longer exists on the broker") + +class Broker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Cluster(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class HaBroker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Memory(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Connection(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def close(self): + self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address) + +class Session(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Subscription(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "subscription name undefined" + +class Exchange(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Binding(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "Binding key: %s" % self.values['bindingKey'] + +class Queue(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def purge(self, request): + """Discard all or some messages on a queue""" + self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) + + def reroute(self, request, useAltExchange, exchange, filter={}): + """Remove all or some messages on this queue and route them to an exchange""" + self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, + "org.apache.qpid.broker:queue:%s" % self.name) + +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Acl(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/config.py b/qpid/cpp/management/python/lib/qpidtoollibs/config.py new file mode 100644 index 0000000000..9168215ac3 --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidtoollibs/config.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""Utilities for managing configuration files""" +import os + +QPID_ENV_PREFIX="QPID_" + +def parse_qpidd_conf(config_file): + """Parse a qpidd.conf configuration file into a dictionary""" + f = open(config_file) + try: + clean = filter(None, [line.split("#")[0].strip() for line in f]) # Strip comments and blanks + def item(line): return [x.strip() for x in line.split("=")] + config = dict(item(line) for line in clean if "=" in line) + finally: f.close() + def name(env_name): return env_name[len(QPID_ENV_PREFIX):].lower() + env = dict((name(i[0]), i[1]) for i in os.environ.iteritems() if i[0].startswith(QPID_ENV_PREFIX)) + config.update(env) # Environment takes precedence + return config diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/disp.py b/qpid/cpp/management/python/lib/qpidtoollibs/disp.py new file mode 100644 index 0000000000..1b7419ba2c --- /dev/null +++ b/qpid/cpp/management/python/lib/qpidtoollibs/disp.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +def YN(val): + if val: + return 'Y' + return 'N' + +def Commas(value): + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + +def TimeLong(value): + return strftime("%c", gmtime(value / 1000000000)) + +def TimeShort(value): + return strftime("%X", gmtime(value / 1000000000)) + + +class Header: + """ """ + NONE = 1 + KMG = 2 + YN = 3 + Y = 4 + TIME_LONG = 5 + TIME_SHORT = 6 + DURATION = 7 + COMMAS = 8 + + def __init__(self, text, format=NONE): + self.text = text + self.format = format + + def __repr__(self): + return self.text + + def __str__(self): + return self.text + + def formatted(self, value): + try: + if value == None: + return '' + if self.format == Header.NONE: + return value + if self.format == Header.KMG: + return self.num(value) + if self.format == Header.YN: + if value: + return 'Y' + return 'N' + if self.format == Header.Y: + if value: + return 'Y' + return '' + if self.format == Header.TIME_LONG: + return TimeLong(value) + if self.format == Header.TIME_SHORT: + return TimeShort(value) + if self.format == Header.DURATION: + if value < 0: value = 0 + sec = value / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + if self.format == Header.COMMAS: + return Commas(value) + except: + return "?" + + def numCell(self, value, tag): + fp = float(value) / 1000. + if fp < 10.0: + return "%1.2f%c" % (fp, tag) + if fp < 100.0: + return "%2.1f%c" % (fp, tag) + return "%4d%c" % (value / 1000, tag) + + def num(self, value): + if value < 1000: + return "%4d" % value + if value < 1000000: + return self.numCell(value, 'k') + value /= 1000 + if value < 1000000: + return self.numCell(value, 'm') + value /= 1000 + return self.numCell(value, 'g') + + +class Display: + """ Display formatting for QPID Management CLI """ + + def __init__(self, spacing=2, prefix=" "): + self.tableSpacing = spacing + self.tablePrefix = prefix + self.timestampFormat = "%X" + + def formattedTable(self, title, heads, rows): + fRows = [] + for row in rows: + fRow = [] + col = 0 + for cell in row: + fRow.append(heads[col].formatted(cell)) + col += 1 + fRows.append(fRow) + headtext = [] + for head in heads: + headtext.append(head.text) + self.table(title, headtext, fRows) + + def table(self, title, heads, rows): + """ Print a table with autosized columns """ + + # Pad the rows to the number of heads + for row in rows: + diff = len(heads) - len(row) + for idx in range(diff): + row.append("") + + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + text = row[col] + if text.__class__ == str: + text = text.decode('utf-8') + cellWidth = len(unicode(text)) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + if col < len (heads) - 1: + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + line = line + "=" * width + line = line[:255] + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + text = row[col] + if text.__class__ == str: + text = text.decode('utf-8') + line = line + unicode(text) + if col < len (heads) - 1: + for i in range (width - len(unicode(text))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) + + def duration(self, nsec): + if nsec < 0: nsec = 0 + sec = nsec / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + +class Sortable: + """ """ + def __init__(self, row, sortIndex): + self.row = row + self.sortIndex = sortIndex + if sortIndex >= len(row): + raise Exception("sort index exceeds row boundary") + + def __cmp__(self, other): + return cmp(self.row[self.sortIndex], other.row[self.sortIndex]) + + def getRow(self): + return self.row + +class Sorter: + """ """ + def __init__(self, heads, rows, sortCol, limit=0, inc=True): + col = 0 + for head in heads: + if head.text == sortCol: + break + col += 1 + if col == len(heads): + raise Exception("sortCol '%s', not found in headers" % sortCol) + + list = [] + for row in rows: + list.append(Sortable(row, col)) + list.sort() + if not inc: + list.reverse() + count = 0 + self.sorted = [] + for row in list: + self.sorted.append(row.getRow()) + count += 1 + if count == limit: + break + + def getSorted(self): + return self.sorted |