summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py466
1 files changed, 0 insertions, 466 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
deleted file mode 100644
index 95055cc014..0000000000
--- a/python/qpid/peer.py
+++ /dev/null
@@ -1,466 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-This module contains a skeletal peer implementation useful for
-implementing an AMQP server, client, or proxy. The peer implementation
-sorts incoming frames to their intended channels, and dispatches
-incoming method frames to a delegate.
-"""
-
-import thread, threading, traceback, socket, sys, logging
-from connection08 import EOF, Method, Header, Body, Request, Response, VersionError
-from message import Message
-from queue import Queue, Closed as QueueClosed
-from content import Content
-from cStringIO import StringIO
-from time import time
-from exceptions import Closed
-
-class Sequence:
-
- def __init__(self, start, step = 1):
- # we should keep start for wrap around
- self._next = start
- self.step = step
- self.lock = thread.allocate_lock()
-
- def next(self):
- self.lock.acquire()
- try:
- result = self._next
- self._next += self.step
- return result
- finally:
- self.lock.release()
-
-class Peer:
-
- def __init__(self, conn, delegate, channel_factory=None):
- self.conn = conn
- self.delegate = delegate
- self.outgoing = Queue(0)
- self.work = Queue(0)
- self.channels = {}
- self.lock = thread.allocate_lock()
- if channel_factory:
- self.channel_factory = channel_factory
- else:
- self.channel_factory = Channel
-
- def channel(self, id):
- self.lock.acquire()
- try:
- try:
- ch = self.channels[id]
- except KeyError:
- ch = self.channel_factory(id, self.outgoing, self.conn.spec)
- self.channels[id] = ch
- finally:
- self.lock.release()
- return ch
-
- def start(self):
- thread.start_new_thread(self.writer, ())
- thread.start_new_thread(self.reader, ())
- thread.start_new_thread(self.worker, ())
-
- def fatal(self, message=None):
- """Call when an unexpected exception occurs that will kill a thread."""
- if message: print >> sys.stderr, message
- self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
-
- def reader(self):
- try:
- while True:
- try:
- frame = self.conn.read()
- except EOF, e:
- self.work.close()
- break
- ch = self.channel(frame.channel)
- ch.receive(frame, self.work)
- except VersionError, e:
- self.closed(e)
- except:
- self.fatal()
-
- def closed(self, reason):
- # We must close the delegate first because closing channels
- # may wake up waiting threads and we don't want them to see
- # the delegate as open.
- self.delegate.closed(reason)
- for ch in self.channels.values():
- ch.closed(reason)
-
- def writer(self):
- try:
- while True:
- try:
- message = self.outgoing.get()
- self.conn.write(message)
- except socket.error, e:
- self.closed(e)
- break
- self.conn.flush()
- except:
- self.fatal()
-
- def worker(self):
- try:
- while True:
- queue = self.work.get()
- frame = queue.get()
- channel = self.channel(frame.channel)
- if frame.method_type.content:
- content = read_content(queue)
- else:
- content = None
-
- self.delegate(channel, Message(channel, frame, content))
- except QueueClosed:
- self.closed("worker closed")
- except:
- self.fatal()
-
-class Requester:
-
- def __init__(self, writer):
- self.write = writer
- self.sequence = Sequence(1)
- self.mark = 0
- # request_id -> listener
- self.outstanding = {}
-
- def request(self, method, listener, content = None):
- frame = Request(self.sequence.next(), self.mark, method)
- self.outstanding[frame.id] = listener
- self.write(frame, content)
-
- def receive(self, channel, frame):
- listener = self.outstanding.pop(frame.request_id)
- listener(channel, frame)
-
-class Responder:
-
- def __init__(self, writer):
- self.write = writer
- self.sequence = Sequence(1)
-
- def respond(self, method, batch, request):
- if isinstance(request, Method):
- self.write(method)
- else:
- # allow batching from frame at either end
- if batch<0:
- frame = Response(self.sequence.next(), request.id+batch, -batch, method)
- else:
- frame = Response(self.sequence.next(), request.id, batch, method)
- self.write(frame)
-
-class Channel:
-
- def __init__(self, id, outgoing, spec):
- self.id = id
- self.outgoing = outgoing
- self.spec = spec
- self.incoming = Queue(0)
- self.responses = Queue(0)
- self.queue = None
- self._closed = False
- self.reason = None
-
- self.requester = Requester(self.write)
- self.responder = Responder(self.write)
-
- self.completion = OutgoingCompletion()
- self.incoming_completion = IncomingCompletion(self)
- self.futures = {}
- self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves
-
- self.invoker = self.invoke_method
- self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
- self.synchronous = True
-
- def closed(self, reason):
- if self._closed:
- return
- self._closed = True
- self.reason = reason
- self.incoming.close()
- self.responses.close()
- self.completion.close()
- self.incoming_completion.reset()
- for f in self.futures.values():
- f.put_response(self, reason)
-
- def write(self, frame, content = None):
- if self._closed:
- raise Closed(self.reason)
- frame.channel = self.id
- self.outgoing.put(frame)
- if (isinstance(frame, (Method, Request))
- and content == None
- and frame.method_type.content):
- content = Content()
- if content != None:
- self.write_content(frame.method_type.klass, content)
-
- def write_content(self, klass, content):
- header = Header(klass, content.weight(), content.size(), content.properties)
- self.write(header)
- for child in content.children:
- self.write_content(klass, child)
- # should split up if content.body exceeds max frame size
- if content.body:
- self.write(Body(content.body))
-
- def receive(self, frame, work):
- if isinstance(frame, Method):
- if frame.method.response:
- self.queue = self.responses
- else:
- self.queue = self.incoming
- work.put(self.incoming)
- elif isinstance(frame, Request):
- self.queue = self.incoming
- work.put(self.incoming)
- elif isinstance(frame, Response):
- self.requester.receive(self, frame)
- if frame.method_type.content:
- self.queue = self.responses
- return
- self.queue.put(frame)
-
- def queue_response(self, channel, frame):
- channel.responses.put(frame.method)
-
- def request(self, method, listener, content = None):
- self.requester.request(method, listener, content)
-
- def respond(self, method, batch, request):
- self.responder.respond(method, batch, request)
-
- def invoke(self, type, args, kwargs):
- if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]):
- self.completion.reset()
- self.incoming_completion.reset()
- self.completion.next_command(type)
-
- content = kwargs.pop("content", None)
- frame = Method(type, type.arguments(*args, **kwargs))
- return self.invoker(frame, content)
-
- # used for 0-9
- def invoke_reliable(self, frame, content = None):
- if not self.synchronous:
- future = Future()
- self.request(frame, future.put_response, content)
- if not frame.method.responses: return None
- else: return future
-
- self.request(frame, self.queue_response, content)
- if not frame.method.responses:
- if self.use_execution_layer and frame.method_type.is_l4_command():
- self.execution_sync()
- self.completion.wait()
- if self._closed:
- raise Closed(self.reason)
- return None
- try:
- resp = self.responses.get()
- if resp.method_type.content:
- return Message(self, resp, read_content(self.responses))
- else:
- return Message(self, resp)
- except QueueClosed, e:
- if self._closed:
- raise Closed(self.reason)
- else:
- raise e
-
- # used for 0-8 and 0-10
- def invoke_method(self, frame, content = None):
- if frame.method.result:
- cmd_id = self.completion.command_id
- future = Future()
- self.futures[cmd_id] = future
-
- self.write(frame, content)
-
- try:
- # here we depend on all nowait fields being named nowait
- f = frame.method.fields.byname["nowait"]
- nowait = frame.args[frame.method.fields.index(f)]
- except KeyError:
- nowait = False
-
- try:
- if not nowait and frame.method.responses:
- resp = self.responses.get()
- if resp.method.content:
- content = read_content(self.responses)
- else:
- content = None
- if resp.method in frame.method.responses:
- return Message(self, resp, content)
- else:
- raise ValueError(resp)
- elif frame.method.result:
- if self.synchronous:
- fr = future.get_response(timeout=10)
- if self._closed:
- raise Closed(self.reason)
- return fr
- else:
- return future
- elif self.synchronous and not frame.method.response \
- and self.use_execution_layer and frame.method.is_l4_command():
- self.execution_sync()
- completed = self.completion.wait(timeout=10)
- if self._closed:
- raise Closed(self.reason)
- if not completed:
- self.closed("Timed-out waiting for completion of %s" % frame)
- except QueueClosed, e:
- if self._closed:
- raise Closed(self.reason)
- else:
- raise e
-
- def __getattr__(self, name):
- type = self.spec.method(name)
- if type == None: raise AttributeError(name)
- method = lambda *args, **kwargs: self.invoke(type, args, kwargs)
- self.__dict__[name] = method
- return method
-
-def read_content(queue):
- header = queue.get()
- children = []
- for i in range(header.weight):
- children.append(read_content(queue))
- buf = StringIO()
- eof = header.eof
- while not eof:
- body = queue.get()
- eof = body.eof
- content = body.content
- buf.write(content)
- return Content(buf.getvalue(), children, header.properties.copy())
-
-class Future:
- def __init__(self):
- self.completed = threading.Event()
-
- def put_response(self, channel, response):
- self.response = response
- self.completed.set()
-
- def get_response(self, timeout=None):
- self.completed.wait(timeout)
- if self.completed.isSet():
- return self.response
- else:
- return None
-
- def is_complete(self):
- return self.completed.isSet()
-
-class OutgoingCompletion:
- """
- Manages completion of outgoing commands i.e. command sent by this peer
- """
-
- def __init__(self):
- self.condition = threading.Condition()
-
- #todo, implement proper wraparound
- self.sequence = Sequence(0) #issues ids for outgoing commands
- self.command_id = -1 #last issued id
- self.mark = -1 #commands up to this mark are known to be complete
- self._closed = False
-
- def next_command(self, method):
- #the following test is a hack until the track/sub-channel is available
- if method.is_l4_command():
- self.command_id = self.sequence.next()
-
- def reset(self):
- self.sequence = Sequence(0) #reset counter
-
- def close(self):
- self.reset()
- self.condition.acquire()
- try:
- self._closed = True
- self.condition.notifyAll()
- finally:
- self.condition.release()
-
- def complete(self, mark):
- self.condition.acquire()
- try:
- self.mark = mark
- #print "set mark to %s [%s] " % (self.mark, self)
- self.condition.notifyAll()
- finally:
- self.condition.release()
-
- def wait(self, point_of_interest=-1, timeout=None):
- if point_of_interest == -1: point_of_interest = self.command_id
- start_time = time()
- remaining = timeout
- self.condition.acquire()
- try:
- while not self._closed and point_of_interest > self.mark:
- #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
- self.condition.wait(remaining)
- if not self._closed and point_of_interest > self.mark and timeout:
- if (start_time + timeout) < time(): break
- else: remaining = timeout - (time() - start_time)
- finally:
- self.condition.release()
- return point_of_interest <= self.mark
-
-class IncomingCompletion:
- """
- Manages completion of incoming commands i.e. command received by this peer
- """
-
- def __init__(self, channel):
- self.sequence = Sequence(0) #issues ids for incoming commands
- self.mark = -1 #id of last command of whose completion notification was sent to the other peer
- self.channel = channel
-
- def reset(self):
- self.sequence = Sequence(0) #reset counter
-
- def complete(self, mark, cumulative=True):
- if cumulative:
- if mark > self.mark:
- self.mark = mark
- self.channel.execution_complete(cumulative_execution_mark=self.mark)
- else:
- #TODO: record and manage the ranges properly
- range = [mark, mark]
- if (self.mark == -1):#hack until wraparound is implemented
- self.channel.execution_complete(cumulative_execution_mark=0xFFFFFFFFL, ranged_execution_set=range)
- else:
- self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)