Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 141 |
1 files changed, 70 insertions, 71 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index d6b5711d..1767c61c 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,8 @@
 """Implementation of a thread-pool working with channels"""
-from thread import WorkerThread
+from thread import (
+    WorkerThread,
+    StopProcessing,
+    )
 from threading import Lock
 
 from util import (
@@ -41,8 +44,18 @@ class RPoolChannel(RChannel):
 
     def __del__(self):
         """Assures that our task will be deleted if we were the last reader"""
-        del(self._wc)        # decrement ref-count
-        self._pool._del_task_if_orphaned(self._task)
+        del(self._wc)        # decrement ref-count early
+        # now, if this is the last reader to the wc we just handled, there
+        # is no way anyone will ever read from the task again. If so,
+        # delete the task in question, it will take care of itself and orphans
+        # it might leave
+        # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
+        # I can't explain, but appears to be normal in the destructor
+        # On the caller side, getrefcount returns 2, as expected
+        if sys.getrefcount(self) < 6:
+            print "__del__"
+            self._pool.del_task(self._task)
+            print "done"
 
     def set_pre_cb(self, fun = lambda count: None):
         """Install a callback to call with the item count to be read before any
@@ -78,14 +91,17 @@ class RPoolChannel(RChannel):
         # have an item, but its currently being produced by some worker.
         # This is why we:
         # * make no assumptions if there are multiple consumers
-        # *
-        have_enough = False
-        if count > 0:
-            have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+        # *
+        
+        # if the user tries to use us to read from a done task, we will never
+        # compute as all produced items are already in the channel
+        skip_compute = self._task.is_done() or self._task.error()
+        #if count > 0:
+        #    skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
         # END
 
         ########## prepare ##############################
-        if not have_enough:
+        if not skip_compute:
             self._pool._prepare_channel_read(self._task, count)
         # END prepare pool scheduling
 
@@ -102,7 +118,7 @@ class RPoolChannel(RChannel):
 
         ####### Finalize ########
         self._pool._post_channel_read(self._task)
-        
+
         return items
 
     #{ Internal
@@ -134,8 +150,7 @@ class Pool(object):
    used only from the main thread, hence you cannot consume their results
    from multiple threads unless you use a task for it."""
    __slots__ = (   '_tasks',               # a graph of tasks
-                   '_consumed_tasks',      # a queue with tasks that are done or had an error
-                   '_workers',             # list of worker threads
+                   '_num_workers',         # list of workers
                    '_queue',               # master queue for tasks
                    '_taskorder_cache',     # map task id -> ordered dependent tasks
                    '_taskgraph_lock',      # lock for accessing the task graph
@@ -157,8 +172,7 @@ class Pool(object):
 
    def __init__(self, size=0):
        self._tasks = Graph()
-       self._consumed_tasks = None
-       self._workers = list()
+       self._num_workers = 0
        self._queue = self.TaskQueueCls()
        self._taskgraph_lock = self.LockCls()
        self._taskorder_cache = dict()
@@ -224,8 +238,11 @@ class Pool(object):
        # requested one last
        for task in dfirst_tasks:
            if task.error() or task.is_done():
-               self._consumed_tasks.put(task)
-               continue
+               # in theory, the should never be consumed task in the pool, right ?
+               # They delete themselves once they are done.
+               # TODO: remove this check for performance later
+               raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
+               #continue
            # END skip processing
 
            # if the task does not have the required output on its queue, schedule
@@ -256,7 +273,7 @@ class Pool(object):
            # into the loop would be less code, but ... slower
            # DEBUG
            # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
-           if self._workers:
+           if self._num_workers:
                # respect the chunk size, and split the task up if we want
                # to process too much. This can be defined per task
                queue = self._queue
@@ -297,28 +314,11 @@ class Pool(object):
 
    def _post_channel_read(self, task):
        """Called after we processed a read to cleanup"""
-       # check whether we consumed the task, and schedule it for deletion
-       # This could have happend after the read returned ( even though the pre-read
-       # checks it as well )
-       if task.error() or task.is_done():
-           self._consumed_tasks.put(task)
-       # END handle consumption
-       
-       self._handle_consumed_tasks()
-   
-   def _handle_consumed_tasks(self):
-       """Remove all consumed tasks from our queue by deleting them"""
-       try:
-           while True:
-               ct = self._consumed_tasks.get(False)
-               self.del_task(ct)
-           # END for each task to delete
-       except Empty:
-           pass
-       # END pop queue empty
-       
+       pass
+   
    def _del_task_if_orphaned(self, task):
        """Check the task, and delete it if it is orphaned"""
+       # 1 as its stored on the task, 1 for the getrefcount call
        if sys.getrefcount(task._out_wc) < 3:
            self.del_task(task)
    #} END internal
@@ -326,7 +326,7 @@ class Pool(object):
    #{ Interface
    def size(self):
        """:return: amount of workers in the pool"""
-       return len(self._workers)
+       return self._num_workers
 
    def set_size(self, size=0):
        """Set the amount of workers to use in this pool. When reducing the size,
@@ -344,48 +344,51 @@ class Pool(object):
        # either start new threads, or kill existing ones.
        # If we end up with no threads, we process the remaining chunks on the queue
        # ourselves
-       cur_count = len(self._workers)
+       cur_count = self._num_workers
        if cur_count < size:
-           # make sure we have a real queue, and can store our consumed tasks properly
-           if not isinstance(self._consumed_tasks, self.TaskQueueCls):
-               self._consumed_tasks = Queue()
-           # END init queue
-           
-           for i in range(size - cur_count):
-               worker = self.WorkerCls(self._queue)
-               worker.start()
-               self._workers.append(worker)
-           # END for each new worker to create
-       elif cur_count > size:
            # we can safely increase the size, even from serial mode, as we would
            # only be able to do this if the serial ( sync ) mode finished processing.
            # Just adding more workers is not a problem at all.
+           add_count = size - cur_count
+           for i in range(add_count):
+               print "Add worker"
+               self.WorkerCls(self._queue).start()
+           # END for each new worker to create
+           self._num_workers += add_count
+       elif cur_count > size:
+           # We don't care which thread exactly gets hit by our stop request
+           # On their way, they will consume remaining tasks, but new ones
+           # could be added as we speak.
            del_count = cur_count - size
            for i in range(del_count):
-               self._workers[i].stop_and_join()
+               print "stop worker"
+               self._queue.put((self.WorkerCls.stop, True))    # arg doesnt matter
            # END for each thread to stop
-           del(self._workers[:del_count])
+           self._num_workers -= del_count
        # END handle count
 
        if size == 0:
-           while not self._queue.empty():
-               try:
-                   taskmethod, count = self._queue.get(False)
-                   taskmethod(count)
-               except Queue.Empty:
-                   continue
-           # END while there are tasks on the queue
-           
-           if self._consumed_tasks and not self._consumed_tasks.empty():
-               self._handle_consumed_tasks()
-           # END assure consumed tasks are empty
-           self._consumed_tasks = SyncQueue()
+           # NOTE: we do not preocess any tasks still on the queue, as we ill
+           # naturally do that once we read the next time, only on the tasks
+           # that are actually required. The queue will keep the tasks,
+           # and once we are deleted, they will vanish without additional
+           # time spend on them. If there shouldn't be any consumers anyway.
+           # If we should reenable some workers again, they will continue on the
+           # remaining tasks, probably with nothing to do.
+           # We can't clear the task queue if we have removed workers
+           # as they will receive the termination signal through it, and if
+           # we had added workers, we wouldn't be here ;).
+           pass
        # END process queue
        return self
 
    def num_tasks(self):
        """:return: amount of tasks"""
-       return len(self._tasks.nodes)
+       self._taskgraph_lock.acquire()
+       try:
+           return len(self._tasks.nodes)
+       finally:
+           self._taskgraph_lock.release()
 
    def del_task(self, task):
        """Delete the task
@@ -396,6 +399,7 @@ class Pool(object):
        This method blocks until all tasks to be removed have been processed, if
        they are currently being processed.
        :return: self"""
+       print "del_task: getting lock"
        self._taskgraph_lock.acquire()
        try:
            # it can be that the task is already deleted, but its chunk was on the
@@ -403,7 +407,7 @@ class Pool(object):
            if not task in self._tasks.nodes:
                return self
            # END early abort
-           
+           print "deleting ", id(task)
            # the task we are currently deleting could also be processed by
            # a thread right now. We don't care about it as its taking care about
            # its write channel itself, and sends everything it can to it
@@ -437,11 +441,6 @@ class Pool(object):
        rc = RPoolChannel(wc, task, self)
        task.set_wc(wc)
        
-       has_input_channel = isinstance(task, InputChannelTask)
-       if has_input_channel:
-           task.set_pool(self)
-       # END init input channel task
-       
        self._taskgraph_lock.acquire()
        try:
            self._taskorder_cache.clear()
@@ -451,7 +450,7 @@ class Pool(object):
        # END sync task addition
        
        # If the input channel is one of our read channels, we add the relation
-       if has_input_channel:
+       if isinstance(task, InputChannelTask):
            ic = task.in_rc
            if isinstance(ic, RPoolChannel) and ic._pool is self:
                self._taskgraph_lock.acquire()
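
The orphan check in _del_task_if_orphaned() relies on CPython reference counting: once no reader holds the task's write channel anymore, sys.getrefcount(task._out_wc) drops to 2 (one reference from the task attribute plus the temporary reference created by the call itself), so anything below 3 means no consumer is left. A minimal sketch of that arithmetic, assuming CPython; Task and WChannel here are hypothetical stand-ins for illustration, not the pool's real classes:

    import sys

    class WChannel(object):
        """Hypothetical stand-in for a task's write channel."""

    class Task(object):
        """Hypothetical stand-in for a pool task holding its write channel."""
        def __init__(self):
            self._out_wc = WChannel()

    task = Task()

    # Only the task references the channel: 1 (the attribute) plus 1 (the
    # temporary argument reference inside getrefcount) == 2, i.e. "orphaned".
    assert sys.getrefcount(task._out_wc) == 2

    reader = task._out_wc                       # a reader now also holds it
    assert sys.getrefcount(task._out_wc) == 3   # no longer orphaned

    del reader                                  # the last reader goes away
    assert sys.getrefcount(task._out_wc) == 2   # orphaned again, task can be deleted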
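
The new shutdown path in set_size() no longer keeps a list of worker objects; it puts a stop routine onto the shared task queue, and whichever worker happens to dequeue it terminates itself. The sketch below illustrates that sentinel pattern under the assumption that workers execute (routine, argument) tuples from the queue; it is not GitPython's actual WorkerThread / StopProcessing implementation:

    import threading
    try:
        from queue import Queue    # Python 3
    except ImportError:
        from Queue import Queue    # Python 2

    class StopProcessing(Exception):
        """Raised inside a worker to make it leave its run loop."""

    def _stop(arg):
        # The argument is ignored, just like in the pool's set_size().
        raise StopProcessing

    class Worker(threading.Thread):
        """Minimal worker: pops (routine, argument) tuples and executes them."""
        stop = staticmethod(_stop)

        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue

        def run(self):
            while True:
                routine, arg = self.queue.get()
                try:
                    routine(arg)
                except StopProcessing:
                    break    # terminate this thread only, others keep running

    queue = Queue()
    workers = [Worker(queue) for _ in range(2)]
    for w in workers:
        w.start()

    results = []
    queue.put((results.append, "some work"))    # a normal chunk of work
    for _ in workers:
        queue.put((Worker.stop, True))          # one sentinel per worker to remove
    for w in workers:
        w.join()
    assert results == ["some work"]

Tracking only a plain integer (_num_workers) is sufficient with this scheme, because each sentinel terminates exactly one thread, regardless of which thread picks it up.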
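
num_tasks() now takes the task-graph lock around the len() call, using acquire() with try/finally so the lock is released even if reading the graph raises. A generic sketch of that idiom with an illustrative module-level container (not the pool's Graph):

    import threading

    _graph_lock = threading.Lock()
    _nodes = []

    def num_nodes():
        # Same shape as the new num_tasks(): the finally clause releases
        # the lock even if the try body raises.
        _graph_lock.acquire()
        try:
            return len(_nodes)
        finally:
            _graph_lock.release()

    # Equivalent, more idiomatic form on Python 2.6+:
    # def num_nodes():
    #     with _graph_lock:
    #         return len(_nodes)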