summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_opennebula.py
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2016-08-10 09:06:15 -0600
committerScott Moser <smoser@ubuntu.com>2016-08-10 09:06:15 -0600
commitc3c3dc693c14175e110b5fe125d4d5f98ace9700 (patch)
tree8858702c2c8a6ad4bf1bb861a4565e0a9c28e588 /tests/unittests/test_datasource/test_opennebula.py
parent5bd3493d732e5b1902872958e8681f17cbc81ce5 (diff)
downloadcloud-init-trunk.tar.gz
README: Mention move of revision control to git.HEADtrunk
cloud-init development has moved its revision control to git. It is available at https://code.launchpad.net/cloud-init Clone with git clone https://git.launchpad.net/cloud-init or git clone git+ssh://git.launchpad.net/cloud-init For more information see https://git.launchpad.net/cloud-init/tree/HACKING.rst
Diffstat (limited to 'tests/unittests/test_datasource/test_opennebula.py')
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py300
1 files changed, 0 insertions, 300 deletions
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
deleted file mode 100644
index d796f030..00000000
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ /dev/null
@@ -1,300 +0,0 @@
-from cloudinit import helpers
-from cloudinit.sources import DataSourceOpenNebula as ds
-from cloudinit import util
-from ..helpers import TestCase, populate_dir
-
-import os
-import pwd
-import shutil
-import tempfile
-import unittest
-
-
-TEST_VARS = {
- 'VAR1': 'single',
- 'VAR2': 'double word',
- 'VAR3': 'multi\nline\n',
- 'VAR4': "'single'",
- 'VAR5': "'double word'",
- 'VAR6': "'multi\nline\n'",
- 'VAR7': 'single\\t',
- 'VAR8': 'double\\tword',
- 'VAR9': 'multi\\t\nline\n',
- 'VAR10': '\\', # expect '\'
- 'VAR11': '\'', # expect '
- 'VAR12': '$', # expect $
-}
-
-INVALID_CONTEXT = ';'
-USER_DATA = '#cloud-config\napt_upgrade: true'
-SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
-HOSTNAME = 'foo.example.com'
-PUBLIC_IP = '10.0.0.3'
-
-CMD_IP_OUT = '''\
-1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
- link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
-2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
- link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
-'''
-
-
-class TestOpenNebulaDataSource(TestCase):
- parsed_user = None
-
- def setUp(self):
- super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
-
- # defaults for few tests
- self.ds = ds.DataSourceOpenNebula
- self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
- self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}}
-
- # we don't want 'sudo' called in tests. so we patch switch_user_cmd
- def my_switch_user_cmd(user):
- self.parsed_user = user
- return []
-
- self.switch_user_cmd_real = ds.switch_user_cmd
- ds.switch_user_cmd = my_switch_user_cmd
-
- def tearDown(self):
- ds.switch_user_cmd = self.switch_user_cmd_real
- super(TestOpenNebulaDataSource, self).tearDown()
-
- def test_get_data_non_contextdisk(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertFalse(ret)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_get_data_broken_contextdisk(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
- dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_get_data_invalid_identity(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # generate non-existing system user name
- sys_cfg = self.sys_cfg
- invalid_user = 'invalid'
- while not sys_cfg['datasource']['OpenNebula'].get('parseuser'):
- try:
- pwd.getpwnam(invalid_user)
- invalid_user += 'X'
- except KeyError:
- sys_cfg['datasource']['OpenNebula']['parseuser'] = \
- invalid_user
-
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
- dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_get_data(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
- dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_seed_dir_non_contextdisk(self):
- self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
- self.seed_dir)
-
- def test_seed_dir_empty1_context(self):
- populate_dir(self.seed_dir, {'context.sh': ''})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertEqual(results['userdata'], None)
- self.assertEqual(results['metadata'], {})
-
- def test_seed_dir_empty2_context(self):
- populate_context_dir(self.seed_dir, {})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertEqual(results['userdata'], None)
- self.assertEqual(results['metadata'], {})
-
- def test_seed_dir_broken_context(self):
- populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
-
- self.assertRaises(ds.BrokenContextDiskDir,
- ds.read_context_disk_dir,
- self.seed_dir)
-
- def test_context_parser(self):
- populate_context_dir(self.seed_dir, TEST_VARS)
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertTrue('metadata' in results)
- self.assertEqual(TEST_VARS, results['metadata'])
-
- def test_ssh_key(self):
- public_keys = ['first key', 'second key']
- for c in range(4):
- for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
- my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
- populate_context_dir(my_d, {k: '\n'.join(public_keys)})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('metadata' in results)
- self.assertTrue('public-keys' in results['metadata'])
- self.assertEqual(public_keys,
- results['metadata']['public-keys'])
-
- public_keys.append(SSH_KEY % (c + 1,))
-
- def test_user_data_plain(self):
- for k in ('USER_DATA', 'USERDATA'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: USER_DATA,
- 'USERDATA_ENCODING': ''})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('userdata' in results)
- self.assertEqual(USER_DATA, results['userdata'])
-
- def test_user_data_encoding_required_for_decode(self):
- b64userdata = util.b64e(USER_DATA)
- for k in ('USER_DATA', 'USERDATA'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64userdata})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('userdata' in results)
- self.assertEqual(b64userdata, results['userdata'])
-
- def test_user_data_base64_encoding(self):
- for k in ('USER_DATA', 'USERDATA'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: util.b64e(USER_DATA),
- 'USERDATA_ENCODING': 'base64'})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('userdata' in results)
- self.assertEqual(USER_DATA, results['userdata'])
-
- def test_hostname(self):
- for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: PUBLIC_IP})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('metadata' in results)
- self.assertTrue('local-hostname' in results['metadata'])
- self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
-
- def test_network_interfaces(self):
- populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertTrue('network-interfaces' in results)
-
- def test_find_candidates(self):
- def my_devs_with(criteria):
- return {
- "LABEL=CONTEXT": ["/dev/sdb"],
- "LABEL=CDROM": ["/dev/sr0"],
- "TYPE=iso9660": ["/dev/vdb"],
- }.get(criteria, [])
-
- orig_find_devs_with = util.find_devs_with
- try:
- util.find_devs_with = my_devs_with
- self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"],
- ds.find_candidate_devs())
- finally:
- util.find_devs_with = orig_find_devs_with
-
-
-class TestOpenNebulaNetwork(unittest.TestCase):
-
- def setUp(self):
- super(TestOpenNebulaNetwork, self).setUp()
-
- def test_lo(self):
- net = ds.OpenNebulaNetwork('', {})
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-''')
-
- def test_eth0(self):
- net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 10.18.1.1
- network 10.18.1.0
- netmask 255.255.255.0
-''')
-
- def test_eth0_override(self):
- context = {
- 'DNS': '1.2.3.8',
- 'ETH0_IP': '1.2.3.4',
- 'ETH0_NETWORK': '1.2.3.0',
- 'ETH0_MASK': '255.255.0.0',
- 'ETH0_GATEWAY': '1.2.3.5',
- 'ETH0_DOMAIN': 'example.com',
- 'ETH0_DNS': '1.2.3.6 1.2.3.7'
- }
-
- net = ds.OpenNebulaNetwork(CMD_IP_OUT, context)
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 1.2.3.4
- network 1.2.3.0
- netmask 255.255.0.0
- gateway 1.2.3.5
- dns-search example.com
- dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
-''')
-
-
-class TestParseShellConfig(unittest.TestCase):
- def test_no_seconds(self):
- cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
- # we could test 'sleep 2', but that would make the test run slower.
- ret = ds.parse_shell_config(cfg)
- self.assertEqual(ret, {"foo": "bar", "xx": "foo"})
-
-
-def populate_context_dir(path, variables):
- data = "# Context variables generated by OpenNebula\n"
- for k, v in variables.items():
- data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
- populate_dir(path, {'context.sh': data})
-
-# vi: ts=4 expandtab