diff options
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r-- | python/qpid/peer.py | 466 |
1 files changed, 0 insertions, 466 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py deleted file mode 100644 index 95055cc014..0000000000 --- a/python/qpid/peer.py +++ /dev/null @@ -1,466 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -This module contains a skeletal peer implementation useful for -implementing an AMQP server, client, or proxy. The peer implementation -sorts incoming frames to their intended channels, and dispatches -incoming method frames to a delegate. -""" - -import thread, threading, traceback, socket, sys, logging -from connection08 import EOF, Method, Header, Body, Request, Response, VersionError -from message import Message -from queue import Queue, Closed as QueueClosed -from content import Content -from cStringIO import StringIO -from time import time -from exceptions import Closed - -class Sequence: - - def __init__(self, start, step = 1): - # we should keep start for wrap around - self._next = start - self.step = step - self.lock = thread.allocate_lock() - - def next(self): - self.lock.acquire() - try: - result = self._next - self._next += self.step - return result - finally: - self.lock.release() - -class Peer: - - def __init__(self, conn, delegate, channel_factory=None): - self.conn = conn - self.delegate = delegate - self.outgoing = Queue(0) - self.work = Queue(0) - self.channels = {} - self.lock = thread.allocate_lock() - if channel_factory: - self.channel_factory = channel_factory - else: - self.channel_factory = Channel - - def channel(self, id): - self.lock.acquire() - try: - try: - ch = self.channels[id] - except KeyError: - ch = self.channel_factory(id, self.outgoing, self.conn.spec) - self.channels[id] = ch - finally: - self.lock.release() - return ch - - def start(self): - thread.start_new_thread(self.writer, ()) - thread.start_new_thread(self.reader, ()) - thread.start_new_thread(self.worker, ()) - - def fatal(self, message=None): - """Call when an unexpected exception occurs that will kill a thread.""" - if message: print >> sys.stderr, message - self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) - - def reader(self): - try: - while True: - try: - frame = self.conn.read() - except EOF, e: - self.work.close() - break - ch = self.channel(frame.channel) - ch.receive(frame, self.work) - except VersionError, e: - self.closed(e) - except: - self.fatal() - - def closed(self, reason): - # We must close the delegate first because closing channels - # may wake up waiting threads and we don't want them to see - # the delegate as open. - self.delegate.closed(reason) - for ch in self.channels.values(): - ch.closed(reason) - - def writer(self): - try: - while True: - try: - message = self.outgoing.get() - self.conn.write(message) - except socket.error, e: - self.closed(e) - break - self.conn.flush() - except: - self.fatal() - - def worker(self): - try: - while True: - queue = self.work.get() - frame = queue.get() - channel = self.channel(frame.channel) - if frame.method_type.content: - content = read_content(queue) - else: - content = None - - self.delegate(channel, Message(channel, frame, content)) - except QueueClosed: - self.closed("worker closed") - except: - self.fatal() - -class Requester: - - def __init__(self, writer): - self.write = writer - self.sequence = Sequence(1) - self.mark = 0 - # request_id -> listener - self.outstanding = {} - - def request(self, method, listener, content = None): - frame = Request(self.sequence.next(), self.mark, method) - self.outstanding[frame.id] = listener - self.write(frame, content) - - def receive(self, channel, frame): - listener = self.outstanding.pop(frame.request_id) - listener(channel, frame) - -class Responder: - - def __init__(self, writer): - self.write = writer - self.sequence = Sequence(1) - - def respond(self, method, batch, request): - if isinstance(request, Method): - self.write(method) - else: - # allow batching from frame at either end - if batch<0: - frame = Response(self.sequence.next(), request.id+batch, -batch, method) - else: - frame = Response(self.sequence.next(), request.id, batch, method) - self.write(frame) - -class Channel: - - def __init__(self, id, outgoing, spec): - self.id = id - self.outgoing = outgoing - self.spec = spec - self.incoming = Queue(0) - self.responses = Queue(0) - self.queue = None - self._closed = False - self.reason = None - - self.requester = Requester(self.write) - self.responder = Responder(self.write) - - self.completion = OutgoingCompletion() - self.incoming_completion = IncomingCompletion(self) - self.futures = {} - self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves - - self.invoker = self.invoke_method - self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) - self.synchronous = True - - def closed(self, reason): - if self._closed: - return - self._closed = True - self.reason = reason - self.incoming.close() - self.responses.close() - self.completion.close() - self.incoming_completion.reset() - for f in self.futures.values(): - f.put_response(self, reason) - - def write(self, frame, content = None): - if self._closed: - raise Closed(self.reason) - frame.channel = self.id - self.outgoing.put(frame) - if (isinstance(frame, (Method, Request)) - and content == None - and frame.method_type.content): - content = Content() - if content != None: - self.write_content(frame.method_type.klass, content) - - def write_content(self, klass, content): - header = Header(klass, content.weight(), content.size(), content.properties) - self.write(header) - for child in content.children: - self.write_content(klass, child) - # should split up if content.body exceeds max frame size - if content.body: - self.write(Body(content.body)) - - def receive(self, frame, work): - if isinstance(frame, Method): - if frame.method.response: - self.queue = self.responses - else: - self.queue = self.incoming - work.put(self.incoming) - elif isinstance(frame, Request): - self.queue = self.incoming - work.put(self.incoming) - elif isinstance(frame, Response): - self.requester.receive(self, frame) - if frame.method_type.content: - self.queue = self.responses - return - self.queue.put(frame) - - def queue_response(self, channel, frame): - channel.responses.put(frame.method) - - def request(self, method, listener, content = None): - self.requester.request(method, listener, content) - - def respond(self, method, batch, request): - self.responder.respond(method, batch, request) - - def invoke(self, type, args, kwargs): - if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]): - self.completion.reset() - self.incoming_completion.reset() - self.completion.next_command(type) - - content = kwargs.pop("content", None) - frame = Method(type, type.arguments(*args, **kwargs)) - return self.invoker(frame, content) - - # used for 0-9 - def invoke_reliable(self, frame, content = None): - if not self.synchronous: - future = Future() - self.request(frame, future.put_response, content) - if not frame.method.responses: return None - else: return future - - self.request(frame, self.queue_response, content) - if not frame.method.responses: - if self.use_execution_layer and frame.method_type.is_l4_command(): - self.execution_sync() - self.completion.wait() - if self._closed: - raise Closed(self.reason) - return None - try: - resp = self.responses.get() - if resp.method_type.content: - return Message(self, resp, read_content(self.responses)) - else: - return Message(self, resp) - except QueueClosed, e: - if self._closed: - raise Closed(self.reason) - else: - raise e - - # used for 0-8 and 0-10 - def invoke_method(self, frame, content = None): - if frame.method.result: - cmd_id = self.completion.command_id - future = Future() - self.futures[cmd_id] = future - - self.write(frame, content) - - try: - # here we depend on all nowait fields being named nowait - f = frame.method.fields.byname["nowait"] - nowait = frame.args[frame.method.fields.index(f)] - except KeyError: - nowait = False - - try: - if not nowait and frame.method.responses: - resp = self.responses.get() - if resp.method.content: - content = read_content(self.responses) - else: - content = None - if resp.method in frame.method.responses: - return Message(self, resp, content) - else: - raise ValueError(resp) - elif frame.method.result: - if self.synchronous: - fr = future.get_response(timeout=10) - if self._closed: - raise Closed(self.reason) - return fr - else: - return future - elif self.synchronous and not frame.method.response \ - and self.use_execution_layer and frame.method.is_l4_command(): - self.execution_sync() - completed = self.completion.wait(timeout=10) - if self._closed: - raise Closed(self.reason) - if not completed: - self.closed("Timed-out waiting for completion of %s" % frame) - except QueueClosed, e: - if self._closed: - raise Closed(self.reason) - else: - raise e - - def __getattr__(self, name): - type = self.spec.method(name) - if type == None: raise AttributeError(name) - method = lambda *args, **kwargs: self.invoke(type, args, kwargs) - self.__dict__[name] = method - return method - -def read_content(queue): - header = queue.get() - children = [] - for i in range(header.weight): - children.append(read_content(queue)) - buf = StringIO() - eof = header.eof - while not eof: - body = queue.get() - eof = body.eof - content = body.content - buf.write(content) - return Content(buf.getvalue(), children, header.properties.copy()) - -class Future: - def __init__(self): - self.completed = threading.Event() - - def put_response(self, channel, response): - self.response = response - self.completed.set() - - def get_response(self, timeout=None): - self.completed.wait(timeout) - if self.completed.isSet(): - return self.response - else: - return None - - def is_complete(self): - return self.completed.isSet() - -class OutgoingCompletion: - """ - Manages completion of outgoing commands i.e. command sent by this peer - """ - - def __init__(self): - self.condition = threading.Condition() - - #todo, implement proper wraparound - self.sequence = Sequence(0) #issues ids for outgoing commands - self.command_id = -1 #last issued id - self.mark = -1 #commands up to this mark are known to be complete - self._closed = False - - def next_command(self, method): - #the following test is a hack until the track/sub-channel is available - if method.is_l4_command(): - self.command_id = self.sequence.next() - - def reset(self): - self.sequence = Sequence(0) #reset counter - - def close(self): - self.reset() - self.condition.acquire() - try: - self._closed = True - self.condition.notifyAll() - finally: - self.condition.release() - - def complete(self, mark): - self.condition.acquire() - try: - self.mark = mark - #print "set mark to %s [%s] " % (self.mark, self) - self.condition.notifyAll() - finally: - self.condition.release() - - def wait(self, point_of_interest=-1, timeout=None): - if point_of_interest == -1: point_of_interest = self.command_id - start_time = time() - remaining = timeout - self.condition.acquire() - try: - while not self._closed and point_of_interest > self.mark: - #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self) - self.condition.wait(remaining) - if not self._closed and point_of_interest > self.mark and timeout: - if (start_time + timeout) < time(): break - else: remaining = timeout - (time() - start_time) - finally: - self.condition.release() - return point_of_interest <= self.mark - -class IncomingCompletion: - """ - Manages completion of incoming commands i.e. command received by this peer - """ - - def __init__(self, channel): - self.sequence = Sequence(0) #issues ids for incoming commands - self.mark = -1 #id of last command of whose completion notification was sent to the other peer - self.channel = channel - - def reset(self): - self.sequence = Sequence(0) #reset counter - - def complete(self, mark, cumulative=True): - if cumulative: - if mark > self.mark: - self.mark = mark - self.channel.execution_complete(cumulative_execution_mark=self.mark) - else: - #TODO: record and manage the ranges properly - range = [mark, mark] - if (self.mark == -1):#hack until wraparound is implemented - self.channel.execution_complete(cumulative_execution_mark=0xFFFFFFFFL, ranged_execution_set=range) - else: - self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) |