From 0745cd0d738fbec7bd48529d7024535fc3301931 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 10 Dec 2007 20:22:23 +0000 Subject: Patches from Ted Ross QPID-697 Fixed access-rights constants for management schema. Added mutex to fix problems associated with concurrent invocation of accessors for queue statistics. Removed queue schema content that is not relevant to QPID. QPID-698 This patch creates a new subdirectory in python called "mgmt-cli". python/mgmt-cli/main.py can be executed from the shell. If no arguments are supplied, it attempts to connect to the broker at localhost:5672. The first argument is the hostname for the target broker and the second (optional) argument is the TCP port (defaults to 5672). It is assumed that the AMQP spec file is in the following location: /usr/share/amqp/amqp.0-10-preview.xml It is also required that the qpid/python directory be in the PYTHONPATH environment variable. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@603034 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/management.py | 239 ++++++++++++++++------ python/qpid/management.py.rej | 457 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 636 insertions(+), 60 deletions(-) create mode 100644 python/qpid/management.py.rej (limited to 'python/qpid') diff --git a/python/qpid/management.py b/python/qpid/management.py index a5ad997a24..8373ecceb7 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -31,17 +31,74 @@ from qpid.client import Client from qpid.content import Content from cStringIO import StringIO from codec import Codec, EOF +from threading import Lock + + +class SequenceManager: + def __init__ (self): + self.lock = Lock () + self.sequence = 0 + self.pending = {} + + def reserve (self, data): + self.lock.acquire () + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + self.lock.release () + return result + + def release (self, seq): + data = None + self.lock.acquire () + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + self.lock.release () + return data -#=================================================================== -# ManagementMetadata -# -# One instance of this class is created for each ManagedBroker. It -# is used to store metadata from the broker which is needed for the -# proper interpretation of recevied management content. -# -#=================================================================== class ManagementMetadata: - + """One instance of this class is created for each ManagedBroker. It + is used to store metadata from the broker which is needed for the + proper interpretation of received management content.""" + + def encodeValue (self, codec, value, typecode): + if typecode == 1: + codec.encode_octet (int (value)) + elif typecode == 2: + codec.encode_short (int (value)) + elif typecode == 3: + codec.encode_long (long (value)) + elif typecode == 4: + codec.encode_longlong (long (value)) + elif typecode == 5: + codec.encode_octet (int (value)) + elif typecode == 6: + codec.encode_shortstr (value) + elif typecode == 7: + codec.encode_longstr (value) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def decodeValue (self, codec, typecode): + if typecode == 1: + data = codec.decode_octet () + elif typecode == 2: + data = codec.decode_short () + elif typecode == 3: + data = codec.decode_long () + elif typecode == 4: + data = codec.decode_longlong () + elif typecode == 5: + data = codec.decode_octet () + elif typecode == 6: + data = codec.decode_shortstr () + elif typecode == 7: + data = codec.decode_longstr () + else: + raise ValueError ("Invalid type code: %d" % typecode) + return data + def parseSchema (self, cls, codec): className = codec.decode_shortstr () configCount = codec.decode_short () @@ -100,12 +157,56 @@ class ManagementMetadata: inst = (name, type, unit, desc) insts.append (inst) - # TODO: Handle notification of schema change outbound + for idx in range (methodCount): + ft = codec.decode_table () + mname = ft["name"] + argCount = ft["argCount"] + if "desc" in ft: + mdesc = ft["desc"] + else: + mdesc = None + + args = [] + for aidx in range (argCount): + ft = codec.decode_table () + name = ft["name"] + type = ft["type"] + dir = ft["dir"].upper () + unit = None + min = None + max = None + maxlen = None + desc = None + default = None + + for key, value in ft.items (): + if key == "unit": + unit = value + elif key == "min": + min = value + elif key == "max": + max = value + elif key == "maxlen": + maxlen = value + elif key == "desc": + desc = value + elif key == "default": + default = value + + arg = (name, type, dir, unit, desc, min, max, maxlen, default) + args.append (arg) + methods.append ((mname, mdesc, args)) + + self.schema[(className,'C')] = configs self.schema[(className,'I')] = insts self.schema[(className,'M')] = methods self.schema[(className,'E')] = events + if self.broker.schema_cb != None: + self.broker.schema_cb[1] (self.broker.schema_cb[0], className, + configs, insts, methods, events) + def parseContent (self, cls, codec): if cls == 'C' and self.broker.config_cb == None: return @@ -127,22 +228,7 @@ class ManagementMetadata: for element in self.schema[(className,cls)][:]: tc = element[1] name = element[0] - if tc == 1: # TODO: Define constants for these - data = codec.decode_octet () - elif tc == 2: - data = codec.decode_short () - elif tc == 3: - data = codec.decode_long () - elif tc == 4: - data = codec.decode_longlong () - elif tc == 5: - data = codec.decode_octet () - elif tc == 6: - data = codec.decode_shortstr () - elif tc == 7: - data = codec.decode_longstr () - else: - raise ValueError ("Invalid type code: %d" % tc) + data = self.decodeValue (codec, tc) row.append ((name, data)) if cls == 'C': @@ -168,14 +254,9 @@ class ManagementMetadata: self.schema = {} -#=================================================================== -# ManagedBroker -# -# An object of this class represents a connection (over AMQP) to a -# single managed broker. -# -#=================================================================== class ManagedBroker: + """An object of this class represents a connection (over AMQP) to a + single managed broker.""" mExchange = "qpid.management" dExchange = "amq.direct" @@ -205,18 +286,35 @@ class ManagedBroker: msg.complete () def reply_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - methodId = codec.decode_long () + codec = Codec (StringIO (msg.content.body), self.spec) + sequence = codec.decode_long () status = codec.decode_long () sText = codec.decode_shortstr () - args = {} + data = self.sequenceManager.release (sequence) + if data == None: + msg.complete () + return + + (userSequence, className, methodName) = data + if status == 0: - args["sequence"] = codec.decode_long () - args["body"] = codec.decode_longstr () + ms = self.metadata.schema[(className,'M')] + arglist = None + for (mname, mdesc, margs) in ms: + if mname == methodName: + arglist = margs + if arglist == None: + msg.complete () + return + + args = {} + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.metadata.decodeValue (codec, arg[1]) if self.method_cb != None: - self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) + self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args) msg.complete () @@ -225,17 +323,18 @@ class ManagedBroker: port = 5672, username = "guest", password = "guest", - specfile = "../specs/amqp.0-10-preview.xml"): - - self.spec = qpid.spec.load (specfile) - self.client = None - self.channel = None - self.queue = None - self.rqueue = None - self.qname = None - self.rqname = None - self.metadata = ManagementMetadata (self) - self.connected = 0 + specfile = "/usr/share/amqp/amqp.0-10-preview.xml"): + + self.spec = qpid.spec.load (specfile) + self.client = None + self.channel = None + self.queue = None + self.rqueue = None + self.qname = None + self.rqname = None + self.metadata = ManagementMetadata (self) + self.sequenceManager = SequenceManager () + self.connected = 0 self.lastConnectError = None # Initialize the callback records @@ -265,17 +364,37 @@ class ManagedBroker: def instrumentationListener (self, context, callback): self.inst_cb = (context, callback) - def method (self, methodId, objId, className, + def method (self, userSequence, objId, className, methodName, args=None, packageName="qpid"): codec = Codec (StringIO (), self.spec); - codec.encode_long (methodId) - codec.encode_longlong (objId) - codec.encode_shortstr (self.rqname) - - # TODO: Encode args according to schema - if methodName == "echo": - codec.encode_long (args["sequence"]) - codec.encode_longstr (args["body"]) + sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + codec.encode_long (sequence) # Method sequence id + codec.encode_longlong (objId) # ID of object + codec.encode_shortstr (self.rqname) # name of reply queue + + # Encode args according to schema + if (className,'M') not in self.metadata.schema: + self.sequenceManager.release (sequence) + raise ValueError ("Unknown class name: %s" % className) + + ms = self.metadata.schema[(className,'M')] + arglist = None + for (mname, mdesc, margs) in ms: + if mname == methodName: + arglist = margs + if arglist == None: + self.sequenceManager.release (sequence) + raise ValueError ("Unknown method name: %s" % methodName) + + for arg in arglist: + if arg[2].find("I") != -1: + value = arg[8] # default + if arg[0] in args: + value = args[arg[0]] + if value == None: + self.sequenceManager.release (sequence) + raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) + self.metadata.encodeValue (codec, value, arg[1]) msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" @@ -325,7 +444,7 @@ class ManagedBroker: self.connected = 1 except socket.error, e: - print "Socket Error Detected:", e[1] + print "Socket Error:", e[1] self.lastConnectError = e raise except: diff --git a/python/qpid/management.py.rej b/python/qpid/management.py.rej new file mode 100644 index 0000000000..28d172abe5 --- /dev/null +++ b/python/qpid/management.py.rej @@ -0,0 +1,457 @@ +*************** +*** 18,24 **** + # + + """ +- Management classes for AMQP + """ + + import qpid +--- 18,24 ---- + # + + """ ++ Management API for Qpid + """ + + import qpid +*************** +*** 42,91 **** + #=================================================================== + class ManagementMetadata: + +- def parseSchema (self, cls, oid, len, codec): +- #print "Schema Record: objId=", oid + +- config = [] +- inst = [] +- while 1: +- flags = codec.decode_octet () +- if flags == 0x80: +- break + +- tc = codec.decode_octet () +- name = codec.decode_shortstr () +- desc = codec.decode_shortstr () + +- if flags & 1: # TODO: Define constants for these +- config.append ((tc, name, desc)) +- if (flags & 1) == 0 or (flags & 2) == 2: +- inst.append ((tc, name, desc)) + + # TODO: Handle notification of schema change outbound +- self.schema[(oid,'C')] = config +- self.schema[(oid,'I')] = inst + +- def parseContent (self, cls, oid, len, codec): +- #print "Content Record: Class=", cls, ", objId=", oid +- + if cls == 'C' and self.broker.config_cb == None: + return + if cls == 'I' and self.broker.inst_cb == None: + return + +- if (oid,cls) not in self.schema: + return + + row = [] + timestamps = [] + +- timestamps.append (codec.decode_longlong ()); # Current Time +- timestamps.append (codec.decode_longlong ()); # Create Time +- timestamps.append (codec.decode_longlong ()); # Delete Time + +- for element in self.schema[(oid,cls)][:]: +- tc = element[0] +- name = element[1] + if tc == 1: # TODO: Define constants for these + data = codec.decode_octet () + elif tc == 2: +--- 42,132 ---- + #=================================================================== + class ManagementMetadata: + ++ def parseSchema (self, cls, codec): ++ className = codec.decode_shortstr () ++ configCount = codec.decode_short () ++ instCount = codec.decode_short () ++ methodCount = codec.decode_short () ++ eventCount = codec.decode_short () + ++ configs = [] ++ insts = [] ++ methods = [] ++ events = [] + ++ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) ++ insts.append (("id", 4, None, None)) + ++ for idx in range (configCount): ++ ft = codec.decode_table () ++ name = ft["name"] ++ type = ft["type"] ++ access = ft["access"] ++ index = ft["index"] ++ unit = None ++ min = None ++ max = None ++ maxlen = None ++ desc = None + ++ for key, value in ft.items (): ++ if key == "unit": ++ unit = value ++ elif key == "min": ++ min = value ++ elif key == "max": ++ max = value ++ elif key == "maxlen": ++ maxlen = value ++ elif key == "desc": ++ desc = value ++ ++ config = (name, type, unit, desc, access, index, min, max, maxlen) ++ configs.append (config) ++ ++ for idx in range (instCount): ++ ft = codec.decode_table () ++ name = ft["name"] ++ type = ft["type"] ++ unit = None ++ desc = None ++ ++ for key, value in ft.items (): ++ if key == "unit": ++ unit = value ++ elif key == "desc": ++ desc = value ++ ++ inst = (name, type, unit, desc) ++ insts.append (inst) ++ + # TODO: Handle notification of schema change outbound ++ self.schema[(className,'C')] = configs ++ self.schema[(className,'I')] = insts ++ self.schema[(className,'M')] = methods ++ self.schema[(className,'E')] = events + ++ def parseContent (self, cls, codec): + if cls == 'C' and self.broker.config_cb == None: + return + if cls == 'I' and self.broker.inst_cb == None: + return + ++ className = codec.decode_shortstr () ++ ++ if (className,cls) not in self.schema: + return + + row = [] + timestamps = [] + ++ timestamps.append (codec.decode_longlong ()) # Current Time ++ timestamps.append (codec.decode_longlong ()) # Create Time ++ timestamps.append (codec.decode_longlong ()) # Delete Time + ++ for element in self.schema[(className,cls)][:]: ++ tc = element[1] ++ name = element[0] + if tc == 1: # TODO: Define constants for these + data = codec.decode_octet () + elif tc == 2: +*************** +*** 98,130 **** + data = codec.decode_octet () + elif tc == 6: + data = codec.decode_shortstr () + row.append ((name, data)) + + if cls == 'C': +- self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps) +- if cls == 'I': +- self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps) + + def parse (self, codec): +- try: +- opcode = chr (codec.decode_octet ()) +- except EOF: +- return 0 + +- cls = chr (codec.decode_octet ()) +- oid = codec.decode_short () +- len = codec.decode_long () +- +- if len < 8: +- raise ValueError ("parse error: value of length field too small") +- + if opcode == 'S': +- self.parseSchema (cls, oid, len, codec) + +- if opcode == 'C': +- self.parseContent (cls, oid, len, codec) + +- return 1 + + def __init__ (self, broker): + self.broker = broker +--- 139,167 ---- + data = codec.decode_octet () + elif tc == 6: + data = codec.decode_shortstr () ++ elif tc == 7: ++ data = codec.decode_longstr () ++ else: ++ raise ValueError ("Invalid type code: %d" % tc) + row.append ((name, data)) + + if cls == 'C': ++ self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) ++ elif cls == 'I': ++ self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) + + def parse (self, codec): ++ opcode = chr (codec.decode_octet ()) ++ cls = chr (codec.decode_octet ()) + + if opcode == 'S': ++ self.parseSchema (cls, codec) + ++ elif opcode == 'C': ++ self.parseContent (cls, codec) + ++ else: ++ raise ValueError ("Unknown opcode: %c" % opcode); + + def __init__ (self, broker): + self.broker = broker +*************** +*** 140,146 **** + #=================================================================== + class ManagedBroker: + +- exchange = "qpid.management" + + def checkHeader (self, codec): + octet = chr (codec.decode_octet ()) +--- 177,184 ---- + #=================================================================== + class ManagedBroker: + ++ mExchange = "qpid.management" ++ dExchange = "amq.direct" + + def checkHeader (self, codec): + octet = chr (codec.decode_octet ()) +*************** +*** 157,225 **** + return 0 + return 1 + +- def receive_cb (self, msg): + codec = Codec (StringIO (msg.content.body), self.spec) + + if self.checkHeader (codec) == 0: + raise ValueError ("outer header invalid"); + +- while self.metadata.parse (codec): +- pass + + msg.complete () + +- def __init__ (self, host = "localhost", port = 5672, +- username = "guest", password = "guest"): + +- self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml") +- self.client = None +- self.channel = None +- self.queue = None +- self.qname = None +- self.metadata = ManagementMetadata (self) + + # Initialize the callback records + self.schema_cb = None + self.config_cb = None + self.inst_cb = None + + self.host = host + self.port = port + self.username = username + self.password = password + + def schemaListener (self, context, callback): + self.schema_cb = (context, callback) + + def configListener (self, context, callback): + self.config_cb = (context, callback) + + def instrumentationListener (self, context, callback): + self.inst_cb = (context, callback) + + def start (self): +- print "Connecting to broker", self.host + + try: + self.client = Client (self.host, self.port, self.spec) + self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) + self.channel = self.client.channel (1) +- response = self.channel.session_open (detached_lifetime=300) +- self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id) + +- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) +- self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname, +- routing_key="mgmt") +- self.channel.message_subscribe (queue=self.qname, destination="dest") +- self.queue = self.client.queue ("dest") +- self.queue.listen (self.receive_cb) + +- self.channel.message_flow_mode (destination="dest", mode=1) +- self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF) +- self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF) + + except socket.error, e: + print "Socket Error Detected:", e[1] + raise + except: + raise +--- 195,335 ---- + return 0 + return 1 + ++ def publish_cb (self, msg): + codec = Codec (StringIO (msg.content.body), self.spec) + + if self.checkHeader (codec) == 0: + raise ValueError ("outer header invalid"); + ++ self.metadata.parse (codec) ++ msg.complete () + ++ def reply_cb (self, msg): ++ codec = Codec (StringIO (msg.content.body), self.spec) ++ methodId = codec.decode_long () ++ status = codec.decode_long () ++ sText = codec.decode_shortstr () ++ ++ args = {} ++ if status == 0: ++ args["sequence"] = codec.decode_long () ++ args["body"] = codec.decode_longstr () ++ ++ if self.method_cb != None: ++ self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) ++ + msg.complete () + ++ def __init__ (self, ++ host = "localhost", ++ port = 5672, ++ username = "guest", ++ password = "guest", ++ specfile = "../specs/amqp.0-10-preview.xml"): + ++ self.spec = qpid.spec.load (specfile) ++ self.client = None ++ self.channel = None ++ self.queue = None ++ self.rqueue = None ++ self.qname = None ++ self.rqname = None ++ self.metadata = ManagementMetadata (self) ++ self.connected = 0 ++ self.lastConnectError = None + + # Initialize the callback records ++ self.status_cb = None + self.schema_cb = None + self.config_cb = None + self.inst_cb = None ++ self.method_cb = None + + self.host = host + self.port = port + self.username = username + self.password = password + ++ def statusListener (self, context, callback): ++ self.status_cb = (context, callback) ++ + def schemaListener (self, context, callback): + self.schema_cb = (context, callback) + + def configListener (self, context, callback): + self.config_cb = (context, callback) + ++ def methodListener (self, context, callback): ++ self.method_cb = (context, callback) ++ + def instrumentationListener (self, context, callback): + self.inst_cb = (context, callback) + ++ def method (self, methodId, objId, className, ++ methodName, args=None, packageName="qpid"): ++ codec = Codec (StringIO (), self.spec); ++ codec.encode_long (methodId) ++ codec.encode_longlong (objId) ++ codec.encode_shortstr (self.rqname) ++ ++ # TODO: Encode args according to schema ++ if methodName == "echo": ++ codec.encode_long (args["sequence"]) ++ codec.encode_longstr (args["body"]) ++ ++ msg = Content (codec.stream.getvalue ()) ++ msg["content_type"] = "application/octet-stream" ++ msg["routing_key"] = "method." + packageName + "." + className + "." + methodName ++ msg["reply_to"] = self.spec.struct ("reply_to") ++ self.channel.message_transfer (destination="qpid.management", content=msg) ++ ++ def isConnected (self): ++ return connected ++ + def start (self): ++ print "Connecting to broker %s:%d" % (self.host, self.port) + + try: + self.client = Client (self.host, self.port, self.spec) + self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) + self.channel = self.client.channel (1) ++ response = self.channel.session_open (detached_lifetime=10) ++ self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) ++ self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) + ++ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) ++ self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) ++ ++ self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, ++ routing_key="mgmt.#") ++ self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, ++ routing_key=self.rqname) + ++ self.channel.message_subscribe (queue=self.qname, destination="mdest") ++ self.channel.message_subscribe (queue=self.rqname, destination="rdest") + ++ self.queue = self.client.queue ("mdest") ++ self.queue.listen (self.publish_cb) ++ ++ self.channel.message_flow_mode (destination="mdest", mode=1) ++ self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) ++ self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) ++ ++ self.rqueue = self.client.queue ("rdest") ++ self.rqueue.listen (self.reply_cb) ++ ++ self.channel.message_flow_mode (destination="rdest", mode=1) ++ self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) ++ self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) ++ ++ self.connected = 1 ++ + except socket.error, e: + print "Socket Error Detected:", e[1] ++ self.lastConnectError = e + raise + except: + raise ++ ++ def stop (self): ++ pass -- cgit v1.2.1