diff options
author | Alberto Contreras <alberto.contreras@canonical.com> | 2022-05-17 19:43:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-17 11:43:49 -0600 |
commit | 3e554d16b4d539c9bd0c743793d391d30bee167a (patch) | |
tree | fd157533b1ab6ebf524f9e831c188846a419601d | |
parent | 98388b5ddc8f61a873631b8588a4a109b9088abd (diff) | |
download | cloud-init-git-3e554d16b4d539c9bd0c743793d391d30bee167a.tar.gz |
cli: Redact files with permission errors in commands (#1440)
For non-root users, emit warnings and redact on any
/etc/cloud/cloud.cfg.d files which raise permissions errors.
Add tests covering this behavior for query, status and render
cmds.
Migrate `test_render.py` and `test_status.py` to Pytest.
LP: #1953430
SC-658
-rwxr-xr-x | cloudinit/cmd/devel/__init__.py | 3 | ||||
-rwxr-xr-x | cloudinit/cmd/status.py | 19 | ||||
-rw-r--r-- | cloudinit/stages.py | 13 | ||||
-rw-r--r-- | cloudinit/util.py | 32 | ||||
-rw-r--r-- | pyproject.toml | 5 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_render.py | 176 | ||||
-rw-r--r-- | tests/unittests/cmd/test_query.py | 96 | ||||
-rw-r--r-- | tests/unittests/cmd/test_status.py | 782 | ||||
-rw-r--r-- | tests/unittests/test_stages.py | 53 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 219 |
10 files changed, 746 insertions, 652 deletions
diff --git a/cloudinit/cmd/devel/__init__.py b/cloudinit/cmd/devel/__init__.py index ead5f7a9..9a8f2ebd 100755 --- a/cloudinit/cmd/devel/__init__.py +++ b/cloudinit/cmd/devel/__init__.py @@ -6,6 +6,7 @@ import logging from cloudinit import log +from cloudinit.helpers import Paths from cloudinit.stages import Init @@ -16,7 +17,7 @@ def addLogHandlerCLI(logger, log_level): return logger -def read_cfg_paths(): +def read_cfg_paths() -> Paths: """Return a Paths object based on the system configuration on disk.""" init = Init(ds_deps=[]) init.read_cfg() diff --git a/cloudinit/cmd/status.py b/cloudinit/cmd/status.py index f3b4f161..1c7c209b 100755 --- a/cloudinit/cmd/status.py +++ b/cloudinit/cmd/status.py @@ -11,9 +11,10 @@ import enum import os import sys from time import gmtime, sleep, strftime +from typing import Tuple +from cloudinit.cmd.devel import read_cfg_paths from cloudinit.distros import uses_systemd -from cloudinit.stages import Init from cloudinit.util import get_cmdline, load_file, load_json CLOUDINIT_DISABLED_FILE = "/etc/cloud/cloud-init.disabled" @@ -64,17 +65,16 @@ def get_parser(parser=None): return parser -def handle_status_args(name, args): +def handle_status_args(name, args) -> int: """Handle calls to 'cloud-init status' as a subcommand.""" # Read configured paths - init = Init(ds_deps=[]) - init.read_cfg() - status, status_detail, time = get_status_details(init.paths) + paths = read_cfg_paths() + status, status_detail, time = get_status_details(paths) if args.wait: while status in (UXAppStatus.NOT_RUN, UXAppStatus.RUNNING): sys.stdout.write(".") sys.stdout.flush() - status, status_detail, time = get_status_details(init.paths) + status, status_detail, time = get_status_details(paths) sleep(0.25) sys.stdout.write("\n") print("status: {0}".format(status.value)) @@ -115,17 +115,14 @@ def _is_cloudinit_disabled(disable_file, paths): return (is_disabled, reason) -def get_status_details(paths=None): +def get_status_details(paths=None) -> Tuple[UXAppStatus, str, str]: """Return a 3-tuple of status, status_details and time of last event. @param paths: An initialized cloudinit.helpers.paths object. Values are obtained from parsing paths.run_dir/status.json. """ - if not paths: - init = Init(ds_deps=[]) - init.read_cfg() - paths = init.paths + paths = paths or read_cfg_paths() status = UXAppStatus.NOT_RUN status_detail = "" diff --git a/cloudinit/stages.py b/cloudinit/stages.py index 4ebe413b..27af6055 100644 --- a/cloudinit/stages.py +++ b/cloudinit/stages.py @@ -9,7 +9,7 @@ import os import pickle import sys from collections import namedtuple -from typing import Dict, List, Optional, Set +from typing import Dict, Iterable, List, Optional, Set from cloudinit import cloud, distros, handlers, helpers, importer from cloudinit import log as logging @@ -58,12 +58,12 @@ def update_event_enabled( case, we only have the data source's `default_update_events`, so an event that should be enabled in userdata may be denied. """ - default_events = ( - datasource.default_update_events - ) # type: Dict[EventScope, Set[EventType]] - user_events = userdata_to_events( + default_events: Dict[ + EventScope, Set[EventType] + ] = datasource.default_update_events + user_events: Dict[EventScope, Set[EventType]] = userdata_to_events( cfg.get("updates", {}) - ) # type: Dict[EventScope, Set[EventType]] + ) # A value in the first will override a value in the second allowed = util.mergemanydict( [ @@ -73,6 +73,7 @@ def update_event_enabled( ) LOG.debug("Allowed events: %s", allowed) + scopes: Iterable[EventScope] if not scope: scopes = allowed.keys() else: diff --git a/cloudinit/util.py b/cloudinit/util.py index da2f63da..2639478a 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -32,7 +32,8 @@ import subprocess import sys import time from base64 import b64decode, b64encode -from errno import ENOENT +from collections import deque +from errno import EACCES, ENOENT from functools import lru_cache from typing import List from urllib import parse @@ -998,6 +999,7 @@ def read_seeded(base="", ext="", timeout=5, retries=10, file_retries=0): def read_conf_d(confd): + """Read configuration directory.""" # Get reverse sorted list (later trumps newer) confs = sorted(os.listdir(confd), reverse=True) @@ -1010,13 +1012,27 @@ def read_conf_d(confd): # Load them all so that they can be merged cfgs = [] for fn in confs: - cfgs.append(read_conf(os.path.join(confd, fn))) + try: + cfgs.append(read_conf(os.path.join(confd, fn))) + except OSError as e: + if e.errno == EACCES: + LOG.warning( + "REDACTED config part %s/%s for non-root user", confd, fn + ) return mergemanydict(cfgs) def read_conf_with_confd(cfgfile): - cfg = read_conf(cfgfile) + cfgs = deque() + cfg: dict = {} + try: + cfg = read_conf(cfgfile) + except OSError as e: + if e.errno == EACCES: + LOG.warning("REDACTED config part %s for non-root user", cfgfile) + else: + cfgs.append(cfg) confd = False if "conf_d" in cfg: @@ -1032,12 +1048,12 @@ def read_conf_with_confd(cfgfile): elif os.path.isdir("%s.d" % cfgfile): confd = "%s.d" % cfgfile - if not confd or not os.path.isdir(confd): - return cfg + if confd and os.path.isdir(confd): + # Conf.d settings override input configuration + confd_cfg = read_conf_d(confd) + cfgs.appendleft(confd_cfg) - # Conf.d settings override input configuration - confd_cfg = read_conf_d(confd) - return mergemanydict([confd_cfg, cfg]) + return mergemanydict(cfgs) def read_conf_from_cmdline(cmdline=None): diff --git a/pyproject.toml b/pyproject.toml index e1fb7dca..1aac03a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,6 @@ exclude=[ '^cloudinit/sources/DataSourceVMware\.py$', '^cloudinit/sources/__init__\.py$', '^cloudinit/sources/helpers/vmware/imc/config_file\.py$', - '^cloudinit/stages\.py$', '^cloudinit/templater\.py$', '^cloudinit/url_helper\.py$', '^conftest\.py$', @@ -56,12 +55,9 @@ exclude=[ '^tests/integration_tests/modules/test_growpart\.py$', '^tests/integration_tests/modules/test_ssh_keysfile\.py$', '^tests/unittests/__init__\.py$', - '^tests/unittests/cmd/devel/test_render\.py$', '^tests/unittests/cmd/test_clean\.py$', '^tests/unittests/cmd/test_cloud_id\.py$', '^tests/unittests/cmd/test_main\.py$', - '^tests/unittests/cmd/test_query\.py$', - '^tests/unittests/cmd/test_status\.py$', '^tests/unittests/config/test_cc_chef\.py$', '^tests/unittests/config/test_cc_landscape\.py$', '^tests/unittests/config/test_cc_locale\.py$', @@ -96,7 +92,6 @@ exclude=[ '^tests/unittests/test_subp\.py$', '^tests/unittests/test_templating\.py$', '^tests/unittests/test_url_helper\.py$', - '^tests/unittests/test_util\.py$', '^tools/mock-meta\.py$', ] diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py index 4afc64f0..3bc5032f 100644 --- a/tests/unittests/cmd/devel/test_render.py +++ b/tests/unittests/cmd/devel/test_render.py @@ -1,6 +1,5 @@ # This file is part of cloud-init. See LICENSE file for license information. -import os from collections import namedtuple from io import StringIO @@ -8,147 +7,136 @@ from cloudinit.cmd.devel import render from cloudinit.helpers import Paths from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE from cloudinit.util import ensure_dir, write_file -from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja +from tests.unittests.helpers import mock, skipUnlessJinja +M_PATH = "cloudinit.cmd.devel.render." -class TestRender(CiTestCase): - with_logs = True +class TestRender: - args = namedtuple("renderargs", "user_data instance_data debug") + Args = namedtuple("Args", "user_data instance_data debug") - def setUp(self): - super(TestRender, self).setUp() - self.tmp = self.tmp_dir() - - def test_handle_args_error_on_missing_user_data(self): + def test_handle_args_error_on_missing_user_data(self, caplog, tmpdir): """When user_data file path does not exist, log an error.""" - absent_file = self.tmp_path("user-data", dir=self.tmp) - instance_data = self.tmp_path("instance-data", dir=self.tmp) + absent_file = tmpdir.join("user-data") + instance_data = tmpdir.join("instance-data") write_file(instance_data, "{}") - args = self.args( + args = self.Args( user_data=absent_file, instance_data=instance_data, debug=False ) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - self.assertIn( - "Missing user-data file: %s" % absent_file, self.logs.getvalue() - ) + assert render.handle_args("anyname", args) == 1 + assert "Missing user-data file: %s" % absent_file in caplog.text - def test_handle_args_error_on_missing_instance_data(self): + def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): """When instance_data file path does not exist, log an error.""" - user_data = self.tmp_path("user-data", dir=self.tmp) - absent_file = self.tmp_path("instance-data", dir=self.tmp) - args = self.args( + user_data = tmpdir.join("user-data") + absent_file = tmpdir.join("instance-data") + args = self.Args( user_data=user_data, instance_data=absent_file, debug=False ) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - self.assertIn( - "Missing instance-data.json file: %s" % absent_file, - self.logs.getvalue(), + assert render.handle_args("anyname", args) == 1 + assert ( + "Missing instance-data.json file: %s" % absent_file in caplog.text ) - def test_handle_args_defaults_instance_data(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_handle_args_defaults_instance_data(self, m_paths, caplog, tmpdir): """When no instance_data argument, default to configured run_dir.""" - user_data = self.tmp_path("user-data", dir=self.tmp) - run_dir = self.tmp_path("run_dir", dir=self.tmp) + user_data = tmpdir.join("user-data") + run_dir = tmpdir.join("run_dir") ensure_dir(run_dir) - paths = Paths({"run_dir": run_dir}) - self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") - self.m_paths.return_value = paths - args = self.args(user_data=user_data, instance_data=None, debug=False) + m_paths.return_value = Paths({"run_dir": run_dir}) + args = self.Args(user_data=user_data, instance_data=None, debug=False) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) - self.assertIn( - "Missing instance-data.json file: %s" % json_file, - self.logs.getvalue(), - ) - - def test_handle_args_root_fallback_from_sensitive_instance_data(self): + assert render.handle_args("anyname", args) == 1 + json_file = run_dir.join(INSTANCE_JSON_FILE) + msg = "Missing instance-data.json file: %s" % json_file + assert msg in caplog.text + + @mock.patch(M_PATH + "read_cfg_paths") + def test_handle_args_root_fallback_from_sensitive_instance_data( + self, m_paths, caplog, tmpdir + ): """When root user defaults to sensitive.json.""" - user_data = self.tmp_path("user-data", dir=self.tmp) - run_dir = self.tmp_path("run_dir", dir=self.tmp) + user_data = tmpdir.join("user-data") + run_dir = tmpdir.join("run_dir") ensure_dir(run_dir) - paths = Paths({"run_dir": run_dir}) - self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") - self.m_paths.return_value = paths - args = self.args(user_data=user_data, instance_data=None, debug=False) + m_paths.return_value = Paths({"run_dir": run_dir}) + args = self.Args(user_data=user_data, instance_data=None, debug=False) with mock.patch("sys.stderr", new_callable=StringIO): with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - self.assertEqual(1, render.handle_args("anyname", args)) - json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) - json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) - self.assertIn( - "WARNING: Missing root-readable %s. Using redacted %s" - % (json_sensitive, json_file), - self.logs.getvalue(), - ) - self.assertIn( - "ERROR: Missing instance-data.json file: %s" % json_file, - self.logs.getvalue(), + assert render.handle_args("anyname", args) == 1 + json_file = run_dir.join(INSTANCE_JSON_FILE) + json_sensitive = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + assert ( + "Missing root-readable %s. Using redacted %s" + % (json_sensitive, json_file) + in caplog.text ) + assert "Missing instance-data.json file: %s" % json_file in caplog.text - def test_handle_args_root_uses_sensitive_instance_data(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_handle_args_root_uses_sensitive_instance_data( + self, m_paths, tmpdir + ): """When root user, and no instance-data arg, use sensitive.json.""" - user_data = self.tmp_path("user-data", dir=self.tmp) + user_data = tmpdir.join("user-data") write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") - run_dir = self.tmp_path("run_dir", dir=self.tmp) + run_dir = tmpdir.join("run_dir") ensure_dir(run_dir) - json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + json_sensitive = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) write_file(json_sensitive, '{"my-var": "jinja worked"}') - paths = Paths({"run_dir": run_dir}) - self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") - self.m_paths.return_value = paths - args = self.args(user_data=user_data, instance_data=None, debug=False) - with mock.patch("sys.stderr", new_callable=StringIO): - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - with mock.patch("os.getuid") as m_getuid: - m_getuid.return_value = 0 - self.assertEqual(0, render.handle_args("anyname", args)) - self.assertIn("rendering: jinja worked", m_stdout.getvalue()) + m_paths.return_value = Paths({"run_dir": run_dir}) + args = self.Args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert render.handle_args("anyname", args) == 0 + assert "rendering: jinja worked" in m_stdout.getvalue() @skipUnlessJinja() - def test_handle_args_renders_instance_data_vars_in_template(self): + def test_handle_args_renders_instance_data_vars_in_template( + self, caplog, tmpdir + ): """If user_data file is a jinja template render instance-data vars.""" - user_data = self.tmp_path("user-data", dir=self.tmp) + user_data = tmpdir.join("user-data") write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") - instance_data = self.tmp_path("instance-data", dir=self.tmp) + instance_data = tmpdir.join("instance-data") write_file(instance_data, '{"my-var": "jinja worked"}') - args = self.args( + args = self.Args( user_data=user_data, instance_data=instance_data, debug=True ) - with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err: + with mock.patch("sys.stderr", new_callable=StringIO): with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - self.assertEqual(0, render.handle_args("anyname", args)) - self.assertIn( - "DEBUG: Converted jinja variables\n{", self.logs.getvalue() - ) - self.assertIn( - "DEBUG: Converted jinja variables\n{", m_console_err.getvalue() - ) - self.assertEqual("rendering: jinja worked", m_stdout.getvalue()) + assert render.handle_args("anyname", args) == 0 + assert "Converted jinja variables\n{" in caplog.text + # TODO enable after pytest>=3.4 + # more info: https://docs.pytest.org/en/stable/how-to/logging.html + # assert "Converted jinja variables\n{" in m_stderr.getvalue() + assert "rendering: jinja worked" == m_stdout.getvalue() @skipUnlessJinja() - def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self): + def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation( + self, caplog, tmpdir + ): """If user_data file has invalid jinja operations log warnings.""" - user_data = self.tmp_path("user-data", dir=self.tmp) + user_data = tmpdir.join("user-data") write_file(user_data, "##template: jinja\nrendering: {{ my-var }}") - instance_data = self.tmp_path("instance-data", dir=self.tmp) + instance_data = tmpdir.join("instance-data") write_file(instance_data, '{"my-var": "jinja worked"}') - args = self.args( + args = self.Args( user_data=user_data, instance_data=instance_data, debug=True ) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - self.assertIn( - "WARNING: Ignoring jinja template for %s: Undefined jinja" + assert render.handle_args("anyname", args) == 1 + assert ( + "Ignoring jinja template for %s: Undefined jinja" ' variable: "my-var". Jinja tried subtraction. Perhaps you meant' - ' "my_var"?' % user_data, - self.logs.getvalue(), - ) + ' "my_var"?' % user_data + ) in caplog.text # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py index 03a73bb5..207078fa 100644 --- a/tests/unittests/cmd/test_query.py +++ b/tests/unittests/cmd/test_query.py @@ -20,6 +20,8 @@ from cloudinit.sources import ( from cloudinit.util import b64e, write_file from tests.unittests.helpers import mock +M_PATH = "cloudinit.cmd.query." + def _gzip_data(data): with BytesIO() as iobuf: @@ -28,11 +30,11 @@ def _gzip_data(data): return iobuf.getvalue() -@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "") +@mock.patch(M_PATH + "addLogHandlerCLI", lambda *args: "") class TestQuery: - args = namedtuple( - "queryargs", + Args = namedtuple( + "Args", "debug dump_all format instance_data list_keys user_data vendor_data" " varname", ) @@ -70,7 +72,7 @@ class TestQuery: def test_handle_args_error_on_missing_param(self, caplog, capsys): """Error when missing required parameters and print usage.""" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -81,7 +83,7 @@ class TestQuery: varname=None, ) with mock.patch( - "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + M_PATH + "addLogHandlerCLI", return_value="" ) as m_cli_log: assert 1 == query.handle_args("anyname", args) expected_error = ( @@ -108,13 +110,13 @@ class TestQuery: ), ), ) - def test_handle_args_error_on_invalid_vaname_paths( + def test_handle_args_error_on_invalid_varname_paths( self, inst_data, varname, expected_error, caplog, tmpdir ): """Error when varname is not a valid instance-data variable path.""" instance_data = tmpdir.join("instance-data") instance_data.write(inst_data) - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -125,12 +127,10 @@ class TestQuery: varname=varname, ) paths, _, _, _ = self._setup_paths(tmpdir) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths - with mock.patch( - "cloudinit.cmd.query.addLogHandlerCLI", return_value="" - ): - with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud: + with mock.patch(M_PATH + "addLogHandlerCLI", return_value=""): + with mock.patch(M_PATH + "load_userdata") as m_lud: m_lud.return_value = "ud" assert 1 == query.handle_args("anyname", args) assert expected_error in caplog.text @@ -138,7 +138,7 @@ class TestQuery: def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): """When instance_data file path does not exist, log an error.""" absent_fn = tmpdir.join("absent") - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -159,7 +159,7 @@ class TestQuery: """When instance_data file is unreadable, log an error.""" noread_fn = tmpdir.join("unreadable") noread_fn.write("thou shall not pass") - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -169,15 +169,47 @@ class TestQuery: vendor_data="vd", varname=None, ) - with mock.patch("cloudinit.cmd.query.util.load_file") as m_load: + with mock.patch(M_PATH + "util.load_file") as m_load: m_load.side_effect = OSError(errno.EACCES, "Not allowed") assert 1 == query.handle_args("anyname", args) msg = "No read permission on '%s'. Try sudo" % noread_fn assert msg in caplog.text + @pytest.mark.parametrize( + "exception", + [ + (OSError(errno.EACCES, "Not allowed"),), + (OSError(errno.ENOENT, "Not allowed"),), + (IOError,), + ], + ) + def test_handle_args_error_when_no_read_permission_init_cfg( + self, exception, capsys + ): + """query.handle_status_args exists with 1 and no sys-output.""" + args = self.Args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch( + M_PATH + "read_cfg_paths", + side_effect=exception, + ) as m_read_cfg_paths: + query.handle_args("anyname", args) + assert m_read_cfg_paths.call_count == 1 + out, err = capsys.readouterr() + assert not out + assert not err + def test_handle_args_defaults_instance_data(self, caplog, tmpdir): """When no instance_data argument, default to configured run_dir.""" - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -188,7 +220,7 @@ class TestQuery: varname=None, ) paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths assert 1 == query.handle_args("anyname", args) json_file = run_dir.join(INSTANCE_JSON_FILE) @@ -197,7 +229,7 @@ class TestQuery: def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): """When no instance_data argument, root falls back to redacted json.""" - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -208,7 +240,7 @@ class TestQuery: varname=None, ) paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 @@ -239,7 +271,7 @@ class TestQuery: ) sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) sensitive_file.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -249,7 +281,7 @@ class TestQuery: vendor_data=vendor_data.strpath, varname=None, ) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 @@ -277,7 +309,7 @@ class TestQuery: vd_path = os.path.join(paths.instance_link, "vendor-data.txt") write_file(vd_path, "instance_link_vd") - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -287,7 +319,7 @@ class TestQuery: vendor_data=None, varname=None, ) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid", return_value=0): assert 0 == query.handle_args("anyname", args) @@ -308,7 +340,7 @@ class TestQuery: ) sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) sensitive_file.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -318,7 +350,7 @@ class TestQuery: vendor_data=vendor_data.strpath, varname=None, ) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 @@ -334,7 +366,7 @@ class TestQuery: """When --all is specified query will dump all instance data vars.""" instance_data = tmpdir.join("instance-data") instance_data.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -359,7 +391,7 @@ class TestQuery: """When the argument varname is passed, report its value.""" instance_data = tmpdir.join("instance-data") instance_data.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -398,7 +430,7 @@ class TestQuery: """If user_data file is a jinja template render instance-data vars.""" instance_data = tmpdir.join("instance-data") instance_data.write(inst_data) - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -440,7 +472,7 @@ class TestQuery: } """ ) - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -466,7 +498,7 @@ class TestQuery: ' "top": "gun"}' ) expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -492,7 +524,7 @@ class TestQuery: + ' {"v2_2": "val2.2"}, "top": "gun"}' ) expected = "v1_1\nv1_2\n" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -518,7 +550,7 @@ class TestQuery: + '{"v2_2": "val2.2"}, "top": "gun"}' ) expected_error = "--list-keys provided but 'top' is not a dict" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py index c5f424da..e9169a55 100644 --- a/tests/unittests/cmd/test_status.py +++ b/tests/unittests/cmd/test_status.py @@ -4,188 +4,189 @@ import os from collections import namedtuple from io import StringIO from textwrap import dedent +from typing import Callable, Dict, Optional, Union +from unittest import mock + +import pytest from cloudinit.atomic_helper import write_json from cloudinit.cmd import status from cloudinit.util import ensure_file -from tests.unittests.helpers import CiTestCase, mock, wrap_and_call - -mypaths = namedtuple("MyPaths", "run_dir") -myargs = namedtuple("MyArgs", "long wait") - - -class TestStatus(CiTestCase): - def setUp(self): - super(TestStatus, self).setUp() - self.new_root = self.tmp_dir() - self.status_file = self.tmp_path("status.json", self.new_root) - self.disable_file = self.tmp_path("cloudinit-disable", self.new_root) - self.paths = mypaths(run_dir=self.new_root) - - class FakeInit(object): - paths = self.paths - - def __init__(self, ds_deps): - pass - - def read_cfg(self): - pass - - self.init_class = FakeInit - - def test__is_cloudinit_disabled_false_on_sysvinit(self): - """When not in an environment using systemd, return False.""" - ensure_file(self.disable_file) # Create the ignored disable file +from tests.unittests.helpers import wrap_and_call + +M_NAME = "cloudinit.cmd.status" +M_PATH = f"{M_NAME}." + +MyPaths = namedtuple("MyPaths", "run_dir") +MyArgs = namedtuple("MyArgs", "long wait") +Config = namedtuple( + "Config", "new_root, status_file, disable_file, result_file, paths" +) + + +@pytest.fixture(scope="function") +def config(tmpdir): + return Config( + new_root=tmpdir, + status_file=tmpdir.join("status.json"), + disable_file=tmpdir.join("cloudinit-disable"), + result_file=tmpdir.join("result.json"), + paths=MyPaths(run_dir=tmpdir), + ) + + +class TestStatus: + @pytest.mark.parametrize( + [ + "ensured_file", + "uses_systemd", + "get_cmdline", + "expected_is_disabled", + "is_disabled_msg", + "expected_reason", + ], + [ + # When not in an environment using systemd, return False. + pytest.param( + lambda config: config.disable_file, + False, + "root=/dev/my-root not-important", + False, + "expected enabled cloud-init on sysvinit", + "Cloud-init enabled on sysvinit", + id="false_on_sysvinit", + ), + # When using systemd and disable_file is present return disabled. + pytest.param( + lambda config: config.disable_file, + True, + "root=/dev/my-root not-important", + True, + "expected disabled cloud-init", + lambda config: f"Cloud-init disabled by {config.disable_file}", + id="true_on_disable_file", + ), + # Not disabled when using systemd and enabled via commandline. + pytest.param( + lambda config: config.disable_file, + True, + "something cloud-init=enabled else", + False, + "expected enabled cloud-init", + "Cloud-init enabled by kernel command line cloud-init=enabled", + id="false_on_kernel_cmdline_enable", + ), + # When kernel command line disables cloud-init return True. + pytest.param( + None, + True, + "something cloud-init=disabled else", + True, + "expected disabled cloud-init", + "Cloud-init disabled by kernel parameter cloud-init=disabled", + id="true_on_kernel_cmdline", + ), + # When cloud-init-generator writes disabled file return True. + pytest.param( + lambda config: os.path.join(config.paths.run_dir, "disabled"), + True, + "something", + True, + "expected disabled cloud-init", + "Cloud-init disabled by cloud-init-generator", + id="true_when_generator_disables", + ), + # Report enabled when systemd generator creates the enabled file. + pytest.param( + lambda config: os.path.join(config.paths.run_dir, "enabled"), + True, + "something ignored", + False, + "expected enabled cloud-init", + "Cloud-init enabled by systemd cloud-init-generator", + id="false_when_enabled_in_systemd", + ), + ], + ) + def test__is_cloudinit_disabled( + self, + ensured_file: Optional[Callable], + uses_systemd: bool, + get_cmdline: str, + expected_is_disabled: bool, + is_disabled_msg: str, + expected_reason: Union[str, Callable], + config: Config, + ): + if ensured_file is not None: + ensure_file(ensured_file(config)) (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { - "uses_systemd": False, - "get_cmdline": "root=/dev/my-root not-important", + "uses_systemd": uses_systemd, + "get_cmdline": get_cmdline, }, status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertFalse( - is_disabled, "expected enabled cloud-init on sysvinit" + config.disable_file, + config.paths, ) - self.assertEqual("Cloud-init enabled on sysvinit", reason) - - def test__is_cloudinit_disabled_true_on_disable_file(self): - """When using systemd and disable_file is present return disabled.""" - ensure_file(self.disable_file) # Create observed disable file - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - { - "uses_systemd": True, - "get_cmdline": "root=/dev/my-root not-important", - }, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertTrue(is_disabled, "expected disabled cloud-init") - self.assertEqual( - "Cloud-init disabled by {0}".format(self.disable_file), reason - ) - - def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): - """Not disabled when using systemd and enabled via commandline.""" - ensure_file(self.disable_file) # Create ignored disable file - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - { - "uses_systemd": True, - "get_cmdline": "something cloud-init=enabled else", - }, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertFalse(is_disabled, "expected enabled cloud-init") - self.assertEqual( - "Cloud-init enabled by kernel command line cloud-init=enabled", - reason, - ) - - def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): - """When kernel command line disables cloud-init return True.""" - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - { - "uses_systemd": True, - "get_cmdline": "something cloud-init=disabled else", - }, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertTrue(is_disabled, "expected disabled cloud-init") - self.assertEqual( - "Cloud-init disabled by kernel parameter cloud-init=disabled", - reason, - ) - - def test__is_cloudinit_disabled_true_when_generator_disables(self): - """When cloud-init-generator writes disabled file return True.""" - disabled_file = os.path.join(self.paths.run_dir, "disabled") - ensure_file(disabled_file) - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - {"uses_systemd": True, "get_cmdline": "something"}, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertTrue(is_disabled, "expected disabled cloud-init") - self.assertEqual("Cloud-init disabled by cloud-init-generator", reason) - - def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): - """Report enabled when systemd generator creates the enabled file.""" - enabled_file = os.path.join(self.paths.run_dir, "enabled") - ensure_file(enabled_file) - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - {"uses_systemd": True, "get_cmdline": "something ignored"}, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertFalse(is_disabled, "expected enabled cloud-init") - self.assertEqual( - "Cloud-init enabled by systemd cloud-init-generator", reason - ) - - def test_status_returns_not_run(self): + assert is_disabled == expected_is_disabled, is_disabled_msg + if isinstance(expected_reason, str): + assert reason == expected_reason + else: + assert reason == expected_reason(config) + + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_returns_not_run(self, m_read_cfg_paths, config: Config): """When status.json does not exist yet, return 'not run'.""" - self.assertFalse( - os.path.exists(self.status_file), "Unexpected status.json found" - ) - cmdargs = myargs(long=False, wait=False) + m_read_cfg_paths.return_value = config.paths + assert not os.path.exists( + config.status_file + ), "Unexpected status.json found" + cmdargs = MyArgs(long=False, wait=False) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", - { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, - }, + M_NAME, + {"_is_cloudinit_disabled": (False, "")}, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - self.assertEqual("status: not run\n", m_stdout.getvalue()) + assert retcode == 0 + assert m_stdout.getvalue() == "status: not run\n" - def test_status_returns_disabled_long_on_presence_of_disable_file(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_returns_disabled_long_on_presence_of_disable_file( + self, m_read_cfg_paths, config: Config + ): """When cloudinit is disabled, return disabled reason.""" - + m_read_cfg_paths.return_value = config.paths checked_files = [] def fakeexists(filepath): checked_files.append(filepath) - status_file = os.path.join(self.paths.run_dir, "status.json") + status_file = os.path.join(config.paths.run_dir, "status.json") return bool(not filepath == status_file) - cmdargs = myargs(long=True, wait=False) + cmdargs = MyArgs(long=True, wait=False) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "os.path.exists": {"side_effect": fakeexists}, "_is_cloudinit_disabled": ( True, "disabled for some reason", ), - "Init": {"side_effect": self.init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - self.assertEqual( - [os.path.join(self.paths.run_dir, "status.json")], checked_files - ) + assert retcode == 0 + assert checked_files == [ + os.path.join(config.paths.run_dir, "status.json") + ] expected = dedent( """\ status: disabled @@ -193,246 +194,217 @@ class TestStatus(CiTestCase): disabled for some reason """ ) - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_returns_running_on_no_results_json(self): - """Report running when status.json exists but result.json does not.""" - result_file = self.tmp_path("result.json", self.new_root) - write_json(self.status_file, {}) - self.assertFalse( - os.path.exists(result_file), "Unexpected result.json found" - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", - { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, - }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - self.assertEqual("status: running\n", m_stdout.getvalue()) - - def test_status_returns_running(self): - """Report running when status exists with an unfinished stage.""" - ensure_file(self.tmp_path("result.json", self.new_root)) - write_json( - self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + assert m_stdout.getvalue() == expected + + @pytest.mark.parametrize( + [ + "ensured_file", + "status_content", + "assert_file", + "cmdargs", + "expected_retcode", + "expected_status", + ], + [ + # Report running when status.json exists but result.json does not. + pytest.param( + None, + {}, + lambda config: config.result_file, + MyArgs(long=False, wait=False), + 0, + "status: running\n", + id="running_on_no_results_json", + ), + # Report running when status exists with an unfinished stage. + pytest.param( + lambda config: config.result_file, + {"v1": {"init": {"start": 1, "finished": None}}}, + None, + MyArgs(long=False, wait=False), + 0, + "status: running\n", + id="running", + ), + # Report done results.json exists no stages are unfinished. + pytest.param( + lambda config: config.result_file, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, # No current stage running + "datasource": ( + "DataSourceNoCloud " + "[seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "blah": {"finished": 123.456}, + "init": { + "errors": [], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - self.assertEqual("status: running\n", m_stdout.getvalue()) - - def test_status_returns_done(self): - """Report done results.json exists no stages are unfinished.""" - ensure_file(self.tmp_path("result.json", self.new_root)) - write_json( - self.status_file, - { - "v1": { - "stage": None, # No current stage running - "datasource": ( - "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" - "[dsmode=net]" - ), - "blah": {"finished": 123.456}, - "init": { - "errors": [], - "start": 124.567, - "finished": 125.678, - }, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=False, wait=False), + 0, + "status: done\n", + id="done", + ), + # Long format of done status includes datasource info. + pytest.param( + lambda config: config.result_file, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud " + "[seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": {"start": 124.567, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - self.assertEqual("status: done\n", m_stdout.getvalue()) - - def test_status_returns_done_long(self): - """Long format of done status includes datasource info.""" - ensure_file(self.tmp_path("result.json", self.new_root)) - write_json( - self.status_file, - { - "v1": { - "stage": None, - "datasource": ( - "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" - "[dsmode=net]" - ), - "init": {"start": 124.567, "finished": 125.678}, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, - ) - cmdargs = myargs(long=True, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=True, wait=False), + 0, + dedent( + """\ + status: done + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + DataSourceNoCloud [seed=/var/.../seed/nocloud-net]\ +[dsmode=net] + """ + ), + id="returns_done_long", + ), + # Reports error when any stage has errors. + pytest.param( + None, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, + "blah": {"errors": [], "finished": 123.456}, + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - expected = dedent( - """\ - status: done - time: Thu, 01 Jan 1970 00:02:05 +0000 - detail: - DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] - """ - ) - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_on_errors(self): - """Reports error when any stage has errors.""" - write_json( - self.status_file, - { - "v1": { - "stage": None, - "blah": {"errors": [], "finished": 123.456}, - "init": { - "errors": ["error1"], - "start": 124.567, - "finished": 125.678, - }, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=False, wait=False), + 1, + "status: error\n", + id="on_errors", + ), + # Long format of error status includes all error messages. + pytest.param( + None, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud " + "[seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": { + "errors": ["error2", "error3"], + "start": 123.45, + "finished": 123.46, + }, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(1, retcode) - self.assertEqual("status: error\n", m_stdout.getvalue()) - - def test_status_on_errors_long(self): - """Long format of error status includes all error messages.""" - write_json( - self.status_file, - { - "v1": { - "stage": None, - "datasource": ( - "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" - "[dsmode=net]" - ), - "init": { - "errors": ["error1"], - "start": 124.567, - "finished": 125.678, - }, - "init-local": { - "errors": ["error2", "error3"], - "start": 123.45, - "finished": 123.46, - }, - } - }, - ) - cmdargs = myargs(long=True, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=True, wait=False), + 1, + dedent( + """\ + status: error + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + error1 + error2 + error3 + """ + ), + id="on_errors_long", + ), + # Long format reports the stage in which we are running. + pytest.param( + None, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(1, retcode) - expected = dedent( - """\ - status: error - time: Thu, 01 Jan 1970 00:02:05 +0000 - detail: - error1 - error2 - error3 - """ - ) - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_returns_running_long_format(self): - """Long format reports the stage in which we are running.""" + None, + MyArgs(long=True, wait=False), + 0, + dedent( + """\ + status: running + time: Thu, 01 Jan 1970 00:02:04 +0000 + detail: + Running in stage: init + """ + ), + id="running_long_format", + ), + ], + ) + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_output( + self, + m_read_cfg_paths, + ensured_file: Optional[Callable], + status_content: Dict, + assert_file, + cmdargs: MyArgs, + expected_retcode: int, + expected_status: str, + config: Config, + ): + m_read_cfg_paths.return_value = config.paths + if ensured_file: + ensure_file(ensured_file(config)) write_json( - self.status_file, - { - "v1": { - "stage": "init", - "init": {"start": 124.456, "finished": None}, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, + config.status_file, + status_content, ) - cmdargs = myargs(long=True, wait=False) + if assert_file: + assert not os.path.exists( + config.result_file + ), f"Unexpected {config.result_file} found" with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", - { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, - }, + M_NAME, + {"_is_cloudinit_disabled": (False, "")}, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - expected = dedent( - """\ - status: running - time: Thu, 01 Jan 1970 00:02:04 +0000 - detail: - Running in stage: init - """ - ) - self.assertEqual(expected, m_stdout.getvalue()) + assert retcode == expected_retcode + assert m_stdout.getvalue() == expected_status - def test_status_wait_blocks_until_done(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_wait_blocks_until_done( + self, m_read_cfg_paths, config: Config + ): """Specifying wait will poll every 1/4 second until done state.""" + m_read_cfg_paths.return_value = config.paths running_json = { "v1": { "stage": "init", @@ -448,37 +420,41 @@ class TestStatus(CiTestCase): } } - self.sleep_calls = 0 + sleep_calls = 0 def fake_sleep(interval): - self.assertEqual(0.25, interval) - self.sleep_calls += 1 - if self.sleep_calls == 2: - write_json(self.status_file, running_json) - elif self.sleep_calls == 3: - write_json(self.status_file, done_json) - result_file = self.tmp_path("result.json", self.new_root) + nonlocal sleep_calls + assert interval == 0.25 + sleep_calls += 1 + if sleep_calls == 2: + write_json(config.status_file, running_json) + elif sleep_calls == 3: + write_json(config.status_file, done_json) + result_file = config.result_file ensure_file(result_file) - cmdargs = myargs(long=False, wait=True) + cmdargs = MyArgs(long=False, wait=True) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "sleep": {"side_effect": fake_sleep}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - self.assertEqual(4, self.sleep_calls) - self.assertEqual("....\nstatus: done\n", m_stdout.getvalue()) - - def test_status_wait_blocks_until_error(self): + assert retcode == 0 + assert sleep_calls == 4 + assert m_stdout.getvalue() == "....\nstatus: done\n" + + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_wait_blocks_until_error( + self, m_read_cfg_paths, config: Config + ): """Specifying wait will poll every 1/4 second until error state.""" + m_read_cfg_paths.return_value = config.paths running_json = { "v1": { "stage": "init", @@ -498,51 +474,53 @@ class TestStatus(CiTestCase): } } - self.sleep_calls = 0 + sleep_calls = 0 def fake_sleep(interval): - self.assertEqual(0.25, interval) - self.sleep_calls += 1 - if self.sleep_calls == 2: - write_json(self.status_file, running_json) - elif self.sleep_calls == 3: - write_json(self.status_file, error_json) - - cmdargs = myargs(long=False, wait=True) + nonlocal sleep_calls + assert interval == 0.25 + sleep_calls += 1 + if sleep_calls == 2: + write_json(config.status_file, running_json) + elif sleep_calls == 3: + write_json(config.status_file, error_json) + + cmdargs = MyArgs(long=False, wait=True) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "sleep": {"side_effect": fake_sleep}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(1, retcode) - self.assertEqual(4, self.sleep_calls) - self.assertEqual("....\nstatus: error\n", m_stdout.getvalue()) + assert retcode == 1 + assert sleep_calls == 4 + assert m_stdout.getvalue() == "....\nstatus: error\n" - def test_status_main(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_main(self, m_read_cfg_paths, config: Config): """status.main can be run as a standalone script.""" + m_read_cfg_paths.return_value = config.paths write_json( - self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + config.status_file, + {"v1": {"init": {"start": 1, "finished": None}}}, ) - with self.assertRaises(SystemExit) as context_manager: + with pytest.raises(SystemExit) as e: with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "sys.argv": {"new": ["status"]}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, }, status.main, ) - self.assertEqual(0, context_manager.exception.code) - self.assertEqual("status: running\n", m_stdout.getvalue()) + assert e.value.code == 0 + assert m_stdout.getvalue() == "status: running\n" # vi: ts=4 expandtab syntax=python diff --git a/tests/unittests/test_stages.py b/tests/unittests/test_stages.py index fdf0e490..9fa2e629 100644 --- a/tests/unittests/test_stages.py +++ b/tests/unittests/test_stages.py @@ -13,6 +13,7 @@ from cloudinit.util import write_file from tests.unittests.helpers import mock TEST_INSTANCE_ID = "i-testing" +M_PATH = "cloudinit.stages." class FakeDataSource(sources.DataSource): @@ -58,8 +59,8 @@ class TestInit: write_file(disable_file, "") assert (None, disable_file) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -80,8 +81,8 @@ class TestInit: assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by cmdline" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -102,8 +103,8 @@ class TestInit: assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by initramfs" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -130,8 +131,8 @@ class TestInit: assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by ds" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -156,8 +157,8 @@ class TestInit: assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by system_cfg" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -188,8 +189,8 @@ class TestInit: NetworkConfigSource.DS, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -217,8 +218,8 @@ class TestInit: in caplog.text ) - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -246,8 +247,8 @@ class TestInit: in caplog.text ) - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -273,8 +274,8 @@ class TestInit: NetworkConfigSource.CMD_LINE, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -300,8 +301,8 @@ class TestInit: NetworkConfigSource.INITRAMFS, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -327,8 +328,8 @@ class TestInit: NetworkConfigSource.SYSTEM_CFG, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -348,8 +349,8 @@ class TestInit: NetworkConfigSource.DS, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") def test_wb__find_networking_config_returns_fallback( self, m_cmdline, m_initramfs, caplog ): @@ -577,13 +578,13 @@ class TestInit_InitializeFilesystem: As it is replaced with a mock, consumers of this fixture can set `init._cfg` if the default empty dict configuration is not appropriate. """ - with mock.patch("cloudinit.stages.util.ensure_dirs"): + with mock.patch(M_PATH + "util.ensure_dirs"): init = stages.Init() init._cfg = {} init._paths = paths yield init - @mock.patch("cloudinit.stages.util.ensure_file") + @mock.patch(M_PATH + "util.ensure_file") def test_ensure_file_not_called_if_no_log_file_configured( self, m_ensure_file, init ): diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index d22d7747..bcb63787 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -3,6 +3,7 @@ """Tests for cloudinit.util""" import base64 +import errno import io import json import logging @@ -12,7 +13,9 @@ import re import shutil import stat import tempfile +from collections import deque from textwrap import dedent +from typing import Tuple from unittest import mock import pytest @@ -24,6 +27,7 @@ from tests.unittests import helpers from tests.unittests.helpers import CiTestCase LOG = logging.getLogger(__name__) +M_PATH = "cloudinit.util." MOUNT_INFO = [ "68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64", @@ -336,30 +340,108 @@ class FakeCloud(object): return self.hostname -class TestUtil(CiTestCase): +class TestUtil: def test_parse_mount_info_no_opts_no_arg(self): result = util.parse_mount_info("/home", MOUNT_INFO, LOG) - self.assertEqual(("/dev/sda2", "xfs", "/home"), result) + assert ("/dev/sda2", "xfs", "/home") == result def test_parse_mount_info_no_opts_arg(self): result = util.parse_mount_info("/home", MOUNT_INFO, LOG, False) - self.assertEqual(("/dev/sda2", "xfs", "/home"), result) + assert ("/dev/sda2", "xfs", "/home") == result def test_parse_mount_info_with_opts(self): result = util.parse_mount_info("/", MOUNT_INFO, LOG, True) - self.assertEqual(("/dev/sda1", "btrfs", "/", "ro,relatime"), result) + assert ("/dev/sda1", "btrfs", "/", "ro,relatime") == result - @mock.patch("cloudinit.util.get_mount_info") + @mock.patch(M_PATH + "get_mount_info") def test_mount_is_rw(self, m_mount_info): m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "rw,relatime") is_rw = util.mount_is_read_write("/") - self.assertEqual(is_rw, True) + assert is_rw is True - @mock.patch("cloudinit.util.get_mount_info") + @mock.patch(M_PATH + "get_mount_info") def test_mount_is_ro(self, m_mount_info): m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "ro,relatime") is_rw = util.mount_is_read_write("/") - self.assertEqual(is_rw, False) + assert is_rw is False + + @mock.patch( + M_PATH + "read_conf", + side_effect=(OSError(errno.EACCES, "Not allowed"), {"0": "0"}), + ) + def test_read_conf_d_no_permissions( + self, m_read_conf, caplog, capsys, tmpdir + ): + """If a user has not read permission to read a config file then + there is no exception nor stderr output and the user is informed via + logging warnings. + + Note: This is used in cmd, therefore want to keep the invariant of + not outputing to the console and log file permission errors. + """ + confs = [] + for i in range(2): + confs.append(tmpdir.join(f"conf-{i}.cfg")) + confs[i].write("{}") + assert {"0": "0"} == util.read_conf_d(tmpdir) + assert ( + caplog.text.count( + f"REDACTED config part {tmpdir}/conf-1.cfg for non-root user" + ) + == 1 + ) + assert m_read_conf.call_count == 2 + out, err = capsys.readouterr() + assert not out + assert not err + + @pytest.mark.parametrize( + "create_confd,expected_call", + [ + (False, mock.call(deque())), + (True, mock.call(deque([{"my_config": "foo"}]))), + ], + ) + @mock.patch(M_PATH + "mergemanydict") + @mock.patch(M_PATH + "read_conf_d", return_value={"my_config": "foo"}) + @mock.patch( + M_PATH + "read_conf", side_effect=OSError(errno.EACCES, "Not allowed") + ) + def test_read_conf_with_confd_no_permissions( + self, + m_read_conf, + m_read_confd, + m_mergemanydict, + create_confd, + expected_call, + caplog, + capsys, + tmpdir, + ): + """Read a conf file without permission. + + sys output is empty and the user is informed via logging warnings. + + Note: This is used in cmd, therefore want to keep the invariant of + not outputing to the console and log file permission errors. + """ + conf_fn = tmpdir.join("conf.cfg") + if create_confd: + confd_fn = tmpdir.mkdir("conf.cfg.d") + util.read_conf_with_confd(conf_fn) + assert ( + caplog.text.count( + f"REDACTED config part {conf_fn} for non-root user" + ) + == 1 + ) + assert m_read_conf.call_count == 1 + out, err = capsys.readouterr() + assert not out + assert not err + if create_confd: + assert [mock.call(confd_fn)] == m_read_confd.call_args_list + assert [expected_call] == m_mergemanydict.call_args_list class TestSymlink(CiTestCase): @@ -412,9 +494,9 @@ class TestSymlink(CiTestCase): class TestUptime(CiTestCase): - @mock.patch("cloudinit.util.boottime") - @mock.patch("cloudinit.util.os.path.exists") - @mock.patch("cloudinit.util.time.time") + @mock.patch(M_PATH + "boottime") + @mock.patch(M_PATH + "os.path.exists") + @mock.patch(M_PATH + "time.time") def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime): boottime = 1000.0 uptime = 10.0 @@ -688,7 +770,7 @@ class TestGetLinuxDistro(CiTestCase): if path == "/etc/redhat-release": return 1 - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file has the distro name in quotes""" @@ -697,7 +779,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("sles", "12.3", platform.machine()), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file does not have the distro name in quotes""" @@ -708,7 +790,7 @@ class TestGetLinuxDistro(CiTestCase): @mock.patch("platform.system") @mock.patch("platform.release") - @mock.patch("cloudinit.util._parse_redhat_release") + @mock.patch(M_PATH + "_parse_redhat_release") def test_get_linux_freebsd( self, m_parse_redhat_release, @@ -725,7 +807,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("freebsd", "12.0-RELEASE-p10", ""), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_centos6(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on CentOS 6.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_6 @@ -733,7 +815,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("centos", "6.10", "Final"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): """Verify the correct release info on CentOS 7 without os-release.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_7 @@ -741,7 +823,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("centos", "7.5.1804", "Core"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_REDHAT_7 @@ -749,7 +831,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("redhat", "7.5", "Maipo"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_7 @@ -757,7 +839,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("redhat", "7.5", "Maipo"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 6 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_6 @@ -765,7 +847,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("redhat", "6.10", "Santiago"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_copr_centos(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on COPR CentOS.""" m_os_release.return_value = OS_RELEASE_CENTOS @@ -773,7 +855,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("centos", "7", "Core"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists): """Verify almalinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_ALMALINUX_8 @@ -781,7 +863,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists): """Verify almalinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_ALMALINUX_8 @@ -789,7 +871,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists): """Verify eurolinux 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_7 @@ -797,7 +879,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists): """Verify eurolinux 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_EUROLINUX_7 @@ -805,7 +887,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists): """Verify eurolinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_8 @@ -813,7 +895,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists): """Verify eurolinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_EUROLINUX_8 @@ -821,7 +903,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_miraclelinux8_rhrelease( self, m_os_release, m_path_exists ): @@ -831,7 +913,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("miracle", "8.4", "Peony"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_miraclelinux8_osrelease( self, m_os_release, m_path_exists ): @@ -841,7 +923,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("miraclelinux", "8", "Peony"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists): """Verify rocky linux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_ROCKY_8 @@ -849,7 +931,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists): """Verify rocky linux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_ROCKY_8 @@ -857,7 +939,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists): """Verify virtuozzo linux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_VIRTUOZZO_8 @@ -865,7 +947,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists): """Verify virtuozzo linux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_VIRTUOZZO_8 @@ -873,7 +955,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists): """Verify cloudlinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_CLOUDLINUX_8 @@ -881,7 +963,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists): """Verify cloudlinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_CLOUDLINUX_8 @@ -889,7 +971,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_debian(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on Debian.""" m_os_release.return_value = OS_RELEASE_DEBIAN @@ -897,7 +979,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("debian", "9", "stretch"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_openeuler(self, m_os_release, m_path_exists): """Verify get the correct name and release name on Openeuler.""" m_os_release.return_value = OS_RELEASE_OPENEULER_20 @@ -905,7 +987,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("openEuler", "20.03", "LTS-SP2"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_opensuse(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE prior to openSUSE Leap 15. @@ -915,7 +997,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("opensuse", "42.3", platform.machine()), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Leap 15.0 and later. @@ -925,7 +1007,7 @@ class TestGetLinuxDistro(CiTestCase): dist = util.get_linux_distro() self.assertEqual(("opensuse-leap", "15.0", platform.machine()), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Tumbleweed @@ -937,7 +1019,7 @@ class TestGetLinuxDistro(CiTestCase): ("opensuse-tumbleweed", "20180920", platform.machine()), dist ) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_photon_os_release(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on PhotonOS""" m_os_release.return_value = OS_RELEASE_PHOTON @@ -1059,6 +1141,9 @@ class TestIsLXD(CiTestCase): class TestReadCcFromCmdline: + + random_string: Tuple + if hasattr(pytest, "param"): random_string = pytest.param( CiTestCase.random_string(), None, id="random_string" @@ -1182,8 +1267,8 @@ class TestMountCb: """Mock an already-mounted device, and yield (device, mount dict)""" device = "/dev/fake0" mountpoint = "/mnt/fake" - with mock.patch("cloudinit.util.subp.subp"): - with mock.patch("cloudinit.util.mounts") as m_mounts: + with mock.patch(M_PATH + "subp.subp"): + with mock.patch(M_PATH + "mounts") as m_mounts: mounts = {device: {"mountpoint": mountpoint}} m_mounts.return_value = mounts yield device, mounts[device] @@ -1206,9 +1291,9 @@ class TestMountCb: ("ufs", "ufs"), ], ) - @mock.patch("cloudinit.util.is_Linux", autospec=True) - @mock.patch("cloudinit.util.is_BSD", autospec=True) - @mock.patch("cloudinit.util.subp.subp") + @mock.patch(M_PATH + "is_Linux", autospec=True) + @mock.patch(M_PATH + "is_BSD", autospec=True) + @mock.patch(M_PATH + "subp.subp") @mock.patch("cloudinit.temp_utils.tempdir", autospec=True) def test_normalize_mtype_on_bsd( self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected @@ -1245,7 +1330,7 @@ class TestMountCb: with pytest.raises(TypeError): util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype) - @mock.patch("cloudinit.util.subp.subp") + @mock.patch(M_PATH + "subp.subp") def test_already_mounted_does_not_mount_or_umount_anything( self, m_subp, already_mounted_device ): @@ -1281,7 +1366,7 @@ class TestMountCb: ] == callback.call_args_list -@mock.patch("cloudinit.util.write_file") +@mock.patch(M_PATH + "write_file") class TestEnsureFile: """Tests for ``cloudinit.util.ensure_file``.""" @@ -1326,9 +1411,9 @@ class TestEnsureFile: assert "ab" == kwargs["omode"] -@mock.patch("cloudinit.util.grp.getgrnam") -@mock.patch("cloudinit.util.os.setgid") -@mock.patch("cloudinit.util.os.umask") +@mock.patch(M_PATH + "grp.getgrnam") +@mock.patch(M_PATH + "os.setgid") +@mock.patch(M_PATH + "os.umask") class TestRedirectOutputPreexecFn: """This tests specifically the preexec_fn used in redirect_output.""" @@ -1344,7 +1429,7 @@ class TestRedirectOutputPreexecFn: args = (test_string, None) elif request.param == "errfmt": args = (None, test_string) - with mock.patch("cloudinit.util.subprocess.Popen") as m_popen: + with mock.patch(M_PATH + "subprocess.Popen") as m_popen: util.redirect_output(*args) assert 1 == m_popen.call_count @@ -1778,7 +1863,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): expected = ("none", "tmpfs", "/run/lock") self.assertEqual(expected, util.parse_mount_info("/run/lock", lines)) - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool(self, zpool_output, m_os): # mock /dev/zfs exists @@ -1794,7 +1879,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertIsNotNone(ret) m_os.path.exists.assert_called_with("/dev/zfs") - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") def test_get_device_info_from_zpool_no_dev_zfs(self, m_os): # mock /dev/zfs missing m_os.path.exists.return_value = False @@ -1802,7 +1887,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): """Handle case where there is no zpool command""" @@ -1812,7 +1897,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os): # mock /dev/zfs exists @@ -1879,7 +1964,7 @@ class TestIsX86(helpers.CiTestCase): util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch ) - @mock.patch("cloudinit.util.os.uname") + @mock.patch(M_PATH + "os.uname") def test_is_x86_calls_uname_for_architecture(self, m_uname): """is_x86 returns True if platform from uname matches.""" m_uname.return_value = [0, 1, 2, 3, "x86_64"] @@ -1987,7 +2072,7 @@ class TestMultiLog(helpers.FilesystemMockingTestCase): self.assertEqual("", self.stdout.getvalue()) @mock.patch( - "cloudinit.util.write_to_console", + M_PATH + "write_to_console", mock.Mock(side_effect=OSError("Failed to write to console")), ) def test_logs_go_to_stdout_if_writing_to_console_fails_and_fallback_true( @@ -2001,7 +2086,7 @@ class TestMultiLog(helpers.FilesystemMockingTestCase): ) @mock.patch( - "cloudinit.util.write_to_console", + M_PATH + "write_to_console", mock.Mock(side_effect=OSError("Failed to write to console")), ) def test_logs_go_nowhere_if_writing_to_console_fails_and_fallback_false( @@ -2210,7 +2295,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_bad_content_in_os_release_no_effect(self, m_cmdline): """malformed os-release should not raise exception.""" m_cmdline.return_value = "root=/dev/sda" @@ -2220,7 +2305,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): self.reRoot() self.assertFalse(util.system_is_snappy()) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_snap_core_in_cmdline_is_snappy(self, m_cmdline): """The string snap_core= in kernel cmdline indicates snappy.""" cmdline = ( @@ -2233,7 +2318,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): self.assertTrue(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_nothing_found_is_not_snappy(self, m_cmdline): """If no positive identification, then not snappy.""" m_cmdline.return_value = "root=/dev/sda" @@ -2241,7 +2326,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): self.assertFalse(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_channel_ini_with_snappy_is_snappy(self, m_cmdline): """A Channel.ini file with 'ubuntu-core' indicates snappy.""" m_cmdline.return_value = "root=/dev/sda" @@ -2251,7 +2336,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_system_image_config_dir_is_snappy(self, m_cmdline): """Existence of /etc/system-image/config.d indicates snappy.""" m_cmdline.return_value = "root=/dev/sda" @@ -2296,7 +2381,7 @@ class TestGetProcEnv(helpers.TestCase): # return the value portion of key=val decoded. return blob.split(b"=", 1)[1].decode(encoding, errors) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_non_utf8_in_environment(self, m_load_file): """env may have non utf-8 decodable content.""" content = self.null.join( @@ -2315,7 +2400,7 @@ class TestGetProcEnv(helpers.TestCase): ) self.assertEqual(1, m_load_file.call_count) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_encoding_none_returns_bytes(self, m_load_file): """encoding none returns bytes.""" lines = (self.bootflag, self.simple1, self.simple2, self.mixed) @@ -2328,7 +2413,7 @@ class TestGetProcEnv(helpers.TestCase): ) self.assertEqual(1, m_load_file.call_count) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_all_utf8_encoded(self, m_load_file): """common path where only utf-8 decodable content.""" content = self.null.join((self.simple1, self.simple2)) @@ -2338,7 +2423,7 @@ class TestGetProcEnv(helpers.TestCase): ) self.assertEqual(1, m_load_file.call_count) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_non_existing_file_returns_empty_dict(self, m_load_file): """as implemented, a non-existing pid returns empty dict. This is how it was originally implemented.""" |