diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-02-28 18:55:21 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-02-28 18:55:21 +0000 |
| commit | ac3f850123c903f00c163d6d2dbad22d98aec7a2 (patch) | |
| tree | 2e622a3e9349a9062454d16bf4bca83a5a3e9d90 /python | |
| parent | 1820dd421a096ed184a08deee9512e809312fed2 (diff) | |
| download | qpid-python-ac3f850123c903f00c163d6d2dbad22d98aec7a2.tar.gz | |
QPID-820 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@632087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rwxr-xr-x | python/mgmt-cli/main.py | 34 | ||||
| -rw-r--r-- | python/mgmt-cli/managementdata.py | 152 | ||||
| -rw-r--r-- | python/qpid/codec.py | 32 | ||||
| -rw-r--r-- | python/qpid/management.py | 551 |
4 files changed, 473 insertions, 296 deletions
diff --git a/python/mgmt-cli/main.py b/python/mgmt-cli/main.py index 76e1f25c14..f4c22012eb 100755 --- a/python/mgmt-cli/main.py +++ b/python/mgmt-cli/main.py @@ -104,7 +104,10 @@ class Mcli (Cmd): self.dataObject.do_list (data) def do_call (self, data): - self.dataObject.do_call (data) + try: + self.dataObject.do_call (data) + except ValueError, e: + print "ValueError:", e def do_EOF (self, data): print "quit" @@ -121,7 +124,10 @@ class Mcli (Cmd): self.dataObject.close () def Usage (): - print sys.argv[0], "[<target-host> [<tcp-port>]]" + print "Usage:", sys.argv[0], "[OPTIONS] [<target-host> [<tcp-port>]]" + print + print "Options:" + print " -s <amqp-spec-file> default: /usr/share/amqp/amqp.0-10-preview.xml" print sys.exit (1) @@ -134,13 +140,15 @@ try: (optlist, cargs) = getopt.getopt (sys.argv[1:], 's:') except: Usage () + exit (1) specpath = "/usr/share/amqp/amqp.0-10-preview.xml" host = "localhost" port = 5672 -if "s" in optlist: - specpath = optlist["s"] +for opt in optlist: + if opt[0] == "-s": + specpath = opt[1] if len (cargs) > 0: host = cargs[0] @@ -148,19 +156,27 @@ if len (cargs) > 0: if len (cargs) > 1: port = int (cargs[1]) -print ("Management Tool for QPID") disp = Display () # Attempt to make a connection to the target broker try: - data = ManagementData (disp, host, port, spec=specpath) + data = ManagementData (disp, host, port, specfile=specpath) except socket.error, e: - sys.exit (0) + print "Socket Error:", e[1] + sys.exit (1) except Closed, e: if str(e).find ("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." - sys.exit (0) + sys.exit (1) +except IOError, e: + print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename) + sys.exit (1) # Instantiate the CLI interpreter and launch it. cli = Mcli (data, disp) -cli.cmdloop () +print ("Management Tool for QPID") +try: + cli.cmdloop () +except Closed, e: + print "Connection to Broker Lost:", e + exit (1) diff --git a/python/mgmt-cli/managementdata.py b/python/mgmt-cli/managementdata.py index e7233c98ae..5b13594994 100644 --- a/python/mgmt-cli/managementdata.py +++ b/python/mgmt-cli/managementdata.py @@ -19,10 +19,12 @@ # under the License. # -from qpid.management import ManagedBroker +import qpid +from qpid.management import managementChannel, managementClient from threading import Lock from disp import Display from shlex import split +from qpid.client import Client class ManagementData: @@ -35,9 +37,10 @@ class ManagementData: # The only historical data it keeps are the high and low watermarks # for hi-lo statistics. # - # tables :== {<class-name>} + # tables :== {class-key} # {<obj-id>} # (timestamp, config-record, inst-record) + # class-key :== (<package-name>, <class-name>, <class-hash>) # timestamp :== (<last-interval-time>, <create-time>, <delete-time>) # config-record :== [element] # inst-record :== [element] @@ -59,6 +62,10 @@ class ManagementData: return displayId + self.baseId return displayId - 5000 + 0x8000000000000000L + def displayClassName (self, cls): + (packageName, className, hash) = cls + return packageName + "." + className + def dataHandler (self, context, className, list, timestamps): """ Callback for configuration and instrumentation data updates """ self.lock.acquire () @@ -104,6 +111,12 @@ class ManagementData: finally: self.lock.release () + def configHandler (self, context, className, list, timestamps): + self.dataHandler (0, className, list, timestamps); + + def instHandler (self, context, className, list, timestamps): + self.dataHandler (1, className, list, timestamps); + def methodReply (self, broker, sequence, status, sText, args): """ Callback for method-reply messages """ self.lock.acquire () @@ -121,12 +134,8 @@ class ManagementData: self.schema[className] = (configs, insts, methods, events) def __init__ (self, disp, host, port=5672, username="guest", password="guest", - spec="../../specs/amqp.0-10-preview.xml"): - self.broker = ManagedBroker (host, port, username, password, spec) - self.broker.configListener (0, self.dataHandler) - self.broker.instrumentationListener (1, self.dataHandler) - self.broker.methodListener (None, self.methodReply) - self.broker.schemaListener (None, self.schemaHandler) + specfile="../../specs/amqp.0-10-preview.xml"): + self.spec = qpid.spec.load (specfile) self.lock = Lock () self.tables = {} self.schema = {} @@ -135,24 +144,33 @@ class ManagementData: self.lastUnit = None self.methodSeq = 1 self.methodsPending = {} - self.broker.start () + + self.client = Client (host, port, self.spec) + self.client.start ({"LOGIN": username, "PASSWORD": password}) + self.channel = self.client.channel (1) + + self.mclient = managementClient (self.spec, None, self.configHandler, + self.instHandler, self.methodReply) + self.mclient.schemaListener (self.schemaHandler) + self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb) + self.mclient.addChannel (self.mch) def close (self): - self.broker.stop () + self.mclient.removeChannel (self.mch) def refName (self, oid): if oid == 0: return "NULL" return str (self.displayObjId (oid)) - def valueDisplay (self, className, key, value): + def valueDisplay (self, classKey, key, value): for kind in range (2): - schema = self.schema[className][kind] + schema = self.schema[classKey][kind] for item in schema: if item[0] == key: typecode = item[1] unit = item[2] - if typecode >= 1 and typecode <= 5: # numerics + if (typecode >= 1 and typecode <= 5) or typecode >= 12: # numerics if unit == None or unit == self.lastUnit: return str (value) else: @@ -191,6 +209,20 @@ class ManagementData: result = result + self.valueDisplay (className, key, val) return result + def getClassKey (self, className): + dotPos = className.find(".") + if dotPos == -1: + for key in self.schema: + if key[1] == className: + return key + else: + package = className[0:dotPos] + name = className[dotPos + 1:] + for key in self.schema: + if key[0] == package and key[1] == name: + return key + return None + def classCompletions (self, prefix): """ Provide a list of candidate class names for command completion """ self.lock.acquire () @@ -227,6 +259,10 @@ class ManagementData: return "reference" elif typecode == 11: return "boolean" + elif typecode == 12: + return "float" + elif typecode == 13: + return "double" else: raise ValueError ("Invalid type code: %d" % typecode) @@ -253,16 +289,16 @@ class ManagementData: return False return True - def listOfIds (self, className, tokens): + def listOfIds (self, classKey, tokens): """ Generate a tuple of object ids for a classname based on command tokens. """ list = [] if tokens[0] == "all": - for id in self.tables[className]: + for id in self.tables[classKey]: list.append (self.displayObjId (id)) elif tokens[0] == "active": - for id in self.tables[className]: - if self.tables[className][id][0][2] == 0: + for id in self.tables[classKey]: + if self.tables[classKey][id][0][2] == 0: list.append (self.displayObjId (id)) else: @@ -271,7 +307,7 @@ class ManagementData: if token.find ("-") != -1: ids = token.split("-", 2) for id in range (int (ids[0]), int (ids[1]) + 1): - if self.getClassForId (self.rawObjId (long (id))) == className: + if self.getClassForId (self.rawObjId (long (id))) == classKey: list.append (id) else: list.append (token) @@ -301,7 +337,7 @@ class ManagementData: deleted = deleted + 1 else: active = active + 1 - rows.append ((name, active, deleted)) + rows.append ((self.displayClassName (name), active, deleted)) self.disp.table ("Management Object Types:", ("ObjectType", "Active", "Deleted"), rows) finally: @@ -311,22 +347,23 @@ class ManagementData: """ Generate a display of a list of objects in a class """ self.lock.acquire () try: - if className not in self.tables: + classKey = self.getClassKey (className) + if classKey == None: print ("Object type %s not known" % className) else: rows = [] - sorted = self.tables[className].keys () + sorted = self.tables[classKey].keys () sorted.sort () for objId in sorted: - (ts, config, inst) = self.tables[className][objId] + (ts, config, inst) = self.tables[classKey][objId] createTime = self.disp.timestamp (ts[1]) destroyTime = "-" if ts[2] > 0: destroyTime = self.disp.timestamp (ts[2]) - objIndex = self.getObjIndex (className, config) + objIndex = self.getObjIndex (classKey, config) row = (self.refName (objId), createTime, destroyTime, objIndex) rows.append (row) - self.disp.table ("Objects of type %s" % className, + self.disp.table ("Objects of type %s.%s" % (classKey[0], classKey[1]), ("ID", "Created", "Destroyed", "Index"), rows) finally: @@ -343,57 +380,57 @@ class ManagementData: else: rootId = int (tokens[0]) - className = self.getClassForId (self.rawObjId (rootId)) + classKey = self.getClassForId (self.rawObjId (rootId)) remaining = tokens - if className == None: + if classKey == None: print "Id not known: %d" % int (tokens[0]) raise ValueError () else: - className = tokens[0] + classKey = self.getClassKey (tokens[0]) remaining = tokens[1:] - if className not in self.tables: - print "Class not known: %s" % className + if classKey not in self.tables: + print "Class not known: %s" % tokens[0] raise ValueError () - userIds = self.listOfIds (className, remaining) + userIds = self.listOfIds (classKey, remaining) if len (userIds) == 0: print "No object IDs supplied" raise ValueError () ids = [] for id in userIds: - if self.getClassForId (self.rawObjId (long (id))) == className: + if self.getClassForId (self.rawObjId (long (id))) == classKey: ids.append (self.rawObjId (long (id))) rows = [] timestamp = None - config = self.tables[className][ids[0]][1] + config = self.tables[classKey][ids[0]][1] for eIdx in range (len (config)): key = config[eIdx][0] if key != "id": row = ("config", key) for id in ids: if timestamp == None or \ - timestamp < self.tables[className][id][0][0]: - timestamp = self.tables[className][id][0][0] - (key, value) = self.tables[className][id][1][eIdx] - row = row + (self.valueDisplay (className, key, value),) + timestamp < self.tables[classKey][id][0][0]: + timestamp = self.tables[classKey][id][0][0] + (key, value) = self.tables[classKey][id][1][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) - inst = self.tables[className][ids[0]][2] + inst = self.tables[classKey][ids[0]][2] for eIdx in range (len (inst)): key = inst[eIdx][0] if key != "id": row = ("inst", key) for id in ids: - (key, value) = self.tables[className][id][2][eIdx] - row = row + (self.valueDisplay (className, key, value),) + (key, value) = self.tables[classKey][id][2][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) titleRow = ("Type", "Element") for id in ids: titleRow = titleRow + (self.refName (id),) - caption = "Object of type %s:" % className + caption = "Object of type %s.%s:" % (classKey[0], classKey[1]) if timestamp != None: caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" self.disp.table (caption, titleRow, rows) @@ -423,12 +460,13 @@ class ManagementData: """ Generate a display of details of the schema of a particular class """ self.lock.acquire () try: - if className not in self.schema: + classKey = self.getClassKey (className) + if classKey == None: print ("Class name %s not known" % className) raise ValueError () rows = [] - for config in self.schema[className][0]: + for config in self.schema[classKey][0]: name = config[0] if name != "id": typename = self.typeName(config[1]) @@ -446,7 +484,7 @@ class ManagementData: extra = extra + "MaxLen: " + str (config[8]) rows.append ((name, typename, unit, access, extra, desc)) - for config in self.schema[className][1]: + for config in self.schema[classKey][1]: name = config[0] if name != "id": typename = self.typeName(config[1]) @@ -455,10 +493,10 @@ class ManagementData: rows.append ((name, typename, unit, "", "", desc)) titles = ("Element", "Type", "Unit", "Access", "Notes", "Description") - self.disp.table ("Schema for class '%s':" % className, titles, rows) + self.disp.table ("Schema for class '%s.%s':" % (classKey[0], classKey[1]), titles, rows) - for mname in self.schema[className][2]: - (mdesc, args) = self.schema[className][2][mname] + for mname in self.schema[classKey][2]: + (mdesc, args) = self.schema[classKey][2][mname] caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc)) rows = [] for arg in args: @@ -485,25 +523,25 @@ class ManagementData: self.lock.release () def getClassForId (self, objId): - """ Given an object ID, return the class name for the referenced object """ - for className in self.tables: - if objId in self.tables[className]: - return className + """ Given an object ID, return the class key for the referenced object """ + for classKey in self.tables: + if objId in self.tables[classKey]: + return classKey return None def callMethod (self, userOid, methodName, args): self.lock.acquire () methodOk = True try: - className = self.getClassForId (self.rawObjId (userOid)) - if className == None: + classKey = self.getClassForId (self.rawObjId (userOid)) + if classKey == None: raise ValueError () - if methodName not in self.schema[className][2]: - print "Method '%s' not valid for class '%s'" % (methodName, className) + if methodName not in self.schema[classKey][2]: + print "Method '%s' not valid for class '%s.%s'" % (methodName, classKey[0], classKey[1]) raise ValueError () - schemaMethod = self.schema[className][2][methodName] + schemaMethod = self.schema[classKey][2][methodName] if len (args) != len (schemaMethod[1]): print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args)) raise ValueError () @@ -519,8 +557,8 @@ class ManagementData: self.lock.release () if methodOk: # try: - self.broker.method (self.methodSeq, self.rawObjId (userOid), className, - methodName, namedArgs) + self.mclient.callMethod (self.mch, self.methodSeq, self.rawObjId (userOid), classKey, + methodName, namedArgs) # except ValueError, e: # print "Error invoking method:", e diff --git a/python/qpid/codec.py b/python/qpid/codec.py index b25de11f11..1a9372455d 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -265,6 +265,38 @@ class Codec: """ return self.unpack("!Q") + def encode_float(self, o): + self.pack("!f", o) + + def decode_float(self): + return self.unpack("!f") + + def encode_double(self, o): + self.pack("!d", o) + + def decode_double(self): + return self.unpack("!d") + + def encode_bin128(self, b): + for idx in range (0,16): + self.pack("!B", ord (b[idx])) + + def decode_bin128(self): + result = "" + for idx in range (0,16): + result = result + chr (self.unpack("!B")) + return result + + def encode_raw(self, len, b): + for idx in range (0,len): + self.pack("!B", b[idx]) + + def decode_raw(self, len): + result = "" + for idx in range (0,len): + result = result + chr (self.unpack("!B")) + return result + def enc_str(self, fmt, s): """ encodes a string 's' in network byte order as per format 'fmt' diff --git a/python/qpid/management.py b/python/qpid/management.py index 40de2a5298..b5d992cf5d 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -35,12 +35,14 @@ from threading import Lock class SequenceManager: + """ Manage sequence numbers for asynchronous method calls """ def __init__ (self): self.lock = Lock () self.sequence = 0 self.pending = {} def reserve (self, data): + """ Reserve a unique sequence number """ self.lock.acquire () result = self.sequence self.sequence = self.sequence + 1 @@ -49,6 +51,7 @@ class SequenceManager: return result def release (self, seq): + """ Release a reserved sequence number """ data = None self.lock.acquire () if seq in self.pending: @@ -57,12 +60,172 @@ class SequenceManager: self.lock.release () return data -class ManagementMetadata: - """One instance of this class is created for each ManagedBroker. It - is used to store metadata from the broker which is needed for the - proper interpretation of received management content.""" + +class managementChannel: + """ This class represents a connection to an AMQP broker. """ + + def __init__ (self, ch, topicCb, replyCb, cbContext=None): + """ Given a channel on an established AMQP broker connection, this method + opens a session and performs all of the declarations and bindings needed + to participate in the management protocol. """ + response = ch.session_open (detached_lifetime=300) + self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id) + self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id) + self.qpidChannel = ch + self.tcb = topicCb + self.rcb = replyCb + self.context = cbContext + + ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1) + ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1) + + ch.queue_bind (exchange="qpid.management", + queue=self.topicName, routing_key="mgmt.#") + ch.queue_bind (exchange="amq.direct", + queue=self.replyName, routing_key=self.replyName) + ch.message_subscribe (queue=self.topicName, destination="tdest") + ch.message_subscribe (queue=self.replyName, destination="rdest") + + ch.client.queue ("tdest").listen (self.topicCb) + ch.client.queue ("rdest").listen (self.replyCb) + + ch.message_flow_mode (destination="tdest", mode=1) + ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) + ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) + + ch.message_flow_mode (destination="rdest", mode=1) + ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + + def topicCb (self, msg): + """ Receive messages via the topic queue on this channel. """ + self.tcb (self, msg) + + def replyCb (self, msg): + """ Receive messages via the reply queue on this channel. """ + self.rcb (self, msg) + + def send (self, exchange, msg): + self.qpidChannel.message_transfer (destination=exchange, content=msg) + + +class managementClient: + """ This class provides an API for access to management data on the AMQP + network. It implements the management protocol and manages the management + schemas as advertised by the various management agents in the network. """ + + #======================================================== + # User API - interacts with the class's user + #======================================================== + def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None): + self.spec = amqpSpec + self.ctrlCb = ctrlCb + self.configCb = configCb + self.instCb = instCb + self.methodCb = methodCb + self.schemaCb = None + self.eventCb = None + self.channels = [] + self.seqMgr = SequenceManager () + self.schema = {} + self.packages = {} + + def schemaListener (self, schemaCb): + """ Optionally register a callback to receive details of the schema of + managed objects in the network. """ + self.schemaCb = schemaCb + + def eventListener (self, eventCb): + """ Optionally register a callback to receive events from managed objects + in the network. """ + self.eventCb = eventCb + + def addChannel (self, channel): + """ Register a new channel. """ + self.channels.append (channel) + codec = Codec (StringIO (), self.spec) + self.setHeader (codec, ord ('H')) + msg = Content (codec.stream.getvalue ()) + msg["content_type"] = "application/octet-stream" + msg["routing_key"] = "agent" + msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) + + def removeChannel (self, channel): + """ Remove a previously added channel from management. """ + self.channels.remove (channel) + + def callMethod (self, channel, userSequence, objId, className, methodName, args=None): + """ Invoke a method on a managed object. """ + self.method (channel, userSequence, objId, className, methodName, args) + + #======================================================== + # Channel API - interacts with registered channel objects + #======================================================== + def topicCb (self, ch, msg): + """ Receive messages via the topic queue of a particular channel. """ + codec = Codec (StringIO (msg.content.body), self.spec) + hdr = self.checkHeader (codec) + if hdr == None: + raise ValueError ("outer header invalid"); + self.parse (ch, codec, hdr[0], hdr[1]) + msg.complete () + + def replyCb (self, ch, msg): + """ Receive messages via the reply queue of a particular channel. """ + codec = Codec (StringIO (msg.content.body), self.spec) + hdr = self.checkHeader (codec) + if hdr == None: + msg.complete () + return + + if hdr[0] == 'm': + self.handleMethodReply (ch, codec) + elif hdr[0] == 'I': + self.handleInit (ch, codec) + elif hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) + msg.complete () + + #======================================================== + # Internal Functions + #======================================================== + def setHeader (self, codec, opcode, cls = 0): + """ Compose the header of a management message. """ + codec.encode_octet (ord ('A')) + codec.encode_octet (ord ('M')) + codec.encode_octet (ord ('0')) + codec.encode_octet (ord ('1')) + codec.encode_octet (opcode) + codec.encode_octet (cls) + + def checkHeader (self, codec): + """ Check the header of a management message and extract the opcode and + class. """ + octet = chr (codec.decode_octet ()) + if octet != 'A': + return None + octet = chr (codec.decode_octet ()) + if octet != 'M': + return None + octet = chr (codec.decode_octet ()) + if octet != '0': + return None + octet = chr (codec.decode_octet ()) + if octet != '1': + return None + opcode = chr (codec.decode_octet ()) + cls = chr (codec.decode_octet ()) + return (opcode, cls) def encodeValue (self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.encode_octet (int (value)) elif typecode == 2: @@ -85,10 +248,15 @@ class ManagementMetadata: codec.encode_longlong (long (value)) elif typecode == 11: # BOOL codec.encode_octet (int (value)) + elif typecode == 12: # FLOAT + codec.encode_float (float (value)) + elif typecode == 13: # DOUBLE + codec.encode_double (double (value)) else: raise ValueError ("Invalid type code: %d" % typecode) def decodeValue (self, codec, typecode): + """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.decode_octet () elif typecode == 2: @@ -111,17 +279,119 @@ class ManagementMetadata: data = codec.decode_longlong () elif typecode == 11: # BOOL data = codec.decode_octet () + elif typecode == 12: # FLOAT + data = codec.decode_float () + elif typecode == 13: # DOUBLE + data = codec.decode_double () else: raise ValueError ("Invalid type code: %d" % typecode) return data + + def handleMethodReply (self, ch, codec): + sequence = codec.decode_long () + status = codec.decode_long () + sText = codec.decode_shortstr () + + data = self.seqMgr.release (sequence) + if data == None: + return + + (userSequence, classId, methodName) = data + args = {} + + if status == 0: + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None + for mname in ms: + (mdesc, margs) = ms[mname] + if mname == methodName: + arglist = margs + if arglist == None: + return + + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.decodeValue (codec, arg[1]) + + if self.methodCb != None: + self.methodCb (ch.context, userSequence, status, sText, args) + + def handleInit (self, ch, codec): + len = codec.decode_short () + data = codec.decode_raw (len) + if self.ctrlCb != None: + self.ctrlCb (ch.context, len, data) + + # Send a package request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('P')) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) - def parseSchema (self, cls, codec): + def handlePackageInd (self, ch, codec): + pname = codec.decode_shortstr () + if pname not in self.packages: + self.packages[pname] = {} + + # Send a class request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('Q')) + sendCodec.encode_shortstr (pname) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) + + def handleClassInd (self, ch, codec): + pname = codec.decode_shortstr () + cname = codec.decode_shortstr () + hash = codec.decode_bin128 () + if pname not in self.packages: + return + + if (cname, hash) not in self.packages[pname]: + # Send a schema request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('S')) + sendCodec.encode_shortstr (pname) + sendCodec.encode_shortstr (cname) + sendCodec.encode_bin128 (hash) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) + + def parseSchema (self, ch, cls, codec): + """ Parse a received schema-description message. """ + packageName = codec.decode_shortstr () className = codec.decode_shortstr () + hash = codec.decode_bin128 () configCount = codec.decode_short () instCount = codec.decode_short () methodCount = codec.decode_short () eventCount = codec.decode_short () + if packageName not in self.packages: + return + if (className, hash) in self.packages[packageName]: + return + + classKey = (packageName, className, hash) + if classKey in self.schema: + return + configs = [] insts = [] methods = {} @@ -213,25 +483,29 @@ class ManagementMetadata: args.append (arg) methods[mname] = (mdesc, args) + schemaClass = {} + schemaClass['C'] = configs + schemaClass['I'] = insts + schemaClass['M'] = methods + schemaClass['E'] = events + self.schema[classKey] = schemaClass - self.schema[(className,'C')] = configs - self.schema[(className,'I')] = insts - self.schema[(className,'M')] = methods - self.schema[(className,'E')] = events - - if self.broker.schema_cb != None: - self.broker.schema_cb[1] (self.broker.schema_cb[0], className, - configs, insts, methods, events) + if self.schemaCb != None: + self.schemaCb (ch.context, classKey, configs, insts, methods, events) - def parseContent (self, cls, codec): - if cls == 'C' and self.broker.config_cb == None: + def parseContent (self, ch, cls, codec): + """ Parse a received content message. """ + if cls == 'C' and self.configCb == None: return - if cls == 'I' and self.broker.inst_cb == None: + if cls == 'I' and self.instCb == None: return - className = codec.decode_shortstr () + packageName = codec.decode_shortstr () + className = codec.decode_shortstr () + hash = codec.decode_bin128 () + classKey = (packageName, className, hash) - if (className,cls) not in self.schema: + if classKey not in self.schema: return row = [] @@ -241,184 +515,49 @@ class ManagementMetadata: timestamps.append (codec.decode_longlong ()) # Create Time timestamps.append (codec.decode_longlong ()) # Delete Time - for element in self.schema[(className,cls)][:]: + schemaClass = self.schema[classKey] + for element in schemaClass[cls][:]: tc = element[1] name = element[0] data = self.decodeValue (codec, tc) row.append ((name, data)) - if cls == 'C': - self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) + if cls == 'C': + self.configCb (ch.context, classKey, row, timestamps) elif cls == 'I': - self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) - - def parse (self, codec, opcode, cls): - if opcode == 'S': - self.parseSchema (cls, codec) + self.instCb (ch.context, classKey, row, timestamps) + def parse (self, ch, codec, opcode, cls): + """ Parse a message received from the topic queue. """ + if opcode == 's': + self.parseSchema (ch, cls, codec) elif opcode == 'C': - self.parseContent (cls, codec) - + self.parseContent (ch, cls, codec) else: raise ValueError ("Unknown opcode: %c" % opcode); - def __init__ (self, broker): - self.broker = broker - self.schema = {} - - -class ManagedBroker: - """An object of this class represents a connection (over AMQP) to a - single managed broker.""" - - mExchange = "qpid.management" - dExchange = "amq.direct" - - def setHeader (self, codec, opcode, cls = 0): - codec.encode_octet (ord ('A')) - codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('0')) - codec.encode_octet (ord ('1')) - codec.encode_octet (opcode) - codec.encode_octet (cls) - - def checkHeader (self, codec): - octet = chr (codec.decode_octet ()) - if octet != 'A': - return None - octet = chr (codec.decode_octet ()) - if octet != 'M': - return None - octet = chr (codec.decode_octet ()) - if octet != '0': - return None - octet = chr (codec.decode_octet ()) - if octet != '1': - return None - opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - return (opcode, cls) - - def publish_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - - hdr = self.checkHeader (codec) - if hdr == None: - raise ValueError ("outer header invalid"); - - self.metadata.parse (codec, hdr[0], hdr[1]) - msg.complete () - - def reply_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - hdr = self.checkHeader (codec) - if hdr == None: - msg.complete () - return - if hdr[0] != 'R': - msg.complete () - return - - sequence = codec.decode_long () - status = codec.decode_long () - sText = codec.decode_shortstr () - - data = self.sequenceManager.release (sequence) - if data == None: - msg.complete () - return - - (userSequence, className, methodName) = data - args = {} - - if status == 0: - ms = self.metadata.schema[(className,'M')] - arglist = None - for mname in ms: - (mdesc, margs) = ms[mname] - if mname == methodName: - arglist = margs - if arglist == None: - msg.complete () - return - - for arg in arglist: - if arg[2].find("O") != -1: - args[arg[0]] = self.metadata.decodeValue (codec, arg[1]) - - if self.method_cb != None: - self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args) - - msg.complete () - - def __init__ (self, - host = "localhost", - port = 5672, - username = "guest", - password = "guest", - specfile = "/usr/share/amqp/amqp.0-10-preview.xml"): - - self.spec = qpid.spec.load (specfile) - self.client = None - self.channel = None - self.queue = None - self.rqueue = None - self.qname = None - self.rqname = None - self.metadata = ManagementMetadata (self) - self.sequenceManager = SequenceManager () - self.connected = 0 - self.lastConnectError = None - - # Initialize the callback records - self.status_cb = None - self.schema_cb = None - self.config_cb = None - self.inst_cb = None - self.method_cb = None - - self.host = host - self.port = port - self.username = username - self.password = password - - def statusListener (self, context, callback): - self.status_cb = (context, callback) - - def schemaListener (self, context, callback): - self.schema_cb = (context, callback) - - def configListener (self, context, callback): - self.config_cb = (context, callback) - - def methodListener (self, context, callback): - self.method_cb = (context, callback) - - def instrumentationListener (self, context, callback): - self.inst_cb = (context, callback) - - def method (self, userSequence, objId, className, - methodName, args=None, packageName="qpid"): - codec = Codec (StringIO (), self.spec); - sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + def method (self, channel, userSequence, objId, classId, methodName, args): + """ Invoke a method on an object """ + codec = Codec (StringIO (), self.spec) + sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M')) codec.encode_long (sequence) # Method sequence id codec.encode_longlong (objId) # ID of object - #codec.encode_shortstr (self.rqname) # name of reply queue # Encode args according to schema - if (className,'M') not in self.metadata.schema: - self.sequenceManager.release (sequence) - raise ValueError ("Unknown class name: %s" % className) + if classId not in self.schema: + self.seqMgr.release (sequence) + raise ValueError ("Unknown class name: %s" % classId) - ms = self.metadata.schema[(className,'M')] - arglist = None + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None for mname in ms: (mdesc, margs) = ms[mname] if mname == methodName: arglist = margs if arglist == None: - self.sequenceManager.release (sequence) + self.seqMgr.release (sequence) raise ValueError ("Unknown method name: %s" % methodName) for arg in arglist: @@ -427,65 +566,17 @@ class ManagedBroker: if arg[0] in args: value = args[arg[0]] if value == None: - self.sequenceManager.release (sequence) + self.seqMgr.release (sequence) raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) - self.metadata.encodeValue (codec, value, arg[1]) + self.encodeValue (codec, value, arg[1]) + packageName = classId[0] + className = classId[1] msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "method." + packageName + "." + className + "." + methodName + msg["routing_key"] = "agent.method." + packageName + "." + \ + className + "." + methodName msg["reply_to"] = self.spec.struct ("reply_to") msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = self.rqname - self.channel.message_transfer (destination="qpid.management", content=msg) - - def isConnected (self): - return connected - - def start (self): - print "Connecting to broker %s:%d" % (self.host, self.port) - - try: - self.client = Client (self.host, self.port, self.spec) - self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) - self.channel = self.client.channel (1) - response = self.channel.session_open (detached_lifetime=300) - self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) - self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) - - self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) - self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) - - self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, - routing_key="mgmt.#") - self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, - routing_key=self.rqname) - - self.channel.message_subscribe (queue=self.qname, destination="mdest") - self.channel.message_subscribe (queue=self.rqname, destination="rdest") - - self.queue = self.client.queue ("mdest") - self.queue.listen (self.publish_cb) - - self.channel.message_flow_mode (destination="mdest", mode=1) - self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) - - self.rqueue = self.client.queue ("rdest") - self.rqueue.listen (self.reply_cb) - - self.channel.message_flow_mode (destination="rdest", mode=1) - self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) - - self.connected = 1 - - except socket.error, e: - print "Socket Error:", e[1] - self.lastConnectError = e - raise - except: - raise - - def stop (self): - pass + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) |
