Diffstat (limited to 'lib/git/async')
-rw-r--r--  lib/git/async/channel.py  | 118
-rw-r--r--  lib/git/async/pool.py     | 141
-rw-r--r--  lib/git/async/task.py     |  60
-rw-r--r--  lib/git/async/thread.py   |  25
-rw-r--r--  lib/git/async/util.py     | 164
5 files changed, 294 insertions(+), 214 deletions(-)
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..c05f7383 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
Full
)
-from util import AsyncQueue
+from util import (
+ AsyncQueue,
+ ReadOnly
+ )
+
from time import time
import sys
@@ -23,12 +27,9 @@ class Channel(object):
def __new__(cls, *args):
if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
+ if len(args) > 0:
+ raise ValueError("Cannot take any arguments when creating a new channel")
+ wc = WChannel()
rc = RChannel(wc)
return wc, rc
# END constructor mode
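
The factory now takes no arguments at all. As a rough usage sketch (assuming Channel is importable from git.async.channel, and that read() with default arguments drains a closed channel):

    from git.async.channel import Channel   # assumed import path

    wc, rc = Channel()      # factory returns the write end and the read end
    wc.write(1)
    wc.close()
    print rc.read()         # [1] - reads whatever was written before close
    try:
        Channel(10)         # capacities are gone, any argument is an error
    except ValueError:
        print "channels are unbounded now"
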
@@ -39,11 +40,11 @@ class WChannel(Channel):
"""The write end of a channel"""
__slots__ = ('_closed', '_queue')
- def __init__(self, max_items=0):
+ def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = AsyncQueue(max_items)
+ self._queue = AsyncQueue()
#{ Interface
@@ -55,15 +56,13 @@ class WChannel(Channel):
channel
:param timeout: timeout in seconds for blocking calls.
:raise IOError: when writing into closed file
- :raise EOFError: when writing into a non-blocking full channel
- :note: may block if the channel has a limited capacity"""
- if self._closed:
- raise IOError("Cannot write to a closed channel")
-
+ :raise EOFError: when writing into a non-blocking full channel"""
+ # let the queue handle the 'closed' attribute, we write much more often
+ # to an open channel than to a closed one, saving a few cycles
try:
self._queue.put(item, block, timeout)
- except Full:
- raise EOFError("Capacity of the channel was exeeded")
+ except ReadOnly:
+ raise IOError("Cannot write to a closed channel")
# END exception handling
def size(self):
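
Seen from the caller's side, translating the queue's ReadOnly into IOError preserves the old write() contract; a minimal sketch under the same assumptions as above:

    wc, rc = Channel()
    wc.close()
    try:
        wc.write("item")    # queue raises ReadOnly, write() re-raises IOError
    except IOError:
        print "cannot write to a closed channel"
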
@@ -74,7 +73,11 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
+ # yes, close it a little too early, better than having anyone put
+ # additional items
+ # print "closing channel", self
self._closed = True
+ self._queue.set_writable(False)
@property
def closed(self):
@@ -101,7 +104,7 @@ class RChannel(Channel):
:param count: given amount of items to read. If < 1, all items will be read
:param block: if True, the call will block until an item is available
:param timeout: if positive and block is True, it will block only for the
- given amount of seconds.
+ given amount of seconds, returning the items it received so far.
:return: single item in a list if count is 1, or a list of count items.
If the channel was empty and count was 1, an empty list will be returned.
			If count was greater than 1, a list with fewer than count items will be
@@ -109,6 +112,7 @@ class RChannel(Channel):
If count was < 1, a list with all items that could be read will be
returned."""
# if the channel is closed for writing, we never block
+ # NOTE: is handled by the queue
if self._wc.closed or timeout == 0:
block = False
@@ -134,59 +138,47 @@ class RChannel(Channel):
pass
# END handle exceptions
else:
- # if we have really bad timing, the source of the channel
- # marks itself closed, but before setting it, the thread
- # switches to us. We read it, read False, and try to fetch
- # something, and never return. The whole closed channel thing
- # is not atomic ( of course )
- # This is why we never block for long, to get a chance to recheck
- # for closed channels.
- # We blend this into the timeout of the user
- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
- wc = self._wc
- timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
- assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
- if timeout and ourtimeout > timeout:
- ourtimeout = timeout
- # END setup timeout
-
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
# END handle count
+ endtime = sys.maxint # allows timeout for whole operation
+ if timeout is not None:
+ endtime = time() + timeout
+				# could be improved by a separate no-endtime branch, saving the time() calls
for i in xrange(count):
- have_timeout = False
- st = time()
- while True:
- try:
- if wc.closed:
- have_timeout = True
- # its about the 'in the meanwhile' :) - get everything
- # we can in non-blocking mode. This will raise
- try:
- while True:
- out.append(queue.get(False))
- # END until it raises Empty
- except Empty:
- break
- # END finally, out of here
- # END don't continue on closed channels
-
- # END abort reading if it was closed ( in the meanwhile )
- out.append(queue.get(block, ourtimeout))
- break # breakout right away
- except Empty:
- if timeout - (time() - st) <= 0:
- # hitting timeout
- have_timeout = True
- break
- # END abort if the user wants no more time spent here
- # END handle timeout
- # END endless timer loop
- if have_timeout:
+ try:
+ out.append(queue.get(block, timeout))
+ except Empty:
+ # here we are only if there is nothing on the queue,
+ # and if we are blocking. If we are not blocking, this
+						# indicates that the queue was set unwritable in the meanwhile.
+						# hence we can abort now to prevent reading (possibly) forever
+						# Besides, this is racy, as all threads will rip on the channel
+						# without waiting until it's empty
+ if not block:
+ break
+ # END ignore empty
+
+ # if we have been unblocked because the closed state changed
+ # in the meanwhile, stop trying
+ # NOTE: must NOT cache _wc
+ if self._wc.closed:
+ # If we were closed, we drop out even if there might still
+						# be items. Now it's time to get these items, according to
+						# our count. Just switch to non-blocking mode.
+						# If we are to read unlimited items, this would run forever,
+						# but the Empty exception handler takes care of this
+ block = False
+
+ # we don't continue, but let the timer decide whether
+ # it wants to abort
+					# END handle channel closed
+
+ if time() >= endtime:
break
- # END stop on timeout
+ # END stop operation on timeout
# END for each item
# END handle blocking
return out
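
The rewritten loop replaces the old 0.25s polling with a single deadline for the whole multi-item read. The pattern, pulled out into a hypothetical standalone helper for clarity (every name here is illustrative, not part of the module):

    import sys
    from time import time
    from Queue import Empty

    def read_with_deadline(queue, count, block=True, timeout=None):
        # one endtime bounds the entire operation, instead of restarting
        # a timer for every single item as the previous version did
        out = []
        if count == 0:
            count = sys.maxint
        endtime = sys.maxint
        if timeout is not None:
            endtime = time() + timeout
        for i in xrange(count):
            try:
                out.append(queue.get(block, timeout))
            except Empty:
                if not block:
                    break           # non-blocking and drained: we are done
            if time() >= endtime:
                break               # the whole operation timed out
        return out
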
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index d6b5711d..1767c61c 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,8 @@
"""Implementation of a thread-pool working with channels"""
-from thread import WorkerThread
+from thread import (
+ WorkerThread,
+ StopProcessing,
+ )
from threading import Lock
from util import (
@@ -41,8 +44,18 @@ class RPoolChannel(RChannel):
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
- del(self._wc) # decrement ref-count
- self._pool._del_task_if_orphaned(self._task)
+ del(self._wc) # decrement ref-count early
+ # now, if this is the last reader to the wc we just handled, there
+ # is no way anyone will ever read from the task again. If so,
+ # delete the task in question, it will take care of itself and orphans
+ # it might leave
+		# 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
+		# I can't explain, but appear to be normal in the destructor
+		# On the caller side, getrefcount returns 2, as expected
+ if sys.getrefcount(self) < 6:
+ print "__del__"
+ self._pool.del_task(self._task)
+ print "done"
def set_pre_cb(self, fun = lambda count: None):
"""Install a callback to call with the item count to be read before any
@@ -78,14 +91,17 @@ class RPoolChannel(RChannel):
# have an item, but its currently being produced by some worker.
# This is why we:
# * make no assumptions if there are multiple consumers
- # *
- have_enough = False
- if count > 0:
- have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ # *
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ skip_compute = self._task.is_done() or self._task.error()
+ #if count > 0:
+ # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
# END
########## prepare ##############################
- if not have_enough:
+ if not skip_compute:
self._pool._prepare_channel_read(self._task, count)
# END prepare pool scheduling
@@ -102,7 +118,7 @@ class RPoolChannel(RChannel):
####### Finalize ########
self._pool._post_channel_read(self._task)
-
+
return items
#{ Internal
@@ -134,8 +150,7 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a queue with tasks that are done or had an error
- '_workers', # list of worker threads
+ '_num_workers', # list of workers
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
'_taskgraph_lock', # lock for accessing the task graph
@@ -157,8 +172,7 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = None
- self._workers = list()
+ self._num_workers = 0
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
@@ -224,8 +238,11 @@ class Pool(object):
# requested one last
for task in dfirst_tasks:
if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- continue
+				# in theory, there should never be a consumed task in the pool, right?
+				# They delete themselves once they are done.
+				# TODO: remove this check for performance later
+				raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
+ #continue
# END skip processing
# if the task does not have the required output on its queue, schedule
@@ -256,7 +273,7 @@ class Pool(object):
# into the loop would be less code, but ... slower
# DEBUG
# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
+ if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
queue = self._queue
@@ -297,28 +314,11 @@ class Pool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
- # check whether we consumed the task, and schedule it for deletion
- # This could have happend after the read returned ( even though the pre-read
- # checks it as well )
- if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- # END handle consumption
-
- self._handle_consumed_tasks()
-
- def _handle_consumed_tasks(self):
- """Remove all consumed tasks from our queue by deleting them"""
- try:
- while True:
- ct = self._consumed_tasks.get(False)
- self.del_task(ct)
- # END for each task to delete
- except Empty:
- pass
- # END pop queue empty
-
+ pass
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
+		# 1 as it's stored on the task, 1 for the getrefcount call
if sys.getrefcount(task._out_wc) < 3:
self.del_task(task)
#} END internal
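
The refcount arithmetic here, and in RPoolChannel.__del__ above, leans on CPython's sys.getrefcount semantics, where the call's own argument adds one reference. A quick demonstration:

    import sys

    class Dummy(object):
        pass

    d = Dummy()
    print sys.getrefcount(d)    # 2: the name 'd' plus the call's argument
    holder = [d]
    print sys.getrefcount(d)    # 3: the list holds one more reference
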
@@ -326,7 +326,7 @@ class Pool(object):
#{ Interface
def size(self):
""":return: amount of workers in the pool"""
- return len(self._workers)
+ return self._num_workers
def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
@@ -344,48 +344,51 @@ class Pool(object):
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
- cur_count = len(self._workers)
+ cur_count = self._num_workers
if cur_count < size:
- # make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._consumed_tasks, self.TaskQueueCls):
- self._consumed_tasks = Queue()
- # END init queue
-
- for i in range(size - cur_count):
- worker = self.WorkerCls(self._queue)
- worker.start()
- self._workers.append(worker)
- # END for each new worker to create
- elif cur_count > size:
# we can safely increase the size, even from serial mode, as we would
# only be able to do this if the serial ( sync ) mode finished processing.
# Just adding more workers is not a problem at all.
+ add_count = size - cur_count
+ for i in range(add_count):
+ print "Add worker"
+ self.WorkerCls(self._queue).start()
+ # END for each new worker to create
+ self._num_workers += add_count
+ elif cur_count > size:
+ # We don't care which thread exactly gets hit by our stop request
+ # On their way, they will consume remaining tasks, but new ones
+ # could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- self._workers[i].stop_and_join()
+ print "stop worker"
+				self._queue.put((self.WorkerCls.stop, True))	# arg doesn't matter
# END for each thread to stop
- del(self._workers[:del_count])
+ self._num_workers -= del_count
# END handle count
if size == 0:
- while not self._queue.empty():
- try:
- taskmethod, count = self._queue.get(False)
- taskmethod(count)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
-
- if self._consumed_tasks and not self._consumed_tasks.empty():
- self._handle_consumed_tasks()
- # END assure consumed tasks are empty
- self._consumed_tasks = SyncQueue()
+			# NOTE: we do not process any tasks still on the queue, as we will
+			# naturally do that once we read the next time, only on the tasks
+			# that are actually required. The queue will keep the tasks,
+			# and once we are deleted, they will vanish without additional
+			# time spent on them, as there shouldn't be any consumers anyway.
+			# If we should re-enable some workers again, they will continue on the
+			# remaining tasks, probably with nothing to do.
+ # We can't clear the task queue if we have removed workers
+ # as they will receive the termination signal through it, and if
+ # we had added workers, we wouldn't be here ;).
+ pass
# END process queue
return self
def num_tasks(self):
""":return: amount of tasks"""
- return len(self._tasks.nodes)
+ self._taskgraph_lock.acquire()
+ try:
+ return len(self._tasks.nodes)
+ finally:
+ self._taskgraph_lock.release()
def del_task(self, task):
"""Delete the task
@@ -396,6 +399,7 @@ class Pool(object):
This method blocks until all tasks to be removed have been processed, if
they are currently being processed.
:return: self"""
+ print "del_task: getting lock"
self._taskgraph_lock.acquire()
try:
# it can be that the task is already deleted, but its chunk was on the
@@ -403,7 +407,7 @@ class Pool(object):
if not task in self._tasks.nodes:
return self
# END early abort
-
+ print "deleting ", id(task)
# the task we are currently deleting could also be processed by
# a thread right now. We don't care about it as its taking care about
# its write channel itself, and sends everything it can to it.
@@ -437,11 +441,6 @@ class Pool(object):
rc = RPoolChannel(wc, task, self)
task.set_wc(wc)
- has_input_channel = isinstance(task, InputChannelTask)
- if has_input_channel:
- task.set_pool(self)
- # END init input channel task
-
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
@@ -451,7 +450,7 @@ class Pool(object):
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if has_input_channel:
+ if isinstance(task, InputChannelTask):
ic = task.in_rc
if isinstance(ic, RPoolChannel) and ic._pool is self:
self._taskgraph_lock.acquire()
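
Resizing therefore never joins threads directly; it only enqueues start-up work or stop requests. A hedged usage sketch (assuming a concrete pool class with WorkerCls and TaskQueueCls wired up as the code above expects):

    from git.async.pool import Pool     # assumed import path and class

    pool = Pool(size=0)     # serial mode, no worker threads
    pool.set_size(4)        # starts 4 workers reading the shared task queue
    pool.set_size(1)        # enqueues 3 stop requests; whichever workers
                            # pop them terminate, the remaining one lives on
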
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index cf486f48..f9536a45 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,10 +1,11 @@
from graph import Node
import threading
-import weakref
import sys
import new
+getrefcount = sys.getrefcount
+
class OutputChannelTask(Node):
"""Abstracts a named task as part of a set of interdependent tasks, which contains
additional information on how the task should be queued and processed.
@@ -53,7 +54,7 @@ class OutputChannelTask(Node):
def set_wc(self, wc):
"""Set the write channel to the given one
		:note: resets its done state in order to allow proper queue handling"""
- self._done = False
+ self._done = False # TODO : fix this, this is a side-effect
self._scheduled_items = 0
self._out_wc = wc
@@ -84,10 +85,11 @@ class OutputChannelTask(Node):
finally:
self._slock.release()
# END threadsafe return
-
+
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
+ # print "task read", len(items)
try:
# increase the ref-count - we use this to determine whether anyone else
# is currently handling our output channel. As this method runs asynchronously,
@@ -117,6 +119,7 @@ class OutputChannelTask(Node):
# END handle single apply
except Exception, e:
self._exc = e
+ print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
self.set_done()
# unschedule all, we don't know how many have been produced actually
# but only if we don't apply single please
@@ -140,10 +143,10 @@ class OutputChannelTask(Node):
# If we appear to be the only one left with our output channel, and are
# closed ( this could have been set in another thread as well ), make
# sure to close the output channel.
- # The count is: 1 = wc itself, 2 = first reader channel, and we have only
- # one, 3 is ours + x for every thread having its copy on the stack
+ # The count is: 1 = wc itself, 2 = first reader channel, + x for every
+ # thread having its copy on the stack
# + 1 for the instance we provide to refcount
- if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+ if self.is_done() and getrefcount(self._out_wc) < 4:
self.close()
# END handle channel closure
#{ Configuration
@@ -201,48 +204,15 @@ class InputChannelTask(OutputChannelTask):
"""Uses an input channel as source for reading items
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
- __slots__ = (
- 'in_rc', # channel to read items from
- '_pool_ref' # to be set by Pool
- )
def __init__(self, in_rc, *args, **kwargs):
OutputChannelTask.__init__(self, *args, **kwargs)
- self._in_rc = in_rc
-
+ self._read = in_rc.read
+
def process(self, count=1):
- """Verify our setup, and do some additional checking, before the
- base implementation can permanently perform all operations"""
- self._read = self._in_rc.read
- # make sure we don't trigger the pool if we read from a pool channel which
- # belongs to our own pool. Channels from different pools are fine though,
- # there we want to trigger its computation
- # PROBLEM: if the user keeps an end, but decides to put the same end into
- # a task of this pool, then all items might deplete without new ones being
- # produced, causing a deadlock. Just triggering the pool would be better,
- # but cost's more, unnecessarily if there is just one consumer, which is
- # the user.
- # * could encode usage in the channel type, and fail if the refcount on
- # the read-pool channel is too high
- # * maybe keep track of the elements that are requested or in-production
- # for each task, which would allow to precisely determine whether
- # the pool as to be triggered, and bail out early. Problem would
- # be the
- # * Perhaps one shouldn't seek the perfect solution , but instead
- # document whats working and what not, or under which conditions.
- # The whole system is simple, but gets more complicated the
- # smarter it wants to be.
- if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
- self._read = self._in_rc._read
-
- # permanently install our base for processing
- self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self))
-
- # and call it
- return OutputChannelTask.process(self, count)
-
- def set_pool(self, pool):
- """Set our pool to the given one, it will be weakref'd"""
- self._pool_ref = weakref.ref(pool)
+ # for now, just blindly read our input, could trigger a pool, even
+ # ours, but why not ? It should be able to handle this
+ # TODO: remove this method
+ super(InputChannelTask, self).process(count)
#{ Configuration
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 4240a664..556b7e92 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -80,6 +80,10 @@ class TerminatableThread(threading.Thread):
self._terminated()
#} END interface
+
+class StopProcessing(Exception):
+ """If thrown in a function processed by a WorkerThread, it will terminate"""
+
class WorkerThread(TerminatableThread):
""" This base allows to call functions on class instances natively.
@@ -122,20 +126,22 @@ class WorkerThread(TerminatableThread):
self.inq = inq or Queue.Queue()
self._current_routine = None # routine we execute right now
+ @classmethod
+ def stop(cls, *args):
+ """If send via the inq of the thread, it will stop once it processed the function"""
+ raise StopProcessing
+
def run(self):
"""Process input tasks until we receive the quit signal"""
+ gettask = self.inq.get
while True:
self._current_routine = None
if self._should_terminate():
break
# END check for stop request
- # don't wait too long, instead check for the termination request more often
- try:
- tasktuple = self.inq.get(True, 1)
- except Queue.Empty:
- continue
- # END get task with timeout
+ # we wait and block - to terminate, send the 'stop' method
+ tasktuple = gettask()
# needing exactly one function, and one arg
			assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but it's reduced to what we need"
@@ -157,6 +163,8 @@ class WorkerThread(TerminatableThread):
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
break
# END make routine call
+ except StopProcessing:
+ break
except Exception,e:
print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
break # abort ...
@@ -167,5 +175,8 @@ class WorkerThread(TerminatableThread):
""":return: routine we are currently executing, or None if we have no task"""
return self._current_routine
-
+ def stop_and_join(self):
+ """Send stop message to ourselves"""
+ self.inq.put((self.stop, None))
+ super(WorkerThread, self).stop_and_join()
#} END classes
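
The stop classmethod makes the task queue itself the termination channel, a poison pill of sorts. A minimal sketch, assuming WorkerThread is importable from git.async.thread and that run() invokes plain functions as routine(arg):

    import Queue
    from git.async.thread import WorkerThread   # assumed import path

    inq = Queue.Queue()
    w = WorkerThread(inq)
    w.start()
    inq.put((lambda arg: arg * 2, 21))      # an ordinary (function, arg) task
    inq.put((WorkerThread.stop, None))      # raises StopProcessing in run()
    w.join()                                # returns once stop is processed
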
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 55766579..f3213ed6 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -2,6 +2,8 @@
from threading import (
Lock,
+ current_thread,
+ _allocate_lock,
_Condition,
_sleep,
_time,
@@ -57,36 +59,56 @@ class SyncQueue(deque):
"""Adapter to allow using a deque like a queue, without locking"""
def get(self, block=True, timeout=None):
try:
- return self.pop()
+ return self.popleft()
except IndexError:
raise Empty
# END raise empty
-
+
def empty(self):
return len(self) == 0
put = deque.append
-
-class HSCondition(_Condition):
+
+class HSCondition(object):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
+ # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ __slots__ = ("_lock", '_waiters')
delay = 0.00002 # reduces wait times, but increases overhead
+ def __init__(self, lock=None):
+ if lock is None:
+ lock = Lock()
+ self._lock = lock
+ #self.acquire = lock.acquire
+ #self.release = lock.release
+ self._waiters = list()
+
+ def release(self):
+ self._lock.release()
+
+ def acquire(self, block=None):
+ if block is None:
+ self._lock.acquire()
+ else:
+ self._lock.acquire(block)
+
def wait(self, timeout=None):
- waiter = Lock()
- waiter.acquire()
- self.__dict__['_Condition__waiters'].append(waiter)
- saved_state = self._release_save()
+ waiter = _allocate_lock()
+ waiter.acquire() # get it the first time, no blocking
+ self._waiters.append(waiter)
+
+		# the moment we release our lock, someone else might actually resume
+ self.release()
try: # restore state no matter what (e.g., KeyboardInterrupt)
+ # now we block, as we hold the lock already
if timeout is None:
waiter.acquire()
else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
+				# Balancing act: we can't afford a pure busy loop because of the
+				# GIL, so we have to sleep.
+				# We try to sleep only tiny amounts of time though, to be very responsive.
endtime = _time() + timeout
delay = self.delay
acquire = waiter.acquire
@@ -104,41 +126,127 @@ class HSCondition(_Condition):
# END endless loop
if not gotit:
try:
- self.__dict__['_Condition__waiters'].remove(waiter)
+ self._waiters.remove(waiter)
except ValueError:
pass
# END didn't ever get it
finally:
- self._acquire_restore(saved_state)
+ # reacquire the lock
+ self.acquire()
def notify(self, n=1):
- __waiters = self.__dict__['_Condition__waiters']
- if not __waiters:
+ if not self._waiters:
return
+ waiters = self._waiters
if n == 1:
- __waiters[0].release()
+ waiters[0].release()
try:
- __waiters.pop(0)
+ waiters.pop(0)
except IndexError:
pass
else:
- waiters = __waiters[:n]
- for waiter in waiters:
+ print "notify", waiters, n
+ for waiter in waiters[:n]:
waiter.release()
try:
- __waiters.remove(waiter)
+ waiters.remove(waiter)
except ValueError:
pass
# END handle n = 1 case faster
+ def notify_all(self):
+ self.notify(len(self._waiters))
+
+
+class ReadOnly(Exception):
+ """Thrown when trying to write to a read-only queue"""
+
class AsyncQueue(Queue):
- """A queue using different condition objects to gain multithreading performance"""
+ """A queue using different condition objects to gain multithreading performance.
+ Additionally it has a threadsafe writable flag, which will alert all readers
+ that there is nothing more to get here."""
+ __slots__ = ('mutex', 'not_empty', 'queue', '_writable')
+
def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize)
-
+ self.queue = deque()
+ self.mutex = Lock()
self.not_empty = HSCondition(self.mutex)
- self.not_full = HSCondition(self.mutex)
- self.all_tasks_done = HSCondition(self.mutex)
+ self._writable = True
-
+ def qsize(self):
+ self.mutex.acquire()
+ try:
+ return len(self.queue)
+ finally:
+ self.mutex.release()
+
+ def writable(self):
+ self.mutex.acquire()
+ try:
+ return self._writable
+ finally:
+ self.mutex.release()
+
+ def set_writable(self, state):
+ """Set the writable flag of this queue to True or False
+ :return: The previous state"""
+ self.mutex.acquire()
+ try:
+ old = self._writable
+ self._writable = state
+ return old
+ finally:
+ self.mutex.release()
+
+ # if we won't receive anymore items, inform the getters
+ if not state:
+ self.not_empty.notify_all()
+ # END tell everyone
+ # END handle locking
+
+ def empty(self):
+ self.mutex.acquire()
+ try:
+ return not len(self.queue)
+ finally:
+ self.mutex.release()
+
+ def put(self, item, block=True, timeout=None):
+ self.mutex.acquire()
+ if not self._writable:
+ self.mutex.release()
+ raise ReadOnly
+ # END handle read-only
+ self.queue.append(item)
+ self.mutex.release()
+ self.not_empty.notify()
+
+ def get(self, block=True, timeout=None):
+ self.not_empty.acquire() # == self.mutex.acquire in that case
+ q = self.queue
+ try:
+ if block:
+ if timeout is None:
+ while not len(q) and self._writable:
+ self.not_empty.wait()
+ else:
+ endtime = _time() + timeout
+ while not len(q) and self._writable:
+ remaining = endtime - _time()
+ if remaining <= 0.0:
+ raise Empty
+ self.not_empty.wait(remaining)
+ # END handle timeout mode
+ # END handle block
+
+ # can throw if we woke up because we are not writable anymore
+ try:
+ return q.popleft()
+ except IndexError:
+ raise Empty
+ # END handle unblocking reason
+ finally:
+ self.not_empty.release()
+
+
#} END utilities
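
Taken together, the writable flag is what lets a closing channel wake every blocked reader through HSCondition.notify_all instead of leaving them to time out. A short sketch, assuming the import path git.async.util:

    from git.async.util import AsyncQueue, ReadOnly     # assumed path
    from Queue import Empty

    q = AsyncQueue()
    q.put("a")
    q.set_writable(False)   # wakes any getters blocked in not_empty.wait()
    try:
        q.put("b")
    except ReadOnly:
        print "queue rejects writes once unwritable"
    print q.get()           # remaining items still drain: prints 'a'
    try:
        q.get()             # empty and unwritable: raises Empty, never blocks
    except Empty:
        print "drained"
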