summaryrefslogtreecommitdiff
path: root/python/qpid/management.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-04-22 16:11:34 +0000
committerRafael H. Schloming <rhs@apache.org>2008-04-22 16:11:34 +0000
commite06aa805cfe24b8edf619a6a535883f94589ac35 (patch)
tree4b886461816ca97127aae8a9639ddad74d77bd46 /python/qpid/management.py
parent61959e29ee69f9cebb61b845272eededaec6f11e (diff)
downloadqpid-python-e06aa805cfe24b8edf619a6a535883f94589ac35.tar.gz
QPID-947: update cpp and python management to 0-10 final
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650565 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
-rw-r--r--python/qpid/management.py274
1 files changed, 126 insertions, 148 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 6b25d5ea08..3a7a564e19 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -25,12 +25,10 @@ import qpid
import struct
import socket
from threading import Thread
-from message import Message
+from datatypes import Message, RangedSet
from time import time
-from qpid.client import Client
-from qpid.content import Content
from cStringIO import StringIO
-from codec import Codec, EOF
+from codec010 import StringCodec as Codec
from threading import Lock, Condition
@@ -83,40 +81,39 @@ class methodResult:
class managementChannel:
""" This class represents a connection to an AMQP broker. """
- def __init__ (self, ch, topicCb, replyCb, cbContext, _detlife=0):
+ def __init__ (self, ssn, topicCb, replyCb, cbContext, _detlife=0):
""" Given a channel on an established AMQP broker connection, this method
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
- response = ch.session_open (detached_lifetime=_detlife)
- self.sessionId = response.session_id
- self.topicName = "mgmt-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id)
- self.replyName = "repl-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id)
- self.qpidChannel = ch
+ self.sessionId = ssn.name
+ self.topicName = "mgmt-%s" % self.sessionId
+ self.replyName = "repl-%s" % self.sessionId
+ self.qpidChannel = ssn
self.tcb = topicCb
self.rcb = replyCb
self.context = cbContext
self.reqsOutstanding = 0
- ch.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True)
- ch.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True)
+ ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True)
+ ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True)
- ch.queue_bind (exchange="qpid.management",
- queue=self.topicName, routing_key="mgmt.#")
- ch.queue_bind (exchange="amq.direct",
- queue=self.replyName, routing_key=self.replyName)
- ch.message_subscribe (queue=self.topicName, destination="tdest")
- ch.message_subscribe (queue=self.replyName, destination="rdest")
+ ssn.exchange_bind (exchange="qpid.management",
+ queue=self.topicName, binding_key="mgmt.#")
+ ssn.exchange_bind (exchange="amq.direct",
+ queue=self.replyName, binding_key=self.replyName)
+ ssn.message_subscribe (queue=self.topicName, destination="tdest")
+ ssn.message_subscribe (queue=self.replyName, destination="rdest")
- ch.client.queue ("tdest").listen (self.topicCb)
- ch.client.queue ("rdest").listen (self.replyCb)
+ ssn.incoming ("tdest").listen (self.topicCb)
+ ssn.incoming ("rdest").listen (self.replyCb)
- ch.message_flow_mode (destination="tdest", mode=1)
- ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
- ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
+ ssn.message_set_flow_mode (destination="tdest", flow_mode=1)
+ ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
+ ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
- ch.message_flow_mode (destination="rdest", mode=1)
- ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
- ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+ ssn.message_set_flow_mode (destination="rdest", flow_mode=1)
+ ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+ ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
def topicCb (self, msg):
""" Receive messages via the topic queue on this channel. """
@@ -127,7 +124,18 @@ class managementChannel:
self.rcb (self, msg)
def send (self, exchange, msg):
- self.qpidChannel.message_transfer (destination=exchange, content=msg)
+ self.qpidChannel.message_transfer (destination=exchange, message=msg)
+
+ def accept (self, msg):
+ self.qpidChannel.message_accept(RangedSet(msg.id))
+
+ def message (self, body, routing_key="agent"):
+ dp = self.qpidChannel.delivery_properties()
+ dp.routing_key = routing_key
+ mp = self.qpidChannel.message_properties()
+ mp.content_type = "application/octet-stream"
+ mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName)
+ return Message(dp, mp, body)
class managementClient:
@@ -177,14 +185,9 @@ class managementClient:
self.channels.append (mch)
self.incOutstanding (mch)
- codec = Codec (StringIO (), self.spec)
+ codec = Codec (self.spec)
self.setHeader (codec, ord ('B'))
- msg = Content (codec.stream.getvalue ())
- msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "agent"
- msg["reply_to"] = self.spec.struct ("reply_to")
- msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = mch.replyName
+ msg = mch.message(codec.encoded)
mch.send ("qpid.management", msg)
return mch
@@ -198,17 +201,12 @@ class managementClient:
def getObjects (self, channel, userSequence, className):
""" Request immediate content from broker """
- codec = Codec (StringIO (), self.spec)
+ codec = Codec (self.spec)
self.setHeader (codec, ord ('G'), userSequence)
ft = {}
ft["_class"] = className
- codec.encode_table (ft)
- msg = Content (codec.stream.getvalue ())
- msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "agent"
- msg["reply_to"] = self.spec.struct ("reply_to")
- msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = channel.replyName
+ codec.write_map (ft)
+ msg = channel.message(codec.encoded)
channel.send ("qpid.management", msg)
def syncWaitForStable (self, channel):
@@ -270,19 +268,19 @@ class managementClient:
#========================================================
def topicCb (self, ch, msg):
""" Receive messages via the topic queue of a particular channel. """
- codec = Codec (StringIO (msg.content.body), self.spec)
+ codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
raise ValueError ("outer header invalid");
self.parse (ch, codec, hdr[0], hdr[1])
- msg.complete ()
+ ch.accept(msg)
def replyCb (self, ch, msg):
""" Receive messages via the reply queue of a particular channel. """
- codec = Codec (StringIO (msg.content.body), self.spec)
+ codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
- msg.complete ()
+ ch.accept(msg)
return
if hdr[0] == 'm':
@@ -297,102 +295,102 @@ class managementClient:
self.handleClassInd (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
- msg.complete ()
+ ch.accept(msg)
#========================================================
# Internal Functions
#========================================================
def setHeader (self, codec, opcode, seq = 0):
""" Compose the header of a management message. """
- codec.encode_octet (ord ('A'))
- codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('1'))
- codec.encode_octet (opcode)
- codec.encode_long (seq)
+ codec.write_uint8 (ord ('A'))
+ codec.write_uint8 (ord ('M'))
+ codec.write_uint8 (ord ('1'))
+ codec.write_uint8 (opcode)
+ codec.write_uint32 (seq)
def checkHeader (self, codec):
""" Check the header of a management message and extract the opcode and
class. """
- octet = chr (codec.decode_octet ())
+ octet = chr (codec.read_uint8 ())
if octet != 'A':
return None
- octet = chr (codec.decode_octet ())
+ octet = chr (codec.read_uint8 ())
if octet != 'M':
return None
- octet = chr (codec.decode_octet ())
+ octet = chr (codec.read_uint8 ())
if octet != '1':
return None
- opcode = chr (codec.decode_octet ())
- seq = codec.decode_long ()
+ opcode = chr (codec.read_uint8 ())
+ seq = codec.read_uint32 ()
return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
if typecode == 1:
- codec.encode_octet (int (value))
+ codec.write_uint8 (int (value))
elif typecode == 2:
- codec.encode_short (int (value))
+ codec.write_uint16 (int (value))
elif typecode == 3:
- codec.encode_long (long (value))
+ codec.write_uint32 (long (value))
elif typecode == 4:
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 5:
- codec.encode_octet (int (value))
+ codec.write_uint8 (int (value))
elif typecode == 6:
- codec.encode_shortstr (value)
+ codec.write_str8 (value)
elif typecode == 7:
- codec.encode_longstr (value)
+ codec.write_vbin32 (value)
elif typecode == 8: # ABSTIME
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 9: # DELTATIME
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 10: # REF
- codec.encode_longlong (long (value))
+ codec.write_uint64 (long (value))
elif typecode == 11: # BOOL
- codec.encode_octet (int (value))
+ codec.write_uint8 (int (value))
elif typecode == 12: # FLOAT
- codec.encode_float (float (value))
+ codec.write_float (float (value))
elif typecode == 13: # DOUBLE
- codec.encode_double (double (value))
+ codec.write_double (double (value))
elif typecode == 14: # UUID
- codec.encode_uuid (value)
+ codec.write_uuid (value)
elif typecode == 15: # FTABLE
- codec.encode_table (value)
+ codec.write_map (value)
else:
raise ValueError ("Invalid type code: %d" % typecode)
def decodeValue (self, codec, typecode):
""" Decode, from the codec, a value based on its typecode. """
if typecode == 1:
- data = codec.decode_octet ()
+ data = codec.read_uint8 ()
elif typecode == 2:
- data = codec.decode_short ()
+ data = codec.read_uint16 ()
elif typecode == 3:
- data = codec.decode_long ()
+ data = codec.read_uint32 ()
elif typecode == 4:
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 5:
- data = codec.decode_octet ()
+ data = codec.read_uint8 ()
elif typecode == 6:
- data = codec.decode_shortstr ()
+ data = codec.read_str8 ()
elif typecode == 7:
- data = codec.decode_longstr ()
+ data = codec.read_vbin32 ()
elif typecode == 8: # ABSTIME
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 9: # DELTATIME
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 10: # REF
- data = codec.decode_longlong ()
+ data = codec.read_uint64 ()
elif typecode == 11: # BOOL
- data = codec.decode_octet ()
+ data = codec.read_uint8 ()
elif typecode == 12: # FLOAT
- data = codec.decode_float ()
+ data = codec.read_float ()
elif typecode == 13: # DOUBLE
- data = codec.decode_double ()
+ data = codec.read_double ()
elif typecode == 14: # UUID
- data = codec.decode_uuid ()
+ data = codec.read_uuid ()
elif typecode == 15: # FTABLE
- data = codec.decode_table ()
+ data = codec.read_map ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
@@ -415,8 +413,8 @@ class managementClient:
self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
def handleMethodReply (self, ch, codec, sequence):
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
+ status = codec.read_uint32 ()
+ sText = codec.read_str8 ()
data = self.seqMgr.release (sequence)
if data == None:
@@ -451,8 +449,8 @@ class managementClient:
self.methodCb (ch.context, userSequence, status, sText, args)
def handleCommandComplete (self, ch, codec, seq):
- code = codec.decode_long ()
- text = codec.decode_shortstr ()
+ code = codec.read_uint32 ()
+ text = codec.read_str8 ()
data = (seq, code, text)
context = self.seqMgr.release (seq)
if context == "outstanding":
@@ -467,75 +465,60 @@ class managementClient:
def handleBrokerResponse (self, ch, codec):
if self.ctrlCb != None:
- uuid = codec.decode_uuid ()
+ uuid = codec.read_uuid ()
data = (uuid, ch.sessionId)
self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data)
# Send a package request
- sendCodec = Codec (StringIO (), self.spec)
+ sendCodec = Codec (self.spec)
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('P'), seq)
- smsg = Content (sendCodec.stream.getvalue ())
- smsg["content_type"] = "application/octet-stream"
- smsg["routing_key"] = "agent"
- smsg["reply_to"] = self.spec.struct ("reply_to")
- smsg["reply_to"]["exchange_name"] = "amq.direct"
- smsg["reply_to"]["routing_key"] = ch.replyName
+ smsg = ch.message(sendCodec.encoded)
ch.send ("qpid.management", smsg)
-
+
def handlePackageInd (self, ch, codec):
- pname = codec.decode_shortstr ()
+ pname = codec.read_str8 ()
if pname not in self.packages:
self.packages[pname] = {}
# Send a class request
- sendCodec = Codec (StringIO (), self.spec)
+ sendCodec = Codec (self.spec)
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('Q'), seq)
self.incOutstanding (ch)
- sendCodec.encode_shortstr (pname)
- smsg = Content (sendCodec.stream.getvalue ())
- smsg["content_type"] = "application/octet-stream"
- smsg["routing_key"] = "agent"
- smsg["reply_to"] = self.spec.struct ("reply_to")
- smsg["reply_to"]["exchange_name"] = "amq.direct"
- smsg["reply_to"]["routing_key"] = ch.replyName
+ sendCodec.write_str8 (pname)
+ smsg = ch.message(sendCodec.encoded)
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
- pname = codec.decode_shortstr ()
- cname = codec.decode_shortstr ()
- hash = codec.decode_bin128 ()
+ pname = codec.read_str8 ()
+ cname = codec.read_str8 ()
+ hash = codec.read_bin128 ()
if pname not in self.packages:
return
if (cname, hash) not in self.packages[pname]:
# Send a schema request
- sendCodec = Codec (StringIO (), self.spec)
+ sendCodec = Codec (self.spec)
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('S'), seq)
self.incOutstanding (ch)
- sendCodec.encode_shortstr (pname)
- sendCodec.encode_shortstr (cname)
- sendCodec.encode_bin128 (hash)
- smsg = Content (sendCodec.stream.getvalue ())
- smsg["content_type"] = "application/octet-stream"
- smsg["routing_key"] = "agent"
- smsg["reply_to"] = self.spec.struct ("reply_to")
- smsg["reply_to"]["exchange_name"] = "amq.direct"
- smsg["reply_to"]["routing_key"] = ch.replyName
+ sendCodec.write_str8 (pname)
+ sendCodec.write_str8 (cname)
+ sendCodec.write_bin128 (hash)
+ smsg = ch.message(sendCodec.encoded)
ch.send ("qpid.management", smsg)
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
- packageName = codec.decode_shortstr ()
- className = codec.decode_shortstr ()
- hash = codec.decode_bin128 ()
- configCount = codec.decode_short ()
- instCount = codec.decode_short ()
- methodCount = codec.decode_short ()
- eventCount = codec.decode_short ()
+ packageName = codec.read_str8 ()
+ className = codec.read_str8 ()
+ hash = codec.read_bin128 ()
+ configCount = codec.read_uint16 ()
+ instCount = codec.read_uint16 ()
+ methodCount = codec.read_uint16 ()
+ eventCount = codec.read_uint16 ()
if packageName not in self.packages:
return
@@ -555,7 +538,7 @@ class managementClient:
insts.append (("id", 4, None, None))
for idx in range (configCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
name = ft["name"]
type = ft["type"]
access = ft["access"]
@@ -582,7 +565,7 @@ class managementClient:
configs.append (config)
for idx in range (instCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
name = ft["name"]
type = ft["type"]
unit = None
@@ -598,7 +581,7 @@ class managementClient:
insts.append (inst)
for idx in range (methodCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
mname = ft["name"]
argCount = ft["argCount"]
if "desc" in ft:
@@ -608,7 +591,7 @@ class managementClient:
args = []
for aidx in range (argCount):
- ft = codec.decode_table ()
+ ft = codec.read_map ()
name = ft["name"]
type = ft["type"]
dir = ft["dir"].upper ()
@@ -654,9 +637,9 @@ class managementClient:
if cls == 'I' and self.instCb == None:
return
- packageName = codec.decode_shortstr ()
- className = codec.decode_shortstr ()
- hash = codec.decode_bin128 ()
+ packageName = codec.read_str8 ()
+ className = codec.read_str8 ()
+ hash = codec.read_bin128 ()
classKey = (packageName, className, hash)
if classKey not in self.schema:
@@ -665,9 +648,9 @@ class managementClient:
row = []
timestamps = []
- timestamps.append (codec.decode_longlong ()) # Current Time
- timestamps.append (codec.decode_longlong ()) # Create Time
- timestamps.append (codec.decode_longlong ()) # Delete Time
+ timestamps.append (codec.read_uint64 ()) # Current Time
+ timestamps.append (codec.read_uint64 ()) # Create Time
+ timestamps.append (codec.read_uint64 ()) # Delete Time
schemaClass = self.schema[classKey]
if cls == 'C' or cls == 'B':
@@ -712,10 +695,10 @@ class managementClient:
def method (self, channel, userSequence, objId, classId, methodName, args):
""" Invoke a method on an object """
- codec = Codec (StringIO (), self.spec)
+ codec = Codec (self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'), sequence)
- codec.encode_longlong (objId) # ID of object
+ codec.write_uint64 (objId) # ID of object
# Encode args according to schema
if classId not in self.schema:
@@ -745,11 +728,6 @@ class managementClient:
packageName = classId[0]
className = classId[1]
- msg = Content (codec.stream.getvalue ())
- msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "agent.method." + packageName + "." + \
- className + "." + methodName
- msg["reply_to"] = self.spec.struct ("reply_to")
- msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = channel.replyName
+ msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \
+ className + "." + methodName)
channel.send ("qpid.management", msg)