diff options
Diffstat (limited to 'Lib/test/test_pathlib.py')
| -rw-r--r-- | Lib/test/test_pathlib.py | 30 | 
1 files changed, 30 insertions, 0 deletions
| diff --git a/Lib/test/test_pathlib.py b/Lib/test/test_pathlib.py index 46a705e1cf..21a6390e35 100644 --- a/Lib/test/test_pathlib.py +++ b/Lib/test/test_pathlib.py @@ -8,6 +8,7 @@ import socket  import stat  import tempfile  import unittest +from unittest import mock  from test import support  android_not_root = support.android_not_root @@ -1801,6 +1802,35 @@ class _BasePathTest(object):              p.mkdir(exist_ok=True)          self.assertEqual(cm.exception.errno, errno.EEXIST) +    def test_mkdir_concurrent_parent_creation(self): +        for pattern_num in range(32): +            p = self.cls(BASE, 'dirCPC%d' % pattern_num) +            self.assertFalse(p.exists()) + +            def my_mkdir(path, mode=0o777): +                path = str(path) +                # Emulate another process that would create the directory +                # just before we try to create it ourselves.  We do it +                # in all possible pattern combinations, assuming that this +                # function is called at most 5 times (dirCPC/dir1/dir2, +                # dirCPC/dir1, dirCPC, dirCPC/dir1, dirCPC/dir1/dir2). +                if pattern.pop(): +                    os.mkdir(path, mode)      # from another process +                    concurrently_created.add(path) +                os.mkdir(path, mode)          # our real call + +            pattern = [bool(pattern_num & (1 << n)) for n in range(5)] +            concurrently_created = set() +            p12 = p / 'dir1' / 'dir2' +            try: +                with mock.patch("pathlib._normal_accessor.mkdir", my_mkdir): +                    p12.mkdir(parents=True, exist_ok=False) +            except FileExistsError: +                self.assertIn(str(p12), concurrently_created) +            else: +                self.assertNotIn(str(p12), concurrently_created) +            self.assertTrue(p.exists()) +      @support.skip_unless_symlink      def test_symlink_to(self):          P = self.cls(BASE) | 
