diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-02-28 18:55:21 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-02-28 18:55:21 +0000 |
| commit | ac3f850123c903f00c163d6d2dbad22d98aec7a2 (patch) | |
| tree | 2e622a3e9349a9062454d16bf4bca83a5a3e9d90 /python/qpid/management.py | |
| parent | 1820dd421a096ed184a08deee9512e809312fed2 (diff) | |
| download | qpid-python-ac3f850123c903f00c163d6d2dbad22d98aec7a2.tar.gz | |
QPID-820 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@632087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
| -rw-r--r-- | python/qpid/management.py | 551 |
1 files changed, 321 insertions, 230 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py index 40de2a5298..b5d992cf5d 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -35,12 +35,14 @@ from threading import Lock class SequenceManager: + """ Manage sequence numbers for asynchronous method calls """ def __init__ (self): self.lock = Lock () self.sequence = 0 self.pending = {} def reserve (self, data): + """ Reserve a unique sequence number """ self.lock.acquire () result = self.sequence self.sequence = self.sequence + 1 @@ -49,6 +51,7 @@ class SequenceManager: return result def release (self, seq): + """ Release a reserved sequence number """ data = None self.lock.acquire () if seq in self.pending: @@ -57,12 +60,172 @@ class SequenceManager: self.lock.release () return data -class ManagementMetadata: - """One instance of this class is created for each ManagedBroker. It - is used to store metadata from the broker which is needed for the - proper interpretation of received management content.""" + +class managementChannel: + """ This class represents a connection to an AMQP broker. """ + + def __init__ (self, ch, topicCb, replyCb, cbContext=None): + """ Given a channel on an established AMQP broker connection, this method + opens a session and performs all of the declarations and bindings needed + to participate in the management protocol. """ + response = ch.session_open (detached_lifetime=300) + self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id) + self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id) + self.qpidChannel = ch + self.tcb = topicCb + self.rcb = replyCb + self.context = cbContext + + ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1) + ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1) + + ch.queue_bind (exchange="qpid.management", + queue=self.topicName, routing_key="mgmt.#") + ch.queue_bind (exchange="amq.direct", + queue=self.replyName, routing_key=self.replyName) + ch.message_subscribe (queue=self.topicName, destination="tdest") + ch.message_subscribe (queue=self.replyName, destination="rdest") + + ch.client.queue ("tdest").listen (self.topicCb) + ch.client.queue ("rdest").listen (self.replyCb) + + ch.message_flow_mode (destination="tdest", mode=1) + ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) + ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) + + ch.message_flow_mode (destination="rdest", mode=1) + ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + + def topicCb (self, msg): + """ Receive messages via the topic queue on this channel. """ + self.tcb (self, msg) + + def replyCb (self, msg): + """ Receive messages via the reply queue on this channel. """ + self.rcb (self, msg) + + def send (self, exchange, msg): + self.qpidChannel.message_transfer (destination=exchange, content=msg) + + +class managementClient: + """ This class provides an API for access to management data on the AMQP + network. It implements the management protocol and manages the management + schemas as advertised by the various management agents in the network. """ + + #======================================================== + # User API - interacts with the class's user + #======================================================== + def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None): + self.spec = amqpSpec + self.ctrlCb = ctrlCb + self.configCb = configCb + self.instCb = instCb + self.methodCb = methodCb + self.schemaCb = None + self.eventCb = None + self.channels = [] + self.seqMgr = SequenceManager () + self.schema = {} + self.packages = {} + + def schemaListener (self, schemaCb): + """ Optionally register a callback to receive details of the schema of + managed objects in the network. """ + self.schemaCb = schemaCb + + def eventListener (self, eventCb): + """ Optionally register a callback to receive events from managed objects + in the network. """ + self.eventCb = eventCb + + def addChannel (self, channel): + """ Register a new channel. """ + self.channels.append (channel) + codec = Codec (StringIO (), self.spec) + self.setHeader (codec, ord ('H')) + msg = Content (codec.stream.getvalue ()) + msg["content_type"] = "application/octet-stream" + msg["routing_key"] = "agent" + msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) + + def removeChannel (self, channel): + """ Remove a previously added channel from management. """ + self.channels.remove (channel) + + def callMethod (self, channel, userSequence, objId, className, methodName, args=None): + """ Invoke a method on a managed object. """ + self.method (channel, userSequence, objId, className, methodName, args) + + #======================================================== + # Channel API - interacts with registered channel objects + #======================================================== + def topicCb (self, ch, msg): + """ Receive messages via the topic queue of a particular channel. """ + codec = Codec (StringIO (msg.content.body), self.spec) + hdr = self.checkHeader (codec) + if hdr == None: + raise ValueError ("outer header invalid"); + self.parse (ch, codec, hdr[0], hdr[1]) + msg.complete () + + def replyCb (self, ch, msg): + """ Receive messages via the reply queue of a particular channel. """ + codec = Codec (StringIO (msg.content.body), self.spec) + hdr = self.checkHeader (codec) + if hdr == None: + msg.complete () + return + + if hdr[0] == 'm': + self.handleMethodReply (ch, codec) + elif hdr[0] == 'I': + self.handleInit (ch, codec) + elif hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) + msg.complete () + + #======================================================== + # Internal Functions + #======================================================== + def setHeader (self, codec, opcode, cls = 0): + """ Compose the header of a management message. """ + codec.encode_octet (ord ('A')) + codec.encode_octet (ord ('M')) + codec.encode_octet (ord ('0')) + codec.encode_octet (ord ('1')) + codec.encode_octet (opcode) + codec.encode_octet (cls) + + def checkHeader (self, codec): + """ Check the header of a management message and extract the opcode and + class. """ + octet = chr (codec.decode_octet ()) + if octet != 'A': + return None + octet = chr (codec.decode_octet ()) + if octet != 'M': + return None + octet = chr (codec.decode_octet ()) + if octet != '0': + return None + octet = chr (codec.decode_octet ()) + if octet != '1': + return None + opcode = chr (codec.decode_octet ()) + cls = chr (codec.decode_octet ()) + return (opcode, cls) def encodeValue (self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.encode_octet (int (value)) elif typecode == 2: @@ -85,10 +248,15 @@ class ManagementMetadata: codec.encode_longlong (long (value)) elif typecode == 11: # BOOL codec.encode_octet (int (value)) + elif typecode == 12: # FLOAT + codec.encode_float (float (value)) + elif typecode == 13: # DOUBLE + codec.encode_double (double (value)) else: raise ValueError ("Invalid type code: %d" % typecode) def decodeValue (self, codec, typecode): + """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.decode_octet () elif typecode == 2: @@ -111,17 +279,119 @@ class ManagementMetadata: data = codec.decode_longlong () elif typecode == 11: # BOOL data = codec.decode_octet () + elif typecode == 12: # FLOAT + data = codec.decode_float () + elif typecode == 13: # DOUBLE + data = codec.decode_double () else: raise ValueError ("Invalid type code: %d" % typecode) return data + + def handleMethodReply (self, ch, codec): + sequence = codec.decode_long () + status = codec.decode_long () + sText = codec.decode_shortstr () + + data = self.seqMgr.release (sequence) + if data == None: + return + + (userSequence, classId, methodName) = data + args = {} + + if status == 0: + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None + for mname in ms: + (mdesc, margs) = ms[mname] + if mname == methodName: + arglist = margs + if arglist == None: + return + + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.decodeValue (codec, arg[1]) + + if self.methodCb != None: + self.methodCb (ch.context, userSequence, status, sText, args) + + def handleInit (self, ch, codec): + len = codec.decode_short () + data = codec.decode_raw (len) + if self.ctrlCb != None: + self.ctrlCb (ch.context, len, data) + + # Send a package request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('P')) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) - def parseSchema (self, cls, codec): + def handlePackageInd (self, ch, codec): + pname = codec.decode_shortstr () + if pname not in self.packages: + self.packages[pname] = {} + + # Send a class request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('Q')) + sendCodec.encode_shortstr (pname) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) + + def handleClassInd (self, ch, codec): + pname = codec.decode_shortstr () + cname = codec.decode_shortstr () + hash = codec.decode_bin128 () + if pname not in self.packages: + return + + if (cname, hash) not in self.packages[pname]: + # Send a schema request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('S')) + sendCodec.encode_shortstr (pname) + sendCodec.encode_shortstr (cname) + sendCodec.encode_bin128 (hash) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) + + def parseSchema (self, ch, cls, codec): + """ Parse a received schema-description message. """ + packageName = codec.decode_shortstr () className = codec.decode_shortstr () + hash = codec.decode_bin128 () configCount = codec.decode_short () instCount = codec.decode_short () methodCount = codec.decode_short () eventCount = codec.decode_short () + if packageName not in self.packages: + return + if (className, hash) in self.packages[packageName]: + return + + classKey = (packageName, className, hash) + if classKey in self.schema: + return + configs = [] insts = [] methods = {} @@ -213,25 +483,29 @@ class ManagementMetadata: args.append (arg) methods[mname] = (mdesc, args) + schemaClass = {} + schemaClass['C'] = configs + schemaClass['I'] = insts + schemaClass['M'] = methods + schemaClass['E'] = events + self.schema[classKey] = schemaClass - self.schema[(className,'C')] = configs - self.schema[(className,'I')] = insts - self.schema[(className,'M')] = methods - self.schema[(className,'E')] = events - - if self.broker.schema_cb != None: - self.broker.schema_cb[1] (self.broker.schema_cb[0], className, - configs, insts, methods, events) + if self.schemaCb != None: + self.schemaCb (ch.context, classKey, configs, insts, methods, events) - def parseContent (self, cls, codec): - if cls == 'C' and self.broker.config_cb == None: + def parseContent (self, ch, cls, codec): + """ Parse a received content message. """ + if cls == 'C' and self.configCb == None: return - if cls == 'I' and self.broker.inst_cb == None: + if cls == 'I' and self.instCb == None: return - className = codec.decode_shortstr () + packageName = codec.decode_shortstr () + className = codec.decode_shortstr () + hash = codec.decode_bin128 () + classKey = (packageName, className, hash) - if (className,cls) not in self.schema: + if classKey not in self.schema: return row = [] @@ -241,184 +515,49 @@ class ManagementMetadata: timestamps.append (codec.decode_longlong ()) # Create Time timestamps.append (codec.decode_longlong ()) # Delete Time - for element in self.schema[(className,cls)][:]: + schemaClass = self.schema[classKey] + for element in schemaClass[cls][:]: tc = element[1] name = element[0] data = self.decodeValue (codec, tc) row.append ((name, data)) - if cls == 'C': - self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) + if cls == 'C': + self.configCb (ch.context, classKey, row, timestamps) elif cls == 'I': - self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) - - def parse (self, codec, opcode, cls): - if opcode == 'S': - self.parseSchema (cls, codec) + self.instCb (ch.context, classKey, row, timestamps) + def parse (self, ch, codec, opcode, cls): + """ Parse a message received from the topic queue. """ + if opcode == 's': + self.parseSchema (ch, cls, codec) elif opcode == 'C': - self.parseContent (cls, codec) - + self.parseContent (ch, cls, codec) else: raise ValueError ("Unknown opcode: %c" % opcode); - def __init__ (self, broker): - self.broker = broker - self.schema = {} - - -class ManagedBroker: - """An object of this class represents a connection (over AMQP) to a - single managed broker.""" - - mExchange = "qpid.management" - dExchange = "amq.direct" - - def setHeader (self, codec, opcode, cls = 0): - codec.encode_octet (ord ('A')) - codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('0')) - codec.encode_octet (ord ('1')) - codec.encode_octet (opcode) - codec.encode_octet (cls) - - def checkHeader (self, codec): - octet = chr (codec.decode_octet ()) - if octet != 'A': - return None - octet = chr (codec.decode_octet ()) - if octet != 'M': - return None - octet = chr (codec.decode_octet ()) - if octet != '0': - return None - octet = chr (codec.decode_octet ()) - if octet != '1': - return None - opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - return (opcode, cls) - - def publish_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - - hdr = self.checkHeader (codec) - if hdr == None: - raise ValueError ("outer header invalid"); - - self.metadata.parse (codec, hdr[0], hdr[1]) - msg.complete () - - def reply_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - hdr = self.checkHeader (codec) - if hdr == None: - msg.complete () - return - if hdr[0] != 'R': - msg.complete () - return - - sequence = codec.decode_long () - status = codec.decode_long () - sText = codec.decode_shortstr () - - data = self.sequenceManager.release (sequence) - if data == None: - msg.complete () - return - - (userSequence, className, methodName) = data - args = {} - - if status == 0: - ms = self.metadata.schema[(className,'M')] - arglist = None - for mname in ms: - (mdesc, margs) = ms[mname] - if mname == methodName: - arglist = margs - if arglist == None: - msg.complete () - return - - for arg in arglist: - if arg[2].find("O") != -1: - args[arg[0]] = self.metadata.decodeValue (codec, arg[1]) - - if self.method_cb != None: - self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args) - - msg.complete () - - def __init__ (self, - host = "localhost", - port = 5672, - username = "guest", - password = "guest", - specfile = "/usr/share/amqp/amqp.0-10-preview.xml"): - - self.spec = qpid.spec.load (specfile) - self.client = None - self.channel = None - self.queue = None - self.rqueue = None - self.qname = None - self.rqname = None - self.metadata = ManagementMetadata (self) - self.sequenceManager = SequenceManager () - self.connected = 0 - self.lastConnectError = None - - # Initialize the callback records - self.status_cb = None - self.schema_cb = None - self.config_cb = None - self.inst_cb = None - self.method_cb = None - - self.host = host - self.port = port - self.username = username - self.password = password - - def statusListener (self, context, callback): - self.status_cb = (context, callback) - - def schemaListener (self, context, callback): - self.schema_cb = (context, callback) - - def configListener (self, context, callback): - self.config_cb = (context, callback) - - def methodListener (self, context, callback): - self.method_cb = (context, callback) - - def instrumentationListener (self, context, callback): - self.inst_cb = (context, callback) - - def method (self, userSequence, objId, className, - methodName, args=None, packageName="qpid"): - codec = Codec (StringIO (), self.spec); - sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + def method (self, channel, userSequence, objId, classId, methodName, args): + """ Invoke a method on an object """ + codec = Codec (StringIO (), self.spec) + sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M')) codec.encode_long (sequence) # Method sequence id codec.encode_longlong (objId) # ID of object - #codec.encode_shortstr (self.rqname) # name of reply queue # Encode args according to schema - if (className,'M') not in self.metadata.schema: - self.sequenceManager.release (sequence) - raise ValueError ("Unknown class name: %s" % className) + if classId not in self.schema: + self.seqMgr.release (sequence) + raise ValueError ("Unknown class name: %s" % classId) - ms = self.metadata.schema[(className,'M')] - arglist = None + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None for mname in ms: (mdesc, margs) = ms[mname] if mname == methodName: arglist = margs if arglist == None: - self.sequenceManager.release (sequence) + self.seqMgr.release (sequence) raise ValueError ("Unknown method name: %s" % methodName) for arg in arglist: @@ -427,65 +566,17 @@ class ManagedBroker: if arg[0] in args: value = args[arg[0]] if value == None: - self.sequenceManager.release (sequence) + self.seqMgr.release (sequence) raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) - self.metadata.encodeValue (codec, value, arg[1]) + self.encodeValue (codec, value, arg[1]) + packageName = classId[0] + className = classId[1] msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "method." + packageName + "." + className + "." + methodName + msg["routing_key"] = "agent.method." + packageName + "." + \ + className + "." + methodName msg["reply_to"] = self.spec.struct ("reply_to") msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = self.rqname - self.channel.message_transfer (destination="qpid.management", content=msg) - - def isConnected (self): - return connected - - def start (self): - print "Connecting to broker %s:%d" % (self.host, self.port) - - try: - self.client = Client (self.host, self.port, self.spec) - self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) - self.channel = self.client.channel (1) - response = self.channel.session_open (detached_lifetime=300) - self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) - self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) - - self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) - self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) - - self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, - routing_key="mgmt.#") - self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, - routing_key=self.rqname) - - self.channel.message_subscribe (queue=self.qname, destination="mdest") - self.channel.message_subscribe (queue=self.rqname, destination="rdest") - - self.queue = self.client.queue ("mdest") - self.queue.listen (self.publish_cb) - - self.channel.message_flow_mode (destination="mdest", mode=1) - self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) - - self.rqueue = self.client.queue ("rdest") - self.rqueue.listen (self.reply_cb) - - self.channel.message_flow_mode (destination="rdest", mode=1) - self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) - - self.connected = 1 - - except socket.error, e: - print "Socket Error:", e[1] - self.lastConnectError = e - raise - except: - raise - - def stop (self): - pass + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) |
