summaryrefslogtreecommitdiff
path: root/qpid/tools
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-02-09 21:11:41 +0000
committerTed Ross <tross@apache.org>2012-02-09 21:11:41 +0000
commit192126471686e72d7b59ef9923458fcefe6847a2 (patch)
treee0a652a61c5bae9dd2b7f26d847a4049f0ed7693 /qpid/tools
parent19a3076040f4d144e604f825b59e48ab27524440 (diff)
downloadqpid-python-192126471686e72d7b59ef9923458fcefe6847a2.tar.gz
QPID-3824 - Additional queue statistics, posix memory statistics, and broker-scope statistics
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242526 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools')
-rwxr-xr-xqpid/tools/setup.py19
-rwxr-xr-xqpid/tools/src/py/qpid-stat452
-rw-r--r--qpid/tools/src/py/qpidtoollibs/__init__.py18
-rw-r--r--qpid/tools/src/py/qpidtoollibs/broker.py322
-rw-r--r--qpid/tools/src/py/qpidtoollibs/disp.py249
5 files changed, 797 insertions, 263 deletions
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py
index feae4bb1bd..b04bb65c87 100755
--- a/qpid/tools/setup.py
+++ b/qpid/tools/setup.py
@@ -23,15 +23,16 @@ setup(name="qpid-tools",
version="0.15",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
- scripts=["src/py/qpid-cluster",
- "src/py/qpid-cluster-store",
- "src/py/qpid-config",
- "src/py/qpid-printevents",
- "src/py/qpid-queue-stats",
- "src/py/qpid-route",
- "src/py/qpid-stat",
- "src/py/qpid-tool",
- "src/py/qmf-tool"],
+ packages=["qpidtoollibs"],
+ scripts=["qpid-cluster",
+ "qpid-cluster-store",
+ "qpid-config",
+ "qpid-printevents",
+ "qpid-queue-stats",
+ "qpid-route",
+ "qpid-stat",
+ "qpid-tool",
+ "qmf-tool"],
url="http://qpid.apache.org/",
license="Apache Software License",
description="Diagnostic and management tools for Apache Qpid brokers.")
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat
index a7272da3f1..bb094554e6 100755
--- a/qpid/tools/src/py/qpid-stat
+++ b/qpid/tools/src/py/qpid-stat
@@ -21,13 +21,18 @@
import os
from optparse import OptionParser, OptionGroup
-from time import sleep ### debug
import sys
import locale
import socket
import re
-from qmf.console import Session, Console
-from qpid.disp import Display, Header, Sorter
+from qpid.messaging import Connection
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import BrokerAgent
+from qpidtoollibs.disp import Display, Header, Sorter
+
class Config:
def __init__(self):
@@ -37,7 +42,7 @@ class Config:
self._limit = 50
self._increasing = False
self._sortcol = None
- self._cluster_detail = False
+ self._details = None
self._sasl_mechanism = None
config = Config()
@@ -56,24 +61,16 @@ def OptionsAndArguments(argv):
parser.add_option_group(group1)
group2 = OptionGroup(parser, "Display Options")
- group2.add_option("-b", "--broker", help="Show Brokers",
- action="store_const", const="b", dest="show")
- group2.add_option("-c", "--connections", help="Show Connections",
- action="store_const", const="c", dest="show")
- group2.add_option("-e", "--exchanges", help="Show Exchanges",
- action="store_const", const="e", dest="show")
- group2.add_option("-q", "--queues", help="Show Queues",
- action="store_const", const="q", dest="show")
- group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
- action="store_const", const="u", dest="show")
- group2.add_option("-S", "--sort-by", metavar="<colname>",
- help="Sort by column name")
- group2.add_option("-I", "--increasing", action="store_true", default=False,
- help="Sort by increasing value (default = decreasing)")
- group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",
- help="Limit output to n rows")
- group2.add_option("-C", "--cluster", action="store_true", default=False,
- help="Display per-broker cluster detail.")
+ group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show")
+ group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show")
+ group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show")
+ group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show")
+ group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show")
+ group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show")
+ group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
+ group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)")
+ group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows")
+ group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.")
parser.add_option_group(group2)
opts, args = parser.parse_args(args=argv)
@@ -86,8 +83,8 @@ def OptionsAndArguments(argv):
config._connTimeout = opts.timeout
config._increasing = opts.increasing
config._limit = opts.limit
- config._cluster_detail = opts.cluster
config._sasl_mechanism = opts.sasl_mechanism
+ config._detail = opts.detail
if args:
config._host = args[0]
@@ -119,86 +116,26 @@ class IpAddr:
bestAddr = addrPort
return bestAddr
-class Broker(object):
- def __init__(self, qmf, broker):
- self.broker = broker
-
- agents = qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
-
- bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
- self.currentTime = bobj.getTimestamps()[0]
- try:
- self.uptime = bobj.uptime
- except:
- self.uptime = 0
- self.connections = {}
- self.sessions = {}
- self.exchanges = {}
- self.queues = {}
- self.subscriptions = {}
- package = "org.apache.qpid.broker"
-
- list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
- for conn in list:
- if not conn.shadow:
- self.connections[conn.getObjectId()] = conn
-
- list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
- for sess in list:
- if sess.connectionRef in self.connections:
- self.sessions[sess.getObjectId()] = sess
-
- list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
- for exchange in list:
- self.exchanges[exchange.getObjectId()] = exchange
-
- list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
- for queue in list:
- self.queues[queue.getObjectId()] = queue
-
- list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent)
- for subscription in list:
- self.subscriptions[subscription.getObjectId()] = subscription
-
- def getName(self):
- return self.broker.getUrl()
-
- def getCurrentTime(self):
- return self.currentTime
-
- def getUptime(self):
- return self.uptime
-
-class BrokerManager(Console):
+class BrokerManager:
def __init__(self):
- self.brokerName = None
- self.qmf = None
- self.broker = None
- self.brokers = []
- self.cluster = None
+ self.brokerName = None
+ self.connections = []
+ self.brokers = []
+ self.cluster = None
def SetBroker(self, brokerUrl, mechanism):
self.url = brokerUrl
- self.qmf = Session()
- self.mechanism = mechanism
- self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
+ self.connections.append(Connection(self.url, sasl_mechanism=mechanism))
+ self.connections[0].open()
+ self.brokers.append(BrokerAgent(self.connections[0]))
def Disconnect(self):
""" Release any allocated brokers. Ignore any failures as the tool is
shutting down.
"""
try:
- if self.broker:
- self.qmf.delBroker(self.broker)
- else:
- for b in self.brokers: self.qmf.delBroker(b.broker)
+ for conn in self.connections:
+ conn.close()
except:
pass
@@ -238,62 +175,63 @@ class BrokerManager(Console):
hosts.append(bestUrl)
return hosts
- def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
- if len(subs) == 0:
- return
- this = subs[0]
- remaining = subs[1:]
- newindent = indent + " "
- if this == 'b':
- pass
- elif this == 'c':
- if broker:
- for oid in broker.connections:
- iconn = broker.connections[oid]
- self.printConnSub(indent, broker.getName(), iconn)
- self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
- sess=sess, exchange=exchange, queue=queue)
- elif this == 's':
- pass
- elif this == 'e':
- pass
- elif this == 'q':
- pass
- print
-
def displayBroker(self, subs):
disp = Display(prefix=" ")
heads = []
- heads.append(Header('broker'))
- heads.append(Header('cluster'))
heads.append(Header('uptime', Header.DURATION))
- heads.append(Header('conn', Header.KMG))
- heads.append(Header('sess', Header.KMG))
- heads.append(Header('exch', Header.KMG))
- heads.append(Header('queue', Header.KMG))
+ heads.append(Header('connections', Header.COMMAS))
+ heads.append(Header('sessions', Header.COMMAS))
+ heads.append(Header('exchanges', Header.COMMAS))
+ heads.append(Header('queues', Header.COMMAS))
rows = []
- for broker in self.brokers:
- if self.cluster:
- ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
- else:
- ctext = "<standalone>"
- row = (broker.getName(), ctext, broker.getUptime(),
- len(broker.connections), len(broker.sessions),
- len(broker.exchanges), len(broker.queues))
- rows.append(row)
- title = "Brokers"
- if config._sortcol:
- sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
- dispRows = sorter.getSorted()
- else:
- dispRows = rows
- disp.formattedTable(title, heads, dispRows)
+ broker = self.brokers[0].getBroker()
+ connections = self.getConnectionMap()
+ sessions = self.getSessionMap()
+ exchanges = self.getExchangeMap()
+ queues = self.getQueueMap()
+ row = (broker.getUpdateTime() - broker.getCreateTime(),
+ len(connections), len(sessions),
+ len(exchanges), len(queues))
+ rows.append(row)
+ disp.formattedTable('Broker Summary:', heads, rows)
+
+ if 'queueCount' not in broker.values:
+ return
+
+ print
+ heads = []
+ heads.append(Header('Statistic'))
+ heads.append(Header('Messages', Header.COMMAS))
+ heads.append(Header('Bytes', Header.COMMAS))
+ rows = []
+ rows.append(['queue-depth', broker.msgDepth, broker.byteDepth])
+ rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues])
+ rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues])
+ rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues])
+ rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues])
+ rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues])
+ rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues])
+ rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth])
+ rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues])
+ rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues])
+ rows.append(['acquires', broker.acquires, None])
+ rows.append(['releases', broker.releases, None])
+ rows.append(['discards-no-route', broker.discardsNoRoute, None])
+ rows.append(['discards-ttl-expired', broker.discardsTtl, None])
+ rows.append(['discards-limit-overflow', broker.discardsOverflow, None])
+ rows.append(['discards-ring-overflow', broker.discardsRing, None])
+ rows.append(['discards-lvq-replace', broker.discardsLvq, None])
+ rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None])
+ rows.append(['discards-purged', broker.discardsPurge, None])
+ rows.append(['reroutes', broker.reroutes, None])
+ rows.append(['abandoned', broker.abandoned, None])
+ rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None])
+ disp.formattedTable('Aggregate Broker Statistics:', heads, rows)
+
def displayConn(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
heads.append(Header('client-addr'))
heads.append(Header('cproc'))
heads.append(Header('cpid'))
@@ -303,25 +241,20 @@ class BrokerManager(Console):
heads.append(Header('msgIn', Header.KMG))
heads.append(Header('msgOut', Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.connections:
- conn = broker.connections[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(conn.address)
- row.append(conn.remoteProcessName)
- row.append(conn.remotePid)
- row.append(conn.authIdentity)
- row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
- idle = broker.getCurrentTime() - conn.getTimestamps()[0]
- row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
- row.append(conn.msgsFromClient)
- row.append(conn.msgsToClient)
- rows.append(row)
+ connections = self.brokers[0].getAllConnections()
+ broker = self.brokers[0].getBroker()
+ for conn in connections:
+ row = []
+ row.append(conn.address)
+ row.append(conn.remoteProcessName)
+ row.append(conn.remotePid)
+ row.append(conn.authIdentity)
+ row.append(broker.getUpdateTime() - conn.getCreateTime())
+ row.append(broker.getUpdateTime() - conn.getUpdateTime())
+ row.append(conn.msgsFromClient)
+ row.append(conn.msgsToClient)
+ rows.append(row)
title = "Connections"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -335,8 +268,6 @@ class BrokerManager(Console):
def displayExchange(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
heads.append(Header("exchange"))
heads.append(Header("type"))
heads.append(Header("dur", Header.Y))
@@ -348,26 +279,21 @@ class BrokerManager(Console):
heads.append(Header("byteOut", Header.KMG))
heads.append(Header("byteDrop", Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.exchanges:
- ex = broker.exchanges[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(ex.name)
- row.append(ex.type)
- row.append(ex.durable)
- row.append(ex.bindingCount)
- row.append(ex.msgReceives)
- row.append(ex.msgRoutes)
- row.append(ex.msgDrops)
- row.append(ex.byteReceives)
- row.append(ex.byteRoutes)
- row.append(ex.byteDrops)
- rows.append(row)
+ exchanges = self.brokers[0].getAllExchanges()
+ for ex in exchanges:
+ row = []
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
title = "Exchanges"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -375,11 +301,9 @@ class BrokerManager(Console):
dispRows = rows
disp.formattedTable(title, heads, dispRows)
- def displayQueue(self, subs):
+ def displayQueues(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
heads.append(Header("queue"))
heads.append(Header("dur", Header.Y))
heads.append(Header("autoDel", Header.Y))
@@ -393,28 +317,23 @@ class BrokerManager(Console):
heads.append(Header("cons", Header.KMG))
heads.append(Header("bind", Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.queues:
- q = broker.queues[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(q.name)
- row.append(q.durable)
- row.append(q.autoDelete)
- row.append(q.exclusive)
- row.append(q.msgDepth)
- row.append(q.msgTotalEnqueues)
- row.append(q.msgTotalDequeues)
- row.append(q.byteDepth)
- row.append(q.byteTotalEnqueues)
- row.append(q.byteTotalDequeues)
- row.append(q.consumerCount)
- row.append(q.bindingCount)
- rows.append(row)
+ queues = self.brokers[0].getAllQueues()
+ for q in queues:
+ row = []
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
title = "Queues"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -422,46 +341,46 @@ class BrokerManager(Console):
dispRows = rows
disp.formattedTable(title, heads, dispRows)
+ def displayQueue(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+
def displaySubscriptions(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
- heads.append(Header("subscription"))
+ heads.append(Header("subscr"))
heads.append(Header("queue"))
- heads.append(Header("connection"))
- heads.append(Header("processName"))
- heads.append(Header("processId"))
- heads.append(Header("browsing", Header.Y))
- heads.append(Header("acknowledged", Header.Y))
- heads.append(Header("exclusive", Header.Y))
+ heads.append(Header("conn"))
+ heads.append(Header("procName"))
+ heads.append(Header("procId"))
+ heads.append(Header("browse", Header.Y))
+ heads.append(Header("acked", Header.Y))
+ heads.append(Header("excl", Header.Y))
heads.append(Header("creditMode"))
heads.append(Header("delivered", Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.subscriptions:
- s = broker.subscriptions[oid]
- row = []
- try:
- if self.cluster:
- row.append(broker.getName())
- row.append(s.name)
- row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name)
- connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef
- row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address)
- row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName)
- row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid)
- row.append(s.browsing)
- row.append(s.acknowledged)
- row.append(s.exclusive)
- row.append(s.creditMode)
- row.append(s.delivered)
- rows.append(row)
- except:
- pass
+ subscriptions = self.brokers[0].getAllSubscriptions()
+ sessions = self.getSessionMap()
+ connections = self.getConnectionMap()
+ for s in subscriptions:
+ row = []
+ try:
+ row.append(s.name)
+ row.append(s.queueRef)
+ session = sessions[s.sessionRef]
+ connection = connections[session.connectionRef]
+ row.append(connection.address)
+ row.append(connection.remoteProcessName)
+ row.append(connection.remotePid)
+ row.append(s.browsing)
+ row.append(s.acknowledged)
+ row.append(s.exclusive)
+ row.append(s.creditMode)
+ row.append(s.delivered)
+ rows.append(row)
+ except:
+ pass
title = "Subscriptions"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -469,33 +388,58 @@ class BrokerManager(Console):
dispRows = rows
disp.formattedTable(title, heads, dispRows)
+ def displayMemory(self, unused):
+ disp = Display(prefix=" ")
+ heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
+ rows = []
+ memory = self.brokers[0].getMemory()
+ for k,v in memory.values.items():
+ if k != 'name':
+ rows.append([k, v])
+ disp.formattedTable('Broker Memory Statistics:', heads, rows)
+
+ def getExchangeMap(self):
+ exchanges = self.brokers[0].getAllExchanges()
+ emap = {}
+ for e in exchanges:
+ emap[e.name] = e
+ return emap
+
+ def getQueueMap(self):
+ queues = self.brokers[0].getAllQueues()
+ qmap = {}
+ for q in queues:
+ qmap[q.name] = q
+ return qmap
+
+ def getSessionMap(self):
+ sessions = self.brokers[0].getAllSessions()
+ smap = {}
+ for s in sessions:
+ smap[s.name] = s
+ return smap
+
+ def getConnectionMap(self):
+ connections = self.brokers[0].getAllConnections()
+ cmap = {}
+ for c in connections:
+ cmap[c.address] = c
+ return cmap
+
def displayMain(self, main, subs):
if main == 'b': self.displayBroker(subs)
elif main == 'c': self.displayConn(subs)
elif main == 's': self.displaySession(subs)
elif main == 'e': self.displayExchange(subs)
- elif main == 'q': self.displayQueue(subs)
+ elif main == 'q':
+ if config._detail:
+ self.displayQueue(subs, config._detail)
+ else:
+ self.displayQueues(subs)
elif main == 'u': self.displaySubscriptions(subs)
+ elif main == 'm': self.displayMemory(subs)
def display(self):
- if config._cluster_detail or config._types[0] == 'b':
- # always show cluster detail when dumping broker stats
- self._getCluster()
- if self.cluster:
- memberList = self.cluster.members.split(";")
- hostList = self._getHostList(memberList)
- self.qmf.delBroker(self.broker)
- self.broker = None
- if config._host.find("@") > 0:
- authString = config._host.split("@")[0] + "@"
- else:
- authString = ""
- for host in hostList:
- b = self.qmf.addBroker(authString + host, config._connTimeout)
- self.brokers.append(Broker(self.qmf, b))
- else:
- self.brokers.append(Broker(self.qmf, self.broker))
-
self.displayMain(config._types[0], config._types[1:])
diff --git a/qpid/tools/src/py/qpidtoollibs/__init__.py b/qpid/tools/src/py/qpidtoollibs/__init__.py
new file mode 100644
index 0000000000..31d5a2ef58
--- /dev/null
+++ b/qpid/tools/src/py/qpidtoollibs/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py
new file mode 100644
index 0000000000..366d9b0663
--- /dev/null
+++ b/qpid/tools/src/py/qpidtoollibs/broker.py
@@ -0,0 +1,322 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import Message
+try:
+ from uuid import uuid4
+except ImportError:
+ from qpid.datatypes import uuid4
+
+class BrokerAgent(object):
+ def __init__(self, conn):
+ self.conn = conn
+ self.sess = self.conn.session()
+ self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+ str(uuid4())
+ self.reply_rx = self.sess.receiver(self.reply_to)
+ self.reply_rx.capacity = 10
+ self.tx = self.sess.sender("qmf.default.direct/broker")
+ self.next_correlator = 1
+
+ def close(self):
+ self.sess.close()
+
+ def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
+ props = {'method' : 'request',
+ 'qmf.opcode' : '_method_request',
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+
+ content = {'_object_id' : {'_object_name' : addr},
+ '_method_name' : method,
+ '_arguments' : arguments}
+
+ message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ response = self.reply_rx.fetch(timeout)
+ self.sess.acknowledge()
+ if response.properties['qmf.opcode'] == '_exception':
+ raise Exception("Exception from Agent: %r" % response.content['_values'])
+ if response.properties['qmf.opcode'] != '_method_response':
+ raise Exception("bad response: %r" % response.properties)
+ return response.content['_arguments']
+
+ def _sendRequest(self, opcode, content):
+ props = {'method' : 'request',
+ 'qmf.opcode' : opcode,
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+ message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ return correlator
+
+ def _doClassQuery(self, class_name):
+ query = {'_what' : 'OBJECT',
+ '_schema_id' : {'_class_name' : class_name}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item)
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ self.sess.acknowledge()
+ return items
+
+ def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
+ query = {'_what' : 'OBJECT',
+ '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item)
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ self.sess.acknowledge()
+ if len(items) == 1:
+ return items[0]
+ return None
+
+ def _getAllBrokerObjects(self, cls):
+ items = self._doClassQuery(cls.__name__.lower())
+ objs = []
+ for item in items:
+ objs.append(cls(self, item))
+ return objs
+
+ def _getBrokerObject(self, cls, name):
+ obj = self._doNameQuery(cls.__name__.lower(), name)
+ if obj:
+ return cls(self, obj)
+ return None
+
+ def getCluster(self):
+ return self._getAllBrokerObjects(Cluster)
+
+ def getBroker(self):
+ return self._getBrokerObject(Broker, "amqp-broker")
+
+ def getMemory(self):
+ return self._getAllBrokerObjects(Memory)[0]
+
+ def getAllConnections(self):
+ return self._getAllBrokerObjects(Connection)
+
+ def getConnection(self, name):
+ return self._getBrokerObject(Connection, name)
+
+ def getAllSessions(self):
+ return self._getAllBrokerObjects(Session)
+
+ def getSession(self, name):
+ return self._getBrokerObject(Session, name)
+
+ def getAllSubscriptions(self):
+ return self._getAllBrokerObjects(Subscription)
+
+ def getSubscription(self, name):
+ return self._getBrokerObject(Subscription, name)
+
+ def getAllExchanges(self):
+ return self._getAllBrokerObjects(Exchange)
+
+ def getExchange(self, name):
+ return self._getBrokerObject(Exchange, name)
+
+ def getAllQueues(self):
+ return self._getAllBrokerObjects(Queue)
+
+ def getQueue(self, name):
+ return self._getBrokerObject(Queue, name)
+
+ def getAllBindings(self):
+ return self._getAllBrokerObjects(Binding)
+
+ def getBinding(self, exchange=None, queue=None):
+ pass
+
+ def echo(self, sequence, body):
+ """Request a response to test the path to the management broker"""
+ pass
+
+ def connect(self, host, port, durable, authMechanism, username, password, transport):
+ """Establish a connection to another broker"""
+ pass
+
+ def queueMoveMessages(self, srcQueue, destQueue, qty):
+ """Move messages from one queue to another"""
+ pass
+
+ def setLogLevel(self, level):
+ """Set the log level"""
+ pass
+
+ def getLogLevel(self):
+ """Get the log level"""
+ pass
+
+ def setTimestampConfig(self, receive):
+ """Set the message timestamping configuration"""
+ pass
+
+ def getTimestampConfig(self):
+ """Get the message timestamping configuration"""
+ pass
+
+# def addExchange(self, exchange_type, name, **kwargs):
+# pass
+
+# def delExchange(self, name):
+# pass
+
+# def addQueue(self, name, **kwargs):
+# pass
+
+# def delQueue(self, name):
+# pass
+
+# def bind(self, exchange, queue, key, **kwargs):
+# pass
+
+# def unbind(self, exchange, queue, key, **kwargs):
+# pass
+
+ def create(self, _type, name, properties, strict):
+ """Create an object of the specified type"""
+ pass
+
+ def delete(self, _type, name, options):
+ """Delete an object of the specified type"""
+ pass
+
+ def query(self, _type, name):
+ """Query the current state of an object"""
+ return self._getBrokerObject(self, _type, name)
+
+
+class BrokerObject(object):
+ def __init__(self, broker, content):
+ self.broker = broker
+ self.content = content
+ self.values = content['_values']
+
+ def __getattr__(self, key):
+ if key not in self.values:
+ return None
+ value = self.values[key]
+ if value.__class__ == dict and '_object_name' in value:
+ full_name = value['_object_name']
+ colon = full_name.find(':')
+ if colon > 0:
+ full_name = full_name[colon+1:]
+ colon = full_name.find(':')
+ if colon > 0:
+ return full_name[colon+1:]
+ return value
+
+ def getAttributes(self):
+ return self.values
+
+ def getCreateTime(self):
+ return self.content['_create_ts']
+
+ def getDeleteTime(self):
+ return self.content['_delete_ts']
+
+ def getUpdateTime(self):
+ return self.content['_update_ts']
+
+ def update(self):
+ """
+ Reload the property values from the agent.
+ """
+ refreshed = self.broker._getBrokerObject(self.__class__, self.name)
+ if refreshed:
+ self.content = refreshed.content
+ self.values = self.content['_values']
+ else:
+ raise Exception("No longer exists on the broker")
+
+class Broker(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Memory(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Connection(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def close(self):
+ pass
+
+class Session(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Subscription(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def __repr__(self):
+ return "subscription name undefined"
+
+class Exchange(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Binding(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def __repr__(self):
+ return "Binding key: %s" % self.values['bindingKey']
+
+class Queue(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def purge(self, request):
+ """Discard all or some messages on a queue"""
+ self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name)
+
+ def reroute(self, request, useAltExchange, exchange, filter={}):
+ """Remove all or some messages on this queue and route them to an exchange"""
+ self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter},
+ "org.apache.qpid.broker:queue:%s" % self.name)
+
diff --git a/qpid/tools/src/py/qpidtoollibs/disp.py b/qpid/tools/src/py/qpidtoollibs/disp.py
new file mode 100644
index 0000000000..cb7d3da306
--- /dev/null
+++ b/qpid/tools/src/py/qpidtoollibs/disp.py
@@ -0,0 +1,249 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import strftime, gmtime
+
+class Header:
+ """ """
+ NONE = 1
+ KMG = 2
+ YN = 3
+ Y = 4
+ TIME_LONG = 5
+ TIME_SHORT = 6
+ DURATION = 7
+ COMMAS = 8
+
+ def __init__(self, text, format=NONE):
+ self.text = text
+ self.format = format
+
+ def __repr__(self):
+ return self.text
+
+ def __str__(self):
+ return self.text
+
+ def formatted(self, value):
+ try:
+ if value == None:
+ return ''
+ if self.format == Header.NONE:
+ return value
+ if self.format == Header.KMG:
+ return self.num(value)
+ if self.format == Header.YN:
+ if value:
+ return 'Y'
+ return 'N'
+ if self.format == Header.Y:
+ if value:
+ return 'Y'
+ return ''
+ if self.format == Header.TIME_LONG:
+ return strftime("%c", gmtime(value / 1000000000))
+ if self.format == Header.TIME_SHORT:
+ return strftime("%X", gmtime(value / 1000000000))
+ if self.format == Header.DURATION:
+ if value < 0: value = 0
+ sec = value / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+ if self.format == Header.COMMAS:
+ sval = str(value)
+ result = ""
+ while True:
+ if len(sval) == 0:
+ return result
+ left = sval[:-3]
+ right = sval[-3:]
+ result = right + result
+ if len(left) > 0:
+ result = ',' + result
+ sval = left
+ except:
+ return "?"
+
+ def numCell(self, value, tag):
+ fp = float(value) / 1000.
+ if fp < 10.0:
+ return "%1.2f%c" % (fp, tag)
+ if fp < 100.0:
+ return "%2.1f%c" % (fp, tag)
+ return "%4d%c" % (value / 1000, tag)
+
+ def num(self, value):
+ if value < 1000:
+ return "%4d" % value
+ if value < 1000000:
+ return self.numCell(value, 'k')
+ value /= 1000
+ if value < 1000000:
+ return self.numCell(value, 'm')
+ value /= 1000
+ return self.numCell(value, 'g')
+
+
+class Display:
+ """ Display formatting for QPID Management CLI """
+
+ def __init__(self, spacing=2, prefix=" "):
+ self.tableSpacing = spacing
+ self.tablePrefix = prefix
+ self.timestampFormat = "%X"
+
+ def formattedTable(self, title, heads, rows):
+ fRows = []
+ for row in rows:
+ fRow = []
+ col = 0
+ for cell in row:
+ fRow.append(heads[col].formatted(cell))
+ col += 1
+ fRows.append(fRow)
+ headtext = []
+ for head in heads:
+ headtext.append(head.text)
+ self.table(title, headtext, fRows)
+
+ def table(self, title, heads, rows):
+ """ Print a table with autosized columns """
+
+ # Pad the rows to the number of heads
+ for row in rows:
+ diff = len(heads) - len(row)
+ for idx in range(diff):
+ row.append("")
+
+ print title
+ if len (rows) == 0:
+ return
+ colWidth = []
+ col = 0
+ line = self.tablePrefix
+ for head in heads:
+ width = len (head)
+ for row in rows:
+ cellWidth = len (unicode (row[col]))
+ if cellWidth > width:
+ width = cellWidth
+ colWidth.append (width + self.tableSpacing)
+ line = line + head
+ if col < len (heads) - 1:
+ for i in range (colWidth[col] - len (head)):
+ line = line + " "
+ col = col + 1
+ print line
+ line = self.tablePrefix
+ for width in colWidth:
+ for i in range (width):
+ line = line + "="
+ print line
+
+ for row in rows:
+ line = self.tablePrefix
+ col = 0
+ for width in colWidth:
+ line = line + unicode (row[col])
+ if col < len (heads) - 1:
+ for i in range (width - len (unicode (row[col]))):
+ line = line + " "
+ col = col + 1
+ print line
+
+ def do_setTimeFormat (self, fmt):
+ """ Select timestamp format """
+ if fmt == "long":
+ self.timestampFormat = "%c"
+ elif fmt == "short":
+ self.timestampFormat = "%X"
+
+ def timestamp (self, nsec):
+ """ Format a nanosecond-since-the-epoch timestamp for printing """
+ return strftime (self.timestampFormat, gmtime (nsec / 1000000000))
+
+ def duration(self, nsec):
+ if nsec < 0: nsec = 0
+ sec = nsec / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+
+class Sortable:
+ """ """
+ def __init__(self, row, sortIndex):
+ self.row = row
+ self.sortIndex = sortIndex
+ if sortIndex >= len(row):
+ raise Exception("sort index exceeds row boundary")
+
+ def __cmp__(self, other):
+ return cmp(self.row[self.sortIndex], other.row[self.sortIndex])
+
+ def getRow(self):
+ return self.row
+
+class Sorter:
+ """ """
+ def __init__(self, heads, rows, sortCol, limit=0, inc=True):
+ col = 0
+ for head in heads:
+ if head.text == sortCol:
+ break
+ col += 1
+ if col == len(heads):
+ raise Exception("sortCol '%s', not found in headers" % sortCol)
+
+ list = []
+ for row in rows:
+ list.append(Sortable(row, col))
+ list.sort()
+ if not inc:
+ list.reverse()
+ count = 0
+ self.sorted = []
+ for row in list:
+ self.sorted.append(row.getRow())
+ count += 1
+ if count == limit:
+ break
+
+ def getSorted(self):
+ return self.sorted