author     Sebastian Thiel <byronimo@gmail.com>  2010-06-06 23:41:20 +0200
committer  Sebastian Thiel <byronimo@gmail.com>  2010-06-07 00:05:09 +0200
commit     6a252661c3bf4202a4d571f9c41d2afa48d9d75f (patch)
tree       6c46c78078d91aa90b93f18b9c32215b4be57ee0
parent     867129e2950458ab75523b920a5e227e3efa8bbc (diff)
download   gitpython-6a252661c3bf4202a4d571f9c41d2afa48d9d75f.tar.gz
pool: First version which works as expected in async mode. It's just using a single task for now, but next up are dependent tasks.
-rw-r--r--  lib/git/async/channel.py       10
-rw-r--r--  lib/git/async/pool.py          44
-rw-r--r--  lib/git/async/task.py          12
-rw-r--r--  lib/git/async/thread.py        43
-rw-r--r--  test/git/async/test_pool.py    43
-rw-r--r--  test/git/async/test_thread.py  19
6 files changed, 96 insertions, 75 deletions
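
For orientation, the pieces this commit touches fit together roughly as follows. This is a sketch assembled from the tests further down, not code from the diff; the task construction is elided, since the task constructor does not appear in this patch.

    from git.async.pool import ThreadPool

    p = ThreadPool()        # with worker threads: async mode, without: serial
    task = ...              # an OutputChannelTask subclass, construction elided
    rc = p.add_task(task)   # returns an RPoolChannel bound to the task's output
    items = rc.read(1)      # prepares upstream input, then reads one result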
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 0a1db26b..2add9478 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -162,7 +162,15 @@ class RChannel(Channel):
try:
if wc.closed:
have_timeout = True
- break
+ # it's about the 'in the meanwhile' :) - get everything
+ # we can in non-blocking mode. This will raise Empty once drained
+ try:
+ while True:
+ out.append(queue.get(False))
+ # END until it raises Empty
+ except Empty:
+ break
+ # END finally, out of here
# END don't continue on closed channels
# END abort reading if it was closed ( in the meanwhile )
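
The added branch drains whatever already sits on the queue before bailing out of a closed channel. In isolation, the pattern looks like this (a standalone sketch, not the commit's code):

    import Queue
    from Queue import Empty

    q = Queue.Queue()
    for i in range(3):
        q.put(i)

    out = []
    try:
        while True:
            out.append(q.get(False))   # non-blocking get raises Empty when drained
    except Empty:
        pass                           # queue exhausted, nothing left behind
    assert out == [0, 1, 2]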
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 620e2258..fcb0f442 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -49,7 +49,7 @@ class RPoolChannel(RChannel):
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
- def read(self, count=0, block=False, timeout=None):
+ def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
@@ -57,14 +57,21 @@ class RPoolChannel(RChannel):
self._pre_cb()
# END pre callback
- ##################################################
- self._pool._prepare_processing(self._task, count)
- ##################################################
+ ########## prepare ##############################
+ self._pool._prepare_channel_read(self._task, count)
+
+ ######### read data ######
+ # read actual items; tasks were set up to put their output into our channel ( as well )
items = RChannel.read(self, count, block, timeout)
+
if self._post_cb:
items = self._post_cb(items)
+
+ ####### Finalize ########
+ self._pool._post_channel_read(self._task)
+
return items
#{ Internal
@@ -119,17 +126,17 @@ class ThreadPool(object):
self._consumed_tasks.append(task)
return True
# END stop processing
-
- # allow min-count override. This makes sure we take at least min-count
- # items off the input queue ( later )
- if task.min_count is not None and count != 0 and count < task.min_count:
- count = task.min_count
- # END handle min-count
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until it's all done.
if count < 1 or task._out_wc.size() < count:
+ # allow min-count override. This makes sure we take at least min-count
+ # items off the input queue ( later )
+ if task.min_count is not None and 0 < count < task.min_count:
+ count = task.min_count
+ # END handle min-count
+
numchunks = 1
chunksize = count
remainder = 0
@@ -144,10 +151,10 @@ class ThreadPool(object):
remainder = count - (numchunks * chunksize)
# END handle chunking
- print count, numchunks, chunksize, remainder
# the following loops are kind of unrolled - code duplication
# should make things execute faster. Putting the if statements
# into the loop would be less code, but ... slower
+ print count, numchunks, chunksize, remainder, task._out_wc.size()
if self._workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
@@ -176,18 +183,13 @@ class ThreadPool(object):
if remainder:
task.process(remainder)
# END handle chunksize
-
- # as we are serial, we can check for consumption right away
- if task.error() or task.is_done():
- self._consumed_tasks.append(task)
- # END handle consumption
# END handle serial mode
# END handle queuing
# always walk the whole graph, we want to find consumed tasks
return True
- def _prepare_processing(self, task, count):
+ def _prepare_channel_read(self, task, count):
"""Process the tasks which depend on the given one to be sure the input
channels are filled with data once we process the actual task
@@ -201,10 +203,18 @@ class ThreadPool(object):
is fine as we walked them depth-first."""
self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ def _post_channel_read(self, task):
+ """Called after we processed a read to cleanup"""
+ # check whether we consumed the task, and schedule it for deletion
+ if task.error() or task.is_done():
+ self._consumed_tasks.append(task)
+ # END handle consumption
+
# delete consumed tasks to cleanup
for task in self._consumed_tasks:
self.del_task(task)
# END for each task to delete
+
del(self._consumed_tasks[:])
def _del_task_if_orphaned(self, task):
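
Taken together, the renamed hooks give RPoolChannel.read() a three-phase shape. A condensed restatement of that flow, with the callback handling omitted and the pool internals taken as given:

    class RPoolChannelReadSketch(object):
        """Prepare -> read -> finalize, as introduced by the hunks above."""
        def __init__(self, pool, task, channel):
            self._pool, self._task, self._channel = pool, task, channel

        def read(self, count=0, block=True, timeout=None):
            # 1. prepare: have the tasks we depend on fill our input channels
            self._pool._prepare_channel_read(self._task, count)
            # 2. read: tasks were set up to put their output into our channel
            items = self._channel.read(count, block, timeout)
            # 3. finalize: let the pool delete tasks that are now consumed
            self._pool._post_channel_read(self._task)
            return items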
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index ec650237..3137746c 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -7,7 +7,13 @@ class OutputChannelTask(Node):
additional information on how the task should be queued and processed.
Results of the item processing are sent to an output channel, which is to be
- set by the creator"""
+ set by the creator
+
+ * **min_count** assures that not less than min_count items will be processed per call.
+ * **max_chunksize** assures that multi-threading is happening in smaller chunks. If
+ someone wants all items to be processed, using read(0), the whole task would go to
one worker, as well as dependent tasks. If you want finer granularity, you can
+ specify this here, causing chunks to be no larger than max_chunksize"""
__slots__ = ( '_read', # method to yield items to process
'_out_wc', # output write channel
'_exc', # exception caught
@@ -42,7 +48,6 @@ class OutputChannelTask(Node):
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
-
try:
if self.apply_single:
for item in items:
@@ -58,6 +63,9 @@ class OutputChannelTask(Node):
# if we didn't get all demanded items, which is also the case if count is 0
# we have depleted the input channel and are done
+ # We could check our output channel for how many items we have and put that
+ # into the equation, but what's important is that we were asked to produce
+ # count items.
if not items or len(items) != count:
self.set_done()
# END handle done state
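
The new comment pins down when a task flags itself done: whenever the read yields fewer items than requested, regardless of what still sits in the output channel. Extracted as a standalone predicate for illustration (set_done() replaced by a return value):

    def input_depleted(items, count):
        # count == 0 means 'produce everything'; once anything at all was
        # read, len(items) != 0 holds, so a count-0 read always ends the task
        return not items or len(items) != count

    assert input_depleted([], 0)          # nothing left at all
    assert input_depleted([1, 2, 3], 5)   # asked for 5, got 3: depleted
    assert not input_depleted([1, 2], 2)  # got exactly what was asked for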
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 82acbd8f..0292289d 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -115,33 +115,17 @@ class WorkerThread(TerminatableThread):
"""
__slots__ = ('inq', 'outq')
- class InvalidRoutineError(Exception):
- """Class sent as return value in case of an error"""
-
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()
- def call(self, function, *args, **kwargs):
- """Method that makes the call to the worker using the input queue,
- returning our output queue
-
- :param funciton: can be a standalone function unrelated to this class,
- a class method of this class or any instance method.
- If it is a string, it will be considered a function residing on this instance
- :param args: arguments to pass to function
- :parma **kwargs: kwargs to pass to function"""
- self.inq.put((function, args, kwargs))
-
def run(self):
"""Process input tasks until we receive the quit signal"""
while True:
if self._should_terminate():
break
# END check for stop request
- routine = None
- args = tuple()
- kwargs = dict()
+
# don't wait too long, instead check for the termination request more often
try:
tasktuple = self.inq.get(True, 1)
@@ -149,29 +133,19 @@ class WorkerThread(TerminatableThread):
continue
# END get task with timeout
- if isinstance(tasktuple, (tuple, list)):
- if len(tasktuple) == 3:
- routine, args, kwargs = tasktuple
- elif len(tasktuple) == 2:
- routine, args = tasktuple
- elif len(tasktuple) == 1:
- routine = tasktuple[0]
- # END tasktuple length check
- elif inspect.isroutine(tasktuple):
- routine = tasktuple
- # END tasktuple handling
+ # we need exactly one function and one arg
+ assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but it's reduced to what we need"
+ routine, arg = tasktuple
try:
rval = None
if inspect.ismethod(routine):
if routine.im_self is None:
- rval = routine(self, *args, **kwargs)
+ rval = routine(self, arg)
else:
- rval = routine(*args, **kwargs)
+ rval = routine(arg)
elif inspect.isroutine(routine):
- rval = routine(*args, **kwargs)
- elif isinstance(routine, basestring) and hasattr(self, routine):
- rval = getattr(self, routine)(*args, **kwargs)
+ rval = routine(arg)
else:
# ignore unknown items
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
@@ -180,7 +154,8 @@ class WorkerThread(TerminatableThread):
except StopIteration:
break
except Exception,e:
- print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
+ print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
+ break # abort ...
# END routine exception handling
# END endless loop
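
With call() gone, the worker's input queue carries exactly (function, arg) tuples and nothing else. The protocol in isolation (a standalone sketch, not the worker's actual loop):

    import Queue

    inq = Queue.Queue()
    results = []

    def consume_one(q):
        tasktuple = q.get(True, 1)    # wait briefly, as the worker loop does
        assert len(tasktuple) == 2    # exactly (function, arg)
        routine, arg = tasktuple
        results.append(routine(arg))

    inq.put((lambda x: x * 2, 21))
    consume_one(inq)
    assert results == [42]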
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 65b2d228..628e2a93 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -10,9 +10,12 @@ class TestThreadTaskNode(InputIteratorThreadTask):
def __init__(self, *args, **kwargs):
super(TestThreadTaskNode, self).__init__(*args, **kwargs)
self.reset(self._iterator)
+ self.should_fail = False
def do_fun(self, item):
self.item_count += 1
+ if self.should_fail:
+ raise AssertionError("I am failing just for the fun of it")
return item
def reset(self, iterator):
@@ -29,7 +32,7 @@ class TestThreadTaskNode(InputIteratorThreadTask):
:return: self"""
assert self.process_count == pc
assert self.item_count == fc
-
+ assert not self.error()
return self
@@ -60,10 +63,10 @@ class TestThreadPool(TestBase):
assert task._out_wc is not None
# pull the result completely - we should get one task, which calls its
- # function once. In serial mode, the order matches
+ # function once. In sync mode, the order matches
items = rc.read()
- task._assert(1, ni).reset(make_iter())
assert len(items) == ni
+ task._assert(1, ni).reset(make_iter())
assert items[0] == 0 and items[-1] == ni-1
# as the task is done, it should have been removed - we have read everything
@@ -91,13 +94,17 @@ class TestThreadPool(TestBase):
# if we query 1 item, it will prepare ni / 2
task.min_count = ni / 2
rc = p.add_task(task)
- assert len(rc.read(1)) == 1 # 1
- assert len(rc.read(1)) == 1
- assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2
- task._assert(2, ni) # two chunks, 20 calls ( all items )
- assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much
- assert len(rc.read()) == 0 # now we read too much and its done
+ assert len(rc.read(1)) == 1 # processes ni / 2
+ assert len(rc.read(1)) == 1 # processes nothing
+ # rest - it has ni/2 - 2 on the queue, and pulls ni-2
+ # It wants too much, so the task realizes it's done. The task
+ # doesn't care about the items in its output channel
+ assert len(rc.read(ni-2)) == ni - 2
assert p.num_tasks() == null_tasks
+ task._assert(2, ni) # two chunks, 20 calls ( all items )
+
+ # its already done, gives us no more
+ assert len(rc.read()) == 0
# test chunking
# we always want 4 chunks, these could go to individual nodes
@@ -135,11 +142,25 @@ class TestThreadPool(TestBase):
for i in range(ni):
assert rc.read(1)[0] == i
# END for each item
- task._assert(ni / 4, ni)
+ task._assert(ni / task.min_count, ni)
del(rc)
assert p.num_tasks() == null_tasks
+ # test failure
+ # on failure, the processing stops and the task is finished, keeping
+ # its error for later
+ task.reset(make_iter())
+ task.should_fail = True
+ rc = p.add_task(task)
+ assert len(rc.read()) == 0 # failure on first item
+ assert isinstance(task.error(), AssertionError)
+ assert p.num_tasks() == null_tasks
+
def _assert_async_dependent_tasks(self, p):
+ # includes failure in center task, 'recursive' orphan cleanup
+ # This will also verify that the channel-close mechanism works
+ # t1 -> t2 -> t3
+ # t1 -> x -> t3
pass
def test_base(self):
@@ -199,6 +220,6 @@ class TestThreadPool(TestBase):
# DEPENDENT TASK ASYNC MODE
###########################
- # self._assert_async_dependent_tasks(p)
+ self._assert_async_dependent_tasks(p)
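
The chunking expectations in this test follow from simple arithmetic over count, chunk size and remainder. A hedged reconstruction of just that arithmetic (the commit's code in pool.py differs in the details):

    def split(count, chunksize):
        # how `count` queued items are cut into worker-sized chunks
        if chunksize <= 0 or count <= chunksize:
            return 1, count, 0                  # a single chunk suffices
        numchunks = count / chunksize           # Python 2 integer division
        remainder = count - numchunks * chunksize
        return numchunks, chunksize, remainder

    # 20 items with a chunk size of 5 gives the test's four full chunks
    assert split(20, 5) == (4, 5, 0)
    assert split(20, 8) == (2, 8, 4)            # uneven splits leave a remainder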
diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py
index 2ea8d1ff..a08c1dc7 100644
--- a/test/git/async/test_thread.py
+++ b/test/git/async/test_thread.py
@@ -3,28 +3,26 @@
from test.testlib import *
from git.async.thread import *
from Queue import Queue
+import time
class TestWorker(WorkerThread):
def __init__(self, *args, **kwargs):
super(TestWorker, self).__init__(*args, **kwargs)
self.reset()
- def fun(self, *args, **kwargs):
+ def fun(self, arg):
self.called = True
- self.args = args
- self.kwargs = kwargs
+ self.arg = arg
return True
def make_assertion(self):
assert self.called
- assert self.args
- assert self.kwargs
+ assert self.arg
self.reset()
def reset(self):
self.called = False
- self.args = None
- self.kwargs = None
+ self.arg = None
class TestThreads( TestCase ):
@@ -36,10 +34,11 @@ class TestThreads( TestCase ):
# test different method types
standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
- for function in ("fun", TestWorker.fun, worker.fun, standalone_func):
- worker.call(function, 1, this='that')
+ for function in (TestWorker.fun, worker.fun, standalone_func):
+ worker.inq.put((function, 1))
+ time.sleep(0.01)
worker.make_assertion()
# END for each function type
- worker.call('quit')
+ worker.stop_and_join()