diff options
Diffstat (limited to 'Lib/test/test_shutil.py')
-rw-r--r-- | Lib/test/test_shutil.py | 294 |
1 files changed, 292 insertions, 2 deletions
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 2cb2f14643..8d51994419 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -12,20 +12,28 @@ import errno import functools import pathlib import subprocess +import random +import string +import contextlib +import io from shutil import (make_archive, register_archive_format, unregister_archive_format, get_archive_formats, Error, unpack_archive, register_unpack_format, RegistryError, unregister_unpack_format, get_unpack_formats, - SameFileError) + SameFileError, _GiveupOnFastCopy) import tarfile import zipfile +try: + import posix +except ImportError: + posix = None from test import support from test.support import TESTFN, FakePath TESTFN2 = TESTFN + "2" - +OSX = sys.platform.startswith("darwin") try: import grp import pwd @@ -60,6 +68,24 @@ def write_file(path, content, binary=False): with open(path, 'wb' if binary else 'w') as fp: fp.write(content) +def write_test_file(path, size): + """Create a test file with an arbitrary size and random text content.""" + def chunks(total, step): + assert total >= step + while total > step: + yield step + total -= step + if total: + yield total + + bufsize = min(size, 8192) + chunk = b"".join([random.choice(string.ascii_letters).encode() + for i in range(bufsize)]) + with open(path, 'wb') as f: + for csize in chunks(size, bufsize): + f.write(chunk) + assert os.path.getsize(path) == size + def read_file(path, binary=False): """Return contents from a file located at *path*. @@ -84,6 +110,37 @@ def rlistdir(path): res.append(name) return res +def supports_file2file_sendfile(): + # ...apparently Linux and Solaris are the only ones + if not hasattr(os, "sendfile"): + return False + srcname = None + dstname = None + try: + with tempfile.NamedTemporaryFile("wb", delete=False) as f: + srcname = f.name + f.write(b"0123456789") + + with open(srcname, "rb") as src: + with tempfile.NamedTemporaryFile("wb", delete=False) as dst: + dstname = f.name + infd = src.fileno() + outfd = dst.fileno() + try: + os.sendfile(outfd, infd, 0, 2) + except OSError: + return False + else: + return True + finally: + if srcname is not None: + support.unlink(srcname) + if dstname is not None: + support.unlink(dstname) + + +SUPPORTS_SENDFILE = supports_file2file_sendfile() + class TestShutil(unittest.TestCase): @@ -1401,6 +1458,8 @@ class TestShutil(unittest.TestCase): self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) # But Error should work too, to stay backward compatible. self.assertRaises(Error, shutil.copyfile, src_file, src_file) + # Make sure file is not corrupted. + self.assertEqual(read_file(src_file), 'foo') def test_copytree_return_value(self): # copytree returns its destination path. @@ -1749,6 +1808,7 @@ class TestCopyFile(unittest.TestCase): self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile') + @unittest.skipIf(OSX, "skipped on OSX") def test_w_dest_open_fails(self): srcfile = self.Faux() @@ -1768,6 +1828,7 @@ class TestCopyFile(unittest.TestCase): self.assertEqual(srcfile._exited_with[1].args, ('Cannot open "destfile"',)) + @unittest.skipIf(OSX, "skipped on OSX") def test_w_dest_close_fails(self): srcfile = self.Faux() @@ -1790,6 +1851,7 @@ class TestCopyFile(unittest.TestCase): self.assertEqual(srcfile._exited_with[1].args, ('Cannot close',)) + @unittest.skipIf(OSX, "skipped on OSX") def test_w_source_close_fails(self): srcfile = self.Faux(True) @@ -1829,6 +1891,234 @@ class TestCopyFile(unittest.TestCase): finally: os.rmdir(dst_dir) + +class _ZeroCopyFileTest(object): + """Tests common to all zero-copy APIs.""" + FILESIZE = (10 * 1024 * 1024) # 10 MiB + FILEDATA = b"" + PATCHPOINT = "" + + @classmethod + def setUpClass(cls): + write_test_file(TESTFN, cls.FILESIZE) + with open(TESTFN, 'rb') as f: + cls.FILEDATA = f.read() + assert len(cls.FILEDATA) == cls.FILESIZE + + @classmethod + def tearDownClass(cls): + support.unlink(TESTFN) + + def tearDown(self): + support.unlink(TESTFN2) + + @contextlib.contextmanager + def get_files(self): + with open(TESTFN, "rb") as src: + with open(TESTFN2, "wb") as dst: + yield (src, dst) + + def zerocopy_fun(self, *args, **kwargs): + raise NotImplementedError("must be implemented in subclass") + + def reset(self): + self.tearDown() + self.tearDownClass() + self.setUpClass() + self.setUp() + + # --- + + def test_regular_copy(self): + with self.get_files() as (src, dst): + self.zerocopy_fun(src, dst) + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + # Make sure the fallback function is not called. + with self.get_files() as (src, dst): + with unittest.mock.patch('shutil.copyfileobj') as m: + shutil.copyfile(TESTFN, TESTFN2) + assert not m.called + + def test_same_file(self): + self.addCleanup(self.reset) + with self.get_files() as (src, dst): + with self.assertRaises(Exception): + self.zerocopy_fun(src, src) + # Make sure src file is not corrupted. + self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) + + def test_non_existent_src(self): + name = tempfile.mktemp() + with self.assertRaises(FileNotFoundError) as cm: + shutil.copyfile(name, "new") + self.assertEqual(cm.exception.filename, name) + + def test_empty_file(self): + srcname = TESTFN + 'src' + dstname = TESTFN + 'dst' + self.addCleanup(lambda: support.unlink(srcname)) + self.addCleanup(lambda: support.unlink(dstname)) + with open(srcname, "wb"): + pass + + with open(srcname, "rb") as src: + with open(dstname, "wb") as dst: + self.zerocopy_fun(src, dst) + + self.assertEqual(read_file(dstname, binary=True), b"") + + def test_unhandled_exception(self): + with unittest.mock.patch(self.PATCHPOINT, + side_effect=ZeroDivisionError): + self.assertRaises(ZeroDivisionError, + shutil.copyfile, TESTFN, TESTFN2) + + def test_exception_on_first_call(self): + # Emulate a case where the first call to the zero-copy + # function raises an exception in which case the function is + # supposed to give up immediately. + with unittest.mock.patch(self.PATCHPOINT, + side_effect=OSError(errno.EINVAL, "yo")): + with self.get_files() as (src, dst): + with self.assertRaises(_GiveupOnFastCopy): + self.zerocopy_fun(src, dst) + + def test_filesystem_full(self): + # Emulate a case where filesystem is full and sendfile() fails + # on first call. + with unittest.mock.patch(self.PATCHPOINT, + side_effect=OSError(errno.ENOSPC, "yo")): + with self.get_files() as (src, dst): + self.assertRaises(OSError, self.zerocopy_fun, src, dst) + + +@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') +class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): + PATCHPOINT = "os.sendfile" + + def zerocopy_fun(self, fsrc, fdst): + return shutil._fastcopy_sendfile(fsrc, fdst) + + def test_non_regular_file_src(self): + with io.BytesIO(self.FILEDATA) as src: + with open(TESTFN2, "wb") as dst: + with self.assertRaises(_GiveupOnFastCopy): + self.zerocopy_fun(src, dst) + shutil.copyfileobj(src, dst) + + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_non_regular_file_dst(self): + with open(TESTFN, "rb") as src: + with io.BytesIO() as dst: + with self.assertRaises(_GiveupOnFastCopy): + self.zerocopy_fun(src, dst) + shutil.copyfileobj(src, dst) + dst.seek(0) + self.assertEqual(dst.read(), self.FILEDATA) + + def test_exception_on_second_call(self): + def sendfile(*args, **kwargs): + if not flag: + flag.append(None) + return orig_sendfile(*args, **kwargs) + else: + raise OSError(errno.EBADF, "yo") + + flag = [] + orig_sendfile = os.sendfile + with unittest.mock.patch('os.sendfile', create=True, + side_effect=sendfile): + with self.get_files() as (src, dst): + with self.assertRaises(OSError) as cm: + shutil._fastcopy_sendfile(src, dst) + assert flag + self.assertEqual(cm.exception.errno, errno.EBADF) + + def test_cant_get_size(self): + # Emulate a case where src file size cannot be determined. + # Internally bufsize will be set to a small value and + # sendfile() will be called repeatedly. + with unittest.mock.patch('os.fstat', side_effect=OSError) as m: + with self.get_files() as (src, dst): + shutil._fastcopy_sendfile(src, dst) + assert m.called + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_small_chunks(self): + # Force internal file size detection to be smaller than the + # actual file size. We want to force sendfile() to be called + # multiple times, also in order to emulate a src fd which gets + # bigger while it is being copied. + mock = unittest.mock.Mock() + mock.st_size = 65536 + 1 + with unittest.mock.patch('os.fstat', return_value=mock) as m: + with self.get_files() as (src, dst): + shutil._fastcopy_sendfile(src, dst) + assert m.called + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_big_chunk(self): + # Force internal file size detection to be +100MB bigger than + # the actual file size. Make sure sendfile() does not rely on + # file size value except for (maybe) a better throughput / + # performance. + mock = unittest.mock.Mock() + mock.st_size = self.FILESIZE + (100 * 1024 * 1024) + with unittest.mock.patch('os.fstat', return_value=mock) as m: + with self.get_files() as (src, dst): + shutil._fastcopy_sendfile(src, dst) + assert m.called + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_blocksize_arg(self): + with unittest.mock.patch('os.sendfile', + side_effect=ZeroDivisionError) as m: + self.assertRaises(ZeroDivisionError, + shutil.copyfile, TESTFN, TESTFN2) + blocksize = m.call_args[0][3] + # Make sure file size and the block size arg passed to + # sendfile() are the same. + self.assertEqual(blocksize, os.path.getsize(TESTFN)) + # ...unless we're dealing with a small file. + support.unlink(TESTFN2) + write_file(TESTFN2, b"hello", binary=True) + self.addCleanup(support.unlink, TESTFN2 + '3') + self.assertRaises(ZeroDivisionError, + shutil.copyfile, TESTFN2, TESTFN2 + '3') + blocksize = m.call_args[0][3] + self.assertEqual(blocksize, 2 ** 23) + + def test_file2file_not_supported(self): + # Emulate a case where sendfile() only support file->socket + # fds. In such a case copyfile() is supposed to skip the + # fast-copy attempt from then on. + assert shutil._HAS_SENDFILE + try: + with unittest.mock.patch( + self.PATCHPOINT, + side_effect=OSError(errno.ENOTSOCK, "yo")) as m: + with self.get_files() as (src, dst): + with self.assertRaises(_GiveupOnFastCopy): + shutil._fastcopy_sendfile(src, dst) + assert m.called + assert not shutil._HAS_SENDFILE + + with unittest.mock.patch(self.PATCHPOINT) as m: + shutil.copyfile(TESTFN, TESTFN2) + assert not m.called + finally: + shutil._HAS_SENDFILE = True + + +@unittest.skipIf(not OSX, 'OSX only') +class TestZeroCopyOSX(_ZeroCopyFileTest, unittest.TestCase): + PATCHPOINT = "posix._fcopyfile" + + def zerocopy_fun(self, src, dst): + return shutil._fastcopy_osx(src, dst, posix._COPYFILE_DATA) + + class TermsizeTests(unittest.TestCase): def test_does_not_crash(self): """Check if get_terminal_size() returns a meaningful value. |