summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-05 20:03:09 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-05 20:03:09 +0200
commitab59f78341f1dd188aaf4c30526f6295c63438b1 (patch)
tree697bc8a11201494d560f1ea65fd77b7ef09b238e /lib/git/async
parent61138f2ece0cb864b933698174315c34a78835d1 (diff)
downloadgitpython-ab59f78341f1dd188aaf4c30526f6295c63438b1.tar.gz
Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/__init__.py1
-rw-r--r--lib/git/async/channel.py108
-rw-r--r--lib/git/async/pool.py116
-rw-r--r--lib/git/async/thread.py203
4 files changed, 428 insertions, 0 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py
new file mode 100644
index 00000000..89b9eb47
--- /dev/null
+++ b/lib/git/async/__init__.py
@@ -0,0 +1 @@
+"""Initialize the multi-processing package"""
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
new file mode 100644
index 00000000..c9cbfb87
--- /dev/null
+++ b/lib/git/async/channel.py
@@ -0,0 +1,108 @@
+"""Contains a queue based channel implementation"""
+from Queue import (
+ Queue,
+ Empty,
+ Full
+ )
+
+#{ Classes
class Channel(object):
    """Common base of both channel ends, doubling as a factory for channel pairs.

    A channel resembles a system pipe: items are pushed in at the write end
    and taken out at the read end.

    Calling ``Channel([max_items])`` does not produce a ``Channel`` instance.
    It builds a write end and a read end attached to it, and returns both as
    the tuple ``(WChannel, RChannel)``. Subclasses are instantiated normally."""
    __slots__ = ()

    def __new__(cls, *args):
        """:param args: when called on Channel itself, at most one value: the
            number of items the channel may hold, defaulting to 0
        :return: ``(WChannel, RChannel)`` tuple if cls is Channel, otherwise a
            new bare instance of cls
        :raise ValueError: if cls is Channel and more than one argument is given"""
        # subclasses take the ordinary construction path
        if cls is not Channel:
            return object.__new__(cls)
        # END handle subclass

        # constructor mode: build the pair of channel ends
        if len(args) > 1:
            raise ValueError("Specify not more than the number of items the channel should take")
        # END check argument count

        capacity = 0
        if args:
            capacity = args[0]
        # END pick capacity

        writer = WChannel(capacity)
        return writer, RChannel(writer)
+
+
class WChannel(Channel):
    """Write end of a channel. It owns the queue that stores the items and
    a flag telling whether the channel was closed."""
    __slots__ = ('_closed', '_queue')

    def __init__(self, max_items=0):
        """Set up an open channel backed by a new queue
        :param max_items: passed to the Queue constructor as its maximum size"""
        self._queue = Queue(max_items)
        self._closed = False

    #{ Interface
    def write(self, item, block=True, timeout=None):
        """Put an item into the channel
        :param item: the item to send
        :param block: forwarded to the queue's put call; if True the call waits
            for free space
        :param timeout: forwarded to the queue's put call, in seconds
        :raise IOError: if the channel was closed, or if the queue reported
            that it is full"""
        if self._closed:
            raise IOError("Cannot write to a closed channel")
        # END check closed state

        queue = self._queue
        try:
            queue.put(item, block, timeout)
        except Full:
            raise IOError("Capacity of the channel was exeeded")
        # END translate queue error

    def close(self):
        """Mark the channel as closed. Calling it repeatedly is harmless as it
        only sets a flag."""
        self._closed = True

    @property
    def closed(self):
        """:return: True if close() was called on this channel"""
        return self._closed
    #} END interface
+
+
class RChannel(Channel):
    """Read end of a channel. It takes items out of the queue held by the
    write channel it was created from."""
    __slots__ = '_wc'

    def __init__(self, wchannel):
        """Attach this read end to the given write channel
        :param wchannel: the write channel whose queue is read from"""
        self._wc = wchannel

    #{ Interface

    def read(self, block=True, timeout=None):
        """:return: the next item taken from the write channel's queue
        :param block: forwarded to the queue's get call; forced to False once
            the write channel is closed
        :param timeout: forwarded to the queue's get call, in seconds
        :raise IOError: if the queue reported that it is empty"""
        writer = self._wc

        # a closed write channel will not deliver anything new, so never wait
        wait = block
        if writer.closed:
            wait = False
        # END handle closed channel

        try:
            return writer._queue.get(wait, timeout)
        except Empty:
            raise IOError("Error reading from an empty channel")
        # END translate queue error

    #} END interface
