diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
commit | f91495e271597034226f1b9651345091083172c4 (patch) | |
tree | e0e2aa63b7dc649083858366eaedb6ac4cc5739b /lib/git/async/thread.py | |
parent | 7c1169f6ea406fec1e26e99821e18e66437e65eb (diff) | |
parent | 7a0b79ee574999ecbc76696506352e4a5a0d7159 (diff) | |
download | gitpython-f91495e271597034226f1b9651345091083172c4.tar.gz |
Merge branch 'async'
Diffstat (limited to 'lib/git/async/thread.py')
-rw-r--r-- | lib/git/async/thread.py | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py new file mode 100644 index 00000000..96b4f0c4 --- /dev/null +++ b/lib/git/async/thread.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +"""Module with threading utilities""" +__docformat__ = "restructuredtext" +import threading +import inspect +import Queue + +import sys + +__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', + 'WorkerThread') + + +#{ Decorators + +def do_terminate_threads(whitelist=list()): + """Simple function which terminates all of our threads + :param whitelist: If whitelist is given, only the given threads will be terminated""" + for t in threading.enumerate(): + if not isinstance(t, TerminatableThread): + continue + if whitelist and t not in whitelist: + continue + t.stop_and_join() + # END for each thread + +def terminate_threads( func ): + """Kills all worker threads the method has created by sending the quit signal. + This takes over in case of an error in the main function""" + def wrapper(*args, **kwargs): + cur_threads = set(threading.enumerate()) + try: + return func(*args, **kwargs) + finally: + do_terminate_threads(set(threading.enumerate()) - cur_threads) + # END finally shutdown threads + # END wrapper + wrapper.__name__ = func.__name__ + return wrapper + +#} END decorators + +#{ Classes + +class TerminatableThread(threading.Thread): + """A simple thread able to terminate itself on behalf of the user. + + Terminate a thread as follows: + + t.stop_and_join() + + Derived classes call _should_terminate() to determine whether they should + abort gracefully + """ + __slots__ = '_terminate' + + def __init__(self): + super(TerminatableThread, self).__init__() + self._terminate = False + + + #{ Subclass Interface + def _should_terminate(self): + """:return: True if this thread should terminate its operation immediately""" + return self._terminate + + def _terminated(self): + """Called once the thread terminated. Its called in the main thread + and may perform cleanup operations""" + pass + + def start(self): + """Start the thread and return self""" + super(TerminatableThread, self).start() + return self + + #} END subclass interface + + #{ Interface + + def stop_and_join(self): + """Ask the thread to stop its operation and wait for it to terminate + :note: Depending on the implenetation, this might block a moment""" + self._terminate = True + self.join() + self._terminated() + #} END interface + + +class StopProcessing(Exception): + """If thrown in a function processed by a WorkerThread, it will terminate""" + + +class WorkerThread(TerminatableThread): + """ This base allows to call functions on class instances natively. + As it is meant to work with a pool, the result of the call must be + handled by the callee. + The thread runs forever unless it receives the terminate signal using + its task queue. + + Tasks could be anything, but should usually be class methods and arguments to + allow the following: + + inq = Queue() + w = WorkerThread(inq) + w.start() + inq.put((WorkerThread.<method>, args, kwargs)) + + finally we call quit to terminate asap. + + alternatively, you can make a call more intuitively - the output is the output queue + allowing you to get the result right away or later + w.call(arg, kwarg='value').get() + + inq.put(WorkerThread.quit) + w.join() + + You may provide the following tuples as task: + t[0] = class method, function or instance method + t[1] = optional, tuple or list of arguments to pass to the routine + t[2] = optional, dictionary of keyword arguments to pass to the routine + """ + __slots__ = ('inq') + + + # define how often we should check for a shutdown request in case our + # taskqueue is empty + shutdown_check_time_s = 0.5 + + def __init__(self, inq = None): + super(WorkerThread, self).__init__() + self.inq = inq + if inq is None: + self.inq = Queue.Queue() + + @classmethod + def stop(cls, *args): + """If send via the inq of the thread, it will stop once it processed the function""" + raise StopProcessing + + def run(self): + """Process input tasks until we receive the quit signal""" + gettask = self.inq.get + while True: + if self._should_terminate(): + break + # END check for stop request + + # note: during shutdown, this turns None in the middle of waiting + # for an item to be put onto it - we can't du anything about it - + # even if we catch everything and break gracefully, the parent + # call will think we failed with an empty exception. + # Hence we just don't do anything about it. Alternatively + # we could override the start method to get our own bootstrapping, + # which would mean repeating plenty of code in of the threading module. + tasktuple = gettask() + + # needing exactly one function, and one arg + routine, arg = tasktuple + + try: + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, arg) + else: + rval = routine(arg) + elif inspect.isroutine(routine): + rval = routine(arg) + else: + # ignore unknown items + sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) + break + # END make routine call + finally: + # make sure we delete the routine to release the reference as soon + # as possible. Otherwise objects might not be destroyed + # while we are waiting + del(routine) + del(tasktuple) + except StopProcessing: + break + except Exception,e: + sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) + continue # just continue + # END routine exception handling + + # END handle routine release + # END endless loop + + def stop_and_join(self): + """Send stop message to ourselves - we don't block, the thread will terminate + once it has finished processing its input queue to receive our termination + event""" + # DONT call superclass as it will try to join - join's don't work for + # some reason, as python apparently doesn't switch threads (so often) + # while waiting ... I don't know, but the threads respond properly, + # but only if dear python switches to them + self.inq.put((self.stop, None)) +#} END classes |