diff options
-rw-r--r-- | git/cmd.py | 20 | ||||
-rw-r--r-- | git/util.py | 32 |
2 files changed, 43 insertions, 9 deletions
@@ -18,7 +18,8 @@ from subprocess import ( from .util import ( LazyMixin, - stream_copy + stream_copy, + WaitGroup ) from .exc import GitCommandError from git.compat import ( @@ -84,12 +85,14 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end dispatch helper # end - def deplete_buffer(fno): + def deplete_buffer(fno, wg=None): while True: line = dispatch_line(fno) if not line: break # end deplete buffer + if wg: + wg.done() # end fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), @@ -131,15 +134,16 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # The only reliable way to do this now is to use threads and wait for both to finish # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive # NO: It's not enough unfortunately, and we will have to sync the threads - threads = list() + wg = WaitGroup() for fno in fdmap.keys(): - t = threading.Thread(target = lambda: deplete_buffer(fno)) - threads.append(t) + wg.add(1) + t = threading.Thread(target = lambda: deplete_buffer(fno, wg)) t.start() # end - for t in threads: - t.join() - # end + # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's + # actually started, which could make the wait() call to just return because the thread is not yet + # active + wg.wait() # end return finalizer(process) diff --git a/git/util.py b/git/util.py index 34b09d32..e211ca41 100644 --- a/git/util.py +++ b/git/util.py @@ -12,6 +12,7 @@ import stat import shutil import platform import getpass +import threading # NOTE: Some of the unused imports might be used/imported by others. # Handle once test-cases are back up and running. @@ -32,7 +33,7 @@ from gitdb.util import ( # NOQA __all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path_linux", "join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList", "BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists', - 'RemoteProgress', 'rmtree') + 'RemoteProgress', 'rmtree', 'WaitGroup') #{ Utility Methods @@ -699,3 +700,32 @@ class Iterable(object): raise NotImplementedError("To be implemented by Subclass") #} END classes + + +class WaitGroup(object): + """WaitGroup is like Go sync.WaitGroup. + + Without all the useful corner cases. + By Peter Teichman, taken from https://gist.github.com/pteichman/84b92ae7cef0ab98f5a8 + """ + def __init__(self): + self.count = 0 + self.cv = threading.Condition() + + def add(self, n): + self.cv.acquire() + self.count += n + self.cv.release() + + def done(self): + self.cv.acquire() + self.count -= 1 + if self.count == 0: + self.cv.notify_all() + self.cv.release() + + def wait(self): + self.cv.acquire() + while self.count > 0: + self.cv.wait() + self.cv.release() |