diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-04-22 16:11:34 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-04-22 16:11:34 +0000 |
| commit | e06aa805cfe24b8edf619a6a535883f94589ac35 (patch) | |
| tree | 4b886461816ca97127aae8a9639ddad74d77bd46 /python/qpid/management.py | |
| parent | 61959e29ee69f9cebb61b845272eededaec6f11e (diff) | |
| download | qpid-python-e06aa805cfe24b8edf619a6a535883f94589ac35.tar.gz | |
QPID-947: update cpp and python management to 0-10 final
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650565 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
| -rw-r--r-- | python/qpid/management.py | 274 |
1 files changed, 126 insertions, 148 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py index 6b25d5ea08..3a7a564e19 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -25,12 +25,10 @@ import qpid import struct import socket from threading import Thread -from message import Message +from datatypes import Message, RangedSet from time import time -from qpid.client import Client -from qpid.content import Content from cStringIO import StringIO -from codec import Codec, EOF +from codec010 import StringCodec as Codec from threading import Lock, Condition @@ -83,40 +81,39 @@ class methodResult: class managementChannel: """ This class represents a connection to an AMQP broker. """ - def __init__ (self, ch, topicCb, replyCb, cbContext, _detlife=0): + def __init__ (self, ssn, topicCb, replyCb, cbContext, _detlife=0): """ Given a channel on an established AMQP broker connection, this method opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ - response = ch.session_open (detached_lifetime=_detlife) - self.sessionId = response.session_id - self.topicName = "mgmt-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id) - self.replyName = "repl-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id) - self.qpidChannel = ch + self.sessionId = ssn.name + self.topicName = "mgmt-%s" % self.sessionId + self.replyName = "repl-%s" % self.sessionId + self.qpidChannel = ssn self.tcb = topicCb self.rcb = replyCb self.context = cbContext self.reqsOutstanding = 0 - ch.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) - ch.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) + ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) + ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) - ch.queue_bind (exchange="qpid.management", - queue=self.topicName, routing_key="mgmt.#") - ch.queue_bind (exchange="amq.direct", - queue=self.replyName, routing_key=self.replyName) - ch.message_subscribe (queue=self.topicName, destination="tdest") - ch.message_subscribe (queue=self.replyName, destination="rdest") + ssn.exchange_bind (exchange="qpid.management", + queue=self.topicName, binding_key="mgmt.#") + ssn.exchange_bind (exchange="amq.direct", + queue=self.replyName, binding_key=self.replyName) + ssn.message_subscribe (queue=self.topicName, destination="tdest") + ssn.message_subscribe (queue=self.replyName, destination="rdest") - ch.client.queue ("tdest").listen (self.topicCb) - ch.client.queue ("rdest").listen (self.replyCb) + ssn.incoming ("tdest").listen (self.topicCb) + ssn.incoming ("rdest").listen (self.replyCb) - ch.message_flow_mode (destination="tdest", mode=1) - ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) - ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) + ssn.message_set_flow_mode (destination="tdest", flow_mode=1) + ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) + ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) - ch.message_flow_mode (destination="rdest", mode=1) - ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) - ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + ssn.message_set_flow_mode (destination="rdest", flow_mode=1) + ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) def topicCb (self, msg): """ Receive messages via the topic queue on this channel. """ @@ -127,7 +124,18 @@ class managementChannel: self.rcb (self, msg) def send (self, exchange, msg): - self.qpidChannel.message_transfer (destination=exchange, content=msg) + self.qpidChannel.message_transfer (destination=exchange, message=msg) + + def accept (self, msg): + self.qpidChannel.message_accept(RangedSet(msg.id)) + + def message (self, body, routing_key="agent"): + dp = self.qpidChannel.delivery_properties() + dp.routing_key = routing_key + mp = self.qpidChannel.message_properties() + mp.content_type = "application/octet-stream" + mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName) + return Message(dp, mp, body) class managementClient: @@ -177,14 +185,9 @@ class managementClient: self.channels.append (mch) self.incOutstanding (mch) - codec = Codec (StringIO (), self.spec) + codec = Codec (self.spec) self.setHeader (codec, ord ('B')) - msg = Content (codec.stream.getvalue ()) - msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "agent" - msg["reply_to"] = self.spec.struct ("reply_to") - msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = mch.replyName + msg = mch.message(codec.encoded) mch.send ("qpid.management", msg) return mch @@ -198,17 +201,12 @@ class managementClient: def getObjects (self, channel, userSequence, className): """ Request immediate content from broker """ - codec = Codec (StringIO (), self.spec) + codec = Codec (self.spec) self.setHeader (codec, ord ('G'), userSequence) ft = {} ft["_class"] = className - codec.encode_table (ft) - msg = Content (codec.stream.getvalue ()) - msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "agent" - msg["reply_to"] = self.spec.struct ("reply_to") - msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = channel.replyName + codec.write_map (ft) + msg = channel.message(codec.encoded) channel.send ("qpid.management", msg) def syncWaitForStable (self, channel): @@ -270,19 +268,19 @@ class managementClient: #======================================================== def topicCb (self, ch, msg): """ Receive messages via the topic queue of a particular channel. """ - codec = Codec (StringIO (msg.content.body), self.spec) + codec = Codec (self.spec, msg.body) hdr = self.checkHeader (codec) if hdr == None: raise ValueError ("outer header invalid"); self.parse (ch, codec, hdr[0], hdr[1]) - msg.complete () + ch.accept(msg) def replyCb (self, ch, msg): """ Receive messages via the reply queue of a particular channel. """ - codec = Codec (StringIO (msg.content.body), self.spec) + codec = Codec (self.spec, msg.body) hdr = self.checkHeader (codec) if hdr == None: - msg.complete () + ch.accept(msg) return if hdr[0] == 'm': @@ -297,102 +295,102 @@ class managementClient: self.handleClassInd (ch, codec) else: self.parse (ch, codec, hdr[0], hdr[1]) - msg.complete () + ch.accept(msg) #======================================================== # Internal Functions #======================================================== def setHeader (self, codec, opcode, seq = 0): """ Compose the header of a management message. """ - codec.encode_octet (ord ('A')) - codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('1')) - codec.encode_octet (opcode) - codec.encode_long (seq) + codec.write_uint8 (ord ('A')) + codec.write_uint8 (ord ('M')) + codec.write_uint8 (ord ('1')) + codec.write_uint8 (opcode) + codec.write_uint32 (seq) def checkHeader (self, codec): """ Check the header of a management message and extract the opcode and class. """ - octet = chr (codec.decode_octet ()) + octet = chr (codec.read_uint8 ()) if octet != 'A': return None - octet = chr (codec.decode_octet ()) + octet = chr (codec.read_uint8 ()) if octet != 'M': return None - octet = chr (codec.decode_octet ()) + octet = chr (codec.read_uint8 ()) if octet != '1': return None - opcode = chr (codec.decode_octet ()) - seq = codec.decode_long () + opcode = chr (codec.read_uint8 ()) + seq = codec.read_uint32 () return (opcode, seq) def encodeValue (self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: - codec.encode_octet (int (value)) + codec.write_uint8 (int (value)) elif typecode == 2: - codec.encode_short (int (value)) + codec.write_uint16 (int (value)) elif typecode == 3: - codec.encode_long (long (value)) + codec.write_uint32 (long (value)) elif typecode == 4: - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 5: - codec.encode_octet (int (value)) + codec.write_uint8 (int (value)) elif typecode == 6: - codec.encode_shortstr (value) + codec.write_str8 (value) elif typecode == 7: - codec.encode_longstr (value) + codec.write_vbin32 (value) elif typecode == 8: # ABSTIME - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 9: # DELTATIME - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 10: # REF - codec.encode_longlong (long (value)) + codec.write_uint64 (long (value)) elif typecode == 11: # BOOL - codec.encode_octet (int (value)) + codec.write_uint8 (int (value)) elif typecode == 12: # FLOAT - codec.encode_float (float (value)) + codec.write_float (float (value)) elif typecode == 13: # DOUBLE - codec.encode_double (double (value)) + codec.write_double (double (value)) elif typecode == 14: # UUID - codec.encode_uuid (value) + codec.write_uuid (value) elif typecode == 15: # FTABLE - codec.encode_table (value) + codec.write_map (value) else: raise ValueError ("Invalid type code: %d" % typecode) def decodeValue (self, codec, typecode): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: - data = codec.decode_octet () + data = codec.read_uint8 () elif typecode == 2: - data = codec.decode_short () + data = codec.read_uint16 () elif typecode == 3: - data = codec.decode_long () + data = codec.read_uint32 () elif typecode == 4: - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 5: - data = codec.decode_octet () + data = codec.read_uint8 () elif typecode == 6: - data = codec.decode_shortstr () + data = codec.read_str8 () elif typecode == 7: - data = codec.decode_longstr () + data = codec.read_vbin32 () elif typecode == 8: # ABSTIME - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 9: # DELTATIME - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 10: # REF - data = codec.decode_longlong () + data = codec.read_uint64 () elif typecode == 11: # BOOL - data = codec.decode_octet () + data = codec.read_uint8 () elif typecode == 12: # FLOAT - data = codec.decode_float () + data = codec.read_float () elif typecode == 13: # DOUBLE - data = codec.decode_double () + data = codec.read_double () elif typecode == 14: # UUID - data = codec.decode_uuid () + data = codec.read_uuid () elif typecode == 15: # FTABLE - data = codec.decode_table () + data = codec.read_map () else: raise ValueError ("Invalid type code: %d" % typecode) return data @@ -415,8 +413,8 @@ class managementClient: self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) def handleMethodReply (self, ch, codec, sequence): - status = codec.decode_long () - sText = codec.decode_shortstr () + status = codec.read_uint32 () + sText = codec.read_str8 () data = self.seqMgr.release (sequence) if data == None: @@ -451,8 +449,8 @@ class managementClient: self.methodCb (ch.context, userSequence, status, sText, args) def handleCommandComplete (self, ch, codec, seq): - code = codec.decode_long () - text = codec.decode_shortstr () + code = codec.read_uint32 () + text = codec.read_str8 () data = (seq, code, text) context = self.seqMgr.release (seq) if context == "outstanding": @@ -467,75 +465,60 @@ class managementClient: def handleBrokerResponse (self, ch, codec): if self.ctrlCb != None: - uuid = codec.decode_uuid () + uuid = codec.read_uuid () data = (uuid, ch.sessionId) self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data) # Send a package request - sendCodec = Codec (StringIO (), self.spec) + sendCodec = Codec (self.spec) seq = self.seqMgr.reserve ("outstanding") self.setHeader (sendCodec, ord ('P'), seq) - smsg = Content (sendCodec.stream.getvalue ()) - smsg["content_type"] = "application/octet-stream" - smsg["routing_key"] = "agent" - smsg["reply_to"] = self.spec.struct ("reply_to") - smsg["reply_to"]["exchange_name"] = "amq.direct" - smsg["reply_to"]["routing_key"] = ch.replyName + smsg = ch.message(sendCodec.encoded) ch.send ("qpid.management", smsg) - + def handlePackageInd (self, ch, codec): - pname = codec.decode_shortstr () + pname = codec.read_str8 () if pname not in self.packages: self.packages[pname] = {} # Send a class request - sendCodec = Codec (StringIO (), self.spec) + sendCodec = Codec (self.spec) seq = self.seqMgr.reserve ("outstanding") self.setHeader (sendCodec, ord ('Q'), seq) self.incOutstanding (ch) - sendCodec.encode_shortstr (pname) - smsg = Content (sendCodec.stream.getvalue ()) - smsg["content_type"] = "application/octet-stream" - smsg["routing_key"] = "agent" - smsg["reply_to"] = self.spec.struct ("reply_to") - smsg["reply_to"]["exchange_name"] = "amq.direct" - smsg["reply_to"]["routing_key"] = ch.replyName + sendCodec.write_str8 (pname) + smsg = ch.message(sendCodec.encoded) ch.send ("qpid.management", smsg) def handleClassInd (self, ch, codec): - pname = codec.decode_shortstr () - cname = codec.decode_shortstr () - hash = codec.decode_bin128 () + pname = codec.read_str8 () + cname = codec.read_str8 () + hash = codec.read_bin128 () if pname not in self.packages: return if (cname, hash) not in self.packages[pname]: # Send a schema request - sendCodec = Codec (StringIO (), self.spec) + sendCodec = Codec (self.spec) seq = self.seqMgr.reserve ("outstanding") self.setHeader (sendCodec, ord ('S'), seq) self.incOutstanding (ch) - sendCodec.encode_shortstr (pname) - sendCodec.encode_shortstr (cname) - sendCodec.encode_bin128 (hash) - smsg = Content (sendCodec.stream.getvalue ()) - smsg["content_type"] = "application/octet-stream" - smsg["routing_key"] = "agent" - smsg["reply_to"] = self.spec.struct ("reply_to") - smsg["reply_to"]["exchange_name"] = "amq.direct" - smsg["reply_to"]["routing_key"] = ch.replyName + sendCodec.write_str8 (pname) + sendCodec.write_str8 (cname) + sendCodec.write_bin128 (hash) + smsg = ch.message(sendCodec.encoded) ch.send ("qpid.management", smsg) def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ self.decOutstanding (ch) - packageName = codec.decode_shortstr () - className = codec.decode_shortstr () - hash = codec.decode_bin128 () - configCount = codec.decode_short () - instCount = codec.decode_short () - methodCount = codec.decode_short () - eventCount = codec.decode_short () + packageName = codec.read_str8 () + className = codec.read_str8 () + hash = codec.read_bin128 () + configCount = codec.read_uint16 () + instCount = codec.read_uint16 () + methodCount = codec.read_uint16 () + eventCount = codec.read_uint16 () if packageName not in self.packages: return @@ -555,7 +538,7 @@ class managementClient: insts.append (("id", 4, None, None)) for idx in range (configCount): - ft = codec.decode_table () + ft = codec.read_map () name = ft["name"] type = ft["type"] access = ft["access"] @@ -582,7 +565,7 @@ class managementClient: configs.append (config) for idx in range (instCount): - ft = codec.decode_table () + ft = codec.read_map () name = ft["name"] type = ft["type"] unit = None @@ -598,7 +581,7 @@ class managementClient: insts.append (inst) for idx in range (methodCount): - ft = codec.decode_table () + ft = codec.read_map () mname = ft["name"] argCount = ft["argCount"] if "desc" in ft: @@ -608,7 +591,7 @@ class managementClient: args = [] for aidx in range (argCount): - ft = codec.decode_table () + ft = codec.read_map () name = ft["name"] type = ft["type"] dir = ft["dir"].upper () @@ -654,9 +637,9 @@ class managementClient: if cls == 'I' and self.instCb == None: return - packageName = codec.decode_shortstr () - className = codec.decode_shortstr () - hash = codec.decode_bin128 () + packageName = codec.read_str8 () + className = codec.read_str8 () + hash = codec.read_bin128 () classKey = (packageName, className, hash) if classKey not in self.schema: @@ -665,9 +648,9 @@ class managementClient: row = [] timestamps = [] - timestamps.append (codec.decode_longlong ()) # Current Time - timestamps.append (codec.decode_longlong ()) # Create Time - timestamps.append (codec.decode_longlong ()) # Delete Time + timestamps.append (codec.read_uint64 ()) # Current Time + timestamps.append (codec.read_uint64 ()) # Create Time + timestamps.append (codec.read_uint64 ()) # Delete Time schemaClass = self.schema[classKey] if cls == 'C' or cls == 'B': @@ -712,10 +695,10 @@ class managementClient: def method (self, channel, userSequence, objId, classId, methodName, args): """ Invoke a method on an object """ - codec = Codec (StringIO (), self.spec) + codec = Codec (self.spec) sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M'), sequence) - codec.encode_longlong (objId) # ID of object + codec.write_uint64 (objId) # ID of object # Encode args according to schema if classId not in self.schema: @@ -745,11 +728,6 @@ class managementClient: packageName = classId[0] className = classId[1] - msg = Content (codec.stream.getvalue ()) - msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "agent.method." + packageName + "." + \ - className + "." + methodName - msg["reply_to"] = self.spec.struct ("reply_to") - msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = channel.replyName + msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \ + className + "." + methodName) channel.send ("qpid.management", msg) |
