diff options
| author | Ted Ross <tross@apache.org> | 2010-05-19 13:26:01 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-05-19 13:26:01 +0000 |
| commit | bebae522aad1f1bbf46249230c5026de085ae4e3 (patch) | |
| tree | 8896a3103cdd3c726ac81cf56740d8855534e716 | |
| parent | df0f6a518fb1574dcdf946975fe38d8f25346480 (diff) | |
| download | qpid-python-bebae522aad1f1bbf46249230c5026de085ae4e3.tar.gz | |
qpid-tool re-write:
1) No longer uses the (really) old API
2) Handles the new QMFv2 functionality
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@946178 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | extras/qmf/src/py/qmf/console.py | 36 | ||||
| -rwxr-xr-x | tools/src/py/qpid-tool | 599 |
2 files changed, 546 insertions, 89 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index fc301fe296..1cd2acbcb7 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -247,14 +247,16 @@ class Object(object): def getIndex(self): """ Return a string describing this object's primary key. """ + if self._objectId.isV2: + return self._objectId.getObject() result = u"" - for property, value in self._properties: - if property.index: + for prop, value in self._properties: + if prop.index: if result != u"": result += u":" try: - valstr = unicode(self._session._displayValue(value, property.type)) - except: + valstr = unicode(self._session._displayValue(value, prop.type)) + except Exception, e: valstr = u"<undecodable>" result += valstr return result @@ -984,7 +986,10 @@ class Session: values = content["_values"] timestamp = values["timestamp"] interval = values["heartbeat_interval"] - except: + epoch = 0 + if 'epoch' in values: + epoch = values['epoch'] + except Exception,e: return ## @@ -999,6 +1004,7 @@ class Session: agent = broker.getAgent(1, agentName) if agent == None: agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) + agent.setEpoch(epoch) broker._addAgent(agentName, agent) else: agent.touch() @@ -1233,7 +1239,7 @@ class Session: sendCodec = Codec() seq = self.seqMgr._reserve((method, False)) - if objectId.isV2(): + if objectId.isV2: # # Compose and send a QMFv2 method request # @@ -1681,6 +1687,7 @@ class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, constructor, first=0, second=0, agentName=None): if constructor.__class__ == dict: + self.isV2 = True self.agentName = agentName self.agentEpoch = 0 if '_agent_name' in constructor: self.agentName = constructor['_agent_name'] @@ -1689,6 +1696,7 @@ class ObjectId: raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.") self.objectName = constructor['_object_name'] else: + self.isV2 = None if not constructor: first = first second = second @@ -1723,9 +1731,6 @@ class ObjectId: return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), self.getBrokerBank(), self.getAgentBank(), self.getObject()) - def isV2(self): - return not self.agentName.isdigit() - def index(self): return self.__repr__() @@ -2392,6 +2397,7 @@ class Agent: self.unsolicitedContext = RequestContext(self, self) self.lastSeenTime = time() self.closed = None + self.epoch = 0 def _checkClosed(self): @@ -2418,6 +2424,18 @@ class Agent: self.lastSeenTime = time() + def setEpoch(self, epoch): + self.epoch = epoch + + + def epochMismatch(self, epoch): + if epoch == 0 or self.epoch == 0: + return None + if epoch == self.epoch: + return None + return True + + def isOld(self): if self.heartbeatInterval == 0: return None diff --git a/tools/src/py/qpid-tool b/tools/src/py/qpid-tool index 05afcc9732..c74595a359 100755 --- a/tools/src/py/qpid-tool +++ b/tools/src/py/qpid-tool @@ -23,134 +23,581 @@ import os import getopt import sys import socket -from cmd import Cmd -from qpid.connection import ConnectionFailed, Timeout -from qpid.managementdata import ManagementData -from shlex import split -from qpid.disp import Display -from qpid.peer import Closed - -class Mcli (Cmd): +from cmd import Cmd +from shlex import split +from threading import Lock +from qpid.disp import Display +from qpid.peer import Closed +from qmf.console import Session, Console, SchemaClass + +class Mcli(Cmd): """ Management Command Interpreter """ - def __init__ (self, dataObject, dispObject): - Cmd.__init__ (self) + def __init__(self, dataObject, dispObject): + Cmd.__init__(self) self.dataObject = dataObject self.dispObject = dispObject - self.dataObject.setCli (self) + self.dataObject.setCli(self) self.prompt = "qpid: " - def emptyline (self): + def emptyline(self): pass - def setPromptMessage (self, p): + def setPromptMessage(self, p): if p == None: self.prompt = "qpid: " else: self.prompt = "qpid[%s]: " % p - def do_help (self, data): + def do_help(self, data): print "Management Tool for QPID" print print "Commands:" + print " agents - Print a list of the known Agents" print " list - Print summary of existing objects by class" print " list <className> - Print list of objects of the specified class" print " list <className> active - Print list of non-deleted objects of the specified class" - print " show <className> - Print contents of all objects of specified class" - print " show <className> active - Print contents of all non-deleted objects of specified class" - print " show <list-of-IDs> - Print contents of one or more objects (infer className)" - print " show <className> <list-of-IDs> - Print contents of one or more objects" - print " list is space-separated, ranges may be specified (i.e. 1004-1010)" +# print " show <className> - Print contents of all objects of specified class" +# print " show <className> active - Print contents of all non-deleted objects of specified class" + print " show <ID> - Print contents of an object (infer className)" +# print " show <className> <list-of-IDs> - Print contents of one or more objects" +# print " list is space-separated, ranges may be specified (i.e. 1004-1010)" print " call <ID> <methodName> [<args>] - Invoke a method on an object" print " schema - Print summary of object classes seen on the target" print " schema <className> - Print details of an object class" print " set time-format short - Select short timestamp format (default)" print " set time-format long - Select long timestamp format" - print " id [<ID>] - Display translations of display object ids" print " quit or ^D - Exit the program" print - def complete_set (self, text, line, begidx, endidx): + def complete_set(self, text, line, begidx, endidx): """ Command completion for the 'set' command """ - tokens = split (line) - if len (tokens) < 2: + tokens = split(line) + if len(tokens) < 2: return ["time-format "] elif tokens[1] == "time-format": - if len (tokens) == 2: + if len(tokens) == 2: return ["long", "short"] - elif len (tokens) == 3: - if "long".find (text) == 0: + elif len(tokens) == 3: + if "long".find(text) == 0: return ["long"] - elif "short".find (text) == 0: + elif "short".find(text) == 0: return ["short"] - elif "time-format".find (text) == 0: + elif "time-format".find(text) == 0: return ["time-format "] return [] - def do_set (self, data): - tokens = split (data) + def do_set(self, data): + tokens = split(data) try: if tokens[0] == "time-format": - self.dispObject.do_setTimeFormat (tokens[1]) + self.dispObject.do_setTimeFormat(tokens[1]) except: pass - def do_id (self, data): - self.dataObject.do_id(data) - - def complete_schema (self, text, line, begidx, endidx): - tokens = split (line) - if len (tokens) > 2: + def complete_schema(self, text, line, begidx, endidx): + tokens = split(line) + if len(tokens) > 2: return [] - return self.dataObject.classCompletions (text) + return self.dataObject.classCompletions(text) + + def do_schema(self, data): + try: + self.dataObject.do_schema(data) + except Exception, e: + print "Exception in do_schema: %r" % e + + def do_agents(self, data): + try: + self.dataObject.do_agents(data) + except Exception, e: + print "Exception in do_agents: %r" % e - def do_schema (self, data): - self.dataObject.do_schema (data) + def do_id(self, data): + try: + self.dataObject.do_id(data) + except Exception, e: + print "Exception in do_id: %r" % e - def complete_list (self, text, line, begidx, endidx): - tokens = split (line) - if len (tokens) > 2: + def complete_list(self, text, line, begidx, endidx): + tokens = split(line) + if len(tokens) > 2: return [] - return self.dataObject.classCompletions (text) + return self.dataObject.classCompletions(text) - def do_list (self, data): - self.dataObject.do_list (data) + def do_list(self, data): + try: + self.dataObject.do_list(data) + except Exception, e: + print "Exception in do_list: %r" % e - def do_show (self, data): - self.dataObject.do_show (data) + def do_show(self, data): + try: + self.dataObject.do_show(data) + except Exception, e: + print "Exception in do_show: %r" % e - def do_call (self, data): + def do_call(self, data): try: - self.dataObject.do_call (data) - except ValueError, e: - print "ValueError:", e + self.dataObject.do_call(data) + except Exception, e: + print "Exception in do_call: %r", e - def do_EOF (self, data): + def do_EOF(self, data): print "quit" try: - self.dataObject.do_exit () + self.dataObject.do_exit() except: pass return True - def do_quit (self, data): + def do_quit(self, data): try: - self.dataObject.do_exit () + self.dataObject.do_exit() except: pass return True - def postcmd (self, stop, line): + def postcmd(self, stop, line): return stop - def postloop (self): + def postloop(self): print "Exiting..." - self.dataObject.close () + self.dataObject.close() + +#====================================================================================================== +# QmfData +#====================================================================================================== +class QmfData(Console): + """ + """ + def __init__(self, disp, url): + self.disp = disp + self.url = url + self.session = Session(self, manageConnections=True) + self.broker = self.session.addBroker(self.url) + self.lock = Lock() + self.connected = None + self.closing = None + self.first_connect = True + self.cli = None + self.idRegistry = IdRegistry() + self.objects = {} + + #======================= + # Methods to support CLI + #======================= + def setCli(self, cli): + self.cli = cli + + def close(self): + self.closing = True + self.session.delBroker(self.broker) + + def classCompletions(self, text): + pass + + def do_schema(self, data): + if data == "": + self.schemaSummary() + else: + self.schemaTable(data) + + def do_agents(self, data): + agents = self.session.getAgents() + rows = [] + for agent in agents: + version = 1 + if agent.isV2: + version = 2 + rows.append(("%d.%s" % (agent.getBrokerBank(), agent.getAgentBank()), agent.label, agent.epoch, version)) + self.disp.table("QMF Agents:", ("Agent Name", "Label", "Epoch", "QMF Version"), rows) + + def do_id(self, data): + tokens = data.split() + for token in tokens: + if not token.isdigit(): + print "Value %s is non-numeric" % token + return + title = "Translation of Display IDs:" + heads = ('DisplayID', 'Epoch', 'Agent', 'ObjectName') + if len(tokens) == 0: + tokens = self.idRegistry.getDisplayIds() + rows = [] + for token in tokens: + rows.append(self.idRegistry.getIdInfo(int(token))) + self.disp.table(title, heads, rows) + + def do_list(self, data): + tokens = data.split() + if len(tokens) == 0: + self.listClasses() + else: + self.listObjects(tokens) + + def do_show(self, data): + tokens = data.split() + if len(tokens) == 0: + print "Missing Class or ID" + return + key = self.classKeyByToken(tokens[0]) + if key: + self.showObjectsByKey(key) + elif tokens[0].isdigit(): + self.showObjectById(int(tokens[0])) + + def do_call(self, data): + tokens = data.split() + if len(tokens) < 2: + print "Not enough arguments supplied" + return + displayId = long(tokens[0]) + methodName = tokens[1] + args = tokens[2:] + + obj = None + try: + self.lock.acquire() + if displayId not in self.objects: + print "Unknown ID" + return + obj = self.objects[displayId] + finally: + self.lock.release() + + self.session._sendMethodRequest(self.broker, obj.getClassKey(), obj.getObjectId(), methodName, args) + + def do_exit(self): + pass + + #==================== + # Sub-Command Methods + #==================== + def schemaSummary(self, package_filter=None): + rows = [] + packages = self.session.getPackages() + for package in packages: + if package_filter and package_filter != package: + continue + keys = self.session.getClasses(package) + for key in keys: + kind = "object" + schema = self.session.getSchema(key) + if schema.kind == SchemaClass.CLASS_KIND_EVENT: + kind = "event" + if schema.kind == SchemaClass.CLASS_KIND_TABLE: + # + # Don't display event schemata. This will be a future feature. + # + rows.append((package, key.getClassName(), kind)) + self.disp.table("QMF Classes:", ("Package", "Name", "Kind"), rows) + + def schemaTable(self, text): + packages = self.session.getPackages() + if text in packages: + self.schemaSummary(package_filter=text) + for package in packages: + keys = self.session.getClasses(package) + for key in keys: + if text == key.getClassName() or text == package + ":" + key.getClassName(): + schema = self.session.getSchema(key) + if schema.kind == SchemaClass.CLASS_KIND_TABLE: + self.schemaObject(schema) + else: + self.schemaEvent(schema) + + def schemaObject(self, schema): + rows = [] + title = "Object Class: %s" % schema.__repr__() + heads = ("Element", "Type", "Access", "Unit", "Notes", "Description") + for prop in schema.getProperties(): + notes = "" + if prop.index : notes += "index " + if prop.optional : notes += "optional " + row = (prop.name, self.typeName(prop.type), self.accessName(prop.access), + self.notNone(prop.unit), notes, self.notNone(prop.desc)) + rows.append(row) + for stat in schema.getStatistics(): + row = (stat.name, self.typeName(stat.type), "", self.notNone(prop.unit), "", self.notNone(prop.desc)) + rows.append(row) + self.disp.table(title, heads, rows) + + for method in schema.methods: + rows = [] + heads = ("Argument", "Type", "Direction", "Unit", "Description") + title = " Method: %s" % method.name + for arg in method.arguments: + row = (arg.name, self.typeName(arg.type), arg.dir, self.notNone(arg.unit), self.notNone(arg.desc)) + rows.append(row) + print + self.disp.table(title, heads, rows) + + def schemaEvent(self, schema): + rows = [] + title = "Event Class: %s" % schema.__repr__() + heads = ("Element", "Type", "Unit", "Description") + for arg in schema.arguments: + row = (arg.name, self.typeName(arg.type), self.notNone(arg.unit), self.notNone(arg.desc)) + rows.append(row) + self.disp.table(title, heads, rows) + + def listClasses(self): + title = "Summary of Objects by Type:" + heads = ("Package", "Class", "Active", "Deleted") + rows = [] + totals = {} + try: + self.lock.acquire() + for dispId in self.objects: + obj = self.objects[dispId] + key = obj.getClassKey() + if key in totals: + stats = totals[key] + else: + stats = (0, 0) + if obj.isDeleted(): + stats = (stats[0], stats[1] + 1) + else: + stats = (stats[0] + 1, stats[1]) + totals[key] = stats + finally: + self.lock.release() + + for key in totals: + stats = totals[key] + rows.append((key.getPackageName(), key.getClassName(), stats[0], stats[1])) + self.disp.table(title, heads, rows) + + def listObjects(self, tokens): + ckey = self.classKeyByToken(tokens[0]) + show_deleted = True + if len(tokens) > 1 and tokens[1] == 'active': + show_deleted = None + heads = ("ID", "Created", "Destroyed", "Index") + rows = [] + try: + self.lock.acquire() + for dispId in self.objects: + obj = self.objects[dispId] + if obj.getClassKey() == ckey: + utime, ctime, dtime = obj.getTimestamps() + dtimestr = self.disp.timestamp(dtime) + if dtime == 0: + dtimestr = "-" + if dtime == 0 or (dtime > 0 and show_deleted): + row = (dispId, self.disp.timestamp(ctime), dtimestr, obj.getIndex()) + rows.append(row) + finally: + self.lock.release() + self.disp.table("Object Summary:", heads, rows) -def Usage (): + def showObjectsByKey(self, key): + pass + + def showObjectById(self, dispId): + heads = ("Attribute", str(dispId)) + rows = [] + try: + self.lock.acquire() + if dispId in self.objects: + obj = self.objects[dispId] + caption = "Object of type: %r" % obj.getClassKey() + for prop in obj.getProperties(): + row = (prop[0].name, "%r" % prop[1]) + rows.append(row) + for stat in obj.getStatistics(): + row = (stat[0].name, "%r" % stat[1]) + rows.append(row) + else: + print "No object found with ID %d" % dispId + finally: + self.lock.release() + self.disp.table(caption, heads, rows) + + def classKeyByToken(self, token): + """ + Given a token, find the matching class key (if found): + token formats: <class-name> + <package-name>:<class-name> + """ + pname = None + cname = None + parts = token.split(':') + if len(parts) == 1: + cname = parts[0] + elif len(parts) == 2: + pname = parts[0] + cname = parts[1] + else: + raise ValueError("Invalid Class Name: %s" % token) + + packages = self.session.getPackages() + for p in packages: + if pname == None or pname == p: + classes = self.session.getClasses(p) + for key in classes: + if key.getClassName() == cname: + return key + return None + + def typeName (self, typecode): + """ Convert type-codes to printable strings """ + if typecode == 1: return "uint8" + elif typecode == 2: return "uint16" + elif typecode == 3: return "uint32" + elif typecode == 4: return "uint64" + elif typecode == 5: return "bool" + elif typecode == 6: return "short-string" + elif typecode == 7: return "long-string" + elif typecode == 8: return "abs-time" + elif typecode == 9: return "delta-time" + elif typecode == 10: return "reference" + elif typecode == 11: return "boolean" + elif typecode == 12: return "float" + elif typecode == 13: return "double" + elif typecode == 14: return "uuid" + elif typecode == 15: return "field-table" + elif typecode == 16: return "int8" + elif typecode == 17: return "int16" + elif typecode == 18: return "int32" + elif typecode == 19: return "int64" + elif typecode == 20: return "object" + elif typecode == 21: return "list" + elif typecode == 22: return "array" + else: + raise ValueError ("Invalid type code: %s" % str(typecode)) + + def accessName (self, code): + """ Convert element access codes to printable strings """ + if code == '1': return "ReadCreate" + elif code == '2': return "ReadWrite" + elif code == '3': return "ReadOnly" + else: + raise ValueError ("Invalid access code: %s" % str(code)) + + def notNone (self, text): + if text == None: + return "" + else: + return text + + + #===================== + # Methods from Console + #===================== + def brokerConnected(self, broker): + """ Invoked when a connection is established to a broker """ + try: + self.lock.acquire() + self.connected = True + finally: + self.lock.release() + if not self.first_connect: + print "Broker connected:", broker + self.first_connect = None + + def brokerDisconnected(self, broker): + """ Invoked when the connection to a broker is lost """ + try: + self.lock.acquire() + self.connected = None + finally: + self.lock.release() + if not self.closing: + print "Broker disconnected:", broker + + def objectProps(self, broker, record): + """ Invoked when an object is updated. """ + oid = record.getObjectId() + dispId = self.idRegistry.displayId(oid) + try: + self.lock.acquire() + if dispId in self.objects: + self.objects[dispId].mergeUpdate(record) + else: + self.objects[dispId] = record + finally: + self.lock.release() + + def objectStats(self, broker, record): + """ Invoked when an object is updated. """ + oid = record.getObjectId() + dispId = self.idRegistry.displayId(oid) + try: + self.lock.acquire() + if dispId in self.objects: + self.objects[dispId].mergeUpdate(record) + finally: + self.lock.release() + + def event(self, broker, event): + """ Invoked when an event is raised. """ + pass + + def methodResponse(self, broker, seq, response): + print response + + +#====================================================================================================== +# IdRegistry +#====================================================================================================== +class IdRegistry(object): + """ + """ + def __init__(self): + self.next_display_id = 101 + self.oid_to_display = {} + self.display_to_oid = {} + self.lock = Lock() + + def displayId(self, oid): + try: + self.lock.acquire() + if oid in self.oid_to_display: + return self.oid_to_display[oid] + newId = self.next_display_id + self.next_display_id += 1 + self.oid_to_display[oid] = newId + self.display_to_oid[newId] = oid + return newId + finally: + self.lock.release() + + def objectId(self, displayId): + try: + self.lock.acquire() + if displayId in self.display_to_oid: + return self.display_to_oid[displayId] + return None + finally: + self.lock.release() + + def getDisplayIds(self): + result = [] + for displayId in self.display_to_oid: + result.append(str(displayId)) + return result + + def getIdInfo(self, displayId): + """ + Given a display ID, return a tuple of (displayID, bootSequence/Durable, AgentBank/Name, ObjectName) + """ + oid = self.objectId(displayId) + if oid == None: + return (displayId, "?", "unknown", "unknown") + bootSeq = oid.getSequence() + if bootSeq == 0: + bootSeq = '<durable>' + agent = oid.getAgentBank() + if agent == '0': + agent = 'Broker' + return (displayId, bootSeq, agent, oid.getObject()) + + +def Usage(): print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]" print - sys.exit (1) + sys.exit(1) #========================================================= # Main Program @@ -160,38 +607,30 @@ def Usage (): cargs = sys.argv[1:] _host = "localhost" -if len (cargs) > 0: +if len(cargs) > 0: _host = cargs[0] if _host[0] == '-': Usage() -disp = Display () +disp = Display() # Attempt to make a connection to the target broker try: - data = ManagementData (disp, _host) -except socket.error, e: - print "Socket Error (%s):" % _host, e[1] - sys.exit (1) -except IOError, e: - print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename) - sys.exit (1) -except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) + data = QmfData(disp, _host) except Exception, e: - if str(e).find ("Exchange not found") != -1: + if str(e).find("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." else: print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) # Instantiate the CLI interpreter and launch it. -cli = Mcli (data, disp) -print ("Management Tool for QPID") +cli = Mcli(data, disp) +print("Management Tool for QPID") try: - cli.cmdloop () -except Closed, e: - print "Connection to Broker Lost:", e - sys.exit (1) + cli.cmdloop() +except KeyboardInterrupt: + print + print "Exiting..." + data.close() |
