summaryrefslogtreecommitdiff
path: root/tests/unittests/sources
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources')
-rw-r--r--tests/unittests/sources/azure/test_imds.py491
-rw-r--r--tests/unittests/sources/helpers/test_cloudsigma.py67
-rw-r--r--tests/unittests/sources/test_aliyun.py13
-rw-r--r--tests/unittests/sources/test_azure.py952
-rw-r--r--tests/unittests/sources/test_azure_helper.py68
-rw-r--r--tests/unittests/sources/test_ec2.py19
-rw-r--r--tests/unittests/sources/test_init.py10
-rw-r--r--tests/unittests/sources/test_lxd.py55
-rw-r--r--tests/unittests/sources/test_opennebula.py4
-rw-r--r--tests/unittests/sources/test_openstack.py118
-rw-r--r--tests/unittests/sources/test_ovf.py745
-rw-r--r--tests/unittests/sources/test_vmware.py709
-rw-r--r--tests/unittests/sources/test_vultr.py76
-rw-r--r--tests/unittests/sources/vmware/test_vmware_config_file.py44
14 files changed, 1781 insertions, 1590 deletions
diff --git a/tests/unittests/sources/azure/test_imds.py b/tests/unittests/sources/azure/test_imds.py
new file mode 100644
index 00000000..b5a72645
--- /dev/null
+++ b/tests/unittests/sources/azure/test_imds.py
@@ -0,0 +1,491 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import json
+import logging
+import math
+from unittest import mock
+
+import pytest
+import requests
+
+from cloudinit.sources.azure import imds
+from cloudinit.url_helper import UrlError
+
+MOCKPATH = "cloudinit.sources.azure.imds."
+
+
+@pytest.fixture
+def mock_readurl():
+ with mock.patch(MOCKPATH + "readurl", autospec=True) as m:
+ yield m
+
+
+@pytest.fixture
+def mock_requests_session_request():
+ with mock.patch("requests.Session.request", autospec=True) as m:
+ yield m
+
+
+@pytest.fixture
+def mock_url_helper_time_sleep():
+ with mock.patch("cloudinit.url_helper.time.sleep", autospec=True) as m:
+ yield m
+
+
+def fake_http_error_for_code(status_code: int):
+ response_failure = requests.Response()
+ response_failure.status_code = status_code
+ return requests.exceptions.HTTPError(
+ "fake error",
+ response=response_failure,
+ )
+
+
+class TestFetchMetadataWithApiFallback:
+ default_url = (
+ "http://169.254.169.254/metadata/instance?"
+ "api-version=2021-08-01&extended=true"
+ )
+ fallback_url = (
+ "http://169.254.169.254/metadata/instance?api-version=2019-06-01"
+ )
+ headers = {"Metadata": "true"}
+ retries = 10
+ timeout = 2
+
+ def test_basic(
+ self,
+ caplog,
+ mock_readurl,
+ ):
+ fake_md = {"foo": {"bar": []}}
+ mock_readurl.side_effect = [
+ mock.Mock(contents=json.dumps(fake_md).encode()),
+ ]
+
+ md = imds.fetch_metadata_with_api_fallback()
+
+ assert md == fake_md
+ assert mock_readurl.mock_calls == [
+ mock.call(
+ self.default_url,
+ timeout=self.timeout,
+ headers=self.headers,
+ retries=self.retries,
+ exception_cb=imds._readurl_exception_callback,
+ infinite=False,
+ log_req_resp=True,
+ ),
+ ]
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == []
+
+ def test_basic_fallback(
+ self,
+ caplog,
+ mock_readurl,
+ ):
+ fake_md = {"foo": {"bar": []}}
+ mock_readurl.side_effect = [
+ UrlError("No IMDS version", code=400),
+ mock.Mock(contents=json.dumps(fake_md).encode()),
+ ]
+
+ md = imds.fetch_metadata_with_api_fallback()
+
+ assert md == fake_md
+ assert mock_readurl.mock_calls == [
+ mock.call(
+ self.default_url,
+ timeout=self.timeout,
+ headers=self.headers,
+ retries=self.retries,
+ exception_cb=imds._readurl_exception_callback,
+ infinite=False,
+ log_req_resp=True,
+ ),
+ mock.call(
+ self.fallback_url,
+ timeout=self.timeout,
+ headers=self.headers,
+ retries=self.retries,
+ exception_cb=imds._readurl_exception_callback,
+ infinite=False,
+ log_req_resp=True,
+ ),
+ ]
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == [
+ "Failed to fetch metadata from IMDS: No IMDS version",
+ "Falling back to IMDS api-version: 2019-06-01",
+ ]
+
+ @pytest.mark.parametrize(
+ "error",
+ [
+ fake_http_error_for_code(404),
+ fake_http_error_for_code(410),
+ fake_http_error_for_code(429),
+ fake_http_error_for_code(500),
+ requests.ConnectionError("Fake connection error"),
+ requests.Timeout("Fake connection timeout"),
+ ],
+ )
+ def test_will_retry_errors(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ error,
+ ):
+ fake_md = {"foo": {"bar": []}}
+ mock_requests_session_request.side_effect = [
+ error,
+ mock.Mock(content=json.dumps(fake_md)),
+ ]
+
+ md = imds.fetch_metadata_with_api_fallback()
+
+ assert md == fake_md
+ assert len(mock_requests_session_request.mock_calls) == 2
+ assert mock_url_helper_time_sleep.mock_calls == [mock.call(1)]
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == []
+
+ def test_will_retry_errors_on_fallback(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ ):
+ error = fake_http_error_for_code(400)
+ fake_md = {"foo": {"bar": []}}
+ mock_requests_session_request.side_effect = [
+ error,
+ fake_http_error_for_code(429),
+ mock.Mock(content=json.dumps(fake_md)),
+ ]
+
+ md = imds.fetch_metadata_with_api_fallback()
+
+ assert md == fake_md
+ assert len(mock_requests_session_request.mock_calls) == 3
+ assert mock_url_helper_time_sleep.mock_calls == [mock.call(1)]
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == [
+ "Failed to fetch metadata from IMDS: fake error",
+ "Falling back to IMDS api-version: 2019-06-01",
+ ]
+
+ @pytest.mark.parametrize(
+ "error",
+ [
+ fake_http_error_for_code(404),
+ fake_http_error_for_code(410),
+ fake_http_error_for_code(429),
+ fake_http_error_for_code(500),
+ requests.ConnectionError("Fake connection error"),
+ requests.Timeout("Fake connection timeout"),
+ ],
+ )
+ def test_retry_until_failure(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ error,
+ ):
+ mock_requests_session_request.side_effect = [error] * (11)
+
+ with pytest.raises(UrlError) as exc_info:
+ imds.fetch_metadata_with_api_fallback()
+
+ assert exc_info.value.cause == error
+ assert len(mock_requests_session_request.mock_calls) == (
+ self.retries + 1
+ )
+ assert (
+ mock_url_helper_time_sleep.mock_calls
+ == [mock.call(1)] * self.retries
+ )
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == [f"Failed to fetch metadata from IMDS: {error!s}"]
+
+ @pytest.mark.parametrize(
+ "error",
+ [
+ fake_http_error_for_code(403),
+ fake_http_error_for_code(501),
+ ],
+ )
+ def test_will_not_retry_errors(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ error,
+ ):
+ fake_md = {"foo": {"bar": []}}
+ mock_requests_session_request.side_effect = [
+ error,
+ mock.Mock(content=json.dumps(fake_md)),
+ ]
+
+ with pytest.raises(UrlError) as exc_info:
+ imds.fetch_metadata_with_api_fallback()
+
+ assert exc_info.value.cause == error
+ assert len(mock_requests_session_request.mock_calls) == 1
+ assert mock_url_helper_time_sleep.mock_calls == []
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == [f"Failed to fetch metadata from IMDS: {error!s}"]
+
+ def test_non_json_repsonse(
+ self,
+ caplog,
+ mock_readurl,
+ ):
+ mock_readurl.side_effect = [
+ mock.Mock(contents=b"bad data"),
+ ]
+
+ with pytest.raises(ValueError):
+ imds.fetch_metadata_with_api_fallback()
+
+ assert mock_readurl.mock_calls == [
+ mock.call(
+ self.default_url,
+ timeout=self.timeout,
+ headers=self.headers,
+ retries=self.retries,
+ exception_cb=imds._readurl_exception_callback,
+ infinite=False,
+ log_req_resp=True,
+ ),
+ ]
+
+ warnings = [
+ x.message for x in caplog.records if x.levelno == logging.WARNING
+ ]
+ assert warnings == [
+ (
+ "Failed to parse metadata from IMDS: "
+ "Expecting value: line 1 column 1 (char 0)"
+ )
+ ]
+
+
+class TestFetchReprovisionData:
+ url = (
+ "http://169.254.169.254/metadata/"
+ "reprovisiondata?api-version=2019-06-01"
+ )
+ headers = {"Metadata": "true"}
+ timeout = 2
+
+ def test_basic(
+ self,
+ caplog,
+ mock_readurl,
+ ):
+ content = b"ovf content"
+ mock_readurl.side_effect = [
+ mock.Mock(contents=content),
+ ]
+
+ ovf = imds.fetch_reprovision_data()
+
+ assert ovf == content
+ assert mock_readurl.mock_calls == [
+ mock.call(
+ self.url,
+ timeout=self.timeout,
+ headers=self.headers,
+ exception_cb=mock.ANY,
+ infinite=True,
+ log_req_resp=False,
+ ),
+ ]
+
+ assert caplog.record_tuples == [
+ (
+ "cloudinit.sources.azure.imds",
+ logging.DEBUG,
+ "Polled IMDS 1 time(s)",
+ )
+ ]
+
+ @pytest.mark.parametrize(
+ "error",
+ [
+ fake_http_error_for_code(404),
+ fake_http_error_for_code(410),
+ ],
+ )
+ @pytest.mark.parametrize("failures", [1, 5, 100, 1000])
+ def test_will_retry_errors(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ error,
+ failures,
+ ):
+ content = b"ovf content"
+ mock_requests_session_request.side_effect = [error] * failures + [
+ mock.Mock(content=content),
+ ]
+
+ ovf = imds.fetch_reprovision_data()
+
+ assert ovf == content
+ assert len(mock_requests_session_request.mock_calls) == failures + 1
+ assert (
+ mock_url_helper_time_sleep.mock_calls == [mock.call(1)] * failures
+ )
+
+ wrapped_error = UrlError(
+ error,
+ code=error.response.status_code,
+ headers=error.response.headers,
+ url=self.url,
+ )
+ backoff_logs = [
+ (
+ "cloudinit.sources.azure.imds",
+ logging.INFO,
+ "Polling IMDS failed with exception: "
+ f"{wrapped_error!r} count: {i}",
+ )
+ for i in range(1, failures + 1)
+ if i == 1 or math.log2(i).is_integer()
+ ]
+ assert caplog.record_tuples == backoff_logs + [
+ (
+ "cloudinit.url_helper",
+ logging.DEBUG,
+ mock.ANY,
+ ),
+ (
+ "cloudinit.sources.azure.imds",
+ logging.DEBUG,
+ f"Polled IMDS {failures+1} time(s)",
+ ),
+ ]
+
+ @pytest.mark.parametrize(
+ "error",
+ [
+ fake_http_error_for_code(404),
+ fake_http_error_for_code(410),
+ ],
+ )
+ @pytest.mark.parametrize("failures", [1, 5, 100, 1000])
+ @pytest.mark.parametrize(
+ "terminal_error",
+ [
+ requests.ConnectionError("Fake connection error"),
+ requests.Timeout("Fake connection timeout"),
+ ],
+ )
+ def test_retry_until_failure(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ error,
+ failures,
+ terminal_error,
+ ):
+ mock_requests_session_request.side_effect = [error] * failures + [
+ terminal_error
+ ]
+
+ with pytest.raises(UrlError) as exc_info:
+ imds.fetch_reprovision_data()
+
+ assert exc_info.value.cause == terminal_error
+ assert len(mock_requests_session_request.mock_calls) == (failures + 1)
+ assert (
+ mock_url_helper_time_sleep.mock_calls == [mock.call(1)] * failures
+ )
+
+ wrapped_error = UrlError(
+ error,
+ code=error.response.status_code,
+ headers=error.response.headers,
+ url=self.url,
+ )
+
+ backoff_logs = [
+ (
+ "cloudinit.sources.azure.imds",
+ logging.INFO,
+ "Polling IMDS failed with exception: "
+ f"{wrapped_error!r} count: {i}",
+ )
+ for i in range(1, failures + 1)
+ if i == 1 or math.log2(i).is_integer()
+ ]
+ assert caplog.record_tuples == backoff_logs + [
+ (
+ "cloudinit.sources.azure.imds",
+ logging.INFO,
+ "Polling IMDS failed with exception: "
+ f"{exc_info.value!r} count: {failures+1}",
+ ),
+ ]
+
+ @pytest.mark.parametrize(
+ "error",
+ [
+ fake_http_error_for_code(403),
+ fake_http_error_for_code(501),
+ ],
+ )
+ def test_will_not_retry_errors(
+ self,
+ caplog,
+ mock_requests_session_request,
+ mock_url_helper_time_sleep,
+ error,
+ ):
+ fake_md = {"foo": {"bar": []}}
+ mock_requests_session_request.side_effect = [
+ error,
+ mock.Mock(content=json.dumps(fake_md)),
+ ]
+
+ with pytest.raises(UrlError) as exc_info:
+ imds.fetch_reprovision_data()
+
+ assert exc_info.value.cause == error
+ assert len(mock_requests_session_request.mock_calls) == 1
+ assert mock_url_helper_time_sleep.mock_calls == []
+
+ assert caplog.record_tuples == [
+ (
+ "cloudinit.sources.azure.imds",
+ logging.INFO,
+ "Polling IMDS failed with exception: "
+ f"{exc_info.value!r} count: 1",
+ ),
+ ]
diff --git a/tests/unittests/sources/helpers/test_cloudsigma.py b/tests/unittests/sources/helpers/test_cloudsigma.py
deleted file mode 100644
index 3c687388..00000000
--- a/tests/unittests/sources/helpers/test_cloudsigma.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.sources.helpers.cloudsigma import Cepko
-from tests.unittests import helpers as test_helpers
-
-SERVER_CONTEXT = {
- "cpu": 1000,
- "cpus_instead_of_cores": False,
- "global_context": {"some_global_key": "some_global_val"},
- "mem": 1073741824,
- "meta": {"ssh_public_key": "ssh-rsa AAAAB3NzaC1yc2E.../hQ5D5 john@doe"},
- "name": "test_server",
- "requirements": [],
- "smp": 1,
- "tags": ["much server", "very performance"],
- "uuid": "65b2fb23-8c03-4187-a3ba-8b7c919e889",
- "vnc_password": "9e84d6cb49e46379",
-}
-
-
-class CepkoMock(Cepko):
- def all(self):
- return SERVER_CONTEXT
-
- def get(self, key="", request_pattern=None):
- return SERVER_CONTEXT["tags"]
-
-
-# 2015-01-22 BAW: This test is completely useless because it only ever tests
-# the CepkoMock object. Even in its original form, I don't think it ever
-# touched the underlying Cepko class methods.
-class CepkoResultTests(test_helpers.TestCase):
- def setUp(self):
- self.c = Cepko()
- raise test_helpers.SkipTest("This test is completely useless")
-
- def test_getitem(self):
- result = self.c.all()
- self.assertEqual("65b2fb23-8c03-4187-a3ba-8b7c919e889", result["uuid"])
- self.assertEqual([], result["requirements"])
- self.assertEqual("much server", result["tags"][0])
- self.assertEqual(1, result["smp"])
-
- def test_len(self):
- self.assertEqual(len(SERVER_CONTEXT), len(self.c.all()))
-
- def test_contains(self):
- result = self.c.all()
- self.assertTrue("uuid" in result)
- self.assertFalse("uid" in result)
- self.assertTrue("meta" in result)
- self.assertFalse("ssh_public_key" in result)
-
- def test_iter(self):
- self.assertEqual(
- sorted(SERVER_CONTEXT.keys()),
- sorted([key for key in self.c.all()]),
- )
-
- def test_with_list_as_result(self):
- result = self.c.get("tags")
- self.assertEqual("much server", result[0])
- self.assertTrue("very performance" in result)
- self.assertEqual(2, len(result))
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/sources/test_aliyun.py b/tests/unittests/sources/test_aliyun.py
index fe4e54b5..f95923a4 100644
--- a/tests/unittests/sources/test_aliyun.py
+++ b/tests/unittests/sources/test_aliyun.py
@@ -98,6 +98,15 @@ class TestAliYunDatasource(test_helpers.ResponsesTestCase):
"instance-identity",
)
+ @property
+ def token_url(self):
+ return os.path.join(
+ self.metadata_address,
+ "latest",
+ "api",
+ "token",
+ )
+
def register_mock_metaserver(self, base_url, data):
def register_helper(register, base_url, body):
if isinstance(body, str):
@@ -127,6 +136,7 @@ class TestAliYunDatasource(test_helpers.ResponsesTestCase):
self.register_mock_metaserver(self.metadata_url, self.default_metadata)
self.register_mock_metaserver(self.userdata_url, self.default_userdata)
self.register_mock_metaserver(self.identity_url, self.default_identity)
+ self.responses.add(responses.PUT, self.token_url, "API-TOKEN")
def _test_get_data(self):
self.assertEqual(self.ds.metadata, self.default_metadata)
@@ -151,8 +161,9 @@ class TestAliYunDatasource(test_helpers.ResponsesTestCase):
self.default_metadata["hostname"], self.ds.get_hostname().hostname
)
+ @mock.patch("cloudinit.sources.DataSourceEc2.util.is_resolvable")
@mock.patch("cloudinit.sources.DataSourceAliYun._is_aliyun")
- def test_with_mock_server(self, m_is_aliyun):
+ def test_with_mock_server(self, m_is_aliyun, m_resolv):
m_is_aliyun.return_value = True
self.regist_default_server()
ret = self.ds.get_data()
diff --git a/tests/unittests/sources/test_azure.py b/tests/unittests/sources/test_azure.py
index 24fa061c..b5fe2672 100644
--- a/tests/unittests/sources/test_azure.py
+++ b/tests/unittests/sources/test_azure.py
@@ -3,7 +3,6 @@
import copy
import crypt
import json
-import logging
import os
import stat
import xml.etree.ElementTree as ET
@@ -11,13 +10,13 @@ from pathlib import Path
import pytest
import requests
-import responses
from cloudinit import distros, helpers, subp, url_helper
from cloudinit.net import dhcp
from cloudinit.sources import UNSET
from cloudinit.sources import DataSourceAzure as dsaz
from cloudinit.sources import InvalidMetaDataException
+from cloudinit.sources.azure import imds
from cloudinit.sources.helpers import netlink
from cloudinit.util import (
MountFailedError,
@@ -27,11 +26,9 @@ from cloudinit.util import (
load_json,
write_file,
)
-from cloudinit.version import version_string as vs
from tests.unittests.helpers import (
CiTestCase,
ExitStack,
- ResponsesTestCase,
mock,
populate_dir,
resourceLocation,
@@ -93,16 +90,6 @@ def mock_chassis_asset_tag():
@pytest.fixture
-def mock_device_driver():
- with mock.patch(
- MOCKPATH + "device_driver",
- autospec=True,
- return_value=None,
- ) as m:
- yield m
-
-
-@pytest.fixture
def mock_generate_fallback_config():
with mock.patch(
MOCKPATH + "net.generate_fallback_config",
@@ -173,9 +160,19 @@ def mock_net_dhcp_EphemeralIPv4Network():
yield m
-@pytest.fixture
+@pytest.fixture(autouse=True)
def mock_get_interfaces():
- with mock.patch(MOCKPATH + "net.get_interfaces", return_value=[]) as m:
+ with mock.patch(
+ MOCKPATH + "net.get_interfaces",
+ return_value=[
+ ("dummy0", "9e:65:d6:19:19:01", None, None),
+ ("enP3", "00:11:22:33:44:02", "unknown_accel", "0x3"),
+ ("eth0", "00:11:22:33:44:00", "hv_netvsc", "0x3"),
+ ("eth2", "00:11:22:33:44:01", "unknown", "0x3"),
+ ("eth3", "00:11:22:33:44:02", "unknown_with_unknown_vf", "0x3"),
+ ("lo", "00:00:00:00:00:00", None, None),
+ ],
+ ) as m:
yield m
@@ -205,7 +202,7 @@ def mock_os_path_isfile():
@pytest.fixture
def mock_readurl():
- with mock.patch(MOCKPATH + "readurl", autospec=True) as m:
+ with mock.patch(MOCKPATH + "imds.readurl", autospec=True) as m:
yield m
@@ -216,12 +213,6 @@ def mock_report_diagnostic_event():
@pytest.fixture
-def mock_requests_session_request():
- with mock.patch("requests.Session.request", autospec=True) as m:
- yield m
-
-
-@pytest.fixture
def mock_sleep():
with mock.patch(
MOCKPATH + "sleep",
@@ -237,12 +228,6 @@ def mock_subp_subp():
@pytest.fixture
-def mock_url_helper_time_sleep():
- with mock.patch("cloudinit.url_helper.time.sleep", autospec=True) as m:
- yield m
-
-
-@pytest.fixture
def mock_util_ensure_dir():
with mock.patch(
MOCKPATH + "util.ensure_dir",
@@ -507,6 +492,116 @@ class TestGenerateNetworkConfig:
},
),
(
+ "hv_netvsc driver",
+ {
+ "interface": [
+ {
+ "macAddress": "001122334400",
+ "ipv6": {"ipAddress": []},
+ "ipv4": {
+ "subnet": [
+ {"prefix": "24", "address": "10.0.0.0"}
+ ],
+ "ipAddress": [
+ {
+ "privateIpAddress": "10.0.0.4",
+ "publicIpAddress": "104.46.124.81",
+ }
+ ],
+ },
+ }
+ ]
+ },
+ {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {
+ "macaddress": "00:11:22:33:44:00",
+ "driver": "hv_netvsc",
+ },
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ },
+ ),
+ (
+ "unknown",
+ {
+ "interface": [
+ {
+ "macAddress": "001122334401",
+ "ipv6": {"ipAddress": []},
+ "ipv4": {
+ "subnet": [
+ {"prefix": "24", "address": "10.0.0.0"}
+ ],
+ "ipAddress": [
+ {
+ "privateIpAddress": "10.0.0.4",
+ "publicIpAddress": "104.46.124.81",
+ }
+ ],
+ },
+ }
+ ]
+ },
+ {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {
+ "macaddress": "00:11:22:33:44:01",
+ "driver": "unknown",
+ },
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ },
+ ),
+ (
+ "unknown with unknown matching VF",
+ {
+ "interface": [
+ {
+ "macAddress": "001122334402",
+ "ipv6": {"ipAddress": []},
+ "ipv4": {
+ "subnet": [
+ {"prefix": "24", "address": "10.0.0.0"}
+ ],
+ "ipAddress": [
+ {
+ "privateIpAddress": "10.0.0.4",
+ "publicIpAddress": "104.46.124.81",
+ }
+ ],
+ },
+ }
+ ]
+ },
+ {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {
+ "macaddress": "00:11:22:33:44:02",
+ },
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ },
+ ),
+ (
"multiple interfaces with increasing route metric",
{
"interface": [
@@ -648,7 +743,7 @@ class TestGenerateNetworkConfig:
],
)
def test_parsing_scenarios(
- self, label, mock_device_driver, metadata, expected
+ self, label, mock_get_interfaces, metadata, expected
):
assert (
dsaz.generate_network_config_from_instance_network_metadata(
@@ -657,27 +752,6 @@ class TestGenerateNetworkConfig:
== expected
)
- def test_match_hv_netvsc(self, mock_device_driver):
- mock_device_driver.return_value = "hv_netvsc"
-
- assert dsaz.generate_network_config_from_instance_network_metadata(
- NETWORK_METADATA["network"]
- ) == {
- "ethernets": {
- "eth0": {
- "dhcp4": True,
- "dhcp4-overrides": {"route-metric": 100},
- "dhcp6": False,
- "match": {
- "macaddress": "00:0d:3a:04:75:98",
- "driver": "hv_netvsc",
- },
- "set-name": "eth0",
- }
- },
- "version": 2,
- }
-
class TestNetworkConfig:
fallback_config = {
@@ -693,7 +767,9 @@ class TestNetworkConfig:
],
}
- def test_single_ipv4_nic_configuration(self, azure_ds, mock_device_driver):
+ def test_single_ipv4_nic_configuration(
+ self, azure_ds, mock_get_interfaces
+ ):
"""Network config emits dhcp on single nic with ipv4"""
expected = {
"ethernets": {
@@ -712,7 +788,7 @@ class TestNetworkConfig:
assert azure_ds.network_config == expected
def test_uses_fallback_cfg_when_apply_network_config_is_false(
- self, azure_ds, mock_device_driver, mock_generate_fallback_config
+ self, azure_ds, mock_generate_fallback_config
):
azure_ds.ds_cfg["apply_network_config"] = False
azure_ds._metadata_imds = NETWORK_METADATA
@@ -721,7 +797,7 @@ class TestNetworkConfig:
assert azure_ds.network_config == self.fallback_config
def test_uses_fallback_cfg_when_imds_metadata_unset(
- self, azure_ds, mock_device_driver, mock_generate_fallback_config
+ self, azure_ds, mock_generate_fallback_config
):
azure_ds._metadata_imds = UNSET
mock_generate_fallback_config.return_value = self.fallback_config
@@ -729,7 +805,7 @@ class TestNetworkConfig:
assert azure_ds.network_config == self.fallback_config
def test_uses_fallback_cfg_when_no_network_metadata(
- self, azure_ds, mock_device_driver, mock_generate_fallback_config
+ self, azure_ds, mock_generate_fallback_config
):
"""Network config generates fallback network config when the
IMDS instance metadata is corrupted/invalid, such as when
@@ -745,7 +821,7 @@ class TestNetworkConfig:
assert azure_ds.network_config == self.fallback_config
def test_uses_fallback_cfg_when_no_interface_metadata(
- self, azure_ds, mock_device_driver, mock_generate_fallback_config
+ self, azure_ds, mock_generate_fallback_config
):
"""Network config generates fallback network config when the
IMDS instance metadata is corrupted/invalid, such as when
@@ -761,176 +837,6 @@ class TestNetworkConfig:
assert azure_ds.network_config == self.fallback_config
-class TestGetMetadataFromIMDS(ResponsesTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestGetMetadataFromIMDS, self).setUp()
- self.network_md_url = "{}/instance?api-version=2019-06-01".format(
- dsaz.IMDS_URL
- )
-
- @mock.patch(MOCKPATH + "readurl", autospec=True)
- def test_get_metadata_uses_instance_url(self, m_readurl):
- """Make sure readurl is called with the correct url when accessing
- metadata"""
- m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
- )
-
- dsaz.get_metadata_from_imds(retries=3, md_type=dsaz.MetadataType.ALL)
- m_readurl.assert_called_with(
- "http://169.254.169.254/metadata/instance?api-version=2019-06-01",
- exception_cb=mock.ANY,
- headers=mock.ANY,
- retries=mock.ANY,
- timeout=mock.ANY,
- infinite=False,
- )
-
- @mock.patch(MOCKPATH + "readurl", autospec=True)
- def test_get_network_metadata_uses_network_url(self, m_readurl):
- """Make sure readurl is called with the correct url when accessing
- network metadata"""
- m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
- )
-
- dsaz.get_metadata_from_imds(
- retries=3, md_type=dsaz.MetadataType.NETWORK
- )
- m_readurl.assert_called_with(
- "http://169.254.169.254/metadata/instance/network?api-version="
- "2019-06-01",
- exception_cb=mock.ANY,
- headers=mock.ANY,
- retries=mock.ANY,
- timeout=mock.ANY,
- infinite=False,
- )
-
- @mock.patch(MOCKPATH + "readurl", autospec=True)
- @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True)
- def test_get_default_metadata_uses_instance_url(self, m_dhcp, m_readurl):
- """Make sure readurl is called with the correct url when accessing
- metadata"""
- m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
- )
-
- dsaz.get_metadata_from_imds(retries=3)
- m_readurl.assert_called_with(
- "http://169.254.169.254/metadata/instance?api-version=2019-06-01",
- exception_cb=mock.ANY,
- headers=mock.ANY,
- retries=mock.ANY,
- timeout=mock.ANY,
- infinite=False,
- )
-
- @mock.patch(MOCKPATH + "readurl", autospec=True)
- def test_get_metadata_uses_extended_url(self, m_readurl):
- """Make sure readurl is called with the correct url when accessing
- metadata"""
- m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
- )
-
- dsaz.get_metadata_from_imds(
- retries=3,
- md_type=dsaz.MetadataType.ALL,
- api_version="2021-08-01",
- )
- m_readurl.assert_called_with(
- "http://169.254.169.254/metadata/instance?api-version="
- "2021-08-01&extended=true",
- exception_cb=mock.ANY,
- headers=mock.ANY,
- retries=mock.ANY,
- timeout=mock.ANY,
- infinite=False,
- )
-
- @mock.patch(MOCKPATH + "readurl", autospec=True)
- def test_get_metadata_performs_dhcp_when_network_is_down(self, m_readurl):
- """Perform DHCP setup when nic is not up."""
- m_readurl.return_value = url_helper.StringResponse(
- json.dumps(NETWORK_METADATA).encode("utf-8")
- )
-
- self.assertEqual(
- NETWORK_METADATA, dsaz.get_metadata_from_imds(retries=2)
- )
-
- self.assertIn(
- "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue(),
- )
-
- m_readurl.assert_called_with(
- self.network_md_url,
- exception_cb=mock.ANY,
- headers={"Metadata": "true"},
- retries=2,
- timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
- infinite=False,
- )
-
- @mock.patch("cloudinit.url_helper.time.sleep")
- def test_get_metadata_from_imds_empty_when_no_imds_present(self, m_sleep):
- """Return empty dict when IMDS network metadata is absent."""
- # Workaround https://github.com/getsentry/responses/pull/166
- # url path can be reverted to "/instance?api-version=2019-12-01"
- response = requests.Response()
- response.status_code = 404
- self.responses.add(
- responses.GET,
- dsaz.IMDS_URL + "/instance",
- body=requests.HTTPError("...", response=response),
- status=404,
- )
-
- self.assertEqual(
- {},
- dsaz.get_metadata_from_imds(retries=2, api_version="2019-12-01"),
- )
-
- self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list)
- self.assertIn(
- "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue(),
- )
-
- @mock.patch("requests.Session.request")
- @mock.patch("cloudinit.url_helper.time.sleep")
- def test_get_metadata_from_imds_retries_on_timeout(
- self, m_sleep, m_request
- ):
- """Retry IMDS network metadata on timeout errors."""
-
- self.attempt = 0
- m_request.side_effect = requests.Timeout("Fake Connection Timeout")
-
- def retry_callback(request, uri, headers):
- self.attempt += 1
- raise requests.Timeout("Fake connection timeout")
-
- self.responses.add(
- responses.GET,
- dsaz.IMDS_URL + "instance?api-version=2017-12-01",
- body=retry_callback,
- )
-
- self.assertEqual({}, dsaz.get_metadata_from_imds(retries=3))
-
- self.assertEqual([mock.call(1)] * 3, m_sleep.call_args_list)
- self.assertIn(
- "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue(),
- )
-
-
class TestAzureDataSource(CiTestCase):
with_logs = True
@@ -962,10 +868,10 @@ class TestAzureDataSource(CiTestCase):
self.m_dhcp.return_value.lease = {}
self.m_dhcp.return_value.iface = "eth4"
- self.m_get_metadata_from_imds = self.patches.enter_context(
+ self.m_fetch = self.patches.enter_context(
mock.patch.object(
- dsaz,
- "get_metadata_from_imds",
+ dsaz.imds,
+ "fetch_metadata_with_api_fallback",
mock.MagicMock(return_value=NETWORK_METADATA),
)
)
@@ -1069,13 +975,6 @@ scbus-1 on xpt0 bus 0
self.m_get_metadata_from_fabric = mock.MagicMock(return_value=[])
self.m_report_failure_to_fabric = mock.MagicMock(autospec=True)
- self.m_get_interfaces = mock.MagicMock(
- return_value=[
- ("dummy0", "9e:65:d6:19:19:01", None, None),
- ("eth0", "00:15:5d:69:63:ba", "hv_netvsc", "0x3"),
- ("lo", "00:00:00:00:00:00", None, None),
- ]
- )
self.m_list_possible_azure_ds = mock.MagicMock(
side_effect=_load_possible_azure_ds
)
@@ -1119,11 +1018,6 @@ scbus-1 on xpt0 bus 0
"get_interface_mac",
mock.MagicMock(return_value="00:15:5d:69:63:ba"),
),
- (
- dsaz.net,
- "get_interfaces",
- self.m_get_interfaces,
- ),
(dsaz.subp, "which", lambda x: True),
(
dsaz.dmi,
@@ -1238,7 +1132,7 @@ scbus-1 on xpt0 bus 0
self.assertEqual(1, m_crawl_metadata.call_count)
self.assertFalse(ret)
- def test_crawl_metadata_exception_should_report_failure_with_msg(self):
+ def test_crawl_metadata_exception_should_report_failure(self):
data = {}
dsrc = self._get_ds(data)
with mock.patch.object(
@@ -1249,9 +1143,7 @@ scbus-1 on xpt0 bus 0
m_crawl_metadata.side_effect = Exception
dsrc.get_data()
self.assertEqual(1, m_crawl_metadata.call_count)
- m_report_failure.assert_called_once_with(
- description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE
- )
+ m_report_failure.assert_called_once_with()
def test_crawl_metadata_exc_should_log_could_not_crawl_msg(self):
data = {}
@@ -1292,7 +1184,7 @@ scbus-1 on xpt0 bus 0
data, write_ovf_to_data_dir=True, write_ovf_to_seed_dir=False
)
- self.m_get_metadata_from_imds.return_value = {}
+ self.m_fetch.return_value = {}
with mock.patch(MOCKPATH + "util.mount_cb") as m_mount_cb:
m_mount_cb.side_effect = [
MountFailedError("fail"),
@@ -1429,7 +1321,7 @@ scbus-1 on xpt0 bus 0
data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
dsrc.crawl_metadata()
- self.assertEqual(1, self.m_get_metadata_from_imds.call_count)
+ self.assertEqual(1, self.m_fetch.call_count)
@mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
@@ -1446,7 +1338,7 @@ scbus-1 on xpt0 bus 0
dsrc = self._get_ds(data)
poll_imds_func.return_value = ovfenv
dsrc.crawl_metadata()
- self.assertEqual(2, self.m_get_metadata_from_imds.call_count)
+ self.assertEqual(2, self.m_fetch.call_count)
@mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
@@ -1497,9 +1389,11 @@ scbus-1 on xpt0 bus 0
"cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready",
return_value=True,
)
- @mock.patch("cloudinit.sources.DataSourceAzure.readurl")
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.imds.fetch_reprovision_data"
+ )
def test_crawl_metadata_on_reprovision_reports_ready_using_lease(
- self, m_readurl, m_report_ready, m_media_switch, m_write
+ self, m_fetch_reprovision_data, m_report_ready, m_media_switch, m_write
):
"""If reprovisioning, report ready using the obtained lease"""
ovfenv = construct_ovf_env(preprovisioned_vm=True)
@@ -1518,8 +1412,8 @@ scbus-1 on xpt0 bus 0
m_media_switch.return_value = None
reprovision_ovfenv = construct_ovf_env()
- m_readurl.return_value = url_helper.StringResponse(
- reprovision_ovfenv.encode("utf-8")
+ m_fetch_reprovision_data.return_value = reprovision_ovfenv.encode(
+ "utf-8"
)
dsrc.crawl_metadata()
@@ -1537,10 +1431,7 @@ scbus-1 on xpt0 bus 0
self.assertTrue(os.path.isdir(self.waagent_d))
self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
- @mock.patch(
- "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
- )
- def test_network_config_set_from_imds(self, m_driver):
+ def test_network_config_set_from_imds(self):
"""Datasource.network_config returns IMDS network data."""
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
@@ -1563,12 +1454,7 @@ scbus-1 on xpt0 bus 0
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
- @mock.patch(
- "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
- )
- def test_network_config_set_from_imds_route_metric_for_secondary_nic(
- self, m_driver
- ):
+ def test_network_config_set_from_imds_route_metric_for_secondary_nic(self):
"""Datasource.network_config adds route-metric to secondary nics."""
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
@@ -1609,17 +1495,12 @@ scbus-1 on xpt0 bus 0
third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6"
imds_data["network"]["interface"].append(third_intf)
- self.m_get_metadata_from_imds.return_value = imds_data
+ self.m_fetch.return_value = imds_data
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
- @mock.patch(
- "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
- )
- def test_network_config_set_from_imds_for_secondary_nic_no_ip(
- self, m_driver
- ):
+ def test_network_config_set_from_imds_for_secondary_nic_no_ip(self):
"""If an IP address is empty then there should no config for it."""
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
@@ -1640,7 +1521,7 @@ scbus-1 on xpt0 bus 0
}
imds_data = copy.deepcopy(NETWORK_METADATA)
imds_data["network"]["interface"].append(SECONDARY_INTERFACE_NO_IP)
- self.m_get_metadata_from_imds.return_value = imds_data
+ self.m_fetch.return_value = imds_data
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
@@ -1999,28 +1880,15 @@ scbus-1 on xpt0 bus 0
self.assertFalse(dsrc._report_failure())
self.assertEqual(2, self.m_report_failure_to_fabric.call_count)
- def test_dsaz_report_failure_description_msg(self):
+ def test_dsaz_report_failure(self):
dsrc = self._get_ds({"ovfcontent": construct_ovf_env()})
with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
- # mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
- test_msg = "Test report failure description message"
- self.assertTrue(dsrc._report_failure(description=test_msg))
- self.m_report_failure_to_fabric.assert_called_once_with(
- endpoint="168.63.129.16", description=test_msg
- )
-
- def test_dsaz_report_failure_no_description_msg(self):
- dsrc = self._get_ds({"ovfcontent": construct_ovf_env()})
-
- with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
- m_crawl_metadata.side_effect = Exception
-
- self.assertTrue(dsrc._report_failure()) # no description msg
+ self.assertTrue(dsrc._report_failure())
self.m_report_failure_to_fabric.assert_called_once_with(
- endpoint="168.63.129.16", description=None
+ endpoint="168.63.129.16"
)
def test_dsaz_report_failure_uses_cached_ephemeral_dhcp_ctx_lease(self):
@@ -2038,7 +1906,7 @@ scbus-1 on xpt0 bus 0
# ensure called with cached ephemeral dhcp lease option 245
self.m_report_failure_to_fabric.assert_called_once_with(
- endpoint="test-ep", description=mock.ANY
+ endpoint="test-ep"
)
def test_dsaz_report_failure_no_net_uses_new_ephemeral_dhcp_lease(self):
@@ -2060,7 +1928,7 @@ scbus-1 on xpt0 bus 0
# ensure called with the newly discovered
# ephemeral dhcp lease option 245
self.m_report_failure_to_fabric.assert_called_once_with(
- endpoint="1.2.3.4", description=mock.ANY
+ endpoint="1.2.3.4"
)
def test_exception_fetching_fabric_data_doesnt_propagate(self):
@@ -2157,7 +2025,7 @@ scbus-1 on xpt0 bus 0
[mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list
)
- @mock.patch(MOCKPATH + "net.get_interfaces", autospec=True)
+ @mock.patch(MOCKPATH + "net.get_interfaces")
def test_blacklist_through_distro(self, m_net_get_interfaces):
"""Verify Azure DS updates blacklist drivers in the distro's
networking object."""
@@ -2175,7 +2043,7 @@ scbus-1 on xpt0 bus 0
)
distro.networking.get_interfaces_by_mac()
- self.m_get_interfaces.assert_called_with(
+ m_net_get_interfaces.assert_called_with(
blacklist_drivers=dsaz.BLACKLIST_DRIVERS
)
@@ -2210,13 +2078,12 @@ scbus-1 on xpt0 bus 0
@mock.patch(
"cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates"
)
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_get_public_ssh_keys_with_no_openssh_format(
- self, m_get_metadata_from_imds, m_parse_certificates
+ self, m_parse_certificates
):
imds_data = copy.deepcopy(NETWORK_METADATA)
imds_data["compute"]["publicKeys"][0]["keyData"] = "no-openssh-format"
- m_get_metadata_from_imds.return_value = imds_data
+ self.m_fetch.return_value = imds_data
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
"ovfcontent": construct_ovf_env(),
@@ -2229,9 +2096,8 @@ scbus-1 on xpt0 bus 0
self.assertEqual(ssh_keys, [])
self.assertEqual(m_parse_certificates.call_count, 0)
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_get_public_ssh_keys_without_imds(self, m_get_metadata_from_imds):
- m_get_metadata_from_imds.return_value = dict()
+ def test_get_public_ssh_keys_without_imds(self):
+ self.m_fetch.return_value = dict()
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
"ovfcontent": construct_ovf_env(),
@@ -2244,67 +2110,7 @@ scbus-1 on xpt0 bus 0
ssh_keys = dsrc.get_public_ssh_keys()
self.assertEqual(ssh_keys, ["key2"])
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_imds_api_version_wanted_nonexistent(
- self, m_get_metadata_from_imds
- ):
- def get_metadata_from_imds_side_eff(*args, **kwargs):
- if kwargs["api_version"] == dsaz.IMDS_VER_WANT:
- raise url_helper.UrlError("No IMDS version", code=400)
- return NETWORK_METADATA
-
- m_get_metadata_from_imds.side_effect = get_metadata_from_imds_side_eff
- sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
- data = {
- "ovfcontent": construct_ovf_env(),
- "sys_cfg": sys_cfg,
- }
- dsrc = self._get_ds(data)
- dsrc.get_data()
- self.assertIsNotNone(dsrc.metadata)
-
- assert m_get_metadata_from_imds.mock_calls == [
- mock.call(
- retries=10,
- md_type=dsaz.MetadataType.ALL,
- api_version="2021-08-01",
- exc_cb=mock.ANY,
- infinite=False,
- ),
- mock.call(
- retries=10,
- md_type=dsaz.MetadataType.ALL,
- api_version="2019-06-01",
- exc_cb=mock.ANY,
- infinite=False,
- ),
- ]
-
- @mock.patch(
- MOCKPATH + "get_metadata_from_imds", return_value=NETWORK_METADATA
- )
- def test_imds_api_version_wanted_exists(self, m_get_metadata_from_imds):
- sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
- data = {
- "ovfcontent": construct_ovf_env(),
- "sys_cfg": sys_cfg,
- }
- dsrc = self._get_ds(data)
- dsrc.get_data()
- self.assertIsNotNone(dsrc.metadata)
-
- assert m_get_metadata_from_imds.mock_calls == [
- mock.call(
- retries=10,
- md_type=dsaz.MetadataType.ALL,
- api_version="2021-08-01",
- exc_cb=mock.ANY,
- infinite=False,
- )
- ]
-
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_hostname_from_imds(self, m_get_metadata_from_imds):
+ def test_hostname_from_imds(self):
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
"ovfcontent": construct_ovf_env(),
@@ -2316,13 +2122,12 @@ scbus-1 on xpt0 bus 0
computerName="hostname1",
disablePasswordAuthentication="true",
)
- m_get_metadata_from_imds.return_value = imds_data_with_os_profile
+ self.m_fetch.return_value = imds_data_with_os_profile
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(dsrc.metadata["local-hostname"], "hostname1")
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_username_from_imds(self, m_get_metadata_from_imds):
+ def test_username_from_imds(self):
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
"ovfcontent": construct_ovf_env(),
@@ -2334,15 +2139,14 @@ scbus-1 on xpt0 bus 0
computerName="hostname1",
disablePasswordAuthentication="true",
)
- m_get_metadata_from_imds.return_value = imds_data_with_os_profile
+ self.m_fetch.return_value = imds_data_with_os_profile
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(
dsrc.cfg["system_info"]["default_user"]["name"], "username1"
)
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_disable_password_from_imds(self, m_get_metadata_from_imds):
+ def test_disable_password_from_imds(self):
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
"ovfcontent": construct_ovf_env(),
@@ -2354,13 +2158,12 @@ scbus-1 on xpt0 bus 0
computerName="hostname1",
disablePasswordAuthentication="true",
)
- m_get_metadata_from_imds.return_value = imds_data_with_os_profile
+ self.m_fetch.return_value = imds_data_with_os_profile
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertTrue(dsrc.metadata["disable_password"])
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_userdata_from_imds(self, m_get_metadata_from_imds):
+ def test_userdata_from_imds(self):
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
"ovfcontent": construct_ovf_env(),
@@ -2374,16 +2177,13 @@ scbus-1 on xpt0 bus 0
disablePasswordAuthentication="true",
)
imds_data["compute"]["userData"] = b64e(userdata)
- m_get_metadata_from_imds.return_value = imds_data
+ self.m_fetch.return_value = imds_data
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(dsrc.userdata_raw, userdata.encode("utf-8"))
- @mock.patch(MOCKPATH + "get_metadata_from_imds")
- def test_userdata_from_imds_with_customdata_from_OVF(
- self, m_get_metadata_from_imds
- ):
+ def test_userdata_from_imds_with_customdata_from_OVF(self):
userdataOVF = "userdataOVF"
sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
@@ -2399,7 +2199,7 @@ scbus-1 on xpt0 bus 0
disablePasswordAuthentication="true",
)
imds_data["compute"]["userData"] = b64e(userdataImds)
- m_get_metadata_from_imds.return_value = imds_data
+ self.m_fetch.return_value = imds_data
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
@@ -2995,7 +2795,7 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
@mock.patch(MOCKPATH + "DataSourceAzure._report_ready")
@mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up")
@mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event")
- @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback")
+ @mock.patch(MOCKPATH + "imds.fetch_metadata_with_api_fallback")
@mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True)
@mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach")
@mock.patch("os.path.isfile")
@@ -3175,7 +2975,7 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
@mock.patch(
"cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect"
)
-@mock.patch("requests.Session.request")
+@mock.patch(MOCKPATH + "imds.fetch_reprovision_data")
@mock.patch(MOCKPATH + "DataSourceAzure._report_ready", return_value=True)
class TestPreprovisioningPollIMDS(CiTestCase):
def setUp(self):
@@ -3189,13 +2989,17 @@ class TestPreprovisioningPollIMDS(CiTestCase):
def test_poll_imds_re_dhcp_on_timeout(
self,
m_report_ready,
- m_request,
+ m_fetch_reprovisiondata,
m_media_switch,
m_dhcp,
m_net,
m_fallback,
):
"""The poll_imds will retry DHCP on IMDS timeout."""
+ m_fetch_reprovisiondata.side_effect = [
+ url_helper.UrlError(requests.Timeout("Fake connection timeout")),
+ b"ovf data",
+ ]
report_file = self.tmp_path("report_marker", self.tmp)
lease = {
"interface": "eth9",
@@ -3209,23 +3013,6 @@ class TestPreprovisioningPollIMDS(CiTestCase):
dhcp_ctx = mock.MagicMock(lease=lease)
dhcp_ctx.obtain_lease.return_value = lease
- self.tries = 0
-
- def fake_timeout_once(**kwargs):
- self.tries += 1
- if self.tries == 1:
- raise requests.Timeout("Fake connection timeout")
- elif self.tries in (2, 3):
- response = requests.Response()
- response.status_code = 404 if self.tries == 2 else 410
- raise requests.exceptions.HTTPError(
- "fake {}".format(response.status_code), response=response
- )
- # Third try should succeed and stop retries or redhcp
- return mock.MagicMock(status_code=200, text="good", content="good")
-
- m_request.side_effect = fake_timeout_once
-
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
with mock.patch.object(
dsa, "_reported_ready_marker_file", report_file
@@ -3235,14 +3022,14 @@ class TestPreprovisioningPollIMDS(CiTestCase):
assert m_report_ready.mock_calls == [mock.call()]
self.assertEqual(3, m_dhcp.call_count, "Expected 3 DHCP calls")
- self.assertEqual(4, self.tries, "Expected 4 total reads from IMDS")
+ assert m_fetch_reprovisiondata.call_count == 2
@mock.patch("os.path.isfile")
def test_poll_imds_skips_dhcp_if_ctx_present(
self,
m_isfile,
report_ready_func,
- fake_resp,
+ m_fetch_reprovisiondata,
m_media_switch,
m_dhcp,
m_net,
@@ -3271,7 +3058,7 @@ class TestPreprovisioningPollIMDS(CiTestCase):
m_ephemeral_dhcpv4,
m_isfile,
report_ready_func,
- m_request,
+ m_fetch_reprovisiondata,
m_media_switch,
m_dhcp,
m_net,
@@ -3282,17 +3069,15 @@ class TestPreprovisioningPollIMDS(CiTestCase):
polling for reprovisiondata. Note that if this ctx is set when
_poll_imds is called, then it is not expected to be waiting for
media_disconnect_connect either."""
-
- tries = 0
-
- def fake_timeout_once(**kwargs):
- nonlocal tries
- tries += 1
- if tries == 1:
- raise requests.Timeout("Fake connection timeout")
- return mock.MagicMock(status_code=200, text="good", content="good")
-
- m_request.side_effect = fake_timeout_once
+ m_fetch_reprovisiondata.side_effect = [
+ url_helper.UrlError(
+ requests.ConnectionError(
+ "Failed to establish a new connection: "
+ "[Errno 101] Network is unreachable"
+ )
+ ),
+ b"ovf data",
+ ]
report_file = self.tmp_path("report_marker", self.tmp)
m_isfile.return_value = True
distro = mock.MagicMock()
@@ -3307,12 +3092,12 @@ class TestPreprovisioningPollIMDS(CiTestCase):
self.assertEqual(1, m_dhcp_ctx.clean_network.call_count)
self.assertEqual(1, m_ephemeral_dhcpv4.call_count)
self.assertEqual(0, m_media_switch.call_count)
- self.assertEqual(2, m_request.call_count)
+ self.assertEqual(2, m_fetch_reprovisiondata.call_count)
def test_does_not_poll_imds_report_ready_when_marker_file_exists(
self,
m_report_ready,
- m_request,
+ m_fetch_reprovisiondata,
m_media_switch,
m_dhcp,
m_net,
@@ -3339,10 +3124,12 @@ class TestPreprovisioningPollIMDS(CiTestCase):
dsa._poll_imds()
self.assertEqual(m_report_ready.call_count, 0)
+ @mock.patch(MOCKPATH + "imds.fetch_metadata_with_api_fallback")
def test_poll_imds_report_ready_success_writes_marker_file(
self,
+ m_fetch,
m_report_ready,
- m_request,
+ m_fetch_reprovisiondata,
m_media_switch,
m_dhcp,
m_net,
@@ -3375,7 +3162,7 @@ class TestPreprovisioningPollIMDS(CiTestCase):
def test_poll_imds_report_ready_failure_raises_exc_and_doesnt_write_marker(
self,
m_report_ready,
- m_request,
+ m_fetch_reprovisiondata,
m_media_switch,
m_dhcp,
m_net,
@@ -3415,7 +3202,9 @@ class TestPreprovisioningPollIMDS(CiTestCase):
)
@mock.patch("cloudinit.net.ephemeral.EphemeralIPv4Network", autospec=True)
@mock.patch("cloudinit.net.ephemeral.maybe_perform_dhcp_discovery")
-@mock.patch("requests.Session.request")
+@mock.patch(
+ MOCKPATH + "imds.fetch_reprovision_data", side_effect=[b"ovf data"]
+)
class TestAzureDataSourcePreprovisioning(CiTestCase):
def setUp(self):
super(TestAzureDataSourcePreprovisioning, self).setUp()
@@ -3425,7 +3214,7 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d
def test_poll_imds_returns_ovf_env(
- self, m_request, m_dhcp, m_net, m_media_switch
+ self, m_fetch_reprovisiondata, m_dhcp, m_net, m_media_switch
):
"""The _poll_imds method should return the ovf_env.xml."""
m_media_switch.return_value = None
@@ -3437,30 +3226,8 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
"subnet-mask": "255.255.255.0",
}
]
- url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01"
- host = "169.254.169.254"
- full_url = url.format(host)
- m_request.return_value = mock.MagicMock(
- status_code=200, text="ovf", content="ovf"
- )
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
self.assertTrue(len(dsa._poll_imds()) > 0)
- self.assertEqual(
- m_request.call_args_list,
- [
- mock.call(
- allow_redirects=True,
- headers={
- "Metadata": "true",
- "User-Agent": "Cloud-Init/%s" % vs(),
- },
- method="GET",
- timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
- url=full_url,
- stream=False,
- )
- ],
- )
self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
broadcast="192.168.2.255",
@@ -3473,7 +3240,7 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
self.assertEqual(m_net.call_count, 2)
def test__reprovision_calls__poll_imds(
- self, m_request, m_dhcp, m_net, m_media_switch
+ self, m_fetch_reprovisiondata, m_dhcp, m_net, m_media_switch
):
"""The _reprovision method should call poll IMDS."""
m_media_switch.return_value = None
@@ -3486,33 +3253,14 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
"unknown-245": "624c3620",
}
]
- url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01"
- host = "169.254.169.254"
- full_url = url.format(host)
hostname = "myhost"
username = "myuser"
content = construct_ovf_env(username=username, hostname=hostname)
- m_request.return_value = mock.MagicMock(
- status_code=200, text=content, content=content
- )
+ m_fetch_reprovisiondata.side_effect = [content]
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
md, _ud, cfg, _d = dsa._reprovision()
self.assertEqual(md["local-hostname"], hostname)
self.assertEqual(cfg["system_info"]["default_user"]["name"], username)
- self.assertIn(
- mock.call(
- allow_redirects=True,
- headers={
- "Metadata": "true",
- "User-Agent": "Cloud-Init/%s" % vs(),
- },
- method="GET",
- timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
- url=full_url,
- stream=False,
- ),
- m_request.call_args_list,
- )
self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
broadcast="192.168.2.255",
@@ -3857,186 +3605,6 @@ def fake_http_error_for_code(status_code: int):
)
-@pytest.mark.parametrize(
- "md_type,expected_url",
- [
- (
- dsaz.MetadataType.ALL,
- "http://169.254.169.254/metadata/instance?"
- "api-version=2021-08-01&extended=true",
- ),
- (
- dsaz.MetadataType.NETWORK,
- "http://169.254.169.254/metadata/instance/network?"
- "api-version=2021-08-01",
- ),
- (
- dsaz.MetadataType.REPROVISION_DATA,
- "http://169.254.169.254/metadata/reprovisiondata?"
- "api-version=2021-08-01",
- ),
- ],
-)
-class TestIMDS:
- def test_basic_scenarios(
- self, azure_ds, caplog, mock_readurl, md_type, expected_url
- ):
- fake_md = {"foo": {"bar": []}}
- mock_readurl.side_effect = [
- mock.MagicMock(contents=json.dumps(fake_md).encode()),
- ]
-
- md = azure_ds.get_imds_data_with_api_fallback(
- retries=5,
- md_type=md_type,
- )
-
- assert md == fake_md
- assert mock_readurl.mock_calls == [
- mock.call(
- expected_url,
- timeout=2,
- headers={"Metadata": "true"},
- retries=5,
- exception_cb=dsaz.imds_readurl_exception_callback,
- infinite=False,
- ),
- ]
-
- warnings = [
- x.message for x in caplog.records if x.levelno == logging.WARNING
- ]
- assert warnings == []
-
- @pytest.mark.parametrize(
- "error",
- [
- fake_http_error_for_code(404),
- fake_http_error_for_code(410),
- fake_http_error_for_code(429),
- fake_http_error_for_code(500),
- requests.Timeout("Fake connection timeout"),
- ],
- )
- def test_will_retry_errors(
- self,
- azure_ds,
- caplog,
- md_type,
- expected_url,
- mock_requests_session_request,
- mock_url_helper_time_sleep,
- error,
- ):
- fake_md = {"foo": {"bar": []}}
- mock_requests_session_request.side_effect = [
- error,
- mock.Mock(content=json.dumps(fake_md)),
- ]
-
- md = azure_ds.get_imds_data_with_api_fallback(
- retries=5,
- md_type=md_type,
- )
-
- assert md == fake_md
- assert len(mock_requests_session_request.mock_calls) == 2
- assert mock_url_helper_time_sleep.mock_calls == [mock.call(1)]
-
- warnings = [
- x.message for x in caplog.records if x.levelno == logging.WARNING
- ]
- assert warnings == []
-
- @pytest.mark.parametrize("retries", [0, 1, 5, 10])
- @pytest.mark.parametrize(
- "error",
- [
- fake_http_error_for_code(404),
- fake_http_error_for_code(410),
- fake_http_error_for_code(429),
- fake_http_error_for_code(500),
- requests.Timeout("Fake connection timeout"),
- ],
- )
- def test_retry_until_failure(
- self,
- azure_ds,
- caplog,
- md_type,
- expected_url,
- mock_requests_session_request,
- mock_url_helper_time_sleep,
- error,
- retries,
- ):
- mock_requests_session_request.side_effect = [error] * (retries + 1)
-
- assert (
- azure_ds.get_imds_data_with_api_fallback(
- retries=retries,
- md_type=md_type,
- )
- == {}
- )
-
- assert len(mock_requests_session_request.mock_calls) == (retries + 1)
- assert (
- mock_url_helper_time_sleep.mock_calls == [mock.call(1)] * retries
- )
-
- warnings = [
- x.message for x in caplog.records if x.levelno == logging.WARNING
- ]
- assert warnings == [
- "Ignoring IMDS instance metadata. "
- "Get metadata from IMDS failed: %s" % error
- ]
-
- @pytest.mark.parametrize(
- "error",
- [
- fake_http_error_for_code(403),
- fake_http_error_for_code(501),
- requests.ConnectionError("Fake Network Unreachable"),
- ],
- )
- def test_will_not_retry_errors(
- self,
- azure_ds,
- caplog,
- md_type,
- expected_url,
- mock_requests_session_request,
- mock_url_helper_time_sleep,
- error,
- ):
- fake_md = {"foo": {"bar": []}}
- mock_requests_session_request.side_effect = [
- error,
- mock.Mock(content=json.dumps(fake_md)),
- ]
-
- assert (
- azure_ds.get_imds_data_with_api_fallback(
- retries=5,
- md_type=md_type,
- )
- == {}
- )
-
- assert len(mock_requests_session_request.mock_calls) == 1
- assert mock_url_helper_time_sleep.mock_calls == []
-
- warnings = [
- x.message for x in caplog.records if x.levelno == logging.WARNING
- ]
- assert warnings == [
- "Ignoring IMDS instance metadata. "
- "Get metadata from IMDS failed: %s" % error
- ]
-
-
class TestInstanceId:
def test_metadata(self, azure_ds, mock_dmi_read_dmi_data):
azure_ds.metadata = {"instance-id": "test-id"}
@@ -4141,8 +3709,9 @@ class TestProvisioning:
timeout=2,
headers={"Metadata": "true"},
retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
+ exception_cb=imds._readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
),
]
@@ -4200,29 +3769,31 @@ class TestProvisioning:
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/reprovisiondata?"
"api-version=2019-06-01",
- timeout=2,
- headers={"Metadata": "true"},
exception_cb=mock.ANY,
- infinite=True,
+ headers={"Metadata": "true"},
log_req_resp=False,
+ infinite=True,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
]
@@ -4303,38 +3874,41 @@ class TestProvisioning:
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
mock.call(
- "http://169.254.169.254/metadata/instance/network?"
- "api-version=2021-08-01",
- timeout=2,
+ "http://169.254.169.254/metadata/instance?"
+ "api-version=2021-08-01&extended=true",
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=0,
- exception_cb=mock.ANY,
- infinite=True,
+ infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/reprovisiondata?"
"api-version=2019-06-01",
- timeout=2,
- headers={"Metadata": "true"},
exception_cb=mock.ANY,
- infinite=True,
+ headers={"Metadata": "true"},
log_req_resp=False,
+ infinite=True,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
]
@@ -4451,38 +4025,41 @@ class TestProvisioning:
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
mock.call(
- "http://169.254.169.254/metadata/instance/network?"
- "api-version=2021-08-01",
- timeout=2,
+ "http://169.254.169.254/metadata/instance?"
+ "api-version=2021-08-01&extended=true",
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=0,
- exception_cb=mock.ANY,
- infinite=True,
+ infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/reprovisiondata?"
"api-version=2019-06-01",
- timeout=2,
- headers={"Metadata": "true"},
exception_cb=mock.ANY,
+ headers={"Metadata": "true"},
infinite=True,
log_req_resp=False,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
]
@@ -4557,29 +4134,31 @@ class TestProvisioning:
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/reprovisiondata?"
"api-version=2019-06-01",
- timeout=2,
- headers={"Metadata": "true"},
exception_cb=mock.ANY,
+ headers={"Metadata": "true"},
infinite=True,
log_req_resp=False,
+ timeout=2,
),
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
),
]
@@ -4636,12 +4215,13 @@ class TestProvisioning:
mock.call(
"http://169.254.169.254/metadata/instance?"
"api-version=2021-08-01&extended=true",
- timeout=2,
+ exception_cb=imds._readurl_exception_callback,
headers={"Metadata": "true"},
- retries=10,
- exception_cb=dsaz.imds_readurl_exception_callback,
infinite=False,
- )
+ log_req_resp=True,
+ retries=10,
+ timeout=2,
+ ),
]
assert self.mock_subp_subp.mock_calls == []
diff --git a/tests/unittests/sources/test_azure_helper.py b/tests/unittests/sources/test_azure_helper.py
index 0a41fedf..38a57b99 100644
--- a/tests/unittests/sources/test_azure_helper.py
+++ b/tests/unittests/sources/test_azure_helper.py
@@ -75,6 +75,23 @@ HEALTH_REPORT_XML_TEMPLATE = """\
</Health>
"""
+
+def get_formatted_health_report_xml_bytes(
+ container_id: str,
+ incarnation: int,
+ instance_id: str,
+ health_status: str,
+ health_detail_subsection: str,
+) -> bytes:
+ return HEALTH_REPORT_XML_TEMPLATE.format(
+ container_id=container_id,
+ incarnation=incarnation,
+ instance_id=instance_id,
+ health_status=health_status,
+ health_detail_subsection=health_detail_subsection,
+ ).encode("utf-8")
+
+
HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE = dedent(
"""\
<Details>
@@ -626,14 +643,11 @@ class TestGoalStateHealthReporter(CiTestCase):
return element.text
return None
- def _get_formatted_health_report_xml_string(self, **kwargs):
- return HEALTH_REPORT_XML_TEMPLATE.format(**kwargs)
-
def _get_formatted_health_detail_subsection_xml_string(self, **kwargs):
return HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE.format(**kwargs)
def _get_report_ready_health_document(self):
- return self._get_formatted_health_report_xml_string(
+ return get_formatted_health_report_xml_bytes(
incarnation=escape(str(self.default_parameters["incarnation"])),
container_id=escape(self.default_parameters["container_id"]),
instance_id=escape(self.default_parameters["instance_id"]),
@@ -651,7 +665,7 @@ class TestGoalStateHealthReporter(CiTestCase):
)
)
- return self._get_formatted_health_report_xml_string(
+ return get_formatted_health_report_xml_bytes(
incarnation=escape(str(self.default_parameters["incarnation"])),
container_id=escape(self.default_parameters["container_id"]),
instance_id=escape(self.default_parameters["instance_id"]),
@@ -887,7 +901,7 @@ class TestGoalStateHealthReporter(CiTestCase):
health_description=escape(health_description),
)
)
- health_document = self._get_formatted_health_report_xml_string(
+ health_document = get_formatted_health_report_xml_bytes(
incarnation=escape(incarnation),
container_id=escape(container_id),
instance_id=escape(instance_id),
@@ -1132,9 +1146,9 @@ class TestWALinuxAgentShim(CiTestCase):
posted_document = (
self.AzureEndpointHttpClient.return_value.post.call_args[1]["data"]
)
- self.assertIn(self.test_incarnation, posted_document)
- self.assertIn(self.test_container_id, posted_document)
- self.assertIn(self.test_instance_id, posted_document)
+ self.assertIn(self.test_incarnation.encode("utf-8"), posted_document)
+ self.assertIn(self.test_container_id.encode("utf-8"), posted_document)
+ self.assertIn(self.test_instance_id.encode("utf-8"), posted_document)
def test_goal_state_values_used_for_report_failure(self):
shim = wa_shim(endpoint="test_endpoint")
@@ -1142,14 +1156,14 @@ class TestWALinuxAgentShim(CiTestCase):
posted_document = (
self.AzureEndpointHttpClient.return_value.post.call_args[1]["data"]
)
- self.assertIn(self.test_incarnation, posted_document)
- self.assertIn(self.test_container_id, posted_document)
- self.assertIn(self.test_instance_id, posted_document)
+ self.assertIn(self.test_incarnation.encode("utf-8"), posted_document)
+ self.assertIn(self.test_container_id.encode("utf-8"), posted_document)
+ self.assertIn(self.test_instance_id.encode("utf-8"), posted_document)
def test_xml_elems_in_report_ready_post(self):
shim = wa_shim(endpoint="test_endpoint")
shim.register_with_azure_and_fetch_data()
- health_document = HEALTH_REPORT_XML_TEMPLATE.format(
+ health_document = get_formatted_health_report_xml_bytes(
incarnation=escape(self.test_incarnation),
container_id=escape(self.test_container_id),
instance_id=escape(self.test_instance_id),
@@ -1164,7 +1178,7 @@ class TestWALinuxAgentShim(CiTestCase):
def test_xml_elems_in_report_failure_post(self):
shim = wa_shim(endpoint="test_endpoint")
shim.register_with_azure_and_report_failure(description="TestDesc")
- health_document = HEALTH_REPORT_XML_TEMPLATE.format(
+ health_document = get_formatted_health_report_xml_bytes(
incarnation=escape(self.test_incarnation),
container_id=escape(self.test_container_id),
instance_id=escape(self.test_instance_id),
@@ -1382,35 +1396,11 @@ class TestGetMetadataGoalStateXMLAndReportFailureToFabric(CiTestCase):
)
self.assertEqual(1, self.m_shim.return_value.clean_up.call_count)
- def test_report_failure_to_fabric_with_desc_calls_shim_report_failure(
- self,
- ):
- azure_helper.report_failure_to_fabric(
- endpoint="test_endpoint", description="TestDesc"
- )
- self.m_shim.return_value.register_with_azure_and_report_failure.assert_called_once_with( # noqa: E501
- description="TestDesc"
- )
-
- def test_report_failure_to_fabric_with_no_desc_calls_shim_report_failure(
+ def test_report_failure_to_fabric_calls_shim_report_failure(
self,
):
azure_helper.report_failure_to_fabric(endpoint="test_endpoint")
# default err message description should be shown to the user
- # if no description is passed in
- self.m_shim.return_value.register_with_azure_and_report_failure.assert_called_once_with( # noqa: E501
- description=(
- azure_helper.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE
- )
- )
-
- def test_report_failure_to_fabric_empty_desc_calls_shim_report_failure(
- self,
- ):
- azure_helper.report_failure_to_fabric(
- endpoint="test_endpoint", description=""
- )
- # default err message description should be shown to the user
# if an empty description is passed in
self.m_shim.return_value.register_with_azure_and_report_failure.assert_called_once_with( # noqa: E501
description=(
diff --git a/tests/unittests/sources/test_ec2.py b/tests/unittests/sources/test_ec2.py
index 4c832da7..3fe525e3 100644
--- a/tests/unittests/sources/test_ec2.py
+++ b/tests/unittests/sources/test_ec2.py
@@ -5,6 +5,7 @@ import json
import threading
from unittest import mock
+import pytest
import requests
import responses
@@ -223,6 +224,12 @@ TAGS_METADATA_2021_03_23: dict = {
}
+@pytest.fixture(autouse=True)
+def disable_is_resolvable():
+ with mock.patch("cloudinit.sources.DataSourceEc2.util.is_resolvable"):
+ yield
+
+
def _register_ssh_keys(rfunc, base_url, keys_data):
"""handle ssh key inconsistencies.
@@ -303,7 +310,7 @@ def register_mock_metaserver(base_url, data, responses_mock=None):
def myreg(*argc, **kwargs):
url, body = argc
- method = responses.PUT if ec2.API_TOKEN_ROUTE in url else responses.GET
+ method = responses.PUT if "latest/api/token" in url else responses.GET
status = kwargs.get("status", 200)
return responses_mock.add(method, url, body, status=status)
@@ -1180,6 +1187,16 @@ class TesIdentifyPlatform(test_helpers.CiTestCase):
return unspecial
@mock.patch("cloudinit.sources.DataSourceEc2._collect_platform_data")
+ def test_identify_aliyun(self, m_collect):
+ """aliyun should be identified if product name equals to
+ Alibaba Cloud ECS
+ """
+ m_collect.return_value = self.collmock(
+ product_name="Alibaba Cloud ECS"
+ )
+ self.assertEqual(ec2.CloudNames.ALIYUN, ec2.identify_platform())
+
+ @mock.patch("cloudinit.sources.DataSourceEc2._collect_platform_data")
def test_identify_zstack(self, m_collect):
"""zstack should be identified if chassis-asset-tag
ends in .zstack.io
diff --git a/tests/unittests/sources/test_init.py b/tests/unittests/sources/test_init.py
index a81c33a2..0447e02c 100644
--- a/tests/unittests/sources/test_init.py
+++ b/tests/unittests/sources/test_init.py
@@ -716,9 +716,13 @@ class TestDataSource(CiTestCase):
"cloudinit.sources.canonical_cloud_id", return_value="my-cloud"
):
datasource.get_data()
- self.assertEqual("my-cloud\n", util.load_file(cloud_id_link))
- # A symlink with the generic /run/cloud-init/cloud-id link is present
- self.assertTrue(util.is_link(cloud_id_link))
+ self.assertEqual("my-cloud\n", util.load_file(cloud_id_link))
+ # A symlink with the generic /run/cloud-init/cloud-id
+ # link is present
+ self.assertTrue(util.is_link(cloud_id_link))
+ datasource.persist_instance_data()
+ # cloud-id<cloud-type> not deleted: no cloud-id change
+ self.assertTrue(os.path.exists(cloud_id_file))
# When cloud-id changes, symlink and content change
with mock.patch(
"cloudinit.sources.canonical_cloud_id", return_value="my-cloud2"
diff --git a/tests/unittests/sources/test_lxd.py b/tests/unittests/sources/test_lxd.py
index b02ed177..efc24883 100644
--- a/tests/unittests/sources/test_lxd.py
+++ b/tests/unittests/sources/test_lxd.py
@@ -440,18 +440,22 @@ class TestReadMetadata:
"[GET] [HTTP:200] http://lxd/1.0/config",
],
),
- ( # Assert 404 on devices
+ ( # Assert 404 on devices logs about skipping
True,
{
"http://lxd/1.0/meta-data": "local-hostname: md\n",
"http://lxd/1.0/config": "[]",
+ # No devices URL response, so 404 raised
+ },
+ {
+ "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION,
+ "config": {},
+ "meta-data": "local-hostname: md\n",
},
- InvalidMetaDataException(
- "Invalid HTTP response [404] from http://lxd/1.0/devices"
- ),
[
"[GET] [HTTP:200] http://lxd/1.0/meta-data",
"[GET] [HTTP:200] http://lxd/1.0/config",
+ "Skipping http://lxd/1.0/devices on [HTTP:404]",
],
),
( # Assert non-JSON format from devices
@@ -693,5 +697,46 @@ class TestReadMetadata:
== m_session_get.call_args_list
)
+ @mock.patch.object(lxd.requests.Session, "get")
+ @mock.patch.object(lxd.time, "sleep")
+ def test_socket_retry(self, m_session_get, m_sleep):
+ """validate socket retry logic"""
+
+ def generate_return_codes():
+ """
+ [200]
+ [500, 200]
+ [500, 500, 200]
+ [500, 500, ..., 200]
+ """
+ five_hundreds = []
+
+ # generate a couple of longer ones to assert timeout condition
+ for _ in range(33):
+ five_hundreds.append(500)
+ yield [*five_hundreds, 200]
+
+ for return_codes in generate_return_codes():
+ m = mock.Mock(
+ get=mock.Mock(
+ side_effect=[
+ mock.MagicMock(
+ ok=mock.PropertyMock(return_value=True),
+ status_code=code,
+ text=mock.PropertyMock(
+ return_value="properly formatted http response"
+ ),
+ )
+ for code in return_codes
+ ]
+ )
+ )
+ resp = lxd._do_request(m, "http://agua/")
-# vi: ts=4 expandtab
+ # assert that 30 iterations or the first 200 code is the final
+ # attempt, whichever comes first
+ assert min(len(return_codes), 30) == m.get.call_count
+ if len(return_codes) < 31:
+ assert 200 == resp.status_code
+ else:
+ assert 500 == resp.status_code
diff --git a/tests/unittests/sources/test_opennebula.py b/tests/unittests/sources/test_opennebula.py
index af1c45b8..0fc332a9 100644
--- a/tests/unittests/sources/test_opennebula.py
+++ b/tests/unittests/sources/test_opennebula.py
@@ -121,7 +121,9 @@ class TestOpenNebulaDataSource(CiTestCase):
util.find_devs_with = lambda n: [] # type: ignore
populate_context_dir(self.seed_dir, {"KEY1": "val1"})
dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
+ with mock.patch(DS_PATH + ".pwd.getpwnam") as getpwnam:
+ ret = dsrc.get_data()
+ self.assertEqual([mock.call("nobody")], getpwnam.call_args_list)
self.assertTrue(ret)
finally:
util.find_devs_with = orig_find_devs_with
diff --git a/tests/unittests/sources/test_openstack.py b/tests/unittests/sources/test_openstack.py
index 8bcecae7..02516772 100644
--- a/tests/unittests/sources/test_openstack.py
+++ b/tests/unittests/sources/test_openstack.py
@@ -10,9 +10,11 @@ import re
from io import StringIO
from urllib.parse import urlparse
+import pytest
import responses
from cloudinit import helpers, settings, util
+from cloudinit.distros import Distro
from cloudinit.sources import UNSET, BrokenMetadata
from cloudinit.sources import DataSourceOpenStack as ds
from cloudinit.sources import convert_vendordata
@@ -76,6 +78,12 @@ EC2_VERSIONS = [
MOCK_PATH = "cloudinit.sources.DataSourceOpenStack."
+@pytest.fixture(autouse=True)
+def mock_is_resolvable():
+ with mock.patch(f"{MOCK_PATH}util.is_resolvable"):
+ yield
+
+
# TODO _register_uris should leverage test_ec2.register_mock_metaserver.
def _register_uris(version, ec2_files, ec2_meta, os_files, *, responses_mock):
"""Registers a set of url patterns into responses that will mimic the
@@ -292,15 +300,13 @@ class TestOpenStackDataSource(test_helpers.ResponsesTestCase):
OS_FILES,
responses_mock=self.responses,
)
+ distro = mock.MagicMock(spec=Distro)
+ distro.is_virtual = False
ds_os = ds.DataSourceOpenStack(
- settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp})
+ settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": self.tmp})
)
self.assertIsNone(ds_os.version)
- mock_path = MOCK_PATH + "detect_openstack"
- with test_helpers.mock.patch(mock_path) as m_detect_os:
- m_detect_os.return_value = True
- found = ds_os.get_data()
- self.assertTrue(found)
+ self.assertTrue(ds_os.get_data())
self.assertEqual(2, ds_os.version)
md = dict(ds_os.metadata)
md.pop("instance-id", None)
@@ -344,8 +350,9 @@ class TestOpenStackDataSource(test_helpers.ResponsesTestCase):
]
self.assertIsNone(ds_os_local.version)
- mock_path = MOCK_PATH + "detect_openstack"
- with test_helpers.mock.patch(mock_path) as m_detect_os:
+ with test_helpers.mock.patch.object(
+ ds_os_local, "detect_openstack"
+ ) as m_detect_os:
m_detect_os.return_value = True
found = ds_os_local.get_data()
self.assertTrue(found)
@@ -370,12 +377,15 @@ class TestOpenStackDataSource(test_helpers.ResponsesTestCase):
_register_uris(
self.VERSION, {}, {}, os_files, responses_mock=self.responses
)
+ distro = mock.MagicMock(spec=Distro)
+ distro.is_virtual = True
ds_os = ds.DataSourceOpenStack(
- settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp})
+ settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": self.tmp})
)
self.assertIsNone(ds_os.version)
- mock_path = MOCK_PATH + "detect_openstack"
- with test_helpers.mock.patch(mock_path) as m_detect_os:
+ with test_helpers.mock.patch.object(
+ ds_os, "detect_openstack"
+ ) as m_detect_os:
m_detect_os.return_value = True
found = ds_os.get_data()
self.assertFalse(found)
@@ -394,19 +404,17 @@ class TestOpenStackDataSource(test_helpers.ResponsesTestCase):
_register_uris(
self.VERSION, {}, {}, os_files, responses_mock=self.responses
)
+ distro = mock.MagicMock(spec=Distro)
+ distro.is_virtual = True
ds_os = ds.DataSourceOpenStack(
- settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp})
+ settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": self.tmp})
)
ds_os.ds_cfg = {
"max_wait": 0,
"timeout": 0,
}
self.assertIsNone(ds_os.version)
- mock_path = MOCK_PATH + "detect_openstack"
- with test_helpers.mock.patch(mock_path) as m_detect_os:
- m_detect_os.return_value = True
- found = ds_os.get_data()
- self.assertFalse(found)
+ self.assertFalse(ds_os.get_data())
self.assertIsNone(ds_os.version)
def test_network_config_disabled_by_datasource_config(self):
@@ -471,16 +479,19 @@ class TestOpenStackDataSource(test_helpers.ResponsesTestCase):
_register_uris(
self.VERSION, {}, {}, os_files, responses_mock=self.responses
)
+ distro = mock.MagicMock(spec=Distro)
+ distro.is_virtual = True
ds_os = ds.DataSourceOpenStack(
- settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp})
+ settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": self.tmp})
)
ds_os.ds_cfg = {
"max_wait": 0,
"timeout": 0,
}
self.assertIsNone(ds_os.version)
- mock_path = MOCK_PATH + "detect_openstack"
- with test_helpers.mock.patch(mock_path) as m_detect_os:
+ with test_helpers.mock.patch.object(
+ ds_os, "detect_openstack"
+ ) as m_detect_os:
m_detect_os.return_value = True
found = ds_os.get_data()
self.assertFalse(found)
@@ -568,13 +579,58 @@ class TestVendorDataLoading(test_helpers.TestCase):
@test_helpers.mock.patch(MOCK_PATH + "util.is_x86")
class TestDetectOpenStack(test_helpers.CiTestCase):
+ def setUp(self):
+ self.tmp = self.tmp_dir()
+
+ def _fake_ds(self) -> ds.DataSourceOpenStack:
+ distro = mock.MagicMock(spec=Distro)
+ distro.is_virtual = True
+ return ds.DataSourceOpenStack(
+ settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": self.tmp})
+ )
+
def test_detect_openstack_non_intel_x86(self, m_is_x86):
"""Return True on non-intel platforms because dmi isn't conclusive."""
m_is_x86.return_value = False
self.assertTrue(
- ds.detect_openstack(), "Expected detect_openstack == True"
+ self._fake_ds().detect_openstack(),
+ "Expected detect_openstack == True",
+ )
+
+ def test_detect_openstack_bare_metal(self, m_is_x86):
+ """Return True if the distro is non-virtual."""
+ m_is_x86.return_value = True
+
+ distro = mock.MagicMock(spec=Distro)
+ distro.is_virtual = False
+
+ fake_ds = self._fake_ds()
+ fake_ds.distro = distro
+
+ self.assertFalse(
+ fake_ds.distro.is_virtual,
+ "Expected distro.is_virtual == False",
)
+ with test_helpers.mock.patch.object(
+ fake_ds, "wait_for_metadata_service"
+ ) as m_wait_for_metadata_service:
+ m_wait_for_metadata_service.return_value = True
+
+ self.assertTrue(
+ fake_ds.wait_for_metadata_service(),
+ "Expected wait_for_metadata_service == True",
+ )
+
+ self.assertTrue(
+ fake_ds.detect_openstack(), "Expected detect_openstack == True"
+ )
+
+ self.assertTrue(
+ m_wait_for_metadata_service.called,
+ "Expected wait_for_metadata_service to be called",
+ )
+
@test_helpers.mock.patch(MOCK_PATH + "util.get_proc_env")
@test_helpers.mock.patch(MOCK_PATH + "dmi.read_dmi_data")
def test_not_detect_openstack_intel_x86_ec2(
@@ -594,7 +650,8 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_dmi_read
self.assertFalse(
- ds.detect_openstack(), "Expected detect_openstack == False on EC2"
+ self._fake_ds().detect_openstack(),
+ "Expected detect_openstack == False on EC2",
)
m_proc_env.assert_called_with(1)
@@ -609,7 +666,8 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
for product_name in openstack_product_names:
m_dmi.return_value = product_name
self.assertTrue(
- ds.detect_openstack(), "Failed to detect_openstack"
+ self._fake_ds().detect_openstack(),
+ "Failed to detect_openstack",
)
@test_helpers.mock.patch(MOCK_PATH + "dmi.read_dmi_data")
@@ -628,7 +686,7 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_dmi_read
self.assertTrue(
- ds.detect_openstack(),
+ self._fake_ds().detect_openstack(),
"Expected detect_openstack == True on OpenTelekomCloud",
)
@@ -648,7 +706,7 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_dmi_read
self.assertTrue(
- ds.detect_openstack(),
+ self._fake_ds().detect_openstack(),
"Expected detect_openstack == True on SAP CCloud VM",
)
@@ -668,7 +726,7 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_asset_tag_dmi_read
self.assertTrue(
- ds.detect_openstack(),
+ self._fake_ds().detect_openstack(),
"Expected detect_openstack == True on Huawei Cloud VM",
)
@@ -688,11 +746,11 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_dmi_read
self.assertTrue(
- ds.detect_openstack(accept_oracle=True),
+ self._fake_ds().detect_openstack(accept_oracle=True),
"Expected detect_openstack == True on OracleCloud.com",
)
self.assertFalse(
- ds.detect_openstack(accept_oracle=False),
+ self._fake_ds().detect_openstack(accept_oracle=False),
"Expected detect_openstack == False.",
)
@@ -711,7 +769,7 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_dmi_read
self.assertTrue(
- ds.detect_openstack(),
+ self._fake_ds().detect_openstack(),
"Expected detect_openstack == True on Generic OpenStack Platform",
)
@@ -749,7 +807,7 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_dmi.side_effect = fake_dmi_read
self.assertTrue(
- ds.detect_openstack(),
+ self._fake_ds().detect_openstack(),
"Expected detect_openstack == True on OpenTelekomCloud",
)
m_proc_env.assert_called_with(1)
diff --git a/tests/unittests/sources/test_ovf.py b/tests/unittests/sources/test_ovf.py
index 1fbd564f..109d8889 100644
--- a/tests/unittests/sources/test_ovf.py
+++ b/tests/unittests/sources/test_ovf.py
@@ -5,19 +5,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
import base64
-import os
from collections import OrderedDict
from textwrap import dedent
from cloudinit import subp, util
from cloudinit.helpers import Paths
-from cloudinit.safeyaml import YAMLError
from cloudinit.sources import DataSourceOVF as dsovf
-from cloudinit.sources.DataSourceOVF import GuestCustScriptDisabled
-from cloudinit.sources.helpers.vmware.imc.config_custom_script import (
- CustomScriptNotFound,
-)
-from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
+from tests.unittests.helpers import CiTestCase, mock
MPATH = "cloudinit.sources.DataSourceOVF."
@@ -203,34 +197,6 @@ class TestReadOvfEnv(CiTestCase):
self.assertIsNone(ud)
-class TestMarkerFiles(CiTestCase):
- def setUp(self):
- super(TestMarkerFiles, self).setUp()
- self.tdir = self.tmp_dir()
-
- def test_false_when_markerid_none(self):
- """Return False when markerid provided is None."""
- self.assertFalse(
- dsovf.check_marker_exists(markerid=None, marker_dir=self.tdir)
- )
-
- def test_markerid_file_exist(self):
- """Return False when markerid file path does not exist,
- True otherwise."""
- self.assertFalse(dsovf.check_marker_exists("123", self.tdir))
-
- marker_file = self.tmp_path(".markerfile-123.txt", self.tdir)
- util.write_file(marker_file, "")
- self.assertTrue(dsovf.check_marker_exists("123", self.tdir))
-
- def test_marker_file_setup(self):
- """Test creation of marker files."""
- markerfilepath = self.tmp_path(".markerfile-hi.txt", self.tdir)
- self.assertFalse(os.path.exists(markerfilepath))
- dsovf.setup_marker_files(markerid="hi", marker_dir=self.tdir)
- self.assertTrue(os.path.exists(markerfilepath))
-
-
class TestDatasourceOVF(CiTestCase):
with_logs = True
@@ -240,334 +206,8 @@ class TestDatasourceOVF(CiTestCase):
self.datasource = dsovf.DataSourceOVF
self.tdir = self.tmp_dir()
- def test_get_data_false_on_none_dmi_data(self):
- """When dmi for system-product-name is None, get_data returns False."""
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
- retcode = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": None,
- "transport_iso9660": NOT_FOUND,
- "transport_vmware_guestinfo": NOT_FOUND,
- },
- ds.get_data,
- )
- self.assertFalse(retcode, "Expected False return from ds.get_data")
- self.assertIn(
- "DEBUG: No system-product-name found", self.logs.getvalue()
- )
-
- def test_get_data_vmware_customization_disabled(self):
- """When vmware customization is disabled via sys_cfg and
- allow_raw_data is disabled via ds_cfg, log a message.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={
- "disable_vmware_customization": True,
- "datasource": {"OVF": {"allow_raw_data": False}},
- },
- distro={},
- paths=paths,
- )
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [MISC]
- MARKER-ID = 12345345
- """
- )
- util.write_file(conf_file, conf_content)
- retcode = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "transport_iso9660": NOT_FOUND,
- "transport_vmware_guestinfo": NOT_FOUND,
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- },
- ds.get_data,
- )
- self.assertFalse(retcode, "Expected False return from ds.get_data")
- self.assertIn(
- "DEBUG: Customization for VMware platform is disabled.",
- self.logs.getvalue(),
- )
-
- def test_get_data_vmware_customization_sys_cfg_disabled(self):
- """When vmware customization is disabled via sys_cfg and
- no meta data is found, log a message.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={
- "disable_vmware_customization": True,
- "datasource": {"OVF": {"allow_raw_data": True}},
- },
- distro={},
- paths=paths,
- )
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [MISC]
- MARKER-ID = 12345345
- """
- )
- util.write_file(conf_file, conf_content)
- retcode = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "transport_iso9660": NOT_FOUND,
- "transport_vmware_guestinfo": NOT_FOUND,
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- },
- ds.get_data,
- )
- self.assertFalse(retcode, "Expected False return from ds.get_data")
- self.assertIn(
- "DEBUG: Customization using VMware config is disabled.",
- self.logs.getvalue(),
- )
-
- def test_get_data_allow_raw_data_disabled(self):
- """When allow_raw_data is disabled via ds_cfg and
- meta data is found, log a message.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={
- "disable_vmware_customization": False,
- "datasource": {"OVF": {"allow_raw_data": False}},
- },
- distro={},
- paths=paths,
- )
-
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- """
- )
- util.write_file(conf_file, conf_content)
- # Prepare the meta data file
- metadata_file = self.tmp_path("test-meta", self.tdir)
- util.write_file(metadata_file, "This is meta data")
- retcode = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "transport_iso9660": NOT_FOUND,
- "transport_vmware_guestinfo": NOT_FOUND,
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "collect_imc_file_paths": [self.tdir + "/test-meta", "", ""],
- },
- ds.get_data,
- )
- self.assertFalse(retcode, "Expected False return from ds.get_data")
- self.assertIn(
- "DEBUG: Customization using raw data is disabled.",
- self.logs.getvalue(),
- )
-
- def test_get_data_vmware_customization_enabled(self):
- """When cloud-init workflow for vmware is enabled via sys_cfg log a
- message.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": False},
- distro={},
- paths=paths,
- )
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CUSTOM-SCRIPT]
- SCRIPT-NAME = test-script
- [MISC]
- MARKER-ID = 12345345
- """
- )
- util.write_file(conf_file, conf_content)
- with mock.patch(MPATH + "get_tools_config", return_value="true"):
- with self.assertRaises(CustomScriptNotFound) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
- customscript = self.tmp_path("test-script", self.tdir)
- self.assertIn(
- "Script %s not found!!" % customscript, str(context.exception)
- )
-
- def test_get_data_cust_script_disabled(self):
- """If custom script is disabled by VMware tools configuration,
- raise a RuntimeError.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": False},
- distro={},
- paths=paths,
- )
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CUSTOM-SCRIPT]
- SCRIPT-NAME = test-script
- [MISC]
- MARKER-ID = 12345346
- """
- )
- util.write_file(conf_file, conf_content)
- # Prepare the custom sript
- customscript = self.tmp_path("test-script", self.tdir)
- util.write_file(customscript, "This is the post cust script")
-
- with mock.patch(MPATH + "get_tools_config", return_value="invalid"):
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- with self.assertRaises(GuestCustScriptDisabled) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
- self.assertIn(
- "Custom script is disabled by VM Administrator",
- str(context.exception),
- )
-
- def test_get_data_cust_script_enabled(self):
- """If custom script is enabled by VMware tools configuration,
- execute the script.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": False},
- distro={},
- paths=paths,
- )
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CUSTOM-SCRIPT]
- SCRIPT-NAME = test-script
- [MISC]
- MARKER-ID = 12345346
- """
- )
- util.write_file(conf_file, conf_content)
-
- # Mock custom script is enabled by return true when calling
- # get_tools_config
- with mock.patch(MPATH + "get_tools_config", return_value="true"):
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- with self.assertRaises(CustomScriptNotFound) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
- # Verify custom script is trying to be executed
- customscript = self.tmp_path("test-script", self.tdir)
- self.assertIn(
- "Script %s not found!!" % customscript, str(context.exception)
- )
-
- def test_get_data_force_run_post_script_is_yes(self):
- """If DEFAULT-RUN-POST-CUST-SCRIPT is yes, custom script could run if
- enable-custom-scripts is not defined in VM Tools configuration
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": False},
- distro={},
- paths=paths,
- )
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- # set DEFAULT-RUN-POST-CUST-SCRIPT = yes so that enable-custom-scripts
- # default value is TRUE
- conf_content = dedent(
- """\
- [CUSTOM-SCRIPT]
- SCRIPT-NAME = test-script
- [MISC]
- MARKER-ID = 12345346
- DEFAULT-RUN-POST-CUST-SCRIPT = yes
- """
- )
- util.write_file(conf_file, conf_content)
-
- # Mock get_tools_config(section, key, defaultVal) to return
- # defaultVal
- def my_get_tools_config(*args, **kwargs):
- return args[2]
-
- with mock.patch(
- MPATH + "get_tools_config", side_effect=my_get_tools_config
- ):
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- with self.assertRaises(CustomScriptNotFound) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
- # Verify custom script still runs although it is
- # disabled by VMware Tools
- customscript = self.tmp_path("test-script", self.tdir)
- self.assertIn(
- "Script %s not found!!" % customscript, str(context.exception)
- )
-
- def test_get_data_non_vmware_seed_platform_info(self):
- """Platform info properly reports when on non-vmware platforms."""
+ def test_get_data_seed_dir(self):
+ """Platform info properly reports when getting data from seed dir."""
paths = Paths({"cloud_dir": self.tdir, "run_dir": self.tdir})
# Write ovf-env.xml seed file
seed_dir = self.tmp_path("seed", dir=self.tdir)
@@ -577,37 +217,14 @@ class TestDatasourceOVF(CiTestCase):
self.assertEqual("ovf", ds.cloud_name)
self.assertEqual("ovf", ds.platform_type)
- with mock.patch(MPATH + "dmi.read_dmi_data", return_value="!VMware"):
- with mock.patch(MPATH + "transport_vmware_guestinfo") as m_guestd:
- with mock.patch(MPATH + "transport_iso9660") as m_iso9660:
- m_iso9660.return_value = NOT_FOUND
- m_guestd.return_value = NOT_FOUND
- self.assertTrue(ds.get_data())
- self.assertEqual(
- "ovf (%s/seed/ovf-env.xml)" % self.tdir, ds.subplatform
- )
-
- def test_get_data_vmware_seed_platform_info(self):
- """Platform info properly reports when on VMware platform."""
- paths = Paths({"cloud_dir": self.tdir, "run_dir": self.tdir})
- # Write ovf-env.xml seed file
- seed_dir = self.tmp_path("seed", dir=self.tdir)
- ovf_env = self.tmp_path("ovf-env.xml", dir=seed_dir)
- util.write_file(ovf_env, OVF_ENV_CONTENT)
- ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
-
- self.assertEqual("ovf", ds.cloud_name)
- self.assertEqual("ovf", ds.platform_type)
- with mock.patch(MPATH + "dmi.read_dmi_data", return_value="VMWare"):
- with mock.patch(MPATH + "transport_vmware_guestinfo") as m_guestd:
- with mock.patch(MPATH + "transport_iso9660") as m_iso9660:
- m_iso9660.return_value = NOT_FOUND
- m_guestd.return_value = NOT_FOUND
- self.assertTrue(ds.get_data())
- self.assertEqual(
- "vmware (%s/seed/ovf-env.xml)" % self.tdir,
- ds.subplatform,
- )
+ with mock.patch(MPATH + "transport_vmware_guestinfo") as m_guestd:
+ with mock.patch(MPATH + "transport_iso9660") as m_iso9660:
+ m_iso9660.return_value = NOT_FOUND
+ m_guestd.return_value = NOT_FOUND
+ self.assertTrue(ds.get_data())
+ self.assertEqual(
+ "ovf (%s/seed/ovf-env.xml)" % self.tdir, ds.subplatform
+ )
@mock.patch("cloudinit.subp.subp")
@mock.patch("cloudinit.sources.DataSource.persist_instance_data")
@@ -679,346 +296,6 @@ class TestDatasourceOVF(CiTestCase):
ds.network_config,
)
- def test_get_data_cloudinit_metadata_json(self):
- """Test metadata can be loaded to cloud-init metadata and network.
- The metadata format is json.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": True},
- distro={},
- paths=paths,
- )
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- """
- )
- util.write_file(conf_file, conf_content)
- # Prepare the meta data file
- metadata_file = self.tmp_path("test-meta", self.tdir)
- metadata_content = dedent(
- """\
- {
- "instance-id": "cloud-vm",
- "local-hostname": "my-host.domain.com",
- "network": {
- "version": 2,
- "ethernets": {
- "eths": {
- "match": {
- "name": "ens*"
- },
- "dhcp4": true
- }
- }
- }
- }
- """
- )
- util.write_file(metadata_file, metadata_content)
-
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- result = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "collect_imc_file_paths": [
- self.tdir + "/test-meta",
- "",
- "",
- ],
- "get_nics_to_enable": "",
- },
- ds._get_data,
- )
-
- self.assertTrue(result)
- self.assertEqual("cloud-vm", ds.metadata["instance-id"])
- self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"])
- self.assertEqual(2, ds.network_config["version"])
- self.assertTrue(ds.network_config["ethernets"]["eths"]["dhcp4"])
-
- def test_get_data_cloudinit_metadata_yaml(self):
- """Test metadata can be loaded to cloud-init metadata and network.
- The metadata format is yaml.
- """
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": True},
- distro={},
- paths=paths,
- )
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- """
- )
- util.write_file(conf_file, conf_content)
- # Prepare the meta data file
- metadata_file = self.tmp_path("test-meta", self.tdir)
- metadata_content = dedent(
- """\
- instance-id: cloud-vm
- local-hostname: my-host.domain.com
- network:
- version: 2
- ethernets:
- nics:
- match:
- name: ens*
- dhcp4: yes
- """
- )
- util.write_file(metadata_file, metadata_content)
-
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- result = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "collect_imc_file_paths": [
- self.tdir + "/test-meta",
- "",
- "",
- ],
- "get_nics_to_enable": "",
- },
- ds._get_data,
- )
-
- self.assertTrue(result)
- self.assertEqual("cloud-vm", ds.metadata["instance-id"])
- self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"])
- self.assertEqual(2, ds.network_config["version"])
- self.assertTrue(ds.network_config["ethernets"]["nics"]["dhcp4"])
-
- def test_get_data_cloudinit_metadata_not_valid(self):
- """Test metadata is not JSON or YAML format."""
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": True},
- distro={},
- paths=paths,
- )
-
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- """
- )
- util.write_file(conf_file, conf_content)
-
- # Prepare the meta data file
- metadata_file = self.tmp_path("test-meta", self.tdir)
- metadata_content = "[This is not json or yaml format]a=b"
- util.write_file(metadata_file, metadata_content)
-
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- with self.assertRaises(YAMLError) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "collect_imc_file_paths": [
- self.tdir + "/test-meta",
- "",
- "",
- ],
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
-
- self.assertIn(
- "expected '<document start>', but found '<scalar>'",
- str(context.exception),
- )
-
- def test_get_data_cloudinit_metadata_not_found(self):
- """Test metadata file can't be found."""
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": True},
- distro={},
- paths=paths,
- )
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- """
- )
- util.write_file(conf_file, conf_content)
- # Don't prepare the meta data file
-
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- with self.assertRaises(FileNotFoundError) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
-
- self.assertIn("is not found", str(context.exception))
-
- def test_get_data_cloudinit_userdata(self):
- """Test user data can be loaded to cloud-init user data."""
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": False},
- distro={},
- paths=paths,
- )
-
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- USERDATA = test-user
- """
- )
- util.write_file(conf_file, conf_content)
-
- # Prepare the meta data file
- metadata_file = self.tmp_path("test-meta", self.tdir)
- metadata_content = dedent(
- """\
- instance-id: cloud-vm
- local-hostname: my-host.domain.com
- network:
- version: 2
- ethernets:
- nics:
- match:
- name: ens*
- dhcp4: yes
- """
- )
- util.write_file(metadata_file, metadata_content)
-
- # Prepare the user data file
- userdata_file = self.tmp_path("test-user", self.tdir)
- userdata_content = "This is the user data"
- util.write_file(userdata_file, userdata_content)
-
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- result = wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "collect_imc_file_paths": [
- self.tdir + "/test-meta",
- self.tdir + "/test-user",
- "",
- ],
- "get_nics_to_enable": "",
- },
- ds._get_data,
- )
-
- self.assertTrue(result)
- self.assertEqual("cloud-vm", ds.metadata["instance-id"])
- self.assertEqual(userdata_content, ds.userdata_raw)
-
- def test_get_data_cloudinit_userdata_not_found(self):
- """Test userdata file can't be found."""
- paths = Paths({"cloud_dir": self.tdir})
- ds = self.datasource(
- sys_cfg={"disable_vmware_customization": True},
- distro={},
- paths=paths,
- )
-
- # Prepare the conf file
- conf_file = self.tmp_path("test-cust", self.tdir)
- conf_content = dedent(
- """\
- [CLOUDINIT]
- METADATA = test-meta
- USERDATA = test-user
- """
- )
- util.write_file(conf_file, conf_content)
-
- # Prepare the meta data file
- metadata_file = self.tmp_path("test-meta", self.tdir)
- metadata_content = dedent(
- """\
- instance-id: cloud-vm
- local-hostname: my-host.domain.com
- network:
- version: 2
- ethernets:
- nics:
- match:
- name: ens*
- dhcp4: yes
- """
- )
- util.write_file(metadata_file, metadata_content)
-
- # Don't prepare the user data file
-
- with mock.patch(
- MPATH + "set_customization_status", return_value=("msg", b"")
- ):
- with self.assertRaises(FileNotFoundError) as context:
- wrap_and_call(
- "cloudinit.sources.DataSourceOVF",
- {
- "dmi.read_dmi_data": "vmware",
- "util.del_dir": True,
- "search_file": self.tdir,
- "wait_for_imc_cfg_file": conf_file,
- "get_nics_to_enable": "",
- },
- ds.get_data,
- )
-
- self.assertIn("is not found", str(context.exception))
-
class TestTransportIso9660(CiTestCase):
def setUp(self):
diff --git a/tests/unittests/sources/test_vmware.py b/tests/unittests/sources/test_vmware.py
index b3663b0a..4911e5bc 100644
--- a/tests/unittests/sources/test_vmware.py
+++ b/tests/unittests/sources/test_vmware.py
@@ -1,6 +1,7 @@
-# Copyright (c) 2021 VMware, Inc. All Rights Reserved.
+# Copyright (c) 2021-2022 VMware, Inc. All Rights Reserved.
#
# Authors: Andrew Kutz <akutz@vmware.com>
+# Pengpeng Sun <pengpengs@vmware.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
@@ -8,18 +9,22 @@ import base64
import gzip
import os
from contextlib import ExitStack
+from textwrap import dedent
import pytest
-from cloudinit import dmi, helpers, safeyaml, settings
+from cloudinit import dmi, helpers, safeyaml, settings, util
from cloudinit.sources import DataSourceVMware
+from cloudinit.sources.helpers.vmware.imc import guestcust_util
from tests.unittests.helpers import (
CiTestCase,
FilesystemMockingTestCase,
mock,
populate_dir,
+ wrap_and_call,
)
+MPATH = "cloudinit.sources.DataSourceVMware."
PRODUCT_NAME_FILE_PATH = "/sys/class/dmi/id/product_name"
PRODUCT_NAME = "VMware7,1"
PRODUCT_UUID = "82343CED-E4C7-423B-8F6B-0D34D19067AB"
@@ -490,6 +495,706 @@ class TestDataSourceVMwareGuestInfo_InvalidPlatform(FilesystemMockingTestCase):
self.assertFalse(ret)
+class TestDataSourceVMwareIMC(CiTestCase):
+ """
+ Test the VMware Guest OS Customization transport
+ """
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestDataSourceVMwareIMC, self).setUp()
+ self.datasource = DataSourceVMware.DataSourceVMware
+ self.tdir = self.tmp_dir()
+
+ def test_get_data_false_on_none_dmi_data(self):
+ """When dmi for system-product-name is None, get_data returns False."""
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": None,
+ },
+ ds.get_data,
+ )
+ self.assertFalse(result, "Expected False return from ds.get_data")
+ self.assertIn("No system-product-name found", self.logs.getvalue())
+
+ def test_get_imc_data_vmware_customization_disabled(self):
+ """
+ When vmware customization is disabled via sys_cfg and
+ allow_raw_data is disabled via ds_cfg, log a message.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={
+ "disable_vmware_customization": True,
+ "datasource": {"VMware": {"allow_raw_data": False}},
+ },
+ distro={},
+ paths=paths,
+ )
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [MISC]
+ MARKER-ID = 12345345
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ self.assertIn(
+ "Customization for VMware platform is disabled",
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_vmware_customization_sys_cfg_disabled(self):
+ """
+ When vmware customization is disabled via sys_cfg and
+ no meta data is found, log a message.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={
+ "disable_vmware_customization": True,
+ "datasource": {"VMware": {"allow_raw_data": True}},
+ },
+ distro={},
+ paths=paths,
+ )
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [MISC]
+ MARKER-ID = 12345345
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ self.assertIn(
+ "No allowed customization configuration data found",
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_allow_raw_data_disabled(self):
+ """
+ When allow_raw_data is disabled via ds_cfg and
+ meta data is found, log a message.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={
+ "disable_vmware_customization": False,
+ "datasource": {"VMware": {"allow_raw_data": False}},
+ },
+ distro={},
+ paths=paths,
+ )
+
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ self.assertIn(
+ "No allowed customization configuration data found",
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_vmware_customization_enabled(self):
+ """
+ When cloud-init workflow for vmware is enabled via sys_cfg log a
+ message.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": False},
+ distro={},
+ paths=paths,
+ )
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345345
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ with mock.patch(
+ MPATH + "guestcust_util.get_tools_config",
+ return_value="true",
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ custom_script = self.tmp_path("test-script", self.tdir)
+ self.assertIn(
+ "Script %s not found!!" % custom_script,
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_cust_script_disabled(self):
+ """
+ If custom script is disabled by VMware tools configuration,
+ log a message.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": False},
+ distro={},
+ paths=paths,
+ )
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345346
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ # Prepare the custom sript
+ customscript = self.tmp_path("test-script", self.tdir)
+ util.write_file(customscript, "This is the post cust script")
+
+ with mock.patch(
+ MPATH + "guestcust_util.get_tools_config",
+ return_value="invalid",
+ ):
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ self.assertIn(
+ "Custom script is disabled by VM Administrator",
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_cust_script_enabled(self):
+ """
+ If custom script is enabled by VMware tools configuration,
+ execute the script.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": False},
+ distro={},
+ paths=paths,
+ )
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345346
+ """
+ )
+ util.write_file(conf_file, conf_content)
+
+ # Mock custom script is enabled by return true when calling
+ # get_tools_config
+ with mock.patch(
+ MPATH + "guestcust_util.get_tools_config",
+ return_value="true",
+ ):
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ # Verify custom script is trying to be executed
+ custom_script = self.tmp_path("test-script", self.tdir)
+ self.assertIn(
+ "Script %s not found!!" % custom_script,
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_force_run_post_script_is_yes(self):
+ """
+ If DEFAULT-RUN-POST-CUST-SCRIPT is yes, custom script could run if
+ enable-custom-scripts is not defined in VM Tools configuration
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": False},
+ distro={},
+ paths=paths,
+ )
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ # set DEFAULT-RUN-POST-CUST-SCRIPT = yes so that enable-custom-scripts
+ # default value is TRUE
+ conf_content = dedent(
+ """\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345346
+ DEFAULT-RUN-POST-CUST-SCRIPT = yes
+ """
+ )
+ util.write_file(conf_file, conf_content)
+
+ # Mock get_tools_config(section, key, defaultVal) to return
+ # defaultVal
+ def my_get_tools_config(*args, **kwargs):
+ return args[2]
+
+ with mock.patch(
+ MPATH + "guestcust_util.get_tools_config",
+ side_effect=my_get_tools_config,
+ ):
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ # Verify custom script still runs although it is
+ # disabled by VMware Tools
+ custom_script = self.tmp_path("test-script", self.tdir)
+ self.assertIn(
+ "Script %s not found!!" % custom_script,
+ self.logs.getvalue(),
+ )
+
+ def test_get_data_cloudinit_metadata_json(self):
+ """
+ Test metadata can be loaded to cloud-init metadata and network.
+ The metadata format is json.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": True},
+ distro={},
+ paths=paths,
+ )
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ # Prepare the meta data file
+ metadata_file = self.tmp_path("test-meta", self.tdir)
+ metadata_content = dedent(
+ """\
+ {
+ "instance-id": "cloud-vm",
+ "local-hostname": "my-host.domain.com",
+ "network": {
+ "version": 2,
+ "ethernets": {
+ "eths": {
+ "match": {
+ "name": "ens*"
+ },
+ "dhcp4": true
+ }
+ }
+ }
+ }
+ """
+ )
+ util.write_file(metadata_file, metadata_content)
+
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ "guestcust_util.get_imc_dir_path": self.tdir,
+ },
+ ds._get_data,
+ )
+ self.assertTrue(result)
+ self.assertEqual("cloud-vm", ds.metadata["instance-id"])
+ self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"])
+ self.assertEqual(2, ds.network_config["version"])
+ self.assertTrue(ds.network_config["ethernets"]["eths"]["dhcp4"])
+
+ def test_get_data_cloudinit_metadata_yaml(self):
+ """
+ Test metadata can be loaded to cloud-init metadata and network.
+ The metadata format is yaml.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": True},
+ distro={},
+ paths=paths,
+ )
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ # Prepare the meta data file
+ metadata_file = self.tmp_path("test-meta", self.tdir)
+ metadata_content = dedent(
+ """\
+ instance-id: cloud-vm
+ local-hostname: my-host.domain.com
+ network:
+ version: 2
+ ethernets:
+ nics:
+ match:
+ name: ens*
+ dhcp4: yes
+ """
+ )
+ util.write_file(metadata_file, metadata_content)
+
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ "guestcust_util.get_imc_dir_path": self.tdir,
+ },
+ ds._get_data,
+ )
+ self.assertTrue(result)
+ self.assertEqual("cloud-vm", ds.metadata["instance-id"])
+ self.assertEqual("my-host.domain.com", ds.metadata["local-hostname"])
+ self.assertEqual(2, ds.network_config["version"])
+ self.assertTrue(ds.network_config["ethernets"]["nics"]["dhcp4"])
+
+ def test_get_imc_data_cloudinit_metadata_not_valid(self):
+ """
+ Test metadata is not JSON or YAML format, log a message
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": True},
+ distro={},
+ paths=paths,
+ )
+
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """
+ )
+ util.write_file(conf_file, conf_content)
+
+ # Prepare the meta data file
+ metadata_file = self.tmp_path("test-meta", self.tdir)
+ metadata_content = "[This is not json or yaml format]a=b"
+ util.write_file(metadata_file, metadata_content)
+
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ "guestcust_util.get_imc_dir_path": self.tdir,
+ },
+ ds.get_data,
+ )
+ self.assertFalse(result)
+ self.assertIn(
+ "expected '<document start>', but found '<scalar>'",
+ self.logs.getvalue(),
+ )
+
+ def test_get_imc_data_cloudinit_metadata_not_found(self):
+ """
+ Test metadata file can't be found, log a message
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": True},
+ distro={},
+ paths=paths,
+ )
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """
+ )
+ util.write_file(conf_file, conf_content)
+ # Don't prepare the meta data file
+
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ "guestcust_util.get_imc_dir_path": self.tdir,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ self.assertIn("Meta data file is not found", self.logs.getvalue())
+
+ def test_get_data_cloudinit_userdata(self):
+ """
+ Test user data can be loaded to cloud-init user data.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": False},
+ distro={},
+ paths=paths,
+ )
+
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ USERDATA = test-user
+ """
+ )
+ util.write_file(conf_file, conf_content)
+
+ # Prepare the meta data file
+ metadata_file = self.tmp_path("test-meta", self.tdir)
+ metadata_content = dedent(
+ """\
+ instance-id: cloud-vm
+ local-hostname: my-host.domain.com
+ network:
+ version: 2
+ ethernets:
+ nics:
+ match:
+ name: ens*
+ dhcp4: yes
+ """
+ )
+ util.write_file(metadata_file, metadata_content)
+
+ # Prepare the user data file
+ userdata_file = self.tmp_path("test-user", self.tdir)
+ userdata_content = "This is the user data"
+ util.write_file(userdata_file, userdata_content)
+
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ "guestcust_util.get_imc_dir_path": self.tdir,
+ },
+ ds._get_data,
+ )
+ self.assertTrue(result)
+ self.assertEqual("cloud-vm", ds.metadata["instance-id"])
+ self.assertEqual(userdata_content, ds.userdata_raw)
+
+ def test_get_imc_data_cloudinit_userdata_not_found(self):
+ """
+ Test userdata file can't be found.
+ """
+ paths = helpers.Paths({"cloud_dir": self.tdir})
+ ds = self.datasource(
+ sys_cfg={"disable_vmware_customization": True},
+ distro={},
+ paths=paths,
+ )
+
+ # Prepare the conf file
+ conf_file = self.tmp_path("test-cust", self.tdir)
+ conf_content = dedent(
+ """\
+ [CLOUDINIT]
+ METADATA = test-meta
+ USERDATA = test-user
+ """
+ )
+ util.write_file(conf_file, conf_content)
+
+ # Prepare the meta data file
+ metadata_file = self.tmp_path("test-meta", self.tdir)
+ metadata_content = dedent(
+ """\
+ instance-id: cloud-vm
+ local-hostname: my-host.domain.com
+ network:
+ version: 2
+ ethernets:
+ nics:
+ match:
+ name: ens*
+ dhcp4: yes
+ """
+ )
+ util.write_file(metadata_file, metadata_content)
+
+ # Don't prepare the user data file
+
+ with mock.patch(
+ MPATH + "guestcust_util.set_customization_status",
+ return_value=("msg", b""),
+ ):
+ result = wrap_and_call(
+ "cloudinit.sources.DataSourceVMware",
+ {
+ "dmi.read_dmi_data": "vmware",
+ "util.del_dir": True,
+ "guestcust_util.search_file": self.tdir,
+ "guestcust_util.wait_for_cust_cfg_file": conf_file,
+ "guestcust_util.get_imc_dir_path": self.tdir,
+ },
+ ds.get_imc_data_fn,
+ )
+ self.assertEqual(result, (None, None, None))
+ self.assertIn("Userdata file is not found", self.logs.getvalue())
+
+
+class TestDataSourceVMwareIMC_MarkerFiles(CiTestCase):
+ def setUp(self):
+ super(TestDataSourceVMwareIMC_MarkerFiles, self).setUp()
+ self.tdir = self.tmp_dir()
+
+ def test_false_when_markerid_none(self):
+ """Return False when markerid provided is None."""
+ self.assertFalse(
+ guestcust_util.check_marker_exists(
+ markerid=None, marker_dir=self.tdir
+ )
+ )
+
+ def test_markerid_file_exist(self):
+ """Return False when markerid file path does not exist,
+ True otherwise."""
+ self.assertFalse(guestcust_util.check_marker_exists("123", self.tdir))
+ marker_file = self.tmp_path(".markerfile-123.txt", self.tdir)
+ util.write_file(marker_file, "")
+ self.assertTrue(guestcust_util.check_marker_exists("123", self.tdir))
+
+ def test_marker_file_setup(self):
+ """Test creation of marker files."""
+ markerfilepath = self.tmp_path(".markerfile-hi.txt", self.tdir)
+ self.assertFalse(os.path.exists(markerfilepath))
+ guestcust_util.setup_marker_files(marker_id="hi", marker_dir=self.tdir)
+ self.assertTrue(os.path.exists(markerfilepath))
+
+
def assert_metadata(test_obj, ds, metadata):
test_obj.assertEqual(metadata.get("instance-id"), ds.get_instance_id())
test_obj.assertEqual(
diff --git a/tests/unittests/sources/test_vultr.py b/tests/unittests/sources/test_vultr.py
index 27481e8e..488df4f3 100644
--- a/tests/unittests/sources/test_vultr.py
+++ b/tests/unittests/sources/test_vultr.py
@@ -30,6 +30,9 @@ VULTR_V1_1 = {
},
},
"hostname": "CLOUDINIT_1",
+ "local-hostname": "CLOUDINIT_1",
+ "instance-v2-id": "29bea708-2e6e-480a-90ad-0e6b5d5ad62f",
+ "instance-id": "29bea708-2e6e-480a-90ad-0e6b5d5ad62f",
"instanceid": "42506325",
"interfaces": [
{
@@ -50,7 +53,7 @@ VULTR_V1_1 = {
}
],
"public-keys": ["ssh-rsa AAAAB3NzaC1y...IQQhv5PAOKaIl+mM3c= test3@key"],
- "region": {"regioncode": "EWR"},
+ "region": "us",
"user-defined": [],
"startup-script": "echo No configured startup script",
"raid1-script": "",
@@ -85,7 +88,9 @@ VULTR_V1_2 = {
},
},
"hostname": "CLOUDINIT_2",
+ "local-hostname": "CLOUDINIT_2",
"instance-v2-id": "29bea708-2e6e-480a-90ad-0e6b5d5ad62f",
+ "instance-id": "29bea708-2e6e-480a-90ad-0e6b5d5ad62f",
"instanceid": "42872224",
"interfaces": [
{
@@ -121,7 +126,7 @@ VULTR_V1_2 = {
},
],
"public-keys": ["ssh-rsa AAAAB3NzaC1y...IQQhv5PAOKaIl+mM3c= test3@key"],
- "region": {"regioncode": "EWR"},
+ "region": "us",
"user-defined": [],
"startup-script": "echo No configured startup script",
"user-data": [],
@@ -139,8 +144,46 @@ VULTR_V1_2 = {
],
}
+VULTR_V1_3 = None
+
SSH_KEYS_1 = ["ssh-rsa AAAAB3NzaC1y...IQQhv5PAOKaIl+mM3c= test3@key"]
+CLOUD_INTERFACES = {
+ "version": 1,
+ "config": [
+ {
+ "type": "nameserver",
+ "address": ["108.61.10.10", "2001:19f0:300:1704::6"],
+ },
+ {
+ "type": "physical",
+ "mac_address": "56:00:03:1b:4e:ca",
+ "accept-ra": 1,
+ "subnets": [
+ {"type": "dhcp", "control": "auto"},
+ {"type": "ipv6_slaac", "control": "auto"},
+ {
+ "type": "static6",
+ "control": "auto",
+ "address": "2002:19f0:5:28a7::/64",
+ },
+ ],
+ },
+ {
+ "type": "physical",
+ "mac_address": "5a:00:03:1b:4e:ca",
+ "subnets": [
+ {
+ "type": "static",
+ "control": "auto",
+ "address": "10.1.112.3",
+ "netmask": "255.255.240.0",
+ }
+ ],
+ },
+ ],
+}
+
INTERFACES = ["lo", "dummy0", "eth1", "eth0", "eth2"]
ORDERED_INTERFACES = ["eth0", "eth1", "eth2"]
@@ -241,8 +284,14 @@ def check_route(url):
class TestDataSourceVultr(CiTestCase):
def setUp(self):
+ global VULTR_V1_3
super(TestDataSourceVultr, self).setUp()
+ # Create v3
+ VULTR_V1_3 = VULTR_V1_2.copy()
+ VULTR_V1_3["cloud_interfaces"] = CLOUD_INTERFACES.copy()
+ VULTR_V1_3["interfaces"] = []
+
# Stored as a dict to make it easier to maintain
raw1 = json.dumps(VULTR_V1_1["vendor-data"][0])
raw2 = json.dumps(VULTR_V1_2["vendor-data"][0])
@@ -250,6 +299,7 @@ class TestDataSourceVultr(CiTestCase):
# Make expected format
VULTR_V1_1["vendor-data"] = [raw1]
VULTR_V1_2["vendor-data"] = [raw2]
+ VULTR_V1_3["vendor-data"] = [raw2]
self.tmp = self.tmp_dir()
@@ -297,6 +347,28 @@ class TestDataSourceVultr(CiTestCase):
# Test network config generation
self.assertEqual(EXPECTED_VULTR_NETWORK_2, source.network_config)
+ # Test the datasource with new network config type
+ @mock.patch("cloudinit.net.get_interfaces_by_mac")
+ @mock.patch("cloudinit.sources.helpers.vultr.is_vultr")
+ @mock.patch("cloudinit.sources.helpers.vultr.get_metadata")
+ def test_datasource_cloud_interfaces(
+ self, mock_getmeta, mock_isvultr, mock_netmap
+ ):
+ mock_getmeta.return_value = VULTR_V1_3
+ mock_isvultr.return_value = True
+ mock_netmap.return_value = INTERFACE_MAP
+
+ distro = mock.MagicMock()
+ distro.get_tmp_exec_path = self.tmp_dir
+ source = DataSourceVultr.DataSourceVultr(
+ settings.CFG_BUILTIN, distro, helpers.Paths({"run_dir": self.tmp})
+ )
+
+ source._get_data()
+
+ # Test network config generation
+ self.assertEqual(EXPECTED_VULTR_NETWORK_2, source.network_config)
+
# Test network config generation
@mock.patch("cloudinit.net.get_interfaces_by_mac")
def test_network_config(self, mock_netmap):
diff --git a/tests/unittests/sources/vmware/test_vmware_config_file.py b/tests/unittests/sources/vmware/test_vmware_config_file.py
index 38d45d0e..2fc2e21c 100644
--- a/tests/unittests/sources/vmware/test_vmware_config_file.py
+++ b/tests/unittests/sources/vmware/test_vmware_config_file.py
@@ -1,5 +1,5 @@
# Copyright (C) 2015 Canonical Ltd.
-# Copyright (C) 2016 VMware INC.
+# Copyright (C) 2016-2022 VMware INC.
#
# Author: Sankar Tanguturi <stanguturi@vmware.com>
# Pengpeng Sun <pengpengs@vmware.com>
@@ -12,10 +12,6 @@ import sys
import tempfile
import textwrap
-from cloudinit.sources.DataSourceOVF import (
- get_network_config_from_conf,
- read_vmware_imc,
-)
from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum
from cloudinit.sources.helpers.vmware.imc.config import Config
from cloudinit.sources.helpers.vmware.imc.config_file import (
@@ -25,6 +21,10 @@ from cloudinit.sources.helpers.vmware.imc.config_nic import (
NicConfigurator,
gen_subnet,
)
+from cloudinit.sources.helpers.vmware.imc.guestcust_util import (
+ get_network_data_from_vmware_cust_cfg,
+ get_non_network_data_from_vmware_cust_cfg,
+)
from tests.unittests.helpers import CiTestCase, cloud_init_project_dir
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
@@ -63,23 +63,29 @@ class TestVmwareConfigFile(CiTestCase):
self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar")
self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar")
- def test_datasource_instance_id(self):
- """Tests instance id for the DatasourceOVF"""
+ def test_configfile_without_instance_id(self):
+ """
+ Tests instance id is None when configuration file has no instance id
+ """
cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
+ conf = Config(cf)
- instance_id_prefix = "iid-vmware-"
+ (md1, _) = get_non_network_data_from_vmware_cust_cfg(conf)
+ self.assertFalse("instance-id" in md1)
- conf = Config(cf)
+ (md2, _) = get_non_network_data_from_vmware_cust_cfg(conf)
+ self.assertFalse("instance-id" in md2)
- (md1, _, _) = read_vmware_imc(conf)
- self.assertIn(instance_id_prefix, md1["instance-id"])
- self.assertEqual(md1["instance-id"], "iid-vmware-imc")
+ def test_configfile_with_instance_id(self):
+ """Tests instance id get from configuration file"""
+ cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic-instance-id.cfg")
+ conf = Config(cf)
- (md2, _, _) = read_vmware_imc(conf)
- self.assertIn(instance_id_prefix, md2["instance-id"])
- self.assertEqual(md2["instance-id"], "iid-vmware-imc")
+ (md1, _) = get_non_network_data_from_vmware_cust_cfg(conf)
+ self.assertEqual(md1["instance-id"], conf.instance_id, "instance-id")
- self.assertEqual(md2["instance-id"], md1["instance-id"])
+ (md2, _) = get_non_network_data_from_vmware_cust_cfg(conf)
+ self.assertEqual(md2["instance-id"], conf.instance_id, "instance-id")
def test_configfile_static_2nics(self):
"""Tests Config class for a configuration with two static NICs."""
@@ -166,7 +172,7 @@ class TestVmwareConfigFile(CiTestCase):
config = Config(cf)
- network_config = get_network_config_from_conf(config, False)
+ network_config = get_network_data_from_vmware_cust_cfg(config, False)
self.assertEqual(1, network_config.get("version"))
@@ -201,14 +207,14 @@ class TestVmwareConfigFile(CiTestCase):
)
def test_get_config_dns_suffixes(self):
- """Tests if get_network_config_from_conf properly
+ """Tests if get_network_from_vmware_cust_cfg properly
generates nameservers and dns settings from a
specified configuration"""
cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
config = Config(cf)
- network_config = get_network_config_from_conf(config, False)
+ network_config = get_network_data_from_vmware_cust_cfg(config, False)
self.assertEqual(1, network_config.get("version"))