summaryrefslogtreecommitdiff
path: root/python/qpid/qmfconsole.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/qmfconsole.py')
-rw-r--r--python/qpid/qmfconsole.py274
1 files changed, 137 insertions, 137 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index 3800e54b5b..c8035e87f2 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -49,7 +49,7 @@ class Console:
""" Invoked when a QMF package is discovered. """
pass
- def newClass(self, classKey):
+ def newClass(self, kind, classKey):
""" Invoked when a new class is discovered. Session.getSchema can be
used to obtain details about the class."""
pass
@@ -158,7 +158,7 @@ class Session:
raise Exception(broker.error)
self.brokers.append(broker)
- self.getObjects(broker=broker, cls="agent")
+ self.getObjects(broker=broker, _class="agent")
return broker
def delBroker(self, broker):
@@ -219,35 +219,36 @@ class Session:
The class for queried objects may be specified in one of the following ways:
- schema = <schema> - supply a schema object returned from getSchema
- key = <key> - supply a classKey from the list returned by getClasses
- cls = <name> - supply a class name as a string
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
If objects should be obtained from only one agent, use the following argument.
Otherwise, the query will go to all agents.
- agent = <agent> - supply an agent from the list returned by getAgents
+ _agent = <agent> - supply an agent from the list returned by getAgents.
If the get query is to be restricted to one broker (as opposed to all connected brokers),
add the following argument:
- broker = <broker> - supply a broker as returned by addBroker
+ _broker = <broker> - supply a broker as returned by addBroker.
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
"""
- if "broker" in kwargs:
+ if "_broker" in kwargs:
brokerList = []
- brokerList.append(kwargs["broker"])
+ brokerList.append(kwargs["_broker"])
else:
brokerList = self.brokers
for broker in brokerList:
broker._waitForStable()
agentList = []
- if "agent" in kwargs:
- agent = kwargs["agent"]
+ if "_agent" in kwargs:
+ agent = kwargs["_agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
agentList.append(agent)
@@ -257,11 +258,14 @@ class Session:
agentList.append(agent)
cname = None
- if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey()
- elif "key" in kwargs: pname, cname, hash = kwargs["key"]
- elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None
+ if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
+ elif "_key" in kwargs: pname, cname, hash = kwargs["_key"]
+ elif "_class" in kwargs:
+ pname, cname, hash = None, kwargs["_class"], None
+ if "_package" in kwargs:
+ pname = kwargs["_package"]
if cname == None:
- raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument")
+ raise Exception("No class supplied, use '_schema', '_key', or '_class' argument")
map = {}
map["_class"] = cname
if pname != None: map["_package"] = pname
@@ -269,7 +273,7 @@ class Session:
self.getSelect = []
for item in kwargs:
- if item != "schema" and item != "key" and item != "cls":
+ if item[0] != '_':
self.getSelect.append((item, kwargs[item]))
self.getResult = []
@@ -282,7 +286,7 @@ class Session:
self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank)
+ smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
broker._send(smsg)
starttime = time()
@@ -382,6 +386,7 @@ class Session:
self.cv.release()
def _handleClassInd(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
@@ -431,17 +436,18 @@ class Session:
self.console.event(broker, event)
def _handleSchemaResp(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
classKey = (pname, cname, hash)
- _class = SchemaClass(classKey, codec)
+ _class = SchemaClass(kind, classKey, codec)
self.cv.acquire()
self.packages[pname][(cname, hash)] = _class
self.cv.release()
broker._decOutstanding()
if self.console != None:
- self.console.newClass(classKey)
+ self.console.newClass(kind, classKey)
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
pname = str(codec.read_str8())
@@ -485,7 +491,7 @@ class Session:
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
- for prop, propval in object.properties:
+ for prop, propval in object.getProperties():
if key == prop.name and value != propval:
return False
return True
@@ -497,7 +503,7 @@ class Session:
elif typecode == 3: data = codec.read_uint32() # U32
elif typecode == 4: data = codec.read_uint64() # U64
elif typecode == 6: data = str(codec.read_str8()) # SSTR
- elif typecode == 7: data = codec.read_vbin32() # LSTR
+ elif typecode == 7: data = codec.read_str16() # LSTR
elif typecode == 8: data = codec.read_int64() # ABSTIME
elif typecode == 9: data = codec.read_uint64() # DELTATIME
elif typecode == 10: data = ObjectId(codec) # REF
@@ -521,7 +527,7 @@ class Session:
elif typecode == 3: codec.write_uint32 (long(value)) # U32
elif typecode == 4: codec.write_uint64 (long(value)) # U64
elif typecode == 6: codec.write_str8 (value) # SSTR
- elif typecode == 7: codec.write_vbin32 (value) # LSTR
+ elif typecode == 7: codec.write_str16 (value) # LSTR
elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
elif typecode == 10: value.encode (codec) # REF
@@ -577,30 +583,42 @@ class ClassKey:
class SchemaClass:
""" """
- def __init__(self, key, codec):
+ CLASS_KIND_TABLE = 1
+ CLASS_KIND_EVENT = 2
+
+ def __init__(self, kind, key, codec):
+ self.kind = kind
self.classKey = key
self.properties = []
self.statistics = []
- self.methods = []
- self.events = []
-
- propCount = codec.read_uint16()
- statCount = codec.read_uint16()
- methodCount = codec.read_uint16()
- eventCount = codec.read_uint16()
-
- for idx in range(propCount):
- self.properties.append(SchemaProperty(codec))
- for idx in range(statCount):
- self.statistics.append(SchemaStatistic(codec))
- for idx in range(methodCount):
- self.methods.append(SchemaMethod(codec))
- for idx in range(eventCount):
- self.events.append(SchemaEvent(codec))
+ self.methods = []
+ self.arguments = []
+
+ if self.kind == self.CLASS_KIND_TABLE:
+ propCount = codec.read_uint16()
+ statCount = codec.read_uint16()
+ methodCount = codec.read_uint16()
+ for idx in range(propCount):
+ self.properties.append(SchemaProperty(codec))
+ for idx in range(statCount):
+ self.statistics.append(SchemaStatistic(codec))
+ for idx in range(methodCount):
+ self.methods.append(SchemaMethod(codec))
+
+ elif self.kind == self.CLASS_KIND_EVENT:
+ argCount = codec.read_uint16()
+ for idx in range(argCount):
+ self.arguments.append(SchemaArgument(codec, methodArg=False))
def __repr__(self):
pname, cname, hash = self.classKey
- result = "Class: %s:%s " % (pname, cname)
+ if self.kind == self.CLASS_KIND_TABLE:
+ kindStr = "Table"
+ elif self.kind == self.CLASS_KIND_EVENT:
+ kindStr = "Event"
+ else:
+ kindStr = "Unsupported"
+ result = "%s Class: %s:%s " % (kindStr, pname, cname)
result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
return result
@@ -620,9 +638,9 @@ class SchemaClass:
""" Return the list of methods for the class. """
return self.methods
- def getEvents(self):
+ def getArguments(self):
""" Return the list of events for the class. """
- return self.events
+ return self.arguments
class SchemaProperty:
""" """
@@ -693,33 +711,6 @@ class SchemaMethod:
result += ")"
return result
-class SchemaEvent:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- argCount = map["argCount"]
- if "desc" in map:
- self.desc = str(map["desc"])
- else:
- self.desc = None
- self.arguments = []
-
- for idx in range(argCount):
- self.arguments.append(SchemaArgument(codec, methodArg=False))
-
- def __repr__(self):
- result = self.name + "("
- first = True
- for arg in self.arguments:
- if first:
- first = False
- else:
- result += ", "
- result += arg.name
- result += ")"
- return result
-
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -743,7 +734,7 @@ class SchemaArgument:
elif key == "desc" : self.desc = str(value)
elif key == "default" : self.default = str(value)
-class ObjectId(object):
+class ObjectId:
""" Object that represents QMF object identifiers """
def __init__(self, codec, first=0, second=0):
if codec:
@@ -800,80 +791,86 @@ class Object(object):
""" """
def __init__(self, session, broker, schema, codec, prop, stat):
""" """
- self.session = session
- self.broker = broker
- self.schema = schema
- self.currentTime = codec.read_uint64()
- self.createTime = codec.read_uint64()
- self.deleteTime = codec.read_uint64()
- self.objectId = ObjectId(codec)
- self.properties = []
- self.statistics = []
+ self._session = session
+ self._broker = broker
+ self._schema = schema
+ self._currentTime = codec.read_uint64()
+ self._createTime = codec.read_uint64()
+ self._deleteTime = codec.read_uint64()
+ self._objectId = ObjectId(codec)
+ self._properties = []
+ self._statistics = []
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
for property in schema.getProperties():
if property.name in notPresent:
- self.properties.append((property, None))
+ self._properties.append((property, None))
else:
- self.properties.append((property, self.session._decodeValue(codec, property.type)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type)))
if stat:
for statistic in schema.getStatistics():
- self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
def getObjectId(self):
""" Return the object identifier for this object """
- return self.objectId
+ return self._objectId
def getClassKey(self):
""" Return the class-key that references the schema describing this object. """
- return self.schema.getKey()
+ return self._schema.getKey()
def getSchema(self):
""" Return the schema that describes this object. """
- return self.schema
+ return self._schema
def getMethods(self):
""" Return a list of methods available for this object. """
- return self.schema.getMethods()
+ return self._schema.getMethods()
def getTimestamps(self):
""" Return the current, creation, and deletion times for this object. """
- return self.currentTime, self.createTime, self.deleteTime
+ return self._currentTime, self._createTime, self._deleteTime
def getIndex(self):
""" Return a string describing this object's primary key. """
result = ""
- for property, value in self.properties:
+ for property, value in self._properties:
if property.index:
if result != "":
result += ":"
result += str(value)
return result
+ def getProperties(self):
+ return self._properties
+
+ def getStatistics(self):
+ return self._statistics
+
def __repr__(self):
return self.getIndex()
def __getattr__(self, name):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self.properties:
+ for property, value in self._properties:
if name == property.name:
return value
- for statistic, value in self.statistics:
+ for statistic, value in self._statistics:
if name == statistic.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
def _invoke(self, name, args, kwargs):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
- sendCodec = Codec(self.broker.conn.spec)
- seq = self.session.seqMgr._reserve((self, method))
- self.broker._setHeader(sendCodec, 'M', seq)
- self.objectId.encode(sendCodec)
- pname, cname, hash = self.schema.getKey()
+ sendCodec = Codec(self._broker.conn.spec)
+ seq = self._session.seqMgr._reserve((self, method))
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ pname, cname, hash = self._schema.getKey()
sendCodec.write_str8(pname)
sendCodec.write_str8(cname)
sendCodec.write_bin128(hash)
@@ -888,29 +885,30 @@ class Object(object):
for arg in method.arguments:
if arg.dir.find("I") != -1:
- self.session._encodeValue(sendCodec, args[aIdx], arg.type)
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
- smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
- self.broker.cv.acquire()
- self.broker.syncInFlight = True
- self.broker.cv.release()
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+ (self._objectId.getBroker(), self._objectId.getBank()))
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ self._broker.cv.release()
- self.broker._send(smsg)
+ self._broker._send(smsg)
- self.broker.cv.acquire()
+ self._broker.cv.acquire()
starttime = time()
- while self.broker.syncInFlight and self.broker.error == None:
- self.broker.cv.wait(self.broker.SYNC_TIME)
- if time() - starttime > self.broker.SYNC_TIME:
- self.broker.cv.release()
- self.session.seqMgr._release(seq)
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(self._broker.SYNC_TIME)
+ if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.release()
+ self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
- self.broker.cv.release()
- if self.broker.error != None:
- errorText = self.broker.error
- self.broker.error = None
+ self._broker.cv.release()
+ if self._broker.error != None:
+ errorText = self._broker.error
+ self._broker.error = None
raise Exception(errorText)
- return self.broker.syncResult
+ return self._broker.syncResult
raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):
@@ -954,7 +952,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.agents = {}
- self.agents[0] = Agent(self, 0, "BrokerAgent")
+ self.agents[0] = Agent(self, "1.0", "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1040,14 +1038,15 @@ class Broker:
self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
- if obj.deleteTime == 0:
- if obj.objectIdBank not in self.agents:
- agent = Agent(self, obj.objectIdBank, obj.label)
- self.agents[obj.objectIdBank] = agent
+ bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+ if obj._deleteTime == 0:
+ if bankKey not in self.agents:
+ agent = Agent(self, bankKey, obj.label)
+ self.agents[bankKey] = agent
if self.session.console != None:
self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(obj.objectIdBank, None)
+ agent = self.agents.pop(bankKey, None)
if agent != None and self.session.console != None:
self.session.console.delAgent(agent)
@@ -1055,7 +1054,7 @@ class Broker:
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
codec.write_uint8(ord('M'))
- codec.write_uint8(ord('1'))
+ codec.write_uint8(ord('2'))
codec.write_uint8(ord(opcode))
codec.write_uint32(seq)
@@ -1068,7 +1067,7 @@ class Broker:
if octet != 'M':
return None, None
octet = chr(codec.read_uint8())
- if octet != '1':
+ if octet != '2':
return None, None
opcode = chr(codec.read_uint8())
seq = codec.read_uint32()
@@ -1164,28 +1163,24 @@ class Agent:
self.label = label
def __repr__(self):
- return "Agent at bank %d (%s)" % (self.bank, self.label)
+ return "Agent at bank %s (%s)" % (self.bank, self.label)
class Event:
""" """
def __init__(self, session, codec):
self.session = session
- self.timestamp = codec.read_int64()
- self.objectId = ObjectId(codec)
pname = codec.read_str8()
cname = codec.read_str8()
hash = codec.read_bin128()
self.classKey = (pname, cname, hash)
- self.name = codec.read_str8()
+ self.timestamp = codec.read_int64()
+ self.schema = None
if pname in session.packages:
if (cname, hash) in session.packages[pname]:
- schema = session.packages[pname][(cname, hash)]
- for event in schema.getEvents():
- if event.name == self.name:
- self.schemaEvent = event
- self.arguments = {}
- for arg in event.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+ self.schema = session.packages[pname][(cname, hash)]
+ self.arguments = {}
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type)
def __repr__(self):
return self.getSyslogText()
@@ -1202,10 +1197,15 @@ class Event:
def getName(self):
return self.name
+ def getSchema(self):
+ return self.schema
+
def getSyslogText(self):
+ if self.schema == None:
+ return "<uninterpretable>"
out = strftime("%c", gmtime(self.timestamp / 1000000000))
- out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name
- for arg in self.schemaEvent.arguments:
+ out += " " + self.classKey[0] + ":" + self.classKey[1]
+ for arg in self.schema.arguments:
out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type)
return out
@@ -1247,8 +1247,8 @@ class DebugConsole(Console):
def newPackage(self, name):
print "newPackage:", name
- def newClass(self, classKey):
- print "newClass:", classKey
+ def newClass(self, kind, classKey):
+ print "newClass:", kind, classKey
def newAgent(self, agent):
print "newAgent:", agent