diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2015-01-07 13:01:17 +0100 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2015-01-07 13:01:35 +0100 |
commit | 45eb728554953fafcee2aab0f76ca65e005326b0 (patch) | |
tree | 4fa71b51e4d017ad2188c6c385043caba29e868c | |
parent | d83f6e84cbeb45dce4576a9a4591446afefa50b2 (diff) | |
parent | 87a6ffa13ae2951a168cde5908c7a94b16562b96 (diff) | |
download | gitpython-45eb728554953fafcee2aab0f76ca65e005326b0.tar.gz |
Merge branch 'nonblocking-ops'
Fixes #145
-rw-r--r-- | doc/source/changes.rst | 5 | ||||
-rw-r--r-- | git/cmd.py | 117 | ||||
-rw-r--r-- | git/index/base.py | 19 | ||||
-rw-r--r-- | git/remote.py | 66 | ||||
-rw-r--r-- | git/repo/base.py | 13 | ||||
-rw-r--r-- | git/util.py | 40 |
6 files changed, 201 insertions, 59 deletions
diff --git a/doc/source/changes.rst b/doc/source/changes.rst index b7479e4f..4983b3d0 100644 --- a/doc/source/changes.rst +++ b/doc/source/changes.rst @@ -2,6 +2,11 @@ Changelog ========= +0.3.5 - Bugfixes +================ +* push/pull/fetch operations will not block anymore +* A list of all fixed issues can be found here: https://github.com/gitpython-developers/GitPython/issues?q=milestone%3A%22v0.3.5+-+bugfixes%22+ + 0.3.4 - Python 3 Support ======================== * Internally, hexadecimal SHA1 are treated as ascii encoded strings. Binary SHA1 are treated as bytes. @@ -6,7 +6,9 @@ import os import sys +import select import logging +import threading from subprocess import ( call, Popen, @@ -16,7 +18,8 @@ from subprocess import ( from .util import ( LazyMixin, - stream_copy + stream_copy, + WaitGroup ) from .exc import GitCommandError from git.compat import ( @@ -36,9 +39,121 @@ log = logging.getLogger('git.cmd') __all__ = ('Git', ) +# ============================================================================== +## @name Utilities +# ------------------------------------------------------------------------------ +# Documentation +## @{ + +def handle_process_output(process, stdout_handler, stderr_handler, finalizer): + """Registers for notifications to lean that process output is ready to read, and dispatches lines to + the respective line handlers. We are able to handle carriage returns in case progress is sent by that + mean. For performance reasons, we only apply this to stderr. + This function returns once the finalizer returns + :return: result of finalizer + :param process: subprocess.Popen instance + :param stdout_handler: f(stdout_line_string), or None + :param stderr_hanlder: f(stderr_line_string), or None + :param finalizer: f(proc) - wait for proc to finish""" + def read_line_fast(stream): + return stream.readline() + + def read_line_slow(stream): + line = b'' + while True: + char = stream.read(1) # reads individual single byte strings + if not char: + break + + if char in (b'\r', b'\n') and line: + break + else: + line += char + # END process parsed line + # END while file is not done reading + return line + # end + + def dispatch_line(fno): + stream, handler, readline = fdmap[fno] + # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, + # we are good ... + line = readline(stream).decode(defenc) + if line and handler: + handler(line) + return line + # end dispatch helper + # end + + def deplete_buffer(fno, wg=None): + while True: + line = dispatch_line(fno) + if not line: + break + # end deplete buffer + if wg: + wg.done() + # end + + fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)} + + if hasattr(select, 'poll'): + # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be + # an issue for us, as it matters how many handles or own process has + poll = select.poll() + READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR + CLOSED = select.POLLHUP | select.POLLERR + + poll.register(process.stdout, READ_ONLY) + poll.register(process.stderr, READ_ONLY) + + closed_streams = set() + while True: + # no timeout + poll_result = poll.poll() + for fd, result in poll_result: + if result & CLOSED: + closed_streams.add(fd) + else: + dispatch_line(fd) + # end handle closed stream + # end for each poll-result tuple + + if len(closed_streams) == len(fdmap): + break + # end its all done + # end endless loop + + # Depelete all remaining buffers + for fno in fdmap.keys(): + deplete_buffer(fno) + # end for each file handle + else: + # Oh ... probably we are on windows. select.select() can only handle sockets, we have files + # The only reliable way to do this now is to use threads and wait for both to finish + # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive + # NO: It's not enough unfortunately, and we will have to sync the threads + wg = WaitGroup() + for fno in fdmap.keys(): + wg.add(1) + t = threading.Thread(target=lambda: deplete_buffer(fno, wg)) + t.start() + # end + # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's + # actually started, which could make the wait() call to just return because the thread is not yet + # active + wg.wait() + # end + + return finalizer(process) + + def dashify(string): return string.replace('_', '-') +## -- End Utilities -- @} + class Git(LazyMixin): diff --git a/git/index/base.py b/git/index/base.py index cc883469..66fd5b1f 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -287,11 +287,11 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): changes according to the amount of trees. If 1 Tree is given, it will just be read into a new index If 2 Trees are given, they will be merged into a new index using a - two way merge algorithm. Tree 1 is the 'current' tree, tree 2 is the 'other' - one. It behaves like a fast-forward. - If 3 Trees are given, a 3-way merge will be performed with the first tree - being the common ancestor of tree 2 and tree 3. Tree 2 is the 'current' tree, - tree 3 is the 'other' one + two way merge algorithm. Tree 1 is the 'current' tree, tree 2 is the 'other' + one. It behaves like a fast-forward. + If 3 Trees are given, a 3-way merge will be performed with the first tree + being the common ancestor of tree 2 and tree 3. Tree 2 is the 'current' tree, + tree 3 is the 'other' one :param kwargs: Additional arguments passed to git-read-tree @@ -882,14 +882,11 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): def commit(self, message, parent_commits=None, head=True, author=None, committer=None): """Commit the current default index file, creating a commit object. - For more information on the arguments, see tree.commit. - :note: - If you have manually altered the .entries member of this instance, - don't forget to write() your changes to disk beforehand. - :return: - Commit object representing the new commit""" + :note: If you have manually altered the .entries member of this instance, + don't forget to write() your changes to disk beforehand. + :return: Commit object representing the new commit""" tree = self.write_tree() return Commit.create_from_tree(self.repo, tree, message, parent_commits, head, author=author, committer=committer) diff --git a/git/remote.py b/git/remote.py index 484bc031..87db5dd4 100644 --- a/git/remote.py +++ b/git/remote.py @@ -31,6 +31,7 @@ from git.util import ( join_path, finalize_process ) +from git.cmd import handle_process_output from gitdb.util import join from git.compat import defenc @@ -40,30 +41,6 @@ __all__ = ('RemoteProgress', 'PushInfo', 'FetchInfo', 'Remote') #{ Utilities -def digest_process_messages(fh, progress): - """Read progress messages from file-like object fh, supplying the respective - progress messages to the progress instance. - - :param fh: File handle to read from - :return: list(line, ...) list of lines without linebreaks that did - not contain progress information""" - line_so_far = b'' - dropped_lines = list() - while True: - char = fh.read(1) # reads individual single byte strings - if not char: - break - - if char in (b'\r', b'\n') and line_so_far: - dropped_lines.extend(progress._parse_progress_line(line_so_far.decode(defenc))) - line_so_far = b'' - else: - line_so_far += char - # END process parsed line - # END while file is not done reading - return dropped_lines - - def add_progress(kwargs, git, progress): """Add the --progress flag to the given kwargs dict if supported by the git command. If the actual progress in the given progress instance is not @@ -532,17 +509,25 @@ class Remote(LazyMixin, Iterable): # Basically we want all fetch info lines which appear to be in regular form, and thus have a # command character. Everything else we ignore, cmds = set(PushInfo._flag_map.keys()) & set(FetchInfo._flag_map.keys()) - for line in digest_process_messages(proc.stderr, progress): - if line.startswith('fatal:'): - raise GitCommandError(("Error when fetching: %s" % line,), 2) - # END handle special messages - for cmd in cmds: - if line[1] == cmd: - fetch_info_lines.append(line) - continue - # end find command code - # end for each comand code we know - # END for each line + + progress_handler = progress.new_message_handler() + + def my_progress_handler(line): + for pline in progress_handler(line): + if line.startswith('fatal:'): + raise GitCommandError(("Error when fetching: %s" % line,), 2) + # END handle special messages + for cmd in cmds: + if line[1] == cmd: + fetch_info_lines.append(line) + continue + # end find command code + # end for each comand code we know + # end for each line progress didn't handle + # end + + # We are only interested in stderr here ... + handle_process_output(proc, None, my_progress_handler, finalize_process) # read head information fp = open(join(self.repo.git_dir, 'FETCH_HEAD'), 'rb') @@ -555,7 +540,6 @@ class Remote(LazyMixin, Iterable): output.extend(FetchInfo._from_line(self.repo, err_line, fetch_line) for err_line, fetch_line in zip(fetch_info_lines, fetch_head_info)) - finalize_process(proc) return output def _get_push_info(self, proc, progress): @@ -564,11 +548,10 @@ class Remote(LazyMixin, Iterable): # read the lines manually as it will use carriage returns between the messages # to override the previous one. This is why we read the bytes manually # TODO: poll() on file descriptors to know what to read next, process streams concurrently - digest_process_messages(proc.stderr, progress) - + progress_handler = progress.new_message_handler() output = IterableList('name') - for line in proc.stdout.readlines(): - line = line.decode(defenc) + + def stdout_handler(line): try: output.append(PushInfo._from_line(self, line)) except ValueError: @@ -576,7 +559,8 @@ class Remote(LazyMixin, Iterable): pass # END exception handling # END for each line - finalize_process(proc) + + handle_process_output(proc, stdout_handler, progress_handler, finalize_process) return output def fetch(self, refspec=None, progress=None, **kwargs): diff --git a/git/repo/base.py b/git/repo/base.py index 2a63492b..e8db3540 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -5,7 +5,10 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php from git.exc import InvalidGitRepositoryError, NoSuchPathError -from git.cmd import Git +from git.cmd import ( + Git, + handle_process_output +) from git.refs import ( HEAD, Head, @@ -25,7 +28,6 @@ from git.index import IndexFile from git.config import GitConfigParser from git.remote import ( Remote, - digest_process_messages, add_progress ) @@ -711,9 +713,10 @@ class Repo(object): proc = git.clone(url, path, with_extended_output=True, as_process=True, v=True, **add_progress(kwargs, git, progress)) if progress: - digest_process_messages(proc.stderr, progress) - # END handle progress - finalize_process(proc) + handle_process_output(proc, None, progress.new_message_handler(), finalize_process) + else: + finalize_process(proc) + # end handle progress finally: if prev_cwd is not None: os.chdir(prev_cwd) diff --git a/git/util.py b/git/util.py index 4de736d3..4d1ea8d6 100644 --- a/git/util.py +++ b/git/util.py @@ -12,6 +12,7 @@ import stat import shutil import platform import getpass +import threading # NOTE: Some of the unused imports might be used/imported by others. # Handle once test-cases are back up and running. @@ -32,7 +33,7 @@ from gitdb.util import ( # NOQA __all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path_linux", "join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList", "BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists', - 'RemoteProgress', 'rmtree') + 'RemoteProgress', 'rmtree', 'WaitGroup') #{ Utility Methods @@ -249,6 +250,14 @@ class RemoteProgress(object): # END for each sub line return failed_lines + def new_message_handler(self): + """:return: a progress handler suitable for handle_process_output(), passing lines on to this Progress + handler in a suitable format""" + def handler(line): + return self._parse_progress_line(line.rstrip()) + # end + return handler + def line_dropped(self, line): """Called whenever a line could not be understood and was therefore dropped.""" pass @@ -691,3 +700,32 @@ class Iterable(object): raise NotImplementedError("To be implemented by Subclass") #} END classes + + +class WaitGroup(object): + """WaitGroup is like Go sync.WaitGroup. + + Without all the useful corner cases. + By Peter Teichman, taken from https://gist.github.com/pteichman/84b92ae7cef0ab98f5a8 + """ + def __init__(self): + self.count = 0 + self.cv = threading.Condition() + + def add(self, n): + self.cv.acquire() + self.count += n + self.cv.release() + + def done(self): + self.cv.acquire() + self.count -= 1 + if self.count == 0: + self.cv.notify_all() + self.cv.release() + + def wait(self): + self.cv.acquire() + while self.count > 0: + self.cv.wait() + self.cv.release() |