summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/util
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/util')
-rw-r--r--lib/sqlalchemy/util/__init__.py4
-rw-r--r--lib/sqlalchemy/util/_concurrency_py3k.py110
-rw-r--r--lib/sqlalchemy/util/concurrency.py21
-rw-r--r--lib/sqlalchemy/util/queue.py64
4 files changed, 199 insertions, 0 deletions
diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py
index ce9602745..1e3eb9a29 100644
--- a/lib/sqlalchemy/util/__init__.py
+++ b/lib/sqlalchemy/util/__init__.py
@@ -90,6 +90,10 @@ from .compat import unquote_plus # noqa
from .compat import win32 # noqa
from .compat import with_metaclass # noqa
from .compat import zip_longest # noqa
+from .concurrency import asyncio # noqa
+from .concurrency import await_fallback # noqa
+from .concurrency import await_only # noqa
+from .concurrency import greenlet_spawn # noqa
from .deprecations import deprecated # noqa
from .deprecations import deprecated_20 # noqa
from .deprecations import deprecated_20_cls # noqa
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py
new file mode 100644
index 000000000..3b112ff7d
--- /dev/null
+++ b/lib/sqlalchemy/util/_concurrency_py3k.py
@@ -0,0 +1,110 @@
+import asyncio
+import sys
+from typing import Any
+from typing import Callable
+from typing import Coroutine
+
+from .. import exc
+
+try:
+ import greenlet
+
+ # implementation based on snaury gist at
+ # https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
+ # Issue for context: https://github.com/python-greenlet/greenlet/issues/173
+
+ class _AsyncIoGreenlet(greenlet.greenlet):
+ def __init__(self, fn, driver):
+ greenlet.greenlet.__init__(self, fn, driver)
+ self.driver = driver
+
+ def await_only(awaitable: Coroutine) -> Any:
+ """Awaits an async function in a sync method.
+
+ The sync method must be insice a :func:`greenlet_spawn` context.
+ :func:`await_` calls cannot be nested.
+
+ :param awaitable: The coroutine to call.
+
+ """
+ # this is called in the context greenlet while running fn
+ current = greenlet.getcurrent()
+ if not isinstance(current, _AsyncIoGreenlet):
+ raise exc.InvalidRequestError(
+ "greenlet_spawn has not been called; can't call await_() here."
+ )
+
+ # returns the control to the driver greenlet passing it
+ # a coroutine to run. Once the awaitable is done, the driver greenlet
+ # switches back to this greenlet with the result of awaitable that is
+ # then returned to the caller (or raised as error)
+ return current.driver.switch(awaitable)
+
+ def await_fallback(awaitable: Coroutine) -> Any:
+ """Awaits an async function in a sync method.
+
+ The sync method must be insice a :func:`greenlet_spawn` context.
+ :func:`await_` calls cannot be nested.
+
+ :param awaitable: The coroutine to call.
+
+ """
+ # this is called in the context greenlet while running fn
+ current = greenlet.getcurrent()
+ if not isinstance(current, _AsyncIoGreenlet):
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ raise exc.InvalidRequestError(
+ "greenlet_spawn has not been called and asyncio event "
+ "loop is already running; can't call await_() here."
+ )
+ return loop.run_until_complete(awaitable)
+
+ return current.driver.switch(awaitable)
+
+ async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any:
+ """Runs a sync function ``fn`` in a new greenlet.
+
+ The sync function can then use :func:`await_` to wait for async
+ functions.
+
+ :param fn: The sync callable to call.
+ :param \\*args: Positional arguments to pass to the ``fn`` callable.
+ :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.
+ """
+ context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
+ # runs the function synchronously in gl greenlet. If the execution
+ # is interrupted by await_, context is not dead and result is a
+ # coroutine to wait. If the context is dead the function has
+ # returned, and its result can be returned.
+ try:
+ result = context.switch(*args, **kwargs)
+ while not context.dead:
+ try:
+ # wait for a coroutine from await_ and then return its
+ # result back to it.
+ value = await result
+ except Exception:
+ # this allows an exception to be raised within
+ # the moderated greenlet so that it can continue
+ # its expected flow.
+ result = context.throw(*sys.exc_info())
+ else:
+ result = context.switch(value)
+ finally:
+ # clean up to avoid cycle resolution by gc
+ del context.driver
+ return result
+
+
+except ImportError: # pragma: no cover
+ greenlet = None
+
+ def await_fallback(awaitable):
+ return asyncio.get_event_loop().run_until_complete(awaitable)
+
+ def await_only(awaitable):
+ raise ValueError("Greenlet is required to use this function")
+
+ async def greenlet_spawn(fn, *args, **kw):
+ raise ValueError("Greenlet is required to use this function")
diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py
new file mode 100644
index 000000000..4c4ea20d1
--- /dev/null
+++ b/lib/sqlalchemy/util/concurrency.py
@@ -0,0 +1,21 @@
+from . import compat
+
+
+if compat.py3k:
+ import asyncio
+ from ._concurrency_py3k import await_only
+ from ._concurrency_py3k import await_fallback
+ from ._concurrency_py3k import greenlet
+ from ._concurrency_py3k import greenlet_spawn
+else:
+ asyncio = None
+ greenlet = None
+
+ def await_only(thing):
+ return thing
+
+ def await_fallback(thing):
+ return thing
+
+ def greenlet_spawn(fn, *args, **kw):
+ raise ValueError("Cannot use this function in py2.")
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index 3433657d6..5f71c7bd6 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -21,7 +21,10 @@ condition.
from collections import deque
from time import time as _time
+from . import compat
from .compat import threading
+from .concurrency import asyncio
+from .concurrency import await_fallback
__all__ = ["Empty", "Full", "Queue"]
@@ -196,3 +199,64 @@ class Queue:
else:
# FIFO
return self.queue.popleft()
+
+
+class AsyncAdaptedQueue:
+ await_ = await_fallback
+
+ def __init__(self, maxsize=0, use_lifo=False):
+ if use_lifo:
+ self._queue = asyncio.LifoQueue(maxsize=maxsize)
+ else:
+ self._queue = asyncio.Queue(maxsize=maxsize)
+ self.maxsize = maxsize
+ self.empty = self._queue.empty
+ self.full = self._queue.full
+ self.qsize = self._queue.qsize
+
+ def put_nowait(self, item):
+ try:
+ return self._queue.put_nowait(item)
+ except asyncio.queues.QueueFull as err:
+ compat.raise_(
+ Full(), replace_context=err,
+ )
+
+ def put(self, item, block=True, timeout=None):
+ if not block:
+ return self.put_nowait(item)
+
+ try:
+ if timeout:
+ return self.await_(
+ asyncio.wait_for(self._queue.put(item), timeout)
+ )
+ else:
+ return self.await_(self._queue.put(item))
+ except asyncio.queues.QueueFull as err:
+ compat.raise_(
+ Full(), replace_context=err,
+ )
+
+ def get_nowait(self):
+ try:
+ return self._queue.get_nowait()
+ except asyncio.queues.QueueEmpty as err:
+ compat.raise_(
+ Empty(), replace_context=err,
+ )
+
+ def get(self, block=True, timeout=None):
+ if not block:
+ return self.get_nowait()
+ try:
+ if timeout:
+ return self.await_(
+ asyncio.wait_for(self._queue.get(), timeout)
+ )
+ else:
+ return self.await_(self._queue.get())
+ except asyncio.queues.QueueEmpty as err:
+ compat.raise_(
+ Empty(), replace_context=err,
+ )