diff options
Diffstat (limited to 'python/qpid/qmfconsole.py')
-rw-r--r-- | python/qpid/qmfconsole.py | 274 |
1 files changed, 137 insertions, 137 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index 3800e54b5b..c8035e87f2 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -49,7 +49,7 @@ class Console: """ Invoked when a QMF package is discovered. """ pass - def newClass(self, classKey): + def newClass(self, kind, classKey): """ Invoked when a new class is discovered. Session.getSchema can be used to obtain details about the class.""" pass @@ -158,7 +158,7 @@ class Session: raise Exception(broker.error) self.brokers.append(broker) - self.getObjects(broker=broker, cls="agent") + self.getObjects(broker=broker, _class="agent") return broker def delBroker(self, broker): @@ -219,35 +219,36 @@ class Session: The class for queried objects may be specified in one of the following ways: - schema = <schema> - supply a schema object returned from getSchema - key = <key> - supply a classKey from the list returned by getClasses - cls = <name> - supply a class name as a string + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. If objects should be obtained from only one agent, use the following argument. Otherwise, the query will go to all agents. - agent = <agent> - supply an agent from the list returned by getAgents + _agent = <agent> - supply an agent from the list returned by getAgents. If the get query is to be restricted to one broker (as opposed to all connected brokers), add the following argument: - broker = <broker> - supply a broker as returned by addBroker + _broker = <broker> - supply a broker as returned by addBroker. If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. """ - if "broker" in kwargs: + if "_broker" in kwargs: brokerList = [] - brokerList.append(kwargs["broker"]) + brokerList.append(kwargs["_broker"]) else: brokerList = self.brokers for broker in brokerList: broker._waitForStable() agentList = [] - if "agent" in kwargs: - agent = kwargs["agent"] + if "_agent" in kwargs: + agent = kwargs["_agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") agentList.append(agent) @@ -257,11 +258,14 @@ class Session: agentList.append(agent) cname = None - if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey() - elif "key" in kwargs: pname, cname, hash = kwargs["key"] - elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None + if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey() + elif "_key" in kwargs: pname, cname, hash = kwargs["_key"] + elif "_class" in kwargs: + pname, cname, hash = None, kwargs["_class"], None + if "_package" in kwargs: + pname = kwargs["_package"] if cname == None: - raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument") + raise Exception("No class supplied, use '_schema', '_key', or '_class' argument") map = {} map["_class"] = cname if pname != None: map["_package"] = pname @@ -269,7 +273,7 @@ class Session: self.getSelect = [] for item in kwargs: - if item != "schema" and item != "key" and item != "cls": + if item[0] != '_': self.getSelect.append((item, kwargs[item])) self.getResult = [] @@ -282,7 +286,7 @@ class Session: self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank) + smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank) broker._send(smsg) starttime = time() @@ -382,6 +386,7 @@ class Session: self.cv.release() def _handleClassInd(self, broker, codec, seq): + kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() @@ -431,17 +436,18 @@ class Session: self.console.event(broker, event) def _handleSchemaResp(self, broker, codec, seq): + kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() classKey = (pname, cname, hash) - _class = SchemaClass(classKey, codec) + _class = SchemaClass(kind, classKey, codec) self.cv.acquire() self.packages[pname][(cname, hash)] = _class self.cv.release() broker._decOutstanding() if self.console != None: - self.console.newClass(classKey) + self.console.newClass(kind, classKey) def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): pname = str(codec.read_str8()) @@ -485,7 +491,7 @@ class Session: def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: - for prop, propval in object.properties: + for prop, propval in object.getProperties(): if key == prop.name and value != propval: return False return True @@ -497,7 +503,7 @@ class Session: elif typecode == 3: data = codec.read_uint32() # U32 elif typecode == 4: data = codec.read_uint64() # U64 elif typecode == 6: data = str(codec.read_str8()) # SSTR - elif typecode == 7: data = codec.read_vbin32() # LSTR + elif typecode == 7: data = codec.read_str16() # LSTR elif typecode == 8: data = codec.read_int64() # ABSTIME elif typecode == 9: data = codec.read_uint64() # DELTATIME elif typecode == 10: data = ObjectId(codec) # REF @@ -521,7 +527,7 @@ class Session: elif typecode == 3: codec.write_uint32 (long(value)) # U32 elif typecode == 4: codec.write_uint64 (long(value)) # U64 elif typecode == 6: codec.write_str8 (value) # SSTR - elif typecode == 7: codec.write_vbin32 (value) # LSTR + elif typecode == 7: codec.write_str16 (value) # LSTR elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME elif typecode == 10: value.encode (codec) # REF @@ -577,30 +583,42 @@ class ClassKey: class SchemaClass: """ """ - def __init__(self, key, codec): + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + def __init__(self, kind, key, codec): + self.kind = kind self.classKey = key self.properties = [] self.statistics = [] - self.methods = [] - self.events = [] - - propCount = codec.read_uint16() - statCount = codec.read_uint16() - methodCount = codec.read_uint16() - eventCount = codec.read_uint16() - - for idx in range(propCount): - self.properties.append(SchemaProperty(codec)) - for idx in range(statCount): - self.statistics.append(SchemaStatistic(codec)) - for idx in range(methodCount): - self.methods.append(SchemaMethod(codec)) - for idx in range(eventCount): - self.events.append(SchemaEvent(codec)) + self.methods = [] + self.arguments = [] + + if self.kind == self.CLASS_KIND_TABLE: + propCount = codec.read_uint16() + statCount = codec.read_uint16() + methodCount = codec.read_uint16() + for idx in range(propCount): + self.properties.append(SchemaProperty(codec)) + for idx in range(statCount): + self.statistics.append(SchemaStatistic(codec)) + for idx in range(methodCount): + self.methods.append(SchemaMethod(codec)) + + elif self.kind == self.CLASS_KIND_EVENT: + argCount = codec.read_uint16() + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=False)) def __repr__(self): pname, cname, hash = self.classKey - result = "Class: %s:%s " % (pname, cname) + if self.kind == self.CLASS_KIND_TABLE: + kindStr = "Table" + elif self.kind == self.CLASS_KIND_EVENT: + kindStr = "Event" + else: + kindStr = "Unsupported" + result = "%s Class: %s:%s " % (kindStr, pname, cname) result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash) return result @@ -620,9 +638,9 @@ class SchemaClass: """ Return the list of methods for the class. """ return self.methods - def getEvents(self): + def getArguments(self): """ Return the list of events for the class. """ - return self.events + return self.arguments class SchemaProperty: """ """ @@ -693,33 +711,6 @@ class SchemaMethod: result += ")" return result -class SchemaEvent: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - argCount = map["argCount"] - if "desc" in map: - self.desc = str(map["desc"]) - else: - self.desc = None - self.arguments = [] - - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=False)) - - def __repr__(self): - result = self.name + "(" - first = True - for arg in self.arguments: - if first: - first = False - else: - result += ", " - result += arg.name - result += ")" - return result - class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -743,7 +734,7 @@ class SchemaArgument: elif key == "desc" : self.desc = str(value) elif key == "default" : self.default = str(value) -class ObjectId(object): +class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, codec, first=0, second=0): if codec: @@ -800,80 +791,86 @@ class Object(object): """ """ def __init__(self, session, broker, schema, codec, prop, stat): """ """ - self.session = session - self.broker = broker - self.schema = schema - self.currentTime = codec.read_uint64() - self.createTime = codec.read_uint64() - self.deleteTime = codec.read_uint64() - self.objectId = ObjectId(codec) - self.properties = [] - self.statistics = [] + self._session = session + self._broker = broker + self._schema = schema + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + self._properties = [] + self._statistics = [] if prop: notPresent = self._parsePresenceMasks(codec, schema) for property in schema.getProperties(): if property.name in notPresent: - self.properties.append((property, None)) + self._properties.append((property, None)) else: - self.properties.append((property, self.session._decodeValue(codec, property.type))) + self._properties.append((property, self._session._decodeValue(codec, property.type))) if stat: for statistic in schema.getStatistics(): - self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type))) + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) def getObjectId(self): """ Return the object identifier for this object """ - return self.objectId + return self._objectId def getClassKey(self): """ Return the class-key that references the schema describing this object. """ - return self.schema.getKey() + return self._schema.getKey() def getSchema(self): """ Return the schema that describes this object. """ - return self.schema + return self._schema def getMethods(self): """ Return a list of methods available for this object. """ - return self.schema.getMethods() + return self._schema.getMethods() def getTimestamps(self): """ Return the current, creation, and deletion times for this object. """ - return self.currentTime, self.createTime, self.deleteTime + return self._currentTime, self._createTime, self._deleteTime def getIndex(self): """ Return a string describing this object's primary key. """ result = "" - for property, value in self.properties: + for property, value in self._properties: if property.index: if result != "": result += ":" result += str(value) return result + def getProperties(self): + return self._properties + + def getStatistics(self): + return self._statistics + def __repr__(self): return self.getIndex() def __getattr__(self, name): - for method in self.schema.getMethods(): + for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self.properties: + for property, value in self._properties: if name == property.name: return value - for statistic, value in self.statistics: + for statistic, value in self._statistics: if name == statistic.name: return value raise Exception("Type Object has no attribute '%s'" % name) def _invoke(self, name, args, kwargs): - for method in self.schema.getMethods(): + for method in self._schema.getMethods(): if name == method.name: aIdx = 0 - sendCodec = Codec(self.broker.conn.spec) - seq = self.session.seqMgr._reserve((self, method)) - self.broker._setHeader(sendCodec, 'M', seq) - self.objectId.encode(sendCodec) - pname, cname, hash = self.schema.getKey() + sendCodec = Codec(self._broker.conn.spec) + seq = self._session.seqMgr._reserve((self, method)) + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + pname, cname, hash = self._schema.getKey() sendCodec.write_str8(pname) sendCodec.write_str8(cname) sendCodec.write_bin128(hash) @@ -888,29 +885,30 @@ class Object(object): for arg in method.arguments: if arg.dir.find("I") != -1: - self.session._encodeValue(sendCodec, args[aIdx], arg.type) + self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 - smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) - self.broker.cv.acquire() - self.broker.syncInFlight = True - self.broker.cv.release() + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + (self._objectId.getBroker(), self._objectId.getBank())) + self._broker.cv.acquire() + self._broker.syncInFlight = True + self._broker.cv.release() - self.broker._send(smsg) + self._broker._send(smsg) - self.broker.cv.acquire() + self._broker.cv.acquire() starttime = time() - while self.broker.syncInFlight and self.broker.error == None: - self.broker.cv.wait(self.broker.SYNC_TIME) - if time() - starttime > self.broker.SYNC_TIME: - self.broker.cv.release() - self.session.seqMgr._release(seq) + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(self._broker.SYNC_TIME) + if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.release() + self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") - self.broker.cv.release() - if self.broker.error != None: - errorText = self.broker.error - self.broker.error = None + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None raise Exception(errorText) - return self.broker.syncResult + return self._broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): @@ -954,7 +952,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.agents = {} - self.agents[0] = Agent(self, 0, "BrokerAgent") + self.agents[0] = Agent(self, "1.0", "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False @@ -1040,14 +1038,15 @@ class Broker: self.error = "Connect Failed %d - %s" % (e[0], e[1]) def _updateAgent(self, obj): - if obj.deleteTime == 0: - if obj.objectIdBank not in self.agents: - agent = Agent(self, obj.objectIdBank, obj.label) - self.agents[obj.objectIdBank] = agent + bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank) + if obj._deleteTime == 0: + if bankKey not in self.agents: + agent = Agent(self, bankKey, obj.label) + self.agents[bankKey] = agent if self.session.console != None: self.session.console.newAgent(agent) else: - agent = self.agents.pop(obj.objectIdBank, None) + agent = self.agents.pop(bankKey, None) if agent != None and self.session.console != None: self.session.console.delAgent(agent) @@ -1055,7 +1054,7 @@ class Broker: """ Compose the header of a management message. """ codec.write_uint8(ord('A')) codec.write_uint8(ord('M')) - codec.write_uint8(ord('1')) + codec.write_uint8(ord('2')) codec.write_uint8(ord(opcode)) codec.write_uint32(seq) @@ -1068,7 +1067,7 @@ class Broker: if octet != 'M': return None, None octet = chr(codec.read_uint8()) - if octet != '1': + if octet != '2': return None, None opcode = chr(codec.read_uint8()) seq = codec.read_uint32() @@ -1164,28 +1163,24 @@ class Agent: self.label = label def __repr__(self): - return "Agent at bank %d (%s)" % (self.bank, self.label) + return "Agent at bank %s (%s)" % (self.bank, self.label) class Event: """ """ def __init__(self, session, codec): self.session = session - self.timestamp = codec.read_int64() - self.objectId = ObjectId(codec) pname = codec.read_str8() cname = codec.read_str8() hash = codec.read_bin128() self.classKey = (pname, cname, hash) - self.name = codec.read_str8() + self.timestamp = codec.read_int64() + self.schema = None if pname in session.packages: if (cname, hash) in session.packages[pname]: - schema = session.packages[pname][(cname, hash)] - for event in schema.getEvents(): - if event.name == self.name: - self.schemaEvent = event - self.arguments = {} - for arg in event.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type) + self.schema = session.packages[pname][(cname, hash)] + self.arguments = {} + for arg in self.schema.arguments: + self.arguments[arg.name] = session._decodeValue(codec, arg.type) def __repr__(self): return self.getSyslogText() @@ -1202,10 +1197,15 @@ class Event: def getName(self): return self.name + def getSchema(self): + return self.schema + def getSyslogText(self): + if self.schema == None: + return "<uninterpretable>" out = strftime("%c", gmtime(self.timestamp / 1000000000)) - out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name - for arg in self.schemaEvent.arguments: + out += " " + self.classKey[0] + ":" + self.classKey[1] + for arg in self.schema.arguments: out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type) return out @@ -1247,8 +1247,8 @@ class DebugConsole(Console): def newPackage(self, name): print "newPackage:", name - def newClass(self, classKey): - print "newClass:", classKey + def newClass(self, kind, classKey): + print "newClass:", kind, classKey def newAgent(self, agent): print "newAgent:", agent |