diff options
| author | Alan Conway <aconway@apache.org> | 2007-11-15 21:36:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-11-15 21:36:00 +0000 |
| commit | a24723ab523b37408d8cb5c3e1268667d166f127 (patch) | |
| tree | 6d08de729742e372fce6ef51f96a3381b6e7e376 /python | |
| parent | d8dd71de008990f5a665e0191c231ae62fcf44fb (diff) | |
| download | qpid-python-a24723ab523b37408d8cb5c3e1268667d166f127.tar.gz | |
QPID-687: comitted qpid-patch7-python.diff for real this time.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@595465 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/codec.py | 6 | ||||
| -rw-r--r-- | python/qpid/management.py | 258 |
2 files changed, 190 insertions, 74 deletions
diff --git a/python/qpid/codec.py b/python/qpid/codec.py index a3663773a9..b25de11f11 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -247,6 +247,12 @@ class Codec: def decode_signed_long(self): return self.unpack("!q") + def encode_signed_int(self, o): + self.pack("!l", o) + + def decode_signed_int(self): + return self.unpack("!l") + def encode_longlong(self, o): """ encodes long long (64 bits) data 'o' in network byte order diff --git a/python/qpid/management.py b/python/qpid/management.py index 24c4700c7f..a5ad997a24 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -18,7 +18,7 @@ # """ -Management classes for AMQP +Management API for Qpid """ import qpid @@ -42,50 +42,91 @@ from codec import Codec, EOF #=================================================================== class ManagementMetadata: - def parseSchema (self, cls, oid, len, codec): - #print "Schema Record: objId=", oid - - config = [] - inst = [] - while 1: - flags = codec.decode_octet () - if flags == 0x80: - break - - tc = codec.decode_octet () - name = codec.decode_shortstr () - desc = codec.decode_shortstr () - - if flags & 1: # TODO: Define constants for these - config.append ((tc, name, desc)) - if (flags & 1) == 0 or (flags & 2) == 2: - inst.append ((tc, name, desc)) + def parseSchema (self, cls, codec): + className = codec.decode_shortstr () + configCount = codec.decode_short () + instCount = codec.decode_short () + methodCount = codec.decode_short () + eventCount = codec.decode_short () + + configs = [] + insts = [] + methods = [] + events = [] + + configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) + insts.append (("id", 4, None, None)) + + for idx in range (configCount): + ft = codec.decode_table () + name = ft["name"] + type = ft["type"] + access = ft["access"] + index = ft["index"] + unit = None + min = None + max = None + maxlen = None + desc = None + + for key, value in ft.items (): + if key == "unit": + unit = value + elif key == "min": + min = value + elif key == "max": + max = value + elif key == "maxlen": + maxlen = value + elif key == "desc": + desc = value + + config = (name, type, unit, desc, access, index, min, max, maxlen) + configs.append (config) + + for idx in range (instCount): + ft = codec.decode_table () + name = ft["name"] + type = ft["type"] + unit = None + desc = None + + for key, value in ft.items (): + if key == "unit": + unit = value + elif key == "desc": + desc = value + + inst = (name, type, unit, desc) + insts.append (inst) # TODO: Handle notification of schema change outbound - self.schema[(oid,'C')] = config - self.schema[(oid,'I')] = inst - - def parseContent (self, cls, oid, len, codec): - #print "Content Record: Class=", cls, ", objId=", oid + self.schema[(className,'C')] = configs + self.schema[(className,'I')] = insts + self.schema[(className,'M')] = methods + self.schema[(className,'E')] = events + def parseContent (self, cls, codec): if cls == 'C' and self.broker.config_cb == None: return if cls == 'I' and self.broker.inst_cb == None: return - if (oid,cls) not in self.schema: + className = codec.decode_shortstr () + + if (className,cls) not in self.schema: return row = [] timestamps = [] - timestamps.append (codec.decode_longlong ()); # Current Time - timestamps.append (codec.decode_longlong ()); # Create Time - timestamps.append (codec.decode_longlong ()); # Delete Time + timestamps.append (codec.decode_longlong ()) # Current Time + timestamps.append (codec.decode_longlong ()) # Create Time + timestamps.append (codec.decode_longlong ()) # Delete Time - for element in self.schema[(oid,cls)][:]: - tc = element[0] - name = element[1] + for element in self.schema[(className,cls)][:]: + tc = element[1] + name = element[0] if tc == 1: # TODO: Define constants for these data = codec.decode_octet () elif tc == 2: @@ -98,33 +139,29 @@ class ManagementMetadata: data = codec.decode_octet () elif tc == 6: data = codec.decode_shortstr () + elif tc == 7: + data = codec.decode_longstr () + else: + raise ValueError ("Invalid type code: %d" % tc) row.append ((name, data)) if cls == 'C': - self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps) - if cls == 'I': - self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps) + self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) + elif cls == 'I': + self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) def parse (self, codec): - try: - opcode = chr (codec.decode_octet ()) - except EOF: - return 0 - - cls = chr (codec.decode_octet ()) - oid = codec.decode_short () - len = codec.decode_long () - - if len < 8: - raise ValueError ("parse error: value of length field too small") + opcode = chr (codec.decode_octet ()) + cls = chr (codec.decode_octet ()) if opcode == 'S': - self.parseSchema (cls, oid, len, codec) + self.parseSchema (cls, codec) - if opcode == 'C': - self.parseContent (cls, oid, len, codec) + elif opcode == 'C': + self.parseContent (cls, codec) - return 1 + else: + raise ValueError ("Unknown opcode: %c" % opcode); def __init__ (self, broker): self.broker = broker @@ -140,7 +177,8 @@ class ManagementMetadata: #=================================================================== class ManagedBroker: - exchange = "qpid.management" + mExchange = "qpid.management" + dExchange = "amq.direct" def checkHeader (self, codec): octet = chr (codec.decode_octet ()) @@ -157,69 +195,141 @@ class ManagedBroker: return 0 return 1 - def receive_cb (self, msg): + def publish_cb (self, msg): codec = Codec (StringIO (msg.content.body), self.spec) if self.checkHeader (codec) == 0: raise ValueError ("outer header invalid"); - while self.metadata.parse (codec): - pass - + self.metadata.parse (codec) msg.complete () - def __init__ (self, host = "localhost", port = 5672, - username = "guest", password = "guest"): + def reply_cb (self, msg): + codec = Codec (StringIO (msg.content.body), self.spec) + methodId = codec.decode_long () + status = codec.decode_long () + sText = codec.decode_shortstr () + + args = {} + if status == 0: + args["sequence"] = codec.decode_long () + args["body"] = codec.decode_longstr () + + if self.method_cb != None: + self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) - self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml") - self.client = None - self.channel = None - self.queue = None - self.qname = None - self.metadata = ManagementMetadata (self) + msg.complete () + + def __init__ (self, + host = "localhost", + port = 5672, + username = "guest", + password = "guest", + specfile = "../specs/amqp.0-10-preview.xml"): + + self.spec = qpid.spec.load (specfile) + self.client = None + self.channel = None + self.queue = None + self.rqueue = None + self.qname = None + self.rqname = None + self.metadata = ManagementMetadata (self) + self.connected = 0 + self.lastConnectError = None # Initialize the callback records + self.status_cb = None self.schema_cb = None self.config_cb = None self.inst_cb = None + self.method_cb = None self.host = host self.port = port self.username = username self.password = password + def statusListener (self, context, callback): + self.status_cb = (context, callback) + def schemaListener (self, context, callback): self.schema_cb = (context, callback) def configListener (self, context, callback): self.config_cb = (context, callback) + def methodListener (self, context, callback): + self.method_cb = (context, callback) + def instrumentationListener (self, context, callback): self.inst_cb = (context, callback) + def method (self, methodId, objId, className, + methodName, args=None, packageName="qpid"): + codec = Codec (StringIO (), self.spec); + codec.encode_long (methodId) + codec.encode_longlong (objId) + codec.encode_shortstr (self.rqname) + + # TODO: Encode args according to schema + if methodName == "echo": + codec.encode_long (args["sequence"]) + codec.encode_longstr (args["body"]) + + msg = Content (codec.stream.getvalue ()) + msg["content_type"] = "application/octet-stream" + msg["routing_key"] = "method." + packageName + "." + className + "." + methodName + msg["reply_to"] = self.spec.struct ("reply_to") + self.channel.message_transfer (destination="qpid.management", content=msg) + + def isConnected (self): + return connected + def start (self): - print "Connecting to broker", self.host + print "Connecting to broker %s:%d" % (self.host, self.port) try: self.client = Client (self.host, self.port, self.spec) self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) self.channel = self.client.channel (1) - response = self.channel.session_open (detached_lifetime=300) - self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id) + response = self.channel.session_open (detached_lifetime=10) + self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) + self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) + + self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) + self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) + + self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, + routing_key="mgmt.#") + self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, + routing_key=self.rqname) + + self.channel.message_subscribe (queue=self.qname, destination="mdest") + self.channel.message_subscribe (queue=self.rqname, destination="rdest") + + self.queue = self.client.queue ("mdest") + self.queue.listen (self.publish_cb) - self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) - self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname, - routing_key="mgmt") - self.channel.message_subscribe (queue=self.qname, destination="dest") - self.queue = self.client.queue ("dest") - self.queue.listen (self.receive_cb) + self.channel.message_flow_mode (destination="mdest", mode=1) + self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) + self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) - self.channel.message_flow_mode (destination="dest", mode=1) - self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF) + self.rqueue = self.client.queue ("rdest") + self.rqueue.listen (self.reply_cb) + + self.channel.message_flow_mode (destination="rdest", mode=1) + self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + + self.connected = 1 except socket.error, e: print "Socket Error Detected:", e[1] + self.lastConnectError = e raise except: raise + + def stop (self): + pass |
