diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-03-04 20:03:09 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-03-04 20:03:09 +0000 |
| commit | 75f598b22ea4573cff2d47fdd689b85cee5dd88d (patch) | |
| tree | 964aa4463e2140c5040dd36137a49ab9c261f19a /python | |
| parent | 24435b9c62976e0a4c0857f86057d3c93389b79f (diff) | |
| download | qpid-python-75f598b22ea4573cff2d47fdd689b85cee5dd88d.tar.gz | |
import of in-process 0-10 final python client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633610 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rwxr-xr-x | python/hello-010-world | 24 | ||||
| -rw-r--r-- | python/qpid/assembler.py | 110 | ||||
| -rw-r--r-- | python/qpid/codec010.py | 186 | ||||
| -rw-r--r-- | python/qpid/connection010.py | 172 | ||||
| -rw-r--r-- | python/qpid/datatypes.py | 112 | ||||
| -rw-r--r-- | python/qpid/delegates.py | 111 | ||||
| -rw-r--r-- | python/qpid/framer.py | 124 | ||||
| -rw-r--r-- | python/qpid/invoker.py | 32 | ||||
| -rw-r--r-- | python/qpid/packer.py | 36 | ||||
| -rw-r--r-- | python/qpid/session.py | 208 | ||||
| -rw-r--r-- | python/qpid/spec010.py | 617 | ||||
| -rw-r--r-- | python/qpid/util.py | 39 | ||||
| -rwxr-xr-x | python/server010 | 34 | ||||
| -rw-r--r-- | python/tests/__init__.py | 5 | ||||
| -rw-r--r-- | python/tests/assembler.py | 77 | ||||
| -rw-r--r-- | python/tests/connection010.py | 137 | ||||
| -rw-r--r-- | python/tests/datatypes.py | 91 | ||||
| -rw-r--r-- | python/tests/framer.py | 92 | ||||
| -rw-r--r-- | python/tests/spec010.py | 60 |
19 files changed, 2267 insertions, 0 deletions
diff --git a/python/hello-010-world b/python/hello-010-world new file mode 100755 index 0000000000..7685af8fd3 --- /dev/null +++ b/python/hello-010-world @@ -0,0 +1,24 @@ +#!/usr/bin/env python + +from qpid.connection010 import Connection +from qpid.spec010 import load +from qpid.util import connect +from qpid.datatypes import Message + +spec = load("../specs/amqp.0-10.xml") +conn = Connection(connect("0.0.0.0", spec.port), spec) +conn.start(timeout=10) + +ssn = conn.session("my-session") + +ssn.queue_declare("asdf") + +ssn.message_transfer("this", None, None, Message("testing...")) +ssn.message_transfer("is") +ssn.message_transfer("a") +ssn.message_transfer("test") + +print ssn.queue_query("testing") + +ssn.close(timeout=10) +conn.close(timeout=10) diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py new file mode 100644 index 0000000000..e0e5d3fb72 --- /dev/null +++ b/python/qpid/assembler.py @@ -0,0 +1,110 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from codec010 import StringCodec +from framer import * + +class Segment: + + def __init__(self, first, last, type, track, channel, payload): + self.id = None + self.offset = None + self.first = first + self.last = last + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def decode(self, spec): + segs = spec["segment_type"] + choice = segs.choices[self.type] + return getattr(self, "decode_%s" % choice.name)(spec) + + def decode_control(self, spec): + sc = StringCodec(spec, self.payload) + return sc.read_control() + + def decode_command(self, spec): + sc = StringCodec(spec, self.payload) + return sc.read_command() + + def decode_header(self, spec): + sc = StringCodec(spec, self.payload) + values = [] + while len(sc.encoded) > 0: + values.append(sc.read_struct32()) + return values + + def decode_body(self, spec): + return self + + def __str__(self): + return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, + self.track, self.channel, self.payload) + + def __repr__(self): + return str(self) + +class Assembler(Framer): + + def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD): + Framer.__init__(self, sock) + self.max_payload = max_payload + self.fragments = {} + + def read_segment(self): + while True: + frame = self.read_frame() + + key = (frame.channel, frame.track) + seg = self.fragments.get(key) + if seg == None: + seg = Segment(frame.isFirstSegment(), frame.isLastSegment(), + frame.type, frame.track, frame.channel, "") + self.fragments[key] = seg + + seg.payload += frame.payload + + if frame.isLastFrame(): + self.fragments.pop(key) + return seg + + def write_segment(self, segment): + remaining = segment.payload + + first = True + while remaining: + payload = remaining[:self.max_payload] + remaining = remaining[self.max_payload:] + + flags = 0 + if first: + flags |= FIRST_FRM + first = False + if not remaining: + flags |= LAST_FRM + if segment.first: + flags |= FIRST_SEG + if segment.last: + flags |= LAST_SEG + + frame = Frame(flags, segment.type, segment.track, segment.channel, + payload) + self.write_frame(frame) diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py new file mode 100644 index 0000000000..5894981fc6 --- /dev/null +++ b/python/qpid/codec010.py @@ -0,0 +1,186 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from packer import Packer + +class Codec(Packer): + + def __init__(self, spec): + self.spec = spec + + def write_bit(self, b): + if not b: raise ValueError(b) + def read_bit(self): + return True + + def read_uint8(self): + return self.unpack("!B") + def write_uint8(self, n): + return self.pack("!B", n) + + def read_int8(self): + return self.unpack("!b") + def write_int8(self, n): + self.pack("!b", n) + + def read_char(self): + return self.unpack("!c") + def write_char(self, c): + self.pack("!c", c) + + def read_boolean(self): + return self.read_uint8() != 0 + def write_boolean(self, b): + if b: n = 1 + else: n = 0 + self.write_uint8(n) + + + def read_uint16(self): + return self.unpack("!H") + def write_uint16(self, n): + self.pack("!H", n) + + def read_int16(self): + return self.unpack("!h") + def write_int16(self, n): + return self.unpack("!h", n) + + + def read_uint32(self): + return self.unpack("!L") + def write_uint32(self, n): + self.pack("!L", n) + + def read_int32(self): + return self.unpack("!l") + def write_int32(self, n): + self.pack("!l", n) + + def read_float(self): + return self.unpack("!f") + def write_float(self, f): + self.pack("!f", f) + + def read_sequence_no(self): + return self.read_uint32() + def write_sequence_no(self, n): + self.write_uint32(n) + + + def read_uint64(self): + return self.unpack("!Q") + def write_uint64(self, n): + self.pack("!Q", n) + + def read_int64(self): + return self.unpack("!q") + def write_int64(self, n): + self.pack("!q", n) + + def read_double(self): + return self.unpack("!d") + def write_double(self, d): + self.pack("!d", d) + + + def read_vbin8(self): + return self.read(self.read_uint8()) + def write_vbin8(self, b): + self.write_uint8(len(b)) + self.write(b) + + def read_str8(self): + return self.read_vbin8().decode("utf8") + def write_str8(self, s): + self.write_vbin8(s.encode("utf8")) + + + def read_vbin16(self): + return self.read(self.read_uint16()) + def write_vbin16(self, b): + self.write_uint16(len(b)) + self.write(b) + + def read_vbin32(self): + return self.read(self.read_uint32()) + def write_vbin32(self, b): + self.write_uint32(len(b)) + self.write(b) + + def write_map(self, m): + pass + def read_map(self): + pass + + def write_array(self, a): + pass + def read_array(self): + pass + + def read_struct32(self): + size = self.read_uint32() + code = self.read_uint16() + struct = self.spec.structs[code] + return struct.decode_fields(self) + def write_struct32(self, value): + sc = StringCodec(self.spec) + sc.write_uint16(value.type.code) + value.type.encode_fields(sc, value) + self.write_vbin32(sc.encoded) + + def read_control(self): + cntrl = self.spec.controls[self.read_uint16()] + return cntrl.decode(self) + def write_control(self, type, ctrl): + self.write_uint16(type.code) + type.encode(self, ctrl) + + def read_command(self): + cmd = self.spec.commands[self.read_uint16()] + return cmd.decode(self) + def write_command(self, type, cmd): + self.write_uint16(type.code) + type.encode(self, cmd) + + def read_size(self, width): + if width > 0: + attr = "read_uint%d" % (width*8) + return getattr(self, attr)() + + def write_size(self, width, n): + if width > 0: + attr = "write_uint%d" % (width*8) + getattr(self, attr)(n) + + + +class StringCodec(Codec): + + def __init__(self, spec, encoded = ""): + Codec.__init__(self, spec) + self.encoded = encoded + + def write(self, s): + self.encoded += s + + def read(self, n): + result = self.encoded[:n] + self.encoded = self.encoded[n:] + return result diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py new file mode 100644 index 0000000000..b25efd37a8 --- /dev/null +++ b/python/qpid/connection010.py @@ -0,0 +1,172 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datatypes, session +from threading import Thread, Event, RLock +from framer import Closed +from assembler import Assembler, Segment +from codec010 import StringCodec +from session import Session +from invoker import Invoker +from spec010 import Control, Command +import delegates + +class Timeout(Exception): pass + +class ChannelBusy(Exception): pass + +class ChannelsBusy(Exception): pass + +class SessionBusy(Exception): pass + +def client(*args): + return delegates.Client(*args) + +def server(*args): + return delegates.Server(*args) + +class Connection(Assembler): + + def __init__(self, sock, spec, delegate=client): + Assembler.__init__(self, sock) + self.spec = spec + self.track = self.spec["track"] + self.delegate = delegate(self) + self.attached = {} + self.sessions = {} + self.lock = RLock() + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + self.opened = Event() + self.closed = Event() + self.channel_max = 65535 + + def attach(self, name, ch, delegate, force=False): + self.lock.acquire() + try: + ssn = self.attached.get(ch.id) + if ssn is not None: + if ssn.name != name: + raise ChannelBusy(ch, ssn) + else: + ssn = self.sessions.get(name) + if ssn is None: + ssn = Session(name, self.spec, delegate=delegate) + self.sessions[name] = ssn + elif ssn.channel is not None: + if force: + del self.attached[ssn.channel.id] + ssn.channel = None + else: + raise SessionBusy(ssn) + self.attached[ch.id] = ssn + ssn.channel = ch + ch.session = ssn + return ssn + finally: + self.lock.release() + + def detach(self, name, ch): + self.lock.acquire() + try: + self.attached.pop(ch.id, None) + ssn = self.sessions.pop(name, None) + if ssn is not None: + ssn.channel = None + return ssn + finally: + self.lock.release() + + def __channel(self): + # XXX: ch 0? + for i in xrange(self.channel_max): + if not self.attached.has_key(i): + return i + else: + raise ChannelsBusy() + + def session(self, name, timeout=None, delegate=session.client): + self.lock.acquire() + try: + ssn = self.attach(name, Channel(self, self.__channel()), delegate) + ssn.channel.session_attach(name) + ssn.opened.wait(timeout) + if ssn.opened.isSet(): + return ssn + else: + raise Timeout() + finally: + self.lock.release() + + def start(self, timeout=None): + self.delegate.start() + self.thread.start() + self.opened.wait(timeout=timeout) + if not self.opened.isSet(): + raise Timeout() + + def run(self): + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket + while True: + try: + seg = self.read_segment() + except Closed: + break + self.delegate.received(seg) + + def close(self, timeout=None): + Channel(self, 0).connection_close() + self.closed.wait(timeout=timeout) + if not self.closed.isSet(): + raise Timeout() + self.thread.join(timeout=timeout) + + def __str__(self): + return "%s:%s" % self.sock.getsockname() + + def __repr__(self): + return str(self) + +class Channel(Invoker): + + def __init__(self, connection, id): + self.connection = connection + self.id = id + self.session = None + + def resolve_method(self, name): + inst = self.connection.spec.instructions.get(name) + if inst is not None and isinstance(inst, Control): + return inst + else: + return None + + def invoke(self, type, args, kwargs): + cntrl = type.new(args, kwargs) + sc = StringCodec(self.connection.spec) + sc.write_control(type, cntrl) + self.connection.write_segment(Segment(True, True, type.segment_type, + type.track, self.id, sc.encoded)) + + def __str__(self): + return "%s[%s]" % (self.connection, self.id) + + def __repr__(self): + return str(self) diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py new file mode 100644 index 0000000000..9e3177154e --- /dev/null +++ b/python/qpid/datatypes.py @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import threading + +class Struct: + + def __init__(self, fields): + self.__dict__ = fields + + def __repr__(self): + return "Struct(%s)" % ", ".join(["%s=%r" % (k, v) + for k, v in self.__dict__.items()]) + + def fields(self): + return self.__dict__ + +class Message: + + def __init__(self, body): + self.headers = None + self.body = body + +class Range: + + def __init__(self, lower, upper): + self.lower = lower + self.upper = upper + + def __contains__(self, n): + return self.lower <= n and n <= self.upper + + def touches(self, r): + return (self.lower - 1 in r or + self.upper + 1 in r or + r.lower - 1 in self or + r.upper + 1 in self) + + def span(self, r): + return Range(min(self.lower, r.lower), max(self.upper, r.upper)) + + def __str__(self): + return "Range(%s, %s)" % (self.lower, self.upper) + + def __repr__(self): + return str(self) + +class RangeSet: + + def __init__(self): + self.ranges = [] + + def __contains__(self, n): + for r in self.ranges: + if n in r: + return True + return False + + def add_range(self, range): + idx = 0 + while idx < len(self.ranges): + r = self.ranges[idx] + if range.touches(r): + del self.ranges[idx] + range = range.span(r) + elif range.upper < r.lower: + self.ranges.insert(idx, range) + return + else: + idx += 1 + self.ranges.append(range) + + def add(self, n): + self.add_range(Range(n, n)) + + def __str__(self): + return "RangeSet(%s)" % str(self.ranges) + + def __repr__(self): + return str(self) + +class Future: + def __init__(self, initial=None): + self.value = initial + self._set = threading.Event() + + def set(self, value): + self.value = value + self._set.set() + + def get(self, timeout=None): + self._set.wait(timeout) + return self.value + + def is_set(self): + return self._set.isSet() diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py new file mode 100644 index 0000000000..4fdcc37384 --- /dev/null +++ b/python/qpid/delegates.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import connection010 +import session + +class Delegate: + + def __init__(self, connection, delegate=session.client): + self.connection = connection + self.spec = connection.spec + self.delegate = delegate + self.control = self.spec["track.control"].value + + def received(self, seg): + ssn = self.connection.attached.get(seg.channel) + if ssn is None: + ch = connection010.Channel(self.connection, seg.channel) + else: + ch = ssn.channel + + if seg.track == self.control: + cntrl = seg.decode(self.spec) + attr = cntrl.type.qname.replace(".", "_") + getattr(self, attr)(ch, cntrl) + elif ssn is None: + ch.session_detached() + else: + ssn.received(seg) + + def connection_close(self, ch, close): + ch.connection_close_ok() + self.connection.sock.close() + + def connection_close_ok(self, ch, close_ok): + self.connection.closed.set() + + def session_attach(self, ch, a): + try: + self.connection.attach(a.name, ch, self.delegate, a.force) + ch.session_attached(a.name) + except connection010.ChannelBusy: + ch.session_detached(a.name) + except connection010.SessionBusy: + ch.session_detached(a.name) + + def session_attached(self, ch, a): + ch.session.opened.set() + + def session_detach(self, ch, d): + self.connection.detach(d.name, ch) + ch.session_detached(d.name) + + def session_detached(self, ch, d): + ssn = self.connection.detach(d.name, ch) + if ssn is not None: + ssn.closed.set() + + def session_command_point(self, ch, cp): + ssn = ch.session + ssn.receiver.next_id = cp.command_id + ssn.receiver.next_offset = cp.command_offset + +class Server(Delegate): + + def start(self): + self.connection.read_header() + self.connection.write_header(self.spec.major, self.spec.minor) + connection010.Channel(self.connection, 0).connection_start() + + def connection_start_ok(self, ch, start_ok): + ch.connection_tune() + + def connection_tune_ok(self, ch, tune_ok): + pass + + def connection_open(self, ch, open): + self.connection.opened.set() + ch.connection_open_ok() + +class Client(Delegate): + + def start(self): + self.connection.write_header(self.spec.major, self.spec.minor) + self.connection.read_header() + + def connection_start(self, ch, start): + ch.connection_start_ok() + + def connection_tune(self, ch, tune): + ch.connection_tune_ok() + ch.connection_open() + + def connection_open_ok(self, ch, open_ok): + self.connection.opened.set() diff --git a/python/qpid/framer.py b/python/qpid/framer.py new file mode 100644 index 0000000000..adc52cc3bd --- /dev/null +++ b/python/qpid/framer.py @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import struct, socket +from packer import Packer + +FIRST_SEG = 0x08 +LAST_SEG = 0x04 +FIRST_FRM = 0x02 +LAST_FRM = 0x01 + +class Frame: + + HEADER = "!2BHxBH4x" + MAX_PAYLOAD = 65535 - struct.calcsize(HEADER) + + def __init__(self, flags, type, track, channel, payload): + if len(payload) > Frame.MAX_PAYLOAD: + raise ValueError("max payload size exceeded: %s" % len(payload)) + self.flags = flags + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def isFirstSegment(self): + return bool(FIRST_SEG & self.flags) + + def isLastSegment(self): + return bool(LAST_SEG & self.flags) + + def isFirstFrame(self): + return bool(FIRST_FRM & self.flags) + + def isLastFrame(self): + return bool(LAST_FRM & self.flags) + + def __str__(self): + return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()), + int(self.isLastSegment()), + int(self.isFirstFrame()), + int(self.isLastFrame()), + self.type, + self.track, + self.channel, + self.payload) + +class Closed(Exception): pass + +class Framer(Packer): + + HEADER="!4s4B" + + def __init__(self, sock): + self.sock = sock + + def aborted(self): + return False + + def write(self, buf): +# print "OUT: %r" % buf + while buf: + try: + n = self.sock.send(buf) + except socket.timeout: + if self.aborted(): + raise Closed() + else: + continue + buf = buf[n:] + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.timeout: + if self.aborted(): + raise Closed() + else: + continue + except socket.error, e: + if data != "": + raise e + else: + raise Closed() + if len(s) == 0: + raise Closed() +# print "IN: %r" % s + data += s + return data + + def read_header(self): + return self.unpack(Framer.HEADER) + + def write_header(self, major, minor): + self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor) + + def write_frame(self, frame): + size = len(frame.payload) + struct.calcsize(Frame.HEADER) + track = frame.track & 0x0F + self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) + self.write(frame.payload) + + def read_frame(self): + flags, type, size, track, channel = self.unpack(Frame.HEADER) + payload = self.read(size - struct.calcsize(Frame.HEADER)) + return Frame(flags, type, track, channel, payload) diff --git a/python/qpid/invoker.py b/python/qpid/invoker.py new file mode 100644 index 0000000000..9e6f6943d8 --- /dev/null +++ b/python/qpid/invoker.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Invoker: + + def resolve_method(self, name): + pass + + def __getattr__(self, name): + resolved = self.resolve_method(name) + if resolved == None: + raise AttributeError("%s instance has no attribute '%s'" % + (self.__class__.__name__, name)) + method = lambda *args, **kwargs: self.invoke(resolved, args, kwargs) + self.__dict__[name] = method + return method diff --git a/python/qpid/packer.py b/python/qpid/packer.py new file mode 100644 index 0000000000..22c16918dc --- /dev/null +++ b/python/qpid/packer.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import struct + +class Packer: + + def read(self, n): abstract + + def write(self, s): abstract + + def unpack(self, fmt): + values = struct.unpack(fmt, self.read(struct.calcsize(fmt))) + if len(values) == 1: + return values[0] + else: + return values + + def pack(self, fmt, *args): + self.write(struct.pack(fmt, *args)) diff --git a/python/qpid/session.py b/python/qpid/session.py new file mode 100644 index 0000000000..2e5f47b63e --- /dev/null +++ b/python/qpid/session.py @@ -0,0 +1,208 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import Event +from invoker import Invoker +from datatypes import RangeSet, Struct, Future +from codec010 import StringCodec +from assembler import Segment + +class SessionDetached(Exception): pass + +def client(*args): + return Client(*args) + +def server(*args): + return Server(*args) + +class Session(Invoker): + + def __init__(self, name, spec, sync=True, timeout=10, delegate=client): + self.name = name + self.spec = spec + self.sync = sync + self.timeout = timeout + self.channel = None + self.opened = Event() + self.closed = Event() + self.receiver = Receiver(self) + self.sender = Sender(self) + self.delegate = delegate(self) + self.send_id = True + self.results = {} + + def close(self, timeout=None): + self.channel.session_detach(self.name) + self.closed.wait(timeout=timeout) + + def resolve_method(self, name): + cmd = self.spec.instructions.get(name) + if cmd is not None and cmd.track == self.spec["track.command"].value: + return cmd + else: + return None + + def invoke(self, type, args, kwargs): + if self.channel == None: + raise SessionDetached() + + if type.segments: + if len(args) == len(type.fields) + 1: + message = args[-1] + args = args[:-1] + else: + message = kwargs.pop("message", None) + else: + message = None + + cmd = type.new(args, kwargs) + sc = StringCodec(self.spec) + sc.write_command(type, cmd) + + seg = Segment(True, (message == None or + (message.headers == None and message.body == None)), + type.segment_type, type.track, self.channel.id, sc.encoded) + + if type.result: + result = Future() + self.results[self.sender.next_id] = result + + self.send(seg) + + if message != None: + if message.headers != None: + sc = StringCodec(self.spec) + for st in message.headers: + sc.write_struct32(st.type, st) + seg = Segment(False, message.body == None, self.spec["segment_type.header"].value, + type.track, self.channel.id, sc.encoded) + self.send(seg) + if message.body != None: + seg = Segment(False, True, self.spec["segment_type.body"].value, + type.track, self.channel.id, message.body) + self.send(seg) + + if type.result: + if self.sync: + return result.get(self.timeout) + else: + return result + + def received(self, seg): + self.receiver.received(seg) + if seg.type == self.spec["segment_type.command"].value: + cmd = seg.decode(self.spec) + attr = cmd.type.qname.replace(".", "_") + result = getattr(self.delegate, attr)(cmd) + if cmd.type.result: + self.execution_result(seg.id, result) + elif seg.type == self.spec["segment_type.header"].value: + self.delegate.header(seg.decode(self.spec)) + elif seg.type == self.spec["segment_type.body"].value: + self.delegate.body(seg.decode(self.spec)) + else: + raise ValueError("unknown segment type: %s" % seg.type) + self.receiver.completed(seg) + + def send(self, seg): + self.sender.send(seg) + + def __str__(self): + return '<Session: %s, %s>' % (self.name, self.channel) + + def __repr__(self): + return str(self) + +class Receiver: + + def __init__(self, session): + self.session = session + self.next_id = None + self.next_offset = None + self._completed = RangeSet() + + def received(self, seg): + if self.next_id == None or self.next_offset == None: + raise Exception("todo") + seg.id = self.next_id + seg.offset = self.next_offset + if seg.last: + self.next_id += 1 + self.next_offset = 0 + else: + self.next_offset += len(seg.payload) + + def completed(self, seg): + if seg.id == None: + raise ValueError("cannot complete unidentified segment") + if seg.last: + self._completed.add(seg.id) + +class Sender: + + def __init__(self, session): + self.session = session + self.next_id = 0 + self.next_offset = 0 + self.segments = [] + + def send(self, seg): + seg.id = self.next_id + seg.offset = self.next_offset + if seg.last: + self.next_id += 1 + self.next_offset = 0 + else: + self.next_offset += len(seg.payload) + self.segments.append(seg) + if self.session.send_id: + self.session.send_id = False + self.session.channel.session_command_point(seg.id, seg.offset) + self.session.channel.connection.write_segment(seg) + + def completed(self, commands): + idx = 0 + while idx < len(self.segments): + seg = self.segments[idx] + if seg.id in commands: + del self.segments[idx] + else: + idx += 1 + +from queue import Queue, Closed, Empty + +class Delegate: + + def __init__(self, session): + self.session = session + + def execution_result(self, er): + future = self.session.results[er.command_id] + future.set(er.value) + +class Client(Delegate): + + def message_transfer(self, cmd): + print "TRANSFER:", cmd + + def header(self, hdr): + print "HEADER:", hdr + + def body(self, seg): + print "BODY:", seg diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py new file mode 100644 index 0000000000..2d8bd6050d --- /dev/null +++ b/python/qpid/spec010.py @@ -0,0 +1,617 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datatypes +from codec010 import StringCodec + +class Node: + + def __init__(self, children): + self.children = children + self.named = {} + self.docs = [] + self.rules = [] + + def register(self): + for ch in self.children: + ch.register(self) + + def resolve(self): + for ch in self.children: + ch.resolve() + + def __getitem__(self, name): + path = name.split(".", 1) + nd = self.named + for step in path: + nd = nd[step] + return nd + + def __iter__(self): + return iter(self.children) + +class Anonymous: + + def __init__(self, children): + self.children = children + + def register(self, node): + for ch in self.children: + ch.register(node) + + def resolve(self): + for ch in self.children: + ch.resolve() + +class Named: + + def __init__(self, name): + self.name = name + self.qname = None + + def register(self, node): + self.spec = node.spec + self.klass = node.klass + node.named[self.name] = self + if node.qname: + self.qname = "%s.%s" % (node.qname, self.name) + else: + self.qname = self.name + + def __str__(self): + return self.qname + + def __repr__(self): + return str(self) + +class Lookup: + + def lookup(self, name): + value = None + if self.klass: + try: + value = self.klass[name] + except KeyError: + pass + if not value: + value = self.spec[name] + return value + +class Coded: + + def __init__(self, code): + self.code = code + +class Constant(Named, Node): + + def __init__(self, name, value, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.value = value + + def register(self, node): + Named.register(self, node) + node.constants.append(self) + Node.register(self) + +class Type(Named, Node): + + def __init__(self, name, children): + Named.__init__(self, name) + Node.__init__(self, children) + + def is_present(self, value): + return value != None + + def register(self, node): + Named.register(self, node) + node.types.append(self) + Node.register(self) + +class Primitive(Coded, Type): + + def __init__(self, name, code, fixed, variable, children): + Coded.__init__(self, code) + Type.__init__(self, name, children) + self.fixed = fixed + self.variable = variable + + def is_present(self, value): + if self.fixed == 0: + return value + else: + return Type.is_present(self, value) + + def encode(self, codec, value): + getattr(codec, "write_%s" % self.name)(value) + + def decode(self, codec): + return getattr(codec, "read_%s" % self.name)() + +class Domain(Type, Lookup): + + def __init__(self, name, type, children): + Type.__init__(self, name, children) + self.type = type + self.choices = {} + + def resolve(self): + self.type = self.lookup(self.type) + Node.resolve(self) + + def encode(self, codec, value): + self.type.encode(codec, value) + + def decode(self, codec): + return self.type.decode(codec) + +class Choice(Named, Node): + + def __init__(self, name, value, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.value = value + + def register(self, node): + Named.register(self, node) + node.choices[self.value] = self + Node.register(self) + +class Composite(Type, Coded): + + def __init__(self, name, code, size, pack, children): + Coded.__init__(self, code) + Type.__init__(self, name, children) + self.fields = [] + self.size = size + self.pack = pack + + def new(self, args, kwargs): + if len(args) > len(self.fields): + raise TypeError("%s takes at most %s arguments (%s given)" % + (self.name, len(self.fields), len(self.args))) + + result = {"type": self} + + for a, f, in zip(args, self.fields): + result[f.name] = a + + for k, v in kwargs.items(): + f = self.named.get(k, None) + if f == None: + raise TypeError("%s got an unexpected keyword argument '%s'" % + (self.name, k)) + result[f.name] = v + + return datatypes.Struct(result) + + def decode(self, codec): + codec.read_size(self.size) + return self.decode_fields(codec) + + def decode_fields(self, codec): + flags = 0 + for i in range(self.pack): + flags |= (codec.read_uint8() << 8*i) + + result = {"type": self} + + for i in range(len(self.fields)): + f = self.fields[i] + if flags & (0x1 << i): + result[f.name] = f.type.decode(codec) + else: + result[f.name] = None + return datatypes.Struct(result) + + def encode(self, codec, value): + sc = StringCodec(self.spec) + self.encode_fields(sc, value) + codec.write_size(self.size, len(sc.encoded)) + codec.write(sc.encoded) + + def encode_fields(self, codec, value): + values = value.__dict__ + flags = 0 + for i in range(len(self.fields)): + f = self.fields[i] + if f.type.is_present(values.get(f.name, None)): + flags |= (0x1 << i) + for i in range(self.pack): + codec.write_uint8((flags >> 8*i) & 0xFF) + for i in range(len(self.fields)): + f = self.fields[i] + if flags & (0x1 << i): + f.type.encode(codec, values[f.name]) + +class Field(Named, Node, Lookup): + + def __init__(self, name, type, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.type = type + self.exceptions = [] + + def register(self, node): + Named.register(self, node) + node.fields.append(self) + Node.register(self) + + def resolve(self): + self.type = self.lookup(self.type) + Node.resolve(self) + + def __str__(self): + return "%s: %s" % (self.qname, self.type.qname) + +class Struct(Composite): + + def register(self, node): + Composite.register(self, node) + self.spec.structs[self.code] = self + + def __str__(self): + fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname) + for f in self.fields]) + return "%s {\n %s\n}" % (self.qname, fields) + +class Segment(Node): + + def register(self, node): + self.spec = node.spec + self.klass = node.klass + node.segments.append(self) + Node.register(self) + +class Instruction(Composite, Segment): + + def __init__(self, name, code, children): + Composite.__init__(self, name, code, 0, 2, children) + self.segment_type = None + self.track = None + self.handlers = [] + + def __str__(self): + return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname) + for f in self.fields])) + + def register(self, node): + Composite.register(self, node) + self.spec.instructions[self.qname.replace(".", "_")] = self + +class Control(Instruction): + + def __init__(self, name, code, children): + Instruction.__init__(self, name, code, children) + self.response = None + + def register(self, node): + Instruction.register(self, node) + node.controls.append(self) + self.spec.controls[self.code] = self + self.segment_type = self.spec["segment_type.control"].value + self.track = self.spec["track.control"].value + +class Command(Instruction): + + def __init__(self, name, code, children): + Instruction.__init__(self, name, code, children) + self.result = None + self.exceptions = [] + self.segments = [] + + def register(self, node): + Instruction.register(self, node) + node.commands.append(self) + self.header = self.spec["session.header"] + self.spec.commands[self.code] = self + self.segment_type = self.spec["segment_type.command"].value + self.track = self.spec["track.command"].value + + def decode(self, codec): + hdr = self.header.decode(codec) + args = Instruction.decode(self, codec) + result = {} + result.update(hdr.fields()) + result.update(args.fields()) + return datatypes.Struct(result) + + def encode(self, codec, cmd): + self.header.encode(codec, cmd) + Instruction.encode(self, codec, cmd) + +class Header(Segment): + + def __init__(self, children): + self.entries = [] + Segment.__init__(self, children) + +class Entry(Lookup): + + def __init__(self, type): + self.type = type + + def register(self, node): + self.spec = node.spec + self.klass = node.klass + node.entries.append(self) + + def resolve(self): + self.type = self.lookup(self.type) + +class Body(Segment): + + def resolve(self): pass + +class Class(Named, Coded, Node): + + def __init__(self, name, code, children): + Named.__init__(self, name) + Coded.__init__(self, code) + Node.__init__(self, children) + self.types = [] + self.controls = [] + self.commands = [] + + def register(self, node): + Named.register(self, node) + self.klass = self + node.classes.append(self) + Node.register(self) + +class Doc: + + def __init__(self, type, title, text): + self.type = type + self.title = title + self.text = text + + def register(self, node): + node.docs.append(self) + + def resolve(self): pass + +class Role(Named, Node): + + def __init__(self, name, children): + Named.__init__(self, name) + Node.__init__(self, children) + + def register(self, node): + Named.register(self, node) + Node.register(self) + +class Rule(Named, Node): + + def __init__(self, name, children): + Named.__init__(self, name) + Node.__init__(self, children) + + def register(self, node): + Named.register(self, node) + node.rules.append(self) + Node.register(self) + +class Exception(Named, Node): + + def __init__(self, name, error_code, children): + Named.__init__(self, name) + Node.__init__(self, children) + self.error_code = error_code + + def register(self, node): + Named.register(self, node) + node.exceptions.append(self) + Node.register(self) + +class Spec(Node): + + def __init__(self, major, minor, port, children): + Node.__init__(self, children) + self.major = major + self.minor = minor + self.port = port + self.constants = [] + self.classes = [] + self.types = [] + self.qname = None + self.spec = self + self.klass = None + self.instructions = {} + self.controls = {} + self.commands = {} + self.structs = {} + +class Implement: + + def __init__(self, handle): + self.handle = handle + + def register(self, node): + node.handlers.append(self.handle) + + def resolve(self): pass + +class Response(Node): + + def __init__(self, name, children): + Node.__init__(self, children) + self.name = name + + def register(self, node): + Node.register(self) + +class Result(Node, Lookup): + + def __init__(self, type, children): + self.type = type + Node.__init__(self, children) + + def register(self, node): + node.result = self + self.qname = node.qname + self.klass = node.klass + self.spec = node.spec + Node.register(self) + + def resolve(self): + self.type = self.lookup(self.type) + Node.resolve(self) + +import mllib + +def num(s): + if s: return int(s, 0) + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def id(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +class Loader: + + def __init__(self): + self.class_code = 0 + + def code(self, nd): + c = num(nd["@code"]) + if c is None: + return None + else: + return c | (self.class_code << 8) + + def list(self, q): + result = [] + for nd in q: + result.append(nd.dispatch(self)) + return result + + def children(self, n): + return self.list(n.query["#tag"]) + + def data(self, d): + return d.data + + def do_amqp(self, a): + return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]), + self.children(a)) + + def do_type(self, t): + return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]), + num(t["@variable-width"]), self.children(t)) + + def do_constant(self, c): + return Constant(id(c["@name"]), num(c["@value"]), self.children(c)) + + def do_domain(self, d): + return Domain(id(d["@name"]), id(d["@type"]), self.children(d)) + + def do_enum(self, e): + return Anonymous(self.children(e)) + + def do_choice(self, c): + return Choice(id(c["@name"]), num(c["@value"]), self.children(c)) + + def do_class(self, c): + code = num(c["@code"]) + self.class_code = code + children = self.children(c) + children += self.list(c.query["command/result/struct"]) + self.class_code = 0 + return Class(id(c["@name"]), code, children) + + def do_doc(self, doc): + text = reduce(lambda x, y: x + y, self.list(doc.children)) + return Doc(doc["@type"], doc["@title"], text) + + def do_xref(self, x): + return x["@ref"] + + def do_role(self, r): + return Role(id(r["@name"]), self.children(r)) + + def do_control(self, c): + return Control(id(c["@name"]), self.code(c), self.children(c)) + + def do_rule(self, r): + return Rule(id(r["@name"]), self.children(r)) + + def do_implement(self, i): + return Implement(id(i["@handle"])) + + def do_response(self, r): + return Response(id(r["@name"]), self.children(r)) + + def do_field(self, f): + return Field(id(f["@name"]), id(f["@type"]), self.children(f)) + + def do_struct(self, s): + return Struct(id(s["@name"]), self.code(s), num(s["@size"]), + num(s["@pack"]), self.children(s)) + + def do_command(self, c): + return Command(id(c["@name"]), self.code(c), self.children(c)) + + def do_segments(self, s): + return Anonymous(self.children(s)) + + def do_header(self, h): + return Header(self.children(h)) + + def do_entry(self, e): + return Entry(id(e["@type"])) + + def do_body(self, b): + return Body(self.children(b)) + + def do_result(self, r): + type = r["@type"] + if not type: + type = r["struct/@name"] + return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"])) + + def do_exception(self, e): + return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e)) + +import os, cPickle + +def load(xml): + fname = xml + ".pcl" + if os.path.exists(fname): + file = open(fname, "r") + s = cPickle.load(file) + file.close() + else: + doc = mllib.xml_parse(xml) + s = doc["amqp"].dispatch(Loader()) + s.register() + s.resolve() + file = open(fname, "w") + cPickle.dump(s, file) + file.close() + return s diff --git a/python/qpid/util.py b/python/qpid/util.py new file mode 100644 index 0000000000..c88cc0c9d6 --- /dev/null +++ b/python/qpid/util.py @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import socket + +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + # XXX: we could use this on read, but we'd have to put write in a + # loop as well + # sock.settimeout(1) + return sock + +def listen(host, port, predicate = lambda: True, bound = lambda: None): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + bound() + sock.listen(5) + while predicate(): + s, a = sock.accept() + yield s diff --git a/python/server010 b/python/server010 new file mode 100755 index 0000000000..b0e13d1e9f --- /dev/null +++ b/python/server010 @@ -0,0 +1,34 @@ +#!/usr/bin/env python + +from qpid import delegates +from qpid.connection010 import Connection +from qpid.util import connect, listen +from qpid.spec010 import load +from qpid.session import Client + +spec = load("../specs/amqp.0-10.xml") + +class Server: + + def connection(self, connection): + return delegates.Server(connection, self.session) + + def session(self, session): + return SessionDelegate(session) + +class SessionDelegate(Client): + + def __init__(self, session): + self.session = session + + def queue_declare(self, qd): + print "Queue %s declared..." % qd.queue + + def queue_query(self, qq): + return qq.type.result.type.new((qq.queue,), {}) + +server = Server() + +for s in listen("0.0.0.0", spec.port): + conn = Connection(s, spec, server.connection) + conn.start(5) diff --git a/python/tests/__init__.py b/python/tests/__init__.py index 41dcc705e6..8e9eeb44d6 100644 --- a/python/tests/__init__.py +++ b/python/tests/__init__.py @@ -22,3 +22,8 @@ from codec import * from queue import * from spec import * +from framer import * +from assembler import * +from datatypes import * +from connection010 import * +from spec010 import * diff --git a/python/tests/assembler.py b/python/tests/assembler.py new file mode 100644 index 0000000000..b76924e59d --- /dev/null +++ b/python/tests/assembler.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import * +from unittest import TestCase +from qpid.util import connect, listen +from qpid.assembler import * + +PORT = 1234 + +class AssemblerTest(TestCase): + + def setUp(self): + started = Event() + self.running = True + + def run(): + running = True + for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): + asm = Assembler(s) + try: + asm.write_header(*asm.read_header()[-2:]) + while True: + seg = asm.read_segment() + asm.write_segment(seg) + except Closed: + pass + + self.server = Thread(target=run) + self.server.setDaemon(True) + self.server.start() + + started.wait(3) + + def tearDown(self): + self.running = False + self.server.join() + + def test(self): + asm = Assembler(connect("0.0.0.0", PORT), max_payload = 1) + asm.write_header(0, 10) + asm.write_segment(Segment(True, False, 1, 2, 3, "TEST")) + asm.write_segment(Segment(False, True, 1, 2, 3, "ING")) + + assert asm.read_header() == ("AMQP", 1, 1, 0, 10) + + seg = asm.read_segment() + assert seg.first == True + assert seg.last == False + assert seg.type == 1 + assert seg.track == 2 + assert seg.channel == 3 + assert seg.payload == "TEST" + + seg = asm.read_segment() + assert seg.first == False + assert seg.last == True + assert seg.type == 1 + assert seg.track == 2 + assert seg.channel == 3 + assert seg.payload == "ING" diff --git a/python/tests/connection010.py b/python/tests/connection010.py new file mode 100644 index 0000000000..5e4bf983da --- /dev/null +++ b/python/tests/connection010.py @@ -0,0 +1,137 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import * +from unittest import TestCase +from qpid.util import connect, listen +from qpid.connection010 import * +from qpid.datatypes import Message +from qpid.testlib import testrunner +from qpid.delegates import Server +from qpid.queue import Queue +from qpid.spec010 import load +from qpid.session import Delegate + +PORT = 1234 + +class TestServer: + + def __init__(self, queue): + self.queue = queue + + def connection(self, connection): + return Server(connection, delegate=self.session) + + def session(self, session): + return TestSession(session, self.queue) + +class TestSession(Delegate): + + def __init__(self, session, queue): + self.session = session + self.queue = queue + + def queue_query(self, qq): + return qq.type.result.type.new((qq.queue,), {}) + + def message_transfer(self, cmd): + self.queue.put(cmd) + + def body(self, body): + self.queue.put(body) + +class ConnectionTest(TestCase): + + def setUp(self): + self.spec = load(testrunner.get_spec_file("amqp.0-10.xml")) + self.queue = Queue() + self.running = True + started = Event() + + def run(): + ts = TestServer(self.queue) + for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): + conn = Connection(s, self.spec, ts.connection) + try: + conn.start(5) + except Closed: + pass + + self.server = Thread(target=run) + self.server.setDaemon(True) + self.server.start() + + started.wait(3) + + def tearDown(self): + self.running = False + connect("0.0.0.0", PORT).close() + self.server.join(3) + + def test(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + + ssn1 = c.session("test1") + ssn2 = c.session("test2") + + assert ssn1 == c.sessions["test1"] + assert ssn2 == c.sessions["test2"] + assert ssn1.channel != None + assert ssn2.channel != None + assert ssn1 in c.attached.values() + assert ssn2 in c.attached.values() + + ssn1.close(5) + + assert ssn1.channel == None + assert ssn1 not in c.attached.values() + assert ssn2 in c.sessions.values() + + ssn2.close(5) + + assert ssn2.channel == None + assert ssn2 not in c.attached.values() + assert ssn2 not in c.sessions.values() + + ssn = c.session("session") + + assert ssn.channel != None + assert ssn in c.sessions.values() + + destinations = ("one", "two", "three") + + for d in destinations: + ssn.message_transfer(d) + + for d in destinations: + cmd = self.queue.get(10) + assert cmd.destination == d + + msg = Message("this is a test") + ssn.message_transfer("four", message=msg) + cmd = self.queue.get(10) + assert cmd.destination == "four" + body = self.queue.get(10) + assert body.payload == msg.body + assert body.last + + qq = ssn.queue_query("asdf") + assert qq.queue == "asdf" + c.close(5) diff --git a/python/tests/datatypes.py b/python/tests/datatypes.py new file mode 100644 index 0000000000..cafd53c89f --- /dev/null +++ b/python/tests/datatypes.py @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from unittest import TestCase +from qpid.datatypes import * + +class RangeSetTest(TestCase): + + def check(self, ranges): + posts = [] + for range in ranges: + posts.append(range.lower) + posts.append(range.upper) + + sorted = posts[:] + sorted.sort() + + assert posts == sorted + + idx = 1 + while idx + 1 < len(posts): + assert posts[idx] + 1 != posts[idx+1] + idx += 2 + + def test(self): + rs = RangeSet() + + self.check(rs.ranges) + + rs.add(1) + + assert 1 in rs + assert 2 not in rs + assert 0 not in rs + self.check(rs.ranges) + + rs.add(2) + + assert 0 not in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + self.check(rs.ranges) + + rs.add(0) + + assert -1 not in rs + assert 0 in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + self.check(rs.ranges) + + rs.add(37) + + assert -1 not in rs + assert 0 in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + assert 36 not in rs + assert 37 in rs + assert 38 not in rs + self.check(rs.ranges) + + rs.add(-1) + self.check(rs.ranges) + + rs.add(-3) + self.check(rs.ranges) + + rs.add_range(Range(1, 20)) + assert 21 not in rs + assert 20 in rs + self.check(rs.ranges) diff --git a/python/tests/framer.py b/python/tests/framer.py new file mode 100644 index 0000000000..ea2e04e954 --- /dev/null +++ b/python/tests/framer.py @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import * +from unittest import TestCase +from qpid.util import connect, listen +from qpid.framer import * + +PORT = 1234 + +class FramerTest(TestCase): + + def setUp(self): + self.running = True + started = Event() + def run(): + for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): + conn = Framer(s) + try: + conn.write_header(*conn.read_header()[-2:]) + while True: + frame = conn.read_frame() + conn.write_frame(frame) + except Closed: + pass + + self.server = Thread(target=run) + self.server.setDaemon(True) + self.server.start() + + started.wait(3) + + def tearDown(self): + self.running = False + self.server.join(3) + + def test(self): + c = Framer(connect("0.0.0.0", PORT)) + + c.write_header(0, 10) + assert c.read_header() == ("AMQP", 1, 1, 0, 10) + + c.write_frame(Frame(FIRST_FRM, 1, 2, 3, "THIS")) + c.write_frame(Frame(0, 1, 2, 3, "IS")) + c.write_frame(Frame(0, 1, 2, 3, "A")) + c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST")) + + f = c.read_frame() + assert f.flags & FIRST_FRM + assert not (f.flags & LAST_FRM) + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "THIS" + + f = c.read_frame() + assert f.flags == 0 + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "IS" + + f = c.read_frame() + assert f.flags == 0 + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "A" + + f = c.read_frame() + assert f.flags & LAST_FRM + assert not (f.flags & FIRST_FRM) + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "TEST" diff --git a/python/tests/spec010.py b/python/tests/spec010.py new file mode 100644 index 0000000000..1c520ee323 --- /dev/null +++ b/python/tests/spec010.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from unittest import TestCase +from qpid.spec010 import load +from qpid.codec010 import Codec, StringCodec +from qpid.testlib import testrunner +from qpid.datatypes import Struct + +class SpecTest(TestCase): + + def setUp(self): + self.spec = load(testrunner.get_spec_file("amqp.0-10.xml")) + + def testSessionHeader(self): + hdr = self.spec["session.header"] + sc = StringCodec(self.spec) + hdr.encode(sc, Struct({"sync": True})) + assert sc.encoded == "\x01\x01" + + sc = StringCodec(self.spec) + hdr.encode(sc, Struct({"sync": False})) + assert sc.encoded == "\x01\x00" + + def encdec(self, type, value): + sc = StringCodec(self.spec) + type.encode(sc, value) + decoded = type.decode(sc) + return decoded + + def testMessageProperties(self): + props = Struct({"content_length": 0xDEADBEEF, + "reply_to": + Struct({"exchange": "the exchange name", "routing_key": "the routing key"})}) + dec = self.encdec(self.spec["message.message_properties"], props) + assert props.content_length == dec.content_length + assert props.reply_to.exchange == dec.reply_to.exchange + assert props.reply_to.routing_key == dec.reply_to.routing_key + + def testMessageSubscribe(self): + cmd = Struct({"exclusive": True, "destination": "this is a test"}) + dec = self.encdec(self.spec["message.subscribe"], cmd) + assert cmd.exclusive == dec.exclusive + assert cmd.destination == dec.destination |
