summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--git/cmd.py20
-rw-r--r--git/util.py32
2 files changed, 43 insertions, 9 deletions
diff --git a/git/cmd.py b/git/cmd.py
index 5ba5edb4..e03d0cdc 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -18,7 +18,8 @@ from subprocess import (
from .util import (
LazyMixin,
- stream_copy
+ stream_copy,
+ WaitGroup
)
from .exc import GitCommandError
from git.compat import (
@@ -84,12 +85,14 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
# end dispatch helper
# end
- def deplete_buffer(fno):
+ def deplete_buffer(fno, wg=None):
while True:
line = dispatch_line(fno)
if not line:
break
# end deplete buffer
+ if wg:
+ wg.done()
# end
fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast),
@@ -131,15 +134,16 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
# The only reliable way to do this now is to use threads and wait for both to finish
# Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
# NO: It's not enough unfortunately, and we will have to sync the threads
- threads = list()
+ wg = WaitGroup()
for fno in fdmap.keys():
- t = threading.Thread(target = lambda: deplete_buffer(fno))
- threads.append(t)
+ wg.add(1)
+ t = threading.Thread(target = lambda: deplete_buffer(fno, wg))
t.start()
# end
- for t in threads:
- t.join()
- # end
+ # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's
+ # actually started, which could make the wait() call to just return because the thread is not yet
+ # active
+ wg.wait()
# end
return finalizer(process)
diff --git a/git/util.py b/git/util.py
index 34b09d32..e211ca41 100644
--- a/git/util.py
+++ b/git/util.py
@@ -12,6 +12,7 @@ import stat
import shutil
import platform
import getpass
+import threading
# NOTE: Some of the unused imports might be used/imported by others.
# Handle once test-cases are back up and running.
@@ -32,7 +33,7 @@ from gitdb.util import ( # NOQA
__all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path_linux",
"join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList",
"BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists',
- 'RemoteProgress', 'rmtree')
+ 'RemoteProgress', 'rmtree', 'WaitGroup')
#{ Utility Methods
@@ -699,3 +700,32 @@ class Iterable(object):
raise NotImplementedError("To be implemented by Subclass")
#} END classes
+
+
+class WaitGroup(object):
+ """WaitGroup is like Go sync.WaitGroup.
+
+ Without all the useful corner cases.
+ By Peter Teichman, taken from https://gist.github.com/pteichman/84b92ae7cef0ab98f5a8
+ """
+ def __init__(self):
+ self.count = 0
+ self.cv = threading.Condition()
+
+ def add(self, n):
+ self.cv.acquire()
+ self.count += n
+ self.cv.release()
+
+ def done(self):
+ self.cv.acquire()
+ self.count -= 1
+ if self.count == 0:
+ self.cv.notify_all()
+ self.cv.release()
+
+ def wait(self):
+ self.cv.acquire()
+ while self.count > 0:
+ self.cv.wait()
+ self.cv.release()