diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-06-22 12:24:08 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-06-22 12:24:08 -0400 |
commit | 5f0a7bb152b30dd7b05771725a7ffe16e3af8f8a (patch) | |
tree | bc17038b5eb1a82ce41accbba56d9228d540858f /lib/sqlalchemy/util/queue.py | |
parent | 51a3a9ac8a76096a6a25eb2cc7404970561d5123 (diff) | |
download | sqlalchemy-5f0a7bb152b30dd7b05771725a7ffe16e3af8f8a.tar.gz |
- [bug] Fixed bug whereby
a disconnect detect + dispose that occurs
when the QueuePool has threads waiting
for connections would leave those
threads waiting for the duration of
the timeout on the old pool. The fix
now notifies those waiters with a special
exception case and has them move onto
the new pool. This fix may or may
not be ported to 0.7. [ticket:2522]
Diffstat (limited to 'lib/sqlalchemy/util/queue.py')
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 40 |
1 files changed, 37 insertions, 3 deletions
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index e71ceb458..ebf736331 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -5,19 +5,28 @@ # the MIT License: http://www.opensource.org/licenses/mit-license.php """An adaptation of Py2.3/2.4's Queue module which supports reentrant -behavior, using RLock instead of Lock for its mutex object. +behavior, using RLock instead of Lock for its mutex object. The +Queue object is used exclusively by the sqlalchemy.pool.QueuePool +class. This is to support the connection pool's usage of weakref callbacks to return connections to the underlying Queue, which can in extremely rare cases be invoked within the ``get()`` method of the Queue itself, producing a ``put()`` inside the ``get()`` and therefore a reentrant -condition.""" +condition. + +An additional change includes a special "abort" method which can be used +to immediately raise a special exception for threads that are blocking +on get(). This is to accommodate a rare race condition that can occur +within QueuePool. + +""" from collections import deque from time import time as _time from sqlalchemy.util import threading -__all__ = ['Empty', 'Full', 'Queue'] +__all__ = ['Empty', 'Full', 'Queue', 'SAAbort'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." @@ -29,6 +38,11 @@ class Full(Exception): pass +class SAAbort(Exception): + "Special SQLA exception to abort waiting" + def __init__(self, context): + self.context = context + class Queue: def __init__(self, maxsize=0): """Initialize a queue object with a given maximum size. @@ -49,6 +63,9 @@ class Queue: # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # when this is set, SAAbort is raised within get(). + self._sqla_abort_context = False + def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -138,6 +155,8 @@ class Queue: elif timeout is None: while self._empty(): self.not_empty.wait() + if self._sqla_abort_context: + raise SAAbort(self._sqla_abort_context) else: if timeout < 0: raise ValueError("'timeout' must be a positive number") @@ -147,12 +166,27 @@ class Queue: if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self._sqla_abort_context: + raise SAAbort(self._sqla_abort_context) item = self._get() self.not_full.notify() return item finally: self.not_empty.release() + def abort(self, context): + """Issue an 'abort', will force any thread waiting on get() + to stop waiting and raise SAAbort. + + """ + self._sqla_abort_context = context + if not self.not_full.acquire(False): + return + try: + self.not_empty.notify() + finally: + self.not_full.release() + def get_nowait(self): """Remove and return an item from the queue without blocking. |