summaryrefslogtreecommitdiff
path: root/numpy/core/tests/test_cpu_features.py
blob: 2fad4dfd98d0bfd48eba8f3c1f5c979a0b0d30a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
import sys, platform, re, pytest
from numpy.core._multiarray_umath import (
    __cpu_features__,
    __cpu_baseline__,
    __cpu_dispatch__,
)
import numpy as np
import subprocess
import pathlib
import os
import re

def assert_features_equal(actual, desired, fname):
    """Raise ``AssertionError`` unless *actual* and *desired* agree.

    Both values are compared through their ``str()`` form.  On a mismatch
    the error message is extended with NumPy's detected feature table, the
    first 2048 characters of ``/proc/cpuinfo`` and the ``LD_SHOW_AUXV`` dump
    produced by running ``/bin/true``.  If one of those two system sources
    cannot be read, the text of the raised exception is shown in its place.

    Parameters
    ----------
    actual : object
        Value NumPy reports for the feature.
    desired : object
        Value derived from the operating system.
    fname : str
        Feature (or feature group) name shown in the failure message.

    Raises
    ------
    AssertionError
        If ``str(actual) != str(desired)``.
    """
    __tracebackhide__ = True  # Hide traceback for py.test
    got = str(actual)
    want = str(desired)
    if got == want:
        return

    numpy_report = str(__cpu_features__).replace("'", "")

    # Both system probes are best effort: any failure is turned into text so
    # that the original mismatch is still the error that gets reported.
    try:
        with open("/proc/cpuinfo") as stream:
            cpuinfo_report = stream.read(2048)
    except Exception as exc:
        cpuinfo_report = str(exc)

    try:
        import subprocess
        auxv_report = subprocess.check_output(
            ['/bin/true'], env=dict(LD_SHOW_AUXV="1")
        ).decode()
    except Exception as exc:
        auxv_report = str(exc)

    import textwrap
    template = """
###########################################
### Extra debugging information
###########################################
-------------------------------------------
--- NumPy Detections
-------------------------------------------
%s
-------------------------------------------
--- SYS / CPUINFO
-------------------------------------------
%s....
-------------------------------------------
--- SYS / AUXV
-------------------------------------------
%s
"""
    extra_report = textwrap.indent(
        template % (numpy_report, cpuinfo_report, auxv_report),
        prefix='\r',
    )

    summary = (
        "Failure Detection\n"
        " NAME: '%s'\n"
        " ACTUAL: %s\n"
        " DESIRED: %s\n"
        "%s"
    )
    raise AssertionError(summary % (fname, got, want, extra_report))

def _text_to_list(txt):
    out = txt.strip("][\n").replace("'", "").split(', ')
    return None if out[0] == "" else out

class AbstractTest:
    """Shared logic for comparing NumPy's CPU detection with the OS view.

    Subclasses fill in the class attributes and override ``load_flags`` to
    populate ``features_flags`` from a system source.
    """
    # Feature names checked one by one against ``__cpu_features__``.
    features = []
    # Group name -> member features; a group is expected iff all members are.
    features_groups = {}
    # NumPy feature name -> OS flag name, or an iterable of alternative names.
    features_map = {}
    # Upper-cased flags reported by the operating system.
    features_flags = set()

    def load_flags(self):
        # a hook
        pass

    def test_features(self):
        """Compare every group and every single feature with NumPy's table."""
        self.load_flags()
        for group, members in self.features_groups.items():
            member_states = [self.cpu_have(member) for member in members]
            assert_features_equal(
                __cpu_features__.get(group), all(member_states), group
            )

        for name in self.features:
            os_state = self.cpu_have(name)
            numpy_state = __cpu_features__.get(name)
            assert_features_equal(numpy_state, os_state, name)

    def cpu_have(self, feature_name):
        """Return whether the OS flags contain *feature_name*.

        The name is first translated through ``features_map``.  A string
        mapping must be present itself; for any other iterable it is enough
        that one of its entries is present.
        """
        candidates = self.features_map.get(feature_name, feature_name)
        if isinstance(candidates, str):
            return candidates in self.features_flags
        return any(flag in self.features_flags for flag in candidates)

    def load_flags_cpuinfo(self, magic_key):
        """Set ``features_flags`` from the ``/proc/cpuinfo`` rows of *magic_key*."""
        self.features_flags = self.get_cpuinfo_item(magic_key)

    def get_cpuinfo_item(self, magic_key):
        """Collect the upper-cased, whitespace-separated values of every
        ``/proc/cpuinfo`` line that starts with *magic_key*."""
        collected = set()
        with open('/proc/cpuinfo') as cpuinfo:
            for row in cpuinfo:
                if row.startswith(magic_key):
                    _, colon, value = row.partition(':')
                    if colon:
                        collected.update(value.upper().split())
        return collected

    def load_flags_auxv(self):
        """Add the ``AT_HWCAP*`` values printed by ``LD_SHOW_AUXV`` to the flags."""
        dump = subprocess.check_output(['/bin/true'], env=dict(LD_SHOW_AUXV="1"))
        for entry in dump.split(b'\n'):
            if entry.startswith(b"AT_HWCAP"):
                _, colon, value = entry.partition(b':')
                if colon:
                    # Rebind rather than update in place so the set shared at
                    # class level is never modified.
                    self.features_flags = self.features_flags.union(
                        value.upper().decode().split()
                    )

