diff options
author | Scott Moser <smoser@ubuntu.com> | 2016-08-10 09:06:15 -0600 |
---|---|---|
committer | Scott Moser <smoser@ubuntu.com> | 2016-08-10 09:06:15 -0600 |
commit | c3c3dc693c14175e110b5fe125d4d5f98ace9700 (patch) | |
tree | 8858702c2c8a6ad4bf1bb861a4565e0a9c28e588 /tests/unittests/test_datasource | |
parent | 5bd3493d732e5b1902872958e8681f17cbc81ce5 (diff) | |
download | cloud-init-trunk.tar.gz |
cloud-init development has moved its revision control to git.
It is available at
https://code.launchpad.net/cloud-init
Clone with
git clone https://git.launchpad.net/cloud-init
or
git clone git+ssh://git.launchpad.net/cloud-init
For more information see
https://git.launchpad.net/cloud-init/tree/HACKING.rst
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 452 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 640 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure_helper.py | 412 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_cloudsigma.py | 99 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_cloudstack.py | 78 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 597 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_digitalocean.py | 127 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_gce.py | 166 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 163 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py | 178 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 300 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_openstack.py | 347 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 543 |
14 files changed, 0 insertions, 4102 deletions
diff --git a/tests/unittests/test_datasource/__init__.py b/tests/unittests/test_datasource/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/tests/unittests/test_datasource/__init__.py +++ /dev/null diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py deleted file mode 100644 index 12966563..00000000 --- a/tests/unittests/test_datasource/test_altcloud.py +++ /dev/null @@ -1,452 +0,0 @@ -# vi: ts=4 expandtab -# -# Copyright (C) 2009-2010 Canonical Ltd. -# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. -# Copyright (C) 2012 Yahoo! Inc. -# -# Author: Joe VLcek <JVLcek@RedHat.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -''' -This test file exercises the code in sources DataSourceAltCloud.py -''' - -import os -import shutil -import tempfile - -from cloudinit import helpers -from cloudinit import util -from unittest import TestCase - -# Get the cloudinit.sources.DataSourceAltCloud import items needed. -import cloudinit.sources.DataSourceAltCloud -from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud -from cloudinit.sources.DataSourceAltCloud import read_user_data_callback - -OS_UNAME_ORIG = getattr(os, 'uname') - - -def _write_cloud_info_file(value): - ''' - Populate the CLOUD_INFO_FILE which would be populated - with a cloud backend identifier ImageFactory when building - an image with ImageFactory. - ''' - cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') - cifile.write(value) - cifile.close() - os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664) - - -def _remove_cloud_info_file(): - ''' - Remove the test CLOUD_INFO_FILE - ''' - os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) - - -def _write_user_data_files(mount_dir, value): - ''' - Populate the deltacloud_user_data_file the user_data_file - which would be populated with user data. - ''' - deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' - user_data_file = mount_dir + '/user-data.txt' - - udfile = open(deltacloud_user_data_file, 'w') - udfile.write(value) - udfile.close() - os.chmod(deltacloud_user_data_file, 0o664) - - udfile = open(user_data_file, 'w') - udfile.write(value) - udfile.close() - os.chmod(user_data_file, 0o664) - - -def _remove_user_data_files(mount_dir, - dc_file=True, - non_dc_file=True): - ''' - Remove the test files: deltacloud_user_data_file and - user_data_file - ''' - deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' - user_data_file = mount_dir + '/user-data.txt' - - # Ignore any failures removeing files that are already gone. - if dc_file: - try: - os.remove(deltacloud_user_data_file) - except OSError: - pass - - if non_dc_file: - try: - os.remove(user_data_file) - except OSError: - pass - - -def _dmi_data(expected): - ''' - Spoof the data received over DMI - ''' - def _data(key): - return expected - - return _data - - -class TestGetCloudType(TestCase): - ''' - Test to exercise method: DataSourceAltCloud.get_cloud_type() - ''' - - def setUp(self): - '''Set up.''' - self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.dmi_data = util.read_dmi_data - # We have a different code path for arm to deal with LP1243287 - # We have to switch arch to x86_64 to avoid test failure - force_arch('x86_64') - - def tearDown(self): - # Reset - util.read_dmi_data = self.dmi_data - force_arch() - - def test_rhev(self): - ''' - Test method get_cloud_type() for RHEVm systems. - Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor - ''' - util.read_dmi_data = _dmi_data('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEqual('RHEV', dsrc.get_cloud_type()) - - def test_vsphere(self): - ''' - Test method get_cloud_type() for vSphere systems. - Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor - ''' - util.read_dmi_data = _dmi_data('VMware Virtual Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEqual('VSPHERE', dsrc.get_cloud_type()) - - def test_unknown(self): - ''' - Test method get_cloud_type() for unknown systems. - Forcing read_dmi_data return to match an unrecognized return. - ''' - util.read_dmi_data = _dmi_data('Unrecognized Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) - - -class TestGetDataCloudInfoFile(TestCase): - ''' - Test to exercise method: DataSourceAltCloud.get_data() - With a contrived CLOUD_INFO_FILE - ''' - def setUp(self): - '''Set up.''' - self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.cloud_info_file = tempfile.mkstemp()[1] - self.dmi_data = util.read_dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - self.cloud_info_file - - def tearDown(self): - # Reset - - # Attempt to remove the temp file ignoring errors - try: - os.remove(self.cloud_info_file) - except OSError: - pass - - util.read_dmi_data = self.dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' - - def test_rhev(self): - '''Success Test module get_data() forcing RHEV.''' - - _write_cloud_info_file('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) - dsrc.user_data_rhevm = lambda: True - self.assertEqual(True, dsrc.get_data()) - - def test_vsphere(self): - '''Success Test module get_data() forcing VSPHERE.''' - - _write_cloud_info_file('VSPHERE') - dsrc = DataSourceAltCloud({}, None, self.paths) - dsrc.user_data_vsphere = lambda: True - self.assertEqual(True, dsrc.get_data()) - - def test_fail_rhev(self): - '''Failure Test module get_data() forcing RHEV.''' - - _write_cloud_info_file('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) - dsrc.user_data_rhevm = lambda: False - self.assertEqual(False, dsrc.get_data()) - - def test_fail_vsphere(self): - '''Failure Test module get_data() forcing VSPHERE.''' - - _write_cloud_info_file('VSPHERE') - dsrc = DataSourceAltCloud({}, None, self.paths) - dsrc.user_data_vsphere = lambda: False - self.assertEqual(False, dsrc.get_data()) - - def test_unrecognized(self): - '''Failure Test module get_data() forcing unrecognized.''' - - _write_cloud_info_file('unrecognized') - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEqual(False, dsrc.get_data()) - - -class TestGetDataNoCloudInfoFile(TestCase): - ''' - Test to exercise method: DataSourceAltCloud.get_data() - Without a CLOUD_INFO_FILE - ''' - def setUp(self): - '''Set up.''' - self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.dmi_data = util.read_dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - 'no such file' - # We have a different code path for arm to deal with LP1243287 - # We have to switch arch to x86_64 to avoid test failure - force_arch('x86_64') - - def tearDown(self): - # Reset - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' - util.read_dmi_data = self.dmi_data - # Return back to original arch - force_arch() - - def test_rhev_no_cloud_file(self): - '''Test No cloud info file module get_data() forcing RHEV.''' - - util.read_dmi_data = _dmi_data('RHEV Hypervisor') - dsrc = DataSourceAltCloud({}, None, self.paths) - dsrc.user_data_rhevm = lambda: True - self.assertEqual(True, dsrc.get_data()) - - def test_vsphere_no_cloud_file(self): - '''Test No cloud info file module get_data() forcing VSPHERE.''' - - util.read_dmi_data = _dmi_data('VMware Virtual Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) - dsrc.user_data_vsphere = lambda: True - self.assertEqual(True, dsrc.get_data()) - - def test_failure_no_cloud_file(self): - '''Test No cloud info file module get_data() forcing unrecognized.''' - - util.read_dmi_data = _dmi_data('Unrecognized Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEqual(False, dsrc.get_data()) - - -class TestUserDataRhevm(TestCase): - ''' - Test to exercise method: DataSourceAltCloud.user_data_rhevm() - ''' - def setUp(self): - '''Set up.''' - self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.mount_dir = tempfile.mkdtemp() - - _write_user_data_files(self.mount_dir, 'test user data') - - def tearDown(self): - # Reset - - _remove_user_data_files(self.mount_dir) - - # Attempt to remove the temp dir ignoring errors - try: - shutil.rmtree(self.mount_dir) - except OSError: - pass - - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['/sbin/modprobe', 'floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] - - def test_mount_cb_fails(self): - '''Test user_data_rhevm() where mount_cb fails.''' - - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['echo', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - - self.assertEqual(False, dsrc.user_data_rhevm()) - - def test_modprobe_fails(self): - '''Test user_data_rhevm() where modprobe fails.''' - - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['ls', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - - self.assertEqual(False, dsrc.user_data_rhevm()) - - def test_no_modprobe_cmd(self): - '''Test user_data_rhevm() with no modprobe command.''' - - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['bad command', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - - self.assertEqual(False, dsrc.user_data_rhevm()) - - def test_udevadm_fails(self): - '''Test user_data_rhevm() where udevadm fails.''' - - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['ls', 'udevadm floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - - self.assertEqual(False, dsrc.user_data_rhevm()) - - def test_no_udevadm_cmd(self): - '''Test user_data_rhevm() with no udevadm command.''' - - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['bad command', 'udevadm floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - - self.assertEqual(False, dsrc.user_data_rhevm()) - - -class TestUserDataVsphere(TestCase): - ''' - Test to exercise method: DataSourceAltCloud.user_data_vsphere() - ''' - def setUp(self): - '''Set up.''' - self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.mount_dir = tempfile.mkdtemp() - - _write_user_data_files(self.mount_dir, 'test user data') - - def tearDown(self): - # Reset - - _remove_user_data_files(self.mount_dir) - - # Attempt to remove the temp dir ignoring errors - try: - shutil.rmtree(self.mount_dir) - except OSError: - pass - - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' - - def test_user_data_vsphere(self): - '''Test user_data_vsphere() where mount_cb fails.''' - - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir - - dsrc = DataSourceAltCloud({}, None, self.paths) - - self.assertEqual(False, dsrc.user_data_vsphere()) - - -class TestReadUserDataCallback(TestCase): - ''' - Test to exercise method: DataSourceAltCloud.read_user_data_callback() - ''' - def setUp(self): - '''Set up.''' - self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.mount_dir = tempfile.mkdtemp() - - _write_user_data_files(self.mount_dir, 'test user data') - - def tearDown(self): - # Reset - - _remove_user_data_files(self.mount_dir) - - # Attempt to remove the temp dir ignoring errors - try: - shutil.rmtree(self.mount_dir) - except OSError: - pass - - def test_callback_both(self): - '''Test read_user_data_callback() with both files.''' - - self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) - - def test_callback_dc(self): - '''Test read_user_data_callback() with only DC file.''' - - _remove_user_data_files(self.mount_dir, - dc_file=False, - non_dc_file=True) - - self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) - - def test_callback_non_dc(self): - '''Test read_user_data_callback() with only non-DC file.''' - - _remove_user_data_files(self.mount_dir, - dc_file=True, - non_dc_file=False) - - self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) - - def test_callback_none(self): - '''Test read_user_data_callback() no files are found.''' - - _remove_user_data_files(self.mount_dir) - self.assertEqual(None, read_user_data_callback(self.mount_dir)) - - -def force_arch(arch=None): - - def _os_uname(): - return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', arch) - - if arch: - setattr(os, 'uname', _os_uname) - elif arch is None: - setattr(os, 'uname', OS_UNAME_ORIG) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py deleted file mode 100644 index e90e903c..00000000 --- a/tests/unittests/test_datasource/test_azure.py +++ /dev/null @@ -1,640 +0,0 @@ -from cloudinit import helpers -from cloudinit.util import b64e, decode_binary, load_file -from cloudinit.sources import DataSourceAzure - -from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest - -import crypt -import os -import shutil -import stat -import tempfile -import xml.etree.ElementTree as ET -import yaml - - -def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): - if data is None: - data = {'HostName': 'FOOHOST'} - if pubkeys is None: - pubkeys = {} - - content = """<?xml version="1.0" encoding="utf-8"?> -<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1" - xmlns:oe="http://schemas.dmtf.org/ovf/environment/1" - xmlns:wa="http://schemas.microsoft.com/windowsazure" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - - <wa:ProvisioningSection><wa:Version>1.0</wa:Version> - <LinuxProvisioningConfigurationSet - xmlns="http://schemas.microsoft.com/windowsazure" - xmlns:i="http://www.w3.org/2001/XMLSchema-instance"> - <ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType> - """ - for key, dval in data.items(): - if isinstance(dval, dict): - val = dval.get('text') - attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items() - if k != 'text']) - else: - val = dval - attrs = "" - content += "<%s%s>%s</%s>\n" % (key, attrs, val, key) - - if userdata: - content += "<UserData>%s</UserData>\n" % (b64e(userdata)) - - if pubkeys: - content += "<SSH><PublicKeys>\n" - for fp, path, value in pubkeys: - content += " <PublicKey>" - if fp and path: - content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" % - (fp, path)) - if value: - content += "<Value>%s</Value>" % value - content += "</PublicKey>\n" - content += "</PublicKeys></SSH>" - content += """ - </LinuxProvisioningConfigurationSet> - </wa:ProvisioningSection> - <wa:PlatformSettingsSection><wa:Version>1.0</wa:Version> - <PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure" - xmlns:i="http://www.w3.org/2001/XMLSchema-instance"> - <KmsServerHostname>kms.core.windows.net</KmsServerHostname> - <ProvisionGuestAgent>false</ProvisionGuestAgent> - <GuestAgentPackageName i:nil="true" /> - </PlatformSettings></wa:PlatformSettingsSection> -</Environment> - """ - - return content - - -class TestAzureDataSource(TestCase): - - def setUp(self): - super(TestAzureDataSource, self).setUp() - if PY26: - raise SkipTest("Does not work on python 2.6") - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - # patch cloud_dir, so our 'seed_dir' is guaranteed empty - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent') - - self.patches = ExitStack() - self.addCleanup(self.patches.close) - - super(TestAzureDataSource, self).setUp() - - def apply_patches(self, patches): - for module, name, new in patches: - self.patches.enter_context(mock.patch.object(module, name, new)) - - def _get_ds(self, data): - - def dsdevs(): - return data.get('dsdevs', []) - - def _invoke_agent(cmd): - data['agent_invoked'] = cmd - - def _wait_for_files(flist, _maxwait=None, _naplen=None): - data['waited'] = flist - return [] - - def _pubkeys_from_crt_files(flist): - data['pubkey_files'] = flist - return ["pubkey_from: %s" % f for f in flist] - - if data.get('ovfcontent') is not None: - populate_dir(os.path.join(self.paths.seed_dir, "azure"), - {'ovf-env.xml': data['ovfcontent']}) - - mod = DataSourceAzure - mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d - - self.get_metadata_from_fabric = mock.MagicMock(return_value={ - 'public-keys': [], - }) - - self.instance_id = 'test-instance-id' - - self.apply_patches([ - (mod, 'list_possible_azure_ds_devs', dsdevs), - (mod, 'invoke_agent', _invoke_agent), - (mod, 'wait_for_files', _wait_for_files), - (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files), - (mod, 'perform_hostname_bounce', mock.MagicMock()), - (mod, 'get_hostname', mock.MagicMock()), - (mod, 'set_hostname', mock.MagicMock()), - (mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric), - (mod.util, 'read_dmi_data', mock.MagicMock( - return_value=self.instance_id)), - ]) - - dsrc = mod.DataSourceAzureNet( - data.get('sys_cfg', {}), distro=None, paths=self.paths) - - return dsrc - - def xml_equals(self, oxml, nxml): - """Compare two sets of XML to make sure they are equal""" - - def create_tag_index(xml): - et = ET.fromstring(xml) - ret = {} - for x in et.iter(): - ret[x.tag] = x - return ret - - def tags_exists(x, y): - for tag in x.keys(): - self.assertIn(tag, y) - for tag in y.keys(): - self.assertIn(tag, x) - - def tags_equal(x, y): - for x_tag, x_val in x.items(): - y_val = y.get(x_val.tag) - self.assertEqual(x_val.text, y_val.text) - - old_cnt = create_tag_index(oxml) - new_cnt = create_tag_index(nxml) - tags_exists(old_cnt, new_cnt) - tags_equal(old_cnt, new_cnt) - - def xml_notequals(self, oxml, nxml): - try: - self.xml_equals(oxml, nxml) - except AssertionError: - return - raise AssertionError("XML is the same") - - def test_basic_seed_dir(self): - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, "") - self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName']) - self.assertTrue(os.path.isfile( - os.path.join(self.waagent_d, 'ovf-env.xml'))) - - def test_waagent_d_has_0700_perms(self): - # we expect /var/lib/waagent to be created 0700 - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(os.path.isdir(self.waagent_d)) - self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700) - - def test_user_cfg_set_agent_command_plain(self): - # set dscfg in via plaintext - # we must have friendly-to-xml formatted plaintext in yaml_cfg - # not all plaintext is expected to work. - yaml_cfg = "{agent_command: my_command}\n" - cfg = yaml.safe_load(yaml_cfg) - odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(data['agent_invoked'], cfg['agent_command']) - - def test_user_cfg_set_agent_command(self): - # set dscfg in via base64 encoded yaml - cfg = {'agent_command': "my_command"} - odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': b64e(yaml.dump(cfg)), - 'encoding': 'base64'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(data['agent_invoked'], cfg['agent_command']) - - def test_sys_cfg_set_agent_command(self): - sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}} - data = {'ovfcontent': construct_valid_ovf_env(data={}), - 'sys_cfg': sys_cfg} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(data['agent_invoked'], '_COMMAND') - - def test_username_used(self): - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(dsrc.cfg['system_info']['default_user']['name'], - "myuser") - - def test_password_given(self): - odata = {'HostName': "myhost", 'UserName': "myuser", - 'UserPassword': "mypass"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue('default_user' in dsrc.cfg['system_info']) - defuser = dsrc.cfg['system_info']['default_user'] - - # default user should be updated username and should not be locked. - self.assertEqual(defuser['name'], odata['UserName']) - self.assertFalse(defuser['lock_passwd']) - # passwd is crypt formated string $id$salt$encrypted - # encrypting plaintext with salt value of everything up to final '$' - # should equal that after the '$' - pos = defuser['passwd'].rfind("$") + 1 - self.assertEqual(defuser['passwd'], - crypt.crypt(odata['UserPassword'], - defuser['passwd'][0:pos])) - - def test_userdata_plain(self): - mydata = "FOOBAR" - odata = {'UserData': {'text': mydata, 'encoding': 'plain'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(decode_binary(dsrc.userdata_raw), mydata) - - def test_userdata_found(self): - mydata = "FOOBAR" - odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8')) - - def test_no_datasource_expected(self): - # no source should be found if no seed_dir and no devs - data = {} - dsrc = self._get_ds({}) - ret = dsrc.get_data() - self.assertFalse(ret) - self.assertFalse('agent_invoked' in data) - - def test_cfg_has_pubkeys_fingerprint(self): - odata = {'HostName': "myhost", 'UserName': "myuser"} - mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}] - pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] - data = {'ovfcontent': construct_valid_ovf_env(data=odata, - pubkeys=pubkeys)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - for mypk in mypklist: - self.assertIn(mypk, dsrc.cfg['_pubkeys']) - self.assertIn('pubkey_from', dsrc.metadata['public-keys'][-1]) - - def test_cfg_has_pubkeys_value(self): - # make sure that provided key is used over fingerprint - odata = {'HostName': "myhost", 'UserName': "myuser"} - mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': 'value1'}] - pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] - data = {'ovfcontent': construct_valid_ovf_env(data=odata, - pubkeys=pubkeys)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - - for mypk in mypklist: - self.assertIn(mypk, dsrc.cfg['_pubkeys']) - self.assertIn(mypk['value'], dsrc.metadata['public-keys']) - - def test_cfg_has_no_fingerprint_has_value(self): - # test value is used when fingerprint not provided - odata = {'HostName': "myhost", 'UserName': "myuser"} - mypklist = [{'fingerprint': None, 'path': 'path1', 'value': 'value1'}] - pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] - data = {'ovfcontent': construct_valid_ovf_env(data=odata, - pubkeys=pubkeys)} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - - for mypk in mypklist: - self.assertIn(mypk['value'], dsrc.metadata['public-keys']) - - def test_default_ephemeral(self): - # make sure the ephemeral device works - odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - cfg = dsrc.get_config_obj() - - self.assertEqual(dsrc.device_name_to_device("ephemeral0"), - "/dev/sdb") - assert 'disk_setup' in cfg - assert 'fs_setup' in cfg - self.assertIsInstance(cfg['disk_setup'], dict) - self.assertIsInstance(cfg['fs_setup'], list) - - def test_provide_disk_aliases(self): - # Make sure that user can affect disk aliases - dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}} - odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': b64e(yaml.dump(dscfg)), - 'encoding': 'base64'}} - usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'}, - 'ephemeral0': False}} - userdata = '#cloud-config' + yaml.dump(usercfg) + "\n" - - ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata) - data = {'ovfcontent': ovfcontent, 'sys_cfg': {}} - - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - cfg = dsrc.get_config_obj() - self.assertTrue(cfg) - - def test_userdata_arrives(self): - userdata = "This is my user-data" - xml = construct_valid_ovf_env(data={}, userdata=userdata) - data = {'ovfcontent': xml} - dsrc = self._get_ds(data) - dsrc.get_data() - - self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw) - - def test_password_redacted_in_ovf(self): - odata = {'HostName': "myhost", 'UserName': "myuser", - 'UserPassword': "mypass"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - dsrc = self._get_ds(data) - ret = dsrc.get_data() - - self.assertTrue(ret) - ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml') - - # The XML should not be same since the user password is redacted - on_disk_ovf = load_file(ovf_env_path) - self.xml_notequals(data['ovfcontent'], on_disk_ovf) - - # Make sure that the redacted password on disk is not used by CI - self.assertNotEqual(dsrc.cfg.get('password'), - DataSourceAzure.DEF_PASSWD_REDACTION) - - # Make sure that the password was really encrypted - et = ET.fromstring(on_disk_ovf) - for elem in et.iter(): - if 'UserPassword' in elem.tag: - self.assertEqual(DataSourceAzure.DEF_PASSWD_REDACTION, - elem.text) - - def test_ovf_env_arrives_in_waagent_dir(self): - xml = construct_valid_ovf_env(data={}, userdata="FOODATA") - dsrc = self._get_ds({'ovfcontent': xml}) - dsrc.get_data() - - # 'data_dir' is '/var/lib/waagent' (walinux-agent's state dir) - # we expect that the ovf-env.xml file is copied there. - ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml') - self.assertTrue(os.path.exists(ovf_env_path)) - self.xml_equals(xml, load_file(ovf_env_path)) - - def test_ovf_can_include_unicode(self): - xml = construct_valid_ovf_env(data={}) - xml = u'\ufeff{0}'.format(xml) - dsrc = self._get_ds({'ovfcontent': xml}) - dsrc.get_data() - - def test_exception_fetching_fabric_data_doesnt_propagate(self): - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - ds.ds_cfg['agent_command'] = '__builtin__' - self.get_metadata_from_fabric.side_effect = Exception - self.assertFalse(ds.get_data()) - - def test_fabric_data_included_in_metadata(self): - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - ds.ds_cfg['agent_command'] = '__builtin__' - self.get_metadata_from_fabric.return_value = {'test': 'value'} - ret = ds.get_data() - self.assertTrue(ret) - self.assertEqual('value', ds.metadata['test']) - - def test_instance_id_from_dmidecode_used(self): - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - ds.get_data() - self.assertEqual(self.instance_id, ds.metadata['instance-id']) - - def test_instance_id_from_dmidecode_used_for_builtin(self): - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - ds.ds_cfg['agent_command'] = '__builtin__' - ds.get_data() - self.assertEqual(self.instance_id, ds.metadata['instance-id']) - - -class TestAzureBounce(TestCase): - - def mock_out_azure_moving_parts(self): - self.patches.enter_context( - mock.patch.object(DataSourceAzure, 'invoke_agent')) - self.patches.enter_context( - mock.patch.object(DataSourceAzure, 'wait_for_files')) - self.patches.enter_context( - mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs', - mock.MagicMock(return_value=[]))) - self.patches.enter_context( - mock.patch.object(DataSourceAzure, - 'find_fabric_formatted_ephemeral_disk', - mock.MagicMock(return_value=None))) - self.patches.enter_context( - mock.patch.object(DataSourceAzure, - 'find_fabric_formatted_ephemeral_part', - mock.MagicMock(return_value=None))) - self.patches.enter_context( - mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric', - mock.MagicMock(return_value={}))) - self.patches.enter_context( - mock.patch.object(DataSourceAzure.util, 'read_dmi_data', - mock.MagicMock(return_value='test-instance-id'))) - - def setUp(self): - super(TestAzureBounce, self).setUp() - self.tmp = tempfile.mkdtemp() - self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent') - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - self.addCleanup(shutil.rmtree, self.tmp) - DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d - self.patches = ExitStack() - self.mock_out_azure_moving_parts() - self.get_hostname = self.patches.enter_context( - mock.patch.object(DataSourceAzure, 'get_hostname')) - self.set_hostname = self.patches.enter_context( - mock.patch.object(DataSourceAzure, 'set_hostname')) - self.subp = self.patches.enter_context( - mock.patch('cloudinit.sources.DataSourceAzure.util.subp')) - - def tearDown(self): - self.patches.close() - - def _get_ds(self, ovfcontent=None): - if ovfcontent is not None: - populate_dir(os.path.join(self.paths.seed_dir, "azure"), - {'ovf-env.xml': ovfcontent}) - return DataSourceAzure.DataSourceAzureNet( - {}, distro=None, paths=self.paths) - - def get_ovf_env_with_dscfg(self, hostname, cfg): - odata = { - 'HostName': hostname, - 'dscfg': { - 'text': b64e(yaml.dump(cfg)), - 'encoding': 'base64' - } - } - return construct_valid_ovf_env(data=odata) - - def test_disabled_bounce_does_not_change_hostname(self): - cfg = {'hostname_bounce': {'policy': 'off'}} - self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data() - self.assertEqual(0, self.set_hostname.call_count) - - @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') - def test_disabled_bounce_does_not_perform_bounce( - self, perform_hostname_bounce): - cfg = {'hostname_bounce': {'policy': 'off'}} - self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data() - self.assertEqual(0, perform_hostname_bounce.call_count) - - def test_same_hostname_does_not_change_hostname(self): - host_name = 'unchanged-host-name' - self.get_hostname.return_value = host_name - cfg = {'hostname_bounce': {'policy': 'yes'}} - self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data() - self.assertEqual(0, self.set_hostname.call_count) - - @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') - def test_unchanged_hostname_does_not_perform_bounce( - self, perform_hostname_bounce): - host_name = 'unchanged-host-name' - self.get_hostname.return_value = host_name - cfg = {'hostname_bounce': {'policy': 'yes'}} - self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data() - self.assertEqual(0, perform_hostname_bounce.call_count) - - @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') - def test_force_performs_bounce_regardless(self, perform_hostname_bounce): - host_name = 'unchanged-host-name' - self.get_hostname.return_value = host_name - cfg = {'hostname_bounce': {'policy': 'force'}} - self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data() - self.assertEqual(1, perform_hostname_bounce.call_count) - - def test_different_hostnames_sets_hostname(self): - expected_hostname = 'azure-expected-host-name' - self.get_hostname.return_value = 'default-host-name' - self._get_ds( - self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data() - self.assertEqual(expected_hostname, - self.set_hostname.call_args_list[0][0][0]) - - @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') - def test_different_hostnames_performs_bounce( - self, perform_hostname_bounce): - expected_hostname = 'azure-expected-host-name' - self.get_hostname.return_value = 'default-host-name' - self._get_ds( - self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data() - self.assertEqual(1, perform_hostname_bounce.call_count) - - def test_different_hostnames_sets_hostname_back(self): - initial_host_name = 'default-host-name' - self.get_hostname.return_value = initial_host_name - self._get_ds( - self.get_ovf_env_with_dscfg('some-host-name', {})).get_data() - self.assertEqual(initial_host_name, - self.set_hostname.call_args_list[-1][0][0]) - - @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') - def test_failure_in_bounce_still_resets_host_name( - self, perform_hostname_bounce): - perform_hostname_bounce.side_effect = Exception - initial_host_name = 'default-host-name' - self.get_hostname.return_value = initial_host_name - self._get_ds( - self.get_ovf_env_with_dscfg('some-host-name', {})).get_data() - self.assertEqual(initial_host_name, - self.set_hostname.call_args_list[-1][0][0]) - - def test_environment_correct_for_bounce_command(self): - interface = 'int0' - hostname = 'my-new-host' - old_hostname = 'my-old-host' - self.get_hostname.return_value = old_hostname - cfg = {'hostname_bounce': {'interface': interface, 'policy': 'force'}} - data = self.get_ovf_env_with_dscfg(hostname, cfg) - self._get_ds(data).get_data() - self.assertEqual(1, self.subp.call_count) - bounce_env = self.subp.call_args[1]['env'] - self.assertEqual(interface, bounce_env['interface']) - self.assertEqual(hostname, bounce_env['hostname']) - self.assertEqual(old_hostname, bounce_env['old_hostname']) - - def test_default_bounce_command_used_by_default(self): - cmd = 'default-bounce-command' - DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd - cfg = {'hostname_bounce': {'policy': 'force'}} - data = self.get_ovf_env_with_dscfg('some-hostname', cfg) - self._get_ds(data).get_data() - self.assertEqual(1, self.subp.call_count) - bounce_args = self.subp.call_args[1]['args'] - self.assertEqual(cmd, bounce_args) - - @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') - def test_set_hostname_option_can_disable_bounce( - self, perform_hostname_bounce): - cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}} - data = self.get_ovf_env_with_dscfg('some-hostname', cfg) - self._get_ds(data).get_data() - - self.assertEqual(0, perform_hostname_bounce.call_count) - - def test_set_hostname_option_can_disable_hostname_set(self): - cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}} - data = self.get_ovf_env_with_dscfg('some-hostname', cfg) - self._get_ds(data).get_data() - - self.assertEqual(0, self.set_hostname.call_count) - - -class TestReadAzureOvf(TestCase): - def test_invalid_xml_raises_non_azure_ds(self): - invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) - self.assertRaises(DataSourceAzure.BrokenAzureDataSource, - DataSourceAzure.read_azure_ovf, invalid_xml) - - def test_load_with_pubkeys(self): - mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}] - pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] - content = construct_valid_ovf_env(pubkeys=pubkeys) - (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content) - for mypk in mypklist: - self.assertIn(mypk, cfg['_pubkeys']) diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py deleted file mode 100644 index 65202ff0..00000000 --- a/tests/unittests/test_datasource/test_azure_helper.py +++ /dev/null @@ -1,412 +0,0 @@ -import os - -from cloudinit.sources.helpers import azure as azure_helper - -from ..helpers import ExitStack, mock, TestCase - - -GOAL_STATE_TEMPLATE = """\ -<?xml version="1.0" encoding="utf-8"?> -<GoalState xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:noNamespaceSchemaLocation="goalstate10.xsd"> - <Version>2012-11-30</Version> - <Incarnation>{incarnation}</Incarnation> - <Machine> - <ExpectedState>Started</ExpectedState> - <StopRolesDeadlineHint>300000</StopRolesDeadlineHint> - <LBProbePorts> - <Port>16001</Port> - </LBProbePorts> - <ExpectHealthReport>FALSE</ExpectHealthReport> - </Machine> - <Container> - <ContainerId>{container_id}</ContainerId> - <RoleInstanceList> - <RoleInstance> - <InstanceId>{instance_id}</InstanceId> - <State>Started</State> - <Configuration> - <HostingEnvironmentConfig> - http://100.86.192.70:80/...hostingEnvironmentConfig... - </HostingEnvironmentConfig> - <SharedConfig>http://100.86.192.70:80/..SharedConfig..</SharedConfig> - <ExtensionsConfig> - http://100.86.192.70:80/...extensionsConfig... - </ExtensionsConfig> - <FullConfig>http://100.86.192.70:80/...fullConfig...</FullConfig> - <Certificates>{certificates_url}</Certificates> - <ConfigName>68ce47.0.68ce47.0.utl-trusty--292258.1.xml</ConfigName> - </Configuration> - </RoleInstance> - </RoleInstanceList> - </Container> -</GoalState> -""" - - -class TestFindEndpoint(TestCase): - - def setUp(self): - super(TestFindEndpoint, self).setUp() - patches = ExitStack() - self.addCleanup(patches.close) - - self.load_file = patches.enter_context( - mock.patch.object(azure_helper.util, 'load_file')) - - def test_missing_file(self): - self.load_file.side_effect = IOError - self.assertRaises(IOError, - azure_helper.WALinuxAgentShim.find_endpoint) - - def test_missing_special_azure_line(self): - self.load_file.return_value = '' - self.assertRaises(ValueError, - azure_helper.WALinuxAgentShim.find_endpoint) - - @staticmethod - def _build_lease_content(encoded_address): - return '\n'.join([ - 'lease {', - ' interface "eth0";', - ' option unknown-245 {0};'.format(encoded_address), - '}']) - - def test_latest_lease_used(self): - encoded_addresses = ['5:4:3:2', '4:3:2:1'] - file_content = '\n'.join([self._build_lease_content(encoded_address) - for encoded_address in encoded_addresses]) - self.load_file.return_value = file_content - self.assertEqual(encoded_addresses[-1].replace(':', '.'), - azure_helper.WALinuxAgentShim.find_endpoint()) - - -class TestExtractIpAddressFromLeaseValue(TestCase): - - def test_hex_string(self): - ip_address, encoded_address = '98.76.54.32', '62:4c:36:20' - self.assertEqual( - ip_address, - azure_helper.WALinuxAgentShim.get_ip_from_lease_value( - encoded_address - )) - - def test_hex_string_with_single_character_part(self): - ip_address, encoded_address = '4.3.2.1', '4:3:2:1' - self.assertEqual( - ip_address, - azure_helper.WALinuxAgentShim.get_ip_from_lease_value( - encoded_address - )) - - def test_packed_string(self): - ip_address, encoded_address = '98.76.54.32', 'bL6 ' - self.assertEqual( - ip_address, - azure_helper.WALinuxAgentShim.get_ip_from_lease_value( - encoded_address - )) - - def test_packed_string_with_escaped_quote(self): - ip_address, encoded_address = '100.72.34.108', 'dH\\"l' - self.assertEqual( - ip_address, - azure_helper.WALinuxAgentShim.get_ip_from_lease_value( - encoded_address - )) - - def test_packed_string_containing_a_colon(self): - ip_address, encoded_address = '100.72.58.108', 'dH:l' - self.assertEqual( - ip_address, - azure_helper.WALinuxAgentShim.get_ip_from_lease_value( - encoded_address - )) - - -class TestGoalStateParsing(TestCase): - - default_parameters = { - 'incarnation': 1, - 'container_id': 'MyContainerId', - 'instance_id': 'MyInstanceId', - 'certificates_url': 'MyCertificatesUrl', - } - - def _get_goal_state(self, http_client=None, **kwargs): - if http_client is None: - http_client = mock.MagicMock() - parameters = self.default_parameters.copy() - parameters.update(kwargs) - xml = GOAL_STATE_TEMPLATE.format(**parameters) - if parameters['certificates_url'] is None: - new_xml_lines = [] - for line in xml.splitlines(): - if 'Certificates' in line: - continue - new_xml_lines.append(line) - xml = '\n'.join(new_xml_lines) - return azure_helper.GoalState(xml, http_client) - - def test_incarnation_parsed_correctly(self): - incarnation = '123' - goal_state = self._get_goal_state(incarnation=incarnation) - self.assertEqual(incarnation, goal_state.incarnation) - - def test_container_id_parsed_correctly(self): - container_id = 'TestContainerId' - goal_state = self._get_goal_state(container_id=container_id) - self.assertEqual(container_id, goal_state.container_id) - - def test_instance_id_parsed_correctly(self): - instance_id = 'TestInstanceId' - goal_state = self._get_goal_state(instance_id=instance_id) - self.assertEqual(instance_id, goal_state.instance_id) - - def test_certificates_xml_parsed_and_fetched_correctly(self): - http_client = mock.MagicMock() - certificates_url = 'TestCertificatesUrl' - goal_state = self._get_goal_state( - http_client=http_client, certificates_url=certificates_url) - certificates_xml = goal_state.certificates_xml - self.assertEqual(1, http_client.get.call_count) - self.assertEqual(certificates_url, http_client.get.call_args[0][0]) - self.assertTrue(http_client.get.call_args[1].get('secure', False)) - self.assertEqual(http_client.get.return_value.contents, - certificates_xml) - - def test_missing_certificates_skips_http_get(self): - http_client = mock.MagicMock() - goal_state = self._get_goal_state( - http_client=http_client, certificates_url=None) - certificates_xml = goal_state.certificates_xml - self.assertEqual(0, http_client.get.call_count) - self.assertIsNone(certificates_xml) - - -class TestAzureEndpointHttpClient(TestCase): - - regular_headers = { - 'x-ms-agent-name': 'WALinuxAgent', - 'x-ms-version': '2012-11-30', - } - - def setUp(self): - super(TestAzureEndpointHttpClient, self).setUp() - patches = ExitStack() - self.addCleanup(patches.close) - - self.read_file_or_url = patches.enter_context( - mock.patch.object(azure_helper.util, 'read_file_or_url')) - - def test_non_secure_get(self): - client = azure_helper.AzureEndpointHttpClient(mock.MagicMock()) - url = 'MyTestUrl' - response = client.get(url, secure=False) - self.assertEqual(1, self.read_file_or_url.call_count) - self.assertEqual(self.read_file_or_url.return_value, response) - self.assertEqual(mock.call(url, headers=self.regular_headers), - self.read_file_or_url.call_args) - - def test_secure_get(self): - url = 'MyTestUrl' - certificate = mock.MagicMock() - expected_headers = self.regular_headers.copy() - expected_headers.update({ - "x-ms-cipher-name": "DES_EDE3_CBC", - "x-ms-guest-agent-public-x509-cert": certificate, - }) - client = azure_helper.AzureEndpointHttpClient(certificate) - response = client.get(url, secure=True) - self.assertEqual(1, self.read_file_or_url.call_count) - self.assertEqual(self.read_file_or_url.return_value, response) - self.assertEqual(mock.call(url, headers=expected_headers), - self.read_file_or_url.call_args) - - def test_post(self): - data = mock.MagicMock() - url = 'MyTestUrl' - client = azure_helper.AzureEndpointHttpClient(mock.MagicMock()) - response = client.post(url, data=data) - self.assertEqual(1, self.read_file_or_url.call_count) - self.assertEqual(self.read_file_or_url.return_value, response) - self.assertEqual( - mock.call(url, data=data, headers=self.regular_headers), - self.read_file_or_url.call_args) - - def test_post_with_extra_headers(self): - url = 'MyTestUrl' - client = azure_helper.AzureEndpointHttpClient(mock.MagicMock()) - extra_headers = {'test': 'header'} - client.post(url, extra_headers=extra_headers) - self.assertEqual(1, self.read_file_or_url.call_count) - expected_headers = self.regular_headers.copy() - expected_headers.update(extra_headers) - self.assertEqual( - mock.call(mock.ANY, data=mock.ANY, headers=expected_headers), - self.read_file_or_url.call_args) - - -class TestOpenSSLManager(TestCase): - - def setUp(self): - super(TestOpenSSLManager, self).setUp() - patches = ExitStack() - self.addCleanup(patches.close) - - self.subp = patches.enter_context( - mock.patch.object(azure_helper.util, 'subp')) - try: - self.open = patches.enter_context( - mock.patch('__builtin__.open')) - except ImportError: - self.open = patches.enter_context( - mock.patch('builtins.open')) - - @mock.patch.object(azure_helper, 'cd', mock.MagicMock()) - @mock.patch.object(azure_helper.tempfile, 'mkdtemp') - def test_openssl_manager_creates_a_tmpdir(self, mkdtemp): - manager = azure_helper.OpenSSLManager() - self.assertEqual(mkdtemp.return_value, manager.tmpdir) - - def test_generate_certificate_uses_tmpdir(self): - subp_directory = {} - - def capture_directory(*args, **kwargs): - subp_directory['path'] = os.getcwd() - - self.subp.side_effect = capture_directory - manager = azure_helper.OpenSSLManager() - self.assertEqual(manager.tmpdir, subp_directory['path']) - manager.clean_up() - - @mock.patch.object(azure_helper, 'cd', mock.MagicMock()) - @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock()) - @mock.patch.object(azure_helper.util, 'del_dir') - def test_clean_up(self, del_dir): - manager = azure_helper.OpenSSLManager() - manager.clean_up() - self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list) - - -class TestWALinuxAgentShim(TestCase): - - def setUp(self): - super(TestWALinuxAgentShim, self).setUp() - patches = ExitStack() - self.addCleanup(patches.close) - - self.AzureEndpointHttpClient = patches.enter_context( - mock.patch.object(azure_helper, 'AzureEndpointHttpClient')) - self.find_endpoint = patches.enter_context( - mock.patch.object( - azure_helper.WALinuxAgentShim, 'find_endpoint')) - self.GoalState = patches.enter_context( - mock.patch.object(azure_helper, 'GoalState')) - self.OpenSSLManager = patches.enter_context( - mock.patch.object(azure_helper, 'OpenSSLManager')) - patches.enter_context( - mock.patch.object(azure_helper.time, 'sleep', mock.MagicMock())) - - def test_http_client_uses_certificate(self): - shim = azure_helper.WALinuxAgentShim() - shim.register_with_azure_and_fetch_data() - self.assertEqual( - [mock.call(self.OpenSSLManager.return_value.certificate)], - self.AzureEndpointHttpClient.call_args_list) - - def test_correct_url_used_for_goalstate(self): - self.find_endpoint.return_value = 'test_endpoint' - shim = azure_helper.WALinuxAgentShim() - shim.register_with_azure_and_fetch_data() - get = self.AzureEndpointHttpClient.return_value.get - self.assertEqual( - [mock.call('http://test_endpoint/machine/?comp=goalstate')], - get.call_args_list) - self.assertEqual( - [mock.call(get.return_value.contents, - self.AzureEndpointHttpClient.return_value)], - self.GoalState.call_args_list) - - def test_certificates_used_to_determine_public_keys(self): - shim = azure_helper.WALinuxAgentShim() - data = shim.register_with_azure_and_fetch_data() - self.assertEqual( - [mock.call(self.GoalState.return_value.certificates_xml)], - self.OpenSSLManager.return_value.parse_certificates.call_args_list) - self.assertEqual( - self.OpenSSLManager.return_value.parse_certificates.return_value, - data['public-keys']) - - def test_absent_certificates_produces_empty_public_keys(self): - self.GoalState.return_value.certificates_xml = None - shim = azure_helper.WALinuxAgentShim() - data = shim.register_with_azure_and_fetch_data() - self.assertEqual([], data['public-keys']) - - def test_correct_url_used_for_report_ready(self): - self.find_endpoint.return_value = 'test_endpoint' - shim = azure_helper.WALinuxAgentShim() - shim.register_with_azure_and_fetch_data() - expected_url = 'http://test_endpoint/machine?comp=health' - self.assertEqual( - [mock.call(expected_url, data=mock.ANY, extra_headers=mock.ANY)], - self.AzureEndpointHttpClient.return_value.post.call_args_list) - - def test_goal_state_values_used_for_report_ready(self): - self.GoalState.return_value.incarnation = 'TestIncarnation' - self.GoalState.return_value.container_id = 'TestContainerId' - self.GoalState.return_value.instance_id = 'TestInstanceId' - shim = azure_helper.WALinuxAgentShim() - shim.register_with_azure_and_fetch_data() - posted_document = ( - self.AzureEndpointHttpClient.return_value.post.call_args[1]['data'] - ) - self.assertIn('TestIncarnation', posted_document) - self.assertIn('TestContainerId', posted_document) - self.assertIn('TestInstanceId', posted_document) - - def test_clean_up_can_be_called_at_any_time(self): - shim = azure_helper.WALinuxAgentShim() - shim.clean_up() - - def test_clean_up_will_clean_up_openssl_manager_if_instantiated(self): - shim = azure_helper.WALinuxAgentShim() - shim.register_with_azure_and_fetch_data() - shim.clean_up() - self.assertEqual( - 1, self.OpenSSLManager.return_value.clean_up.call_count) - - def test_failure_to_fetch_goalstate_bubbles_up(self): - class SentinelException(Exception): - pass - self.AzureEndpointHttpClient.return_value.get.side_effect = ( - SentinelException) - shim = azure_helper.WALinuxAgentShim() - self.assertRaises(SentinelException, - shim.register_with_azure_and_fetch_data) - - -class TestGetMetadataFromFabric(TestCase): - - @mock.patch.object(azure_helper, 'WALinuxAgentShim') - def test_data_from_shim_returned(self, shim): - ret = azure_helper.get_metadata_from_fabric() - self.assertEqual( - shim.return_value.register_with_azure_and_fetch_data.return_value, - ret) - - @mock.patch.object(azure_helper, 'WALinuxAgentShim') - def test_success_calls_clean_up(self, shim): - azure_helper.get_metadata_from_fabric() - self.assertEqual(1, shim.return_value.clean_up.call_count) - - @mock.patch.object(azure_helper, 'WALinuxAgentShim') - def test_failure_in_registration_calls_clean_up(self, shim): - class SentinelException(Exception): - pass - shim.return_value.register_with_azure_and_fetch_data.side_effect = ( - SentinelException) - self.assertRaises(SentinelException, - azure_helper.get_metadata_from_fabric) - self.assertEqual(1, shim.return_value.clean_up.call_count) diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py deleted file mode 100644 index 2a42ce0c..00000000 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ /dev/null @@ -1,99 +0,0 @@ -# coding: utf-8 - -import copy - -from cloudinit.cs_utils import Cepko -from cloudinit.sources import DataSourceCloudSigma - -from .. import helpers as test_helpers - -SERVER_CONTEXT = { - "cpu": 1000, - "cpus_instead_of_cores": False, - "global_context": {"some_global_key": "some_global_val"}, - "mem": 1073741824, - "meta": { - "ssh_public_key": "ssh-rsa AAAAB3NzaC1yc2E.../hQ5D5 john@doe", - "cloudinit-user-data": "#cloud-config\n\n...", - }, - "name": "test_server", - "requirements": [], - "smp": 1, - "tags": ["much server", "very performance"], - "uuid": "65b2fb23-8c03-4187-a3ba-8b7c919e8890", - "vnc_password": "9e84d6cb49e46379", - "vendor_data": { - "location": "zrh", - "cloudinit": "#cloud-config\n\n...", - } -} - - -class CepkoMock(Cepko): - def __init__(self, mocked_context): - self.result = mocked_context - - def all(self): - return self - - -class DataSourceCloudSigmaTest(test_helpers.TestCase): - def setUp(self): - super(DataSourceCloudSigmaTest, self).setUp() - self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") - self.datasource.is_running_in_cloudsigma = lambda: True - self.datasource.cepko = CepkoMock(SERVER_CONTEXT) - self.datasource.get_data() - - def test_get_hostname(self): - self.assertEqual("test_server", self.datasource.get_hostname()) - self.datasource.metadata['name'] = '' - self.assertEqual("65b2fb23", self.datasource.get_hostname()) - self.datasource.metadata['name'] = u'тест' - self.assertEqual("65b2fb23", self.datasource.get_hostname()) - - def test_get_public_ssh_keys(self): - self.assertEqual([SERVER_CONTEXT['meta']['ssh_public_key']], - self.datasource.get_public_ssh_keys()) - - def test_get_instance_id(self): - self.assertEqual(SERVER_CONTEXT['uuid'], - self.datasource.get_instance_id()) - - def test_metadata(self): - self.assertEqual(self.datasource.metadata, SERVER_CONTEXT) - - def test_user_data(self): - self.assertEqual(self.datasource.userdata_raw, - SERVER_CONTEXT['meta']['cloudinit-user-data']) - - def test_encoded_user_data(self): - encoded_context = copy.deepcopy(SERVER_CONTEXT) - encoded_context['meta']['base64_fields'] = 'cloudinit-user-data' - encoded_context['meta']['cloudinit-user-data'] = 'aGkgd29ybGQK' - self.datasource.cepko = CepkoMock(encoded_context) - self.datasource.get_data() - - self.assertEqual(self.datasource.userdata_raw, b'hi world\n') - - def test_vendor_data(self): - self.assertEqual(self.datasource.vendordata_raw, - SERVER_CONTEXT['vendor_data']['cloudinit']) - - def test_lack_of_vendor_data(self): - stripped_context = copy.deepcopy(SERVER_CONTEXT) - del stripped_context["vendor_data"] - self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") - self.datasource.cepko = CepkoMock(stripped_context) - self.datasource.get_data() - - self.assertIsNone(self.datasource.vendordata_raw) - - def test_lack_of_cloudinit_key_in_vendor_data(self): - stripped_context = copy.deepcopy(SERVER_CONTEXT) - del stripped_context["vendor_data"]["cloudinit"] - self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") - self.datasource.cepko = CepkoMock(stripped_context) - self.datasource.get_data() - - self.assertIsNone(self.datasource.vendordata_raw) diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py deleted file mode 100644 index b1aab17b..00000000 --- a/tests/unittests/test_datasource/test_cloudstack.py +++ /dev/null @@ -1,78 +0,0 @@ -from cloudinit import helpers -from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack - -from ..helpers import TestCase, mock, ExitStack - - -class TestCloudStackPasswordFetching(TestCase): - - def setUp(self): - super(TestCloudStackPasswordFetching, self).setUp() - self.patches = ExitStack() - self.addCleanup(self.patches.close) - mod_name = 'cloudinit.sources.DataSourceCloudStack' - self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name))) - self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name))) - - def _set_password_server_response(self, response_string): - subp = mock.MagicMock(return_value=(response_string, '')) - self.patches.enter_context( - mock.patch('cloudinit.sources.DataSourceCloudStack.util.subp', - subp)) - return subp - - def test_empty_password_doesnt_create_config(self): - self._set_password_server_response('') - ds = DataSourceCloudStack({}, None, helpers.Paths({})) - ds.get_data() - self.assertEqual({}, ds.get_config_obj()) - - def test_saved_password_doesnt_create_config(self): - self._set_password_server_response('saved_password') - ds = DataSourceCloudStack({}, None, helpers.Paths({})) - ds.get_data() - self.assertEqual({}, ds.get_config_obj()) - - def test_password_sets_password(self): - password = 'SekritSquirrel' - self._set_password_server_response(password) - ds = DataSourceCloudStack({}, None, helpers.Paths({})) - ds.get_data() - self.assertEqual(password, ds.get_config_obj()['password']) - - def test_bad_request_doesnt_stop_ds_from_working(self): - self._set_password_server_response('bad_request') - ds = DataSourceCloudStack({}, None, helpers.Paths({})) - self.assertTrue(ds.get_data()) - - def assertRequestTypesSent(self, subp, expected_request_types): - request_types = [] - for call in subp.call_args_list: - args = call[0][0] - for arg in args: - if arg.startswith('DomU_Request'): - request_types.append(arg.split()[1]) - self.assertEqual(expected_request_types, request_types) - - def test_valid_response_means_password_marked_as_saved(self): - password = 'SekritSquirrel' - subp = self._set_password_server_response(password) - ds = DataSourceCloudStack({}, None, helpers.Paths({})) - ds.get_data() - self.assertRequestTypesSent(subp, - ['send_my_password', 'saved_password']) - - def _check_password_not_saved_for(self, response_string): - subp = self._set_password_server_response(response_string) - ds = DataSourceCloudStack({}, None, helpers.Paths({})) - ds.get_data() - self.assertRequestTypesSent(subp, ['send_my_password']) - - def test_password_not_saved_if_empty(self): - self._check_password_not_saved_for('') - - def test_password_not_saved_if_already_saved(self): - self._check_password_not_saved_for('saved_password') - - def test_password_not_saved_if_bad_request(self): - self._check_password_not_saved_for('bad_request') diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py deleted file mode 100644 index 18551b92..00000000 --- a/tests/unittests/test_datasource/test_configdrive.py +++ /dev/null @@ -1,597 +0,0 @@ -from copy import copy -import json -import os -import shutil -import six -import tempfile - -from cloudinit import helpers -from cloudinit.net import eni -from cloudinit.net import network_state -from cloudinit import settings -from cloudinit.sources import DataSourceConfigDrive as ds -from cloudinit.sources.helpers import openstack -from cloudinit import util - -from ..helpers import TestCase, ExitStack, mock - - -PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' -EC2_META = { - 'ami-id': 'ami-00000001', - 'ami-launch-index': 0, - 'ami-manifest-path': 'FIXME', - 'block-device-mapping': { - 'ami': 'sda1', - 'ephemeral0': 'sda2', - 'root': '/dev/sda1', - 'swap': 'sda3'}, - 'hostname': 'sm-foo-test.novalocal', - 'instance-action': 'none', - 'instance-id': 'i-00000001', - 'instance-type': 'm1.tiny', - 'local-hostname': 'sm-foo-test.novalocal', - 'local-ipv4': None, - 'placement': {'availability-zone': 'nova'}, - 'public-hostname': 'sm-foo-test.novalocal', - 'public-ipv4': '', - 'public-keys': {'0': {'openssh-key': PUBKEY}}, - 'reservation-id': 'r-iru5qm4m', - 'security-groups': ['default'] -} -USER_DATA = b'#!/bin/sh\necho This is user data\n' -OSTACK_META = { - 'availability_zone': 'nova', - 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'}, - {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}], - 'hostname': 'sm-foo-test.novalocal', - 'meta': {'dsmode': 'local', 'my-meta': 'my-value'}, - 'name': 'sm-foo-test', - 'public_keys': {'mykey': PUBKEY}, - 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'} - -CONTENT_0 = b'This is contents of /etc/foo.cfg\n' -CONTENT_1 = b'# this is /etc/bar/bar.cfg\n' -NETWORK_DATA = { - 'services': [ - {'type': 'dns', 'address': '199.204.44.24'}, - {'type': 'dns', 'address': '199.204.47.54'} - ], - 'links': [ - {'vif_id': '2ecc7709-b3f7-4448-9580-e1ec32d75bbd', - 'ethernet_mac_address': 'fa:16:3e:69:b0:58', - 'type': 'ovs', 'mtu': None, 'id': 'tap2ecc7709-b3'}, - {'vif_id': '2f88d109-5b57-40e6-af32-2472df09dc33', - 'ethernet_mac_address': 'fa:16:3e:d4:57:ad', - 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'}, - {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc', - 'ethernet_mac_address': 'fa:16:3e:05:30:fe', - 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04', 'name': 'nic0'} - ], - 'networks': [ - {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp', - 'network_id': '6d6357ac-0f70-4afa-8bd7-c274cc4ea235', - 'id': 'network0'}, - {'link': 'tap2f88d109-5b', 'type': 'ipv4_dhcp', - 'network_id': 'd227a9b3-6960-4d94-8976-ee5788b44f54', - 'id': 'network1'}, - {'link': 'tap1a5382f8-04', 'type': 'ipv4_dhcp', - 'network_id': 'dab2ba57-cae2-4311-a5ed-010b263891f5', - 'id': 'network2'} - ] -} - -NETWORK_DATA_2 = { - "services": [ - {"type": "dns", "address": "1.1.1.191"}, - {"type": "dns", "address": "1.1.1.4"}], - "networks": [ - {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4", - "netmask": "255.255.255.248", "link": "eth0", - "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0", - "gateway": "2.2.2.9"}], - "ip_address": "2.2.2.10", "id": "network0-ipv4"}, - {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4", - "netmask": "255.255.255.224", "link": "eth1", - "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}], - "links": [ - {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500, - "type": "vif", "id": "eth0", "vif_id": "vif-foo1"}, - {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500, - "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}] -} - - -KNOWN_MACS = { - 'fa:16:3e:69:b0:58': 'enp0s1', - 'fa:16:3e:d4:57:ad': 'enp0s2', - 'fa:16:3e:dd:50:9a': 'foo1', - 'fa:16:3e:a8:14:69': 'foo2', - 'fa:16:3e:ed:9a:59': 'foo3', -} - -CFG_DRIVE_FILES_V2 = { - 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), - 'ec2/2009-04-04/user-data': USER_DATA, - 'ec2/latest/meta-data.json': json.dumps(EC2_META), - 'ec2/latest/user-data': USER_DATA, - 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META), - 'openstack/2012-08-10/user_data': USER_DATA, - 'openstack/content/0000': CONTENT_0, - 'openstack/content/0001': CONTENT_1, - 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), - 'openstack/latest/user_data': USER_DATA, - 'openstack/latest/network_data.json': json.dumps(NETWORK_DATA), - 'openstack/2015-10-15/meta_data.json': json.dumps(OSTACK_META), - 'openstack/2015-10-15/user_data': USER_DATA, - 'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)} - - -class TestConfigDriveDataSource(TestCase): - - def setUp(self): - super(TestConfigDriveDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def test_ec2_metadata(self): - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - found = ds.read_config_drive(self.tmp) - self.assertTrue('ec2-metadata' in found) - ec2_md = found['ec2-metadata'] - self.assertEqual(EC2_META, ec2_md) - - def test_dev_os_remap(self): - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - found = ds.read_config_drive(self.tmp) - cfg_ds.metadata = found['metadata'] - name_tests = { - 'ami': '/dev/vda1', - 'root': '/dev/vda1', - 'ephemeral0': '/dev/vda2', - 'swap': '/dev/vda3', - } - for name, dev_name in name_tests.items(): - with ExitStack() as mocks: - provided_name = dev_name[len('/dev/'):] - provided_name = "s" + provided_name[1:] - find_mock = mocks.enter_context( - mock.patch.object(util, 'find_devs_with', - return_value=[provided_name])) - # We want os.path.exists() to return False on its first call, - # and True on its second call. We use a handy generator as - # the mock side effect for this. The mocked function returns - # what the side effect returns. - - def exists_side_effect(): - yield False - yield True - exists_mock = mocks.enter_context( - mock.patch.object(os.path, 'exists', - side_effect=exists_side_effect())) - device = cfg_ds.device_name_to_device(name) - self.assertEqual(dev_name, device) - - find_mock.assert_called_once_with(mock.ANY) - self.assertEqual(exists_mock.call_count, 2) - - def test_dev_os_map(self): - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - found = ds.read_config_drive(self.tmp) - os_md = found['metadata'] - cfg_ds.metadata = os_md - name_tests = { - 'ami': '/dev/vda1', - 'root': '/dev/vda1', - 'ephemeral0': '/dev/vda2', - 'swap': '/dev/vda3', - } - for name, dev_name in name_tests.items(): - with ExitStack() as mocks: - find_mock = mocks.enter_context( - mock.patch.object(util, 'find_devs_with', - return_value=[dev_name])) - exists_mock = mocks.enter_context( - mock.patch.object(os.path, 'exists', - return_value=True)) - device = cfg_ds.device_name_to_device(name) - self.assertEqual(dev_name, device) - - find_mock.assert_called_once_with(mock.ANY) - exists_mock.assert_called_once_with(mock.ANY) - - def test_dev_ec2_remap(self): - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - found = ds.read_config_drive(self.tmp) - ec2_md = found['ec2-metadata'] - os_md = found['metadata'] - cfg_ds.ec2_metadata = ec2_md - cfg_ds.metadata = os_md - name_tests = { - 'ami': '/dev/vda1', - 'root': '/dev/vda1', - 'ephemeral0': '/dev/vda2', - 'swap': '/dev/vda3', - None: None, - 'bob': None, - 'root2k': None, - } - for name, dev_name in name_tests.items(): - # We want os.path.exists() to return False on its first call, - # and True on its second call. We use a handy generator as - # the mock side effect for this. The mocked function returns - # what the side effect returns. - def exists_side_effect(): - yield False - yield True - with mock.patch.object(os.path, 'exists', - side_effect=exists_side_effect()): - device = cfg_ds.device_name_to_device(name) - self.assertEqual(dev_name, device) - # We don't assert the call count for os.path.exists() because - # not all of the entries in name_tests results in two calls to - # that function. Specifically, 'root2k' doesn't seem to call - # it at all. - - def test_dev_ec2_map(self): - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - found = ds.read_config_drive(self.tmp) - ec2_md = found['ec2-metadata'] - os_md = found['metadata'] - cfg_ds.ec2_metadata = ec2_md - cfg_ds.metadata = os_md - name_tests = { - 'ami': '/dev/sda1', - 'root': '/dev/sda1', - 'ephemeral0': '/dev/sda2', - 'swap': '/dev/sda3', - None: None, - 'bob': None, - 'root2k': None, - } - for name, dev_name in name_tests.items(): - with mock.patch.object(os.path, 'exists', return_value=True): - device = cfg_ds.device_name_to_device(name) - self.assertEqual(dev_name, device) - - def test_dir_valid(self): - """Verify a dir is read as such.""" - - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - - found = ds.read_config_drive(self.tmp) - - expected_md = copy(OSTACK_META) - expected_md['instance-id'] = expected_md['uuid'] - expected_md['local-hostname'] = expected_md['hostname'] - - self.assertEqual(USER_DATA, found['userdata']) - self.assertEqual(expected_md, found['metadata']) - self.assertEqual(NETWORK_DATA, found['networkdata']) - self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0) - self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1) - - def test_seed_dir_valid_extra(self): - """Verify extra files do not affect datasource validity.""" - - data = copy(CFG_DRIVE_FILES_V2) - data["myfoofile.txt"] = "myfoocontent" - data["openstack/latest/random-file.txt"] = "random-content" - - populate_dir(self.tmp, data) - - found = ds.read_config_drive(self.tmp) - - expected_md = copy(OSTACK_META) - expected_md['instance-id'] = expected_md['uuid'] - expected_md['local-hostname'] = expected_md['hostname'] - - self.assertEqual(expected_md, found['metadata']) - - def test_seed_dir_bad_json_metadata(self): - """Verify that bad json in metadata raises BrokenConfigDriveDir.""" - data = copy(CFG_DRIVE_FILES_V2) - - data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}" - data["openstack/2015-10-15/meta_data.json"] = "non-json garbage {}" - data["openstack/latest/meta_data.json"] = "non-json garbage {}" - - populate_dir(self.tmp, data) - - self.assertRaises(openstack.BrokenMetadata, - ds.read_config_drive, self.tmp) - - def test_seed_dir_no_configdrive(self): - """Verify that no metadata raises NonConfigDriveDir.""" - - my_d = os.path.join(self.tmp, "non-configdrive") - data = copy(CFG_DRIVE_FILES_V2) - data["myfoofile.txt"] = "myfoocontent" - data["openstack/latest/random-file.txt"] = "random-content" - data["content/foo"] = "foocontent" - - self.assertRaises(openstack.NonReadable, - ds.read_config_drive, my_d) - - def test_seed_dir_missing(self): - """Verify that missing seed_dir raises NonConfigDriveDir.""" - my_d = os.path.join(self.tmp, "nonexistantdirectory") - self.assertRaises(openstack.NonReadable, - ds.read_config_drive, my_d) - - def test_find_candidates(self): - devs_with_answers = {} - - def my_devs_with(*args, **kwargs): - criteria = args[0] if len(args) else kwargs.pop('criteria', None) - return devs_with_answers.get(criteria, []) - - def my_is_partition(dev): - return dev[-1] in "0123456789" and not dev.startswith("sr") - - try: - orig_find_devs_with = util.find_devs_with - util.find_devs_with = my_devs_with - - orig_is_partition = util.is_partition - util.is_partition = my_is_partition - - devs_with_answers = {"TYPE=vfat": [], - "TYPE=iso9660": ["/dev/vdb"], - "LABEL=config-2": ["/dev/vdb"]} - self.assertEqual(["/dev/vdb"], ds.find_candidate_devs()) - - # add a vfat item - # zdd reverse sorts after vdb, but config-2 label is preferred - devs_with_answers['TYPE=vfat'] = ["/dev/zdd"] - self.assertEqual(["/dev/vdb", "/dev/zdd"], - ds.find_candidate_devs()) - - # verify that partitions are considered, that have correct label. - devs_with_answers = {"TYPE=vfat": ["/dev/sda1"], - "TYPE=iso9660": [], - "LABEL=config-2": ["/dev/vdb3"]} - self.assertEqual(["/dev/vdb3"], - ds.find_candidate_devs()) - - finally: - util.find_devs_with = orig_find_devs_with - util.is_partition = orig_is_partition - - @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') - def test_pubkeys_v2(self, on_first_boot): - """Verify that public-keys work in config-drive-v2.""" - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - myds = cfg_ds_from_dir(self.tmp) - self.assertEqual(myds.get_public_ssh_keys(), - [OSTACK_META['public_keys']['mykey']]) - - -class TestNetJson(TestCase): - def setUp(self): - super(TestNetJson, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.maxDiff = None - - @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') - def test_network_data_is_found(self, on_first_boot): - """Verify that network_data is present in ds in config-drive-v2.""" - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - myds = cfg_ds_from_dir(self.tmp) - self.assertIsNotNone(myds.network_json) - - @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') - def test_network_config_is_converted(self, on_first_boot): - """Verify that network_data is converted and present on ds object.""" - populate_dir(self.tmp, CFG_DRIVE_FILES_V2) - myds = cfg_ds_from_dir(self.tmp) - network_config = openstack.convert_net_json(NETWORK_DATA, - known_macs=KNOWN_MACS) - self.assertEqual(myds.network_config, network_config) - - def test_network_config_conversions(self): - """Tests a bunch of input network json and checks the - expected conversions.""" - in_datas = [ - NETWORK_DATA, - { - 'services': [{'type': 'dns', 'address': '172.19.0.12'}], - 'networks': [{ - 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4', - 'type': 'ipv4', - 'netmask': '255.255.252.0', - 'link': 'tap1a81968a-79', - 'routes': [{ - 'netmask': '0.0.0.0', - 'network': '0.0.0.0', - 'gateway': '172.19.3.254', - }], - 'ip_address': '172.19.1.34', - 'id': 'network0', - }], - 'links': [{ - 'type': 'bridge', - 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f', - 'ethernet_mac_address': 'fa:16:3e:ed:9a:59', - 'id': 'tap1a81968a-79', - 'mtu': None, - }], - }, - ] - out_datas = [ - { - 'version': 1, - 'config': [ - { - 'subnets': [{'type': 'dhcp4'}], - 'type': 'physical', - 'mac_address': 'fa:16:3e:69:b0:58', - 'name': 'enp0s1', - 'mtu': None, - }, - { - 'subnets': [{'type': 'dhcp4'}], - 'type': 'physical', - 'mac_address': 'fa:16:3e:d4:57:ad', - 'name': 'enp0s2', - 'mtu': None, - }, - { - 'subnets': [{'type': 'dhcp4'}], - 'type': 'physical', - 'mac_address': 'fa:16:3e:05:30:fe', - 'name': 'nic0', - 'mtu': None, - }, - { - 'type': 'nameserver', - 'address': '199.204.44.24', - }, - { - 'type': 'nameserver', - 'address': '199.204.47.54', - } - ], - - }, - { - 'version': 1, - 'config': [ - { - 'name': 'foo3', - 'mac_address': 'fa:16:3e:ed:9a:59', - 'mtu': None, - 'type': 'physical', - 'subnets': [ - { - 'address': '172.19.1.34', - 'netmask': '255.255.252.0', - 'type': 'static', - 'ipv4': True, - 'routes': [{ - 'gateway': '172.19.3.254', - 'netmask': '0.0.0.0', - 'network': '0.0.0.0', - }], - } - ] - }, - { - 'type': 'nameserver', - 'address': '172.19.0.12', - } - ], - }, - ] - for in_data, out_data in zip(in_datas, out_datas): - conv_data = openstack.convert_net_json(in_data, - known_macs=KNOWN_MACS) - self.assertEqual(out_data, conv_data) - - -class TestConvertNetworkData(TestCase): - def setUp(self): - super(TestConvertNetworkData, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def _getnames_in_config(self, ncfg): - return set([n['name'] for n in ncfg['config'] - if n['type'] == 'physical']) - - def test_conversion_fills_names(self): - ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS) - expected = set(['nic0', 'enp0s1', 'enp0s2']) - found = self._getnames_in_config(ncfg) - self.assertEqual(found, expected) - - @mock.patch('cloudinit.net.get_interfaces_by_mac') - def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac): - macs = KNOWN_MACS.copy() - macs.update({'fa:16:3e:05:30:fe': 'foonic1', - 'fa:16:3e:69:b0:58': 'ens1'}) - get_interfaces_by_mac.return_value = macs - - ncfg = openstack.convert_net_json(NETWORK_DATA) - expected = set(['nic0', 'ens1', 'enp0s2']) - found = self._getnames_in_config(ncfg) - self.assertEqual(found, expected) - - def test_convert_raises_value_error_on_missing_name(self): - macs = {'aa:aa:aa:aa:aa:00': 'ens1'} - self.assertRaises(ValueError, openstack.convert_net_json, - NETWORK_DATA, known_macs=macs) - - def test_conversion_with_route(self): - ncfg = openstack.convert_net_json(NETWORK_DATA_2, - known_macs=KNOWN_MACS) - # not the best test, but see that we get a route in the - # network config and that it gets rendered to an ENI file - routes = [] - for n in ncfg['config']: - for s in n.get('subnets', []): - routes.extend(s.get('routes', [])) - self.assertIn( - {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'}, - routes) - eni_renderer = eni.Renderer() - eni_renderer.render_network_state( - self.tmp, network_state.parse_net_config_data(ncfg)) - with open(os.path.join(self.tmp, "etc", - "network", "interfaces"), 'r') as f: - eni_rendering = f.read() - self.assertIn("route add default gw 2.2.2.9", eni_rendering) - - -def cfg_ds_from_dir(seed_d): - cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None, - helpers.Paths({})) - cfg_ds.seed_dir = seed_d - cfg_ds.known_macs = KNOWN_MACS.copy() - if not cfg_ds.get_data(): - raise RuntimeError("Data source did not extract itself from" - " seed directory %s" % seed_d) - return cfg_ds - - -def populate_ds_from_read_config(cfg_ds, source, results): - """Patch the DataSourceConfigDrive from the results of - read_config_drive_dir hopefully in line with what it would have - if cfg_ds.get_data had been successfully called""" - cfg_ds.source = source - cfg_ds.metadata = results.get('metadata') - cfg_ds.ec2_metadata = results.get('ec2-metadata') - cfg_ds.userdata_raw = results.get('userdata') - cfg_ds.version = results.get('version') - cfg_ds.network_json = results.get('networkdata') - cfg_ds._network_config = openstack.convert_net_json( - cfg_ds.network_json, known_macs=KNOWN_MACS) - - -def populate_dir(seed_dir, files): - for (name, content) in files.items(): - path = os.path.join(seed_dir, name) - dirname = os.path.dirname(path) - if not os.path.isdir(dirname): - os.makedirs(dirname) - if isinstance(content, six.text_type): - mode = "w" - else: - mode = "wb" - with open(path, mode) as fp: - fp.write(content) - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py deleted file mode 100644 index 8936a1e3..00000000 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ /dev/null @@ -1,127 +0,0 @@ -# -# Copyright (C) 2014 Neal Shrader -# -# Author: Neal Shrader <neal@digitalocean.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re - -from six.moves.urllib_parse import urlparse - -from cloudinit import helpers -from cloudinit import settings -from cloudinit.sources import DataSourceDigitalOcean - -from .. import helpers as test_helpers - -httpretty = test_helpers.import_httpretty() - -# Abbreviated for the test -DO_INDEX = """id - hostname - user-data - vendor-data - public-keys - region""" - -DO_MULTIPLE_KEYS = """ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com - ssh-rsa AAAAB3NzaC1yc2EAAAA... neal2@digitalocean.com""" -DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com" - -DO_META = { - '': DO_INDEX, - 'user-data': '#!/bin/bash\necho "user-data"', - 'vendor-data': '#!/bin/bash\necho "vendor-data"', - 'public-keys': DO_SINGLE_KEY, - 'region': 'nyc3', - 'id': '2000000', - 'hostname': 'cloudinit-test', -} - -MD_URL_RE = re.compile(r'http://169.254.169.254/metadata/v1/.*') - - -def _request_callback(method, uri, headers): - url_path = urlparse(uri).path - if url_path.startswith('/metadata/v1/'): - path = url_path.split('/metadata/v1/')[1:][0] - else: - path = None - if path in DO_META: - return (200, headers, DO_META.get(path)) - else: - return (404, headers, '') - - -class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase): - - def setUp(self): - self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean( - settings.CFG_BUILTIN, None, - helpers.Paths({})) - super(TestDataSourceDigitalOcean, self).setUp() - - @httpretty.activate - def test_connection(self): - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_request_callback) - - success = self.ds.get_data() - self.assertTrue(success) - - @httpretty.activate - def test_metadata(self): - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_request_callback) - self.ds.get_data() - - self.assertEqual(DO_META.get('user-data'), - self.ds.get_userdata_raw()) - - self.assertEqual(DO_META.get('vendor-data'), - self.ds.get_vendordata_raw()) - - self.assertEqual(DO_META.get('region'), - self.ds.availability_zone) - - self.assertEqual(DO_META.get('id'), - self.ds.get_instance_id()) - - self.assertEqual(DO_META.get('hostname'), - self.ds.get_hostname()) - - self.assertEqual('http://mirrors.digitalocean.com/', - self.ds.get_package_mirror_info()) - - # Single key - self.assertEqual([DO_META.get('public-keys')], - self.ds.get_public_ssh_keys()) - - self.assertIsInstance(self.ds.get_public_ssh_keys(), list) - - @httpretty.activate - def test_multiple_ssh_keys(self): - DO_META['public_keys'] = DO_MULTIPLE_KEYS - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_request_callback) - self.ds.get_data() - - # Multiple keys - self.assertEqual(DO_META.get('public-keys').splitlines(), - self.ds.get_public_ssh_keys()) - - self.assertIsInstance(self.ds.get_public_ssh_keys(), list) diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py deleted file mode 100644 index 6e62a4d2..00000000 --- a/tests/unittests/test_datasource/test_gce.py +++ /dev/null @@ -1,166 +0,0 @@ -# -# Copyright (C) 2014 Vaidas Jablonskis -# -# Author: Vaidas Jablonskis <jablonskis@gmail.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import re - -from base64 import b64encode, b64decode -from six.moves.urllib_parse import urlparse - -from cloudinit import helpers -from cloudinit import settings -from cloudinit.sources import DataSourceGCE - -from .. import helpers as test_helpers - -httpretty = test_helpers.import_httpretty() - -GCE_META = { - 'instance/id': '123', - 'instance/zone': 'foo/bar', - 'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server', - 'instance/hostname': 'server.project-foo.local', - 'instance/attributes/user-data': b'/bin/echo foo\n', -} - -GCE_META_PARTIAL = { - 'instance/id': '1234', - 'instance/hostname': 'server.project-bar.local', - 'instance/zone': 'bar/baz', -} - -GCE_META_ENCODING = { - 'instance/id': '12345', - 'instance/hostname': 'server.project-baz.local', - 'instance/zone': 'baz/bang', - 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'), - 'instance/attributes/user-data-encoding': 'base64', -} - -HEADERS = {'X-Google-Metadata-Request': 'True'} -MD_URL_RE = re.compile( - r'http://metadata.google.internal/computeMetadata/v1/.*') - - -def _set_mock_metadata(gce_meta=None): - if gce_meta is None: - gce_meta = GCE_META - - def _request_callback(method, uri, headers): - url_path = urlparse(uri).path - if url_path.startswith('/computeMetadata/v1/'): - path = url_path.split('/computeMetadata/v1/')[1:][0] - else: - path = None - if path in gce_meta: - return (200, headers, gce_meta.get(path)) - else: - return (404, headers, '') - - httpretty.register_uri(httpretty.GET, MD_URL_RE, body=_request_callback) - - -@httpretty.activate -class TestDataSourceGCE(test_helpers.HttprettyTestCase): - - def setUp(self): - self.ds = DataSourceGCE.DataSourceGCE( - settings.CFG_BUILTIN, None, - helpers.Paths({})) - super(TestDataSourceGCE, self).setUp() - - def test_connection(self): - _set_mock_metadata() - success = self.ds.get_data() - self.assertTrue(success) - - req_header = httpretty.last_request().headers - self.assertDictContainsSubset(HEADERS, req_header) - - def test_metadata(self): - _set_mock_metadata() - self.ds.get_data() - - shostname = GCE_META.get('instance/hostname').split('.')[0] - self.assertEqual(shostname, - self.ds.get_hostname()) - - self.assertEqual(GCE_META.get('instance/id'), - self.ds.get_instance_id()) - - self.assertEqual(GCE_META.get('instance/attributes/user-data'), - self.ds.get_userdata_raw()) - - # test partial metadata (missing user-data in particular) - def test_metadata_partial(self): - _set_mock_metadata(GCE_META_PARTIAL) - self.ds.get_data() - - self.assertEqual(GCE_META_PARTIAL.get('instance/id'), - self.ds.get_instance_id()) - - shostname = GCE_META_PARTIAL.get('instance/hostname').split('.')[0] - self.assertEqual(shostname, self.ds.get_hostname()) - - def test_metadata_encoding(self): - _set_mock_metadata(GCE_META_ENCODING) - self.ds.get_data() - - decoded = b64decode( - GCE_META_ENCODING.get('instance/attributes/user-data')) - self.assertEqual(decoded, self.ds.get_userdata_raw()) - - def test_missing_required_keys_return_false(self): - for required_key in ['instance/id', 'instance/zone', - 'instance/hostname']: - meta = GCE_META_PARTIAL.copy() - del meta[required_key] - _set_mock_metadata(meta) - self.assertEqual(False, self.ds.get_data()) - httpretty.reset() - - def test_project_level_ssh_keys_are_used(self): - _set_mock_metadata() - self.ds.get_data() - - # we expect a list of public ssh keys with user names stripped - self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'], - self.ds.get_public_ssh_keys()) - - def test_instance_level_ssh_keys_are_used(self): - key_content = 'ssh-rsa JustAUser root@server' - meta = GCE_META.copy() - meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content) - - _set_mock_metadata(meta) - self.ds.get_data() - - self.assertIn(key_content, self.ds.get_public_ssh_keys()) - - def test_instance_level_keys_replace_project_level_keys(self): - key_content = 'ssh-rsa JustAUser root@server' - meta = GCE_META.copy() - meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content) - - _set_mock_metadata(meta) - self.ds.get_data() - - self.assertEqual([key_content], self.ds.get_public_ssh_keys()) - - def test_only_last_part_of_zone_used_for_availability_zone(self): - _set_mock_metadata() - self.ds.get_data() - self.assertEqual('bar', self.ds.availability_zone) diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py deleted file mode 100644 index f66f1c6d..00000000 --- a/tests/unittests/test_datasource/test_maas.py +++ /dev/null @@ -1,163 +0,0 @@ -from copy import copy -import os -import shutil -import tempfile - -from cloudinit.sources import DataSourceMAAS -from cloudinit import url_helper -from ..helpers import TestCase, populate_dir - -try: - from unittest import mock -except ImportError: - import mock - - -class TestMAASDataSource(TestCase): - - def setUp(self): - super(TestMAASDataSource, self).setUp() - # Make a temp directoy for tests to use. - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def test_seed_dir_valid(self): - """Verify a valid seeddir is read as such.""" - - data = {'instance-id': 'i-valid01', - 'local-hostname': 'valid01-hostname', - 'user-data': b'valid01-userdata', - 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'} - - my_d = os.path.join(self.tmp, "valid") - populate_dir(my_d, data) - - (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d) - - self.assertEqual(userdata, data['user-data']) - for key in ('instance-id', 'local-hostname'): - self.assertEqual(data[key], metadata[key]) - - # verify that 'userdata' is not returned as part of the metadata - self.assertFalse(('user-data' in metadata)) - - def test_seed_dir_valid_extra(self): - """Verify extra files do not affect seed_dir validity.""" - - data = {'instance-id': 'i-valid-extra', - 'local-hostname': 'valid-extra-hostname', - 'user-data': b'valid-extra-userdata', 'foo': 'bar'} - - my_d = os.path.join(self.tmp, "valid_extra") - populate_dir(my_d, data) - - (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d) - - self.assertEqual(userdata, data['user-data']) - for key in ('instance-id', 'local-hostname'): - self.assertEqual(data[key], metadata[key]) - - # additional files should not just appear as keys in metadata atm - self.assertFalse(('foo' in metadata)) - - def test_seed_dir_invalid(self): - """Verify that invalid seed_dir raises MAASSeedDirMalformed.""" - - valid = {'instance-id': 'i-instanceid', - 'local-hostname': 'test-hostname', 'user-data': ''} - - my_based = os.path.join(self.tmp, "valid_extra") - - # missing 'userdata' file - my_d = "%s-01" % my_based - invalid_data = copy(valid) - del invalid_data['local-hostname'] - populate_dir(my_d, invalid_data) - self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed, - DataSourceMAAS.read_maas_seed_dir, my_d) - - # missing 'instance-id' - my_d = "%s-02" % my_based - invalid_data = copy(valid) - del invalid_data['instance-id'] - populate_dir(my_d, invalid_data) - self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed, - DataSourceMAAS.read_maas_seed_dir, my_d) - - def test_seed_dir_none(self): - """Verify that empty seed_dir raises MAASSeedDirNone.""" - - my_d = os.path.join(self.tmp, "valid_empty") - self.assertRaises(DataSourceMAAS.MAASSeedDirNone, - DataSourceMAAS.read_maas_seed_dir, my_d) - - def test_seed_dir_missing(self): - """Verify that missing seed_dir raises MAASSeedDirNone.""" - self.assertRaises(DataSourceMAAS.MAASSeedDirNone, - DataSourceMAAS.read_maas_seed_dir, - os.path.join(self.tmp, "nonexistantdirectory")) - - def test_seed_url_valid(self): - """Verify that valid seed_url is read as such.""" - valid = { - 'meta-data/instance-id': 'i-instanceid', - 'meta-data/local-hostname': 'test-hostname', - 'meta-data/public-keys': 'test-hostname', - 'user-data': b'foodata', - } - valid_order = [ - 'meta-data/local-hostname', - 'meta-data/instance-id', - 'meta-data/public-keys', - 'user-data', - ] - my_seed = "http://example.com/xmeta" - my_ver = "1999-99-99" - my_headers = {'header1': 'value1', 'header2': 'value2'} - - def my_headers_cb(url): - return my_headers - - # Each time url_helper.readurl() is called, something different is - # returned based on the canned data above. We need to build up a list - # of side effect return values, which the mock will return. At the - # same time, we'll build up a list of expected call arguments for - # asserting after the code under test is run. - calls = [] - - def side_effect(): - for key in valid_order: - resp = valid.get(key) - url = "%s/%s/%s" % (my_seed, my_ver, key) - calls.append( - mock.call(url, headers=None, timeout=mock.ANY, - data=mock.ANY, sec_between=mock.ANY, - ssl_details=mock.ANY, retries=mock.ANY, - headers_cb=my_headers_cb, - exception_cb=mock.ANY)) - yield url_helper.StringResponse(resp) - - # Now do the actual call of the code under test. - with mock.patch.object(url_helper, 'readurl', - side_effect=side_effect()) as mockobj: - userdata, metadata = DataSourceMAAS.read_maas_seed_url( - my_seed, version=my_ver) - - self.assertEqual(b"foodata", userdata) - self.assertEqual(metadata['instance-id'], - valid['meta-data/instance-id']) - self.assertEqual(metadata['local-hostname'], - valid['meta-data/local-hostname']) - - mockobj.has_calls(calls) - - def test_seed_url_invalid(self): - """Verify that invalid seed_url raises MAASSeedDirMalformed.""" - pass - - def test_seed_url_missing(self): - """Verify seed_url with no found entries raises MAASSeedDirNone.""" - pass - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py deleted file mode 100644 index b0fa1130..00000000 --- a/tests/unittests/test_datasource/test_nocloud.py +++ /dev/null @@ -1,178 +0,0 @@ -from cloudinit import helpers -from cloudinit.sources import DataSourceNoCloud -from cloudinit import util -from ..helpers import TestCase, populate_dir, mock, ExitStack - -import os -import shutil -import tempfile - -import yaml - - -class TestNoCloudDataSource(TestCase): - - def setUp(self): - super(TestNoCloudDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - - self.cmdline = "root=TESTCMDLINE" - - self.mocks = ExitStack() - self.addCleanup(self.mocks.close) - - self.mocks.enter_context( - mock.patch.object(util, 'get_cmdline', return_value=self.cmdline)) - - def test_nocloud_seed_dir(self): - md = {'instance-id': 'IID', 'dsmode': 'local'} - ud = b"USER_DATA_HERE" - populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), - {'user-data': ud, 'meta-data': yaml.safe_dump(md)}) - - sys_cfg = { - 'datasource': {'NoCloud': {'fs_label': None}} - } - - ds = DataSourceNoCloud.DataSourceNoCloud - - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertEqual(dsrc.userdata_raw, ud) - self.assertEqual(dsrc.metadata, md) - self.assertTrue(ret) - - def test_fs_label(self): - # find_devs_with should not be called ff fs_label is None - ds = DataSourceNoCloud.DataSourceNoCloud - - class PsuedoException(Exception): - pass - - def my_find_devs_with(*args, **kwargs): - raise PsuedoException - - self.mocks.enter_context( - mock.patch.object(util, 'find_devs_with', - side_effect=PsuedoException)) - - # by default, NoCloud should search for filesystems by label - sys_cfg = {'datasource': {'NoCloud': {}}} - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - self.assertRaises(PsuedoException, dsrc.get_data) - - # but disabling searching should just end up with None found - sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertFalse(ret) - - def test_no_datasource_expected(self): - # no source should be found if no cmdline, config, and fs_label=None - sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} - - ds = DataSourceNoCloud.DataSourceNoCloud - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - self.assertFalse(dsrc.get_data()) - - def test_seed_in_config(self): - ds = DataSourceNoCloud.DataSourceNoCloud - - data = { - 'fs_label': None, - 'meta-data': yaml.safe_dump({'instance-id': 'IID'}), - 'user-data': b"USER_DATA_RAW", - } - - sys_cfg = {'datasource': {'NoCloud': data}} - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW") - self.assertEqual(dsrc.metadata.get('instance-id'), 'IID') - self.assertTrue(ret) - - def test_nocloud_seed_with_vendordata(self): - md = {'instance-id': 'IID', 'dsmode': 'local'} - ud = b"USER_DATA_HERE" - vd = b"THIS IS MY VENDOR_DATA" - - populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), - {'user-data': ud, 'meta-data': yaml.safe_dump(md), - 'vendor-data': vd}) - - sys_cfg = { - 'datasource': {'NoCloud': {'fs_label': None}} - } - - ds = DataSourceNoCloud.DataSourceNoCloud - - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertEqual(dsrc.userdata_raw, ud) - self.assertEqual(dsrc.metadata, md) - self.assertEqual(dsrc.vendordata_raw, vd) - self.assertTrue(ret) - - def test_nocloud_no_vendordata(self): - populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), - {'user-data': b"ud", 'meta-data': "instance-id: IID\n"}) - - sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} - - ds = DataSourceNoCloud.DataSourceNoCloud - - dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertEqual(dsrc.userdata_raw, b"ud") - self.assertFalse(dsrc.vendordata) - self.assertTrue(ret) - - -class TestParseCommandLineData(TestCase): - - def test_parse_cmdline_data_valid(self): - ds_id = "ds=nocloud" - pairs = ( - ("root=/dev/sda1 %(ds_id)s", {}), - ("%(ds_id)s; root=/dev/foo", {}), - ("%(ds_id)s", {}), - ("%(ds_id)s;", {}), - ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}), - ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost", - {'seedfrom': 'SEED', 'local-hostname': 'xhost'}), - ("%(ds_id)s;h=xhost", - {'local-hostname': 'xhost'}), - ("%(ds_id)s;h=xhost;i=IID", - {'local-hostname': 'xhost', 'instance-id': 'IID'}), - ) - - for (fmt, expected) in pairs: - fill = {} - cmdline = fmt % {'ds_id': ds_id} - ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill, - cmdline=cmdline) - self.assertEqual(expected, fill) - self.assertTrue(ret) - - def test_parse_cmdline_data_none(self): - ds_id = "ds=foo" - cmdlines = ( - "root=/dev/sda1 ro", - "console=/dev/ttyS0 root=/dev/foo", - "", - "ds=foocloud", - "ds=foo-net", - "ds=nocloud;s=SEED", - ) - - for cmdline in cmdlines: - fill = {} - ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill, - cmdline=cmdline) - self.assertEqual(fill, {}) - self.assertFalse(ret) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py deleted file mode 100644 index d796f030..00000000 --- a/tests/unittests/test_datasource/test_opennebula.py +++ /dev/null @@ -1,300 +0,0 @@ -from cloudinit import helpers -from cloudinit.sources import DataSourceOpenNebula as ds -from cloudinit import util -from ..helpers import TestCase, populate_dir - -import os -import pwd -import shutil -import tempfile -import unittest - - -TEST_VARS = { - 'VAR1': 'single', - 'VAR2': 'double word', - 'VAR3': 'multi\nline\n', - 'VAR4': "'single'", - 'VAR5': "'double word'", - 'VAR6': "'multi\nline\n'", - 'VAR7': 'single\\t', - 'VAR8': 'double\\tword', - 'VAR9': 'multi\\t\nline\n', - 'VAR10': '\\', # expect '\' - 'VAR11': '\'', # expect ' - 'VAR12': '$', # expect $ -} - -INVALID_CONTEXT = ';' -USER_DATA = '#cloud-config\napt_upgrade: true' -SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' -HOSTNAME = 'foo.example.com' -PUBLIC_IP = '10.0.0.3' - -CMD_IP_OUT = '''\ -1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN - link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 -2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000 - link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff -''' - - -class TestOpenNebulaDataSource(TestCase): - parsed_user = None - - def setUp(self): - super(TestOpenNebulaDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - - # defaults for few tests - self.ds = ds.DataSourceOpenNebula - self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula") - self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}} - - # we don't want 'sudo' called in tests. so we patch switch_user_cmd - def my_switch_user_cmd(user): - self.parsed_user = user - return [] - - self.switch_user_cmd_real = ds.switch_user_cmd - ds.switch_user_cmd = my_switch_user_cmd - - def tearDown(self): - ds.switch_user_cmd = self.switch_user_cmd_real - super(TestOpenNebulaDataSource, self).tearDown() - - def test_get_data_non_contextdisk(self): - orig_find_devs_with = util.find_devs_with - try: - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertFalse(ret) - finally: - util.find_devs_with = orig_find_devs_with - - def test_get_data_broken_contextdisk(self): - orig_find_devs_with = util.find_devs_with - try: - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) - dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) - self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) - finally: - util.find_devs_with = orig_find_devs_with - - def test_get_data_invalid_identity(self): - orig_find_devs_with = util.find_devs_with - try: - # generate non-existing system user name - sys_cfg = self.sys_cfg - invalid_user = 'invalid' - while not sys_cfg['datasource']['OpenNebula'].get('parseuser'): - try: - pwd.getpwnam(invalid_user) - invalid_user += 'X' - except KeyError: - sys_cfg['datasource']['OpenNebula']['parseuser'] = \ - invalid_user - - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) - dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) - finally: - util.find_devs_with = orig_find_devs_with - - def test_get_data(self): - orig_find_devs_with = util.find_devs_with - try: - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) - dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertTrue(ret) - finally: - util.find_devs_with = orig_find_devs_with - - def test_seed_dir_non_contextdisk(self): - self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, - self.seed_dir) - - def test_seed_dir_empty1_context(self): - populate_dir(self.seed_dir, {'context.sh': ''}) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertEqual(results['userdata'], None) - self.assertEqual(results['metadata'], {}) - - def test_seed_dir_empty2_context(self): - populate_context_dir(self.seed_dir, {}) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertEqual(results['userdata'], None) - self.assertEqual(results['metadata'], {}) - - def test_seed_dir_broken_context(self): - populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) - - self.assertRaises(ds.BrokenContextDiskDir, - ds.read_context_disk_dir, - self.seed_dir) - - def test_context_parser(self): - populate_context_dir(self.seed_dir, TEST_VARS) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertTrue('metadata' in results) - self.assertEqual(TEST_VARS, results['metadata']) - - def test_ssh_key(self): - public_keys = ['first key', 'second key'] - for c in range(4): - for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'): - my_d = os.path.join(self.tmp, "%s-%i" % (k, c)) - populate_context_dir(my_d, {k: '\n'.join(public_keys)}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('metadata' in results) - self.assertTrue('public-keys' in results['metadata']) - self.assertEqual(public_keys, - results['metadata']['public-keys']) - - public_keys.append(SSH_KEY % (c + 1,)) - - def test_user_data_plain(self): - for k in ('USER_DATA', 'USERDATA'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: USER_DATA, - 'USERDATA_ENCODING': ''}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('userdata' in results) - self.assertEqual(USER_DATA, results['userdata']) - - def test_user_data_encoding_required_for_decode(self): - b64userdata = util.b64e(USER_DATA) - for k in ('USER_DATA', 'USERDATA'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: b64userdata}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('userdata' in results) - self.assertEqual(b64userdata, results['userdata']) - - def test_user_data_base64_encoding(self): - for k in ('USER_DATA', 'USERDATA'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: util.b64e(USER_DATA), - 'USERDATA_ENCODING': 'base64'}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('userdata' in results) - self.assertEqual(USER_DATA, results['userdata']) - - def test_hostname(self): - for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: PUBLIC_IP}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('metadata' in results) - self.assertTrue('local-hostname' in results['metadata']) - self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname']) - - def test_network_interfaces(self): - populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'}) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertTrue('network-interfaces' in results) - - def test_find_candidates(self): - def my_devs_with(criteria): - return { - "LABEL=CONTEXT": ["/dev/sdb"], - "LABEL=CDROM": ["/dev/sr0"], - "TYPE=iso9660": ["/dev/vdb"], - }.get(criteria, []) - - orig_find_devs_with = util.find_devs_with - try: - util.find_devs_with = my_devs_with - self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"], - ds.find_candidate_devs()) - finally: - util.find_devs_with = orig_find_devs_with - - -class TestOpenNebulaNetwork(unittest.TestCase): - - def setUp(self): - super(TestOpenNebulaNetwork, self).setUp() - - def test_lo(self): - net = ds.OpenNebulaNetwork('', {}) - self.assertEqual(net.gen_conf(), u'''\ -auto lo -iface lo inet loopback -''') - - def test_eth0(self): - net = ds.OpenNebulaNetwork(CMD_IP_OUT, {}) - self.assertEqual(net.gen_conf(), u'''\ -auto lo -iface lo inet loopback - -auto eth0 -iface eth0 inet static - address 10.18.1.1 - network 10.18.1.0 - netmask 255.255.255.0 -''') - - def test_eth0_override(self): - context = { - 'DNS': '1.2.3.8', - 'ETH0_IP': '1.2.3.4', - 'ETH0_NETWORK': '1.2.3.0', - 'ETH0_MASK': '255.255.0.0', - 'ETH0_GATEWAY': '1.2.3.5', - 'ETH0_DOMAIN': 'example.com', - 'ETH0_DNS': '1.2.3.6 1.2.3.7' - } - - net = ds.OpenNebulaNetwork(CMD_IP_OUT, context) - self.assertEqual(net.gen_conf(), u'''\ -auto lo -iface lo inet loopback - -auto eth0 -iface eth0 inet static - address 1.2.3.4 - network 1.2.3.0 - netmask 255.255.0.0 - gateway 1.2.3.5 - dns-search example.com - dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7 -''') - - -class TestParseShellConfig(unittest.TestCase): - def test_no_seconds(self): - cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"]) - # we could test 'sleep 2', but that would make the test run slower. - ret = ds.parse_shell_config(cfg) - self.assertEqual(ret, {"foo": "bar", "xx": "foo"}) - - -def populate_context_dir(path, variables): - data = "# Context variables generated by OpenNebula\n" - for k, v in variables.items(): - data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''"))) - populate_dir(path, {'context.sh': data}) - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py deleted file mode 100644 index 5c8592c5..00000000 --- a/tests/unittests/test_datasource/test_openstack.py +++ /dev/null @@ -1,347 +0,0 @@ -# vi: ts=4 expandtab -# -# Copyright (C) 2014 Yahoo! Inc. -# -# Author: Joshua Harlow <harlowja@yahoo-inc.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import copy -import json -import re - -from .. import helpers as test_helpers - -from six.moves.urllib.parse import urlparse -from six import StringIO - -from cloudinit import helpers -from cloudinit import settings -from cloudinit.sources import DataSourceOpenStack as ds -from cloudinit.sources.helpers import openstack -from cloudinit import util - -hp = test_helpers.import_httpretty() - -BASE_URL = "http://169.254.169.254" -PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' -EC2_META = { - 'ami-id': 'ami-00000001', - 'ami-launch-index': '0', - 'ami-manifest-path': 'FIXME', - 'hostname': 'sm-foo-test.novalocal', - 'instance-action': 'none', - 'instance-id': 'i-00000001', - 'instance-type': 'm1.tiny', - 'local-hostname': 'sm-foo-test.novalocal', - 'local-ipv4': '0.0.0.0', - 'public-hostname': 'sm-foo-test.novalocal', - 'public-ipv4': '0.0.0.1', - 'reservation-id': 'r-iru5qm4m', -} -USER_DATA = b'#!/bin/sh\necho This is user data\n' -VENDOR_DATA = { - 'magic': '', -} -OSTACK_META = { - 'availability_zone': 'nova', - 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'}, - {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}], - 'hostname': 'sm-foo-test.novalocal', - 'meta': {'dsmode': 'local', 'my-meta': 'my-value'}, - 'name': 'sm-foo-test', - 'public_keys': {'mykey': PUBKEY}, - 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c', -} -CONTENT_0 = b'This is contents of /etc/foo.cfg\n' -CONTENT_1 = b'# this is /etc/bar/bar.cfg\n' -OS_FILES = { - 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), - 'openstack/latest/user_data': USER_DATA, - 'openstack/content/0000': CONTENT_0, - 'openstack/content/0001': CONTENT_1, - 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), - 'openstack/latest/user_data': USER_DATA, - 'openstack/latest/vendor_data.json': json.dumps(VENDOR_DATA), -} -EC2_FILES = { - 'latest/user-data': USER_DATA, -} -EC2_VERSIONS = [ - 'latest', -] - - -def _register_uris(version, ec2_files, ec2_meta, os_files): - """Registers a set of url patterns into httpretty that will mimic the - same data returned by the openstack metadata service (and ec2 service).""" - - def match_ec2_url(uri, headers): - path = uri.path.strip("/") - if len(path) == 0: - return (200, headers, "\n".join(EC2_VERSIONS)) - path = uri.path.lstrip("/") - if path in ec2_files: - return (200, headers, ec2_files.get(path)) - if path == 'latest/meta-data/': - buf = StringIO() - for (k, v) in ec2_meta.items(): - if isinstance(v, (list, tuple)): - buf.write("%s/" % (k)) - else: - buf.write("%s" % (k)) - buf.write("\n") - return (200, headers, buf.getvalue()) - if path.startswith('latest/meta-data/'): - value = None - pieces = path.split("/") - if path.endswith("/"): - pieces = pieces[2:-1] - value = util.get_cfg_by_path(ec2_meta, pieces) - else: - pieces = pieces[2:] - value = util.get_cfg_by_path(ec2_meta, pieces) - if value is not None: - return (200, headers, str(value)) - return (404, headers, '') - - def match_os_uri(uri, headers): - path = uri.path.strip("/") - if path == 'openstack': - return (200, headers, "\n".join([openstack.OS_LATEST])) - path = uri.path.lstrip("/") - if path in os_files: - return (200, headers, os_files.get(path)) - return (404, headers, '') - - def get_request_callback(method, uri, headers): - uri = urlparse(uri) - path = uri.path.lstrip("/").split("/") - if path[0] == 'openstack': - return match_os_uri(uri, headers) - return match_ec2_url(uri, headers) - - hp.register_uri(hp.GET, re.compile(r'http://169.254.169.254/.*'), - body=get_request_callback) - - -def _read_metadata_service(): - return ds.read_metadata_service(BASE_URL, retries=0, timeout=0.1) - - -class TestOpenStackDataSource(test_helpers.HttprettyTestCase): - VERSION = 'latest' - - @hp.activate - def test_successful(self): - _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES) - f = _read_metadata_service() - self.assertEqual(VENDOR_DATA, f.get('vendordata')) - self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertEqual(2, len(f['files'])) - self.assertEqual(USER_DATA, f.get('userdata')) - self.assertEqual(EC2_META, f.get('ec2-metadata')) - self.assertEqual(2, f.get('version')) - metadata = f['metadata'] - self.assertEqual('nova', metadata.get('availability_zone')) - self.assertEqual('sm-foo-test.novalocal', metadata.get('hostname')) - self.assertEqual('sm-foo-test.novalocal', - metadata.get('local-hostname')) - self.assertEqual('sm-foo-test', metadata.get('name')) - self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c', - metadata.get('uuid')) - self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c', - metadata.get('instance-id')) - - @hp.activate - def test_no_ec2(self): - _register_uris(self.VERSION, {}, {}, OS_FILES) - f = _read_metadata_service() - self.assertEqual(VENDOR_DATA, f.get('vendordata')) - self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertEqual(USER_DATA, f.get('userdata')) - self.assertEqual({}, f.get('ec2-metadata')) - self.assertEqual(2, f.get('version')) - - @hp.activate - def test_bad_metadata(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('meta_data.json'): - os_files.pop(k, None) - _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.NonReadable, _read_metadata_service) - - @hp.activate - def test_bad_uuid(self): - os_files = copy.deepcopy(OS_FILES) - os_meta = copy.deepcopy(OSTACK_META) - os_meta.pop('uuid') - for k in list(os_files.keys()): - if k.endswith('meta_data.json'): - os_files[k] = json.dumps(os_meta) - _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) - - @hp.activate - def test_userdata_empty(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('user_data'): - os_files.pop(k, None) - _register_uris(self.VERSION, {}, {}, os_files) - f = _read_metadata_service() - self.assertEqual(VENDOR_DATA, f.get('vendordata')) - self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertFalse(f.get('userdata')) - - @hp.activate - def test_vendordata_empty(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('vendor_data.json'): - os_files.pop(k, None) - _register_uris(self.VERSION, {}, {}, os_files) - f = _read_metadata_service() - self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertFalse(f.get('vendordata')) - - @hp.activate - def test_vendordata_invalid(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('vendor_data.json'): - os_files[k] = '{' # some invalid json - _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) - - @hp.activate - def test_metadata_invalid(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('meta_data.json'): - os_files[k] = '{' # some invalid json - _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) - - @hp.activate - def test_datasource(self): - _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES) - ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - self.assertIsNone(ds_os.version) - found = ds_os.get_data(timeout=0.1, retries=0) - self.assertTrue(found) - self.assertEqual(2, ds_os.version) - md = dict(ds_os.metadata) - md.pop('instance-id', None) - md.pop('local-hostname', None) - self.assertEqual(OSTACK_META, md) - self.assertEqual(EC2_META, ds_os.ec2_metadata) - self.assertEqual(USER_DATA, ds_os.userdata_raw) - self.assertEqual(2, len(ds_os.files)) - self.assertEqual(VENDOR_DATA, ds_os.vendordata_pure) - self.assertEqual(ds_os.vendordata_raw, None) - - @hp.activate - def test_bad_datasource_meta(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('meta_data.json'): - os_files[k] = '{' # some invalid json - _register_uris(self.VERSION, {}, {}, os_files) - ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - self.assertIsNone(ds_os.version) - found = ds_os.get_data(timeout=0.1, retries=0) - self.assertFalse(found) - self.assertIsNone(ds_os.version) - - @hp.activate - def test_no_datasource(self): - os_files = copy.deepcopy(OS_FILES) - for k in list(os_files.keys()): - if k.endswith('meta_data.json'): - os_files.pop(k) - _register_uris(self.VERSION, {}, {}, os_files) - ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - ds_os.ds_cfg = { - 'max_wait': 0, - 'timeout': 0, - } - self.assertIsNone(ds_os.version) - found = ds_os.get_data(timeout=0.1, retries=0) - self.assertFalse(found) - self.assertIsNone(ds_os.version) - - @hp.activate - def test_disabled_datasource(self): - os_files = copy.deepcopy(OS_FILES) - os_meta = copy.deepcopy(OSTACK_META) - os_meta['meta'] = { - 'dsmode': 'disabled', - } - for k in list(os_files.keys()): - if k.endswith('meta_data.json'): - os_files[k] = json.dumps(os_meta) - _register_uris(self.VERSION, {}, {}, os_files) - ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN, - None, - helpers.Paths({})) - ds_os.ds_cfg = { - 'max_wait': 0, - 'timeout': 0, - } - self.assertIsNone(ds_os.version) - found = ds_os.get_data(timeout=0.1, retries=0) - self.assertFalse(found) - self.assertIsNone(ds_os.version) - - -class TestVendorDataLoading(test_helpers.TestCase): - def cvj(self, data): - return openstack.convert_vendordata_json(data) - - def test_vd_load_none(self): - # non-existant vendor-data should return none - self.assertIsNone(self.cvj(None)) - - def test_vd_load_string(self): - self.assertEqual(self.cvj("foobar"), "foobar") - - def test_vd_load_list(self): - data = [{'foo': 'bar'}, 'mystring', list(['another', 'list'])] - self.assertEqual(self.cvj(data), data) - - def test_vd_load_dict_no_ci(self): - self.assertEqual(self.cvj({'foo': 'bar'}), None) - - def test_vd_load_dict_ci_dict(self): - self.assertRaises(ValueError, self.cvj, - {'foo': 'bar', 'cloud-init': {'x': 1}}) - - def test_vd_load_dict_ci_string(self): - data = {'foo': 'bar', 'cloud-init': 'VENDOR_DATA'} - self.assertEqual(self.cvj(data), data['cloud-init']) - - def test_vd_load_dict_ci_list(self): - data = {'foo': 'bar', 'cloud-init': ['VD_1', 'VD_2']} - self.assertEqual(self.cvj(data), data['cloud-init']) diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py deleted file mode 100644 index 9c6c8768..00000000 --- a/tests/unittests/test_datasource/test_smartos.py +++ /dev/null @@ -1,543 +0,0 @@ -# vi: ts=4 expandtab -# -# Copyright (C) 2013 Canonical Ltd. -# -# Author: Ben Howard <ben.howard@canonical.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# -# This is a testcase for the SmartOS datasource. It replicates a serial -# console and acts like the SmartOS console does in order to validate -# return responses. -# - -from __future__ import print_function - -from binascii import crc32 -import json -import os -import os.path -import re -import shutil -import stat -import tempfile -import uuid - -from cloudinit import serial -from cloudinit.sources import DataSourceSmartOS - -import six - -from cloudinit import helpers as c_helpers -from cloudinit.util import b64e - -from ..helpers import mock, FilesystemMockingTestCase, TestCase - -SDC_NICS = json.loads(""" -[ - { - "nic_tag": "external", - "primary": true, - "mtu": 1500, - "model": "virtio", - "gateway": "8.12.42.1", - "netmask": "255.255.255.0", - "ip": "8.12.42.102", - "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe", - "gateways": [ - "8.12.42.1" - ], - "vlan_id": 324, - "mac": "90:b8:d0:f5:e4:f5", - "interface": "net0", - "ips": [ - "8.12.42.102/24" - ] - }, - { - "nic_tag": "sdc_overlay/16187209", - "gateway": "192.168.128.1", - "model": "virtio", - "mac": "90:b8:d0:a5:ff:cd", - "netmask": "255.255.252.0", - "ip": "192.168.128.93", - "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9", - "gateways": [ - "192.168.128.1" - ], - "vlan_id": 2, - "mtu": 8500, - "interface": "net1", - "ips": [ - "192.168.128.93/22" - ] - } -] -""") - -MOCK_RETURNS = { - 'hostname': 'test-host', - 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname', - 'disable_iptables_flag': None, - 'enable_motd_sys_info': None, - 'test-var1': 'some data', - 'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), - 'sdc:datacenter_name': 'somewhere2', - 'sdc:operator-script': '\n'.join(['bin/true', '']), - 'sdc:uuid': str(uuid.uuid4()), - 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']), - 'user-data': '\n'.join(['something', '']), - 'user-script': '\n'.join(['/bin/true', '']), - 'sdc:nics': json.dumps(SDC_NICS), -} - -DMI_DATA_RETURN = 'smartdc' - - -class PsuedoJoyentClient(object): - def __init__(self, data=None): - if data is None: - data = MOCK_RETURNS.copy() - self.data = data - return - - def get(self, key, default=None, strip=False): - if key in self.data: - r = self.data[key] - if strip: - r = r.strip() - else: - r = default - return r - - def get_json(self, key, default=None): - result = self.get(key, default=default) - if result is None: - return default - return json.loads(result) - - def exists(self): - return True - - -class TestSmartOSDataSource(FilesystemMockingTestCase): - def setUp(self): - super(TestSmartOSDataSource, self).setUp() - - dsmos = 'cloudinit.sources.DataSourceSmartOS' - patcher = mock.patch(dsmos + ".jmc_client_factory") - self.jmc_cfact = patcher.start() - self.addCleanup(patcher.stop) - patcher = mock.patch(dsmos + ".get_smartos_environ") - self.get_smartos_environ = patcher.start() - self.addCleanup(patcher.stop) - - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.paths = c_helpers.Paths({'cloud_dir': self.tmp}) - - self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp') - os.mkdir(self.legacy_user_d) - - self.orig_lud = DataSourceSmartOS.LEGACY_USER_D - DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d - - def tearDown(self): - DataSourceSmartOS.LEGACY_USER_D = self.orig_lud - super(TestSmartOSDataSource, self).tearDown() - - def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, - sys_cfg=None, ds_cfg=None): - self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) - self.get_smartos_environ.return_value = mode - - if sys_cfg is None: - sys_cfg = {} - - if ds_cfg is not None: - sys_cfg['datasource'] = sys_cfg.get('datasource', {}) - sys_cfg['datasource']['SmartOS'] = ds_cfg - - return DataSourceSmartOS.DataSourceSmartOS( - sys_cfg, distro=None, paths=self.paths) - - def test_no_base64(self): - ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} - dsrc = self._get_ds(ds_cfg=ds_cfg) - ret = dsrc.get_data() - self.assertTrue(ret) - - def test_uuid(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['sdc:uuid'], - dsrc.metadata['instance-id']) - - def test_root_keys(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - - def test_hostname_b64(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - - def test_hostname(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - - def test_userdata(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['user-data'], - dsrc.metadata['legacy-user-data']) - self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - - def test_sdc_nics(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']), - dsrc.metadata['network-data']) - - def test_sdc_scripts(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) - - legacy_script_f = "%s/user-script" % self.legacy_user_d - self.assertTrue(os.path.exists(legacy_script_f)) - self.assertTrue(os.path.islink(legacy_script_f)) - user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEqual(user_script_perm, '700') - - def test_scripts_shebanged(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) - - legacy_script_f = "%s/user-script" % self.legacy_user_d - self.assertTrue(os.path.exists(legacy_script_f)) - self.assertTrue(os.path.islink(legacy_script_f)) - shebang = None - with open(legacy_script_f, 'r') as f: - shebang = f.readlines()[0].strip() - self.assertEqual(shebang, "#!/bin/bash") - user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEqual(user_script_perm, '700') - - def test_scripts_shebang_not_added(self): - """ - Test that the SmartOS requirement that plain text scripts - are executable. This test makes sure that plain texts scripts - with out file magic have it added appropriately by cloud-init. - """ - - my_returns = MOCK_RETURNS.copy() - my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl', - 'print("hi")', '']) - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(my_returns['user-script'], - dsrc.metadata['user-script']) - - legacy_script_f = "%s/user-script" % self.legacy_user_d - self.assertTrue(os.path.exists(legacy_script_f)) - self.assertTrue(os.path.islink(legacy_script_f)) - shebang = None - with open(legacy_script_f, 'r') as f: - shebang = f.readlines()[0].strip() - self.assertEqual(shebang, "#!/usr/bin/perl") - - def test_userdata_removed(self): - """ - User-data in the SmartOS world is supposed to be written to a file - each and every boot. This tests to make sure that in the event the - legacy user-data is removed, the existing user-data is backed-up - and there is no /var/db/user-data left. - """ - - user_data_f = "%s/mdata-user-data" % self.legacy_user_d - with open(user_data_f, 'w') as f: - f.write("PREVIOUS") - - my_returns = MOCK_RETURNS.copy() - del my_returns['user-data'] - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertFalse(dsrc.metadata.get('legacy-user-data')) - - found_new = False - for root, _dirs, files in os.walk(self.legacy_user_d): - for name in files: - name_f = os.path.join(root, name) - permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:] - if re.match(r'.*\/mdata-user-data$', name_f): - found_new = True - print(name_f) - self.assertEqual(permissions, '400') - - self.assertFalse(found_new) - - def test_vendor_data_not_default(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['sdc:vendor-data'], - dsrc.metadata['vendor-data']) - - def test_default_vendor_data(self): - my_returns = MOCK_RETURNS.copy() - def_op_script = my_returns['sdc:vendor-data'] - del my_returns['sdc:vendor-data'] - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data']) - - # we expect default vendor-data is a boothook - self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook")) - - def test_disable_iptables_flag(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) - - def test_motd_sys_info(self): - dsrc = self._get_ds(mockdata=MOCK_RETURNS) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) - - def test_default_ephemeral(self): - # Test to make sure that the builtin config has the ephemeral - # configuration. - dsrc = self._get_ds() - cfg = dsrc.get_config_obj() - - ret = dsrc.get_data() - self.assertTrue(ret) - - assert 'disk_setup' in cfg - assert 'fs_setup' in cfg - self.assertIsInstance(cfg['disk_setup'], dict) - self.assertIsInstance(cfg['fs_setup'], list) - - def test_override_disk_aliases(self): - # Test to make sure that the built-in DS is overriden - builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG - - mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}} - - # expect that these values are in builtin, or this is pointless - for k in mydscfg: - self.assertIn(k, builtin) - - dsrc = self._get_ds(ds_cfg=mydscfg) - ret = dsrc.get_data() - self.assertTrue(ret) - - self.assertEqual(mydscfg['disk_aliases']['FOO'], - dsrc.ds_cfg['disk_aliases']['FOO']) - - self.assertEqual(dsrc.device_name_to_device('FOO'), - mydscfg['disk_aliases']['FOO']) - - -class TestJoyentMetadataClient(FilesystemMockingTestCase): - - def setUp(self): - super(TestJoyentMetadataClient, self).setUp() - - self.serial = mock.MagicMock(spec=serial.Serial) - self.request_id = 0xabcdef12 - self.metadata_value = 'value' - self.response_parts = { - 'command': 'SUCCESS', - 'crc': 'b5a9ff00', - 'length': 17 + len(b64e(self.metadata_value)), - 'payload': b64e(self.metadata_value), - 'request_id': '{0:08x}'.format(self.request_id), - } - - def make_response(): - payloadstr = '' - if 'payload' in self.response_parts: - payloadstr = ' {0}'.format(self.response_parts['payload']) - return ('V2 {length} {crc} {request_id} ' - '{command}{payloadstr}\n'.format( - payloadstr=payloadstr, - **self.response_parts).encode('ascii')) - - self.metasource_data = None - - def read_response(length): - if not self.metasource_data: - self.metasource_data = make_response() - self.metasource_data_len = len(self.metasource_data) - resp = self.metasource_data[:length] - self.metasource_data = self.metasource_data[length:] - return resp - - self.serial.read.side_effect = read_response - self.patched_funcs.enter_context( - mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint', - mock.Mock(return_value=self.request_id))) - - def _get_client(self): - return DataSourceSmartOS.JoyentMetadataClient( - fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM) - - def assertEndsWith(self, haystack, prefix): - self.assertTrue(haystack.endswith(prefix), - "{0} does not end with '{1}'".format( - repr(haystack), prefix)) - - def assertStartsWith(self, haystack, prefix): - self.assertTrue(haystack.startswith(prefix), - "{0} does not start with '{1}'".format( - repr(haystack), prefix)) - - def test_get_metadata_writes_a_single_line(self): - client = self._get_client() - client.get('some_key') - self.assertEqual(1, self.serial.write.call_count) - written_line = self.serial.write.call_args[0][0] - print(type(written_line)) - self.assertEndsWith(written_line.decode('ascii'), - b'\n'.decode('ascii')) - self.assertEqual(1, written_line.count(b'\n')) - - def _get_written_line(self, key='some_key'): - client = self._get_client() - client.get(key) - return self.serial.write.call_args[0][0] - - def test_get_metadata_writes_bytes(self): - self.assertIsInstance(self._get_written_line(), six.binary_type) - - def test_get_metadata_line_starts_with_v2(self): - foo = self._get_written_line() - self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii')) - - def test_get_metadata_uses_get_command(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - self.assertEqual('GET', parts[4]) - - def test_get_metadata_base64_encodes_argument(self): - key = 'my_key' - parts = self._get_written_line(key).decode('ascii').strip().split(' ') - self.assertEqual(b64e(key), parts[5]) - - def test_get_metadata_calculates_length_correctly(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - expected_length = len(' '.join(parts[3:])) - self.assertEqual(expected_length, int(parts[1])) - - def test_get_metadata_uses_appropriate_request_id(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - request_id = parts[3] - self.assertEqual(8, len(request_id)) - self.assertEqual(request_id, request_id.lower()) - - def test_get_metadata_uses_random_number_for_request_id(self): - line = self._get_written_line() - request_id = line.decode('ascii').strip().split(' ')[3] - self.assertEqual('{0:08x}'.format(self.request_id), request_id) - - def test_get_metadata_checksums_correctly(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - expected_checksum = '{0:08x}'.format( - crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff) - checksum = parts[2] - self.assertEqual(expected_checksum, checksum) - - def test_get_metadata_reads_a_line(self): - client = self._get_client() - client.get('some_key') - self.assertEqual(self.metasource_data_len, self.serial.read.call_count) - - def test_get_metadata_returns_valid_value(self): - client = self._get_client() - value = client.get('some_key') - self.assertEqual(self.metadata_value, value) - - def test_get_metadata_throws_exception_for_incorrect_length(self): - self.response_parts['length'] = 0 - client = self._get_client() - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get, 'some_key') - - def test_get_metadata_throws_exception_for_incorrect_crc(self): - self.response_parts['crc'] = 'deadbeef' - client = self._get_client() - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get, 'some_key') - - def test_get_metadata_throws_exception_for_request_id_mismatch(self): - self.response_parts['request_id'] = 'deadbeef' - client = self._get_client() - client._checksum = lambda _: self.response_parts['crc'] - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get, 'some_key') - - def test_get_metadata_returns_None_if_value_not_found(self): - self.response_parts['payload'] = '' - self.response_parts['command'] = 'NOTFOUND' - self.response_parts['length'] = 17 - client = self._get_client() - client._checksum = lambda _: self.response_parts['crc'] - self.assertIsNone(client.get('some_key')) - - -class TestNetworkConversion(TestCase): - - def test_convert_simple(self): - expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', - 'netmask': '255.255.255.0', - 'address': '8.12.42.102/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '192.168.128.1', - 'netmask': '255.255.252.0', - 'address': '192.168.128.93/22'}], - 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]} - found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS) - self.assertEqual(expected, found) |