summaryrefslogtreecommitdiff
path: root/git/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/util.py')
-rw-r--r--git/util.py155
1 files changed, 147 insertions, 8 deletions
diff --git a/git/util.py b/git/util.py
index d00de1e4..b7d18023 100644
--- a/git/util.py
+++ b/git/util.py
@@ -5,6 +5,8 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from __future__ import unicode_literals
+import contextlib
+from functools import wraps
import getpass
import logging
import os
@@ -13,10 +15,8 @@ import re
import shutil
import stat
import time
+from unittest.case import SkipTest
-from functools import wraps
-
-from git.compat import is_win
from gitdb.util import (# NOQA @IgnorePep8
make_sha,
LockedFD, # @UnusedImport
@@ -26,6 +26,7 @@ from gitdb.util import (# NOQA @IgnorePep8
to_bin_sha # @UnusedImport
)
+from git.compat import is_win
import os.path as osp
from .compat import (
@@ -34,7 +35,6 @@ from .compat import (
PY3
)
from .exc import InvalidGitRepositoryError
-from unittest.case import SkipTest
# NOTE: Some of the unused imports might be used/imported by others.
@@ -47,6 +47,8 @@ __all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path
'RemoteProgress', 'CallableRemoteProgress', 'rmtree', 'unbare_repo',
'HIDE_WINDOWS_KNOWN_ERRORS')
+log = logging.getLogger(__name__)
+
#: We need an easy way to see if Appveyor TCs start failing,
#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
#: till then, we wish to hide them.
@@ -70,6 +72,16 @@ def unbare_repo(func):
return wrapper
+@contextlib.contextmanager
+def cwd(new_dir):
+ old_dir = os.getcwd()
+ os.chdir(new_dir)
+ try:
+ yield new_dir
+ finally:
+ os.chdir(old_dir)
+
+
def rmtree(path):
"""Remove the given recursively.
@@ -162,14 +174,141 @@ def assure_directory_exists(path, is_file=False):
Otherwise it must be a directory
:return: True if the directory was created, False if it already existed"""
if is_file:
- path = os.path.dirname(path)
+ path = osp.dirname(path)
# END handle file
- if not os.path.isdir(path):
+ if not osp.isdir(path):
os.makedirs(path)
return True
return False
+def _get_exe_extensions():
+ try:
+ winprog_exts = tuple(p.upper() for p in os.environ['PATHEXT'].split(os.pathsep))
+ except:
+ winprog_exts = ('.BAT', 'COM', '.EXE')
+
+ return winprog_exts
+
+
+def py_where(program, path=None):
+ # From: http://stackoverflow.com/a/377028/548792
+ try:
+ winprog_exts = tuple(p.upper() for p in os.environ['PATHEXT'].split(os.pathsep))
+ except:
+ winprog_exts = is_win and ('.BAT', 'COM', '.EXE') or ()
+
+ def is_exec(fpath):
+ return osp.isfile(fpath) and os.access(fpath, os.X_OK) and (
+ os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext)
+ for ext in winprog_exts))
+
+ progs = []
+ if not path:
+ path = os.environ["PATH"]
+ for folder in path.split(osp.pathsep):
+ folder = folder.strip('"')
+ if folder:
+ exe_path = osp.join(folder, program)
+ for f in [exe_path] + ['%s%s' % (exe_path, e) for e in winprog_exts]:
+ if is_exec(f):
+ progs.append(f)
+ return progs
+
+
+def _cygexpath(drive, path):
+ if osp.isabs(path) and not drive:
+ ## Invoked from `cygpath()` directly with `D:Apps\123`?
+ # It's an error, leave it alone just slashes)
+ p = path
+ else:
+ p = osp.normpath(osp.expandvars(os.path.expanduser(path)))
+ if osp.isabs(p):
+ if drive:
+ # Confusing, maybe a remote system should expand vars.
+ p = path
+ else:
+ p = cygpath(p)
+ elif drive:
+ p = '/cygdrive/%s/%s' % (drive.lower(), p)
+
+ return p.replace('\\', '/')
+
+
+_cygpath_parsers = (
+ ## See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
+ ## and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
+ (re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
+ (lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))),
+ False
+ ),
+
+ (re.compile(r"\\\\\?\\(\w):[/\\](.*)"),
+ _cygexpath,
+ False
+ ),
+
+ (re.compile(r"(\w):[/\\](.*)"),
+ _cygexpath,
+ False
+ ),
+
+ (re.compile(r"file:(.*)", re.I),
+ (lambda rest_path: rest_path),
+ True),
+
+ (re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing
+ (lambda url: url),
+ False),
+)
+
+
+def cygpath(path):
+ if not path.startswith(('/cygdrive', '//')):
+ for regex, parser, recurse in _cygpath_parsers:
+ match = regex.match(path)
+ if match:
+ path = parser(*match.groups())
+ if recurse:
+ path = cygpath(path)
+ break
+ else:
+ path = _cygexpath(None, path)
+
+ return path
+
+
+#: Store boolean flags denoting if a specific Git executable
+#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2).
+_is_cygwin_cache = {}
+
+
+def is_cygwin_git(git_executable):
+ if not is_win:
+ return False
+
+ from subprocess import check_output
+
+ is_cygwin = _is_cygwin_cache.get(git_executable)
+ if is_cygwin is None:
+ is_cygwin = False
+ try:
+ git_dir = osp.dirname(git_executable)
+ if not git_dir:
+ res = py_where(git_executable)
+ git_dir = osp.dirname(res[0]) if res else None
+
+ ## Just a name given, not a real path.
+ uname_cmd = osp.join(git_dir, 'uname')
+ uname = check_output(uname_cmd, universal_newlines=True)
+ is_cygwin = 'CYGWIN' in uname
+ except Exception as ex:
+ log.debug('Failed checking if running in CYGWIN due to: %r', ex)
+ _is_cygwin_cache[git_executable] = is_cygwin
+
+ return is_cygwin
+
+
def get_user_id():
""":return: string identifying the currently active system user as name@node"""
return "%s@%s" % (getpass.getuser(), platform.node())
@@ -589,7 +728,7 @@ class LockFile(object):
if self._has_lock():
return
lock_file = self._lock_file_path()
- if os.path.isfile(lock_file):
+ if osp.isfile(lock_file):
raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" %
(self._file_path, lock_file))
@@ -659,7 +798,7 @@ class BlockingLockFile(LockFile):
# synity check: if the directory leading to the lockfile is not
# readable anymore, raise an execption
curtime = time.time()
- if not os.path.isdir(os.path.dirname(self._lock_file_path())):
+ if not osp.isdir(osp.dirname(self._lock_file_path())):
msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
self._lock_file_path(), curtime - starttime)
raise IOError(msg)