diff options
Diffstat (limited to 'git/config.py')
-rw-r--r-- | git/config.py | 190 |
1 files changed, 109 insertions, 81 deletions
diff --git a/git/config.py b/git/config.py index ea7302f4..cc6fcfa4 100644 --- a/git/config.py +++ b/git/config.py @@ -9,7 +9,7 @@ configuration files""" import abc from functools import wraps import inspect -from io import IOBase +from io import BufferedReader, IOBase import logging import os import re @@ -29,14 +29,16 @@ import os.path as osp import configparser as cp +from pathlib import Path + # typing------------------------------------------------------- -from typing import TYPE_CHECKING, Tuple +from typing import Any, Callable, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, Union, cast, overload -from git.types import Literal +from git.types import Literal, Lit_config_levels, PathLike, TBD if TYPE_CHECKING: - pass + from git.repo.base import Repo # ------------------------------------------------------------- @@ -59,7 +61,7 @@ CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbr class MetaParserBuilder(abc.ABCMeta): """Utlity class wrapping base-class methods into decorators that assure read-only properties""" - def __new__(cls, name, bases, clsdict): + def __new__(cls, name: str, bases: TBD, clsdict: Dict[str, Any]) -> TBD: """ Equip all base-class methods with a needs_values decorator, and all non-const methods with a set_dirty_and_flush_changes decorator in addition to that.""" @@ -85,23 +87,23 @@ class MetaParserBuilder(abc.ABCMeta): return new_type -def needs_values(func): +def needs_values(func: Callable) -> Callable: """Returns method assuring we read values (on demand) before we try to access them""" @wraps(func) - def assure_data_present(self, *args, **kwargs): + def assure_data_present(self, *args: Any, **kwargs: Any) -> Any: self.read() return func(self, *args, **kwargs) # END wrapper method return assure_data_present -def set_dirty_and_flush_changes(non_const_func): +def set_dirty_and_flush_changes(non_const_func: Callable) -> Callable: """Return method that checks whether given non constant function may be called. If so, the instance will be set dirty. Additionally, we flush the changes right to disk""" - def flush_changes(self, *args, **kwargs): + def flush_changes(self, *args: Any, **kwargs: Any) -> Any: rval = non_const_func(self, *args, **kwargs) self._dirty = True self.write() @@ -124,66 +126,65 @@ class SectionConstraint(object): _valid_attrs_ = ("get_value", "set_value", "get", "set", "getint", "getfloat", "getboolean", "has_option", "remove_section", "remove_option", "options") - def __init__(self, config, section): + def __init__(self, config: 'GitConfigParser', section: str) -> None: self._config = config self._section_name = section - def __del__(self): + def __del__(self) -> None: # Yes, for some reason, we have to call it explicitly for it to work in PY3 ! # Apparently __del__ doesn't get call anymore if refcount becomes 0 # Ridiculous ... . self._config.release() - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: if attr in self._valid_attrs_: return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs) return super(SectionConstraint, self).__getattribute__(attr) - def _call_config(self, method, *args, **kwargs): + def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any: """Call the configuration at the given method which must take a section name as first argument""" return getattr(self._config, method)(self._section_name, *args, **kwargs) @property - def config(self): + def config(self) -> 'GitConfigParser': """return: Configparser instance we constrain""" return self._config - def release(self): + def release(self) -> None: """Equivalent to GitConfigParser.release(), which is called on our underlying parser instance""" return self._config.release() - def __enter__(self): + def __enter__(self) -> 'SectionConstraint': self._config.__enter__() return self - def __exit__(self, exception_type, exception_value, traceback): + def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None: self._config.__exit__(exception_type, exception_value, traceback) class _OMD(OrderedDict): """Ordered multi-dict.""" - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any) -> None: super(_OMD, self).__setitem__(key, [value]) - def add(self, key, value): + def add(self, key: str, value: Any) -> None: if key not in self: super(_OMD, self).__setitem__(key, [value]) - return - + return None super(_OMD, self).__getitem__(key).append(value) - def setall(self, key, values): + def setall(self, key: str, values: Any) -> None: super(_OMD, self).__setitem__(key, values) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Any: return super(_OMD, self).__getitem__(key)[-1] - def getlast(self, key): + def getlast(self, key: str) -> Any: return super(_OMD, self).__getitem__(key)[-1] - def setlast(self, key, value): + def setlast(self, key: str, value: Any) -> None: if key not in self: super(_OMD, self).__setitem__(key, [value]) return @@ -191,22 +192,30 @@ class _OMD(OrderedDict): prior = super(_OMD, self).__getitem__(key) prior[-1] = value - def get(self, key, default=None): + @overload + def get(self, key: str, default: None = ...) -> None: + ... + + @overload + def get(self, key: str, default: Any = ...) -> Any: + ... + + def get(self, key: str, default: Union[Any, None] = None) -> Union[Any, None]: return super(_OMD, self).get(key, [default])[-1] - def getall(self, key): + def getall(self, key: str) -> Any: return super(_OMD, self).__getitem__(key) - def items(self): + def items(self) -> List[Tuple[str, Any]]: # type: ignore ## mypy doesn't like overwriting supertype signitures """List of (key, last value for key).""" return [(k, self[k]) for k in self] - def items_all(self): + def items_all(self) -> List[Tuple[str, List[Any]]]: """List of (key, list of values for key).""" return [(k, self.getall(k)) for k in self] -def get_config_path(config_level: Literal['system', 'global', 'user', 'repository']) -> str: +def get_config_path(config_level: Lit_config_levels) -> str: # we do not support an absolute path of the gitconfig on windows , # use the global config instead @@ -264,7 +273,10 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje # list of RawConfigParser methods able to change the instance _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set") - def __init__(self, file_or_files=None, read_only=True, merge_includes=True, config_level=None, repo=None): + def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathLike, IO]]] = None, + read_only: bool = True, merge_includes: bool = True, + config_level: Union[Lit_config_levels, None] = None, + repo: Union['Repo', None] = None) -> None: """Initialize a configuration reader to read the given file_or_files and to possibly allow changes to it by setting read_only False @@ -290,11 +302,13 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._proxies = self._dict() if file_or_files is not None: - self._file_or_files = file_or_files + self._file_or_files = file_or_files # type: Union[PathLike, IO, Sequence[Union[PathLike, IO]]] else: if config_level is None: if read_only: - self._file_or_files = [get_config_path(f) for f in CONFIG_LEVELS if f != 'repository'] + self._file_or_files = [get_config_path(f) # type: ignore + for f in CONFIG_LEVELS # Can type f properly when 3.5 dropped + if f != 'repository'] else: raise ValueError("No configuration level or configuration files specified") else: @@ -305,10 +319,10 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._is_initialized = False self._merge_includes = merge_includes self._repo = repo - self._lock = None + self._lock = None # type: Union['LockFile', None] self._acquire_lock() - def _acquire_lock(self): + def _acquire_lock(self) -> None: if not self._read_only: if not self._lock: if isinstance(self._file_or_files, (tuple, list)): @@ -316,9 +330,11 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje "Write-ConfigParsers can operate on a single file only, multiple files have been passed") # END single file check - file_or_files = self._file_or_files - if not isinstance(self._file_or_files, str): - file_or_files = self._file_or_files.name + if isinstance(self._file_or_files, (str, Path)): # cannot narrow by os._pathlike until 3.5 dropped + file_or_files = self._file_or_files + else: + file_or_files = cast(IO, self._file_or_files).name + # END get filename from handle/stream # initialize lock base - we want to write self._lock = self.t_lock(file_or_files) @@ -327,19 +343,19 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._lock._obtain_lock() # END read-only check - def __del__(self): + def __del__(self) -> None: """Write pending changes if required and release locks""" # NOTE: only consistent in PY2 self.release() - def __enter__(self): + def __enter__(self) -> 'GitConfigParser': self._acquire_lock() return self - def __exit__(self, exception_type, exception_value, traceback): + def __exit__(self, exception_type, exception_value, traceback) -> None: self.release() - def release(self): + def release(self) -> None: """Flush changes and release the configuration write lock. This instance must not be used anymore afterwards. In Python 3, it's required to explicitly release locks and flush changes, as __del__ is not called deterministically anymore.""" @@ -359,13 +375,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje # Usually when shutting down the interpreter, don'y know how to fix this pass finally: - self._lock._release_lock() + if self._lock is not None: + self._lock._release_lock() - def optionxform(self, optionstr): + def optionxform(self, optionstr: str) -> str: """Do not transform options in any way when writing""" return optionstr - def _read(self, fp, fpname): + def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None: """A direct copy of the py2.4 version of the super class's _read method to assure it uses ordered dicts. Had to change one line to make it work. @@ -381,7 +398,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje is_multi_line = False e = None # None, or an exception - def string_decode(v): + def string_decode(v: str) -> str: if v[-1] == '\\': v = v[:-1] # end cut trailing escapes to prevent decode error @@ -463,11 +480,12 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje if e: raise e - def _has_includes(self): + def _has_includes(self) -> Union[bool, int]: return self._merge_includes and len(self._included_paths()) - def _included_paths(self): - """Return all paths that must be included to configuration. + def _included_paths(self) -> List[Tuple[str, str]]: + """Return List all paths that must be included to configuration + as Tuples of (option, value). """ paths = [] @@ -500,9 +518,9 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje ), value ) - - if fnmatch.fnmatchcase(self._repo.git_dir, value): - paths += self.items(section) + if self._repo.git_dir: + if fnmatch.fnmatchcase(str(self._repo.git_dir), value): + paths += self.items(section) elif keyword == "onbranch": try: @@ -516,33 +534,38 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return paths - def read(self): + def read(self) -> None: """Reads the data stored in the files we have been initialized with. It will ignore files that cannot be read, possibly leaving an empty configuration :return: Nothing :raise IOError: if a file cannot be handled""" if self._is_initialized: - return + return None self._is_initialized = True - if not isinstance(self._file_or_files, (tuple, list)): - files_to_read = [self._file_or_files] + files_to_read = [""] # type: List[Union[PathLike, IO]] ## just for types until 3.5 dropped + if isinstance(self._file_or_files, (str)): # replace with PathLike once 3.5 dropped + files_to_read = [self._file_or_files] # for str, as str is a type of Sequence + elif not isinstance(self._file_or_files, (tuple, list, Sequence)): + files_to_read = [self._file_or_files] # for IO or Path else: - files_to_read = list(self._file_or_files) + files_to_read = list(self._file_or_files) # for lists or tuples # end assure we have a copy of the paths to handle seen = set(files_to_read) num_read_include_files = 0 while files_to_read: file_path = files_to_read.pop(0) - fp = file_path file_ok = False - if hasattr(fp, "seek"): - self._read(fp, fp.name) + if hasattr(file_path, "seek"): + # must be a file objectfile-object + file_path = cast(IO[bytes], file_path) # replace with assert to narrow type, once sure + self._read(file_path, file_path.name) else: # assume a path if it is not a file-object + file_path = cast(PathLike, file_path) try: with open(file_path, 'rb') as fp: file_ok = True @@ -560,6 +583,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje if not file_ok: continue # end ignore relative paths if we don't know the configuration file path + file_path = cast(PathLike, file_path) assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work" include_path = osp.join(osp.dirname(file_path), include_path) # end make include path absolute @@ -580,7 +604,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._merge_includes = False # end - def _write(self, fp): + def _write(self, fp: IO) -> None: """Write an .ini-format representation of the configuration state in git compatible format""" def write_section(name, section_dict): @@ -599,11 +623,11 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje for name, value in self._sections.items(): write_section(name, value) - def items(self, section_name): + def items(self, section_name: str) -> List[Tuple[str, str]]: """:return: list((option, value), ...) pairs of all items in the given section""" return [(k, v) for k, v in super(GitConfigParser, self).items(section_name) if k != '__name__'] - def items_all(self, section_name): + def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]: """:return: list((option, [values...]), ...) pairs of all items in the given section""" rv = _OMD(self._defaults) @@ -620,7 +644,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return rv.items_all() @needs_values - def write(self): + def write(self) -> None: """Write changes to our file, if there are changes at all :raise IOError: if this is a read-only writer instance or if we could not obtain @@ -637,39 +661,44 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje if self._has_includes(): log.debug("Skipping write-back of configuration file as include files were merged in." + "Set merge_includes=False to prevent this.") - return + return None # end fp = self._file_or_files # we have a physical file on disk, so get a lock - is_file_lock = isinstance(fp, (str, IOBase)) - if is_file_lock: + is_file_lock = isinstance(fp, (str, IOBase)) # can't use Pathlike until 3.5 dropped + if is_file_lock and self._lock is not None: # else raise Error? self._lock._obtain_lock() + if not hasattr(fp, "seek"): - with open(self._file_or_files, "wb") as fp: - self._write(fp) + fp = cast(PathLike, fp) + with open(fp, "wb") as fp_open: + self._write(fp_open) else: + fp = cast(IO, fp) fp.seek(0) # make sure we do not overwrite into an existing file if hasattr(fp, 'truncate'): fp.truncate() self._write(fp) - def _assure_writable(self, method_name): + def _assure_writable(self, method_name: str) -> None: if self.read_only: raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name)) - def add_section(self, section): + def add_section(self, section: str) -> None: """Assures added options will stay in order""" return super(GitConfigParser, self).add_section(section) @property - def read_only(self): + def read_only(self) -> bool: """:return: True if this instance may change the configuration file""" return self._read_only - def get_value(self, section, option, default=None): + def get_value(self, section: str, option: str, default: Union[int, float, str, bool, None] = None + ) -> Union[int, float, str, bool]: + # can default or return type include bool? """Get an option's value. If multiple values are specified for this option in the section, the @@ -691,7 +720,8 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return self._string_to_value(valuestr) - def get_values(self, section, option, default=None): + def get_values(self, section: str, option: str, default: Union[int, float, str, bool, None] = None + ) -> List[Union[int, float, str, bool]]: """Get an option's values. If multiple values are specified for this option in the section, all are @@ -713,16 +743,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return [self._string_to_value(valuestr) for valuestr in lst] - def _string_to_value(self, valuestr): + def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]: types = (int, float) for numtype in types: try: val = numtype(valuestr) - # truncated value ? if val != float(valuestr): continue - return val except (ValueError, TypeError): continue @@ -742,14 +770,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return valuestr - def _value_to_string(self, value): + def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str: if isinstance(value, (int, float, bool)): return str(value) return force_text(value) @needs_values @set_dirty_and_flush_changes - def set_value(self, section, option, value): + def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> 'GitConfigParser': """Sets the given option in section to the given value. It will create the section if required, and will not throw as opposed to the default ConfigParser 'set' method. @@ -767,7 +795,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje @needs_values @set_dirty_and_flush_changes - def add_value(self, section, option, value): + def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> 'GitConfigParser': """Adds a value for the given option in section. It will create the section if required, and will not throw as opposed to the default ConfigParser 'set' method. The value becomes the new value of the option as returned @@ -784,7 +812,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._sections[section].add(option, self._value_to_string(value)) return self - def rename_section(self, section, new_name): + def rename_section(self, section: str, new_name: str) -> 'GitConfigParser': """rename the given section to new_name :raise ValueError: if section doesn't exit :raise ValueError: if a section with new_name does already exist |