summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-16 20:26:11 +0000
committerAlan Conway <aconway@apache.org>2007-03-16 20:26:11 +0000
commit55a530448b4107edcb3bb8543b562c7208080995 (patch)
tree23be2798e546f641ff4652f7255c090a39cd3010 /python/qpid/peer.py
parentf3cb9466b4b969747f97ab6716964179db96f124 (diff)
downloadqpid-python-55a530448b4107edcb3bb8543b562c7208080995.tar.gz
Merged revisions 496593 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9 ........ r496593 | rhs | 2007-01-16 00:28:25 -0500 (Tue, 16 Jan 2007) | 1 line 0-9 request/response framing for python ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@519129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py192
1 files changed, 140 insertions, 52 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 7c6cf91dea..66d325994b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -24,13 +24,30 @@ sorts incoming frames to their intended channels, and dispatches
incoming method frames to a delegate.
"""
-import thread, traceback, socket, sys, logging
-from connection import Frame, EOF, Method, Header, Body
+import thread, threading, traceback, socket, sys, logging
+from connection import EOF, Method, Header, Body, Request, Response
from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
+class Sequence:
+
+ def __init__(self, start, step = 1):
+ # we should keep start for wrap around
+ self._next = start
+ self.step = step
+ self.lock = thread.allocate_lock()
+
+ def next(self):
+ self.lock.acquire()
+ try:
+ result = self._next
+ self._next += self.step
+ return result
+ finally:
+ self.lock.release()
+
class Peer:
def __init__(self, conn, delegate):
@@ -39,8 +56,6 @@ class Peer:
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
- self.Channel = type("Channel%s" % conn.spec.klass.__name__,
- (Channel, conn.spec.klass), {})
self.lock = thread.allocate_lock()
def channel(self, id):
@@ -49,7 +64,7 @@ class Peer:
try:
ch = self.channels[id]
except KeyError:
- ch = self.Channel(id, self.outgoing)
+ ch = Channel(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
finally:
self.lock.release()
@@ -64,7 +79,7 @@ class Peer:
"""Call when an unexpected exception occurs that will kill a thread."""
if message: print >> sys.stderr, message
self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
-
+
def reader(self):
try:
while True:
@@ -74,7 +89,7 @@ class Peer:
self.work.close()
break
ch = self.channel(frame.channel)
- ch.dispatch(frame, self.work)
+ ch.receive(frame, self.work)
except:
self.fatal()
@@ -99,37 +114,70 @@ class Peer:
def worker(self):
try:
while True:
- self.dispatch(self.work.get())
- except QueueClosed, e:
- self.close(e)
+ queue = self.work.get()
+ frame = queue.get()
+ channel = self.channel(frame.channel)
+ if frame.method_type.content:
+ content = read_content(queue)
+ else:
+ content = None
+
+ self.delegate(channel, Message(channel, frame, content))
except:
self.fatal()
- def dispatch(self, queue):
- frame = queue.get()
- channel = self.channel(frame.channel)
- payload = frame.payload
- if payload.method.content:
- content = read_content(queue)
+class Requester:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+ self.mark = 0
+ # request_id -> listener
+ self.outstanding = {}
+
+ def request(self, method, listener, content = None):
+ frame = Request(self.sequence.next(), self.mark, method)
+ self.outstanding[frame.id] = listener
+ self.write(frame, content)
+
+ def receive(self, channel, frame):
+ listener = self.outstanding.pop(frame.id)
+ listener(channel, frame)
+
+class Responder:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+
+ def respond(self, method, request):
+ if isinstance(request, Method):
+ self.write(method)
else:
- content = None
- # Let the caller deal with exceptions thrown here.
- message = Message(payload.method, payload.args, content)
- self.delegate.dispatch(channel, message)
+ # XXX: batching
+ frame = Response(self.sequence.next(), request.id, 0, method)
+ self.write(frame)
class Closed(Exception): pass
class Channel:
- def __init__(self, id, outgoing):
+ def __init__(self, id, outgoing, spec):
self.id = id
self.outgoing = outgoing
+ self.spec = spec
self.incoming = Queue(0)
self.responses = Queue(0)
self.queue = None
self.closed = False
self.reason = None
+ self.requester = Requester(self.write)
+ self.responder = Responder(self.write)
+
+ # XXX: better switch
+ self.reliable = False
+
def close(self, reason):
if self.closed:
return
@@ -138,43 +186,87 @@ class Channel:
self.incoming.close()
self.responses.close()
- def dispatch(self, frame, work):
- payload = frame.payload
- if isinstance(payload, Method):
- if payload.method.response:
+ def write(self, frame, content = None):
+ if self.closed:
+ raise Closed(self.reason)
+ frame.channel = self.id
+ self.outgoing.put(frame)
+ if (isinstance(frame, (Method, Request))
+ and content == None
+ and frame.method_type.content):
+ content = Content()
+ if content != None:
+ self.write_content(frame.method_type.klass, content)
+
+ def write_content(self, klass, content):
+ size = content.size()
+ header = Header(klass, content.weight(), size, content.properties)
+ self.write(header)
+ for child in content.children:
+ self.write_content(klass, child)
+ # should split up if content.body exceeds max frame size
+ if size > 0:
+ self.write(Body(content.body))
+
+ def receive(self, frame, work):
+ if isinstance(frame, Method):
+ if frame.method.response:
self.queue = self.responses
else:
self.queue = self.incoming
work.put(self.incoming)
+ elif isinstance(frame, Request):
+ self.queue = self.incoming
+ work.put(self.incoming)
+ elif isinstance(frame, Response):
+ self.requester.receive(self, frame)
+ return
self.queue.put(frame)
- def invoke(self, method, args, content = None):
- if self.closed:
- raise Closed(self.reason)
- frame = Frame(self.id, Method(method, *args))
- self.outgoing.put(frame)
+ def queue_response(self, channel, frame):
+ channel.responses.put(frame.method)
+
+ def request(self, method, listener, content = None):
+ self.requester.request(method, listener, content)
+
+ def respond(self, method, request):
+ self.responder.respond(method, request)
- if method.content:
- if content == None:
- content = Content()
- self.write_content(method.klass, content, self.outgoing)
+ def invoke(self, type, args, kwargs):
+ content = kwargs.pop("content", None)
+ frame = Method(type, type.arguments(*args, **kwargs))
+ if self.reliable:
+ self.request(frame, self.queue_response, content)
+ try:
+ resp = self.responses.get()
+ return Message(self, resp)
+ except QueueClosed, e:
+ if self.closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+ else:
+ return self.invoke_method(frame, content)
+
+ def invoke_method(self, frame, content = None):
+ self.write(frame, content)
try:
# here we depend on all nowait fields being named nowait
- f = method.fields.byname["nowait"]
- nowait = args[method.fields.index(f)]
+ f = frame.method.fields.byname["nowait"]
+ nowait = frame.args[frame.method.fields.index(f)]
except KeyError:
nowait = False
try:
- if not nowait and method.responses:
- resp = self.responses.get().payload
+ if not nowait and frame.method.responses:
+ resp = self.responses.get()
if resp.method.content:
content = read_content(self.responses)
else:
content = None
- if resp.method in method.responses:
- return Message(resp.method, resp.args, content)
+ if resp.method in frame.method.responses:
+ return Message(self, resp, content)
else:
raise ValueError(resp)
except QueueClosed, e:
@@ -183,19 +275,15 @@ class Channel:
else:
raise e
- def write_content(self, klass, content, queue):
- size = content.size()
- header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
- queue.put(header)
- for child in content.children:
- self.write_content(klass, child, queue)
- # should split up if content.body exceeds max frame size
- if size > 0:
- queue.put(Frame(self.id, Body(content.body)))
+ def __getattr__(self, name):
+ type = self.spec.method(name)
+ if type == None: raise AttributeError(name)
+ method = lambda *args, **kwargs: self.invoke(type, args, kwargs)
+ self.__dict__[name] = method
+ return method
def read_content(queue):
- frame = queue.get()
- header = frame.payload
+ header = queue.get()
children = []
for i in range(header.weight):
children.append(read_content(queue))
@@ -204,7 +292,7 @@ def read_content(queue):
buf = StringIO()
while read < size:
body = queue.get()
- content = body.payload.content
+ content = body.content
buf.write(content)
read += len(content)
return Content(buf.getvalue(), children, header.properties.copy())