+
+#} END classes
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
new file mode 100644
index 00000000..f9f7880b
--- /dev/null
+++ b/lib/git/async/pool.py
@@ -0,0 +1,116 @@
+"""Implementation of a thread-pool working with channels"""
+from thread import WorkerThread
+from channel import (
+ Channel,
+ WChannel,
+ RChannel
+ )
+
class Node(object):
    """Minimal node of a simple, slow acyclic graph.

    It is not meant for big graphs and offers only what this module needs.
    The class defines no behaviour, only two attribute slots."""
    # NOTE(review): presumably the neighbouring nodes on the input and
    # output side - nothing in this class assigns them, confirm with users
    __slots__ = ('in_nodes', 'out_nodes')
+
+
class Graph(object):
    """Simple graph meant to keep nodes and to offer basic access and editing
    functions. All methods are placeholders which do nothing yet."""
    __slots__ = "nodes"

    def add_node(self, node):
        """Placeholder, currently does nothing
        :param node: the node to add"""
        return None

    def del_node(self, node):
        """Placeholder, currently does nothing
        :param node: the node to delete"""
        return None

    def visit_input_depth_first(self, node, visitor=lambda n: True ):
        """Placeholder, currently does nothing.

        Intended behaviour: walk all input nodes of node depth first and call
        visitor on each of them. If visitor returns False, the walk does not
        descend further on that branch but proceeds with the next one.
        :param node: node whose inputs should be visited
        :param visitor: callable taking a node, accepting every node by default"""
        return None
+
+
class TaskNode(Node):
    """Node tying together an input channel, an output channel and the
    function that processes items.

    It may carry additional information on how to handle read errors coming
    from the input channel."""
    __slots__ = ('in_rc', 'out_wc', 'fun')

    def is_done(self):
        """:return: value of the ``closed`` attribute of our output channel,
            i.e. True once processing has finished"""
        output_channel = self.out_wc
        return output_channel.closed
+
+
class RPoolChannel(RChannel):
    """A read-only pool channel may not be wrapped or derived from, but it
    provides slots to call before and after an item is to be read.

    It acts like a handle to the underlying task.

    :note: set_post_cb, set_pre_cb and read are placeholders without an
        implementation yet; their docstrings describe the intended contract"""
    __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')

    def set_post_cb(self, fun = lambda item: item):
        """Install a callback to call after the item has been read. The function
        returns a possibly changed item. If it raises, the exception will be
        propagated in an IOError, indicating read-failure.
        If a function is not provided, the call is effectively uninstalled.
        :param fun: callable taking the read item and returning an item
        :note: not implemented yet, the call currently has no effect"""

    def set_pre_cb(self, fun = lambda : None):
        """Install a callback to call before an item is read from the channel.
        If it fails, the read will fail with an IOError.
        If a function is not provided, the call is effectively uninstalled.
        :param fun: callable taking no arguments
        :note: not implemented yet, the call currently has no effect"""

    def read(self, block=False, timeout=None):
        """Read an item that was processed by one of our threads
        :param block: if True, the call is meant to wait for an item
        :param timeout: timeout in seconds for blocking calls
        :note: Triggers task dependency handling needed to provide the necessary
            input
        :note: not implemented yet, the call currently returns None. The
            method takes self explicitly: without it the instance would be
            bound to block and passing both arguments positionally would fail"""

    #{ Internal
    def _read(self, block=False, timeout=None):
        """Calls the underlying channel's read directly, without triggering
        the pool
        :return: whatever RChannel.read returns for the given arguments"""
        return RChannel.read(self, block, timeout)

    #} END internal
+
+
class PoolWorker(WorkerThread):
    """Worker thread that is called to deal with tasks. Tasks provide channels
    with the actual work, and the results are sent to the task's output
    channel."""

    @classmethod
    def perform_task(cls, task):
        """Placeholder, currently does nothing
        :param task: the task to process
        :note: when getting the input channel, be sure not to trigger
            RPoolChannel"""
        return None
