diff options
| author | Ted Ross <tross@apache.org> | 2012-02-09 21:11:41 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2012-02-09 21:11:41 +0000 |
| commit | 192126471686e72d7b59ef9923458fcefe6847a2 (patch) | |
| tree | e0a652a61c5bae9dd2b7f26d847a4049f0ed7693 /qpid/tools | |
| parent | 19a3076040f4d144e604f825b59e48ab27524440 (diff) | |
| download | qpid-python-192126471686e72d7b59ef9923458fcefe6847a2.tar.gz | |
QPID-3824 - Additional queue statistics, posix memory statistics, and broker-scope statistics
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242526 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools')
| -rwxr-xr-x | qpid/tools/setup.py | 19 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-stat | 452 | ||||
| -rw-r--r-- | qpid/tools/src/py/qpidtoollibs/__init__.py | 18 | ||||
| -rw-r--r-- | qpid/tools/src/py/qpidtoollibs/broker.py | 322 | ||||
| -rw-r--r-- | qpid/tools/src/py/qpidtoollibs/disp.py | 249 |
5 files changed, 797 insertions, 263 deletions
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py index feae4bb1bd..b04bb65c87 100755 --- a/qpid/tools/setup.py +++ b/qpid/tools/setup.py @@ -23,15 +23,16 @@ setup(name="qpid-tools", version="0.15", author="Apache Qpid", author_email="dev@qpid.apache.org", - scripts=["src/py/qpid-cluster", - "src/py/qpid-cluster-store", - "src/py/qpid-config", - "src/py/qpid-printevents", - "src/py/qpid-queue-stats", - "src/py/qpid-route", - "src/py/qpid-stat", - "src/py/qpid-tool", - "src/py/qmf-tool"], + packages=["qpidtoollibs"], + scripts=["qpid-cluster", + "qpid-cluster-store", + "qpid-config", + "qpid-printevents", + "qpid-queue-stats", + "qpid-route", + "qpid-stat", + "qpid-tool", + "qmf-tool"], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.") diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat index a7272da3f1..bb094554e6 100755 --- a/qpid/tools/src/py/qpid-stat +++ b/qpid/tools/src/py/qpid-stat @@ -21,13 +21,18 @@ import os from optparse import OptionParser, OptionGroup -from time import sleep ### debug import sys import locale import socket import re -from qmf.console import Session, Console -from qpid.disp import Display, Header, Sorter +from qpid.messaging import Connection + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs.broker import BrokerAgent +from qpidtoollibs.disp import Display, Header, Sorter + class Config: def __init__(self): @@ -37,7 +42,7 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._cluster_detail = False + self._details = None self._sasl_mechanism = None config = Config() @@ -56,24 +61,16 @@ def OptionsAndArguments(argv): parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", - action="store_const", const="b", dest="show") - group2.add_option("-c", "--connections", help="Show Connections", - action="store_const", const="c", dest="show") - group2.add_option("-e", "--exchanges", help="Show Exchanges", - action="store_const", const="e", dest="show") - group2.add_option("-q", "--queues", help="Show Queues", - action="store_const", const="q", dest="show") - group2.add_option("-u", "--subscriptions", help="Show Subscriptions", - action="store_const", const="u", dest="show") - group2.add_option("-S", "--sort-by", metavar="<colname>", - help="Sort by column name") - group2.add_option("-I", "--increasing", action="store_true", default=False, - help="Sort by increasing value (default = decreasing)") - group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", - help="Limit output to n rows") - group2.add_option("-C", "--cluster", action="store_true", default=False, - help="Display per-broker cluster detail.") + group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") + group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") + group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") + group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.") parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) @@ -86,8 +83,8 @@ def OptionsAndArguments(argv): config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit - config._cluster_detail = opts.cluster config._sasl_mechanism = opts.sasl_mechanism + config._detail = opts.detail if args: config._host = args[0] @@ -119,86 +116,26 @@ class IpAddr: bestAddr = addrPort return bestAddr -class Broker(object): - def __init__(self, qmf, broker): - self.broker = broker - - agents = qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a - - bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] - self.currentTime = bobj.getTimestamps()[0] - try: - self.uptime = bobj.uptime - except: - self.uptime = 0 - self.connections = {} - self.sessions = {} - self.exchanges = {} - self.queues = {} - self.subscriptions = {} - package = "org.apache.qpid.broker" - - list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) - for conn in list: - if not conn.shadow: - self.connections[conn.getObjectId()] = conn - - list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) - for sess in list: - if sess.connectionRef in self.connections: - self.sessions[sess.getObjectId()] = sess - - list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) - for exchange in list: - self.exchanges[exchange.getObjectId()] = exchange - - list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) - for queue in list: - self.queues[queue.getObjectId()] = queue - - list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent) - for subscription in list: - self.subscriptions[subscription.getObjectId()] = subscription - - def getName(self): - return self.broker.getUrl() - - def getCurrentTime(self): - return self.currentTime - - def getUptime(self): - return self.uptime - -class BrokerManager(Console): +class BrokerManager: def __init__(self): - self.brokerName = None - self.qmf = None - self.broker = None - self.brokers = [] - self.cluster = None + self.brokerName = None + self.connections = [] + self.brokers = [] + self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.mechanism = mechanism - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + self.connections.append(Connection(self.url, sasl_mechanism=mechanism)) + self.connections[0].open() + self.brokers.append(BrokerAgent(self.connections[0])) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - if self.broker: - self.qmf.delBroker(self.broker) - else: - for b in self.brokers: self.qmf.delBroker(b.broker) + for conn in self.connections: + conn.close() except: pass @@ -238,62 +175,63 @@ class BrokerManager(Console): hosts.append(bestUrl) return hosts - def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): - if len(subs) == 0: - return - this = subs[0] - remaining = subs[1:] - newindent = indent + " " - if this == 'b': - pass - elif this == 'c': - if broker: - for oid in broker.connections: - iconn = broker.connections[oid] - self.printConnSub(indent, broker.getName(), iconn) - self.displaySubs(remaining, newindent, broker=broker, conn=iconn, - sess=sess, exchange=exchange, queue=queue) - elif this == 's': - pass - elif this == 'e': - pass - elif this == 'q': - pass - print - def displayBroker(self, subs): disp = Display(prefix=" ") heads = [] - heads.append(Header('broker')) - heads.append(Header('cluster')) heads.append(Header('uptime', Header.DURATION)) - heads.append(Header('conn', Header.KMG)) - heads.append(Header('sess', Header.KMG)) - heads.append(Header('exch', Header.KMG)) - heads.append(Header('queue', Header.KMG)) + heads.append(Header('connections', Header.COMMAS)) + heads.append(Header('sessions', Header.COMMAS)) + heads.append(Header('exchanges', Header.COMMAS)) + heads.append(Header('queues', Header.COMMAS)) rows = [] - for broker in self.brokers: - if self.cluster: - ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) - else: - ctext = "<standalone>" - row = (broker.getName(), ctext, broker.getUptime(), - len(broker.connections), len(broker.sessions), - len(broker.exchanges), len(broker.queues)) - rows.append(row) - title = "Brokers" - if config._sortcol: - sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) + broker = self.brokers[0].getBroker() + connections = self.getConnectionMap() + sessions = self.getSessionMap() + exchanges = self.getExchangeMap() + queues = self.getQueueMap() + row = (broker.getUpdateTime() - broker.getCreateTime(), + len(connections), len(sessions), + len(exchanges), len(queues)) + rows.append(row) + disp.formattedTable('Broker Summary:', heads, rows) + + if 'queueCount' not in broker.values: + return + + print + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', broker.msgDepth, broker.byteDepth]) + rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues]) + rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues]) + rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues]) + rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues]) + rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues]) + rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues]) + rows.append(['acquires', broker.acquires, None]) + rows.append(['releases', broker.releases, None]) + rows.append(['discards-no-route', broker.discardsNoRoute, None]) + rows.append(['discards-ttl-expired', broker.discardsTtl, None]) + rows.append(['discards-limit-overflow', broker.discardsOverflow, None]) + rows.append(['discards-ring-overflow', broker.discardsRing, None]) + rows.append(['discards-lvq-replace', broker.discardsLvq, None]) + rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None]) + rows.append(['discards-purged', broker.discardsPurge, None]) + rows.append(['reroutes', broker.reroutes, None]) + rows.append(['abandoned', broker.abandoned, None]) + rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None]) + disp.formattedTable('Aggregate Broker Statistics:', heads, rows) + def displayConn(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header('client-addr')) heads.append(Header('cproc')) heads.append(Header('cpid')) @@ -303,25 +241,20 @@ class BrokerManager(Console): heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.connections: - conn = broker.connections[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(conn.address) - row.append(conn.remoteProcessName) - row.append(conn.remotePid) - row.append(conn.authIdentity) - row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) - idle = broker.getCurrentTime() - conn.getTimestamps()[0] - row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) - row.append(conn.msgsFromClient) - row.append(conn.msgsToClient) - rows.append(row) + connections = self.brokers[0].getAllConnections() + broker = self.brokers[0].getBroker() + for conn in connections: + row = [] + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getUpdateTime() - conn.getCreateTime()) + row.append(broker.getUpdateTime() - conn.getUpdateTime()) + row.append(conn.msgsFromClient) + row.append(conn.msgsToClient) + rows.append(row) title = "Connections" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -335,8 +268,6 @@ class BrokerManager(Console): def displayExchange(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header("exchange")) heads.append(Header("type")) heads.append(Header("dur", Header.Y)) @@ -348,26 +279,21 @@ class BrokerManager(Console): heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.exchanges: - ex = broker.exchanges[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(ex.name) - row.append(ex.type) - row.append(ex.durable) - row.append(ex.bindingCount) - row.append(ex.msgReceives) - row.append(ex.msgRoutes) - row.append(ex.msgDrops) - row.append(ex.byteReceives) - row.append(ex.byteRoutes) - row.append(ex.byteDrops) - rows.append(row) + exchanges = self.brokers[0].getAllExchanges() + for ex in exchanges: + row = [] + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) title = "Exchanges" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -375,11 +301,9 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + def displayQueues(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header("queue")) heads.append(Header("dur", Header.Y)) heads.append(Header("autoDel", Header.Y)) @@ -393,28 +317,23 @@ class BrokerManager(Console): heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.queues: - q = broker.queues[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(q.name) - row.append(q.durable) - row.append(q.autoDelete) - row.append(q.exclusive) - row.append(q.msgDepth) - row.append(q.msgTotalEnqueues) - row.append(q.msgTotalDequeues) - row.append(q.byteDepth) - row.append(q.byteTotalEnqueues) - row.append(q.byteTotalDequeues) - row.append(q.consumerCount) - row.append(q.bindingCount) - rows.append(row) + queues = self.brokers[0].getAllQueues() + for q in queues: + row = [] + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) title = "Queues" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -422,46 +341,46 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) + def displayQueue(self, subs): + disp = Display(prefix=" ") + heads = [] + def displaySubscriptions(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) - heads.append(Header("subscription")) + heads.append(Header("subscr")) heads.append(Header("queue")) - heads.append(Header("connection")) - heads.append(Header("processName")) - heads.append(Header("processId")) - heads.append(Header("browsing", Header.Y)) - heads.append(Header("acknowledged", Header.Y)) - heads.append(Header("exclusive", Header.Y)) + heads.append(Header("conn")) + heads.append(Header("procName")) + heads.append(Header("procId")) + heads.append(Header("browse", Header.Y)) + heads.append(Header("acked", Header.Y)) + heads.append(Header("excl", Header.Y)) heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.subscriptions: - s = broker.subscriptions[oid] - row = [] - try: - if self.cluster: - row.append(broker.getName()) - row.append(s.name) - row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name) - connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address) - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName) - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid) - row.append(s.browsing) - row.append(s.acknowledged) - row.append(s.exclusive) - row.append(s.creditMode) - row.append(s.delivered) - rows.append(row) - except: - pass + subscriptions = self.brokers[0].getAllSubscriptions() + sessions = self.getSessionMap() + connections = self.getConnectionMap() + for s in subscriptions: + row = [] + try: + row.append(s.name) + row.append(s.queueRef) + session = sessions[s.sessionRef] + connection = connections[session.connectionRef] + row.append(connection.address) + row.append(connection.remoteProcessName) + row.append(connection.remotePid) + row.append(s.browsing) + row.append(s.acknowledged) + row.append(s.exclusive) + row.append(s.creditMode) + row.append(s.delivered) + rows.append(row) + except: + pass title = "Subscriptions" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -469,33 +388,58 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) + def displayMemory(self, unused): + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value', Header.COMMAS)] + rows = [] + memory = self.brokers[0].getMemory() + for k,v in memory.values.items(): + if k != 'name': + rows.append([k, v]) + disp.formattedTable('Broker Memory Statistics:', heads, rows) + + def getExchangeMap(self): + exchanges = self.brokers[0].getAllExchanges() + emap = {} + for e in exchanges: + emap[e.name] = e + return emap + + def getQueueMap(self): + queues = self.brokers[0].getAllQueues() + qmap = {} + for q in queues: + qmap[q.name] = q + return qmap + + def getSessionMap(self): + sessions = self.brokers[0].getAllSessions() + smap = {} + for s in sessions: + smap[s.name] = s + return smap + + def getConnectionMap(self): + connections = self.brokers[0].getAllConnections() + cmap = {} + for c in connections: + cmap[c.address] = c + return cmap + def displayMain(self, main, subs): if main == 'b': self.displayBroker(subs) elif main == 'c': self.displayConn(subs) elif main == 's': self.displaySession(subs) elif main == 'e': self.displayExchange(subs) - elif main == 'q': self.displayQueue(subs) + elif main == 'q': + if config._detail: + self.displayQueue(subs, config._detail) + else: + self.displayQueues(subs) elif main == 'u': self.displaySubscriptions(subs) + elif main == 'm': self.displayMemory(subs) def display(self): - if config._cluster_detail or config._types[0] == 'b': - # always show cluster detail when dumping broker stats - self._getCluster() - if self.cluster: - memberList = self.cluster.members.split(";") - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - if config._host.find("@") > 0: - authString = config._host.split("@")[0] + "@" - else: - authString = "" - for host in hostList: - b = self.qmf.addBroker(authString + host, config._connTimeout) - self.brokers.append(Broker(self.qmf, b)) - else: - self.brokers.append(Broker(self.qmf, self.broker)) - self.displayMain(config._types[0], config._types[1:]) diff --git a/qpid/tools/src/py/qpidtoollibs/__init__.py b/qpid/tools/src/py/qpidtoollibs/__init__.py new file mode 100644 index 0000000000..31d5a2ef58 --- /dev/null +++ b/qpid/tools/src/py/qpidtoollibs/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py new file mode 100644 index 0000000000..366d9b0663 --- /dev/null +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -0,0 +1,322 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import Message +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +class BrokerAgent(object): + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + self.sess.close() + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(timeout) + self.sess.acknowledge() + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + return items + + def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): + query = {'_what' : 'OBJECT', + '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, name): + obj = self._doNameQuery(cls.__name__.lower(), name) + if obj: + return cls(self, obj) + return None + + def getCluster(self): + return self._getAllBrokerObjects(Cluster) + + def getBroker(self): + return self._getBrokerObject(Broker, "amqp-broker") + + def getMemory(self): + return self._getAllBrokerObjects(Memory)[0] + + def getAllConnections(self): + return self._getAllBrokerObjects(Connection) + + def getConnection(self, name): + return self._getBrokerObject(Connection, name) + + def getAllSessions(self): + return self._getAllBrokerObjects(Session) + + def getSession(self, name): + return self._getBrokerObject(Session, name) + + def getAllSubscriptions(self): + return self._getAllBrokerObjects(Subscription) + + def getSubscription(self, name): + return self._getBrokerObject(Subscription, name) + + def getAllExchanges(self): + return self._getAllBrokerObjects(Exchange) + + def getExchange(self, name): + return self._getBrokerObject(Exchange, name) + + def getAllQueues(self): + return self._getAllBrokerObjects(Queue) + + def getQueue(self, name): + return self._getBrokerObject(Queue, name) + + def getAllBindings(self): + return self._getAllBrokerObjects(Binding) + + def getBinding(self, exchange=None, queue=None): + pass + + def echo(self, sequence, body): + """Request a response to test the path to the management broker""" + pass + + def connect(self, host, port, durable, authMechanism, username, password, transport): + """Establish a connection to another broker""" + pass + + def queueMoveMessages(self, srcQueue, destQueue, qty): + """Move messages from one queue to another""" + pass + + def setLogLevel(self, level): + """Set the log level""" + pass + + def getLogLevel(self): + """Get the log level""" + pass + + def setTimestampConfig(self, receive): + """Set the message timestamping configuration""" + pass + + def getTimestampConfig(self): + """Get the message timestamping configuration""" + pass + +# def addExchange(self, exchange_type, name, **kwargs): +# pass + +# def delExchange(self, name): +# pass + +# def addQueue(self, name, **kwargs): +# pass + +# def delQueue(self, name): +# pass + +# def bind(self, exchange, queue, key, **kwargs): +# pass + +# def unbind(self, exchange, queue, key, **kwargs): +# pass + + def create(self, _type, name, properties, strict): + """Create an object of the specified type""" + pass + + def delete(self, _type, name, options): + """Delete an object of the specified type""" + pass + + def query(self, _type, name): + """Query the current state of an object""" + return self._getBrokerObject(self, _type, name) + + +class BrokerObject(object): + def __init__(self, broker, content): + self.broker = broker + self.content = content + self.values = content['_values'] + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + if value.__class__ == dict and '_object_name' in value: + full_name = value['_object_name'] + colon = full_name.find(':') + if colon > 0: + full_name = full_name[colon+1:] + colon = full_name.find(':') + if colon > 0: + return full_name[colon+1:] + return value + + def getAttributes(self): + return self.values + + def getCreateTime(self): + return self.content['_create_ts'] + + def getDeleteTime(self): + return self.content['_delete_ts'] + + def getUpdateTime(self): + return self.content['_update_ts'] + + def update(self): + """ + Reload the property values from the agent. + """ + refreshed = self.broker._getBrokerObject(self.__class__, self.name) + if refreshed: + self.content = refreshed.content + self.values = self.content['_values'] + else: + raise Exception("No longer exists on the broker") + +class Broker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Memory(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Connection(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def close(self): + pass + +class Session(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Subscription(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "subscription name undefined" + +class Exchange(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Binding(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "Binding key: %s" % self.values['bindingKey'] + +class Queue(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def purge(self, request): + """Discard all or some messages on a queue""" + self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) + + def reroute(self, request, useAltExchange, exchange, filter={}): + """Remove all or some messages on this queue and route them to an exchange""" + self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, + "org.apache.qpid.broker:queue:%s" % self.name) + diff --git a/qpid/tools/src/py/qpidtoollibs/disp.py b/qpid/tools/src/py/qpidtoollibs/disp.py new file mode 100644 index 0000000000..cb7d3da306 --- /dev/null +++ b/qpid/tools/src/py/qpidtoollibs/disp.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +class Header: + """ """ + NONE = 1 + KMG = 2 + YN = 3 + Y = 4 + TIME_LONG = 5 + TIME_SHORT = 6 + DURATION = 7 + COMMAS = 8 + + def __init__(self, text, format=NONE): + self.text = text + self.format = format + + def __repr__(self): + return self.text + + def __str__(self): + return self.text + + def formatted(self, value): + try: + if value == None: + return '' + if self.format == Header.NONE: + return value + if self.format == Header.KMG: + return self.num(value) + if self.format == Header.YN: + if value: + return 'Y' + return 'N' + if self.format == Header.Y: + if value: + return 'Y' + return '' + if self.format == Header.TIME_LONG: + return strftime("%c", gmtime(value / 1000000000)) + if self.format == Header.TIME_SHORT: + return strftime("%X", gmtime(value / 1000000000)) + if self.format == Header.DURATION: + if value < 0: value = 0 + sec = value / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + if self.format == Header.COMMAS: + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + except: + return "?" + + def numCell(self, value, tag): + fp = float(value) / 1000. + if fp < 10.0: + return "%1.2f%c" % (fp, tag) + if fp < 100.0: + return "%2.1f%c" % (fp, tag) + return "%4d%c" % (value / 1000, tag) + + def num(self, value): + if value < 1000: + return "%4d" % value + if value < 1000000: + return self.numCell(value, 'k') + value /= 1000 + if value < 1000000: + return self.numCell(value, 'm') + value /= 1000 + return self.numCell(value, 'g') + + +class Display: + """ Display formatting for QPID Management CLI """ + + def __init__(self, spacing=2, prefix=" "): + self.tableSpacing = spacing + self.tablePrefix = prefix + self.timestampFormat = "%X" + + def formattedTable(self, title, heads, rows): + fRows = [] + for row in rows: + fRow = [] + col = 0 + for cell in row: + fRow.append(heads[col].formatted(cell)) + col += 1 + fRows.append(fRow) + headtext = [] + for head in heads: + headtext.append(head.text) + self.table(title, headtext, fRows) + + def table(self, title, heads, rows): + """ Print a table with autosized columns """ + + # Pad the rows to the number of heads + for row in rows: + diff = len(heads) - len(row) + for idx in range(diff): + row.append("") + + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + cellWidth = len (unicode (row[col])) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + if col < len (heads) - 1: + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + for i in range (width): + line = line + "=" + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + line = line + unicode (row[col]) + if col < len (heads) - 1: + for i in range (width - len (unicode (row[col]))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) + + def duration(self, nsec): + if nsec < 0: nsec = 0 + sec = nsec / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + +class Sortable: + """ """ + def __init__(self, row, sortIndex): + self.row = row + self.sortIndex = sortIndex + if sortIndex >= len(row): + raise Exception("sort index exceeds row boundary") + + def __cmp__(self, other): + return cmp(self.row[self.sortIndex], other.row[self.sortIndex]) + + def getRow(self): + return self.row + +class Sorter: + """ """ + def __init__(self, heads, rows, sortCol, limit=0, inc=True): + col = 0 + for head in heads: + if head.text == sortCol: + break + col += 1 + if col == len(heads): + raise Exception("sortCol '%s', not found in headers" % sortCol) + + list = [] + for row in rows: + list.append(Sortable(row, col)) + list.sort() + if not inc: + list.reverse() + count = 0 + self.sorted = [] + for row in list: + self.sorted.append(row.getRow()) + count += 1 + if count == limit: + break + + def getSorted(self): + return self.sorted |
