diff options
| author | Alan Conway <aconway@apache.org> | 2007-03-16 20:26:11 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-03-16 20:26:11 +0000 |
| commit | 55a530448b4107edcb3bb8543b562c7208080995 (patch) | |
| tree | 23be2798e546f641ff4652f7255c090a39cd3010 /python/qpid/peer.py | |
| parent | f3cb9466b4b969747f97ab6716964179db96f124 (diff) | |
| download | qpid-python-55a530448b4107edcb3bb8543b562c7208080995.tar.gz | |
Merged revisions 496593 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r496593 | rhs | 2007-01-16 00:28:25 -0500 (Tue, 16 Jan 2007) | 1 line
0-9 request/response framing for python
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@519129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/peer.py')
| -rw-r--r-- | python/qpid/peer.py | 192 |
1 files changed, 140 insertions, 52 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 7c6cf91dea..66d325994b 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -24,13 +24,30 @@ sorts incoming frames to their intended channels, and dispatches incoming method frames to a delegate. """ -import thread, traceback, socket, sys, logging -from connection import Frame, EOF, Method, Header, Body +import thread, threading, traceback, socket, sys, logging +from connection import EOF, Method, Header, Body, Request, Response from message import Message from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO +class Sequence: + + def __init__(self, start, step = 1): + # we should keep start for wrap around + self._next = start + self.step = step + self.lock = thread.allocate_lock() + + def next(self): + self.lock.acquire() + try: + result = self._next + self._next += self.step + return result + finally: + self.lock.release() + class Peer: def __init__(self, conn, delegate): @@ -39,8 +56,6 @@ class Peer: self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} - self.Channel = type("Channel%s" % conn.spec.klass.__name__, - (Channel, conn.spec.klass), {}) self.lock = thread.allocate_lock() def channel(self, id): @@ -49,7 +64,7 @@ class Peer: try: ch = self.channels[id] except KeyError: - ch = self.Channel(id, self.outgoing) + ch = Channel(id, self.outgoing, self.conn.spec) self.channels[id] = ch finally: self.lock.release() @@ -64,7 +79,7 @@ class Peer: """Call when an unexpected exception occurs that will kill a thread.""" if message: print >> sys.stderr, message self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) - + def reader(self): try: while True: @@ -74,7 +89,7 @@ class Peer: self.work.close() break ch = self.channel(frame.channel) - ch.dispatch(frame, self.work) + ch.receive(frame, self.work) except: self.fatal() @@ -99,37 +114,70 @@ class Peer: def worker(self): try: while True: - self.dispatch(self.work.get()) - except QueueClosed, e: - self.close(e) + queue = self.work.get() + frame = queue.get() + channel = self.channel(frame.channel) + if frame.method_type.content: + content = read_content(queue) + else: + content = None + + self.delegate(channel, Message(channel, frame, content)) except: self.fatal() - def dispatch(self, queue): - frame = queue.get() - channel = self.channel(frame.channel) - payload = frame.payload - if payload.method.content: - content = read_content(queue) +class Requester: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + self.mark = 0 + # request_id -> listener + self.outstanding = {} + + def request(self, method, listener, content = None): + frame = Request(self.sequence.next(), self.mark, method) + self.outstanding[frame.id] = listener + self.write(frame, content) + + def receive(self, channel, frame): + listener = self.outstanding.pop(frame.id) + listener(channel, frame) + +class Responder: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + + def respond(self, method, request): + if isinstance(request, Method): + self.write(method) else: - content = None - # Let the caller deal with exceptions thrown here. - message = Message(payload.method, payload.args, content) - self.delegate.dispatch(channel, message) + # XXX: batching + frame = Response(self.sequence.next(), request.id, 0, method) + self.write(frame) class Closed(Exception): pass class Channel: - def __init__(self, id, outgoing): + def __init__(self, id, outgoing, spec): self.id = id self.outgoing = outgoing + self.spec = spec self.incoming = Queue(0) self.responses = Queue(0) self.queue = None self.closed = False self.reason = None + self.requester = Requester(self.write) + self.responder = Responder(self.write) + + # XXX: better switch + self.reliable = False + def close(self, reason): if self.closed: return @@ -138,43 +186,87 @@ class Channel: self.incoming.close() self.responses.close() - def dispatch(self, frame, work): - payload = frame.payload - if isinstance(payload, Method): - if payload.method.response: + def write(self, frame, content = None): + if self.closed: + raise Closed(self.reason) + frame.channel = self.id + self.outgoing.put(frame) + if (isinstance(frame, (Method, Request)) + and content == None + and frame.method_type.content): + content = Content() + if content != None: + self.write_content(frame.method_type.klass, content) + + def write_content(self, klass, content): + size = content.size() + header = Header(klass, content.weight(), size, content.properties) + self.write(header) + for child in content.children: + self.write_content(klass, child) + # should split up if content.body exceeds max frame size + if size > 0: + self.write(Body(content.body)) + + def receive(self, frame, work): + if isinstance(frame, Method): + if frame.method.response: self.queue = self.responses else: self.queue = self.incoming work.put(self.incoming) + elif isinstance(frame, Request): + self.queue = self.incoming + work.put(self.incoming) + elif isinstance(frame, Response): + self.requester.receive(self, frame) + return self.queue.put(frame) - def invoke(self, method, args, content = None): - if self.closed: - raise Closed(self.reason) - frame = Frame(self.id, Method(method, *args)) - self.outgoing.put(frame) + def queue_response(self, channel, frame): + channel.responses.put(frame.method) + + def request(self, method, listener, content = None): + self.requester.request(method, listener, content) + + def respond(self, method, request): + self.responder.respond(method, request) - if method.content: - if content == None: - content = Content() - self.write_content(method.klass, content, self.outgoing) + def invoke(self, type, args, kwargs): + content = kwargs.pop("content", None) + frame = Method(type, type.arguments(*args, **kwargs)) + if self.reliable: + self.request(frame, self.queue_response, content) + try: + resp = self.responses.get() + return Message(self, resp) + except QueueClosed, e: + if self.closed: + raise Closed(self.reason) + else: + raise e + else: + return self.invoke_method(frame, content) + + def invoke_method(self, frame, content = None): + self.write(frame, content) try: # here we depend on all nowait fields being named nowait - f = method.fields.byname["nowait"] - nowait = args[method.fields.index(f)] + f = frame.method.fields.byname["nowait"] + nowait = frame.args[frame.method.fields.index(f)] except KeyError: nowait = False try: - if not nowait and method.responses: - resp = self.responses.get().payload + if not nowait and frame.method.responses: + resp = self.responses.get() if resp.method.content: content = read_content(self.responses) else: content = None - if resp.method in method.responses: - return Message(resp.method, resp.args, content) + if resp.method in frame.method.responses: + return Message(self, resp, content) else: raise ValueError(resp) except QueueClosed, e: @@ -183,19 +275,15 @@ class Channel: else: raise e - def write_content(self, klass, content, queue): - size = content.size() - header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) - queue.put(header) - for child in content.children: - self.write_content(klass, child, queue) - # should split up if content.body exceeds max frame size - if size > 0: - queue.put(Frame(self.id, Body(content.body))) + def __getattr__(self, name): + type = self.spec.method(name) + if type == None: raise AttributeError(name) + method = lambda *args, **kwargs: self.invoke(type, args, kwargs) + self.__dict__[name] = method + return method def read_content(queue): - frame = queue.get() - header = frame.payload + header = queue.get() children = [] for i in range(header.weight): children.append(read_content(queue)) @@ -204,7 +292,7 @@ def read_content(queue): buf = StringIO() while read < size: body = queue.get() - content = body.payload.content + content = body.content buf.write(content) read += len(content) return Content(buf.getvalue(), children, header.properties.copy()) |
