"""Tests for the distutils2.install module.""" import os import logging from tempfile import mkstemp from distutils2 import install from distutils2.pypi.xmlrpc import Client from distutils2.metadata import Metadata from distutils2.tests.support import (LoggingCatcher, TempdirManager, unittest, fake_dec) from distutils2._backport.sysconfig import is_python_build try: import threading from distutils2.tests.pypi_server import use_xmlrpc_server except ImportError: threading = None use_xmlrpc_server = fake_dec class InstalledDist: """Distribution object, represent distributions currently installed on the system""" def __init__(self, name, version, deps): self.metadata = Metadata() self.name = name self.version = version self.metadata['Name'] = name self.metadata['Version'] = version self.metadata['Requires-Dist'] = deps def __repr__(self): return '' % self.metadata['Name'] class ToInstallDist: """Distribution that will be installed""" def __init__(self, files=False): self._files = files self.install_called = False self.install_called_with = {} self.uninstall_called = False self._real_files = [] self.name = "fake" self.version = "fake" if files: for f in range(0, 3): fp, fn = mkstemp() os.close(fp) self._real_files.append(fn) def _unlink_installed_files(self): if self._files: for fn in self._real_files: os.unlink(fn) def list_installed_files(self, **args): if self._files: return [(path, 'md5', 0) for path in self._real_files] def get_install(self, **args): return self.list_installed_files() class MagicMock: def __init__(self, return_value=None, raise_exception=False): self.called = False self._times_called = 0 self._called_with = [] self._return_value = return_value self._raise = raise_exception def __call__(self, *args, **kwargs): self.called = True self._times_called = self._times_called + 1 self._called_with.append((args, kwargs)) iterable = hasattr(self._raise, '__iter__') if self._raise: if ((not iterable and self._raise) or self._raise[self._times_called - 1]): raise Exception return self._return_value def called_with(self, *args, **kwargs): return (args, kwargs) in self._called_with def get_installed_dists(dists): """Return a list of fake installed dists. The list is name, version, deps""" objects = [] for name, version, deps in dists: objects.append(InstalledDist(name, version, deps)) return objects class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase): def _get_client(self, server, *args, **kwargs): return Client(server.full_address, *args, **kwargs) def _get_results(self, output): """return a list of results""" installed = [(o.name, str(o.version)) for o in output['install']] remove = [(o.name, str(o.version)) for o in output['remove']] conflict = [(o.name, str(o.version)) for o in output['conflict']] return installed, remove, conflict @unittest.skipIf(threading is None, 'needs threading') @use_xmlrpc_server() def test_existing_deps(self, server): # Test that the installer get the dependencies from the metadatas # and ask the index for this dependencies. # In this test case, we have choxie that is dependent from towel-stuff # 0.1, which is in-turn dependent on bacon <= 0.2: # choxie -> towel-stuff -> bacon. # Each release metadata is not provided in metadata 1.2. client = self._get_client(server) archive_path = '%s/distribution.tar.gz' % server.full_address server.xmlrpc.set_distributions([ {'name': 'choxie', 'version': '2.0.0.9', 'requires_dist': ['towel-stuff (0.1)'], 'url': archive_path}, {'name': 'towel-stuff', 'version': '0.1', 'requires_dist': ['bacon (<= 0.2)'], 'url': archive_path}, {'name': 'bacon', 'version': '0.1', 'requires_dist': [], 'url': archive_path}, ]) installed = get_installed_dists([('bacon', '0.1', [])]) output = install.get_infos("choxie", index=client, installed=installed) # we don't have installed bacon as it's already installed system-wide self.assertEqual(0, len(output['remove'])) self.assertEqual(2, len(output['install'])) readable_output = [(o.name, str(o.version)) for o in output['install']] self.assertIn(('towel-stuff', '0.1'), readable_output) self.assertIn(('choxie', '2.0.0.9'), readable_output) @unittest.skipIf(threading is None, 'needs threading') @use_xmlrpc_server() def test_upgrade_existing_deps(self, server): client = self._get_client(server) archive_path = '%s/distribution.tar.gz' % server.full_address server.xmlrpc.set_distributions([ {'name': 'choxie', 'version': '2.0.0.9', 'requires_dist': ['towel-stuff (0.1)'], 'url': archive_path}, {'name': 'towel-stuff', 'version': '0.1', 'requires_dist': ['bacon (>= 0.2)'], 'url': archive_path}, {'name': 'bacon', 'version': '0.2', 'requires_dist': [], 'url': archive_path}, ]) output = install.get_infos("choxie", index=client, installed=get_installed_dists([('bacon', '0.1', [])])) installed = [(o.name, str(o.version)) for o in output['install']] # we need bacon 0.2, but 0.1 is installed. # So we expect to remove 0.1 and to install 0.2 instead. remove = [(o.name, str(o.version)) for o in output['remove']] self.assertIn(('choxie', '2.0.0.9'), installed) self.assertIn(('towel-stuff', '0.1'), installed) self.assertIn(('bacon', '0.2'), installed) self.assertIn(('bacon', '0.1'), remove) self.assertEqual(0, len(output['conflict'])) @unittest.skipIf(threading is None, 'needs threading') @use_xmlrpc_server() def test_conflicts(self, server): # Tests that conflicts are detected client = self._get_client(server) archive_path = '%s/distribution.tar.gz' % server.full_address # choxie depends on towel-stuff, which depends on bacon. server.xmlrpc.set_distributions([ {'name': 'choxie', 'version': '2.0.0.9', 'requires_dist': ['towel-stuff (0.1)'], 'url': archive_path}, {'name': 'towel-stuff', 'version': '0.1', 'requires_dist': ['bacon (>= 0.2)'], 'url': archive_path}, {'name': 'bacon', 'version': '0.2', 'requires_dist': [], 'url': archive_path}, ]) # name, version, deps. already_installed = [('bacon', '0.1', []), ('chicken', '1.1', ['bacon (0.1)'])] output = install.get_infos( 'choxie', index=client, installed=get_installed_dists(already_installed)) # we need bacon 0.2, but 0.1 is installed. # So we expect to remove 0.1 and to install 0.2 instead. installed, remove, conflict = self._get_results(output) self.assertIn(('choxie', '2.0.0.9'), installed) self.assertIn(('towel-stuff', '0.1'), installed) self.assertIn(('bacon', '0.2'), installed) self.assertIn(('bacon', '0.1'), remove) self.assertIn(('chicken', '1.1'), conflict) @unittest.skipIf(threading is None, 'needs threading') @use_xmlrpc_server() def test_installation_unexisting_project(self, server): # Test that the isntalled raises an exception if the project does not # exists. client = self._get_client(server) self.assertRaises(install.InstallationException, install.get_infos, 'unexisting project', index=client) def test_move_files(self): # test that the files are really moved, and that the new path is # returned. path = self.mkdtemp() newpath = self.mkdtemp() files = [os.path.join(path, str(x)) for x in range(1, 20)] for f in files: open(f, 'ab+').close() output = [o for o in install._move_files(files, newpath)] # check that output return the list of old/new places for file_ in files: name = os.path.split(file_)[-1] newloc = os.path.join(newpath, name) self.assertIn((file_, newloc), output) # remove the files for f in [o[1] for o in output]: # o[1] is the new place os.remove(f) def test_update_infos(self): tests = [[ {'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']}, {'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']}, {'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'], 'baz': ['foo', 'foo', 'test', 'foo']}, ]] for dict1, dict2, expect in tests: install._update_infos(dict1, dict2) for key in expect: self.assertEqual(expect[key], dict1[key]) def test_install_custom_dir(self): dest = self.mkdtemp() project_dir, dist = self.create_dist( name='Spamlib', version='0.1', data_files={'spamd': '{scripts}/spamd'}) dist.name = MagicMock(return_value='Spamlib') dist.version = MagicMock(return_value='0.1') dist.unpack = MagicMock(return_value=project_dir) self.write_file([project_dir, 'setup.cfg'], ("[metadata]\n" "name = mypackage\n" "version = 0.1.0\n")) install.install_from_infos(dest, install=[dist]) self.assertEqual(len(os.listdir(dest)), 1) def test_install_dists_rollback(self): # if one of the distribution installation fails, call uninstall on all # installed distributions. old_install_dist = install._install_dist old_uninstall = getattr(install, 'uninstall', None) install._install_dist = MagicMock(return_value=[], raise_exception=(False, True)) install.remove = MagicMock() try: d1 = ToInstallDist() d2 = ToInstallDist() path = self.mkdtemp() self.assertRaises(Exception, install.install_dists, [d1, d2], path) self.assertTrue(install._install_dist.called_with(d1, path)) self.assertTrue(install.remove.called) finally: install._install_dist = old_install_dist install.remove = old_uninstall def test_install_dists_success(self): old_install_dist = install._install_dist install._install_dist = MagicMock(return_value=[]) try: # test that the install method is called on each distributions d1 = ToInstallDist() d2 = ToInstallDist() # should call install path = self.mkdtemp() install.install_dists([d1, d2], path) for dist in (d1, d2): self.assertTrue(install._install_dist.called_with(dist, path)) finally: install._install_dist = old_install_dist def test_install_from_infos_conflict(self): # assert conflicts raise an exception self.assertRaises(install.InstallationConflict, install.install_from_infos, conflicts=[ToInstallDist()]) def test_install_from_infos_remove_success(self): old_install_dists = install.install_dists install.install_dists = lambda x, y=None: None try: dists = [] for i in range(2): dists.append(ToInstallDist(files=True)) install.install_from_infos(remove=dists) # assert that the files have been removed for dist in dists: for f, md5, size in dist.list_installed_files(): self.assertFalse(os.path.exists(f)) finally: install.install_dists = old_install_dists def test_install_from_infos_remove_rollback(self): old_install_dist = install._install_dist old_uninstall = getattr(install, 'uninstall', None) install._install_dist = MagicMock(return_value=[], raise_exception=(False, True)) install.uninstall = MagicMock() try: # assert that if an error occurs, the removed files are restored. remove = [] for i in range(2): remove.append(ToInstallDist(files=True)) to_install = [ToInstallDist(), ToInstallDist()] temp_dir = self.mkdtemp() self.assertRaises(Exception, install.install_from_infos, install_path=temp_dir, install=to_install, remove=remove) # assert that the files are in the same place # assert that the files have been removed for dist in remove: for f, md5, size in dist.list_installed_files(): self.assertTrue(os.path.exists(f)) dist._unlink_installed_files() finally: install._install_dist = old_install_dist install.uninstall = old_uninstall def test_install_from_infos_install_succes(self): old_install_dist = install._install_dist install._install_dist = MagicMock([]) try: # assert that the distribution can be installed install_path = "my_install_path" to_install = [ToInstallDist(), ToInstallDist()] install.install_from_infos(install_path, install=to_install) for dist in to_install: install._install_dist.called_with(install_path) finally: install._install_dist = old_install_dist def test_install_permission_denied(self): # if we don't have access to the installation path, we should abort # immediately project = os.path.join(os.path.dirname(__file__), 'package.tgz') # when running from an uninstalled build, a warning is emitted and the # installation is not attempted if is_python_build(): self.assertFalse(install.install(project)) self.assertEqual(1, len(self.get_logs(logging.ERROR))) return install_path = self.mkdtemp() old_get_path = install.get_path install.get_path = lambda path: install_path old_mod = os.stat(install_path).st_mode os.chmod(install_path, 0) try: self.assertFalse(install.install(project)) finally: os.chmod(install_path, old_mod) install.get_path = old_get_path def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestInstall)) return suite if __name__ == '__main__': unittest.main(defaultTest='test_suite')