summaryrefslogtreecommitdiff
path: root/python/commands
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
committerTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
commitd5913038788795bd964c534bd28983e5732c2fce (patch)
treeb86ab7ab84ec2f25421c161e76ef2924043ca30a /python/commands
parente173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff)
downloadqpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands')
-rwxr-xr-xpython/commands/qpid-queue-stats195
1 files changed, 85 insertions, 110 deletions
diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats
index 98dfa7580a..c29cab3568 100755
--- a/python/commands/qpid-queue-stats
+++ b/python/commands/qpid-queue-stats
@@ -26,120 +26,96 @@ import re
import socket
import qpid
from threading import Condition
-from qpid.management import managementClient
-from qpid.managementdata import Broker
+from qpid.qmfconsole import Session, Console
from qpid.peer import Closed
from qpid.connection import Connection, ConnectionFailed
from qpid.util import connect
from time import sleep
-class mgmtObject (object):
- """ Generic object that holds the contents of a management object with its
- attributes set as object attributes. """
-
- def __init__ (self, classKey, timestamps, row):
- self.classKey = classKey
- self.timestamps = timestamps
- for cell in row:
- setattr (self, cell[0], cell[1])
-
-
-
-class BrokerManager:
- def __init__ (self):
- self.dest = None
- self.src = None
- self.broker = None
- self.objects = {}
- self.filter = None
-
- def SetBroker (self, broker):
- self.broker = broker
-
- def ConnectToBroker (self):
- try:
- self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection (connect (self.broker.host, self.broker.port),
- username=self.broker.username, password=self.broker.password)
- self.conn.start ()
- self.session = self.conn.session(self.sessionId)
- self.mclient = managementClient (self.conn.spec, None, self.configCb, self.instCb)
- self.mchannel = self.mclient.addChannel (self.session)
- except socket.error, e:
- print "Socket Error %s - %s" % (e[0], e[1])
- sys.exit (1)
- except Closed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit (1)
- except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
-
- def setFilter(self,filter):
- self.filter = filter
-
- def Disconnect (self):
- self.mclient.removeChannel (self.mchannel)
- self.session.close(timeout=10)
- self.conn.close(timeout=10)
-
- def configCb (self, context, classKey, row, timestamps):
- className = classKey[1]
- if className != "queue":
- return
-
- obj = mgmtObject (classKey, timestamps, row)
- if obj.id not in self.objects:
- self.objects[obj.id] = (obj.name, None, None)
-
- def instCb (self, context, classKey, row, timestamps):
- className = classKey[1]
- if className != "queue":
- return
-
- obj = mgmtObject (classKey, timestamps, row)
- if obj.id not in self.objects:
- return
-
- (name, first, last) = self.objects[obj.id]
- if first == None:
- self.objects[obj.id] = (name, obj, None)
- return
-
- if len(self.filter) > 0 :
- match = False
+class BrokerManager(Console):
+ def __init__(self, host):
+ self.url = host
+ self.objects = {}
+ self.filter = None
+ self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, userBindings=True)
+ try:
+ self.broker = self.session.addBroker(self.url)
+ except socket.error, e:
+ print "Socket Error %s - %s" % (e[0], e[1])
+ sys.exit (1)
+ except Closed, e:
+ print "Connect Failed %d - %s" % (e[0], e[1])
+ sys.exit (1)
+ except ConnectionFailed, e:
+ print "Connect Failed %d - %s" % (e[0], e[1])
+ sys.exit(1)
+
+ def setFilter(self,filter):
+ self.filter = filter
+
+ def objectProps(self, broker, record):
+ className = record.getClassKey()[1]
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ self.objects[id] = (record.name, None, None)
+
+ def objectStats(self, broker, record):
+ className = record.getClassKey()[1]
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ return
+
+ (name, first, last) = self.objects[id]
+ if first == None:
+ self.objects[id] = (name, record, None)
+ return
+
+ if len(self.filter) > 0 :
+ match = False
- for x in self.filter:
- if x.match(name):
- match = True
- break
- if match == False:
- return
-
- if last == None:
- lastSample = first
- else:
- lastSample = last
-
- self.objects[obj.id] = (name, first, obj)
-
- deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0])
- enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0)
- dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0)
- print "%-41s%10.2f%11d%13.2f%13.2f" % \
- (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate)
-
-
- def Display (self):
- self.ConnectToBroker ()
- print "Queue Name Sec Depth Enq Rate Deq Rate"
- print "========================================================================================"
- try:
- while True:
- sleep (1)
- except KeyboardInterrupt:
- pass
- self.Disconnect ()
+ for x in self.filter:
+ if x.match(name):
+ match = True
+ break
+ if match == False:
+ return
+
+ if last == None:
+ lastSample = first
+ else:
+ lastSample = last
+
+ self.objects[id] = (name, first, record)
+
+ deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
+ enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
+ (deltaTime / 1000000000.0)
+ dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
+ (deltaTime / 1000000000.0)
+ print "%-41s%10.2f%11d%13.2f%13.2f" % \
+ (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
+
+
+ def Display (self):
+ classes = self.session.getClasses("org.apache.qpid.broker")
+ for cls in classes:
+ if cls[1] == "queue":
+ queueClassKey = cls
+ self.session.bindClass(queueClassKey)
+ print "Queue Name Sec Depth Enq Rate Deq Rate"
+ print "========================================================================================"
+ try:
+ while True:
+ sleep (1)
+ except KeyboardInterrupt:
+ print
+ self.session.delBroker(self.broker)
##
## Main Program
@@ -157,8 +133,7 @@ def main():
for s in options.filter.split(","):
filter.append(re.compile(s))
- bm = BrokerManager ()
- bm.SetBroker (Broker (host))
+ bm = BrokerManager(host)
bm.setFilter(filter)
bm.Display()