@pytest.mark.skipif(
    sys.platform == 'emscripten',
    reason= (
        "The subprocess module is not available on WASM platforms and"
        " therefore this test class cannot be properly executed."
    ),
)
class TestEnvPrivation:
    """Tests for ``NPY_ENABLE_CPU_FEATURES`` and ``NPY_DISABLE_CPU_FEATURES``.

    Every test runs a small script in a fresh interpreter with a tailored
    environment and inspects either the dispatched features it prints or the
    error text it writes to stderr.
    """
    # Directory of this test file; used as working directory of the child.
    cwd = pathlib.Path(__file__).parent.resolve()
    # Class-level snapshot of the environment (taken before the two pops
    # below); ``setup_method`` replaces it with a fresh per-test copy.
    env = os.environ.copy()
    # Drop any feature overrides from this process's environment so that the
    # copies made in ``setup_method`` do not carry them.  The previous values
    # are kept here but are not restored anywhere in this class.
    _enable = os.environ.pop('NPY_ENABLE_CPU_FEATURES', None)
    _disable = os.environ.pop('NPY_DISABLE_CPU_FEATURES', None)
    # ``check=True`` turns a non-zero exit status into CalledProcessError.
    SUBPROCESS_ARGS = dict(cwd=cwd, capture_output=True, text=True, check=True)
    # Dispatchable features that are reported as not available.
    unavailable_feats = [
        feat for feat in __cpu_dispatch__ if not __cpu_features__[feat]
    ]
    UNAVAILABLE_FEAT = unavailable_feats[0] if unavailable_feats else None
    BASELINE_FEAT = __cpu_baseline__[0] if __cpu_baseline__ else None
    # Script run by the child: prints the dispatchable features in use.
    SCRIPT = """
def main():
    from numpy.core._multiarray_umath import __cpu_features__, __cpu_dispatch__

    detected = [feat for feat in __cpu_dispatch__ if __cpu_features__[feat]]
    print(detected)

if __name__ == "__main__":
    main()
    """

    @pytest.fixture(autouse=True)
    def setup_class(self, tmp_path_factory):
        """Write ``SCRIPT`` to a temporary file and remember its path."""
        file = tmp_path_factory.mktemp("runtime_test_script")
        file /= "_runtime_detect.py"
        file.write_text(self.SCRIPT)
        self.file = file

    def _run(self):
        """Run the detection script with ``self.env``.

        Returns the ``subprocess.CompletedProcess``; raises
        ``subprocess.CalledProcessError`` if the child exits with an error.
        """
        return subprocess.run(
            [sys.executable, self.file],
            env=self.env,
            **self.SUBPROCESS_ARGS,
            )

    # Helper function mimicking pytest.raises for subprocess call
    def _expect_error(
        self,
        msg,
        err_type,
        no_error_msg="Failed to generate error"
    ):
        """Assert that the script fails and that its stderr matches.

        Parameters
        ----------
        msg : str
            Regular expression that must be found in the child's stderr.
        err_type : str
            Regular expression (typically the exception class name) that
            must also be found in the child's stderr.
        no_error_msg : str, optional
            Failure message used when the child exits successfully.
        """
        try:
            self._run()
        except subprocess.CalledProcessError as e:
            assertion_message = f"Expected: {msg}\nGot: {e.stderr}"
            assert re.search(msg, e.stderr), assertion_message

            assertion_message = (
                f"Expected error of type: {err_type}; see full "
                f"error:\n{e.stderr}"
            )
            assert re.search(err_type, e.stderr), assertion_message
        else:
            # Raised explicitly so the check is not stripped under ``-O``.
            raise AssertionError(no_error_msg)

    def setup_method(self):
        """Ensure that the environment is reset"""
        self.env = os.environ.copy()

    def test_runtime_feature_selection(self):
        """
        Ensure that when selecting `NPY_ENABLE_CPU_FEATURES`, only the
        features exactly specified are dispatched.
        """

        # Capture runtime-enabled features
        out = self._run()
        non_baseline_features = _text_to_list(out.stdout)

        if non_baseline_features is None:
            pytest.skip(
                "No dispatchable features outside of baseline detected."
            )
        feature = non_baseline_features[0]

        # Capture runtime-enabled features when `NPY_ENABLE_CPU_FEATURES` is
        # specified
        self.env['NPY_ENABLE_CPU_FEATURES'] = feature
        out = self._run()
        enabled_features = _text_to_list(out.stdout)

        # Ensure that only one feature is enabled, and it is exactly the one
        # specified by `NPY_ENABLE_CPU_FEATURES`
        assert set(enabled_features) == {feature}

        if len(non_baseline_features) < 2:
            pytest.skip("Only one non-baseline feature detected.")
        # Capture runtime-enabled features when `NPY_ENABLE_CPU_FEATURES` is
        # specified
        self.env['NPY_ENABLE_CPU_FEATURES'] = ",".join(non_baseline_features)
        out = self._run()
        enabled_features = _text_to_list(out.stdout)

        # Ensure that all requested features are enabled, and they are
        # exactly the ones specified by `NPY_ENABLE_CPU_FEATURES`
        assert set(enabled_features) == set(non_baseline_features)

    @pytest.mark.parametrize("enabled, disabled",
    [
        ("feature", "feature"),
        ("feature", "same"),
    ])
    def test_both_enable_disable_set(self, enabled, disabled):
        """
        Ensure that when both environment variables are set then an
        ImportError is thrown
        """
        self.env['NPY_ENABLE_CPU_FEATURES'] = enabled
        self.env['NPY_DISABLE_CPU_FEATURES'] = disabled
        msg = "Both NPY_DISABLE_CPU_FEATURES and NPY_ENABLE_CPU_FEATURES"
        err_type = "ImportError"
        self._expect_error(msg, err_type)

    @pytest.mark.skipif(
        not __cpu_dispatch__,
        reason=(
            "NPY_*_CPU_FEATURES only parsed if "
            "`__cpu_dispatch__` is non-empty"
        )
    )
    @pytest.mark.parametrize("action", ["ENABLE", "DISABLE"])
    def test_variable_too_long(self, action):
        """
        Test that an error is thrown if the environment variables are too long
        to be processed. Current limit is 1024, but this may change later.
        """
        MAX_VAR_LENGTH = 1024
        # Actual length is MAX_VAR_LENGTH + 1 due to null-termination
        self.env[f'NPY_{action}_CPU_FEATURES'] = "t" * MAX_VAR_LENGTH
        msg = (
            f"Length of environment variable 'NPY_{action}_CPU_FEATURES' is "
            f"{MAX_VAR_LENGTH + 1}, only {MAX_VAR_LENGTH} accepted"
        )
        err_type = "RuntimeError"
        self._expect_error(msg, err_type)

    @pytest.mark.skipif(
        not __cpu_dispatch__,
        reason=(
            "NPY_*_CPU_FEATURES only parsed if "
            "`__cpu_dispatch__` is non-empty"
        )
    )
    def test_impossible_feature_disable(self):
        """
        Test that a RuntimeError is thrown if an impossible feature-disabling
        request is made. This includes disabling a baseline feature.
        """

        if self.BASELINE_FEAT is None:
            pytest.skip("There are no baseline features to test with")
        bad_feature = self.BASELINE_FEAT
        self.env['NPY_DISABLE_CPU_FEATURES'] = bad_feature
        msg = (
            f"You cannot disable CPU feature '{bad_feature}', since it is "
            "part of the baseline optimizations"
        )
        err_type = "RuntimeError"
        self._expect_error(msg, err_type)

    def test_impossible_feature_enable(self):
        """
        Test that a RuntimeError is thrown if an impossible feature-enabling
        request is made. This includes enabling a feature not supported by the
        machine, on its own or together with a baseline feature.
        """

        if self.UNAVAILABLE_FEAT is None:
            pytest.skip("There are no unavailable features to test with")
        bad_feature = self.UNAVAILABLE_FEAT
        self.env['NPY_ENABLE_CPU_FEATURES'] = bad_feature
        msg = (
            f"You cannot enable CPU features \\({bad_feature}\\), since "
            "they are not supported by your machine."
        )
        err_type = "RuntimeError"
        self._expect_error(msg, err_type)

        if self.BASELINE_FEAT is None:
            # Without a baseline feature there is nothing valid to pair the
            # bad feature with; formatting ``None`` into the variable would
            # only add a bogus feature name to the request.
            return

        # Ensure that only the bad feature gets reported
        feats = f"{bad_feature}, {self.BASELINE_FEAT}"
        self.env['NPY_ENABLE_CPU_FEATURES'] = feats
        msg = (
            f"You cannot enable CPU features \\({bad_feature}\\), since they "
            "are not supported by your machine."
        )
        self._expect_error(msg, err_type)

