diff options
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r-- | python/qpid/session.py | 308 |
1 files changed, 0 insertions, 308 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py deleted file mode 100644 index 95714a128a..0000000000 --- a/python/qpid/session.py +++ /dev/null @@ -1,308 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import Condition, RLock, Lock, currentThread -from generator import command_invoker -from datatypes import RangedSet, Struct, Future -from codec010 import StringCodec -from queue import Queue -from datatypes import Message, serial -from ops import Command, MessageTransfer -from util import wait, notify -from exceptions import * -from logging import getLogger - -log = getLogger("qpid.io.cmd") -msg = getLogger("qpid.io.msg") - -class SessionException(Exception): pass -class SessionClosed(SessionException): pass -class SessionDetached(SessionException): pass - -def client(*args): - return Client(*args) - -def server(*args): - return Server(*args) - -INCOMPLETE = object() - -class Session(command_invoker()): - - def __init__(self, name, auto_sync=True, timeout=10, delegate=client): - self.name = name - self.auto_sync = auto_sync - self.need_sync = True - self.timeout = timeout - self.channel = None - self.invoke_lock = Lock() - self._closing = False - self._closed = False - - self.condition = Condition() - - self.send_id = True - self.receiver = Receiver(self) - self.sender = Sender(self) - - self.lock = RLock() - self._incoming = {} - self.results = {} - self.exceptions = [] - - self.delegate = delegate(self) - - def incoming(self, destination): - self.lock.acquire() - try: - queue = self._incoming.get(destination) - if queue == None: - queue = Incoming(self, destination) - self._incoming[destination] = queue - return queue - finally: - self.lock.release() - - def error(self): - exc = self.exceptions[:] - if len(exc) == 0: - return None - elif len(exc) == 1: - return exc[0] - else: - return tuple(exc) - - def sync(self, timeout=None): - ch = self.channel - if ch is not None and currentThread() == ch.connection.thread: - raise SessionException("deadlock detected") - if self.need_sync: - self.execution_sync(sync=True) - last = self.sender.next_id - 1 - if not wait(self.condition, lambda: - last in self.sender._completed or self.exceptions, - timeout): - raise Timeout() - if self.exceptions: - raise SessionException(self.error()) - - def close(self, timeout=None): - self.invoke_lock.acquire() - try: - self._closing = True - self.channel.session_detach(self.name) - finally: - self.invoke_lock.release() - if not wait(self.condition, lambda: self._closed, timeout): - raise Timeout() - - def closed(self): - self.lock.acquire() - try: - if self._closed: return - - error = self.error() - for id in self.results: - f = self.results[id] - f.error(error) - self.results.clear() - - for q in self._incoming.values(): - q.close(error) - - self._closed = True - notify(self.condition) - finally: - self.lock.release() - - def invoke(self, op, args, kwargs): - if issubclass(op, Command): - self.invoke_lock.acquire() - try: - return self.do_invoke(op, args, kwargs) - finally: - self.invoke_lock.release() - else: - return op(*args, **kwargs) - - def do_invoke(self, op, args, kwargs): - if self._closing: - raise SessionClosed() - - ch = self.channel - if ch == None: - raise SessionDetached() - - if op == MessageTransfer: - if len(args) == len(op.FIELDS) + 1: - message = args[-1] - args = args[:-1] - else: - message = kwargs.pop("message", None) - if message is not None: - kwargs["headers"] = message.headers - kwargs["payload"] = message.body - - cmd = op(*args, **kwargs) - cmd.sync = self.auto_sync or cmd.sync - self.need_sync = not cmd.sync - cmd.channel = ch.id - - if op.RESULT: - result = Future(exception=SessionException) - self.results[self.sender.next_id] = result - - self.send(cmd) - - log.debug("SENT %s", cmd) - if op == MessageTransfer: - msg.debug("SENT %s", cmd) - - if op.RESULT: - if self.auto_sync: - return result.get(self.timeout) - else: - return result - elif self.auto_sync: - self.sync(self.timeout) - - def received(self, cmd): - self.receiver.received(cmd) - self.dispatch(cmd) - - def dispatch(self, cmd): - log.debug("RECV %s", cmd) - - result = getattr(self.delegate, cmd.NAME)(cmd) - if result is INCOMPLETE: - return - elif result is not None: - self.execution_result(cmd.id, result) - - self.receiver.completed(cmd) - # XXX: don't forget to obey sync for manual completion as well - if cmd.sync: - self.channel.session_completed(self.receiver._completed) - - def send(self, cmd): - self.sender.send(cmd) - - def __repr__(self): - return '<Session: %s, %s>' % (self.name, self.channel) - -class Receiver: - - def __init__(self, session): - self.session = session - self.next_id = None - self._completed = RangedSet() - - def received(self, cmd): - if self.next_id == None: - raise Exception("todo") - cmd.id = self.next_id - self.next_id += 1 - - def completed(self, cmd): - if cmd.id == None: - raise ValueError("cannot complete unidentified command") - self._completed.add(cmd.id) - - def known_completed(self, commands): - completed = RangedSet() - for c in self._completed.ranges: - for kc in commands.ranges: - if c.lower in kc and c.upper in kc: - break - else: - completed.add_range(c) - self._completed = completed - -class Sender: - - def __init__(self, session): - self.session = session - self.next_id = serial(0) - self.commands = [] - self._completed = RangedSet() - - def send(self, cmd): - ch = self.session.channel - if ch is None: - raise SessionDetached() - cmd.id = self.next_id - self.next_id += 1 - if self.session.send_id: - self.session.send_id = False - ch.session_command_point(cmd.id, 0) - self.commands.append(cmd) - ch.connection.write_op(cmd) - - def completed(self, commands): - idx = 0 - while idx < len(self.commands): - cmd = self.commands[idx] - if cmd.id in commands: - del self.commands[idx] - else: - idx += 1 - for range in commands.ranges: - self._completed.add(range.lower, range.upper) - -class Incoming(Queue): - - def __init__(self, session, destination): - Queue.__init__(self) - self.session = session - self.destination = destination - - def start(self): - self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) - for unit in self.session.credit_unit.VALUES: - self.session.message_flow(self.destination, unit, 0xFFFFFFFFL) - - def stop(self): - self.session.message_cancel(self.destination) - self.listen(None) - -class Delegate: - - def __init__(self, session): - self.session = session - - #XXX: do something with incoming accepts - def message_accept(self, ma): None - - def execution_result(self, er): - future = self.session.results.pop(er.command_id) - future.set(er.value) - - def execution_exception(self, ex): - self.session.exceptions.append(ex) - -class Client(Delegate): - - def message_transfer(self, cmd): - m = Message(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - messages = self.session.incoming(cmd.destination) - messages.put(m) - msg.debug("RECV %s", m) - return INCOMPLETE |