diff options
Diffstat (limited to 'tests/unittests/test_datasource/test_opennebula.py')
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 300 |
1 files changed, 0 insertions, 300 deletions
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py deleted file mode 100644 index d796f030..00000000 --- a/tests/unittests/test_datasource/test_opennebula.py +++ /dev/null @@ -1,300 +0,0 @@ -from cloudinit import helpers -from cloudinit.sources import DataSourceOpenNebula as ds -from cloudinit import util -from ..helpers import TestCase, populate_dir - -import os -import pwd -import shutil -import tempfile -import unittest - - -TEST_VARS = { - 'VAR1': 'single', - 'VAR2': 'double word', - 'VAR3': 'multi\nline\n', - 'VAR4': "'single'", - 'VAR5': "'double word'", - 'VAR6': "'multi\nline\n'", - 'VAR7': 'single\\t', - 'VAR8': 'double\\tword', - 'VAR9': 'multi\\t\nline\n', - 'VAR10': '\\', # expect '\' - 'VAR11': '\'', # expect ' - 'VAR12': '$', # expect $ -} - -INVALID_CONTEXT = ';' -USER_DATA = '#cloud-config\napt_upgrade: true' -SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' -HOSTNAME = 'foo.example.com' -PUBLIC_IP = '10.0.0.3' - -CMD_IP_OUT = '''\ -1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN - link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 -2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000 - link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff -''' - - -class TestOpenNebulaDataSource(TestCase): - parsed_user = None - - def setUp(self): - super(TestOpenNebulaDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - - # defaults for few tests - self.ds = ds.DataSourceOpenNebula - self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula") - self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}} - - # we don't want 'sudo' called in tests. so we patch switch_user_cmd - def my_switch_user_cmd(user): - self.parsed_user = user - return [] - - self.switch_user_cmd_real = ds.switch_user_cmd - ds.switch_user_cmd = my_switch_user_cmd - - def tearDown(self): - ds.switch_user_cmd = self.switch_user_cmd_real - super(TestOpenNebulaDataSource, self).tearDown() - - def test_get_data_non_contextdisk(self): - orig_find_devs_with = util.find_devs_with - try: - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertFalse(ret) - finally: - util.find_devs_with = orig_find_devs_with - - def test_get_data_broken_contextdisk(self): - orig_find_devs_with = util.find_devs_with - try: - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) - dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) - self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) - finally: - util.find_devs_with = orig_find_devs_with - - def test_get_data_invalid_identity(self): - orig_find_devs_with = util.find_devs_with - try: - # generate non-existing system user name - sys_cfg = self.sys_cfg - invalid_user = 'invalid' - while not sys_cfg['datasource']['OpenNebula'].get('parseuser'): - try: - pwd.getpwnam(invalid_user) - invalid_user += 'X' - except KeyError: - sys_cfg['datasource']['OpenNebula']['parseuser'] = \ - invalid_user - - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) - dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) - self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) - finally: - util.find_devs_with = orig_find_devs_with - - def test_get_data(self): - orig_find_devs_with = util.find_devs_with - try: - # dont' try to lookup for CDs - util.find_devs_with = lambda n: [] - populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) - dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) - ret = dsrc.get_data() - self.assertTrue(ret) - finally: - util.find_devs_with = orig_find_devs_with - - def test_seed_dir_non_contextdisk(self): - self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, - self.seed_dir) - - def test_seed_dir_empty1_context(self): - populate_dir(self.seed_dir, {'context.sh': ''}) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertEqual(results['userdata'], None) - self.assertEqual(results['metadata'], {}) - - def test_seed_dir_empty2_context(self): - populate_context_dir(self.seed_dir, {}) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertEqual(results['userdata'], None) - self.assertEqual(results['metadata'], {}) - - def test_seed_dir_broken_context(self): - populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) - - self.assertRaises(ds.BrokenContextDiskDir, - ds.read_context_disk_dir, - self.seed_dir) - - def test_context_parser(self): - populate_context_dir(self.seed_dir, TEST_VARS) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertTrue('metadata' in results) - self.assertEqual(TEST_VARS, results['metadata']) - - def test_ssh_key(self): - public_keys = ['first key', 'second key'] - for c in range(4): - for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'): - my_d = os.path.join(self.tmp, "%s-%i" % (k, c)) - populate_context_dir(my_d, {k: '\n'.join(public_keys)}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('metadata' in results) - self.assertTrue('public-keys' in results['metadata']) - self.assertEqual(public_keys, - results['metadata']['public-keys']) - - public_keys.append(SSH_KEY % (c + 1,)) - - def test_user_data_plain(self): - for k in ('USER_DATA', 'USERDATA'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: USER_DATA, - 'USERDATA_ENCODING': ''}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('userdata' in results) - self.assertEqual(USER_DATA, results['userdata']) - - def test_user_data_encoding_required_for_decode(self): - b64userdata = util.b64e(USER_DATA) - for k in ('USER_DATA', 'USERDATA'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: b64userdata}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('userdata' in results) - self.assertEqual(b64userdata, results['userdata']) - - def test_user_data_base64_encoding(self): - for k in ('USER_DATA', 'USERDATA'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: util.b64e(USER_DATA), - 'USERDATA_ENCODING': 'base64'}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('userdata' in results) - self.assertEqual(USER_DATA, results['userdata']) - - def test_hostname(self): - for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'): - my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: PUBLIC_IP}) - results = ds.read_context_disk_dir(my_d) - - self.assertTrue('metadata' in results) - self.assertTrue('local-hostname' in results['metadata']) - self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname']) - - def test_network_interfaces(self): - populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'}) - results = ds.read_context_disk_dir(self.seed_dir) - - self.assertTrue('network-interfaces' in results) - - def test_find_candidates(self): - def my_devs_with(criteria): - return { - "LABEL=CONTEXT": ["/dev/sdb"], - "LABEL=CDROM": ["/dev/sr0"], - "TYPE=iso9660": ["/dev/vdb"], - }.get(criteria, []) - - orig_find_devs_with = util.find_devs_with - try: - util.find_devs_with = my_devs_with - self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"], - ds.find_candidate_devs()) - finally: - util.find_devs_with = orig_find_devs_with - - -class TestOpenNebulaNetwork(unittest.TestCase): - - def setUp(self): - super(TestOpenNebulaNetwork, self).setUp() - - def test_lo(self): - net = ds.OpenNebulaNetwork('', {}) - self.assertEqual(net.gen_conf(), u'''\ -auto lo -iface lo inet loopback -''') - - def test_eth0(self): - net = ds.OpenNebulaNetwork(CMD_IP_OUT, {}) - self.assertEqual(net.gen_conf(), u'''\ -auto lo -iface lo inet loopback - -auto eth0 -iface eth0 inet static - address 10.18.1.1 - network 10.18.1.0 - netmask 255.255.255.0 -''') - - def test_eth0_override(self): - context = { - 'DNS': '1.2.3.8', - 'ETH0_IP': '1.2.3.4', - 'ETH0_NETWORK': '1.2.3.0', - 'ETH0_MASK': '255.255.0.0', - 'ETH0_GATEWAY': '1.2.3.5', - 'ETH0_DOMAIN': 'example.com', - 'ETH0_DNS': '1.2.3.6 1.2.3.7' - } - - net = ds.OpenNebulaNetwork(CMD_IP_OUT, context) - self.assertEqual(net.gen_conf(), u'''\ -auto lo -iface lo inet loopback - -auto eth0 -iface eth0 inet static - address 1.2.3.4 - network 1.2.3.0 - netmask 255.255.0.0 - gateway 1.2.3.5 - dns-search example.com - dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7 -''') - - -class TestParseShellConfig(unittest.TestCase): - def test_no_seconds(self): - cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"]) - # we could test 'sleep 2', but that would make the test run slower. - ret = ds.parse_shell_config(cfg) - self.assertEqual(ret, {"foo": "bar", "xx": "foo"}) - - -def populate_context_dir(path, variables): - data = "# Context variables generated by OpenNebula\n" - for k, v in variables.items(): - data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''"))) - populate_dir(path, {'context.sh': data}) - -# vi: ts=4 expandtab |