# Platform probes used by the skip conditions of the architecture-specific
# feature tests.
machine = platform.machine()
is_linux = sys.platform.startswith('linux')
is_cygwin = sys.platform.startswith('cygwin')
# Match object (truthy) when the machine name starts with an x86 identifier.
is_x86 = re.compile("^(amd64|x86|i386|i686)", re.IGNORECASE).match(machine)
@pytest.mark.skipif(
    not (is_linux or is_cygwin) or not is_x86, reason="Only for Linux and x86"
)
class Test_X86_Features(AbstractTest):
    """Compare NumPy's x86 feature detection with the ``flags`` rows of
    ``/proc/cpuinfo``."""
    # Individual features checked against the cpuinfo flags.
    features = [
        "MMX", "SSE", "SSE2", "SSE3", "SSSE3", "SSE41", "POPCNT", "SSE42",
        "AVX", "F16C", "XOP", "FMA4", "FMA3", "AVX2", "AVX512F", "AVX512CD",
        "AVX512ER", "AVX512PF", "AVX5124FMAPS", "AVX5124VNNIW", "AVX512VPOPCNTDQ",
        "AVX512VL", "AVX512BW", "AVX512DQ", "AVX512VNNI", "AVX512IFMA",
        "AVX512VBMI", "AVX512VBMI2", "AVX512BITALG", "AVX512FP16",
    ]
    # Feature groups; a group is expected only when every member is present.
    features_groups = dict(
        AVX512_KNL = ["AVX512F", "AVX512CD", "AVX512ER", "AVX512PF"],
        AVX512_KNM = ["AVX512F", "AVX512CD", "AVX512ER", "AVX512PF", "AVX5124FMAPS",
                      "AVX5124VNNIW", "AVX512VPOPCNTDQ"],
        AVX512_SKX = ["AVX512F", "AVX512CD", "AVX512BW", "AVX512DQ", "AVX512VL"],
        AVX512_CLX = ["AVX512F", "AVX512CD", "AVX512BW", "AVX512DQ", "AVX512VL", "AVX512VNNI"],
        AVX512_CNL = ["AVX512F", "AVX512CD", "AVX512BW", "AVX512DQ", "AVX512VL", "AVX512IFMA",
                      "AVX512VBMI"],
        AVX512_ICL = ["AVX512F", "AVX512CD", "AVX512BW", "AVX512DQ", "AVX512VL", "AVX512IFMA",
                      "AVX512VBMI", "AVX512VNNI", "AVX512VBMI2", "AVX512BITALG", "AVX512VPOPCNTDQ"],
        AVX512_SPR = ["AVX512F", "AVX512CD", "AVX512BW", "AVX512DQ",
                      "AVX512VL", "AVX512IFMA", "AVX512VBMI", "AVX512VNNI",
                      "AVX512VBMI2", "AVX512BITALG", "AVX512VPOPCNTDQ",
                      "AVX512FP16"],
    )
    # NumPy feature name -> flag name as spelled (upper-cased) in cpuinfo.
    # AVX512FP16 needs an entry as well: Linux spells the flag with an
    # underscore ("avx512_fp16"), like the other AVX512_* names below.
    features_map = dict(
        SSE3="PNI", SSE41="SSE4_1", SSE42="SSE4_2", FMA3="FMA",
        AVX512VNNI="AVX512_VNNI", AVX512BITALG="AVX512_BITALG", AVX512VBMI2="AVX512_VBMI2",
        AVX5124FMAPS="AVX512_4FMAPS", AVX5124VNNIW="AVX512_4VNNIW", AVX512VPOPCNTDQ="AVX512_VPOPCNTDQ",
        AVX512FP16="AVX512_FP16",
    )

    def load_flags(self):
        """Load the OS view from the ``flags`` rows of ``/proc/cpuinfo``."""
        self.load_flags_cpuinfo("flags")

