diff options
Diffstat (limited to 'tests/unittests/sources/test_vmware.py')
-rw-r--r-- | tests/unittests/sources/test_vmware.py | 709 |
1 files changed, 707 insertions, 2 deletions
diff --git a/tests/unittests/sources/test_vmware.py b/tests/unittests/sources/test_vmware.py index b3663b0a..4911e5bc 100644 --- a/tests/unittests/sources/test_vmware.py +++ b/tests/unittests/sources/test_vmware.py @@ -1,6 +1,7 @@ -# Copyright (c) 2021 VMware, Inc. All Rights Reserved. +# Copyright (c) 2021-2022 VMware, Inc. All Rights Reserved. # # Authors: Andrew Kutz <akutz@vmware.com> +# Pengpeng Sun <pengpengs@vmware.com> # # This file is part of cloud-init. See LICENSE file for license information. @@ -8,18 +9,22 @@ import base64 import gzip import os from contextlib import ExitStack +from textwrap import dedent import pytest -from cloudinit import dmi, helpers, safeyaml, settings +from cloudinit import dmi, helpers, safeyaml, settings, util from cloudinit.sources import DataSourceVMware +from cloudinit.sources.helpers.vmware.imc import guestcust_util from tests.unittests.helpers import ( CiTestCase, FilesystemMockingTestCase, mock, populate_dir, + wrap_and_call, ) +MPATH = "cloudinit.sources.DataSourceVMware." PRODUCT_NAME_FILE_PATH = "/sys/class/dmi/id/product_name" PRODUCT_NAME = "VMware7,1" PRODUCT_UUID = "82343CED-E4C7-423B-8F6B-0D34D19067AB" @@ -490,6 +495,706 @@ class TestDataSourceVMwareGuestInfo_InvalidPlatform(FilesystemMockingTestCase): self.assertFalse(ret) +class TestDataSourceVMwareIMC(CiTestCase): + """ + Test the VMware Guest OS Customization transport + """ + + with_logs = True + + def setUp(self): + super(TestDataSourceVMwareIMC, self).setUp() + self.datasource = DataSourceVMware.DataSourceVMware + self.tdir = self.tmp_dir() + + def test_get_data_false_on_none_dmi_data(self): + """When dmi for system-product-name is None, get_data returns False.""" + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource(sys_cfg={}, distro={}, paths=paths) + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": None, + }, + ds.get_data, + ) + self.assertFalse(result, "Expected False return from ds.get_data") + self.assertIn("No system-product-name found", self.logs.getvalue()) + + def test_get_imc_data_vmware_customization_disabled(self): + """ + When vmware customization is disabled via sys_cfg and + allow_raw_data is disabled via ds_cfg, log a message. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={ + "disable_vmware_customization": True, + "datasource": {"VMware": {"allow_raw_data": False}}, + }, + distro={}, + paths=paths, + ) + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [MISC] + MARKER-ID = 12345345 + """ + ) + util.write_file(conf_file, conf_content) + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + self.assertIn( + "Customization for VMware platform is disabled", + self.logs.getvalue(), + ) + + def test_get_imc_data_vmware_customization_sys_cfg_disabled(self): + """ + When vmware customization is disabled via sys_cfg and + no meta data is found, log a message. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={ + "disable_vmware_customization": True, + "datasource": {"VMware": {"allow_raw_data": True}}, + }, + distro={}, + paths=paths, + ) + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [MISC] + MARKER-ID = 12345345 + """ + ) + util.write_file(conf_file, conf_content) + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + self.assertIn( + "No allowed customization configuration data found", + self.logs.getvalue(), + ) + + def test_get_imc_data_allow_raw_data_disabled(self): + """ + When allow_raw_data is disabled via ds_cfg and + meta data is found, log a message. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={ + "disable_vmware_customization": False, + "datasource": {"VMware": {"allow_raw_data": False}}, + }, + distro={}, + paths=paths, + ) + + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + """ + ) + util.write_file(conf_file, conf_content) + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + self.assertIn( + "No allowed customization configuration data found", + self.logs.getvalue(), + ) + + def test_get_imc_data_vmware_customization_enabled(self): + """ + When cloud-init workflow for vmware is enabled via sys_cfg log a + message. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": False}, + distro={}, + paths=paths, + ) + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CUSTOM-SCRIPT] + SCRIPT-NAME = test-script + [MISC] + MARKER-ID = 12345345 + """ + ) + util.write_file(conf_file, conf_content) + with mock.patch( + MPATH + "guestcust_util.get_tools_config", + return_value="true", + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + custom_script = self.tmp_path("test-script", self.tdir) + self.assertIn( + "Script %s not found!!" % custom_script, + self.logs.getvalue(), + ) + + def test_get_imc_data_cust_script_disabled(self): + """ + If custom script is disabled by VMware tools configuration, + log a message. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": False}, + distro={}, + paths=paths, + ) + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CUSTOM-SCRIPT] + SCRIPT-NAME = test-script + [MISC] + MARKER-ID = 12345346 + """ + ) + util.write_file(conf_file, conf_content) + # Prepare the custom sript + customscript = self.tmp_path("test-script", self.tdir) + util.write_file(customscript, "This is the post cust script") + + with mock.patch( + MPATH + "guestcust_util.get_tools_config", + return_value="invalid", + ): + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + self.assertIn( + "Custom script is disabled by VM Administrator", + self.logs.getvalue(), + ) + + def test_get_imc_data_cust_script_enabled(self): + """ + If custom script is enabled by VMware tools configuration, + execute the script. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": False}, + distro={}, + paths=paths, + ) + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CUSTOM-SCRIPT] + SCRIPT-NAME = test-script + [MISC] + MARKER-ID = 12345346 + """ + ) + util.write_file(conf_file, conf_content) + + # Mock custom script is enabled by return true when calling + # get_tools_config + with mock.patch( + MPATH + "guestcust_util.get_tools_config", + return_value="true", + ): + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + # Verify custom script is trying to be executed + custom_script = self.tmp_path("test-script", self.tdir) + self.assertIn( + "Script %s not found!!" % custom_script, + self.logs.getvalue(), + ) + + def test_get_imc_data_force_run_post_script_is_yes(self): + """ + If DEFAULT-RUN-POST-CUST-SCRIPT is yes, custom script could run if + enable-custom-scripts is not defined in VM Tools configuration + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": False}, + distro={}, + paths=paths, + ) + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + # set DEFAULT-RUN-POST-CUST-SCRIPT = yes so that enable-custom-scripts + # default value is TRUE + conf_content = dedent( + """\ + [CUSTOM-SCRIPT] + SCRIPT-NAME = test-script + [MISC] + MARKER-ID = 12345346 + DEFAULT-RUN-POST-CUST-SCRIPT = yes + """ + ) + util.write_file(conf_file, conf_content) + + # Mock get_tools_config(section, key, defaultVal) to return + # defaultVal + def my_get_tools_config(*args, **kwargs): + return args[2] + + with mock.patch( + MPATH + "guestcust_util.get_tools_config", + side_effect=my_get_tools_config, + ): + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + # Verify custom script still runs although it is + # disabled by VMware Tools + custom_script = self.tmp_path("test-script", self.tdir) + self.assertIn( + "Script %s not found!!" % custom_script, + self.logs.getvalue(), + ) + + def test_get_data_cloudinit_metadata_json(self): + """ + Test metadata can be loaded to cloud-init metadata and network. + The metadata format is json. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": True}, + distro={}, + paths=paths, + ) + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + """ + ) + util.write_file(conf_file, conf_content) + # Prepare the meta data file + metadata_file = self.tmp_path("test-meta", self.tdir) + metadata_content = dedent( + """\ + { + "instance-id": "cloud-vm", + "local-hostname": "my-host.domain.com", + "network": { + "version": 2, + "ethernets": { + "eths": { + "match": { + "name": "ens*" + }, + "dhcp4": true + } + } + } + } + """ + ) + util.write_file(metadata_file, metadata_content) + + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + "guestcust_util.get_imc_dir_path": self.tdir, + }, + ds._get_data, + ) + self.assertTrue(result) + self.assertEqual("cloud-vm", ds.metadata["instance-id"]) + self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"]) + self.assertEqual(2, ds.network_config["version"]) + self.assertTrue(ds.network_config["ethernets"]["eths"]["dhcp4"]) + + def test_get_data_cloudinit_metadata_yaml(self): + """ + Test metadata can be loaded to cloud-init metadata and network. + The metadata format is yaml. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": True}, + distro={}, + paths=paths, + ) + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + """ + ) + util.write_file(conf_file, conf_content) + # Prepare the meta data file + metadata_file = self.tmp_path("test-meta", self.tdir) + metadata_content = dedent( + """\ + instance-id: cloud-vm + local-hostname: my-host.domain.com + network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes + """ + ) + util.write_file(metadata_file, metadata_content) + + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + "guestcust_util.get_imc_dir_path": self.tdir, + }, + ds._get_data, + ) + self.assertTrue(result) + self.assertEqual("cloud-vm", ds.metadata["instance-id"]) + self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"]) + self.assertEqual(2, ds.network_config["version"]) + self.assertTrue(ds.network_config["ethernets"]["nics"]["dhcp4"]) + + def test_get_imc_data_cloudinit_metadata_not_valid(self): + """ + Test metadata is not JSON or YAML format, log a message + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": True}, + distro={}, + paths=paths, + ) + + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + """ + ) + util.write_file(conf_file, conf_content) + + # Prepare the meta data file + metadata_file = self.tmp_path("test-meta", self.tdir) + metadata_content = "[This is not json or yaml format]a=b" + util.write_file(metadata_file, metadata_content) + + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + "guestcust_util.get_imc_dir_path": self.tdir, + }, + ds.get_data, + ) + self.assertFalse(result) + self.assertIn( + "expected '<document start>', but found '<scalar>'", + self.logs.getvalue(), + ) + + def test_get_imc_data_cloudinit_metadata_not_found(self): + """ + Test metadata file can't be found, log a message + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": True}, + distro={}, + paths=paths, + ) + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + """ + ) + util.write_file(conf_file, conf_content) + # Don't prepare the meta data file + + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + "guestcust_util.get_imc_dir_path": self.tdir, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + self.assertIn("Meta data file is not found", self.logs.getvalue()) + + def test_get_data_cloudinit_userdata(self): + """ + Test user data can be loaded to cloud-init user data. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": False}, + distro={}, + paths=paths, + ) + + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + USERDATA = test-user + """ + ) + util.write_file(conf_file, conf_content) + + # Prepare the meta data file + metadata_file = self.tmp_path("test-meta", self.tdir) + metadata_content = dedent( + """\ + instance-id: cloud-vm + local-hostname: my-host.domain.com + network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes + """ + ) + util.write_file(metadata_file, metadata_content) + + # Prepare the user data file + userdata_file = self.tmp_path("test-user", self.tdir) + userdata_content = "This is the user data" + util.write_file(userdata_file, userdata_content) + + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + "guestcust_util.get_imc_dir_path": self.tdir, + }, + ds._get_data, + ) + self.assertTrue(result) + self.assertEqual("cloud-vm", ds.metadata["instance-id"]) + self.assertEqual(userdata_content, ds.userdata_raw) + + def test_get_imc_data_cloudinit_userdata_not_found(self): + """ + Test userdata file can't be found. + """ + paths = helpers.Paths({"cloud_dir": self.tdir}) + ds = self.datasource( + sys_cfg={"disable_vmware_customization": True}, + distro={}, + paths=paths, + ) + + # Prepare the conf file + conf_file = self.tmp_path("test-cust", self.tdir) + conf_content = dedent( + """\ + [CLOUDINIT] + METADATA = test-meta + USERDATA = test-user + """ + ) + util.write_file(conf_file, conf_content) + + # Prepare the meta data file + metadata_file = self.tmp_path("test-meta", self.tdir) + metadata_content = dedent( + """\ + instance-id: cloud-vm + local-hostname: my-host.domain.com + network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes + """ + ) + util.write_file(metadata_file, metadata_content) + + # Don't prepare the user data file + + with mock.patch( + MPATH + "guestcust_util.set_customization_status", + return_value=("msg", b""), + ): + result = wrap_and_call( + "cloudinit.sources.DataSourceVMware", + { + "dmi.read_dmi_data": "vmware", + "util.del_dir": True, + "guestcust_util.search_file": self.tdir, + "guestcust_util.wait_for_cust_cfg_file": conf_file, + "guestcust_util.get_imc_dir_path": self.tdir, + }, + ds.get_imc_data_fn, + ) + self.assertEqual(result, (None, None, None)) + self.assertIn("Userdata file is not found", self.logs.getvalue()) + + +class TestDataSourceVMwareIMC_MarkerFiles(CiTestCase): + def setUp(self): + super(TestDataSourceVMwareIMC_MarkerFiles, self).setUp() + self.tdir = self.tmp_dir() + + def test_false_when_markerid_none(self): + """Return False when markerid provided is None.""" + self.assertFalse( + guestcust_util.check_marker_exists( + markerid=None, marker_dir=self.tdir + ) + ) + + def test_markerid_file_exist(self): + """Return False when markerid file path does not exist, + True otherwise.""" + self.assertFalse(guestcust_util.check_marker_exists("123", self.tdir)) + marker_file = self.tmp_path(".markerfile-123.txt", self.tdir) + util.write_file(marker_file, "") + self.assertTrue(guestcust_util.check_marker_exists("123", self.tdir)) + + def test_marker_file_setup(self): + """Test creation of marker files.""" + markerfilepath = self.tmp_path(".markerfile-hi.txt", self.tdir) + self.assertFalse(os.path.exists(markerfilepath)) + guestcust_util.setup_marker_files(marker_id="hi", marker_dir=self.tdir) + self.assertTrue(os.path.exists(markerfilepath)) + + def assert_metadata(test_obj, ds, metadata): test_obj.assertEqual(metadata.get("instance-id"), ds.get_instance_id()) test_obj.assertEqual( |