diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 01:00:12 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 01:00:12 +0200 |
commit | b72e2704022d889f116e49abf3e1e5d3e3192d3b (patch) | |
tree | 65d2f5835a4853e72eb4b3cb5beb1b26aabcba36 /lib/git/async/pool.py | |
parent | ab59f78341f1dd188aaf4c30526f6295c63438b1 (diff) | |
download | gitpython-b72e2704022d889f116e49abf3e1e5d3e3192d3b.tar.gz |
Improved pool design and started a rough implementation, working top-down to learn while going. Tests will be written soon for verification; it's still quite theoretical.
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 273 |
1 files changed, 217 insertions, 56 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index f9f7880b..7798d3d4 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,92 +1,146 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from Queue import Queue + +from graph import ( + Graph, + Node + ) + from channel import ( Channel, WChannel, RChannel ) -class Node(object): - """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. - Its not designed to support big graphs, and sports only the functionality - we need""" - __slots__ = ('in_nodes', 'out_nodes') - - -class Graph(object): - """A simple graph implementation, keeping nodes and providing basic access and - editing functions""" - __slots__ = "nodes" - - def add_node(self, node): - pass - - def del_node(self, node): - pass - - def visit_input_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch""" - pass - +import weakref +import sys class TaskNode(Node): """Couples an input channel, an output channel, as well as a processing function together. 
It may contain additional information on how to handel read-errors from the input channel""" - __slots__ = ('in_rc', 'out_wc', 'fun') + __slots__ = ( 'in_rc', # input read channel + '_out_wc', # output write channel + '_pool_ref', # ref to our pool + '_exc', # exception caught + 'fun', # function to call with items read from in_rc + 'max_chunksize', # maximium amount of items to process per process call + 'apply_single' # apply single items even if multiple where read + ) + + def __init__(self, in_rc, fun, apply_single=True): + self.in_rc = in_rc + self._out_wc = None + self._pool_ref = None + self._exc = None + self.fun = fun + self.max_chunksize = 0 # note set + self.apply_single = apply_single def is_done(self): """:return: True if we are finished processing""" - return self.out_wc.closed + return self._out_wc.closed + + def set_done(self): + """Set ourselves to being done, has we have completed the processing""" + self._out_wc.close() + + def error(self): + """:return: Exception caught during last processing or None""" + return self._exc + def process(self, count=1): + """Process count items and send the result individually to the output channel""" + if self._out_wc is None: + raise IOError("Cannot work in uninitialized task") + + read = self.in_rc.read + if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref(): + read = self.in_rc._read + items = read(count) + + try: + if self.apply_single: + for item in items: + self._out_wc.write(self.fun(item)) + # END for each item + else: + self._out_wc.write(self.fun(items)) + # END handle single apply + except Exception, e: + self._exc = e + self.set_done() + # END exception handling + + # if we didn't get all demanded items, which is also the case if count is 0 + # we have depleted the input channel and are done + if len(items) != count: + self.set_done() + # END handle done state + #{ Configuration + class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, 
but it provides slots to call before and after an item is to be read. - It acts like a handle to the underlying task""" + It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the item has been read. The function - returns a possibly changed item. If it raises, the exception will be propagated - in an IOError, indicating read-failure - If a function is not provided, the call is effectively uninstalled.""" + def __init__(self, wchannel, task, pool): + RChannel.__init__(self, wchannel) + self._task = task + self._pool = pool + self._pre_cb = None + self._post_cb = None - def set_pre_cb(self, fun = lambda : None): - """Install a callback to call before an item is read from the channel. + def __del__(self): + """Assures that our task will be deleted if we were the last reader""" + del(self._wc) # decrement ref-count + self._pool._del_task_if_orphaned(self._task) + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. If it fails, the read will fail with an IOError If a function is not provided, the call is effectively uninstalled.""" + self._pre_cb = fun + + def set_post_cb(self, fun = lambda item: item): + """Install a callback to call after the items were read. The function + returns a possibly changed item list. If it raises, the exception will be propagated. 
+ If a function is not provided, the call is effectively uninstalled.""" + self._post_cb = fun - def read(block=False, timeout=None): + def read(self, count=1, block=False, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" + if self._pre_cb: + self._pre_cb() + # END pre callback + + ################################################## + self._pool._prepare_processing(self._task, count) + ################################################## + + items = RChannel.read(self, count, block, timeout) + if self._post_cb: + items = self._post_cb(items) #{ Internal - def _read(self, block=False, timeout=None): + def _read(self, count=1, block=False, timeout=None): """Calls the underlying channel's read directly, without triggering the pool""" - return RChannel.read(self, block, timeout) + return RChannel.read(self, count, block, timeout) #} END internal - -class PoolWorker(WorkerThread): - """A worker thread which gets called to deal with Tasks. Tasks provide channls - with actual work, whose result will be send to the tasks output channel""" - - @classmethod - def perform_task(cls, task): - # note : when getting the input channel, be sure not to trigger - # RPoolChannel - pass -class ThreadPool(Graph): +class ThreadPool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. @@ -94,23 +148,130 @@ class ThreadPool(Graph): is lazy, as work will only be done once an output is requested. 
:note: the current implementation returns channels which are meant to be - used only from the main thread""" - __slots__ = ( '_workers', # list of worker threads + used only from the main thread, hence you cannot consume their results + from multiple threads unless you use a task for it.""" + __slots__ = ( '_tasks', # a graph of tasks + '_consumed_tasks', # a list with tasks that are done or had an error + '_workers', # list of worker threads '_queue', # master queue for tasks - '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel + '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list ) - def del_node(self, task): - """Delete the node ( being a task ), but delete the entries in our output channel - cache as well""" + def __init__(self, size=0): + self._tasks = Graph() + self._consumed_tasks = list() + self._workers = list() + self._queue = Queue() + self._ordered_tasks_cache = dict() + + def __del__(self): + raise NotImplementedError("TODO: Proper cleanup") + + #{ Internal + def _queue_feeder_visitor(self, task, count): + """Walk the graph and find tasks that are done for later cleanup, and + queue all others for processing by our worker threads ( if available ).""" + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + + # if the task does not have the required output on its queue, schedule + # it for processing. If we should process all, we don't care about the + # amount as it should process until its all done. + if self._workers: + if count < 1 or task._out_wc.size() < count: + # respect the chunk size, and split the task up if we want + # to process too much. 
This can be defined per task + queue = self._queue + if task.max_chunksize: + chunksize = count / task.max_chunksize + remainder = count - (chunksize * task.max_chunksize) + for i in xrange(chunksize): + queue.put((task.process, chunksize)) + if remainder: + queue.put((task.process, remainder)) + else: + self._queue.put((task.process, count)) + # END handle chunksize + # END handle queuing + else: + # no workers, so we have to do the work ourselves + task.process(count) + # END handle serial mode + + # always walk the whole graph, we want to find consumed tasks + return True + + def _prepare_processing(self, task, count): + """Process the tasks which depend on the given one to be sure the input + channels are filled with data once we process the actual task + + Tasks have two important states: either they are done, or they are done + and have an error, so they are likely not to have finished all their work. + + Either way, we will put them onto a list of tasks to delete them, providng + information about the failed ones. + + Tasks which are not done will be put onto the queue for processing, which + is fine as we walked them depth-first.""" + self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + + # delete consumed tasks to cleanup + for task in self._consumed_tasks: + self.del_task(task) + # END for each task to delete + del(self._consumed_tasks[:]) + + def _del_task_if_orphaned(self, task): + """Check the task, and delete it if it is orphaned""" + if sys.getrefcount(task._out_wc) < 3: + self.del_task(task) + #} END internal + + #{ Interface + + def del_task(self, task): + """Delete the task + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume + its items.""" + # now delete our actual node - must set it done os it closes its channels. + # Otherwise further reads of output tasks will block. 
+ # Actually they may still block if anyone wants to read all ... without + # a timeout + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes + task.set_done() + self._tasks.del_node(task) + for t in in_tasks + self._del_task_if_orphaned(t) + # END handle orphans recursively def set_pool_size(self, size=0): """Set the amount of workers to use in this pool. :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads""" + raise NotImplementedError() def add_task(self, task): """Add a new task to be processed. - :return: your task instance with its output channel set. It can be used - to retrieve processed items""" + :return: a read channel to retrieve processed items. If that handle is lost, + the task will be considered orphaned and will be deleted on the next + occasion.""" + # create a write channel for it + wc, rc = Channel() + rc = RPoolChannel(wc, task, self) + task._out_wc = wc + task._pool_ref = weakref.ref(self) + + self._tasks.add_node(task) + + # If the input channel is one of our read channels, we add the relation + ic = task.in_rc + if isinstance(ic, RPoolChannel) and ic._pool is self: + self._tasks.add_edge(ic._task, task) + # END add task relation + + return rc + + #} END interface |