summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/codec010.py18
-rw-r--r--python/qpid/datatypes.py36
-rw-r--r--python/qpid/session.py7
3 files changed, 40 insertions, 21 deletions
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py
index 2dcba4e917..f6539ffcee 100644
--- a/python/qpid/codec010.py
+++ b/python/qpid/codec010.py
@@ -18,6 +18,7 @@
#
from packer import Packer
+from datatypes import RangedSet
class Codec(Packer):
@@ -123,6 +124,23 @@ class Codec(Packer):
self.write_uint16(len(b))
self.write(b)
+ def read_sequence_set(self):
+ result = RangedSet()
+ size = self.read_uint16()
+ nranges = size/8
+ while nranges > 0:
+ lower = self.read_sequence_no()
+ upper = self.read_sequence_no()
+ result.add(lower, upper)
+ nranges -= 1
+ return result
+ def write_sequence_set(self, ss):
+ size = 8*len(ss.ranges)
+ self.write_uint16(size)
+ for range in ss.ranges:
+ self.write_sequence_no(range.lower)
+ self.write_sequence_no(range.upper)
+
def read_vbin32(self):
return self.read(self.read_uint32())
def write_vbin32(self, b):
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index efd1fdd4ff..7158e0d75e 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -42,20 +42,26 @@ class Message:
self.headers = args[:-1]
else:
self.headers = None
+ self.id = None
def __repr__(self):
args = []
if self.headers:
- args.extend(self.headers)
+ args.extend(map(repr, self.headers))
if self.body:
- args.append(self.body)
- return "Message(%s)" % ", ".join(map(repr, args))
+ args.append(repr(self.body))
+ if self.id is not None:
+ args.append("id=%s" % self.id)
+ return "Message(%s)" % ", ".join(args)
class Range:
- def __init__(self, lower, upper):
+ def __init__(self, lower, upper = None):
self.lower = lower
- self.upper = upper
+ if upper is None:
+ self.upper = lower
+ else:
+ self.upper = upper
def __contains__(self, n):
return self.lower <= n and n <= self.upper
@@ -69,16 +75,15 @@ class Range:
def span(self, r):
return Range(min(self.lower, r.lower), max(self.upper, r.upper))
- def __str__(self):
- return "Range(%s, %s)" % (self.lower, self.upper)
-
def __repr__(self):
- return str(self)
+ return "Range(%s, %s)" % (self.lower, self.upper)
-class RangeSet:
+class RangedSet:
- def __init__(self):
+ def __init__(self, *args):
self.ranges = []
+ for n in args:
+ self.add(n)
def __contains__(self, n):
for r in self.ranges:
@@ -100,14 +105,11 @@ class RangeSet:
idx += 1
self.ranges.append(range)
- def add(self, n):
- self.add_range(Range(n, n))
-
- def __str__(self):
- return "RangeSet(%s)" % str(self.ranges)
+ def add(self, lower, upper = None):
+ self.add_range(Range(lower, upper))
def __repr__(self):
- return str(self)
+ return "RangedSet(%s)" % str(self.ranges)
class Future:
def __init__(self, initial=None):
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 334902bbf3..7702a19251 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -19,7 +19,7 @@
from threading import Event, RLock
from invoker import Invoker
-from datatypes import RangeSet, Struct, Future
+from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
from queue import Queue
@@ -170,7 +170,7 @@ class Receiver:
self.session = session
self.next_id = None
self.next_offset = None
- self._completed = RangeSet()
+ self._completed = RangedSet()
def received(self, seg):
if self.next_id == None or self.next_offset == None:
@@ -220,8 +220,6 @@ class Sender:
else:
idx += 1
-from queue import Queue, Closed, Empty
-
class Delegate:
def __init__(self, session):
@@ -238,6 +236,7 @@ class Client(Delegate):
def message_transfer(self, cmd, headers, body):
m = Message(body)
m.headers = headers
+ m.id = cmd.id
messages = self.session.incoming(cmd.destination)
messages.put(m)
msg.debug("RECV: %s", m)