summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-06 23:41:20 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-07 00:05:09 +0200
commit6a252661c3bf4202a4d571f9c41d2afa48d9d75f (patch)
tree6c46c78078d91aa90b93f18b9c32215b4be57ee0 /lib/git/async/pool.py
parent867129e2950458ab75523b920a5e227e3efa8bbc (diff)
downloadgitpython-6a252661c3bf4202a4d571f9c41d2afa48d9d75f.tar.gz
pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py44
1 files changed, 27 insertions, 17 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 620e2258..fcb0f442 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -49,7 +49,7 @@ class RPoolChannel(RChannel):
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
- def read(self, count=0, block=False, timeout=None):
+ def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
@@ -57,14 +57,21 @@ class RPoolChannel(RChannel):
self._pre_cb()
# END pre callback
- ##################################################
- self._pool._prepare_processing(self._task, count)
- ##################################################
+ ########## prepare ##############################
+ self._pool._prepare_channel_read(self._task, count)
+
+ ######### read data ######
+ # read actual items, tasks were setup to put their output into our channel ( as well )
items = RChannel.read(self, count, block, timeout)
+
if self._post_cb:
items = self._post_cb(items)
+
+ ####### Finalize ########
+ self._pool._post_channel_read(self._task)
+
return items
#{ Internal
@@ -119,17 +126,17 @@ class ThreadPool(object):
self._consumed_tasks.append(task)
return True
# END stop processing
-
- # allow min-count override. This makes sure we take at least min-count
- # items off the input queue ( later )
- if task.min_count is not None and count != 0 and count < task.min_count:
- count = task.min_count
- # END handle min-count
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until its all done.
if count < 1 or task._out_wc.size() < count:
+ # allow min-count override. This makes sure we take at least min-count
+ # items off the input queue ( later )
+ if task.min_count is not None and 0 < count < task.min_count:
+ count = task.min_count
+ # END handle min-count
+
numchunks = 1
chunksize = count
remainder = 0
@@ -144,10 +151,10 @@ class ThreadPool(object):
remainder = count - (numchunks * chunksize)
# END handle chunking
- print count, numchunks, chunksize, remainder
# the following loops are kind of unrolled - code duplication
# should make things execute faster. Putting the if statements
# into the loop would be less code, but ... slower
+ print count, numchunks, chunksize, remainder, task._out_wc.size()
if self._workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
@@ -176,18 +183,13 @@ class ThreadPool(object):
if remainder:
task.process(remainder)
# END handle chunksize
-
- # as we are serial, we can check for consumption right away
- if task.error() or task.is_done():
- self._consumed_tasks.append(task)
- # END handle consumption
# END handle serial mode
# END handle queuing
# always walk the whole graph, we want to find consumed tasks
return True
- def _prepare_processing(self, task, count):
+ def _prepare_channel_read(self, task, count):
"""Process the tasks which depend on the given one to be sure the input
channels are filled with data once we process the actual task
@@ -201,10 +203,18 @@ class ThreadPool(object):
is fine as we walked them depth-first."""
self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ def _post_channel_read(self, task):
+ """Called after we processed a read to cleanup"""
+ # check whether we consumed the task, and schedule it for deletion
+ if task.error() or task.is_done():
+ self._consumed_tasks.append(task)
+ # END handle consumption
+
# delete consumed tasks to cleanup
for task in self._consumed_tasks:
self.del_task(task)
# END for each task to delete
+
del(self._consumed_tasks[:])
def _del_task_if_orphaned(self, task):