diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-12-19 19:34:45 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-12-19 19:34:45 +0000 |
| commit | 38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch) | |
| tree | 3599403c0c9690898f1e336c009a5564c587c732 /RC5/python/qpid | |
| parent | a8960649bcd365ef70a5de7812f5910222388a6d (diff) | |
| download | qpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz | |
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/python/qpid')
30 files changed, 7602 insertions, 0 deletions
diff --git a/RC5/python/qpid/__init__.py b/RC5/python/qpid/__init__.py new file mode 100644 index 0000000000..ff9cc04df8 --- /dev/null +++ b/RC5/python/qpid/__init__.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import spec, codec, connection, content, peer, delegate, client + +class Struct: + + def __init__(self, type, *args, **kwargs): + self.__dict__["type"] = type + self.__dict__["_values"] = {} + + if len(args) > len(self.type.fields): + raise TypeError("too many args") + + for a, f in zip(args, self.type.fields): + self.set(f.name, a) + + for k, a in kwargs.items(): + self.set(k, a) + + def _check(self, attr): + field = self.type.fields.byname.get(attr) + if field == None: + raise AttributeError(attr) + return field + + def exists(self, attr): + return self.type.fields.byname.has_key(attr) + + def has(self, attr): + self._check(attr) + return self._values.has_key(attr) + + def set(self, attr, value): + self._check(attr) + self._values[attr] = value + + def get(self, attr): + field = self._check(attr) + return self._values.get(attr, field.default()) + + def clear(self, attr): + self._check(attr) + del self._values[attr] + + def __setattr__(self, attr, value): + self.set(attr, value) + + def __getattr__(self, attr): + return self.get(attr) + + def __delattr__(self, attr): + self.clear(attr) + + def __setitem__(self, attr, value): + self.set(attr, value) + + def __getitem__(self, attr): + return self.get(attr) + + def __delitem__(self, attr): + self.clear(attr) + + def __str__(self): + return "%s %s" % (self.type, self._values) + + def __repr__(self): + return str(self) diff --git a/RC5/python/qpid/assembler.py b/RC5/python/qpid/assembler.py new file mode 100644 index 0000000000..92bb0aa0f8 --- /dev/null +++ b/RC5/python/qpid/assembler.py @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from codec010 import StringCodec +from framer import * +from logging import getLogger + +log = getLogger("qpid.io.seg") + +class Segment: + + def __init__(self, first, last, type, track, channel, payload): + self.id = None + self.offset = None + self.first = first + self.last = last + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def decode(self, spec): + segs = spec["segment_type"] + choice = segs.choices[self.type] + return getattr(self, "decode_%s" % choice.name)(spec) + + def decode_control(self, spec): + sc = StringCodec(spec, self.payload) + return sc.read_control() + + def decode_command(self, spec): + sc = StringCodec(spec, self.payload) + hdr, cmd = sc.read_command() + cmd.id = self.id + return hdr, cmd + + def decode_header(self, spec): + sc = StringCodec(spec, self.payload) + values = [] + while len(sc.encoded) > 0: + values.append(sc.read_struct32()) + return values + + def decode_body(self, spec): + return self.payload + + def __str__(self): + return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, + self.track, self.channel, self.payload) + + def __repr__(self): + return str(self) + +class Assembler(Framer): + + def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD): + Framer.__init__(self, sock) + self.max_payload = max_payload + self.fragments = {} + + def read_segment(self): + while True: + frame = self.read_frame() + + key = (frame.channel, frame.track) + seg = self.fragments.get(key) + if seg == None: + seg = Segment(frame.isFirstSegment(), frame.isLastSegment(), + frame.type, frame.track, frame.channel, "") + self.fragments[key] = seg + + seg.payload += frame.payload + + if frame.isLastFrame(): + self.fragments.pop(key) + log.debug("RECV %s", seg) + return seg + + def write_segment(self, segment): + remaining = segment.payload + + first = True + while first or remaining: + payload = remaining[:self.max_payload] + remaining = remaining[self.max_payload:] + + flags = 0 + if first: + flags |= FIRST_FRM + first = False + if not remaining: + flags |= LAST_FRM + if segment.first: + flags |= FIRST_SEG + if segment.last: + flags |= LAST_SEG + + frame = Frame(flags, segment.type, segment.track, segment.channel, + payload) + self.write_frame(frame) + + log.debug("SENT %s", segment) diff --git a/RC5/python/qpid/client.py b/RC5/python/qpid/client.py new file mode 100644 index 0000000000..4605710de8 --- /dev/null +++ b/RC5/python/qpid/client.py @@ -0,0 +1,225 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +An AQMP client implementation that uses a custom delegate for +interacting with the server. +""" + +import os, threading +from peer import Peer, Channel, Closed +from delegate import Delegate +from connection08 import Connection, Frame, connect +from spec import load +from queue import Queue +from reference import ReferenceId, References + + +class Client: + + def __init__(self, host, port, spec = None, vhost = None): + self.host = host + self.port = port + if spec: + self.spec = spec + else: + try: + name = os.environ["AMQP_SPEC"] + except KeyError: + raise EnvironmentError("environment variable AMQP_SPEC must be set") + self.spec = load(name) + self.structs = StructFactory(self.spec) + self.sessions = {} + + self.mechanism = None + self.response = None + self.locale = None + + self.vhost = vhost + if self.vhost == None: + self.vhost = "/" + + self.queues = {} + self.lock = threading.Lock() + + self.closed = False + self.reason = None + self.started = threading.Event() + + def wait(self): + self.started.wait() + if self.closed: + raise Closed(self.reason) + + def queue(self, key): + self.lock.acquire() + try: + try: + q = self.queues[key] + except KeyError: + q = Queue(0) + self.queues[key] = q + finally: + self.lock.release() + return q + + def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None): + self.mechanism = mechanism + self.response = response + self.locale = locale + self.tune_params = tune_params + + self.socket = connect(self.host, self.port) + self.conn = Connection(self.socket, self.spec) + self.peer = Peer(self.conn, ClientDelegate(self), Session) + + self.conn.init() + self.peer.start() + self.wait() + self.channel(0).connection_open(self.vhost) + + def channel(self, id): + self.lock.acquire() + try: + ssn = self.peer.channel(id) + ssn.client = self + self.sessions[id] = ssn + finally: + self.lock.release() + return ssn + + def session(self): + self.lock.acquire() + try: + id = None + for i in xrange(1, 64*1024): + if not self.sessions.has_key(id): + id = i + break + finally: + self.lock.release() + if id == None: + raise RuntimeError("out of channels") + else: + return self.channel(id) + + def close(self): + self.socket.close() + +class ClientDelegate(Delegate): + + def __init__(self, client): + Delegate.__init__(self) + self.client = client + + def connection_start(self, ch, msg): + msg.start_ok(mechanism=self.client.mechanism, + response=self.client.response, + locale=self.client.locale) + + def connection_tune(self, ch, msg): + if self.client.tune_params: + #todo: just override the params, i.e. don't require them + # all to be included in tune_params + msg.tune_ok(**self.client.tune_params) + else: + msg.tune_ok(*msg.frame.args) + self.client.started.set() + + def message_transfer(self, ch, msg): + self.client.queue(msg.destination).put(msg) + + def message_open(self, ch, msg): + ch.references.open(msg.reference) + + def message_close(self, ch, msg): + ch.references.close(msg.reference) + + def message_append(self, ch, msg): + ch.references.get(msg.reference).append(msg.bytes) + + def message_acquired(self, ch, msg): + ch.control_queue.put(msg) + + def basic_deliver(self, ch, msg): + self.client.queue(msg.consumer_tag).put(msg) + + def channel_pong(self, ch, msg): + msg.ok() + + def channel_close(self, ch, msg): + ch.closed(msg) + + def session_ack(self, ch, msg): + pass + + def session_closed(self, ch, msg): + ch.closed(msg) + + def connection_close(self, ch, msg): + self.client.peer.closed(msg) + + def execution_complete(self, ch, msg): + ch.completion.complete(msg.cumulative_execution_mark) + + def execution_result(self, ch, msg): + future = ch.futures[msg.command_id] + future.put_response(ch, msg.data) + + def closed(self, reason): + self.client.closed = True + self.client.reason = reason + self.client.started.set() + +class StructFactory: + + def __init__(self, spec): + self.spec = spec + self.factories = {} + + def __getattr__(self, name): + if self.factories.has_key(name): + return self.factories[name] + elif self.spec.domains.byname.has_key(name): + f = lambda *args, **kwargs: self.struct(name, *args, **kwargs) + self.factories[name] = f + return f + else: + raise AttributeError(name) + + def struct(self, name, *args, **kwargs): + return self.spec.struct(name, *args, **kwargs) + +class Session(Channel): + + def __init__(self, *args): + Channel.__init__(self, *args) + self.references = References() + self.client = None + + def open(self): + self.session_open() + + def close(self): + self.session_close() + self.client.lock.acquire() + try: + del self.client.sessions[self.id] + finally: + self.client.lock.release() diff --git a/RC5/python/qpid/codec.py b/RC5/python/qpid/codec.py new file mode 100644 index 0000000000..8026b209dc --- /dev/null +++ b/RC5/python/qpid/codec.py @@ -0,0 +1,590 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Utility code to translate between python objects and AMQP encoded data +fields. + +The unit test for this module is located in tests/codec.py +""" + +import re, qpid, spec08 +from cStringIO import StringIO +from struct import * +from reference import ReferenceId + +class EOF(Exception): + pass + +TYPE_ALIASES = { + "long_string": "longstr", + "unsigned_int": "long" + } + +class Codec: + + """ + class that handles encoding/decoding of AMQP primitives + """ + + def __init__(self, stream, spec): + """ + initializing the stream/fields used + """ + self.stream = stream + self.spec = spec + self.nwrote = 0 + self.nread = 0 + self.incoming_bits = [] + self.outgoing_bits = [] + + self.types = {} + self.codes = {} + self.encodings = { + basestring: "longstr", + int: "long", + long: "long", + None.__class__:"void", + list: "sequence", + tuple: "sequence", + dict: "table" + } + + for constant in self.spec.constants: + if constant.klass == "field-table-type": + type = constant.name.replace("field_table_", "") + self.typecode(constant.id, TYPE_ALIASES.get(type, type)) + + if not self.types: + self.typecode(ord('S'), "longstr") + self.typecode(ord('I'), "long") + + def typecode(self, code, type): + self.types[code] = type + self.codes[type] = code + + def resolve(self, klass): + if self.encodings.has_key(klass): + return self.encodings[klass] + for base in klass.__bases__: + result = self.resolve(base) + if result != None: + return result + + def read(self, n): + """ + reads in 'n' bytes from the stream. Can raise EOF exception + """ + self.clearbits() + data = self.stream.read(n) + if n > 0 and len(data) == 0: + raise EOF() + self.nread += len(data) + return data + + def write(self, s): + """ + writes data 's' to the stream + """ + self.flushbits() + self.stream.write(s) + self.nwrote += len(s) + + def flush(self): + """ + flushes the bits and data present in the stream + """ + self.flushbits() + self.stream.flush() + + def flushbits(self): + """ + flushes the bits(compressed into octets) onto the stream + """ + if len(self.outgoing_bits) > 0: + bytes = [] + index = 0 + for b in self.outgoing_bits: + if index == 0: bytes.append(0) + if b: bytes[-1] |= 1 << index + index = (index + 1) % 8 + del self.outgoing_bits[:] + for byte in bytes: + self.encode_octet(byte) + + def clearbits(self): + if self.incoming_bits: + self.incoming_bits = [] + + def pack(self, fmt, *args): + """ + packs the data 'args' as per the format 'fmt' and writes it to the stream + """ + self.write(pack(fmt, *args)) + + def unpack(self, fmt): + """ + reads data from the stream and unpacks it as per the format 'fmt' + """ + size = calcsize(fmt) + data = self.read(size) + values = unpack(fmt, data) + if len(values) == 1: + return values[0] + else: + return values + + def encode(self, type, value): + """ + calls the appropriate encode function e.g. encode_octet, encode_short etc. + """ + if isinstance(type, spec08.Struct): + self.encode_struct(type, value) + else: + getattr(self, "encode_" + type)(value) + + def decode(self, type): + """ + calls the appropriate decode function e.g. decode_octet, decode_short etc. + """ + if isinstance(type, spec08.Struct): + return self.decode_struct(type) + else: + return getattr(self, "decode_" + type)() + + def encode_bit(self, o): + """ + encodes a bit + """ + if o: + self.outgoing_bits.append(True) + else: + self.outgoing_bits.append(False) + + def decode_bit(self): + """ + decodes a bit + """ + if len(self.incoming_bits) == 0: + bits = self.decode_octet() + for i in range(8): + self.incoming_bits.append(bits >> i & 1 != 0) + return self.incoming_bits.pop(0) + + def encode_octet(self, o): + """ + encodes octet (8 bits) data 'o' in network byte order + """ + + # octet's valid range is [0,255] + if (o < 0 or o > 255): + raise ValueError('Valid range of octet is [0,255]') + + self.pack("!B", int(o)) + + def decode_octet(self): + """ + decodes a octet (8 bits) encoded in network byte order + """ + return self.unpack("!B") + + def encode_short(self, o): + """ + encodes short (16 bits) data 'o' in network byte order + """ + + # short int's valid range is [0,65535] + if (o < 0 or o > 65535): + raise ValueError('Valid range of short int is [0,65535]: %s' % o) + + self.pack("!H", int(o)) + + def decode_short(self): + """ + decodes a short (16 bits) in network byte order + """ + return self.unpack("!H") + + def encode_long(self, o): + """ + encodes long (32 bits) data 'o' in network byte order + """ + + # we need to check both bounds because on 64 bit platforms + # struct.pack won't raise an error if o is too large + if (o < 0 or o > 4294967295): + raise ValueError('Valid range of long int is [0,4294967295]') + + self.pack("!L", int(o)) + + def decode_long(self): + """ + decodes a long (32 bits) in network byte order + """ + return self.unpack("!L") + + def encode_signed_long(self, o): + self.pack("!q", o) + + def decode_signed_long(self): + return self.unpack("!q") + + def encode_signed_int(self, o): + self.pack("!l", o) + + def decode_signed_int(self): + return self.unpack("!l") + + def encode_longlong(self, o): + """ + encodes long long (64 bits) data 'o' in network byte order + """ + self.pack("!Q", o) + + def decode_longlong(self): + """ + decodes a long long (64 bits) in network byte order + """ + return self.unpack("!Q") + + def encode_float(self, o): + self.pack("!f", o) + + def decode_float(self): + return self.unpack("!f") + + def encode_double(self, o): + self.pack("!d", o) + + def decode_double(self): + return self.unpack("!d") + + def encode_bin128(self, b): + for idx in range (0,16): + self.pack("!B", ord (b[idx])) + + def decode_bin128(self): + result = "" + for idx in range (0,16): + result = result + chr (self.unpack("!B")) + return result + + def encode_raw(self, len, b): + for idx in range (0,len): + self.pack("!B", b[idx]) + + def decode_raw(self, len): + result = "" + for idx in range (0,len): + result = result + chr (self.unpack("!B")) + return result + + def enc_str(self, fmt, s): + """ + encodes a string 's' in network byte order as per format 'fmt' + """ + size = len(s) + self.pack(fmt, size) + self.write(s) + + def dec_str(self, fmt): + """ + decodes a string in network byte order as per format 'fmt' + """ + size = self.unpack(fmt) + return self.read(size) + + def encode_shortstr(self, s): + """ + encodes a short string 's' in network byte order + """ + + # short strings are limited to 255 octets + if len(s) > 255: + raise ValueError('Short strings are limited to 255 octets') + + self.enc_str("!B", s) + + def decode_shortstr(self): + """ + decodes a short string in network byte order + """ + return self.dec_str("!B") + + def encode_longstr(self, s): + """ + encodes a long string 's' in network byte order + """ + if isinstance(s, dict): + self.encode_table(s) + else: + self.enc_str("!L", s) + + def decode_longstr(self): + """ + decodes a long string 's' in network byte order + """ + return self.dec_str("!L") + + def encode_table(self, tbl): + """ + encodes a table data structure in network byte order + """ + enc = StringIO() + codec = Codec(enc, self.spec) + if tbl: + for key, value in tbl.items(): + if self.spec.major == 8 and self.spec.minor == 0 and len(key) > 128: + raise ValueError("field table key too long: '%s'" % key) + type = self.resolve(value.__class__) + if type == None: + raise ValueError("no encoding for: " + value.__class__) + codec.encode_shortstr(key) + codec.encode_octet(self.codes[type]) + codec.encode(type, value) + s = enc.getvalue() + self.encode_long(len(s)) + self.write(s) + + def decode_table(self): + """ + decodes a table data structure in network byte order + """ + size = self.decode_long() + start = self.nread + result = {} + while self.nread - start < size: + key = self.decode_shortstr() + code = self.decode_octet() + if self.types.has_key(code): + value = self.decode(self.types[code]) + else: + w = width(code) + if fixed(code): + value = self.read(w) + else: + value = self.read(self.dec_num(w)) + result[key] = value + return result + + def encode_timestamp(self, t): + """ + encodes a timestamp data structure in network byte order + """ + self.encode_longlong(t) + + def decode_timestamp(self): + """ + decodes a timestamp data structure in network byte order + """ + return self.decode_longlong() + + def encode_content(self, s): + """ + encodes a content data structure in network byte order + + content can be passed as a string in which case it is assumed to + be inline data, or as an instance of ReferenceId indicating it is + a reference id + """ + if isinstance(s, ReferenceId): + self.encode_octet(1) + self.encode_longstr(s.id) + else: + self.encode_octet(0) + self.encode_longstr(s) + + def decode_content(self): + """ + decodes a content data structure in network byte order + + return a string for inline data and a ReferenceId instance for + references + """ + type = self.decode_octet() + if type == 0: + return self.decode_longstr() + else: + return ReferenceId(self.decode_longstr()) + + # new domains for 0-10: + + def encode_rfc1982_long(self, s): + self.encode_long(s) + + def decode_rfc1982_long(self): + return self.decode_long() + + def encode_rfc1982_long_set(self, s): + self.encode_short(len(s) * 4) + for i in s: + self.encode_long(i) + + def decode_rfc1982_long_set(self): + count = self.decode_short() / 4 + set = [] + for i in range(0, count): + set.append(self.decode_long()) + return set; + + def encode_uuid(self, s): + self.pack("16s", s) + + def decode_uuid(self): + return self.unpack("16s") + + def enc_num(self, width, n): + if width == 1: + self.encode_octet(n) + elif width == 2: + self.encode_short(n) + elif width == 3: + self.encode_long(n) + else: + raise ValueError("invalid width: %s" % width) + + def dec_num(self, width): + if width == 1: + return self.decode_octet() + elif width == 2: + return self.decode_short() + elif width == 4: + return self.decode_long() + else: + raise ValueError("invalid width: %s" % width) + + def encode_struct(self, type, s): + if type.size: + enc = StringIO() + codec = Codec(enc, self.spec) + codec.encode_struct_body(type, s) + codec.flush() + body = enc.getvalue() + self.enc_num(type.size, len(body)) + self.write(body) + else: + self.encode_struct_body(type, s) + + def decode_struct(self, type): + if type.size: + size = self.dec_num(type.size) + if size == 0: + return None + return self.decode_struct_body(type) + + def encode_struct_body(self, type, s): + reserved = 8*type.pack - len(type.fields) + assert reserved >= 0 + + for f in type.fields: + if s == None: + self.encode_bit(False) + elif f.type == "bit": + self.encode_bit(s.get(f.name)) + else: + self.encode_bit(s.has(f.name)) + + for i in range(reserved): + self.encode_bit(False) + + for f in type.fields: + if f.type != "bit" and s != None and s.has(f.name): + self.encode(f.type, s.get(f.name)) + + self.flush() + + def decode_struct_body(self, type): + reserved = 8*type.pack - len(type.fields) + assert reserved >= 0 + + s = qpid.Struct(type) + + for f in type.fields: + if f.type == "bit": + s.set(f.name, self.decode_bit()) + elif self.decode_bit(): + s.set(f.name, None) + + for i in range(reserved): + if self.decode_bit(): + raise ValueError("expecting reserved flag") + + for f in type.fields: + if f.type != "bit" and s.has(f.name): + s.set(f.name, self.decode(f.type)) + + self.clearbits() + + return s + + def encode_long_struct(self, s): + enc = StringIO() + codec = Codec(enc, self.spec) + type = s.type + codec.encode_short(type.type) + codec.encode_struct_body(type, s) + self.encode_longstr(enc.getvalue()) + + def decode_long_struct(self): + codec = Codec(StringIO(self.decode_longstr()), self.spec) + type = self.spec.structs[codec.decode_short()] + return codec.decode_struct_body(type) + + def decode_array(self): + size = self.decode_long() + code = self.decode_octet() + count = self.decode_long() + result = [] + for i in range(0, count): + if self.types.has_key(code): + value = self.decode(self.types[code]) + else: + w = width(code) + if fixed(code): + value = self.read(w) + else: + value = self.read(self.dec_num(w)) + result.append(value) + return result + +def fixed(code): + return (code >> 6) != 2 + +def width(code): + # decimal + if code >= 192: + decsel = (code >> 4) & 3 + if decsel == 0: + return 5 + elif decsel == 1: + return 9 + elif decsel == 3: + return 0 + else: + raise ValueError(code) + # variable width + elif code < 192 and code >= 128: + lenlen = (code >> 4) & 3 + if lenlen == 3: raise ValueError(code) + return 2 ** lenlen + # fixed width + else: + return (code >> 4) & 7 diff --git a/RC5/python/qpid/codec010.py b/RC5/python/qpid/codec010.py new file mode 100644 index 0000000000..f34025ef17 --- /dev/null +++ b/RC5/python/qpid/codec010.py @@ -0,0 +1,301 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datetime +from packer import Packer +from datatypes import serial, timestamp, RangedSet, Struct + +class CodecException(Exception): pass + +class Codec(Packer): + + def __init__(self, spec): + self.spec = spec + + def write_void(self, v): + assert v == None + def read_void(self): + return None + + def write_bit(self, b): + if not b: raise ValueError(b) + def read_bit(self): + return True + + def read_uint8(self): + return self.unpack("!B") + def write_uint8(self, n): + return self.pack("!B", n) + + def read_int8(self): + return self.unpack("!b") + def write_int8(self, n): + self.pack("!b", n) + + def read_char(self): + return self.unpack("!c") + def write_char(self, c): + self.pack("!c", c) + + def read_boolean(self): + return self.read_uint8() != 0 + def write_boolean(self, b): + if b: n = 1 + else: n = 0 + self.write_uint8(n) + + + def read_uint16(self): + return self.unpack("!H") + def write_uint16(self, n): + self.pack("!H", n) + + def read_int16(self): + return self.unpack("!h") + def write_int16(self, n): + self.pack("!h", n) + + + def read_uint32(self): + return self.unpack("!L") + def write_uint32(self, n): + self.pack("!L", n) + + def read_int32(self): + return self.unpack("!l") + def write_int32(self, n): + self.pack("!l", n) + + def read_float(self): + return self.unpack("!f") + def write_float(self, f): + self.pack("!f", f) + + def read_sequence_no(self): + return serial(self.read_uint32()) + def write_sequence_no(self, n): + self.write_uint32(n.value) + + + def read_uint64(self): + return self.unpack("!Q") + def write_uint64(self, n): + self.pack("!Q", n) + + def read_int64(self): + return self.unpack("!q") + def write_int64(self, n): + self.pack("!q", n) + + def read_datetime(self): + return timestamp(self.read_uint64()) + def write_datetime(self, t): + if isinstance(t, datetime.datetime): + t = timestamp(t) + self.write_uint64(t) + + def read_double(self): + return self.unpack("!d") + def write_double(self, d): + self.pack("!d", d) + + def read_vbin8(self): + return self.read(self.read_uint8()) + def write_vbin8(self, b): + self.write_uint8(len(b)) + self.write(b) + + def read_str8(self): + return self.read_vbin8().decode("utf8") + def write_str8(self, s): + self.write_vbin8(s.encode("utf8")) + + def read_str16(self): + return self.read_vbin16().decode("utf8") + def write_str16(self, s): + self.write_vbin16(s.encode("utf8")) + + + def read_vbin16(self): + return self.read(self.read_uint16()) + def write_vbin16(self, b): + self.write_uint16(len(b)) + self.write(b) + + def read_sequence_set(self): + result = RangedSet() + size = self.read_uint16() + nranges = size/8 + while nranges > 0: + lower = self.read_sequence_no() + upper = self.read_sequence_no() + result.add(lower, upper) + nranges -= 1 + return result + def write_sequence_set(self, ss): + size = 8*len(ss.ranges) + self.write_uint16(size) + for range in ss.ranges: + self.write_sequence_no(range.lower) + self.write_sequence_no(range.upper) + + def read_vbin32(self): + return self.read(self.read_uint32()) + def write_vbin32(self, b): + self.write_uint32(len(b)) + self.write(b) + + def write_map(self, m): + sc = StringCodec(self.spec) + if m is not None: + sc.write_uint32(len(m)) + for k, v in m.items(): + type = self.spec.encoding(v.__class__) + if type == None: + raise CodecException("no encoding for %s" % v.__class__) + sc.write_str8(k) + sc.write_uint8(type.code) + type.encode(sc, v) + self.write_vbin32(sc.encoded) + def read_map(self): + sc = StringCodec(self.spec, self.read_vbin32()) + if not sc.encoded: + return None + count = sc.read_uint32() + result = {} + while sc.encoded: + k = sc.read_str8() + code = sc.read_uint8() + type = self.spec.types[code] + v = type.decode(sc) + result[k] = v + return result + + def write_array(self, a): + sc = StringCodec(self.spec) + if a is not None: + if len(a) > 0: + type = self.spec.encoding(a[0].__class__) + else: + type = self.spec.encoding(None.__class__) + sc.write_uint8(type.code) + sc.write_uint32(len(a)) + for o in a: + type.encode(sc, o) + self.write_vbin32(sc.encoded) + def read_array(self): + sc = StringCodec(self.spec, self.read_vbin32()) + if not sc.encoded: + return None + type = self.spec.types[sc.read_uint8()] + count = sc.read_uint32() + result = [] + while count > 0: + result.append(type.decode(sc)) + count -= 1 + return result + + def write_list(self, l): + sc = StringCodec(self.spec) + if l is not None: + sc.write_uint32(len(l)) + for o in l: + type = self.spec.encoding(o.__class__) + sc.write_uint8(type.code) + type.encode(sc, o) + self.write_vbin32(sc.encoded) + def read_list(self): + sc = StringCodec(self.spec, self.read_vbin32()) + if not sc.encoded: + return None + count = sc.read_uint32() + result = [] + while count > 0: + type = self.spec.types[sc.read_uint8()] + result.append(type.decode(sc)) + count -= 1 + return result + + def read_struct32(self): + size = self.read_uint32() + code = self.read_uint16() + type = self.spec.structs[code] + fields = type.decode_fields(self) + return Struct(type, **fields) + def write_struct32(self, value): + sc = StringCodec(self.spec) + sc.write_uint16(value._type.code) + value._type.encode_fields(sc, value) + self.write_vbin32(sc.encoded) + + def read_control(self): + cntrl = self.spec.controls[self.read_uint16()] + return Struct(cntrl, **cntrl.decode_fields(self)) + def write_control(self, ctrl): + type = ctrl._type + self.write_uint16(type.code) + type.encode_fields(self, ctrl) + + def read_command(self): + type = self.spec.commands[self.read_uint16()] + hdr = self.spec["session.header"].decode(self) + cmd = Struct(type, **type.decode_fields(self)) + return hdr, cmd + def write_command(self, hdr, cmd): + self.write_uint16(cmd._type.code) + hdr._type.encode(self, hdr) + cmd._type.encode_fields(self, cmd) + + def read_size(self, width): + if width > 0: + attr = "read_uint%d" % (width*8) + return getattr(self, attr)() + + def write_size(self, width, n): + if width > 0: + attr = "write_uint%d" % (width*8) + getattr(self, attr)(n) + + def read_uuid(self): + return self.unpack("16s") + + def write_uuid(self, s): + self.pack("16s", s) + + def read_bin128(self): + return self.unpack("16s") + + def write_bin128(self, b): + self.pack("16s", b) + + + +class StringCodec(Codec): + + def __init__(self, spec, encoded = ""): + Codec.__init__(self, spec) + self.encoded = encoded + + def write(self, s): + self.encoded += s + + def read(self, n): + result = self.encoded[:n] + self.encoded = self.encoded[n:] + return result diff --git a/RC5/python/qpid/compat.py b/RC5/python/qpid/compat.py new file mode 100644 index 0000000000..26f60fb8aa --- /dev/null +++ b/RC5/python/qpid/compat.py @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +try: + set = set +except NameError: + from sets import Set as set + +try: + from socket import SHUT_RDWR +except ImportError: + SHUT_RDWR = 2 diff --git a/RC5/python/qpid/connection.py b/RC5/python/qpid/connection.py new file mode 100644 index 0000000000..4c9c02822a --- /dev/null +++ b/RC5/python/qpid/connection.py @@ -0,0 +1,218 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datatypes, session, socket +from threading import Thread, Condition, RLock +from util import wait, notify +from assembler import Assembler, Segment +from codec010 import StringCodec +from session import Session +from invoker import Invoker +from spec010 import Control, Command, load +from spec import default +from exceptions import * +from logging import getLogger +import delegates + +class ChannelBusy(Exception): pass + +class ChannelsBusy(Exception): pass + +class SessionBusy(Exception): pass + +class ConnectionFailed(Exception): pass + +def client(*args, **kwargs): + return delegates.Client(*args, **kwargs) + +def server(*args, **kwargs): + return delegates.Server(*args, **kwargs) + +class SSLWrapper: + + def __init__(self, ssl): + self.ssl = ssl + + def recv(self, n): + return self.ssl.read(n) + + def send(self, s): + return self.ssl.write(s) + +def sslwrap(sock): + if isinstance(sock, socket.SSLType): + return SSLWrapper(sock) + else: + return sock + +class Connection(Assembler): + + def __init__(self, sock, spec=None, delegate=client, **args): + Assembler.__init__(self, sslwrap(sock)) + if spec == None: + spec = load(default()) + self.spec = spec + self.track = self.spec["track"] + + self.lock = RLock() + self.attached = {} + self.sessions = {} + + self.condition = Condition() + self.opened = False + self.failed = False + self.close_code = (None, "connection aborted") + + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + + self.channel_max = 65535 + + self.delegate = delegate(self, **args) + + def attach(self, name, ch, delegate, force=False): + self.lock.acquire() + try: + ssn = self.attached.get(ch.id) + if ssn is not None: + if ssn.name != name: + raise ChannelBusy(ch, ssn) + else: + ssn = self.sessions.get(name) + if ssn is None: + ssn = Session(name, self.spec, delegate=delegate) + self.sessions[name] = ssn + elif ssn.channel is not None: + if force: + del self.attached[ssn.channel.id] + ssn.channel = None + else: + raise SessionBusy(ssn) + self.attached[ch.id] = ssn + ssn.channel = ch + ch.session = ssn + return ssn + finally: + self.lock.release() + + def detach(self, name, ch): + self.lock.acquire() + try: + self.attached.pop(ch.id, None) + ssn = self.sessions.pop(name, None) + if ssn is not None: + ssn.channel = None + ssn.closed() + return ssn + finally: + self.lock.release() + + def __channel(self): + # XXX: ch 0? + for i in xrange(self.channel_max): + if not self.attached.has_key(i): + return i + else: + raise ChannelsBusy() + + def session(self, name, timeout=None, delegate=session.client): + self.lock.acquire() + try: + ch = Channel(self, self.__channel()) + ssn = self.attach(name, ch, delegate) + ssn.channel.session_attach(name) + if wait(ssn.condition, lambda: ssn.channel is not None, timeout): + return ssn + else: + self.detach(name, ch) + raise Timeout() + finally: + self.lock.release() + + def detach_all(self): + self.lock.acquire() + try: + for ssn in self.attached.values(): + if self.close_code[0] != 200: + ssn.exceptions.append(self.close_code) + self.detach(ssn.name, ssn.channel) + finally: + self.lock.release() + + def start(self, timeout=None): + self.delegate.start() + self.thread.start() + if not wait(self.condition, lambda: self.opened or self.failed, timeout): + raise Timeout() + if self.failed: + raise ConnectionFailed(*self.close_code) + + def run(self): + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket + while True: + try: + seg = self.read_segment() + except Closed: + self.detach_all() + break + self.delegate.received(seg) + + def close(self, timeout=None): + if not self.opened: return + Channel(self, 0).connection_close(200) + if not wait(self.condition, lambda: not self.opened, timeout): + raise Timeout() + self.thread.join(timeout=timeout) + + def __str__(self): + return "%s:%s" % self.sock.getsockname() + + def __repr__(self): + return str(self) + +log = getLogger("qpid.io.ctl") + +class Channel(Invoker): + + def __init__(self, connection, id): + self.connection = connection + self.id = id + self.session = None + + def resolve_method(self, name): + inst = self.connection.spec.instructions.get(name) + if inst is not None and isinstance(inst, Control): + return self.METHOD, inst + else: + return self.ERROR, None + + def invoke(self, type, args, kwargs): + ctl = type.new(args, kwargs) + sc = StringCodec(self.connection.spec) + sc.write_control(ctl) + self.connection.write_segment(Segment(True, True, type.segment_type, + type.track, self.id, sc.encoded)) + log.debug("SENT %s", ctl) + + def __str__(self): + return "%s[%s]" % (self.connection, self.id) + + def __repr__(self): + return str(self) diff --git a/RC5/python/qpid/connection08.py b/RC5/python/qpid/connection08.py new file mode 100644 index 0000000000..be94a792cb --- /dev/null +++ b/RC5/python/qpid/connection08.py @@ -0,0 +1,493 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A Connection class containing socket code that uses the spec metadata +to read and write Frame objects. This could be used by a client, +server, or even a proxy implementation. +""" + +import socket, codec, logging, qpid +from cStringIO import StringIO +from spec import load +from codec import EOF +from compat import SHUT_RDWR + +class SockIO: + + def __init__(self, sock): + self.sock = sock + + def write(self, buf): +# print "OUT: %r" % buf + self.sock.sendall(buf) + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.error: + break + if len(s) == 0: + break +# print "IN: %r" % s + data += s + return data + + def flush(self): + pass + + def close(self): + self.sock.shutdown(SHUT_RDWR) + self.sock.close() + +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + return SockIO(sock) + +def listen(host, port, predicate = lambda: True): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.listen(5) + while predicate(): + s, a = sock.accept() + yield SockIO(s) + +class Connection: + + def __init__(self, io, spec): + self.codec = codec.Codec(io, spec) + self.spec = spec + self.FRAME_END = self.spec.constants.byname["frame_end"].id + self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) + self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) + + def flush(self): + self.codec.flush() + + INIT="!4s4B" + + def init(self): + self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, + self.spec.minor) + + def tini(self): + self.codec.unpack(Connection.INIT) + + def write_8_0(self, frame): + c = self.codec + c.encode_octet(self.spec.constants.byname[frame.type].id) + c.encode_short(frame.channel) + body = StringIO() + enc = codec.Codec(body, self.spec) + frame.encode(enc) + enc.flush() + c.encode_longstr(body.getvalue()) + c.encode_octet(self.FRAME_END) + + def read_8_0(self): + c = self.codec + type = self.spec.constants.byid[c.decode_octet()].name + channel = c.decode_short() + body = c.decode_longstr() + dec = codec.Codec(StringIO(body), self.spec) + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + frame.channel = channel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + + def write_0_9(self, frame): + self.write_8_0(frame) + + def read_0_9(self): + return self.read_8_0() + + def write_0_10(self, frame): + c = self.codec + flags = 0 + if frame.bof: flags |= 0x08 + if frame.eof: flags |= 0x04 + if frame.bos: flags |= 0x02 + if frame.eos: flags |= 0x01 + + c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1 + c.encode_octet(self.spec.constants.byname[frame.type].id) + body = StringIO() + enc = codec.Codec(body, self.spec) + frame.encode(enc) + enc.flush() + frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) + c.encode_short(frame_size) + c.encode_octet(0) # Reserved + c.encode_octet(frame.subchannel & 0x0f) + c.encode_short(frame.channel) + c.encode_long(0) # Reserved + c.write(body.getvalue()) + c.encode_octet(self.FRAME_END) + + def read_0_10(self): + c = self.codec + flags = c.decode_octet() # TODO: currently ignoring flags + framing_version = (flags & 0xc0) >> 6 + if framing_version != 0: + raise "frame error: unknown framing version" + type = self.spec.constants.byid[c.decode_octet()].name + frame_size = c.decode_short() + if frame_size < 12: # TODO: Magic number (frame header size) + raise "frame error: frame size too small" + reserved1 = c.decode_octet() + field = c.decode_octet() + subchannel = field & 0x0f + channel = c.decode_short() + reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 + if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: + raise "frame error: reserved bits not all zero" + body_size = frame_size - 12 # TODO: Magic number (frame header size) + body = c.read(body_size) + dec = codec.Codec(StringIO(body), self.spec) + try: + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + except EOF: + raise "truncated frame body: %r" % body + frame.channel = channel + frame.subchannel = subchannel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + + def write_99_0(self, frame): + self.write_0_10(frame) + + def read_99_0(self): + return self.read_0_10() + +class Frame: + + DECODERS = {} + + class __metaclass__(type): + + def __new__(cls, name, bases, dict): + for attr in ("encode", "decode", "type"): + if not dict.has_key(attr): + raise TypeError("%s must define %s" % (name, attr)) + dict["decode"] = staticmethod(dict["decode"]) + if dict.has_key("__init__"): + __init__ = dict["__init__"] + def init(self, *args, **kwargs): + args = list(args) + self.init(args, kwargs) + __init__(self, *args, **kwargs) + dict["__init__"] = init + t = type.__new__(cls, name, bases, dict) + if t.type != None: + Frame.DECODERS[t.type] = t + return t + + type = None + + def init(self, args, kwargs): + self.channel = kwargs.pop("channel", 0) + self.subchannel = kwargs.pop("subchannel", 0) + self.bos = True + self.eos = True + self.bof = True + self.eof = True + + def encode(self, enc): abstract + + def decode(spec, dec, size): abstract + +class Method(Frame): + + type = "frame_method" + + def __init__(self, method, args): + if len(args) != len(method.fields): + argspec = ["%s: %s" % (f.name, f.type) + for f in method.fields] + raise TypeError("%s.%s expecting (%s), got %s" % + (method.klass.name, method.name, ", ".join(argspec), + args)) + self.method = method + self.method_type = method + self.args = args + self.eof = not method.content + + def encode(self, c): + version = (c.spec.major, c.spec.minor) + if version == (0, 10) or version == (99, 0): + c.encode_octet(self.method.klass.id) + c.encode_octet(self.method.id) + else: + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) + for field, arg in zip(self.method.fields, self.args): + c.encode(field.type, arg) + + def decode(spec, c, size): + version = (c.spec.major, c.spec.minor) + if version == (0, 10) or version == (99, 0): + klass = spec.classes.byid[c.decode_octet()] + meth = klass.methods.byid[c.decode_octet()] + else: + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] + args = tuple([c.decode(f.type) for f in meth.fields]) + return Method(meth, args) + + def __str__(self): + return "[%s] %s %s" % (self.channel, self.method, + ", ".join([str(a) for a in self.args])) + +class Request(Frame): + + type = "frame_request" + + def __init__(self, id, response_mark, method): + self.id = id + self.response_mark = response_mark + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.response_mark) + # reserved + enc.encode_long(0) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + mark = dec.decode_longlong() + # reserved + dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Request(id, mark, method) + + def __str__(self): + return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) + +class Response(Frame): + + type = "frame_response" + + def __init__(self, id, request_id, batch_offset, method): + self.id = id + self.request_id = request_id + self.batch_offset = batch_offset + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.request_id) + enc.encode_long(self.batch_offset) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + request_id = dec.decode_longlong() + batch_offset = dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Response(id, request_id, batch_offset, method) + + def __str__(self): + return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) + +def uses_struct_encoding(spec): + return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) + +class Header(Frame): + + type = "frame_header" + + def __init__(self, klass, weight, size, properties): + self.klass = klass + self.weight = weight + self.size = size + self.properties = properties + self.eof = size == 0 + self.bof = False + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def encode(self, c): + if uses_struct_encoding(c.spec): + self.encode_structs(c) + else: + self.encode_legacy(c) + + def encode_structs(self, c): + # XXX + structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type), + qpid.Struct(c.spec.domains.byname["message_properties"].type)] + + # XXX + props = self.properties.copy() + for k in self.properties: + for s in structs: + if s.exists(k): + s.set(k, props.pop(k)) + if props: + raise TypeError("no such property: %s" % (", ".join(props))) + + # message properties store the content-length now, and weight is + # deprecated + if self.size != None: + structs[1].content_length = self.size + + for s in structs: + c.encode_long_struct(s) + + def encode_legacy(self, c): + c.encode_short(self.klass.id) + c.encode_short(self.weight) + c.encode_longlong(self.size) + + # property flags + nprops = len(self.klass.fields) + flags = 0 + for i in range(nprops): + f = self.klass.fields.items[i] + flags <<= 1 + if self.properties.get(f.name) != None: + flags |= 1 + # the last bit indicates more flags + if i > 0 and (i % 15) == 0: + flags <<= 1 + if nprops > (i + 1): + flags |= 1 + c.encode_short(flags) + flags = 0 + flags <<= ((16 - (nprops % 15)) % 16) + c.encode_short(flags) + + # properties + for f in self.klass.fields: + v = self.properties.get(f.name) + if v != None: + c.encode(f.type, v) + + def decode(spec, c, size): + if uses_struct_encoding(spec): + return Header.decode_structs(spec, c, size) + else: + return Header.decode_legacy(spec, c, size) + + def decode_structs(spec, c, size): + structs = [] + start = c.nread + while c.nread - start < size: + structs.append(c.decode_long_struct()) + + # XXX + props = {} + length = None + for s in structs: + for f in s.type.fields: + if s.has(f.name): + props[f.name] = s.get(f.name) + if f.name == "content_length": + length = s.get(f.name) + return Header(None, 0, length, props) + + decode_structs = staticmethod(decode_structs) + + def decode_legacy(spec, c, size): + klass = spec.classes.byid[c.decode_short()] + weight = c.decode_short() + size = c.decode_longlong() + + # property flags + bits = [] + while True: + flags = c.decode_short() + for i in range(15, 0, -1): + if flags >> i & 0x1 != 0: + bits.append(True) + else: + bits.append(False) + if flags & 0x1 == 0: + break + + # properties + properties = {} + for b, f in zip(bits, klass.fields): + if b: + # Note: decode returns a unicode u'' string but only + # plain '' strings can be used as keywords so we need to + # stringify the names. + properties[str(f.name)] = c.decode(f.type) + return Header(klass, weight, size, properties) + + decode_legacy = staticmethod(decode_legacy) + + def __str__(self): + return "%s %s %s %s" % (self.klass, self.weight, self.size, + self.properties) + +class Body(Frame): + + type = "frame_body" + + def __init__(self, content): + self.content = content + self.eof = True + self.bof = False + + def encode(self, enc): + enc.write(self.content) + + def decode(spec, dec, size): + return Body(dec.read(size)) + + def __str__(self): + return "Body(%r)" % self.content + +# TODO: +# OOB_METHOD = "frame_oob_method" +# OOB_HEADER = "frame_oob_header" +# OOB_BODY = "frame_oob_body" +# TRACE = "frame_trace" +# HEARTBEAT = "frame_heartbeat" diff --git a/RC5/python/qpid/content.py b/RC5/python/qpid/content.py new file mode 100644 index 0000000000..9391f4f1a8 --- /dev/null +++ b/RC5/python/qpid/content.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A simple python representation for AMQP content. +""" + +def default(val, defval): + if val == None: + return defval + else: + return val + +class Content: + + def __init__(self, body = "", children = None, properties = None): + self.body = body + self.children = default(children, []) + self.properties = default(properties, {}) + + def size(self): + return len(self.body) + + def weight(self): + return len(self.children) + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def __str__(self): + if self.children: + return "%s [%s] %s" % (self.properties, + ", ".join(map(str, self.children)), + self.body) + else: + return "%s %s" % (self.properties, self.body) diff --git a/RC5/python/qpid/datatypes.py b/RC5/python/qpid/datatypes.py new file mode 100644 index 0000000000..eb1f86b0b0 --- /dev/null +++ b/RC5/python/qpid/datatypes.py @@ -0,0 +1,349 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import threading, struct, datetime, time + +class Struct: + + def __init__(self, _type, *args, **kwargs): + if len(args) > len(_type.fields): + raise TypeError("%s() takes at most %s arguments (%s given)" % + (_type.name, len(_type.fields), len(args))) + + self._type = _type + + idx = 0 + for field in _type.fields: + if idx < len(args): + arg = args[idx] + if kwargs.has_key(field.name): + raise TypeError("%s() got multiple values for keyword argument '%s'" % + (_type.name, field.name)) + elif kwargs.has_key(field.name): + arg = kwargs.pop(field.name) + else: + arg = field.default() + setattr(self, field.name, arg) + idx += 1 + + if kwargs: + unexpected = kwargs.keys()[0] + raise TypeError("%s() got an unexpected keyword argument '%s'" % + (_type.name, unexpected)) + + def __getitem__(self, name): + return getattr(self, name) + + def __setitem__(self, name, value): + if not hasattr(self, name): + raise AttributeError("'%s' object has no attribute '%s'" % + (self._type.name, name)) + setattr(self, name, value) + + def __repr__(self): + fields = [] + for f in self._type.fields: + v = self[f.name] + if f.type.is_present(v): + fields.append("%s=%r" % (f.name, v)) + return "%s(%s)" % (self._type.name, ", ".join(fields)) + +class Message: + + def __init__(self, *args): + if args: + self.body = args[-1] + else: + self.body = None + if len(args) > 1: + self.headers = list(args[:-1]) + else: + self.headers = None + self.id = None + + def has(self, name): + return self.get(name) != None + + def get(self, name): + if self.headers: + for h in self.headers: + if h._type.name == name: + return h + return None + + def set(self, header): + if self.headers is None: + self.headers = [] + idx = 0 + while idx < len(self.headers): + if self.headers[idx]._type == header._type: + self.headers[idx] = header + return + idx += 1 + self.headers.append(header) + + def clear(self, name): + idx = 0 + while idx < len(self.headers): + if self.headers[idx]._type.name == name: + del self.headers[idx] + return + idx += 1 + + def __repr__(self): + args = [] + if self.headers: + args.extend(map(repr, self.headers)) + if self.body: + args.append(repr(self.body)) + if self.id is not None: + args.append("id=%s" % self.id) + return "Message(%s)" % ", ".join(args) + +def serial(o): + if isinstance(o, Serial): + return o + else: + return Serial(o) + +class Serial: + + def __init__(self, value): + self.value = value & 0xFFFFFFFF + + def __hash__(self): + return hash(self.value) + + def __cmp__(self, other): + if other is None: + return 1 + + other = serial(other) + + delta = (self.value - other.value) & 0xFFFFFFFF + neg = delta & 0x80000000 + mag = delta & 0x7FFFFFFF + + if neg: + return -mag + else: + return mag + + def __add__(self, other): + return Serial(self.value + other) + + def __sub__(self, other): + return Serial(self.value - other) + + def __repr__(self): + return "serial(%s)" % self.value + + def __str__(self): + return str(self.value) + +class Range: + + def __init__(self, lower, upper = None): + self.lower = serial(lower) + if upper is None: + self.upper = self.lower + else: + self.upper = serial(upper) + + def __contains__(self, n): + return self.lower <= n and n <= self.upper + + def __iter__(self): + i = self.lower + while i <= self.upper: + yield i + i += 1 + + def touches(self, r): + # XXX: are we doing more checks than we need? + return (self.lower - 1 in r or + self.upper + 1 in r or + r.lower - 1 in self or + r.upper + 1 in self or + self.lower in r or + self.upper in r or + r.lower in self or + r.upper in self) + + def span(self, r): + return Range(min(self.lower, r.lower), max(self.upper, r.upper)) + + def intersect(self, r): + lower = max(self.lower, r.lower) + upper = min(self.upper, r.upper) + if lower > upper: + return None + else: + return Range(lower, upper) + + def __repr__(self): + return "%s-%s" % (self.lower, self.upper) + +class RangedSet: + + def __init__(self, *args): + self.ranges = [] + for n in args: + self.add(n) + + def __contains__(self, n): + for r in self.ranges: + if n in r: + return True + return False + + def add_range(self, range): + idx = 0 + while idx < len(self.ranges): + r = self.ranges[idx] + if range.touches(r): + del self.ranges[idx] + range = range.span(r) + elif range.upper < r.lower: + self.ranges.insert(idx, range) + return + else: + idx += 1 + self.ranges.append(range) + + def add(self, lower, upper = None): + self.add_range(Range(lower, upper)) + + def __iter__(self): + return iter(self.ranges) + + def __repr__(self): + return str(self.ranges) + +class Future: + def __init__(self, initial=None, exception=Exception): + self.value = initial + self._error = None + self._set = threading.Event() + self.exception = exception + + def error(self, error): + self._error = error + self._set.set() + + def set(self, value): + self.value = value + self._set.set() + + def get(self, timeout=None): + self._set.wait(timeout) + if self._error != None: + raise self.exception(self._error) + return self.value + + def is_set(self): + return self._set.isSet() + +try: + import uuid + def random_uuid(): + return uuid.uuid4().get_bytes() +except ImportError: + import random + def random_uuid(): + bytes = [random.randint(0, 255) for i in xrange(16)] + + # From RFC4122, the version bits are set to 0100 + bytes[7] &= 0x0F + bytes[7] |= 0x40 + + # From RFC4122, the top two bits of byte 8 get set to 01 + bytes[8] &= 0x3F + bytes[8] |= 0x80 + return "".join(map(chr, bytes)) + +def uuid4(): + return UUID(random_uuid()) + +class UUID: + + def __init__(self, bytes): + self.bytes = bytes + + def __cmp__(self, other): + if isinstance(other, UUID): + return cmp(self.bytes, other.bytes) + raise NotImplemented() + + def __str__(self): + return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes) + + def __repr__(self): + return "UUID(%r)" % str(self) + + def __hash__(self): + return self.bytes.__hash__() + +class timestamp(float): + + def __new__(cls, obj=None): + if obj is None: + obj = time.time() + elif isinstance(obj, datetime.datetime): + obj = time.mktime(obj.timetuple()) + 1e-6 * obj.microsecond + return super(timestamp, cls).__new__(cls, obj) + + def datetime(self): + return datetime.datetime.fromtimestamp(self) + + def __add__(self, other): + if isinstance(other, datetime.timedelta): + return timestamp(self.datetime() + other) + else: + return timestamp(float(self) + other) + + def __sub__(self, other): + if isinstance(other, datetime.timedelta): + return timestamp(self.datetime() - other) + else: + return timestamp(float(self) - other) + + def __radd__(self, other): + if isinstance(other, datetime.timedelta): + return timestamp(self.datetime() + other) + else: + return timestamp(other + float(self)) + + def __rsub__(self, other): + if isinstance(other, datetime.timedelta): + return timestamp(self.datetime() - other) + else: + return timestamp(other - float(self)) + + def __neg__(self): + return timestamp(-float(self)) + + def __pos__(self): + return self + + def __abs__(self): + return timestamp(abs(float(self))) + + def __repr__(self): + return "timestamp(%r)" % float(self) diff --git a/RC5/python/qpid/delegate.py b/RC5/python/qpid/delegate.py new file mode 100644 index 0000000000..b447c4aa29 --- /dev/null +++ b/RC5/python/qpid/delegate.py @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Delegate implementation intended for use with the peer module. +""" + +import threading, inspect, traceback, sys +from connection08 import Method, Request, Response + +def _handler_name(method): + return "%s_%s" % (method.klass.name, method.name) + +class Delegate: + + def __init__(self): + self.handlers = {} + self.invokers = {} + + def __call__(self, channel, frame): + method = frame.method + + try: + handler = self.handlers[method] + except KeyError: + name = _handler_name(method) + handler = getattr(self, name) + self.handlers[method] = handler + + try: + return handler(channel, frame) + except: + print >> sys.stderr, "Error in handler: %s\n\n%s" % \ + (_handler_name(method), traceback.format_exc()) + + def closed(self, reason): + print "Connection closed: %s" % reason diff --git a/RC5/python/qpid/delegates.py b/RC5/python/qpid/delegates.py new file mode 100644 index 0000000000..bf26553dda --- /dev/null +++ b/RC5/python/qpid/delegates.py @@ -0,0 +1,162 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, connection, session +from util import notify +from datatypes import RangedSet +from logging import getLogger + +log = getLogger("qpid.io.ctl") + +class Delegate: + + def __init__(self, connection, delegate=session.client): + self.connection = connection + self.spec = connection.spec + self.delegate = delegate + self.control = self.spec["track.control"].value + + def received(self, seg): + ssn = self.connection.attached.get(seg.channel) + if ssn is None: + ch = connection.Channel(self.connection, seg.channel) + else: + ch = ssn.channel + + if seg.track == self.control: + ctl = seg.decode(self.spec) + log.debug("RECV %s", ctl) + attr = ctl._type.qname.replace(".", "_") + getattr(self, attr)(ch, ctl) + elif ssn is None: + ch.session_detached() + else: + ssn.received(seg) + + def connection_close(self, ch, close): + self.connection.close_code = (close.reply_code, close.reply_text) + ch.connection_close_ok() + self.connection.sock.close() + if not self.connection.opened: + self.connection.failed = True + notify(self.connection.condition) + + def connection_close_ok(self, ch, close_ok): + self.connection.opened = False + notify(self.connection.condition) + + def session_attach(self, ch, a): + try: + self.connection.attach(a.name, ch, self.delegate, a.force) + ch.session_attached(a.name) + except connection.ChannelBusy: + ch.session_detached(a.name) + except connection.SessionBusy: + ch.session_detached(a.name) + + def session_attached(self, ch, a): + notify(ch.session.condition) + + def session_detach(self, ch, d): + #send back the confirmation of detachment before removing the + #channel from the attached set; this avoids needing to hold the + #connection lock during the sending of this control and ensures + #that if the channel is immediately reused for a new session the + #attach request will follow the detached notification. + ch.session_detached(d.name) + ssn = self.connection.detach(d.name, ch) + + def session_detached(self, ch, d): + self.connection.detach(d.name, ch) + + def session_request_timeout(self, ch, rt): + ch.session_timeout(rt.timeout); + + def session_command_point(self, ch, cp): + ssn = ch.session + ssn.receiver.next_id = cp.command_id + ssn.receiver.next_offset = cp.command_offset + + def session_completed(self, ch, cmp): + ch.session.sender.completed(cmp.commands) + if cmp.timely_reply: + ch.session_known_completed(cmp.commands) + notify(ch.session.condition) + + def session_known_completed(self, ch, kn_cmp): + ch.session.receiver.known_completed(kn_cmp.commands) + + def session_flush(self, ch, f): + rcv = ch.session.receiver + if f.expected: + if rcv.next_id == None: + exp = None + else: + exp = RangedSet(rcv.next_id) + ch.session_expected(exp) + if f.confirmed: + ch.session_confirmed(rcv._completed) + if f.completed: + ch.session_completed(rcv._completed) + +class Server(Delegate): + + def start(self): + self.connection.read_header() + self.connection.write_header(self.spec.major, self.spec.minor) + connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"]) + + def connection_start_ok(self, ch, start_ok): + ch.connection_tune(channel_max=65535) + + def connection_tune_ok(self, ch, tune_ok): + pass + + def connection_open(self, ch, open): + self.connection.opened = True + ch.connection_open_ok() + notify(self.connection.condition) + +class Client(Delegate): + + PROPERTIES = {"product": "qpid python client", + "version": "development", + "platform": os.name} + + def __init__(self, connection, username="guest", password="guest", mechanism="PLAIN"): + Delegate.__init__(self, connection) + self.username = username + self.password = password + self.mechanism = mechanism + + def start(self): + self.connection.write_header(self.spec.major, self.spec.minor) + self.connection.read_header() + + def connection_start(self, ch, start): + r = "\0%s\0%s" % (self.username, self.password) + ch.connection_start_ok(client_properties=Client.PROPERTIES, mechanism=self.mechanism, response=r) + + def connection_tune(self, ch, tune): + ch.connection_tune_ok() + ch.connection_open() + + def connection_open_ok(self, ch, open_ok): + self.connection.opened = True + notify(self.connection.condition) diff --git a/RC5/python/qpid/disp.py b/RC5/python/qpid/disp.py new file mode 100644 index 0000000000..e46cb33c60 --- /dev/null +++ b/RC5/python/qpid/disp.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +class Display: + """ Display formatting for QPID Management CLI """ + + def __init__ (self): + self.tableSpacing = 2 + self.tablePrefix = " " + self.timestampFormat = "%X" + + def table (self, title, heads, rows): + """ Print a formatted table with autosized columns """ + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + cellWidth = len (unicode (row[col])) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + if col < len (heads) - 1: + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + for i in range (width): + line = line + "=" + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + line = line + unicode (row[col]) + if col < len (heads) - 1: + for i in range (width - len (unicode (row[col]))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) diff --git a/RC5/python/qpid/exceptions.py b/RC5/python/qpid/exceptions.py new file mode 100644 index 0000000000..7eaaf81ed4 --- /dev/null +++ b/RC5/python/qpid/exceptions.py @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Closed(Exception): pass +class Timeout(Exception): pass diff --git a/RC5/python/qpid/framer.py b/RC5/python/qpid/framer.py new file mode 100644 index 0000000000..f6363b2291 --- /dev/null +++ b/RC5/python/qpid/framer.py @@ -0,0 +1,159 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import struct, socket +from exceptions import Closed +from packer import Packer +from threading import RLock +from logging import getLogger + +raw = getLogger("qpid.io.raw") +frm = getLogger("qpid.io.frm") + +FIRST_SEG = 0x08 +LAST_SEG = 0x04 +FIRST_FRM = 0x02 +LAST_FRM = 0x01 + +class Frame: + + HEADER = "!2BHxBH4x" + MAX_PAYLOAD = 65535 - struct.calcsize(HEADER) + + def __init__(self, flags, type, track, channel, payload): + if len(payload) > Frame.MAX_PAYLOAD: + raise ValueError("max payload size exceeded: %s" % len(payload)) + self.flags = flags + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def isFirstSegment(self): + return bool(FIRST_SEG & self.flags) + + def isLastSegment(self): + return bool(LAST_SEG & self.flags) + + def isFirstFrame(self): + return bool(FIRST_FRM & self.flags) + + def isLastFrame(self): + return bool(LAST_FRM & self.flags) + + def __str__(self): + return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()), + int(self.isLastSegment()), + int(self.isFirstFrame()), + int(self.isLastFrame()), + self.type, + self.track, + self.channel, + self.payload) + +class FramingError(Exception): pass + +class Framer(Packer): + + HEADER="!4s4B" + + def __init__(self, sock): + self.sock = sock + self.sock_lock = RLock() + self._buf = "" + + def aborted(self): + return False + + def write(self, buf): + self._buf += buf + + def flush(self): + self.sock_lock.acquire() + try: + self._write(self._buf) + self._buf = "" + frm.debug("FLUSHED") + finally: + self.sock_lock.release() + + def _write(self, buf): + while buf: + try: + n = self.sock.send(buf) + except socket.timeout: + if self.aborted(): + raise Closed() + else: + continue + raw.debug("SENT %r", buf[:n]) + buf = buf[n:] + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.timeout: + if self.aborted(): + raise Closed() + else: + continue + except socket.error, e: + if data != "": + raise e + else: + raise Closed() + if len(s) == 0: + raise Closed() + data += s + raw.debug("RECV %r", s) + return data + + def read_header(self): + return self.unpack(Framer.HEADER) + + def write_header(self, major, minor): + self.sock_lock.acquire() + try: + self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor) + self.flush() + finally: + self.sock_lock.release() + + def write_frame(self, frame): + self.sock_lock.acquire() + try: + size = len(frame.payload) + struct.calcsize(Frame.HEADER) + track = frame.track & 0x0F + self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) + self.write(frame.payload) + if frame.isLastSegment() and frame.isLastFrame(): + self.flush() + frm.debug("SENT %s", frame) + finally: + self.sock_lock.release() + + def read_frame(self): + flags, type, size, track, channel = self.unpack(Frame.HEADER) + if flags & 0xF0: raise FramingError() + payload = self.read(size - struct.calcsize(Frame.HEADER)) + frame = Frame(flags, type, track, channel, payload) + frm.debug("RECV %s", frame) + return frame diff --git a/RC5/python/qpid/invoker.py b/RC5/python/qpid/invoker.py new file mode 100644 index 0000000000..635f3ee769 --- /dev/null +++ b/RC5/python/qpid/invoker.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys + +# TODO: need a better naming for this class now that it does the value +# stuff +class Invoker: + + def METHOD(self, name, resolved): + method = lambda *args, **kwargs: self.invoke(resolved, args, kwargs) + if sys.version_info[:2] > (2, 3): + method.__name__ = resolved.pyname + method.__doc__ = resolved.pydoc + method.__module__ = self.__class__.__module__ + self.__dict__[name] = method + return method + + def VALUE(self, name, resolved): + self.__dict__[name] = resolved + return resolved + + def ERROR(self, name, resolved): + raise AttributeError("%s instance has no attribute '%s'" % + (self.__class__.__name__, name)) + + def resolve_method(self, name): + return ERROR, None + + def __getattr__(self, name): + disp, resolved = self.resolve_method(name) + return disp(name, resolved) diff --git a/RC5/python/qpid/log.py b/RC5/python/qpid/log.py new file mode 100644 index 0000000000..1fd7d74136 --- /dev/null +++ b/RC5/python/qpid/log.py @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from logging import getLogger, StreamHandler, Formatter +from logging import DEBUG, INFO, WARN, ERROR, CRITICAL + +def enable(name=None, level=WARN, file=None): + log = getLogger(name) + handler = StreamHandler(file) + handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s")) + log.addHandler(handler) + log.setLevel(level) diff --git a/RC5/python/qpid/management.py b/RC5/python/qpid/management.py new file mode 100644 index 0000000000..477f3e8f2b --- /dev/null +++ b/RC5/python/qpid/management.py @@ -0,0 +1,913 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +############################################################################### +## This file is being obsoleted by qmf/console.py +############################################################################### + +""" +Management API for Qpid +""" + +import qpid +import struct +import socket +from threading import Thread +from datatypes import Message, RangedSet +from time import time +from cStringIO import StringIO +from codec010 import StringCodec as Codec +from threading import Lock, Condition + + +class SequenceManager: + """ Manage sequence numbers for asynchronous method calls """ + def __init__ (self): + self.lock = Lock () + self.sequence = 0 + self.pending = {} + + def reserve (self, data): + """ Reserve a unique sequence number """ + self.lock.acquire () + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + self.lock.release () + return result + + def release (self, seq): + """ Release a reserved sequence number """ + data = None + self.lock.acquire () + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + self.lock.release () + return data + + +class mgmtObject (object): + """ Generic object that holds the contents of a management object with its + attributes set as object attributes. """ + + def __init__ (self, classKey, timestamps, row): + self.classKey = classKey + self.timestamps = timestamps + for cell in row: + setattr (self, cell[0], cell[1]) + +class objectId(object): + """ Object that represents QMF object identifiers """ + + def __init__(self, codec, first=0, second=0): + if codec: + self.first = codec.read_uint64() + self.second = codec.read_uint64() + else: + self.first = first + self.second = second + + def __cmp__(self, other): + if other == None: + return 1 + if self.first < other.first: + return -1 + if self.first > other.first: + return 1 + if self.second < other.second: + return -1 + if self.second > other.second: + return 1 + return 0 + + + def index(self): + return (self.first, self.second) + + def getFlags(self): + return (self.first & 0xF000000000000000) >> 60 + + def getSequence(self): + return (self.first & 0x0FFF000000000000) >> 48 + + def getBroker(self): + return (self.first & 0x0000FFFFF0000000) >> 28 + + def getBank(self): + return self.first & 0x000000000FFFFFFF + + def getObject(self): + return self.second + + def isDurable(self): + return self.getSequence() == 0 + + def encode(self, codec): + codec.write_uint64(self.first) + codec.write_uint64(self.second) + + +class methodResult: + """ Object that contains the result of a method call """ + + def __init__ (self, status, sText, args): + self.status = status + self.statusText = sText + for arg in args: + setattr (self, arg, args[arg]) + +class brokerInfo: + """ Object that contains information about a broker and the session to it """ + + def __init__ (self, brokerId, sessionId): + self.brokerId = brokerId + self.sessionId = sessionId + +class managementChannel: + """ This class represents a connection to an AMQP broker. """ + + def __init__ (self, ssn, topicCb, replyCb, exceptionCb, cbContext, _detlife=0): + """ Given a channel on an established AMQP broker connection, this method + opens a session and performs all of the declarations and bindings needed + to participate in the management protocol. """ + self.enabled = True + self.ssn = ssn + self.sessionId = ssn.name + self.topicName = "mgmt-%s" % self.sessionId + self.replyName = "repl-%s" % self.sessionId + self.qpidChannel = ssn + self.tcb = topicCb + self.rcb = replyCb + self.ecb = exceptionCb + self.context = cbContext + self.reqsOutstanding = 0 + self.brokerInfo = None + + ssn.auto_sync = False + ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) + ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) + + ssn.exchange_bind (exchange="amq.direct", + queue=self.replyName, binding_key=self.replyName) + ssn.message_subscribe (queue=self.topicName, destination="tdest", + accept_mode=ssn.accept_mode.none, + acquire_mode=ssn.acquire_mode.pre_acquired) + ssn.message_subscribe (queue=self.replyName, destination="rdest", + accept_mode=ssn.accept_mode.none, + acquire_mode=ssn.acquire_mode.pre_acquired) + + ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb) + ssn.incoming ("rdest").listen (self.replyCb) + + ssn.message_set_flow_mode (destination="tdest", flow_mode=1) + ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) + ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) + + ssn.message_set_flow_mode (destination="rdest", flow_mode=1) + ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + + def setBrokerInfo (self, data): + self.brokerInfo = data + + def shutdown (self): + self.enabled = False + self.ssn.incoming("tdest").stop() + self.ssn.incoming("rdest").stop() + + def topicCb (self, msg): + """ Receive messages via the topic queue on this channel. """ + if self.enabled: + self.tcb (self, msg) + + def replyCb (self, msg): + """ Receive messages via the reply queue on this channel. """ + if self.enabled: + self.rcb (self, msg) + + def exceptionCb (self, data): + if self.ecb != None: + self.ecb (self, data) + + def send (self, exchange, msg): + if self.enabled: + self.qpidChannel.message_transfer (destination=exchange, message=msg) + + def message (self, body, routing_key="broker"): + dp = self.qpidChannel.delivery_properties() + dp.routing_key = routing_key + mp = self.qpidChannel.message_properties() + mp.content_type = "application/octet-stream" + mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName) + return Message(dp, mp, body) + + +class managementClient: + """ This class provides an API for access to management data on the AMQP + network. It implements the management protocol and manages the management + schemas as advertised by the various management agents in the network. """ + + CTRL_BROKER_INFO = 1 + CTRL_SCHEMA_LOADED = 2 + CTRL_USER = 3 + CTRL_HEARTBEAT = 4 + + SYNC_TIME = 10.0 + + #======================================================== + # User API - interacts with the class's user + #======================================================== + def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None): + self.spec = amqpSpec + self.ctrlCb = ctrlCb + self.configCb = configCb + self.instCb = instCb + self.methodCb = methodCb + self.closeCb = closeCb + self.schemaCb = None + self.eventCb = None + self.channels = [] + self.seqMgr = SequenceManager () + self.schema = {} + self.packages = {} + self.cv = Condition () + self.syncInFlight = False + self.syncSequence = 0 + self.syncResult = None + + def schemaListener (self, schemaCb): + """ Optionally register a callback to receive details of the schema of + managed objects in the network. """ + self.schemaCb = schemaCb + + def eventListener (self, eventCb): + """ Optionally register a callback to receive events from managed objects + in the network. """ + self.eventCb = eventCb + + def addChannel (self, channel, cbContext=None): + """ Register a new channel. """ + mch = managementChannel (channel, self.topicCb, self.replyCb, self.exceptCb, cbContext) + + self.channels.append (mch) + self.incOutstanding (mch) + codec = Codec (self.spec) + self.setHeader (codec, ord ('B')) + msg = mch.message(codec.encoded) + mch.send ("qpid.management", msg) + return mch + + def removeChannel (self, mch): + """ Remove a previously added channel from management. """ + mch.shutdown () + self.channels.remove (mch) + + def callMethod (self, channel, userSequence, objId, className, methodName, args=None): + """ Invoke a method on a managed object. """ + self.method (channel, userSequence, objId, className, methodName, args) + + def getObjects (self, channel, userSequence, className, bank=0): + """ Request immediate content from broker """ + codec = Codec (self.spec) + self.setHeader (codec, ord ('G'), userSequence) + ft = {} + ft["_class"] = className + codec.write_map (ft) + msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank) + channel.send ("qpid.management", msg) + + def syncWaitForStable (self, channel): + """ Synchronous (blocking) call to wait for schema stability on a channel """ + self.cv.acquire () + if channel.reqsOutstanding == 0: + self.cv.release () + return channel.brokerInfo + + self.syncInFlight = True + starttime = time () + while channel.reqsOutstanding != 0: + self.cv.wait (self.SYNC_TIME) + if time () - starttime > self.SYNC_TIME: + self.cv.release () + raise RuntimeError ("Timed out waiting for response on channel") + self.cv.release () + return channel.brokerInfo + + def syncCallMethod (self, channel, objId, className, methodName, args=None): + """ Synchronous (blocking) method call """ + self.cv.acquire () + self.syncInFlight = True + self.syncResult = None + self.syncSequence = self.seqMgr.reserve ("sync") + self.cv.release () + self.callMethod (channel, self.syncSequence, objId, className, methodName, args) + self.cv.acquire () + starttime = time () + while self.syncInFlight: + self.cv.wait (self.SYNC_TIME) + if time () - starttime > self.SYNC_TIME: + self.cv.release () + raise RuntimeError ("Timed out waiting for response on channel") + result = self.syncResult + self.cv.release () + return result + + def syncGetObjects (self, channel, className, bank=0): + """ Synchronous (blocking) get call """ + self.cv.acquire () + self.syncInFlight = True + self.syncResult = [] + self.syncSequence = self.seqMgr.reserve ("sync") + self.cv.release () + self.getObjects (channel, self.syncSequence, className, bank) + self.cv.acquire () + starttime = time () + while self.syncInFlight: + self.cv.wait (self.SYNC_TIME) + if time () - starttime > self.SYNC_TIME: + self.cv.release () + raise RuntimeError ("Timed out waiting for response on channel") + result = self.syncResult + self.cv.release () + return result + + #======================================================== + # Channel API - interacts with registered channel objects + #======================================================== + def topicCb (self, ch, msg): + """ Receive messages via the topic queue of a particular channel. """ + codec = Codec (self.spec, msg.body) + while True: + hdr = self.checkHeader (codec) + if hdr == None: + return + + if hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + elif hdr[0] == 'h': + self.handleHeartbeat (ch, codec) + elif hdr[0] == 'e': + self.handleEvent (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) + + def replyCb (self, ch, msg): + """ Receive messages via the reply queue of a particular channel. """ + codec = Codec (self.spec, msg.body) + hdr = self.checkHeader (codec) + if hdr == None: + return + + if hdr[0] == 'm': + self.handleMethodReply (ch, codec, hdr[1]) + elif hdr[0] == 'z': + self.handleCommandComplete (ch, codec, hdr[1]) + elif hdr[0] == 'b': + self.handleBrokerResponse (ch, codec) + elif hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) + + def exceptCb (self, ch, data): + if self.closeCb != None: + self.closeCb (ch.context, data) + + #======================================================== + # Internal Functions + #======================================================== + def setHeader (self, codec, opcode, seq = 0): + """ Compose the header of a management message. """ + codec.write_uint8 (ord ('A')) + codec.write_uint8 (ord ('M')) + codec.write_uint8 (ord ('2')) + codec.write_uint8 (opcode) + codec.write_uint32 (seq) + + def checkHeader (self, codec): + """ Check the header of a management message and extract the opcode and class. """ + try: + octet = chr (codec.read_uint8 ()) + if octet != 'A': + return None + octet = chr (codec.read_uint8 ()) + if octet != 'M': + return None + octet = chr (codec.read_uint8 ()) + if octet != '2': + return None + opcode = chr (codec.read_uint8 ()) + seq = codec.read_uint32 () + return (opcode, seq) + except: + return None + + def encodeValue (self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ + if typecode == 1: + codec.write_uint8 (int (value)) + elif typecode == 2: + codec.write_uint16 (int (value)) + elif typecode == 3: + codec.write_uint32 (long (value)) + elif typecode == 4: + codec.write_uint64 (long (value)) + elif typecode == 5: + codec.write_uint8 (int (value)) + elif typecode == 6: + codec.write_str8 (value) + elif typecode == 7: + codec.write_str16 (value) + elif typecode == 8: # ABSTIME + codec.write_uint64 (long (value)) + elif typecode == 9: # DELTATIME + codec.write_uint64 (long (value)) + elif typecode == 10: # REF + value.encode(codec) + elif typecode == 11: # BOOL + codec.write_uint8 (int (value)) + elif typecode == 12: # FLOAT + codec.write_float (float (value)) + elif typecode == 13: # DOUBLE + codec.write_double (float (value)) + elif typecode == 14: # UUID + codec.write_uuid (value) + elif typecode == 15: # FTABLE + codec.write_map (value) + elif typecode == 16: + codec.write_int8 (int(value)) + elif typecode == 17: + codec.write_int16 (int(value)) + elif typecode == 18: + codec.write_int32 (int(value)) + elif typecode == 19: + codec.write_int64 (int(value)) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def decodeValue (self, codec, typecode): + """ Decode, from the codec, a value based on its typecode. """ + if typecode == 1: + data = codec.read_uint8 () + elif typecode == 2: + data = codec.read_uint16 () + elif typecode == 3: + data = codec.read_uint32 () + elif typecode == 4: + data = codec.read_uint64 () + elif typecode == 5: + data = codec.read_uint8 () + elif typecode == 6: + data = codec.read_str8 () + elif typecode == 7: + data = codec.read_str16 () + elif typecode == 8: # ABSTIME + data = codec.read_uint64 () + elif typecode == 9: # DELTATIME + data = codec.read_uint64 () + elif typecode == 10: # REF + data = objectId(codec) + elif typecode == 11: # BOOL + data = codec.read_uint8 () + elif typecode == 12: # FLOAT + data = codec.read_float () + elif typecode == 13: # DOUBLE + data = codec.read_double () + elif typecode == 14: # UUID + data = codec.read_uuid () + elif typecode == 15: # FTABLE + data = codec.read_map () + elif typecode == 16: + data = codec.read_int8 () + elif typecode == 17: + data = codec.read_int16 () + elif typecode == 18: + data = codec.read_int32 () + elif typecode == 19: + data = codec.read_int64 () + else: + raise ValueError ("Invalid type code: %d" % typecode) + return data + + def incOutstanding (self, ch): + self.cv.acquire () + ch.reqsOutstanding = ch.reqsOutstanding + 1 + self.cv.release () + + def decOutstanding (self, ch): + self.cv.acquire () + ch.reqsOutstanding = ch.reqsOutstanding - 1 + if ch.reqsOutstanding == 0 and self.syncInFlight: + self.syncInFlight = False + self.cv.notify () + self.cv.release () + + if ch.reqsOutstanding == 0: + if self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) + ch.ssn.exchange_bind (exchange="qpid.management", + queue=ch.topicName, binding_key="console.#") + ch.ssn.exchange_bind (exchange="qpid.management", + queue=ch.topicName, binding_key="schema.#") + + + def handleMethodReply (self, ch, codec, sequence): + status = codec.read_uint32 () + sText = codec.read_str16 () + + data = self.seqMgr.release (sequence) + if data == None: + return + + (userSequence, classId, methodName) = data + args = {} + context = self.seqMgr.release (userSequence) + + if status == 0: + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None + for mname in ms: + (mdesc, margs) = ms[mname] + if mname == methodName: + arglist = margs + if arglist == None: + return + + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.decodeValue (codec, arg[1]) + + if context == "sync" and userSequence == self.syncSequence: + self.cv.acquire () + self.syncInFlight = False + self.syncResult = methodResult (status, sText, args) + self.cv.notify () + self.cv.release () + elif self.methodCb != None: + self.methodCb (ch.context, userSequence, status, sText, args) + + def handleCommandComplete (self, ch, codec, seq): + code = codec.read_uint32 () + text = codec.read_str8 () + data = (seq, code, text) + context = self.seqMgr.release (seq) + if context == "outstanding": + self.decOutstanding (ch) + elif context == "sync" and seq == self.syncSequence: + self.cv.acquire () + self.syncInFlight = False + self.cv.notify () + self.cv.release () + elif self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_USER, data) + + def handleBrokerResponse (self, ch, codec): + uuid = codec.read_uuid () + ch.brokerInfo = brokerInfo (uuid, ch.sessionId) + if self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, ch.brokerInfo) + + # Send a package request + sendCodec = Codec (self.spec) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('P'), seq) + smsg = ch.message(sendCodec.encoded) + ch.send ("qpid.management", smsg) + + def handlePackageInd (self, ch, codec): + pname = codec.read_str8 () + if pname not in self.packages: + self.packages[pname] = {} + + # Send a class request + sendCodec = Codec (self.spec) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('Q'), seq) + self.incOutstanding (ch) + sendCodec.write_str8 (pname) + smsg = ch.message(sendCodec.encoded) + ch.send ("qpid.management", smsg) + + def handleClassInd (self, ch, codec): + kind = codec.read_uint8() + if kind != 1: # This API doesn't handle new-style events + return + pname = codec.read_str8() + cname = codec.read_str8() + hash = codec.read_bin128() + if pname not in self.packages: + return + + if (cname, hash) not in self.packages[pname]: + # Send a schema request + sendCodec = Codec (self.spec) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('S'), seq) + self.incOutstanding (ch) + sendCodec.write_str8 (pname) + sendCodec.write_str8 (cname) + sendCodec.write_bin128 (hash) + smsg = ch.message(sendCodec.encoded) + ch.send ("qpid.management", smsg) + + def handleHeartbeat (self, ch, codec): + timestamp = codec.read_uint64() + if self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp) + + def handleEvent (self, ch, codec): + if self.eventCb == None: + return + timestamp = codec.read_uint64() + objId = objectId(codec) + packageName = codec.read_str8() + className = codec.read_str8() + hash = codec.read_bin128() + name = codec.read_str8() + classKey = (packageName, className, hash) + if classKey not in self.schema: + return; + schemaClass = self.schema[classKey] + row = [] + es = schemaClass['E'] + arglist = None + for ename in es: + (edesc, eargs) = es[ename] + if ename == name: + arglist = eargs + if arglist == None: + return + for arg in arglist: + row.append((arg[0], self.decodeValue(codec, arg[1]))) + self.eventCb(ch.context, classKey, objId, name, row) + + def parseSchema (self, ch, codec): + """ Parse a received schema-description message. """ + self.decOutstanding (ch) + kind = codec.read_uint8() + if kind != 1: # This API doesn't handle new-style events + return + packageName = codec.read_str8 () + className = codec.read_str8 () + hash = codec.read_bin128 () + configCount = codec.read_uint16 () + instCount = codec.read_uint16 () + methodCount = codec.read_uint16 () + + if packageName not in self.packages: + return + if (className, hash) in self.packages[packageName]: + return + + classKey = (packageName, className, hash) + if classKey in self.schema: + return + + configs = [] + insts = [] + methods = {} + + configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) + insts.append (("id", 4, None, None)) + + for idx in range (configCount): + ft = codec.read_map () + name = str (ft["name"]) + type = ft["type"] + access = ft["access"] + index = ft["index"] + optional = ft["optional"] + unit = None + min = None + max = None + maxlen = None + desc = None + + for key, value in ft.items (): + if key == "unit": + unit = str (value) + elif key == "min": + min = value + elif key == "max": + max = value + elif key == "maxlen": + maxlen = value + elif key == "desc": + desc = str (value) + + config = (name, type, unit, desc, access, index, min, max, maxlen, optional) + configs.append (config) + + for idx in range (instCount): + ft = codec.read_map () + name = str (ft["name"]) + type = ft["type"] + unit = None + desc = None + + for key, value in ft.items (): + if key == "unit": + unit = str (value) + elif key == "desc": + desc = str (value) + + inst = (name, type, unit, desc) + insts.append (inst) + + for idx in range (methodCount): + ft = codec.read_map () + mname = str (ft["name"]) + argCount = ft["argCount"] + if "desc" in ft: + mdesc = str (ft["desc"]) + else: + mdesc = None + + args = [] + for aidx in range (argCount): + ft = codec.read_map () + name = str (ft["name"]) + type = ft["type"] + dir = str (ft["dir"].upper ()) + unit = None + min = None + max = None + maxlen = None + desc = None + default = None + + for key, value in ft.items (): + if key == "unit": + unit = str (value) + elif key == "min": + min = value + elif key == "max": + max = value + elif key == "maxlen": + maxlen = value + elif key == "desc": + desc = str (value) + elif key == "default": + default = str (value) + + arg = (name, type, dir, unit, desc, min, max, maxlen, default) + args.append (arg) + methods[mname] = (mdesc, args) + + schemaClass = {} + schemaClass['C'] = configs + schemaClass['I'] = insts + schemaClass['M'] = methods + self.schema[classKey] = schemaClass + + if self.schemaCb != None: + self.schemaCb (ch.context, classKey, configs, insts, methods, {}) + + def parsePresenceMasks(self, codec, schemaClass): + """ Generate a list of not-present properties """ + excludeList = [] + bit = 0 + for element in schemaClass['C'][1:]: + if element[9] == 1: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(element[0]) + bit = bit * 2 + if bit == 256: + bit = 0 + return excludeList + + def parseContent (self, ch, cls, codec, seq=0): + """ Parse a received content message. """ + if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None: + return + if cls == 'I' and self.instCb == None: + return + + packageName = codec.read_str8 () + className = codec.read_str8 () + hash = codec.read_bin128 () + classKey = (packageName, className, hash) + + if classKey not in self.schema: + return + + row = [] + timestamps = [] + + timestamps.append (codec.read_uint64 ()) # Current Time + timestamps.append (codec.read_uint64 ()) # Create Time + timestamps.append (codec.read_uint64 ()) # Delete Time + objId = objectId(codec) + schemaClass = self.schema[classKey] + if cls == 'C' or cls == 'B': + notPresent = self.parsePresenceMasks(codec, schemaClass) + + if cls == 'C' or cls == 'B': + row.append(("id", objId)) + for element in schemaClass['C'][1:]: + tc = element[1] + name = element[0] + if name in notPresent: + row.append((name, None)) + else: + data = self.decodeValue(codec, tc) + row.append((name, data)) + + if cls == 'I' or cls == 'B': + if cls == 'I': + row.append(("id", objId)) + for element in schemaClass['I'][1:]: + tc = element[1] + name = element[0] + data = self.decodeValue (codec, tc) + row.append ((name, data)) + + if cls == 'C' or (cls == 'B' and seq != self.syncSequence): + self.configCb (ch.context, classKey, row, timestamps) + elif cls == 'B' and seq == self.syncSequence: + if timestamps[2] == 0: + obj = mgmtObject (classKey, timestamps, row) + self.syncResult.append (obj) + elif cls == 'I': + self.instCb (ch.context, classKey, row, timestamps) + + def parse (self, ch, codec, opcode, seq): + """ Parse a message received from the topic queue. """ + if opcode == 's': + self.parseSchema (ch, codec) + elif opcode == 'c': + self.parseContent (ch, 'C', codec) + elif opcode == 'i': + self.parseContent (ch, 'I', codec) + elif opcode == 'g': + self.parseContent (ch, 'B', codec, seq) + else: + raise ValueError ("Unknown opcode: %c" % opcode); + + def method (self, channel, userSequence, objId, classId, methodName, args): + """ Invoke a method on an object """ + codec = Codec (self.spec) + sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) + self.setHeader (codec, ord ('M'), sequence) + objId.encode(codec) + codec.write_str8 (classId[0]) + codec.write_str8 (classId[1]) + codec.write_bin128 (classId[2]) + codec.write_str8 (methodName) + bank = "%d.%d" % (objId.getBroker(), objId.getBank()) + + # Encode args according to schema + if classId not in self.schema: + self.seqMgr.release (sequence) + raise ValueError ("Unknown class name: %s" % classId) + + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None + for mname in ms: + (mdesc, margs) = ms[mname] + if mname == methodName: + arglist = margs + if arglist == None: + self.seqMgr.release (sequence) + raise ValueError ("Unknown method name: %s" % methodName) + + for arg in arglist: + if arg[2].find("I") != -1: + value = arg[8] # default + if arg[0] in args: + value = args[arg[0]] + if value == None: + self.seqMgr.release (sequence) + raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) + self.encodeValue (codec, value, arg[1]) + + packageName = classId[0] + className = classId[1] + msg = channel.message(codec.encoded, "agent." + bank) + channel.send ("qpid.management", msg) diff --git a/RC5/python/qpid/managementdata.py b/RC5/python/qpid/managementdata.py new file mode 100644 index 0000000000..46c746c0f9 --- /dev/null +++ b/RC5/python/qpid/managementdata.py @@ -0,0 +1,753 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +############################################################################### +## This file is being obsoleted by qmf/console.py +############################################################################### + +import qpid +import re +import socket +import struct +import os +import locale +from qpid.management import managementChannel, managementClient +from threading import Lock +from disp import Display +from shlex import split +from qpid.connection import Connection +from qpid.util import connect + +class Broker: + def __init__ (self, text): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(text) + if not match: raise ValueError("'%s' is not a valid broker url" % (text)) + user, password, host, port = match.groups() + + self.host = socket.gethostbyname (host) + if port: self.port = int(port) + else: self.port = 5672 + self.username = user or "guest" + self.password = password or "guest" + + def name (self): + return self.host + ":" + str (self.port) + +class ManagementData: + + # + # Data Structure: + # + # Please note that this data structure holds only the most recent + # configuration and instrumentation data for each object. It does + # not hold the detailed historical data that is sent from the broker. + # The only historical data it keeps are the high and low watermarks + # for hi-lo statistics. + # + # tables :== {class-key} + # {<obj-id>} + # (timestamp, config-record, inst-record) + # class-key :== (<package-name>, <class-name>, <class-hash>) + # timestamp :== (<last-interval-time>, <create-time>, <delete-time>) + # config-record :== [element] + # inst-record :== [element] + # element :== (<element-name>, <element-value>) + # + + def registerObjId (self, objId): + if not objId.index() in self.idBackMap: + self.idBackMap[objId.index()] = self.nextId + self.idMap[self.nextId] = objId + self.nextId += 1 + + def displayObjId (self, objIdIndex): + if objIdIndex in self.idBackMap: + return self.idBackMap[objIdIndex] + else: + return 0 + + def rawObjId (self, displayId): + if displayId in self.idMap: + return self.idMap[displayId] + else: + return None + + def displayClassName (self, cls): + (packageName, className, hash) = cls + rev = self.schema[cls][4] + if rev == 0: + suffix = "" + else: + suffix = ".%d" % rev + return packageName + ":" + className + suffix + + def dataHandler (self, context, className, list, timestamps): + """ Callback for configuration and instrumentation data updates """ + self.lock.acquire () + try: + # If this class has not been seen before, create an empty dictionary to + # hold objects of this class + if className not in self.tables: + self.tables[className] = {} + + # Register the ID so a more friendly presentation can be displayed + objId = list[0][1] + oidx = objId.index() + self.registerObjId (objId) + + # If this object hasn't been seen before, create a new object record with + # the timestamps and empty lists for configuration and instrumentation data. + if oidx not in self.tables[className]: + self.tables[className][oidx] = (timestamps, [], []) + + (unused, oldConf, oldInst) = self.tables[className][oidx] + + # For config updates, simply replace old config list with the new one. + if context == 0: #config + self.tables[className][oidx] = (timestamps, list, oldInst) + + # For instrumentation updates, carry the minimum and maximum values for + # "hi-lo" stats forward. + elif context == 1: #inst + if len (oldInst) == 0: + newInst = list + else: + newInst = [] + for idx in range (len (list)): + (key, value) = list[idx] + if key.find ("High") == len (key) - 4: + if oldInst[idx][1] > value: + value = oldInst[idx][1] + if key.find ("Low") == len (key) - 3: + if oldInst[idx][1] < value: + value = oldInst[idx][1] + newInst.append ((key, value)) + self.tables[className][oidx] = (timestamps, oldConf, newInst) + + finally: + self.lock.release () + + def ctrlHandler (self, context, op, data): + if op == self.mclient.CTRL_BROKER_INFO: + pass + elif op == self.mclient.CTRL_HEARTBEAT: + pass + + def configHandler (self, context, className, list, timestamps): + self.dataHandler (0, className, list, timestamps); + + def instHandler (self, context, className, list, timestamps): + self.dataHandler (1, className, list, timestamps); + + def methodReply (self, broker, sequence, status, sText, args): + """ Callback for method-reply messages """ + self.lock.acquire () + try: + line = "Call Result: " + self.methodsPending[sequence] + \ + " " + str (status) + " (" + sText + ")" + print line, args + del self.methodsPending[sequence] + finally: + self.lock.release () + + def closeHandler (self, context, reason): + if self.operational: + print "Connection to broker lost:", reason + self.operational = False + if self.cli != None: + self.cli.setPromptMessage ("Broker Disconnected") + + def schemaHandler (self, context, classKey, configs, insts, methods, events): + """ Callback for schema updates """ + if classKey not in self.schema: + schemaRev = 0 + for key in self.schema: + if classKey[0] == key[0] and classKey[1] == key[1]: + schemaRev += 1 + self.schema[classKey] = (configs, insts, methods, events, schemaRev) + + def setCli (self, cliobj): + self.cli = cliobj + + def __init__ (self, disp, host, username="guest", password="guest"): + self.lock = Lock () + self.tables = {} + self.schema = {} + self.bootSequence = 0 + self.operational = False + self.disp = disp + self.cli = None + self.lastUnit = None + self.methodSeq = 1 + self.methodsPending = {} + self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) + + self.broker = Broker (host) + self.conn = Connection (connect (self.broker.host, self.broker.port), + username=self.broker.username, password=self.broker.password) + self.spec = self.conn.spec + self.conn.start () + + self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler, + self.instHandler, self.methodReply, self.closeHandler) + self.mclient.schemaListener (self.schemaHandler) + self.mch = self.mclient.addChannel (self.conn.session(self.sessionId)) + self.operational = True + self.idMap = {} + self.idBackMap = {} + self.nextId = 101 + + def close (self): + pass + + def refName (self, oid): + if oid == None: + return "NULL" + return str (self.displayObjId (oid.index())) + + def valueDisplay (self, classKey, key, value): + if value == None: + return "<NULL>" + for kind in range (2): + schema = self.schema[classKey][kind] + for item in schema: + if item[0] == key: + typecode = item[1] + unit = item[2] + if (typecode >= 1 and typecode <= 5) or typecode == 12 or typecode == 13 or \ + (typecode >= 16 and typecode <= 19): + if unit == None or unit == self.lastUnit: + return str (value) + else: + self.lastUnit = unit + suffix = "" + if value != 1: + suffix = "s" + return str (value) + " " + unit + suffix + elif typecode == 6 or typecode == 7: # strings + return value + elif typecode == 8: + if value == 0: + return "--" + return self.disp.timestamp (value) + elif typecode == 9: + return str (value) + elif typecode == 10: + return self.refName (value) + elif typecode == 11: + if value == 0: + return "False" + else: + return "True" + elif typecode == 14: + return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", value) + elif typecode == 15: + return str (value) + return "*type-error*" + + def getObjIndex (self, classKey, config): + """ Concatenate the values from index columns to form a unique object name """ + result = "" + schemaConfig = self.schema[classKey][0] + for item in schemaConfig: + if item[5] == 1 and item[0] != "id": + if result != "": + result = result + "." + for key,val in config: + if key == item[0]: + result = result + self.valueDisplay (classKey, key, val) + return result + + def getClassKey (self, className): + delimPos = className.find(":") + if delimPos == -1: + schemaRev = 0 + delim = className.find(".") + if delim != -1: + schemaRev = int(className[delim + 1:]) + name = className[0:delim] + else: + name = className + for key in self.schema: + if key[1] == name and self.schema[key][4] == schemaRev: + return key + else: + package = className[0:delimPos] + name = className[delimPos + 1:] + schemaRev = 0 + delim = name.find(".") + if delim != -1: + schemaRev = int(name[delim + 1:]) + name = name[0:delim] + for key in self.schema: + if key[0] == package and key[1] == name: + if self.schema[key][4] == schemaRev: + return key + return None + + def classCompletions (self, prefix): + """ Provide a list of candidate class names for command completion """ + self.lock.acquire () + complist = [] + try: + for name in self.tables: + if name.find (prefix) == 0: + complist.append (name) + finally: + self.lock.release () + return complist + + def typeName (self, typecode): + """ Convert type-codes to printable strings """ + if typecode == 1: + return "uint8" + elif typecode == 2: + return "uint16" + elif typecode == 3: + return "uint32" + elif typecode == 4: + return "uint64" + elif typecode == 5: + return "bool" + elif typecode == 6: + return "short-string" + elif typecode == 7: + return "long-string" + elif typecode == 8: + return "abs-time" + elif typecode == 9: + return "delta-time" + elif typecode == 10: + return "reference" + elif typecode == 11: + return "boolean" + elif typecode == 12: + return "float" + elif typecode == 13: + return "double" + elif typecode == 14: + return "uuid" + elif typecode == 15: + return "field-table" + elif typecode == 16: + return "int8" + elif typecode == 17: + return "int16" + elif typecode == 18: + return "int32" + elif typecode == 19: + return "int64" + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def accessName (self, code): + """ Convert element access codes to printable strings """ + if code == 1: + return "ReadCreate" + elif code == 2: + return "ReadWrite" + elif code == 3: + return "ReadOnly" + else: + raise ValueError ("Invalid access code: %d" %code) + + def notNone (self, text): + if text == None: + return "" + else: + return text + + def isOid (self, id): + for char in str (id): + if not char.isdigit () and not char == '-': + return False + return True + + def listOfIds (self, classKey, tokens): + """ Generate a tuple of object ids for a classname based on command tokens. """ + list = [] + if len(tokens) == 0 or tokens[0] == "all": + for id in self.tables[classKey]: + list.append (self.displayObjId (id)) + + elif tokens[0] == "active": + for id in self.tables[classKey]: + if self.tables[classKey][id][0][2] == 0: + list.append (self.displayObjId (id)) + + else: + for token in tokens: + if self.isOid (token): + if token.find ("-") != -1: + ids = token.split("-", 2) + for id in range (int (ids[0]), int (ids[1]) + 1): + if self.getClassForId (self.rawObjId (long (id))) == classKey: + list.append (id) + else: + list.append (int(token)) + + list.sort () + result = () + for item in list: + result = result + (item,) + return result + + def listClasses (self): + """ Generate a display of the list of classes """ + self.lock.acquire () + try: + rows = [] + sorted = self.tables.keys () + sorted.sort () + for name in sorted: + active = 0 + deleted = 0 + for record in self.tables[name]: + isdel = False + ts = self.tables[name][record][0] + if ts[2] > 0: + isdel = True + if isdel: + deleted = deleted + 1 + else: + active = active + 1 + rows.append ((self.displayClassName (name), active, deleted)) + if len (rows) != 0: + self.disp.table ("Management Object Types:", + ("ObjectType", "Active", "Deleted"), rows) + else: + print "Waiting for next periodic update" + finally: + self.lock.release () + + def listObjects (self, tokens): + """ Generate a display of a list of objects in a class """ + if len(tokens) == 0: + print "Error - No class name provided" + return + + self.lock.acquire () + try: + classKey = self.getClassKey (tokens[0]) + if classKey == None: + print ("Object type %s not known" % tokens[0]) + else: + rows = [] + if classKey in self.tables: + ids = self.listOfIds(classKey, tokens[1:]) + for objId in ids: + (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()] + createTime = self.disp.timestamp (ts[1]) + destroyTime = "-" + if ts[2] > 0: + destroyTime = self.disp.timestamp (ts[2]) + objIndex = self.getObjIndex (classKey, config) + row = (objId, createTime, destroyTime, objIndex) + rows.append (row) + self.disp.table ("Objects of type %s" % self.displayClassName(classKey), + ("ID", "Created", "Destroyed", "Index"), + rows) + finally: + self.lock.release () + + def showObjects (self, tokens): + """ Generate a display of object data for a particular class """ + self.lock.acquire () + try: + self.lastUnit = None + if self.isOid (tokens[0]): + if tokens[0].find ("-") != -1: + rootId = int (tokens[0][0:tokens[0].find ("-")]) + else: + rootId = int (tokens[0]) + + classKey = self.getClassForId (self.rawObjId (rootId)) + remaining = tokens + if classKey == None: + print "Id not known: %d" % int (tokens[0]) + raise ValueError () + else: + classKey = self.getClassKey (tokens[0]) + remaining = tokens[1:] + if classKey not in self.tables: + print "Class not known: %s" % tokens[0] + raise ValueError () + + userIds = self.listOfIds (classKey, remaining) + if len (userIds) == 0: + print "No object IDs supplied" + raise ValueError () + + ids = [] + for id in userIds: + if self.getClassForId (self.rawObjId (long (id))) == classKey: + ids.append (self.rawObjId (long (id))) + + rows = [] + timestamp = None + config = self.tables[classKey][ids[0].index()][1] + for eIdx in range (len (config)): + key = config[eIdx][0] + if key != "id": + row = ("property", key) + for id in ids: + if timestamp == None or \ + timestamp < self.tables[classKey][id.index()][0][0]: + timestamp = self.tables[classKey][id.index()][0][0] + (key, value) = self.tables[classKey][id.index()][1][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) + rows.append (row) + + inst = self.tables[classKey][ids[0].index()][2] + for eIdx in range (len (inst)): + key = inst[eIdx][0] + if key != "id": + row = ("statistic", key) + for id in ids: + (key, value) = self.tables[classKey][id.index()][2][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) + rows.append (row) + + titleRow = ("Type", "Element") + for id in ids: + titleRow = titleRow + (self.refName(id),) + caption = "Object of type %s:" % self.displayClassName(classKey) + if timestamp != None: + caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" + self.disp.table (caption, titleRow, rows) + + except: + pass + self.lock.release () + + def schemaSummary (self): + """ Generate a display of the list of classes in the schema """ + self.lock.acquire () + try: + rows = [] + sorted = self.schema.keys () + sorted.sort () + for classKey in sorted: + tuple = self.schema[classKey] + row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]), + len (tuple[2])) + rows.append (row) + self.disp.table ("Classes in Schema:", + ("Class", "Properties", "Statistics", "Methods"), + rows) + finally: + self.lock.release () + + def schemaTable (self, className): + """ Generate a display of details of the schema of a particular class """ + self.lock.acquire () + try: + classKey = self.getClassKey (className) + if classKey == None: + print ("Class name %s not known" % className) + raise ValueError () + + rows = [] + schemaRev = self.schema[classKey][4] + for config in self.schema[classKey][0]: + name = config[0] + if name != "id": + typename = self.typeName(config[1]) + unit = self.notNone (config[2]) + desc = self.notNone (config[3]) + access = self.accessName (config[4]) + extra = "" + if config[5] == 1: + extra += "index " + if config[6] != None: + extra += "Min: " + str(config[6]) + " " + if config[7] != None: + extra += "Max: " + str(config[7]) + " " + if config[8] != None: + extra += "MaxLen: " + str(config[8]) + " " + if config[9] == 1: + extra += "optional " + rows.append ((name, typename, unit, access, extra, desc)) + + for config in self.schema[classKey][1]: + name = config[0] + if name != "id": + typename = self.typeName(config[1]) + unit = self.notNone (config[2]) + desc = self.notNone (config[3]) + rows.append ((name, typename, unit, "", "", desc)) + + titles = ("Element", "Type", "Unit", "Access", "Notes", "Description") + self.disp.table ("Schema for class '%s':" % self.displayClassName(classKey), titles, rows) + + for mname in self.schema[classKey][2]: + (mdesc, args) = self.schema[classKey][2][mname] + caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc)) + rows = [] + for arg in args: + name = arg[0] + typename = self.typeName (arg[1]) + dir = arg[2] + unit = self.notNone (arg[3]) + desc = self.notNone (arg[4]) + extra = "" + if arg[5] != None: + extra = extra + "Min: " + str (arg[5]) + if arg[6] != None: + extra = extra + "Max: " + str (arg[6]) + if arg[7] != None: + extra = extra + "MaxLen: " + str (arg[7]) + if arg[8] != None: + extra = extra + "Default: " + str (arg[8]) + rows.append ((name, typename, dir, unit, extra, desc)) + titles = ("Argument", "Type", "Direction", "Unit", "Notes", "Description") + self.disp.table (caption, titles, rows) + + except Exception,e: + pass + self.lock.release () + + def getClassForId (self, objId): + """ Given an object ID, return the class key for the referenced object """ + for classKey in self.tables: + if objId.index() in self.tables[classKey]: + return classKey + return None + + def callMethod (self, userOid, methodName, args): + self.lock.acquire () + methodOk = True + try: + classKey = self.getClassForId (self.rawObjId (userOid)) + if classKey == None: + raise ValueError () + + if methodName not in self.schema[classKey][2]: + print "Method '%s' not valid for class '%s'" % (methodName, self.displayClassName(classKey)) + raise ValueError () + + schemaMethod = self.schema[classKey][2][methodName] + count = 0 + for arg in range(len(schemaMethod[1])): + if schemaMethod[1][arg][2].find("I") != -1: + count += 1 + if len (args) != count: + print "Wrong number of method args: Need %d, Got %d" % (count, len (args)) + raise ValueError () + + namedArgs = {} + idx = 0 + for arg in range(len(schemaMethod[1])): + if schemaMethod[1][arg][2].find("I") != -1: + namedArgs[schemaMethod[1][arg][0]] = args[idx] + idx += 1 + + self.methodSeq = self.methodSeq + 1 + self.methodsPending[self.methodSeq] = methodName + except Exception, e: + methodOk = False + self.lock.release () + if methodOk: +# try: + self.mclient.callMethod (self.mch, self.methodSeq, self.rawObjId (userOid), classKey, + methodName, namedArgs) +# except ValueError, e: +# print "Error invoking method:", e + + def makeIdRow (self, displayId): + if displayId in self.idMap: + objId = self.idMap[displayId] + else: + return None + if objId.getFlags() == 0: + flags = "" + else: + flags = str(objId.getFlags()) + seq = objId.getSequence() + if seq == 0: + seqText = "<durable>" + else: + seqText = str(seq) + return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject())) + + def listIds (self, select): + rows = [] + if select == 0: + sorted = self.idMap.keys() + sorted.sort() + for displayId in sorted: + row = self.makeIdRow (displayId) + rows.append(row) + else: + row = self.makeIdRow (select) + if row == None: + print "Display Id %d not known" % select + return + rows.append(row) + self.disp.table("Translation of Display IDs:", + ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"), + rows) + + def do_list (self, data): + tokens = data.split () + if len (tokens) == 0: + self.listClasses () + else: + self.listObjects (tokens) + + def do_show (self, data): + tokens = data.split () + self.showObjects (tokens) + + def do_schema (self, data): + if data == "": + self.schemaSummary () + else: + self.schemaTable (data) + + def do_call (self, data): + encTokens = data.split () + try: + tokens = [a.decode(locale.getpreferredencoding()) for a in encArgs] + except: + tokens = encTokens + if len (tokens) < 2: + print "Not enough arguments supplied" + return + + displayId = long (tokens[0]) + methodName = tokens[1] + args = tokens[2:] + self.callMethod (displayId, methodName, args) + + def do_id (self, data): + if data == "": + select = 0 + else: + select = int(data) + self.listIds(select) + + def do_exit (self): + self.mclient.removeChannel (self.mch) diff --git a/RC5/python/qpid/message.py b/RC5/python/qpid/message.py new file mode 100644 index 0000000000..eb3ef5c03c --- /dev/null +++ b/RC5/python/qpid/message.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from connection08 import Method, Request +from sets import Set + +class Message: + + def __init__(self, channel, frame, content = None): + self.channel = channel + self.frame = frame + self.method = frame.method_type + self.content = content + if self.method.is_l4_command(): + self.command_id = self.channel.incoming_completion.sequence.next() + #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name + + def __len__(self): + return len(self.frame.args) + + def _idx(self, idx): + if idx < 0: idx += len(self) + if idx < 0 or idx > len(self): + raise IndexError(idx) + return idx + + def __getitem__(self, idx): + return self.frame.args[idx] + + def __getattr__(self, attr): + fields = self.method.fields.byname + if fields.has_key(attr): + f = fields[attr] + result = self[self.method.fields.index(f)] + else: + for r in self.method.responses: + if attr == r.name: + def respond(*args, **kwargs): + batch=0 + if kwargs.has_key("batchoffset"): + batch=kwargs.pop("batchoffset") + self.channel.respond(Method(r, r.arguments(*args, **kwargs)), batch, self.frame) + result = respond + break + else: + raise AttributeError(attr) + return result + + STR = "%s %s content = %s" + REPR = STR.replace("%s", "%r") + + def __str__(self): + return Message.STR % (self.method, self.frame.args, self.content) + + def __repr__(self): + return Message.REPR % (self.method, self.frame.args, self.content) + + def complete(self, cumulative=True): + self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative) diff --git a/RC5/python/qpid/packer.py b/RC5/python/qpid/packer.py new file mode 100644 index 0000000000..22c16918dc --- /dev/null +++ b/RC5/python/qpid/packer.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import struct + +class Packer: + + def read(self, n): abstract + + def write(self, s): abstract + + def unpack(self, fmt): + values = struct.unpack(fmt, self.read(struct.calcsize(fmt))) + if len(values) == 1: + return values[0] + else: + return values + + def pack(self, fmt, *args): + self.write(struct.pack(fmt, *args)) diff --git a/RC5/python/qpid/peer.py b/RC5/python/qpid/peer.py new file mode 100644 index 0000000000..648f32ceef --- /dev/null +++ b/RC5/python/qpid/peer.py @@ -0,0 +1,465 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module contains a skeletal peer implementation useful for +implementing an AMQP server, client, or proxy. The peer implementation +sorts incoming frames to their intended channels, and dispatches +incoming method frames to a delegate. +""" + +import thread, threading, traceback, socket, sys, logging +from connection08 import EOF, Method, Header, Body, Request, Response +from message import Message +from queue import Queue, Closed as QueueClosed +from content import Content +from cStringIO import StringIO +from time import time + +class Sequence: + + def __init__(self, start, step = 1): + # we should keep start for wrap around + self._next = start + self.step = step + self.lock = thread.allocate_lock() + + def next(self): + self.lock.acquire() + try: + result = self._next + self._next += self.step + return result + finally: + self.lock.release() + +class Peer: + + def __init__(self, conn, delegate, channel_factory=None): + self.conn = conn + self.delegate = delegate + self.outgoing = Queue(0) + self.work = Queue(0) + self.channels = {} + self.lock = thread.allocate_lock() + if channel_factory: + self.channel_factory = channel_factory + else: + self.channel_factory = Channel + + def channel(self, id): + self.lock.acquire() + try: + try: + ch = self.channels[id] + except KeyError: + ch = self.channel_factory(id, self.outgoing, self.conn.spec) + self.channels[id] = ch + finally: + self.lock.release() + return ch + + def start(self): + thread.start_new_thread(self.writer, ()) + thread.start_new_thread(self.reader, ()) + thread.start_new_thread(self.worker, ()) + + def fatal(self, message=None): + """Call when an unexpected exception occurs that will kill a thread.""" + if message: print >> sys.stderr, message + self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) + + def reader(self): + try: + while True: + try: + frame = self.conn.read() + except EOF, e: + self.work.close() + break + ch = self.channel(frame.channel) + ch.receive(frame, self.work) + except: + self.fatal() + + def closed(self, reason): + # We must close the delegate first because closing channels + # may wake up waiting threads and we don't want them to see + # the delegate as open. + self.delegate.closed(reason) + for ch in self.channels.values(): + ch.closed(reason) + + def writer(self): + try: + while True: + try: + message = self.outgoing.get() + self.conn.write(message) + except socket.error, e: + self.closed(e) + break + self.conn.flush() + except: + self.fatal() + + def worker(self): + try: + while True: + queue = self.work.get() + frame = queue.get() + channel = self.channel(frame.channel) + if frame.method_type.content: + content = read_content(queue) + else: + content = None + + self.delegate(channel, Message(channel, frame, content)) + except QueueClosed: + self.closed("worker closed") + except: + self.fatal() + +class Requester: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + self.mark = 0 + # request_id -> listener + self.outstanding = {} + + def request(self, method, listener, content = None): + frame = Request(self.sequence.next(), self.mark, method) + self.outstanding[frame.id] = listener + self.write(frame, content) + + def receive(self, channel, frame): + listener = self.outstanding.pop(frame.request_id) + listener(channel, frame) + +class Responder: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + + def respond(self, method, batch, request): + if isinstance(request, Method): + self.write(method) + else: + # allow batching from frame at either end + if batch<0: + frame = Response(self.sequence.next(), request.id+batch, -batch, method) + else: + frame = Response(self.sequence.next(), request.id, batch, method) + self.write(frame) + +class Closed(Exception): pass + +class Channel: + + def __init__(self, id, outgoing, spec): + self.id = id + self.outgoing = outgoing + self.spec = spec + self.incoming = Queue(0) + self.responses = Queue(0) + self.queue = None + self._closed = False + self.reason = None + + self.requester = Requester(self.write) + self.responder = Responder(self.write) + + self.completion = OutgoingCompletion() + self.incoming_completion = IncomingCompletion(self) + self.futures = {} + self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves + + self.invoker = self.invoke_method + self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) + self.synchronous = True + + def closed(self, reason): + if self._closed: + return + self._closed = True + self.reason = reason + self.incoming.close() + self.responses.close() + self.completion.close() + self.incoming_completion.reset() + for f in self.futures.values(): + f.put_response(self, reason) + + def write(self, frame, content = None): + if self._closed: + raise Closed(self.reason) + frame.channel = self.id + self.outgoing.put(frame) + if (isinstance(frame, (Method, Request)) + and content == None + and frame.method_type.content): + content = Content() + if content != None: + self.write_content(frame.method_type.klass, content) + + def write_content(self, klass, content): + header = Header(klass, content.weight(), content.size(), content.properties) + self.write(header) + for child in content.children: + self.write_content(klass, child) + # should split up if content.body exceeds max frame size + if content.body: + self.write(Body(content.body)) + + def receive(self, frame, work): + if isinstance(frame, Method): + if frame.method.response: + self.queue = self.responses + else: + self.queue = self.incoming + work.put(self.incoming) + elif isinstance(frame, Request): + self.queue = self.incoming + work.put(self.incoming) + elif isinstance(frame, Response): + self.requester.receive(self, frame) + if frame.method_type.content: + self.queue = self.responses + return + self.queue.put(frame) + + def queue_response(self, channel, frame): + channel.responses.put(frame.method) + + def request(self, method, listener, content = None): + self.requester.request(method, listener, content) + + def respond(self, method, batch, request): + self.responder.respond(method, batch, request) + + def invoke(self, type, args, kwargs): + if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]): + self.completion.reset() + self.incoming_completion.reset() + self.completion.next_command(type) + + content = kwargs.pop("content", None) + frame = Method(type, type.arguments(*args, **kwargs)) + return self.invoker(frame, content) + + # used for 0-9 + def invoke_reliable(self, frame, content = None): + if not self.synchronous: + future = Future() + self.request(frame, future.put_response, content) + if not frame.method.responses: return None + else: return future + + self.request(frame, self.queue_response, content) + if not frame.method.responses: + if self.use_execution_layer and frame.method_type.is_l4_command(): + self.execution_sync() + self.completion.wait() + if self._closed: + raise Closed(self.reason) + return None + try: + resp = self.responses.get() + if resp.method_type.content: + return Message(self, resp, read_content(self.responses)) + else: + return Message(self, resp) + except QueueClosed, e: + if self._closed: + raise Closed(self.reason) + else: + raise e + + # used for 0-8 and 0-10 + def invoke_method(self, frame, content = None): + if frame.method.result: + cmd_id = self.completion.command_id + future = Future() + self.futures[cmd_id] = future + + self.write(frame, content) + + try: + # here we depend on all nowait fields being named nowait + f = frame.method.fields.byname["nowait"] + nowait = frame.args[frame.method.fields.index(f)] + except KeyError: + nowait = False + + try: + if not nowait and frame.method.responses: + resp = self.responses.get() + if resp.method.content: + content = read_content(self.responses) + else: + content = None + if resp.method in frame.method.responses: + return Message(self, resp, content) + else: + raise ValueError(resp) + elif frame.method.result: + if self.synchronous: + fr = future.get_response(timeout=10) + if self._closed: + raise Closed(self.reason) + return fr + else: + return future + elif self.synchronous and not frame.method.response \ + and self.use_execution_layer and frame.method.is_l4_command(): + self.execution_sync() + completed = self.completion.wait(timeout=10) + if self._closed: + raise Closed(self.reason) + if not completed: + self.closed("Timed-out waiting for completion of %s" % frame) + except QueueClosed, e: + if self._closed: + raise Closed(self.reason) + else: + raise e + + def __getattr__(self, name): + type = self.spec.method(name) + if type == None: raise AttributeError(name) + method = lambda *args, **kwargs: self.invoke(type, args, kwargs) + self.__dict__[name] = method + return method + +def read_content(queue): + header = queue.get() + children = [] + for i in range(header.weight): + children.append(read_content(queue)) + buf = StringIO() + eof = header.eof + while not eof: + body = queue.get() + eof = body.eof + content = body.content + buf.write(content) + return Content(buf.getvalue(), children, header.properties.copy()) + +class Future: + def __init__(self): + self.completed = threading.Event() + + def put_response(self, channel, response): + self.response = response + self.completed.set() + + def get_response(self, timeout=None): + self.completed.wait(timeout) + if self.completed.isSet(): + return self.response + else: + return None + + def is_complete(self): + return self.completed.isSet() + +class OutgoingCompletion: + """ + Manages completion of outgoing commands i.e. command sent by this peer + """ + + def __init__(self): + self.condition = threading.Condition() + + #todo, implement proper wraparound + self.sequence = Sequence(0) #issues ids for outgoing commands + self.command_id = -1 #last issued id + self.mark = -1 #commands up to this mark are known to be complete + self._closed = False + + def next_command(self, method): + #the following test is a hack until the track/sub-channel is available + if method.is_l4_command(): + self.command_id = self.sequence.next() + + def reset(self): + self.sequence = Sequence(0) #reset counter + + def close(self): + self.reset() + self.condition.acquire() + try: + self._closed = True + self.condition.notifyAll() + finally: + self.condition.release() + + def complete(self, mark): + self.condition.acquire() + try: + self.mark = mark + #print "set mark to %s [%s] " % (self.mark, self) + self.condition.notifyAll() + finally: + self.condition.release() + + def wait(self, point_of_interest=-1, timeout=None): + if point_of_interest == -1: point_of_interest = self.command_id + start_time = time() + remaining = timeout + self.condition.acquire() + try: + while not self._closed and point_of_interest > self.mark: + #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self) + self.condition.wait(remaining) + if not self._closed and point_of_interest > self.mark and timeout: + if (start_time + timeout) < time(): break + else: remaining = timeout - (time() - start_time) + finally: + self.condition.release() + return point_of_interest <= self.mark + +class IncomingCompletion: + """ + Manages completion of incoming commands i.e. command received by this peer + """ + + def __init__(self, channel): + self.sequence = Sequence(0) #issues ids for incoming commands + self.mark = -1 #id of last command of whose completion notification was sent to the other peer + self.channel = channel + + def reset(self): + self.sequence = Sequence(0) #reset counter + + def complete(self, mark, cumulative=True): + if cumulative: + if mark > self.mark: + self.mark = mark + self.channel.execution_complete(cumulative_execution_mark=self.mark) + else: + #TODO: record and manage the ranges properly + range = [mark, mark] + if (self.mark == -1):#hack until wraparound is implemented + self.channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=range) + else: + self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) diff --git a/RC5/python/qpid/queue.py b/RC5/python/qpid/queue.py new file mode 100644 index 0000000000..c9f4d1d1d0 --- /dev/null +++ b/RC5/python/qpid/queue.py @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module augments the standard python multithreaded Queue +implementation to add a close() method so that threads blocking on the +content of a queue can be notified if the queue is no longer in use. +""" + +from Queue import Queue as BaseQueue, Empty, Full +from threading import Thread +from exceptions import Closed + +class Queue(BaseQueue): + + END = object() + STOP = object() + + def __init__(self, *args, **kwargs): + BaseQueue.__init__(self, *args, **kwargs) + self.error = None + self.listener = None + self.exc_listener = None + self.thread = None + + def close(self, error = None): + self.error = error + self.put(Queue.END) + if self.thread is not None: + self.thread.join() + self.thread = None + + def get(self, block = True, timeout = None): + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed(self.error) + else: + return result + + def listen(self, listener, exc_listener = None): + if listener is None and exc_listener is not None: + raise ValueError("cannot set exception listener without setting listener") + + if listener is None: + if self.thread is not None: + self.put(Queue.STOP) + self.thread.join() + self.thread = None + + self.listener = listener + self.exc_listener = exc_listener + + if listener is not None and self.thread is None: + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + try: + o = self.get() + if o == Queue.STOP: break + self.listener(o) + except Closed, e: + if self.exc_listener is not None: + self.exc_listener(e) + break diff --git a/RC5/python/qpid/reference.py b/RC5/python/qpid/reference.py new file mode 100644 index 0000000000..48ecb67656 --- /dev/null +++ b/RC5/python/qpid/reference.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Support for amqp 'reference' content (as opposed to inline content) +""" + +import threading +from queue import Queue, Closed + +class NotOpened(Exception): pass + +class AlreadyOpened(Exception): pass + +""" +A representation of a reference id; can be passed wherever amqp +content is required in place of inline data +""" +class ReferenceId: + + def __init__(self, id): + self.id = id + +""" +Holds content received through 'reference api'. Instances of this +class will be placed in the consumers queue on receiving a transfer +(assuming the reference has been opened). Data can be retrieved in +chunks (as append calls are received) or in full (after reference has +been closed signalling data s complete). +""" + +class Reference: + + def __init__(self, id): + self.id = id + self.chunks = Queue(0) + + def close(self): + self.chunks.close() + + def append(self, bytes): + self.chunks.put(bytes) + + def get_chunk(self): + return self.chunks.get() + + def get_complete(self): + data = "" + for chunk in self: + data += chunk + return data + + def next(self): + try: + return self.get_chunk() + except Closed, e: + raise StopIteration + + def __iter__(self): + return self + +""" +Manages a set of opened references. New references can be opened and +existing references can be retrieved or closed. +""" +class References: + + def __init__(self): + self.map = {} + self.lock = threading.Lock() + + def get(self, id): + self.lock.acquire() + try: + try: + ref = self.map[id] + except KeyError: + raise NotOpened() + finally: + self.lock.release() + return ref + + def open(self, id): + self.lock.acquire() + try: + if id in self.map: raise AlreadyOpened() + self.map[id] = Reference(id) + finally: + self.lock.release() + + + def close(self, id): + self.get(id).close() + self.lock.acquire() + try: + self.map.pop(id) + finally: + self.lock.release() + diff --git a/RC5/python/qpid/session.py b/RC5/python/qpid/session.py new file mode 100644 index 0000000000..4a7ecbc28a --- /dev/null +++ b/RC5/python/qpid/session.py @@ -0,0 +1,379 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import Condition, RLock, Lock, currentThread +from invoker import Invoker +from datatypes import RangedSet, Struct, Future +from codec010 import StringCodec +from assembler import Segment +from queue import Queue +from datatypes import Message, serial +from util import wait, notify +from exceptions import * +from logging import getLogger + +log = getLogger("qpid.io.cmd") +msg = getLogger("qpid.io.msg") + +class SessionException(Exception): pass +class SessionClosed(SessionException): pass +class SessionDetached(SessionException): pass + +def client(*args): + return Client(*args) + +def server(*args): + return Server(*args) + +INCOMPLETE = object() + +class Session(Invoker): + + def __init__(self, name, spec, auto_sync=True, timeout=10, delegate=client): + self.name = name + self.spec = spec + self.auto_sync = auto_sync + self.timeout = timeout + self.channel = None + self.invoke_lock = Lock() + self._closing = False + self._closed = False + + self.condition = Condition() + + self.send_id = True + self.receiver = Receiver(self) + self.sender = Sender(self) + + self.lock = RLock() + self._incoming = {} + self.results = {} + self.exceptions = [] + + self.assembly = None + + self.delegate = delegate(self) + + def incoming(self, destination): + self.lock.acquire() + try: + queue = self._incoming.get(destination) + if queue == None: + queue = Incoming(self, destination) + self._incoming[destination] = queue + return queue + finally: + self.lock.release() + + def error(self): + exc = self.exceptions[:] + if len(exc) == 0: + return None + elif len(exc) == 1: + return exc[0] + else: + return tuple(exc) + + def sync(self, timeout=None): + ch = self.channel + if ch is not None and currentThread() == ch.connection.thread: + raise SessionException("deadlock detected") + if not self.auto_sync: + self.execution_sync(sync=True) + last = self.sender.next_id - 1 + if not wait(self.condition, lambda: + last in self.sender._completed or self.exceptions, + timeout): + raise Timeout() + if self.exceptions: + raise SessionException(self.error()) + + def close(self, timeout=None): + self.invoke_lock.acquire() + try: + self._closing = True + self.channel.session_detach(self.name) + finally: + self.invoke_lock.release() + if not wait(self.condition, lambda: self._closed, timeout): + raise Timeout() + + def closed(self): + self.lock.acquire() + try: + if self._closed: return + + error = self.error() + for id in self.results: + f = self.results[id] + f.error(error) + self.results.clear() + + for q in self._incoming.values(): + q.close(error) + + self._closed = True + notify(self.condition) + finally: + self.lock.release() + + def resolve_method(self, name): + cmd = self.spec.instructions.get(name) + if cmd is not None and cmd.track == self.spec["track.command"].value: + return self.METHOD, cmd + else: + # XXX + for st in self.spec.structs.values(): + if st.name == name: + return self.METHOD, st + if self.spec.structs_by_name.has_key(name): + return self.METHOD, self.spec.structs_by_name[name] + if self.spec.enums.has_key(name): + return self.VALUE, self.spec.enums[name] + return self.ERROR, None + + def invoke(self, type, args, kwargs): + # XXX + if not hasattr(type, "track"): + return type.new(args, kwargs) + + self.invoke_lock.acquire() + try: + return self.do_invoke(type, args, kwargs) + finally: + self.invoke_lock.release() + + def do_invoke(self, type, args, kwargs): + if self._closing: + raise SessionClosed() + + if self.channel == None: + raise SessionDetached() + + if type.segments: + if len(args) == len(type.fields) + 1: + message = args[-1] + args = args[:-1] + else: + message = kwargs.pop("message", None) + else: + message = None + + hdr = Struct(self.spec["session.header"]) + hdr.sync = self.auto_sync or kwargs.pop("sync", False) + + cmd = type.new(args, kwargs) + sc = StringCodec(self.spec) + sc.write_command(hdr, cmd) + + seg = Segment(True, (message == None or + (message.headers == None and message.body == None)), + type.segment_type, type.track, self.channel.id, sc.encoded) + + if type.result: + result = Future(exception=SessionException) + self.results[self.sender.next_id] = result + + self.send(seg) + + log.debug("SENT %s %s %s", seg.id, hdr, cmd) + + if message != None: + if message.headers != None: + sc = StringCodec(self.spec) + for st in message.headers: + sc.write_struct32(st) + seg = Segment(False, message.body == None, self.spec["segment_type.header"].value, + type.track, self.channel.id, sc.encoded) + self.send(seg) + if message.body != None: + seg = Segment(False, True, self.spec["segment_type.body"].value, + type.track, self.channel.id, message.body) + self.send(seg) + msg.debug("SENT %s", message) + + if type.result: + if self.auto_sync: + return result.get(self.timeout) + else: + return result + elif self.auto_sync: + self.sync(self.timeout) + + def received(self, seg): + self.receiver.received(seg) + if seg.first: + assert self.assembly == None + self.assembly = [] + self.assembly.append(seg) + if seg.last: + self.dispatch(self.assembly) + self.assembly = None + + def dispatch(self, assembly): + segments = assembly[:] + + hdr, cmd = assembly.pop(0).decode(self.spec) + log.debug("RECV %s %s %s", cmd.id, hdr, cmd) + + args = [] + + for st in cmd._type.segments: + if assembly: + seg = assembly[0] + if seg.type == st.segment_type: + args.append(seg.decode(self.spec)) + assembly.pop(0) + continue + args.append(None) + + assert len(assembly) == 0 + + attr = cmd._type.qname.replace(".", "_") + result = getattr(self.delegate, attr)(cmd, *args) + + if cmd._type.result: + self.execution_result(cmd.id, result) + + if result is not INCOMPLETE: + for seg in segments: + self.receiver.completed(seg) + # XXX: don't forget to obey sync for manual completion as well + if hdr.sync: + self.channel.session_completed(self.receiver._completed) + + def send(self, seg): + self.sender.send(seg) + + def __str__(self): + return '<Session: %s, %s>' % (self.name, self.channel) + + def __repr__(self): + return str(self) + +class Receiver: + + def __init__(self, session): + self.session = session + self.next_id = None + self.next_offset = None + self._completed = RangedSet() + + def received(self, seg): + if self.next_id == None or self.next_offset == None: + raise Exception("todo") + seg.id = self.next_id + seg.offset = self.next_offset + if seg.last: + self.next_id += 1 + self.next_offset = 0 + else: + self.next_offset += len(seg.payload) + + def completed(self, seg): + if seg.id == None: + raise ValueError("cannot complete unidentified segment") + if seg.last: + self._completed.add(seg.id) + + def known_completed(self, commands): + completed = RangedSet() + for c in self._completed.ranges: + for kc in commands.ranges: + if c.lower in kc and c.upper in kc: + break + else: + completed.add_range(c) + self._completed = completed + +class Sender: + + def __init__(self, session): + self.session = session + self.next_id = serial(0) + self.next_offset = 0 + self.segments = [] + self._completed = RangedSet() + + def send(self, seg): + seg.id = self.next_id + seg.offset = self.next_offset + if seg.last: + self.next_id += 1 + self.next_offset = 0 + else: + self.next_offset += len(seg.payload) + self.segments.append(seg) + if self.session.send_id: + self.session.send_id = False + self.session.channel.session_command_point(seg.id, seg.offset) + self.session.channel.connection.write_segment(seg) + + def completed(self, commands): + idx = 0 + while idx < len(self.segments): + seg = self.segments[idx] + if seg.id in commands: + del self.segments[idx] + else: + idx += 1 + for range in commands.ranges: + self._completed.add(range.lower, range.upper) + +class Incoming(Queue): + + def __init__(self, session, destination): + Queue.__init__(self) + self.session = session + self.destination = destination + + def start(self): + self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) + for unit in self.session.credit_unit.values(): + self.session.message_flow(self.destination, unit, 0xFFFFFFFFL) + + def stop(self): + self.session.message_cancel(self.destination) + self.listen(None) + +class Delegate: + + def __init__(self, session): + self.session = session + + #XXX: do something with incoming accepts + def message_accept(self, ma): None + + def execution_result(self, er): + future = self.session.results.pop(er.command_id) + future.set(er.value) + + def execution_exception(self, ex): + self.session.exceptions.append(ex) + +class Client(Delegate): + + def message_transfer(self, cmd, headers, body): + m = Message(body) + m.headers = headers + m.id = cmd.id + messages = self.session.incoming(cmd.destination) + messages.put(m) + msg.debug("RECV %s", m) + return INCOMPLETE diff --git a/RC5/python/qpid/spec.py b/RC5/python/qpid/spec.py new file mode 100644 index 0000000000..e6d914044c --- /dev/null +++ b/RC5/python/qpid/spec.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import os, mllib, spec08, spec010 + +def default(): + try: + amqp_spec = os.environ["AMQP_SPEC"] + return amqp_spec + except KeyError: + try: + from qpid_config import amqp_spec + return amqp_spec + except ImportError: + raise Exception("unable to locate the amqp specification, please set " + "the AMQP_SPEC environment variable or supply " + "qpid_config.py on the PYTHONPATH") + +def load(specfile, *errata): + for name in (specfile,) + errata: + if not os.path.exists(name): + raise IOError("No such file or directory: '%s'" % name) + + doc = mllib.xml_parse(specfile) + major = doc["amqp/@major"] + minor = doc["amqp/@minor"] + + if major == "0" and minor == "10": + return spec010.load(specfile, *errata) + else: + return spec08.load(specfile, *errata) diff --git a/RC5/python/qpid/spec010.py b/RC5/python/qpid/spec010.py new file mode 100644 index 0000000000..cbc85a5e8b --- /dev/null +++ b/RC5/python/qpid/spec010.py @@ -0,0 +1,693 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, cPickle, datatypes, datetime +from codec010 import StringCodec +from util import mtime, fill + +class Node: + + def __init__(self, children): + self.children = children + self.named = {} + self.docs = [] + self.rules = [] + + def register(self): + for ch in self.children: + ch.register(self) + + def resolve(self): + for ch in self.children: + ch.resolve() + + def __getitem__(self, name): + path = name.split(".", 1) + nd = self.named + for step in path: + nd = nd[step] + return nd + + def __iter__(self): + return iter(self.children) + +class Anonymous: + + def __init__(self, children): + self.children = children + + def register(self, node): + for ch in self.children: + ch.register(node) + + def resolve(self): + for ch in self.children: + ch.resolve() + +class Named: + + def __init__(self, name): + self.name = name + self.qname = None + + def register(self, node): + self.spec = node.spec + self.klass = node.klass + node.named[self.name] = self + if node.qname: + self.qname = "%s.%s" % (node.qname, self.name) + else: + self.qname = self.name + + def __str__(self): + return self.qname + + def __repr__(self): + return str(self) + +class Lookup: + + def lookup(self, name): + value = None + if self.klass: + try: + value = self.klass[name] + except KeyError: + pass + if not value: + value = self.spec[name] + return value + +class Coded: + + def __init__(self, code): + self.code = code + +class Constant(Named, Node): + + def __init__(self, name, value, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.value = value + + def register(self, node): + Named.register(self, node) + node.constants.append(self) + Node.register(self) + +class Type(Named, Node): + + def __init__(self, name, children): + Named.__init__(self, name) + Node.__init__(self, children) + + def is_present(self, value): + return value != None + + def register(self, node): + Named.register(self, node) + Node.register(self) + +class Primitive(Coded, Type): + + def __init__(self, name, code, fixed, variable, children): + Coded.__init__(self, code) + Type.__init__(self, name, children) + self.fixed = fixed + self.variable = variable + + def register(self, node): + Type.register(self, node) + if self.code is not None: + self.spec.types[self.code] = self + + def is_present(self, value): + if self.fixed == 0: + return value + else: + return Type.is_present(self, value) + + def encode(self, codec, value): + getattr(codec, "write_%s" % self.name)(value) + + def decode(self, codec): + return getattr(codec, "read_%s" % self.name)() + +class Domain(Type, Lookup): + + def __init__(self, name, type, children): + Type.__init__(self, name, children) + self.type = type + self.choices = {} + + def resolve(self): + self.type = self.lookup(self.type) + Node.resolve(self) + + def encode(self, codec, value): + self.type.encode(codec, value) + + def decode(self, codec): + return self.type.decode(codec) + +class Enum: + + def __init__(self, name): + self.name = name + self._names = () + self._values = () + + def values(self): + return self._values + + def __repr__(self): + return "%s(%s)" % (self.name, ", ".join(self._names)) + +class Choice(Named, Node): + + def __init__(self, name, value, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.value = value + + def register(self, node): + Named.register(self, node) + node.choices[self.value] = self + Node.register(self) + try: + enum = node.spec.enums[node.name] + except KeyError: + enum = Enum(node.name) + node.spec.enums[node.name] = enum + setattr(enum, self.name, self.value) + enum._names += (self.name,) + enum._values += (self.value,) + +class Composite(Type, Coded): + + def __init__(self, name, label, code, size, pack, children): + Coded.__init__(self, code) + Type.__init__(self, name, children) + self.label = label + self.fields = [] + self.size = size + self.pack = pack + + def new(self, args, kwargs): + return datatypes.Struct(self, *args, **kwargs) + + def decode(self, codec): + codec.read_size(self.size) + if self.code is not None: + code = codec.read_uint16() + assert self.code == code + return datatypes.Struct(self, **self.decode_fields(codec)) + + def decode_fields(self, codec): + flags = 0 + for i in range(self.pack): + flags |= (codec.read_uint8() << 8*i) + + result = {} + + for i in range(len(self.fields)): + f = self.fields[i] + if flags & (0x1 << i): + result[f.name] = f.type.decode(codec) + else: + result[f.name] = None + return result + + def encode(self, codec, value): + sc = StringCodec(self.spec) + if self.code is not None: + sc.write_uint16(self.code) + self.encode_fields(sc, value) + codec.write_size(self.size, len(sc.encoded)) + codec.write(sc.encoded) + + def encode_fields(self, codec, values): + flags = 0 + for i in range(len(self.fields)): + f = self.fields[i] + if f.type.is_present(values[f.name]): + flags |= (0x1 << i) + for i in range(self.pack): + codec.write_uint8((flags >> 8*i) & 0xFF) + for i in range(len(self.fields)): + f = self.fields[i] + if flags & (0x1 << i): + f.type.encode(codec, values[f.name]) + + def docstring(self): + docs = [] + if self.label: + docs.append(self.label) + docs += [d.text for d in self.docs] + s = "\n\n".join([fill(t, 2) for t in docs]) + for f in self.fields: + fdocs = [] + if f.label: + fdocs.append(f.label) + else: + fdocs.append("") + fdocs += [d.text for d in f.docs] + s += "\n\n" + "\n\n".join([fill(fdocs[0], 4, f.name)] + + [fill(t, 4) for t in fdocs[1:]]) + return s + + +class Field(Named, Node, Lookup): + + def __init__(self, name, label, type, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.label = label + self.type = type + self.exceptions = [] + + def default(self): + return None + + def register(self, node): + Named.register(self, node) + node.fields.append(self) + Node.register(self) + + def resolve(self): + self.type = self.lookup(self.type) + Node.resolve(self) + + def __str__(self): + return "%s: %s" % (self.qname, self.type.qname) + +class Struct(Composite): + + def register(self, node): + Composite.register(self, node) + if self.code is not None: + self.spec.structs[self.code] = self + self.spec.structs_by_name[self.name] = self + self.pyname = self.name + self.pydoc = self.docstring() + + def __str__(self): + fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname) + for f in self.fields]) + return "%s {\n %s\n}" % (self.qname, fields) + +class Segment: + + def __init__(self): + self.segment_type = None + + def register(self, node): + self.spec = node.spec + self.klass = node.klass + node.segments.append(self) + Node.register(self) + +class Instruction(Composite, Segment): + + def __init__(self, name, label, code, children): + Composite.__init__(self, name, label, code, 0, 2, children) + Segment.__init__(self) + self.track = None + self.handlers = [] + + def __str__(self): + return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname) + for f in self.fields])) + + def register(self, node): + Composite.register(self, node) + self.pyname = self.qname.replace(".", "_") + self.pydoc = self.docstring() + self.spec.instructions[self.pyname] = self + +class Control(Instruction): + + def __init__(self, name, code, label, children): + Instruction.__init__(self, name, code, label, children) + self.response = None + + def register(self, node): + Instruction.register(self, node) + node.controls.append(self) + self.spec.controls[self.code] = self + self.segment_type = self.spec["segment_type.control"].value + self.track = self.spec["track.control"].value + +class Command(Instruction): + + def __init__(self, name, label, code, children): + Instruction.__init__(self, name, label, code, children) + self.result = None + self.exceptions = [] + self.segments = [] + + def register(self, node): + Instruction.register(self, node) + node.commands.append(self) + self.spec.commands[self.code] = self + self.segment_type = self.spec["segment_type.command"].value + self.track = self.spec["track.command"].value + +class Header(Segment, Node): + + def __init__(self, children): + Segment.__init__(self) + Node.__init__(self, children) + self.entries = [] + + def register(self, node): + Segment.register(self, node) + self.segment_type = self.spec["segment_type.header"].value + Node.register(self) + +class Entry(Lookup): + + def __init__(self, type): + self.type = type + + def register(self, node): + self.spec = node.spec + self.klass = node.klass + node.entries.append(self) + + def resolve(self): + self.type = self.lookup(self.type) + +class Body(Segment, Node): + + def __init__(self, children): + Segment.__init__(self) + Node.__init__(self, children) + + def register(self, node): + Segment.register(self, node) + self.segment_type = self.spec["segment_type.body"].value + Node.register(self) + + def resolve(self): pass + +class Class(Named, Coded, Node): + + def __init__(self, name, code, children): + Named.__init__(self, name) + Coded.__init__(self, code) + Node.__init__(self, children) + self.controls = [] + self.commands = [] + + def register(self, node): + Named.register(self, node) + self.klass = self + node.classes.append(self) + Node.register(self) + +class Doc: + + def __init__(self, type, title, text): + self.type = type + self.title = title + self.text = text + + def register(self, node): + node.docs.append(self) + + def resolve(self): pass + +class Role(Named, Node): + + def __init__(self, name, children): + Named.__init__(self, name) + Node.__init__(self, children) + + def register(self, node): + Named.register(self, node) + Node.register(self) + +class Rule(Named, Node): + + def __init__(self, name, children): + Named.__init__(self, name) + Node.__init__(self, children) + + def register(self, node): + Named.register(self, node) + node.rules.append(self) + Node.register(self) + +class Exception(Named, Node): + + def __init__(self, name, error_code, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.error_code = error_code + + def register(self, node): + Named.register(self, node) + node.exceptions.append(self) + Node.register(self) + +class Spec(Node): + + ENCODINGS = { + basestring: "vbin16", + int: "int64", + long: "int64", + float: "float", + None.__class__: "void", + list: "list", + tuple: "list", + dict: "map", + datatypes.timestamp: "datetime", + datetime.datetime: "datetime" + } + + def __init__(self, major, minor, port, children): + Node.__init__(self, children) + self.major = major + self.minor = minor + self.port = port + self.constants = [] + self.classes = [] + self.types = {} + self.qname = None + self.spec = self + self.klass = None + self.instructions = {} + self.controls = {} + self.commands = {} + self.structs = {} + self.structs_by_name = {} + self.enums = {} + + def encoding(self, klass): + if Spec.ENCODINGS.has_key(klass): + return self.named[Spec.ENCODINGS[klass]] + for base in klass.__bases__: + result = self.encoding(base) + if result != None: + return result + +class Implement: + + def __init__(self, handle): + self.handle = handle + + def register(self, node): + node.handlers.append(self.handle) + + def resolve(self): pass + +class Response(Node): + + def __init__(self, name, children): + Node.__init__(self, children) + self.name = name + + def register(self, node): + Node.register(self) + +class Result(Node, Lookup): + + def __init__(self, type, children): + self.type = type + Node.__init__(self, children) + + def register(self, node): + node.result = self + self.qname = node.qname + self.klass = node.klass + self.spec = node.spec + Node.register(self) + + def resolve(self): + self.type = self.lookup(self.type) + Node.resolve(self) + +import mllib + +def num(s): + if s: return int(s, 0) + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def id(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +class Loader: + + def __init__(self): + self.class_code = 0 + + def code(self, nd): + c = num(nd["@code"]) + if c is None: + return None + else: + return c | (self.class_code << 8) + + def list(self, q): + result = [] + for nd in q: + result.append(nd.dispatch(self)) + return result + + def children(self, n): + return self.list(n.query["#tag"]) + + def data(self, d): + return d.data + + def do_amqp(self, a): + return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]), + self.children(a)) + + def do_type(self, t): + return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]), + num(t["@variable-width"]), self.children(t)) + + def do_constant(self, c): + return Constant(id(c["@name"]), num(c["@value"]), self.children(c)) + + def do_domain(self, d): + return Domain(id(d["@name"]), id(d["@type"]), self.children(d)) + + def do_enum(self, e): + return Anonymous(self.children(e)) + + def do_choice(self, c): + return Choice(id(c["@name"]), num(c["@value"]), self.children(c)) + + def do_class(self, c): + code = num(c["@code"]) + self.class_code = code + children = self.children(c) + children += self.list(c.query["command/result/struct"]) + self.class_code = 0 + return Class(id(c["@name"]), code, children) + + def do_doc(self, doc): + text = reduce(lambda x, y: x + y, self.list(doc.children)) + return Doc(doc["@type"], doc["@title"], text) + + def do_xref(self, x): + return x["@ref"] + + def do_role(self, r): + return Role(id(r["@name"]), self.children(r)) + + def do_control(self, c): + return Control(id(c["@name"]), c["@label"], self.code(c), self.children(c)) + + def do_rule(self, r): + return Rule(id(r["@name"]), self.children(r)) + + def do_implement(self, i): + return Implement(id(i["@handle"])) + + def do_response(self, r): + return Response(id(r["@name"]), self.children(r)) + + def do_field(self, f): + return Field(id(f["@name"]), f["@label"], id(f["@type"]), self.children(f)) + + def do_struct(self, s): + return Struct(id(s["@name"]), s["@label"], self.code(s), num(s["@size"]), + num(s["@pack"]), self.children(s)) + + def do_command(self, c): + return Command(id(c["@name"]), c["@label"], self.code(c), self.children(c)) + + def do_segments(self, s): + return Anonymous(self.children(s)) + + def do_header(self, h): + return Header(self.children(h)) + + def do_entry(self, e): + return Entry(id(e["@type"])) + + def do_body(self, b): + return Body(self.children(b)) + + def do_result(self, r): + type = r["@type"] + if not type: + type = r["struct/@name"] + return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"])) + + def do_exception(self, e): + return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e)) + +def load(xml): + fname = xml + ".pcl" + + if os.path.exists(fname) and mtime(fname) > mtime(__file__): + file = open(fname, "r") + s = cPickle.load(file) + file.close() + else: + doc = mllib.xml_parse(xml) + s = doc["amqp"].dispatch(Loader()) + s.register() + s.resolve() + + try: + file = open(fname, "w") + except IOError: + file = None + + if file: + cPickle.dump(s, file) + file.close() + + return s diff --git a/RC5/python/qpid/spec08.py b/RC5/python/qpid/spec08.py new file mode 100644 index 0000000000..a0047e7107 --- /dev/null +++ b/RC5/python/qpid/spec08.py @@ -0,0 +1,504 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import re, new, mllib, qpid +from util import fill + +class SpecContainer: + + def __init__(self): + self.items = [] + self.byname = {} + self.byid = {} + self.indexes = {} + + def add(self, item): + if self.byname.has_key(item.name): + raise ValueError("duplicate name: %s" % item) + if item.id == None: + item.id = len(self) + elif self.byid.has_key(item.id): + raise ValueError("duplicate id: %s" % item) + self.indexes[item] = len(self.items) + self.items.append(item) + self.byname[item.name] = item + self.byid[item.id] = item + + def index(self, item): + try: + return self.indexes[item] + except KeyError: + raise ValueError(item) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + +class Metadata: + + PRINT = [] + + def __init__(self): + pass + + def __str__(self): + args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) + return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) + + def __repr__(self): + return str(self) + +class Spec(Metadata): + + PRINT=["major", "minor", "file"] + + def __init__(self, major, minor, file): + Metadata.__init__(self) + self.major = major + self.minor = minor + self.file = file + self.constants = SpecContainer() + self.domains = SpecContainer() + self.classes = SpecContainer() + # methods indexed by classname_methname + self.methods = {} + # structs by type code + self.structs = {} + + def post_load(self): + self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) + self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + + def method(self, name): + if not self.methods.has_key(name): + for cls in self.classes: + clen = len(cls.name) + if name.startswith(cls.name) and name[clen] == "_": + end = name[clen + 1:] + if cls.methods.byname.has_key(end): + self.methods[name] = cls.methods.byname[end] + return self.methods.get(name) + + def parse_method(self, name): + parts = re.split(r"\s*\.\s*", name) + if len(parts) != 2: + raise ValueError(name) + klass, meth = parts + return self.classes.byname[klass].methods.byname[meth] + + def struct(self, name, *args, **kwargs): + type = self.domains.byname[name].type + return qpid.Struct(type, *args, **kwargs) + + def define_module(self, name, doc = None): + module = new.module(name, doc) + module.__file__ = self.file + for c in self.classes: + cls = c.define_class(c.name) + cls.__module__ = module.__name__ + setattr(module, c.name, cls) + return module + + def define_class(self, name): + methods = {} + for c in self.classes: + for m in c.methods: + meth = m.klass.name + "_" + m.name + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Constant(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, klass, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.klass = klass + self.docs = docs + +class Domain(Metadata): + + PRINT=["name", "type"] + + def __init__(self, spec, name, type, description, docs): + Metadata.__init__(self) + self.spec = spec + self.id = None + self.name = name + self.type = type + self.description = description + self.docs = docs + +class Struct(Metadata): + + PRINT=["size", "type", "pack"] + + def __init__(self, size, type, pack): + Metadata.__init__(self) + self.size = size + self.type = type + self.pack = pack + self.fields = SpecContainer() + +class Class(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, handler, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.handler = handler + self.fields = SpecContainer() + self.methods = SpecContainer() + self.docs = docs + + def define_class(self, name): + methods = {} + for m in self.methods: + methods[m.name] = m.define_method(m.name) + return type(name, (), methods) + +class Method(Metadata): + + PRINT=["name", "id"] + + def __init__(self, klass, name, id, content, responses, result, synchronous, + description, docs): + Metadata.__init__(self) + self.klass = klass + self.name = name + self.id = id + self.content = content + self.responses = responses + self.result = result + self.synchronous = synchronous + self.fields = SpecContainer() + self.description = description + self.docs = docs + self.response = False + + def is_l4_command(self): + return self.klass.name not in ["execution", "channel", "connection", "session"] + + def arguments(self, *args, **kwargs): + nargs = len(args) + len(kwargs) + maxargs = len(self.fields) + if nargs > maxargs: + self._type_error("takes at most %s arguments (%s) given", maxargs, nargs) + result = [] + for f in self.fields: + idx = self.fields.index(f) + if idx < len(args): + result.append(args[idx]) + elif kwargs.has_key(f.name): + result.append(kwargs.pop(f.name)) + else: + result.append(Method.DEFAULTS[f.type]) + for key, value in kwargs.items(): + if self.fields.byname.has_key(key): + self._type_error("got multiple values for keyword argument '%s'", key) + else: + self._type_error("got an unexpected keyword argument '%s'", key) + return tuple(result) + + def _type_error(self, msg, *args): + raise TypeError("%s %s" % (self.name, msg % args)) + + def docstring(self): + s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) + for f in self.fields: + if f.docs: + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] + + [fill(d, 4) for d in f.docs[1:]]) + if self.responses: + s += "\n\nValid responses: " + for r in self.responses: + s += r.name + " " + return s + + METHOD = "__method__" + DEFAULTS = {"bit": False, + "shortstr": "", + "longstr": "", + "table": {}, + "array": [], + "octet": 0, + "short": 0, + "long": 0, + "longlong": 0, + "timestamp": 0, + "content": None, + "uuid": "", + "rfc1982_long": 0, + "rfc1982_long_set": [], + "long_struct": None} + + def define_method(self, name): + g = {Method.METHOD: self} + l = {} + args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields] + methargs = args[:] + if self.content: + args += [("content", None)] + code = "def %s(self, %s):\n" % \ + (name, ", ".join(["%s = %r" % a for a in args])) + code += " %r\n" % self.docstring() + argnames = ", ".join([a[0] for a in methargs]) + code += " return self.invoke(%s" % Method.METHOD + if argnames: + code += ", (%s,)" % argnames + else: + code += ", ()" + if self.content: + code += ", content" + code += ")" + exec code in g, l + return l[name] + +class Field(Metadata): + + PRINT=["name", "id", "type"] + + def __init__(self, name, id, type, domain, description, docs): + Metadata.__init__(self) + self.name = name + self.id = id + self.type = type + self.domain = domain + self.description = description + self.docs = docs + + def default(self): + if isinstance(self.type, Struct): + return None + else: + return Method.DEFAULTS[self.type] + +WIDTHS = { + "octet": 1, + "short": 2, + "long": 4 + } + +def width(st, default=None): + if st in (None, "none", ""): + return default + else: + return WIDTHS[st] + +def get_result(nd, spec): + result = nd["result"] + if not result: return None + name = result["@domain"] + if name != None: return spec.domains.byname[name] + st_nd = result["struct"] + st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), width(st_nd["@pack"], 2)) + spec.structs[st.type] = st + load_fields(st_nd, st.fields, spec.domains.byname) + return st + +def get_desc(nd): + label = nd["@label"] + if not label: + label = nd.text() + if label: + label = label.strip() + return label + +def get_docs(nd): + return [n.text() for n in nd.query["doc"]] + +def load_fields(nd, l, domains): + for f_nd in nd.query["field"]: + type = f_nd["@domain"] + if type == None: + type = f_nd["@type"] + type = pythonize(type) + domain = None + while domains.has_key(type) and domains[type].type != type: + domain = domains[type] + type = domain.type + l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain, + get_desc(f_nd), get_docs(f_nd))) + +def load(specfile, *errata): + doc = mllib.xml_parse(specfile) + spec_root = doc["amqp"] + spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile) + + for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata): + # constants + for nd in root.query["constant"]: + val = nd["@value"] + if val.startswith("0x"): val = int(val, 16) + else: val = int(val) + const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"], + get_docs(nd)) + try: + spec.constants.add(const) + except ValueError, e: + pass + #print "Warning:", e + + # domains are typedefs + structs = [] + for nd in root.query["domain"]: + type = nd["@type"] + if type == None: + st_nd = nd["struct"] + code = st_nd["@type"] + if code not in (None, "", "none"): + code = int(code) + type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2)) + if type.type != None: + spec.structs[type.type] = type + structs.append((type, st_nd)) + else: + type = pythonize(type) + domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd), + get_docs(nd)) + spec.domains.add(domain) + + # structs + for st, st_nd in structs: + load_fields(st_nd, st.fields, spec.domains.byname) + + # classes + for c_nd in root.query["class"]: + cname = pythonize(c_nd["@name"]) + if spec.classes.byname.has_key(cname): + klass = spec.classes.byname[cname] + else: + klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"], + get_docs(c_nd)) + spec.classes.add(klass) + + added_methods = [] + load_fields(c_nd, klass.fields, spec.domains.byname) + for m_nd in c_nd.query["method"]: + mname = pythonize(m_nd["@name"]) + if klass.methods.byname.has_key(mname): + meth = klass.methods.byname[mname] + else: + meth = Method(klass, mname, + int(m_nd["@index"]), + m_nd["@content"] == "1", + [pythonize(nd["@name"]) for nd in m_nd.query["response"]], + get_result(m_nd, spec), + m_nd["@synchronous"] == "1", + get_desc(m_nd), + get_docs(m_nd)) + klass.methods.add(meth) + added_methods.append(meth) + load_fields(m_nd, meth.fields, spec.domains.byname) + # resolve the responses + for m in added_methods: + m.responses = [klass.methods.byname[r] for r in m.responses] + for resp in m.responses: + resp.response = True + + spec.post_load() + return spec + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def pythonize(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +class Rule(Metadata): + + PRINT = ["text", "implement", "tests"] + + def __init__(self, text, implement, tests, path): + self.text = text + self.implement = implement + self.tests = tests + self.path = path + +def find_rules(node, rules): + if node.name == "rule": + rules.append(Rule(node.text, node.get("@implement"), + [ch.text for ch in node if ch.name == "test"], + node.path())) + if node.name == "doc" and node.get("@name") == "rule": + tests = [] + if node.has("@test"): + tests.append(node["@test"]) + rules.append(Rule(node.text, None, tests, node.path())) + for child in node: + find_rules(child, rules) + +def load_rules(specfile): + rules = [] + find_rules(xmlutil.parse(specfile), rules) + return rules + +def test_summary(): + template = """ + <html><head><title>AMQP Tests</title></head> + <body> + <table width="80%%" align="center"> + %s + </table> + </body> + </html> + """ + rows = [] + for rule in load_rules("amqp.org/specs/amqp7.xml"): + if rule.tests: + tests = ", ".join(rule.tests) + else: + tests = " " + rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' + '<td><b>Implement:</b> %s</td>' + '<td><b>Tests:</b> %s</td></tr>' % + (rule.path[len("/root/amqp"):], rule.implement, tests)) + rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) + rows.append('<tr><td colspan="3"> </td></tr>') + + print template % "\n".join(rows) diff --git a/RC5/python/qpid/testlib.py b/RC5/python/qpid/testlib.py new file mode 100644 index 0000000000..31f52169ae --- /dev/null +++ b/RC5/python/qpid/testlib.py @@ -0,0 +1,392 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Support library for qpid python tests. +# + +import sys, re, unittest, os, random, logging, traceback +import qpid.client, qpid.spec, qmf.console +import Queue +from fnmatch import fnmatch +from getopt import getopt, GetoptError +from qpid.content import Content +from qpid.message import Message + +#0-10 support +from qpid.connection import Connection +from qpid.spec010 import load +from qpid.util import connect, ssl, URL + +def findmodules(root): + """Find potential python modules under directory root""" + found = [] + for dirpath, subdirs, files in os.walk(root): + modpath = dirpath.replace(os.sep, '.') + if not re.match(r'\.svn$', dirpath): # Avoid SVN directories + for f in files: + match = re.match(r'(.+)\.py$', f) + if match and f != '__init__.py': + found.append('.'.join([modpath, match.group(1)])) + return found + +def default(value, default): + if (value == None): return default + else: return value + +class TestRunner: + + SPEC_FOLDER = "../specs" + + """Runs unit tests. + + Parses command line arguments, provides utility functions for tests, + runs the selected test suite. + """ + + def _die(self, message = None): + if message: print message + print """ +run-tests [options] [test*] +The name of a test is package.module.ClassName.testMethod +Options: + -?/-h/--help : this message + -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: + 0-8 - use the default 0-8 specification. + 0-9 - use the default 0-9 specification. + 0-10-errata - use the 0-10 specification with qpid errata. + -e/--errata <errata.xml> : file containing amqp XML errata + -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. + -S/--skip-self-test : skips the client self tests in the 'tests folder' + -F/--spec-folder : folder that contains the specs to be loaded + """ + sys.exit(1) + + def setBroker(self, broker): + try: + self.url = URL(broker) + except ValueError: + self._die("'%s' is not a valid broker" % (broker)) + self.user = default(self.url.user, "guest") + self.password = default(self.url.password, "guest") + self.host = self.url.host + if self.url.scheme == URL.AMQPS: + self.ssl = True + default_port = 5671 + else: + self.ssl = False + default_port = 5672 + self.port = default(self.url.port, default_port) + + def ignoreFile(self, filename): + f = file(filename) + for line in f.readlines(): self.ignore.append(line.strip()) + f.close() + + def use08spec(self): + "True if we are running with the old 0-8 spec." + # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. + return self.spec.major==8 and self.spec.minor==0 + + def use09spec(self): + "True if we are running with the 0-9 (non-wip) spec." + return self.spec.major==0 and self.spec.minor==9 + + def _parseargs(self, args): + # Defaults + self.setBroker("localhost") + self.verbose = 1 + self.ignore = [] + self.specfile = "0-8" + self.errata = [] + self.skip_self_test = False + + try: + opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:", + ["help", "spec", "errata=", "broker=", + "verbose", "skip-self-test", "ignore", + "ignore-file", "spec-folder"]) + except GetoptError, e: + self._die(str(e)) + for opt, value in opts: + if opt in ("-?", "-h", "--help"): self._die() + if opt in ("-s", "--spec"): self.specfile = value + if opt in ("-e", "--errata"): self.errata.append(value) + if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-v", "--verbose"): self.verbose = 2 + if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) + if opt in ("-i", "--ignore"): self.ignore.append(value) + if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + if opt in ("-S", "--skip-self-test"): self.skip_self_test = True + if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value + + # Abbreviations for default settings. + if (self.specfile == "0-10"): + self.spec = load(self.get_spec_file("amqp.0-10.xml")) + elif (self.specfile == "0-10-errata"): + self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) + else: + if (self.specfile == "0-8"): + self.specfile = self.get_spec_file("amqp.0-8.xml") + elif (self.specfile == "0-9"): + self.specfile = self.get_spec_file("amqp.0-9.xml") + self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) + + if (self.specfile == None): + self._die("No XML specification provided") + print "Using specification from:", self.specfile + + self.spec = qpid.spec.load(self.specfile, *self.errata) + + if len(self.tests) == 0: + if not self.skip_self_test: + self.tests=findmodules("tests") + if self.use08spec() or self.use09spec(): + self.tests+=findmodules("tests_0-8") + elif (self.spec.major == 99 and self.spec.minor == 0): + self.tests+=findmodules("tests_0-10_preview") + elif (self.spec.major == 0 and self.spec.minor == 10): + self.tests+=findmodules("tests_0-10") + + def testSuite(self): + class IgnoringTestSuite(unittest.TestSuite): + def addTest(self, test): + if isinstance(test, unittest.TestCase): + for pattern in testrunner.ignore: + if fnmatch(test.id(), pattern): + return + unittest.TestSuite.addTest(self, test) + + # Use our IgnoringTestSuite in the test loader. + unittest.TestLoader.suiteClass = IgnoringTestSuite + return unittest.defaultTestLoader.loadTestsFromNames(self.tests) + + def run(self, args=sys.argv[1:]): + self._parseargs(args) + runner = unittest.TextTestRunner(descriptions=False, + verbosity=self.verbose) + result = runner.run(self.testSuite()) + + if (self.ignore): + print "=======================================" + print "NOTE: the following tests were ignored:" + for t in self.ignore: print t + print "=======================================" + + return result.wasSuccessful() + + def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): + """Connect to the broker, returns a qpid.client.Client""" + host = host or self.host + port = port or self.port + spec = spec or self.spec + user = user or self.user + password = password or self.password + client = qpid.client.Client(host, port, spec) + if self.use08spec(): + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) + else: + client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) + return client + + def get_spec_file(self, fname): + return TestRunner.SPEC_FOLDER + os.sep + fname + +# Global instance for tests to call connect. +testrunner = TestRunner() + + +class TestBase(unittest.TestCase): + """Base class for Qpid test cases. + + self.client is automatically connected with channel 1 open before + the test methods are run. + + Deletes queues and exchanges after. Tests call + self.queue_declare(channel, ...) and self.exchange_declare(chanel, + ...) which are wrappers for the Channel functions that note + resources to clean up later. + """ + + def setUp(self): + self.queues = [] + self.exchanges = [] + self.client = self.connect() + self.channel = self.client.channel(1) + self.version = (self.client.spec.major, self.client.spec.minor) + if self.version == (8, 0) or self.version == (0, 9): + self.channel.channel_open() + else: + self.channel.session_open() + + def tearDown(self): + try: + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + except: + print "Error on tearDown:" + print traceback.print_exc() + + if not self.client.closed: + self.client.channel(0).connection_close(reply_code=200) + else: + self.client.close() + + def connect(self, *args, **keys): + """Create a new connction, return the Client object""" + return testrunner.connect(*args, **keys) + + def queue_declare(self, channel=None, *args, **keys): + channel = channel or self.channel + reply = channel.queue_declare(*args, **keys) + self.queues.append((channel, keys["queue"])) + return reply + + def exchange_declare(self, channel=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, + arguments={}): + channel = channel or self.channel + reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) + self.exchanges.append((channel,exchange)) + return reply + + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def consume(self, queueName): + """Consume from named queue returns the Queue object.""" + if testrunner.use08spec() or testrunner.use09spec(): + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) + else: + if not "uniqueTag" in dir(self): self.uniqueTag = 1 + else: self.uniqueTag += 1 + consumer_tag = "tag" + str(self.uniqueTag) + self.channel.message_subscribe(queue=queueName, destination=consumer_tag) + self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + return self.client.queue(consumer_tag) + + def subscribe(self, channel=None, **keys): + channel = channel or self.channel + consumer_tag = keys["destination"] + channel.message_subscribe(**keys) + channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore + + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): + """ + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() + if testrunner.use08spec() or testrunner.use09spec(): + self.channel.basic_publish( + exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) + else: + self.channel.message_transfer( + destination=exchange, + content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) + msg = queue.get(timeout=1) + if testrunner.use08spec() or testrunner.use09spec(): + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content.properties) + else: + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content['application_headers']) + + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) + + def assertChannelException(self, expectedCode, message): + if self.version == (8, 0) or self.version == (0, 9): + if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) + self.assertEqual("channel", message.method.klass.name) + self.assertEqual("close", message.method.name) + else: + if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message)) + self.assertEqual("session", message.method.klass.name) + self.assertEqual("closed", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + + + def assertConnectionException(self, expectedCode, message): + if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message)) + self.assertEqual("connection", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + +class TestBase010(unittest.TestCase): + """ + Base class for Qpid test cases. using the final 0-10 spec + """ + + def setUp(self): + self.conn = self.connect() + self.session = self.conn.session("test-session", timeout=10) + self.qmf = None + + def startQmf(self): + self.qmf = qmf.console.Session() + self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) + + def connect(self, host=None, port=None): + sock = connect(host or testrunner.host, port or testrunner.port) + if testrunner.url.scheme == URL.AMQPS: + sock = ssl(sock) + conn = Connection(sock, testrunner.spec, username=testrunner.user, + password=testrunner.password) + conn.start(timeout=10) + return conn + + def tearDown(self): + if not self.session.error(): self.session.close(timeout=10) + self.conn.close(timeout=10) + if self.qmf: + self.qmf.delBroker(self.qmf_broker) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) diff --git a/RC5/python/qpid/util.py b/RC5/python/qpid/util.py new file mode 100644 index 0000000000..bb7f5090df --- /dev/null +++ b/RC5/python/qpid/util.py @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, socket, time, textwrap, re + +ssl = socket.ssl + +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + # XXX: we could use this on read, but we'd have to put write in a + # loop as well + # sock.settimeout(1) + return sock + +def listen(host, port, predicate = lambda: True, bound = lambda: None): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.listen(5) + bound() + while predicate(): + s, a = sock.accept() + yield s + +def mtime(filename): + return os.stat(filename).st_mtime + +def wait(condition, predicate, timeout=None): + condition.acquire() + try: + passed = 0 + start = time.time() + while not predicate(): + if timeout is None: + condition.wait() + elif passed < timeout: + condition.wait(timeout - passed) + else: + return False + passed = time.time() - start + return True + finally: + condition.release() + +def notify(condition, action=lambda: None): + condition.acquire() + try: + action() + condition.notifyAll() + finally: + condition.release() + +def fill(text, indent, heading = None): + sub = indent * " " + if heading: + if not text: + return (indent - 2) * " " + heading + init = (indent - 2) * " " + heading + " -- " + else: + init = sub + w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) + return w.fill(" ".join(text.split())) + +class URL: + + RE = re.compile(r""" + # [ <scheme>:// ] [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? ([^@:/]+) (?: :([0-9]+))?$ +""", re.X) + + AMQPS = "amqps" + AMQP = "amqp" + + def __init__(self, s): + match = URL.RE.match(s) + if match is None: + raise ValueError(s) + self.scheme, self.user, self.password, self.host, port = match.groups() + if port is None: + self.port = None + else: + self.port = int(port) + + def __repr__(self): + return "URL(%r)" % str(self) + + def __str__(self): + s = "" + if self.scheme: + s += "%s://" % self.scheme + if self.user: + s += self.user + if self.password: + s += "/%s" % self.password + s += "@" + s += self.host + if self.port: + s += ":%s" % self.port + return s |
