summaryrefslogtreecommitdiff
path: root/lib/git/odb/thread.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-05 19:59:17 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-05 19:59:17 +0200
commit61138f2ece0cb864b933698174315c34a78835d1 (patch)
treebf764cc491c7f29bb4e9cb97b42d75b4b5c6458b /lib/git/odb/thread.py
parent50e469109eed3a752d9a1b0297f16466ad92f8d2 (diff)
downloadgitpython-61138f2ece0cb864b933698174315c34a78835d1.tar.gz
Moved multiprocessing modules into own package, as they in fact have nothing to do with the object db. If that really works the way I want, it will become an own project, called async
Diffstat (limited to 'lib/git/odb/thread.py')
-rw-r--r--lib/git/odb/thread.py203
1 files changed, 0 insertions, 203 deletions
diff --git a/lib/git/odb/thread.py b/lib/git/odb/thread.py
deleted file mode 100644
index 3938666a..00000000
--- a/lib/git/odb/thread.py
+++ /dev/null
@@ -1,203 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Module with threading utilities"""
-__docformat__ = "restructuredtext"
-import threading
-import inspect
-import Queue
-
-#{ Decorators
-
-def do_terminate_threads(whitelist=list()):
- """Simple function which terminates all of our threads
- :param whitelist: If whitelist is given, only the given threads will be terminated"""
- for t in threading.enumerate():
- if not isinstance(t, TerminatableThread):
- continue
- if whitelist and t not in whitelist:
- continue
- if isinstance(t, WorkerThread):
- t.inq.put(t.quit)
- # END worker special handling
- t.stop_and_join()
- # END for each thread
-
-def terminate_threads( func ):
- """Kills all worker threads the method has created by sending the quit signal.
- This takes over in case of an error in the main function"""
- def wrapper(*args, **kwargs):
- cur_threads = set(threading.enumerate())
- try:
- return func(*args, **kwargs)
- finally:
- do_terminate_threads(set(threading.enumerate()) - cur_threads)
- # END finally shutdown threads
- # END wrapper
- wrapper.__name__ = func.__name__
- return wrapper
-
-#} END decorators
-
-#{ Classes
-
-class TerminatableThread(threading.Thread):
- """A simple thread able to terminate itself on behalf of the user.
-
- Terminate a thread as follows:
-
- t.stop_and_join()
-
- Derived classes call _should_terminate() to determine whether they should
- abort gracefully
- """
- __slots__ = '_terminate'
-
- def __init__(self):
- super(TerminatableThread, self).__init__()
- self._terminate = False
-
-
- #{ Subclass Interface
- def _should_terminate(self):
- """:return: True if this thread should terminate its operation immediately"""
- return self._terminate
-
- def _terminated(self):
- """Called once the thread terminated. Its called in the main thread
- and may perform cleanup operations"""
- pass
-
- def start(self):
- """Start the thread and return self"""
- super(TerminatableThread, self).start()
- return self
-
- #} END subclass interface
-
- #{ Interface
-
- def stop_and_join(self):
- """Ask the thread to stop its operation and wait for it to terminate
- :note: Depending on the implenetation, this might block a moment"""
- self._terminate = True
- self.join()
- self._terminated()
- #} END interface
-
-
-class WorkerThread(TerminatableThread):
- """
- This base allows to call functions on class instances natively and retrieve
- their results asynchronously using a queue.
- The thread runs forever unless it receives the terminate signal using
- its task queue.
-
- Tasks could be anything, but should usually be class methods and arguments to
- allow the following:
-
- inq = Queue()
- outq = Queue()
- w = WorkerThread(inq, outq)
- w.start()
- inq.put((WorkerThread.<method>, args, kwargs))
- res = outq.get()
-
- finally we call quit to terminate asap.
-
- alternatively, you can make a call more intuitively - the output is the output queue
- allowing you to get the result right away or later
- w.call(arg, kwarg='value').get()
-
- inq.put(WorkerThread.quit)
- w.join()
-
- You may provide the following tuples as task:
- t[0] = class method, function or instance method
- t[1] = optional, tuple or list of arguments to pass to the routine
- t[2] = optional, dictionary of keyword arguments to pass to the routine
- """
- __slots__ = ('inq', 'outq')
-
- class InvalidRoutineError(Exception):
- """Class sent as return value in case of an error"""
-
- def __init__(self, inq = None, outq = None):
- super(WorkerThread, self).__init__()
- self.inq = inq or Queue.Queue()
- self.outq = outq or Queue.Queue()
-
- def call(self, function, *args, **kwargs):
- """Method that makes the call to the worker using the input queue,
- returning our output queue
-
- :param funciton: can be a standalone function unrelated to this class,
- a class method of this class or any instance method.
- If it is a string, it will be considered a function residing on this instance
- :param args: arguments to pass to function
- :parma **kwargs: kwargs to pass to function"""
- self.inq.put((function, args, kwargs))
- return self.outq
-
- def wait_until_idle(self):
- """wait until the input queue is empty, in the meanwhile, take all
- results off the output queue."""
- while not self.inq.empty():
- try:
- self.outq.get(False)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
-
- def run(self):
- """Process input tasks until we receive the quit signal"""
- while True:
- if self._should_terminate():
- break
- # END check for stop request
- routine = self.__class__.quit
- args = tuple()
- kwargs = dict()
- tasktuple = self.inq.get()
-
- if isinstance(tasktuple, (tuple, list)):
- if len(tasktuple) == 3:
- routine, args, kwargs = tasktuple
- elif len(tasktuple) == 2:
- routine, args = tasktuple
- elif len(tasktuple) == 1:
- routine = tasktuple[0]
- # END tasktuple length check
- elif inspect.isroutine(tasktuple):
- routine = tasktuple
- # END tasktuple handling
-
- try:
- rval = None
- if inspect.ismethod(routine):
- if routine.im_self is None:
- rval = routine(self, *args, **kwargs)
- else:
- rval = routine(*args, **kwargs)
- elif inspect.isroutine(routine):
- rval = routine(*args, **kwargs)
- elif isinstance(routine, basestring) and hasattr(self, routine):
- rval = getattr(self, routine)(*args, **kwargs)
- else:
- # ignore unknown items
- print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
- self.outq.put(self.InvalidRoutineError(routine))
- break
- # END make routine call
- self.outq.put(rval)
- except StopIteration:
- break
- except Exception,e:
- print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
- self.outq.put(e)
- # END routine exception handling
- # END endless loop
-
- def quit(self):
- raise StopIteration
-
-
-#} END classes