summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-05 20:03:09 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-05 20:03:09 +0200
commitab59f78341f1dd188aaf4c30526f6295c63438b1 (patch)
tree697bc8a11201494d560f1ea65fd77b7ef09b238e /lib/git/async/pool.py
parent61138f2ece0cb864b933698174315c34a78835d1 (diff)
downloadgitpython-ab59f78341f1dd188aaf4c30526f6295c63438b1.tar.gz
Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal locks and the global interpreter lock.
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py116
1 files changed, 116 insertions, 0 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
new file mode 100644
index 00000000..f9f7880b
--- /dev/null
+++ b/lib/git/async/pool.py
@@ -0,0 +1,116 @@
+"""Implementation of a thread-pool working with channels"""
+from thread import WorkerThread
+from channel import (
+ Channel,
+ WChannel,
+ RChannel
+ )
+
+class Node(object):
+ """A quick and dirty, to-the-point implementation of a simple and slow acyclic graph.
+ It's not designed to support big graphs, and sports only the functionality
+ we need"""
+ __slots__ = ('in_nodes', 'out_nodes')
+
+
+class Graph(object):
+    """A simple graph implementation, keeping nodes and providing basic access and
+    editing functions"""
+    __slots__ = ('nodes',)
+
+    def add_node(self, node):
+        """Add the given node to the graph. Not implemented yet, does nothing."""
+
+    def del_node(self, node):
+        """Delete the given node from the graph. Not implemented yet, does nothing."""
+
+    def visit_input_depth_first(self, node, visitor=lambda n: True):
+        """Visit all input nodes of the given node, depth first, calling visitor
+        for each node on our way. If the function returns False, the traversal
+        will not go any deeper, but continue at the next branch"""
+        pass
+
+
+class TaskNode(Node):
+ """Couples an input channel, an output channel, as well as a processing function
+ together.
+ It may contain additional information on how to handle read-errors from the
+ input channel"""
+ __slots__ = ('in_rc', 'out_wc', 'fun')
+
+ def is_done(self):
+ """:return: True if we are finished processing, i.e. our output channel is closed"""
+ return self.out_wc.closed
+
+
+class RPoolChannel(RChannel):
+    """A read-only pool channel may not be wrapped or derived from, but it provides slots to call
+    before and after an item is to be read.
+
+    It acts like a handle to the underlying task"""
+    __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
+
+    def set_post_cb(self, fun = lambda item: item):
+        """Install a callback to call after the item has been read. The function
+        returns a possibly changed item. If it raises, the exception will be propagated
+        in an IOError, indicating read-failure
+        If a function is not provided, the callback is effectively uninstalled."""
+
+    def set_pre_cb(self, fun = lambda : None):
+        """Install a callback to call before an item is read from the channel.
+        If it fails, the read will fail with an IOError
+        If a function is not provided, the callback is effectively uninstalled."""
+
+    def read(self, block=False, timeout=None):
+        """Read an item that was processed by one of our threads
+        :note: Triggers task dependency handling needed to provide the necessary
+        input"""
+
+    #{ Internal
+    def _read(self, block=False, timeout=None):
+        """Calls the underlying channel's read directly, without triggering
+        the pool"""
+        return RChannel.read(self, block, timeout)
+
+    #} END internal
+
+
+class PoolWorker(WorkerThread):
+ """A worker thread which gets called to deal with Tasks. Tasks provide channels
+ with actual work, whose result will be sent to the task's output channel"""
+
+ @classmethod
+ def perform_task(cls, task):
+ # note : when getting the input channel, be sure not to trigger
+ # RPoolChannel
+ pass
+
+
+class ThreadPool(Graph):
+ """A thread pool maintains a set of one or more worker threads, but supports
+ a fully serial mode in which case the number of threads is zero.
+
+ Work is distributed via Channels, which form a dependency graph. The evaluation
+ is lazy, as work will only be done once an output is requested.
+
+ :note: the current implementation returns channels which are meant to be
+ used only from the main thread"""
+ __slots__ = ( '_workers', # list of worker threads
+ '_queue', # master queue for tasks
+ '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel
+ )
+
+ def del_node(self, task):
+ """Delete the node (being a task), and delete the matching entries in our
+ output channel cache as well"""
+
+
+ def set_pool_size(self, size=0):
+ """Set the number of workers to use in this pool.
+ :param size: if 0, the pool will do all work itself in the calling thread,
+ otherwise the work will be distributed among the given number of threads"""
+
+ def add_task(self, task):
+ """Add a new task to be processed.
+ :return: your task instance with its output channel set. It can be used
+ to retrieve processed items"""