summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/sqlalchemy/event/attr.py4
-rw-r--r--lib/sqlalchemy/event/base.py13
-rw-r--r--lib/sqlalchemy/event/registry.py19
-rw-r--r--lib/sqlalchemy/pool/base.py2
-rw-r--r--lib/sqlalchemy/pool/events.py6
-rw-r--r--lib/sqlalchemy/pool/impl.py1
-rw-r--r--lib/sqlalchemy/util/_concurrency_py3k.py11
-rw-r--r--lib/sqlalchemy/util/concurrency.py4
8 files changed, 51 insertions, 9 deletions
diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py
index 87c6e980f..abb264f98 100644
--- a/lib/sqlalchemy/event/attr.py
+++ b/lib/sqlalchemy/event/attr.py
@@ -41,6 +41,7 @@ from . import registry
from .. import exc
from .. import util
from ..util import threading
+from ..util.concurrency import AsyncAdaptedLock
class RefCollection(util.MemoizedSlots):
@@ -277,6 +278,9 @@ class _EmptyListener(_InstanceLevelDispatch):
class _CompoundListener(_InstanceLevelDispatch):
__slots__ = "_exec_once_mutex", "_exec_once"
+ def _set_asyncio(self):
+ self._exec_once_mutex = AsyncAdaptedLock()
+
def _memoized_attr__exec_once_mutex(self):
return threading.Lock()
diff --git a/lib/sqlalchemy/event/base.py b/lib/sqlalchemy/event/base.py
index a87c1fe44..c78080738 100644
--- a/lib/sqlalchemy/event/base.py
+++ b/lib/sqlalchemy/event/base.py
@@ -241,8 +241,17 @@ class Events(util.with_metaclass(_EventMeta, object)):
return target
@classmethod
- def _listen(cls, event_key, propagate=False, insert=False, named=False):
- event_key.base_listen(propagate=propagate, insert=insert, named=named)
+ def _listen(
+ cls,
+ event_key,
+ propagate=False,
+ insert=False,
+ named=False,
+ asyncio=False,
+ ):
+ event_key.base_listen(
+ propagate=propagate, insert=insert, named=named, asyncio=asyncio
+ )
@classmethod
def _remove(cls, event_key):
diff --git a/lib/sqlalchemy/event/registry.py b/lib/sqlalchemy/event/registry.py
index 19b9174b7..144dd45dc 100644
--- a/lib/sqlalchemy/event/registry.py
+++ b/lib/sqlalchemy/event/registry.py
@@ -244,21 +244,26 @@ class _EventKey(object):
return self._key in _key_to_collection
def base_listen(
- self, propagate=False, insert=False, named=False, retval=None
+ self,
+ propagate=False,
+ insert=False,
+ named=False,
+ retval=None,
+ asyncio=False,
):
target, identifier = self.dispatch_target, self.identifier
dispatch_collection = getattr(target.dispatch, identifier)
+ for_modify = dispatch_collection.for_modify(target.dispatch)
+ if asyncio:
+ for_modify._set_asyncio()
+
if insert:
- dispatch_collection.for_modify(target.dispatch).insert(
- self, propagate
- )
+ for_modify.insert(self, propagate)
else:
- dispatch_collection.for_modify(target.dispatch).append(
- self, propagate
- )
+ for_modify.append(self, propagate)
@property
def _listen_fn(self):
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index f20b63cf5..87383fef7 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -59,6 +59,8 @@ class Pool(log.Identified):
_dialect = _ConnDialect()
+ _is_asyncio = False
+
def __init__(
self,
creator,
diff --git a/lib/sqlalchemy/pool/events.py b/lib/sqlalchemy/pool/events.py
index 3954f907f..9443877a9 100644
--- a/lib/sqlalchemy/pool/events.py
+++ b/lib/sqlalchemy/pool/events.py
@@ -54,6 +54,12 @@ class PoolEvents(event.Events):
else:
return target
+ @classmethod
+ def _listen(cls, event_key, **kw):
+ target = event_key.dispatch_target
+
+ event_key.base_listen(asyncio=target._is_asyncio)
+
def connect(self, dbapi_connection, connection_record):
"""Called at the moment a particular DBAPI connection is first
created for a given :class:`_pool.Pool`.
diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py
index e1a9f00db..ffdd63671 100644
--- a/lib/sqlalchemy/pool/impl.py
+++ b/lib/sqlalchemy/pool/impl.py
@@ -218,6 +218,7 @@ class QueuePool(Pool):
class AsyncAdaptedQueuePool(QueuePool):
+ _is_asyncio = True
_queue_class = sqla_queue.AsyncAdaptedQueue
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py
index 3b112ff7d..82125b771 100644
--- a/lib/sqlalchemy/util/_concurrency_py3k.py
+++ b/lib/sqlalchemy/util/_concurrency_py3k.py
@@ -96,6 +96,17 @@ try:
del context.driver
return result
+ class AsyncAdaptedLock:
+ def __init__(self):
+ self.mutex = asyncio.Lock()
+
+ def __enter__(self):
+ await_fallback(self.mutex.acquire())
+ return self
+
+ def __exit__(self, *arg, **kw):
+ self.mutex.release()
+
except ImportError: # pragma: no cover
greenlet = None
diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py
index 4c4ea20d1..e0883aa68 100644
--- a/lib/sqlalchemy/util/concurrency.py
+++ b/lib/sqlalchemy/util/concurrency.py
@@ -7,6 +7,7 @@ if compat.py3k:
from ._concurrency_py3k import await_fallback
from ._concurrency_py3k import greenlet
from ._concurrency_py3k import greenlet_spawn
+ from ._concurrency_py3k import AsyncAdaptedLock
else:
asyncio = None
greenlet = None
@@ -19,3 +20,6 @@ else:
def greenlet_spawn(fn, *args, **kw):
raise ValueError("Cannot use this function in py2.")
+
+ def AsyncAdaptedLock(*args, **kw):
+ raise ValueError("Cannot use this function in py2.")