Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--   lib/git/async/pool.py   141
1 file changed, 70 insertions(+), 71 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index d6b5711d..1767c61c 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,8 @@
"""Implementation of a thread-pool working with channels"""
-from thread import WorkerThread
+from thread import (
+ WorkerThread,
+ StopProcessing,
+ )
from threading import Lock
from util import (
@@ -41,8 +44,18 @@ class RPoolChannel(RChannel):
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
- del(self._wc) # decrement ref-count
- self._pool._del_task_if_orphaned(self._task)
+ del(self._wc) # decrement ref-count early
# now, if this is the last reader of the wc we just handled, there
# is no way anyone will ever read from the task again. If so,
# delete the task in question: it will take care of itself and any orphans
# it might leave
# 1 is ourselves, + 1 for the call, and 3 magical ones which
# I can't explain, but which appear to be normal in the destructor
+ # On the caller side, getrefcount returns 2, as expected
+ if sys.getrefcount(self) < 6:
+ print "__del__"
+ self._pool.del_task(self._task)
+ print "done"
def set_pre_cb(self, fun = lambda count: None):
"""Install a callback to call with the item count to be read before any
@@ -78,14 +91,17 @@ class RPoolChannel(RChannel):
# have an item, but it's currently being produced by some worker.
# This is why we:
# * make no assumptions if there are multiple consumers
- # *
- have_enough = False
- if count > 0:
- have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ # *
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ skip_compute = self._task.is_done() or self._task.error()
+ #if count > 0:
+ # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
# END
########## prepare ##############################
- if not have_enough:
+ if not skip_compute:
self._pool._prepare_channel_read(self._task, count)
# END prepare pool scheduling
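
Taken together, the changes in this hunk simplify the read path: scheduling is skipped as soon as the task is finished or failed, since everything it will ever produce is already sitting in the channel. A condensed sketch of that flow, reusing the method names from the diff but with simplified bodies (illustrative only, not the full implementation):

    def read(self, count=0, block=True, timeout=None):
        # Once the task is done or failed it cannot produce anything new,
        # so there is no point in scheduling more work for it.
        skip_compute = self._task.is_done() or self._task.error()

        if not skip_compute:
            # Ask the pool to schedule enough items to satisfy this read.
            self._pool._prepare_channel_read(self._task, count)

        # ... the actual channel read happens here (omitted in this sketch) ...
        items = []

        # Cleanup hook; after this commit it is a no-op on the pool side.
        self._pool._post_channel_read(self._task)
        return items
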
@@ -102,7 +118,7 @@ class RPoolChannel(RChannel):
####### Finalize ########
self._pool._post_channel_read(self._task)
-
+
return items
#{ Internal
@@ -134,8 +150,7 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a queue with tasks that are done or had an error
- '_workers', # list of worker threads
+ '_num_workers', # number of workers
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
'_taskgraph_lock', # lock for accessing the task graph
@@ -157,8 +172,7 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = None
- self._workers = list()
+ self._num_workers = 0
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
@@ -224,8 +238,11 @@ class Pool(object):
# requested one last
for task in dfirst_tasks:
if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- continue
+ # in theory, there should never be a consumed task in the pool, right?
+ # They delete themselves once they are done.
+ # TODO: remove this check for performance later
+ raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
+ #continue
# END skip processing
# if the task does not have the required output on its queue, schedule
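
The loop above walks dfirst_tasks, the dependency-first ordering of tasks cached in _taskorder_cache, so that producers are handled before the tasks that read from them. A toy sketch of computing such an order with a plain depth-first walk (independent of the pool's real Graph class, which may do this differently):

    def depth_first_order(dependencies, start):
        # dependencies maps a task to the tasks it reads its input from.
        # The result lists every dependency before the task that needs it.
        order, seen = [], set()

        def visit(task):
            if task in seen:
                return
            seen.add(task)
            for dep in dependencies.get(task, ()):
                visit(dep)
            order.append(task)

        visit(start)
        return order

    # 'compress' reads from 'pack', which reads from 'enumerate'
    deps = {'compress': ['pack'], 'pack': ['enumerate']}
    print(depth_first_order(deps, 'compress'))   # ['enumerate', 'pack', 'compress']
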
@@ -256,7 +273,7 @@ class Pool(object):
# into the loop would be less code, but ... slower
# DEBUG
# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
+ if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
queue = self._queue
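
The debug print in the context above names the quantities the pool juggles when it splits a read request into work packages: the requested count, the per-task chunk size, the number of full chunks and the remainder. A small sketch of that arithmetic (the policy shown here is an assumption based on those names, not a copy of the pool's code):

    def split_into_chunks(actual_count, chunksize):
        # Break a requested item count into full chunks plus a remainder so
        # that no single work package put on the queue exceeds chunksize.
        numchunks = actual_count // chunksize
        remainder = actual_count - numchunks * chunksize
        return numchunks, remainder

    # e.g. reading 10 items with a chunk size of 4 yields two full chunks
    # of 4 and one final package of 2
    print(split_into_chunks(10, 4))   # (2, 2)
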
@@ -297,28 +314,11 @@ class Pool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
- # check whether we consumed the task, and schedule it for deletion
- # This could have happend after the read returned ( even though the pre-read
- # checks it as well )
- if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- # END handle consumption
-
- self._handle_consumed_tasks()
-
- def _handle_consumed_tasks(self):
- """Remove all consumed tasks from our queue by deleting them"""
- try:
- while True:
- ct = self._consumed_tasks.get(False)
- self.del_task(ct)
- # END for each task to delete
- except Empty:
- pass
- # END pop queue empty
-
+ pass
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
+ # 1 as it's stored on the task, 1 for the getrefcount call
if sys.getrefcount(task._out_wc) < 3:
self.del_task(task)
#} END internal
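
The threshold of 3 used by _del_task_if_orphaned relies on documented sys.getrefcount behaviour: the call itself holds one temporary reference, so a write channel that is only stored on its task reports a count of 2. A minimal demonstration with hypothetical stand-in classes:

    import sys

    class WriteChannel(object):
        # Hypothetical stand-in for the task's output write channel.
        pass

    class Task(object):
        def __init__(self):
            self._out_wc = WriteChannel()

    task = Task()
    # one reference held by task._out_wc, plus one for the getrefcount() argument
    print(sys.getrefcount(task._out_wc))   # 2 -> below 3, i.e. orphaned

    reader = task._out_wc                  # a reader still holds the channel
    print(sys.getrefcount(task._out_wc))   # 3 -> not orphaned
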
@@ -326,7 +326,7 @@ class Pool(object):
#{ Interface
def size(self):
""":return: amount of workers in the pool"""
- return len(self._workers)
+ return self._num_workers
def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
@@ -344,48 +344,51 @@ class Pool(object):
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
- cur_count = len(self._workers)
+ cur_count = self._num_workers
if cur_count < size:
- # make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._consumed_tasks, self.TaskQueueCls):
- self._consumed_tasks = Queue()
- # END init queue
-
- for i in range(size - cur_count):
- worker = self.WorkerCls(self._queue)
- worker.start()
- self._workers.append(worker)
- # END for each new worker to create
- elif cur_count > size:
# we can safely increase the size, even from serial mode, as we would
# only be able to do this if the serial ( sync ) mode finished processing.
# Just adding more workers is not a problem at all.
+ add_count = size - cur_count
+ for i in range(add_count):
+ print "Add worker"
+ self.WorkerCls(self._queue).start()
+ # END for each new worker to create
+ self._num_workers += add_count
+ elif cur_count > size:
+ # We don't care which thread exactly gets hit by our stop request
+ # On their way, they will consume remaining tasks, but new ones
+ # could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- self._workers[i].stop_and_join()
+ print "stop worker"
+ self._queue.put((self.WorkerCls.stop, True)) # arg doesn't matter
# END for each thread to stop
- del(self._workers[:del_count])
+ self._num_workers -= del_count
# END handle count
if size == 0:
- while not self._queue.empty():
- try:
- taskmethod, count = self._queue.get(False)
- taskmethod(count)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
-
- if self._consumed_tasks and not self._consumed_tasks.empty():
- self._handle_consumed_tasks()
- # END assure consumed tasks are empty
- self._consumed_tasks = SyncQueue()
+ # NOTE: we do not process any tasks still on the queue, as we will
+ # naturally do that once we read the next time, only on the tasks
+ # that are actually required. The queue will keep the tasks,
+ # and once we are deleted, they will vanish without additional
+ # time spent on them, as there shouldn't be any consumers anyway.
+ # If we should re-enable some workers again, they will continue on the
+ # remaining tasks, probably with nothing to do.
+ # We can't clear the task queue if we have removed workers
+ # as they will receive the termination signal through it, and if
+ # we had added workers, we wouldn't be here ;).
+ pass
# END process queue
return self
def num_tasks(self):
""":return: amount of tasks"""
- return len(self._tasks.nodes)
+ self._taskgraph_lock.acquire()
+ try:
+ return len(self._tasks.nodes)
+ finally:
+ self._taskgraph_lock.release()
def del_task(self, task):
"""Delete the task
@@ -396,6 +399,7 @@ class Pool(object):
This method blocks until all tasks to be removed have been processed, if
they are currently being processed.
:return: self"""
+ print "del_task: getting lock"
self._taskgraph_lock.acquire()
try:
# it can be that the task is already deleted, but its chunk was on the
@@ -403,7 +407,7 @@ class Pool(object):
if not task in self._tasks.nodes:
return self
# END early abort
-
+ print "deleting ", id(task)
# the task we are currently deleting could also be processed by
# a thread right now. We don't care about it as it's taking care of
# its write channel itself, and sends everything it can to it.
@@ -437,11 +441,6 @@ class Pool(object):
rc = RPoolChannel(wc, task, self)
task.set_wc(wc)
- has_input_channel = isinstance(task, InputChannelTask)
- if has_input_channel:
- task.set_pool(self)
- # END init input channel task
-
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
@@ -451,7 +450,7 @@ class Pool(object):
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if has_input_channel:
+ if isinstance(task, InputChannelTask):
ic = task.in_rc
if isinstance(ic, RPoolChannel) and ic._pool is self:
self._taskgraph_lock.acquire()