From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 16:47:48 +0200
Subject: It's getting better already - intermediate commit before further
 changing the task class

---
 lib/git/async/pool.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index d6b5711d..cf1c2199 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,8 +80,8 @@ class RPoolChannel(RChannel):
 		# * make no assumptions if there are multiple consumers
 		# *
 		have_enough = False
-		if count > 0:
-			have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+		#if count > 0:
+		#	have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
 		# END

 		########## prepare ##############################
@@ -319,6 +319,7 @@ class Pool(object):

 	def _del_task_if_orphaned(self, task):
 		"""Check the task, and delete it if it is orphaned"""
+		# 1 as it's stored on the task, 1 for the getrefcount call
 		if sys.getrefcount(task._out_wc) < 3:
 			self.del_task(task)
 	#} END internal
@@ -403,7 +404,7 @@ class Pool(object):
 		if not task in self._tasks.nodes:
 			return self
 		# END early abort
-
+		print "deleting ", id(task)
 		# the task we are currently deleting could also be processed by
 		# a thread right now. We don't care about it as its taking care about
 		# its write channel itself, and sends everything it can to it
--
cgit v1.2.1
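
The orphan check added above leans on CPython reference counting: task._out_wc holds one reference, and the temporary argument binding created by the getrefcount() call adds another, so any count below 3 means no external reader is left. A minimal standalone sketch of that invariant; the _Task and _Channel stand-ins are hypothetical, and the exact counts are CPython-specific:

    import sys

    class _Channel(object):
        pass

    class _Task(object):
        def __init__(self):
            self._out_wc = _Channel()   # the write channel stored on the task

    task = _Task()
    # only the task attribute and the getrefcount() argument reference the channel
    assert sys.getrefcount(task._out_wc) == 2   # below 3: would count as orphaned

    reader = task._out_wc                       # a consumer still holds a reference
    assert sys.getrefcount(task._out_wc) == 3   # not orphaned anymore
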
From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 17:25:43 +0200
Subject: task: now deletes itself once it's done - for the test this doesn't
 change a thing as the task deletes itself too late - it's time for a paradigm
 change: the task should be deleted with its RPoolChannel or explicitly by the
 user. The test needs to adapt, and shouldn't assume anything unless the
 RPoolChannel is gone

---
 lib/git/async/pool.py | 57 ++++++++++++++------------------------------------
 1 file changed, 15 insertions(+), 42 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index cf1c2199..fce5e424 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -78,14 +78,17 @@ class RPoolChannel(RChannel):
 		# have an item, but its currently being produced by some worker.
 		# This is why we:
 		# * make no assumptions if there are multiple consumers
-		# *
-		have_enough = False
+		# *
+
+		# if the user tries to use us to read from a done task, we will never
+		# trigger a computation, as all produced items are already in the channel
+		skip_compute = self._task.is_done() or self._task.error()
 		#if count > 0:
-		#	have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+		#	skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
 		# END

 		########## prepare ##############################
-		if not have_enough:
+		if not skip_compute:
 			self._pool._prepare_channel_read(self._task, count)
 		# END prepare pool scheduling
@@ -134,7 +137,6 @@ class Pool(object):
 	used only from the main thread, hence you cannot consume their results from multiple
 	threads unless you use a task for it."""
 	__slots__ = (	'_tasks',			# a graph of tasks
-					'_consumed_tasks',	# a queue with tasks that are done or had an error
 					'_workers',			# list of worker threads
 					'_queue',			# master queue for tasks
 					'_taskorder_cache',	# map task id -> ordered dependent tasks
@@ -157,7 +159,6 @@ class Pool(object):

 	def __init__(self, size=0):
 		self._tasks = Graph()
-		self._consumed_tasks = None
 		self._workers = list()
 		self._queue = self.TaskQueueCls()
 		self._taskgraph_lock = self.LockCls()
@@ -224,8 +225,10 @@ class Pool(object):
 		# requested one last
 		for task in dfirst_tasks:
 			if task.error() or task.is_done():
-				self._consumed_tasks.put(task)
-				continue
+				# in theory, there should never be a consumed task in the pool, right?
+				# They delete themselves once they are done.
+				raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
+				#continue
 			# END skip processing

 		# if the task does not have the required output on its queue, schedule
@@ -297,26 +300,8 @@ class Pool(object):

 	def _post_channel_read(self, task):
 		"""Called after we processed a read to cleanup"""
-		# check whether we consumed the task, and schedule it for deletion
-		# This could have happend after the read returned ( even though the pre-read
-		# checks it as well )
-		if task.error() or task.is_done():
-			self._consumed_tasks.put(task)
-		# END handle consumption
-
-		self._handle_consumed_tasks()
-
-	def _handle_consumed_tasks(self):
-		"""Remove all consumed tasks from our queue by deleting them"""
-		try:
-			while True:
-				ct = self._consumed_tasks.get(False)
-				self.del_task(ct)
-			# END for each task to delete
-		except Empty:
-			pass
-		# END pop queue empty
-
+		pass
+
 	def _del_task_if_orphaned(self, task):
 		"""Check the task, and delete it if it is orphaned"""
 		# 1 as it's stored on the task, 1 for the getrefcount call
@@ -347,11 +332,6 @@ class Pool(object):
 		# ourselves
 		cur_count = len(self._workers)
 		if cur_count < size:
-			# make sure we have a real queue, and can store our consumed tasks properly
-			if not isinstance(self._consumed_tasks, self.TaskQueueCls):
-				self._consumed_tasks = Queue()
-			# END init queue
-
 			for i in range(size - cur_count):
 				worker = self.WorkerCls(self._queue)
 				worker.start()
@@ -377,9 +357,6 @@ class Pool(object):
 				continue
 			# END while there are tasks on the queue

-			if self._consumed_tasks and not self._consumed_tasks.empty():
-				self._handle_consumed_tasks()
-			# END assure consumed tasks are empty
 			self._consumed_tasks = SyncQueue()
 		# END process queue
 		return self
@@ -437,11 +414,7 @@ class Pool(object):
 		wc, rc = Channel()
 		rc = RPoolChannel(wc, task, self)
 		task.set_wc(wc)
-
-		has_input_channel = isinstance(task, InputChannelTask)
-		if has_input_channel:
-			task.set_pool(self)
-		# END init input channel task
+		task.set_pool(self)

 		self._taskgraph_lock.acquire()
 		try:
@@ -452,7 +425,7 @@
 		# END sync task addition

 		# If the input channel is one of our read channels, we add the relation
-		if has_input_channel:
+		if isinstance(task, InputChannelTask):
 			ic = task.in_rc
 			if isinstance(ic, RPoolChannel) and ic._pool is self:
 				self._taskgraph_lock.acquire()
--
cgit v1.2.1
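
The paradigm shift in this commit is that the pool no longer sweeps a _consumed_tasks queue; a finished task is expected to disappear together with the last RPoolChannel that can read from it. A rough sketch of that ownership model, assuming hypothetical _MiniPool and _Reader stand-ins rather than the actual async API, and relying on CPython's immediate refcount-driven finalization:

    class _MiniPool(object):
        def __init__(self):
            self.tasks = set()

        def add_task(self, task):
            self.tasks.add(task)
            return _Reader(self, task)      # the reader owns the task's lifetime

        def del_task(self, task):
            self.tasks.discard(task)

    class _Reader(object):
        def __init__(self, pool, task):
            self._pool = pool
            self._task = task

        def __del__(self):
            # last reader gone: no one can ever consume the task's items again
            self._pool.del_task(self._task)

    pool = _MiniPool()
    rc = pool.add_task('task-1')
    assert 'task-1' in pool.tasks
    del rc                                  # CPython refcounting fires __del__ here
    assert not pool.tasks                   # the orphaned task was removed
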
From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 18:20:12 +0200
Subject: Revised task deletion works well, adjusted test to create new tasks
 all the time instead of reusing its own one, as it was somewhat hard to
 manage its state over time and could cause bugs. It works okay, but it
 occasionally hangs, apparently on an empty queue; have to gradually put
 certain things back in, although in the current mode of operation, it should
 never have empty queues from the pool to the user

---
 lib/git/async/pool.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index fce5e424..a915f7b0 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -41,8 +41,18 @@ class RPoolChannel(RChannel):

 	def __del__(self):
 		"""Assures that our task will be deleted if we were the last reader"""
-		del(self._wc)		# decrement ref-count
-		self._pool._del_task_if_orphaned(self._task)
+		del(self._wc)		# decrement ref-count early
+		# now, if this is the last reader of the wc we just handled, there
+		# is no way anyone will ever read from the task again. If so,
+		# delete the task in question, it will take care of itself and any
+		# orphans it might leave
+		# 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
+		# I can't explain, but appear to be normal in the destructor
+		# On the caller side, getrefcount returns 2, as expected
+		if sys.getrefcount(self) < 6:
+			print "__del__"
+			self._pool.del_task(self._task)
+		print "done"

 	def set_pre_cb(self, fun = lambda count: None):
 		"""Install a callback to call with the item count to be read before any
@@ -105,7 +115,7 @@ class RPoolChannel(RChannel):

 		####### Finalize ########
 		self._pool._post_channel_read(self._task)
-
+
 		return items

 	#{ Internal
@@ -227,6 +237,7 @@ class Pool(object):
 			if task.error() or task.is_done():
 				# in theory, there should never be a consumed task in the pool, right?
 				# They delete themselves once they are done.
+				# TODO: remove this check for performance later
 				raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
 				#continue
 			# END skip processing
@@ -363,7 +374,11 @@ class Pool(object):

 	def num_tasks(self):
 		""":return: amount of tasks"""
-		return len(self._tasks.nodes)
+		self._taskgraph_lock.acquire()
+		try:
+			return len(self._tasks.nodes)
+		finally:
+			self._taskgraph_lock.release()

 	def del_task(self, task):
 		"""Delete the task
 		This method blocks until all tasks to be removed have been processed, if
 		they are currently being processed.
 		:return: self"""
+		print "del_task: getting lock"
 		self._taskgraph_lock.acquire()
 		try:
 			# it can be that the task is already deleted, but its chunk was on the
@@ -414,7 +430,6 @@
 		wc, rc = Channel()
 		rc = RPoolChannel(wc, task, self)
 		task.set_wc(wc)
-		task.set_pool(self)

 		self._taskgraph_lock.acquire()
 		try:
--
cgit v1.2.1
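
The "3 magical ones" mentioned in the __del__ comment come from the temporary references CPython holds while a method call is in flight (the bound self, the interpreter stack, the getrefcount() argument), which is why the threshold observed inside the destructor differs from the 2 seen on the caller side. A small probe illustrating the effect; the exact numbers are implementation- and version-dependent, so the threshold of 6 in the patch should be read as empirical:

    import sys

    class _Probe(object):
        def count(self):
            # referenced by the caller's variable, the local 'self', plus
            # whatever temporaries the interpreter holds for the call itself
            return sys.getrefcount(self)

    p = _Probe()
    print(sys.getrefcount(p))   # typically 2: 'p' plus the argument binding
    print(p.count())            # higher inside the call; exact value varies
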
From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 19:25:33 +0200
Subject: workerthread: adjusted to use a blocking queue, it will receive
 termination events only via its queue, which boosts performance into bright
 green levels

---
 lib/git/async/pool.py | 56 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 33 insertions(+), 23 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index a915f7b0..1767c61c 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,8 @@
 """Implementation of a thread-pool working with channels"""
-from thread import WorkerThread
+from thread import (
+	WorkerThread,
+	StopProcessing,
+	)
 from threading import Lock

 from util import (
@@ -147,7 +150,7 @@ class Pool(object):
 	used only from the main thread, hence you cannot consume their results from multiple
 	threads unless you use a task for it."""
 	__slots__ = (	'_tasks',			# a graph of tasks
-					'_workers',			# list of worker threads
+					'_num_workers',		# number of workers
 					'_queue',			# master queue for tasks
 					'_taskorder_cache',	# map task id -> ordered dependent tasks
 					'_taskgraph_lock',	# lock for accessing the task graph
@@ -169,7 +172,7 @@ class Pool(object):

 	def __init__(self, size=0):
 		self._tasks = Graph()
-		self._workers = list()
+		self._num_workers = 0
 		self._queue = self.TaskQueueCls()
 		self._taskgraph_lock = self.LockCls()
 		self._taskorder_cache = dict()
@@ -270,7 +273,7 @@ class Pool(object):
 			# into the loop would be less code, but ... slower
 			# DEBUG
 			# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
-			if self._workers:
+			if self._num_workers:
 				# respect the chunk size, and split the task up if we want
 				# to process too much. This can be defined per task
 				queue = self._queue
@@ -323,7 +326,7 @@ class Pool(object):
 	#{ Interface
 	def size(self):
 		""":return: amount of workers in the pool"""
-		return len(self._workers)
+		return self._num_workers

 	def set_size(self, size=0):
 		"""Set the amount of workers to use in this pool. When reducing the size,
@@ -341,34 +344,41 @@ class Pool(object):
 		# either start new threads, or kill existing ones.
 		# If we end up with no threads, we process the remaining chunks on the queue
 		# ourselves
-		cur_count = len(self._workers)
+		cur_count = self._num_workers
 		if cur_count < size:
-			for i in range(size - cur_count):
-				worker = self.WorkerCls(self._queue)
-				worker.start()
-				self._workers.append(worker)
-			# END for each new worker to create
-		elif cur_count > size:
 			# we can safely increase the size, even from serial mode, as we would
 			# only be able to do this if the serial ( sync ) mode finished processing.
 			# Just adding more workers is not a problem at all.
+			add_count = size - cur_count
+			for i in range(add_count):
+				print "Add worker"
+				self.WorkerCls(self._queue).start()
+			# END for each new worker to create
+			self._num_workers += add_count
+		elif cur_count > size:
+			# We don't care which thread exactly gets hit by our stop request.
+			# On their way, they will consume remaining tasks, but new ones
+			# could be added as we speak.
 			del_count = cur_count - size
 			for i in range(del_count):
-				self._workers[i].stop_and_join()
+				print "stop worker"
+				self._queue.put((self.WorkerCls.stop, True))	# arg doesn't matter
 			# END for each thread to stop
-			del(self._workers[:del_count])
+			self._num_workers -= del_count
 		# END handle count

 		if size == 0:
-			while not self._queue.empty():
-				try:
-					taskmethod, count = self._queue.get(False)
-					taskmethod(count)
-				except Queue.Empty:
-					continue
-			# END while there are tasks on the queue
-
-			self._consumed_tasks = SyncQueue()
+			# NOTE: we do not process any tasks still on the queue, as we will
+			# naturally do that once we read the next time, and only on the tasks
+			# that are actually required. The queue will keep the tasks,
+			# and once we are deleted, they will vanish without additional
+			# time spent on them, if there shouldn't be any consumers anyway.
+			# If we should re-enable some workers again, they will continue on the
+			# remaining tasks, probably with nothing to do.
+			# We can't clear the task queue if we have removed workers
+			# as they will receive the termination signal through it, and if
+			# we had added workers, we wouldn't be here ;).
+			pass
 		# END process queue
 		return self
--
cgit v1.2.1
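
The termination scheme the last commit switches to is the classic poison-pill pattern: workers block on the shared task queue, and a stop item is enqueued instead of signalling threads out of band, so whichever worker dequeues it exits and no busy polling is needed. A self-contained sketch of the idea using the modern Python 3 threading and queue modules; the _worker and _stop names are hypothetical stand-ins, not the actual WorkerThread API:

    import threading
    import queue

    class StopProcessing(Exception):
        pass

    def _stop(_arg):
        raise StopProcessing

    def _worker(q):
        while True:
            func, arg = q.get()         # blocking get, no busy polling
            try:
                func(arg)
            except StopProcessing:
                return                  # consume the sentinel and exit

    q = queue.Queue()
    threads = [threading.Thread(target=_worker, args=(q,)) for _ in range(2)]
    for t in threads:
        t.start()

    q.put((print, 'work item'))         # ordinary work
    for _ in threads:
        q.put((_stop, True))            # arg doesn't matter, one pill per worker
    for t in threads:
        t.join()

Because the pill travels through the same queue as regular work, a stopping worker first drains whatever items were enqueued ahead of it, which is exactly why the patch above must not clear the task queue after removing workers.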