# Match object (truthy) when the machine name starts with a 64-bit Power id.
is_power = re.match("^(powerpc|ppc)64", machine, re.IGNORECASE)
@pytest.mark.skipif(
    not (is_linux and is_power), reason="Only for Linux and Power"
)
class Test_POWER_Features(AbstractTest):
    """Compare NumPy's VSX detection with the ``AT_HWCAP*`` auxv entries."""
    features = ["VSX", "VSX2", "VSX3", "VSX4"]
    # NumPy feature name -> flag name looked up in the auxv dump.
    features_map = {
        "VSX2": "ARCH_2_07",
        "VSX3": "ARCH_3_00",
        "VSX4": "ARCH_3_1",
    }

    def load_flags(self):
        """Load the OS view from the ``LD_SHOW_AUXV`` dump."""
        self.load_flags_auxv()


# Match object (truthy) when the machine name starts with "s390x".
is_zarch = re.match("^(s390x)", machine, re.IGNORECASE)
@pytest.mark.skipif(
    not (is_linux and is_zarch), reason="Only for Linux and IBM Z"
)
class Test_ZARCH_Features(AbstractTest):
    """Compare NumPy's IBM Z vector detection with the ``AT_HWCAP*`` auxv
    entries."""
    features = ["VX", "VXE", "VXE2"]

    def load_flags(self):
        """Load the OS view from the ``LD_SHOW_AUXV`` dump."""
        self.load_flags_auxv()


