diff options
author | Anderson Bravalheri <andersonbravalheri@gmail.com> | 2023-03-10 19:31:05 +0000 |
---|---|---|
committer | Anderson Bravalheri <andersonbravalheri@gmail.com> | 2023-05-03 14:10:21 +0100 |
commit | 7d8fba390a72ea98efdfc51c627aa3c96c368678 (patch) | |
tree | fa08afc6606a81fb18dec664ce3c0c523403ca11 /setuptools/_wheelbuilder.py | |
parent | c9a8429f4ce130a62dd9b3ade35dd506db0670ef (diff) | |
download | python-setuptools-git-7d8fba390a72ea98efdfc51c627aa3c96c368678.tar.gz |
Add simplified wheel file builder implementation
Diffstat (limited to 'setuptools/_wheelbuilder.py')
-rw-r--r-- | setuptools/_wheelbuilder.py | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/setuptools/_wheelbuilder.py b/setuptools/_wheelbuilder.py new file mode 100644 index 00000000..26c4b2a9 --- /dev/null +++ b/setuptools/_wheelbuilder.py @@ -0,0 +1,178 @@ +import csv +import hashlib +import io +import itertools +import logging +import os +import stat +import time +from base64 import urlsafe_b64encode +from pathlib import Path +from typing import Dict, Iterable, Optional, Tuple, Union +from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo + +from .discovery import _Filter + +_Path = Union[str, Path] +_Timestamp = Tuple[int, int, int, int, int, int] +_StrOrIter = Union[str, Iterable[str]] + +_HASH_ALG = "sha256" +_HASH_BUF_SIZE = 65536 +_MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC +_COMPRESSION = ZIP_DEFLATED +_WHEEL_VERSION = "1.0" +_META_TEMPLATE = f"""\ +Wheel-Version: {_WHEEL_VERSION} +Generator: {{generator}} +Root-Is-Purelib: {{root_is_purelib}} +""" + +_logger = logging.getLogger(__name__) + + +class WheelBuilder: + """Wrapper around ZipFile that abstracts some aspects of creating a ``.whl`` file + It should be used as a context manager and before closing will add the ``WHEEL`` + and ``RECORD`` files to the end of the archive. + The caller is responsible for writing files/dirs in a suitable order, + (which includes ensuring ``.dist-info`` is written last). + """ + + def __init__( + self, + path: _Path, + root_is_purelib: bool = True, + generator: Optional[str] = None, + timestamp: Optional[int] = None, + ): + self._path = Path(path) + self._root_is_purelib = root_is_purelib + self._generator = generator + self._zip = ZipFile(self._path, "w", compression=_COMPRESSION) + self._records: Dict[str, Tuple[str, int]] = {} + + basename = str(self._path.with_suffix("").name) + parts = basename.split("-") + self._distribution, self._version = parts[:2] + self._tags = parts[-3:] + self._build = parts[2] if len(parts) > 5 else "" + self._dist_info = f"{self._distribution}-{self._version}.dist-info" + self._timestamp = _get_timestamp(timestamp, int(time.time())) + assert len(self._tags), f"Invalid wheel name: {self._path}" + + def __enter__(self) -> "WheelBuilder": + self._zip.__enter__() + _logger.debug(f"creating '{str(self._path)!r}'") + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._add_wheel_meta() + self._save_record() + return self._zip.__exit__(exc_type, exc_value, traceback) + + def _default_generator(self) -> str: + from setuptools.version import __version__ + + return f"setuptools ({__version__})" + + def add_existing_file(self, arcname: str, file: _Path): + """Add a file that already exists in the file system to the wheel.""" + hashsum = hashlib.new(_HASH_ALG) + file_stat = os.stat(file) + zipinfo = ZipInfo(arcname, self._timestamp) + attr = stat.S_IMODE(file_stat.st_mode) | stat.S_IFMT(file_stat.st_mode) + zipinfo.external_attr = attr << 16 + zipinfo.compress_type = _COMPRESSION + + with open(file, "rb") as src, self._zip.open(zipinfo, "w") as dst: + while True: + buffer = src.read(_HASH_BUF_SIZE) + if not buffer: + file_size = src.tell() + break + dst.write(buffer) + hashsum.update(buffer) + + _logger.debug(f"adding {str(arcname)!r} [{attr:o}]") + hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=') + self._records[arcname] = (hash_digest, file_size) + + def add_tree( + self, path: _Path, prefix: Optional[str] = None, exclude: Iterable[str] = () + ): + """ + Add the file tree **UNDER** ``path`` to the wheel file (does not include + the parent directory itself). + You can use ``prefix`` to create a new parent directory. + """ + should_exclude = _Filter(*exclude) + for root, dirs, files in os.walk(path): + # Use sorted to improve determinism. + dirs[:] = [x for x in sorted(dirs) if x != "__pycache__"] + for name in sorted(files): + file = os.path.normpath(os.path.join(root, name)) + if not os.path.isfile(file) or should_exclude(file): + continue + arcname = os.path.relpath(file, path).replace(os.path.sep, "/") + if prefix: + arcname = os.path.join(prefix, arcname) + self.add_existing_file(arcname, file) + + def new_file(self, arcname: str, contents: _StrOrIter, permissions: int = 0o664): + """ + Create a new entry in the wheel named ``arcname`` that contains + the UTF-8 text specified by ``contents``. + """ + zipinfo = ZipInfo(arcname, self._timestamp) + zipinfo.external_attr = permissions << 16 + zipinfo.compress_type = _COMPRESSION + hashsum = hashlib.new(_HASH_ALG) + file_size = 0 + iter_contents = [contents] if isinstance(contents, str) else contents + with self._zip.open(zipinfo, "w") as fp: + for part in iter_contents: + bpart = bytes(part, "utf-8") + file_size += fp.write(bpart) + hashsum.update(bpart) + hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=') + self._records[arcname] = (hash_digest, file_size) + + def _save_record(self): + arcname = f"{self._dist_info}/RECORD" + zipinfo = ZipInfo(arcname, self._timestamp) + zipinfo.external_attr = 0o664 << 16 + zipinfo.compress_type = _COMPRESSION + out = self._zip.open(zipinfo, "w") + buf = io.TextIOWrapper(out, encoding="utf-8") + with out, buf: + writer = csv.writer(buf, delimiter=",", quotechar='"', lineterminator="\n") + for file, (hash_digest, size) in self._records.items(): + writer.writerow((file, f"{_HASH_ALG}={hash_digest}", size)) + writer.writerow((arcname, "", "")) + + def _add_wheel_meta(self): + arcname = f"{self._dist_info}/WHEEL" + beginning = _META_TEMPLATE.format( + generator=self._generator or self._default_generator(), + root_is_purelib=self._root_is_purelib, + ) + impl_tag, abi_tag, plat_tag = self._tags + tags = ( + f"Tag: {impl}-{abi}-{plat}\n" + for impl in impl_tag.split(".") + for abi in abi_tag.split(".") + for plat in plat_tag.split(".") + ) + build = [f"Build: {self._build}\n"] if self._build else [] + contents = itertools.chain([beginning], tags, build) + self.new_file(arcname, contents) + + +def _get_timestamp( + given: Optional[int] = None, + fallback: int = _MINIMUM_TIMESTAMP, +) -> _Timestamp: + timestamp = given or int(os.environ.get("SOURCE_DATE_EPOCH", fallback)) + timestamp = max(timestamp, _MINIMUM_TIMESTAMP) + return time.gmtime(timestamp)[0:6] |