diff options
Diffstat (limited to 'urwid/event_loop/twisted_loop.py')
| -rw-r--r-- | urwid/event_loop/twisted_loop.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/urwid/event_loop/twisted_loop.py b/urwid/event_loop/twisted_loop.py new file mode 100644 index 0000000..4637c1b --- /dev/null +++ b/urwid/event_loop/twisted_loop.py @@ -0,0 +1,236 @@ +# Urwid main loop code +# Copyright (C) 2004-2012 Ian Ward +# Copyright (C) 2008 Walter Mundt +# Copyright (C) 2009 Andrew Psaltis +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Urwid web site: https://urwid.org/ + +"""Twisted Reactor based urwid EventLoop implementation. + +Twisted library is required. +""" + +from __future__ import annotations + +import sys +import typing +from collections.abc import Callable + +from twisted.internet.abstract import FileDescriptor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled + +from .abstract_loop import EventLoop, ExitMainLoop + +if typing.TYPE_CHECKING: + from twisted.internet.interfaces import IReactorFDSet + from typing_extensions import ParamSpec + + _Spec = ParamSpec("_Spec") + _T = typing.TypeVar("_T") + +__all__ = ("TwistedEventLoop",) + + +class _TwistedInputDescriptor(FileDescriptor): + def __init__(self, reactor: IReactorFDSet, fd: int, cb: Callable[[], typing.Any]) -> None: + self._fileno = fd + self.cb = cb + super().__init__(reactor) + + def fileno(self) -> int: + return self._fileno + + def doRead(self): + return self.cb() + + +class TwistedEventLoop(EventLoop): + """ + Event loop based on Twisted_ + """ + _idle_emulation_delay = 1.0/256 # a short time (in seconds) + + def __init__(self, reactor=None, manage_reactor: bool = True) -> None: + """ + :param reactor: reactor to use + :type reactor: :class:`twisted.internet.reactor`. + :param: manage_reactor: `True` if you want this event loop to run + and stop the reactor. + :type manage_reactor: boolean + + .. WARNING:: + Twisted's reactor doesn't like to be stopped and run again. If you + need to stop and run your :class:`MainLoop`, consider setting + ``manage_reactor=False`` and take care of running/stopping the reactor + at the beginning/ending of your program yourself. + + You can also forego using :class:`MainLoop`'s run() entirely, and + instead call start() and stop() before and after starting the + reactor. + + .. _Twisted: https://twisted.org/ + """ + if reactor is None: + import twisted.internet.reactor + reactor = twisted.internet.reactor + self.reactor = reactor + self._watch_files: dict[int, _TwistedInputDescriptor] = {} + self._idle_handle: int = 0 + self._twisted_idle_enabled = False + self._idle_callbacks: dict[int, Callable[[], typing.Any]] = {} + self._exc: BaseException | None = None + self.manage_reactor = manage_reactor + self._enable_twisted_idle() + + def alarm(self, seconds: float | int, callback: Callable[[], typing.Any]): + """ + Call callback() a given time from now. No parameters are + passed to callback. + + Returns a handle that may be passed to remove_alarm() + + seconds -- floating point time to wait before calling callback + callback -- function to call from event loop + """ + handle = self.reactor.callLater(seconds, self.handle_exit(callback)) + return handle + + def remove_alarm(self, handle) -> bool: + """ + Remove an alarm. + + Returns True if the alarm exists, False otherwise + """ + try: + handle.cancel() + return True + except AlreadyCancelled: + return False + except AlreadyCalled: + return False + + def watch_file(self, fd: int, callback: Callable[[], typing.Any]) -> int: + """ + Call callback() when fd has some data to read. No parameters + are passed to callback. + + Returns a handle that may be passed to remove_watch_file() + + fd -- file descriptor to watch for input + callback -- function to call when input is available + """ + ind = _TwistedInputDescriptor(self.reactor, fd, self.handle_exit(callback)) + self._watch_files[fd] = ind + self.reactor.addReader(ind) + return fd + + def remove_watch_file(self, handle: int) -> bool: + """ + Remove an input file. + + Returns True if the input file exists, False otherwise + """ + if handle in self._watch_files: + self.reactor.removeReader(self._watch_files[handle]) + del self._watch_files[handle] + return True + return False + + def enter_idle(self, callback: Callable[[], typing.Any]) -> int: + """ + Add a callback for entering idle. + + Returns a handle that may be passed to remove_enter_idle() + """ + self._idle_handle += 1 + self._idle_callbacks[self._idle_handle] = callback + return self._idle_handle + + def _enable_twisted_idle(self) -> None: + """ + Twisted's reactors don't have an idle or enter-idle callback + so the best we can do for now is to set a timer event in a very + short time to approximate an enter-idle callback. + + .. WARNING:: + This will perform worse than the other event loops until we can find a + fix or workaround + """ + if self._twisted_idle_enabled: + return + self.reactor.callLater( + self._idle_emulation_delay, + self.handle_exit(self._twisted_idle_callback, enable_idle=False), + ) + self._twisted_idle_enabled = True + + def _twisted_idle_callback(self) -> None: + for callback in self._idle_callbacks.values(): + callback() + self._twisted_idle_enabled = False + + def remove_enter_idle(self, handle) -> bool: + """ + Remove an idle callback. + + Returns True if the handle was removed. + """ + try: + del self._idle_callbacks[handle] + except KeyError: + return False + return True + + def run(self) -> None: + """ + Start the event loop. Exit the loop when any callback raises + an exception. If ExitMainLoop is raised, exit cleanly. + """ + if not self.manage_reactor: + return + self.reactor.run() + if self._exc: + # An exception caused us to exit, raise it now + exc = self._exc + self._exc = None + raise exc.with_traceback(exc.__traceback__) + + def handle_exit(self, f: Callable[_Spec, _T], enable_idle: bool = True) -> Callable[_Spec, _T | None]: + """ + Decorator that cleanly exits the :class:`TwistedEventLoop` if + :class:`ExitMainLoop` is thrown inside of the wrapped function. Store the + exception info if some other exception occurs, it will be reraised after + the loop quits. + + *f* -- function to be wrapped + """ + def wrapper(*args: _Spec.args, **kwargs: _Spec.kwargs) -> _T | None: + rval = None + try: + rval = f(*args, **kwargs) + except ExitMainLoop: + if self.manage_reactor: + self.reactor.stop() + except BaseException as exc: + print(sys.exc_info()) + self._exc = exc + if self.manage_reactor: + self.reactor.crash() + if enable_idle: + self._enable_twisted_idle() + return rval + return wrapper |
