summaryrefslogtreecommitdiff
path: root/python/qpid/management.py
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
commit3fe6853a7029e48f693c0853e51af33be5c79aec (patch)
tree6139a715591aabc91370350aa26f854639a2aa11 /python/qpid/management.py
parent8bc0b992a0e67259a7d9c525bbbbbc32fbc60a20 (diff)
downloadqpid-python-3fe6853a7029e48f693c0853e51af33be5c79aec.tar.gz
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
-rw-r--r--python/qpid/management.py65
1 files changed, 50 insertions, 15 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 1c8b3cd840..40de2a5298 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -77,6 +77,14 @@ class ManagementMetadata:
codec.encode_shortstr (value)
elif typecode == 7:
codec.encode_longstr (value)
+ elif typecode == 8: # ABSTIME
+ codec.encode_longlong (long (value))
+ elif typecode == 9: # DELTATIME
+ codec.encode_longlong (long (value))
+ elif typecode == 10: # REF
+ codec.encode_longlong (long (value))
+ elif typecode == 11: # BOOL
+ codec.encode_octet (int (value))
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -95,6 +103,14 @@ class ManagementMetadata:
data = codec.decode_shortstr ()
elif typecode == 7:
data = codec.decode_longstr ()
+ elif typecode == 8: # ABSTIME
+ data = codec.decode_longlong ()
+ elif typecode == 9: # DELTATIME
+ data = codec.decode_longlong ()
+ elif typecode == 10: # REF
+ data = codec.decode_longlong ()
+ elif typecode == 11: # BOOL
+ data = codec.decode_octet ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
@@ -236,10 +252,7 @@ class ManagementMetadata:
elif cls == 'I':
self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
- def parse (self, codec):
- opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
-
+ def parse (self, codec, opcode, cls):
if opcode == 'S':
self.parseSchema (cls, codec)
@@ -261,34 +274,53 @@ class ManagedBroker:
mExchange = "qpid.management"
dExchange = "amq.direct"
+ def setHeader (self, codec, opcode, cls = 0):
+ codec.encode_octet (ord ('A'))
+ codec.encode_octet (ord ('M'))
+ codec.encode_octet (ord ('0'))
+ codec.encode_octet (ord ('1'))
+ codec.encode_octet (opcode)
+ codec.encode_octet (cls)
+
def checkHeader (self, codec):
octet = chr (codec.decode_octet ())
if octet != 'A':
- return 0
+ return None
octet = chr (codec.decode_octet ())
if octet != 'M':
- return 0
+ return None
octet = chr (codec.decode_octet ())
if octet != '0':
- return 0
+ return None
octet = chr (codec.decode_octet ())
if octet != '1':
- return 0
- return 1
+ return None
+ opcode = chr (codec.decode_octet ())
+ cls = chr (codec.decode_octet ())
+ return (opcode, cls)
def publish_cb (self, msg):
codec = Codec (StringIO (msg.content.body), self.spec)
- if self.checkHeader (codec) == 0:
+ hdr = self.checkHeader (codec)
+ if hdr == None:
raise ValueError ("outer header invalid");
- self.metadata.parse (codec)
+ self.metadata.parse (codec, hdr[0], hdr[1])
msg.complete ()
def reply_cb (self, msg):
codec = Codec (StringIO (msg.content.body), self.spec)
- sequence = codec.decode_long ()
- status = codec.decode_long ()
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ msg.complete ()
+ return
+ if hdr[0] != 'R':
+ msg.complete ()
+ return
+
+ sequence = codec.decode_long ()
+ status = codec.decode_long ()
sText = codec.decode_shortstr ()
data = self.sequenceManager.release (sequence)
@@ -369,9 +401,10 @@ class ManagedBroker:
methodName, args=None, packageName="qpid"):
codec = Codec (StringIO (), self.spec);
sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ self.setHeader (codec, ord ('M'))
codec.encode_long (sequence) # Method sequence id
codec.encode_longlong (objId) # ID of object
- codec.encode_shortstr (self.rqname) # name of reply queue
+ #codec.encode_shortstr (self.rqname) # name of reply queue
# Encode args according to schema
if (className,'M') not in self.metadata.schema:
@@ -402,6 +435,8 @@ class ManagedBroker:
msg["content_type"] = "application/octet-stream"
msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
msg["reply_to"] = self.spec.struct ("reply_to")
+ msg["reply_to"]["exchange_name"] = "amq.direct"
+ msg["reply_to"]["routing_key"] = self.rqname
self.channel.message_transfer (destination="qpid.management", content=msg)
def isConnected (self):
@@ -414,7 +449,7 @@ class ManagedBroker:
self.client = Client (self.host, self.port, self.spec)
self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
self.channel = self.client.channel (1)
- response = self.channel.session_open (detached_lifetime=10)
+ response = self.channel.session_open (detached_lifetime=300)
self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)