diff options
author | Zuul <zuul@review.opendev.org> | 2023-05-09 09:03:25 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-05-09 09:03:25 +0000 |
commit | 141c5ff1c3d6be65c2e9fffac033a09793b6a0ca (patch) | |
tree | 6eb15b509bca21e9d7d63f2abf975cc2ab73af36 /ironic_python_agent | |
parent | 03e88b579e3b1f1b8c631dbba772314fb4fc1441 (diff) | |
parent | e7a048ecbeb0365725c5675eadb0834fd6feabf4 (diff) | |
download | ironic-python-agent-master.tar.gz |
Diffstat (limited to 'ironic_python_agent')
-rw-r--r-- | ironic_python_agent/extensions/standby.py | 47 | ||||
-rw-r--r-- | ironic_python_agent/tests/unit/extensions/test_standby.py | 128 |
2 files changed, 169 insertions, 6 deletions
diff --git a/ironic_python_agent/extensions/standby.py b/ironic_python_agent/extensions/standby.py index 90affd75..3841a949 100644 --- a/ironic_python_agent/extensions/standby.py +++ b/ironic_python_agent/extensions/standby.py @@ -14,6 +14,7 @@ import hashlib import os +import re import tempfile import time from urllib import parse as urlparse @@ -107,6 +108,24 @@ def _is_checksum_url(checksum): return False +MD5_MATCH = r"^([a-fA-F\d]{32})\s" # MD5 at beginning of line +MD5_MATCH_END = r"\s([a-fA-F\d]{32})$" # MD5 at end of line +MD5_MATCH_ONLY = r"^([a-fA-F\d]{32})$" # MD5 only +SHA256_MATCH = r"^([a-fA-F\d]{64})\s" # SHA256 at beginning of line +SHA256_MATCH_END = r"\s([a-fA-F\d]{64})$" # SHA256 at end of line +SHA256_MATCH_ONLY = r"^([a-fA-F\d]{64})$" # SHA256 only +SHA512_MATCH = r"^([a-fA-F\d]{128})\s" # SHA512 at beginning of line +SHA512_MATCH_END = r"\s([a-fA-F\d]{128})$" # SHA512 at end of line +SHA512_MATCH_ONLY = r"^([a-fA-F\d]{128})$" # SHA512 only +FILENAME_MATCH_END = r"\s[*]?{filename}$" # Filename binary/text end of line +FILENAME_MATCH_PARENTHESES = r"\s\({filename}\)\s" # CentOS images + +CHECKSUM_MATCHERS = (MD5_MATCH, MD5_MATCH_END, SHA256_MATCH, SHA256_MATCH_END, + SHA512_MATCH, SHA512_MATCH_END) +CHECKSUM_ONLY_MATCHERS = (MD5_MATCH_ONLY, SHA256_MATCH_ONLY, SHA512_MATCH_ONLY) +FILENAME_MATCHERS = (FILENAME_MATCH_END, FILENAME_MATCH_PARENTHESES) + + def _fetch_checksum(checksum, image_info): """Fetch checksum from remote location, if needed.""" if not _is_checksum_url(checksum): @@ -121,17 +140,33 @@ def _fetch_checksum(checksum, image_info): elif len(lines) == 1: # Special case - checksums file with only the checksum itself if ' ' not in lines[0]: - return lines[0] + for matcher in CHECKSUM_ONLY_MATCHERS: + checksum = re.findall(matcher, lines[0]) + if checksum: + return checksum[0] + raise errors.ImageDownloadError( + checksum, ("Invalid checksum file (No valid checksum found) %s" + % lines)) # FIXME(dtantsur): can we assume the same name for all images? expected_fname = os.path.basename(urlparse.urlparse( image_info['urls'][0]).path) for line in lines: - checksum, fname = line.strip().split(None, 1) - # The star symbol designates binary mode, which is the same as text - # mode on GNU systems. - if fname.strip().lstrip('*') == expected_fname: - return checksum.strip() + # Ignore comment lines + if line.startswith("#"): + continue + + # Ignore checksums for other files + for matcher in FILENAME_MATCHERS: + if re.findall(matcher.format(filename=expected_fname), line): + break + else: + continue + + for matcher in CHECKSUM_MATCHERS: + checksum = re.findall(matcher, line) + if checksum: + return checksum[0] raise errors.ImageDownloadError( checksum, "Checksum file does not contain name %s" % expected_fname) diff --git a/ironic_python_agent/tests/unit/extensions/test_standby.py b/ironic_python_agent/tests/unit/extensions/test_standby.py index 195336df..07e0d89d 100644 --- a/ironic_python_agent/tests/unit/extensions/test_standby.py +++ b/ironic_python_agent/tests/unit/extensions/test_standby.py @@ -1690,6 +1690,113 @@ foobar irrelevant file.img ]) self.assertEqual(fake_cs, image_download._hash_algo.hexdigest()) + def test_download_image_and_centos_checksum_md5(self, requests_mock, + hash_mock): + content = ['SpongeBob', 'SquarePants'] + fake_cs = "019fe036425da1c562f2e9f5299820bf" + cs_response = mock.Mock() + cs_response.status_code = 200 + cs_response.text = """ +# centos-image.img: 1005593088 bytes +MD5 (centos-image.img) = %s +""" % fake_cs + response = mock.Mock() + response.status_code = 200 + response.iter_content.return_value = content + requests_mock.side_effect = [cs_response, response] + + image_info = _build_fake_image_info( + 'http://example.com/path/centos-image.img') + image_info['checksum'] = 'http://example.com/checksum' + del image_info['os_hash_algo'] + del image_info['os_hash_value'] + CONF.set_override('md5_enabled', True) + hash_mock.return_value.hexdigest.return_value = fake_cs + image_download = standby.ImageDownload(image_info) + + self.assertEqual(content, list(image_download)) + requests_mock.assert_has_calls([ + mock.call('http://example.com/checksum', cert=None, + verify=True, + stream=True, proxies={}, timeout=60), + mock.call(image_info['urls'][0], cert=None, verify=True, + stream=True, proxies={}, timeout=60), + ]) + self.assertEqual(fake_cs, image_download._hash_algo.hexdigest()) + + def test_download_image_and_centos_checksum_sha256(self, requests_mock, + hash_mock): + content = ['SpongeBob', 'SquarePants'] + fake_cs = ('3b678e4fb651d450f4970e1647abc9b0a38bff3febd3d558753' + '623c66369a633') + cs_response = mock.Mock() + cs_response.status_code = 200 + cs_response.text = """ +# centos-image.img: 1005593088 bytes +SHA256 (centos-image.img) = %s +""" % fake_cs + response = mock.Mock() + response.status_code = 200 + response.iter_content.return_value = iter(content) + requests_mock.side_effect = [cs_response, response] + + image_info = _build_fake_image_info( + 'http://example.com/path/centos-image.img') + image_info['checksum'] = 'http://example.com/checksum' + del image_info['os_hash_algo'] + del image_info['os_hash_value'] + hash_mock.return_value.hexdigest.return_value = fake_cs + image_download = standby.ImageDownload(image_info) + + self.assertEqual(content, list(image_download)) + requests_mock.assert_has_calls([ + mock.call('http://example.com/checksum', cert=None, + verify=True, + stream=True, proxies={}, timeout=60), + mock.call(image_info['urls'][0], cert=None, verify=True, + stream=True, proxies={}, timeout=60), + ]) + self.assertEqual(fake_cs, image_download._hash_algo.hexdigest()) + hash_mock.assert_has_calls([ + mock.call('sha256')]) + + def test_download_image_and_centos_checksum_sha512(self, requests_mock, + hash_mock): + content = ['SpongeBob', 'SquarePants'] + fake_cs = ('3b678e4fb651d450f4970e1647abc9b0a38bff3febd3d558753' + '623c66369a6333b678e4fb651d450f4970e1647abc9b0a38b' + 'ff3febd3d558753623c66369a633') + cs_response = mock.Mock() + cs_response.status_code = 200 + cs_response.text = """ +# centos-image.img: 1005593088 bytes +SHA512 (centos-image.img) = %s +""" % fake_cs + response = mock.Mock() + response.status_code = 200 + response.iter_content.return_value = iter(content) + requests_mock.side_effect = [cs_response, response] + + image_info = _build_fake_image_info( + 'http://example.com/path/centos-image.img') + image_info['checksum'] = 'http://example.com/checksum' + del image_info['os_hash_algo'] + del image_info['os_hash_value'] + hash_mock.return_value.hexdigest.return_value = fake_cs + image_download = standby.ImageDownload(image_info) + + self.assertEqual(content, list(image_download)) + requests_mock.assert_has_calls([ + mock.call('http://example.com/checksum', cert=None, + verify=True, + stream=True, proxies={}, timeout=60), + mock.call(image_info['urls'][0], cert=None, verify=True, + stream=True, proxies={}, timeout=60), + ]) + self.assertEqual(fake_cs, image_download._hash_algo.hexdigest()) + hash_mock.assert_has_calls([ + mock.call('sha512')]) + def test_download_image_and_checksum_multiple_sha256(self, requests_mock, hash_mock): content = ['SpongeBob', 'SquarePants'] @@ -1900,3 +2007,24 @@ foobar irrelevant file.img 'Received status code 400 from ' 'http://example.com/checksum', standby.ImageDownload, image_info) + + def test_download_image_and_invalid_checksum(self, requests_mock, + hash_mock): + content = ['SpongeBob', 'SquarePants'] + fake_cs = "invalid" + cs_response = mock.Mock() + cs_response.status_code = 200 + cs_response.text = fake_cs + '\n' + response = mock.Mock() + response.status_code = 200 + response.iter_content.return_value = content + requests_mock.side_effect = [cs_response, response] + + image_info = _build_fake_image_info( + 'http://example.com/path/image.img') + image_info['os_hash_algo'] = 'sha512' + image_info['os_hash_value'] = 'http://example.com/checksum' + self.assertRaisesRegex( + errors.ImageDownloadError, + r"Invalid checksum file \(No valid checksum found\) \['invalid'\]", + standby.ImageDownload, image_info) |