summaryrefslogtreecommitdiff
path: root/lib/git/odb
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-05 19:59:17 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-05 19:59:17 +0200
commit61138f2ece0cb864b933698174315c34a78835d1 (patch)
treebf764cc491c7f29bb4e9cb97b42d75b4b5c6458b /lib/git/odb
parent50e469109eed3a752d9a1b0297f16466ad92f8d2 (diff)
downloadgitpython-61138f2ece0cb864b933698174315c34a78835d1.tar.gz
Moved multiprocessing modules into own package, as they in fact have nothing to do with the object db. If that really works the way I want, it will become an own project, called async
Diffstat (limited to 'lib/git/odb')
-rw-r--r--lib/git/odb/channel.py108
-rw-r--r--lib/git/odb/pool.py104
-rw-r--r--lib/git/odb/thread.py203
3 files changed, 0 insertions, 415 deletions
diff --git a/lib/git/odb/channel.py b/lib/git/odb/channel.py
deleted file mode 100644
index c9cbfb87..00000000
--- a/lib/git/odb/channel.py
+++ /dev/null
@@ -1,108 +0,0 @@
-"""Contains a queue based channel implementation"""
-from Queue import (
- Queue,
- Empty,
- Full
- )
-
-#{ Classes
-class Channel(object):
- """A channel is similar to a system pipe. It has a write end as well as one or
- more read ends. If Data is in the channel, it can be read, if not the read operation
- will block until data becomes available.
- If the channel is closed, any read operation will result in an exception
-
- This base class is not instantiated directly, but instead serves as constructor
- for RWChannel pairs.
-
- Create a new channel """
- __slots__ = tuple()
-
- def __new__(cls, *args):
- if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
- rc = RChannel(wc)
- return wc, rc
- # END constructor mode
- return object.__new__(cls)
-
-
-class WChannel(Channel):
- """The write end of a channel"""
- __slots__ = ('_closed', '_queue')
-
- def __init__(self, max_items=0):
- """initialize this instance, able to hold max_items at once
- Write calls will block if the channel is full, until someone reads from it"""
- self._closed = False
- self._queue = Queue(max_items)
-
-
- #{ Interface
- def write(self, item, block=True, timeout=None):
- """Send an item into the channel, it can be read from the read end of the
- channel accordingly
- :param item: Item to send
- :param block: If True, the call will block until there is free space in the
- channel
- :param timeout: timeout in seconds for blocking calls.
- :raise IOError: when writing into closed file or when writing into a non-blocking
- full channel
- :note: may block if the channel has a limited capacity"""
- if self._closed:
- raise IOError("Cannot write to a closed channel")
-
- try:
- self._queue.put(item, block, timeout)
- except Full:
- raise IOError("Capacity of the channel was exeeded")
- # END exception handling
-
- def close(self):
- """Close the channel. Multiple close calls on a closed channel are no
- an error"""
- self._closed = True
-
- @property
- def closed(self):
- """:return: True if the channel was closed"""
- return self._closed
- #} END interface
-
-
-class RChannel(Channel):
- """The read-end of a corresponding write channel"""
- __slots__ = '_wc'
-
- def __init__(self, wchannel):
- """Initialize this instance from its parent write channel"""
- self._wc = wchannel
-
-
- #{ Interface
-
- def read(self, block=True, timeout=None):
- """:return: an item read from the channel
- :param block: if True, the call will block until an item is available
- :param timeout: if positive and block is True, it will block only for the
- given amount of seconds.
- :raise IOError: When reading from an empty channel ( if non-blocking, or
- if the channel is still empty after the timeout"""
- # if the channel is closed for writing, we never block
- if self._wc.closed:
- block = False
-
- try:
- return self._wc._queue.get(block, timeout)
- except Empty:
- raise IOError("Error reading from an empty channel")
- # END handle reading
-
- #} END interface
-
-#} END classes
diff --git a/lib/git/odb/pool.py b/lib/git/odb/pool.py
deleted file mode 100644
index 5c3a7ead..00000000
--- a/lib/git/odb/pool.py
+++ /dev/null
@@ -1,104 +0,0 @@
-"""Implementation of a thread-pool working with channels"""
-from thread import TerminatableThread
-from channel import (
- Channel,
- WChannel,
- RChannel
- )
-
-class Node(object):
- """A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
- Its not designed to support big graphs, and sports only the functionality
- we need"""
- __slots__('in_nodes', 'out_nodes')
-
-
-class Graph(object):
- """A simple graph implementation, keeping nodes and providing basic access and
- editing functions"""
- __slots__ = "nodes"
-
- def add_node(self, node):
- pass
-
- def del_node(self, node):
- pass
-
- def visit_input_depth_first(self, node, visitor=lambda n: True ):
- """Visit all input nodes of the given node, depth first, calling visitor
- for each node on our way. If the function returns False, the traversal
- will not go any deeper, but continue at the next branch"""
- pass
-
-
-class TaskNode(Node):
- """Couples an input channel, an output channel, as well as a processing function
- together.
- It may contain additional information on how to handel read-errors from the
- input channel"""
- __slots__ = ('in_rc', 'out_wc', 'fun')
-
- def is_done(self):
- """:return: True if we are finished processing"""
- return self.out_wc.closed
-
-
-class PoolChannel(Channel):
- """Base class for read and write channels which trigger the pool to evaluate
- its tasks, causing the evaluation of the task list effectively assure a read
- from actual output channel will not block forever due to task dependencies.
- """
- __slots__ = tuple()
-
-
-class RPoolChannel(PoolChannel):
- """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
- before and after an item is to be read"""
- __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
-
- def set_post_cb(self, fun = lambda item: item):
- """Install a callback to call after the item has been read. The function
- returns a possibly changed item. If it raises, the exception will be propagated
- in an IOError, indicating read-failure
- If a function is not provided, the call is effectively uninstalled."""
-
- def set_pre_cb(self, fun = lambda : None):
- """Install a callback to call before an item is read from the channel.
- If it fails, the read will fail with an IOError
- If a function is not provided, the call is effectively uninstalled."""
-
-
-class PoolWorker(WorkerThread):
- """A worker thread which gets called to deal with Tasks. Tasks provide channls
- with actual work, whose result will be send to the tasks output channel"""
-
- @classmethod
- def perform_task(cls, task):
- pass
-
-
-class ThreadPool(Graph):
- """A thread pool maintains a set of one or more worker threads, but supports
- a fully serial mode in which case the amount of threads is zero.
-
- Work is distributed via Channels, which form a dependency graph. The evaluation
- is lazy, as work will only be done once an output is requested."""
- __slots__ = ( '_workers', # list of worker threads
- '_queue', # master queue for tasks
- '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel
- )
-
- def del_node(self, task):
- """Delete the node ( being a task ), but delete the entries in our output channel
- cache as well"""
-
-
- def set_pool_size(self, size=0):
- """Set the amount of workers to use in this pool.
- :param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads"""
-
- def add_task(self, task):
- """Add a new task to be processed.
- :return: your task instance with its output channel set. It can be used
- to retrieve processed items"""
diff --git a/lib/git/odb/thread.py b/lib/git/odb/thread.py
deleted file mode 100644
index 3938666a..00000000
--- a/lib/git/odb/thread.py
+++ /dev/null
@@ -1,203 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Module with threading utilities"""
-__docformat__ = "restructuredtext"
-import threading
-import inspect
-import Queue
-
-#{ Decorators
-
-def do_terminate_threads(whitelist=list()):
- """Simple function which terminates all of our threads
- :param whitelist: If whitelist is given, only the given threads will be terminated"""
- for t in threading.enumerate():
- if not isinstance(t, TerminatableThread):
- continue
- if whitelist and t not in whitelist:
- continue
- if isinstance(t, WorkerThread):
- t.inq.put(t.quit)
- # END worker special handling
- t.stop_and_join()
- # END for each thread
-
-def terminate_threads( func ):
- """Kills all worker threads the method has created by sending the quit signal.
- This takes over in case of an error in the main function"""
- def wrapper(*args, **kwargs):
- cur_threads = set(threading.enumerate())
- try:
- return func(*args, **kwargs)
- finally:
- do_terminate_threads(set(threading.enumerate()) - cur_threads)
- # END finally shutdown threads
- # END wrapper
- wrapper.__name__ = func.__name__
- return wrapper
-
-#} END decorators
-
-#{ Classes
-
-class TerminatableThread(threading.Thread):
- """A simple thread able to terminate itself on behalf of the user.
-
- Terminate a thread as follows:
-
- t.stop_and_join()
-
- Derived classes call _should_terminate() to determine whether they should
- abort gracefully
- """
- __slots__ = '_terminate'
-
- def __init__(self):
- super(TerminatableThread, self).__init__()
- self._terminate = False
-
-
- #{ Subclass Interface
- def _should_terminate(self):
- """:return: True if this thread should terminate its operation immediately"""
- return self._terminate
-
- def _terminated(self):
- """Called once the thread terminated. Its called in the main thread
- and may perform cleanup operations"""
- pass
-
- def start(self):
- """Start the thread and return self"""
- super(TerminatableThread, self).start()
- return self
-
- #} END subclass interface
-
- #{ Interface
-
- def stop_and_join(self):
- """Ask the thread to stop its operation and wait for it to terminate
- :note: Depending on the implenetation, this might block a moment"""
- self._terminate = True
- self.join()
- self._terminated()
- #} END interface
-
-
-class WorkerThread(TerminatableThread):
- """
- This base allows to call functions on class instances natively and retrieve
- their results asynchronously using a queue.
- The thread runs forever unless it receives the terminate signal using
- its task queue.
-
- Tasks could be anything, but should usually be class methods and arguments to
- allow the following:
-
- inq = Queue()
- outq = Queue()
- w = WorkerThread(inq, outq)
- w.start()
- inq.put((WorkerThread.<method>, args, kwargs))
- res = outq.get()
-
- finally we call quit to terminate asap.
-
- alternatively, you can make a call more intuitively - the output is the output queue
- allowing you to get the result right away or later
- w.call(arg, kwarg='value').get()
-
- inq.put(WorkerThread.quit)
- w.join()
-
- You may provide the following tuples as task:
- t[0] = class method, function or instance method
- t[1] = optional, tuple or list of arguments to pass to the routine
- t[2] = optional, dictionary of keyword arguments to pass to the routine
- """
- __slots__ = ('inq', 'outq')
-
- class InvalidRoutineError(Exception):
- """Class sent as return value in case of an error"""
-
- def __init__(self, inq = None, outq = None):
- super(WorkerThread, self).__init__()
- self.inq = inq or Queue.Queue()
- self.outq = outq or Queue.Queue()
-
- def call(self, function, *args, **kwargs):
- """Method that makes the call to the worker using the input queue,
- returning our output queue
-
- :param funciton: can be a standalone function unrelated to this class,
- a class method of this class or any instance method.
- If it is a string, it will be considered a function residing on this instance
- :param args: arguments to pass to function
- :parma **kwargs: kwargs to pass to function"""
- self.inq.put((function, args, kwargs))
- return self.outq
-
- def wait_until_idle(self):
- """wait until the input queue is empty, in the meanwhile, take all
- results off the output queue."""
- while not self.inq.empty():
- try:
- self.outq.get(False)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
-
- def run(self):
- """Process input tasks until we receive the quit signal"""
- while True:
- if self._should_terminate():
- break
- # END check for stop request
- routine = self.__class__.quit
- args = tuple()
- kwargs = dict()
- tasktuple = self.inq.get()
-
- if isinstance(tasktuple, (tuple, list)):
- if len(tasktuple) == 3:
- routine, args, kwargs = tasktuple
- elif len(tasktuple) == 2:
- routine, args = tasktuple
- elif len(tasktuple) == 1:
- routine = tasktuple[0]
- # END tasktuple length check
- elif inspect.isroutine(tasktuple):
- routine = tasktuple
- # END tasktuple handling
-
- try:
- rval = None
- if inspect.ismethod(routine):
- if routine.im_self is None:
- rval = routine(self, *args, **kwargs)
- else:
- rval = routine(*args, **kwargs)
- elif inspect.isroutine(routine):
- rval = routine(*args, **kwargs)
- elif isinstance(routine, basestring) and hasattr(self, routine):
- rval = getattr(self, routine)(*args, **kwargs)
- else:
- # ignore unknown items
- print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
- self.outq.put(self.InvalidRoutineError(routine))
- break
- # END make routine call
- self.outq.put(rval)
- except StopIteration:
- break
- except Exception,e:
- print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
- self.outq.put(e)
- # END routine exception handling
- # END endless loop
-
- def quit(self):
- raise StopIteration
-
-
-#} END classes