+
+
class ThreadPool(Graph):
    """Pool keeping a set of one or more worker threads. A fully serial mode
    is supported as well, in which the amount of threads is zero.

    Work is handed around through channels which form a dependency graph.
    Evaluation is lazy: work is only done once an output is requested.

    :note: the current implementation returns channels which are meant to be
        used only from the main thread
    :note: all methods of this class are placeholders without implementation"""
    __slots__ = (
        '_workers',                 # list of worker threads
        '_queue',                   # master queue for tasks
        '_ordered_tasks_cache',     # tasks in order of evaluation, mapped by read channel
    )

    def del_node(self, task):
        """Delete the node, which is a task, and delete the matching entries in
        our output channel cache as well
        :param task: the task to remove
        :note: not implemented yet, the call currently has no effect"""

    def set_pool_size(self, size=0):
        """Set the amount of workers to use in this pool.
        :param size: if 0, the pool will do all work itself in the calling thread,
            otherwise the work will be distributed among the given amount of threads
        :note: not implemented yet, the call currently has no effect"""

    def add_task(self, task):
        """Add a new task to be processed.
        :param task: the task to add
        :return: your task instance with its output channel set. It can be used
            to retrieve processed items
        :note: not implemented yet, the call currently returns None"""
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
new file mode 100644
index 00000000..3938666a
--- /dev/null
+++ b/lib/git/async/thread.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+"""Module with threading utilities"""
+__docformat__ = "restructuredtext"
+import threading
+import inspect
+import Queue
+
+#{ Decorators
+
def do_terminate_threads(whitelist=None):
    """Simple function which terminates all of our threads, that is all running
    TerminatableThread instances. Each of them is stopped and joined, so the
    call blocks until they have finished.
    :param whitelist: optional container of threads. If it is given and not
        empty, only the threads it contains will be terminated.
    :note: an empty whitelist counts as no whitelist at all, hence all
        terminatable threads are stopped in that case"""
    for t in threading.enumerate():
        if not isinstance(t, TerminatableThread):
            continue
        if whitelist and t not in whitelist:
            continue
        if isinstance(t, WorkerThread):
            # a worker may be waiting on its input queue - send it the quit
            # routine so it wakes up and leaves its loop
            t.inq.put(t.quit)
        # END worker special handling
        t.stop_and_join()
    # END for each thread
+
def terminate_threads( func ):
    """Decorator which kills all threads the decorated function has created,
    by handing them to do_terminate_threads once the function returned or
    raised. This takes over in case of an error in the main function.
    :param func: the function to decorate
    :return: wrapper carrying the metadata of func; it passes on all arguments
        as well as the return value or exception of func"""
    # local import, keeps the module level imports untouched
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        threads_before = set(threading.enumerate())
        try:
            return func(*args, **kwargs)
        finally:
            new_threads = set(threading.enumerate()) - threads_before
            # do_terminate_threads treats an empty whitelist like no whitelist
            # and would stop every terminatable thread, even those we did
            # not create - hence only call it if there is something to stop
            if new_threads:
                do_terminate_threads(new_threads)
            # END handle new threads
        # END finally shutdown threads
    # END wrapper
    return wrapper
+
+#} END decorators
+
+#{ Classes
+
class TerminatableThread(threading.Thread):
    """Thread which can be asked by the user to terminate.

    Terminate a thread as follows:

        t.stop_and_join()

    The request is stored as a flag only. Derived classes call
    _should_terminate() to find out whether they should abort gracefully."""
    __slots__ = '_terminate'

    def __init__(self):
        """Initialize the thread with the termination request cleared"""
        super(TerminatableThread, self).__init__()
        self._terminate = False

    #{ Subclass Interface
    def _should_terminate(self):
        """:return: True if termination was requested, in which case the thread
            should end its operation immediately"""
        return self._terminate

    def _terminated(self):
        """Hook called by stop_and_join() once the thread was joined, in the
        thread that called stop_and_join(). Subclasses may override it to
        perform cleanup operations, the default does nothing."""
        return None

    def start(self):
        """Start the thread
        :return: self, allowing to create and start a thread in one expression"""
        super(TerminatableThread, self).start()
        return self

    #} END subclass interface

    #{ Interface

    def stop_and_join(self):
        """Request termination, wait until the thread has ended, then call the
        _terminated() hook
        :note: blocks for as long as the thread needs to end"""
        self._terminate = True
        self.join()
        self._terminated()
    #} END interface
