diff options
Diffstat (limited to 'lib/sqlalchemy/util/queue.py')
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 41 |
1 files changed, 2 insertions, 39 deletions
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index 82ff55a5d..c98aa7fda 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -15,11 +15,6 @@ rare cases be invoked within the ``get()`` method of the Queue itself, producing a ``put()`` inside the ``get()`` and therefore a reentrant condition. -An additional change includes a special "abort" method which can be used -to immediately raise a special exception for threads that are blocking -on get(). This is to accommodate a rare race condition that can occur -within QueuePool. - """ from collections import deque @@ -27,7 +22,7 @@ from time import time as _time from .compat import threading -__all__ = ['Empty', 'Full', 'Queue', 'SAAbort'] +__all__ = ['Empty', 'Full', 'Queue'] class Empty(Exception): @@ -42,12 +37,6 @@ class Full(Exception): pass -class SAAbort(Exception): - "Special SQLA exception to abort waiting" - def __init__(self, context): - self.context = context - - class Queue: def __init__(self, maxsize=0): """Initialize a queue object with a given maximum size. @@ -68,8 +57,6 @@ class Queue: # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) - # when this is set, SAAbort is raised within get(). - self._sqla_abort_context = False def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -158,13 +145,7 @@ class Queue: raise Empty elif timeout is None: while self._empty(): - # wait for only half a second, then - # loop around, so that we can see a change in - # _sqla_abort_context in case we missed the notify_all() - # called by abort() - self.not_empty.wait(.5) - if self._sqla_abort_context: - raise SAAbort(self._sqla_abort_context) + self.not_empty.wait() else: if timeout < 0: raise ValueError("'timeout' must be a positive number") @@ -174,30 +155,12 @@ class Queue: if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self._sqla_abort_context: - raise SAAbort(self._sqla_abort_context) item = self._get() self.not_full.notify() return item finally: self.not_empty.release() - def abort(self, context): - """Issue an 'abort', will force any thread waiting on get() - to stop waiting and raise SAAbort. - - """ - self._sqla_abort_context = context - if not self.not_full.acquire(False): - return - try: - # note that this is now optional - # as the waiters in get() both loop around - # to check the _sqla_abort_context flag periodically - self.not_empty.notify_all() - finally: - self.not_full.release() - def get_nowait(self): """Remove and return an item from the queue without blocking. |