author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 12:10:56 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 12:19:44 +0200 |
commit | 619662a9138fd78df02c52cae6dc89db1d70a0e5 (patch) | |
tree | de9ea3a6982285c2194d6afed535ba87b077b2f3 /lib/git/async/pool.py | |
parent | a8a448b7864e21db46184eab0f0a21d7725d074f (diff) | |
download | gitpython-619662a9138fd78df02c52cae6dc89db1d70a0e5.tar.gz |
changed scheduling and chunksize calculation with respect to task.min_count, fixing a theoretical deadlock in serial mode and unnecessary blocking in async mode
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 213 |
1 file changed, 137 insertions, 76 deletions
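Before the diff itself, here is a minimal sketch of the chunking arithmetic the patched `_prepare_channel_read` applies. The standalone `split_into_chunks` helper is illustrative only and not part of the pool API; in the patch the same three values are computed inline and drive `queue.put((task.process, chunksize))` in async mode or direct `task.process(chunksize)` calls in serial mode.

```python
def split_into_chunks(count, max_chunksize):
    """Return (numchunks, chunksize, remainder) the way the patched pool chunks work."""
    numchunks = 1
    chunksize = count
    remainder = 0
    if max_chunksize and count > max_chunksize:
        numchunks = count // max_chunksize            # number of full chunks
        chunksize = max_chunksize
        remainder = count - numchunks * chunksize     # leftover items, queued as one extra chunk
    return numchunks, chunksize, remainder

# e.g. a raised count of 25 with a per-task max_chunksize of 10:
# two chunks of 10 go onto the queue (or are processed inline in serial mode),
# followed by one chunk of 5.
assert split_into_chunks(25, 10) == (2, 10, 5)
```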
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 5518e37e..009096f2 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,6 @@
 """Implementation of a thread-pool working with channels"""
 from thread import WorkerThread
+from threading import Lock
 from task import InputChannelTask
 from Queue import Queue, Empty
 
@@ -83,7 +84,7 @@ class RPoolChannel(RChannel):
 
     #} END internal
 
-class ThreadPool(object):
+class Pool(object):
     """A thread pool maintains a set of one or more worker threads, but supports
     a fully serial mode in which case the amount of threads is zero.
 
@@ -106,88 +107,35 @@ class ThreadPool(object):
         '_consumed_tasks',      # a queue with tasks that are done or had an error
         '_workers',             # list of worker threads
         '_queue',               # master queue for tasks
+        '_taskgraph_lock',      # lock for accessing the task graph
         )
 
+    # CONFIGURATION
+    # The type of worker to create - its expected to provide the Thread interface,
+    # taking the taskqueue as only init argument
+    # as well as a method called stop_and_join() to terminate it
+    WorkerCls = None
+
+    # The type of lock to use to protect critical sections, providing the
+    # threading.Lock interface
+    LockCls = None
+
+    # the type of the task queue to use - it must provide the Queue interface
+    TaskQueueCls = None
+
+
     def __init__(self, size=0):
         self._tasks = Graph()
         self._consumed_tasks = Queue()      # make sure its threadsafe
         self._workers = list()
-        self._queue = Queue()
+        self._queue = self.TaskQueueCls()
+        self._taskgraph_lock = self.LockCls()
         self.set_size(size)
 
     def __del__(self):
         self.set_size(0)
 
     #{ Internal
-    def _queue_feeder_visitor(self, task, count):
-        """Walk the graph and find tasks that are done for later cleanup, and
-        queue all others for processing by our worker threads ( if available )."""
-        if task.error() or task.is_done():
-            self._consumed_tasks.put(task)
-            return True
-        # END stop processing
-
-        # if the task does not have the required output on its queue, schedule
-        # it for processing. If we should process all, we don't care about the
-        # amount as it should process until its all done.
-        if count < 1 or task._out_wc.size() < count:
-            # allow min-count override. This makes sure we take at least min-count
-            # items off the input queue ( later )
-            if task.min_count is not None and 0 < count < task.min_count:
-                count = task.min_count
-            # END handle min-count
-
-            numchunks = 1
-            chunksize = count
-            remainder = 0
-
-            # we need the count set for this - can't chunk up unlimited items
-            # In serial mode we could do this by checking for empty input channels,
-            # but in dispatch mode its impossible ( == not easily possible )
-            # Only try it if we have enough demand
-            if task.max_chunksize and count > task.max_chunksize:
-                numchunks = count / task.max_chunksize
-                chunksize = task.max_chunksize
-                remainder = count - (numchunks * chunksize)
-            # END handle chunking
-
-            # the following loops are kind of unrolled - code duplication
-            # should make things execute faster. Putting the if statements
-            # into the loop would be less code, but ... slower
-            print count, numchunks, chunksize, remainder, task._out_wc.size()
-            if self._workers:
-                # respect the chunk size, and split the task up if we want
-                # to process too much. This can be defined per task
-                queue = self._queue
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        queue.put((task.process, chunksize))
-                    # END for each chunk to put
-                else:
-                    queue.put((task.process, chunksize))
-                # END try efficient looping
-
-                if remainder:
-                    queue.put((task.process, remainder))
-                # END handle chunksize
-            else:
-                # no workers, so we have to do the work ourselves
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        task.process(chunksize)
-                    # END for each chunk to put
-                else:
-                    task.process(chunksize)
-                # END try efficient looping
-
-                if remainder:
-                    task.process(remainder)
-                # END handle chunksize
-            # END handle serial mode
-        # END handle queuing
-
-        # always walk the whole graph, we want to find consumed tasks
-        return True
 
     def _prepare_channel_read(self, task, count):
         """Process the tasks which depend on the given one to be sure the input
@@ -201,7 +149,98 @@ class ThreadPool(object):
 
         Tasks which are not done will be put onto the queue for processing, which
        is fine as we walked them depth-first."""
-        self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+        dfirst_tasks = list()
+        # for the walk, we must make sure the ordering does not change
+        # Note: the result of this could be cached
+        self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+
+        # check the min count on all involved tasks, and be sure that we don't
+        # have any task which produces less than the maximum min-count of all tasks
+        # The actual_count is used when chunking tasks up for the queue, whereas
+        # the count is usued to determine whether we still have enough output
+        # on the queue, checking qsize ( ->revise )
+        # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces
+        # at least 10, T-1 goes with 1, then T will block after 1 item, which
+        # is read by the client. On the next read of 1 item, we would find T's
+        # queue empty and put in another 10, which could put another thread into
+        # blocking state. T-1 produces one more item, which is consumed right away
+        # by the two threads running T. Although this works in the end, it leaves
+        # many threads blocking and waiting for input, which is not desired.
+        # Setting the min-count to the max of the mincount of all tasks assures
+        # we have enough items for all.
+        # Addition: in serial mode, we would enter a deadlock if one task would
+        # ever wait for items !
+        actual_count = count
+        min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
+        min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
+        if 0 < count < min_count:
+            actual_count = min_count
+        # END set actual count
+
+        # the list includes our tasks - the first one to evaluate first, the
+        # requested one last
+        for task in dfirst_tasks:
+            if task.error() or task.is_done():
+                self._consumed_tasks.put(task)
+                continue
+            # END skip processing
+
+            # if the task does not have the required output on its queue, schedule
+            # it for processing. If we should process all, we don't care about the
+            # amount as it should process until its all done.
+            # NOTE: revise this for multi-tasking - checking qsize doesnt work there !
+            if count < 1 or task._out_wc.size() < count:
+                # but we continue to use the actual count to produce the output
+                numchunks = 1
+                chunksize = actual_count
+                remainder = 0
+
+                # we need the count set for this - can't chunk up unlimited items
+                # In serial mode we could do this by checking for empty input channels,
+                # but in dispatch mode its impossible ( == not easily possible )
+                # Only try it if we have enough demand
+                if task.max_chunksize and actual_count > task.max_chunksize:
+                    numchunks = actual_count / task.max_chunksize
+                    chunksize = task.max_chunksize
+                    remainder = actual_count - (numchunks * chunksize)
+                # END handle chunking
+
+                # the following loops are kind of unrolled - code duplication
+                # should make things execute faster. Putting the if statements
+                # into the loop would be less code, but ... slower
+                print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+                if self._workers:
+                    # respect the chunk size, and split the task up if we want
+                    # to process too much. This can be defined per task
+                    queue = self._queue
+                    if numchunks > 1:
+                        for i in xrange(numchunks):
+                            queue.put((task.process, chunksize))
+                        # END for each chunk to put
+                    else:
+                        queue.put((task.process, chunksize))
+                    # END try efficient looping
+
+                    if remainder:
+                        queue.put((task.process, remainder))
+                    # END handle chunksize
+                else:
+                    # no workers, so we have to do the work ourselves
+                    if numchunks > 1:
+                        for i in xrange(numchunks):
+                            task.process(chunksize)
+                        # END for each chunk to put
+                    else:
+                        task.process(chunksize)
+                    # END try efficient looping
+
+                    if remainder:
+                        task.process(remainder)
+                    # END handle chunksize
+                # END handle serial mode
+            # END handle queuing
+        # END for each task to process
+
     def _post_channel_read(self, task):
         """Called after we processed a read to cleanup"""
 
@@ -250,7 +289,7 @@ class ThreadPool(object):
         cur_count = len(self._workers)
         if cur_count < size:
             for i in range(size - cur_count):
-                worker = WorkerThread(self._queue)
+                worker = self.WorkerCls(self._queue)
                 worker.start()
                 self._workers.append(worker)
             # END for each new worker to create
@@ -291,7 +330,12 @@ class ThreadPool(object):
         # keep its input nodes as we check whether they were orphaned
         in_tasks = task.in_nodes
         task.set_done()
-        self._tasks.del_node(task)
+        self._taskgraph_lock.acquire()
+        try:
+            self._tasks.del_node(task)
+        finally:
+            self._taskgraph_lock.release()
+        # END locked deletion
 
         for t in in_tasks:
             self._del_task_if_orphaned(t)
@@ -314,16 +358,33 @@ class ThreadPool(object):
             task._pool_ref = weakref.ref(self)
         # END init input channel task
-        self._tasks.add_node(task)
+        self._taskgraph_lock.acquire()
+        try:
+            self._tasks.add_node(task)
+        finally:
+            self._taskgraph_lock.release()
+        # END sync task addition
 
         # If the input channel is one of our read channels, we add the relation
         if has_input_channel:
             ic = task.in_rc
             if isinstance(ic, RPoolChannel) and ic._pool is self:
-                self._tasks.add_edge(ic._task, task)
+                self._taskgraph_lock.acquire()
+                try:
+                    self._tasks.add_edge(ic._task, task)
+                finally:
+                    self._taskgraph_lock.release()
+                # END handle edge-adding
             # END add task relation
         # END handle input channels for connections
 
         return rc
 
    #} END interface
+
+
+class ThreadPool(Pool):
+    """A pool using threads as worker"""
+    WorkerCls = WorkerThread
+    LockCls = Lock
+    TaskQueueCls = Queue
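The core of the deadlock fix is that a read request is raised to the largest `min_count` found along the depth-first task chain, so no upstream task is scheduled with fewer items than a downstream task will wait for. Below is a minimal sketch of that reduction under simplified assumptions: `Task` is a hypothetical stand-in for the real `InputChannelTask`, and the `min_count is None` fallback is written as a plain conditional rather than the patch's `and`/`or` idiom.

```python
from functools import reduce

class Task:
    """Hypothetical stand-in for the real task type; only min_count matters here."""
    def __init__(self, min_count=None):
        self.min_count = min_count

def effective_count(count, dfirst_tasks):
    """Return the count actually used for chunking, mirroring the patch's logic."""
    # fall back to the requested count wherever a task sets no minimum
    min_counts = (t.min_count if t.min_count is not None else count for t in dfirst_tasks)
    min_count = reduce(max, min_counts)
    if 0 < count < min_count:
        return min_count    # schedule enough items for the most demanding task
    return count            # count < 1 means "process everything", leave it alone

# a chain whose middle task insists on batches of at least 10 items
chain = [Task(), Task(min_count=10), Task()]
assert effective_count(1, chain) == 10   # a 1-item client read still schedules 10 items
assert effective_count(0, chain) == 0    # unlimited reads stay unlimited
```

In the patch itself the same reduction is expressed as `reduce(lambda m1, m2: max(m1, m2), min_counts)` over the depth-first task list, and the raised `actual_count` then feeds the chunking shown in the sketch before the diff.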