author    Sebastian Thiel <byronimo@gmail.com>    2010-06-06 23:41:20 +0200
committer Sebastian Thiel <byronimo@gmail.com>    2010-06-07 00:05:09 +0200
commit    6a252661c3bf4202a4d571f9c41d2afa48d9d75f (patch)
tree      6c46c78078d91aa90b93f18b9c32215b4be57ee0 /lib/git/async
parent    867129e2950458ab75523b920a5e227e3efa8bbc (diff)
pool: First version which works as expected in async mode. It's just using a single task for now, but next up are dependent tasks
Diffstat (limited to 'lib/git/async')
-rw-r--r--  lib/git/async/channel.py  10
-rw-r--r--  lib/git/async/pool.py     44
-rw-r--r--  lib/git/async/task.py     12
-rw-r--r--  lib/git/async/thread.py   43
4 files changed, 55 insertions, 54 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 0a1db26b..2add9478 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -162,7 +162,15 @@ class RChannel(Channel):
try:
if wc.closed:
have_timeout = True
- break
+ # it's about the 'in the meanwhile' :) - get everything
+ # we can in non-blocking mode; this will raise Empty once drained
+ try:
+ while True:
+ out.append(queue.get(False))
+ # END until it raises Empty
+ except Empty:
+ break
+ # END finally, out of here
# END don't continue on closed channels
# END abort reading if it was closed ( in the meanwhile )
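The hunk above drains whatever the writer managed to enqueue before the channel was closed. As a standalone illustration, a minimal sketch of that non-blocking drain pattern, assuming a plain Queue.Queue (the function name is hypothetical, not from the patch):

import Queue

def drain_pending(queue, out):
    # Pull everything already buffered without blocking; get(False)
    # raises Queue.Empty as soon as the queue runs dry.
    try:
        while True:
            out.append(queue.get(False))
    except Queue.Empty:
        pass
    return out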
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 620e2258..fcb0f442 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -49,7 +49,7 @@ class RPoolChannel(RChannel):
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
- def read(self, count=0, block=False, timeout=None):
+ def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
@@ -57,14 +57,21 @@ class RPoolChannel(RChannel):
self._pre_cb()
# END pre callback
- ##################################################
- self._pool._prepare_processing(self._task, count)
- ##################################################
+ ########## prepare ##############################
+ self._pool._prepare_channel_read(self._task, count)
+
+ ######### read data ######
+ # read actual items, tasks were setup to put their output into our channel ( as well )
items = RChannel.read(self, count, block, timeout)
+
if self._post_cb:
items = self._post_cb(items)
+
+ ####### Finalize ########
+ self._pool._post_channel_read(self._task)
+
return items
#{ Internal
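Taken together with the rename further down, the channel read now runs in four phases: pre-callback, pool-side preparation, the actual channel read, and pool-side finalization. A sketch of just that call order, assuming the surrounding RPoolChannel state (_pre_cb, _post_cb, _pool, _task) defined elsewhere in this file:

def read(self, count=0, block=True, timeout=None):
    # call-order sketch only - not the full implementation
    if self._pre_cb:
        self._pre_cb()
    self._pool._prepare_channel_read(self._task, count)  # fill input channels
    items = RChannel.read(self, count, block, timeout)   # the actual read
    if self._post_cb:
        items = self._post_cb(items)
    self._pool._post_channel_read(self._task)            # consume/cleanup
    return items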
@@ -119,17 +126,17 @@ class ThreadPool(object):
self._consumed_tasks.append(task)
return True
# END stop processing
-
- # allow min-count override. This makes sure we take at least min-count
- # items off the input queue ( later )
- if task.min_count is not None and count != 0 and count < task.min_count:
- count = task.min_count
- # END handle min-count
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until its all done.
if count < 1 or task._out_wc.size() < count:
+ # allow min-count override. This makes sure we take at least min-count
+ # items off the input queue ( later )
+ if task.min_count is not None and 0 < count < task.min_count:
+ count = task.min_count
+ # END handle min-count
+
numchunks = 1
chunksize = count
remainder = 0
@@ -144,10 +151,10 @@ class ThreadPool(object):
remainder = count - (numchunks * chunksize)
# END handle chunking
- print count, numchunks, chunksize, remainder
# the following loops are kind of unrolled - code duplication
# should make things execute faster. Putting the if statements
# into the loop would be less code, but ... slower
+ print count, numchunks, chunksize, remainder, task._out_wc.size()
if self._workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
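The chunking arithmetic in this and the previous hunk boils down to a few lines. A self-contained sketch of that computation, assuming max_chunksize may be None or 0 to disable chunking (the helper name is hypothetical):

def compute_chunking(count, max_chunksize):
    # split 'count' items into chunks no larger than max_chunksize;
    # mirrors the numchunks/chunksize/remainder setup above
    numchunks = 1
    chunksize = count
    remainder = 0
    if max_chunksize and count > max_chunksize:
        numchunks = count / max_chunksize  # integer division (python 2)
        chunksize = max_chunksize
        remainder = count - (numchunks * chunksize)
    return numchunks, chunksize, remainder

compute_chunking(10, 3) yields (3, 3, 1): three full chunks of three items plus a remainder of one.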
@@ -176,18 +183,13 @@ class ThreadPool(object):
if remainder:
task.process(remainder)
# END handle chunksize
-
- # as we are serial, we can check for consumption right away
- if task.error() or task.is_done():
- self._consumed_tasks.append(task)
- # END handle consumption
# END handle serial mode
# END handle queuing
# always walk the whole graph, we want to find consumed tasks
return True
- def _prepare_processing(self, task, count):
+ def _prepare_channel_read(self, task, count):
"""Process the tasks which depend on the given one to be sure the input
channels are filled with data once we process the actual task
@@ -201,10 +203,18 @@ class ThreadPool(object):
is fine as we walked them depth-first."""
self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ def _post_channel_read(self, task):
+ """Called after we processed a read to cleanup"""
+ # check whether we consumed the task, and schedule it for deletion
+ if task.error() or task.is_done():
+ self._consumed_tasks.append(task)
+ # END handle consumption
+
# delete consumed tasks to cleanup
for task in self._consumed_tasks:
self.del_task(task)
# END for each task to delete
+
del(self._consumed_tasks[:])
def _del_task_if_orphaned(self, task):
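The consumption check that used to live in the serial branch now happens once per read, in the _post_channel_read added above. A self-contained sketch of that cleanup pattern, with a stub standing in for the real task and pool (all names here are illustrative):

class StubTask(object):
    # minimal stand-in exposing only the two predicates the cleanup checks
    def __init__(self, done=False):
        self._done = done
    def error(self):
        return None
    def is_done(self):
        return self._done

def post_channel_read(task, consumed, del_task):
    # mark the task consumed once it errored or finished, delete all
    # consumed tasks, then clear the list in place - as the patch does
    if task.error() or task.is_done():
        consumed.append(task)
    for t in consumed:
        del_task(t)
    del(consumed[:])

consumed = []
post_channel_read(StubTask(done=True), consumed, lambda t: None)
assert not consumed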
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index ec650237..3137746c 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -7,7 +7,13 @@ class OutputChannelTask(Node):
additional information on how the task should be queued and processed.
Results of the item processing are sent to an output channel, which is to be
- set by the creator"""
+ set by the creator
+
+ * **min_count** assures that no fewer than min_count items will be processed per call.
+ * **max_chunksize** assures that multi-threading happens in smaller chunks. If
+ someone wants all items to be processed, using read(0), the whole task would go to
+ one worker, as well as dependent tasks. If you want finer granularity, you can
+ specify this here, causing chunks to be no larger than max_chunksize"""
__slots__ = ( '_read', # method to yield items to process
'_out_wc', # output write channel
'_exc', # exception caught
@@ -42,7 +48,6 @@ class OutputChannelTask(Node):
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
-
try:
if self.apply_single:
for item in items:
@@ -58,6 +63,9 @@ class OutputChannelTask(Node):
# if we didn't get all demanded items, which is also the case if count is 0
# we have depleted the input channel and are done
+ # We could check our output channel for how many items we have and put that
+ # into the equation, but what's important is that we were asked to produce
+ # count items.
if not items or len(items) != count:
self.set_done()
# END handle done state
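The done detection rests entirely on _read's contract: returning fewer items than demanded (and any non-empty result when count is 0) means the input is depleted. A condensed, illustrative model with the output channel replaced by a plain list - the real class sends results onward and handles exceptions:

class MiniTask(object):
    # toy model of OutputChannelTask's done detection (illustrative)
    def __init__(self, items):
        self._items = list(items)
        self._done = False
    def _read(self, count):
        # take 'count' items, or everything when count == 0
        n = count
        if not n:
            n = len(self._items)
        taken = self._items[:n]
        self._items = self._items[n:]
        return taken
    def process(self, count=0):
        items = self._read(count)
        # fewer items than demanded, or count == 0, means we are done
        if not items or len(items) != count:
            self._done = True
        return items

t = MiniTask(range(5))
t.process(2)    # two items delivered, task not yet done
assert not t._done
t.process(0)    # read(0) drains the rest and always finishes the task
assert t._done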
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 82acbd8f..0292289d 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -115,33 +115,17 @@ class WorkerThread(TerminatableThread):
"""
__slots__ = ('inq', 'outq')
- class InvalidRoutineError(Exception):
- """Class sent as return value in case of an error"""
-
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()
- def call(self, function, *args, **kwargs):
- """Method that makes the call to the worker using the input queue,
- returning our output queue
-
- :param funciton: can be a standalone function unrelated to this class,
- a class method of this class or any instance method.
- If it is a string, it will be considered a function residing on this instance
- :param args: arguments to pass to function
- :parma **kwargs: kwargs to pass to function"""
- self.inq.put((function, args, kwargs))
-
def run(self):
"""Process input tasks until we receive the quit signal"""
while True:
if self._should_terminate():
break
# END check for stop request
- routine = None
- args = tuple()
- kwargs = dict()
+
# don't wait too long, instead check for the termination request more often
try:
tasktuple = self.inq.get(True, 1)
@@ -149,29 +133,19 @@ class WorkerThread(TerminatableThread):
continue
# END get task with timeout
- if isinstance(tasktuple, (tuple, list)):
- if len(tasktuple) == 3:
- routine, args, kwargs = tasktuple
- elif len(tasktuple) == 2:
- routine, args = tasktuple
- elif len(tasktuple) == 1:
- routine = tasktuple[0]
- # END tasktuple length check
- elif inspect.isroutine(tasktuple):
- routine = tasktuple
- # END tasktuple handling
+ # we need exactly one function and one arg
+ assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but it's reduced to what we need"
+ routine, arg = tasktuple
try:
rval = None
if inspect.ismethod(routine):
if routine.im_self is None:
- rval = routine(self, *args, **kwargs)
+ rval = routine(self, arg)
else:
- rval = routine(*args, **kwargs)
+ rval = routine(arg)
elif inspect.isroutine(routine):
- rval = routine(*args, **kwargs)
- elif isinstance(routine, basestring) and hasattr(self, routine):
- rval = getattr(self, routine)(*args, **kwargs)
+ rval = routine(arg)
else:
# ignore unknown items
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
@@ -180,7 +154,8 @@ class WorkerThread(TerminatableThread):
except StopIteration:
break
except Exception,e:
- print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
+ print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
+ break # abort ...
# END routine exception handling
# END endless loop
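With the tuple unpacking fixed to exactly (routine, arg), the loop's core reduces to a small dispatcher. A runnable, trimmed model of it - termination flags and bound/unbound method handling are left out, and all names are illustrative:

import Queue

def worker_loop(inq):
    # consume (routine, arg) tuples until a routine raises StopIteration
    while True:
        try:
            tasktuple = inq.get(True, 1)  # short timeout keeps the thread responsive
        except Queue.Empty:
            continue
        routine, arg = tasktuple          # exactly one callable and one argument
        try:
            routine(arg)
        except StopIteration:
            break
        except Exception, e:              # python 2 syntax, matching this file
            print "task %s failed: %s - aborting" % (str(tasktuple), str(e))
            break

q = Queue.Queue()
q.put((lambda x: x + 1, 41))
def _quit(arg):
    raise StopIteration
q.put((_quit, None))
worker_loop(q)    # processes the first item, then stops on the sentinel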