diff options
| author | ?ric Araujo <merwok@netwok.org> | 2011-01-29 17:12:02 +0100 |
|---|---|---|
| committer | ?ric Araujo <merwok@netwok.org> | 2011-01-29 17:12:02 +0100 |
| commit | c4229e44fffe980c8cf1c8dd800561f41df1affe (patch) | |
| tree | 6990dfa53720c7521ca71865034dc1c904bd51f3 /distutils2 | |
| parent | 04b4880fd837985753ed13e60aca3a8b77798d2d (diff) | |
| parent | f31dc8a82ef42b19c5b50234be764e7388f8c0ed (diff) | |
| download | disutils2-c4229e44fffe980c8cf1c8dd800561f41df1affe.tar.gz | |
Branch merge
Diffstat (limited to 'distutils2')
| -rw-r--r-- | distutils2/_backport/pkgutil.py | 64 | ||||
| -rw-r--r-- | distutils2/_backport/shutil.py | 280 | ||||
| -rw-r--r-- | distutils2/_backport/tests/test_pkgutil.py | 55 | ||||
| -rw-r--r-- | distutils2/_backport/tests/test_shutil.py | 945 | ||||
| -rw-r--r-- | distutils2/_backport/tests/test_sysconfig.py | 10 | ||||
| -rw-r--r-- | distutils2/command/cmd.py | 3 | ||||
| -rw-r--r-- | distutils2/tests/support.py | 28 | ||||
| -rw-r--r-- | distutils2/tests/test_command_install_dist.py | 8 |
8 files changed, 1222 insertions, 171 deletions
diff --git a/distutils2/_backport/pkgutil.py b/distutils2/_backport/pkgutil.py index ec81017..0c5c860 100644 --- a/distutils2/_backport/pkgutil.py +++ b/distutils2/_backport/pkgutil.py @@ -1,24 +1,19 @@ """Utilities to support packages.""" -# NOTE: This module must remain compatible with Python 2.3, as it is shared -# by setuptools for distribution with Python 2.3 and up. - import os import sys import imp -import os.path +import re +import warnings from csv import reader as csv_reader from types import ModuleType from distutils2.errors import DistutilsError from distutils2.metadata import DistributionMetadata from distutils2.version import suggest_normalized_version, VersionPredicate -import zipimport try: import cStringIO as StringIO except ImportError: import StringIO -import re -import warnings __all__ = [ @@ -28,10 +23,14 @@ __all__ = [ 'Distribution', 'EggInfoDistribution', 'distinfo_dirname', 'get_distributions', 'get_distribution', 'get_file_users', 'provides_distribution', 'obsoletes_distribution', - 'enable_cache', 'disable_cache', 'clear_cache' + 'enable_cache', 'disable_cache', 'clear_cache', ] +########################## +# PEP 302 Implementation # +########################## + def read_code(stream): # This helper is needed in order for the :pep:`302` emulation to # correctly handle compiled files @@ -41,7 +40,7 @@ def read_code(stream): if magic != imp.get_magic(): return None - stream.read(4) # Skip timestamp + stream.read(4) # Skip timestamp return marshal.load(stream) @@ -173,7 +172,6 @@ def iter_modules(path=None, prefix=''): #@simplegeneric def iter_importer_modules(importer, prefix=''): - "" if not hasattr(importer, 'iter_modules'): return [] return importer.iter_modules(prefix) @@ -331,9 +329,9 @@ class ImpLoader(object): def get_filename(self, fullname=None): fullname = self._fix_name(fullname) mod_type = self.etc[2] - if self.etc[2] == imp.PKG_DIRECTORY: + if mod_type == imp.PKG_DIRECTORY: return self._get_delegate().get_filename() - elif self.etc[2] in (imp.PY_SOURCE, imp.PY_COMPILED, imp.C_EXTENSION): + elif mod_type in (imp.PY_SOURCE, imp.PY_COMPILED, imp.C_EXTENSION): return self.filename return None @@ -432,7 +430,8 @@ def iter_importers(fullname=""): import mechanism will find the latter. Items of the following types can be affected by this discrepancy: - ``imp.C_EXTENSION, imp.PY_SOURCE, imp.PY_COMPILED, imp.PKG_DIRECTORY`` + :data:`imp.C_EXTENSION`, :data:`imp.PY_SOURCE`, :data:`imp.PY_COMPILED`, + :data:`imp.PKG_DIRECTORY` """ if fullname.startswith('.'): raise ImportError("Relative module names not supported") @@ -534,13 +533,13 @@ def extend_path(path, name): # frozen package. Return the path unchanged in that case. return path - pname = os.path.join(*name.split('.')) # Reconstitute as relative path + pname = os.path.join(*name.split('.')) # Reconstitute as relative path # Just in case os.extsep != '.' sname = os.extsep.join(name.split('.')) sname_pkg = sname + os.extsep + "pkg" init_py = "__init__" + os.extsep + "py" - path = path[:] # Start with a copy of the existing path + path = path[:] # Start with a copy of the existing path for dir in sys.path: if not isinstance(dir, basestring) or not os.path.isdir(dir): @@ -565,7 +564,7 @@ def extend_path(path, name): line = line.rstrip('\n') if not line or line.startswith('#'): continue - path.append(line) # Don't check for existence! + path.append(line) # Don't check for existence! f.close() return path @@ -609,6 +608,7 @@ def get_data(package, resource): resource_name = os.path.join(*parts) return loader.get_data(resource_name) + ########################## # PEP 376 Implementation # ########################## @@ -616,12 +616,12 @@ def get_data(package, resource): DIST_FILES = ('INSTALLER', 'METADATA', 'RECORD', 'REQUESTED',) # Cache -_cache_name = {} # maps names to Distribution instances -_cache_name_egg = {} # maps names to EggInfoDistribution instances -_cache_path = {} # maps paths to Distribution instances -_cache_path_egg = {} # maps paths to EggInfoDistribution instances -_cache_generated = False # indicates if .dist-info distributions are cached -_cache_generated_egg = False # indicates if .dist-info and .egg are cached +_cache_name = {} # maps names to Distribution instances +_cache_name_egg = {} # maps names to EggInfoDistribution instances +_cache_path = {} # maps paths to Distribution instances +_cache_path_egg = {} # maps paths to EggInfoDistribution instances +_cache_generated = False # indicates if .dist-info distributions are cached +_cache_generated_egg = False # indicates if .dist-info and .egg are cached _cache_enabled = True @@ -636,6 +636,7 @@ def enable_cache(): _cache_enabled = True + def disable_cache(): """ Disables the internal cache. @@ -647,9 +648,10 @@ def disable_cache(): _cache_enabled = False + def clear_cache(): """ Clears the internal cache. """ - global _cache_name, _cache_name_egg, cache_path, _cache_path_egg, \ + global _cache_name, _cache_name_egg, _cache_path, _cache_path_egg, \ _cache_generated, _cache_generated_egg _cache_name = {} @@ -872,7 +874,8 @@ class EggInfoDistribution(object): if isinstance(strs, basestring): for s in strs.splitlines(): s = s.strip() - if s and not s.startswith('#'): # skip blank lines/comments + # skip blank lines/comments + if s and not s.startswith('#'): yield s else: for ss in strs: @@ -890,6 +893,7 @@ class EggInfoDistribution(object): except IOError: requires = None else: + # FIXME handle the case where zipfile is not available zipf = zipimport.zipimporter(path) fileobj = StringIO.StringIO(zipf.get_data('EGG-INFO/PKG-INFO')) self.metadata = DistributionMetadata(fileobj=fileobj) @@ -952,7 +956,7 @@ class EggInfoDistribution(object): version = match.group('first') if match.group('rest'): version += match.group('rest') - version = version.replace(' ', '') # trim spaces + version = version.replace(' ', '') # trim spaces if version is None: reqs.append(name) else: @@ -982,12 +986,6 @@ class EggInfoDistribution(object): __hash__ = object.__hash__ -def _normalize_dist_name(name): - """Returns a normalized name from the given *name*. - :rtype: string""" - return name.replace('-', '_') - - def distinfo_dirname(name, version): """ The *name* and *version* parameters are converted into their @@ -1007,7 +1005,7 @@ def distinfo_dirname(name, version): :returns: directory name :rtype: string""" file_extension = '.dist-info' - name = _normalize_dist_name(name) + name = name.replace('-', '_') normalized_version = suggest_normalized_version(version) # Because this is a lookup procedure, something will be returned even if # it is a version that cannot be normalized @@ -1148,7 +1146,7 @@ def provides_distribution(name, version=None, use_egg_info=False): raise DistutilsError(('Distribution %s has invalid ' + 'provides field: %s') \ % (dist.name, p)) - p_ver = p_ver[1:-1] # trim off the parenthesis + p_ver = p_ver[1:-1] # trim off the parenthesis if p_name == name and predicate.match(p_ver): yield dist break diff --git a/distutils2/_backport/shutil.py b/distutils2/_backport/shutil.py index 9d8b1fd..802d089 100644 --- a/distutils2/_backport/shutil.py +++ b/distutils2/_backport/shutil.py @@ -1,4 +1,4 @@ -"""Utility functions for copying files and directory trees. +"""Utility functions for copying and archiving files and directory trees. XXX The functions here don't copy the resource fork or other metadata on Mac. @@ -9,7 +9,13 @@ import sys import stat from os.path import abspath import fnmatch -from warnings import warn +import errno + +try: + import bz2 + _BZ2_SUPPORTED = True +except ImportError: + _BZ2_SUPPORTED = False try: from pwd import getpwnam @@ -21,9 +27,12 @@ try: except ImportError: getgrnam = None -__all__ = ["copyfileobj","copyfile","copymode","copystat","copy","copy2", - "copytree","move","rmtree","Error", "SpecialFileError", - "ExecError","make_archive"] +__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", + "copytree", "move", "rmtree", "Error", "SpecialFileError", + "ExecError", "make_archive", "get_archive_formats", + "register_archive_format", "unregister_archive_format", + "get_unpack_formats", "register_unpack_format", + "unregister_unpack_format", "unpack_archive"] class Error(EnvironmentError): pass @@ -35,6 +44,14 @@ class SpecialFileError(EnvironmentError): class ExecError(EnvironmentError): """Raised when a command could not be executed""" +class ReadError(EnvironmentError): + """Raised when an archive cannot be read""" + +class RegistryError(Exception): + """Raised when a registery operation with the archiving + and unpacking registeries fails""" + + try: WindowsError except NameError: @@ -50,7 +67,7 @@ def copyfileobj(fsrc, fdst, length=16*1024): def _samefile(src, dst): # Macintosh, Unix. - if hasattr(os.path,'samefile'): + if hasattr(os.path, 'samefile'): try: return os.path.samefile(src, dst) except OSError: @@ -63,10 +80,8 @@ def _samefile(src, dst): def copyfile(src, dst): """Copy data from src to dst""" if _samefile(src, dst): - raise Error, "`%s` and `%s` are the same file" % (src, dst) + raise Error("`%s` and `%s` are the same file" % (src, dst)) - fsrc = None - fdst = None for fn in [src, dst]: try: st = os.stat(fn) @@ -77,15 +92,16 @@ def copyfile(src, dst): # XXX What about other special files? (sockets, devices...) if stat.S_ISFIFO(st.st_mode): raise SpecialFileError("`%s` is a named pipe" % fn) + + fsrc = open(src, 'rb') try: - fsrc = open(src, 'rb') fdst = open(dst, 'wb') - copyfileobj(fsrc, fdst) - finally: - if fdst: + try: + copyfileobj(fsrc, fdst) + finally: fdst.close() - if fsrc: - fsrc.close() + finally: + fsrc.close() def copymode(src, dst): """Copy mode bits from src to dst""" @@ -103,8 +119,12 @@ def copystat(src, dst): if hasattr(os, 'chmod'): os.chmod(dst, mode) if hasattr(os, 'chflags') and hasattr(st, 'st_flags'): - os.chflags(dst, st.st_flags) - + try: + os.chflags(dst, st.st_flags) + except OSError, why: + if (not hasattr(errno, 'EOPNOTSUPP') or + why.errno != errno.EOPNOTSUPP): + raise def copy(src, dst): """Copy data and mode bits ("cp src dst"). @@ -140,8 +160,9 @@ def ignore_patterns(*patterns): return set(ignored_names) return _ignore_patterns -def copytree(src, dst, symlinks=False, ignore=None): - """Recursively copy a directory tree using copy2(). +def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, + ignore_dangling_symlinks=False): + """Recursively copy a directory tree. The destination directory must not already exist. If exception(s) occur, an Error is raised with a list of reasons. @@ -149,7 +170,13 @@ def copytree(src, dst, symlinks=False, ignore=None): If the optional symlinks flag is true, symbolic links in the source tree result in symbolic links in the destination tree; if it is false, the contents of the files pointed to by symbolic - links are copied. + links are copied. If the file pointed by the symlink doesn't + exist, an exception will be added in the list of errors raised in + an Error exception at the end of the copy process. + + You can set the optional ignore_dangling_symlinks flag to true if you + want to silence this exception. Notice that this has no effect on + platforms that don't support os.symlink. The optional ignore argument is a callable. If given, it is called with the `src` parameter, which is the directory @@ -163,7 +190,10 @@ def copytree(src, dst, symlinks=False, ignore=None): list of names relative to the `src` directory that should not be copied. - XXX Consider this example code rather than the ultimate tool. + The optional copy_function argument is a callable that will be used + to copy each file. It will be called with the source path and the + destination path as arguments. By default, copy2() is used, but any + function that supports the same signature (like copy()) can be used. """ names = os.listdir(src) @@ -182,14 +212,21 @@ def copytree(src, dst, symlinks=False, ignore=None): srcname = os.path.join(src, name) dstname = os.path.join(dst, name) try: - if symlinks and os.path.islink(srcname): + if os.path.islink(srcname): linkto = os.readlink(srcname) - os.symlink(linkto, dstname) + if symlinks: + os.symlink(linkto, dstname) + else: + # ignore dangling symlink if the flag is on + if not os.path.exists(linkto) and ignore_dangling_symlinks: + continue + # otherwise let the copy occurs. copy2 will raise an error + copy_function(srcname, dstname) elif os.path.isdir(srcname): - copytree(srcname, dstname, symlinks, ignore) + copytree(srcname, dstname, symlinks, ignore, copy_function) else: # Will raise a SpecialFileError for unsupported file types - copy2(srcname, dstname) + copy_function(srcname, dstname) # catch the Error from the recursive copytree so that we can # continue with other files except Error, err: @@ -205,7 +242,7 @@ def copytree(src, dst, symlinks=False, ignore=None): else: errors.extend((src, dst, str(why))) if errors: - raise Error, errors + raise Error(errors) def rmtree(path, ignore_errors=False, onerror=None): """Recursively delete a directory tree. @@ -235,7 +272,7 @@ def rmtree(path, ignore_errors=False, onerror=None): names = [] try: names = os.listdir(path) - except os.error, err: + except os.error: onerror(os.listdir, path, sys.exc_info()) for name in names: fullname = os.path.join(path, name) @@ -248,7 +285,7 @@ def rmtree(path, ignore_errors=False, onerror=None): else: try: os.remove(fullname) - except os.error, err: + except os.error: onerror(os.remove, fullname, sys.exc_info()) try: os.rmdir(path) @@ -282,13 +319,13 @@ def move(src, dst): if os.path.isdir(dst): real_dst = os.path.join(dst, _basename(src)) if os.path.exists(real_dst): - raise Error, "Destination path '%s' already exists" % real_dst + raise Error("Destination path '%s' already exists" % real_dst) try: os.rename(src, real_dst) except OSError: if os.path.isdir(src): if _destinsrc(src, dst): - raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst) + raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst)) copytree(src, real_dst, symlinks=True) rmtree(src) else: @@ -333,40 +370,41 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, """Create a (possibly compressed) tar file from all the files under 'base_dir'. - 'compress' must be "gzip" (the default), "compress", "bzip2", or None. - (compress will be deprecated in Python 3.2) + 'compress' must be "gzip" (the default), "bzip2", or None. 'owner' and 'group' can be used to define an owner and a group for the archive that is being built. If not provided, the current owner and group will be used. The output tar file will be named 'base_dir' + ".tar", possibly plus - the appropriate compression extension (".gz", ".bz2" or ".Z"). + the appropriate compression extension (".gz", or ".bz2"). Returns the output filename. """ - tar_compression = {'gzip': 'gz', 'bzip2': 'bz2', None: '', 'compress': ''} - compress_ext = {'gzip': '.gz', 'bzip2': '.bz2', 'compress': '.Z'} + tar_compression = {'gzip': 'gz', None: ''} + compress_ext = {'gzip': '.gz'} + + if _BZ2_SUPPORTED: + tar_compression['bzip2'] = 'bz2' + compress_ext['bzip2'] = '.bz2' # flags for compression program, each element of list will be an argument if compress is not None and compress not in compress_ext: - raise ValueError, \ - ("bad value for 'compress': must be None, 'gzip', 'bzip2' " - "or 'compress'") - - archive_name = base_name + '.tar' - if compress != 'compress': - archive_name += compress_ext.get(compress, '') + raise ValueError("bad value for 'compress', or compression format not " + "supported: %s" % compress) + archive_name = base_name + '.tar' + compress_ext.get(compress, '') archive_dir = os.path.dirname(archive_name) + if not os.path.exists(archive_dir): if logger is not None: - logger.info("creating %s" % archive_dir) + logger.info("creating %s", archive_dir) if not dry_run: os.makedirs(archive_dir) - # creating the tarball + # XXX late import because of circular dependency between shutil and + # tarfile :( from distutils2._backport import tarfile if logger is not None: @@ -391,23 +429,9 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, finally: tar.close() - # compression using `compress` - # XXX this block will be removed in Python 3.2 - if compress == 'compress': - warn("'compress' will be deprecated.", PendingDeprecationWarning) - # the option varies depending on the platform - compressed_name = archive_name + compress_ext[compress] - if sys.platform == 'win32': - cmd = [compress, archive_name, compressed_name] - else: - cmd = [compress, '-f', archive_name] - from distutils2.spawn import spawn - spawn(cmd, dry_run=dry_run) - return compressed_name - return archive_name -def _call_external_zip(directory, verbose=False): +def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False): # XXX see if we want to keep an external call here if verbose: zipoptions = "-r" @@ -420,8 +444,7 @@ def _call_external_zip(directory, verbose=False): except DistutilsExecError: # XXX really should distinguish between "couldn't find # external 'zip' command" and "zip failed". - raise ExecError, \ - ("unable to create zip file '%s': " + raise ExecError("unable to create zip file '%s': " "could neither import the 'zipfile' module nor " "find a standalone zip utility") % zip_filename @@ -451,7 +474,7 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): zipfile = None if zipfile is None: - _call_external_zip(base_dir, verbose) + _call_external_zip(base_dir, zip_filename, verbose, dry_run) else: if logger is not None: logger.info("creating '%s' and adding '%s' to it", @@ -475,12 +498,14 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): _ARCHIVE_FORMATS = { 'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"), 'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"), - 'ztar': (_make_tarball, [('compress', 'compress')], - "compressed tar file"), 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), - 'zip': (_make_zipfile, [],"ZIP file") + 'zip': (_make_zipfile, [], "ZIP file"), } +if _BZ2_SUPPORTED: + _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], + "bzip2'ed tar-file") + def get_archive_formats(): """Returns a list of supported formats for archiving and unarchiving. @@ -507,7 +532,7 @@ def register_archive_format(name, function, extra_args=None, description=''): if not isinstance(extra_args, (tuple, list)): raise TypeError('extra_args needs to be a sequence') for element in extra_args: - if not isinstance(element, (tuple, list)) or len(element) !=2 : + if not isinstance(element, (tuple, list)) or len(element) !=2: raise TypeError('extra_args elements are : (arg_name, value)') _ARCHIVE_FORMATS[name] = (function, extra_args, description) @@ -520,7 +545,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, """Create an archive file (eg. zip or tar). 'base_name' is the name of the file to create, minus any format-specific - extension; 'format' is the archive format: one of "zip", "tar", "ztar", + extension; 'format' is the archive format: one of "zip", "tar", "bztar" or "gztar". 'root_dir' is a directory that will be the root directory of the @@ -549,7 +574,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, try: format_info = _ARCHIVE_FORMATS[format] except KeyError: - raise ValueError, "unknown archive format '%s'" % format + raise ValueError("unknown archive format '%s'" % format) func = format_info[0] for arg, val in format_info[1]: @@ -570,6 +595,61 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, return filename +def get_unpack_formats(): + """Returns a list of supported formats for unpacking. + + Each element of the returned sequence is a tuple + (name, extensions, description) + """ + formats = [(name, info[0], info[3]) for name, info in + _UNPACK_FORMATS.iteritems()] + formats.sort() + return formats + +def _check_unpack_options(extensions, function, extra_args): + """Checks what gets registered as an unpacker.""" + # first make sure no other unpacker is registered for this extension + existing_extensions = {} + for name, info in _UNPACK_FORMATS.iteritems(): + for ext in info[0]: + existing_extensions[ext] = name + + for extension in extensions: + if extension in existing_extensions: + msg = '%s is already registered for "%s"' + raise RegistryError(msg % (extension, + existing_extensions[extension])) + + if not callable(function): + raise TypeError('The registered function must be a callable') + + +def register_unpack_format(name, extensions, function, extra_args=None, + description=''): + """Registers an unpack format. + + `name` is the name of the format. `extensions` is a list of extensions + corresponding to the format. + + `function` is the callable that will be + used to unpack archives. The callable will receive archives to unpack. + If it's unable to handle an archive, it needs to raise a ReadError + exception. + + If provided, `extra_args` is a sequence of + (name, value) tuples that will be passed as arguments to the callable. + description can be provided to describe the format, and will be returned + by the get_unpack_formats() function. + """ + if extra_args is None: + extra_args = [] + _check_unpack_options(extensions, function, extra_args) + _UNPACK_FORMATS[name] = extensions, function, extra_args, description + +def unregister_unpack_format(name): + """Removes the pack format from the registery.""" + del _UNPACK_FORMATS[name] + def _ensure_directory(path): """Ensure that the parent directory of `path` exists""" dirname = os.path.dirname(path) @@ -577,6 +657,8 @@ def _ensure_directory(path): os.makedirs(dirname) def _unpack_zipfile(filename, extract_dir): + """Unpack zip `filename` to `extract_dir` + """ try: import zipfile except ImportError: @@ -589,6 +671,8 @@ def _unpack_zipfile(filename, extract_dir): try: for info in zip.infolist(): name = info.filename + + # don't extract absolute paths or ones with .. in them if name.startswith('/') or '..' in name: continue @@ -600,7 +684,7 @@ def _unpack_zipfile(filename, extract_dir): if not name.endswith('/'): # file data = zip.read(info.filename) - f = open(target,'wb') + f = open(target, 'wb') try: f.write(data) finally: @@ -610,6 +694,9 @@ def _unpack_zipfile(filename, extract_dir): zip.close() def _unpack_tarfile(filename, extract_dir): + """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` + """ + from distutils2._backport import tarfile try: tarobj = tarfile.open(filename) except tarfile.TarError: @@ -620,20 +707,55 @@ def _unpack_tarfile(filename, extract_dir): finally: tarobj.close() +_UNPACK_FORMATS = { + 'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"), + 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), + 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file") + } + +if _BZ2_SUPPORTED: + _UNPACK_FORMATS['bztar'] = (['.bz2'], _unpack_tarfile, [], + "bzip2'ed tar-file") + +def _find_unpack_format(filename): + for name, info in _UNPACK_FORMATS.iteritems(): + for extension in info[0]: + if filename.endswith(extension): + return name + return None + +def unpack_archive(filename, extract_dir=None, format=None): + """Unpack an archive. -_UNPACKERS = ( - (['.tar.gz', '.tgz', '.tar'], _unpack_tarfile), - (['.zip', '.egg'], _unpack_zipfile)) + `filename` is the name of the archive. + `extract_dir` is the name of the target directory, where the archive + is unpacked. If not provided, the current working directory is used. -def unpack_archive(filename, extract_dir=None): + `format` is the archive format: one of "zip", "tar", or "gztar". Or any + other registered format. If not provided, unpack_archive will use the + filename extension and see if an unpacker was registered for that + extension. + + In case none is found, a ValueError is raised. + """ if extract_dir is None: - extract_dir = os.path.dirname(filename) + extract_dir = os.getcwd() - for formats, func in _UNPACKERS: - for format in formats: - if filename.endswith(format): - func(filename, extract_dir) - return extract_dir + if format is not None: + try: + format_info = _UNPACK_FORMATS[format] + except KeyError: + raise ValueError("Unknown unpack format '{0}'".format(format)) + + func = format_info[0] + func(filename, extract_dir, **dict(format_info[1])) + else: + # we need to look at the registered unpackers supported extensions + format = _find_unpack_format(filename) + if format is None: + raise ReadError("Unknown archive format '{0}'".format(filename)) + func = _UNPACK_FORMATS[format][1] + kwargs = dict(_UNPACK_FORMATS[format][2]) raise ValueError('Unknown archive format: %s' % filename) diff --git a/distutils2/_backport/tests/test_pkgutil.py b/distutils2/_backport/tests/test_pkgutil.py index c07579f..ed27aee 100644 --- a/distutils2/_backport/tests/test_pkgutil.py +++ b/distutils2/_backport/tests/test_pkgutil.py @@ -12,10 +12,15 @@ try: except ImportError: from distutils2._backport.hashlib import md5 -from test.test_support import TESTFN - +from distutils2.errors import DistutilsError +from distutils2.metadata import DistributionMetadata from distutils2.tests import unittest, run_unittest, support + from distutils2._backport import pkgutil +from distutils2._backport.pkgutil import ( + Distribution, EggInfoDistribution, get_distribution, get_distributions, + provides_distribution, obsoletes_distribution, get_file_users, + distinfo_dirname, _yield_distributions) try: from os.path import relpath @@ -108,6 +113,12 @@ class TestPkgUtilData(unittest.TestCase): self.assertEqual(res1, RESOURCE_DATA) res2 = pkgutil.get_data(pkg, 'sub/res.txt') self.assertEqual(res2, RESOURCE_DATA) + + names = [] + for loader, name, ispkg in pkgutil.iter_modules([zip_file]): + names.append(name) + self.assertEqual(names, ['test_getdata_zipfile']) + del sys.path[0] del sys.modules[pkg] @@ -205,7 +216,7 @@ class TestPkgUtilDistribution(unittest.TestCase): record_writer.writerow(record_pieces( os.path.join(distinfo_dir, file))) record_writer.writerow([relpath(record_file, sys.prefix)]) - del record_writer # causes the RECORD file to close + del record_writer # causes the RECORD file to close record_reader = csv.reader(open(record_file, 'rb')) record_data = [] for row in record_reader: @@ -225,9 +236,6 @@ class TestPkgUtilDistribution(unittest.TestCase): def test_instantiation(self): # Test the Distribution class's instantiation provides us with usable # attributes. - # Import the Distribution class - from distutils2._backport.pkgutil import distinfo_dirname, Distribution - here = os.path.abspath(os.path.dirname(__file__)) name = 'choxie' version = '2.0.0.9' @@ -236,7 +244,6 @@ class TestPkgUtilDistribution(unittest.TestCase): dist = Distribution(dist_path) self.assertEqual(dist.name, name) - from distutils2.metadata import DistributionMetadata self.assertTrue(isinstance(dist.metadata, DistributionMetadata)) self.assertEqual(dist.metadata['version'], version) self.assertTrue(isinstance(dist.requested, type(bool()))) @@ -244,7 +251,6 @@ class TestPkgUtilDistribution(unittest.TestCase): def test_installed_files(self): # Test the iteration of installed files. # Test the distribution's installed files - from distutils2._backport.pkgutil import Distribution for distinfo_dir in self.distinfo_dirs: dist = Distribution(distinfo_dir) for path, md5_, size in dist.get_installed_files(): @@ -267,14 +273,12 @@ class TestPkgUtilDistribution(unittest.TestCase): false_path = relpath(os.path.join(*false_path), sys.prefix) # Test if the distribution uses the file in question - from distutils2._backport.pkgutil import Distribution dist = Distribution(distinfo_dir) self.assertTrue(dist.uses(true_path)) self.assertFalse(dist.uses(false_path)) def test_get_distinfo_file(self): # Test the retrieval of dist-info file objects. - from distutils2._backport.pkgutil import Distribution distinfo_name = 'choxie-2.0.0.9' other_distinfo_name = 'grammar-1.0a4' distinfo_dir = os.path.join(self.fake_dists_path, @@ -295,7 +299,6 @@ class TestPkgUtilDistribution(unittest.TestCase): # Is it the correct file? self.assertEqual(value.name, os.path.join(distinfo_dir, distfile)) - from distutils2.errors import DistutilsError # Test an absolute path that is part of another distributions dist-info other_distinfo_file = os.path.join(self.fake_dists_path, other_distinfo_name + '.dist-info', 'REQUESTED') @@ -307,7 +310,6 @@ class TestPkgUtilDistribution(unittest.TestCase): def test_get_distinfo_files(self): # Test for the iteration of RECORD path entries. - from distutils2._backport.pkgutil import Distribution distinfo_name = 'towel_stuff-0.1' distinfo_dir = os.path.join(self.fake_dists_path, distinfo_name + '.dist-info') @@ -345,7 +347,7 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, # Given a name and a version, we expect the distinfo_dirname function # to return a standard distribution information directory name. - items = [# (name, version, standard_dirname) + items = [ # (name, version, standard_dirname) # Test for a very simple single word name and decimal # version number ('docutils', '0.5', 'docutils-0.5.dist-info'), @@ -356,9 +358,6 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, ('python-ldap', '2.5 a---5', 'python_ldap-2.5 a---5.dist-info'), ] - # Import the function in question - from distutils2._backport.pkgutil import distinfo_dirname - # Loop through the items to validate the results for name, version, standard_dirname in items: dirname = distinfo_dirname(name, version) @@ -371,11 +370,6 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, ('towel-stuff', '0.1')] found_dists = [] - # Import the function in question - from distutils2._backport.pkgutil import get_distributions, \ - Distribution, \ - EggInfoDistribution - # Verify the fake dists have been found. dists = [dist for dist in get_distributions()] for dist in dists: @@ -416,12 +410,7 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, def test_get_distribution(self): # Test for looking up a distribution by name. # Test the lookup of the towel-stuff distribution - name = 'towel-stuff' # Note: This is different from the directory name - - # Import the function in question - from distutils2._backport.pkgutil import get_distribution, \ - Distribution, \ - EggInfoDistribution + name = 'towel-stuff' # Note: This is different from the directory name # Lookup the distribution dist = get_distribution(name) @@ -461,7 +450,6 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, def test_get_file_users(self): # Test the iteration of distributions that use a file. - from distutils2._backport.pkgutil import get_file_users, Distribution name = 'towel_stuff-0.1' path = os.path.join(self.fake_dists_path, name, 'towel_stuff', '__init__.py') @@ -471,9 +459,6 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, def test_provides(self): # Test for looking up distributions by what they provide - from distutils2._backport.pkgutil import provides_distribution - from distutils2.errors import DistutilsError - checkLists = lambda x, y: self.assertListEqual(sorted(x), sorted(y)) l = [dist.name for dist in provides_distribution('truffles')] @@ -522,12 +507,10 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, use_egg_info=True)] checkLists(l, ['strawberry']) - l = [dist.name for dist in provides_distribution('strawberry', '>0.6', use_egg_info=True)] checkLists(l, []) - l = [dist.name for dist in provides_distribution('banana', '0.4', use_egg_info=True)] checkLists(l, ['banana']) @@ -536,16 +519,12 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, use_egg_info=True)] checkLists(l, ['banana']) - l = [dist.name for dist in provides_distribution('banana', '!=0.4', use_egg_info=True)] checkLists(l, []) def test_obsoletes(self): # Test looking for distributions based on what they obsolete - from distutils2._backport.pkgutil import obsoletes_distribution - from distutils2.errors import DistutilsError - checkLists = lambda x, y: self.assertListEqual(sorted(x), sorted(y)) l = [dist.name for dist in obsoletes_distribution('truffles', '1.0')] @@ -555,7 +534,6 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, use_egg_info=True)] checkLists(l, ['cheese', 'bacon']) - l = [dist.name for dist in obsoletes_distribution('truffles', '0.8')] checkLists(l, ['choxie']) @@ -575,7 +553,6 @@ class TestPkgUtilPEP376(support.LoggingCatcher, support.WarningsCatcher, def test_yield_distribution(self): # tests the internal function _yield_distributions - from distutils2._backport.pkgutil import _yield_distributions checkLists = lambda x, y: self.assertListEqual(sorted(x), sorted(y)) eggs = [('bacon', '0.1'), ('banana', '0.4'), ('strawberry', '0.6'), diff --git a/distutils2/_backport/tests/test_shutil.py b/distutils2/_backport/tests/test_shutil.py new file mode 100644 index 0000000..55e7a0e --- /dev/null +++ b/distutils2/_backport/tests/test_shutil.py @@ -0,0 +1,945 @@ +import os +import sys +import tempfile +import stat +import tarfile +from os.path import splitdrive +from StringIO import StringIO + +from distutils.spawn import find_executable, spawn +from distutils2._backport import shutil +from distutils2._backport.shutil import ( + _make_tarball, _make_zipfile, make_archive, unpack_archive, + register_archive_format, unregister_archive_format, get_archive_formats, + register_unpack_format, unregister_unpack_format, get_unpack_formats, + Error, RegistryError) + +from distutils2.tests import unittest, support, TESTFN + +try: + import bz2 + BZ2_SUPPORTED = True +except ImportError: + BZ2_SUPPORTED = False + +TESTFN2 = TESTFN + "2" + +try: + import grp + import pwd + UID_GID_SUPPORT = True +except ImportError: + UID_GID_SUPPORT = False + +try: + import zlib +except ImportError: + zlib = None + +try: + import zipfile + ZIP_SUPPORT = True +except ImportError: + ZIP_SUPPORT = find_executable('zip') + +class TestShutil(unittest.TestCase): + + def setUp(self): + super(TestShutil, self).setUp() + self.tempdirs = [] + + def tearDown(self): + super(TestShutil, self).tearDown() + while self.tempdirs: + d = self.tempdirs.pop() + shutil.rmtree(d, os.name in ('nt', 'cygwin')) + + def write_file(self, path, content='xxx'): + """Writes a file in the given path. + + + path can be a string or a sequence. + """ + if isinstance(path, (list, tuple)): + path = os.path.join(*path) + f = open(path, 'w') + try: + f.write(content) + finally: + f.close() + + def mkdtemp(self): + """Create a temporary directory that will be cleaned up. + + Returns the path of the directory. + """ + d = tempfile.mkdtemp() + self.tempdirs.append(d) + return d + + def test_rmtree_errors(self): + # filename is guaranteed not to exist + filename = tempfile.mktemp() + self.assertRaises(OSError, shutil.rmtree, filename) + + # See bug #1071513 for why we don't run this on cygwin + # and bug #1076467 for why we don't run this as root. + if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin' + and not (hasattr(os, 'geteuid') and os.geteuid() == 0)): + def test_on_error(self): + self.errorState = 0 + os.mkdir(TESTFN) + self.childpath = os.path.join(TESTFN, 'a') + f = open(self.childpath, 'w') + f.close() + old_dir_mode = os.stat(TESTFN).st_mode + old_child_mode = os.stat(self.childpath).st_mode + # Make unwritable. + os.chmod(self.childpath, stat.S_IREAD) + os.chmod(TESTFN, stat.S_IREAD) + + shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) + # Test whether onerror has actually been called. + self.assertEqual(self.errorState, 2, + "Expected call to onerror function did not happen.") + + # Make writable again. + os.chmod(TESTFN, old_dir_mode) + os.chmod(self.childpath, old_child_mode) + + # Clean up. + shutil.rmtree(TESTFN) + + def check_args_to_onerror(self, func, arg, exc): + # test_rmtree_errors deliberately runs rmtree + # on a directory that is chmod 400, which will fail. + # This function is run when shutil.rmtree fails. + # 99.9% of the time it initially fails to remove + # a file in the directory, so the first time through + # func is os.remove. + # However, some Linux machines running ZFS on + # FUSE experienced a failure earlier in the process + # at os.listdir. The first failure may legally + # be either. + if self.errorState == 0: + if func is os.remove: + self.assertEqual(arg, self.childpath) + else: + self.assertIs(func, os.listdir, + "func must be either os.remove or os.listdir") + self.assertEqual(arg, TESTFN) + self.assertTrue(issubclass(exc[0], OSError)) + self.errorState = 1 + else: + self.assertEqual(func, os.rmdir) + self.assertEqual(arg, TESTFN) + self.assertTrue(issubclass(exc[0], OSError)) + self.errorState = 2 + + def test_rmtree_dont_delete_file(self): + # When called on a file instead of a directory, don't delete it. + handle, path = tempfile.mkstemp() + os.fdopen(handle).close() + self.assertRaises(OSError, shutil.rmtree, path) + os.remove(path) + + def _write_data(self, path, data): + f = open(path, "w") + f.write(data) + f.close() + + def test_copytree_simple(self): + + def read_data(path): + f = open(path) + data = f.read() + f.close() + return data + + src_dir = tempfile.mkdtemp() + dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') + self._write_data(os.path.join(src_dir, 'test.txt'), '123') + os.mkdir(os.path.join(src_dir, 'test_dir')) + self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + + try: + shutil.copytree(src_dir, dst_dir) + self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) + self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) + self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', + 'test.txt'))) + actual = read_data(os.path.join(dst_dir, 'test.txt')) + self.assertEqual(actual, '123') + actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt')) + self.assertEqual(actual, '456') + finally: + for path in ( + os.path.join(src_dir, 'test.txt'), + os.path.join(dst_dir, 'test.txt'), + os.path.join(src_dir, 'test_dir', 'test.txt'), + os.path.join(dst_dir, 'test_dir', 'test.txt'), + ): + if os.path.exists(path): + os.remove(path) + for path in (src_dir, + os.path.dirname(dst_dir) + ): + if os.path.exists(path): + shutil.rmtree(path) + + def test_copytree_with_exclude(self): + + def read_data(path): + f = open(path) + data = f.read() + f.close() + return data + + # creating data + join = os.path.join + exists = os.path.exists + src_dir = tempfile.mkdtemp() + try: + dst_dir = join(tempfile.mkdtemp(), 'destination') + self._write_data(join(src_dir, 'test.txt'), '123') + self._write_data(join(src_dir, 'test.tmp'), '123') + os.mkdir(join(src_dir, 'test_dir')) + self._write_data(join(src_dir, 'test_dir', 'test.txt'), '456') + os.mkdir(join(src_dir, 'test_dir2')) + self._write_data(join(src_dir, 'test_dir2', 'test.txt'), '456') + os.mkdir(join(src_dir, 'test_dir2', 'subdir')) + os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) + self._write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), + '456') + self._write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), + '456') + + + # testing glob-like patterns + try: + patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') + shutil.copytree(src_dir, dst_dir, ignore=patterns) + # checking the result: some elements should not be copied + self.assertTrue(exists(join(dst_dir, 'test.txt'))) + self.assertTrue(not exists(join(dst_dir, 'test.tmp'))) + self.assertTrue(not exists(join(dst_dir, 'test_dir2'))) + finally: + if os.path.exists(dst_dir): + shutil.rmtree(dst_dir) + try: + patterns = shutil.ignore_patterns('*.tmp', 'subdir*') + shutil.copytree(src_dir, dst_dir, ignore=patterns) + # checking the result: some elements should not be copied + self.assertTrue(not exists(join(dst_dir, 'test.tmp'))) + self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2'))) + self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir'))) + finally: + if os.path.exists(dst_dir): + shutil.rmtree(dst_dir) + + # testing callable-style + try: + def _filter(src, names): + res = [] + for name in names: + path = os.path.join(src, name) + + if (os.path.isdir(path) and + path.split()[-1] == 'subdir'): + res.append(name) + elif os.path.splitext(path)[-1] in ('.py'): + res.append(name) + return res + + shutil.copytree(src_dir, dst_dir, ignore=_filter) + + # checking the result: some elements should not be copied + self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2', + 'test.py'))) + self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir'))) + + finally: + if os.path.exists(dst_dir): + shutil.rmtree(dst_dir) + finally: + shutil.rmtree(src_dir) + shutil.rmtree(os.path.dirname(dst_dir)) + + @support.skip_unless_symlink + def test_dont_copy_file_onto_link_to_itself(self): + # bug 851123. + os.mkdir(TESTFN) + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + try: + f = open(src, 'w') + f.write('cheddar') + f.close() + + if hasattr(os, "link"): + os.link(src, dst) + self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + f = open(src, 'r') + try: + self.assertEqual(f.read(), 'cheddar') + finally: + f.close() + os.remove(dst) + + # Using `src` here would mean we end up with a symlink pointing + # to TESTFN/TESTFN/cheese, while it should point at + # TESTFN/cheese. + os.symlink('cheese', dst) + self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + f = open(src, 'r') + try: + self.assertEqual(f.read(), 'cheddar') + finally: + f.close() + os.remove(dst) + finally: + try: + shutil.rmtree(TESTFN) + except OSError: + pass + + @support.skip_unless_symlink + def test_rmtree_on_symlink(self): + # bug 1669. + os.mkdir(TESTFN) + try: + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + os.mkdir(src) + os.symlink(src, dst) + self.assertRaises(OSError, shutil.rmtree, dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + + if hasattr(os, "mkfifo"): + # Issue #3002: copyfile and copytree block indefinitely on named pipes + def test_copyfile_named_pipe(self): + os.mkfifo(TESTFN) + try: + self.assertRaises(shutil.SpecialFileError, + shutil.copyfile, TESTFN, TESTFN2) + self.assertRaises(shutil.SpecialFileError, + shutil.copyfile, __file__, TESTFN) + finally: + os.remove(TESTFN) + + @unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo') + def test_copytree_named_pipe(self): + os.mkdir(TESTFN) + try: + subdir = os.path.join(TESTFN, "subdir") + os.mkdir(subdir) + pipe = os.path.join(subdir, "mypipe") + os.mkfifo(pipe) + try: + shutil.copytree(TESTFN, TESTFN2) + except shutil.Error, e: + errors = e.args[0] + self.assertEqual(len(errors), 1) + src, dst, error_msg = errors[0] + self.assertEqual("`%s` is a named pipe" % pipe, error_msg) + else: + self.fail("shutil.Error should have been raised") + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + shutil.rmtree(TESTFN2, ignore_errors=True) + + def test_copytree_special_func(self): + + src_dir = self.mkdtemp() + dst_dir = os.path.join(self.mkdtemp(), 'destination') + self._write_data(os.path.join(src_dir, 'test.txt'), '123') + os.mkdir(os.path.join(src_dir, 'test_dir')) + self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + + copied = [] + def _copy(src, dst): + copied.append((src, dst)) + + shutil.copytree(src_dir, dst_dir, copy_function=_copy) + self.assertEquals(len(copied), 2) + + @support.skip_unless_symlink + def test_copytree_dangling_symlinks(self): + + # a dangling symlink raises an error at the end + src_dir = self.mkdtemp() + dst_dir = os.path.join(self.mkdtemp(), 'destination') + os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) + os.mkdir(os.path.join(src_dir, 'test_dir')) + self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) + + # a dangling symlink is ignored with the proper flag + dst_dir = os.path.join(self.mkdtemp(), 'destination2') + shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) + self.assertNotIn('test.txt', os.listdir(dst_dir)) + + # a dangling symlink is copied if symlinks=True + dst_dir = os.path.join(self.mkdtemp(), 'destination3') + shutil.copytree(src_dir, dst_dir, symlinks=True) + self.assertIn('test.txt', os.listdir(dst_dir)) + + @unittest.skipUnless(zlib, "requires zlib") + def test_make_tarball(self): + # creating something to tar + tmpdir = self.mkdtemp() + self.write_file([tmpdir, 'file1'], 'xxx') + self.write_file([tmpdir, 'file2'], 'xxx') + os.mkdir(os.path.join(tmpdir, 'sub')) + self.write_file([tmpdir, 'sub', 'file3'], 'xxx') + + tmpdir2 = self.mkdtemp() + unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0], + "source and target should be on same drive") + + base_name = os.path.join(tmpdir2, 'archive') + + # working with relative paths to avoid tar warnings + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(splitdrive(base_name)[1], '.') + finally: + os.chdir(old_dir) + + # check if the compressed tarball was created + tarball = base_name + '.tar.gz' + self.assertTrue(os.path.exists(tarball)) + + # trying an uncompressed one + base_name = os.path.join(tmpdir2, 'archive') + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(splitdrive(base_name)[1], '.', compress=None) + finally: + os.chdir(old_dir) + tarball = base_name + '.tar' + self.assertTrue(os.path.exists(tarball)) + + def _tarinfo(self, path): + tar = tarfile.open(path) + try: + names = tar.getnames() + names.sort() + return tuple(names) + finally: + tar.close() + + def _create_files(self): + # creating something to tar + tmpdir = self.mkdtemp() + dist = os.path.join(tmpdir, 'dist') + os.mkdir(dist) + self.write_file([dist, 'file1'], 'xxx') + self.write_file([dist, 'file2'], 'xxx') + os.mkdir(os.path.join(dist, 'sub')) + self.write_file([dist, 'sub', 'file3'], 'xxx') + os.mkdir(os.path.join(dist, 'sub2')) + tmpdir2 = self.mkdtemp() + base_name = os.path.join(tmpdir2, 'archive') + return tmpdir, tmpdir2, base_name + + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(find_executable('tar') and find_executable('gzip'), + 'Need the tar command to run') + def test_tarfile_vs_tar(self): + tmpdir, tmpdir2, base_name = self._create_files() + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(base_name, 'dist') + finally: + os.chdir(old_dir) + + # check if the compressed tarball was created + tarball = base_name + '.tar.gz' + self.assertTrue(os.path.exists(tarball)) + + # now create another tarball using `tar` + tarball2 = os.path.join(tmpdir, 'archive2.tar.gz') + tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist'] + gzip_cmd = ['gzip', '-f9', 'archive2.tar'] + old_dir = os.getcwd() + old_stdout = sys.stdout + os.chdir(tmpdir) + sys.stdout = StringIO() + + try: + spawn(tar_cmd) + spawn(gzip_cmd) + finally: + os.chdir(old_dir) + sys.stdout = old_stdout + + self.assertTrue(os.path.exists(tarball2)) + # let's compare both tarballs + self.assertEquals(self._tarinfo(tarball), self._tarinfo(tarball2)) + + # trying an uncompressed one + base_name = os.path.join(tmpdir2, 'archive') + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(base_name, 'dist', compress=None) + finally: + os.chdir(old_dir) + tarball = base_name + '.tar' + self.assertTrue(os.path.exists(tarball)) + + # now for a dry_run + base_name = os.path.join(tmpdir2, 'archive') + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(base_name, 'dist', compress=None, dry_run=True) + finally: + os.chdir(old_dir) + tarball = base_name + '.tar' + self.assertTrue(os.path.exists(tarball)) + + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') + def test_make_zipfile(self): + # creating something to tar + tmpdir = self.mkdtemp() + self.write_file([tmpdir, 'file1'], 'xxx') + self.write_file([tmpdir, 'file2'], 'xxx') + + tmpdir2 = self.mkdtemp() + base_name = os.path.join(tmpdir2, 'archive') + _make_zipfile(base_name, tmpdir) + + # check if the compressed tarball was created + tarball = base_name + '.zip' + self.assertTrue(os.path.exists(tarball)) + + + def test_make_archive(self): + tmpdir = self.mkdtemp() + base_name = os.path.join(tmpdir, 'archive') + self.assertRaises(ValueError, make_archive, base_name, 'xxx') + + @unittest.skipUnless(zlib, "Requires zlib") + def test_make_archive_owner_group(self): + # testing make_archive with owner and group, with various combinations + # this works even if there's not gid/uid support + if UID_GID_SUPPORT: + group = grp.getgrgid(0)[0] + owner = pwd.getpwuid(0)[0] + else: + group = owner = 'root' + + base_dir, root_dir, base_name = self._create_files() + base_name = os.path.join(self.mkdtemp() , 'archive') + res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, + group=group) + self.assertTrue(os.path.exists(res)) + + res = make_archive(base_name, 'zip', root_dir, base_dir) + self.assertTrue(os.path.exists(res)) + + res = make_archive(base_name, 'tar', root_dir, base_dir, + owner=owner, group=group) + self.assertTrue(os.path.exists(res)) + + res = make_archive(base_name, 'tar', root_dir, base_dir, + owner='kjhkjhkjg', group='oihohoh') + self.assertTrue(os.path.exists(res)) + + + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") + def test_tarfile_root_owner(self): + tmpdir, tmpdir2, base_name = self._create_files() + old_dir = os.getcwd() + os.chdir(tmpdir) + group = grp.getgrgid(0)[0] + owner = pwd.getpwuid(0)[0] + try: + archive_name = _make_tarball(base_name, 'dist', compress=None, + owner=owner, group=group) + finally: + os.chdir(old_dir) + + # check if the compressed tarball was created + self.assertTrue(os.path.exists(archive_name)) + + # now checks the rights + archive = tarfile.open(archive_name) + try: + for member in archive.getmembers(): + self.assertEquals(member.uid, 0) + self.assertEquals(member.gid, 0) + finally: + archive.close() + + def test_make_archive_cwd(self): + current_dir = os.getcwd() + def _breaks(*args, **kw): + raise RuntimeError() + + register_archive_format('xxx', _breaks, [], 'xxx file') + try: + try: + make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) + except Exception: + pass + self.assertEquals(os.getcwd(), current_dir) + finally: + unregister_archive_format('xxx') + + def test_register_archive_format(self): + + self.assertRaises(TypeError, register_archive_format, 'xxx', 1) + self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, + 1) + self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, + [(1, 2), (1, 2, 3)]) + + register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') + formats = [name for name, params in get_archive_formats()] + self.assertIn('xxx', formats) + + unregister_archive_format('xxx') + formats = [name for name, params in get_archive_formats()] + self.assertNotIn('xxx', formats) + + def _compare_dirs(self, dir1, dir2): + # check that dir1 and dir2 are equivalent, + # return the diff + diff = [] + for root, dirs, files in os.walk(dir1): + for file_ in files: + path = os.path.join(root, file_) + target_path = os.path.join(dir2, os.path.split(path)[-1]) + if not os.path.exists(target_path): + diff.append(file_) + return diff + + @unittest.skipUnless(zlib, "Requires zlib") + def test_unpack_archive(self): + formats = ['tar', 'gztar', 'zip'] + if BZ2_SUPPORTED: + formats.append('bztar') + + for format in formats: + tmpdir = self.mkdtemp() + base_dir, root_dir, base_name = self._create_files() + tmpdir2 = self.mkdtemp() + filename = make_archive(base_name, format, root_dir, base_dir) + + # let's try to unpack it now + unpack_archive(filename, tmpdir2) + diff = self._compare_dirs(tmpdir, tmpdir2) + self.assertEquals(diff, []) + + def test_unpack_registery(self): + + formats = get_unpack_formats() + + def _boo(filename, extract_dir, extra): + self.assertEquals(extra, 1) + self.assertEquals(filename, 'stuff.boo') + self.assertEquals(extract_dir, 'xx') + + register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) + unpack_archive('stuff.boo', 'xx') + + # trying to register a .boo unpacker again + self.assertRaises(RegistryError, register_unpack_format, 'Boo2', + ['.boo'], _boo) + + # should work now + unregister_unpack_format('Boo') + register_unpack_format('Boo2', ['.boo'], _boo) + self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) + self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) + + # let's leave a clean state + unregister_unpack_format('Boo2') + self.assertEquals(get_unpack_formats(), formats) + + +class TestMove(unittest.TestCase): + + def setUp(self): + filename = "foo" + self.src_dir = tempfile.mkdtemp() + self.dst_dir = tempfile.mkdtemp() + self.src_file = os.path.join(self.src_dir, filename) + self.dst_file = os.path.join(self.dst_dir, filename) + # Try to create a dir in the current directory, hoping that it is + # not located on the same filesystem as the system tmp dir. + try: + self.dir_other_fs = tempfile.mkdtemp( + dir=os.path.dirname(__file__)) + self.file_other_fs = os.path.join(self.dir_other_fs, + filename) + except OSError: + self.dir_other_fs = None + f = open(self.src_file, "wb") + try: + f.write("spam") + finally: + f.close() + + def tearDown(self): + for d in (self.src_dir, self.dst_dir, self.dir_other_fs): + try: + if d: + shutil.rmtree(d) + except: + pass + + def _check_move_file(self, src, dst, real_dst): + f = open(src, "rb") + try: + contents = f.read() + finally: + f.close() + + shutil.move(src, dst) + f = open(real_dst, "rb") + try: + self.assertEqual(contents, f.read()) + finally: + f.close() + + self.assertFalse(os.path.exists(src)) + + def _check_move_dir(self, src, dst, real_dst): + contents = sorted(os.listdir(src)) + shutil.move(src, dst) + self.assertEqual(contents, sorted(os.listdir(real_dst))) + self.assertFalse(os.path.exists(src)) + + def test_move_file(self): + # Move a file to another location on the same filesystem. + self._check_move_file(self.src_file, self.dst_file, self.dst_file) + + def test_move_file_to_dir(self): + # Move a file inside an existing dir on the same filesystem. + self._check_move_file(self.src_file, self.dst_dir, self.dst_file) + + def test_move_file_other_fs(self): + # Move a file to an existing dir on another filesystem. + if not self.dir_other_fs: + # skip + return + self._check_move_file(self.src_file, self.file_other_fs, + self.file_other_fs) + + def test_move_file_to_dir_other_fs(self): + # Move a file to another location on another filesystem. + if not self.dir_other_fs: + # skip + return + self._check_move_file(self.src_file, self.dir_other_fs, + self.file_other_fs) + + def test_move_dir(self): + # Move a dir to another location on the same filesystem. + dst_dir = tempfile.mktemp() + try: + self._check_move_dir(self.src_dir, dst_dir, dst_dir) + finally: + try: + shutil.rmtree(dst_dir) + except: + pass + + def test_move_dir_other_fs(self): + # Move a dir to another location on another filesystem. + if not self.dir_other_fs: + # skip + return + dst_dir = tempfile.mktemp(dir=self.dir_other_fs) + try: + self._check_move_dir(self.src_dir, dst_dir, dst_dir) + finally: + try: + shutil.rmtree(dst_dir) + except: + pass + + def test_move_dir_to_dir(self): + # Move a dir inside an existing dir on the same filesystem. + self._check_move_dir(self.src_dir, self.dst_dir, + os.path.join(self.dst_dir, os.path.basename(self.src_dir))) + + def test_move_dir_to_dir_other_fs(self): + # Move a dir inside an existing dir on another filesystem. + if not self.dir_other_fs: + # skip + return + self._check_move_dir(self.src_dir, self.dir_other_fs, + os.path.join(self.dir_other_fs, os.path.basename(self.src_dir))) + + def test_existing_file_inside_dest_dir(self): + # A file with the same name inside the destination dir already exists. + f = open(self.dst_file, "wb") + try: + pass + finally: + f.close() + self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) + + def test_dont_move_dir_in_itself(self): + # Moving a dir inside itself raises an Error. + dst = os.path.join(self.src_dir, "bar") + self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) + + def test_destinsrc_false_negative(self): + os.mkdir(TESTFN) + try: + for src, dst in [('srcdir', 'srcdir/dest')]: + src = os.path.join(TESTFN, src) + dst = os.path.join(TESTFN, dst) + self.assertTrue(shutil._destinsrc(src, dst), + msg='_destinsrc() wrongly concluded that ' + 'dst (%s) is not in src (%s)' % (dst, src)) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + + def test_destinsrc_false_positive(self): + os.mkdir(TESTFN) + try: + for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: + src = os.path.join(TESTFN, src) + dst = os.path.join(TESTFN, dst) + self.assertFalse(shutil._destinsrc(src, dst), + msg='_destinsrc() wrongly concluded that ' + 'dst (%s) is in src (%s)' % (dst, src)) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + + +class TestCopyFile(unittest.TestCase): + + _delete = False + + class Faux(object): + _entered = False + _exited_with = None + _raised = False + + def __init__(self, raise_in_exit=False, suppress_at_exit=True): + self._raise_in_exit = raise_in_exit + self._suppress_at_exit = suppress_at_exit + + def read(self, *args): + return '' + + def __enter__(self): + self._entered = True + + def __exit__(self, exc_type, exc_val, exc_tb): + self._exited_with = exc_type, exc_val, exc_tb + if self._raise_in_exit: + self._raised = True + raise IOError("Cannot close") + return self._suppress_at_exit + + def tearDown(self): + if self._delete: + del shutil.open + + def _set_shutil_open(self, func): + shutil.open = func + self._delete = True + + def test_w_source_open_fails(self): + def _open(filename, mode='r'): + if filename == 'srcfile': + raise IOError('Cannot open "srcfile"') + assert 0 # shouldn't reach here. + + self._set_shutil_open(_open) + + self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile') + + @unittest.skip("can't use the with statement and support 2.4") + def test_w_dest_open_fails(self): + + srcfile = self.Faux() + + def _open(filename, mode='r'): + if filename == 'srcfile': + return srcfile + if filename == 'destfile': + raise IOError('Cannot open "destfile"') + assert 0 # shouldn't reach here. + + self._set_shutil_open(_open) + + shutil.copyfile('srcfile', 'destfile') + self.assertTrue(srcfile._entered) + self.assertTrue(srcfile._exited_with[0] is IOError) + self.assertEqual(srcfile._exited_with[1].args, + ('Cannot open "destfile"',)) + + @unittest.skip("can't use the with statement and support 2.4") + def test_w_dest_close_fails(self): + + srcfile = self.Faux() + destfile = self.Faux(True) + + def _open(filename, mode='r'): + if filename == 'srcfile': + return srcfile + if filename == 'destfile': + return destfile + assert 0 # shouldn't reach here. + + self._set_shutil_open(_open) + + shutil.copyfile('srcfile', 'destfile') + self.assertTrue(srcfile._entered) + self.assertTrue(destfile._entered) + self.assertTrue(destfile._raised) + self.assertTrue(srcfile._exited_with[0] is IOError) + self.assertEqual(srcfile._exited_with[1].args, + ('Cannot close',)) + + @unittest.skip("can't use the with statement and support 2.4") + def test_w_source_close_fails(self): + + srcfile = self.Faux(True) + destfile = self.Faux() + + def _open(filename, mode='r'): + if filename == 'srcfile': + return srcfile + if filename == 'destfile': + return destfile + assert 0 # shouldn't reach here. + + self._set_shutil_open(_open) + + self.assertRaises(IOError, + shutil.copyfile, 'srcfile', 'destfile') + self.assertTrue(srcfile._entered) + self.assertTrue(destfile._entered) + self.assertFalse(destfile._raised) + self.assertTrue(srcfile._exited_with[0] is None) + self.assertTrue(srcfile._raised) + + +def test_suite(): + suite = unittest.TestSuite() + load = unittest.defaultTestLoader.loadTestsFromTestCase + suite.addTest(load(TestCopyFile)) + suite.addTest(load(TestMove)) + suite.addTest(load(TestShutil)) + return suite + + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') diff --git a/distutils2/_backport/tests/test_sysconfig.py b/distutils2/_backport/tests/test_sysconfig.py index b488ef2..cfeb418 100644 --- a/distutils2/_backport/tests/test_sysconfig.py +++ b/distutils2/_backport/tests/test_sysconfig.py @@ -4,7 +4,7 @@ import os import sys import subprocess import shutil -from copy import copy, deepcopy +from copy import copy from ConfigParser import RawConfigParser from StringIO import StringIO @@ -15,13 +15,9 @@ from distutils2._backport.sysconfig import ( get_scheme_names, _main, _SCHEMES) from distutils2.tests import unittest -from distutils2.tests.support import EnvironGuard +from distutils2.tests.support import EnvironGuard, skip_unless_symlink from test.test_support import TESTFN, unlink -try: - from test.test_support import skip_unless_symlink -except ImportError: - skip_unless_symlink = unittest.skip( - 'requires test.test_support.skip_unless_symlink') + class TestSysConfig(EnvironGuard, unittest.TestCase): diff --git a/distutils2/command/cmd.py b/distutils2/command/cmd.py index e63c46b..754fcf4 100644 --- a/distutils2/command/cmd.py +++ b/distutils2/command/cmd.py @@ -165,7 +165,10 @@ class Command(object): header = "command options for '%s':" % self.get_command_name() self.announce(indent + header, level=logging.INFO) indent = indent + " " + negative_opt = getattr(self, 'negative_opt', ()) for (option, _, _) in self.user_options: + if option in negative_opt: + continue option = option.replace('-', '_') if option[-1] == "=": option = option[:-1] diff --git a/distutils2/tests/support.py b/distutils2/tests/support.py index a5a6c21..158ed77 100644 --- a/distutils2/tests/support.py +++ b/distutils2/tests/support.py @@ -17,10 +17,11 @@ tearDown): super(SomeTestCase, self).setUp() ... # other setup code -Read each class' docstring to see its purpose and usage. - Also provided is a DummyCommand class, useful to mock commands in the -tests of another command that needs them (see docstring). +tests of another command that needs them, a create_distribution function +and a skip_unless_symlink decorator. + +Each class or function has a docstring to explain its purpose and usage. """ import os @@ -35,7 +36,8 @@ from distutils2.dist import Distribution from distutils2.tests import unittest __all__ = ['LoggingCatcher', 'WarningsCatcher', 'TempdirManager', - 'EnvironGuard', 'DummyCommand', 'unittest'] + 'EnvironGuard', 'DummyCommand', 'unittest', 'create_distribution', + 'skip_unless_symlink'] class LoggingCatcher(object): @@ -135,7 +137,7 @@ class TempdirManager(object): finally: f.close() - def create_dist(self, pkg_name='foo', **kw): + def create_dist(self, **kw): """Create a stub distribution object and files. This function creates a Distribution instance (use keyword arguments @@ -143,17 +145,19 @@ class TempdirManager(object): (currently an empty directory). It returns the path to the directory and the Distribution instance. - You can use TempdirManager.write_file to write any file in that + You can use self.write_file to write any file in that directory, e.g. setup scripts or Python modules. """ # Late import so that third parties can import support without # loading a ton of distutils2 modules in memory. from distutils2.dist import Distribution + if 'name' not in kw: + kw['name'] = 'foo' tmp_dir = self.mkdtemp() - pkg_dir = os.path.join(tmp_dir, pkg_name) - os.mkdir(pkg_dir) + project_dir = os.path.join(tmp_dir, kw['name']) + os.mkdir(project_dir) dist = Distribution(attrs=kw) - return pkg_dir, dist + return project_dir, dist class EnvironGuard(object): @@ -211,3 +215,9 @@ def create_distribution(configfiles=()): d.parse_command_line() return d + +try: + from test.test_support import skip_unless_symlink +except ImportError: + skip_unless_symlink = unittest.skip( + 'requires test.test_support.skip_unless_symlink') diff --git a/distutils2/tests/test_command_install_dist.py b/distutils2/tests/test_command_install_dist.py index 2846bec..6306bc5 100644 --- a/distutils2/tests/test_command_install_dist.py +++ b/distutils2/tests/test_command_install_dist.py @@ -180,8 +180,8 @@ class InstallTestCase(support.TempdirManager, cmd.user = 'user' self.assertRaises(DistutilsOptionError, cmd.finalize_options) - def test_record(self): - + def test_old_record(self): + # test pre-PEP 376 --record option (outside dist-info dir) install_dir = self.mkdtemp() pkgdir, dist = self.create_dist() @@ -189,11 +189,11 @@ class InstallTestCase(support.TempdirManager, cmd = install_dist(dist) dist.command_obj['install_dist'] = cmd cmd.root = install_dir - cmd.record = os.path.join(pkgdir, 'RECORD') + cmd.record = os.path.join(pkgdir, 'filelist') cmd.ensure_finalized() cmd.run() - # let's check the RECORD file was created with four + # let's check the record file was created with four # lines, one for each .dist-info entry: METADATA, # INSTALLER, REQUSTED, RECORD f = open(cmd.record) |
