diff options
Diffstat (limited to 'lib/sqlalchemy/util')
| -rw-r--r-- | lib/sqlalchemy/util/__init__.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/util/_concurrency_py3k.py | 110 | ||||
| -rw-r--r-- | lib/sqlalchemy/util/concurrency.py | 21 | ||||
| -rw-r--r-- | lib/sqlalchemy/util/queue.py | 64 |
4 files changed, 199 insertions, 0 deletions
diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py index ce9602745..1e3eb9a29 100644 --- a/lib/sqlalchemy/util/__init__.py +++ b/lib/sqlalchemy/util/__init__.py @@ -90,6 +90,10 @@ from .compat import unquote_plus # noqa from .compat import win32 # noqa from .compat import with_metaclass # noqa from .compat import zip_longest # noqa +from .concurrency import asyncio # noqa +from .concurrency import await_fallback # noqa +from .concurrency import await_only # noqa +from .concurrency import greenlet_spawn # noqa from .deprecations import deprecated # noqa from .deprecations import deprecated_20 # noqa from .deprecations import deprecated_20_cls # noqa diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py new file mode 100644 index 000000000..3b112ff7d --- /dev/null +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -0,0 +1,110 @@ +import asyncio +import sys +from typing import Any +from typing import Callable +from typing import Coroutine + +from .. import exc + +try: + import greenlet + + # implementation based on snaury gist at + # https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef + # Issue for context: https://github.com/python-greenlet/greenlet/issues/173 + + class _AsyncIoGreenlet(greenlet.greenlet): + def __init__(self, fn, driver): + greenlet.greenlet.__init__(self, fn, driver) + self.driver = driver + + def await_only(awaitable: Coroutine) -> Any: + """Awaits an async function in a sync method. + + The sync method must be insice a :func:`greenlet_spawn` context. + :func:`await_` calls cannot be nested. + + :param awaitable: The coroutine to call. + + """ + # this is called in the context greenlet while running fn + current = greenlet.getcurrent() + if not isinstance(current, _AsyncIoGreenlet): + raise exc.InvalidRequestError( + "greenlet_spawn has not been called; can't call await_() here." + ) + + # returns the control to the driver greenlet passing it + # a coroutine to run. Once the awaitable is done, the driver greenlet + # switches back to this greenlet with the result of awaitable that is + # then returned to the caller (or raised as error) + return current.driver.switch(awaitable) + + def await_fallback(awaitable: Coroutine) -> Any: + """Awaits an async function in a sync method. + + The sync method must be insice a :func:`greenlet_spawn` context. + :func:`await_` calls cannot be nested. + + :param awaitable: The coroutine to call. + + """ + # this is called in the context greenlet while running fn + current = greenlet.getcurrent() + if not isinstance(current, _AsyncIoGreenlet): + loop = asyncio.get_event_loop() + if loop.is_running(): + raise exc.InvalidRequestError( + "greenlet_spawn has not been called and asyncio event " + "loop is already running; can't call await_() here." + ) + return loop.run_until_complete(awaitable) + + return current.driver.switch(awaitable) + + async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any: + """Runs a sync function ``fn`` in a new greenlet. + + The sync function can then use :func:`await_` to wait for async + functions. + + :param fn: The sync callable to call. + :param \\*args: Positional arguments to pass to the ``fn`` callable. + :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable. + """ + context = _AsyncIoGreenlet(fn, greenlet.getcurrent()) + # runs the function synchronously in gl greenlet. If the execution + # is interrupted by await_, context is not dead and result is a + # coroutine to wait. If the context is dead the function has + # returned, and its result can be returned. + try: + result = context.switch(*args, **kwargs) + while not context.dead: + try: + # wait for a coroutine from await_ and then return its + # result back to it. + value = await result + except Exception: + # this allows an exception to be raised within + # the moderated greenlet so that it can continue + # its expected flow. + result = context.throw(*sys.exc_info()) + else: + result = context.switch(value) + finally: + # clean up to avoid cycle resolution by gc + del context.driver + return result + + +except ImportError: # pragma: no cover + greenlet = None + + def await_fallback(awaitable): + return asyncio.get_event_loop().run_until_complete(awaitable) + + def await_only(awaitable): + raise ValueError("Greenlet is required to use this function") + + async def greenlet_spawn(fn, *args, **kw): + raise ValueError("Greenlet is required to use this function") diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py new file mode 100644 index 000000000..4c4ea20d1 --- /dev/null +++ b/lib/sqlalchemy/util/concurrency.py @@ -0,0 +1,21 @@ +from . import compat + + +if compat.py3k: + import asyncio + from ._concurrency_py3k import await_only + from ._concurrency_py3k import await_fallback + from ._concurrency_py3k import greenlet + from ._concurrency_py3k import greenlet_spawn +else: + asyncio = None + greenlet = None + + def await_only(thing): + return thing + + def await_fallback(thing): + return thing + + def greenlet_spawn(fn, *args, **kw): + raise ValueError("Cannot use this function in py2.") diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index 3433657d6..5f71c7bd6 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -21,7 +21,10 @@ condition. from collections import deque from time import time as _time +from . import compat from .compat import threading +from .concurrency import asyncio +from .concurrency import await_fallback __all__ = ["Empty", "Full", "Queue"] @@ -196,3 +199,64 @@ class Queue: else: # FIFO return self.queue.popleft() + + +class AsyncAdaptedQueue: + await_ = await_fallback + + def __init__(self, maxsize=0, use_lifo=False): + if use_lifo: + self._queue = asyncio.LifoQueue(maxsize=maxsize) + else: + self._queue = asyncio.Queue(maxsize=maxsize) + self.maxsize = maxsize + self.empty = self._queue.empty + self.full = self._queue.full + self.qsize = self._queue.qsize + + def put_nowait(self, item): + try: + return self._queue.put_nowait(item) + except asyncio.queues.QueueFull as err: + compat.raise_( + Full(), replace_context=err, + ) + + def put(self, item, block=True, timeout=None): + if not block: + return self.put_nowait(item) + + try: + if timeout: + return self.await_( + asyncio.wait_for(self._queue.put(item), timeout) + ) + else: + return self.await_(self._queue.put(item)) + except asyncio.queues.QueueFull as err: + compat.raise_( + Full(), replace_context=err, + ) + + def get_nowait(self): + try: + return self._queue.get_nowait() + except asyncio.queues.QueueEmpty as err: + compat.raise_( + Empty(), replace_context=err, + ) + + def get(self, block=True, timeout=None): + if not block: + return self.get_nowait() + try: + if timeout: + return self.await_( + asyncio.wait_for(self._queue.get(), timeout) + ) + else: + return self.await_(self._queue.get()) + except asyncio.queues.QueueEmpty as err: + compat.raise_( + Empty(), replace_context=err, + ) |
