diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-11 16:47:40 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-11 16:47:40 +0200 |
commit | 29eb123beb1c55e5db4aa652d843adccbd09ae18 (patch) | |
tree | fc85ccf6a7eaf87b35058915b534ccf634f2ff94 /lib/git/async | |
parent | f606937a7a21237c866efafcad33675e6539c103 (diff) | |
parent | e14e3f143e7260de9581aee27e5a9b2645db72de (diff) | |
download | gitpython-29eb123beb1c55e5db4aa652d843adccbd09ae18.tar.gz |
Merge branch 'cleanup' into async
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/__init__.py | 29 | ||||
-rw-r--r-- | lib/git/async/pool.py | 6 | ||||
-rw-r--r-- | lib/git/async/task.py | 6 | ||||
-rw-r--r-- | lib/git/async/thread.py | 25 | ||||
-rw-r--r-- | lib/git/async/util.py | 1 |
5 files changed, 46 insertions, 21 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py index 89b9eb47..e212f1b2 100644 --- a/lib/git/async/__init__.py +++ b/lib/git/async/__init__.py @@ -1 +1,30 @@ """Initialize the multi-processing package""" + +#{ Initialization +def _init_atexit(): + """Setup an at-exit job to be sure our workers are shutdown correctly before + the interpreter quits""" + import atexit + import thread + atexit.register(thread.do_terminate_threads) + +def _init_signals(): + """Assure we shutdown our threads correctly when being interrupted""" + import signal + import thread + + prev_handler = signal.getsignal(signal.SIGINT) + def thread_interrupt_handler(signum, frame): + thread.do_terminate_threads() + if callable(prev_handler): + prev_handler(signum, frame) + raise KeyboardInterrupt() + # END call previous handler + # END signal handler + signal.signal(signal.SIGINT, thread_interrupt_handler) + + +#} END init + +_init_atexit() +_init_signals() diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index dbc201a9..7ee3e8eb 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -95,10 +95,6 @@ class PoolReader(CallbackReader): # provided enough - its better to have some possibly empty task runs # than having and empty queue that blocks. - # NOTE: TODO: that case is only possible if one Task could be connected - # to multiple input channels in a manner known by the system. Currently - # this is not possible, but should be implemented at some point. - # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel task = self._task_ref() @@ -260,8 +256,6 @@ class Pool(object): # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower - # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_writer.size() if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 49e7e7cf..10b22649 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -88,11 +88,7 @@ class OutputChannelTask(Node): self._num_writers += 1 self._wlock.release() - #print "%r: reading %i" % (self.id, count) - #if hasattr(self, 'reader'): - # print "from", self.reader().channel items = self._read(count) - #print "%r: done reading %i items" % (self.id, len(items)) try: try: @@ -117,7 +113,6 @@ class OutputChannelTask(Node): self._wlock.release() # END handle writer count except Exception, e: - print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again self.set_done() @@ -164,7 +159,6 @@ class OutputChannelTask(Node): self._wlock.acquire() try: if self._num_writers == 0: - # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel self.close() # END handle writers finally: diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4d046a2f..afe0d79d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -136,16 +136,21 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" - print self.name, "starts processing" # DEBUG - gettask = self.inq.get while True: if self._should_terminate(): break # END check for stop request - # we wait and block - to terminate, send the 'stop' method + # note: during shutdown, this turns None in the middle of waiting + # for an item to be put onto it - we can't du anything about it - + # even if we catch everything and break gracefully, the parent + # call will think we failed with an empty exception. + # Hence we just don't do anything about it. Alternatively + # we could override the start method to get our own bootstrapping, + # which would mean repeating plenty of code in of the threading module. tasktuple = gettask() + # needing exactly one function, and one arg routine, arg = tasktuple @@ -161,7 +166,7 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) break # END make routine call finally: @@ -171,10 +176,9 @@ class WorkerThread(TerminatableThread): del(routine) del(tasktuple) except StopProcessing: - print self.name, "stops processing" # DEBUG break except Exception,e: - print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) + sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) continue # just continue # END routine exception handling @@ -182,7 +186,12 @@ class WorkerThread(TerminatableThread): # END endless loop def stop_and_join(self): - """Send stop message to ourselves""" + """Send stop message to ourselves - we don't block, the thread will terminate + once it has finished processing its input queue to receive our termination + event""" + # DONT call superclass as it will try to join - join's don't work for + # some reason, as python apparently doesn't switch threads (so often) + # while waiting ... I don't know, but the threads respond properly, + # but only if dear python switches to them self.inq.put((self.stop, None)) - super(WorkerThread, self).stop_and_join() #} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 11ab75a6..4c4f3929 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,7 +2,6 @@ from threading import ( Lock, - current_thread, _allocate_lock, _Condition, _sleep, |