summaryrefslogtreecommitdiff
path: root/python/qpid/management.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-12-10 20:22:23 +0000
committerAlan Conway <aconway@apache.org>2007-12-10 20:22:23 +0000
commit0745cd0d738fbec7bd48529d7024535fc3301931 (patch)
tree37a743c98b2dd700fb90804d055495ab46cbb9a7 /python/qpid/management.py
parente6f8984d1ad2168f5ef146172de51ca654996f6a (diff)
downloadqpid-python-0745cd0d738fbec7bd48529d7024535fc3301931.tar.gz
Patches from Ted Ross <tross@redhat.com>
QPID-697 Fixed access-rights constants for management schema. Added mutex to fix problems associated with concurrent invocation of accessors for queue statistics. Removed queue schema content that is not relevant to QPID. QPID-698 This patch creates a new subdirectory in python called "mgmt-cli". python/mgmt-cli/main.py can be executed from the shell. If no arguments are supplied, it attempts to connect to the broker at localhost:5672. The first argument is the hostname for the target broker and the second (optional) argument is the TCP port (defaults to 5672). It is assumed that the AMQP spec file is in the following location: /usr/share/amqp/amqp.0-10-preview.xml It is also required that the qpid/python directory be in the PYTHONPATH environment variable. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@603034 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
-rw-r--r--python/qpid/management.py239
1 files changed, 179 insertions, 60 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
index a5ad997a24..8373ecceb7 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -31,17 +31,74 @@ from qpid.client import Client
from qpid.content import Content
from cStringIO import StringIO
from codec import Codec, EOF
+from threading import Lock
+
+
+class SequenceManager:
+ def __init__ (self):
+ self.lock = Lock ()
+ self.sequence = 0
+ self.pending = {}
+
+ def reserve (self, data):
+ self.lock.acquire ()
+ result = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[result] = data
+ self.lock.release ()
+ return result
+
+ def release (self, seq):
+ data = None
+ self.lock.acquire ()
+ if seq in self.pending:
+ data = self.pending[seq]
+ del self.pending[seq]
+ self.lock.release ()
+ return data
-#===================================================================
-# ManagementMetadata
-#
-# One instance of this class is created for each ManagedBroker. It
-# is used to store metadata from the broker which is needed for the
-# proper interpretation of recevied management content.
-#
-#===================================================================
class ManagementMetadata:
-
+ """One instance of this class is created for each ManagedBroker. It
+ is used to store metadata from the broker which is needed for the
+ proper interpretation of received management content."""
+
+ def encodeValue (self, codec, value, typecode):
+ if typecode == 1:
+ codec.encode_octet (int (value))
+ elif typecode == 2:
+ codec.encode_short (int (value))
+ elif typecode == 3:
+ codec.encode_long (long (value))
+ elif typecode == 4:
+ codec.encode_longlong (long (value))
+ elif typecode == 5:
+ codec.encode_octet (int (value))
+ elif typecode == 6:
+ codec.encode_shortstr (value)
+ elif typecode == 7:
+ codec.encode_longstr (value)
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def decodeValue (self, codec, typecode):
+ if typecode == 1:
+ data = codec.decode_octet ()
+ elif typecode == 2:
+ data = codec.decode_short ()
+ elif typecode == 3:
+ data = codec.decode_long ()
+ elif typecode == 4:
+ data = codec.decode_longlong ()
+ elif typecode == 5:
+ data = codec.decode_octet ()
+ elif typecode == 6:
+ data = codec.decode_shortstr ()
+ elif typecode == 7:
+ data = codec.decode_longstr ()
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+ return data
+
def parseSchema (self, cls, codec):
className = codec.decode_shortstr ()
configCount = codec.decode_short ()
@@ -100,12 +157,56 @@ class ManagementMetadata:
inst = (name, type, unit, desc)
insts.append (inst)
- # TODO: Handle notification of schema change outbound
+ for idx in range (methodCount):
+ ft = codec.decode_table ()
+ mname = ft["name"]
+ argCount = ft["argCount"]
+ if "desc" in ft:
+ mdesc = ft["desc"]
+ else:
+ mdesc = None
+
+ args = []
+ for aidx in range (argCount):
+ ft = codec.decode_table ()
+ name = ft["name"]
+ type = ft["type"]
+ dir = ft["dir"].upper ()
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
+ default = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = value
+ elif key == "min":
+ min = value
+ elif key == "max":
+ max = value
+ elif key == "maxlen":
+ maxlen = value
+ elif key == "desc":
+ desc = value
+ elif key == "default":
+ default = value
+
+ arg = (name, type, dir, unit, desc, min, max, maxlen, default)
+ args.append (arg)
+ methods.append ((mname, mdesc, args))
+
+
self.schema[(className,'C')] = configs
self.schema[(className,'I')] = insts
self.schema[(className,'M')] = methods
self.schema[(className,'E')] = events
+ if self.broker.schema_cb != None:
+ self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
+ configs, insts, methods, events)
+
def parseContent (self, cls, codec):
if cls == 'C' and self.broker.config_cb == None:
return
@@ -127,22 +228,7 @@ class ManagementMetadata:
for element in self.schema[(className,cls)][:]:
tc = element[1]
name = element[0]
- if tc == 1: # TODO: Define constants for these
- data = codec.decode_octet ()
- elif tc == 2:
- data = codec.decode_short ()
- elif tc == 3:
- data = codec.decode_long ()
- elif tc == 4:
- data = codec.decode_longlong ()
- elif tc == 5:
- data = codec.decode_octet ()
- elif tc == 6:
- data = codec.decode_shortstr ()
- elif tc == 7:
- data = codec.decode_longstr ()
- else:
- raise ValueError ("Invalid type code: %d" % tc)
+ data = self.decodeValue (codec, tc)
row.append ((name, data))
if cls == 'C':
@@ -168,14 +254,9 @@ class ManagementMetadata:
self.schema = {}
-#===================================================================
-# ManagedBroker
-#
-# An object of this class represents a connection (over AMQP) to a
-# single managed broker.
-#
-#===================================================================
class ManagedBroker:
+ """An object of this class represents a connection (over AMQP) to a
+ single managed broker."""
mExchange = "qpid.management"
dExchange = "amq.direct"
@@ -205,18 +286,35 @@ class ManagedBroker:
msg.complete ()
def reply_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
- methodId = codec.decode_long ()
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ sequence = codec.decode_long ()
status = codec.decode_long ()
sText = codec.decode_shortstr ()
- args = {}
+ data = self.sequenceManager.release (sequence)
+ if data == None:
+ msg.complete ()
+ return
+
+ (userSequence, className, methodName) = data
+
if status == 0:
- args["sequence"] = codec.decode_long ()
- args["body"] = codec.decode_longstr ()
+ ms = self.metadata.schema[(className,'M')]
+ arglist = None
+ for (mname, mdesc, margs) in ms:
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ msg.complete ()
+ return
+
+ args = {}
+ for arg in arglist:
+ if arg[2].find("O") != -1:
+ args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
if self.method_cb != None:
- self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
+ self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
msg.complete ()
@@ -225,17 +323,18 @@ class ManagedBroker:
port = 5672,
username = "guest",
password = "guest",
- specfile = "../specs/amqp.0-10-preview.xml"):
-
- self.spec = qpid.spec.load (specfile)
- self.client = None
- self.channel = None
- self.queue = None
- self.rqueue = None
- self.qname = None
- self.rqname = None
- self.metadata = ManagementMetadata (self)
- self.connected = 0
+ specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
+
+ self.spec = qpid.spec.load (specfile)
+ self.client = None
+ self.channel = None
+ self.queue = None
+ self.rqueue = None
+ self.qname = None
+ self.rqname = None
+ self.metadata = ManagementMetadata (self)
+ self.sequenceManager = SequenceManager ()
+ self.connected = 0
self.lastConnectError = None
# Initialize the callback records
@@ -265,17 +364,37 @@ class ManagedBroker:
def instrumentationListener (self, context, callback):
self.inst_cb = (context, callback)
- def method (self, methodId, objId, className,
+ def method (self, userSequence, objId, className,
methodName, args=None, packageName="qpid"):
codec = Codec (StringIO (), self.spec);
- codec.encode_long (methodId)
- codec.encode_longlong (objId)
- codec.encode_shortstr (self.rqname)
-
- # TODO: Encode args according to schema
- if methodName == "echo":
- codec.encode_long (args["sequence"])
- codec.encode_longstr (args["body"])
+ sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ codec.encode_long (sequence) # Method sequence id
+ codec.encode_longlong (objId) # ID of object
+ codec.encode_shortstr (self.rqname) # name of reply queue
+
+ # Encode args according to schema
+ if (className,'M') not in self.metadata.schema:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Unknown class name: %s" % className)
+
+ ms = self.metadata.schema[(className,'M')]
+ arglist = None
+ for (mname, mdesc, margs) in ms:
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Unknown method name: %s" % methodName)
+
+ for arg in arglist:
+ if arg[2].find("I") != -1:
+ value = arg[8] # default
+ if arg[0] in args:
+ value = args[arg[0]]
+ if value == None:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
+ self.metadata.encodeValue (codec, value, arg[1])
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
@@ -325,7 +444,7 @@ class ManagedBroker:
self.connected = 1
except socket.error, e:
- print "Socket Error Detected:", e[1]
+ print "Socket Error:", e[1]
self.lastConnectError = e
raise
except: