diff options
| author | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
| commit | 9d199b74aee76859480a7ee92d95c6db42028b43 (patch) | |
| tree | ca09aace4aaac2afa9650cc78833d30b056313a9 /python | |
| parent | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff) | |
| download | qpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz | |
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rwxr-xr-x | python/commands/qpid-config | 22 | ||||
| -rwxr-xr-x | python/commands/qpid-printevents | 5 | ||||
| -rwxr-xr-x | python/commands/qpid-route | 24 | ||||
| -rw-r--r-- | python/qpid/management.py | 52 | ||||
| -rw-r--r-- | python/qpid/managementdata.py | 4 | ||||
| -rw-r--r-- | python/qpid/qmfconsole.py | 274 | ||||
| -rw-r--r-- | python/tests_0-10/management.py | 34 |
7 files changed, 194 insertions, 221 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config index 13b489abae..8b011778d6 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -84,8 +84,8 @@ class BrokerManager: self.qmf.delBroker(self.broker) def Overview (self): - exchanges = self.qmf.getObjects(cls="exchange") - queues = self.qmf.getObjects(cls="queue") + exchanges = self.qmf.getObjects(_class="exchange") + queues = self.qmf.getObjects(_class="queue") print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -106,7 +106,7 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - exchanges = self.qmf.getObjects(cls="exchange") + exchanges = self.qmf.getObjects(_class="exchange") print "Durable Type Bindings Exchange Name" print "=======================================================" for ex in exchanges: @@ -114,9 +114,9 @@ class BrokerManager: print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) def ExchangeListRecurse (self, filter): - exchanges = self.qmf.getObjects(cls="exchange") - bindings = self.qmf.getObjects(cls="binding") - queues = self.qmf.getObjects(cls="queue") + exchanges = self.qmf.getObjects(_class="exchange") + bindings = self.qmf.getObjects(_class="binding") + queues = self.qmf.getObjects(_class="queue") for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) @@ -130,8 +130,8 @@ class BrokerManager: def QueueList (self, filter): - queues = self.qmf.getObjects(cls="queue") - journals = self.qmf.getObjects(cls="journal") + queues = self.qmf.getObjects(_class="queue") + journals = self.qmf.getObjects(_class="journal") print " Store Size" print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" print "===========================================================================================" @@ -151,9 +151,9 @@ class BrokerManager: YN (q.exclusive), q.bindingCount, q.name) def QueueListRecurse (self, filter): - exchanges = self.qmf.getObjects(cls="exchange") - bindings = self.qmf.getObjects(cls="binding") - queues = self.qmf.getObjects(cls="queue") + exchanges = self.qmf.getObjects(_class="exchange") + bindings = self.qmf.getObjects(_class="binding") + queues = self.qmf.getObjects(_class="queue") for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents index 970607c797..6efd472221 100755 --- a/python/commands/qpid-printevents +++ b/python/commands/qpid-printevents @@ -30,16 +30,13 @@ class EventConsole(Console): def event(self, broker, event): print event - def heartbeat(self, agent, timestamp): - print "Heartbeat" - ## ## Main Program ## def main(): _usage = "%prog [options] [broker-addr]..." _description = \ -"""Collect and print events from one of more Qpid message brokers. If no broker-addr is +"""Collect and print events from one or more Qpid message brokers. If no broker-addr is supplied, %prog will connect to 'localhost:5672'. broker-addr is of the form: [username/password@] hostname | ip-address [:<port>] ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost diff --git a/python/commands/qpid-route b/python/commands/qpid-route index f9f938cdec..4dadcd543b 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -62,7 +62,7 @@ class RouteManager: self.qmf.delBroker(self.broker) def getLink (self): - links = self.qmf.getObjects(cls="link") + links = self.qmf.getObjects(_class="link") for link in links: if "%s:%d" % (link.host, link.port) == self.src.name (): return link @@ -74,7 +74,7 @@ class RouteManager: print "Linking broker to itself is not permitted" sys.exit(1) - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() if link != None: @@ -92,7 +92,7 @@ class RouteManager: def DelLink (self, srcBroker): self.src = qmfconsole.BrokerURL(srcBroker) - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() if link == None: @@ -103,7 +103,7 @@ class RouteManager: print "Close method returned:", res.status, res.text def ListLinks (self): - links = self.qmf.getObjects(cls="link") + links = self.qmf.getObjects(_class="link") if len(links) == 0: print "No Links Found" else: @@ -119,7 +119,7 @@ class RouteManager: if self.dest.name() == self.src.name(): raise Exception("Linking broker to itself is not permitted") - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() @@ -140,7 +140,7 @@ class RouteManager: if link == None: raise Exception("Protocol Error - Missing link ID") - bridges = self.qmf.getObjects(cls="bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if bridge.linkRef == link.getObjectId() and \ bridge.dest == exchange and bridge.key == routingKey: @@ -164,7 +164,7 @@ class RouteManager: raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name())) sys.exit (0) - bridges = self.qmf.getObjects(cls="bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey: if _verbose: @@ -186,8 +186,8 @@ class RouteManager: raise Exception("Route not found") def ListRoutes (self): - links = self.qmf.getObjects(cls="link") - bridges = self.qmf.getObjects(cls="bridge") + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: myLink = None @@ -199,8 +199,8 @@ class RouteManager: print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) def ClearAllRoutes (self): - links = self.qmf.getObjects(cls="link") - bridges = self.qmf.getObjects(cls="bridge") + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if _verbose: @@ -218,7 +218,7 @@ class RouteManager: print "Ok" if _dellink: - links = self.qmf.getObjects(cls="link") + links = self.qmf.getObjects(_class="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), diff --git a/python/qpid/management.py b/python/qpid/management.py index 8d8339b2c6..81d9dbe030 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -285,7 +285,7 @@ class managementClient: ft = {} ft["_class"] = className codec.write_map (ft) - msg = channel.message(codec.encoded, routing_key="agent.%d" % bank) + msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank) channel.send ("qpid.management", msg) def syncWaitForStable (self, channel): @@ -398,7 +398,7 @@ class managementClient: """ Compose the header of a management message. """ codec.write_uint8 (ord ('A')) codec.write_uint8 (ord ('M')) - codec.write_uint8 (ord ('1')) + codec.write_uint8 (ord ('2')) codec.write_uint8 (opcode) codec.write_uint32 (seq) @@ -412,7 +412,7 @@ class managementClient: if octet != 'M': return None octet = chr (codec.read_uint8 ()) - if octet != '1': + if octet != '2': return None opcode = chr (codec.read_uint8 ()) seq = codec.read_uint32 () @@ -433,7 +433,7 @@ class managementClient: elif typecode == 6: codec.write_str8 (value) elif typecode == 7: - codec.write_vbin32 (value) + codec.write_str16 (value) elif typecode == 8: # ABSTIME codec.write_uint64 (long (value)) elif typecode == 9: # DELTATIME @@ -476,7 +476,7 @@ class managementClient: elif typecode == 6: data = str (codec.read_str8 ()) elif typecode == 7: - data = codec.read_vbin32 () + data = codec.read_str16 () elif typecode == 8: # ABSTIME data = codec.read_uint64 () elif typecode == 9: # DELTATIME @@ -604,6 +604,9 @@ class managementClient: ch.send ("qpid.management", smsg) def handleClassInd (self, ch, codec): + kind = codec.read_uint8() + if kind != 1: # This API doesn't handle new-style events + return pname = str (codec.read_str8()) cname = str (codec.read_str8()) hash = codec.read_bin128() @@ -656,13 +659,15 @@ class managementClient: def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ self.decOutstanding (ch) + kind = codec.read_uint8() + if kind != 1: # This API doesn't handle new-style events + return packageName = str (codec.read_str8 ()) className = str (codec.read_str8 ()) hash = codec.read_bin128 () configCount = codec.read_uint16 () instCount = codec.read_uint16 () methodCount = codec.read_uint16 () - eventCount = codec.read_uint16 () if packageName not in self.packages: return @@ -676,7 +681,6 @@ class managementClient: configs = [] insts = [] methods = {} - events = {} configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) insts.append (("id", 4, None, None)) @@ -765,42 +769,14 @@ class managementClient: args.append (arg) methods[mname] = (mdesc, args) - for idx in range (eventCount): - ft = codec.read_map () - ename = str (ft["name"]) - argCount = ft["argCount"] - if "desc" in ft: - edesc = str (ft["desc"]) - else: - edesc = None - - args = [] - for aidx in range (argCount): - ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - unit = None - desc = None - - for key, value in ft.items (): - if key == "unit": - unit = str (value) - elif key == "desc": - desc = str (value) - - arg = (name, type, unit, desc) - args.append (arg) - events[ename] = (edesc, args) - schemaClass = {} schemaClass['C'] = configs schemaClass['I'] = insts schemaClass['M'] = methods - schemaClass['E'] = events self.schema[classKey] = schemaClass if self.schemaCb != None: - self.schemaCb (ch.context, classKey, configs, insts, methods, events) + self.schemaCb (ch.context, classKey, configs, insts, methods, {}) def parsePresenceMasks(self, codec, schemaClass): """ Generate a list of not-present properties """ @@ -896,7 +872,7 @@ class managementClient: codec.write_str8 (classId[1]) codec.write_bin128 (classId[2]) codec.write_str8 (methodName) - bank = objId.getBank() + bank = "%d.%d" % (objId.getBroker(), objId.getBank()) # Encode args according to schema if classId not in self.schema: @@ -926,5 +902,5 @@ class managementClient: packageName = classId[0] className = classId[1] - msg = channel.message(codec.encoded, "agent." + str(bank)) + msg = channel.message(codec.encoded, "agent." + bank) channel.send ("qpid.management", msg) diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py index d86dd3a360..2bf66a5e5b 100644 --- a/python/qpid/managementdata.py +++ b/python/qpid/managementdata.py @@ -546,10 +546,10 @@ class ManagementData: for classKey in sorted: tuple = self.schema[classKey] row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]), - len (tuple[2]), len (tuple[3])) + len (tuple[2])) rows.append (row) self.disp.table ("Classes in Schema:", - ("Class", "Properties", "Statistics", "Methods", "Events"), + ("Class", "Properties", "Statistics", "Methods"), rows) finally: self.lock.release () diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index 3800e54b5b..c8035e87f2 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -49,7 +49,7 @@ class Console: """ Invoked when a QMF package is discovered. """ pass - def newClass(self, classKey): + def newClass(self, kind, classKey): """ Invoked when a new class is discovered. Session.getSchema can be used to obtain details about the class.""" pass @@ -158,7 +158,7 @@ class Session: raise Exception(broker.error) self.brokers.append(broker) - self.getObjects(broker=broker, cls="agent") + self.getObjects(broker=broker, _class="agent") return broker def delBroker(self, broker): @@ -219,35 +219,36 @@ class Session: The class for queried objects may be specified in one of the following ways: - schema = <schema> - supply a schema object returned from getSchema - key = <key> - supply a classKey from the list returned by getClasses - cls = <name> - supply a class name as a string + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. If objects should be obtained from only one agent, use the following argument. Otherwise, the query will go to all agents. - agent = <agent> - supply an agent from the list returned by getAgents + _agent = <agent> - supply an agent from the list returned by getAgents. If the get query is to be restricted to one broker (as opposed to all connected brokers), add the following argument: - broker = <broker> - supply a broker as returned by addBroker + _broker = <broker> - supply a broker as returned by addBroker. If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. """ - if "broker" in kwargs: + if "_broker" in kwargs: brokerList = [] - brokerList.append(kwargs["broker"]) + brokerList.append(kwargs["_broker"]) else: brokerList = self.brokers for broker in brokerList: broker._waitForStable() agentList = [] - if "agent" in kwargs: - agent = kwargs["agent"] + if "_agent" in kwargs: + agent = kwargs["_agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") agentList.append(agent) @@ -257,11 +258,14 @@ class Session: agentList.append(agent) cname = None - if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey() - elif "key" in kwargs: pname, cname, hash = kwargs["key"] - elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None + if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey() + elif "_key" in kwargs: pname, cname, hash = kwargs["_key"] + elif "_class" in kwargs: + pname, cname, hash = None, kwargs["_class"], None + if "_package" in kwargs: + pname = kwargs["_package"] if cname == None: - raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument") + raise Exception("No class supplied, use '_schema', '_key', or '_class' argument") map = {} map["_class"] = cname if pname != None: map["_package"] = pname @@ -269,7 +273,7 @@ class Session: self.getSelect = [] for item in kwargs: - if item != "schema" and item != "key" and item != "cls": + if item[0] != '_': self.getSelect.append((item, kwargs[item])) self.getResult = [] @@ -282,7 +286,7 @@ class Session: self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank) + smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank) broker._send(smsg) starttime = time() @@ -382,6 +386,7 @@ class Session: self.cv.release() def _handleClassInd(self, broker, codec, seq): + kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() @@ -431,17 +436,18 @@ class Session: self.console.event(broker, event) def _handleSchemaResp(self, broker, codec, seq): + kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() classKey = (pname, cname, hash) - _class = SchemaClass(classKey, codec) + _class = SchemaClass(kind, classKey, codec) self.cv.acquire() self.packages[pname][(cname, hash)] = _class self.cv.release() broker._decOutstanding() if self.console != None: - self.console.newClass(classKey) + self.console.newClass(kind, classKey) def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): pname = str(codec.read_str8()) @@ -485,7 +491,7 @@ class Session: def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: - for prop, propval in object.properties: + for prop, propval in object.getProperties(): if key == prop.name and value != propval: return False return True @@ -497,7 +503,7 @@ class Session: elif typecode == 3: data = codec.read_uint32() # U32 elif typecode == 4: data = codec.read_uint64() # U64 elif typecode == 6: data = str(codec.read_str8()) # SSTR - elif typecode == 7: data = codec.read_vbin32() # LSTR + elif typecode == 7: data = codec.read_str16() # LSTR elif typecode == 8: data = codec.read_int64() # ABSTIME elif typecode == 9: data = codec.read_uint64() # DELTATIME elif typecode == 10: data = ObjectId(codec) # REF @@ -521,7 +527,7 @@ class Session: elif typecode == 3: codec.write_uint32 (long(value)) # U32 elif typecode == 4: codec.write_uint64 (long(value)) # U64 elif typecode == 6: codec.write_str8 (value) # SSTR - elif typecode == 7: codec.write_vbin32 (value) # LSTR + elif typecode == 7: codec.write_str16 (value) # LSTR elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME elif typecode == 10: value.encode (codec) # REF @@ -577,30 +583,42 @@ class ClassKey: class SchemaClass: """ """ - def __init__(self, key, codec): + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + def __init__(self, kind, key, codec): + self.kind = kind self.classKey = key self.properties = [] self.statistics = [] - self.methods = [] - self.events = [] - - propCount = codec.read_uint16() - statCount = codec.read_uint16() - methodCount = codec.read_uint16() - eventCount = codec.read_uint16() - - for idx in range(propCount): - self.properties.append(SchemaProperty(codec)) - for idx in range(statCount): - self.statistics.append(SchemaStatistic(codec)) - for idx in range(methodCount): - self.methods.append(SchemaMethod(codec)) - for idx in range(eventCount): - self.events.append(SchemaEvent(codec)) + self.methods = [] + self.arguments = [] + + if self.kind == self.CLASS_KIND_TABLE: + propCount = codec.read_uint16() + statCount = codec.read_uint16() + methodCount = codec.read_uint16() + for idx in range(propCount): + self.properties.append(SchemaProperty(codec)) + for idx in range(statCount): + self.statistics.append(SchemaStatistic(codec)) + for idx in range(methodCount): + self.methods.append(SchemaMethod(codec)) + + elif self.kind == self.CLASS_KIND_EVENT: + argCount = codec.read_uint16() + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=False)) def __repr__(self): pname, cname, hash = self.classKey - result = "Class: %s:%s " % (pname, cname) + if self.kind == self.CLASS_KIND_TABLE: + kindStr = "Table" + elif self.kind == self.CLASS_KIND_EVENT: + kindStr = "Event" + else: + kindStr = "Unsupported" + result = "%s Class: %s:%s " % (kindStr, pname, cname) result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash) return result @@ -620,9 +638,9 @@ class SchemaClass: """ Return the list of methods for the class. """ return self.methods - def getEvents(self): + def getArguments(self): """ Return the list of events for the class. """ - return self.events + return self.arguments class SchemaProperty: """ """ @@ -693,33 +711,6 @@ class SchemaMethod: result += ")" return result -class SchemaEvent: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - argCount = map["argCount"] - if "desc" in map: - self.desc = str(map["desc"]) - else: - self.desc = None - self.arguments = [] - - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=False)) - - def __repr__(self): - result = self.name + "(" - first = True - for arg in self.arguments: - if first: - first = False - else: - result += ", " - result += arg.name - result += ")" - return result - class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -743,7 +734,7 @@ class SchemaArgument: elif key == "desc" : self.desc = str(value) elif key == "default" : self.default = str(value) -class ObjectId(object): +class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, codec, first=0, second=0): if codec: @@ -800,80 +791,86 @@ class Object(object): """ """ def __init__(self, session, broker, schema, codec, prop, stat): """ """ - self.session = session - self.broker = broker - self.schema = schema - self.currentTime = codec.read_uint64() - self.createTime = codec.read_uint64() - self.deleteTime = codec.read_uint64() - self.objectId = ObjectId(codec) - self.properties = [] - self.statistics = [] + self._session = session + self._broker = broker + self._schema = schema + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + self._properties = [] + self._statistics = [] if prop: notPresent = self._parsePresenceMasks(codec, schema) for property in schema.getProperties(): if property.name in notPresent: - self.properties.append((property, None)) + self._properties.append((property, None)) else: - self.properties.append((property, self.session._decodeValue(codec, property.type))) + self._properties.append((property, self._session._decodeValue(codec, property.type))) if stat: for statistic in schema.getStatistics(): - self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type))) + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) def getObjectId(self): """ Return the object identifier for this object """ - return self.objectId + return self._objectId def getClassKey(self): """ Return the class-key that references the schema describing this object. """ - return self.schema.getKey() + return self._schema.getKey() def getSchema(self): """ Return the schema that describes this object. """ - return self.schema + return self._schema def getMethods(self): """ Return a list of methods available for this object. """ - return self.schema.getMethods() + return self._schema.getMethods() def getTimestamps(self): """ Return the current, creation, and deletion times for this object. """ - return self.currentTime, self.createTime, self.deleteTime + return self._currentTime, self._createTime, self._deleteTime def getIndex(self): """ Return a string describing this object's primary key. """ result = "" - for property, value in self.properties: + for property, value in self._properties: if property.index: if result != "": result += ":" result += str(value) return result + def getProperties(self): + return self._properties + + def getStatistics(self): + return self._statistics + def __repr__(self): return self.getIndex() def __getattr__(self, name): - for method in self.schema.getMethods(): + for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self.properties: + for property, value in self._properties: if name == property.name: return value - for statistic, value in self.statistics: + for statistic, value in self._statistics: if name == statistic.name: return value raise Exception("Type Object has no attribute '%s'" % name) def _invoke(self, name, args, kwargs): - for method in self.schema.getMethods(): + for method in self._schema.getMethods(): if name == method.name: aIdx = 0 - sendCodec = Codec(self.broker.conn.spec) - seq = self.session.seqMgr._reserve((self, method)) - self.broker._setHeader(sendCodec, 'M', seq) - self.objectId.encode(sendCodec) - pname, cname, hash = self.schema.getKey() + sendCodec = Codec(self._broker.conn.spec) + seq = self._session.seqMgr._reserve((self, method)) + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + pname, cname, hash = self._schema.getKey() sendCodec.write_str8(pname) sendCodec.write_str8(cname) sendCodec.write_bin128(hash) @@ -888,29 +885,30 @@ class Object(object): for arg in method.arguments: if arg.dir.find("I") != -1: - self.session._encodeValue(sendCodec, args[aIdx], arg.type) + self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 - smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) - self.broker.cv.acquire() - self.broker.syncInFlight = True - self.broker.cv.release() + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + (self._objectId.getBroker(), self._objectId.getBank())) + self._broker.cv.acquire() + self._broker.syncInFlight = True + self._broker.cv.release() - self.broker._send(smsg) + self._broker._send(smsg) - self.broker.cv.acquire() + self._broker.cv.acquire() starttime = time() - while self.broker.syncInFlight and self.broker.error == None: - self.broker.cv.wait(self.broker.SYNC_TIME) - if time() - starttime > self.broker.SYNC_TIME: - self.broker.cv.release() - self.session.seqMgr._release(seq) + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(self._broker.SYNC_TIME) + if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.release() + self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") - self.broker.cv.release() - if self.broker.error != None: - errorText = self.broker.error - self.broker.error = None + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None raise Exception(errorText) - return self.broker.syncResult + return self._broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): @@ -954,7 +952,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.agents = {} - self.agents[0] = Agent(self, 0, "BrokerAgent") + self.agents[0] = Agent(self, "1.0", "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False @@ -1040,14 +1038,15 @@ class Broker: self.error = "Connect Failed %d - %s" % (e[0], e[1]) def _updateAgent(self, obj): - if obj.deleteTime == 0: - if obj.objectIdBank not in self.agents: - agent = Agent(self, obj.objectIdBank, obj.label) - self.agents[obj.objectIdBank] = agent + bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank) + if obj._deleteTime == 0: + if bankKey not in self.agents: + agent = Agent(self, bankKey, obj.label) + self.agents[bankKey] = agent if self.session.console != None: self.session.console.newAgent(agent) else: - agent = self.agents.pop(obj.objectIdBank, None) + agent = self.agents.pop(bankKey, None) if agent != None and self.session.console != None: self.session.console.delAgent(agent) @@ -1055,7 +1054,7 @@ class Broker: """ Compose the header of a management message. """ codec.write_uint8(ord('A')) codec.write_uint8(ord('M')) - codec.write_uint8(ord('1')) + codec.write_uint8(ord('2')) codec.write_uint8(ord(opcode)) codec.write_uint32(seq) @@ -1068,7 +1067,7 @@ class Broker: if octet != 'M': return None, None octet = chr(codec.read_uint8()) - if octet != '1': + if octet != '2': return None, None opcode = chr(codec.read_uint8()) seq = codec.read_uint32() @@ -1164,28 +1163,24 @@ class Agent: self.label = label def __repr__(self): - return "Agent at bank %d (%s)" % (self.bank, self.label) + return "Agent at bank %s (%s)" % (self.bank, self.label) class Event: """ """ def __init__(self, session, codec): self.session = session - self.timestamp = codec.read_int64() - self.objectId = ObjectId(codec) pname = codec.read_str8() cname = codec.read_str8() hash = codec.read_bin128() self.classKey = (pname, cname, hash) - self.name = codec.read_str8() + self.timestamp = codec.read_int64() + self.schema = None if pname in session.packages: if (cname, hash) in session.packages[pname]: - schema = session.packages[pname][(cname, hash)] - for event in schema.getEvents(): - if event.name == self.name: - self.schemaEvent = event - self.arguments = {} - for arg in event.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type) + self.schema = session.packages[pname][(cname, hash)] + self.arguments = {} + for arg in self.schema.arguments: + self.arguments[arg.name] = session._decodeValue(codec, arg.type) def __repr__(self): return self.getSyslogText() @@ -1202,10 +1197,15 @@ class Event: def getName(self): return self.name + def getSchema(self): + return self.schema + def getSyslogText(self): + if self.schema == None: + return "<uninterpretable>" out = strftime("%c", gmtime(self.timestamp / 1000000000)) - out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name - for arg in self.schemaEvent.arguments: + out += " " + self.classKey[0] + ":" + self.classKey[1] + for arg in self.schema.arguments: out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type) return out @@ -1247,8 +1247,8 @@ class DebugConsole(Console): def newPackage(self, name): print "newPackage:", name - def newClass(self, classKey): - print "newClass:", classKey + def newClass(self, kind, classKey): + print "newClass:", kind, classKey def newAgent(self, agent): print "newAgent:", agent diff --git a/python/tests_0-10/management.py b/python/tests_0-10/management.py index eea1b29404..efec2b8a92 100644 --- a/python/tests_0-10/management.py +++ b/python/tests_0-10/management.py @@ -59,7 +59,7 @@ class ManagementTest (TestBase010): session = self.session self.startQmf() - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") self.assertEqual (len(brokers), 1) broker = brokers[0] @@ -147,43 +147,43 @@ class ManagementTest (TestBase010): session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True) session.exchange_bind(queue="dest-queue", exchange="amq.direct") - queues = self.qmf.getObjects(cls="queue") + queues = self.qmf.getObjects(_class="queue") "Move 10 messages from src-queue to dest-queue" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) self.assertEqual (result.status, 0) - sq = self.qmf.getObjects(cls="queue", name="src-queue")[0] - dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0] + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,10) self.assertEqual (dq.msgDepth,10) "Move all remaining messages to destination" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) self.assertEqual (result.status,0) - sq = self.qmf.getObjects(cls="queue", name="src-queue")[0] - dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0] + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,0) self.assertEqual (dq.msgDepth,20) "Use a bad source queue name" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) self.assertEqual (result.status,4) "Use a bad destination queue name" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) self.assertEqual (result.status,4) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) self.assertEqual (result.status,0) - sq = self.qmf.getObjects(cls="queue", name="src-queue")[0] - dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0] + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,20) self.assertEqual (dq.msgDepth,0) @@ -216,23 +216,23 @@ class ManagementTest (TestBase010): msg = Message(props, body) session.message_transfer(destination="amq.direct", message=msg) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] "Purge top message from purge-queue" result = pq.purge(1) self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,19) "Purge top 9 messages from purge-queue" result = pq.purge(9) self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,10) "Purge all messages from purge-queue" result = pq.purge(0) self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) |
