summaryrefslogtreecommitdiff
path: root/python/qpid/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r--python/qpid/session.py308
1 files changed, 0 insertions, 308 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py
deleted file mode 100644
index 95714a128a..0000000000
--- a/python/qpid/session.py
+++ /dev/null
@@ -1,308 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import Condition, RLock, Lock, currentThread
-from generator import command_invoker
-from datatypes import RangedSet, Struct, Future
-from codec010 import StringCodec
-from queue import Queue
-from datatypes import Message, serial
-from ops import Command, MessageTransfer
-from util import wait, notify
-from exceptions import *
-from logging import getLogger
-
-log = getLogger("qpid.io.cmd")
-msg = getLogger("qpid.io.msg")
-
-class SessionException(Exception): pass
-class SessionClosed(SessionException): pass
-class SessionDetached(SessionException): pass
-
-def client(*args):
- return Client(*args)
-
-def server(*args):
- return Server(*args)
-
-INCOMPLETE = object()
-
-class Session(command_invoker()):
-
- def __init__(self, name, auto_sync=True, timeout=10, delegate=client):
- self.name = name
- self.auto_sync = auto_sync
- self.need_sync = True
- self.timeout = timeout
- self.channel = None
- self.invoke_lock = Lock()
- self._closing = False
- self._closed = False
-
- self.condition = Condition()
-
- self.send_id = True
- self.receiver = Receiver(self)
- self.sender = Sender(self)
-
- self.lock = RLock()
- self._incoming = {}
- self.results = {}
- self.exceptions = []
-
- self.delegate = delegate(self)
-
- def incoming(self, destination):
- self.lock.acquire()
- try:
- queue = self._incoming.get(destination)
- if queue == None:
- queue = Incoming(self, destination)
- self._incoming[destination] = queue
- return queue
- finally:
- self.lock.release()
-
- def error(self):
- exc = self.exceptions[:]
- if len(exc) == 0:
- return None
- elif len(exc) == 1:
- return exc[0]
- else:
- return tuple(exc)
-
- def sync(self, timeout=None):
- ch = self.channel
- if ch is not None and currentThread() == ch.connection.thread:
- raise SessionException("deadlock detected")
- if self.need_sync:
- self.execution_sync(sync=True)
- last = self.sender.next_id - 1
- if not wait(self.condition, lambda:
- last in self.sender._completed or self.exceptions,
- timeout):
- raise Timeout()
- if self.exceptions:
- raise SessionException(self.error())
-
- def close(self, timeout=None):
- self.invoke_lock.acquire()
- try:
- self._closing = True
- self.channel.session_detach(self.name)
- finally:
- self.invoke_lock.release()
- if not wait(self.condition, lambda: self._closed, timeout):
- raise Timeout()
-
- def closed(self):
- self.lock.acquire()
- try:
- if self._closed: return
-
- error = self.error()
- for id in self.results:
- f = self.results[id]
- f.error(error)
- self.results.clear()
-
- for q in self._incoming.values():
- q.close(error)
-
- self._closed = True
- notify(self.condition)
- finally:
- self.lock.release()
-
- def invoke(self, op, args, kwargs):
- if issubclass(op, Command):
- self.invoke_lock.acquire()
- try:
- return self.do_invoke(op, args, kwargs)
- finally:
- self.invoke_lock.release()
- else:
- return op(*args, **kwargs)
-
- def do_invoke(self, op, args, kwargs):
- if self._closing:
- raise SessionClosed()
-
- ch = self.channel
- if ch == None:
- raise SessionDetached()
-
- if op == MessageTransfer:
- if len(args) == len(op.FIELDS) + 1:
- message = args[-1]
- args = args[:-1]
- else:
- message = kwargs.pop("message", None)
- if message is not None:
- kwargs["headers"] = message.headers
- kwargs["payload"] = message.body
-
- cmd = op(*args, **kwargs)
- cmd.sync = self.auto_sync or cmd.sync
- self.need_sync = not cmd.sync
- cmd.channel = ch.id
-
- if op.RESULT:
- result = Future(exception=SessionException)
- self.results[self.sender.next_id] = result
-
- self.send(cmd)
-
- log.debug("SENT %s", cmd)
- if op == MessageTransfer:
- msg.debug("SENT %s", cmd)
-
- if op.RESULT:
- if self.auto_sync:
- return result.get(self.timeout)
- else:
- return result
- elif self.auto_sync:
- self.sync(self.timeout)
-
- def received(self, cmd):
- self.receiver.received(cmd)
- self.dispatch(cmd)
-
- def dispatch(self, cmd):
- log.debug("RECV %s", cmd)
-
- result = getattr(self.delegate, cmd.NAME)(cmd)
- if result is INCOMPLETE:
- return
- elif result is not None:
- self.execution_result(cmd.id, result)
-
- self.receiver.completed(cmd)
- # XXX: don't forget to obey sync for manual completion as well
- if cmd.sync:
- self.channel.session_completed(self.receiver._completed)
-
- def send(self, cmd):
- self.sender.send(cmd)
-
- def __repr__(self):
- return '<Session: %s, %s>' % (self.name, self.channel)
-
-class Receiver:
-
- def __init__(self, session):
- self.session = session
- self.next_id = None
- self._completed = RangedSet()
-
- def received(self, cmd):
- if self.next_id == None:
- raise Exception("todo")
- cmd.id = self.next_id
- self.next_id += 1
-
- def completed(self, cmd):
- if cmd.id == None:
- raise ValueError("cannot complete unidentified command")
- self._completed.add(cmd.id)
-
- def known_completed(self, commands):
- completed = RangedSet()
- for c in self._completed.ranges:
- for kc in commands.ranges:
- if c.lower in kc and c.upper in kc:
- break
- else:
- completed.add_range(c)
- self._completed = completed
-
-class Sender:
-
- def __init__(self, session):
- self.session = session
- self.next_id = serial(0)
- self.commands = []
- self._completed = RangedSet()
-
- def send(self, cmd):
- ch = self.session.channel
- if ch is None:
- raise SessionDetached()
- cmd.id = self.next_id
- self.next_id += 1
- if self.session.send_id:
- self.session.send_id = False
- ch.session_command_point(cmd.id, 0)
- self.commands.append(cmd)
- ch.connection.write_op(cmd)
-
- def completed(self, commands):
- idx = 0
- while idx < len(self.commands):
- cmd = self.commands[idx]
- if cmd.id in commands:
- del self.commands[idx]
- else:
- idx += 1
- for range in commands.ranges:
- self._completed.add(range.lower, range.upper)
-
-class Incoming(Queue):
-
- def __init__(self, session, destination):
- Queue.__init__(self)
- self.session = session
- self.destination = destination
-
- def start(self):
- self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit)
- for unit in self.session.credit_unit.VALUES:
- self.session.message_flow(self.destination, unit, 0xFFFFFFFFL)
-
- def stop(self):
- self.session.message_cancel(self.destination)
- self.listen(None)
-
-class Delegate:
-
- def __init__(self, session):
- self.session = session
-
- #XXX: do something with incoming accepts
- def message_accept(self, ma): None
-
- def execution_result(self, er):
- future = self.session.results.pop(er.command_id)
- future.set(er.value)
-
- def execution_exception(self, ex):
- self.session.exceptions.append(ex)
-
-class Client(Delegate):
-
- def message_transfer(self, cmd):
- m = Message(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- messages = self.session.incoming(cmd.destination)
- messages.put(m)
- msg.debug("RECV %s", m)
- return INCOMPLETE