summaryrefslogtreecommitdiff
path: root/Lib/importlib
diff options
context:
space:
mode:
authorBarry Warsaw <barry@python.org>2017-12-30 15:18:06 -0500
committerGitHub <noreply@github.com>2017-12-30 15:18:06 -0500
commitdeae6b451fefd5fd3143dd65051e1d341e5a5f84 (patch)
tree21d0732eb5dd8f7d5ca469b39a1f3b703f3f83f3 /Lib/importlib
parentffcb4c0165827d0a48ea973cc88bc134c74879fb (diff)
downloadcpython-git-deae6b451fefd5fd3143dd65051e1d341e5a5f84.tar.gz
bpo-32248 - Implement importlib.resources (#4911)
Port importlib_resources to importlib.resources
Diffstat (limited to 'Lib/importlib')
-rw-r--r--Lib/importlib/resources.py303
1 files changed, 303 insertions, 0 deletions
diff --git a/Lib/importlib/resources.py b/Lib/importlib/resources.py
new file mode 100644
index 0000000000..8511f24d8e
--- /dev/null
+++ b/Lib/importlib/resources.py
@@ -0,0 +1,303 @@
+import os
+import tempfile
+
+from . import abc as resources_abc
+from builtins import open as builtins_open
+from contextlib import contextmanager, suppress
+from importlib import import_module
+from importlib.abc import ResourceLoader
+from io import BytesIO, TextIOWrapper
+from pathlib import Path
+from types import ModuleType
+from typing import Iterator, Optional, Set, Union # noqa: F401
+from typing import cast
+from typing.io import BinaryIO, TextIO
+from zipfile import ZipFile
+
+
+Package = Union[str, ModuleType]
+Resource = Union[str, os.PathLike]
+
+
+def _get_package(package) -> ModuleType:
+ """Take a package name or module object and return the module.
+
+ If a name, the module is imported. If the passed or imported module
+ object is not a package, raise an exception.
+ """
+ if hasattr(package, '__spec__'):
+ if package.__spec__.submodule_search_locations is None:
+ raise TypeError('{!r} is not a package'.format(
+ package.__spec__.name))
+ else:
+ return package
+ else:
+ module = import_module(package)
+ if module.__spec__.submodule_search_locations is None:
+ raise TypeError('{!r} is not a package'.format(package))
+ else:
+ return module
+
+
+def _normalize_path(path) -> str:
+ """Normalize a path by ensuring it is a string.
+
+ If the resulting string contains path separators, an exception is raised.
+ """
+ str_path = str(path)
+ parent, file_name = os.path.split(str_path)
+ if parent:
+ raise ValueError('{!r} must be only a file name'.format(path))
+ else:
+ return file_name
+
+
+def _get_resource_reader(
+ package: ModuleType) -> Optional[resources_abc.ResourceReader]:
+ # Return the package's loader if it's a ResourceReader. We can't use
+ # a issubclass() check here because apparently abc.'s __subclasscheck__()
+ # hook wants to create a weak reference to the object, but
+ # zipimport.zipimporter does not support weak references, resulting in a
+ # TypeError. That seems terrible.
+ if hasattr(package.__spec__.loader, 'open_resource'):
+ return cast(resources_abc.ResourceReader, package.__spec__.loader)
+ return None
+
+
+def open_binary(package: Package, resource: Resource) -> BinaryIO:
+ """Return a file-like object opened for binary reading of the resource."""
+ resource = _normalize_path(resource)
+ package = _get_package(package)
+ reader = _get_resource_reader(package)
+ if reader is not None:
+ return reader.open_resource(resource)
+ absolute_package_path = os.path.abspath(package.__spec__.origin)
+ package_path = os.path.dirname(absolute_package_path)
+ full_path = os.path.join(package_path, resource)
+ try:
+ return builtins_open(full_path, mode='rb')
+ except OSError:
+ # Just assume the loader is a resource loader; all the relevant
+ # importlib.machinery loaders are and an AttributeError for
+ # get_data() will make it clear what is needed from the loader.
+ loader = cast(ResourceLoader, package.__spec__.loader)
+ data = None
+ if hasattr(package.__spec__.loader, 'get_data'):
+ with suppress(OSError):
+ data = loader.get_data(full_path)
+ if data is None:
+ package_name = package.__spec__.name
+ message = '{!r} resource not found in {!r}'.format(
+ resource, package_name)
+ raise FileNotFoundError(message)
+ else:
+ return BytesIO(data)
+
+
+def open_text(package: Package,
+ resource: Resource,
+ encoding: str = 'utf-8',
+ errors: str = 'strict') -> TextIO:
+ """Return a file-like object opened for text reading of the resource."""
+ resource = _normalize_path(resource)
+ package = _get_package(package)
+ reader = _get_resource_reader(package)
+ if reader is not None:
+ return TextIOWrapper(reader.open_resource(resource), encoding, errors)
+ absolute_package_path = os.path.abspath(package.__spec__.origin)
+ package_path = os.path.dirname(absolute_package_path)
+ full_path = os.path.join(package_path, resource)
+ try:
+ return builtins_open(
+ full_path, mode='r', encoding=encoding, errors=errors)
+ except OSError:
+ # Just assume the loader is a resource loader; all the relevant
+ # importlib.machinery loaders are and an AttributeError for
+ # get_data() will make it clear what is needed from the loader.
+ loader = cast(ResourceLoader, package.__spec__.loader)
+ data = None
+ if hasattr(package.__spec__.loader, 'get_data'):
+ with suppress(OSError):
+ data = loader.get_data(full_path)
+ if data is None:
+ package_name = package.__spec__.name
+ message = '{!r} resource not found in {!r}'.format(
+ resource, package_name)
+ raise FileNotFoundError(message)
+ else:
+ return TextIOWrapper(BytesIO(data), encoding, errors)
+
+
+def read_binary(package: Package, resource: Resource) -> bytes:
+ """Return the binary contents of the resource."""
+ resource = _normalize_path(resource)
+ package = _get_package(package)
+ with open_binary(package, resource) as fp:
+ return fp.read()
+
+
+def read_text(package: Package,
+ resource: Resource,
+ encoding: str = 'utf-8',
+ errors: str = 'strict') -> str:
+ """Return the decoded string of the resource.
+
+ The decoding-related arguments have the same semantics as those of
+ bytes.decode().
+ """
+ resource = _normalize_path(resource)
+ package = _get_package(package)
+ with open_text(package, resource, encoding, errors) as fp:
+ return fp.read()
+
+
+@contextmanager
+def path(package: Package, resource: Resource) -> Iterator[Path]:
+ """A context manager providing a file path object to the resource.
+
+ If the resource does not already exist on its own on the file system,
+ a temporary file will be created. If the file was created, the file
+ will be deleted upon exiting the context manager (no exception is
+ raised if the file was deleted prior to the context manager
+ exiting).
+ """
+ resource = _normalize_path(resource)
+ package = _get_package(package)
+ reader = _get_resource_reader(package)
+ if reader is not None:
+ try:
+ yield Path(reader.resource_path(resource))
+ return
+ except FileNotFoundError:
+ pass
+ # Fall-through for both the lack of resource_path() *and* if
+ # resource_path() raises FileNotFoundError.
+ package_directory = Path(package.__spec__.origin).parent
+ file_path = package_directory / resource
+ if file_path.exists():
+ yield file_path
+ else:
+ with open_binary(package, resource) as fp:
+ data = fp.read()
+ # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
+ # blocks due to the need to close the temporary file to work on
+ # Windows properly.
+ fd, raw_path = tempfile.mkstemp()
+ try:
+ os.write(fd, data)
+ os.close(fd)
+ yield Path(raw_path)
+ finally:
+ try:
+ os.remove(raw_path)
+ except FileNotFoundError:
+ pass
+
+
+def is_resource(package: Package, name: str) -> bool:
+ """True if 'name' is a resource inside 'package'.
+
+ Directories are *not* resources.
+ """
+ package = _get_package(package)
+ _normalize_path(name)
+ reader = _get_resource_reader(package)
+ if reader is not None:
+ return reader.is_resource(name)
+ try:
+ package_contents = set(contents(package))
+ except (NotADirectoryError, FileNotFoundError):
+ return False
+ if name not in package_contents:
+ return False
+ # Just because the given file_name lives as an entry in the package's
+ # contents doesn't necessarily mean it's a resource. Directories are not
+ # resources, so let's try to find out if it's a directory or not.
+ path = Path(package.__spec__.origin).parent / name
+ if path.is_file():
+ return True
+ if path.is_dir():
+ return False
+ # If it's not a file and it's not a directory, what is it? Well, this
+ # means the file doesn't exist on the file system, so it probably lives
+ # inside a zip file. We have to crack open the zip, look at its table of
+ # contents, and make sure that this entry doesn't have sub-entries.
+ archive_path = package.__spec__.loader.archive # type: ignore
+ package_directory = Path(package.__spec__.origin).parent
+ with ZipFile(archive_path) as zf:
+ toc = zf.namelist()
+ relpath = package_directory.relative_to(archive_path)
+ candidate_path = relpath / name
+ for entry in toc:
+ try:
+ relative_to_candidate = Path(entry).relative_to(candidate_path)
+ except ValueError:
+ # The two paths aren't relative to each other so we can ignore it.
+ continue
+ # Since directories aren't explicitly listed in the zip file, we must
+ # infer their 'directory-ness' by looking at the number of path
+ # components in the path relative to the package resource we're
+ # looking up. If there are zero additional parts, it's a file, i.e. a
+ # resource. If there are more than zero it's a directory, i.e. not a
+ # resource. It has to be one of these two cases.
+ return len(relative_to_candidate.parts) == 0
+ # I think it's impossible to get here. It would mean that we are looking
+ # for a resource in a zip file, there's an entry matching it in the return
+ # value of contents(), but we never actually found it in the zip's table of
+ # contents.
+ raise AssertionError('Impossible situation')
+
+
+def contents(package: Package) -> Iterator[str]:
+ """Return the list of entries in 'package'.
+
+ Note that not all entries are resources. Specifically, directories are
+ not considered resources. Use `is_resource()` on each entry returned here
+ to check if it is a resource or not.
+ """
+ package = _get_package(package)
+ reader = _get_resource_reader(package)
+ if reader is not None:
+ yield from reader.contents()
+ return
+ # Is the package a namespace package? By definition, namespace packages
+ # cannot have resources.
+ if (package.__spec__.origin == 'namespace' and
+ not package.__spec__.has_location):
+ return []
+ package_directory = Path(package.__spec__.origin).parent
+ try:
+ yield from os.listdir(str(package_directory))
+ except (NotADirectoryError, FileNotFoundError):
+ # The package is probably in a zip file.
+ archive_path = getattr(package.__spec__.loader, 'archive', None)
+ if archive_path is None:
+ raise
+ relpath = package_directory.relative_to(archive_path)
+ with ZipFile(archive_path) as zf:
+ toc = zf.namelist()
+ subdirs_seen = set() # type: Set
+ for filename in toc:
+ path = Path(filename)
+ # Strip off any path component parts that are in common with the
+ # package directory, relative to the zip archive's file system
+ # path. This gives us all the parts that live under the named
+ # package inside the zip file. If the length of these subparts is
+ # exactly 1, then it is situated inside the package. The resulting
+ # length will be 0 if it's above the package, and it will be
+ # greater than 1 if it lives in a subdirectory of the package
+ # directory.
+ #
+ # However, since directories themselves don't appear in the zip
+ # archive as a separate entry, we need to return the first path
+ # component for any case that has > 1 subparts -- but only once!
+ if path.parts[:len(relpath.parts)] != relpath.parts:
+ continue
+ subparts = path.parts[len(relpath.parts):]
+ if len(subparts) == 1:
+ yield subparts[0]
+ elif len(subparts) > 1:
+ subdir = subparts[0]
+ if subdir not in subdirs_seen:
+ subdirs_seen.add(subdir)
+ yield subdir