summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-03-05 14:39:40 +0000
committerRafael H. Schloming <rhs@apache.org>2008-03-05 14:39:40 +0000
commit86779be122dea590bc1e5201c58777ea3e362a95 (patch)
tree5867b18efe04c62c99e1ca14d177b0eda894bd82 /python/qpid
parent00f2ca6cf33f77e44b94db2701830f8c9bcd794e (diff)
downloadqpid-python-86779be122dea590bc1e5201c58777ea3e362a95.tar.gz
added incoming queues for messages; altered session dispatch to send entire assembly to a single handler; added logging switch for hello-010-world
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633861 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/assembler.py6
-rw-r--r--python/qpid/datatypes.py8
-rw-r--r--python/qpid/session.py79
-rw-r--r--python/qpid/spec010.py32
4 files changed, 94 insertions, 31 deletions
diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py
index aac8b80cb4..fe78baaceb 100644
--- a/python/qpid/assembler.py
+++ b/python/qpid/assembler.py
@@ -46,7 +46,9 @@ class Segment:
def decode_command(self, spec):
sc = StringCodec(spec, self.payload)
- return sc.read_command()
+ cmd = sc.read_command()
+ cmd.id = self.id
+ return cmd
def decode_header(self, spec):
sc = StringCodec(spec, self.payload)
@@ -56,7 +58,7 @@ class Segment:
return values
def decode_body(self, spec):
- return self
+ return self.payload
def __str__(self):
return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index 649c8f4d76..efd1fdd4ff 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -43,6 +43,14 @@ class Message:
else:
self.headers = None
+ def __repr__(self):
+ args = []
+ if self.headers:
+ args.extend(self.headers)
+ if self.body:
+ args.append(self.body)
+ return "Message(%s)" % ", ".join(map(repr, args))
+
class Range:
def __init__(self, lower, upper):
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 2e5f47b63e..334902bbf3 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,11 +17,14 @@
# under the License.
#
-from threading import Event
+from threading import Event, RLock
from invoker import Invoker
from datatypes import RangeSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
+from queue import Queue
+from datatypes import Message
+from logging import getLogger
class SessionDetached(Exception): pass
@@ -46,6 +49,20 @@ class Session(Invoker):
self.delegate = delegate(self)
self.send_id = True
self.results = {}
+ self.lock = RLock()
+ self._incoming = {}
+ self.assembly = None
+
+ def incoming(self, destination):
+ self.lock.acquire()
+ try:
+ queue = self._incoming.get(destination)
+ if queue == None:
+ queue = Queue()
+ self._incoming[destination] = queue
+ return queue
+ finally:
+ self.lock.release()
def close(self, timeout=None):
self.channel.session_detach(self.name)
@@ -106,19 +123,37 @@ class Session(Invoker):
def received(self, seg):
self.receiver.received(seg)
- if seg.type == self.spec["segment_type.command"].value:
- cmd = seg.decode(self.spec)
- attr = cmd.type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd)
- if cmd.type.result:
- self.execution_result(seg.id, result)
- elif seg.type == self.spec["segment_type.header"].value:
- self.delegate.header(seg.decode(self.spec))
- elif seg.type == self.spec["segment_type.body"].value:
- self.delegate.body(seg.decode(self.spec))
- else:
- raise ValueError("unknown segment type: %s" % seg.type)
- self.receiver.completed(seg)
+ if seg.first:
+ assert self.assembly == None
+ self.assembly = []
+ self.assembly.append(seg)
+ if seg.last:
+ self.dispatch(self.assembly)
+ self.assembly = None
+
+ def dispatch(self, assembly):
+ cmd = assembly.pop(0).decode(self.spec)
+ args = []
+
+ for st in cmd.type.segments:
+ if assembly:
+ seg = assembly[0]
+ if seg.type == st.segment_type:
+ args.append(seg.decode(self.spec))
+ assembly.pop(0)
+ continue
+ args.append(None)
+
+ assert len(assembly) == 0
+
+ attr = cmd.type.qname.replace(".", "_")
+ result = getattr(self.delegate, attr)(cmd, *args)
+
+ if cmd.type.result:
+ self.execution_result(cmd.id, result)
+
+ for seg in assembly:
+ self.receiver.completed(seg)
def send(self, seg):
self.sender.send(seg)
@@ -196,13 +231,13 @@ class Delegate:
future = self.session.results[er.command_id]
future.set(er.value)
-class Client(Delegate):
+msg = getLogger("qpid.ssn.msg")
- def message_transfer(self, cmd):
- print "TRANSFER:", cmd
-
- def header(self, hdr):
- print "HEADER:", hdr
+class Client(Delegate):
- def body(self, seg):
- print "BODY:", seg
+ def message_transfer(self, cmd, headers, body):
+ m = Message(body)
+ m.headers = headers
+ messages = self.session.incoming(cmd.destination)
+ messages.put(m)
+ msg.debug("RECV: %s", m)
diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py
index e6b7946e17..c3f3e6ad57 100644
--- a/python/qpid/spec010.py
+++ b/python/qpid/spec010.py
@@ -194,7 +194,7 @@ class Composite(Type, Coded):
result[f.name] = a
for k, v in kwargs.items():
- f = self.named.get(k, None)
+ f = self.named.get(k)
if f == None:
raise TypeError("%s got an unexpected keyword argument '%s'" %
(self.name, k))
@@ -232,7 +232,7 @@ class Composite(Type, Coded):
flags = 0
for i in range(len(self.fields)):
f = self.fields[i]
- if f.type.is_present(values.get(f.name, None)):
+ if f.type.is_present(values.get(f.name)):
flags |= (0x1 << i)
for i in range(self.pack):
codec.write_uint8((flags >> 8*i) & 0xFF)
@@ -272,7 +272,10 @@ class Struct(Composite):
for f in self.fields])
return "%s {\n %s\n}" % (self.qname, fields)
-class Segment(Node):
+class Segment:
+
+ def __init__(self):
+ self.segment_type = None
def register(self, node):
self.spec = node.spec
@@ -284,7 +287,7 @@ class Instruction(Composite, Segment):
def __init__(self, name, code, children):
Composite.__init__(self, name, code, 0, 2, children)
- self.segment_type = None
+ Segment.__init__(self)
self.track = None
self.handlers = []
@@ -337,11 +340,17 @@ class Command(Instruction):
self.header.encode(codec, cmd)
Instruction.encode(self, codec, cmd)
-class Header(Segment):
+class Header(Segment, Node):
def __init__(self, children):
+ Segment.__init__(self)
+ Node.__init__(self, children)
self.entries = []
- Segment.__init__(self, children)
+
+ def register(self, node):
+ Segment.register(self, node)
+ self.segment_type = self.spec["segment_type.header"].value
+ Node.register(self)
class Entry(Lookup):
@@ -356,7 +365,16 @@ class Entry(Lookup):
def resolve(self):
self.type = self.lookup(self.type)
-class Body(Segment):
+class Body(Segment, Node):
+
+ def __init__(self, children):
+ Segment.__init__(self)
+ Node.__init__(self, children)
+
+ def register(self, node):
+ Segment.register(self, node)
+ self.segment_type = self.spec["segment_type.body"].value
+ Node.register(self)
def resolve(self): pass