+
+
+class WorkerThread(TerminatableThread):
+ """
+ This base allows to call functions on class instances natively and retrieve
+ their results asynchronously using a queue.
+ The thread runs forever unless it receives the terminate signal using
+ its task queue.
+
+ Tasks could be anything, but should usually be class methods and arguments to
+ allow the following:
+
+ inq = Queue()
+ outq = Queue()
+ w = WorkerThread(inq, outq)
+ w.start()
+ inq.put((WorkerThread.<method>, args, kwargs))
+ res = outq.get()
+
+ finally we call quit to terminate asap.
+
+ alternatively, you can make a call more intuitively - the output is the output queue
+ allowing you to get the result right away or later
+ w.call(arg, kwarg='value').get()
+
+ inq.put(WorkerThread.quit)
+ w.join()
+
+ You may provide the following tuples as task:
+ t[0] = class method, function or instance method
+ t[1] = optional, tuple or list of arguments to pass to the routine
+ t[2] = optional, dictionary of keyword arguments to pass to the routine
+ """
+ __slots__ = ('inq', 'outq')
+
+ class InvalidRoutineError(Exception):
+ """Class sent as return value in case of an error"""
+
+ def __init__(self, inq = None, outq = None):
+ super(WorkerThread, self).__init__()
+ self.inq = inq or Queue.Queue()
+ self.outq = outq or Queue.Queue()
+
+ def call(self, function, *args, **kwargs):
+ """Method that makes the call to the worker using the input queue,
+ returning our output queue
+
+ :param funciton: can be a standalone function unrelated to this class,
+ a class method of this class or any instance method.
+ If it is a string, it will be considered a function residing on this instance
+ :param args: arguments to pass to function
+ :parma **kwargs: kwargs to pass to function"""
+ self.inq.put((function, args, kwargs))
+ return self.outq
+
+ def wait_until_idle(self):
+ """wait until the input queue is empty, in the meanwhile, take all
+ results off the output queue."""
+ while not self.inq.empty():
+ try:
+ self.outq.get(False)
+ except Queue.Empty:
+ continue
+ # END while there are tasks on the queue
+
+ def run(self):
+ """Process input tasks until we receive the quit signal"""
+ while True:
+ if self._should_terminate():
+ break
+ # END check for stop request
+ routine = self.__class__.quit
+ args = tuple()
+ kwargs = dict()
+ tasktuple = self.inq.get()
+
+ if isinstance(tasktuple, (tuple, list)):
+ if len(tasktuple) == 3:
+ routine, args, kwargs = tasktuple
+ elif len(tasktuple) == 2:
+ routine, args = tasktuple
+ elif len(tasktuple) == 1:
+ routine = tasktuple[0]
+ # END tasktuple length check
+ elif inspect.isroutine(tasktuple):
+ routine = tasktuple
+ # END tasktuple handling
+
+ try:
+ rval = None
+ if inspect.ismethod(routine):
+ if routine.im_self is None:
+ rval = routine(self, *args, **kwargs)
+ else:
+ rval = routine(*args, **kwargs)
+ elif inspect.isroutine(routine):
+ rval = routine(*args, **kwargs)
+ elif isinstance(routine, basestring) and hasattr(self, routine):
+ rval = getattr(self, routine)(*args, **kwargs)
+ else:
+ # ignore unknown items
+ print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
+ self.outq.put(self.InvalidRoutineError(routine))
+ break
+ # END make routine call
+ self.outq.put(rval)
+ except StopIteration:
+ break
+ except Exception,e:
+ print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
+ self.outq.put(e)
+ # END routine exception handling
+ # END endless loop
+
+ def quit(self):
+ raise StopIteration
+
+
+#} END classes