path: root/setuptools/_wheelbuilder.py
blob: b223edae0b4d94f486457e437d19794201b4113e
import csv
import hashlib
import io
import itertools
import logging
import os
import stat
import time
from base64 import urlsafe_b64encode
from pathlib import Path
from typing import Dict, Iterable, Optional, Tuple, Union
from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo

from .discovery import _Filter

_Path = Union[str, Path]
_Timestamp = Tuple[int, int, int, int, int, int]
_StrOrIter = Union[str, Iterable[str]]

_HASH_ALG = "sha256"
_HASH_BUF_SIZE = 65536
_MINIMUM_TIMESTAMP = 315532800  # 1980-01-01 00:00:00 UTC
_DEFAULT_COMPRESSION = ZIP_DEFLATED
_WHEEL_VERSION = "1.0"
_META_TEMPLATE = f"""\
Wheel-Version: {_WHEEL_VERSION}
Generator: {{generator}}
Root-Is-Purelib: {{root_is_purelib}}
"""

_logger = logging.getLogger(__name__)


class WheelBuilder:
    """Wrapper around ZipFile that abstracts some aspects of creating a ``.whl`` file
    It should be used as a context manager and before closing will add the ``WHEEL``
    and ``RECORD`` files to the end of the archive.
    The caller is responsible for writing files/dirs in a suitable order,
    (which includes ensuring ``.dist-info`` is written last).
    """

    def __init__(
        self,
        path: _Path,
        root_is_purelib: bool = True,
        compression: int = _DEFAULT_COMPRESSION,
        generator: Optional[str] = None,
        timestamp: Optional[int] = None,
    ):
        self._path = Path(path)
        self._root_is_purelib = root_is_purelib
        self._generator = generator
        self._compression = compression
        self._zip = ZipFile(self._path, "w", compression=compression)
        self._records: Dict[str, Tuple[str, int]] = {}

        basename = str(self._path.with_suffix("").name)
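        # Per PEP 427 / the binary distribution format spec, the file name is
        # ``{name}-{version}(-{build})?-{python tag}-{abi tag}-{platform tag}.whl``.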
        parts = basename.split("-")
        self._distribution, self._version = parts[:2]
        self._tags = parts[-3:]
        self._build = parts[2] if len(parts) > 5 else ""
        self._dist_info = f"{self._distribution}-{self._version}.dist-info"
        self._timestamp = _get_timestamp(timestamp, int(time.time()))
        assert len(self._tags) == 3, f"Invalid wheel name: {self._path}"

    def __enter__(self) -> "WheelBuilder":
        self._zip.__enter__()
        _logger.debug(f"creating '{str(self._path)!r}'")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._add_wheel_meta()
        self._save_record()
        return self._zip.__exit__(exc_type, exc_value, traceback)

    def _default_generator(self) -> str:
        from setuptools.version import __version__

        return f"setuptools ({__version__})"

    def add_existing_file(self, arcname: str, file: _Path):
        """Add a file that already exists in the file system to the wheel."""
        hashsum = hashlib.new(_HASH_ALG)
        file_stat = os.stat(file)
        zipinfo = ZipInfo(arcname, self._timestamp)
        attr = stat.S_IMODE(file_stat.st_mode) | stat.S_IFMT(file_stat.st_mode)
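        # Zip "external attributes" carry the Unix file mode in the high 16 bits.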
        zipinfo.external_attr = attr << 16
        zipinfo.compress_type = self._compression

        with open(file, "rb") as src, self._zip.open(zipinfo, "w") as dst:
            while True:
                buffer = src.read(_HASH_BUF_SIZE)
                if not buffer:
                    file_size = src.tell()
                    break
                dst.write(buffer)
                hashsum.update(buffer)

        _logger.debug(f"adding {str(arcname)!r} [{attr:o}]")
        hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=')
        self._records[arcname] = (hash_digest, file_size)

    def add_tree(
        self, path: _Path, prefix: Optional[str] = None, exclude: Iterable[str] = ()
    ):
        """
        Add the file tree **UNDER** ``path`` to the wheel file (does not include
        the parent directory itself).
        You can use ``prefix`` to create a new parent directory.
        """
        should_exclude = _Filter(*exclude)
        for root, dirs, files in os.walk(path):
            # Use sorted to improve determinism.
            dirs[:] = [x for x in sorted(dirs) if x != "__pycache__"]
            for name in sorted(files):
                file = os.path.normpath(os.path.join(root, name))
                arcname = os.path.relpath(file, path).replace(os.path.sep, "/")
                if not os.path.isfile(file) or should_exclude(arcname):
                    continue
                if prefix:
                    arcname = f"{prefix}/{arcname}"  # keep "/" so archive names stay portable
                self.add_existing_file(arcname, file)

    def new_file(self, arcname: str, contents: _StrOrIter, permissions: int = 0o664):
        """
        Create a new entry in the wheel named ``arcname`` that contains
        the UTF-8 text specified by ``contents``.
        """
        zipinfo = ZipInfo(arcname, self._timestamp)
        zipinfo.external_attr = (permissions | stat.S_IFREG) << 16
        zipinfo.compress_type = self._compression
        hashsum = hashlib.new(_HASH_ALG)
        file_size = 0
        iter_contents = [contents] if isinstance(contents, str) else contents
        with self._zip.open(zipinfo, "w") as fp:
            for part in iter_contents:
                bpart = bytes(part, "utf-8")
                file_size += fp.write(bpart)
                hashsum.update(bpart)
        hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=')
        self._records[arcname] = (hash_digest, file_size)

    def _save_record(self):
        arcname = f"{self._dist_info}/RECORD"
        zipinfo = ZipInfo(arcname, self._timestamp)
        zipinfo.external_attr = (0o664 | stat.S_IFREG) << 16
        zipinfo.compress_type = self._compression
        out = self._zip.open(zipinfo, "w")
        buf = io.TextIOWrapper(out, encoding="utf-8", newline="")  # let csv control line endings
        with out, buf:
            writer = csv.writer(buf, delimiter=",", quotechar='"', lineterminator="\n")
            for file, (hash_digest, size) in self._records.items():
                writer.writerow((file, f"{_HASH_ALG}={hash_digest}", size))
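            # The RECORD file lists itself without a hash or size, as required by the spec.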
            writer.writerow((arcname, "", ""))

    def _add_wheel_meta(self):
        arcname = f"{self._dist_info}/WHEEL"
        beginning = _META_TEMPLATE.format(
            generator=self._generator or self._default_generator(),
            root_is_purelib=self._root_is_purelib,
        )
        impl_tag, abi_tag, plat_tag = self._tags
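        # Compressed tag sets (e.g. ``py2.py3-none-any``) expand into one ``Tag`` line per combination.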
        tags = (
            f"Tag: {impl}-{abi}-{plat}\n"
            for impl in impl_tag.split(".")
            for abi in abi_tag.split(".")
            for plat in plat_tag.split(".")
        )
        build = [f"Build: {self._build}\n"] if self._build else []
        contents = itertools.chain([beginning], tags, build)
        self.new_file(arcname, contents)


def _get_timestamp(
    given: Optional[int] = None,
    fallback: int = _MINIMUM_TIMESTAMP,
) -> _Timestamp:
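    # Honor SOURCE_DATE_EPOCH (reproducible builds) and clamp to the zip
    # format's minimum representable date (1980-01-01).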
    timestamp = given or int(os.environ.get("SOURCE_DATE_EPOCH", fallback))
    timestamp = max(timestamp, _MINIMUM_TIMESTAMP)
    return time.gmtime(timestamp)[0:6]
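

# A minimal usage sketch (illustrative only, not part of setuptools' API):
# build a trivial wheel from a temporary source tree via the public methods
# above.  The project name, version, and file contents are made up for
# demonstration purposes.
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        pkg = Path(tmp, "demo")
        pkg.mkdir()
        (pkg / "__init__.py").write_text("print('hello')\n", encoding="utf-8")

        wheel_path = Path(tmp, "demo-0.1.0-py3-none-any.whl")
        with WheelBuilder(wheel_path) as builder:
            # Package files first, ``.dist-info`` last (WHEEL/RECORD are added on exit).
            builder.add_tree(pkg, prefix="demo")
            builder.new_file(
                "demo-0.1.0.dist-info/METADATA",
                "Metadata-Version: 2.1\nName: demo\nVersion: 0.1.0\n",
            )
        print(f"built {wheel_path} ({wheel_path.stat().st_size} bytes)")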