#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os import getopt import sys import locale import socket import re from qmf.console import Session, Console from qpid.disp import Display _host = "localhost" _top = False _types = "" pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") def Usage (): print "Usage: qpid-stat [OPTIONS] [broker-addr]" print print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print # print "General Options:" # print " -n [--numeric] Don't resolve names" # print " -t [--top] Repeatedly display top items" # print print "Display Options:" print print " -b Show Brokers" print " -c Show Connections" # print " -s Show Sessions" # print " -e Show Exchanges" # print " -q Show Queues" print sys.exit (1) def num(value): if value < 2000: return str(value) value /= 1000 if value < 2000: return str(value) + "k" value /= 1000 if value < 2000: return str(value) + "m" value /= 1000 return str(value) + "g" class IpAddr: def __init__(self, text): if text.find("@") != -1: tokens = text.split("@") text = tokens[1] if text.find(":") != -1: tokens = text.split(":") text = tokens[0] self.port = int(tokens[1]) else: self.port = 5672 self.dottedQuad = socket.gethostbyname(text) nums = self.dottedQuad.split(".") self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) def bestAddr(self, addrPortList): bestDiff = 0xFFFFFFFF bestAddr = None for addrPort in addrPortList: diff = IpAddr(addrPort[0]).addr ^ self.addr if diff < bestDiff: bestDiff = diff bestAddr = addrPort return bestAddr class Broker(object): def __init__(self, qmf, broker): self.broker = broker list = qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _broker=broker)[0] self.currentTime = bobj.getTimestamps()[0] try: self.uptime = bobj.uptime except: self.uptime = 0 self.connections = {} self.sessions = {} self.exchanges = {} self.queues = {} for conn in list: if pattern.match(conn.address): self.connections[conn.getObjectId()] = conn list = qmf.getObjects(_class="session", _package="org.apache.qpid.broker", _broker=broker) for sess in list: if sess.connectionRef in self.connections: self.sessions[sess.getObjectId()] = sess list = qmf.getObjects(_class="exchange", _package="org.apache.qpid.broker", _broker=broker) for exchange in list: self.exchanges[exchange.getObjectId()] = exchange list = qmf.getObjects(_class="queue", _package="org.apache.qpid.broker", _broker=broker) for queue in list: self.queues[queue.getObjectId()] = queue def getName(self): return self.broker.getUrl() def getCurrentTime(self): return self.currentTime def getUptime(self): return self.uptime class BrokerManager(Console): def __init__(self): self.brokerName = None self.qmf = None self.broker = None self.brokers = [] self.cluster = None def SetBroker(self, brokerUrl): self.url = brokerUrl self.qmf = Session() self.broker = self.qmf.addBroker(brokerUrl) agents = self.qmf.getAgents() for a in agents: if a.getAgentBank() == 0: self.brokerAgent = a def Disconnect(self): if self.broker: self.qmf.delBroker(self.broker) def _getCluster(self): packages = self.qmf.getPackages() if "org.apache.qpid.cluster" not in packages: return None clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) if len(clusters) == 0: print "Clustering is installed but not enabled on the broker." return None self.cluster = clusters[0] def _getHostList(self, urlList): hosts = [] hostAddr = IpAddr(_host) for url in urlList: if url.find("amqp:") != 0: raise Exception("Invalid URL 1") url = url[5:] addrs = str(url).split(",") addrList = [] for addr in addrs: tokens = addr.split(":") if len(tokens) != 3: raise Exception("Invalid URL 2") addrList.append((tokens[1], tokens[2])) # Find the address in the list that is most likely to be in the same subnet as the address # with which we made the original QMF connection. This increases the probability that we will # be able to reach the cluster member. best = hostAddr.bestAddr(addrList) bestUrl = best[0] + ":" + best[1] hosts.append(bestUrl) return hosts def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): if len(subs) == 0: return this = subs[0] remaining = subs[1:] newindent = indent + " " if this == 'b': pass elif this == 'c': if broker: for oid in broker.connections: iconn = broker.connections[oid] self.printConnSub(indent, broker.getName(), iconn) self.displaySubs(remaining, newindent, broker=broker, conn=iconn, sess=sess, exchange=exchange, queue=queue) elif this == 's': pass elif this == 'e': pass elif this == 'q': pass print def displayBroker(self, subs): disp = Display(prefix=" ") heads = ('Broker', 'cluster', 'uptime', 'conn', 'sess', 'exch', 'queue') rows = [] for broker in self.brokers: if self.cluster: ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) else: ctext = "" utext = "" if broker.getUptime() > 0: utext = disp.duration(broker.getUptime()) row = (broker.getName(), ctext, utext, str(len(broker.connections)), str(len(broker.sessions)), str(len(broker.exchanges)), str(len(broker.queues))) rows.append(row) disp.table("Brokers", heads, rows) def displayConn(self, subs): disp = Display(prefix=" ") heads = [] if self.cluster: heads.append('broker') heads.append('client addr') heads.append('client(pid)') heads.append('auth') heads.append('connected') heads.append('idle') heads.append('msgIn') heads.append('msgOut') rows = [] for broker in self.brokers: for oid in broker.connections: conn = broker.connections[oid] row = [] if self.cluster: row.append(broker.getName()) row.append(conn.address) procpid = "" if conn.remoteProcessName: procpid += conn.remoteProcessName if conn.remotePid: procpid += "(%d)" % conn.remotePid row.append(procpid) row.append(conn.authIdentity) row.append(disp.duration(broker.getCurrentTime() - conn.getTimestamps()[1])) idle = broker.getCurrentTime() - conn.getTimestamps()[0] if idle < 10000000000: itext = "" else: itext = disp.duration(idle) row.append(itext) row.append(num(conn.framesFromClient)) row.append(num(conn.framesToClient)) rows.append(row) title = "Connections" if self.cluster: title += " for cluster '%s'" % self.cluster.clusterName disp.table(title, heads, rows) def displaySession(self, subs): disp = Display(prefix=" ") def displayExchange(self, subs): disp = Display(prefix=" ") heads = [] if self.cluster: heads.append('broker') heads.append("exchange") heads.append("type") heads.append("dur") heads.append("bind") heads.append("msgIn") heads.append("msgOut") heads.append("msgDrop") heads.append("byteIn") heads.append("byteOut") heads.append("byteDrop") rows = [] for broker in self.brokers: for oid in broker.exchanges: ex = broker.exchanges[oid] row = [] if self.cluster: row.append(broker.getName()) if ex.durable: dur = "Y" else: dur = "" row.append(ex.name) row.append(ex.type) row.append(dur) row.append(num(ex.bindingCount)) row.append(num(ex.msgReceives)) row.append(num(ex.msgRoutes)) row.append(num(ex.msgDrops)) row.append(num(ex.byteReceives)) row.append(num(ex.byteRoutes)) row.append(num(ex.byteDrops)) rows.append(row) title = "Exchanges" if self.cluster: title += " for cluster '%s'" % self.cluster.clusterName disp.table(title, heads, rows) def displayMain(self, main, subs): if main == 'b': self.displayBroker(subs) elif main == 'c': self.displayConn(subs) elif main == 's': self.displaySession(subs) elif main == 'e': self.displayExchange(subs) def display(self): self._getCluster() if self.cluster: memberList = self.cluster.members.split(";") hostList = self._getHostList(memberList) self.qmf.delBroker(self.broker) self.broker = None for host in hostList: b = self.qmf.addBroker(host) self.brokers.append(Broker(self.qmf, b)) else: self.brokers.append(Broker(self.qmf, self.broker)) self.displayMain(_types[0], _types[1:]) ## ## Main Program ## try: longOpts = ("top", "numeric") (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bc", longOpts) except: Usage() try: encoding = locale.getpreferredencoding() cargs = [a.decode(encoding) for a in encArgs] except: cargs = encArgs for opt in optlist: if opt[0] == "-t" or opt[0] == "--top": _top = True elif opt[0] == "-n" or opt[0] == "--numeric": _numeric = True elif len(opt[0]) == 2: char = opt[0][1] if "bcseq".find(char) != -1: _types += char else: Usage() else: Usage() if len(_types) == 0: Usage() nargs = len(cargs) bm = BrokerManager() if nargs == 1: _host = cargs[0] try: bm.SetBroker(_host) bm.display() except KeyboardInterrupt: print except Exception,e: print "Failed:", e.args sys.exit(1) bm.Disconnect()