summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlberto Contreras <alberto.contreras@canonical.com>2022-05-17 19:43:49 +0200
committerGitHub <noreply@github.com>2022-05-17 11:43:49 -0600
commit3e554d16b4d539c9bd0c743793d391d30bee167a (patch)
treefd157533b1ab6ebf524f9e831c188846a419601d
parent98388b5ddc8f61a873631b8588a4a109b9088abd (diff)
downloadcloud-init-git-3e554d16b4d539c9bd0c743793d391d30bee167a.tar.gz
cli: Redact files with permission errors in commands (#1440)
For non-root users, emit warnings and redact on any /etc/cloud/cloud.cfg.d files which raise permissions errors. Add tests covering this behavior for query, status and render cmds. Migrate `test_render.py` and `test_status.py` to Pytest. LP: #1953430 SC-658
-rwxr-xr-xcloudinit/cmd/devel/__init__.py3
-rwxr-xr-xcloudinit/cmd/status.py19
-rw-r--r--cloudinit/stages.py13
-rw-r--r--cloudinit/util.py32
-rw-r--r--pyproject.toml5
-rw-r--r--tests/unittests/cmd/devel/test_render.py176
-rw-r--r--tests/unittests/cmd/test_query.py96
-rw-r--r--tests/unittests/cmd/test_status.py782
-rw-r--r--tests/unittests/test_stages.py53
-rw-r--r--tests/unittests/test_util.py219
10 files changed, 746 insertions, 652 deletions
diff --git a/cloudinit/cmd/devel/__init__.py b/cloudinit/cmd/devel/__init__.py
index ead5f7a9..9a8f2ebd 100755
--- a/cloudinit/cmd/devel/__init__.py
+++ b/cloudinit/cmd/devel/__init__.py
@@ -6,6 +6,7 @@
import logging
from cloudinit import log
+from cloudinit.helpers import Paths
from cloudinit.stages import Init
@@ -16,7 +17,7 @@ def addLogHandlerCLI(logger, log_level):
return logger
-def read_cfg_paths():
+def read_cfg_paths() -> Paths:
"""Return a Paths object based on the system configuration on disk."""
init = Init(ds_deps=[])
init.read_cfg()
diff --git a/cloudinit/cmd/status.py b/cloudinit/cmd/status.py
index f3b4f161..1c7c209b 100755
--- a/cloudinit/cmd/status.py
+++ b/cloudinit/cmd/status.py
@@ -11,9 +11,10 @@ import enum
import os
import sys
from time import gmtime, sleep, strftime
+from typing import Tuple
+from cloudinit.cmd.devel import read_cfg_paths
from cloudinit.distros import uses_systemd
-from cloudinit.stages import Init
from cloudinit.util import get_cmdline, load_file, load_json
CLOUDINIT_DISABLED_FILE = "/etc/cloud/cloud-init.disabled"
@@ -64,17 +65,16 @@ def get_parser(parser=None):
return parser
-def handle_status_args(name, args):
+def handle_status_args(name, args) -> int:
"""Handle calls to 'cloud-init status' as a subcommand."""
# Read configured paths
- init = Init(ds_deps=[])
- init.read_cfg()
- status, status_detail, time = get_status_details(init.paths)
+ paths = read_cfg_paths()
+ status, status_detail, time = get_status_details(paths)
if args.wait:
while status in (UXAppStatus.NOT_RUN, UXAppStatus.RUNNING):
sys.stdout.write(".")
sys.stdout.flush()
- status, status_detail, time = get_status_details(init.paths)
+ status, status_detail, time = get_status_details(paths)
sleep(0.25)
sys.stdout.write("\n")
print("status: {0}".format(status.value))
@@ -115,17 +115,14 @@ def _is_cloudinit_disabled(disable_file, paths):
return (is_disabled, reason)
-def get_status_details(paths=None):
+def get_status_details(paths=None) -> Tuple[UXAppStatus, str, str]:
"""Return a 3-tuple of status, status_details and time of last event.
@param paths: An initialized cloudinit.helpers.paths object.
Values are obtained from parsing paths.run_dir/status.json.
"""
- if not paths:
- init = Init(ds_deps=[])
- init.read_cfg()
- paths = init.paths
+ paths = paths or read_cfg_paths()
status = UXAppStatus.NOT_RUN
status_detail = ""
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index 4ebe413b..27af6055 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -9,7 +9,7 @@ import os
import pickle
import sys
from collections import namedtuple
-from typing import Dict, List, Optional, Set
+from typing import Dict, Iterable, List, Optional, Set
from cloudinit import cloud, distros, handlers, helpers, importer
from cloudinit import log as logging
@@ -58,12 +58,12 @@ def update_event_enabled(
case, we only have the data source's `default_update_events`,
so an event that should be enabled in userdata may be denied.
"""
- default_events = (
- datasource.default_update_events
- ) # type: Dict[EventScope, Set[EventType]]
- user_events = userdata_to_events(
+ default_events: Dict[
+ EventScope, Set[EventType]
+ ] = datasource.default_update_events
+ user_events: Dict[EventScope, Set[EventType]] = userdata_to_events(
cfg.get("updates", {})
- ) # type: Dict[EventScope, Set[EventType]]
+ )
# A value in the first will override a value in the second
allowed = util.mergemanydict(
[
@@ -73,6 +73,7 @@ def update_event_enabled(
)
LOG.debug("Allowed events: %s", allowed)
+ scopes: Iterable[EventScope]
if not scope:
scopes = allowed.keys()
else:
diff --git a/cloudinit/util.py b/cloudinit/util.py
index da2f63da..2639478a 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -32,7 +32,8 @@ import subprocess
import sys
import time
from base64 import b64decode, b64encode
-from errno import ENOENT
+from collections import deque
+from errno import EACCES, ENOENT
from functools import lru_cache
from typing import List
from urllib import parse
@@ -998,6 +999,7 @@ def read_seeded(base="", ext="", timeout=5, retries=10, file_retries=0):
def read_conf_d(confd):
+ """Read configuration directory."""
# Get reverse sorted list (later trumps newer)
confs = sorted(os.listdir(confd), reverse=True)
@@ -1010,13 +1012,27 @@ def read_conf_d(confd):
# Load them all so that they can be merged
cfgs = []
for fn in confs:
- cfgs.append(read_conf(os.path.join(confd, fn)))
+ try:
+ cfgs.append(read_conf(os.path.join(confd, fn)))
+ except OSError as e:
+ if e.errno == EACCES:
+ LOG.warning(
+ "REDACTED config part %s/%s for non-root user", confd, fn
+ )
return mergemanydict(cfgs)
def read_conf_with_confd(cfgfile):
- cfg = read_conf(cfgfile)
+ cfgs = deque()
+ cfg: dict = {}
+ try:
+ cfg = read_conf(cfgfile)
+ except OSError as e:
+ if e.errno == EACCES:
+ LOG.warning("REDACTED config part %s for non-root user", cfgfile)
+ else:
+ cfgs.append(cfg)
confd = False
if "conf_d" in cfg:
@@ -1032,12 +1048,12 @@ def read_conf_with_confd(cfgfile):
elif os.path.isdir("%s.d" % cfgfile):
confd = "%s.d" % cfgfile
- if not confd or not os.path.isdir(confd):
- return cfg
+ if confd and os.path.isdir(confd):
+ # Conf.d settings override input configuration
+ confd_cfg = read_conf_d(confd)
+ cfgs.appendleft(confd_cfg)
- # Conf.d settings override input configuration
- confd_cfg = read_conf_d(confd)
- return mergemanydict([confd_cfg, cfg])
+ return mergemanydict(cfgs)
def read_conf_from_cmdline(cmdline=None):
diff --git a/pyproject.toml b/pyproject.toml
index e1fb7dca..1aac03a8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,7 +42,6 @@ exclude=[
'^cloudinit/sources/DataSourceVMware\.py$',
'^cloudinit/sources/__init__\.py$',
'^cloudinit/sources/helpers/vmware/imc/config_file\.py$',
- '^cloudinit/stages\.py$',
'^cloudinit/templater\.py$',
'^cloudinit/url_helper\.py$',
'^conftest\.py$',
@@ -56,12 +55,9 @@ exclude=[
'^tests/integration_tests/modules/test_growpart\.py$',
'^tests/integration_tests/modules/test_ssh_keysfile\.py$',
'^tests/unittests/__init__\.py$',
- '^tests/unittests/cmd/devel/test_render\.py$',
'^tests/unittests/cmd/test_clean\.py$',
'^tests/unittests/cmd/test_cloud_id\.py$',
'^tests/unittests/cmd/test_main\.py$',
- '^tests/unittests/cmd/test_query\.py$',
- '^tests/unittests/cmd/test_status\.py$',
'^tests/unittests/config/test_cc_chef\.py$',
'^tests/unittests/config/test_cc_landscape\.py$',
'^tests/unittests/config/test_cc_locale\.py$',
@@ -96,7 +92,6 @@ exclude=[
'^tests/unittests/test_subp\.py$',
'^tests/unittests/test_templating\.py$',
'^tests/unittests/test_url_helper\.py$',
- '^tests/unittests/test_util\.py$',
'^tools/mock-meta\.py$',
]
diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py
index 4afc64f0..3bc5032f 100644
--- a/tests/unittests/cmd/devel/test_render.py
+++ b/tests/unittests/cmd/devel/test_render.py
@@ -1,6 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import os
from collections import namedtuple
from io import StringIO
@@ -8,147 +7,136 @@ from cloudinit.cmd.devel import render
from cloudinit.helpers import Paths
from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE
from cloudinit.util import ensure_dir, write_file
-from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja
+from tests.unittests.helpers import mock, skipUnlessJinja
+M_PATH = "cloudinit.cmd.devel.render."
-class TestRender(CiTestCase):
- with_logs = True
+class TestRender:
- args = namedtuple("renderargs", "user_data instance_data debug")
+ Args = namedtuple("Args", "user_data instance_data debug")
- def setUp(self):
- super(TestRender, self).setUp()
- self.tmp = self.tmp_dir()
-
- def test_handle_args_error_on_missing_user_data(self):
+ def test_handle_args_error_on_missing_user_data(self, caplog, tmpdir):
"""When user_data file path does not exist, log an error."""
- absent_file = self.tmp_path("user-data", dir=self.tmp)
- instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ absent_file = tmpdir.join("user-data")
+ instance_data = tmpdir.join("instance-data")
write_file(instance_data, "{}")
- args = self.args(
+ args = self.Args(
user_data=absent_file, instance_data=instance_data, debug=False
)
with mock.patch("sys.stderr", new_callable=StringIO):
- self.assertEqual(1, render.handle_args("anyname", args))
- self.assertIn(
- "Missing user-data file: %s" % absent_file, self.logs.getvalue()
- )
+ assert render.handle_args("anyname", args) == 1
+ assert "Missing user-data file: %s" % absent_file in caplog.text
- def test_handle_args_error_on_missing_instance_data(self):
+ def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
"""When instance_data file path does not exist, log an error."""
- user_data = self.tmp_path("user-data", dir=self.tmp)
- absent_file = self.tmp_path("instance-data", dir=self.tmp)
- args = self.args(
+ user_data = tmpdir.join("user-data")
+ absent_file = tmpdir.join("instance-data")
+ args = self.Args(
user_data=user_data, instance_data=absent_file, debug=False
)
with mock.patch("sys.stderr", new_callable=StringIO):
- self.assertEqual(1, render.handle_args("anyname", args))
- self.assertIn(
- "Missing instance-data.json file: %s" % absent_file,
- self.logs.getvalue(),
+ assert render.handle_args("anyname", args) == 1
+ assert (
+ "Missing instance-data.json file: %s" % absent_file in caplog.text
)
- def test_handle_args_defaults_instance_data(self):
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_handle_args_defaults_instance_data(self, m_paths, caplog, tmpdir):
"""When no instance_data argument, default to configured run_dir."""
- user_data = self.tmp_path("user-data", dir=self.tmp)
- run_dir = self.tmp_path("run_dir", dir=self.tmp)
+ user_data = tmpdir.join("user-data")
+ run_dir = tmpdir.join("run_dir")
ensure_dir(run_dir)
- paths = Paths({"run_dir": run_dir})
- self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
- self.m_paths.return_value = paths
- args = self.args(user_data=user_data, instance_data=None, debug=False)
+ m_paths.return_value = Paths({"run_dir": run_dir})
+ args = self.Args(user_data=user_data, instance_data=None, debug=False)
with mock.patch("sys.stderr", new_callable=StringIO):
- self.assertEqual(1, render.handle_args("anyname", args))
- json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
- self.assertIn(
- "Missing instance-data.json file: %s" % json_file,
- self.logs.getvalue(),
- )
-
- def test_handle_args_root_fallback_from_sensitive_instance_data(self):
+ assert render.handle_args("anyname", args) == 1
+ json_file = run_dir.join(INSTANCE_JSON_FILE)
+ msg = "Missing instance-data.json file: %s" % json_file
+ assert msg in caplog.text
+
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_handle_args_root_fallback_from_sensitive_instance_data(
+ self, m_paths, caplog, tmpdir
+ ):
"""When root user defaults to sensitive.json."""
- user_data = self.tmp_path("user-data", dir=self.tmp)
- run_dir = self.tmp_path("run_dir", dir=self.tmp)
+ user_data = tmpdir.join("user-data")
+ run_dir = tmpdir.join("run_dir")
ensure_dir(run_dir)
- paths = Paths({"run_dir": run_dir})
- self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
- self.m_paths.return_value = paths
- args = self.args(user_data=user_data, instance_data=None, debug=False)
+ m_paths.return_value = Paths({"run_dir": run_dir})
+ args = self.Args(user_data=user_data, instance_data=None, debug=False)
with mock.patch("sys.stderr", new_callable=StringIO):
with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
- self.assertEqual(1, render.handle_args("anyname", args))
- json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
- json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
- self.assertIn(
- "WARNING: Missing root-readable %s. Using redacted %s"
- % (json_sensitive, json_file),
- self.logs.getvalue(),
- )
- self.assertIn(
- "ERROR: Missing instance-data.json file: %s" % json_file,
- self.logs.getvalue(),
+ assert render.handle_args("anyname", args) == 1
+ json_file = run_dir.join(INSTANCE_JSON_FILE)
+ json_sensitive = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ assert (
+ "Missing root-readable %s. Using redacted %s"
+ % (json_sensitive, json_file)
+ in caplog.text
)
+ assert "Missing instance-data.json file: %s" % json_file in caplog.text
- def test_handle_args_root_uses_sensitive_instance_data(self):
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_handle_args_root_uses_sensitive_instance_data(
+ self, m_paths, tmpdir
+ ):
"""When root user, and no instance-data arg, use sensitive.json."""
- user_data = self.tmp_path("user-data", dir=self.tmp)
+ user_data = tmpdir.join("user-data")
write_file(user_data, "##template: jinja\nrendering: {{ my_var }}")
- run_dir = self.tmp_path("run_dir", dir=self.tmp)
+ run_dir = tmpdir.join("run_dir")
ensure_dir(run_dir)
- json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
+ json_sensitive = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
write_file(json_sensitive, '{"my-var": "jinja worked"}')
- paths = Paths({"run_dir": run_dir})
- self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
- self.m_paths.return_value = paths
- args = self.args(user_data=user_data, instance_data=None, debug=False)
- with mock.patch("sys.stderr", new_callable=StringIO):
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- with mock.patch("os.getuid") as m_getuid:
- m_getuid.return_value = 0
- self.assertEqual(0, render.handle_args("anyname", args))
- self.assertIn("rendering: jinja worked", m_stdout.getvalue())
+ m_paths.return_value = Paths({"run_dir": run_dir})
+ args = self.Args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 0
+ assert render.handle_args("anyname", args) == 0
+ assert "rendering: jinja worked" in m_stdout.getvalue()
@skipUnlessJinja()
- def test_handle_args_renders_instance_data_vars_in_template(self):
+ def test_handle_args_renders_instance_data_vars_in_template(
+ self, caplog, tmpdir
+ ):
"""If user_data file is a jinja template render instance-data vars."""
- user_data = self.tmp_path("user-data", dir=self.tmp)
+ user_data = tmpdir.join("user-data")
write_file(user_data, "##template: jinja\nrendering: {{ my_var }}")
- instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ instance_data = tmpdir.join("instance-data")
write_file(instance_data, '{"my-var": "jinja worked"}')
- args = self.args(
+ args = self.Args(
user_data=user_data, instance_data=instance_data, debug=True
)
- with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err:
+ with mock.patch("sys.stderr", new_callable=StringIO):
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- self.assertEqual(0, render.handle_args("anyname", args))
- self.assertIn(
- "DEBUG: Converted jinja variables\n{", self.logs.getvalue()
- )
- self.assertIn(
- "DEBUG: Converted jinja variables\n{", m_console_err.getvalue()
- )
- self.assertEqual("rendering: jinja worked", m_stdout.getvalue())
+ assert render.handle_args("anyname", args) == 0
+ assert "Converted jinja variables\n{" in caplog.text
+ # TODO enable after pytest>=3.4
+ # more info: https://docs.pytest.org/en/stable/how-to/logging.html
+ # assert "Converted jinja variables\n{" in m_stderr.getvalue()
+ assert "rendering: jinja worked" == m_stdout.getvalue()
@skipUnlessJinja()
- def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self):
+ def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(
+ self, caplog, tmpdir
+ ):
"""If user_data file has invalid jinja operations log warnings."""
- user_data = self.tmp_path("user-data", dir=self.tmp)
+ user_data = tmpdir.join("user-data")
write_file(user_data, "##template: jinja\nrendering: {{ my-var }}")
- instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ instance_data = tmpdir.join("instance-data")
write_file(instance_data, '{"my-var": "jinja worked"}')
- args = self.args(
+ args = self.Args(
user_data=user_data, instance_data=instance_data, debug=True
)
with mock.patch("sys.stderr", new_callable=StringIO):
- self.assertEqual(1, render.handle_args("anyname", args))
- self.assertIn(
- "WARNING: Ignoring jinja template for %s: Undefined jinja"
+ assert render.handle_args("anyname", args) == 1
+ assert (
+ "Ignoring jinja template for %s: Undefined jinja"
' variable: "my-var". Jinja tried subtraction. Perhaps you meant'
- ' "my_var"?' % user_data,
- self.logs.getvalue(),
- )
+ ' "my_var"?' % user_data
+ ) in caplog.text
# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py
index 03a73bb5..207078fa 100644
--- a/tests/unittests/cmd/test_query.py
+++ b/tests/unittests/cmd/test_query.py
@@ -20,6 +20,8 @@ from cloudinit.sources import (
from cloudinit.util import b64e, write_file
from tests.unittests.helpers import mock
+M_PATH = "cloudinit.cmd.query."
+
def _gzip_data(data):
with BytesIO() as iobuf:
@@ -28,11 +30,11 @@ def _gzip_data(data):
return iobuf.getvalue()
-@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "")
+@mock.patch(M_PATH + "addLogHandlerCLI", lambda *args: "")
class TestQuery:
- args = namedtuple(
- "queryargs",
+ Args = namedtuple(
+ "Args",
"debug dump_all format instance_data list_keys user_data vendor_data"
" varname",
)
@@ -70,7 +72,7 @@ class TestQuery:
def test_handle_args_error_on_missing_param(self, caplog, capsys):
"""Error when missing required parameters and print usage."""
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=False,
format=None,
@@ -81,7 +83,7 @@ class TestQuery:
varname=None,
)
with mock.patch(
- "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
+ M_PATH + "addLogHandlerCLI", return_value=""
) as m_cli_log:
assert 1 == query.handle_args("anyname", args)
expected_error = (
@@ -108,13 +110,13 @@ class TestQuery:
),
),
)
- def test_handle_args_error_on_invalid_vaname_paths(
+ def test_handle_args_error_on_invalid_varname_paths(
self, inst_data, varname, expected_error, caplog, tmpdir
):
"""Error when varname is not a valid instance-data variable path."""
instance_data = tmpdir.join("instance-data")
instance_data.write(inst_data)
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=False,
format=None,
@@ -125,12 +127,10 @@ class TestQuery:
varname=varname,
)
paths, _, _, _ = self._setup_paths(tmpdir)
- with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ with mock.patch(M_PATH + "read_cfg_paths") as m_paths:
m_paths.return_value = paths
- with mock.patch(
- "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
- ):
- with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud:
+ with mock.patch(M_PATH + "addLogHandlerCLI", return_value=""):
+ with mock.patch(M_PATH + "load_userdata") as m_lud:
m_lud.return_value = "ud"
assert 1 == query.handle_args("anyname", args)
assert expected_error in caplog.text
@@ -138,7 +138,7 @@ class TestQuery:
def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
"""When instance_data file path does not exist, log an error."""
absent_fn = tmpdir.join("absent")
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -159,7 +159,7 @@ class TestQuery:
"""When instance_data file is unreadable, log an error."""
noread_fn = tmpdir.join("unreadable")
noread_fn.write("thou shall not pass")
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -169,15 +169,47 @@ class TestQuery:
vendor_data="vd",
varname=None,
)
- with mock.patch("cloudinit.cmd.query.util.load_file") as m_load:
+ with mock.patch(M_PATH + "util.load_file") as m_load:
m_load.side_effect = OSError(errno.EACCES, "Not allowed")
assert 1 == query.handle_args("anyname", args)
msg = "No read permission on '%s'. Try sudo" % noread_fn
assert msg in caplog.text
+ @pytest.mark.parametrize(
+ "exception",
+ [
+ (OSError(errno.EACCES, "Not allowed"),),
+ (OSError(errno.ENOENT, "Not allowed"),),
+ (IOError,),
+ ],
+ )
+ def test_handle_args_error_when_no_read_permission_init_cfg(
+ self, exception, capsys
+ ):
+ """query.handle_status_args exists with 1 and no sys-output."""
+ args = self.Args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
+ with mock.patch(
+ M_PATH + "read_cfg_paths",
+ side_effect=exception,
+ ) as m_read_cfg_paths:
+ query.handle_args("anyname", args)
+ assert m_read_cfg_paths.call_count == 1
+ out, err = capsys.readouterr()
+ assert not out
+ assert not err
+
def test_handle_args_defaults_instance_data(self, caplog, tmpdir):
"""When no instance_data argument, default to configured run_dir."""
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -188,7 +220,7 @@ class TestQuery:
varname=None,
)
paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ with mock.patch(M_PATH + "read_cfg_paths") as m_paths:
m_paths.return_value = paths
assert 1 == query.handle_args("anyname", args)
json_file = run_dir.join(INSTANCE_JSON_FILE)
@@ -197,7 +229,7 @@ class TestQuery:
def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir):
"""When no instance_data argument, root falls back to redacted json."""
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -208,7 +240,7 @@ class TestQuery:
varname=None,
)
paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ with mock.patch(M_PATH + "read_cfg_paths") as m_paths:
m_paths.return_value = paths
with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
@@ -239,7 +271,7 @@ class TestQuery:
)
sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
sensitive_file.write('{"my-var": "it worked"}')
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -249,7 +281,7 @@ class TestQuery:
vendor_data=vendor_data.strpath,
varname=None,
)
- with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ with mock.patch(M_PATH + "read_cfg_paths") as m_paths:
m_paths.return_value = paths
with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
@@ -277,7 +309,7 @@ class TestQuery:
vd_path = os.path.join(paths.instance_link, "vendor-data.txt")
write_file(vd_path, "instance_link_vd")
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -287,7 +319,7 @@ class TestQuery:
vendor_data=None,
varname=None,
)
- with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ with mock.patch(M_PATH + "read_cfg_paths") as m_paths:
m_paths.return_value = paths
with mock.patch("os.getuid", return_value=0):
assert 0 == query.handle_args("anyname", args)
@@ -308,7 +340,7 @@ class TestQuery:
)
sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
sensitive_file.write('{"my-var": "it worked"}')
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -318,7 +350,7 @@ class TestQuery:
vendor_data=vendor_data.strpath,
varname=None,
)
- with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ with mock.patch(M_PATH + "read_cfg_paths") as m_paths:
m_paths.return_value = paths
with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
@@ -334,7 +366,7 @@ class TestQuery:
"""When --all is specified query will dump all instance data vars."""
instance_data = tmpdir.join("instance-data")
instance_data.write('{"my-var": "it worked"}')
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -359,7 +391,7 @@ class TestQuery:
"""When the argument varname is passed, report its value."""
instance_data = tmpdir.join("instance-data")
instance_data.write('{"my-var": "it worked"}')
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -398,7 +430,7 @@ class TestQuery:
"""If user_data file is a jinja template render instance-data vars."""
instance_data = tmpdir.join("instance-data")
instance_data.write(inst_data)
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=False,
format=None,
@@ -440,7 +472,7 @@ class TestQuery:
}
"""
)
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=True,
format=None,
@@ -466,7 +498,7 @@ class TestQuery:
' "top": "gun"}'
)
expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n"
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=False,
format=None,
@@ -492,7 +524,7 @@ class TestQuery:
+ ' {"v2_2": "val2.2"}, "top": "gun"}'
)
expected = "v1_1\nv1_2\n"
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=False,
format=None,
@@ -518,7 +550,7 @@ class TestQuery:
+ '{"v2_2": "val2.2"}, "top": "gun"}'
)
expected_error = "--list-keys provided but 'top' is not a dict"
- args = self.args(
+ args = self.Args(
debug=False,
dump_all=False,
format=None,
diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py
index c5f424da..e9169a55 100644
--- a/tests/unittests/cmd/test_status.py
+++ b/tests/unittests/cmd/test_status.py
@@ -4,188 +4,189 @@ import os
from collections import namedtuple
from io import StringIO
from textwrap import dedent
+from typing import Callable, Dict, Optional, Union
+from unittest import mock
+
+import pytest
from cloudinit.atomic_helper import write_json
from cloudinit.cmd import status
from cloudinit.util import ensure_file
-from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
-
-mypaths = namedtuple("MyPaths", "run_dir")
-myargs = namedtuple("MyArgs", "long wait")
-
-
-class TestStatus(CiTestCase):
- def setUp(self):
- super(TestStatus, self).setUp()
- self.new_root = self.tmp_dir()
- self.status_file = self.tmp_path("status.json", self.new_root)
- self.disable_file = self.tmp_path("cloudinit-disable", self.new_root)
- self.paths = mypaths(run_dir=self.new_root)
-
- class FakeInit(object):
- paths = self.paths
-
- def __init__(self, ds_deps):
- pass
-
- def read_cfg(self):
- pass
-
- self.init_class = FakeInit
-
- def test__is_cloudinit_disabled_false_on_sysvinit(self):
- """When not in an environment using systemd, return False."""
- ensure_file(self.disable_file) # Create the ignored disable file
+from tests.unittests.helpers import wrap_and_call
+
+M_NAME = "cloudinit.cmd.status"
+M_PATH = f"{M_NAME}."
+
+MyPaths = namedtuple("MyPaths", "run_dir")
+MyArgs = namedtuple("MyArgs", "long wait")
+Config = namedtuple(
+ "Config", "new_root, status_file, disable_file, result_file, paths"
+)
+
+
+@pytest.fixture(scope="function")
+def config(tmpdir):
+ return Config(
+ new_root=tmpdir,
+ status_file=tmpdir.join("status.json"),
+ disable_file=tmpdir.join("cloudinit-disable"),
+ result_file=tmpdir.join("result.json"),
+ paths=MyPaths(run_dir=tmpdir),
+ )
+
+
+class TestStatus:
+ @pytest.mark.parametrize(
+ [
+ "ensured_file",
+ "uses_systemd",
+ "get_cmdline",
+ "expected_is_disabled",
+ "is_disabled_msg",
+ "expected_reason",
+ ],
+ [
+ # When not in an environment using systemd, return False.
+ pytest.param(
+ lambda config: config.disable_file,
+ False,
+ "root=/dev/my-root not-important",
+ False,
+ "expected enabled cloud-init on sysvinit",
+ "Cloud-init enabled on sysvinit",
+ id="false_on_sysvinit",
+ ),
+ # When using systemd and disable_file is present return disabled.
+ pytest.param(
+ lambda config: config.disable_file,
+ True,
+ "root=/dev/my-root not-important",
+ True,
+ "expected disabled cloud-init",
+ lambda config: f"Cloud-init disabled by {config.disable_file}",
+ id="true_on_disable_file",
+ ),
+ # Not disabled when using systemd and enabled via commandline.
+ pytest.param(
+ lambda config: config.disable_file,
+ True,
+ "something cloud-init=enabled else",
+ False,
+ "expected enabled cloud-init",
+ "Cloud-init enabled by kernel command line cloud-init=enabled",
+ id="false_on_kernel_cmdline_enable",
+ ),
+ # When kernel command line disables cloud-init return True.
+ pytest.param(
+ None,
+ True,
+ "something cloud-init=disabled else",
+ True,
+ "expected disabled cloud-init",
+ "Cloud-init disabled by kernel parameter cloud-init=disabled",
+ id="true_on_kernel_cmdline",
+ ),
+ # When cloud-init-generator writes disabled file return True.
+ pytest.param(
+ lambda config: os.path.join(config.paths.run_dir, "disabled"),
+ True,
+ "something",
+ True,
+ "expected disabled cloud-init",
+ "Cloud-init disabled by cloud-init-generator",
+ id="true_when_generator_disables",
+ ),
+ # Report enabled when systemd generator creates the enabled file.
+ pytest.param(
+ lambda config: os.path.join(config.paths.run_dir, "enabled"),
+ True,
+ "something ignored",
+ False,
+ "expected enabled cloud-init",
+ "Cloud-init enabled by systemd cloud-init-generator",
+ id="false_when_enabled_in_systemd",
+ ),
+ ],
+ )
+ def test__is_cloudinit_disabled(
+ self,
+ ensured_file: Optional[Callable],
+ uses_systemd: bool,
+ get_cmdline: str,
+ expected_is_disabled: bool,
+ is_disabled_msg: str,
+ expected_reason: Union[str, Callable],
+ config: Config,
+ ):
+ if ensured_file is not None:
+ ensure_file(ensured_file(config))
(is_disabled, reason) = wrap_and_call(
- "cloudinit.cmd.status",
+ M_NAME,
{
- "uses_systemd": False,
- "get_cmdline": "root=/dev/my-root not-important",
+ "uses_systemd": uses_systemd,
+ "get_cmdline": get_cmdline,
},
status._is_cloudinit_disabled,
- self.disable_file,
- self.paths,
- )
- self.assertFalse(
- is_disabled, "expected enabled cloud-init on sysvinit"
+ config.disable_file,
+ config.paths,
)
- self.assertEqual("Cloud-init enabled on sysvinit", reason)
-
- def test__is_cloudinit_disabled_true_on_disable_file(self):
- """When using systemd and disable_file is present return disabled."""
- ensure_file(self.disable_file) # Create observed disable file
- (is_disabled, reason) = wrap_and_call(
- "cloudinit.cmd.status",
- {
- "uses_systemd": True,
- "get_cmdline": "root=/dev/my-root not-important",
- },
- status._is_cloudinit_disabled,
- self.disable_file,
- self.paths,
- )
- self.assertTrue(is_disabled, "expected disabled cloud-init")
- self.assertEqual(
- "Cloud-init disabled by {0}".format(self.disable_file), reason
- )
-
- def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
- """Not disabled when using systemd and enabled via commandline."""
- ensure_file(self.disable_file) # Create ignored disable file
- (is_disabled, reason) = wrap_and_call(
- "cloudinit.cmd.status",
- {
- "uses_systemd": True,
- "get_cmdline": "something cloud-init=enabled else",
- },
- status._is_cloudinit_disabled,
- self.disable_file,
- self.paths,
- )
- self.assertFalse(is_disabled, "expected enabled cloud-init")
- self.assertEqual(
- "Cloud-init enabled by kernel command line cloud-init=enabled",
- reason,
- )
-
- def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
- """When kernel command line disables cloud-init return True."""
- (is_disabled, reason) = wrap_and_call(
- "cloudinit.cmd.status",
- {
- "uses_systemd": True,
- "get_cmdline": "something cloud-init=disabled else",
- },
- status._is_cloudinit_disabled,
- self.disable_file,
- self.paths,
- )
- self.assertTrue(is_disabled, "expected disabled cloud-init")
- self.assertEqual(
- "Cloud-init disabled by kernel parameter cloud-init=disabled",
- reason,
- )
-
- def test__is_cloudinit_disabled_true_when_generator_disables(self):
- """When cloud-init-generator writes disabled file return True."""
- disabled_file = os.path.join(self.paths.run_dir, "disabled")
- ensure_file(disabled_file)
- (is_disabled, reason) = wrap_and_call(
- "cloudinit.cmd.status",
- {"uses_systemd": True, "get_cmdline": "something"},
- status._is_cloudinit_disabled,
- self.disable_file,
- self.paths,
- )
- self.assertTrue(is_disabled, "expected disabled cloud-init")
- self.assertEqual("Cloud-init disabled by cloud-init-generator", reason)
-
- def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
- """Report enabled when systemd generator creates the enabled file."""
- enabled_file = os.path.join(self.paths.run_dir, "enabled")
- ensure_file(enabled_file)
- (is_disabled, reason) = wrap_and_call(
- "cloudinit.cmd.status",
- {"uses_systemd": True, "get_cmdline": "something ignored"},
- status._is_cloudinit_disabled,
- self.disable_file,
- self.paths,
- )
- self.assertFalse(is_disabled, "expected enabled cloud-init")
- self.assertEqual(
- "Cloud-init enabled by systemd cloud-init-generator", reason
- )
-
- def test_status_returns_not_run(self):
+ assert is_disabled == expected_is_disabled, is_disabled_msg
+ if isinstance(expected_reason, str):
+ assert reason == expected_reason
+ else:
+ assert reason == expected_reason(config)
+
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_status_returns_not_run(self, m_read_cfg_paths, config: Config):
"""When status.json does not exist yet, return 'not run'."""
- self.assertFalse(
- os.path.exists(self.status_file), "Unexpected status.json found"
- )
- cmdargs = myargs(long=False, wait=False)
+ m_read_cfg_paths.return_value = config.paths
+ assert not os.path.exists(
+ config.status_file
+ ), "Unexpected status.json found"
+ cmdargs = MyArgs(long=False, wait=False)
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- "cloudinit.cmd.status",
- {
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
- },
+ M_NAME,
+ {"_is_cloudinit_disabled": (False, "")},
status.handle_status_args,
"ignored",
cmdargs,
)
- self.assertEqual(0, retcode)
- self.assertEqual("status: not run\n", m_stdout.getvalue())
+ assert retcode == 0
+ assert m_stdout.getvalue() == "status: not run\n"
- def test_status_returns_disabled_long_on_presence_of_disable_file(self):
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_status_returns_disabled_long_on_presence_of_disable_file(
+ self, m_read_cfg_paths, config: Config
+ ):
"""When cloudinit is disabled, return disabled reason."""
-
+ m_read_cfg_paths.return_value = config.paths
checked_files = []
def fakeexists(filepath):
checked_files.append(filepath)
- status_file = os.path.join(self.paths.run_dir, "status.json")
+ status_file = os.path.join(config.paths.run_dir, "status.json")
return bool(not filepath == status_file)
- cmdargs = myargs(long=True, wait=False)
+ cmdargs = MyArgs(long=True, wait=False)
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ M_NAME,
{
"os.path.exists": {"side_effect": fakeexists},
"_is_cloudinit_disabled": (
True,
"disabled for some reason",
),
- "Init": {"side_effect": self.init_class},
},
status.handle_status_args,
"ignored",
cmdargs,
)
- self.assertEqual(0, retcode)
- self.assertEqual(
- [os.path.join(self.paths.run_dir, "status.json")], checked_files
- )
+ assert retcode == 0
+ assert checked_files == [
+ os.path.join(config.paths.run_dir, "status.json")
+ ]
expected = dedent(
"""\
status: disabled
@@ -193,246 +194,217 @@ class TestStatus(CiTestCase):
disabled for some reason
"""
)
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_returns_running_on_no_results_json(self):
- """Report running when status.json exists but result.json does not."""
- result_file = self.tmp_path("result.json", self.new_root)
- write_json(self.status_file, {})
- self.assertFalse(
- os.path.exists(result_file), "Unexpected result.json found"
- )
- cmdargs = myargs(long=False, wait=False)
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- "cloudinit.cmd.status",
- {
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
- },
- status.handle_status_args,
- "ignored",
- cmdargs,
- )
- self.assertEqual(0, retcode)
- self.assertEqual("status: running\n", m_stdout.getvalue())
-
- def test_status_returns_running(self):
- """Report running when status exists with an unfinished stage."""
- ensure_file(self.tmp_path("result.json", self.new_root))
- write_json(
- self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
- )
- cmdargs = myargs(long=False, wait=False)
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ assert m_stdout.getvalue() == expected
+
+ @pytest.mark.parametrize(
+ [
+ "ensured_file",
+ "status_content",
+ "assert_file",
+ "cmdargs",
+ "expected_retcode",
+ "expected_status",
+ ],
+ [
+ # Report running when status.json exists but result.json does not.
+ pytest.param(
+ None,
+ {},
+ lambda config: config.result_file,
+ MyArgs(long=False, wait=False),
+ 0,
+ "status: running\n",
+ id="running_on_no_results_json",
+ ),
+ # Report running when status exists with an unfinished stage.
+ pytest.param(
+ lambda config: config.result_file,
+ {"v1": {"init": {"start": 1, "finished": None}}},
+ None,
+ MyArgs(long=False, wait=False),
+ 0,
+ "status: running\n",
+ id="running",
+ ),
+ # Report done results.json exists no stages are unfinished.
+ pytest.param(
+ lambda config: config.result_file,
{
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
+ "v1": {
+ "stage": None, # No current stage running
+ "datasource": (
+ "DataSourceNoCloud "
+ "[seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "blah": {"finished": 123.456},
+ "init": {
+ "errors": [],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
},
- status.handle_status_args,
- "ignored",
- cmdargs,
- )
- self.assertEqual(0, retcode)
- self.assertEqual("status: running\n", m_stdout.getvalue())
-
- def test_status_returns_done(self):
- """Report done results.json exists no stages are unfinished."""
- ensure_file(self.tmp_path("result.json", self.new_root))
- write_json(
- self.status_file,
- {
- "v1": {
- "stage": None, # No current stage running
- "datasource": (
- "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
- "[dsmode=net]"
- ),
- "blah": {"finished": 123.456},
- "init": {
- "errors": [],
- "start": 124.567,
- "finished": 125.678,
- },
- "init-local": {"start": 123.45, "finished": 123.46},
- }
- },
- )
- cmdargs = myargs(long=False, wait=False)
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ None,
+ MyArgs(long=False, wait=False),
+ 0,
+ "status: done\n",
+ id="done",
+ ),
+ # Long format of done status includes datasource info.
+ pytest.param(
+ lambda config: config.result_file,
{
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud "
+ "[seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {"start": 124.567, "finished": 125.678},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
},
- status.handle_status_args,
- "ignored",
- cmdargs,
- )
- self.assertEqual(0, retcode)
- self.assertEqual("status: done\n", m_stdout.getvalue())
-
- def test_status_returns_done_long(self):
- """Long format of done status includes datasource info."""
- ensure_file(self.tmp_path("result.json", self.new_root))
- write_json(
- self.status_file,
- {
- "v1": {
- "stage": None,
- "datasource": (
- "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
- "[dsmode=net]"
- ),
- "init": {"start": 124.567, "finished": 125.678},
- "init-local": {"start": 123.45, "finished": 123.46},
- }
- },
- )
- cmdargs = myargs(long=True, wait=False)
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ None,
+ MyArgs(long=True, wait=False),
+ 0,
+ dedent(
+ """\
+ status: done
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ DataSourceNoCloud [seed=/var/.../seed/nocloud-net]\
+[dsmode=net]
+ """
+ ),
+ id="returns_done_long",
+ ),
+ # Reports error when any stage has errors.
+ pytest.param(
+ None,
{
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
+ "v1": {
+ "stage": None,
+ "blah": {"errors": [], "finished": 123.456},
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
},
- status.handle_status_args,
- "ignored",
- cmdargs,
- )
- self.assertEqual(0, retcode)
- expected = dedent(
- """\
- status: done
- time: Thu, 01 Jan 1970 00:02:05 +0000
- detail:
- DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
- """
- )
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_on_errors(self):
- """Reports error when any stage has errors."""
- write_json(
- self.status_file,
- {
- "v1": {
- "stage": None,
- "blah": {"errors": [], "finished": 123.456},
- "init": {
- "errors": ["error1"],
- "start": 124.567,
- "finished": 125.678,
- },
- "init-local": {"start": 123.45, "finished": 123.46},
- }
- },
- )
- cmdargs = myargs(long=False, wait=False)
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ None,
+ MyArgs(long=False, wait=False),
+ 1,
+ "status: error\n",
+ id="on_errors",
+ ),
+ # Long format of error status includes all error messages.
+ pytest.param(
+ None,
{
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud "
+ "[seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {
+ "errors": ["error2", "error3"],
+ "start": 123.45,
+ "finished": 123.46,
+ },
+ }
},
- status.handle_status_args,
- "ignored",
- cmdargs,
- )
- self.assertEqual(1, retcode)
- self.assertEqual("status: error\n", m_stdout.getvalue())
-
- def test_status_on_errors_long(self):
- """Long format of error status includes all error messages."""
- write_json(
- self.status_file,
- {
- "v1": {
- "stage": None,
- "datasource": (
- "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
- "[dsmode=net]"
- ),
- "init": {
- "errors": ["error1"],
- "start": 124.567,
- "finished": 125.678,
- },
- "init-local": {
- "errors": ["error2", "error3"],
- "start": 123.45,
- "finished": 123.46,
- },
- }
- },
- )
- cmdargs = myargs(long=True, wait=False)
- with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ None,
+ MyArgs(long=True, wait=False),
+ 1,
+ dedent(
+ """\
+ status: error
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ error1
+ error2
+ error3
+ """
+ ),
+ id="on_errors_long",
+ ),
+ # Long format reports the stage in which we are running.
+ pytest.param(
+ None,
{
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
},
- status.handle_status_args,
- "ignored",
- cmdargs,
- )
- self.assertEqual(1, retcode)
- expected = dedent(
- """\
- status: error
- time: Thu, 01 Jan 1970 00:02:05 +0000
- detail:
- error1
- error2
- error3
- """
- )
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_returns_running_long_format(self):
- """Long format reports the stage in which we are running."""
+ None,
+ MyArgs(long=True, wait=False),
+ 0,
+ dedent(
+ """\
+ status: running
+ time: Thu, 01 Jan 1970 00:02:04 +0000
+ detail:
+ Running in stage: init
+ """
+ ),
+ id="running_long_format",
+ ),
+ ],
+ )
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_status_output(
+ self,
+ m_read_cfg_paths,
+ ensured_file: Optional[Callable],
+ status_content: Dict,
+ assert_file,
+ cmdargs: MyArgs,
+ expected_retcode: int,
+ expected_status: str,
+ config: Config,
+ ):
+ m_read_cfg_paths.return_value = config.paths
+ if ensured_file:
+ ensure_file(ensured_file(config))
write_json(
- self.status_file,
- {
- "v1": {
- "stage": "init",
- "init": {"start": 124.456, "finished": None},
- "init-local": {"start": 123.45, "finished": 123.46},
- }
- },
+ config.status_file,
+ status_content,
)
- cmdargs = myargs(long=True, wait=False)
+ if assert_file:
+ assert not os.path.exists(
+ config.result_file
+ ), f"Unexpected {config.result_file} found"
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- "cloudinit.cmd.status",
- {
- "_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
- },
+ M_NAME,
+ {"_is_cloudinit_disabled": (False, "")},
status.handle_status_args,
"ignored",
cmdargs,
)
- self.assertEqual(0, retcode)
- expected = dedent(
- """\
- status: running
- time: Thu, 01 Jan 1970 00:02:04 +0000
- detail:
- Running in stage: init
- """
- )
- self.assertEqual(expected, m_stdout.getvalue())
+ assert retcode == expected_retcode
+ assert m_stdout.getvalue() == expected_status
- def test_status_wait_blocks_until_done(self):
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_status_wait_blocks_until_done(
+ self, m_read_cfg_paths, config: Config
+ ):
"""Specifying wait will poll every 1/4 second until done state."""
+ m_read_cfg_paths.return_value = config.paths
running_json = {
"v1": {
"stage": "init",
@@ -448,37 +420,41 @@ class TestStatus(CiTestCase):
}
}
- self.sleep_calls = 0
+ sleep_calls = 0
def fake_sleep(interval):
- self.assertEqual(0.25, interval)
- self.sleep_calls += 1
- if self.sleep_calls == 2:
- write_json(self.status_file, running_json)
- elif self.sleep_calls == 3:
- write_json(self.status_file, done_json)
- result_file = self.tmp_path("result.json", self.new_root)
+ nonlocal sleep_calls
+ assert interval == 0.25
+ sleep_calls += 1
+ if sleep_calls == 2:
+ write_json(config.status_file, running_json)
+ elif sleep_calls == 3:
+ write_json(config.status_file, done_json)
+ result_file = config.result_file
ensure_file(result_file)
- cmdargs = myargs(long=False, wait=True)
+ cmdargs = MyArgs(long=False, wait=True)
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ M_NAME,
{
"sleep": {"side_effect": fake_sleep},
"_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
},
status.handle_status_args,
"ignored",
cmdargs,
)
- self.assertEqual(0, retcode)
- self.assertEqual(4, self.sleep_calls)
- self.assertEqual("....\nstatus: done\n", m_stdout.getvalue())
-
- def test_status_wait_blocks_until_error(self):
+ assert retcode == 0
+ assert sleep_calls == 4
+ assert m_stdout.getvalue() == "....\nstatus: done\n"
+
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_status_wait_blocks_until_error(
+ self, m_read_cfg_paths, config: Config
+ ):
"""Specifying wait will poll every 1/4 second until error state."""
+ m_read_cfg_paths.return_value = config.paths
running_json = {
"v1": {
"stage": "init",
@@ -498,51 +474,53 @@ class TestStatus(CiTestCase):
}
}
- self.sleep_calls = 0
+ sleep_calls = 0
def fake_sleep(interval):
- self.assertEqual(0.25, interval)
- self.sleep_calls += 1
- if self.sleep_calls == 2:
- write_json(self.status_file, running_json)
- elif self.sleep_calls == 3:
- write_json(self.status_file, error_json)
-
- cmdargs = myargs(long=False, wait=True)
+ nonlocal sleep_calls
+ assert interval == 0.25
+ sleep_calls += 1
+ if sleep_calls == 2:
+ write_json(config.status_file, running_json)
+ elif sleep_calls == 3:
+ write_json(config.status_file, error_json)
+
+ cmdargs = MyArgs(long=False, wait=True)
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- "cloudinit.cmd.status",
+ M_NAME,
{
"sleep": {"side_effect": fake_sleep},
"_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
},
status.handle_status_args,
"ignored",
cmdargs,
)
- self.assertEqual(1, retcode)
- self.assertEqual(4, self.sleep_calls)
- self.assertEqual("....\nstatus: error\n", m_stdout.getvalue())
+ assert retcode == 1
+ assert sleep_calls == 4
+ assert m_stdout.getvalue() == "....\nstatus: error\n"
- def test_status_main(self):
+ @mock.patch(M_PATH + "read_cfg_paths")
+ def test_status_main(self, m_read_cfg_paths, config: Config):
"""status.main can be run as a standalone script."""
+ m_read_cfg_paths.return_value = config.paths
write_json(
- self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
+ config.status_file,
+ {"v1": {"init": {"start": 1, "finished": None}}},
)
- with self.assertRaises(SystemExit) as context_manager:
+ with pytest.raises(SystemExit) as e:
with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
wrap_and_call(
- "cloudinit.cmd.status",
+ M_NAME,
{
"sys.argv": {"new": ["status"]},
"_is_cloudinit_disabled": (False, ""),
- "Init": {"side_effect": self.init_class},
},
status.main,
)
- self.assertEqual(0, context_manager.exception.code)
- self.assertEqual("status: running\n", m_stdout.getvalue())
+ assert e.value.code == 0
+ assert m_stdout.getvalue() == "status: running\n"
# vi: ts=4 expandtab syntax=python
diff --git a/tests/unittests/test_stages.py b/tests/unittests/test_stages.py
index fdf0e490..9fa2e629 100644
--- a/tests/unittests/test_stages.py
+++ b/tests/unittests/test_stages.py
@@ -13,6 +13,7 @@ from cloudinit.util import write_file
from tests.unittests.helpers import mock
TEST_INSTANCE_ID = "i-testing"
+M_PATH = "cloudinit.stages."
class FakeDataSource(sources.DataSource):
@@ -58,8 +59,8 @@ class TestInit:
write_file(disable_file, "")
assert (None, disable_file) == self.init._find_networking_config()
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"net_config",
[
@@ -80,8 +81,8 @@ class TestInit:
assert caplog.records[0].levelname == "DEBUG"
assert "network config disabled by cmdline" in caplog.text
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"net_config",
[
@@ -102,8 +103,8 @@ class TestInit:
assert caplog.records[0].levelname == "DEBUG"
assert "network config disabled by initramfs" in caplog.text
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"net_config",
[
@@ -130,8 +131,8 @@ class TestInit:
assert caplog.records[0].levelname == "DEBUG"
assert "network config disabled by ds" in caplog.text
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"net_config",
[
@@ -156,8 +157,8 @@ class TestInit:
assert caplog.records[0].levelname == "DEBUG"
assert "network config disabled by system_cfg" in caplog.text
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -188,8 +189,8 @@ class TestInit:
NetworkConfigSource.DS,
) == self.init._find_networking_config()
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -217,8 +218,8 @@ class TestInit:
in caplog.text
)
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -246,8 +247,8 @@ class TestInit:
in caplog.text
)
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -273,8 +274,8 @@ class TestInit:
NetworkConfigSource.CMD_LINE,
) == self.init._find_networking_config()
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -300,8 +301,8 @@ class TestInit:
NetworkConfigSource.INITRAMFS,
) == self.init._find_networking_config()
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -327,8 +328,8 @@ class TestInit:
NetworkConfigSource.SYSTEM_CFG,
) == self.init._find_networking_config()
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
@pytest.mark.parametrize(
"in_config,out_config",
[
@@ -348,8 +349,8 @@ class TestInit:
NetworkConfigSource.DS,
) == self.init._find_networking_config()
- @mock.patch("cloudinit.stages.cmdline.read_initramfs_config")
- @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config")
+ @mock.patch(M_PATH + "cmdline.read_initramfs_config")
+ @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config")
def test_wb__find_networking_config_returns_fallback(
self, m_cmdline, m_initramfs, caplog
):
@@ -577,13 +578,13 @@ class TestInit_InitializeFilesystem:
As it is replaced with a mock, consumers of this fixture can set
`init._cfg` if the default empty dict configuration is not appropriate.
"""
- with mock.patch("cloudinit.stages.util.ensure_dirs"):
+ with mock.patch(M_PATH + "util.ensure_dirs"):
init = stages.Init()
init._cfg = {}
init._paths = paths
yield init
- @mock.patch("cloudinit.stages.util.ensure_file")
+ @mock.patch(M_PATH + "util.ensure_file")
def test_ensure_file_not_called_if_no_log_file_configured(
self, m_ensure_file, init
):
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index d22d7747..bcb63787 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -3,6 +3,7 @@
"""Tests for cloudinit.util"""
import base64
+import errno
import io
import json
import logging
@@ -12,7 +13,9 @@ import re
import shutil
import stat
import tempfile
+from collections import deque
from textwrap import dedent
+from typing import Tuple
from unittest import mock
import pytest
@@ -24,6 +27,7 @@ from tests.unittests import helpers
from tests.unittests.helpers import CiTestCase
LOG = logging.getLogger(__name__)
+M_PATH = "cloudinit.util."
MOUNT_INFO = [
"68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64",
@@ -336,30 +340,108 @@ class FakeCloud(object):
return self.hostname
-class TestUtil(CiTestCase):
+class TestUtil:
def test_parse_mount_info_no_opts_no_arg(self):
result = util.parse_mount_info("/home", MOUNT_INFO, LOG)
- self.assertEqual(("/dev/sda2", "xfs", "/home"), result)
+ assert ("/dev/sda2", "xfs", "/home") == result
def test_parse_mount_info_no_opts_arg(self):
result = util.parse_mount_info("/home", MOUNT_INFO, LOG, False)
- self.assertEqual(("/dev/sda2", "xfs", "/home"), result)
+ assert ("/dev/sda2", "xfs", "/home") == result
def test_parse_mount_info_with_opts(self):
result = util.parse_mount_info("/", MOUNT_INFO, LOG, True)
- self.assertEqual(("/dev/sda1", "btrfs", "/", "ro,relatime"), result)
+ assert ("/dev/sda1", "btrfs", "/", "ro,relatime") == result
- @mock.patch("cloudinit.util.get_mount_info")
+ @mock.patch(M_PATH + "get_mount_info")
def test_mount_is_rw(self, m_mount_info):
m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "rw,relatime")
is_rw = util.mount_is_read_write("/")
- self.assertEqual(is_rw, True)
+ assert is_rw is True
- @mock.patch("cloudinit.util.get_mount_info")
+ @mock.patch(M_PATH + "get_mount_info")
def test_mount_is_ro(self, m_mount_info):
m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "ro,relatime")
is_rw = util.mount_is_read_write("/")
- self.assertEqual(is_rw, False)
+ assert is_rw is False
+
+ @mock.patch(
+ M_PATH + "read_conf",
+ side_effect=(OSError(errno.EACCES, "Not allowed"), {"0": "0"}),
+ )
+ def test_read_conf_d_no_permissions(
+ self, m_read_conf, caplog, capsys, tmpdir
+ ):
+ """If a user has not read permission to read a config file then
+ there is no exception nor stderr output and the user is informed via
+ logging warnings.
+
+ Note: This is used in cmd, therefore want to keep the invariant of
+ not outputing to the console and log file permission errors.
+ """
+ confs = []
+ for i in range(2):
+ confs.append(tmpdir.join(f"conf-{i}.cfg"))
+ confs[i].write("{}")
+ assert {"0": "0"} == util.read_conf_d(tmpdir)
+ assert (
+ caplog.text.count(
+ f"REDACTED config part {tmpdir}/conf-1.cfg for non-root user"
+ )
+ == 1
+ )
+ assert m_read_conf.call_count == 2
+ out, err = capsys.readouterr()
+ assert not out
+ assert not err
+
+ @pytest.mark.parametrize(
+ "create_confd,expected_call",
+ [
+ (False, mock.call(deque())),
+ (True, mock.call(deque([{"my_config": "foo"}]))),
+ ],
+ )
+ @mock.patch(M_PATH + "mergemanydict")
+ @mock.patch(M_PATH + "read_conf_d", return_value={"my_config": "foo"})
+ @mock.patch(
+ M_PATH + "read_conf", side_effect=OSError(errno.EACCES, "Not allowed")
+ )
+ def test_read_conf_with_confd_no_permissions(
+ self,
+ m_read_conf,
+ m_read_confd,
+ m_mergemanydict,
+ create_confd,
+ expected_call,
+ caplog,
+ capsys,
+ tmpdir,
+ ):
+ """Read a conf file without permission.
+
+ sys output is empty and the user is informed via logging warnings.
+
+ Note: This is used in cmd, therefore want to keep the invariant of
+ not outputing to the console and log file permission errors.
+ """
+ conf_fn = tmpdir.join("conf.cfg")
+ if create_confd:
+ confd_fn = tmpdir.mkdir("conf.cfg.d")
+ util.read_conf_with_confd(conf_fn)
+ assert (
+ caplog.text.count(
+ f"REDACTED config part {conf_fn} for non-root user"
+ )
+ == 1
+ )
+ assert m_read_conf.call_count == 1
+ out, err = capsys.readouterr()
+ assert not out
+ assert not err
+ if create_confd:
+ assert [mock.call(confd_fn)] == m_read_confd.call_args_list
+ assert [expected_call] == m_mergemanydict.call_args_list
class TestSymlink(CiTestCase):
@@ -412,9 +494,9 @@ class TestSymlink(CiTestCase):
class TestUptime(CiTestCase):
- @mock.patch("cloudinit.util.boottime")
- @mock.patch("cloudinit.util.os.path.exists")
- @mock.patch("cloudinit.util.time.time")
+ @mock.patch(M_PATH + "boottime")
+ @mock.patch(M_PATH + "os.path.exists")
+ @mock.patch(M_PATH + "time.time")
def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime):
boottime = 1000.0
uptime = 10.0
@@ -688,7 +770,7 @@ class TestGetLinuxDistro(CiTestCase):
if path == "/etc/redhat-release":
return 1
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists):
"""Verify we get the correct name if the os-release file has
the distro name in quotes"""
@@ -697,7 +779,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("sles", "12.3", platform.machine()), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists):
"""Verify we get the correct name if the os-release file does not
have the distro name in quotes"""
@@ -708,7 +790,7 @@ class TestGetLinuxDistro(CiTestCase):
@mock.patch("platform.system")
@mock.patch("platform.release")
- @mock.patch("cloudinit.util._parse_redhat_release")
+ @mock.patch(M_PATH + "_parse_redhat_release")
def test_get_linux_freebsd(
self,
m_parse_redhat_release,
@@ -725,7 +807,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("freebsd", "12.0-RELEASE-p10", ""), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_centos6(self, m_os_release, m_path_exists):
"""Verify we get the correct name and release name on CentOS 6."""
m_os_release.return_value = REDHAT_RELEASE_CENTOS_6
@@ -733,7 +815,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("centos", "6.10", "Final"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists):
"""Verify the correct release info on CentOS 7 without os-release."""
m_os_release.return_value = REDHAT_RELEASE_CENTOS_7
@@ -741,7 +823,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("centos", "7.5.1804", "Core"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists):
"""Verify redhat 7 read from os-release."""
m_os_release.return_value = OS_RELEASE_REDHAT_7
@@ -749,7 +831,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("redhat", "7.5", "Maipo"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists):
"""Verify redhat 7 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_REDHAT_7
@@ -757,7 +839,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("redhat", "7.5", "Maipo"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists):
"""Verify redhat 6 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_REDHAT_6
@@ -765,7 +847,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("redhat", "6.10", "Santiago"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_copr_centos(self, m_os_release, m_path_exists):
"""Verify we get the correct name and release name on COPR CentOS."""
m_os_release.return_value = OS_RELEASE_CENTOS
@@ -773,7 +855,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("centos", "7", "Core"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists):
"""Verify almalinux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_ALMALINUX_8
@@ -781,7 +863,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists):
"""Verify almalinux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_ALMALINUX_8
@@ -789,7 +871,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 7 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_7
@@ -797,7 +879,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("eurolinux", "7.9", "Minsk"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 7 read from os-release."""
m_os_release.return_value = OS_RELEASE_EUROLINUX_7
@@ -805,7 +887,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("eurolinux", "7.9", "Minsk"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_8
@@ -813,7 +895,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_EUROLINUX_8
@@ -821,7 +903,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_miraclelinux8_rhrelease(
self, m_os_release, m_path_exists
):
@@ -831,7 +913,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("miracle", "8.4", "Peony"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_miraclelinux8_osrelease(
self, m_os_release, m_path_exists
):
@@ -841,7 +923,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("miraclelinux", "8", "Peony"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists):
"""Verify rocky linux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_ROCKY_8
@@ -849,7 +931,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists):
"""Verify rocky linux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_ROCKY_8
@@ -857,7 +939,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists):
"""Verify virtuozzo linux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_VIRTUOZZO_8
@@ -865,7 +947,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists):
"""Verify virtuozzo linux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_VIRTUOZZO_8
@@ -873,7 +955,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists):
"""Verify cloudlinux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_CLOUDLINUX_8
@@ -881,7 +963,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists):
"""Verify cloudlinux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_CLOUDLINUX_8
@@ -889,7 +971,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_debian(self, m_os_release, m_path_exists):
"""Verify we get the correct name and release name on Debian."""
m_os_release.return_value = OS_RELEASE_DEBIAN
@@ -897,7 +979,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("debian", "9", "stretch"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_openeuler(self, m_os_release, m_path_exists):
"""Verify get the correct name and release name on Openeuler."""
m_os_release.return_value = OS_RELEASE_OPENEULER_20
@@ -905,7 +987,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("openEuler", "20.03", "LTS-SP2"), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_opensuse(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on openSUSE
prior to openSUSE Leap 15.
@@ -915,7 +997,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("opensuse", "42.3", platform.machine()), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on openSUSE
for openSUSE Leap 15.0 and later.
@@ -925,7 +1007,7 @@ class TestGetLinuxDistro(CiTestCase):
dist = util.get_linux_distro()
self.assertEqual(("opensuse-leap", "15.0", platform.machine()), dist)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on openSUSE
for openSUSE Tumbleweed
@@ -937,7 +1019,7 @@ class TestGetLinuxDistro(CiTestCase):
("opensuse-tumbleweed", "20180920", platform.machine()), dist
)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_get_linux_photon_os_release(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on PhotonOS"""
m_os_release.return_value = OS_RELEASE_PHOTON
@@ -1059,6 +1141,9 @@ class TestIsLXD(CiTestCase):
class TestReadCcFromCmdline:
+
+ random_string: Tuple
+
if hasattr(pytest, "param"):
random_string = pytest.param(
CiTestCase.random_string(), None, id="random_string"
@@ -1182,8 +1267,8 @@ class TestMountCb:
"""Mock an already-mounted device, and yield (device, mount dict)"""
device = "/dev/fake0"
mountpoint = "/mnt/fake"
- with mock.patch("cloudinit.util.subp.subp"):
- with mock.patch("cloudinit.util.mounts") as m_mounts:
+ with mock.patch(M_PATH + "subp.subp"):
+ with mock.patch(M_PATH + "mounts") as m_mounts:
mounts = {device: {"mountpoint": mountpoint}}
m_mounts.return_value = mounts
yield device, mounts[device]
@@ -1206,9 +1291,9 @@ class TestMountCb:
("ufs", "ufs"),
],
)
- @mock.patch("cloudinit.util.is_Linux", autospec=True)
- @mock.patch("cloudinit.util.is_BSD", autospec=True)
- @mock.patch("cloudinit.util.subp.subp")
+ @mock.patch(M_PATH + "is_Linux", autospec=True)
+ @mock.patch(M_PATH + "is_BSD", autospec=True)
+ @mock.patch(M_PATH + "subp.subp")
@mock.patch("cloudinit.temp_utils.tempdir", autospec=True)
def test_normalize_mtype_on_bsd(
self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected
@@ -1245,7 +1330,7 @@ class TestMountCb:
with pytest.raises(TypeError):
util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype)
- @mock.patch("cloudinit.util.subp.subp")
+ @mock.patch(M_PATH + "subp.subp")
def test_already_mounted_does_not_mount_or_umount_anything(
self, m_subp, already_mounted_device
):
@@ -1281,7 +1366,7 @@ class TestMountCb:
] == callback.call_args_list
-@mock.patch("cloudinit.util.write_file")
+@mock.patch(M_PATH + "write_file")
class TestEnsureFile:
"""Tests for ``cloudinit.util.ensure_file``."""
@@ -1326,9 +1411,9 @@ class TestEnsureFile:
assert "ab" == kwargs["omode"]
-@mock.patch("cloudinit.util.grp.getgrnam")
-@mock.patch("cloudinit.util.os.setgid")
-@mock.patch("cloudinit.util.os.umask")
+@mock.patch(M_PATH + "grp.getgrnam")
+@mock.patch(M_PATH + "os.setgid")
+@mock.patch(M_PATH + "os.umask")
class TestRedirectOutputPreexecFn:
"""This tests specifically the preexec_fn used in redirect_output."""
@@ -1344,7 +1429,7 @@ class TestRedirectOutputPreexecFn:
args = (test_string, None)
elif request.param == "errfmt":
args = (None, test_string)
- with mock.patch("cloudinit.util.subprocess.Popen") as m_popen:
+ with mock.patch(M_PATH + "subprocess.Popen") as m_popen:
util.redirect_output(*args)
assert 1 == m_popen.call_count
@@ -1778,7 +1863,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
expected = ("none", "tmpfs", "/run/lock")
self.assertEqual(expected, util.parse_mount_info("/run/lock", lines))
- @mock.patch("cloudinit.util.os")
+ @mock.patch(M_PATH + "os")
@mock.patch("cloudinit.subp.subp")
def test_get_device_info_from_zpool(self, zpool_output, m_os):
# mock /dev/zfs exists
@@ -1794,7 +1879,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
self.assertIsNotNone(ret)
m_os.path.exists.assert_called_with("/dev/zfs")
- @mock.patch("cloudinit.util.os")
+ @mock.patch(M_PATH + "os")
def test_get_device_info_from_zpool_no_dev_zfs(self, m_os):
# mock /dev/zfs missing
m_os.path.exists.return_value = False
@@ -1802,7 +1887,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
ret = util.get_device_info_from_zpool("vmzroot")
self.assertIsNone(ret)
- @mock.patch("cloudinit.util.os")
+ @mock.patch(M_PATH + "os")
@mock.patch("cloudinit.subp.subp")
def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os):
"""Handle case where there is no zpool command"""
@@ -1812,7 +1897,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
ret = util.get_device_info_from_zpool("vmzroot")
self.assertIsNone(ret)
- @mock.patch("cloudinit.util.os")
+ @mock.patch(M_PATH + "os")
@mock.patch("cloudinit.subp.subp")
def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os):
# mock /dev/zfs exists
@@ -1879,7 +1964,7 @@ class TestIsX86(helpers.CiTestCase):
util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch
)
- @mock.patch("cloudinit.util.os.uname")
+ @mock.patch(M_PATH + "os.uname")
def test_is_x86_calls_uname_for_architecture(self, m_uname):
"""is_x86 returns True if platform from uname matches."""
m_uname.return_value = [0, 1, 2, 3, "x86_64"]
@@ -1987,7 +2072,7 @@ class TestMultiLog(helpers.FilesystemMockingTestCase):
self.assertEqual("", self.stdout.getvalue())
@mock.patch(
- "cloudinit.util.write_to_console",
+ M_PATH + "write_to_console",
mock.Mock(side_effect=OSError("Failed to write to console")),
)
def test_logs_go_to_stdout_if_writing_to_console_fails_and_fallback_true(
@@ -2001,7 +2086,7 @@ class TestMultiLog(helpers.FilesystemMockingTestCase):
)
@mock.patch(
- "cloudinit.util.write_to_console",
+ M_PATH + "write_to_console",
mock.Mock(side_effect=OSError("Failed to write to console")),
)
def test_logs_go_nowhere_if_writing_to_console_fails_and_fallback_false(
@@ -2210,7 +2295,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
self.reRoot(root_d)
self.assertTrue(util.system_is_snappy())
- @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch(M_PATH + "get_cmdline")
def test_bad_content_in_os_release_no_effect(self, m_cmdline):
"""malformed os-release should not raise exception."""
m_cmdline.return_value = "root=/dev/sda"
@@ -2220,7 +2305,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
self.reRoot()
self.assertFalse(util.system_is_snappy())
- @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch(M_PATH + "get_cmdline")
def test_snap_core_in_cmdline_is_snappy(self, m_cmdline):
"""The string snap_core= in kernel cmdline indicates snappy."""
cmdline = (
@@ -2233,7 +2318,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
self.assertTrue(util.system_is_snappy())
self.assertTrue(m_cmdline.call_count > 0)
- @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch(M_PATH + "get_cmdline")
def test_nothing_found_is_not_snappy(self, m_cmdline):
"""If no positive identification, then not snappy."""
m_cmdline.return_value = "root=/dev/sda"
@@ -2241,7 +2326,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
self.assertFalse(util.system_is_snappy())
self.assertTrue(m_cmdline.call_count > 0)
- @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch(M_PATH + "get_cmdline")
def test_channel_ini_with_snappy_is_snappy(self, m_cmdline):
"""A Channel.ini file with 'ubuntu-core' indicates snappy."""
m_cmdline.return_value = "root=/dev/sda"
@@ -2251,7 +2336,7 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
self.reRoot(root_d)
self.assertTrue(util.system_is_snappy())
- @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch(M_PATH + "get_cmdline")
def test_system_image_config_dir_is_snappy(self, m_cmdline):
"""Existence of /etc/system-image/config.d indicates snappy."""
m_cmdline.return_value = "root=/dev/sda"
@@ -2296,7 +2381,7 @@ class TestGetProcEnv(helpers.TestCase):
# return the value portion of key=val decoded.
return blob.split(b"=", 1)[1].decode(encoding, errors)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_non_utf8_in_environment(self, m_load_file):
"""env may have non utf-8 decodable content."""
content = self.null.join(
@@ -2315,7 +2400,7 @@ class TestGetProcEnv(helpers.TestCase):
)
self.assertEqual(1, m_load_file.call_count)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_encoding_none_returns_bytes(self, m_load_file):
"""encoding none returns bytes."""
lines = (self.bootflag, self.simple1, self.simple2, self.mixed)
@@ -2328,7 +2413,7 @@ class TestGetProcEnv(helpers.TestCase):
)
self.assertEqual(1, m_load_file.call_count)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_all_utf8_encoded(self, m_load_file):
"""common path where only utf-8 decodable content."""
content = self.null.join((self.simple1, self.simple2))
@@ -2338,7 +2423,7 @@ class TestGetProcEnv(helpers.TestCase):
)
self.assertEqual(1, m_load_file.call_count)
- @mock.patch("cloudinit.util.load_file")
+ @mock.patch(M_PATH + "load_file")
def test_non_existing_file_returns_empty_dict(self, m_load_file):
"""as implemented, a non-existing pid returns empty dict.
This is how it was originally implemented."""