summaryrefslogtreecommitdiff
path: root/RC5/python/qpid
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
commit38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch)
tree3599403c0c9690898f1e336c009a5564c587c732 /RC5/python/qpid
parenta8960649bcd365ef70a5de7812f5910222388a6d (diff)
downloadqpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/python/qpid')
-rw-r--r--RC5/python/qpid/__init__.py84
-rw-r--r--RC5/python/qpid/assembler.py118
-rw-r--r--RC5/python/qpid/client.py225
-rw-r--r--RC5/python/qpid/codec.py590
-rw-r--r--RC5/python/qpid/codec010.py301
-rw-r--r--RC5/python/qpid/compat.py28
-rw-r--r--RC5/python/qpid/connection.py218
-rw-r--r--RC5/python/qpid/connection08.py493
-rw-r--r--RC5/python/qpid/content.py58
-rw-r--r--RC5/python/qpid/datatypes.py349
-rw-r--r--RC5/python/qpid/delegate.py53
-rw-r--r--RC5/python/qpid/delegates.py162
-rw-r--r--RC5/python/qpid/disp.py79
-rw-r--r--RC5/python/qpid/exceptions.py21
-rw-r--r--RC5/python/qpid/framer.py159
-rw-r--r--RC5/python/qpid/invoker.py48
-rw-r--r--RC5/python/qpid/log.py28
-rw-r--r--RC5/python/qpid/management.py913
-rw-r--r--RC5/python/qpid/managementdata.py753
-rw-r--r--RC5/python/qpid/message.py74
-rw-r--r--RC5/python/qpid/packer.py36
-rw-r--r--RC5/python/qpid/peer.py465
-rw-r--r--RC5/python/qpid/queue.py86
-rw-r--r--RC5/python/qpid/reference.py117
-rw-r--r--RC5/python/qpid/session.py379
-rw-r--r--RC5/python/qpid/spec.py59
-rw-r--r--RC5/python/qpid/spec010.py693
-rw-r--r--RC5/python/qpid/spec08.py504
-rw-r--r--RC5/python/qpid/testlib.py392
-rw-r--r--RC5/python/qpid/util.py117
30 files changed, 7602 insertions, 0 deletions
diff --git a/RC5/python/qpid/__init__.py b/RC5/python/qpid/__init__.py
new file mode 100644
index 0000000000..ff9cc04df8
--- /dev/null
+++ b/RC5/python/qpid/__init__.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import spec, codec, connection, content, peer, delegate, client
+
+class Struct:
+
+ def __init__(self, type, *args, **kwargs):
+ self.__dict__["type"] = type
+ self.__dict__["_values"] = {}
+
+ if len(args) > len(self.type.fields):
+ raise TypeError("too many args")
+
+ for a, f in zip(args, self.type.fields):
+ self.set(f.name, a)
+
+ for k, a in kwargs.items():
+ self.set(k, a)
+
+ def _check(self, attr):
+ field = self.type.fields.byname.get(attr)
+ if field == None:
+ raise AttributeError(attr)
+ return field
+
+ def exists(self, attr):
+ return self.type.fields.byname.has_key(attr)
+
+ def has(self, attr):
+ self._check(attr)
+ return self._values.has_key(attr)
+
+ def set(self, attr, value):
+ self._check(attr)
+ self._values[attr] = value
+
+ def get(self, attr):
+ field = self._check(attr)
+ return self._values.get(attr, field.default())
+
+ def clear(self, attr):
+ self._check(attr)
+ del self._values[attr]
+
+ def __setattr__(self, attr, value):
+ self.set(attr, value)
+
+ def __getattr__(self, attr):
+ return self.get(attr)
+
+ def __delattr__(self, attr):
+ self.clear(attr)
+
+ def __setitem__(self, attr, value):
+ self.set(attr, value)
+
+ def __getitem__(self, attr):
+ return self.get(attr)
+
+ def __delitem__(self, attr):
+ self.clear(attr)
+
+ def __str__(self):
+ return "%s %s" % (self.type, self._values)
+
+ def __repr__(self):
+ return str(self)
diff --git a/RC5/python/qpid/assembler.py b/RC5/python/qpid/assembler.py
new file mode 100644
index 0000000000..92bb0aa0f8
--- /dev/null
+++ b/RC5/python/qpid/assembler.py
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from codec010 import StringCodec
+from framer import *
+from logging import getLogger
+
+log = getLogger("qpid.io.seg")
+
+class Segment:
+
+ def __init__(self, first, last, type, track, channel, payload):
+ self.id = None
+ self.offset = None
+ self.first = first
+ self.last = last
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def decode(self, spec):
+ segs = spec["segment_type"]
+ choice = segs.choices[self.type]
+ return getattr(self, "decode_%s" % choice.name)(spec)
+
+ def decode_control(self, spec):
+ sc = StringCodec(spec, self.payload)
+ return sc.read_control()
+
+ def decode_command(self, spec):
+ sc = StringCodec(spec, self.payload)
+ hdr, cmd = sc.read_command()
+ cmd.id = self.id
+ return hdr, cmd
+
+ def decode_header(self, spec):
+ sc = StringCodec(spec, self.payload)
+ values = []
+ while len(sc.encoded) > 0:
+ values.append(sc.read_struct32())
+ return values
+
+ def decode_body(self, spec):
+ return self.payload
+
+ def __str__(self):
+ return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
+ self.track, self.channel, self.payload)
+
+ def __repr__(self):
+ return str(self)
+
+class Assembler(Framer):
+
+ def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD):
+ Framer.__init__(self, sock)
+ self.max_payload = max_payload
+ self.fragments = {}
+
+ def read_segment(self):
+ while True:
+ frame = self.read_frame()
+
+ key = (frame.channel, frame.track)
+ seg = self.fragments.get(key)
+ if seg == None:
+ seg = Segment(frame.isFirstSegment(), frame.isLastSegment(),
+ frame.type, frame.track, frame.channel, "")
+ self.fragments[key] = seg
+
+ seg.payload += frame.payload
+
+ if frame.isLastFrame():
+ self.fragments.pop(key)
+ log.debug("RECV %s", seg)
+ return seg
+
+ def write_segment(self, segment):
+ remaining = segment.payload
+
+ first = True
+ while first or remaining:
+ payload = remaining[:self.max_payload]
+ remaining = remaining[self.max_payload:]
+
+ flags = 0
+ if first:
+ flags |= FIRST_FRM
+ first = False
+ if not remaining:
+ flags |= LAST_FRM
+ if segment.first:
+ flags |= FIRST_SEG
+ if segment.last:
+ flags |= LAST_SEG
+
+ frame = Frame(flags, segment.type, segment.track, segment.channel,
+ payload)
+ self.write_frame(frame)
+
+ log.debug("SENT %s", segment)
diff --git a/RC5/python/qpid/client.py b/RC5/python/qpid/client.py
new file mode 100644
index 0000000000..4605710de8
--- /dev/null
+++ b/RC5/python/qpid/client.py
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+An AQMP client implementation that uses a custom delegate for
+interacting with the server.
+"""
+
+import os, threading
+from peer import Peer, Channel, Closed
+from delegate import Delegate
+from connection08 import Connection, Frame, connect
+from spec import load
+from queue import Queue
+from reference import ReferenceId, References
+
+
+class Client:
+
+ def __init__(self, host, port, spec = None, vhost = None):
+ self.host = host
+ self.port = port
+ if spec:
+ self.spec = spec
+ else:
+ try:
+ name = os.environ["AMQP_SPEC"]
+ except KeyError:
+ raise EnvironmentError("environment variable AMQP_SPEC must be set")
+ self.spec = load(name)
+ self.structs = StructFactory(self.spec)
+ self.sessions = {}
+
+ self.mechanism = None
+ self.response = None
+ self.locale = None
+
+ self.vhost = vhost
+ if self.vhost == None:
+ self.vhost = "/"
+
+ self.queues = {}
+ self.lock = threading.Lock()
+
+ self.closed = False
+ self.reason = None
+ self.started = threading.Event()
+
+ def wait(self):
+ self.started.wait()
+ if self.closed:
+ raise Closed(self.reason)
+
+ def queue(self, key):
+ self.lock.acquire()
+ try:
+ try:
+ q = self.queues[key]
+ except KeyError:
+ q = Queue(0)
+ self.queues[key] = q
+ finally:
+ self.lock.release()
+ return q
+
+ def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
+ self.mechanism = mechanism
+ self.response = response
+ self.locale = locale
+ self.tune_params = tune_params
+
+ self.socket = connect(self.host, self.port)
+ self.conn = Connection(self.socket, self.spec)
+ self.peer = Peer(self.conn, ClientDelegate(self), Session)
+
+ self.conn.init()
+ self.peer.start()
+ self.wait()
+ self.channel(0).connection_open(self.vhost)
+
+ def channel(self, id):
+ self.lock.acquire()
+ try:
+ ssn = self.peer.channel(id)
+ ssn.client = self
+ self.sessions[id] = ssn
+ finally:
+ self.lock.release()
+ return ssn
+
+ def session(self):
+ self.lock.acquire()
+ try:
+ id = None
+ for i in xrange(1, 64*1024):
+ if not self.sessions.has_key(id):
+ id = i
+ break
+ finally:
+ self.lock.release()
+ if id == None:
+ raise RuntimeError("out of channels")
+ else:
+ return self.channel(id)
+
+ def close(self):
+ self.socket.close()
+
+class ClientDelegate(Delegate):
+
+ def __init__(self, client):
+ Delegate.__init__(self)
+ self.client = client
+
+ def connection_start(self, ch, msg):
+ msg.start_ok(mechanism=self.client.mechanism,
+ response=self.client.response,
+ locale=self.client.locale)
+
+ def connection_tune(self, ch, msg):
+ if self.client.tune_params:
+ #todo: just override the params, i.e. don't require them
+ # all to be included in tune_params
+ msg.tune_ok(**self.client.tune_params)
+ else:
+ msg.tune_ok(*msg.frame.args)
+ self.client.started.set()
+
+ def message_transfer(self, ch, msg):
+ self.client.queue(msg.destination).put(msg)
+
+ def message_open(self, ch, msg):
+ ch.references.open(msg.reference)
+
+ def message_close(self, ch, msg):
+ ch.references.close(msg.reference)
+
+ def message_append(self, ch, msg):
+ ch.references.get(msg.reference).append(msg.bytes)
+
+ def message_acquired(self, ch, msg):
+ ch.control_queue.put(msg)
+
+ def basic_deliver(self, ch, msg):
+ self.client.queue(msg.consumer_tag).put(msg)
+
+ def channel_pong(self, ch, msg):
+ msg.ok()
+
+ def channel_close(self, ch, msg):
+ ch.closed(msg)
+
+ def session_ack(self, ch, msg):
+ pass
+
+ def session_closed(self, ch, msg):
+ ch.closed(msg)
+
+ def connection_close(self, ch, msg):
+ self.client.peer.closed(msg)
+
+ def execution_complete(self, ch, msg):
+ ch.completion.complete(msg.cumulative_execution_mark)
+
+ def execution_result(self, ch, msg):
+ future = ch.futures[msg.command_id]
+ future.put_response(ch, msg.data)
+
+ def closed(self, reason):
+ self.client.closed = True
+ self.client.reason = reason
+ self.client.started.set()
+
+class StructFactory:
+
+ def __init__(self, spec):
+ self.spec = spec
+ self.factories = {}
+
+ def __getattr__(self, name):
+ if self.factories.has_key(name):
+ return self.factories[name]
+ elif self.spec.domains.byname.has_key(name):
+ f = lambda *args, **kwargs: self.struct(name, *args, **kwargs)
+ self.factories[name] = f
+ return f
+ else:
+ raise AttributeError(name)
+
+ def struct(self, name, *args, **kwargs):
+ return self.spec.struct(name, *args, **kwargs)
+
+class Session(Channel):
+
+ def __init__(self, *args):
+ Channel.__init__(self, *args)
+ self.references = References()
+ self.client = None
+
+ def open(self):
+ self.session_open()
+
+ def close(self):
+ self.session_close()
+ self.client.lock.acquire()
+ try:
+ del self.client.sessions[self.id]
+ finally:
+ self.client.lock.release()
diff --git a/RC5/python/qpid/codec.py b/RC5/python/qpid/codec.py
new file mode 100644
index 0000000000..8026b209dc
--- /dev/null
+++ b/RC5/python/qpid/codec.py
@@ -0,0 +1,590 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utility code to translate between python objects and AMQP encoded data
+fields.
+
+The unit test for this module is located in tests/codec.py
+"""
+
+import re, qpid, spec08
+from cStringIO import StringIO
+from struct import *
+from reference import ReferenceId
+
+class EOF(Exception):
+ pass
+
+TYPE_ALIASES = {
+ "long_string": "longstr",
+ "unsigned_int": "long"
+ }
+
+class Codec:
+
+ """
+ class that handles encoding/decoding of AMQP primitives
+ """
+
+ def __init__(self, stream, spec):
+ """
+ initializing the stream/fields used
+ """
+ self.stream = stream
+ self.spec = spec
+ self.nwrote = 0
+ self.nread = 0
+ self.incoming_bits = []
+ self.outgoing_bits = []
+
+ self.types = {}
+ self.codes = {}
+ self.encodings = {
+ basestring: "longstr",
+ int: "long",
+ long: "long",
+ None.__class__:"void",
+ list: "sequence",
+ tuple: "sequence",
+ dict: "table"
+ }
+
+ for constant in self.spec.constants:
+ if constant.klass == "field-table-type":
+ type = constant.name.replace("field_table_", "")
+ self.typecode(constant.id, TYPE_ALIASES.get(type, type))
+
+ if not self.types:
+ self.typecode(ord('S'), "longstr")
+ self.typecode(ord('I'), "long")
+
+ def typecode(self, code, type):
+ self.types[code] = type
+ self.codes[type] = code
+
+ def resolve(self, klass):
+ if self.encodings.has_key(klass):
+ return self.encodings[klass]
+ for base in klass.__bases__:
+ result = self.resolve(base)
+ if result != None:
+ return result
+
+ def read(self, n):
+ """
+ reads in 'n' bytes from the stream. Can raise EOF exception
+ """
+ self.clearbits()
+ data = self.stream.read(n)
+ if n > 0 and len(data) == 0:
+ raise EOF()
+ self.nread += len(data)
+ return data
+
+ def write(self, s):
+ """
+ writes data 's' to the stream
+ """
+ self.flushbits()
+ self.stream.write(s)
+ self.nwrote += len(s)
+
+ def flush(self):
+ """
+ flushes the bits and data present in the stream
+ """
+ self.flushbits()
+ self.stream.flush()
+
+ def flushbits(self):
+ """
+ flushes the bits(compressed into octets) onto the stream
+ """
+ if len(self.outgoing_bits) > 0:
+ bytes = []
+ index = 0
+ for b in self.outgoing_bits:
+ if index == 0: bytes.append(0)
+ if b: bytes[-1] |= 1 << index
+ index = (index + 1) % 8
+ del self.outgoing_bits[:]
+ for byte in bytes:
+ self.encode_octet(byte)
+
+ def clearbits(self):
+ if self.incoming_bits:
+ self.incoming_bits = []
+
+ def pack(self, fmt, *args):
+ """
+ packs the data 'args' as per the format 'fmt' and writes it to the stream
+ """
+ self.write(pack(fmt, *args))
+
+ def unpack(self, fmt):
+ """
+ reads data from the stream and unpacks it as per the format 'fmt'
+ """
+ size = calcsize(fmt)
+ data = self.read(size)
+ values = unpack(fmt, data)
+ if len(values) == 1:
+ return values[0]
+ else:
+ return values
+
+ def encode(self, type, value):
+ """
+ calls the appropriate encode function e.g. encode_octet, encode_short etc.
+ """
+ if isinstance(type, spec08.Struct):
+ self.encode_struct(type, value)
+ else:
+ getattr(self, "encode_" + type)(value)
+
+ def decode(self, type):
+ """
+ calls the appropriate decode function e.g. decode_octet, decode_short etc.
+ """
+ if isinstance(type, spec08.Struct):
+ return self.decode_struct(type)
+ else:
+ return getattr(self, "decode_" + type)()
+
+ def encode_bit(self, o):
+ """
+ encodes a bit
+ """
+ if o:
+ self.outgoing_bits.append(True)
+ else:
+ self.outgoing_bits.append(False)
+
+ def decode_bit(self):
+ """
+ decodes a bit
+ """
+ if len(self.incoming_bits) == 0:
+ bits = self.decode_octet()
+ for i in range(8):
+ self.incoming_bits.append(bits >> i & 1 != 0)
+ return self.incoming_bits.pop(0)
+
+ def encode_octet(self, o):
+ """
+ encodes octet (8 bits) data 'o' in network byte order
+ """
+
+ # octet's valid range is [0,255]
+ if (o < 0 or o > 255):
+ raise ValueError('Valid range of octet is [0,255]')
+
+ self.pack("!B", int(o))
+
+ def decode_octet(self):
+ """
+ decodes a octet (8 bits) encoded in network byte order
+ """
+ return self.unpack("!B")
+
+ def encode_short(self, o):
+ """
+ encodes short (16 bits) data 'o' in network byte order
+ """
+
+ # short int's valid range is [0,65535]
+ if (o < 0 or o > 65535):
+ raise ValueError('Valid range of short int is [0,65535]: %s' % o)
+
+ self.pack("!H", int(o))
+
+ def decode_short(self):
+ """
+ decodes a short (16 bits) in network byte order
+ """
+ return self.unpack("!H")
+
+ def encode_long(self, o):
+ """
+ encodes long (32 bits) data 'o' in network byte order
+ """
+
+ # we need to check both bounds because on 64 bit platforms
+ # struct.pack won't raise an error if o is too large
+ if (o < 0 or o > 4294967295):
+ raise ValueError('Valid range of long int is [0,4294967295]')
+
+ self.pack("!L", int(o))
+
+ def decode_long(self):
+ """
+ decodes a long (32 bits) in network byte order
+ """
+ return self.unpack("!L")
+
+ def encode_signed_long(self, o):
+ self.pack("!q", o)
+
+ def decode_signed_long(self):
+ return self.unpack("!q")
+
+ def encode_signed_int(self, o):
+ self.pack("!l", o)
+
+ def decode_signed_int(self):
+ return self.unpack("!l")
+
+ def encode_longlong(self, o):
+ """
+ encodes long long (64 bits) data 'o' in network byte order
+ """
+ self.pack("!Q", o)
+
+ def decode_longlong(self):
+ """
+ decodes a long long (64 bits) in network byte order
+ """
+ return self.unpack("!Q")
+
+ def encode_float(self, o):
+ self.pack("!f", o)
+
+ def decode_float(self):
+ return self.unpack("!f")
+
+ def encode_double(self, o):
+ self.pack("!d", o)
+
+ def decode_double(self):
+ return self.unpack("!d")
+
+ def encode_bin128(self, b):
+ for idx in range (0,16):
+ self.pack("!B", ord (b[idx]))
+
+ def decode_bin128(self):
+ result = ""
+ for idx in range (0,16):
+ result = result + chr (self.unpack("!B"))
+ return result
+
+ def encode_raw(self, len, b):
+ for idx in range (0,len):
+ self.pack("!B", b[idx])
+
+ def decode_raw(self, len):
+ result = ""
+ for idx in range (0,len):
+ result = result + chr (self.unpack("!B"))
+ return result
+
+ def enc_str(self, fmt, s):
+ """
+ encodes a string 's' in network byte order as per format 'fmt'
+ """
+ size = len(s)
+ self.pack(fmt, size)
+ self.write(s)
+
+ def dec_str(self, fmt):
+ """
+ decodes a string in network byte order as per format 'fmt'
+ """
+ size = self.unpack(fmt)
+ return self.read(size)
+
+ def encode_shortstr(self, s):
+ """
+ encodes a short string 's' in network byte order
+ """
+
+ # short strings are limited to 255 octets
+ if len(s) > 255:
+ raise ValueError('Short strings are limited to 255 octets')
+
+ self.enc_str("!B", s)
+
+ def decode_shortstr(self):
+ """
+ decodes a short string in network byte order
+ """
+ return self.dec_str("!B")
+
+ def encode_longstr(self, s):
+ """
+ encodes a long string 's' in network byte order
+ """
+ if isinstance(s, dict):
+ self.encode_table(s)
+ else:
+ self.enc_str("!L", s)
+
+ def decode_longstr(self):
+ """
+ decodes a long string 's' in network byte order
+ """
+ return self.dec_str("!L")
+
+ def encode_table(self, tbl):
+ """
+ encodes a table data structure in network byte order
+ """
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ if tbl:
+ for key, value in tbl.items():
+ if self.spec.major == 8 and self.spec.minor == 0 and len(key) > 128:
+ raise ValueError("field table key too long: '%s'" % key)
+ type = self.resolve(value.__class__)
+ if type == None:
+ raise ValueError("no encoding for: " + value.__class__)
+ codec.encode_shortstr(key)
+ codec.encode_octet(self.codes[type])
+ codec.encode(type, value)
+ s = enc.getvalue()
+ self.encode_long(len(s))
+ self.write(s)
+
+ def decode_table(self):
+ """
+ decodes a table data structure in network byte order
+ """
+ size = self.decode_long()
+ start = self.nread
+ result = {}
+ while self.nread - start < size:
+ key = self.decode_shortstr()
+ code = self.decode_octet()
+ if self.types.has_key(code):
+ value = self.decode(self.types[code])
+ else:
+ w = width(code)
+ if fixed(code):
+ value = self.read(w)
+ else:
+ value = self.read(self.dec_num(w))
+ result[key] = value
+ return result
+
+ def encode_timestamp(self, t):
+ """
+ encodes a timestamp data structure in network byte order
+ """
+ self.encode_longlong(t)
+
+ def decode_timestamp(self):
+ """
+ decodes a timestamp data structure in network byte order
+ """
+ return self.decode_longlong()
+
+ def encode_content(self, s):
+ """
+ encodes a content data structure in network byte order
+
+ content can be passed as a string in which case it is assumed to
+ be inline data, or as an instance of ReferenceId indicating it is
+ a reference id
+ """
+ if isinstance(s, ReferenceId):
+ self.encode_octet(1)
+ self.encode_longstr(s.id)
+ else:
+ self.encode_octet(0)
+ self.encode_longstr(s)
+
+ def decode_content(self):
+ """
+ decodes a content data structure in network byte order
+
+ return a string for inline data and a ReferenceId instance for
+ references
+ """
+ type = self.decode_octet()
+ if type == 0:
+ return self.decode_longstr()
+ else:
+ return ReferenceId(self.decode_longstr())
+
+ # new domains for 0-10:
+
+ def encode_rfc1982_long(self, s):
+ self.encode_long(s)
+
+ def decode_rfc1982_long(self):
+ return self.decode_long()
+
+ def encode_rfc1982_long_set(self, s):
+ self.encode_short(len(s) * 4)
+ for i in s:
+ self.encode_long(i)
+
+ def decode_rfc1982_long_set(self):
+ count = self.decode_short() / 4
+ set = []
+ for i in range(0, count):
+ set.append(self.decode_long())
+ return set;
+
+ def encode_uuid(self, s):
+ self.pack("16s", s)
+
+ def decode_uuid(self):
+ return self.unpack("16s")
+
+ def enc_num(self, width, n):
+ if width == 1:
+ self.encode_octet(n)
+ elif width == 2:
+ self.encode_short(n)
+ elif width == 3:
+ self.encode_long(n)
+ else:
+ raise ValueError("invalid width: %s" % width)
+
+ def dec_num(self, width):
+ if width == 1:
+ return self.decode_octet()
+ elif width == 2:
+ return self.decode_short()
+ elif width == 4:
+ return self.decode_long()
+ else:
+ raise ValueError("invalid width: %s" % width)
+
+ def encode_struct(self, type, s):
+ if type.size:
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ codec.encode_struct_body(type, s)
+ codec.flush()
+ body = enc.getvalue()
+ self.enc_num(type.size, len(body))
+ self.write(body)
+ else:
+ self.encode_struct_body(type, s)
+
+ def decode_struct(self, type):
+ if type.size:
+ size = self.dec_num(type.size)
+ if size == 0:
+ return None
+ return self.decode_struct_body(type)
+
+ def encode_struct_body(self, type, s):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
+ for f in type.fields:
+ if s == None:
+ self.encode_bit(False)
+ elif f.type == "bit":
+ self.encode_bit(s.get(f.name))
+ else:
+ self.encode_bit(s.has(f.name))
+
+ for i in range(reserved):
+ self.encode_bit(False)
+
+ for f in type.fields:
+ if f.type != "bit" and s != None and s.has(f.name):
+ self.encode(f.type, s.get(f.name))
+
+ self.flush()
+
+ def decode_struct_body(self, type):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
+ s = qpid.Struct(type)
+
+ for f in type.fields:
+ if f.type == "bit":
+ s.set(f.name, self.decode_bit())
+ elif self.decode_bit():
+ s.set(f.name, None)
+
+ for i in range(reserved):
+ if self.decode_bit():
+ raise ValueError("expecting reserved flag")
+
+ for f in type.fields:
+ if f.type != "bit" and s.has(f.name):
+ s.set(f.name, self.decode(f.type))
+
+ self.clearbits()
+
+ return s
+
+ def encode_long_struct(self, s):
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ type = s.type
+ codec.encode_short(type.type)
+ codec.encode_struct_body(type, s)
+ self.encode_longstr(enc.getvalue())
+
+ def decode_long_struct(self):
+ codec = Codec(StringIO(self.decode_longstr()), self.spec)
+ type = self.spec.structs[codec.decode_short()]
+ return codec.decode_struct_body(type)
+
+ def decode_array(self):
+ size = self.decode_long()
+ code = self.decode_octet()
+ count = self.decode_long()
+ result = []
+ for i in range(0, count):
+ if self.types.has_key(code):
+ value = self.decode(self.types[code])
+ else:
+ w = width(code)
+ if fixed(code):
+ value = self.read(w)
+ else:
+ value = self.read(self.dec_num(w))
+ result.append(value)
+ return result
+
+def fixed(code):
+ return (code >> 6) != 2
+
+def width(code):
+ # decimal
+ if code >= 192:
+ decsel = (code >> 4) & 3
+ if decsel == 0:
+ return 5
+ elif decsel == 1:
+ return 9
+ elif decsel == 3:
+ return 0
+ else:
+ raise ValueError(code)
+ # variable width
+ elif code < 192 and code >= 128:
+ lenlen = (code >> 4) & 3
+ if lenlen == 3: raise ValueError(code)
+ return 2 ** lenlen
+ # fixed width
+ else:
+ return (code >> 4) & 7
diff --git a/RC5/python/qpid/codec010.py b/RC5/python/qpid/codec010.py
new file mode 100644
index 0000000000..f34025ef17
--- /dev/null
+++ b/RC5/python/qpid/codec010.py
@@ -0,0 +1,301 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datetime
+from packer import Packer
+from datatypes import serial, timestamp, RangedSet, Struct
+
+class CodecException(Exception): pass
+
+class Codec(Packer):
+
+ def __init__(self, spec):
+ self.spec = spec
+
+ def write_void(self, v):
+ assert v == None
+ def read_void(self):
+ return None
+
+ def write_bit(self, b):
+ if not b: raise ValueError(b)
+ def read_bit(self):
+ return True
+
+ def read_uint8(self):
+ return self.unpack("!B")
+ def write_uint8(self, n):
+ return self.pack("!B", n)
+
+ def read_int8(self):
+ return self.unpack("!b")
+ def write_int8(self, n):
+ self.pack("!b", n)
+
+ def read_char(self):
+ return self.unpack("!c")
+ def write_char(self, c):
+ self.pack("!c", c)
+
+ def read_boolean(self):
+ return self.read_uint8() != 0
+ def write_boolean(self, b):
+ if b: n = 1
+ else: n = 0
+ self.write_uint8(n)
+
+
+ def read_uint16(self):
+ return self.unpack("!H")
+ def write_uint16(self, n):
+ self.pack("!H", n)
+
+ def read_int16(self):
+ return self.unpack("!h")
+ def write_int16(self, n):
+ self.pack("!h", n)
+
+
+ def read_uint32(self):
+ return self.unpack("!L")
+ def write_uint32(self, n):
+ self.pack("!L", n)
+
+ def read_int32(self):
+ return self.unpack("!l")
+ def write_int32(self, n):
+ self.pack("!l", n)
+
+ def read_float(self):
+ return self.unpack("!f")
+ def write_float(self, f):
+ self.pack("!f", f)
+
+ def read_sequence_no(self):
+ return serial(self.read_uint32())
+ def write_sequence_no(self, n):
+ self.write_uint32(n.value)
+
+
+ def read_uint64(self):
+ return self.unpack("!Q")
+ def write_uint64(self, n):
+ self.pack("!Q", n)
+
+ def read_int64(self):
+ return self.unpack("!q")
+ def write_int64(self, n):
+ self.pack("!q", n)
+
+ def read_datetime(self):
+ return timestamp(self.read_uint64())
+ def write_datetime(self, t):
+ if isinstance(t, datetime.datetime):
+ t = timestamp(t)
+ self.write_uint64(t)
+
+ def read_double(self):
+ return self.unpack("!d")
+ def write_double(self, d):
+ self.pack("!d", d)
+
+ def read_vbin8(self):
+ return self.read(self.read_uint8())
+ def write_vbin8(self, b):
+ self.write_uint8(len(b))
+ self.write(b)
+
+ def read_str8(self):
+ return self.read_vbin8().decode("utf8")
+ def write_str8(self, s):
+ self.write_vbin8(s.encode("utf8"))
+
+ def read_str16(self):
+ return self.read_vbin16().decode("utf8")
+ def write_str16(self, s):
+ self.write_vbin16(s.encode("utf8"))
+
+
+ def read_vbin16(self):
+ return self.read(self.read_uint16())
+ def write_vbin16(self, b):
+ self.write_uint16(len(b))
+ self.write(b)
+
+ def read_sequence_set(self):
+ result = RangedSet()
+ size = self.read_uint16()
+ nranges = size/8
+ while nranges > 0:
+ lower = self.read_sequence_no()
+ upper = self.read_sequence_no()
+ result.add(lower, upper)
+ nranges -= 1
+ return result
+ def write_sequence_set(self, ss):
+ size = 8*len(ss.ranges)
+ self.write_uint16(size)
+ for range in ss.ranges:
+ self.write_sequence_no(range.lower)
+ self.write_sequence_no(range.upper)
+
+ def read_vbin32(self):
+ return self.read(self.read_uint32())
+ def write_vbin32(self, b):
+ self.write_uint32(len(b))
+ self.write(b)
+
+ def write_map(self, m):
+ sc = StringCodec(self.spec)
+ if m is not None:
+ sc.write_uint32(len(m))
+ for k, v in m.items():
+ type = self.spec.encoding(v.__class__)
+ if type == None:
+ raise CodecException("no encoding for %s" % v.__class__)
+ sc.write_str8(k)
+ sc.write_uint8(type.code)
+ type.encode(sc, v)
+ self.write_vbin32(sc.encoded)
+ def read_map(self):
+ sc = StringCodec(self.spec, self.read_vbin32())
+ if not sc.encoded:
+ return None
+ count = sc.read_uint32()
+ result = {}
+ while sc.encoded:
+ k = sc.read_str8()
+ code = sc.read_uint8()
+ type = self.spec.types[code]
+ v = type.decode(sc)
+ result[k] = v
+ return result
+
+ def write_array(self, a):
+ sc = StringCodec(self.spec)
+ if a is not None:
+ if len(a) > 0:
+ type = self.spec.encoding(a[0].__class__)
+ else:
+ type = self.spec.encoding(None.__class__)
+ sc.write_uint8(type.code)
+ sc.write_uint32(len(a))
+ for o in a:
+ type.encode(sc, o)
+ self.write_vbin32(sc.encoded)
+ def read_array(self):
+ sc = StringCodec(self.spec, self.read_vbin32())
+ if not sc.encoded:
+ return None
+ type = self.spec.types[sc.read_uint8()]
+ count = sc.read_uint32()
+ result = []
+ while count > 0:
+ result.append(type.decode(sc))
+ count -= 1
+ return result
+
+ def write_list(self, l):
+ sc = StringCodec(self.spec)
+ if l is not None:
+ sc.write_uint32(len(l))
+ for o in l:
+ type = self.spec.encoding(o.__class__)
+ sc.write_uint8(type.code)
+ type.encode(sc, o)
+ self.write_vbin32(sc.encoded)
+ def read_list(self):
+ sc = StringCodec(self.spec, self.read_vbin32())
+ if not sc.encoded:
+ return None
+ count = sc.read_uint32()
+ result = []
+ while count > 0:
+ type = self.spec.types[sc.read_uint8()]
+ result.append(type.decode(sc))
+ count -= 1
+ return result
+
+ def read_struct32(self):
+ size = self.read_uint32()
+ code = self.read_uint16()
+ type = self.spec.structs[code]
+ fields = type.decode_fields(self)
+ return Struct(type, **fields)
+ def write_struct32(self, value):
+ sc = StringCodec(self.spec)
+ sc.write_uint16(value._type.code)
+ value._type.encode_fields(sc, value)
+ self.write_vbin32(sc.encoded)
+
+ def read_control(self):
+ cntrl = self.spec.controls[self.read_uint16()]
+ return Struct(cntrl, **cntrl.decode_fields(self))
+ def write_control(self, ctrl):
+ type = ctrl._type
+ self.write_uint16(type.code)
+ type.encode_fields(self, ctrl)
+
+ def read_command(self):
+ type = self.spec.commands[self.read_uint16()]
+ hdr = self.spec["session.header"].decode(self)
+ cmd = Struct(type, **type.decode_fields(self))
+ return hdr, cmd
+ def write_command(self, hdr, cmd):
+ self.write_uint16(cmd._type.code)
+ hdr._type.encode(self, hdr)
+ cmd._type.encode_fields(self, cmd)
+
+ def read_size(self, width):
+ if width > 0:
+ attr = "read_uint%d" % (width*8)
+ return getattr(self, attr)()
+
+ def write_size(self, width, n):
+ if width > 0:
+ attr = "write_uint%d" % (width*8)
+ getattr(self, attr)(n)
+
+ def read_uuid(self):
+ return self.unpack("16s")
+
+ def write_uuid(self, s):
+ self.pack("16s", s)
+
+ def read_bin128(self):
+ return self.unpack("16s")
+
+ def write_bin128(self, b):
+ self.pack("16s", b)
+
+
+
+class StringCodec(Codec):
+
+ def __init__(self, spec, encoded = ""):
+ Codec.__init__(self, spec)
+ self.encoded = encoded
+
+ def write(self, s):
+ self.encoded += s
+
+ def read(self, n):
+ result = self.encoded[:n]
+ self.encoded = self.encoded[n:]
+ return result
diff --git a/RC5/python/qpid/compat.py b/RC5/python/qpid/compat.py
new file mode 100644
index 0000000000..26f60fb8aa
--- /dev/null
+++ b/RC5/python/qpid/compat.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+try:
+ set = set
+except NameError:
+ from sets import Set as set
+
+try:
+ from socket import SHUT_RDWR
+except ImportError:
+ SHUT_RDWR = 2
diff --git a/RC5/python/qpid/connection.py b/RC5/python/qpid/connection.py
new file mode 100644
index 0000000000..4c9c02822a
--- /dev/null
+++ b/RC5/python/qpid/connection.py
@@ -0,0 +1,218 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datatypes, session, socket
+from threading import Thread, Condition, RLock
+from util import wait, notify
+from assembler import Assembler, Segment
+from codec010 import StringCodec
+from session import Session
+from invoker import Invoker
+from spec010 import Control, Command, load
+from spec import default
+from exceptions import *
+from logging import getLogger
+import delegates
+
+class ChannelBusy(Exception): pass
+
+class ChannelsBusy(Exception): pass
+
+class SessionBusy(Exception): pass
+
+class ConnectionFailed(Exception): pass
+
+def client(*args, **kwargs):
+ return delegates.Client(*args, **kwargs)
+
+def server(*args, **kwargs):
+ return delegates.Server(*args, **kwargs)
+
+class SSLWrapper:
+
+ def __init__(self, ssl):
+ self.ssl = ssl
+
+ def recv(self, n):
+ return self.ssl.read(n)
+
+ def send(self, s):
+ return self.ssl.write(s)
+
+def sslwrap(sock):
+ if isinstance(sock, socket.SSLType):
+ return SSLWrapper(sock)
+ else:
+ return sock
+
+class Connection(Assembler):
+
+ def __init__(self, sock, spec=None, delegate=client, **args):
+ Assembler.__init__(self, sslwrap(sock))
+ if spec == None:
+ spec = load(default())
+ self.spec = spec
+ self.track = self.spec["track"]
+
+ self.lock = RLock()
+ self.attached = {}
+ self.sessions = {}
+
+ self.condition = Condition()
+ self.opened = False
+ self.failed = False
+ self.close_code = (None, "connection aborted")
+
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+
+ self.channel_max = 65535
+
+ self.delegate = delegate(self, **args)
+
+ def attach(self, name, ch, delegate, force=False):
+ self.lock.acquire()
+ try:
+ ssn = self.attached.get(ch.id)
+ if ssn is not None:
+ if ssn.name != name:
+ raise ChannelBusy(ch, ssn)
+ else:
+ ssn = self.sessions.get(name)
+ if ssn is None:
+ ssn = Session(name, self.spec, delegate=delegate)
+ self.sessions[name] = ssn
+ elif ssn.channel is not None:
+ if force:
+ del self.attached[ssn.channel.id]
+ ssn.channel = None
+ else:
+ raise SessionBusy(ssn)
+ self.attached[ch.id] = ssn
+ ssn.channel = ch
+ ch.session = ssn
+ return ssn
+ finally:
+ self.lock.release()
+
+ def detach(self, name, ch):
+ self.lock.acquire()
+ try:
+ self.attached.pop(ch.id, None)
+ ssn = self.sessions.pop(name, None)
+ if ssn is not None:
+ ssn.channel = None
+ ssn.closed()
+ return ssn
+ finally:
+ self.lock.release()
+
+ def __channel(self):
+ # XXX: ch 0?
+ for i in xrange(self.channel_max):
+ if not self.attached.has_key(i):
+ return i
+ else:
+ raise ChannelsBusy()
+
+ def session(self, name, timeout=None, delegate=session.client):
+ self.lock.acquire()
+ try:
+ ch = Channel(self, self.__channel())
+ ssn = self.attach(name, ch, delegate)
+ ssn.channel.session_attach(name)
+ if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
+ return ssn
+ else:
+ self.detach(name, ch)
+ raise Timeout()
+ finally:
+ self.lock.release()
+
+ def detach_all(self):
+ self.lock.acquire()
+ try:
+ for ssn in self.attached.values():
+ if self.close_code[0] != 200:
+ ssn.exceptions.append(self.close_code)
+ self.detach(ssn.name, ssn.channel)
+ finally:
+ self.lock.release()
+
+ def start(self, timeout=None):
+ self.delegate.start()
+ self.thread.start()
+ if not wait(self.condition, lambda: self.opened or self.failed, timeout):
+ raise Timeout()
+ if self.failed:
+ raise ConnectionFailed(*self.close_code)
+
+ def run(self):
+ # XXX: we don't really have a good way to exit this loop without
+ # getting the other end to kill the socket
+ while True:
+ try:
+ seg = self.read_segment()
+ except Closed:
+ self.detach_all()
+ break
+ self.delegate.received(seg)
+
+ def close(self, timeout=None):
+ if not self.opened: return
+ Channel(self, 0).connection_close(200)
+ if not wait(self.condition, lambda: not self.opened, timeout):
+ raise Timeout()
+ self.thread.join(timeout=timeout)
+
+ def __str__(self):
+ return "%s:%s" % self.sock.getsockname()
+
+ def __repr__(self):
+ return str(self)
+
+log = getLogger("qpid.io.ctl")
+
+class Channel(Invoker):
+
+ def __init__(self, connection, id):
+ self.connection = connection
+ self.id = id
+ self.session = None
+
+ def resolve_method(self, name):
+ inst = self.connection.spec.instructions.get(name)
+ if inst is not None and isinstance(inst, Control):
+ return self.METHOD, inst
+ else:
+ return self.ERROR, None
+
+ def invoke(self, type, args, kwargs):
+ ctl = type.new(args, kwargs)
+ sc = StringCodec(self.connection.spec)
+ sc.write_control(ctl)
+ self.connection.write_segment(Segment(True, True, type.segment_type,
+ type.track, self.id, sc.encoded))
+ log.debug("SENT %s", ctl)
+
+ def __str__(self):
+ return "%s[%s]" % (self.connection, self.id)
+
+ def __repr__(self):
+ return str(self)
diff --git a/RC5/python/qpid/connection08.py b/RC5/python/qpid/connection08.py
new file mode 100644
index 0000000000..be94a792cb
--- /dev/null
+++ b/RC5/python/qpid/connection08.py
@@ -0,0 +1,493 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A Connection class containing socket code that uses the spec metadata
+to read and write Frame objects. This could be used by a client,
+server, or even a proxy implementation.
+"""
+
+import socket, codec, logging, qpid
+from cStringIO import StringIO
+from spec import load
+from codec import EOF
+from compat import SHUT_RDWR
+
+class SockIO:
+
+ def __init__(self, sock):
+ self.sock = sock
+
+ def write(self, buf):
+# print "OUT: %r" % buf
+ self.sock.sendall(buf)
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.error:
+ break
+ if len(s) == 0:
+ break
+# print "IN: %r" % s
+ data += s
+ return data
+
+ def flush(self):
+ pass
+
+ def close(self):
+ self.sock.shutdown(SHUT_RDWR)
+ self.sock.close()
+
+def connect(host, port):
+ sock = socket.socket()
+ sock.connect((host, port))
+ sock.setblocking(1)
+ return SockIO(sock)
+
+def listen(host, port, predicate = lambda: True):
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ sock.listen(5)
+ while predicate():
+ s, a = sock.accept()
+ yield SockIO(s)
+
+class Connection:
+
+ def __init__(self, io, spec):
+ self.codec = codec.Codec(io, spec)
+ self.spec = spec
+ self.FRAME_END = self.spec.constants.byname["frame_end"].id
+ self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor))
+ self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor))
+
+ def flush(self):
+ self.codec.flush()
+
+ INIT="!4s4B"
+
+ def init(self):
+ self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
+ self.spec.minor)
+
+ def tini(self):
+ self.codec.unpack(Connection.INIT)
+
+ def write_8_0(self, frame):
+ c = self.codec
+ c.encode_octet(self.spec.constants.byname[frame.type].id)
+ c.encode_short(frame.channel)
+ body = StringIO()
+ enc = codec.Codec(body, self.spec)
+ frame.encode(enc)
+ enc.flush()
+ c.encode_longstr(body.getvalue())
+ c.encode_octet(self.FRAME_END)
+
+ def read_8_0(self):
+ c = self.codec
+ type = self.spec.constants.byid[c.decode_octet()].name
+ channel = c.decode_short()
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body), self.spec)
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+
+ def write_0_9(self, frame):
+ self.write_8_0(frame)
+
+ def read_0_9(self):
+ return self.read_8_0()
+
+ def write_0_10(self, frame):
+ c = self.codec
+ flags = 0
+ if frame.bof: flags |= 0x08
+ if frame.eof: flags |= 0x04
+ if frame.bos: flags |= 0x02
+ if frame.eos: flags |= 0x01
+
+ c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1
+ c.encode_octet(self.spec.constants.byname[frame.type].id)
+ body = StringIO()
+ enc = codec.Codec(body, self.spec)
+ frame.encode(enc)
+ enc.flush()
+ frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size)
+ c.encode_short(frame_size)
+ c.encode_octet(0) # Reserved
+ c.encode_octet(frame.subchannel & 0x0f)
+ c.encode_short(frame.channel)
+ c.encode_long(0) # Reserved
+ c.write(body.getvalue())
+ c.encode_octet(self.FRAME_END)
+
+ def read_0_10(self):
+ c = self.codec
+ flags = c.decode_octet() # TODO: currently ignoring flags
+ framing_version = (flags & 0xc0) >> 6
+ if framing_version != 0:
+ raise "frame error: unknown framing version"
+ type = self.spec.constants.byid[c.decode_octet()].name
+ frame_size = c.decode_short()
+ if frame_size < 12: # TODO: Magic number (frame header size)
+ raise "frame error: frame size too small"
+ reserved1 = c.decode_octet()
+ field = c.decode_octet()
+ subchannel = field & 0x0f
+ channel = c.decode_short()
+ reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0
+ if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0:
+ raise "frame error: reserved bits not all zero"
+ body_size = frame_size - 12 # TODO: Magic number (frame header size)
+ body = c.read(body_size)
+ dec = codec.Codec(StringIO(body), self.spec)
+ try:
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ except EOF:
+ raise "truncated frame body: %r" % body
+ frame.channel = channel
+ frame.subchannel = subchannel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+
+ def write_99_0(self, frame):
+ self.write_0_10(frame)
+
+ def read_99_0(self):
+ return self.read_0_10()
+
+class Frame:
+
+ DECODERS = {}
+
+ class __metaclass__(type):
+
+ def __new__(cls, name, bases, dict):
+ for attr in ("encode", "decode", "type"):
+ if not dict.has_key(attr):
+ raise TypeError("%s must define %s" % (name, attr))
+ dict["decode"] = staticmethod(dict["decode"])
+ if dict.has_key("__init__"):
+ __init__ = dict["__init__"]
+ def init(self, *args, **kwargs):
+ args = list(args)
+ self.init(args, kwargs)
+ __init__(self, *args, **kwargs)
+ dict["__init__"] = init
+ t = type.__new__(cls, name, bases, dict)
+ if t.type != None:
+ Frame.DECODERS[t.type] = t
+ return t
+
+ type = None
+
+ def init(self, args, kwargs):
+ self.channel = kwargs.pop("channel", 0)
+ self.subchannel = kwargs.pop("subchannel", 0)
+ self.bos = True
+ self.eos = True
+ self.bof = True
+ self.eof = True
+
+ def encode(self, enc): abstract
+
+ def decode(spec, dec, size): abstract
+
+class Method(Frame):
+
+ type = "frame_method"
+
+ def __init__(self, method, args):
+ if len(args) != len(method.fields):
+ argspec = ["%s: %s" % (f.name, f.type)
+ for f in method.fields]
+ raise TypeError("%s.%s expecting (%s), got %s" %
+ (method.klass.name, method.name, ", ".join(argspec),
+ args))
+ self.method = method
+ self.method_type = method
+ self.args = args
+ self.eof = not method.content
+
+ def encode(self, c):
+ version = (c.spec.major, c.spec.minor)
+ if version == (0, 10) or version == (99, 0):
+ c.encode_octet(self.method.klass.id)
+ c.encode_octet(self.method.id)
+ else:
+ c.encode_short(self.method.klass.id)
+ c.encode_short(self.method.id)
+ for field, arg in zip(self.method.fields, self.args):
+ c.encode(field.type, arg)
+
+ def decode(spec, c, size):
+ version = (c.spec.major, c.spec.minor)
+ if version == (0, 10) or version == (99, 0):
+ klass = spec.classes.byid[c.decode_octet()]
+ meth = klass.methods.byid[c.decode_octet()]
+ else:
+ klass = spec.classes.byid[c.decode_short()]
+ meth = klass.methods.byid[c.decode_short()]
+ args = tuple([c.decode(f.type) for f in meth.fields])
+ return Method(meth, args)
+
+ def __str__(self):
+ return "[%s] %s %s" % (self.channel, self.method,
+ ", ".join([str(a) for a in self.args]))
+
+class Request(Frame):
+
+ type = "frame_request"
+
+ def __init__(self, id, response_mark, method):
+ self.id = id
+ self.response_mark = response_mark
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.response_mark)
+ # reserved
+ enc.encode_long(0)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ mark = dec.decode_longlong()
+ # reserved
+ dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Request(id, mark, method)
+
+ def __str__(self):
+ return "[%s] Request(%s) %s" % (self.channel, self.id, self.method)
+
+class Response(Frame):
+
+ type = "frame_response"
+
+ def __init__(self, id, request_id, batch_offset, method):
+ self.id = id
+ self.request_id = request_id
+ self.batch_offset = batch_offset
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.request_id)
+ enc.encode_long(self.batch_offset)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ request_id = dec.decode_longlong()
+ batch_offset = dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Response(id, request_id, batch_offset, method)
+
+ def __str__(self):
+ return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
+
+def uses_struct_encoding(spec):
+ return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
+
+class Header(Frame):
+
+ type = "frame_header"
+
+ def __init__(self, klass, weight, size, properties):
+ self.klass = klass
+ self.weight = weight
+ self.size = size
+ self.properties = properties
+ self.eof = size == 0
+ self.bof = False
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
+
+ def encode(self, c):
+ if uses_struct_encoding(c.spec):
+ self.encode_structs(c)
+ else:
+ self.encode_legacy(c)
+
+ def encode_structs(self, c):
+ # XXX
+ structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type),
+ qpid.Struct(c.spec.domains.byname["message_properties"].type)]
+
+ # XXX
+ props = self.properties.copy()
+ for k in self.properties:
+ for s in structs:
+ if s.exists(k):
+ s.set(k, props.pop(k))
+ if props:
+ raise TypeError("no such property: %s" % (", ".join(props)))
+
+ # message properties store the content-length now, and weight is
+ # deprecated
+ if self.size != None:
+ structs[1].content_length = self.size
+
+ for s in structs:
+ c.encode_long_struct(s)
+
+ def encode_legacy(self, c):
+ c.encode_short(self.klass.id)
+ c.encode_short(self.weight)
+ c.encode_longlong(self.size)
+
+ # property flags
+ nprops = len(self.klass.fields)
+ flags = 0
+ for i in range(nprops):
+ f = self.klass.fields.items[i]
+ flags <<= 1
+ if self.properties.get(f.name) != None:
+ flags |= 1
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0:
+ flags <<= 1
+ if nprops > (i + 1):
+ flags |= 1
+ c.encode_short(flags)
+ flags = 0
+ flags <<= ((16 - (nprops % 15)) % 16)
+ c.encode_short(flags)
+
+ # properties
+ for f in self.klass.fields:
+ v = self.properties.get(f.name)
+ if v != None:
+ c.encode(f.type, v)
+
+ def decode(spec, c, size):
+ if uses_struct_encoding(spec):
+ return Header.decode_structs(spec, c, size)
+ else:
+ return Header.decode_legacy(spec, c, size)
+
+ def decode_structs(spec, c, size):
+ structs = []
+ start = c.nread
+ while c.nread - start < size:
+ structs.append(c.decode_long_struct())
+
+ # XXX
+ props = {}
+ length = None
+ for s in structs:
+ for f in s.type.fields:
+ if s.has(f.name):
+ props[f.name] = s.get(f.name)
+ if f.name == "content_length":
+ length = s.get(f.name)
+ return Header(None, 0, length, props)
+
+ decode_structs = staticmethod(decode_structs)
+
+ def decode_legacy(spec, c, size):
+ klass = spec.classes.byid[c.decode_short()]
+ weight = c.decode_short()
+ size = c.decode_longlong()
+
+ # property flags
+ bits = []
+ while True:
+ flags = c.decode_short()
+ for i in range(15, 0, -1):
+ if flags >> i & 0x1 != 0:
+ bits.append(True)
+ else:
+ bits.append(False)
+ if flags & 0x1 == 0:
+ break
+
+ # properties
+ properties = {}
+ for b, f in zip(bits, klass.fields):
+ if b:
+ # Note: decode returns a unicode u'' string but only
+ # plain '' strings can be used as keywords so we need to
+ # stringify the names.
+ properties[str(f.name)] = c.decode(f.type)
+ return Header(klass, weight, size, properties)
+
+ decode_legacy = staticmethod(decode_legacy)
+
+ def __str__(self):
+ return "%s %s %s %s" % (self.klass, self.weight, self.size,
+ self.properties)
+
+class Body(Frame):
+
+ type = "frame_body"
+
+ def __init__(self, content):
+ self.content = content
+ self.eof = True
+ self.bof = False
+
+ def encode(self, enc):
+ enc.write(self.content)
+
+ def decode(spec, dec, size):
+ return Body(dec.read(size))
+
+ def __str__(self):
+ return "Body(%r)" % self.content
+
+# TODO:
+# OOB_METHOD = "frame_oob_method"
+# OOB_HEADER = "frame_oob_header"
+# OOB_BODY = "frame_oob_body"
+# TRACE = "frame_trace"
+# HEARTBEAT = "frame_heartbeat"
diff --git a/RC5/python/qpid/content.py b/RC5/python/qpid/content.py
new file mode 100644
index 0000000000..9391f4f1a8
--- /dev/null
+++ b/RC5/python/qpid/content.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A simple python representation for AMQP content.
+"""
+
+def default(val, defval):
+ if val == None:
+ return defval
+ else:
+ return val
+
+class Content:
+
+ def __init__(self, body = "", children = None, properties = None):
+ self.body = body
+ self.children = default(children, [])
+ self.properties = default(properties, {})
+
+ def size(self):
+ return len(self.body)
+
+ def weight(self):
+ return len(self.children)
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
+
+ def __str__(self):
+ if self.children:
+ return "%s [%s] %s" % (self.properties,
+ ", ".join(map(str, self.children)),
+ self.body)
+ else:
+ return "%s %s" % (self.properties, self.body)
diff --git a/RC5/python/qpid/datatypes.py b/RC5/python/qpid/datatypes.py
new file mode 100644
index 0000000000..eb1f86b0b0
--- /dev/null
+++ b/RC5/python/qpid/datatypes.py
@@ -0,0 +1,349 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import threading, struct, datetime, time
+
+class Struct:
+
+ def __init__(self, _type, *args, **kwargs):
+ if len(args) > len(_type.fields):
+ raise TypeError("%s() takes at most %s arguments (%s given)" %
+ (_type.name, len(_type.fields), len(args)))
+
+ self._type = _type
+
+ idx = 0
+ for field in _type.fields:
+ if idx < len(args):
+ arg = args[idx]
+ if kwargs.has_key(field.name):
+ raise TypeError("%s() got multiple values for keyword argument '%s'" %
+ (_type.name, field.name))
+ elif kwargs.has_key(field.name):
+ arg = kwargs.pop(field.name)
+ else:
+ arg = field.default()
+ setattr(self, field.name, arg)
+ idx += 1
+
+ if kwargs:
+ unexpected = kwargs.keys()[0]
+ raise TypeError("%s() got an unexpected keyword argument '%s'" %
+ (_type.name, unexpected))
+
+ def __getitem__(self, name):
+ return getattr(self, name)
+
+ def __setitem__(self, name, value):
+ if not hasattr(self, name):
+ raise AttributeError("'%s' object has no attribute '%s'" %
+ (self._type.name, name))
+ setattr(self, name, value)
+
+ def __repr__(self):
+ fields = []
+ for f in self._type.fields:
+ v = self[f.name]
+ if f.type.is_present(v):
+ fields.append("%s=%r" % (f.name, v))
+ return "%s(%s)" % (self._type.name, ", ".join(fields))
+
+class Message:
+
+ def __init__(self, *args):
+ if args:
+ self.body = args[-1]
+ else:
+ self.body = None
+ if len(args) > 1:
+ self.headers = list(args[:-1])
+ else:
+ self.headers = None
+ self.id = None
+
+ def has(self, name):
+ return self.get(name) != None
+
+ def get(self, name):
+ if self.headers:
+ for h in self.headers:
+ if h._type.name == name:
+ return h
+ return None
+
+ def set(self, header):
+ if self.headers is None:
+ self.headers = []
+ idx = 0
+ while idx < len(self.headers):
+ if self.headers[idx]._type == header._type:
+ self.headers[idx] = header
+ return
+ idx += 1
+ self.headers.append(header)
+
+ def clear(self, name):
+ idx = 0
+ while idx < len(self.headers):
+ if self.headers[idx]._type.name == name:
+ del self.headers[idx]
+ return
+ idx += 1
+
+ def __repr__(self):
+ args = []
+ if self.headers:
+ args.extend(map(repr, self.headers))
+ if self.body:
+ args.append(repr(self.body))
+ if self.id is not None:
+ args.append("id=%s" % self.id)
+ return "Message(%s)" % ", ".join(args)
+
+def serial(o):
+ if isinstance(o, Serial):
+ return o
+ else:
+ return Serial(o)
+
+class Serial:
+
+ def __init__(self, value):
+ self.value = value & 0xFFFFFFFF
+
+ def __hash__(self):
+ return hash(self.value)
+
+ def __cmp__(self, other):
+ if other is None:
+ return 1
+
+ other = serial(other)
+
+ delta = (self.value - other.value) & 0xFFFFFFFF
+ neg = delta & 0x80000000
+ mag = delta & 0x7FFFFFFF
+
+ if neg:
+ return -mag
+ else:
+ return mag
+
+ def __add__(self, other):
+ return Serial(self.value + other)
+
+ def __sub__(self, other):
+ return Serial(self.value - other)
+
+ def __repr__(self):
+ return "serial(%s)" % self.value
+
+ def __str__(self):
+ return str(self.value)
+
+class Range:
+
+ def __init__(self, lower, upper = None):
+ self.lower = serial(lower)
+ if upper is None:
+ self.upper = self.lower
+ else:
+ self.upper = serial(upper)
+
+ def __contains__(self, n):
+ return self.lower <= n and n <= self.upper
+
+ def __iter__(self):
+ i = self.lower
+ while i <= self.upper:
+ yield i
+ i += 1
+
+ def touches(self, r):
+ # XXX: are we doing more checks than we need?
+ return (self.lower - 1 in r or
+ self.upper + 1 in r or
+ r.lower - 1 in self or
+ r.upper + 1 in self or
+ self.lower in r or
+ self.upper in r or
+ r.lower in self or
+ r.upper in self)
+
+ def span(self, r):
+ return Range(min(self.lower, r.lower), max(self.upper, r.upper))
+
+ def intersect(self, r):
+ lower = max(self.lower, r.lower)
+ upper = min(self.upper, r.upper)
+ if lower > upper:
+ return None
+ else:
+ return Range(lower, upper)
+
+ def __repr__(self):
+ return "%s-%s" % (self.lower, self.upper)
+
+class RangedSet:
+
+ def __init__(self, *args):
+ self.ranges = []
+ for n in args:
+ self.add(n)
+
+ def __contains__(self, n):
+ for r in self.ranges:
+ if n in r:
+ return True
+ return False
+
+ def add_range(self, range):
+ idx = 0
+ while idx < len(self.ranges):
+ r = self.ranges[idx]
+ if range.touches(r):
+ del self.ranges[idx]
+ range = range.span(r)
+ elif range.upper < r.lower:
+ self.ranges.insert(idx, range)
+ return
+ else:
+ idx += 1
+ self.ranges.append(range)
+
+ def add(self, lower, upper = None):
+ self.add_range(Range(lower, upper))
+
+ def __iter__(self):
+ return iter(self.ranges)
+
+ def __repr__(self):
+ return str(self.ranges)
+
+class Future:
+ def __init__(self, initial=None, exception=Exception):
+ self.value = initial
+ self._error = None
+ self._set = threading.Event()
+ self.exception = exception
+
+ def error(self, error):
+ self._error = error
+ self._set.set()
+
+ def set(self, value):
+ self.value = value
+ self._set.set()
+
+ def get(self, timeout=None):
+ self._set.wait(timeout)
+ if self._error != None:
+ raise self.exception(self._error)
+ return self.value
+
+ def is_set(self):
+ return self._set.isSet()
+
+try:
+ import uuid
+ def random_uuid():
+ return uuid.uuid4().get_bytes()
+except ImportError:
+ import random
+ def random_uuid():
+ bytes = [random.randint(0, 255) for i in xrange(16)]
+
+ # From RFC4122, the version bits are set to 0100
+ bytes[7] &= 0x0F
+ bytes[7] |= 0x40
+
+ # From RFC4122, the top two bits of byte 8 get set to 01
+ bytes[8] &= 0x3F
+ bytes[8] |= 0x80
+ return "".join(map(chr, bytes))
+
+def uuid4():
+ return UUID(random_uuid())
+
+class UUID:
+
+ def __init__(self, bytes):
+ self.bytes = bytes
+
+ def __cmp__(self, other):
+ if isinstance(other, UUID):
+ return cmp(self.bytes, other.bytes)
+ raise NotImplemented()
+
+ def __str__(self):
+ return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
+
+ def __repr__(self):
+ return "UUID(%r)" % str(self)
+
+ def __hash__(self):
+ return self.bytes.__hash__()
+
+class timestamp(float):
+
+ def __new__(cls, obj=None):
+ if obj is None:
+ obj = time.time()
+ elif isinstance(obj, datetime.datetime):
+ obj = time.mktime(obj.timetuple()) + 1e-6 * obj.microsecond
+ return super(timestamp, cls).__new__(cls, obj)
+
+ def datetime(self):
+ return datetime.datetime.fromtimestamp(self)
+
+ def __add__(self, other):
+ if isinstance(other, datetime.timedelta):
+ return timestamp(self.datetime() + other)
+ else:
+ return timestamp(float(self) + other)
+
+ def __sub__(self, other):
+ if isinstance(other, datetime.timedelta):
+ return timestamp(self.datetime() - other)
+ else:
+ return timestamp(float(self) - other)
+
+ def __radd__(self, other):
+ if isinstance(other, datetime.timedelta):
+ return timestamp(self.datetime() + other)
+ else:
+ return timestamp(other + float(self))
+
+ def __rsub__(self, other):
+ if isinstance(other, datetime.timedelta):
+ return timestamp(self.datetime() - other)
+ else:
+ return timestamp(other - float(self))
+
+ def __neg__(self):
+ return timestamp(-float(self))
+
+ def __pos__(self):
+ return self
+
+ def __abs__(self):
+ return timestamp(abs(float(self)))
+
+ def __repr__(self):
+ return "timestamp(%r)" % float(self)
diff --git a/RC5/python/qpid/delegate.py b/RC5/python/qpid/delegate.py
new file mode 100644
index 0000000000..b447c4aa29
--- /dev/null
+++ b/RC5/python/qpid/delegate.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Delegate implementation intended for use with the peer module.
+"""
+
+import threading, inspect, traceback, sys
+from connection08 import Method, Request, Response
+
+def _handler_name(method):
+ return "%s_%s" % (method.klass.name, method.name)
+
+class Delegate:
+
+ def __init__(self):
+ self.handlers = {}
+ self.invokers = {}
+
+ def __call__(self, channel, frame):
+ method = frame.method
+
+ try:
+ handler = self.handlers[method]
+ except KeyError:
+ name = _handler_name(method)
+ handler = getattr(self, name)
+ self.handlers[method] = handler
+
+ try:
+ return handler(channel, frame)
+ except:
+ print >> sys.stderr, "Error in handler: %s\n\n%s" % \
+ (_handler_name(method), traceback.format_exc())
+
+ def closed(self, reason):
+ print "Connection closed: %s" % reason
diff --git a/RC5/python/qpid/delegates.py b/RC5/python/qpid/delegates.py
new file mode 100644
index 0000000000..bf26553dda
--- /dev/null
+++ b/RC5/python/qpid/delegates.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, connection, session
+from util import notify
+from datatypes import RangedSet
+from logging import getLogger
+
+log = getLogger("qpid.io.ctl")
+
+class Delegate:
+
+ def __init__(self, connection, delegate=session.client):
+ self.connection = connection
+ self.spec = connection.spec
+ self.delegate = delegate
+ self.control = self.spec["track.control"].value
+
+ def received(self, seg):
+ ssn = self.connection.attached.get(seg.channel)
+ if ssn is None:
+ ch = connection.Channel(self.connection, seg.channel)
+ else:
+ ch = ssn.channel
+
+ if seg.track == self.control:
+ ctl = seg.decode(self.spec)
+ log.debug("RECV %s", ctl)
+ attr = ctl._type.qname.replace(".", "_")
+ getattr(self, attr)(ch, ctl)
+ elif ssn is None:
+ ch.session_detached()
+ else:
+ ssn.received(seg)
+
+ def connection_close(self, ch, close):
+ self.connection.close_code = (close.reply_code, close.reply_text)
+ ch.connection_close_ok()
+ self.connection.sock.close()
+ if not self.connection.opened:
+ self.connection.failed = True
+ notify(self.connection.condition)
+
+ def connection_close_ok(self, ch, close_ok):
+ self.connection.opened = False
+ notify(self.connection.condition)
+
+ def session_attach(self, ch, a):
+ try:
+ self.connection.attach(a.name, ch, self.delegate, a.force)
+ ch.session_attached(a.name)
+ except connection.ChannelBusy:
+ ch.session_detached(a.name)
+ except connection.SessionBusy:
+ ch.session_detached(a.name)
+
+ def session_attached(self, ch, a):
+ notify(ch.session.condition)
+
+ def session_detach(self, ch, d):
+ #send back the confirmation of detachment before removing the
+ #channel from the attached set; this avoids needing to hold the
+ #connection lock during the sending of this control and ensures
+ #that if the channel is immediately reused for a new session the
+ #attach request will follow the detached notification.
+ ch.session_detached(d.name)
+ ssn = self.connection.detach(d.name, ch)
+
+ def session_detached(self, ch, d):
+ self.connection.detach(d.name, ch)
+
+ def session_request_timeout(self, ch, rt):
+ ch.session_timeout(rt.timeout);
+
+ def session_command_point(self, ch, cp):
+ ssn = ch.session
+ ssn.receiver.next_id = cp.command_id
+ ssn.receiver.next_offset = cp.command_offset
+
+ def session_completed(self, ch, cmp):
+ ch.session.sender.completed(cmp.commands)
+ if cmp.timely_reply:
+ ch.session_known_completed(cmp.commands)
+ notify(ch.session.condition)
+
+ def session_known_completed(self, ch, kn_cmp):
+ ch.session.receiver.known_completed(kn_cmp.commands)
+
+ def session_flush(self, ch, f):
+ rcv = ch.session.receiver
+ if f.expected:
+ if rcv.next_id == None:
+ exp = None
+ else:
+ exp = RangedSet(rcv.next_id)
+ ch.session_expected(exp)
+ if f.confirmed:
+ ch.session_confirmed(rcv._completed)
+ if f.completed:
+ ch.session_completed(rcv._completed)
+
+class Server(Delegate):
+
+ def start(self):
+ self.connection.read_header()
+ self.connection.write_header(self.spec.major, self.spec.minor)
+ connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"])
+
+ def connection_start_ok(self, ch, start_ok):
+ ch.connection_tune(channel_max=65535)
+
+ def connection_tune_ok(self, ch, tune_ok):
+ pass
+
+ def connection_open(self, ch, open):
+ self.connection.opened = True
+ ch.connection_open_ok()
+ notify(self.connection.condition)
+
+class Client(Delegate):
+
+ PROPERTIES = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name}
+
+ def __init__(self, connection, username="guest", password="guest", mechanism="PLAIN"):
+ Delegate.__init__(self, connection)
+ self.username = username
+ self.password = password
+ self.mechanism = mechanism
+
+ def start(self):
+ self.connection.write_header(self.spec.major, self.spec.minor)
+ self.connection.read_header()
+
+ def connection_start(self, ch, start):
+ r = "\0%s\0%s" % (self.username, self.password)
+ ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=self.mechanism, response=r)
+
+ def connection_tune(self, ch, tune):
+ ch.connection_tune_ok()
+ ch.connection_open()
+
+ def connection_open_ok(self, ch, open_ok):
+ self.connection.opened = True
+ notify(self.connection.condition)
diff --git a/RC5/python/qpid/disp.py b/RC5/python/qpid/disp.py
new file mode 100644
index 0000000000..e46cb33c60
--- /dev/null
+++ b/RC5/python/qpid/disp.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import strftime, gmtime
+
+class Display:
+ """ Display formatting for QPID Management CLI """
+
+ def __init__ (self):
+ self.tableSpacing = 2
+ self.tablePrefix = " "
+ self.timestampFormat = "%X"
+
+ def table (self, title, heads, rows):
+ """ Print a formatted table with autosized columns """
+ print title
+ if len (rows) == 0:
+ return
+ colWidth = []
+ col = 0
+ line = self.tablePrefix
+ for head in heads:
+ width = len (head)
+ for row in rows:
+ cellWidth = len (unicode (row[col]))
+ if cellWidth > width:
+ width = cellWidth
+ colWidth.append (width + self.tableSpacing)
+ line = line + head
+ if col < len (heads) - 1:
+ for i in range (colWidth[col] - len (head)):
+ line = line + " "
+ col = col + 1
+ print line
+ line = self.tablePrefix
+ for width in colWidth:
+ for i in range (width):
+ line = line + "="
+ print line
+
+ for row in rows:
+ line = self.tablePrefix
+ col = 0
+ for width in colWidth:
+ line = line + unicode (row[col])
+ if col < len (heads) - 1:
+ for i in range (width - len (unicode (row[col]))):
+ line = line + " "
+ col = col + 1
+ print line
+
+ def do_setTimeFormat (self, fmt):
+ """ Select timestamp format """
+ if fmt == "long":
+ self.timestampFormat = "%c"
+ elif fmt == "short":
+ self.timestampFormat = "%X"
+
+ def timestamp (self, nsec):
+ """ Format a nanosecond-since-the-epoch timestamp for printing """
+ return strftime (self.timestampFormat, gmtime (nsec / 1000000000))
diff --git a/RC5/python/qpid/exceptions.py b/RC5/python/qpid/exceptions.py
new file mode 100644
index 0000000000..7eaaf81ed4
--- /dev/null
+++ b/RC5/python/qpid/exceptions.py
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Closed(Exception): pass
+class Timeout(Exception): pass
diff --git a/RC5/python/qpid/framer.py b/RC5/python/qpid/framer.py
new file mode 100644
index 0000000000..f6363b2291
--- /dev/null
+++ b/RC5/python/qpid/framer.py
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import struct, socket
+from exceptions import Closed
+from packer import Packer
+from threading import RLock
+from logging import getLogger
+
+raw = getLogger("qpid.io.raw")
+frm = getLogger("qpid.io.frm")
+
+FIRST_SEG = 0x08
+LAST_SEG = 0x04
+FIRST_FRM = 0x02
+LAST_FRM = 0x01
+
+class Frame:
+
+ HEADER = "!2BHxBH4x"
+ MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
+
+ def __init__(self, flags, type, track, channel, payload):
+ if len(payload) > Frame.MAX_PAYLOAD:
+ raise ValueError("max payload size exceeded: %s" % len(payload))
+ self.flags = flags
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def isFirstSegment(self):
+ return bool(FIRST_SEG & self.flags)
+
+ def isLastSegment(self):
+ return bool(LAST_SEG & self.flags)
+
+ def isFirstFrame(self):
+ return bool(FIRST_FRM & self.flags)
+
+ def isLastFrame(self):
+ return bool(LAST_FRM & self.flags)
+
+ def __str__(self):
+ return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
+ int(self.isLastSegment()),
+ int(self.isFirstFrame()),
+ int(self.isLastFrame()),
+ self.type,
+ self.track,
+ self.channel,
+ self.payload)
+
+class FramingError(Exception): pass
+
+class Framer(Packer):
+
+ HEADER="!4s4B"
+
+ def __init__(self, sock):
+ self.sock = sock
+ self.sock_lock = RLock()
+ self._buf = ""
+
+ def aborted(self):
+ return False
+
+ def write(self, buf):
+ self._buf += buf
+
+ def flush(self):
+ self.sock_lock.acquire()
+ try:
+ self._write(self._buf)
+ self._buf = ""
+ frm.debug("FLUSHED")
+ finally:
+ self.sock_lock.release()
+
+ def _write(self, buf):
+ while buf:
+ try:
+ n = self.sock.send(buf)
+ except socket.timeout:
+ if self.aborted():
+ raise Closed()
+ else:
+ continue
+ raw.debug("SENT %r", buf[:n])
+ buf = buf[n:]
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.timeout:
+ if self.aborted():
+ raise Closed()
+ else:
+ continue
+ except socket.error, e:
+ if data != "":
+ raise e
+ else:
+ raise Closed()
+ if len(s) == 0:
+ raise Closed()
+ data += s
+ raw.debug("RECV %r", s)
+ return data
+
+ def read_header(self):
+ return self.unpack(Framer.HEADER)
+
+ def write_header(self, major, minor):
+ self.sock_lock.acquire()
+ try:
+ self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.flush()
+ finally:
+ self.sock_lock.release()
+
+ def write_frame(self, frame):
+ self.sock_lock.acquire()
+ try:
+ size = len(frame.payload) + struct.calcsize(Frame.HEADER)
+ track = frame.track & 0x0F
+ self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
+ self.write(frame.payload)
+ if frame.isLastSegment() and frame.isLastFrame():
+ self.flush()
+ frm.debug("SENT %s", frame)
+ finally:
+ self.sock_lock.release()
+
+ def read_frame(self):
+ flags, type, size, track, channel = self.unpack(Frame.HEADER)
+ if flags & 0xF0: raise FramingError()
+ payload = self.read(size - struct.calcsize(Frame.HEADER))
+ frame = Frame(flags, type, track, channel, payload)
+ frm.debug("RECV %s", frame)
+ return frame
diff --git a/RC5/python/qpid/invoker.py b/RC5/python/qpid/invoker.py
new file mode 100644
index 0000000000..635f3ee769
--- /dev/null
+++ b/RC5/python/qpid/invoker.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+
+# TODO: need a better naming for this class now that it does the value
+# stuff
+class Invoker:
+
+ def METHOD(self, name, resolved):
+ method = lambda *args, **kwargs: self.invoke(resolved, args, kwargs)
+ if sys.version_info[:2] > (2, 3):
+ method.__name__ = resolved.pyname
+ method.__doc__ = resolved.pydoc
+ method.__module__ = self.__class__.__module__
+ self.__dict__[name] = method
+ return method
+
+ def VALUE(self, name, resolved):
+ self.__dict__[name] = resolved
+ return resolved
+
+ def ERROR(self, name, resolved):
+ raise AttributeError("%s instance has no attribute '%s'" %
+ (self.__class__.__name__, name))
+
+ def resolve_method(self, name):
+ return ERROR, None
+
+ def __getattr__(self, name):
+ disp, resolved = self.resolve_method(name)
+ return disp(name, resolved)
diff --git a/RC5/python/qpid/log.py b/RC5/python/qpid/log.py
new file mode 100644
index 0000000000..1fd7d74136
--- /dev/null
+++ b/RC5/python/qpid/log.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from logging import getLogger, StreamHandler, Formatter
+from logging import DEBUG, INFO, WARN, ERROR, CRITICAL
+
+def enable(name=None, level=WARN, file=None):
+ log = getLogger(name)
+ handler = StreamHandler(file)
+ handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s"))
+ log.addHandler(handler)
+ log.setLevel(level)
diff --git a/RC5/python/qpid/management.py b/RC5/python/qpid/management.py
new file mode 100644
index 0000000000..477f3e8f2b
--- /dev/null
+++ b/RC5/python/qpid/management.py
@@ -0,0 +1,913 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###############################################################################
+## This file is being obsoleted by qmf/console.py
+###############################################################################
+
+"""
+Management API for Qpid
+"""
+
+import qpid
+import struct
+import socket
+from threading import Thread
+from datatypes import Message, RangedSet
+from time import time
+from cStringIO import StringIO
+from codec010 import StringCodec as Codec
+from threading import Lock, Condition
+
+
+class SequenceManager:
+ """ Manage sequence numbers for asynchronous method calls """
+ def __init__ (self):
+ self.lock = Lock ()
+ self.sequence = 0
+ self.pending = {}
+
+ def reserve (self, data):
+ """ Reserve a unique sequence number """
+ self.lock.acquire ()
+ result = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[result] = data
+ self.lock.release ()
+ return result
+
+ def release (self, seq):
+ """ Release a reserved sequence number """
+ data = None
+ self.lock.acquire ()
+ if seq in self.pending:
+ data = self.pending[seq]
+ del self.pending[seq]
+ self.lock.release ()
+ return data
+
+
+class mgmtObject (object):
+ """ Generic object that holds the contents of a management object with its
+ attributes set as object attributes. """
+
+ def __init__ (self, classKey, timestamps, row):
+ self.classKey = classKey
+ self.timestamps = timestamps
+ for cell in row:
+ setattr (self, cell[0], cell[1])
+
+class objectId(object):
+ """ Object that represents QMF object identifiers """
+
+ def __init__(self, codec, first=0, second=0):
+ if codec:
+ self.first = codec.read_uint64()
+ self.second = codec.read_uint64()
+ else:
+ self.first = first
+ self.second = second
+
+ def __cmp__(self, other):
+ if other == None:
+ return 1
+ if self.first < other.first:
+ return -1
+ if self.first > other.first:
+ return 1
+ if self.second < other.second:
+ return -1
+ if self.second > other.second:
+ return 1
+ return 0
+
+
+ def index(self):
+ return (self.first, self.second)
+
+ def getFlags(self):
+ return (self.first & 0xF000000000000000) >> 60
+
+ def getSequence(self):
+ return (self.first & 0x0FFF000000000000) >> 48
+
+ def getBroker(self):
+ return (self.first & 0x0000FFFFF0000000) >> 28
+
+ def getBank(self):
+ return self.first & 0x000000000FFFFFFF
+
+ def getObject(self):
+ return self.second
+
+ def isDurable(self):
+ return self.getSequence() == 0
+
+ def encode(self, codec):
+ codec.write_uint64(self.first)
+ codec.write_uint64(self.second)
+
+
+class methodResult:
+ """ Object that contains the result of a method call """
+
+ def __init__ (self, status, sText, args):
+ self.status = status
+ self.statusText = sText
+ for arg in args:
+ setattr (self, arg, args[arg])
+
+class brokerInfo:
+ """ Object that contains information about a broker and the session to it """
+
+ def __init__ (self, brokerId, sessionId):
+ self.brokerId = brokerId
+ self.sessionId = sessionId
+
+class managementChannel:
+ """ This class represents a connection to an AMQP broker. """
+
+ def __init__ (self, ssn, topicCb, replyCb, exceptionCb, cbContext, _detlife=0):
+ """ Given a channel on an established AMQP broker connection, this method
+ opens a session and performs all of the declarations and bindings needed
+ to participate in the management protocol. """
+ self.enabled = True
+ self.ssn = ssn
+ self.sessionId = ssn.name
+ self.topicName = "mgmt-%s" % self.sessionId
+ self.replyName = "repl-%s" % self.sessionId
+ self.qpidChannel = ssn
+ self.tcb = topicCb
+ self.rcb = replyCb
+ self.ecb = exceptionCb
+ self.context = cbContext
+ self.reqsOutstanding = 0
+ self.brokerInfo = None
+
+ ssn.auto_sync = False
+ ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True)
+ ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True)
+
+ ssn.exchange_bind (exchange="amq.direct",
+ queue=self.replyName, binding_key=self.replyName)
+ ssn.message_subscribe (queue=self.topicName, destination="tdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
+ ssn.message_subscribe (queue=self.replyName, destination="rdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
+
+ ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb)
+ ssn.incoming ("rdest").listen (self.replyCb)
+
+ ssn.message_set_flow_mode (destination="tdest", flow_mode=1)
+ ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
+ ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
+
+ ssn.message_set_flow_mode (destination="rdest", flow_mode=1)
+ ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+ ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+
+ def setBrokerInfo (self, data):
+ self.brokerInfo = data
+
+ def shutdown (self):
+ self.enabled = False
+ self.ssn.incoming("tdest").stop()
+ self.ssn.incoming("rdest").stop()
+
+ def topicCb (self, msg):
+ """ Receive messages via the topic queue on this channel. """
+ if self.enabled:
+ self.tcb (self, msg)
+
+ def replyCb (self, msg):
+ """ Receive messages via the reply queue on this channel. """
+ if self.enabled:
+ self.rcb (self, msg)
+
+ def exceptionCb (self, data):
+ if self.ecb != None:
+ self.ecb (self, data)
+
+ def send (self, exchange, msg):
+ if self.enabled:
+ self.qpidChannel.message_transfer (destination=exchange, message=msg)
+
+ def message (self, body, routing_key="broker"):
+ dp = self.qpidChannel.delivery_properties()
+ dp.routing_key = routing_key
+ mp = self.qpidChannel.message_properties()
+ mp.content_type = "application/octet-stream"
+ mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName)
+ return Message(dp, mp, body)
+
+
+class managementClient:
+ """ This class provides an API for access to management data on the AMQP
+ network. It implements the management protocol and manages the management
+ schemas as advertised by the various management agents in the network. """
+
+ CTRL_BROKER_INFO = 1
+ CTRL_SCHEMA_LOADED = 2
+ CTRL_USER = 3
+ CTRL_HEARTBEAT = 4
+
+ SYNC_TIME = 10.0
+
+ #========================================================
+ # User API - interacts with the class's user
+ #========================================================
+ def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None):
+ self.spec = amqpSpec
+ self.ctrlCb = ctrlCb
+ self.configCb = configCb
+ self.instCb = instCb
+ self.methodCb = methodCb
+ self.closeCb = closeCb
+ self.schemaCb = None
+ self.eventCb = None
+ self.channels = []
+ self.seqMgr = SequenceManager ()
+ self.schema = {}
+ self.packages = {}
+ self.cv = Condition ()
+ self.syncInFlight = False
+ self.syncSequence = 0
+ self.syncResult = None
+
+ def schemaListener (self, schemaCb):
+ """ Optionally register a callback to receive details of the schema of
+ managed objects in the network. """
+ self.schemaCb = schemaCb
+
+ def eventListener (self, eventCb):
+ """ Optionally register a callback to receive events from managed objects
+ in the network. """
+ self.eventCb = eventCb
+
+ def addChannel (self, channel, cbContext=None):
+ """ Register a new channel. """
+ mch = managementChannel (channel, self.topicCb, self.replyCb, self.exceptCb, cbContext)
+
+ self.channels.append (mch)
+ self.incOutstanding (mch)
+ codec = Codec (self.spec)
+ self.setHeader (codec, ord ('B'))
+ msg = mch.message(codec.encoded)
+ mch.send ("qpid.management", msg)
+ return mch
+
+ def removeChannel (self, mch):
+ """ Remove a previously added channel from management. """
+ mch.shutdown ()
+ self.channels.remove (mch)
+
+ def callMethod (self, channel, userSequence, objId, className, methodName, args=None):
+ """ Invoke a method on a managed object. """
+ self.method (channel, userSequence, objId, className, methodName, args)
+
+ def getObjects (self, channel, userSequence, className, bank=0):
+ """ Request immediate content from broker """
+ codec = Codec (self.spec)
+ self.setHeader (codec, ord ('G'), userSequence)
+ ft = {}
+ ft["_class"] = className
+ codec.write_map (ft)
+ msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank)
+ channel.send ("qpid.management", msg)
+
+ def syncWaitForStable (self, channel):
+ """ Synchronous (blocking) call to wait for schema stability on a channel """
+ self.cv.acquire ()
+ if channel.reqsOutstanding == 0:
+ self.cv.release ()
+ return channel.brokerInfo
+
+ self.syncInFlight = True
+ starttime = time ()
+ while channel.reqsOutstanding != 0:
+ self.cv.wait (self.SYNC_TIME)
+ if time () - starttime > self.SYNC_TIME:
+ self.cv.release ()
+ raise RuntimeError ("Timed out waiting for response on channel")
+ self.cv.release ()
+ return channel.brokerInfo
+
+ def syncCallMethod (self, channel, objId, className, methodName, args=None):
+ """ Synchronous (blocking) method call """
+ self.cv.acquire ()
+ self.syncInFlight = True
+ self.syncResult = None
+ self.syncSequence = self.seqMgr.reserve ("sync")
+ self.cv.release ()
+ self.callMethod (channel, self.syncSequence, objId, className, methodName, args)
+ self.cv.acquire ()
+ starttime = time ()
+ while self.syncInFlight:
+ self.cv.wait (self.SYNC_TIME)
+ if time () - starttime > self.SYNC_TIME:
+ self.cv.release ()
+ raise RuntimeError ("Timed out waiting for response on channel")
+ result = self.syncResult
+ self.cv.release ()
+ return result
+
+ def syncGetObjects (self, channel, className, bank=0):
+ """ Synchronous (blocking) get call """
+ self.cv.acquire ()
+ self.syncInFlight = True
+ self.syncResult = []
+ self.syncSequence = self.seqMgr.reserve ("sync")
+ self.cv.release ()
+ self.getObjects (channel, self.syncSequence, className, bank)
+ self.cv.acquire ()
+ starttime = time ()
+ while self.syncInFlight:
+ self.cv.wait (self.SYNC_TIME)
+ if time () - starttime > self.SYNC_TIME:
+ self.cv.release ()
+ raise RuntimeError ("Timed out waiting for response on channel")
+ result = self.syncResult
+ self.cv.release ()
+ return result
+
+ #========================================================
+ # Channel API - interacts with registered channel objects
+ #========================================================
+ def topicCb (self, ch, msg):
+ """ Receive messages via the topic queue of a particular channel. """
+ codec = Codec (self.spec, msg.body)
+ while True:
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ return
+
+ if hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ elif hdr[0] == 'h':
+ self.handleHeartbeat (ch, codec)
+ elif hdr[0] == 'e':
+ self.handleEvent (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
+
+ def replyCb (self, ch, msg):
+ """ Receive messages via the reply queue of a particular channel. """
+ codec = Codec (self.spec, msg.body)
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ return
+
+ if hdr[0] == 'm':
+ self.handleMethodReply (ch, codec, hdr[1])
+ elif hdr[0] == 'z':
+ self.handleCommandComplete (ch, codec, hdr[1])
+ elif hdr[0] == 'b':
+ self.handleBrokerResponse (ch, codec)
+ elif hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
+
+ def exceptCb (self, ch, data):
+ if self.closeCb != None:
+ self.closeCb (ch.context, data)
+
+ #========================================================
+ # Internal Functions
+ #========================================================
+ def setHeader (self, codec, opcode, seq = 0):
+ """ Compose the header of a management message. """
+ codec.write_uint8 (ord ('A'))
+ codec.write_uint8 (ord ('M'))
+ codec.write_uint8 (ord ('2'))
+ codec.write_uint8 (opcode)
+ codec.write_uint32 (seq)
+
+ def checkHeader (self, codec):
+ """ Check the header of a management message and extract the opcode and class. """
+ try:
+ octet = chr (codec.read_uint8 ())
+ if octet != 'A':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != 'M':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != '2':
+ return None
+ opcode = chr (codec.read_uint8 ())
+ seq = codec.read_uint32 ()
+ return (opcode, seq)
+ except:
+ return None
+
+ def encodeValue (self, codec, value, typecode):
+ """ Encode, into the codec, a value based on its typecode. """
+ if typecode == 1:
+ codec.write_uint8 (int (value))
+ elif typecode == 2:
+ codec.write_uint16 (int (value))
+ elif typecode == 3:
+ codec.write_uint32 (long (value))
+ elif typecode == 4:
+ codec.write_uint64 (long (value))
+ elif typecode == 5:
+ codec.write_uint8 (int (value))
+ elif typecode == 6:
+ codec.write_str8 (value)
+ elif typecode == 7:
+ codec.write_str16 (value)
+ elif typecode == 8: # ABSTIME
+ codec.write_uint64 (long (value))
+ elif typecode == 9: # DELTATIME
+ codec.write_uint64 (long (value))
+ elif typecode == 10: # REF
+ value.encode(codec)
+ elif typecode == 11: # BOOL
+ codec.write_uint8 (int (value))
+ elif typecode == 12: # FLOAT
+ codec.write_float (float (value))
+ elif typecode == 13: # DOUBLE
+ codec.write_double (float (value))
+ elif typecode == 14: # UUID
+ codec.write_uuid (value)
+ elif typecode == 15: # FTABLE
+ codec.write_map (value)
+ elif typecode == 16:
+ codec.write_int8 (int(value))
+ elif typecode == 17:
+ codec.write_int16 (int(value))
+ elif typecode == 18:
+ codec.write_int32 (int(value))
+ elif typecode == 19:
+ codec.write_int64 (int(value))
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def decodeValue (self, codec, typecode):
+ """ Decode, from the codec, a value based on its typecode. """
+ if typecode == 1:
+ data = codec.read_uint8 ()
+ elif typecode == 2:
+ data = codec.read_uint16 ()
+ elif typecode == 3:
+ data = codec.read_uint32 ()
+ elif typecode == 4:
+ data = codec.read_uint64 ()
+ elif typecode == 5:
+ data = codec.read_uint8 ()
+ elif typecode == 6:
+ data = codec.read_str8 ()
+ elif typecode == 7:
+ data = codec.read_str16 ()
+ elif typecode == 8: # ABSTIME
+ data = codec.read_uint64 ()
+ elif typecode == 9: # DELTATIME
+ data = codec.read_uint64 ()
+ elif typecode == 10: # REF
+ data = objectId(codec)
+ elif typecode == 11: # BOOL
+ data = codec.read_uint8 ()
+ elif typecode == 12: # FLOAT
+ data = codec.read_float ()
+ elif typecode == 13: # DOUBLE
+ data = codec.read_double ()
+ elif typecode == 14: # UUID
+ data = codec.read_uuid ()
+ elif typecode == 15: # FTABLE
+ data = codec.read_map ()
+ elif typecode == 16:
+ data = codec.read_int8 ()
+ elif typecode == 17:
+ data = codec.read_int16 ()
+ elif typecode == 18:
+ data = codec.read_int32 ()
+ elif typecode == 19:
+ data = codec.read_int64 ()
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+ return data
+
+ def incOutstanding (self, ch):
+ self.cv.acquire ()
+ ch.reqsOutstanding = ch.reqsOutstanding + 1
+ self.cv.release ()
+
+ def decOutstanding (self, ch):
+ self.cv.acquire ()
+ ch.reqsOutstanding = ch.reqsOutstanding - 1
+ if ch.reqsOutstanding == 0 and self.syncInFlight:
+ self.syncInFlight = False
+ self.cv.notify ()
+ self.cv.release ()
+
+ if ch.reqsOutstanding == 0:
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
+ ch.ssn.exchange_bind (exchange="qpid.management",
+ queue=ch.topicName, binding_key="console.#")
+ ch.ssn.exchange_bind (exchange="qpid.management",
+ queue=ch.topicName, binding_key="schema.#")
+
+
+ def handleMethodReply (self, ch, codec, sequence):
+ status = codec.read_uint32 ()
+ sText = codec.read_str16 ()
+
+ data = self.seqMgr.release (sequence)
+ if data == None:
+ return
+
+ (userSequence, classId, methodName) = data
+ args = {}
+ context = self.seqMgr.release (userSequence)
+
+ if status == 0:
+ schemaClass = self.schema[classId]
+ ms = schemaClass['M']
+ arglist = None
+ for mname in ms:
+ (mdesc, margs) = ms[mname]
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ return
+
+ for arg in arglist:
+ if arg[2].find("O") != -1:
+ args[arg[0]] = self.decodeValue (codec, arg[1])
+
+ if context == "sync" and userSequence == self.syncSequence:
+ self.cv.acquire ()
+ self.syncInFlight = False
+ self.syncResult = methodResult (status, sText, args)
+ self.cv.notify ()
+ self.cv.release ()
+ elif self.methodCb != None:
+ self.methodCb (ch.context, userSequence, status, sText, args)
+
+ def handleCommandComplete (self, ch, codec, seq):
+ code = codec.read_uint32 ()
+ text = codec.read_str8 ()
+ data = (seq, code, text)
+ context = self.seqMgr.release (seq)
+ if context == "outstanding":
+ self.decOutstanding (ch)
+ elif context == "sync" and seq == self.syncSequence:
+ self.cv.acquire ()
+ self.syncInFlight = False
+ self.cv.notify ()
+ self.cv.release ()
+ elif self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_USER, data)
+
+ def handleBrokerResponse (self, ch, codec):
+ uuid = codec.read_uuid ()
+ ch.brokerInfo = brokerInfo (uuid, ch.sessionId)
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, ch.brokerInfo)
+
+ # Send a package request
+ sendCodec = Codec (self.spec)
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('P'), seq)
+ smsg = ch.message(sendCodec.encoded)
+ ch.send ("qpid.management", smsg)
+
+ def handlePackageInd (self, ch, codec):
+ pname = codec.read_str8 ()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+
+ # Send a class request
+ sendCodec = Codec (self.spec)
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('Q'), seq)
+ self.incOutstanding (ch)
+ sendCodec.write_str8 (pname)
+ smsg = ch.message(sendCodec.encoded)
+ ch.send ("qpid.management", smsg)
+
+ def handleClassInd (self, ch, codec):
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
+ pname = codec.read_str8()
+ cname = codec.read_str8()
+ hash = codec.read_bin128()
+ if pname not in self.packages:
+ return
+
+ if (cname, hash) not in self.packages[pname]:
+ # Send a schema request
+ sendCodec = Codec (self.spec)
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('S'), seq)
+ self.incOutstanding (ch)
+ sendCodec.write_str8 (pname)
+ sendCodec.write_str8 (cname)
+ sendCodec.write_bin128 (hash)
+ smsg = ch.message(sendCodec.encoded)
+ ch.send ("qpid.management", smsg)
+
+ def handleHeartbeat (self, ch, codec):
+ timestamp = codec.read_uint64()
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp)
+
+ def handleEvent (self, ch, codec):
+ if self.eventCb == None:
+ return
+ timestamp = codec.read_uint64()
+ objId = objectId(codec)
+ packageName = codec.read_str8()
+ className = codec.read_str8()
+ hash = codec.read_bin128()
+ name = codec.read_str8()
+ classKey = (packageName, className, hash)
+ if classKey not in self.schema:
+ return;
+ schemaClass = self.schema[classKey]
+ row = []
+ es = schemaClass['E']
+ arglist = None
+ for ename in es:
+ (edesc, eargs) = es[ename]
+ if ename == name:
+ arglist = eargs
+ if arglist == None:
+ return
+ for arg in arglist:
+ row.append((arg[0], self.decodeValue(codec, arg[1])))
+ self.eventCb(ch.context, classKey, objId, name, row)
+
+ def parseSchema (self, ch, codec):
+ """ Parse a received schema-description message. """
+ self.decOutstanding (ch)
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
+ packageName = codec.read_str8 ()
+ className = codec.read_str8 ()
+ hash = codec.read_bin128 ()
+ configCount = codec.read_uint16 ()
+ instCount = codec.read_uint16 ()
+ methodCount = codec.read_uint16 ()
+
+ if packageName not in self.packages:
+ return
+ if (className, hash) in self.packages[packageName]:
+ return
+
+ classKey = (packageName, className, hash)
+ if classKey in self.schema:
+ return
+
+ configs = []
+ insts = []
+ methods = {}
+
+ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
+ insts.append (("id", 4, None, None))
+
+ for idx in range (configCount):
+ ft = codec.read_map ()
+ name = str (ft["name"])
+ type = ft["type"]
+ access = ft["access"]
+ index = ft["index"]
+ optional = ft["optional"]
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = str (value)
+ elif key == "min":
+ min = value
+ elif key == "max":
+ max = value
+ elif key == "maxlen":
+ maxlen = value
+ elif key == "desc":
+ desc = str (value)
+
+ config = (name, type, unit, desc, access, index, min, max, maxlen, optional)
+ configs.append (config)
+
+ for idx in range (instCount):
+ ft = codec.read_map ()
+ name = str (ft["name"])
+ type = ft["type"]
+ unit = None
+ desc = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = str (value)
+ elif key == "desc":
+ desc = str (value)
+
+ inst = (name, type, unit, desc)
+ insts.append (inst)
+
+ for idx in range (methodCount):
+ ft = codec.read_map ()
+ mname = str (ft["name"])
+ argCount = ft["argCount"]
+ if "desc" in ft:
+ mdesc = str (ft["desc"])
+ else:
+ mdesc = None
+
+ args = []
+ for aidx in range (argCount):
+ ft = codec.read_map ()
+ name = str (ft["name"])
+ type = ft["type"]
+ dir = str (ft["dir"].upper ())
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
+ default = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = str (value)
+ elif key == "min":
+ min = value
+ elif key == "max":
+ max = value
+ elif key == "maxlen":
+ maxlen = value
+ elif key == "desc":
+ desc = str (value)
+ elif key == "default":
+ default = str (value)
+
+ arg = (name, type, dir, unit, desc, min, max, maxlen, default)
+ args.append (arg)
+ methods[mname] = (mdesc, args)
+
+ schemaClass = {}
+ schemaClass['C'] = configs
+ schemaClass['I'] = insts
+ schemaClass['M'] = methods
+ self.schema[classKey] = schemaClass
+
+ if self.schemaCb != None:
+ self.schemaCb (ch.context, classKey, configs, insts, methods, {})
+
+ def parsePresenceMasks(self, codec, schemaClass):
+ """ Generate a list of not-present properties """
+ excludeList = []
+ bit = 0
+ for element in schemaClass['C'][1:]:
+ if element[9] == 1:
+ if bit == 0:
+ mask = codec.read_uint8()
+ bit = 1
+ if (mask & bit) == 0:
+ excludeList.append(element[0])
+ bit = bit * 2
+ if bit == 256:
+ bit = 0
+ return excludeList
+
+ def parseContent (self, ch, cls, codec, seq=0):
+ """ Parse a received content message. """
+ if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None:
+ return
+ if cls == 'I' and self.instCb == None:
+ return
+
+ packageName = codec.read_str8 ()
+ className = codec.read_str8 ()
+ hash = codec.read_bin128 ()
+ classKey = (packageName, className, hash)
+
+ if classKey not in self.schema:
+ return
+
+ row = []
+ timestamps = []
+
+ timestamps.append (codec.read_uint64 ()) # Current Time
+ timestamps.append (codec.read_uint64 ()) # Create Time
+ timestamps.append (codec.read_uint64 ()) # Delete Time
+ objId = objectId(codec)
+ schemaClass = self.schema[classKey]
+ if cls == 'C' or cls == 'B':
+ notPresent = self.parsePresenceMasks(codec, schemaClass)
+
+ if cls == 'C' or cls == 'B':
+ row.append(("id", objId))
+ for element in schemaClass['C'][1:]:
+ tc = element[1]
+ name = element[0]
+ if name in notPresent:
+ row.append((name, None))
+ else:
+ data = self.decodeValue(codec, tc)
+ row.append((name, data))
+
+ if cls == 'I' or cls == 'B':
+ if cls == 'I':
+ row.append(("id", objId))
+ for element in schemaClass['I'][1:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
+
+ if cls == 'C' or (cls == 'B' and seq != self.syncSequence):
+ self.configCb (ch.context, classKey, row, timestamps)
+ elif cls == 'B' and seq == self.syncSequence:
+ if timestamps[2] == 0:
+ obj = mgmtObject (classKey, timestamps, row)
+ self.syncResult.append (obj)
+ elif cls == 'I':
+ self.instCb (ch.context, classKey, row, timestamps)
+
+ def parse (self, ch, codec, opcode, seq):
+ """ Parse a message received from the topic queue. """
+ if opcode == 's':
+ self.parseSchema (ch, codec)
+ elif opcode == 'c':
+ self.parseContent (ch, 'C', codec)
+ elif opcode == 'i':
+ self.parseContent (ch, 'I', codec)
+ elif opcode == 'g':
+ self.parseContent (ch, 'B', codec, seq)
+ else:
+ raise ValueError ("Unknown opcode: %c" % opcode);
+
+ def method (self, channel, userSequence, objId, classId, methodName, args):
+ """ Invoke a method on an object """
+ codec = Codec (self.spec)
+ sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
+ self.setHeader (codec, ord ('M'), sequence)
+ objId.encode(codec)
+ codec.write_str8 (classId[0])
+ codec.write_str8 (classId[1])
+ codec.write_bin128 (classId[2])
+ codec.write_str8 (methodName)
+ bank = "%d.%d" % (objId.getBroker(), objId.getBank())
+
+ # Encode args according to schema
+ if classId not in self.schema:
+ self.seqMgr.release (sequence)
+ raise ValueError ("Unknown class name: %s" % classId)
+
+ schemaClass = self.schema[classId]
+ ms = schemaClass['M']
+ arglist = None
+ for mname in ms:
+ (mdesc, margs) = ms[mname]
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ self.seqMgr.release (sequence)
+ raise ValueError ("Unknown method name: %s" % methodName)
+
+ for arg in arglist:
+ if arg[2].find("I") != -1:
+ value = arg[8] # default
+ if arg[0] in args:
+ value = args[arg[0]]
+ if value == None:
+ self.seqMgr.release (sequence)
+ raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
+ self.encodeValue (codec, value, arg[1])
+
+ packageName = classId[0]
+ className = classId[1]
+ msg = channel.message(codec.encoded, "agent." + bank)
+ channel.send ("qpid.management", msg)
diff --git a/RC5/python/qpid/managementdata.py b/RC5/python/qpid/managementdata.py
new file mode 100644
index 0000000000..46c746c0f9
--- /dev/null
+++ b/RC5/python/qpid/managementdata.py
@@ -0,0 +1,753 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+###############################################################################
+## This file is being obsoleted by qmf/console.py
+###############################################################################
+
+import qpid
+import re
+import socket
+import struct
+import os
+import locale
+from qpid.management import managementChannel, managementClient
+from threading import Lock
+from disp import Display
+from shlex import split
+from qpid.connection import Connection
+from qpid.util import connect
+
+class Broker:
+ def __init__ (self, text):
+ rex = re.compile(r"""
+ # [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+ match = rex.match(text)
+ if not match: raise ValueError("'%s' is not a valid broker url" % (text))
+ user, password, host, port = match.groups()
+
+ self.host = socket.gethostbyname (host)
+ if port: self.port = int(port)
+ else: self.port = 5672
+ self.username = user or "guest"
+ self.password = password or "guest"
+
+ def name (self):
+ return self.host + ":" + str (self.port)
+
+class ManagementData:
+
+ #
+ # Data Structure:
+ #
+ # Please note that this data structure holds only the most recent
+ # configuration and instrumentation data for each object. It does
+ # not hold the detailed historical data that is sent from the broker.
+ # The only historical data it keeps are the high and low watermarks
+ # for hi-lo statistics.
+ #
+ # tables :== {class-key}
+ # {<obj-id>}
+ # (timestamp, config-record, inst-record)
+ # class-key :== (<package-name>, <class-name>, <class-hash>)
+ # timestamp :== (<last-interval-time>, <create-time>, <delete-time>)
+ # config-record :== [element]
+ # inst-record :== [element]
+ # element :== (<element-name>, <element-value>)
+ #
+
+ def registerObjId (self, objId):
+ if not objId.index() in self.idBackMap:
+ self.idBackMap[objId.index()] = self.nextId
+ self.idMap[self.nextId] = objId
+ self.nextId += 1
+
+ def displayObjId (self, objIdIndex):
+ if objIdIndex in self.idBackMap:
+ return self.idBackMap[objIdIndex]
+ else:
+ return 0
+
+ def rawObjId (self, displayId):
+ if displayId in self.idMap:
+ return self.idMap[displayId]
+ else:
+ return None
+
+ def displayClassName (self, cls):
+ (packageName, className, hash) = cls
+ rev = self.schema[cls][4]
+ if rev == 0:
+ suffix = ""
+ else:
+ suffix = ".%d" % rev
+ return packageName + ":" + className + suffix
+
+ def dataHandler (self, context, className, list, timestamps):
+ """ Callback for configuration and instrumentation data updates """
+ self.lock.acquire ()
+ try:
+ # If this class has not been seen before, create an empty dictionary to
+ # hold objects of this class
+ if className not in self.tables:
+ self.tables[className] = {}
+
+ # Register the ID so a more friendly presentation can be displayed
+ objId = list[0][1]
+ oidx = objId.index()
+ self.registerObjId (objId)
+
+ # If this object hasn't been seen before, create a new object record with
+ # the timestamps and empty lists for configuration and instrumentation data.
+ if oidx not in self.tables[className]:
+ self.tables[className][oidx] = (timestamps, [], [])
+
+ (unused, oldConf, oldInst) = self.tables[className][oidx]
+
+ # For config updates, simply replace old config list with the new one.
+ if context == 0: #config
+ self.tables[className][oidx] = (timestamps, list, oldInst)
+
+ # For instrumentation updates, carry the minimum and maximum values for
+ # "hi-lo" stats forward.
+ elif context == 1: #inst
+ if len (oldInst) == 0:
+ newInst = list
+ else:
+ newInst = []
+ for idx in range (len (list)):
+ (key, value) = list[idx]
+ if key.find ("High") == len (key) - 4:
+ if oldInst[idx][1] > value:
+ value = oldInst[idx][1]
+ if key.find ("Low") == len (key) - 3:
+ if oldInst[idx][1] < value:
+ value = oldInst[idx][1]
+ newInst.append ((key, value))
+ self.tables[className][oidx] = (timestamps, oldConf, newInst)
+
+ finally:
+ self.lock.release ()
+
+ def ctrlHandler (self, context, op, data):
+ if op == self.mclient.CTRL_BROKER_INFO:
+ pass
+ elif op == self.mclient.CTRL_HEARTBEAT:
+ pass
+
+ def configHandler (self, context, className, list, timestamps):
+ self.dataHandler (0, className, list, timestamps);
+
+ def instHandler (self, context, className, list, timestamps):
+ self.dataHandler (1, className, list, timestamps);
+
+ def methodReply (self, broker, sequence, status, sText, args):
+ """ Callback for method-reply messages """
+ self.lock.acquire ()
+ try:
+ line = "Call Result: " + self.methodsPending[sequence] + \
+ " " + str (status) + " (" + sText + ")"
+ print line, args
+ del self.methodsPending[sequence]
+ finally:
+ self.lock.release ()
+
+ def closeHandler (self, context, reason):
+ if self.operational:
+ print "Connection to broker lost:", reason
+ self.operational = False
+ if self.cli != None:
+ self.cli.setPromptMessage ("Broker Disconnected")
+
+ def schemaHandler (self, context, classKey, configs, insts, methods, events):
+ """ Callback for schema updates """
+ if classKey not in self.schema:
+ schemaRev = 0
+ for key in self.schema:
+ if classKey[0] == key[0] and classKey[1] == key[1]:
+ schemaRev += 1
+ self.schema[classKey] = (configs, insts, methods, events, schemaRev)
+
+ def setCli (self, cliobj):
+ self.cli = cliobj
+
+ def __init__ (self, disp, host, username="guest", password="guest"):
+ self.lock = Lock ()
+ self.tables = {}
+ self.schema = {}
+ self.bootSequence = 0
+ self.operational = False
+ self.disp = disp
+ self.cli = None
+ self.lastUnit = None
+ self.methodSeq = 1
+ self.methodsPending = {}
+ self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
+
+ self.broker = Broker (host)
+ self.conn = Connection (connect (self.broker.host, self.broker.port),
+ username=self.broker.username, password=self.broker.password)
+ self.spec = self.conn.spec
+ self.conn.start ()
+
+ self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
+ self.instHandler, self.methodReply, self.closeHandler)
+ self.mclient.schemaListener (self.schemaHandler)
+ self.mch = self.mclient.addChannel (self.conn.session(self.sessionId))
+ self.operational = True
+ self.idMap = {}
+ self.idBackMap = {}
+ self.nextId = 101
+
+ def close (self):
+ pass
+
+ def refName (self, oid):
+ if oid == None:
+ return "NULL"
+ return str (self.displayObjId (oid.index()))
+
+ def valueDisplay (self, classKey, key, value):
+ if value == None:
+ return "<NULL>"
+ for kind in range (2):
+ schema = self.schema[classKey][kind]
+ for item in schema:
+ if item[0] == key:
+ typecode = item[1]
+ unit = item[2]
+ if (typecode >= 1 and typecode <= 5) or typecode == 12 or typecode == 13 or \
+ (typecode >= 16 and typecode <= 19):
+ if unit == None or unit == self.lastUnit:
+ return str (value)
+ else:
+ self.lastUnit = unit
+ suffix = ""
+ if value != 1:
+ suffix = "s"
+ return str (value) + " " + unit + suffix
+ elif typecode == 6 or typecode == 7: # strings
+ return value
+ elif typecode == 8:
+ if value == 0:
+ return "--"
+ return self.disp.timestamp (value)
+ elif typecode == 9:
+ return str (value)
+ elif typecode == 10:
+ return self.refName (value)
+ elif typecode == 11:
+ if value == 0:
+ return "False"
+ else:
+ return "True"
+ elif typecode == 14:
+ return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", value)
+ elif typecode == 15:
+ return str (value)
+ return "*type-error*"
+
+ def getObjIndex (self, classKey, config):
+ """ Concatenate the values from index columns to form a unique object name """
+ result = ""
+ schemaConfig = self.schema[classKey][0]
+ for item in schemaConfig:
+ if item[5] == 1 and item[0] != "id":
+ if result != "":
+ result = result + "."
+ for key,val in config:
+ if key == item[0]:
+ result = result + self.valueDisplay (classKey, key, val)
+ return result
+
+ def getClassKey (self, className):
+ delimPos = className.find(":")
+ if delimPos == -1:
+ schemaRev = 0
+ delim = className.find(".")
+ if delim != -1:
+ schemaRev = int(className[delim + 1:])
+ name = className[0:delim]
+ else:
+ name = className
+ for key in self.schema:
+ if key[1] == name and self.schema[key][4] == schemaRev:
+ return key
+ else:
+ package = className[0:delimPos]
+ name = className[delimPos + 1:]
+ schemaRev = 0
+ delim = name.find(".")
+ if delim != -1:
+ schemaRev = int(name[delim + 1:])
+ name = name[0:delim]
+ for key in self.schema:
+ if key[0] == package and key[1] == name:
+ if self.schema[key][4] == schemaRev:
+ return key
+ return None
+
+ def classCompletions (self, prefix):
+ """ Provide a list of candidate class names for command completion """
+ self.lock.acquire ()
+ complist = []
+ try:
+ for name in self.tables:
+ if name.find (prefix) == 0:
+ complist.append (name)
+ finally:
+ self.lock.release ()
+ return complist
+
+ def typeName (self, typecode):
+ """ Convert type-codes to printable strings """
+ if typecode == 1:
+ return "uint8"
+ elif typecode == 2:
+ return "uint16"
+ elif typecode == 3:
+ return "uint32"
+ elif typecode == 4:
+ return "uint64"
+ elif typecode == 5:
+ return "bool"
+ elif typecode == 6:
+ return "short-string"
+ elif typecode == 7:
+ return "long-string"
+ elif typecode == 8:
+ return "abs-time"
+ elif typecode == 9:
+ return "delta-time"
+ elif typecode == 10:
+ return "reference"
+ elif typecode == 11:
+ return "boolean"
+ elif typecode == 12:
+ return "float"
+ elif typecode == 13:
+ return "double"
+ elif typecode == 14:
+ return "uuid"
+ elif typecode == 15:
+ return "field-table"
+ elif typecode == 16:
+ return "int8"
+ elif typecode == 17:
+ return "int16"
+ elif typecode == 18:
+ return "int32"
+ elif typecode == 19:
+ return "int64"
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def accessName (self, code):
+ """ Convert element access codes to printable strings """
+ if code == 1:
+ return "ReadCreate"
+ elif code == 2:
+ return "ReadWrite"
+ elif code == 3:
+ return "ReadOnly"
+ else:
+ raise ValueError ("Invalid access code: %d" %code)
+
+ def notNone (self, text):
+ if text == None:
+ return ""
+ else:
+ return text
+
+ def isOid (self, id):
+ for char in str (id):
+ if not char.isdigit () and not char == '-':
+ return False
+ return True
+
+ def listOfIds (self, classKey, tokens):
+ """ Generate a tuple of object ids for a classname based on command tokens. """
+ list = []
+ if len(tokens) == 0 or tokens[0] == "all":
+ for id in self.tables[classKey]:
+ list.append (self.displayObjId (id))
+
+ elif tokens[0] == "active":
+ for id in self.tables[classKey]:
+ if self.tables[classKey][id][0][2] == 0:
+ list.append (self.displayObjId (id))
+
+ else:
+ for token in tokens:
+ if self.isOid (token):
+ if token.find ("-") != -1:
+ ids = token.split("-", 2)
+ for id in range (int (ids[0]), int (ids[1]) + 1):
+ if self.getClassForId (self.rawObjId (long (id))) == classKey:
+ list.append (id)
+ else:
+ list.append (int(token))
+
+ list.sort ()
+ result = ()
+ for item in list:
+ result = result + (item,)
+ return result
+
+ def listClasses (self):
+ """ Generate a display of the list of classes """
+ self.lock.acquire ()
+ try:
+ rows = []
+ sorted = self.tables.keys ()
+ sorted.sort ()
+ for name in sorted:
+ active = 0
+ deleted = 0
+ for record in self.tables[name]:
+ isdel = False
+ ts = self.tables[name][record][0]
+ if ts[2] > 0:
+ isdel = True
+ if isdel:
+ deleted = deleted + 1
+ else:
+ active = active + 1
+ rows.append ((self.displayClassName (name), active, deleted))
+ if len (rows) != 0:
+ self.disp.table ("Management Object Types:",
+ ("ObjectType", "Active", "Deleted"), rows)
+ else:
+ print "Waiting for next periodic update"
+ finally:
+ self.lock.release ()
+
+ def listObjects (self, tokens):
+ """ Generate a display of a list of objects in a class """
+ if len(tokens) == 0:
+ print "Error - No class name provided"
+ return
+
+ self.lock.acquire ()
+ try:
+ classKey = self.getClassKey (tokens[0])
+ if classKey == None:
+ print ("Object type %s not known" % tokens[0])
+ else:
+ rows = []
+ if classKey in self.tables:
+ ids = self.listOfIds(classKey, tokens[1:])
+ for objId in ids:
+ (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()]
+ createTime = self.disp.timestamp (ts[1])
+ destroyTime = "-"
+ if ts[2] > 0:
+ destroyTime = self.disp.timestamp (ts[2])
+ objIndex = self.getObjIndex (classKey, config)
+ row = (objId, createTime, destroyTime, objIndex)
+ rows.append (row)
+ self.disp.table ("Objects of type %s" % self.displayClassName(classKey),
+ ("ID", "Created", "Destroyed", "Index"),
+ rows)
+ finally:
+ self.lock.release ()
+
+ def showObjects (self, tokens):
+ """ Generate a display of object data for a particular class """
+ self.lock.acquire ()
+ try:
+ self.lastUnit = None
+ if self.isOid (tokens[0]):
+ if tokens[0].find ("-") != -1:
+ rootId = int (tokens[0][0:tokens[0].find ("-")])
+ else:
+ rootId = int (tokens[0])
+
+ classKey = self.getClassForId (self.rawObjId (rootId))
+ remaining = tokens
+ if classKey == None:
+ print "Id not known: %d" % int (tokens[0])
+ raise ValueError ()
+ else:
+ classKey = self.getClassKey (tokens[0])
+ remaining = tokens[1:]
+ if classKey not in self.tables:
+ print "Class not known: %s" % tokens[0]
+ raise ValueError ()
+
+ userIds = self.listOfIds (classKey, remaining)
+ if len (userIds) == 0:
+ print "No object IDs supplied"
+ raise ValueError ()
+
+ ids = []
+ for id in userIds:
+ if self.getClassForId (self.rawObjId (long (id))) == classKey:
+ ids.append (self.rawObjId (long (id)))
+
+ rows = []
+ timestamp = None
+ config = self.tables[classKey][ids[0].index()][1]
+ for eIdx in range (len (config)):
+ key = config[eIdx][0]
+ if key != "id":
+ row = ("property", key)
+ for id in ids:
+ if timestamp == None or \
+ timestamp < self.tables[classKey][id.index()][0][0]:
+ timestamp = self.tables[classKey][id.index()][0][0]
+ (key, value) = self.tables[classKey][id.index()][1][eIdx]
+ row = row + (self.valueDisplay (classKey, key, value),)
+ rows.append (row)
+
+ inst = self.tables[classKey][ids[0].index()][2]
+ for eIdx in range (len (inst)):
+ key = inst[eIdx][0]
+ if key != "id":
+ row = ("statistic", key)
+ for id in ids:
+ (key, value) = self.tables[classKey][id.index()][2][eIdx]
+ row = row + (self.valueDisplay (classKey, key, value),)
+ rows.append (row)
+
+ titleRow = ("Type", "Element")
+ for id in ids:
+ titleRow = titleRow + (self.refName(id),)
+ caption = "Object of type %s:" % self.displayClassName(classKey)
+ if timestamp != None:
+ caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
+ self.disp.table (caption, titleRow, rows)
+
+ except:
+ pass
+ self.lock.release ()
+
+ def schemaSummary (self):
+ """ Generate a display of the list of classes in the schema """
+ self.lock.acquire ()
+ try:
+ rows = []
+ sorted = self.schema.keys ()
+ sorted.sort ()
+ for classKey in sorted:
+ tuple = self.schema[classKey]
+ row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]),
+ len (tuple[2]))
+ rows.append (row)
+ self.disp.table ("Classes in Schema:",
+ ("Class", "Properties", "Statistics", "Methods"),
+ rows)
+ finally:
+ self.lock.release ()
+
+ def schemaTable (self, className):
+ """ Generate a display of details of the schema of a particular class """
+ self.lock.acquire ()
+ try:
+ classKey = self.getClassKey (className)
+ if classKey == None:
+ print ("Class name %s not known" % className)
+ raise ValueError ()
+
+ rows = []
+ schemaRev = self.schema[classKey][4]
+ for config in self.schema[classKey][0]:
+ name = config[0]
+ if name != "id":
+ typename = self.typeName(config[1])
+ unit = self.notNone (config[2])
+ desc = self.notNone (config[3])
+ access = self.accessName (config[4])
+ extra = ""
+ if config[5] == 1:
+ extra += "index "
+ if config[6] != None:
+ extra += "Min: " + str(config[6]) + " "
+ if config[7] != None:
+ extra += "Max: " + str(config[7]) + " "
+ if config[8] != None:
+ extra += "MaxLen: " + str(config[8]) + " "
+ if config[9] == 1:
+ extra += "optional "
+ rows.append ((name, typename, unit, access, extra, desc))
+
+ for config in self.schema[classKey][1]:
+ name = config[0]
+ if name != "id":
+ typename = self.typeName(config[1])
+ unit = self.notNone (config[2])
+ desc = self.notNone (config[3])
+ rows.append ((name, typename, unit, "", "", desc))
+
+ titles = ("Element", "Type", "Unit", "Access", "Notes", "Description")
+ self.disp.table ("Schema for class '%s':" % self.displayClassName(classKey), titles, rows)
+
+ for mname in self.schema[classKey][2]:
+ (mdesc, args) = self.schema[classKey][2][mname]
+ caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc))
+ rows = []
+ for arg in args:
+ name = arg[0]
+ typename = self.typeName (arg[1])
+ dir = arg[2]
+ unit = self.notNone (arg[3])
+ desc = self.notNone (arg[4])
+ extra = ""
+ if arg[5] != None:
+ extra = extra + "Min: " + str (arg[5])
+ if arg[6] != None:
+ extra = extra + "Max: " + str (arg[6])
+ if arg[7] != None:
+ extra = extra + "MaxLen: " + str (arg[7])
+ if arg[8] != None:
+ extra = extra + "Default: " + str (arg[8])
+ rows.append ((name, typename, dir, unit, extra, desc))
+ titles = ("Argument", "Type", "Direction", "Unit", "Notes", "Description")
+ self.disp.table (caption, titles, rows)
+
+ except Exception,e:
+ pass
+ self.lock.release ()
+
+ def getClassForId (self, objId):
+ """ Given an object ID, return the class key for the referenced object """
+ for classKey in self.tables:
+ if objId.index() in self.tables[classKey]:
+ return classKey
+ return None
+
+ def callMethod (self, userOid, methodName, args):
+ self.lock.acquire ()
+ methodOk = True
+ try:
+ classKey = self.getClassForId (self.rawObjId (userOid))
+ if classKey == None:
+ raise ValueError ()
+
+ if methodName not in self.schema[classKey][2]:
+ print "Method '%s' not valid for class '%s'" % (methodName, self.displayClassName(classKey))
+ raise ValueError ()
+
+ schemaMethod = self.schema[classKey][2][methodName]
+ count = 0
+ for arg in range(len(schemaMethod[1])):
+ if schemaMethod[1][arg][2].find("I") != -1:
+ count += 1
+ if len (args) != count:
+ print "Wrong number of method args: Need %d, Got %d" % (count, len (args))
+ raise ValueError ()
+
+ namedArgs = {}
+ idx = 0
+ for arg in range(len(schemaMethod[1])):
+ if schemaMethod[1][arg][2].find("I") != -1:
+ namedArgs[schemaMethod[1][arg][0]] = args[idx]
+ idx += 1
+
+ self.methodSeq = self.methodSeq + 1
+ self.methodsPending[self.methodSeq] = methodName
+ except Exception, e:
+ methodOk = False
+ self.lock.release ()
+ if methodOk:
+# try:
+ self.mclient.callMethod (self.mch, self.methodSeq, self.rawObjId (userOid), classKey,
+ methodName, namedArgs)
+# except ValueError, e:
+# print "Error invoking method:", e
+
+ def makeIdRow (self, displayId):
+ if displayId in self.idMap:
+ objId = self.idMap[displayId]
+ else:
+ return None
+ if objId.getFlags() == 0:
+ flags = ""
+ else:
+ flags = str(objId.getFlags())
+ seq = objId.getSequence()
+ if seq == 0:
+ seqText = "<durable>"
+ else:
+ seqText = str(seq)
+ return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject()))
+
+ def listIds (self, select):
+ rows = []
+ if select == 0:
+ sorted = self.idMap.keys()
+ sorted.sort()
+ for displayId in sorted:
+ row = self.makeIdRow (displayId)
+ rows.append(row)
+ else:
+ row = self.makeIdRow (select)
+ if row == None:
+ print "Display Id %d not known" % select
+ return
+ rows.append(row)
+ self.disp.table("Translation of Display IDs:",
+ ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"),
+ rows)
+
+ def do_list (self, data):
+ tokens = data.split ()
+ if len (tokens) == 0:
+ self.listClasses ()
+ else:
+ self.listObjects (tokens)
+
+ def do_show (self, data):
+ tokens = data.split ()
+ self.showObjects (tokens)
+
+ def do_schema (self, data):
+ if data == "":
+ self.schemaSummary ()
+ else:
+ self.schemaTable (data)
+
+ def do_call (self, data):
+ encTokens = data.split ()
+ try:
+ tokens = [a.decode(locale.getpreferredencoding()) for a in encArgs]
+ except:
+ tokens = encTokens
+ if len (tokens) < 2:
+ print "Not enough arguments supplied"
+ return
+
+ displayId = long (tokens[0])
+ methodName = tokens[1]
+ args = tokens[2:]
+ self.callMethod (displayId, methodName, args)
+
+ def do_id (self, data):
+ if data == "":
+ select = 0
+ else:
+ select = int(data)
+ self.listIds(select)
+
+ def do_exit (self):
+ self.mclient.removeChannel (self.mch)
diff --git a/RC5/python/qpid/message.py b/RC5/python/qpid/message.py
new file mode 100644
index 0000000000..eb3ef5c03c
--- /dev/null
+++ b/RC5/python/qpid/message.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from connection08 import Method, Request
+from sets import Set
+
+class Message:
+
+ def __init__(self, channel, frame, content = None):
+ self.channel = channel
+ self.frame = frame
+ self.method = frame.method_type
+ self.content = content
+ if self.method.is_l4_command():
+ self.command_id = self.channel.incoming_completion.sequence.next()
+ #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
+
+ def __len__(self):
+ return len(self.frame.args)
+
+ def _idx(self, idx):
+ if idx < 0: idx += len(self)
+ if idx < 0 or idx > len(self):
+ raise IndexError(idx)
+ return idx
+
+ def __getitem__(self, idx):
+ return self.frame.args[idx]
+
+ def __getattr__(self, attr):
+ fields = self.method.fields.byname
+ if fields.has_key(attr):
+ f = fields[attr]
+ result = self[self.method.fields.index(f)]
+ else:
+ for r in self.method.responses:
+ if attr == r.name:
+ def respond(*args, **kwargs):
+ batch=0
+ if kwargs.has_key("batchoffset"):
+ batch=kwargs.pop("batchoffset")
+ self.channel.respond(Method(r, r.arguments(*args, **kwargs)), batch, self.frame)
+ result = respond
+ break
+ else:
+ raise AttributeError(attr)
+ return result
+
+ STR = "%s %s content = %s"
+ REPR = STR.replace("%s", "%r")
+
+ def __str__(self):
+ return Message.STR % (self.method, self.frame.args, self.content)
+
+ def __repr__(self):
+ return Message.REPR % (self.method, self.frame.args, self.content)
+
+ def complete(self, cumulative=True):
+ self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative)
diff --git a/RC5/python/qpid/packer.py b/RC5/python/qpid/packer.py
new file mode 100644
index 0000000000..22c16918dc
--- /dev/null
+++ b/RC5/python/qpid/packer.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import struct
+
+class Packer:
+
+ def read(self, n): abstract
+
+ def write(self, s): abstract
+
+ def unpack(self, fmt):
+ values = struct.unpack(fmt, self.read(struct.calcsize(fmt)))
+ if len(values) == 1:
+ return values[0]
+ else:
+ return values
+
+ def pack(self, fmt, *args):
+ self.write(struct.pack(fmt, *args))
diff --git a/RC5/python/qpid/peer.py b/RC5/python/qpid/peer.py
new file mode 100644
index 0000000000..648f32ceef
--- /dev/null
+++ b/RC5/python/qpid/peer.py
@@ -0,0 +1,465 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module contains a skeletal peer implementation useful for
+implementing an AMQP server, client, or proxy. The peer implementation
+sorts incoming frames to their intended channels, and dispatches
+incoming method frames to a delegate.
+"""
+
+import thread, threading, traceback, socket, sys, logging
+from connection08 import EOF, Method, Header, Body, Request, Response
+from message import Message
+from queue import Queue, Closed as QueueClosed
+from content import Content
+from cStringIO import StringIO
+from time import time
+
+class Sequence:
+
+ def __init__(self, start, step = 1):
+ # we should keep start for wrap around
+ self._next = start
+ self.step = step
+ self.lock = thread.allocate_lock()
+
+ def next(self):
+ self.lock.acquire()
+ try:
+ result = self._next
+ self._next += self.step
+ return result
+ finally:
+ self.lock.release()
+
+class Peer:
+
+ def __init__(self, conn, delegate, channel_factory=None):
+ self.conn = conn
+ self.delegate = delegate
+ self.outgoing = Queue(0)
+ self.work = Queue(0)
+ self.channels = {}
+ self.lock = thread.allocate_lock()
+ if channel_factory:
+ self.channel_factory = channel_factory
+ else:
+ self.channel_factory = Channel
+
+ def channel(self, id):
+ self.lock.acquire()
+ try:
+ try:
+ ch = self.channels[id]
+ except KeyError:
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec)
+ self.channels[id] = ch
+ finally:
+ self.lock.release()
+ return ch
+
+ def start(self):
+ thread.start_new_thread(self.writer, ())
+ thread.start_new_thread(self.reader, ())
+ thread.start_new_thread(self.worker, ())
+
+ def fatal(self, message=None):
+ """Call when an unexpected exception occurs that will kill a thread."""
+ if message: print >> sys.stderr, message
+ self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
+
+ def reader(self):
+ try:
+ while True:
+ try:
+ frame = self.conn.read()
+ except EOF, e:
+ self.work.close()
+ break
+ ch = self.channel(frame.channel)
+ ch.receive(frame, self.work)
+ except:
+ self.fatal()
+
+ def closed(self, reason):
+ # We must close the delegate first because closing channels
+ # may wake up waiting threads and we don't want them to see
+ # the delegate as open.
+ self.delegate.closed(reason)
+ for ch in self.channels.values():
+ ch.closed(reason)
+
+ def writer(self):
+ try:
+ while True:
+ try:
+ message = self.outgoing.get()
+ self.conn.write(message)
+ except socket.error, e:
+ self.closed(e)
+ break
+ self.conn.flush()
+ except:
+ self.fatal()
+
+ def worker(self):
+ try:
+ while True:
+ queue = self.work.get()
+ frame = queue.get()
+ channel = self.channel(frame.channel)
+ if frame.method_type.content:
+ content = read_content(queue)
+ else:
+ content = None
+
+ self.delegate(channel, Message(channel, frame, content))
+ except QueueClosed:
+ self.closed("worker closed")
+ except:
+ self.fatal()
+
+class Requester:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+ self.mark = 0
+ # request_id -> listener
+ self.outstanding = {}
+
+ def request(self, method, listener, content = None):
+ frame = Request(self.sequence.next(), self.mark, method)
+ self.outstanding[frame.id] = listener
+ self.write(frame, content)
+
+ def receive(self, channel, frame):
+ listener = self.outstanding.pop(frame.request_id)
+ listener(channel, frame)
+
+class Responder:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+
+ def respond(self, method, batch, request):
+ if isinstance(request, Method):
+ self.write(method)
+ else:
+ # allow batching from frame at either end
+ if batch<0:
+ frame = Response(self.sequence.next(), request.id+batch, -batch, method)
+ else:
+ frame = Response(self.sequence.next(), request.id, batch, method)
+ self.write(frame)
+
+class Closed(Exception): pass
+
+class Channel:
+
+ def __init__(self, id, outgoing, spec):
+ self.id = id
+ self.outgoing = outgoing
+ self.spec = spec
+ self.incoming = Queue(0)
+ self.responses = Queue(0)
+ self.queue = None
+ self._closed = False
+ self.reason = None
+
+ self.requester = Requester(self.write)
+ self.responder = Responder(self.write)
+
+ self.completion = OutgoingCompletion()
+ self.incoming_completion = IncomingCompletion(self)
+ self.futures = {}
+ self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves
+
+ self.invoker = self.invoke_method
+ self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
+ self.synchronous = True
+
+ def closed(self, reason):
+ if self._closed:
+ return
+ self._closed = True
+ self.reason = reason
+ self.incoming.close()
+ self.responses.close()
+ self.completion.close()
+ self.incoming_completion.reset()
+ for f in self.futures.values():
+ f.put_response(self, reason)
+
+ def write(self, frame, content = None):
+ if self._closed:
+ raise Closed(self.reason)
+ frame.channel = self.id
+ self.outgoing.put(frame)
+ if (isinstance(frame, (Method, Request))
+ and content == None
+ and frame.method_type.content):
+ content = Content()
+ if content != None:
+ self.write_content(frame.method_type.klass, content)
+
+ def write_content(self, klass, content):
+ header = Header(klass, content.weight(), content.size(), content.properties)
+ self.write(header)
+ for child in content.children:
+ self.write_content(klass, child)
+ # should split up if content.body exceeds max frame size
+ if content.body:
+ self.write(Body(content.body))
+
+ def receive(self, frame, work):
+ if isinstance(frame, Method):
+ if frame.method.response:
+ self.queue = self.responses
+ else:
+ self.queue = self.incoming
+ work.put(self.incoming)
+ elif isinstance(frame, Request):
+ self.queue = self.incoming
+ work.put(self.incoming)
+ elif isinstance(frame, Response):
+ self.requester.receive(self, frame)
+ if frame.method_type.content:
+ self.queue = self.responses
+ return
+ self.queue.put(frame)
+
+ def queue_response(self, channel, frame):
+ channel.responses.put(frame.method)
+
+ def request(self, method, listener, content = None):
+ self.requester.request(method, listener, content)
+
+ def respond(self, method, batch, request):
+ self.responder.respond(method, batch, request)
+
+ def invoke(self, type, args, kwargs):
+ if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]):
+ self.completion.reset()
+ self.incoming_completion.reset()
+ self.completion.next_command(type)
+
+ content = kwargs.pop("content", None)
+ frame = Method(type, type.arguments(*args, **kwargs))
+ return self.invoker(frame, content)
+
+ # used for 0-9
+ def invoke_reliable(self, frame, content = None):
+ if not self.synchronous:
+ future = Future()
+ self.request(frame, future.put_response, content)
+ if not frame.method.responses: return None
+ else: return future
+
+ self.request(frame, self.queue_response, content)
+ if not frame.method.responses:
+ if self.use_execution_layer and frame.method_type.is_l4_command():
+ self.execution_sync()
+ self.completion.wait()
+ if self._closed:
+ raise Closed(self.reason)
+ return None
+ try:
+ resp = self.responses.get()
+ if resp.method_type.content:
+ return Message(self, resp, read_content(self.responses))
+ else:
+ return Message(self, resp)
+ except QueueClosed, e:
+ if self._closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+
+ # used for 0-8 and 0-10
+ def invoke_method(self, frame, content = None):
+ if frame.method.result:
+ cmd_id = self.completion.command_id
+ future = Future()
+ self.futures[cmd_id] = future
+
+ self.write(frame, content)
+
+ try:
+ # here we depend on all nowait fields being named nowait
+ f = frame.method.fields.byname["nowait"]
+ nowait = frame.args[frame.method.fields.index(f)]
+ except KeyError:
+ nowait = False
+
+ try:
+ if not nowait and frame.method.responses:
+ resp = self.responses.get()
+ if resp.method.content:
+ content = read_content(self.responses)
+ else:
+ content = None
+ if resp.method in frame.method.responses:
+ return Message(self, resp, content)
+ else:
+ raise ValueError(resp)
+ elif frame.method.result:
+ if self.synchronous:
+ fr = future.get_response(timeout=10)
+ if self._closed:
+ raise Closed(self.reason)
+ return fr
+ else:
+ return future
+ elif self.synchronous and not frame.method.response \
+ and self.use_execution_layer and frame.method.is_l4_command():
+ self.execution_sync()
+ completed = self.completion.wait(timeout=10)
+ if self._closed:
+ raise Closed(self.reason)
+ if not completed:
+ self.closed("Timed-out waiting for completion of %s" % frame)
+ except QueueClosed, e:
+ if self._closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+
+ def __getattr__(self, name):
+ type = self.spec.method(name)
+ if type == None: raise AttributeError(name)
+ method = lambda *args, **kwargs: self.invoke(type, args, kwargs)
+ self.__dict__[name] = method
+ return method
+
+def read_content(queue):
+ header = queue.get()
+ children = []
+ for i in range(header.weight):
+ children.append(read_content(queue))
+ buf = StringIO()
+ eof = header.eof
+ while not eof:
+ body = queue.get()
+ eof = body.eof
+ content = body.content
+ buf.write(content)
+ return Content(buf.getvalue(), children, header.properties.copy())
+
+class Future:
+ def __init__(self):
+ self.completed = threading.Event()
+
+ def put_response(self, channel, response):
+ self.response = response
+ self.completed.set()
+
+ def get_response(self, timeout=None):
+ self.completed.wait(timeout)
+ if self.completed.isSet():
+ return self.response
+ else:
+ return None
+
+ def is_complete(self):
+ return self.completed.isSet()
+
+class OutgoingCompletion:
+ """
+ Manages completion of outgoing commands i.e. command sent by this peer
+ """
+
+ def __init__(self):
+ self.condition = threading.Condition()
+
+ #todo, implement proper wraparound
+ self.sequence = Sequence(0) #issues ids for outgoing commands
+ self.command_id = -1 #last issued id
+ self.mark = -1 #commands up to this mark are known to be complete
+ self._closed = False
+
+ def next_command(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.is_l4_command():
+ self.command_id = self.sequence.next()
+
+ def reset(self):
+ self.sequence = Sequence(0) #reset counter
+
+ def close(self):
+ self.reset()
+ self.condition.acquire()
+ try:
+ self._closed = True
+ self.condition.notifyAll()
+ finally:
+ self.condition.release()
+
+ def complete(self, mark):
+ self.condition.acquire()
+ try:
+ self.mark = mark
+ #print "set mark to %s [%s] " % (self.mark, self)
+ self.condition.notifyAll()
+ finally:
+ self.condition.release()
+
+ def wait(self, point_of_interest=-1, timeout=None):
+ if point_of_interest == -1: point_of_interest = self.command_id
+ start_time = time()
+ remaining = timeout
+ self.condition.acquire()
+ try:
+ while not self._closed and point_of_interest > self.mark:
+ #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
+ self.condition.wait(remaining)
+ if not self._closed and point_of_interest > self.mark and timeout:
+ if (start_time + timeout) < time(): break
+ else: remaining = timeout - (time() - start_time)
+ finally:
+ self.condition.release()
+ return point_of_interest <= self.mark
+
+class IncomingCompletion:
+ """
+ Manages completion of incoming commands i.e. command received by this peer
+ """
+
+ def __init__(self, channel):
+ self.sequence = Sequence(0) #issues ids for incoming commands
+ self.mark = -1 #id of last command of whose completion notification was sent to the other peer
+ self.channel = channel
+
+ def reset(self):
+ self.sequence = Sequence(0) #reset counter
+
+ def complete(self, mark, cumulative=True):
+ if cumulative:
+ if mark > self.mark:
+ self.mark = mark
+ self.channel.execution_complete(cumulative_execution_mark=self.mark)
+ else:
+ #TODO: record and manage the ranges properly
+ range = [mark, mark]
+ if (self.mark == -1):#hack until wraparound is implemented
+ self.channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=range)
+ else:
+ self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)
diff --git a/RC5/python/qpid/queue.py b/RC5/python/qpid/queue.py
new file mode 100644
index 0000000000..c9f4d1d1d0
--- /dev/null
+++ b/RC5/python/qpid/queue.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module augments the standard python multithreaded Queue
+implementation to add a close() method so that threads blocking on the
+content of a queue can be notified if the queue is no longer in use.
+"""
+
+from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
+from exceptions import Closed
+
+class Queue(BaseQueue):
+
+ END = object()
+ STOP = object()
+
+ def __init__(self, *args, **kwargs):
+ BaseQueue.__init__(self, *args, **kwargs)
+ self.error = None
+ self.listener = None
+ self.exc_listener = None
+ self.thread = None
+
+ def close(self, error = None):
+ self.error = error
+ self.put(Queue.END)
+ if self.thread is not None:
+ self.thread.join()
+ self.thread = None
+
+ def get(self, block = True, timeout = None):
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed(self.error)
+ else:
+ return result
+
+ def listen(self, listener, exc_listener = None):
+ if listener is None and exc_listener is not None:
+ raise ValueError("cannot set exception listener without setting listener")
+
+ if listener is None:
+ if self.thread is not None:
+ self.put(Queue.STOP)
+ self.thread.join()
+ self.thread = None
+
+ self.listener = listener
+ self.exc_listener = exc_listener
+
+ if listener is not None and self.thread is None:
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ while True:
+ try:
+ o = self.get()
+ if o == Queue.STOP: break
+ self.listener(o)
+ except Closed, e:
+ if self.exc_listener is not None:
+ self.exc_listener(e)
+ break
diff --git a/RC5/python/qpid/reference.py b/RC5/python/qpid/reference.py
new file mode 100644
index 0000000000..48ecb67656
--- /dev/null
+++ b/RC5/python/qpid/reference.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Support for amqp 'reference' content (as opposed to inline content)
+"""
+
+import threading
+from queue import Queue, Closed
+
+class NotOpened(Exception): pass
+
+class AlreadyOpened(Exception): pass
+
+"""
+A representation of a reference id; can be passed wherever amqp
+content is required in place of inline data
+"""
+class ReferenceId:
+
+ def __init__(self, id):
+ self.id = id
+
+"""
+Holds content received through 'reference api'. Instances of this
+class will be placed in the consumers queue on receiving a transfer
+(assuming the reference has been opened). Data can be retrieved in
+chunks (as append calls are received) or in full (after reference has
+been closed signalling data s complete).
+"""
+
+class Reference:
+
+ def __init__(self, id):
+ self.id = id
+ self.chunks = Queue(0)
+
+ def close(self):
+ self.chunks.close()
+
+ def append(self, bytes):
+ self.chunks.put(bytes)
+
+ def get_chunk(self):
+ return self.chunks.get()
+
+ def get_complete(self):
+ data = ""
+ for chunk in self:
+ data += chunk
+ return data
+
+ def next(self):
+ try:
+ return self.get_chunk()
+ except Closed, e:
+ raise StopIteration
+
+ def __iter__(self):
+ return self
+
+"""
+Manages a set of opened references. New references can be opened and
+existing references can be retrieved or closed.
+"""
+class References:
+
+ def __init__(self):
+ self.map = {}
+ self.lock = threading.Lock()
+
+ def get(self, id):
+ self.lock.acquire()
+ try:
+ try:
+ ref = self.map[id]
+ except KeyError:
+ raise NotOpened()
+ finally:
+ self.lock.release()
+ return ref
+
+ def open(self, id):
+ self.lock.acquire()
+ try:
+ if id in self.map: raise AlreadyOpened()
+ self.map[id] = Reference(id)
+ finally:
+ self.lock.release()
+
+
+ def close(self, id):
+ self.get(id).close()
+ self.lock.acquire()
+ try:
+ self.map.pop(id)
+ finally:
+ self.lock.release()
+
diff --git a/RC5/python/qpid/session.py b/RC5/python/qpid/session.py
new file mode 100644
index 0000000000..4a7ecbc28a
--- /dev/null
+++ b/RC5/python/qpid/session.py
@@ -0,0 +1,379 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from threading import Condition, RLock, Lock, currentThread
+from invoker import Invoker
+from datatypes import RangedSet, Struct, Future
+from codec010 import StringCodec
+from assembler import Segment
+from queue import Queue
+from datatypes import Message, serial
+from util import wait, notify
+from exceptions import *
+from logging import getLogger
+
+log = getLogger("qpid.io.cmd")
+msg = getLogger("qpid.io.msg")
+
+class SessionException(Exception): pass
+class SessionClosed(SessionException): pass
+class SessionDetached(SessionException): pass
+
+def client(*args):
+ return Client(*args)
+
+def server(*args):
+ return Server(*args)
+
+INCOMPLETE = object()
+
+class Session(Invoker):
+
+ def __init__(self, name, spec, auto_sync=True, timeout=10, delegate=client):
+ self.name = name
+ self.spec = spec
+ self.auto_sync = auto_sync
+ self.timeout = timeout
+ self.channel = None
+ self.invoke_lock = Lock()
+ self._closing = False
+ self._closed = False
+
+ self.condition = Condition()
+
+ self.send_id = True
+ self.receiver = Receiver(self)
+ self.sender = Sender(self)
+
+ self.lock = RLock()
+ self._incoming = {}
+ self.results = {}
+ self.exceptions = []
+
+ self.assembly = None
+
+ self.delegate = delegate(self)
+
+ def incoming(self, destination):
+ self.lock.acquire()
+ try:
+ queue = self._incoming.get(destination)
+ if queue == None:
+ queue = Incoming(self, destination)
+ self._incoming[destination] = queue
+ return queue
+ finally:
+ self.lock.release()
+
+ def error(self):
+ exc = self.exceptions[:]
+ if len(exc) == 0:
+ return None
+ elif len(exc) == 1:
+ return exc[0]
+ else:
+ return tuple(exc)
+
+ def sync(self, timeout=None):
+ ch = self.channel
+ if ch is not None and currentThread() == ch.connection.thread:
+ raise SessionException("deadlock detected")
+ if not self.auto_sync:
+ self.execution_sync(sync=True)
+ last = self.sender.next_id - 1
+ if not wait(self.condition, lambda:
+ last in self.sender._completed or self.exceptions,
+ timeout):
+ raise Timeout()
+ if self.exceptions:
+ raise SessionException(self.error())
+
+ def close(self, timeout=None):
+ self.invoke_lock.acquire()
+ try:
+ self._closing = True
+ self.channel.session_detach(self.name)
+ finally:
+ self.invoke_lock.release()
+ if not wait(self.condition, lambda: self._closed, timeout):
+ raise Timeout()
+
+ def closed(self):
+ self.lock.acquire()
+ try:
+ if self._closed: return
+
+ error = self.error()
+ for id in self.results:
+ f = self.results[id]
+ f.error(error)
+ self.results.clear()
+
+ for q in self._incoming.values():
+ q.close(error)
+
+ self._closed = True
+ notify(self.condition)
+ finally:
+ self.lock.release()
+
+ def resolve_method(self, name):
+ cmd = self.spec.instructions.get(name)
+ if cmd is not None and cmd.track == self.spec["track.command"].value:
+ return self.METHOD, cmd
+ else:
+ # XXX
+ for st in self.spec.structs.values():
+ if st.name == name:
+ return self.METHOD, st
+ if self.spec.structs_by_name.has_key(name):
+ return self.METHOD, self.spec.structs_by_name[name]
+ if self.spec.enums.has_key(name):
+ return self.VALUE, self.spec.enums[name]
+ return self.ERROR, None
+
+ def invoke(self, type, args, kwargs):
+ # XXX
+ if not hasattr(type, "track"):
+ return type.new(args, kwargs)
+
+ self.invoke_lock.acquire()
+ try:
+ return self.do_invoke(type, args, kwargs)
+ finally:
+ self.invoke_lock.release()
+
+ def do_invoke(self, type, args, kwargs):
+ if self._closing:
+ raise SessionClosed()
+
+ if self.channel == None:
+ raise SessionDetached()
+
+ if type.segments:
+ if len(args) == len(type.fields) + 1:
+ message = args[-1]
+ args = args[:-1]
+ else:
+ message = kwargs.pop("message", None)
+ else:
+ message = None
+
+ hdr = Struct(self.spec["session.header"])
+ hdr.sync = self.auto_sync or kwargs.pop("sync", False)
+
+ cmd = type.new(args, kwargs)
+ sc = StringCodec(self.spec)
+ sc.write_command(hdr, cmd)
+
+ seg = Segment(True, (message == None or
+ (message.headers == None and message.body == None)),
+ type.segment_type, type.track, self.channel.id, sc.encoded)
+
+ if type.result:
+ result = Future(exception=SessionException)
+ self.results[self.sender.next_id] = result
+
+ self.send(seg)
+
+ log.debug("SENT %s %s %s", seg.id, hdr, cmd)
+
+ if message != None:
+ if message.headers != None:
+ sc = StringCodec(self.spec)
+ for st in message.headers:
+ sc.write_struct32(st)
+ seg = Segment(False, message.body == None, self.spec["segment_type.header"].value,
+ type.track, self.channel.id, sc.encoded)
+ self.send(seg)
+ if message.body != None:
+ seg = Segment(False, True, self.spec["segment_type.body"].value,
+ type.track, self.channel.id, message.body)
+ self.send(seg)
+ msg.debug("SENT %s", message)
+
+ if type.result:
+ if self.auto_sync:
+ return result.get(self.timeout)
+ else:
+ return result
+ elif self.auto_sync:
+ self.sync(self.timeout)
+
+ def received(self, seg):
+ self.receiver.received(seg)
+ if seg.first:
+ assert self.assembly == None
+ self.assembly = []
+ self.assembly.append(seg)
+ if seg.last:
+ self.dispatch(self.assembly)
+ self.assembly = None
+
+ def dispatch(self, assembly):
+ segments = assembly[:]
+
+ hdr, cmd = assembly.pop(0).decode(self.spec)
+ log.debug("RECV %s %s %s", cmd.id, hdr, cmd)
+
+ args = []
+
+ for st in cmd._type.segments:
+ if assembly:
+ seg = assembly[0]
+ if seg.type == st.segment_type:
+ args.append(seg.decode(self.spec))
+ assembly.pop(0)
+ continue
+ args.append(None)
+
+ assert len(assembly) == 0
+
+ attr = cmd._type.qname.replace(".", "_")
+ result = getattr(self.delegate, attr)(cmd, *args)
+
+ if cmd._type.result:
+ self.execution_result(cmd.id, result)
+
+ if result is not INCOMPLETE:
+ for seg in segments:
+ self.receiver.completed(seg)
+ # XXX: don't forget to obey sync for manual completion as well
+ if hdr.sync:
+ self.channel.session_completed(self.receiver._completed)
+
+ def send(self, seg):
+ self.sender.send(seg)
+
+ def __str__(self):
+ return '<Session: %s, %s>' % (self.name, self.channel)
+
+ def __repr__(self):
+ return str(self)
+
+class Receiver:
+
+ def __init__(self, session):
+ self.session = session
+ self.next_id = None
+ self.next_offset = None
+ self._completed = RangedSet()
+
+ def received(self, seg):
+ if self.next_id == None or self.next_offset == None:
+ raise Exception("todo")
+ seg.id = self.next_id
+ seg.offset = self.next_offset
+ if seg.last:
+ self.next_id += 1
+ self.next_offset = 0
+ else:
+ self.next_offset += len(seg.payload)
+
+ def completed(self, seg):
+ if seg.id == None:
+ raise ValueError("cannot complete unidentified segment")
+ if seg.last:
+ self._completed.add(seg.id)
+
+ def known_completed(self, commands):
+ completed = RangedSet()
+ for c in self._completed.ranges:
+ for kc in commands.ranges:
+ if c.lower in kc and c.upper in kc:
+ break
+ else:
+ completed.add_range(c)
+ self._completed = completed
+
+class Sender:
+
+ def __init__(self, session):
+ self.session = session
+ self.next_id = serial(0)
+ self.next_offset = 0
+ self.segments = []
+ self._completed = RangedSet()
+
+ def send(self, seg):
+ seg.id = self.next_id
+ seg.offset = self.next_offset
+ if seg.last:
+ self.next_id += 1
+ self.next_offset = 0
+ else:
+ self.next_offset += len(seg.payload)
+ self.segments.append(seg)
+ if self.session.send_id:
+ self.session.send_id = False
+ self.session.channel.session_command_point(seg.id, seg.offset)
+ self.session.channel.connection.write_segment(seg)
+
+ def completed(self, commands):
+ idx = 0
+ while idx < len(self.segments):
+ seg = self.segments[idx]
+ if seg.id in commands:
+ del self.segments[idx]
+ else:
+ idx += 1
+ for range in commands.ranges:
+ self._completed.add(range.lower, range.upper)
+
+class Incoming(Queue):
+
+ def __init__(self, session, destination):
+ Queue.__init__(self)
+ self.session = session
+ self.destination = destination
+
+ def start(self):
+ self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit)
+ for unit in self.session.credit_unit.values():
+ self.session.message_flow(self.destination, unit, 0xFFFFFFFFL)
+
+ def stop(self):
+ self.session.message_cancel(self.destination)
+ self.listen(None)
+
+class Delegate:
+
+ def __init__(self, session):
+ self.session = session
+
+ #XXX: do something with incoming accepts
+ def message_accept(self, ma): None
+
+ def execution_result(self, er):
+ future = self.session.results.pop(er.command_id)
+ future.set(er.value)
+
+ def execution_exception(self, ex):
+ self.session.exceptions.append(ex)
+
+class Client(Delegate):
+
+ def message_transfer(self, cmd, headers, body):
+ m = Message(body)
+ m.headers = headers
+ m.id = cmd.id
+ messages = self.session.incoming(cmd.destination)
+ messages.put(m)
+ msg.debug("RECV %s", m)
+ return INCOMPLETE
diff --git a/RC5/python/qpid/spec.py b/RC5/python/qpid/spec.py
new file mode 100644
index 0000000000..e6d914044c
--- /dev/null
+++ b/RC5/python/qpid/spec.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module loads protocol metadata into python objects. It provides
+access to spec metadata via a python object model, and can also
+dynamically creating python methods, classes, and modules based on the
+spec metadata. All the generated methods have proper signatures and
+doc strings based on the spec metadata so the python help system can
+be used to browse the spec documentation. The generated methods all
+dispatch to the self.invoke(meth, args) callback of the containing
+class so that the generated code can be reused in a variety of
+situations.
+"""
+
+import os, mllib, spec08, spec010
+
+def default():
+ try:
+ amqp_spec = os.environ["AMQP_SPEC"]
+ return amqp_spec
+ except KeyError:
+ try:
+ from qpid_config import amqp_spec
+ return amqp_spec
+ except ImportError:
+ raise Exception("unable to locate the amqp specification, please set "
+ "the AMQP_SPEC environment variable or supply "
+ "qpid_config.py on the PYTHONPATH")
+
+def load(specfile, *errata):
+ for name in (specfile,) + errata:
+ if not os.path.exists(name):
+ raise IOError("No such file or directory: '%s'" % name)
+
+ doc = mllib.xml_parse(specfile)
+ major = doc["amqp/@major"]
+ minor = doc["amqp/@minor"]
+
+ if major == "0" and minor == "10":
+ return spec010.load(specfile, *errata)
+ else:
+ return spec08.load(specfile, *errata)
diff --git a/RC5/python/qpid/spec010.py b/RC5/python/qpid/spec010.py
new file mode 100644
index 0000000000..cbc85a5e8b
--- /dev/null
+++ b/RC5/python/qpid/spec010.py
@@ -0,0 +1,693 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, cPickle, datatypes, datetime
+from codec010 import StringCodec
+from util import mtime, fill
+
+class Node:
+
+ def __init__(self, children):
+ self.children = children
+ self.named = {}
+ self.docs = []
+ self.rules = []
+
+ def register(self):
+ for ch in self.children:
+ ch.register(self)
+
+ def resolve(self):
+ for ch in self.children:
+ ch.resolve()
+
+ def __getitem__(self, name):
+ path = name.split(".", 1)
+ nd = self.named
+ for step in path:
+ nd = nd[step]
+ return nd
+
+ def __iter__(self):
+ return iter(self.children)
+
+class Anonymous:
+
+ def __init__(self, children):
+ self.children = children
+
+ def register(self, node):
+ for ch in self.children:
+ ch.register(node)
+
+ def resolve(self):
+ for ch in self.children:
+ ch.resolve()
+
+class Named:
+
+ def __init__(self, name):
+ self.name = name
+ self.qname = None
+
+ def register(self, node):
+ self.spec = node.spec
+ self.klass = node.klass
+ node.named[self.name] = self
+ if node.qname:
+ self.qname = "%s.%s" % (node.qname, self.name)
+ else:
+ self.qname = self.name
+
+ def __str__(self):
+ return self.qname
+
+ def __repr__(self):
+ return str(self)
+
+class Lookup:
+
+ def lookup(self, name):
+ value = None
+ if self.klass:
+ try:
+ value = self.klass[name]
+ except KeyError:
+ pass
+ if not value:
+ value = self.spec[name]
+ return value
+
+class Coded:
+
+ def __init__(self, code):
+ self.code = code
+
+class Constant(Named, Node):
+
+ def __init__(self, name, value, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+ self.value = value
+
+ def register(self, node):
+ Named.register(self, node)
+ node.constants.append(self)
+ Node.register(self)
+
+class Type(Named, Node):
+
+ def __init__(self, name, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+
+ def is_present(self, value):
+ return value != None
+
+ def register(self, node):
+ Named.register(self, node)
+ Node.register(self)
+
+class Primitive(Coded, Type):
+
+ def __init__(self, name, code, fixed, variable, children):
+ Coded.__init__(self, code)
+ Type.__init__(self, name, children)
+ self.fixed = fixed
+ self.variable = variable
+
+ def register(self, node):
+ Type.register(self, node)
+ if self.code is not None:
+ self.spec.types[self.code] = self
+
+ def is_present(self, value):
+ if self.fixed == 0:
+ return value
+ else:
+ return Type.is_present(self, value)
+
+ def encode(self, codec, value):
+ getattr(codec, "write_%s" % self.name)(value)
+
+ def decode(self, codec):
+ return getattr(codec, "read_%s" % self.name)()
+
+class Domain(Type, Lookup):
+
+ def __init__(self, name, type, children):
+ Type.__init__(self, name, children)
+ self.type = type
+ self.choices = {}
+
+ def resolve(self):
+ self.type = self.lookup(self.type)
+ Node.resolve(self)
+
+ def encode(self, codec, value):
+ self.type.encode(codec, value)
+
+ def decode(self, codec):
+ return self.type.decode(codec)
+
+class Enum:
+
+ def __init__(self, name):
+ self.name = name
+ self._names = ()
+ self._values = ()
+
+ def values(self):
+ return self._values
+
+ def __repr__(self):
+ return "%s(%s)" % (self.name, ", ".join(self._names))
+
+class Choice(Named, Node):
+
+ def __init__(self, name, value, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+ self.value = value
+
+ def register(self, node):
+ Named.register(self, node)
+ node.choices[self.value] = self
+ Node.register(self)
+ try:
+ enum = node.spec.enums[node.name]
+ except KeyError:
+ enum = Enum(node.name)
+ node.spec.enums[node.name] = enum
+ setattr(enum, self.name, self.value)
+ enum._names += (self.name,)
+ enum._values += (self.value,)
+
+class Composite(Type, Coded):
+
+ def __init__(self, name, label, code, size, pack, children):
+ Coded.__init__(self, code)
+ Type.__init__(self, name, children)
+ self.label = label
+ self.fields = []
+ self.size = size
+ self.pack = pack
+
+ def new(self, args, kwargs):
+ return datatypes.Struct(self, *args, **kwargs)
+
+ def decode(self, codec):
+ codec.read_size(self.size)
+ if self.code is not None:
+ code = codec.read_uint16()
+ assert self.code == code
+ return datatypes.Struct(self, **self.decode_fields(codec))
+
+ def decode_fields(self, codec):
+ flags = 0
+ for i in range(self.pack):
+ flags |= (codec.read_uint8() << 8*i)
+
+ result = {}
+
+ for i in range(len(self.fields)):
+ f = self.fields[i]
+ if flags & (0x1 << i):
+ result[f.name] = f.type.decode(codec)
+ else:
+ result[f.name] = None
+ return result
+
+ def encode(self, codec, value):
+ sc = StringCodec(self.spec)
+ if self.code is not None:
+ sc.write_uint16(self.code)
+ self.encode_fields(sc, value)
+ codec.write_size(self.size, len(sc.encoded))
+ codec.write(sc.encoded)
+
+ def encode_fields(self, codec, values):
+ flags = 0
+ for i in range(len(self.fields)):
+ f = self.fields[i]
+ if f.type.is_present(values[f.name]):
+ flags |= (0x1 << i)
+ for i in range(self.pack):
+ codec.write_uint8((flags >> 8*i) & 0xFF)
+ for i in range(len(self.fields)):
+ f = self.fields[i]
+ if flags & (0x1 << i):
+ f.type.encode(codec, values[f.name])
+
+ def docstring(self):
+ docs = []
+ if self.label:
+ docs.append(self.label)
+ docs += [d.text for d in self.docs]
+ s = "\n\n".join([fill(t, 2) for t in docs])
+ for f in self.fields:
+ fdocs = []
+ if f.label:
+ fdocs.append(f.label)
+ else:
+ fdocs.append("")
+ fdocs += [d.text for d in f.docs]
+ s += "\n\n" + "\n\n".join([fill(fdocs[0], 4, f.name)] +
+ [fill(t, 4) for t in fdocs[1:]])
+ return s
+
+
+class Field(Named, Node, Lookup):
+
+ def __init__(self, name, label, type, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+ self.label = label
+ self.type = type
+ self.exceptions = []
+
+ def default(self):
+ return None
+
+ def register(self, node):
+ Named.register(self, node)
+ node.fields.append(self)
+ Node.register(self)
+
+ def resolve(self):
+ self.type = self.lookup(self.type)
+ Node.resolve(self)
+
+ def __str__(self):
+ return "%s: %s" % (self.qname, self.type.qname)
+
+class Struct(Composite):
+
+ def register(self, node):
+ Composite.register(self, node)
+ if self.code is not None:
+ self.spec.structs[self.code] = self
+ self.spec.structs_by_name[self.name] = self
+ self.pyname = self.name
+ self.pydoc = self.docstring()
+
+ def __str__(self):
+ fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname)
+ for f in self.fields])
+ return "%s {\n %s\n}" % (self.qname, fields)
+
+class Segment:
+
+ def __init__(self):
+ self.segment_type = None
+
+ def register(self, node):
+ self.spec = node.spec
+ self.klass = node.klass
+ node.segments.append(self)
+ Node.register(self)
+
+class Instruction(Composite, Segment):
+
+ def __init__(self, name, label, code, children):
+ Composite.__init__(self, name, label, code, 0, 2, children)
+ Segment.__init__(self)
+ self.track = None
+ self.handlers = []
+
+ def __str__(self):
+ return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname)
+ for f in self.fields]))
+
+ def register(self, node):
+ Composite.register(self, node)
+ self.pyname = self.qname.replace(".", "_")
+ self.pydoc = self.docstring()
+ self.spec.instructions[self.pyname] = self
+
+class Control(Instruction):
+
+ def __init__(self, name, code, label, children):
+ Instruction.__init__(self, name, code, label, children)
+ self.response = None
+
+ def register(self, node):
+ Instruction.register(self, node)
+ node.controls.append(self)
+ self.spec.controls[self.code] = self
+ self.segment_type = self.spec["segment_type.control"].value
+ self.track = self.spec["track.control"].value
+
+class Command(Instruction):
+
+ def __init__(self, name, label, code, children):
+ Instruction.__init__(self, name, label, code, children)
+ self.result = None
+ self.exceptions = []
+ self.segments = []
+
+ def register(self, node):
+ Instruction.register(self, node)
+ node.commands.append(self)
+ self.spec.commands[self.code] = self
+ self.segment_type = self.spec["segment_type.command"].value
+ self.track = self.spec["track.command"].value
+
+class Header(Segment, Node):
+
+ def __init__(self, children):
+ Segment.__init__(self)
+ Node.__init__(self, children)
+ self.entries = []
+
+ def register(self, node):
+ Segment.register(self, node)
+ self.segment_type = self.spec["segment_type.header"].value
+ Node.register(self)
+
+class Entry(Lookup):
+
+ def __init__(self, type):
+ self.type = type
+
+ def register(self, node):
+ self.spec = node.spec
+ self.klass = node.klass
+ node.entries.append(self)
+
+ def resolve(self):
+ self.type = self.lookup(self.type)
+
+class Body(Segment, Node):
+
+ def __init__(self, children):
+ Segment.__init__(self)
+ Node.__init__(self, children)
+
+ def register(self, node):
+ Segment.register(self, node)
+ self.segment_type = self.spec["segment_type.body"].value
+ Node.register(self)
+
+ def resolve(self): pass
+
+class Class(Named, Coded, Node):
+
+ def __init__(self, name, code, children):
+ Named.__init__(self, name)
+ Coded.__init__(self, code)
+ Node.__init__(self, children)
+ self.controls = []
+ self.commands = []
+
+ def register(self, node):
+ Named.register(self, node)
+ self.klass = self
+ node.classes.append(self)
+ Node.register(self)
+
+class Doc:
+
+ def __init__(self, type, title, text):
+ self.type = type
+ self.title = title
+ self.text = text
+
+ def register(self, node):
+ node.docs.append(self)
+
+ def resolve(self): pass
+
+class Role(Named, Node):
+
+ def __init__(self, name, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+
+ def register(self, node):
+ Named.register(self, node)
+ Node.register(self)
+
+class Rule(Named, Node):
+
+ def __init__(self, name, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+
+ def register(self, node):
+ Named.register(self, node)
+ node.rules.append(self)
+ Node.register(self)
+
+class Exception(Named, Node):
+
+ def __init__(self, name, error_code, children):
+ Named.__init__(self, name)
+ Node.__init__(self, children)
+ self.error_code = error_code
+
+ def register(self, node):
+ Named.register(self, node)
+ node.exceptions.append(self)
+ Node.register(self)
+
+class Spec(Node):
+
+ ENCODINGS = {
+ basestring: "vbin16",
+ int: "int64",
+ long: "int64",
+ float: "float",
+ None.__class__: "void",
+ list: "list",
+ tuple: "list",
+ dict: "map",
+ datatypes.timestamp: "datetime",
+ datetime.datetime: "datetime"
+ }
+
+ def __init__(self, major, minor, port, children):
+ Node.__init__(self, children)
+ self.major = major
+ self.minor = minor
+ self.port = port
+ self.constants = []
+ self.classes = []
+ self.types = {}
+ self.qname = None
+ self.spec = self
+ self.klass = None
+ self.instructions = {}
+ self.controls = {}
+ self.commands = {}
+ self.structs = {}
+ self.structs_by_name = {}
+ self.enums = {}
+
+ def encoding(self, klass):
+ if Spec.ENCODINGS.has_key(klass):
+ return self.named[Spec.ENCODINGS[klass]]
+ for base in klass.__bases__:
+ result = self.encoding(base)
+ if result != None:
+ return result
+
+class Implement:
+
+ def __init__(self, handle):
+ self.handle = handle
+
+ def register(self, node):
+ node.handlers.append(self.handle)
+
+ def resolve(self): pass
+
+class Response(Node):
+
+ def __init__(self, name, children):
+ Node.__init__(self, children)
+ self.name = name
+
+ def register(self, node):
+ Node.register(self)
+
+class Result(Node, Lookup):
+
+ def __init__(self, type, children):
+ self.type = type
+ Node.__init__(self, children)
+
+ def register(self, node):
+ node.result = self
+ self.qname = node.qname
+ self.klass = node.klass
+ self.spec = node.spec
+ Node.register(self)
+
+ def resolve(self):
+ self.type = self.lookup(self.type)
+ Node.resolve(self)
+
+import mllib
+
+def num(s):
+ if s: return int(s, 0)
+
+REPLACE = {" ": "_", "-": "_"}
+KEYWORDS = {"global": "global_",
+ "return": "return_"}
+
+def id(name):
+ name = str(name)
+ for key, val in REPLACE.items():
+ name = name.replace(key, val)
+ try:
+ name = KEYWORDS[name]
+ except KeyError:
+ pass
+ return name
+
+class Loader:
+
+ def __init__(self):
+ self.class_code = 0
+
+ def code(self, nd):
+ c = num(nd["@code"])
+ if c is None:
+ return None
+ else:
+ return c | (self.class_code << 8)
+
+ def list(self, q):
+ result = []
+ for nd in q:
+ result.append(nd.dispatch(self))
+ return result
+
+ def children(self, n):
+ return self.list(n.query["#tag"])
+
+ def data(self, d):
+ return d.data
+
+ def do_amqp(self, a):
+ return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]),
+ self.children(a))
+
+ def do_type(self, t):
+ return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]),
+ num(t["@variable-width"]), self.children(t))
+
+ def do_constant(self, c):
+ return Constant(id(c["@name"]), num(c["@value"]), self.children(c))
+
+ def do_domain(self, d):
+ return Domain(id(d["@name"]), id(d["@type"]), self.children(d))
+
+ def do_enum(self, e):
+ return Anonymous(self.children(e))
+
+ def do_choice(self, c):
+ return Choice(id(c["@name"]), num(c["@value"]), self.children(c))
+
+ def do_class(self, c):
+ code = num(c["@code"])
+ self.class_code = code
+ children = self.children(c)
+ children += self.list(c.query["command/result/struct"])
+ self.class_code = 0
+ return Class(id(c["@name"]), code, children)
+
+ def do_doc(self, doc):
+ text = reduce(lambda x, y: x + y, self.list(doc.children))
+ return Doc(doc["@type"], doc["@title"], text)
+
+ def do_xref(self, x):
+ return x["@ref"]
+
+ def do_role(self, r):
+ return Role(id(r["@name"]), self.children(r))
+
+ def do_control(self, c):
+ return Control(id(c["@name"]), c["@label"], self.code(c), self.children(c))
+
+ def do_rule(self, r):
+ return Rule(id(r["@name"]), self.children(r))
+
+ def do_implement(self, i):
+ return Implement(id(i["@handle"]))
+
+ def do_response(self, r):
+ return Response(id(r["@name"]), self.children(r))
+
+ def do_field(self, f):
+ return Field(id(f["@name"]), f["@label"], id(f["@type"]), self.children(f))
+
+ def do_struct(self, s):
+ return Struct(id(s["@name"]), s["@label"], self.code(s), num(s["@size"]),
+ num(s["@pack"]), self.children(s))
+
+ def do_command(self, c):
+ return Command(id(c["@name"]), c["@label"], self.code(c), self.children(c))
+
+ def do_segments(self, s):
+ return Anonymous(self.children(s))
+
+ def do_header(self, h):
+ return Header(self.children(h))
+
+ def do_entry(self, e):
+ return Entry(id(e["@type"]))
+
+ def do_body(self, b):
+ return Body(self.children(b))
+
+ def do_result(self, r):
+ type = r["@type"]
+ if not type:
+ type = r["struct/@name"]
+ return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"]))
+
+ def do_exception(self, e):
+ return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e))
+
+def load(xml):
+ fname = xml + ".pcl"
+
+ if os.path.exists(fname) and mtime(fname) > mtime(__file__):
+ file = open(fname, "r")
+ s = cPickle.load(file)
+ file.close()
+ else:
+ doc = mllib.xml_parse(xml)
+ s = doc["amqp"].dispatch(Loader())
+ s.register()
+ s.resolve()
+
+ try:
+ file = open(fname, "w")
+ except IOError:
+ file = None
+
+ if file:
+ cPickle.dump(s, file)
+ file.close()
+
+ return s
diff --git a/RC5/python/qpid/spec08.py b/RC5/python/qpid/spec08.py
new file mode 100644
index 0000000000..a0047e7107
--- /dev/null
+++ b/RC5/python/qpid/spec08.py
@@ -0,0 +1,504 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module loads protocol metadata into python objects. It provides
+access to spec metadata via a python object model, and can also
+dynamically creating python methods, classes, and modules based on the
+spec metadata. All the generated methods have proper signatures and
+doc strings based on the spec metadata so the python help system can
+be used to browse the spec documentation. The generated methods all
+dispatch to the self.invoke(meth, args) callback of the containing
+class so that the generated code can be reused in a variety of
+situations.
+"""
+
+import re, new, mllib, qpid
+from util import fill
+
+class SpecContainer:
+
+ def __init__(self):
+ self.items = []
+ self.byname = {}
+ self.byid = {}
+ self.indexes = {}
+
+ def add(self, item):
+ if self.byname.has_key(item.name):
+ raise ValueError("duplicate name: %s" % item)
+ if item.id == None:
+ item.id = len(self)
+ elif self.byid.has_key(item.id):
+ raise ValueError("duplicate id: %s" % item)
+ self.indexes[item] = len(self.items)
+ self.items.append(item)
+ self.byname[item.name] = item
+ self.byid[item.id] = item
+
+ def index(self, item):
+ try:
+ return self.indexes[item]
+ except KeyError:
+ raise ValueError(item)
+
+ def __iter__(self):
+ return iter(self.items)
+
+ def __len__(self):
+ return len(self.items)
+
+class Metadata:
+
+ PRINT = []
+
+ def __init__(self):
+ pass
+
+ def __str__(self):
+ args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT)
+ return "%s(%s)" % (self.__class__.__name__, ", ".join(args))
+
+ def __repr__(self):
+ return str(self)
+
+class Spec(Metadata):
+
+ PRINT=["major", "minor", "file"]
+
+ def __init__(self, major, minor, file):
+ Metadata.__init__(self)
+ self.major = major
+ self.minor = minor
+ self.file = file
+ self.constants = SpecContainer()
+ self.domains = SpecContainer()
+ self.classes = SpecContainer()
+ # methods indexed by classname_methname
+ self.methods = {}
+ # structs by type code
+ self.structs = {}
+
+ def post_load(self):
+ self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
+ self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
+
+ def method(self, name):
+ if not self.methods.has_key(name):
+ for cls in self.classes:
+ clen = len(cls.name)
+ if name.startswith(cls.name) and name[clen] == "_":
+ end = name[clen + 1:]
+ if cls.methods.byname.has_key(end):
+ self.methods[name] = cls.methods.byname[end]
+ return self.methods.get(name)
+
+ def parse_method(self, name):
+ parts = re.split(r"\s*\.\s*", name)
+ if len(parts) != 2:
+ raise ValueError(name)
+ klass, meth = parts
+ return self.classes.byname[klass].methods.byname[meth]
+
+ def struct(self, name, *args, **kwargs):
+ type = self.domains.byname[name].type
+ return qpid.Struct(type, *args, **kwargs)
+
+ def define_module(self, name, doc = None):
+ module = new.module(name, doc)
+ module.__file__ = self.file
+ for c in self.classes:
+ cls = c.define_class(c.name)
+ cls.__module__ = module.__name__
+ setattr(module, c.name, cls)
+ return module
+
+ def define_class(self, name):
+ methods = {}
+ for c in self.classes:
+ for m in c.methods:
+ meth = m.klass.name + "_" + m.name
+ methods[meth] = m.define_method(meth)
+ return type(name, (), methods)
+
+class Constant(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, spec, name, id, klass, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.name = name
+ self.id = id
+ self.klass = klass
+ self.docs = docs
+
+class Domain(Metadata):
+
+ PRINT=["name", "type"]
+
+ def __init__(self, spec, name, type, description, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.id = None
+ self.name = name
+ self.type = type
+ self.description = description
+ self.docs = docs
+
+class Struct(Metadata):
+
+ PRINT=["size", "type", "pack"]
+
+ def __init__(self, size, type, pack):
+ Metadata.__init__(self)
+ self.size = size
+ self.type = type
+ self.pack = pack
+ self.fields = SpecContainer()
+
+class Class(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, spec, name, id, handler, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.name = name
+ self.id = id
+ self.handler = handler
+ self.fields = SpecContainer()
+ self.methods = SpecContainer()
+ self.docs = docs
+
+ def define_class(self, name):
+ methods = {}
+ for m in self.methods:
+ methods[m.name] = m.define_method(m.name)
+ return type(name, (), methods)
+
+class Method(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, klass, name, id, content, responses, result, synchronous,
+ description, docs):
+ Metadata.__init__(self)
+ self.klass = klass
+ self.name = name
+ self.id = id
+ self.content = content
+ self.responses = responses
+ self.result = result
+ self.synchronous = synchronous
+ self.fields = SpecContainer()
+ self.description = description
+ self.docs = docs
+ self.response = False
+
+ def is_l4_command(self):
+ return self.klass.name not in ["execution", "channel", "connection", "session"]
+
+ def arguments(self, *args, **kwargs):
+ nargs = len(args) + len(kwargs)
+ maxargs = len(self.fields)
+ if nargs > maxargs:
+ self._type_error("takes at most %s arguments (%s) given", maxargs, nargs)
+ result = []
+ for f in self.fields:
+ idx = self.fields.index(f)
+ if idx < len(args):
+ result.append(args[idx])
+ elif kwargs.has_key(f.name):
+ result.append(kwargs.pop(f.name))
+ else:
+ result.append(Method.DEFAULTS[f.type])
+ for key, value in kwargs.items():
+ if self.fields.byname.has_key(key):
+ self._type_error("got multiple values for keyword argument '%s'", key)
+ else:
+ self._type_error("got an unexpected keyword argument '%s'", key)
+ return tuple(result)
+
+ def _type_error(self, msg, *args):
+ raise TypeError("%s %s" % (self.name, msg % args))
+
+ def docstring(self):
+ s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
+ for f in self.fields:
+ if f.docs:
+ s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] +
+ [fill(d, 4) for d in f.docs[1:]])
+ if self.responses:
+ s += "\n\nValid responses: "
+ for r in self.responses:
+ s += r.name + " "
+ return s
+
+ METHOD = "__method__"
+ DEFAULTS = {"bit": False,
+ "shortstr": "",
+ "longstr": "",
+ "table": {},
+ "array": [],
+ "octet": 0,
+ "short": 0,
+ "long": 0,
+ "longlong": 0,
+ "timestamp": 0,
+ "content": None,
+ "uuid": "",
+ "rfc1982_long": 0,
+ "rfc1982_long_set": [],
+ "long_struct": None}
+
+ def define_method(self, name):
+ g = {Method.METHOD: self}
+ l = {}
+ args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields]
+ methargs = args[:]
+ if self.content:
+ args += [("content", None)]
+ code = "def %s(self, %s):\n" % \
+ (name, ", ".join(["%s = %r" % a for a in args]))
+ code += " %r\n" % self.docstring()
+ argnames = ", ".join([a[0] for a in methargs])
+ code += " return self.invoke(%s" % Method.METHOD
+ if argnames:
+ code += ", (%s,)" % argnames
+ else:
+ code += ", ()"
+ if self.content:
+ code += ", content"
+ code += ")"
+ exec code in g, l
+ return l[name]
+
+class Field(Metadata):
+
+ PRINT=["name", "id", "type"]
+
+ def __init__(self, name, id, type, domain, description, docs):
+ Metadata.__init__(self)
+ self.name = name
+ self.id = id
+ self.type = type
+ self.domain = domain
+ self.description = description
+ self.docs = docs
+
+ def default(self):
+ if isinstance(self.type, Struct):
+ return None
+ else:
+ return Method.DEFAULTS[self.type]
+
+WIDTHS = {
+ "octet": 1,
+ "short": 2,
+ "long": 4
+ }
+
+def width(st, default=None):
+ if st in (None, "none", ""):
+ return default
+ else:
+ return WIDTHS[st]
+
+def get_result(nd, spec):
+ result = nd["result"]
+ if not result: return None
+ name = result["@domain"]
+ if name != None: return spec.domains.byname[name]
+ st_nd = result["struct"]
+ st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), width(st_nd["@pack"], 2))
+ spec.structs[st.type] = st
+ load_fields(st_nd, st.fields, spec.domains.byname)
+ return st
+
+def get_desc(nd):
+ label = nd["@label"]
+ if not label:
+ label = nd.text()
+ if label:
+ label = label.strip()
+ return label
+
+def get_docs(nd):
+ return [n.text() for n in nd.query["doc"]]
+
+def load_fields(nd, l, domains):
+ for f_nd in nd.query["field"]:
+ type = f_nd["@domain"]
+ if type == None:
+ type = f_nd["@type"]
+ type = pythonize(type)
+ domain = None
+ while domains.has_key(type) and domains[type].type != type:
+ domain = domains[type]
+ type = domain.type
+ l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain,
+ get_desc(f_nd), get_docs(f_nd)))
+
+def load(specfile, *errata):
+ doc = mllib.xml_parse(specfile)
+ spec_root = doc["amqp"]
+ spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
+
+ for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata):
+ # constants
+ for nd in root.query["constant"]:
+ val = nd["@value"]
+ if val.startswith("0x"): val = int(val, 16)
+ else: val = int(val)
+ const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"],
+ get_docs(nd))
+ try:
+ spec.constants.add(const)
+ except ValueError, e:
+ pass
+ #print "Warning:", e
+
+ # domains are typedefs
+ structs = []
+ for nd in root.query["domain"]:
+ type = nd["@type"]
+ if type == None:
+ st_nd = nd["struct"]
+ code = st_nd["@type"]
+ if code not in (None, "", "none"):
+ code = int(code)
+ type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2))
+ if type.type != None:
+ spec.structs[type.type] = type
+ structs.append((type, st_nd))
+ else:
+ type = pythonize(type)
+ domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd),
+ get_docs(nd))
+ spec.domains.add(domain)
+
+ # structs
+ for st, st_nd in structs:
+ load_fields(st_nd, st.fields, spec.domains.byname)
+
+ # classes
+ for c_nd in root.query["class"]:
+ cname = pythonize(c_nd["@name"])
+ if spec.classes.byname.has_key(cname):
+ klass = spec.classes.byname[cname]
+ else:
+ klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"],
+ get_docs(c_nd))
+ spec.classes.add(klass)
+
+ added_methods = []
+ load_fields(c_nd, klass.fields, spec.domains.byname)
+ for m_nd in c_nd.query["method"]:
+ mname = pythonize(m_nd["@name"])
+ if klass.methods.byname.has_key(mname):
+ meth = klass.methods.byname[mname]
+ else:
+ meth = Method(klass, mname,
+ int(m_nd["@index"]),
+ m_nd["@content"] == "1",
+ [pythonize(nd["@name"]) for nd in m_nd.query["response"]],
+ get_result(m_nd, spec),
+ m_nd["@synchronous"] == "1",
+ get_desc(m_nd),
+ get_docs(m_nd))
+ klass.methods.add(meth)
+ added_methods.append(meth)
+ load_fields(m_nd, meth.fields, spec.domains.byname)
+ # resolve the responses
+ for m in added_methods:
+ m.responses = [klass.methods.byname[r] for r in m.responses]
+ for resp in m.responses:
+ resp.response = True
+
+ spec.post_load()
+ return spec
+
+REPLACE = {" ": "_", "-": "_"}
+KEYWORDS = {"global": "global_",
+ "return": "return_"}
+
+def pythonize(name):
+ name = str(name)
+ for key, val in REPLACE.items():
+ name = name.replace(key, val)
+ try:
+ name = KEYWORDS[name]
+ except KeyError:
+ pass
+ return name
+
+class Rule(Metadata):
+
+ PRINT = ["text", "implement", "tests"]
+
+ def __init__(self, text, implement, tests, path):
+ self.text = text
+ self.implement = implement
+ self.tests = tests
+ self.path = path
+
+def find_rules(node, rules):
+ if node.name == "rule":
+ rules.append(Rule(node.text, node.get("@implement"),
+ [ch.text for ch in node if ch.name == "test"],
+ node.path()))
+ if node.name == "doc" and node.get("@name") == "rule":
+ tests = []
+ if node.has("@test"):
+ tests.append(node["@test"])
+ rules.append(Rule(node.text, None, tests, node.path()))
+ for child in node:
+ find_rules(child, rules)
+
+def load_rules(specfile):
+ rules = []
+ find_rules(xmlutil.parse(specfile), rules)
+ return rules
+
+def test_summary():
+ template = """
+ <html><head><title>AMQP Tests</title></head>
+ <body>
+ <table width="80%%" align="center">
+ %s
+ </table>
+ </body>
+ </html>
+ """
+ rows = []
+ for rule in load_rules("amqp.org/specs/amqp7.xml"):
+ if rule.tests:
+ tests = ", ".join(rule.tests)
+ else:
+ tests = "&nbsp;"
+ rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>'
+ '<td><b>Implement:</b> %s</td>'
+ '<td><b>Tests:</b> %s</td></tr>' %
+ (rule.path[len("/root/amqp"):], rule.implement, tests))
+ rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text)
+ rows.append('<tr><td colspan="3">&nbsp;</td></tr>')
+
+ print template % "\n".join(rows)
diff --git a/RC5/python/qpid/testlib.py b/RC5/python/qpid/testlib.py
new file mode 100644
index 0000000000..31f52169ae
--- /dev/null
+++ b/RC5/python/qpid/testlib.py
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Support library for qpid python tests.
+#
+
+import sys, re, unittest, os, random, logging, traceback
+import qpid.client, qpid.spec, qmf.console
+import Queue
+from fnmatch import fnmatch
+from getopt import getopt, GetoptError
+from qpid.content import Content
+from qpid.message import Message
+
+#0-10 support
+from qpid.connection import Connection
+from qpid.spec010 import load
+from qpid.util import connect, ssl, URL
+
+def findmodules(root):
+ """Find potential python modules under directory root"""
+ found = []
+ for dirpath, subdirs, files in os.walk(root):
+ modpath = dirpath.replace(os.sep, '.')
+ if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
+ for f in files:
+ match = re.match(r'(.+)\.py$', f)
+ if match and f != '__init__.py':
+ found.append('.'.join([modpath, match.group(1)]))
+ return found
+
+def default(value, default):
+ if (value == None): return default
+ else: return value
+
+class TestRunner:
+
+ SPEC_FOLDER = "../specs"
+
+ """Runs unit tests.
+
+ Parses command line arguments, provides utility functions for tests,
+ runs the selected test suite.
+ """
+
+ def _die(self, message = None):
+ if message: print message
+ print """
+run-tests [options] [test*]
+The name of a test is package.module.ClassName.testMethod
+Options:
+ -?/-h/--help : this message
+ -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
+ 0-8 - use the default 0-8 specification.
+ 0-9 - use the default 0-9 specification.
+ 0-10-errata - use the 0-10 specification with qpid errata.
+ -e/--errata <errata.xml> : file containing amqp XML errata
+ -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
+ -S/--skip-self-test : skips the client self tests in the 'tests folder'
+ -F/--spec-folder : folder that contains the specs to be loaded
+ """
+ sys.exit(1)
+
+ def setBroker(self, broker):
+ try:
+ self.url = URL(broker)
+ except ValueError:
+ self._die("'%s' is not a valid broker" % (broker))
+ self.user = default(self.url.user, "guest")
+ self.password = default(self.url.password, "guest")
+ self.host = self.url.host
+ if self.url.scheme == URL.AMQPS:
+ self.ssl = True
+ default_port = 5671
+ else:
+ self.ssl = False
+ default_port = 5672
+ self.port = default(self.url.port, default_port)
+
+ def ignoreFile(self, filename):
+ f = file(filename)
+ for line in f.readlines(): self.ignore.append(line.strip())
+ f.close()
+
+ def use08spec(self):
+ "True if we are running with the old 0-8 spec."
+ # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons.
+ return self.spec.major==8 and self.spec.minor==0
+
+ def use09spec(self):
+ "True if we are running with the 0-9 (non-wip) spec."
+ return self.spec.major==0 and self.spec.minor==9
+
+ def _parseargs(self, args):
+ # Defaults
+ self.setBroker("localhost")
+ self.verbose = 1
+ self.ignore = []
+ self.specfile = "0-8"
+ self.errata = []
+ self.skip_self_test = False
+
+ try:
+ opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:",
+ ["help", "spec", "errata=", "broker=",
+ "verbose", "skip-self-test", "ignore",
+ "ignore-file", "spec-folder"])
+ except GetoptError, e:
+ self._die(str(e))
+ for opt, value in opts:
+ if opt in ("-?", "-h", "--help"): self._die()
+ if opt in ("-s", "--spec"): self.specfile = value
+ if opt in ("-e", "--errata"): self.errata.append(value)
+ if opt in ("-b", "--broker"): self.setBroker(value)
+ if opt in ("-v", "--verbose"): self.verbose = 2
+ if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
+ if opt in ("-i", "--ignore"): self.ignore.append(value)
+ if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+ if opt in ("-S", "--skip-self-test"): self.skip_self_test = True
+ if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value
+
+ # Abbreviations for default settings.
+ if (self.specfile == "0-10"):
+ self.spec = load(self.get_spec_file("amqp.0-10.xml"))
+ elif (self.specfile == "0-10-errata"):
+ self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml"))
+ else:
+ if (self.specfile == "0-8"):
+ self.specfile = self.get_spec_file("amqp.0-8.xml")
+ elif (self.specfile == "0-9"):
+ self.specfile = self.get_spec_file("amqp.0-9.xml")
+ self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
+
+ if (self.specfile == None):
+ self._die("No XML specification provided")
+ print "Using specification from:", self.specfile
+
+ self.spec = qpid.spec.load(self.specfile, *self.errata)
+
+ if len(self.tests) == 0:
+ if not self.skip_self_test:
+ self.tests=findmodules("tests")
+ if self.use08spec() or self.use09spec():
+ self.tests+=findmodules("tests_0-8")
+ elif (self.spec.major == 99 and self.spec.minor == 0):
+ self.tests+=findmodules("tests_0-10_preview")
+ elif (self.spec.major == 0 and self.spec.minor == 10):
+ self.tests+=findmodules("tests_0-10")
+
+ def testSuite(self):
+ class IgnoringTestSuite(unittest.TestSuite):
+ def addTest(self, test):
+ if isinstance(test, unittest.TestCase):
+ for pattern in testrunner.ignore:
+ if fnmatch(test.id(), pattern):
+ return
+ unittest.TestSuite.addTest(self, test)
+
+ # Use our IgnoringTestSuite in the test loader.
+ unittest.TestLoader.suiteClass = IgnoringTestSuite
+ return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
+
+ def run(self, args=sys.argv[1:]):
+ self._parseargs(args)
+ runner = unittest.TextTestRunner(descriptions=False,
+ verbosity=self.verbose)
+ result = runner.run(self.testSuite())
+
+ if (self.ignore):
+ print "======================================="
+ print "NOTE: the following tests were ignored:"
+ for t in self.ignore: print t
+ print "======================================="
+
+ return result.wasSuccessful()
+
+ def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
+ """Connect to the broker, returns a qpid.client.Client"""
+ host = host or self.host
+ port = port or self.port
+ spec = spec or self.spec
+ user = user or self.user
+ password = password or self.password
+ client = qpid.client.Client(host, port, spec)
+ if self.use08spec():
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ else:
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ return client
+
+ def get_spec_file(self, fname):
+ return TestRunner.SPEC_FOLDER + os.sep + fname
+
+# Global instance for tests to call connect.
+testrunner = TestRunner()
+
+
+class TestBase(unittest.TestCase):
+ """Base class for Qpid test cases.
+
+ self.client is automatically connected with channel 1 open before
+ the test methods are run.
+
+ Deletes queues and exchanges after. Tests call
+ self.queue_declare(channel, ...) and self.exchange_declare(chanel,
+ ...) which are wrappers for the Channel functions that note
+ resources to clean up later.
+ """
+
+ def setUp(self):
+ self.queues = []
+ self.exchanges = []
+ self.client = self.connect()
+ self.channel = self.client.channel(1)
+ self.version = (self.client.spec.major, self.client.spec.minor)
+ if self.version == (8, 0) or self.version == (0, 9):
+ self.channel.channel_open()
+ else:
+ self.channel.session_open()
+
+ def tearDown(self):
+ try:
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:"
+ print traceback.print_exc()
+
+ if not self.client.closed:
+ self.client.channel(0).connection_close(reply_code=200)
+ else:
+ self.client.close()
+
+ def connect(self, *args, **keys):
+ """Create a new connction, return the Client object"""
+ return testrunner.connect(*args, **keys)
+
+ def queue_declare(self, channel=None, *args, **keys):
+ channel = channel or self.channel
+ reply = channel.queue_declare(*args, **keys)
+ self.queues.append((channel, keys["queue"]))
+ return reply
+
+ def exchange_declare(self, channel=None, ticket=0, exchange='',
+ type='', passive=False, durable=False,
+ auto_delete=False,
+ arguments={}):
+ channel = channel or self.channel
+ reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
+ self.exchanges.append((channel,exchange))
+ return reply
+
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def consume(self, queueName):
+ """Consume from named queue returns the Queue object."""
+ if testrunner.use08spec() or testrunner.use09spec():
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
+ else:
+ if not "uniqueTag" in dir(self): self.uniqueTag = 1
+ else: self.uniqueTag += 1
+ consumer_tag = "tag" + str(self.uniqueTag)
+ self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
+ self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+ return self.client.queue(consumer_tag)
+
+ def subscribe(self, channel=None, **keys):
+ channel = channel or self.channel
+ consumer_tag = keys["destination"]
+ channel.message_subscribe(**keys)
+ channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
+
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
+ """
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
+ if testrunner.use08spec() or testrunner.use09spec():
+ self.channel.basic_publish(
+ exchange=exchange,
+ content=Content(body, properties=properties),
+ routing_key=routing_key)
+ else:
+ self.channel.message_transfer(
+ destination=exchange,
+ content=Content(body, properties={'application_headers':properties,'routing_key':routing_key}))
+ msg = queue.get(timeout=1)
+ if testrunner.use08spec() or testrunner.use09spec():
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content.properties)
+ else:
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content['application_headers'])
+
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
+
+ def assertChannelException(self, expectedCode, message):
+ if self.version == (8, 0) or self.version == (0, 9):
+ if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message))
+ self.assertEqual("channel", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ else:
+ if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message))
+ self.assertEqual("session", message.method.klass.name)
+ self.assertEqual("closed", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
+
+
+ def assertConnectionException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message))
+ self.assertEqual("connection", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
+
+class TestBase010(unittest.TestCase):
+ """
+ Base class for Qpid test cases. using the final 0-10 spec
+ """
+
+ def setUp(self):
+ self.conn = self.connect()
+ self.session = self.conn.session("test-session", timeout=10)
+ self.qmf = None
+
+ def startQmf(self):
+ self.qmf = qmf.console.Session()
+ self.qmf_broker = self.qmf.addBroker(str(testrunner.url))
+
+ def connect(self, host=None, port=None):
+ sock = connect(host or testrunner.host, port or testrunner.port)
+ if testrunner.url.scheme == URL.AMQPS:
+ sock = ssl(sock)
+ conn = Connection(sock, testrunner.spec, username=testrunner.user,
+ password=testrunner.password)
+ conn.start(timeout=10)
+ return conn
+
+ def tearDown(self):
+ if not self.session.error(): self.session.close(timeout=10)
+ self.conn.close(timeout=10)
+ if self.qmf:
+ self.qmf.delBroker(self.qmf_broker)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
diff --git a/RC5/python/qpid/util.py b/RC5/python/qpid/util.py
new file mode 100644
index 0000000000..bb7f5090df
--- /dev/null
+++ b/RC5/python/qpid/util.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, socket, time, textwrap, re
+
+ssl = socket.ssl
+
+def connect(host, port):
+ sock = socket.socket()
+ sock.connect((host, port))
+ sock.setblocking(1)
+ # XXX: we could use this on read, but we'd have to put write in a
+ # loop as well
+ # sock.settimeout(1)
+ return sock
+
+def listen(host, port, predicate = lambda: True, bound = lambda: None):
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ sock.listen(5)
+ bound()
+ while predicate():
+ s, a = sock.accept()
+ yield s
+
+def mtime(filename):
+ return os.stat(filename).st_mtime
+
+def wait(condition, predicate, timeout=None):
+ condition.acquire()
+ try:
+ passed = 0
+ start = time.time()
+ while not predicate():
+ if timeout is None:
+ condition.wait()
+ elif passed < timeout:
+ condition.wait(timeout - passed)
+ else:
+ return False
+ passed = time.time() - start
+ return True
+ finally:
+ condition.release()
+
+def notify(condition, action=lambda: None):
+ condition.acquire()
+ try:
+ action()
+ condition.notifyAll()
+ finally:
+ condition.release()
+
+def fill(text, indent, heading = None):
+ sub = indent * " "
+ if heading:
+ if not text:
+ return (indent - 2) * " " + heading
+ init = (indent - 2) * " " + heading + " -- "
+ else:
+ init = sub
+ w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub)
+ return w.fill(" ".join(text.split()))
+
+class URL:
+
+ RE = re.compile(r"""
+ # [ <scheme>:// ] [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? ([^@:/]+) (?: :([0-9]+))?$
+""", re.X)
+
+ AMQPS = "amqps"
+ AMQP = "amqp"
+
+ def __init__(self, s):
+ match = URL.RE.match(s)
+ if match is None:
+ raise ValueError(s)
+ self.scheme, self.user, self.password, self.host, port = match.groups()
+ if port is None:
+ self.port = None
+ else:
+ self.port = int(port)
+
+ def __repr__(self):
+ return "URL(%r)" % str(self)
+
+ def __str__(self):
+ s = ""
+ if self.scheme:
+ s += "%s://" % self.scheme
+ if self.user:
+ s += self.user
+ if self.password:
+ s += "/%s" % self.password
+ s += "@"
+ s += self.host
+ if self.port:
+ s += ":%s" % self.port
+ return s