diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
commit | 7e34266b9a23f4536415bfbc3f161b84615b6550 (patch) | |
tree | 484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/python/commands/qpid-queue-stats | |
parent | 4612263ea692f00a4bd810438bdaf9bc88022091 (diff) | |
download | qpid-python-M4.tar.gz |
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/python/commands/qpid-queue-stats')
-rwxr-xr-x | RC9/qpid/python/commands/qpid-queue-stats | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/RC9/qpid/python/commands/qpid-queue-stats b/RC9/qpid/python/commands/qpid-queue-stats new file mode 100755 index 0000000000..356a1d2d8d --- /dev/null +++ b/RC9/qpid/python/commands/qpid-queue-stats @@ -0,0 +1,144 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import optparse +import sys +import re +import socket +import qpid +from threading import Condition +from qmf.console import Session, Console +from qpid.peer import Closed +from qpid.connection import Connection, ConnectionFailed +from time import sleep + +class BrokerManager(Console): + def __init__(self, host): + self.url = host + self.objects = {} + self.filter = None + self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, + userBindings=True, manageConnections=True) + self.broker = self.session.addBroker(self.url) + self.firstError = True + + def setFilter(self,filter): + self.filter = filter + + def brokerConnected(self, broker): + if not self.firstError: + print "*** Broker connected" + self.firstError = False + + def brokerDisconnected(self, broker): + print "*** Broker connection lost - %s, retrying..." % broker.getError() + self.firstError = False + self.objects.clear() + + def objectProps(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + self.objects[id] = (record.name, None, None) + + def objectStats(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + return + + (name, first, last) = self.objects[id] + if first == None: + self.objects[id] = (name, record, None) + return + + if len(self.filter) > 0 : + match = False + + for x in self.filter: + if x.match(name): + match = True + break + if match == False: + return + + if last == None: + lastSample = first + else: + lastSample = last + + self.objects[id] = (name, first, record) + + deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) + if deltaTime < 1000000000.0: + return + enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ + (deltaTime / 1000000000.0) + dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ + (deltaTime / 1000000000.0) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) + + + def Display (self): + self.session.bindClass("org.apache.qpid.broker", "queue") + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" + try: + while True: + sleep (1) + if self.firstError and self.broker.getError(): + self.firstError = False + print "*** Error: %s, retrying..." % self.broker.getError() + except KeyboardInterrupt: + print + self.session.delBroker(self.broker) + +## +## Main Program +## +def main(): + p = optparse.OptionParser() + p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost') + p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show') + + options, arguments = p.parse_args() + + host = options.broker_address + filter = [] + if options.filter != None: + for s in options.filter.split(","): + filter.append(re.compile(s)) + + bm = BrokerManager(host) + bm.setFilter(filter) + bm.Display() + +if __name__ == '__main__': + main() + |