diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 23:41:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 00:05:09 +0200 |
commit | 6a252661c3bf4202a4d571f9c41d2afa48d9d75f (patch) | |
tree | 6c46c78078d91aa90b93f18b9c32215b4be57ee0 | |
parent | 867129e2950458ab75523b920a5e227e3efa8bbc (diff) | |
download | gitpython-6a252661c3bf4202a4d571f9c41d2afa48d9d75f.tar.gz |
pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks
-rw-r--r-- | lib/git/async/channel.py | 10 | ||||
-rw-r--r-- | lib/git/async/pool.py | 44 | ||||
-rw-r--r-- | lib/git/async/task.py | 12 | ||||
-rw-r--r-- | lib/git/async/thread.py | 43 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 43 | ||||
-rw-r--r-- | test/git/async/test_thread.py | 19 |
6 files changed, 96 insertions, 75 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 0a1db26b..2add9478 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -162,7 +162,15 @@ class RChannel(Channel): try: if wc.closed: have_timeout = True - break + # its about the 'in the meanwhile' :) - get everything + # we can in non-blocking mode. This will raise + try: + while True: + out.append(queue.get(False)) + # END until it raises Empty + except Empty: + break + # END finally, out of here # END don't continue on closed channels # END abort reading if it was closed ( in the meanwhile ) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 620e2258..fcb0f442 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -49,7 +49,7 @@ class RPoolChannel(RChannel): If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - def read(self, count=0, block=False, timeout=None): + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -57,14 +57,21 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback - ################################################## - self._pool._prepare_processing(self._task, count) - ################################################## + ########## prepare ############################## + self._pool._prepare_channel_read(self._task, count) + + ######### read data ###### + # read actual items, tasks were setup to put their output into our channel ( as well ) items = RChannel.read(self, count, block, timeout) + if self._post_cb: items = self._post_cb(items) + + ####### Finalize ######## + self._pool._post_channel_read(self._task) + return items #{ Internal @@ -119,17 +126,17 @@ class ThreadPool(object): self._consumed_tasks.append(task) return True # END stop processing - - # allow min-count override. This makes sure we take at least min-count - # items off the input queue ( later ) - if task.min_count is not None and count != 0 and count < task.min_count: - count = task.min_count - # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. if count < 1 or task._out_wc.size() < count: + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None and 0 < count < task.min_count: + count = task.min_count + # END handle min-count + numchunks = 1 chunksize = count remainder = 0 @@ -144,10 +151,10 @@ class ThreadPool(object): remainder = count - (numchunks * chunksize) # END handle chunking - print count, numchunks, chunksize, remainder # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower + print count, numchunks, chunksize, remainder, task._out_wc.size() if self._workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task @@ -176,18 +183,13 @@ class ThreadPool(object): if remainder: task.process(remainder) # END handle chunksize - - # as we are serial, we can check for consumption right away - if task.error() or task.is_done(): - self._consumed_tasks.append(task) - # END handle consumption # END handle serial mode # END handle queuing # always walk the whole graph, we want to find consumed tasks return True - def _prepare_processing(self, task, count): + def _prepare_channel_read(self, task, count): """Process the tasks which depend on the given one to be sure the input channels are filled with data once we process the actual task @@ -201,10 +203,18 @@ class ThreadPool(object): is fine as we walked them depth-first.""" self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + def _post_channel_read(self, task): + """Called after we processed a read to cleanup""" + # check whether we consumed the task, and schedule it for deletion + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # delete consumed tasks to cleanup for task in self._consumed_tasks: self.del_task(task) # END for each task to delete + del(self._consumed_tasks[:]) def _del_task_if_orphaned(self, task): diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ec650237..3137746c 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -7,7 +7,13 @@ class OutputChannelTask(Node): additional information on how the task should be queued and processed. Results of the item processing are sent to an output channel, which is to be - set by the creator""" + set by the creator + + * **min_count** assures that not less than min_count items will be processed per call. + * **max_chunksize** assures that multi-threading is happening in smaller chunks. If + someone wants all items to be processed, using read(0), the whole task would go to + one worker, as well as dependent tasks. If you want finer granularity , you can + specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught @@ -42,7 +48,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - try: if self.apply_single: for item in items: @@ -58,6 +63,9 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done + # We could check our output channel for how many items we have and put that + # into the equation, but whats important is that we were asked to produce + # count items. if not items or len(items) != count: self.set_done() # END handle done state diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 82acbd8f..0292289d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -115,33 +115,17 @@ class WorkerThread(TerminatableThread): """ __slots__ = ('inq', 'outq') - class InvalidRoutineError(Exception): - """Class sent as return value in case of an error""" - def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() - def call(self, function, *args, **kwargs): - """Method that makes the call to the worker using the input queue, - returning our output queue - - :param funciton: can be a standalone function unrelated to this class, - a class method of this class or any instance method. - If it is a string, it will be considered a function residing on this instance - :param args: arguments to pass to function - :parma **kwargs: kwargs to pass to function""" - self.inq.put((function, args, kwargs)) - def run(self): """Process input tasks until we receive the quit signal""" while True: if self._should_terminate(): break # END check for stop request - routine = None - args = tuple() - kwargs = dict() + # don't wait too long, instead check for the termination request more often try: tasktuple = self.inq.get(True, 1) @@ -149,29 +133,19 @@ class WorkerThread(TerminatableThread): continue # END get task with timeout - if isinstance(tasktuple, (tuple, list)): - if len(tasktuple) == 3: - routine, args, kwargs = tasktuple - elif len(tasktuple) == 2: - routine, args = tasktuple - elif len(tasktuple) == 1: - routine = tasktuple[0] - # END tasktuple length check - elif inspect.isroutine(tasktuple): - routine = tasktuple - # END tasktuple handling + # needing exactly one function, and one arg + assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" + routine, arg = tasktuple try: rval = None if inspect.ismethod(routine): if routine.im_self is None: - rval = routine(self, *args, **kwargs) + rval = routine(self, arg) else: - rval = routine(*args, **kwargs) + rval = routine(arg) elif inspect.isroutine(routine): - rval = routine(*args, **kwargs) - elif isinstance(routine, basestring) and hasattr(self, routine): - rval = getattr(self, routine)(*args, **kwargs) + rval = routine(arg) else: # ignore unknown items print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) @@ -180,7 +154,8 @@ class WorkerThread(TerminatableThread): except StopIteration: break except Exception,e: - print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) + break # abort ... # END routine exception handling # END endless loop diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 65b2d228..628e2a93 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -10,9 +10,12 @@ class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) self.reset(self._iterator) + self.should_fail = False def do_fun(self, item): self.item_count += 1 + if self.should_fail: + raise AssertionError("I am failing just for the fun of it") return item def reset(self, iterator): @@ -29,7 +32,7 @@ class TestThreadTaskNode(InputIteratorThreadTask): :return: self""" assert self.process_count == pc assert self.item_count == fc - + assert not self.error() return self @@ -60,10 +63,10 @@ class TestThreadPool(TestBase): assert task._out_wc is not None # pull the result completely - we should get one task, which calls its - # function once. In serial mode, the order matches + # function once. In sync mode, the order matches items = rc.read() - task._assert(1, ni).reset(make_iter()) assert len(items) == ni + task._assert(1, ni).reset(make_iter()) assert items[0] == 0 and items[-1] == ni-1 # as the task is done, it should have been removed - we have read everything @@ -91,13 +94,17 @@ class TestThreadPool(TestBase): # if we query 1 item, it will prepare ni / 2 task.min_count = ni / 2 rc = p.add_task(task) - assert len(rc.read(1)) == 1 # 1 - assert len(rc.read(1)) == 1 - assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2 - task._assert(2, ni) # two chunks, 20 calls ( all items ) - assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much - assert len(rc.read()) == 0 # now we read too much and its done + assert len(rc.read(1)) == 1 # processes ni / 2 + assert len(rc.read(1)) == 1 # processes nothing + # rest - it has ni/2 - 2 on the queue, and pulls ni-2 + # It wants too much, so the task realizes its done. The task + # doesn't care about the items in its output channel + assert len(rc.read(ni-2)) == ni - 2 assert p.num_tasks() == null_tasks + task._assert(2, ni) # two chunks, 20 calls ( all items ) + + # its already done, gives us no more + assert len(rc.read()) == 0 # test chunking # we always want 4 chunks, these could go to individual nodes @@ -135,11 +142,25 @@ class TestThreadPool(TestBase): for i in range(ni): assert rc.read(1)[0] == i # END for each item - task._assert(ni / 4, ni) + task._assert(ni / task.min_count, ni) del(rc) assert p.num_tasks() == null_tasks + # test failure + # on failure, the processing stops and the task is finished, keeping + # his error for later + task.reset(make_iter()) + task.should_fail = True + rc = p.add_task(task) + assert len(rc.read()) == 0 # failure on first item + assert isinstance(task.error(), AssertionError) + assert p.num_tasks() == null_tasks + def _assert_async_dependent_tasks(self, p): + # includes failure in center task, 'recursive' orphan cleanup + # This will also verify that the channel-close mechanism works + # t1 -> t2 -> t3 + # t1 -> x -> t3 pass def test_base(self): @@ -199,6 +220,6 @@ class TestThreadPool(TestBase): # DEPENDENT TASK ASYNC MODE ########################### - # self._assert_async_dependent_tasks(p) + self._assert_async_dependent_tasks(p) diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py index 2ea8d1ff..a08c1dc7 100644 --- a/test/git/async/test_thread.py +++ b/test/git/async/test_thread.py @@ -3,28 +3,26 @@ from test.testlib import * from git.async.thread import * from Queue import Queue +import time class TestWorker(WorkerThread): def __init__(self, *args, **kwargs): super(TestWorker, self).__init__(*args, **kwargs) self.reset() - def fun(self, *args, **kwargs): + def fun(self, arg): self.called = True - self.args = args - self.kwargs = kwargs + self.arg = arg return True def make_assertion(self): assert self.called - assert self.args - assert self.kwargs + assert self.arg self.reset() def reset(self): self.called = False - self.args = None - self.kwargs = None + self.arg = None class TestThreads( TestCase ): @@ -36,10 +34,11 @@ class TestThreads( TestCase ): # test different method types standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) - for function in ("fun", TestWorker.fun, worker.fun, standalone_func): - worker.call(function, 1, this='that') + for function in (TestWorker.fun, worker.fun, standalone_func): + worker.inq.put((function, 1)) + time.sleep(0.01) worker.make_assertion() # END for each function type - worker.call('quit') + worker.stop_and_join() |