# Match object (truthy) when the machine name starts with "arm" or "aarch64".
is_arm = re.match("^(arm|aarch64)", machine, re.IGNORECASE)
@pytest.mark.skipif(
    not (is_linux and is_arm), reason="Only for Linux and ARM"
)
class Test_ARM_Features(AbstractTest):
    """Compare NumPy's ARM feature detection with ``/proc/cpuinfo``."""
    features = [
        "NEON", "ASIMD", "FPHP", "ASIMDHP", "ASIMDDP", "ASIMDFHM"
    ]
    # Feature groups; a group is expected only when every member is present.
    features_groups = {
        "NEON_FP16": ["NEON", "HALF"],
        "NEON_VFPV4": ["NEON", "VFPV4"],
    }

    def load_flags(self):
        """Load the ``Features`` flags and choose the name mapping that fits
        the detected architecture."""
        self.load_flags_cpuinfo("Features")
        arch = self.get_cpuinfo_item("CPU architecture")
        # in case of mounting virtual filesystem of aarch64 kernel
        if arch:
            is_rootfs_v8 = int('0' + next(iter(arch))) > 7
        else:
            is_rootfs_v8 = 0

        if re.match("^(aarch64|AARCH64)", machine) or is_rootfs_v8:
            self.features_map = {
                "NEON": "ASIMD",
                "HALF": "ASIMD",
                "VFPV4": "ASIMD",
            }
        else:
            # ELF auxiliary vector and /proc/cpuinfo on Linux kernel(armv8 aarch32)
            # doesn't provide information about ASIMD, so we assume that ASIMD is supported
            # if the kernel reports any one of the following ARM8 features.
            self.features_map = {
                "ASIMD": ("AES", "SHA1", "SHA2", "PMULL", "CRC32"),
            }