summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-06 01:00:12 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-06 01:00:12 +0200
commitb72e2704022d889f116e49abf3e1e5d3e3192d3b (patch)
tree65d2f5835a4853e72eb4b3cb5beb1b26aabcba36 /lib/git/async/pool.py
parentab59f78341f1dd188aaf4c30526f6295c63438b1 (diff)
downloadgitpython-b72e2704022d889f116e49abf3e1e5d3e3192d3b.tar.gz
Improved pool design and started rough implementation, top down to learn while going. Tests will be written soon for verification, its still quite theoretical
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py273
1 files changed, 217 insertions, 56 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index f9f7880b..7798d3d4 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,92 +1,146 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from Queue import Queue
+
+from graph import (
+ Graph,
+ Node
+ )
+
from channel import (
Channel,
WChannel,
RChannel
)
-class Node(object):
- """A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
- Its not designed to support big graphs, and sports only the functionality
- we need"""
- __slots__ = ('in_nodes', 'out_nodes')
-
-
-class Graph(object):
- """A simple graph implementation, keeping nodes and providing basic access and
- editing functions"""
- __slots__ = "nodes"
-
- def add_node(self, node):
- pass
-
- def del_node(self, node):
- pass
-
- def visit_input_depth_first(self, node, visitor=lambda n: True ):
- """Visit all input nodes of the given node, depth first, calling visitor
- for each node on our way. If the function returns False, the traversal
- will not go any deeper, but continue at the next branch"""
- pass
-
+import weakref
+import sys
class TaskNode(Node):
"""Couples an input channel, an output channel, as well as a processing function
together.
It may contain additional information on how to handel read-errors from the
input channel"""
- __slots__ = ('in_rc', 'out_wc', 'fun')
+ __slots__ = ( 'in_rc', # input read channel
+ '_out_wc', # output write channel
+ '_pool_ref', # ref to our pool
+ '_exc', # exception caught
+ 'fun', # function to call with items read from in_rc
+ 'max_chunksize', # maximium amount of items to process per process call
+ 'apply_single' # apply single items even if multiple where read
+ )
+
+ def __init__(self, in_rc, fun, apply_single=True):
+ self.in_rc = in_rc
+ self._out_wc = None
+ self._pool_ref = None
+ self._exc = None
+ self.fun = fun
+ self.max_chunksize = 0 # note set
+ self.apply_single = apply_single
def is_done(self):
""":return: True if we are finished processing"""
- return self.out_wc.closed
+ return self._out_wc.closed
+
+ def set_done(self):
+ """Set ourselves to being done, has we have completed the processing"""
+ self._out_wc.close()
+
+ def error(self):
+ """:return: Exception caught during last processing or None"""
+ return self._exc
+ def process(self, count=1):
+ """Process count items and send the result individually to the output channel"""
+ if self._out_wc is None:
+ raise IOError("Cannot work in uninitialized task")
+
+ read = self.in_rc.read
+ if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref():
+ read = self.in_rc._read
+ items = read(count)
+
+ try:
+ if self.apply_single:
+ for item in items:
+ self._out_wc.write(self.fun(item))
+ # END for each item
+ else:
+ self._out_wc.write(self.fun(items))
+ # END handle single apply
+ except Exception, e:
+ self._exc = e
+ self.set_done()
+ # END exception handling
+
+ # if we didn't get all demanded items, which is also the case if count is 0
+ # we have depleted the input channel and are done
+ if len(items) != count:
+ self.set_done()
+ # END handle done state
+ #{ Configuration
+
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
- It acts like a handle to the underlying task"""
+ It acts like a handle to the underlying task in the pool."""
__slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
- def set_post_cb(self, fun = lambda item: item):
- """Install a callback to call after the item has been read. The function
- returns a possibly changed item. If it raises, the exception will be propagated
- in an IOError, indicating read-failure
- If a function is not provided, the call is effectively uninstalled."""
+ def __init__(self, wchannel, task, pool):
+ RChannel.__init__(self, wchannel)
+ self._task = task
+ self._pool = pool
+ self._pre_cb = None
+ self._post_cb = None
- def set_pre_cb(self, fun = lambda : None):
- """Install a callback to call before an item is read from the channel.
+ def __del__(self):
+ """Assures that our task will be deleted if we were the last reader"""
+ del(self._wc) # decrement ref-count
+ self._pool._del_task_if_orphaned(self._task)
+
+ def set_pre_cb(self, fun = lambda count: None):
+ """Install a callback to call with the item count to be read before any
+ item is actually read from the channel.
If it fails, the read will fail with an IOError
If a function is not provided, the call is effectively uninstalled."""
+ self._pre_cb = fun
+
+ def set_post_cb(self, fun = lambda item: item):
+ """Install a callback to call after the items were read. The function
+ returns a possibly changed item list. If it raises, the exception will be propagated.
+ If a function is not provided, the call is effectively uninstalled."""
+ self._post_cb = fun
- def read(block=False, timeout=None):
+ def read(self, count=1, block=False, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
+ if self._pre_cb:
+ self._pre_cb()
+ # END pre callback
+
+ ##################################################
+ self._pool._prepare_processing(self._task, count)
+ ##################################################
+
+ items = RChannel.read(self, count, block, timeout)
+ if self._post_cb:
+ items = self._post_cb(items)
#{ Internal
- def _read(self, block=False, timeout=None):
+ def _read(self, count=1, block=False, timeout=None):
"""Calls the underlying channel's read directly, without triggering
the pool"""
- return RChannel.read(self, block, timeout)
+ return RChannel.read(self, count, block, timeout)
#} END internal
-
-class PoolWorker(WorkerThread):
- """A worker thread which gets called to deal with Tasks. Tasks provide channls
- with actual work, whose result will be send to the tasks output channel"""
-
- @classmethod
- def perform_task(cls, task):
- # note : when getting the input channel, be sure not to trigger
- # RPoolChannel
- pass
-class ThreadPool(Graph):
+class ThreadPool(object):
"""A thread pool maintains a set of one or more worker threads, but supports
a fully serial mode in which case the amount of threads is zero.
@@ -94,23 +148,130 @@ class ThreadPool(Graph):
is lazy, as work will only be done once an output is requested.
:note: the current implementation returns channels which are meant to be
- used only from the main thread"""
- __slots__ = ( '_workers', # list of worker threads
+ used only from the main thread, hence you cannot consume their results
+ from multiple threads unless you use a task for it."""
+ __slots__ = ( '_tasks', # a graph of tasks
+ '_consumed_tasks', # a list with tasks that are done or had an error
+ '_workers', # list of worker threads
'_queue', # master queue for tasks
- '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel
+ '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list
)
- def del_node(self, task):
- """Delete the node ( being a task ), but delete the entries in our output channel
- cache as well"""
+ def __init__(self, size=0):
+ self._tasks = Graph()
+ self._consumed_tasks = list()
+ self._workers = list()
+ self._queue = Queue()
+ self._ordered_tasks_cache = dict()
+
+ def __del__(self):
+ raise NotImplementedError("TODO: Proper cleanup")
+
+ #{ Internal
+ def _queue_feeder_visitor(self, task, count):
+ """Walk the graph and find tasks that are done for later cleanup, and
+ queue all others for processing by our worker threads ( if available )."""
+ if task.error() or task.is_done():
+ self._consumed_tasks.append(task)
+
+ # if the task does not have the required output on its queue, schedule
+ # it for processing. If we should process all, we don't care about the
+ # amount as it should process until its all done.
+ if self._workers:
+ if count < 1 or task._out_wc.size() < count:
+ # respect the chunk size, and split the task up if we want
+ # to process too much. This can be defined per task
+ queue = self._queue
+ if task.max_chunksize:
+ chunksize = count / task.max_chunksize
+ remainder = count - (chunksize * task.max_chunksize)
+ for i in xrange(chunksize):
+ queue.put((task.process, chunksize))
+ if remainder:
+ queue.put((task.process, remainder))
+ else:
+ self._queue.put((task.process, count))
+ # END handle chunksize
+ # END handle queuing
+ else:
+ # no workers, so we have to do the work ourselves
+ task.process(count)
+ # END handle serial mode
+
+ # always walk the whole graph, we want to find consumed tasks
+ return True
+
+ def _prepare_processing(self, task, count):
+ """Process the tasks which depend on the given one to be sure the input
+ channels are filled with data once we process the actual task
+
+ Tasks have two important states: either they are done, or they are done
+ and have an error, so they are likely not to have finished all their work.
+
+ Either way, we will put them onto a list of tasks to delete them, providng
+ information about the failed ones.
+
+ Tasks which are not done will be put onto the queue for processing, which
+ is fine as we walked them depth-first."""
+ self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+
+ # delete consumed tasks to cleanup
+ for task in self._consumed_tasks:
+ self.del_task(task)
+ # END for each task to delete
+ del(self._consumed_tasks[:])
+
+ def _del_task_if_orphaned(self, task):
+ """Check the task, and delete it if it is orphaned"""
+ if sys.getrefcount(task._out_wc) < 3:
+ self.del_task(task)
+ #} END internal
+
+ #{ Interface
+
+ def del_task(self, task):
+ """Delete the task
+ Additionally we will remove orphaned tasks, which can be identified if their
+ output channel is only held by themselves, so no one will ever consume
+ its items."""
+ # now delete our actual node - must set it done os it closes its channels.
+ # Otherwise further reads of output tasks will block.
+ # Actually they may still block if anyone wants to read all ... without
+ # a timeout
+ # keep its input nodes as we check whether they were orphaned
+ in_tasks = task.in_nodes
+ task.set_done()
+ self._tasks.del_node(task)
+ for t in in_tasks
+ self._del_task_if_orphaned(t)
+ # END handle orphans recursively
def set_pool_size(self, size=0):
"""Set the amount of workers to use in this pool.
:param size: if 0, the pool will do all work itself in the calling thread,
otherwise the work will be distributed among the given amount of threads"""
+ raise NotImplementedError()
def add_task(self, task):
"""Add a new task to be processed.
- :return: your task instance with its output channel set. It can be used
- to retrieve processed items"""
+ :return: a read channel to retrieve processed items. If that handle is lost,
+ the task will be considered orphaned and will be deleted on the next
+ occasion."""
+ # create a write channel for it
+ wc, rc = Channel()
+ rc = RPoolChannel(wc, task, self)
+ task._out_wc = wc
+ task._pool_ref = weakref.ref(self)
+
+ self._tasks.add_node(task)
+
+ # If the input channel is one of our read channels, we add the relation
+ ic = task.in_rc
+ if isinstance(ic, RPoolChannel) and ic._pool is self:
+ self._tasks.add_edge(ic._task, task)
+ # END add task relation
+
+ return rc
+
+ #} END interface