diff options
| author | Kostis Anagnostopoulos <ankostis@gmail.com> | 2016-10-13 15:35:51 +0200 | 
|---|---|---|
| committer | Kostis Anagnostopoulos <ankostis@gmail.com> | 2016-10-14 17:43:17 +0200 | 
| commit | e6e23ed24b35c6154b4ee0da5ae51cd5688e5e67 (patch) | |
| tree | 173563f1efb1686ae6ac5a0a7f870cbbddc329fd /git/util.py | |
| parent | ba7c2a0f81f83c358ae256963da86f907ca7f13c (diff) | |
| download | gitpython-e6e23ed24b35c6154b4ee0da5ae51cd5688e5e67.tar.gz | |
cygwin, #533: Try to make it work with Cygwin's Git.
+ Make `Git.polish_url()` convert paths into Cygwin-friendly paths.
+ Add utility and soe TCs for funcs for detecting cygwin and converting
abs-paths to `/cygdrive/c/...`.
- Cygwin TCs failing:
  - PY2: err: 14, fail: 3
  - PY3: err: 13, fail: 3
Diffstat (limited to 'git/util.py')
| -rw-r--r-- | git/util.py | 155 | 
1 files changed, 147 insertions, 8 deletions
| diff --git a/git/util.py b/git/util.py index d00de1e4..b7d18023 100644 --- a/git/util.py +++ b/git/util.py @@ -5,6 +5,8 @@  # the BSD License: http://www.opensource.org/licenses/bsd-license.php  from __future__ import unicode_literals +import contextlib +from functools import wraps  import getpass  import logging  import os @@ -13,10 +15,8 @@ import re  import shutil  import stat  import time +from unittest.case import SkipTest -from functools import wraps - -from git.compat import is_win  from gitdb.util import (# NOQA @IgnorePep8      make_sha,      LockedFD,               # @UnusedImport @@ -26,6 +26,7 @@ from gitdb.util import (# NOQA @IgnorePep8      to_bin_sha              # @UnusedImport  ) +from git.compat import is_win  import os.path as osp  from .compat import ( @@ -34,7 +35,6 @@ from .compat import (      PY3  )  from .exc import InvalidGitRepositoryError -from unittest.case import SkipTest  # NOTE:  Some of the unused imports might be used/imported by others. @@ -47,6 +47,8 @@ __all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path             'RemoteProgress', 'CallableRemoteProgress', 'rmtree', 'unbare_repo',             'HIDE_WINDOWS_KNOWN_ERRORS') +log = logging.getLogger(__name__) +  #: We need an easy way to see if Appveyor TCs start failing,  #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,  #: till then, we wish to hide them. @@ -70,6 +72,16 @@ def unbare_repo(func):      return wrapper +@contextlib.contextmanager +def cwd(new_dir): +    old_dir = os.getcwd() +    os.chdir(new_dir) +    try: +        yield new_dir +    finally: +        os.chdir(old_dir) + +  def rmtree(path):      """Remove the given recursively. @@ -162,14 +174,141 @@ def assure_directory_exists(path, is_file=False):          Otherwise it must be a directory      :return: True if the directory was created, False if it already existed"""      if is_file: -        path = os.path.dirname(path) +        path = osp.dirname(path)      # END handle file -    if not os.path.isdir(path): +    if not osp.isdir(path):          os.makedirs(path)          return True      return False +def _get_exe_extensions(): +    try: +        winprog_exts = tuple(p.upper() for p in os.environ['PATHEXT'].split(os.pathsep)) +    except: +        winprog_exts = ('.BAT', 'COM', '.EXE') + +    return winprog_exts + + +def py_where(program, path=None): +    # From: http://stackoverflow.com/a/377028/548792 +    try: +        winprog_exts = tuple(p.upper() for p in os.environ['PATHEXT'].split(os.pathsep)) +    except: +        winprog_exts = is_win and ('.BAT', 'COM', '.EXE') or () + +    def is_exec(fpath): +        return osp.isfile(fpath) and os.access(fpath, os.X_OK) and ( +            os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext) +                                                       for ext in winprog_exts)) + +    progs = [] +    if not path: +        path = os.environ["PATH"] +    for folder in path.split(osp.pathsep): +        folder = folder.strip('"') +        if folder: +            exe_path = osp.join(folder, program) +            for f in [exe_path] + ['%s%s' % (exe_path, e) for e in winprog_exts]: +                if is_exec(f): +                    progs.append(f) +    return progs + + +def _cygexpath(drive, path): +    if osp.isabs(path) and not drive: +        ## Invoked from `cygpath()` directly with `D:Apps\123`? +        #  It's an error, leave it alone just slashes) +        p = path +    else: +        p = osp.normpath(osp.expandvars(os.path.expanduser(path))) +        if osp.isabs(p): +            if drive: +                # Confusing, maybe a remote system should expand vars. +                p = path +            else: +                p = cygpath(p) +        elif drive: +            p = '/cygdrive/%s/%s' % (drive.lower(), p) + +    return p.replace('\\', '/') + + +_cygpath_parsers = ( +    ## See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx +    ## and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths +    (re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), +     (lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))), +     False +     ), + +    (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), +     _cygexpath, +     False +     ), + +    (re.compile(r"(\w):[/\\](.*)"), +     _cygexpath, +     False +     ), + +    (re.compile(r"file:(.*)", re.I), +     (lambda rest_path: rest_path), +     True), + +    (re.compile(r"(\w{2,}:.*)"),  # remote URL, do nothing +     (lambda url: url), +     False), +) + + +def cygpath(path): +    if not path.startswith(('/cygdrive', '//')): +        for regex, parser, recurse in _cygpath_parsers: +            match = regex.match(path) +            if match: +                path = parser(*match.groups()) +                if recurse: +                    path = cygpath(path) +                break +        else: +            path = _cygexpath(None, path) + +    return path + + +#: Store boolean flags denoting if a specific Git executable +#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). +_is_cygwin_cache = {} + + +def is_cygwin_git(git_executable): +    if not is_win: +        return False + +    from subprocess import check_output + +    is_cygwin = _is_cygwin_cache.get(git_executable) +    if is_cygwin is None: +        is_cygwin = False +        try: +            git_dir = osp.dirname(git_executable) +            if not git_dir: +                res = py_where(git_executable) +                git_dir = osp.dirname(res[0]) if res else None + +            ## Just a name given, not a real path. +            uname_cmd = osp.join(git_dir, 'uname') +            uname = check_output(uname_cmd, universal_newlines=True) +            is_cygwin = 'CYGWIN' in uname +        except Exception as ex: +            log.debug('Failed checking if running in CYGWIN due to: %r', ex) +        _is_cygwin_cache[git_executable] = is_cygwin + +    return is_cygwin + +  def get_user_id():      """:return: string identifying the currently active system user as name@node"""      return "%s@%s" % (getpass.getuser(), platform.node()) @@ -589,7 +728,7 @@ class LockFile(object):          if self._has_lock():              return          lock_file = self._lock_file_path() -        if os.path.isfile(lock_file): +        if osp.isfile(lock_file):              raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" %                            (self._file_path, lock_file)) @@ -659,7 +798,7 @@ class BlockingLockFile(LockFile):                  # synity check: if the directory leading to the lockfile is not                  # readable anymore, raise an execption                  curtime = time.time() -                if not os.path.isdir(os.path.dirname(self._lock_file_path())): +                if not osp.isdir(osp.dirname(self._lock_file_path())):                      msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (                          self._lock_file_path(), curtime - starttime)                      raise IOError(msg) | 
