diff options
Diffstat (limited to 'python/qpid')
| -rw-r--r-- | python/qpid/codec010.py | 18 | ||||
| -rw-r--r-- | python/qpid/datatypes.py | 36 | ||||
| -rw-r--r-- | python/qpid/session.py | 7 |
3 files changed, 40 insertions, 21 deletions
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 2dcba4e917..f6539ffcee 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -18,6 +18,7 @@ # from packer import Packer +from datatypes import RangedSet class Codec(Packer): @@ -123,6 +124,23 @@ class Codec(Packer): self.write_uint16(len(b)) self.write(b) + def read_sequence_set(self): + result = RangedSet() + size = self.read_uint16() + nranges = size/8 + while nranges > 0: + lower = self.read_sequence_no() + upper = self.read_sequence_no() + result.add(lower, upper) + nranges -= 1 + return result + def write_sequence_set(self, ss): + size = 8*len(ss.ranges) + self.write_uint16(size) + for range in ss.ranges: + self.write_sequence_no(range.lower) + self.write_sequence_no(range.upper) + def read_vbin32(self): return self.read(self.read_uint32()) def write_vbin32(self, b): diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index efd1fdd4ff..7158e0d75e 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -42,20 +42,26 @@ class Message: self.headers = args[:-1] else: self.headers = None + self.id = None def __repr__(self): args = [] if self.headers: - args.extend(self.headers) + args.extend(map(repr, self.headers)) if self.body: - args.append(self.body) - return "Message(%s)" % ", ".join(map(repr, args)) + args.append(repr(self.body)) + if self.id is not None: + args.append("id=%s" % self.id) + return "Message(%s)" % ", ".join(args) class Range: - def __init__(self, lower, upper): + def __init__(self, lower, upper = None): self.lower = lower - self.upper = upper + if upper is None: + self.upper = lower + else: + self.upper = upper def __contains__(self, n): return self.lower <= n and n <= self.upper @@ -69,16 +75,15 @@ class Range: def span(self, r): return Range(min(self.lower, r.lower), max(self.upper, r.upper)) - def __str__(self): - return "Range(%s, %s)" % (self.lower, self.upper) - def __repr__(self): - return str(self) + return "Range(%s, %s)" % (self.lower, self.upper) -class RangeSet: +class RangedSet: - def __init__(self): + def __init__(self, *args): self.ranges = [] + for n in args: + self.add(n) def __contains__(self, n): for r in self.ranges: @@ -100,14 +105,11 @@ class RangeSet: idx += 1 self.ranges.append(range) - def add(self, n): - self.add_range(Range(n, n)) - - def __str__(self): - return "RangeSet(%s)" % str(self.ranges) + def add(self, lower, upper = None): + self.add_range(Range(lower, upper)) def __repr__(self): - return str(self) + return "RangedSet(%s)" % str(self.ranges) class Future: def __init__(self, initial=None): diff --git a/python/qpid/session.py b/python/qpid/session.py index 334902bbf3..7702a19251 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -19,7 +19,7 @@ from threading import Event, RLock from invoker import Invoker -from datatypes import RangeSet, Struct, Future +from datatypes import RangedSet, Struct, Future from codec010 import StringCodec from assembler import Segment from queue import Queue @@ -170,7 +170,7 @@ class Receiver: self.session = session self.next_id = None self.next_offset = None - self._completed = RangeSet() + self._completed = RangedSet() def received(self, seg): if self.next_id == None or self.next_offset == None: @@ -220,8 +220,6 @@ class Sender: else: idx += 1 -from queue import Queue, Closed, Empty - class Delegate: def __init__(self, session): @@ -238,6 +236,7 @@ class Client(Delegate): def message_transfer(self, cmd, headers, body): m = Message(body) m.headers = headers + m.id = cmd.id messages = self.session.incoming(cmd.destination) messages.put(m) msg.debug("RECV: %s", m) |
