From b72e2704022d889f116e49abf3e1e5d3e3192d3b Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sun, 6 Jun 2010 01:00:12 +0200
Subject: Improved pool design and started rough implementation, top down to
 learn while going. Tests will be written soon for verification, it's still
 quite theoretical

---
 lib/git/async/channel.py |  49 ++++++---
 lib/git/async/graph.py   |  36 +++++++
 lib/git/async/pool.py    | 273 +++++++++++++++++++++++++++++++++++++----------
 3 files changed, 290 insertions(+), 68 deletions(-)
 create mode 100644 lib/git/async/graph.py

diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index c9cbfb87..70daed24 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -7,7 +7,7 @@ from Queue import (
 #{ Classes
 
 class Channel(object):
-    """A channel is similar to a system pipe. It has a write end as well as one or
+    """A channel is similar to a file-like object. It has a write end as well as one or
     more read ends. If data is in the channel, it can be read, if not the read operation
     will block until data becomes available.
     If the channel is closed, any read operation will result in an exception
@@ -51,8 +51,8 @@ class WChannel(Channel):
         :param block: If True, the call will block until there is free space in the
             channel
         :param timeout: timeout in seconds for blocking calls.
-        :raise IOError: when writing into closed file or when writing into a non-blocking
-            full channel
+        :raise IOError: when writing into closed file
+        :raise EOFError: when writing into a non-blocking full channel
         :note: may block if the channel has a limited capacity"""
         if self._closed:
             raise IOError("Cannot write to a closed channel")
@@ -60,9 +60,14 @@ class WChannel(Channel):
         try:
             self._queue.put(item, block, timeout)
         except Full:
-            raise IOError("Capacity of the channel was exeeded")
+            raise EOFError("Capacity of the channel was exceeded")
         # END exception handling
 
+    def size(self):
+        """:return: approximate number of items that could be read from the read-ends
+        of this channel"""
+        return self._queue.qsize()
+
     def close(self):
         """Close the channel. Multiple close calls on a closed channel are not
         an error"""
@@ -86,22 +91,41 @@ class RChannel(Channel):
 
     #{ Interface
 
-    def read(self, block=True, timeout=None):
-        """:return: an item read from the channel
+    def read(self, count=0, block=True, timeout=None):
+        """Read a list of items from the channel. The list, as a sequence
+        of items, is similar to the string of characters returned when reading from
+        file-like objects.
+        :param count: given amount of items to read. If < 1, all items will be read
         :param block: if True, the call will block until an item is available
         :param timeout: if positive and block is True, it will block only for the given
            amount of seconds.
-        :raise IOError: When reading from an empty channel ( if non-blocking, or
-            if the channel is still empty after the timeout"""
+        :return: single item in a list if count is 1, or a list of count items.
+            If the channel was empty and count was 1, an empty list will be returned.
+            If count was greater than 1, a list with less than count items will be
+            returned.
+            If count was < 1, a list with all items that could be read will be
+            returned."""
        # if the channel is closed for writing, we never block
         if self._wc.closed:
             block = False
-
+
+        out = list()
         try:
-            return self._wc._queue.get(block, timeout)
+            if count == 1:
+                out.append(self._wc._queue.get(block, timeout))
+            elif count < 1:
+                while True:
+                    out.append(self._wc._queue.get(block, timeout))
+                # END for each item
+            else:
+                for i in xrange(count):
+                    out.append(self._wc._queue.get(block, timeout))
+                # END for each item
+            # END handle count
         except Empty:
-            raise IOError("Error reading from an empty channel")
-        # END handle reading
+            pass
+        # END handle exceptions
+        return out
 
     #} END interface
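The count semantics of the new read() are easiest to see in a small usage sketch. This is hypothetical and not part of the patch; it assumes lib/ is on sys.path and that Channel() hands back a (write end, read end) pair, as the `wc, rc = Channel()` call in pool.py's add_task below suggests:

    from git.async.channel import Channel

    wc, rc = Channel()
    for i in xrange(3):
        wc.write(i)             # fill the channel
    wc.close()                  # a closed channel never blocks readers

    print rc.read(count=1)      # [0]     - a single item still arrives in a list
    print rc.read(count=5)      # [1, 2]  - fewer than count items: channel depleted
    print rc.read(count=1)      # []      - empty and closed: no exception, empty list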
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
new file mode 100644
index 00000000..0c0a2137
--- /dev/null
+++ b/lib/git/async/graph.py
@@ -0,0 +1,36 @@
+"""Simplistic implementation of a graph"""
+
+class Node(object):
+    """A quick and dirty, to-the-point implementation of a node in a simple and
+    slow acyclic graph. It's not designed to support big graphs, and sports only
+    the functionality we need"""
+    __slots__ = ('in_nodes', 'out_nodes')
+
+
+class Graph(object):
+    """A simple graph implementation, keeping nodes and providing basic access and
+    editing functions"""
+    __slots__ = "nodes"
+
+    def __init__(self):
+        self.nodes = list()
+
+    def add_node(self, node):
+        """Add a new node to the graph"""
+        raise NotImplementedError()
+
+    def del_node(self, node):
+        """Delete a node from the graph"""
+        raise NotImplementedError()
+
+    def add_edge(self, u, v):
+        """Add a directed edge from the given node u to node v.
+        :raise ValueError: If the new edge would create a cycle"""
+        raise NotImplementedError()
+
+    def visit_input_depth_first(self, node, visitor=lambda n: True ):
+        """Visit all input nodes of the given node, depth first, calling visitor
+        for each node on our way. If the function returns False, the traversal
+        will not go any deeper, but continue at the next branch"""
+        raise NotImplementedError()
+
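All Graph methods are still stubs at this point. For orientation only, visit_input_depth_first could plausibly be implemented along these lines; a sketch, assuming in_nodes holds a list of Node instances and following the prune rule the docstring describes (a False return stops descent on that branch), not the implementation this commit ships:

    def visit_input_depth_first(self, node, visitor=lambda n: True):
        # possible shape of the traversal, not part of this commit
        seen = set()
        def descend(n):
            for input_node in n.in_nodes:
                if id(input_node) in seen:
                    continue
                seen.add(id(input_node))
                if visitor(input_node):     # False prunes this branch
                    descend(input_node)
            # END for each input node
        descend(node)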
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index f9f7880b..7798d3d4 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,92 +1,147 @@
 """Implementation of a thread-pool working with channels"""
 from thread import WorkerThread
+from Queue import Queue
+
+from graph import (
+    Graph,
+    Node
+    )
+
 from channel import (
         Channel,
         WChannel,
         RChannel
     )
 
-class Node(object):
-    """A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
-    Its not designed to support big graphs, and sports only the functionality
-    we need"""
-    __slots__ = ('in_nodes', 'out_nodes')
-
-
-class Graph(object):
-    """A simple graph implementation, keeping nodes and providing basic access and
-    editing functions"""
-    __slots__ = "nodes"
-
-    def add_node(self, node):
-        pass
-
-    def del_node(self, node):
-        pass
-
-    def visit_input_depth_first(self, node, visitor=lambda n: True ):
-        """Visit all input nodes of the given node, depth first, calling visitor
-        for each node on our way. If the function returns False, the traversal
-        will not go any deeper, but continue at the next branch"""
-        pass
-
+import weakref
+import sys
 
 class TaskNode(Node):
     """Couples an input channel, an output channel, as well as a processing function
     together.
     It may contain additional information on how to handle read-errors from the input
     channel"""
-    __slots__ = ('in_rc', 'out_wc', 'fun')
+    __slots__ = (   'in_rc',            # input read channel
+                    '_out_wc',          # output write channel
+                    '_pool_ref',        # ref to our pool
+                    '_exc',             # exception caught
+                    'fun',              # function to call with items read from in_rc
+                    'max_chunksize',    # maximum amount of items to process per process call
+                    'apply_single'      # apply single items even if multiple were read
+                    )
+
+    def __init__(self, in_rc, fun, apply_single=True):
+        self.in_rc = in_rc
+        self._out_wc = None
+        self._pool_ref = None
+        self._exc = None
+        self.fun = fun
+        self.max_chunksize = 0              # not set
+        self.apply_single = apply_single
 
     def is_done(self):
         """:return: True if we are finished processing"""
-        return self.out_wc.closed
+        return self._out_wc.closed
+
+    def set_done(self):
+        """Set ourselves to being done, as we have completed the processing"""
+        self._out_wc.close()
+
+    def error(self):
+        """:return: Exception caught during last processing or None"""
+        return self._exc
 
+    def process(self, count=1):
+        """Process count items and send the result individually to the output channel"""
+        if self._out_wc is None:
+            raise IOError("Cannot work in uninitialized task")
+
+        read = self.in_rc.read
+        if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref():
+            read = self.in_rc._read
+        items = read(count)
+
+        try:
+            if self.apply_single:
+                for item in items:
+                    self._out_wc.write(self.fun(item))
+                # END for each item
+            else:
+                self._out_wc.write(self.fun(items))
+            # END handle single apply
+        except Exception, e:
+            self._exc = e
+            self.set_done()
+        # END exception handling
+
+        # if we didn't get all demanded items, which is also the case if count is 0,
+        # we have depleted the input channel and are done
+        if len(items) != count:
+            self.set_done()
+        # END handle done state
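The depletion rule at the end of process() - a short read means the input is exhausted - can be exercised by hand. A hypothetical sketch, wiring _out_wc manually the way add_task below normally does, and again assuming lib/ is on sys.path:

    from git.async.channel import Channel
    from git.async.pool import TaskNode

    wc, rc = Channel()              # input channel for the task
    for i in xrange(5):
        wc.write(i)
    wc.close()

    out_wc, out_rc = Channel()      # output channel, normally set by add_task
    task = TaskNode(rc, lambda x: x * 2)
    task._out_wc = out_wc

    task.process(4)                 # got all 4 requested items
    print task.is_done()            # False
    task.process(4)                 # only 1 item left -> len(items) != count
    print task.is_done()            # True - the output channel is now closed
    print out_rc.read()             # [0, 2, 4, 6, 8]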
+
+    #{ Configuration
 
 class RPoolChannel(RChannel):
     """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
     before and after an item is to be read.
-    It acts like a handle to the underlying task"""
+    It acts like a handle to the underlying task in the pool."""
     __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
 
-    def set_post_cb(self, fun = lambda item: item):
-        """Install a callback to call after the item has been read. The function
-        returns a possibly changed item. If it raises, the exception will be propagated
-        in an IOError, indicating read-failure
-        If a function is not provided, the call is effectively uninstalled."""
+    def __init__(self, wchannel, task, pool):
+        RChannel.__init__(self, wchannel)
+        self._task = task
+        self._pool = pool
+        self._pre_cb = None
+        self._post_cb = None
 
-    def set_pre_cb(self, fun = lambda : None):
-        """Install a callback to call before an item is read from the channel.
+    def __del__(self):
+        """Assures that our task will be deleted if we were the last reader"""
+        del(self._wc)       # decrement ref-count
+        self._pool._del_task_if_orphaned(self._task)
+
+    def set_pre_cb(self, fun = lambda count: None):
+        """Install a callback to call with the item count to be read before any
+        item is actually read from the channel.
         If it fails, the read will fail with an IOError
         If a function is not provided, the call is effectively uninstalled."""
+        self._pre_cb = fun
+
+    def set_post_cb(self, fun = lambda item: item):
+        """Install a callback to call after the items were read. The function
+        returns a possibly changed item list. If it raises, the exception will be propagated.
+        If a function is not provided, the call is effectively uninstalled."""
+        self._post_cb = fun
 
-    def read(block=False, timeout=None):
+    def read(self, count=1, block=False, timeout=None):
         """Read an item that was processed by one of our threads
         :note: Triggers task dependency handling needed to provide the necessary
             input"""
+        if self._pre_cb:
+            self._pre_cb(count)
+        # END pre callback
+
+        ##################################################
+        self._pool._prepare_processing(self._task, count)
+        ##################################################
+
+        items = RChannel.read(self, count, block, timeout)
+        if self._post_cb:
+            items = self._post_cb(items)
+        return items
 
     #{ Internal
-    def _read(self, block=False, timeout=None):
+    def _read(self, count=1, block=False, timeout=None):
         """Calls the underlying channel's read directly, without triggering
         the pool"""
-        return RChannel.read(self, block, timeout)
+        return RChannel.read(self, count, block, timeout)
 
     #} END internal
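The pre- and post-callbacks give the handle owner a hook around the dependency handling. A hypothetical sketch, assuming rc is an RPoolChannel as returned by add_task below:

    def announce(count):
        print "about to provide %i item(s)" % count

    rc.set_pre_cb(announce)         # runs before dependencies are queued
    rc.set_post_cb(lambda items: [i for i in items if i is not None])
    items = rc.read(count=10)       # triggers _prepare_processing, then filters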
 
-
-class PoolWorker(WorkerThread):
-    """A worker thread which gets called to deal with Tasks. Tasks provide channls
-    with actual work, whose result will be send to the tasks output channel"""
-
-    @classmethod
-    def perform_task(cls, task):
-        # note : when getting the input channel, be sure not to trigger
-        # RPoolChannel
-        pass
 
-class ThreadPool(Graph):
+class ThreadPool(object):
     """A thread pool maintains a set of one or more worker threads, but supports
     a fully serial mode in which case the amount of threads is zero.
 
@@ -94,23 +149,130 @@
     is lazy, as work will only be done once an output is requested.
 
     :note: the current implementation returns channels which are meant to be
-    used only from the main thread"""
-    __slots__ = (   '_workers',             # list of worker threads
+    used only from the main thread, hence you cannot consume their results
+    from multiple threads unless you use a task for it."""
+    __slots__ = (   '_tasks',               # a graph of tasks
+                    '_consumed_tasks',      # a list with tasks that are done or had an error
+                    '_workers',             # list of worker threads
                     '_queue',               # master queue for tasks
-                    '_ordered_tasks_cache'  # tasks in order of evaluation, mapped by read channel
+                    '_ordered_tasks_cache'  # tasks in order of evaluation, mapped from task -> task list
                     )
 
-    def del_node(self, task):
-        """Delete the node ( being a task ), but delete the entries in our output channel
-        cache as well"""
+    def __init__(self, size=0):
+        self._tasks = Graph()
+        self._consumed_tasks = list()
+        self._workers = list()
+        self._queue = Queue()
+        self._ordered_tasks_cache = dict()
+
+    def __del__(self):
+        raise NotImplementedError("TODO: Proper cleanup")
+
+    #{ Internal
+    def _queue_feeder_visitor(self, task, count):
+        """Walk the graph and find tasks that are done for later cleanup, and
+        queue all others for processing by our worker threads ( if available )."""
+        if task.error() or task.is_done():
+            self._consumed_tasks.append(task)
+
+        # if the task does not have the required output on its queue, schedule
+        # it for processing. If we should process all, we don't care about the
+        # amount as it should process until it is all done.
+        if self._workers:
+            if count < 1 or task._out_wc.size() < count:
+                # respect the chunk size, and split the task up if we want
+                # to process too much. This can be defined per task.
+                queue = self._queue
+                if task.max_chunksize:
+                    chunks = count / task.max_chunksize
+                    remainder = count - (chunks * task.max_chunksize)
+                    for i in xrange(chunks):
+                        queue.put((task.process, task.max_chunksize))
+                    if remainder:
+                        queue.put((task.process, remainder))
+                else:
+                    self._queue.put((task.process, count))
+                # END handle chunksize
+            # END handle queuing
+        else:
+            # no workers, so we have to do the work ourselves
+            task.process(count)
+        # END handle serial mode
+
+        # always walk the whole graph, we want to find consumed tasks
+        return True
+
+    def _prepare_processing(self, task, count):
+        """Process the tasks which depend on the given one to be sure the input
+        channels are filled with data once we process the actual task
+
+        Tasks have two important states: either they are done, or they are done
+        and have an error, in which case they are likely not to have finished all their work.
+
+        Either way, we will put them onto a list of tasks to delete them, providing
+        information about the failed ones.
+
+        Tasks which are not done will be put onto the queue for processing, which
+        is fine as we walked them depth-first."""
+        self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+
+        # delete consumed tasks to cleanup
+        for task in self._consumed_tasks:
+            self.del_task(task)
+        # END for each task to delete
+        del(self._consumed_tasks[:])
+
+    def _del_task_if_orphaned(self, task):
+        """Check the task, and delete it if it is orphaned"""
+        if sys.getrefcount(task._out_wc) < 3:
+            self.del_task(task)
+    #} END internal
+
+    #{ Interface
+
+    def del_task(self, task):
+        """Delete the task.
+        Additionally we will remove orphaned tasks, which can be identified if their
+        output channel is only held by themselves, so no one will ever consume
+        its items."""
+        # now delete our actual node - must set it done so it closes its channels.
+        # Otherwise further reads of output tasks will block.
+        # Actually they may still block if anyone wants to read all ... without
+        # a timeout
+        # keep its input nodes as we check whether they were orphaned
+        in_tasks = task.in_nodes
+        task.set_done()
+        self._tasks.del_node(task)
+
+        for t in in_tasks:
+            self._del_task_if_orphaned(t)
+        # END handle orphans recursively
 
     def set_pool_size(self, size=0):
         """Set the amount of workers to use in this pool.
         :param size: if 0, the pool will do all work itself in the calling thread,
             otherwise the work will be distributed among the given amount of threads"""
+        raise NotImplementedError()
 
     def add_task(self, task):
         """Add a new task to be processed.
-        :return: your task instance with its output channel set. It can be used
-            to retrieve processed items"""
+        :return: a read channel to retrieve processed items. If that handle is lost,
+            the task will be considered orphaned and will be deleted on the next
+            occasion."""
+        # create a write channel for it
+        wc, rc = Channel()
+        rc = RPoolChannel(wc, task, self)
+        task._out_wc = wc
+        task._pool_ref = weakref.ref(self)
+
+        self._tasks.add_node(task)
+
+        # If the input channel is one of our read channels, we add the relation
+        ic = task.in_rc
+        if isinstance(ic, RPoolChannel) and ic._pool is self:
+            self._tasks.add_edge(ic._task, task)
+        # END add task relation
+
+        return rc
+
+    #} END interface
-- 
cgit v1.2.1
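Taken together, the intended flow once the remaining stubs (the Graph methods and set_pool_size) are filled in would look roughly like this; a sketch against this commit's API, not tested code:

    from git.async.channel import Channel
    from git.async.pool import ThreadPool, TaskNode

    pool = ThreadPool(size=0)       # serial mode: work happens on read()

    wc, rc = Channel()              # hand-fed input channel
    for i in xrange(10):
        wc.write(i)
    wc.close()

    task = TaskNode(rc, lambda x: x * 2)
    out = pool.add_task(task)       # returns an RPoolChannel handle

    print out.read(count=5)         # lazily processes just enough input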