diff options
Diffstat (limited to 'lib/sqlalchemy/util/queue.py')
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 40 |
1 files changed, 37 insertions, 3 deletions
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index e71ceb458..ebf736331 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -5,19 +5,28 @@ # the MIT License: http://www.opensource.org/licenses/mit-license.php """An adaptation of Py2.3/2.4's Queue module which supports reentrant -behavior, using RLock instead of Lock for its mutex object. +behavior, using RLock instead of Lock for its mutex object. The +Queue object is used exclusively by the sqlalchemy.pool.QueuePool +class. This is to support the connection pool's usage of weakref callbacks to return connections to the underlying Queue, which can in extremely rare cases be invoked within the ``get()`` method of the Queue itself, producing a ``put()`` inside the ``get()`` and therefore a reentrant -condition.""" +condition. + +An additional change includes a special "abort" method which can be used +to immediately raise a special exception for threads that are blocking +on get(). This is to accommodate a rare race condition that can occur +within QueuePool. + +""" from collections import deque from time import time as _time from sqlalchemy.util import threading -__all__ = ['Empty', 'Full', 'Queue'] +__all__ = ['Empty', 'Full', 'Queue', 'SAAbort'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." @@ -29,6 +38,11 @@ class Full(Exception): pass +class SAAbort(Exception): + "Special SQLA exception to abort waiting" + def __init__(self, context): + self.context = context + class Queue: def __init__(self, maxsize=0): """Initialize a queue object with a given maximum size. @@ -49,6 +63,9 @@ class Queue: # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # when this is set, SAAbort is raised within get(). + self._sqla_abort_context = False + def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -138,6 +155,8 @@ class Queue: elif timeout is None: while self._empty(): self.not_empty.wait() + if self._sqla_abort_context: + raise SAAbort(self._sqla_abort_context) else: if timeout < 0: raise ValueError("'timeout' must be a positive number") @@ -147,12 +166,27 @@ class Queue: if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self._sqla_abort_context: + raise SAAbort(self._sqla_abort_context) item = self._get() self.not_full.notify() return item finally: self.not_empty.release() + def abort(self, context): + """Issue an 'abort', will force any thread waiting on get() + to stop waiting and raise SAAbort. + + """ + self._sqla_abort_context = context + if not self.not_full.acquire(False): + return + try: + self.not_empty.notify() + finally: + self.not_full.release() + def get_nowait(self): """Remove and return an item from the queue without blocking. |