Diffstat (limited to 'lib/git/async')
-rw-r--r--   lib/git/async/__init__.py     1
-rw-r--r--   lib/git/async/channel.py    108
-rw-r--r--   lib/git/async/pool.py       116
-rw-r--r--   lib/git/async/thread.py     203
4 files changed, 428 insertions, 0 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py
new file mode 100644
index 00000000..89b9eb47
--- /dev/null
+++ b/lib/git/async/__init__.py
@@ -0,0 +1 @@
+"""Initialize the multi-processing package"""
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
new file mode 100644
index 00000000..c9cbfb87
--- /dev/null
+++ b/lib/git/async/channel.py
@@ -0,0 +1,108 @@
+"""Contains a queue based channel implementation"""
+from Queue import (
+ Queue,
+ Empty,
+ Full
+ )
+
+#{ Classes
+class Channel(object):
+ """A channel is similar to a system pipe. It has a write end as well as one or
+ more read ends. If Data is in the channel, it can be read, if not the read operation
+ will block until data becomes available.
+ If the channel is closed, any read operation will result in an exception
+
+ This base class is not instantiated directly, but instead serves as constructor
+ for RWChannel pairs.
+
+ Create a new channel """
+ __slots__ = tuple()
+
+ def __new__(cls, *args):
+ if cls is Channel:
+ max_items = 0
+ if len(args) == 1:
+ max_items = args[0]
+ if len(args) > 1:
+ raise ValueError("Specify not more than the number of items the channel should take")
+ wc = WChannel(max_items)
+ rc = RChannel(wc)
+ return wc, rc
+ # END constructor mode
+ return object.__new__(cls)
+
+
+class WChannel(Channel):
+	"""The write end of a channel"""
+	__slots__ = ('_closed', '_queue')
+
+	def __init__(self, max_items=0):
+		"""Initialize this instance, able to hold max_items at once.
+		Write calls will block if the channel is full, until someone reads from it"""
+		self._closed = False
+		self._queue = Queue(max_items)
+
+
+	#{ Interface
+	def write(self, item, block=True, timeout=None):
+		"""Send an item into the channel; it can be read from the read end of the
+		channel accordingly
+		:param item: Item to send
+		:param block: If True, the call will block until there is free space in the
+			channel
+		:param timeout: timeout in seconds for blocking calls.
+		:raise IOError: when writing into a closed channel or when writing into a
+			non-blocking, full channel
+		:note: may block if the channel has a limited capacity"""
+		if self._closed:
+			raise IOError("Cannot write to a closed channel")
+
+		try:
+			self._queue.put(item, block, timeout)
+		except Full:
+			raise IOError("Capacity of the channel was exceeded")
+		# END exception handling
+
+	def close(self):
+		"""Close the channel. Multiple close calls on a closed channel are not
+		an error"""
+		self._closed = True
+
+	@property
+	def closed(self):
+		""":return: True if the channel was closed"""
+		return self._closed
+	#} END interface
+
+
+class RChannel(Channel):
+	"""The read end of a corresponding write channel"""
+	__slots__ = '_wc'
+
+	def __init__(self, wchannel):
+		"""Initialize this instance from its parent write channel"""
+		self._wc = wchannel
+
+
+	#{ Interface
+
+	def read(self, block=True, timeout=None):
+		""":return: an item read from the channel
+		:param block: if True, the call will block until an item is available
+		:param timeout: if positive and block is True, it will block only for the
+			given amount of seconds.
+		:raise IOError: when reading from an empty channel (if non-blocking, or
+			if the channel is still empty after the timeout)"""
+		# if the channel is closed for writing, we never block
+		if self._wc.closed:
+			block = False
+
+		try:
+			return self._wc._queue.get(block, timeout)
+		except Empty:
+			raise IOError("Error reading from an empty channel")
+		# END handle reading
+
+	#} END interface
+
+#} END classes
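
A minimal usage sketch for the channel pair above (hypothetical, not part of this commit; it assumes lib/ is on sys.path so the module imports as git.async.channel):

	from git.async.channel import Channel

	wc, rc = Channel()           # unbounded capacity; Channel(10) would hold at most 10 items
	wc.write("item")             # blocks only if a bounded channel is full
	wc.close()                   # subsequent writes raise IOError
	assert rc.read() == "item"   # reads never block once the write end is closed
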
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
new file mode 100644
index 00000000..f9f7880b
--- /dev/null
+++ b/lib/git/async/pool.py
@@ -0,0 +1,116 @@
+"""Implementation of a thread-pool working with channels"""
+from thread import WorkerThread
+from channel import (
+ Channel,
+ WChannel,
+ RChannel
+ )
+
+class Node(object):
+ """A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
+ Its not designed to support big graphs, and sports only the functionality
+ we need"""
+ __slots__ = ('in_nodes', 'out_nodes')
+
+
+class Graph(object):
+ """A simple graph implementation, keeping nodes and providing basic access and
+ editing functions"""
+ __slots__ = "nodes"
+
+ def add_node(self, node):
+ pass
+
+ def del_node(self, node):
+ pass
+
+ def visit_input_depth_first(self, node, visitor=lambda n: True ):
+ """Visit all input nodes of the given node, depth first, calling visitor
+ for each node on our way. If the function returns False, the traversal
+ will not go any deeper, but continue at the next branch"""
+ pass
+
+
+class TaskNode(Node):
+	"""Couples an input channel, an output channel, as well as a processing function
+	together.
+	It may contain additional information on how to handle read errors from the
+	input channel"""
+	__slots__ = ('in_rc', 'out_wc', 'fun')
+
+	def is_done(self):
+		""":return: True if we are finished processing"""
+		return self.out_wc.closed
+
+
+class RPoolChannel(RChannel):
+	"""A read-only pool channel may not be wrapped or derived from, but it provides slots to call
+	before and after an item is to be read.
+
+	It acts like a handle to the underlying task"""
+	__slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
+
+	def set_post_cb(self, fun = lambda item: item):
+		"""Install a callback to call after the item has been read. The function
+		returns a possibly changed item. If it raises, the exception will be propagated
+		in an IOError, indicating read failure.
+		If a function is not provided, the call is effectively uninstalled."""
+
+	def set_pre_cb(self, fun = lambda : None):
+		"""Install a callback to call before an item is read from the channel.
+		If it fails, the read will fail with an IOError.
+		If a function is not provided, the call is effectively uninstalled."""
+
+	def read(self, block=False, timeout=None):
+		"""Read an item that was processed by one of our threads
+		:note: Triggers task dependency handling needed to provide the necessary
+		input"""
+
+	#{ Internal
+	def _read(self, block=False, timeout=None):
+		"""Calls the underlying channel's read directly, without triggering
+		the pool"""
+		return RChannel.read(self, block, timeout)
+
+	#} END internal
+
+
+class PoolWorker(WorkerThread):
+	"""A worker thread which gets called to deal with Tasks. Tasks provide channels
+	with actual work, whose results will be sent to the task's output channel"""
+
+	@classmethod
+	def perform_task(cls, task):
+		# note: when getting the input channel, be sure not to trigger
+		# the RPoolChannel's dependency handling
+		pass
+
+
+class ThreadPool(Graph):
+	"""A thread pool maintains a set of one or more worker threads, but supports
+	a fully serial mode in which case the amount of threads is zero.
+
+	Work is distributed via Channels, which form a dependency graph. The evaluation
+	is lazy, as work will only be done once an output is requested.
+
+	:note: the current implementation returns channels which are meant to be
+		used only from the main thread"""
+	__slots__ = ( '_workers',				# list of worker threads
+				'_queue',					# master queue for tasks
+				'_ordered_tasks_cache'		# tasks in order of evaluation, mapped by read channel
+				)
+
+	def del_node(self, task):
+		"""Delete the node (being a task), and remove its entries from our output channel
+		cache as well"""
+
+
+	def set_pool_size(self, size=0):
+		"""Set the amount of workers to use in this pool.
+		:param size: if 0, the pool will do all work itself in the calling thread,
+			otherwise the work will be distributed among the given amount of threads"""
+
+	def add_task(self, task):
+		"""Add a new task to be processed.
+		:return: your task instance with its output channel set. It can be used
+			to retrieve processed items"""
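
Since pool.py is still a skeleton, the following hypothetical sketch (not part of this commit) shows how a TaskNode might be wired up by hand with the channel primitives; only the attribute names come from TaskNode.__slots__, everything else is an assumption:

	from git.async.channel import Channel
	from git.async.pool import TaskNode

	inp_wc, inp_rc = Channel()       # feeds raw items into the task
	out_wc, out_rc = Channel()       # receives processed items

	task = TaskNode()
	task.in_rc = inp_rc
	task.out_wc = out_wc
	task.fun = lambda item: item * 2

	inp_wc.write(21)
	# roughly what PoolWorker.perform_task is presumably meant to do per item
	task.out_wc.write(task.fun(task.in_rc.read()))
	assert out_rc.read() == 42
	assert not task.is_done()        # the output channel is still open
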
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
new file mode 100644
index 00000000..3938666a
--- /dev/null
+++ b/lib/git/async/thread.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+"""Module with threading utilities"""
+__docformat__ = "restructuredtext"
+import threading
+import inspect
+import Queue
+
+#{ Decorators
+
+def do_terminate_threads(whitelist=list()):
+	"""Simple function which terminates all of our threads
+	:param whitelist: If whitelist is given, only the given threads will be terminated"""
+	for t in threading.enumerate():
+		if not isinstance(t, TerminatableThread):
+			continue
+		if whitelist and t not in whitelist:
+			continue
+		if isinstance(t, WorkerThread):
+			t.inq.put(t.quit)
+		# END worker special handling
+		t.stop_and_join()
+	# END for each thread
+
+def terminate_threads(func):
+	"""Kills all worker threads the decorated function has created by sending them
+	the quit signal. This also takes over in case of an error in the wrapped function"""
+	def wrapper(*args, **kwargs):
+		cur_threads = set(threading.enumerate())
+		try:
+			return func(*args, **kwargs)
+		finally:
+			do_terminate_threads(set(threading.enumerate()) - cur_threads)
+		# END finally shutdown threads
+	# END wrapper
+	wrapper.__name__ = func.__name__
+	return wrapper
+
+#} END decorators
+
+#{ Classes
+
+class TerminatableThread(threading.Thread):
+	"""A simple thread able to terminate itself on behalf of the user.
+
+	Terminate a thread as follows:
+
+		t.stop_and_join()
+
+	Derived classes call _should_terminate() to determine whether they should
+	abort gracefully
+	"""
+	__slots__ = '_terminate'
+
+	def __init__(self):
+		super(TerminatableThread, self).__init__()
+		self._terminate = False
+
+
+	#{ Subclass Interface
+	def _should_terminate(self):
+		""":return: True if this thread should terminate its operation immediately"""
+		return self._terminate
+
+	def _terminated(self):
+		"""Called once the thread has terminated. It is called in the main thread
+		and may perform cleanup operations"""
+		pass
+
+	def start(self):
+		"""Start the thread and return self"""
+		super(TerminatableThread, self).start()
+		return self
+
+	#} END subclass interface
+
+	#{ Interface
+
+	def stop_and_join(self):
+		"""Ask the thread to stop its operation and wait for it to terminate
+		:note: Depending on the implementation, this might block a moment"""
+		self._terminate = True
+		self.join()
+		self._terminated()
+	#} END interface
+
+
+class WorkerThread(TerminatableThread):
+	"""
+	This base class allows to call functions on class instances natively and retrieve
+	their results asynchronously using a queue.
+	The thread runs forever unless it receives the terminate signal using
+	its task queue.
+
+	Tasks could be anything, but should usually be class methods and arguments to
+	allow the following:
+
+		inq = Queue()
+		outq = Queue()
+		w = WorkerThread(inq, outq)
+		w.start()
+		inq.put((WorkerThread.<method>, args, kwargs))
+		res = outq.get()
+
+	Finally, we put the quit routine to terminate the thread as soon as possible:
+
+		inq.put(WorkerThread.quit)
+		w.join()
+
+	Alternatively, you can make a call more intuitively - the return value is the
+	output queue, allowing you to get the result right away or later:
+		w.call(arg, kwarg='value').get()
+
+	You may provide the following tuples as task:
+	t[0] = class method, function or instance method
+	t[1] = optional, tuple or list of arguments to pass to the routine
+	t[2] = optional, dictionary of keyword arguments to pass to the routine
+	"""
+	__slots__ = ('inq', 'outq')
+
+	class InvalidRoutineError(Exception):
+		"""Exception instance put onto the output queue in case of an error"""
+
+	def __init__(self, inq = None, outq = None):
+		super(WorkerThread, self).__init__()
+		self.inq = inq or Queue.Queue()
+		self.outq = outq or Queue.Queue()
+
+	def call(self, function, *args, **kwargs):
+		"""Method that makes the call to the worker using the input queue,
+		returning our output queue
+
+		:param function: can be a standalone function unrelated to this class,
+			a class method of this class or any instance method.
+			If it is a string, it will be considered a function residing on this instance
+		:param args: arguments to pass to function
+		:param kwargs: keyword arguments to pass to function"""
+		self.inq.put((function, args, kwargs))
+		return self.outq
+
+	def wait_until_idle(self):
+		"""Wait until the input queue is empty; in the meanwhile, take all
+		results off the output queue."""
+		while not self.inq.empty():
+			try:
+				self.outq.get(False)
+			except Queue.Empty:
+				continue
+		# END while there are tasks on the queue
+
+	def run(self):
+		"""Process input tasks until we receive the quit signal"""
+		while True:
+			if self._should_terminate():
+				break
+			# END check for stop request
+			routine = self.__class__.quit
+			args = tuple()
+			kwargs = dict()
+			tasktuple = self.inq.get()
+
+			if isinstance(tasktuple, (tuple, list)):
+				if len(tasktuple) == 3:
+					routine, args, kwargs = tasktuple
+				elif len(tasktuple) == 2:
+					routine, args = tasktuple
+				elif len(tasktuple) == 1:
+					routine = tasktuple[0]
+				# END tasktuple length check
+			elif inspect.isroutine(tasktuple):
+				routine = tasktuple
+			# END tasktuple handling
+
+			try:
+				rval = None
+				if inspect.ismethod(routine):
+					if routine.im_self is None:
+						rval = routine(self, *args, **kwargs)
+					else:
+						rval = routine(*args, **kwargs)
+				elif inspect.isroutine(routine):
+					rval = routine(*args, **kwargs)
+				elif isinstance(routine, basestring) and hasattr(self, routine):
+					rval = getattr(self, routine)(*args, **kwargs)
+				else:
+					# ignore unknown items
+					print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
+					self.outq.put(self.InvalidRoutineError(routine))
+					break
+				# END make routine call
+				self.outq.put(rval)
+			except StopIteration:
+				break
+			except Exception, e:
+				print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
+				self.outq.put(e)
+			# END routine exception handling
+		# END endless loop
+
+	def quit(self):
+		raise StopIteration
+
+
+#} END classes
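
A short usage sketch for WorkerThread, following its docstring (hypothetical, not part of this commit; it assumes the module imports as git.async.thread):

	from git.async.thread import WorkerThread

	def add(a, b):
		return a + b

	w = WorkerThread().start()       # start() returns self
	outq = w.call(add, 1, b=2)       # enqueue the task and get the output queue back
	assert outq.get() == 3           # blocks until the worker has processed the task
	w.inq.put(WorkerThread.quit)     # makes the worker raise StopIteration and exit
	w.join()
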