diff options
Diffstat (limited to 'git/util.py')
-rw-r--r-- | git/util.py | 155 |
1 files changed, 147 insertions, 8 deletions
diff --git a/git/util.py b/git/util.py index d00de1e4..b7d18023 100644 --- a/git/util.py +++ b/git/util.py @@ -5,6 +5,8 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php from __future__ import unicode_literals +import contextlib +from functools import wraps import getpass import logging import os @@ -13,10 +15,8 @@ import re import shutil import stat import time +from unittest.case import SkipTest -from functools import wraps - -from git.compat import is_win from gitdb.util import (# NOQA @IgnorePep8 make_sha, LockedFD, # @UnusedImport @@ -26,6 +26,7 @@ from gitdb.util import (# NOQA @IgnorePep8 to_bin_sha # @UnusedImport ) +from git.compat import is_win import os.path as osp from .compat import ( @@ -34,7 +35,6 @@ from .compat import ( PY3 ) from .exc import InvalidGitRepositoryError -from unittest.case import SkipTest # NOTE: Some of the unused imports might be used/imported by others. @@ -47,6 +47,8 @@ __all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path 'RemoteProgress', 'CallableRemoteProgress', 'rmtree', 'unbare_repo', 'HIDE_WINDOWS_KNOWN_ERRORS') +log = logging.getLogger(__name__) + #: We need an easy way to see if Appveyor TCs start failing, #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, #: till then, we wish to hide them. @@ -70,6 +72,16 @@ def unbare_repo(func): return wrapper +@contextlib.contextmanager +def cwd(new_dir): + old_dir = os.getcwd() + os.chdir(new_dir) + try: + yield new_dir + finally: + os.chdir(old_dir) + + def rmtree(path): """Remove the given recursively. @@ -162,14 +174,141 @@ def assure_directory_exists(path, is_file=False): Otherwise it must be a directory :return: True if the directory was created, False if it already existed""" if is_file: - path = os.path.dirname(path) + path = osp.dirname(path) # END handle file - if not os.path.isdir(path): + if not osp.isdir(path): os.makedirs(path) return True return False +def _get_exe_extensions(): + try: + winprog_exts = tuple(p.upper() for p in os.environ['PATHEXT'].split(os.pathsep)) + except: + winprog_exts = ('.BAT', 'COM', '.EXE') + + return winprog_exts + + +def py_where(program, path=None): + # From: http://stackoverflow.com/a/377028/548792 + try: + winprog_exts = tuple(p.upper() for p in os.environ['PATHEXT'].split(os.pathsep)) + except: + winprog_exts = is_win and ('.BAT', 'COM', '.EXE') or () + + def is_exec(fpath): + return osp.isfile(fpath) and os.access(fpath, os.X_OK) and ( + os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext) + for ext in winprog_exts)) + + progs = [] + if not path: + path = os.environ["PATH"] + for folder in path.split(osp.pathsep): + folder = folder.strip('"') + if folder: + exe_path = osp.join(folder, program) + for f in [exe_path] + ['%s%s' % (exe_path, e) for e in winprog_exts]: + if is_exec(f): + progs.append(f) + return progs + + +def _cygexpath(drive, path): + if osp.isabs(path) and not drive: + ## Invoked from `cygpath()` directly with `D:Apps\123`? + # It's an error, leave it alone just slashes) + p = path + else: + p = osp.normpath(osp.expandvars(os.path.expanduser(path))) + if osp.isabs(p): + if drive: + # Confusing, maybe a remote system should expand vars. + p = path + else: + p = cygpath(p) + elif drive: + p = '/cygdrive/%s/%s' % (drive.lower(), p) + + return p.replace('\\', '/') + + +_cygpath_parsers = ( + ## See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx + ## and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths + (re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), + (lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))), + False + ), + + (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), + _cygexpath, + False + ), + + (re.compile(r"(\w):[/\\](.*)"), + _cygexpath, + False + ), + + (re.compile(r"file:(.*)", re.I), + (lambda rest_path: rest_path), + True), + + (re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing + (lambda url: url), + False), +) + + +def cygpath(path): + if not path.startswith(('/cygdrive', '//')): + for regex, parser, recurse in _cygpath_parsers: + match = regex.match(path) + if match: + path = parser(*match.groups()) + if recurse: + path = cygpath(path) + break + else: + path = _cygexpath(None, path) + + return path + + +#: Store boolean flags denoting if a specific Git executable +#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). +_is_cygwin_cache = {} + + +def is_cygwin_git(git_executable): + if not is_win: + return False + + from subprocess import check_output + + is_cygwin = _is_cygwin_cache.get(git_executable) + if is_cygwin is None: + is_cygwin = False + try: + git_dir = osp.dirname(git_executable) + if not git_dir: + res = py_where(git_executable) + git_dir = osp.dirname(res[0]) if res else None + + ## Just a name given, not a real path. + uname_cmd = osp.join(git_dir, 'uname') + uname = check_output(uname_cmd, universal_newlines=True) + is_cygwin = 'CYGWIN' in uname + except Exception as ex: + log.debug('Failed checking if running in CYGWIN due to: %r', ex) + _is_cygwin_cache[git_executable] = is_cygwin + + return is_cygwin + + def get_user_id(): """:return: string identifying the currently active system user as name@node""" return "%s@%s" % (getpass.getuser(), platform.node()) @@ -589,7 +728,7 @@ class LockFile(object): if self._has_lock(): return lock_file = self._lock_file_path() - if os.path.isfile(lock_file): + if osp.isfile(lock_file): raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" % (self._file_path, lock_file)) @@ -659,7 +798,7 @@ class BlockingLockFile(LockFile): # synity check: if the directory leading to the lockfile is not # readable anymore, raise an execption curtime = time.time() - if not os.path.isdir(os.path.dirname(self._lock_file_path())): + if not osp.isdir(osp.dirname(self._lock_file_path())): msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( self._lock_file_path(), curtime - starttime) raise IOError(msg) |