summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/util/_concurrency_py3k.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-05-26 13:46:36 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2021-05-27 12:12:56 -0400
commitff1d989c1433c96ac1a50c3662e77ec1025c57cb (patch)
tree53d40e9839597f84aef668a860bf838fa6264c53 /lib/sqlalchemy/util/_concurrency_py3k.py
parent37db4530baaedfff078c5ced83e088bd631a8d75 (diff)
downloadsqlalchemy-ff1d989c1433c96ac1a50c3662e77ec1025c57cb.tar.gz
get tests to pass on python 3.10
Resolved various deprecation warnings which were appearing as of Python version 3.10.0b1. block aiomysql on python 3.10 as they are using the "loop" argument that's removed sqlcipher-binary has no builds on 3.10, block it for 3.10 Fixes: #6540 Fixes: #6543 Change-Id: Iec1e3881fb289878881ae043b1a18c3ecdf5f077
Diffstat (limited to 'lib/sqlalchemy/util/_concurrency_py3k.py')
-rw-r--r--lib/sqlalchemy/util/_concurrency_py3k.py21
1 files changed, 18 insertions, 3 deletions
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py
index 5f03972b2..3b60e6584 100644
--- a/lib/sqlalchemy/util/_concurrency_py3k.py
+++ b/lib/sqlalchemy/util/_concurrency_py3k.py
@@ -74,7 +74,7 @@ def await_fallback(awaitable: Coroutine) -> Any:
# this is called in the context greenlet while running fn
current = greenlet.getcurrent()
if not isinstance(current, _AsyncIoGreenlet):
- loop = asyncio.get_event_loop()
+ loop = get_event_loop()
if loop.is_running():
raise exc.MissingGreenlet(
"greenlet_spawn has not been called and asyncio event "
@@ -152,7 +152,7 @@ class AsyncAdaptedLock:
def _util_async_run_coroutine_function(fn, *args, **kwargs):
"""for test suite/ util only"""
- loop = asyncio.get_event_loop()
+ loop = get_event_loop()
if loop.is_running():
raise Exception(
"for async run coroutine we expect that no greenlet or event "
@@ -164,10 +164,25 @@ def _util_async_run_coroutine_function(fn, *args, **kwargs):
def _util_async_run(fn, *args, **kwargs):
"""for test suite/ util only"""
- loop = asyncio.get_event_loop()
+ loop = get_event_loop()
if not loop.is_running():
return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))
else:
# allow for a wrapped test function to call another
assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet)
return fn(*args, **kwargs)
+
+
+def get_event_loop():
+ """vendor asyncio.get_event_loop() for python 3.7 and above.
+
+ Python 3.10 deprecates get_event_loop() as a standalone.
+
+ """
+ if compat.py37:
+ try:
+ return asyncio.get_running_loop()
+ except RuntimeError:
+ return asyncio.get_event_loop_policy().get_event_loop()
+ else:
+ return asyncio.get_event_loop()