diff options
-rw-r--r-- | lib/git/async/__init__.py | 29 | ||||
-rw-r--r-- | lib/git/async/thread.py | 25 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 8 |
3 files changed, 49 insertions, 13 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py index 89b9eb47..e212f1b2 100644 --- a/lib/git/async/__init__.py +++ b/lib/git/async/__init__.py @@ -1 +1,30 @@ """Initialize the multi-processing package""" + +#{ Initialization +def _init_atexit(): + """Setup an at-exit job to be sure our workers are shutdown correctly before + the interpreter quits""" + import atexit + import thread + atexit.register(thread.do_terminate_threads) + +def _init_signals(): + """Assure we shutdown our threads correctly when being interrupted""" + import signal + import thread + + prev_handler = signal.getsignal(signal.SIGINT) + def thread_interrupt_handler(signum, frame): + thread.do_terminate_threads() + if callable(prev_handler): + prev_handler(signum, frame) + raise KeyboardInterrupt() + # END call previous handler + # END signal handler + signal.signal(signal.SIGINT, thread_interrupt_handler) + + +#} END init + +_init_atexit() +_init_signals() diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4d046a2f..afe0d79d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -136,16 +136,21 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" - print self.name, "starts processing" # DEBUG - gettask = self.inq.get while True: if self._should_terminate(): break # END check for stop request - # we wait and block - to terminate, send the 'stop' method + # note: during shutdown, this turns None in the middle of waiting + # for an item to be put onto it - we can't du anything about it - + # even if we catch everything and break gracefully, the parent + # call will think we failed with an empty exception. + # Hence we just don't do anything about it. Alternatively + # we could override the start method to get our own bootstrapping, + # which would mean repeating plenty of code in of the threading module. tasktuple = gettask() + # needing exactly one function, and one arg routine, arg = tasktuple @@ -161,7 +166,7 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) break # END make routine call finally: @@ -171,10 +176,9 @@ class WorkerThread(TerminatableThread): del(routine) del(tasktuple) except StopProcessing: - print self.name, "stops processing" # DEBUG break except Exception,e: - print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) + sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) continue # just continue # END routine exception handling @@ -182,7 +186,12 @@ class WorkerThread(TerminatableThread): # END endless loop def stop_and_join(self): - """Send stop message to ourselves""" + """Send stop message to ourselves - we don't block, the thread will terminate + once it has finished processing its input queue to receive our termination + event""" + # DONT call superclass as it will try to join - join's don't work for + # some reason, as python apparently doesn't switch threads (so often) + # while waiting ... I don't know, but the threads respond properly, + # but only if dear python switches to them self.inq.put((self.stop, None)) - super(WorkerThread, self).stop_and_join() #} END classes diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 40c6d66e..c786770a 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -373,10 +373,7 @@ class TestThreadPool(TestBase): - - # for some reason, sometimes it has multiple workerthreads already when he - # enters the method ... dunno yet, pools should clean up themselvess - #@terminate_threads + @terminate_threads def test_base(self): assert len(threading.enumerate()) == 1 @@ -463,10 +460,11 @@ class TestThreadPool(TestBase): # threads per core p.set_size(4) self._assert_single_task(p, True) + + # DEPENDENT TASK ASYNC MODE ########################### self._assert_async_dependent_tasks(p) print >> sys.stderr, "Done with everything" - # TODO: test multi-pool connections |