summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-29 22:23:33 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-29 22:23:33 +0000
commit3667cdd020d46af9c47c1c37998fbed7adae2248 (patch)
treeef1dea76faf22972170069d9e9acb55ff1e82390 /python
parentda6e2b9f62966ef7d0cb69f58ffe1365af98d676 (diff)
downloadqpid-python-3667cdd020d46af9c47c1c37998fbed7adae2248.tar.gz
applied patch from tross posted in QPID-666
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@589889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/management.py225
-rw-r--r--python/tests_0-10/example.py4
2 files changed, 227 insertions, 2 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
new file mode 100644
index 0000000000..24c4700c7f
--- /dev/null
+++ b/python/qpid/management.py
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Management classes for AMQP
+"""
+
+import qpid
+import base64
+import socket
+from threading import Thread
+from message import Message
+from time import sleep
+from qpid.client import Client
+from qpid.content import Content
+from cStringIO import StringIO
+from codec import Codec, EOF
+
+#===================================================================
+# ManagementMetadata
+#
+# One instance of this class is created for each ManagedBroker. It
+# is used to store metadata from the broker which is needed for the
+# proper interpretation of recevied management content.
+#
+#===================================================================
+class ManagementMetadata:
+
+ def parseSchema (self, cls, oid, len, codec):
+ #print "Schema Record: objId=", oid
+
+ config = []
+ inst = []
+ while 1:
+ flags = codec.decode_octet ()
+ if flags == 0x80:
+ break
+
+ tc = codec.decode_octet ()
+ name = codec.decode_shortstr ()
+ desc = codec.decode_shortstr ()
+
+ if flags & 1: # TODO: Define constants for these
+ config.append ((tc, name, desc))
+ if (flags & 1) == 0 or (flags & 2) == 2:
+ inst.append ((tc, name, desc))
+
+ # TODO: Handle notification of schema change outbound
+ self.schema[(oid,'C')] = config
+ self.schema[(oid,'I')] = inst
+
+ def parseContent (self, cls, oid, len, codec):
+ #print "Content Record: Class=", cls, ", objId=", oid
+
+ if cls == 'C' and self.broker.config_cb == None:
+ return
+ if cls == 'I' and self.broker.inst_cb == None:
+ return
+
+ if (oid,cls) not in self.schema:
+ return
+
+ row = []
+ timestamps = []
+
+ timestamps.append (codec.decode_longlong ()); # Current Time
+ timestamps.append (codec.decode_longlong ()); # Create Time
+ timestamps.append (codec.decode_longlong ()); # Delete Time
+
+ for element in self.schema[(oid,cls)][:]:
+ tc = element[0]
+ name = element[1]
+ if tc == 1: # TODO: Define constants for these
+ data = codec.decode_octet ()
+ elif tc == 2:
+ data = codec.decode_short ()
+ elif tc == 3:
+ data = codec.decode_long ()
+ elif tc == 4:
+ data = codec.decode_longlong ()
+ elif tc == 5:
+ data = codec.decode_octet ()
+ elif tc == 6:
+ data = codec.decode_shortstr ()
+ row.append ((name, data))
+
+ if cls == 'C':
+ self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps)
+ if cls == 'I':
+ self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps)
+
+ def parse (self, codec):
+ try:
+ opcode = chr (codec.decode_octet ())
+ except EOF:
+ return 0
+
+ cls = chr (codec.decode_octet ())
+ oid = codec.decode_short ()
+ len = codec.decode_long ()
+
+ if len < 8:
+ raise ValueError ("parse error: value of length field too small")
+
+ if opcode == 'S':
+ self.parseSchema (cls, oid, len, codec)
+
+ if opcode == 'C':
+ self.parseContent (cls, oid, len, codec)
+
+ return 1
+
+ def __init__ (self, broker):
+ self.broker = broker
+ self.schema = {}
+
+
+#===================================================================
+# ManagedBroker
+#
+# An object of this class represents a connection (over AMQP) to a
+# single managed broker.
+#
+#===================================================================
+class ManagedBroker:
+
+ exchange = "qpid.management"
+
+ def checkHeader (self, codec):
+ octet = chr (codec.decode_octet ())
+ if octet != 'A':
+ return 0
+ octet = chr (codec.decode_octet ())
+ if octet != 'M':
+ return 0
+ octet = chr (codec.decode_octet ())
+ if octet != '0':
+ return 0
+ octet = chr (codec.decode_octet ())
+ if octet != '1':
+ return 0
+ return 1
+
+ def receive_cb (self, msg):
+ codec = Codec (StringIO (msg.content.body), self.spec)
+
+ if self.checkHeader (codec) == 0:
+ raise ValueError ("outer header invalid");
+
+ while self.metadata.parse (codec):
+ pass
+
+ msg.complete ()
+
+ def __init__ (self, host = "localhost", port = 5672,
+ username = "guest", password = "guest"):
+
+ self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml")
+ self.client = None
+ self.channel = None
+ self.queue = None
+ self.qname = None
+ self.metadata = ManagementMetadata (self)
+
+ # Initialize the callback records
+ self.schema_cb = None
+ self.config_cb = None
+ self.inst_cb = None
+
+ self.host = host
+ self.port = port
+ self.username = username
+ self.password = password
+
+ def schemaListener (self, context, callback):
+ self.schema_cb = (context, callback)
+
+ def configListener (self, context, callback):
+ self.config_cb = (context, callback)
+
+ def instrumentationListener (self, context, callback):
+ self.inst_cb = (context, callback)
+
+ def start (self):
+ print "Connecting to broker", self.host
+
+ try:
+ self.client = Client (self.host, self.port, self.spec)
+ self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
+ self.channel = self.client.channel (1)
+ response = self.channel.session_open (detached_lifetime=300)
+ self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id)
+
+ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
+ self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname,
+ routing_key="mgmt")
+ self.channel.message_subscribe (queue=self.qname, destination="dest")
+ self.queue = self.client.queue ("dest")
+ self.queue.listen (self.receive_cb)
+
+ self.channel.message_flow_mode (destination="dest", mode=1)
+ self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF)
+ self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF)
+
+ except socket.error, e:
+ print "Socket Error Detected:", e[1]
+ raise
+ except:
+ raise
diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py
index 9dbe73e3cb..da5ee2441f 100644
--- a/python/tests_0-10/example.py
+++ b/python/tests_0-10/example.py
@@ -22,8 +22,8 @@ from qpid.testlib import testrunner, TestBase
class ExampleTest (TestBase):
"""
- An example Qpid test, illustrating the unittest frameowkr and the
- python Qpid client. The test class must inherit TestCase. The
+ An example Qpid test, illustrating the unittest framework and the
+ python Qpid client. The test class must inherit TestBase. The
test code uses the Qpid client to interact with a qpid broker and
verify it behaves as expected.
"""