summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-15 21:36:00 +0000
committerAlan Conway <aconway@apache.org>2007-11-15 21:36:00 +0000
commita24723ab523b37408d8cb5c3e1268667d166f127 (patch)
tree6d08de729742e372fce6ef51f96a3381b6e7e376 /python/qpid
parentd8dd71de008990f5a665e0191c231ae62fcf44fb (diff)
downloadqpid-python-a24723ab523b37408d8cb5c3e1268667d166f127.tar.gz
QPID-687: comitted qpid-patch7-python.diff for real this time.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@595465 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/codec.py6
-rw-r--r--python/qpid/management.py258
2 files changed, 190 insertions, 74 deletions
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
index a3663773a9..b25de11f11 100644
--- a/python/qpid/codec.py
+++ b/python/qpid/codec.py
@@ -247,6 +247,12 @@ class Codec:
def decode_signed_long(self):
return self.unpack("!q")
+ def encode_signed_int(self, o):
+ self.pack("!l", o)
+
+ def decode_signed_int(self):
+ return self.unpack("!l")
+
def encode_longlong(self, o):
"""
encodes long long (64 bits) data 'o' in network byte order
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 24c4700c7f..a5ad997a24 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -18,7 +18,7 @@
#
"""
-Management classes for AMQP
+Management API for Qpid
"""
import qpid
@@ -42,50 +42,91 @@ from codec import Codec, EOF
#===================================================================
class ManagementMetadata:
- def parseSchema (self, cls, oid, len, codec):
- #print "Schema Record: objId=", oid
-
- config = []
- inst = []
- while 1:
- flags = codec.decode_octet ()
- if flags == 0x80:
- break
-
- tc = codec.decode_octet ()
- name = codec.decode_shortstr ()
- desc = codec.decode_shortstr ()
-
- if flags & 1: # TODO: Define constants for these
- config.append ((tc, name, desc))
- if (flags & 1) == 0 or (flags & 2) == 2:
- inst.append ((tc, name, desc))
+ def parseSchema (self, cls, codec):
+ className = codec.decode_shortstr ()
+ configCount = codec.decode_short ()
+ instCount = codec.decode_short ()
+ methodCount = codec.decode_short ()
+ eventCount = codec.decode_short ()
+
+ configs = []
+ insts = []
+ methods = []
+ events = []
+
+ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
+ insts.append (("id", 4, None, None))
+
+ for idx in range (configCount):
+ ft = codec.decode_table ()
+ name = ft["name"]
+ type = ft["type"]
+ access = ft["access"]
+ index = ft["index"]
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = value
+ elif key == "min":
+ min = value
+ elif key == "max":
+ max = value
+ elif key == "maxlen":
+ maxlen = value
+ elif key == "desc":
+ desc = value
+
+ config = (name, type, unit, desc, access, index, min, max, maxlen)
+ configs.append (config)
+
+ for idx in range (instCount):
+ ft = codec.decode_table ()
+ name = ft["name"]
+ type = ft["type"]
+ unit = None
+ desc = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = value
+ elif key == "desc":
+ desc = value
+
+ inst = (name, type, unit, desc)
+ insts.append (inst)
# TODO: Handle notification of schema change outbound
- self.schema[(oid,'C')] = config
- self.schema[(oid,'I')] = inst
-
- def parseContent (self, cls, oid, len, codec):
- #print "Content Record: Class=", cls, ", objId=", oid
+ self.schema[(className,'C')] = configs
+ self.schema[(className,'I')] = insts
+ self.schema[(className,'M')] = methods
+ self.schema[(className,'E')] = events
+ def parseContent (self, cls, codec):
if cls == 'C' and self.broker.config_cb == None:
return
if cls == 'I' and self.broker.inst_cb == None:
return
- if (oid,cls) not in self.schema:
+ className = codec.decode_shortstr ()
+
+ if (className,cls) not in self.schema:
return
row = []
timestamps = []
- timestamps.append (codec.decode_longlong ()); # Current Time
- timestamps.append (codec.decode_longlong ()); # Create Time
- timestamps.append (codec.decode_longlong ()); # Delete Time
+ timestamps.append (codec.decode_longlong ()) # Current Time
+ timestamps.append (codec.decode_longlong ()) # Create Time
+ timestamps.append (codec.decode_longlong ()) # Delete Time
- for element in self.schema[(oid,cls)][:]:
- tc = element[0]
- name = element[1]
+ for element in self.schema[(className,cls)][:]:
+ tc = element[1]
+ name = element[0]
if tc == 1: # TODO: Define constants for these
data = codec.decode_octet ()
elif tc == 2:
@@ -98,33 +139,29 @@ class ManagementMetadata:
data = codec.decode_octet ()
elif tc == 6:
data = codec.decode_shortstr ()
+ elif tc == 7:
+ data = codec.decode_longstr ()
+ else:
+ raise ValueError ("Invalid type code: %d" % tc)
row.append ((name, data))
if cls == 'C':
- self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps)
- if cls == 'I':
- self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps)
+ self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
+ elif cls == 'I':
+ self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
def parse (self, codec):
- try:
- opcode = chr (codec.decode_octet ())
- except EOF:
- return 0
-
- cls = chr (codec.decode_octet ())
- oid = codec.decode_short ()
- len = codec.decode_long ()
-
- if len < 8:
- raise ValueError ("parse error: value of length field too small")
+ opcode = chr (codec.decode_octet ())
+ cls = chr (codec.decode_octet ())
if opcode == 'S':
- self.parseSchema (cls, oid, len, codec)
+ self.parseSchema (cls, codec)
- if opcode == 'C':
- self.parseContent (cls, oid, len, codec)
+ elif opcode == 'C':
+ self.parseContent (cls, codec)
- return 1
+ else:
+ raise ValueError ("Unknown opcode: %c" % opcode);
def __init__ (self, broker):
self.broker = broker
@@ -140,7 +177,8 @@ class ManagementMetadata:
#===================================================================
class ManagedBroker:
- exchange = "qpid.management"
+ mExchange = "qpid.management"
+ dExchange = "amq.direct"
def checkHeader (self, codec):
octet = chr (codec.decode_octet ())
@@ -157,69 +195,141 @@ class ManagedBroker:
return 0
return 1
- def receive_cb (self, msg):
+ def publish_cb (self, msg):
codec = Codec (StringIO (msg.content.body), self.spec)
if self.checkHeader (codec) == 0:
raise ValueError ("outer header invalid");
- while self.metadata.parse (codec):
- pass
-
+ self.metadata.parse (codec)
msg.complete ()
- def __init__ (self, host = "localhost", port = 5672,
- username = "guest", password = "guest"):
+ def reply_cb (self, msg):
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ methodId = codec.decode_long ()
+ status = codec.decode_long ()
+ sText = codec.decode_shortstr ()
+
+ args = {}
+ if status == 0:
+ args["sequence"] = codec.decode_long ()
+ args["body"] = codec.decode_longstr ()
+
+ if self.method_cb != None:
+ self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
- self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml")
- self.client = None
- self.channel = None
- self.queue = None
- self.qname = None
- self.metadata = ManagementMetadata (self)
+ msg.complete ()
+
+ def __init__ (self,
+ host = "localhost",
+ port = 5672,
+ username = "guest",
+ password = "guest",
+ specfile = "../specs/amqp.0-10-preview.xml"):
+
+ self.spec = qpid.spec.load (specfile)
+ self.client = None
+ self.channel = None
+ self.queue = None
+ self.rqueue = None
+ self.qname = None
+ self.rqname = None
+ self.metadata = ManagementMetadata (self)
+ self.connected = 0
+ self.lastConnectError = None
# Initialize the callback records
+ self.status_cb = None
self.schema_cb = None
self.config_cb = None
self.inst_cb = None
+ self.method_cb = None
self.host = host
self.port = port
self.username = username
self.password = password
+ def statusListener (self, context, callback):
+ self.status_cb = (context, callback)
+
def schemaListener (self, context, callback):
self.schema_cb = (context, callback)
def configListener (self, context, callback):
self.config_cb = (context, callback)
+ def methodListener (self, context, callback):
+ self.method_cb = (context, callback)
+
def instrumentationListener (self, context, callback):
self.inst_cb = (context, callback)
+ def method (self, methodId, objId, className,
+ methodName, args=None, packageName="qpid"):
+ codec = Codec (StringIO (), self.spec);
+ codec.encode_long (methodId)
+ codec.encode_longlong (objId)
+ codec.encode_shortstr (self.rqname)
+
+ # TODO: Encode args according to schema
+ if methodName == "echo":
+ codec.encode_long (args["sequence"])
+ codec.encode_longstr (args["body"])
+
+ msg = Content (codec.stream.getvalue ())
+ msg["content_type"] = "application/octet-stream"
+ msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
+ msg["reply_to"] = self.spec.struct ("reply_to")
+ self.channel.message_transfer (destination="qpid.management", content=msg)
+
+ def isConnected (self):
+ return connected
+
def start (self):
- print "Connecting to broker", self.host
+ print "Connecting to broker %s:%d" % (self.host, self.port)
try:
self.client = Client (self.host, self.port, self.spec)
self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
self.channel = self.client.channel (1)
- response = self.channel.session_open (detached_lifetime=300)
- self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id)
+ response = self.channel.session_open (detached_lifetime=10)
+ self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
+ self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
+
+ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
+ self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
+
+ self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
+ routing_key="mgmt.#")
+ self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
+ routing_key=self.rqname)
+
+ self.channel.message_subscribe (queue=self.qname, destination="mdest")
+ self.channel.message_subscribe (queue=self.rqname, destination="rdest")
+
+ self.queue = self.client.queue ("mdest")
+ self.queue.listen (self.publish_cb)
- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
- self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname,
- routing_key="mgmt")
- self.channel.message_subscribe (queue=self.qname, destination="dest")
- self.queue = self.client.queue ("dest")
- self.queue.listen (self.receive_cb)
+ self.channel.message_flow_mode (destination="mdest", mode=1)
+ self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
+ self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
- self.channel.message_flow_mode (destination="dest", mode=1)
- self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF)
- self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF)
+ self.rqueue = self.client.queue ("rdest")
+ self.rqueue.listen (self.reply_cb)
+
+ self.channel.message_flow_mode (destination="rdest", mode=1)
+ self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+ self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+
+ self.connected = 1
except socket.error, e:
print "Socket Error Detected:", e[1]
+ self.lastConnectError = e
raise
except:
raise
+
+ def stop (self):
+ pass