Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 44
1 file changed, 27 insertions, 17 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 620e2258..fcb0f442 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -49,7 +49,7 @@ class RPoolChannel(RChannel):
 		If a function is not provided, the call is effectively uninstalled."""
 		self._post_cb = fun
 		
-	def read(self, count=0, block=False, timeout=None):
+	def read(self, count=0, block=True, timeout=None):
 		"""Read an item that was processed by one of our threads
 		:note: Triggers task dependency handling needed to provide the 
 			necessary input"""
@@ -57,14 +57,21 @@ class RPoolChannel(RChannel):
 			self._pre_cb()
 		# END pre callback
 		
 		
-		##################################################
-		self._pool._prepare_processing(self._task, count)
-		##################################################
+		########## prepare ##############################
+		self._pool._prepare_channel_read(self._task, count)
+		
+		######### read data ######
+		# read actual items, tasks were setup to put their output into our channel ( as well )
 		items = RChannel.read(self, count, block, timeout)
+		
 		if self._post_cb:
 			items = self._post_cb(items)
+		
+		####### Finalize ########
+		self._pool._post_channel_read(self._task)
+		
 		return items
 		
 		
 	#{ Internal
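For orientation, here is roughly what RPoolChannel.read looks like once the two hunks above are applied. This is a sketch reconstructed from the diff context, not the verbatim file: the enclosing class is elided, and the "if self._pre_cb:" guard is an assumption inferred from the surrounding context lines.

    # Sketch of RPoolChannel.read after this change (reconstructed from the
    # diff; RChannel and the pool internals live in the surrounding module
    # and are not shown here).
    def read(self, count=0, block=True, timeout=None):
        """Read an item that was processed by one of our threads
        :note: Triggers task dependency handling needed to provide the
            necessary input"""
        if self._pre_cb:                # guard assumed from the hunk's context
            self._pre_cb()
        # END pre callback

        # prepare: make sure the tasks we depend on have filled our channel
        self._pool._prepare_channel_read(self._task, count)

        # read data: tasks were set up to put their output into our channel
        items = RChannel.read(self, count, block, timeout)
        if self._post_cb:
            items = self._post_cb(items)

        # finalize: schedule consumed tasks for deletion
        self._pool._post_channel_read(self._task)
        return items

Note that block now defaults to True, so a plain read() waits for results instead of returning whatever happens to be queued at the time of the call.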
@@ -119,17 +126,17 @@ class ThreadPool(object):
 				self._consumed_tasks.append(task)
 				return True
 			# END stop processing
-			
-			# allow min-count override. This makes sure we take at least min-count 
-			# items off the input queue ( later )
-			if task.min_count is not None and count != 0 and count < task.min_count:
-				count = task.min_count
-			# END handle min-count
 			
 			# if the task does not have the required output on its queue, schedule
 			# it for processing. If we should process all, we don't care about the 
 			# amount as it should process until its all done.
 			if count < 1 or task._out_wc.size() < count:
+				# allow min-count override. This makes sure we take at least min-count 
+				# items off the input queue ( later )
+				if task.min_count is not None and 0 < count < task.min_count:
+					count = task.min_count
+				# END handle min-count
+				
 				numchunks = 1
 				chunksize = count
 				remainder = 0
@@ -144,10 +151,10 @@ class ThreadPool(object):
 					remainder = count - (numchunks * chunksize)
 				# END handle chunking
 				
-				print count, numchunks, chunksize, remainder
 				# the following loops are kind of unrolled - code duplication
 				# should make things execute faster. Putting the if statements 
 				# into the loop would be less code, but ... slower
+				print count, numchunks, chunksize, remainder, task._out_wc.size()
 				if self._workers:
 					# respect the chunk size, and split the task up if we want 
 					# to process too much. This can be defined per task
@@ -176,18 +183,13 @@ class ThreadPool(object):
 					if remainder:
 						task.process(remainder)
 					# END handle chunksize
-					
-					# as we are serial, we can check for consumption right away
-					if task.error() or task.is_done():
-						self._consumed_tasks.append(task)
-					# END handle consumption
 				# END handle serial mode
 			# END handle queuing
 		
 		# always walk the whole graph, we want to find consumed tasks
 		return True
 		
 		
-	def _prepare_processing(self, task, count):
+	def _prepare_channel_read(self, task, count):
 		"""Process the tasks which depend on the given one to be sure the input 
 		channels are filled with data once we process the actual task
@@ -201,10 +203,18 @@ class ThreadPool(object):
 			is fine as we walked them depth-first."""
 		self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
 		
+	def _post_channel_read(self, task):
+		"""Called after we processed a read to cleanup"""
+		# check whether we consumed the task, and schedule it for deletion
+		if task.error() or task.is_done():
+			self._consumed_tasks.append(task)
+		# END handle consumption
+		
 		# delete consumed tasks to cleanup
 		for task in self._consumed_tasks:
 			self.del_task(task)
 		# END for each task to delete
+		del(self._consumed_tasks[:])
 		
 		
 	def _del_task_if_orphaned(self, task):
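The numchunks, chunksize and remainder values printed by the debug line split a requested read of count items into per-worker chunks. A minimal sketch of that arithmetic follows; compute_chunking and max_chunksize are hypothetical names standing in for the per-task chunk size limit the diff only alludes to ("This can be defined per task"), since the lines computing it are elided from these hunks.

    # Hypothetical helper illustrating the chunking arithmetic around the
    # @@ -144,10 +151,10 @@ hunk; 'max_chunksize' is an assumed stand-in
    # for the per-task chunk size limit.
    def compute_chunking(count, max_chunksize):
        numchunks = 1
        chunksize = count
        remainder = 0
        if max_chunksize and count > max_chunksize:
            # split the request into equal chunks plus a remainder
            numchunks = count // max_chunksize
            chunksize = max_chunksize
            remainder = count - (numchunks * chunksize)
        return numchunks, chunksize, remainder

    # example: compute_chunking(10, 4) == (2, 4, 2) - two chunks of four
    # items are queued, then task.process(2) handles the remainder

The last hunk also clears the consumed-task list in place with del(self._consumed_tasks[:]) rather than rebinding the attribute to a fresh list, so any other code holding a reference to that list observes the cleanup as well.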