diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
| commit | 3fe6853a7029e48f693c0853e51af33be5c79aec (patch) | |
| tree | 6139a715591aabc91370350aa26f854639a2aa11 /python/qpid/management.py | |
| parent | 8bc0b992a0e67259a7d9c525bbbbbc32fbc60a20 (diff) | |
| download | qpid-python-3fe6853a7029e48f693c0853e51af33be5c79aec.tar.gz | |
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
| -rw-r--r-- | python/qpid/management.py | 65 |
1 files changed, 50 insertions, 15 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py index 1c8b3cd840..40de2a5298 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -77,6 +77,14 @@ class ManagementMetadata: codec.encode_shortstr (value) elif typecode == 7: codec.encode_longstr (value) + elif typecode == 8: # ABSTIME + codec.encode_longlong (long (value)) + elif typecode == 9: # DELTATIME + codec.encode_longlong (long (value)) + elif typecode == 10: # REF + codec.encode_longlong (long (value)) + elif typecode == 11: # BOOL + codec.encode_octet (int (value)) else: raise ValueError ("Invalid type code: %d" % typecode) @@ -95,6 +103,14 @@ class ManagementMetadata: data = codec.decode_shortstr () elif typecode == 7: data = codec.decode_longstr () + elif typecode == 8: # ABSTIME + data = codec.decode_longlong () + elif typecode == 9: # DELTATIME + data = codec.decode_longlong () + elif typecode == 10: # REF + data = codec.decode_longlong () + elif typecode == 11: # BOOL + data = codec.decode_octet () else: raise ValueError ("Invalid type code: %d" % typecode) return data @@ -236,10 +252,7 @@ class ManagementMetadata: elif cls == 'I': self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) - def parse (self, codec): - opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - + def parse (self, codec, opcode, cls): if opcode == 'S': self.parseSchema (cls, codec) @@ -261,34 +274,53 @@ class ManagedBroker: mExchange = "qpid.management" dExchange = "amq.direct" + def setHeader (self, codec, opcode, cls = 0): + codec.encode_octet (ord ('A')) + codec.encode_octet (ord ('M')) + codec.encode_octet (ord ('0')) + codec.encode_octet (ord ('1')) + codec.encode_octet (opcode) + codec.encode_octet (cls) + def checkHeader (self, codec): octet = chr (codec.decode_octet ()) if octet != 'A': - return 0 + return None octet = chr (codec.decode_octet ()) if octet != 'M': - return 0 + return None octet = chr (codec.decode_octet ()) if octet != '0': - return 0 + return None octet = chr (codec.decode_octet ()) if octet != '1': - return 0 - return 1 + return None + opcode = chr (codec.decode_octet ()) + cls = chr (codec.decode_octet ()) + return (opcode, cls) def publish_cb (self, msg): codec = Codec (StringIO (msg.content.body), self.spec) - if self.checkHeader (codec) == 0: + hdr = self.checkHeader (codec) + if hdr == None: raise ValueError ("outer header invalid"); - self.metadata.parse (codec) + self.metadata.parse (codec, hdr[0], hdr[1]) msg.complete () def reply_cb (self, msg): codec = Codec (StringIO (msg.content.body), self.spec) - sequence = codec.decode_long () - status = codec.decode_long () + hdr = self.checkHeader (codec) + if hdr == None: + msg.complete () + return + if hdr[0] != 'R': + msg.complete () + return + + sequence = codec.decode_long () + status = codec.decode_long () sText = codec.decode_shortstr () data = self.sequenceManager.release (sequence) @@ -369,9 +401,10 @@ class ManagedBroker: methodName, args=None, packageName="qpid"): codec = Codec (StringIO (), self.spec); sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + self.setHeader (codec, ord ('M')) codec.encode_long (sequence) # Method sequence id codec.encode_longlong (objId) # ID of object - codec.encode_shortstr (self.rqname) # name of reply queue + #codec.encode_shortstr (self.rqname) # name of reply queue # Encode args according to schema if (className,'M') not in self.metadata.schema: @@ -402,6 +435,8 @@ class ManagedBroker: msg["content_type"] = "application/octet-stream" msg["routing_key"] = "method." + packageName + "." + className + "." + methodName msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = self.rqname self.channel.message_transfer (destination="qpid.management", content=msg) def isConnected (self): @@ -414,7 +449,7 @@ class ManagedBroker: self.client = Client (self.host, self.port, self.spec) self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) self.channel = self.client.channel (1) - response = self.channel.session_open (detached_lifetime=10) + response = self.channel.session_open (detached_lifetime=300) self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) |
