summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-07 21:47:35 +0000
committerTed Ross <tross@apache.org>2008-10-07 21:47:35 +0000
commit9d199b74aee76859480a7ee92d95c6db42028b43 (patch)
treeca09aace4aaac2afa9650cc78833d30b056313a9 /python
parent41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff)
downloadqpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-xpython/commands/qpid-config22
-rwxr-xr-xpython/commands/qpid-printevents5
-rwxr-xr-xpython/commands/qpid-route24
-rw-r--r--python/qpid/management.py52
-rw-r--r--python/qpid/managementdata.py4
-rw-r--r--python/qpid/qmfconsole.py274
-rw-r--r--python/tests_0-10/management.py34
7 files changed, 194 insertions, 221 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config
index 13b489abae..8b011778d6 100755
--- a/python/commands/qpid-config
+++ b/python/commands/qpid-config
@@ -84,8 +84,8 @@ class BrokerManager:
self.qmf.delBroker(self.broker)
def Overview (self):
- exchanges = self.qmf.getObjects(cls="exchange")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ queues = self.qmf.getObjects(_class="queue")
print "Total Exchanges: %d" % len (exchanges)
etype = {}
for ex in exchanges:
@@ -106,7 +106,7 @@ class BrokerManager:
print " non-durable: %d" % (len (queues) - _durable)
def ExchangeList (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
+ exchanges = self.qmf.getObjects(_class="exchange")
print "Durable Type Bindings Exchange Name"
print "======================================================="
for ex in exchanges:
@@ -114,9 +114,9 @@ class BrokerManager:
print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
def ExchangeListRecurse (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
- bindings = self.qmf.getObjects(cls="binding")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ bindings = self.qmf.getObjects(_class="binding")
+ queues = self.qmf.getObjects(_class="queue")
for ex in exchanges:
if self.match (ex.name, filter):
print "Exchange '%s' (%s)" % (ex.name, ex.type)
@@ -130,8 +130,8 @@ class BrokerManager:
def QueueList (self, filter):
- queues = self.qmf.getObjects(cls="queue")
- journals = self.qmf.getObjects(cls="journal")
+ queues = self.qmf.getObjects(_class="queue")
+ journals = self.qmf.getObjects(_class="journal")
print " Store Size"
print "Durable AutoDel Excl Bindings (files x file pages) Queue Name"
print "==========================================================================================="
@@ -151,9 +151,9 @@ class BrokerManager:
YN (q.exclusive), q.bindingCount, q.name)
def QueueListRecurse (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
- bindings = self.qmf.getObjects(cls="binding")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ bindings = self.qmf.getObjects(_class="binding")
+ queues = self.qmf.getObjects(_class="queue")
for queue in queues:
if self.match (queue.name, filter):
print "Queue '%s'" % queue.name
diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents
index 970607c797..6efd472221 100755
--- a/python/commands/qpid-printevents
+++ b/python/commands/qpid-printevents
@@ -30,16 +30,13 @@ class EventConsole(Console):
def event(self, broker, event):
print event
- def heartbeat(self, agent, timestamp):
- print "Heartbeat"
-
##
## Main Program
##
def main():
_usage = "%prog [options] [broker-addr]..."
_description = \
-"""Collect and print events from one of more Qpid message brokers. If no broker-addr is
+"""Collect and print events from one or more Qpid message brokers. If no broker-addr is
supplied, %prog will connect to 'localhost:5672'.
broker-addr is of the form: [username/password@] hostname | ip-address [:<port>]
ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index f9f938cdec..4dadcd543b 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -62,7 +62,7 @@ class RouteManager:
self.qmf.delBroker(self.broker)
def getLink (self):
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
for link in links:
if "%s:%d" % (link.host, link.port) == self.src.name ():
return link
@@ -74,7 +74,7 @@ class RouteManager:
print "Linking broker to itself is not permitted"
sys.exit(1)
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link != None:
@@ -92,7 +92,7 @@ class RouteManager:
def DelLink (self, srcBroker):
self.src = qmfconsole.BrokerURL(srcBroker)
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
@@ -103,7 +103,7 @@ class RouteManager:
print "Close method returned:", res.status, res.text
def ListLinks (self):
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
if len(links) == 0:
print "No Links Found"
else:
@@ -119,7 +119,7 @@ class RouteManager:
if self.dest.name() == self.src.name():
raise Exception("Linking broker to itself is not permitted")
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
@@ -140,7 +140,7 @@ class RouteManager:
if link == None:
raise Exception("Protocol Error - Missing link ID")
- bridges = self.qmf.getObjects(cls="bridge")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.key == routingKey:
@@ -164,7 +164,7 @@ class RouteManager:
raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
sys.exit (0)
- bridges = self.qmf.getObjects(cls="bridge")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
if _verbose:
@@ -186,8 +186,8 @@ class RouteManager:
raise Exception("Route not found")
def ListRoutes (self):
- links = self.qmf.getObjects(cls="link")
- bridges = self.qmf.getObjects(cls="bridge")
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
myLink = None
@@ -199,8 +199,8 @@ class RouteManager:
print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
def ClearAllRoutes (self):
- links = self.qmf.getObjects(cls="link")
- bridges = self.qmf.getObjects(cls="bridge")
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if _verbose:
@@ -218,7 +218,7 @@ class RouteManager:
print "Ok"
if _dellink:
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
for link in links:
if _verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 8d8339b2c6..81d9dbe030 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -285,7 +285,7 @@ class managementClient:
ft = {}
ft["_class"] = className
codec.write_map (ft)
- msg = channel.message(codec.encoded, routing_key="agent.%d" % bank)
+ msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank)
channel.send ("qpid.management", msg)
def syncWaitForStable (self, channel):
@@ -398,7 +398,7 @@ class managementClient:
""" Compose the header of a management message. """
codec.write_uint8 (ord ('A'))
codec.write_uint8 (ord ('M'))
- codec.write_uint8 (ord ('1'))
+ codec.write_uint8 (ord ('2'))
codec.write_uint8 (opcode)
codec.write_uint32 (seq)
@@ -412,7 +412,7 @@ class managementClient:
if octet != 'M':
return None
octet = chr (codec.read_uint8 ())
- if octet != '1':
+ if octet != '2':
return None
opcode = chr (codec.read_uint8 ())
seq = codec.read_uint32 ()
@@ -433,7 +433,7 @@ class managementClient:
elif typecode == 6:
codec.write_str8 (value)
elif typecode == 7:
- codec.write_vbin32 (value)
+ codec.write_str16 (value)
elif typecode == 8: # ABSTIME
codec.write_uint64 (long (value))
elif typecode == 9: # DELTATIME
@@ -476,7 +476,7 @@ class managementClient:
elif typecode == 6:
data = str (codec.read_str8 ())
elif typecode == 7:
- data = codec.read_vbin32 ()
+ data = codec.read_str16 ()
elif typecode == 8: # ABSTIME
data = codec.read_uint64 ()
elif typecode == 9: # DELTATIME
@@ -604,6 +604,9 @@ class managementClient:
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
pname = str (codec.read_str8())
cname = str (codec.read_str8())
hash = codec.read_bin128()
@@ -656,13 +659,15 @@ class managementClient:
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
packageName = str (codec.read_str8 ())
className = str (codec.read_str8 ())
hash = codec.read_bin128 ()
configCount = codec.read_uint16 ()
instCount = codec.read_uint16 ()
methodCount = codec.read_uint16 ()
- eventCount = codec.read_uint16 ()
if packageName not in self.packages:
return
@@ -676,7 +681,6 @@ class managementClient:
configs = []
insts = []
methods = {}
- events = {}
configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
insts.append (("id", 4, None, None))
@@ -765,42 +769,14 @@ class managementClient:
args.append (arg)
methods[mname] = (mdesc, args)
- for idx in range (eventCount):
- ft = codec.read_map ()
- ename = str (ft["name"])
- argCount = ft["argCount"]
- if "desc" in ft:
- edesc = str (ft["desc"])
- else:
- edesc = None
-
- args = []
- for aidx in range (argCount):
- ft = codec.read_map ()
- name = str (ft["name"])
- type = ft["type"]
- unit = None
- desc = None
-
- for key, value in ft.items ():
- if key == "unit":
- unit = str (value)
- elif key == "desc":
- desc = str (value)
-
- arg = (name, type, unit, desc)
- args.append (arg)
- events[ename] = (edesc, args)
-
schemaClass = {}
schemaClass['C'] = configs
schemaClass['I'] = insts
schemaClass['M'] = methods
- schemaClass['E'] = events
self.schema[classKey] = schemaClass
if self.schemaCb != None:
- self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+ self.schemaCb (ch.context, classKey, configs, insts, methods, {})
def parsePresenceMasks(self, codec, schemaClass):
""" Generate a list of not-present properties """
@@ -896,7 +872,7 @@ class managementClient:
codec.write_str8 (classId[1])
codec.write_bin128 (classId[2])
codec.write_str8 (methodName)
- bank = objId.getBank()
+ bank = "%d.%d" % (objId.getBroker(), objId.getBank())
# Encode args according to schema
if classId not in self.schema:
@@ -926,5 +902,5 @@ class managementClient:
packageName = classId[0]
className = classId[1]
- msg = channel.message(codec.encoded, "agent." + str(bank))
+ msg = channel.message(codec.encoded, "agent." + bank)
channel.send ("qpid.management", msg)
diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py
index d86dd3a360..2bf66a5e5b 100644
--- a/python/qpid/managementdata.py
+++ b/python/qpid/managementdata.py
@@ -546,10 +546,10 @@ class ManagementData:
for classKey in sorted:
tuple = self.schema[classKey]
row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]),
- len (tuple[2]), len (tuple[3]))
+ len (tuple[2]))
rows.append (row)
self.disp.table ("Classes in Schema:",
- ("Class", "Properties", "Statistics", "Methods", "Events"),
+ ("Class", "Properties", "Statistics", "Methods"),
rows)
finally:
self.lock.release ()
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index 3800e54b5b..c8035e87f2 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -49,7 +49,7 @@ class Console:
""" Invoked when a QMF package is discovered. """
pass
- def newClass(self, classKey):
+ def newClass(self, kind, classKey):
""" Invoked when a new class is discovered. Session.getSchema can be
used to obtain details about the class."""
pass
@@ -158,7 +158,7 @@ class Session:
raise Exception(broker.error)
self.brokers.append(broker)
- self.getObjects(broker=broker, cls="agent")
+ self.getObjects(broker=broker, _class="agent")
return broker
def delBroker(self, broker):
@@ -219,35 +219,36 @@ class Session:
The class for queried objects may be specified in one of the following ways:
- schema = <schema> - supply a schema object returned from getSchema
- key = <key> - supply a classKey from the list returned by getClasses
- cls = <name> - supply a class name as a string
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
If objects should be obtained from only one agent, use the following argument.
Otherwise, the query will go to all agents.
- agent = <agent> - supply an agent from the list returned by getAgents
+ _agent = <agent> - supply an agent from the list returned by getAgents.
If the get query is to be restricted to one broker (as opposed to all connected brokers),
add the following argument:
- broker = <broker> - supply a broker as returned by addBroker
+ _broker = <broker> - supply a broker as returned by addBroker.
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
"""
- if "broker" in kwargs:
+ if "_broker" in kwargs:
brokerList = []
- brokerList.append(kwargs["broker"])
+ brokerList.append(kwargs["_broker"])
else:
brokerList = self.brokers
for broker in brokerList:
broker._waitForStable()
agentList = []
- if "agent" in kwargs:
- agent = kwargs["agent"]
+ if "_agent" in kwargs:
+ agent = kwargs["_agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
agentList.append(agent)
@@ -257,11 +258,14 @@ class Session:
agentList.append(agent)
cname = None
- if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey()
- elif "key" in kwargs: pname, cname, hash = kwargs["key"]
- elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None
+ if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
+ elif "_key" in kwargs: pname, cname, hash = kwargs["_key"]
+ elif "_class" in kwargs:
+ pname, cname, hash = None, kwargs["_class"], None
+ if "_package" in kwargs:
+ pname = kwargs["_package"]
if cname == None:
- raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument")
+ raise Exception("No class supplied, use '_schema', '_key', or '_class' argument")
map = {}
map["_class"] = cname
if pname != None: map["_package"] = pname
@@ -269,7 +273,7 @@ class Session:
self.getSelect = []
for item in kwargs:
- if item != "schema" and item != "key" and item != "cls":
+ if item[0] != '_':
self.getSelect.append((item, kwargs[item]))
self.getResult = []
@@ -282,7 +286,7 @@ class Session:
self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank)
+ smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
broker._send(smsg)
starttime = time()
@@ -382,6 +386,7 @@ class Session:
self.cv.release()
def _handleClassInd(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
@@ -431,17 +436,18 @@ class Session:
self.console.event(broker, event)
def _handleSchemaResp(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
classKey = (pname, cname, hash)
- _class = SchemaClass(classKey, codec)
+ _class = SchemaClass(kind, classKey, codec)
self.cv.acquire()
self.packages[pname][(cname, hash)] = _class
self.cv.release()
broker._decOutstanding()
if self.console != None:
- self.console.newClass(classKey)
+ self.console.newClass(kind, classKey)
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
pname = str(codec.read_str8())
@@ -485,7 +491,7 @@ class Session:
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
- for prop, propval in object.properties:
+ for prop, propval in object.getProperties():
if key == prop.name and value != propval:
return False
return True
@@ -497,7 +503,7 @@ class Session:
elif typecode == 3: data = codec.read_uint32() # U32
elif typecode == 4: data = codec.read_uint64() # U64
elif typecode == 6: data = str(codec.read_str8()) # SSTR
- elif typecode == 7: data = codec.read_vbin32() # LSTR
+ elif typecode == 7: data = codec.read_str16() # LSTR
elif typecode == 8: data = codec.read_int64() # ABSTIME
elif typecode == 9: data = codec.read_uint64() # DELTATIME
elif typecode == 10: data = ObjectId(codec) # REF
@@ -521,7 +527,7 @@ class Session:
elif typecode == 3: codec.write_uint32 (long(value)) # U32
elif typecode == 4: codec.write_uint64 (long(value)) # U64
elif typecode == 6: codec.write_str8 (value) # SSTR
- elif typecode == 7: codec.write_vbin32 (value) # LSTR
+ elif typecode == 7: codec.write_str16 (value) # LSTR
elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
elif typecode == 10: value.encode (codec) # REF
@@ -577,30 +583,42 @@ class ClassKey:
class SchemaClass:
""" """
- def __init__(self, key, codec):
+ CLASS_KIND_TABLE = 1
+ CLASS_KIND_EVENT = 2
+
+ def __init__(self, kind, key, codec):
+ self.kind = kind
self.classKey = key
self.properties = []
self.statistics = []
- self.methods = []
- self.events = []
-
- propCount = codec.read_uint16()
- statCount = codec.read_uint16()
- methodCount = codec.read_uint16()
- eventCount = codec.read_uint16()
-
- for idx in range(propCount):
- self.properties.append(SchemaProperty(codec))
- for idx in range(statCount):
- self.statistics.append(SchemaStatistic(codec))
- for idx in range(methodCount):
- self.methods.append(SchemaMethod(codec))
- for idx in range(eventCount):
- self.events.append(SchemaEvent(codec))
+ self.methods = []
+ self.arguments = []
+
+ if self.kind == self.CLASS_KIND_TABLE:
+ propCount = codec.read_uint16()
+ statCount = codec.read_uint16()
+ methodCount = codec.read_uint16()
+ for idx in range(propCount):
+ self.properties.append(SchemaProperty(codec))
+ for idx in range(statCount):
+ self.statistics.append(SchemaStatistic(codec))
+ for idx in range(methodCount):
+ self.methods.append(SchemaMethod(codec))
+
+ elif self.kind == self.CLASS_KIND_EVENT:
+ argCount = codec.read_uint16()
+ for idx in range(argCount):
+ self.arguments.append(SchemaArgument(codec, methodArg=False))
def __repr__(self):
pname, cname, hash = self.classKey
- result = "Class: %s:%s " % (pname, cname)
+ if self.kind == self.CLASS_KIND_TABLE:
+ kindStr = "Table"
+ elif self.kind == self.CLASS_KIND_EVENT:
+ kindStr = "Event"
+ else:
+ kindStr = "Unsupported"
+ result = "%s Class: %s:%s " % (kindStr, pname, cname)
result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
return result
@@ -620,9 +638,9 @@ class SchemaClass:
""" Return the list of methods for the class. """
return self.methods
- def getEvents(self):
+ def getArguments(self):
""" Return the list of events for the class. """
- return self.events
+ return self.arguments
class SchemaProperty:
""" """
@@ -693,33 +711,6 @@ class SchemaMethod:
result += ")"
return result
-class SchemaEvent:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- argCount = map["argCount"]
- if "desc" in map:
- self.desc = str(map["desc"])
- else:
- self.desc = None
- self.arguments = []
-
- for idx in range(argCount):
- self.arguments.append(SchemaArgument(codec, methodArg=False))
-
- def __repr__(self):
- result = self.name + "("
- first = True
- for arg in self.arguments:
- if first:
- first = False
- else:
- result += ", "
- result += arg.name
- result += ")"
- return result
-
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -743,7 +734,7 @@ class SchemaArgument:
elif key == "desc" : self.desc = str(value)
elif key == "default" : self.default = str(value)
-class ObjectId(object):
+class ObjectId:
""" Object that represents QMF object identifiers """
def __init__(self, codec, first=0, second=0):
if codec:
@@ -800,80 +791,86 @@ class Object(object):
""" """
def __init__(self, session, broker, schema, codec, prop, stat):
""" """
- self.session = session
- self.broker = broker
- self.schema = schema
- self.currentTime = codec.read_uint64()
- self.createTime = codec.read_uint64()
- self.deleteTime = codec.read_uint64()
- self.objectId = ObjectId(codec)
- self.properties = []
- self.statistics = []
+ self._session = session
+ self._broker = broker
+ self._schema = schema
+ self._currentTime = codec.read_uint64()
+ self._createTime = codec.read_uint64()
+ self._deleteTime = codec.read_uint64()
+ self._objectId = ObjectId(codec)
+ self._properties = []
+ self._statistics = []
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
for property in schema.getProperties():
if property.name in notPresent:
- self.properties.append((property, None))
+ self._properties.append((property, None))
else:
- self.properties.append((property, self.session._decodeValue(codec, property.type)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type)))
if stat:
for statistic in schema.getStatistics():
- self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
def getObjectId(self):
""" Return the object identifier for this object """
- return self.objectId
+ return self._objectId
def getClassKey(self):
""" Return the class-key that references the schema describing this object. """
- return self.schema.getKey()
+ return self._schema.getKey()
def getSchema(self):
""" Return the schema that describes this object. """
- return self.schema
+ return self._schema
def getMethods(self):
""" Return a list of methods available for this object. """
- return self.schema.getMethods()
+ return self._schema.getMethods()
def getTimestamps(self):
""" Return the current, creation, and deletion times for this object. """
- return self.currentTime, self.createTime, self.deleteTime
+ return self._currentTime, self._createTime, self._deleteTime
def getIndex(self):
""" Return a string describing this object's primary key. """
result = ""
- for property, value in self.properties:
+ for property, value in self._properties:
if property.index:
if result != "":
result += ":"
result += str(value)
return result
+ def getProperties(self):
+ return self._properties
+
+ def getStatistics(self):
+ return self._statistics
+
def __repr__(self):
return self.getIndex()
def __getattr__(self, name):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self.properties:
+ for property, value in self._properties:
if name == property.name:
return value
- for statistic, value in self.statistics:
+ for statistic, value in self._statistics:
if name == statistic.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
def _invoke(self, name, args, kwargs):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
- sendCodec = Codec(self.broker.conn.spec)
- seq = self.session.seqMgr._reserve((self, method))
- self.broker._setHeader(sendCodec, 'M', seq)
- self.objectId.encode(sendCodec)
- pname, cname, hash = self.schema.getKey()
+ sendCodec = Codec(self._broker.conn.spec)
+ seq = self._session.seqMgr._reserve((self, method))
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ pname, cname, hash = self._schema.getKey()
sendCodec.write_str8(pname)
sendCodec.write_str8(cname)
sendCodec.write_bin128(hash)
@@ -888,29 +885,30 @@ class Object(object):
for arg in method.arguments:
if arg.dir.find("I") != -1:
- self.session._encodeValue(sendCodec, args[aIdx], arg.type)
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
- smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
- self.broker.cv.acquire()
- self.broker.syncInFlight = True
- self.broker.cv.release()
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+ (self._objectId.getBroker(), self._objectId.getBank()))
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ self._broker.cv.release()
- self.broker._send(smsg)
+ self._broker._send(smsg)
- self.broker.cv.acquire()
+ self._broker.cv.acquire()
starttime = time()
- while self.broker.syncInFlight and self.broker.error == None:
- self.broker.cv.wait(self.broker.SYNC_TIME)
- if time() - starttime > self.broker.SYNC_TIME:
- self.broker.cv.release()
- self.session.seqMgr._release(seq)
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(self._broker.SYNC_TIME)
+ if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.release()
+ self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
- self.broker.cv.release()
- if self.broker.error != None:
- errorText = self.broker.error
- self.broker.error = None
+ self._broker.cv.release()
+ if self._broker.error != None:
+ errorText = self._broker.error
+ self._broker.error = None
raise Exception(errorText)
- return self.broker.syncResult
+ return self._broker.syncResult
raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):
@@ -954,7 +952,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.agents = {}
- self.agents[0] = Agent(self, 0, "BrokerAgent")
+ self.agents[0] = Agent(self, "1.0", "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1040,14 +1038,15 @@ class Broker:
self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
- if obj.deleteTime == 0:
- if obj.objectIdBank not in self.agents:
- agent = Agent(self, obj.objectIdBank, obj.label)
- self.agents[obj.objectIdBank] = agent
+ bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+ if obj._deleteTime == 0:
+ if bankKey not in self.agents:
+ agent = Agent(self, bankKey, obj.label)
+ self.agents[bankKey] = agent
if self.session.console != None:
self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(obj.objectIdBank, None)
+ agent = self.agents.pop(bankKey, None)
if agent != None and self.session.console != None:
self.session.console.delAgent(agent)
@@ -1055,7 +1054,7 @@ class Broker:
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
codec.write_uint8(ord('M'))
- codec.write_uint8(ord('1'))
+ codec.write_uint8(ord('2'))
codec.write_uint8(ord(opcode))
codec.write_uint32(seq)
@@ -1068,7 +1067,7 @@ class Broker:
if octet != 'M':
return None, None
octet = chr(codec.read_uint8())
- if octet != '1':
+ if octet != '2':
return None, None
opcode = chr(codec.read_uint8())
seq = codec.read_uint32()
@@ -1164,28 +1163,24 @@ class Agent:
self.label = label
def __repr__(self):
- return "Agent at bank %d (%s)" % (self.bank, self.label)
+ return "Agent at bank %s (%s)" % (self.bank, self.label)
class Event:
""" """
def __init__(self, session, codec):
self.session = session
- self.timestamp = codec.read_int64()
- self.objectId = ObjectId(codec)
pname = codec.read_str8()
cname = codec.read_str8()
hash = codec.read_bin128()
self.classKey = (pname, cname, hash)
- self.name = codec.read_str8()
+ self.timestamp = codec.read_int64()
+ self.schema = None
if pname in session.packages:
if (cname, hash) in session.packages[pname]:
- schema = session.packages[pname][(cname, hash)]
- for event in schema.getEvents():
- if event.name == self.name:
- self.schemaEvent = event
- self.arguments = {}
- for arg in event.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+ self.schema = session.packages[pname][(cname, hash)]
+ self.arguments = {}
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type)
def __repr__(self):
return self.getSyslogText()
@@ -1202,10 +1197,15 @@ class Event:
def getName(self):
return self.name
+ def getSchema(self):
+ return self.schema
+
def getSyslogText(self):
+ if self.schema == None:
+ return "<uninterpretable>"
out = strftime("%c", gmtime(self.timestamp / 1000000000))
- out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name
- for arg in self.schemaEvent.arguments:
+ out += " " + self.classKey[0] + ":" + self.classKey[1]
+ for arg in self.schema.arguments:
out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type)
return out
@@ -1247,8 +1247,8 @@ class DebugConsole(Console):
def newPackage(self, name):
print "newPackage:", name
- def newClass(self, classKey):
- print "newClass:", classKey
+ def newClass(self, kind, classKey):
+ print "newClass:", kind, classKey
def newAgent(self, agent):
print "newAgent:", agent
diff --git a/python/tests_0-10/management.py b/python/tests_0-10/management.py
index eea1b29404..efec2b8a92 100644
--- a/python/tests_0-10/management.py
+++ b/python/tests_0-10/management.py
@@ -59,7 +59,7 @@ class ManagementTest (TestBase010):
session = self.session
self.startQmf()
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
self.assertEqual (len(brokers), 1)
broker = brokers[0]
@@ -147,43 +147,43 @@ class ManagementTest (TestBase010):
session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
session.exchange_bind(queue="dest-queue", exchange="amq.direct")
- queues = self.qmf.getObjects(cls="queue")
+ queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
self.assertEqual (result.status, 0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,10)
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
self.assertEqual (result.status,0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,0)
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
self.assertEqual (result.status,4)
"Use a bad destination queue name"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
self.assertEqual (result.status,0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,20)
self.assertEqual (dq.msgDepth,0)
@@ -216,23 +216,23 @@ class ManagementTest (TestBase010):
msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=msg)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
result = pq.purge(1)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
result = pq.purge(9)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
result = pq.purge(0)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)