diff options
| author | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
| commit | d5913038788795bd964c534bd28983e5732c2fce (patch) | |
| tree | b86ab7ab84ec2f25421c161e76ef2924043ca30a /python/commands | |
| parent | e173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff) | |
| download | qpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz | |
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands')
| -rwxr-xr-x | python/commands/qpid-queue-stats | 195 |
1 files changed, 85 insertions, 110 deletions
diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats index 98dfa7580a..c29cab3568 100755 --- a/python/commands/qpid-queue-stats +++ b/python/commands/qpid-queue-stats @@ -26,120 +26,96 @@ import re import socket import qpid from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker +from qpid.qmfconsole import Session, Console from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed from qpid.util import connect from time import sleep -class mgmtObject (object): - """ Generic object that holds the contents of a management object with its - attributes set as object attributes. """ - - def __init__ (self, classKey, timestamps, row): - self.classKey = classKey - self.timestamps = timestamps - for cell in row: - setattr (self, cell[0], cell[1]) - - - -class BrokerManager: - def __init__ (self): - self.dest = None - self.src = None - self.broker = None - self.objects = {} - self.filter = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec, None, self.configCb, self.instCb) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def setFilter(self,filter): - self.filter = filter - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) - - def configCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - self.objects[obj.id] = (obj.name, None, None) - - def instCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - return - - (name, first, last) = self.objects[obj.id] - if first == None: - self.objects[obj.id] = (name, obj, None) - return - - if len(self.filter) > 0 : - match = False +class BrokerManager(Console): + def __init__(self, host): + self.url = host + self.objects = {} + self.filter = None + self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, userBindings=True) + try: + self.broker = self.session.addBroker(self.url) + except socket.error, e: + print "Socket Error %s - %s" % (e[0], e[1]) + sys.exit (1) + except Closed, e: + print "Connect Failed %d - %s" % (e[0], e[1]) + sys.exit (1) + except ConnectionFailed, e: + print "Connect Failed %d - %s" % (e[0], e[1]) + sys.exit(1) + + def setFilter(self,filter): + self.filter = filter + + def objectProps(self, broker, record): + className = record.getClassKey()[1] + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + self.objects[id] = (record.name, None, None) + + def objectStats(self, broker, record): + className = record.getClassKey()[1] + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + return + + (name, first, last) = self.objects[id] + if first == None: + self.objects[id] = (name, record, None) + return + + if len(self.filter) > 0 : + match = False - for x in self.filter: - if x.match(name): - match = True - break - if match == False: - return - - if last == None: - lastSample = first - else: - lastSample = last - - self.objects[obj.id] = (name, first, obj) - - deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0]) - enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0) - dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0) - print "%-41s%10.2f%11d%13.2f%13.2f" % \ - (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate) - - - def Display (self): - self.ConnectToBroker () - print "Queue Name Sec Depth Enq Rate Deq Rate" - print "========================================================================================" - try: - while True: - sleep (1) - except KeyboardInterrupt: - pass - self.Disconnect () + for x in self.filter: + if x.match(name): + match = True + break + if match == False: + return + + if last == None: + lastSample = first + else: + lastSample = last + + self.objects[id] = (name, first, record) + + deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) + enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ + (deltaTime / 1000000000.0) + dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ + (deltaTime / 1000000000.0) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) + + + def Display (self): + classes = self.session.getClasses("org.apache.qpid.broker") + for cls in classes: + if cls[1] == "queue": + queueClassKey = cls + self.session.bindClass(queueClassKey) + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" + try: + while True: + sleep (1) + except KeyboardInterrupt: + print + self.session.delBroker(self.broker) ## ## Main Program @@ -157,8 +133,7 @@ def main(): for s in options.filter.split(","): filter.append(re.compile(s)) - bm = BrokerManager () - bm.SetBroker (Broker (host)) + bm = BrokerManager(host) bm.setFilter(filter) bm.Display() |
