summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordermotbradley <dermot_bradley@yahoo.com>2023-02-15 04:58:10 +0000
committerGitHub <noreply@github.com>2023-02-14 21:58:10 -0700
commitba3d611a7267ca6ac89cf7bb03fff4a14be9b5c0 (patch)
tree35b035773273ff4c9927df86f976f63cd53a0a91
parentbb414c7866c4728b2105e84f7b426ab81cc4bf4d (diff)
downloadcloud-init-git-ba3d611a7267ca6ac89cf7bb03fff4a14be9b5c0.tar.gz
Overhaul/rewrite of certificate handling as follows: (#1962)
Change "ca-certs" references to "ca_certs". New certificates are written to individual files, with an incrementing number as part of their filename, rather than all being placed in a single file. This resolves issues caused when certificate files containing more than a single certificate are placed in /etc/ssl/certs (by utilities such as "update-ca-certificates" run by ca_certs). Alpine / Debian / Ubuntu: The current behaviour, whilst it works, is incorrect with regard to the design of the underlying OS utilities for managing certificates. For "remove_defaults" the system-installed certificate files should not be actually deleted (otherwise it becomes problematic if someone wishes to later re-enable one or more of them), rather they should be deactivated and these OSes already provide the means to do so - this MR modifies the certificate entries in the /etc/ca-certificates.conf file by prefixing them with "!" - when the update-ca-certificate utility is then run it will *not* place such delimited certificates into either the /etc/ssl/certs/ directory (via symlinks) nor add them to the (re)generated certificates bundle file. Additionally it is incorrect for added certificates to be placed in the /usr/share/ca-certificates directory - this location is intended for standard/"official" certificates, the /usr/local/share/ca-certificates directory is intended for "local" or "site-specific" certificates and so this PR adds them there instead - for certs in /usr/local/share/ca-certificates the update-ca-certificates utility will automatically use them, there is *no* need to add their filenames to the /etc/ca-certificates.conf file. LP: #1931174
-rw-r--r--cloudinit/config/cc_ca_certs.py165
-rw-r--r--doc/examples/cloud-config-ca-certs.txt11
-rw-r--r--tests/integration_tests/modules/test_ca_certs.py6
-rw-r--r--tests/unittests/config/test_cc_ca_certs.py242
4 files changed, 199 insertions, 225 deletions
diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py
index 302a67a4..169b0e18 100644
--- a/cloudinit/config/cc_ca_certs.py
+++ b/cloudinit/config/cc_ca_certs.py
@@ -8,45 +8,47 @@ import os
from logging import Logger
from textwrap import dedent
+from cloudinit import log as logging
from cloudinit import subp, util
from cloudinit.cloud import Cloud
from cloudinit.config import Config
from cloudinit.config.schema import MetaSchema, get_meta_doc
from cloudinit.settings import PER_INSTANCE
+LOG = logging.getLogger(__name__)
+
DEFAULT_CONFIG = {
- "ca_cert_path": "/usr/share/ca-certificates/",
- "ca_cert_filename": "cloud-init-ca-certs.crt",
+ "ca_cert_path": None,
+ "ca_cert_local_path": "/usr/local/share/ca-certificates/",
+ "ca_cert_filename": "cloud-init-ca-cert-{cert_index}.crt",
"ca_cert_config": "/etc/ca-certificates.conf",
- "ca_cert_system_path": "/etc/ssl/certs/",
"ca_cert_update_cmd": ["update-ca-certificates"],
}
DISTRO_OVERRIDES = {
"rhel": {
- "ca_cert_path": "/usr/share/pki/ca-trust-source/",
- "ca_cert_filename": "anchors/cloud-init-ca-certs.crt",
+ "ca_cert_path": "/etc/pki/ca-trust/",
+ "ca_cert_local_path": "/usr/share/pki/ca-trust-source/",
+ "ca_cert_filename": "anchors/cloud-init-ca-cert-{cert_index}.crt",
"ca_cert_config": None,
- "ca_cert_system_path": "/etc/pki/ca-trust/",
"ca_cert_update_cmd": ["update-ca-trust"],
- }
+ },
}
MODULE_DESCRIPTION = """\
-This module adds CA certificates to ``/etc/ca-certificates.conf`` and updates
-the ssl cert cache using ``update-ca-certificates``. The default certificates
-can be removed from the system with the configuration option
-``remove_defaults``.
+This module adds CA certificates to the system's CA store and updates any
+related files using the appropriate OS-specific utility. The default CA
+certificates can be disabled/deleted from use by the system with the
+configuration option ``remove_defaults``.
.. note::
certificates must be specified using valid yaml. in order to specify a
multiline certificate, the yaml multiline list syntax must be used
.. note::
- For Alpine Linux the "remove_defaults" functionality works if the
- ca-certificates package is installed but not if the
- ca-certificates-bundle package is installed.
+ Alpine Linux requires the ca-certificates package to be installed in
+ order to provide the ``update-ca-certificates`` command.
"""
-distros = ["alpine", "debian", "ubuntu", "rhel"]
+distros = ["alpine", "debian", "rhel", "ubuntu"]
meta: MetaSchema = {
"id": "cc_ca_certs",
@@ -79,11 +81,11 @@ def _distro_ca_certs_configs(distro_name):
"""Return a distro-specific ca_certs config dictionary
@param distro_name: String providing the distro class name.
- @returns: Dict of distro configurations for ca-cert.
+ @returns: Dict of distro configurations for ca_cert.
"""
cfg = DISTRO_OVERRIDES.get(distro_name, DEFAULT_CONFIG)
cfg["ca_cert_full_path"] = os.path.join(
- cfg["ca_cert_path"], cfg["ca_cert_filename"]
+ cfg["ca_cert_local_path"], cfg["ca_cert_filename"]
)
return cfg
@@ -100,124 +102,145 @@ def update_ca_certs(distro_cfg):
def add_ca_certs(distro_cfg, certs):
"""
Adds certificates to the system. To actually apply the new certificates
- you must also call L{update_ca_certs}.
+ you must also call the appropriate distro-specific utility such as
+ L{update_ca_certs}.
@param distro_cfg: A hash providing _distro_ca_certs_configs function.
@param certs: A list of certificate strings.
"""
if not certs:
return
- # First ensure they are strings...
- cert_file_contents = "\n".join([str(c) for c in certs])
- util.write_file(
- distro_cfg["ca_cert_full_path"], cert_file_contents, mode=0o644
- )
- update_cert_config(distro_cfg)
+ # Write each certificate to a separate file.
+ for cert_index, c in enumerate(certs, 1):
+ # First ensure they are strings...
+ cert_file_contents = str(c)
+ cert_file_name = distro_cfg["ca_cert_full_path"].format(
+ cert_index=cert_index
+ )
+ util.write_file(cert_file_name, cert_file_contents, mode=0o644)
-def update_cert_config(distro_cfg):
+def disable_default_ca_certs(distro_name, distro_cfg):
"""
- Update Certificate config file to add the file path managed cloud-init
+ Disables all default trusted CA certificates. For Alpine, Debian and
+ Ubuntu to actually apply the changes you must also call
+ L{update_ca_certs}.
+
+ @param distro_name: String providing the distro class name.
+ @param distro_cfg: A hash providing _distro_ca_certs_configs function.
+ """
+ if distro_name == "rhel":
+ remove_default_ca_certs(distro_cfg)
+ elif distro_name in ["alpine", "debian", "ubuntu"]:
+ disable_system_ca_certs(distro_cfg)
+
+ if distro_name in ["debian", "ubuntu"]:
+ debconf_sel = (
+ "ca-certificates ca-certificates/trust_new_crts " + "select no"
+ )
+ subp.subp(("debconf-set-selections", "-"), debconf_sel)
+
+
+def disable_system_ca_certs(distro_cfg):
+ """
+ For every entry in the CA_CERT_CONFIG file prefix the entry with a "!"
+ in order to disable it.
@param distro_cfg: A hash providing _distro_ca_certs_configs function.
"""
if distro_cfg["ca_cert_config"] is None:
return
- if os.stat(distro_cfg["ca_cert_config"]).st_size == 0:
- # If the CA_CERT_CONFIG file is empty (i.e. all existing
- # CA certs have been deleted) then simply output a single
- # line with the cloud-init cert filename.
- out = "%s\n" % distro_cfg["ca_cert_filename"]
- else:
- # Append cert filename to CA_CERT_CONFIG file.
- # We have to strip the content because blank lines in the file
- # causes subsequent entries to be ignored. (LP: #1077020)
+ header_comment = (
+ "# Modified by cloud-init to deselect certs due to user-data"
+ )
+ added_header = False
+ if os.stat(distro_cfg["ca_cert_config"]).st_size != 0:
orig = util.load_file(distro_cfg["ca_cert_config"])
- cr_cont = "\n".join(
- [
- line
- for line in orig.splitlines()
- if line != distro_cfg["ca_cert_filename"]
- ]
- )
- out = "%s\n%s\n" % (cr_cont.rstrip(), distro_cfg["ca_cert_filename"])
- util.write_file(distro_cfg["ca_cert_config"], out, omode="wb")
+ out_lines = []
+ for line in orig.splitlines():
+ if line == header_comment:
+ added_header = True
+ out_lines.append(line)
+ elif line == "" or line[0] in ("#", "!"):
+ out_lines.append(line)
+ else:
+ if not added_header:
+ out_lines.append(header_comment)
+ added_header = True
+ out_lines.append("!" + line)
+ util.write_file(
+ distro_cfg["ca_cert_config"], "\n".join(out_lines) + "\n", omode="wb"
+ )
-def remove_default_ca_certs(distro_name, distro_cfg):
+def remove_default_ca_certs(distro_cfg):
"""
- Removes all default trusted CA certificates from the system. To actually
- apply the change you must also call L{update_ca_certs}.
+ Removes all default trusted CA certificates from the system.
- @param distro_name: String providing the distro class name.
@param distro_cfg: A hash providing _distro_ca_certs_configs function.
"""
- util.delete_dir_contents(distro_cfg["ca_cert_path"])
- util.delete_dir_contents(distro_cfg["ca_cert_system_path"])
- util.write_file(distro_cfg["ca_cert_config"], "", mode=0o644)
+ if distro_cfg["ca_cert_path"] is None:
+ return
- if distro_name in ["debian", "ubuntu"]:
- debconf_sel = (
- "ca-certificates ca-certificates/trust_new_crts " + "select no"
- )
- subp.subp(("debconf-set-selections", "-"), debconf_sel)
+ LOG.debug("Deleting system CA certificates")
+ util.delete_dir_contents(distro_cfg["ca_cert_path"])
+ util.delete_dir_contents(distro_cfg["ca_cert_local_path"])
def handle(
name: str, cfg: Config, cloud: Cloud, log: Logger, args: list
) -> None:
"""
- Call to handle ca-cert sections in cloud-config file.
+ Call to handle ca_cert sections in cloud-config file.
- @param name: The module name "ca-cert" from cloud.cfg
+ @param name: The module name "ca_cert" from cloud.cfg
@param cfg: A nested dict containing the entire cloud config contents.
@param cloud: The L{CloudInit} object in use.
@param log: Pre-initialized Python logger object to use for logging.
@param args: Any module arguments from cloud.cfg
"""
if "ca-certs" in cfg:
- log.warning(
+ LOG.warning(
"DEPRECATION: key 'ca-certs' is now deprecated. Use 'ca_certs'"
" instead."
)
elif "ca_certs" not in cfg:
- log.debug(
+ LOG.debug(
"Skipping module named %s, no 'ca_certs' key in configuration",
name,
)
return
if "ca-certs" in cfg and "ca_certs" in cfg:
- log.warning(
+ LOG.warning(
"Found both ca-certs (deprecated) and ca_certs config keys."
" Ignoring ca-certs."
)
ca_cert_cfg = cfg.get("ca_certs", cfg.get("ca-certs"))
distro_cfg = _distro_ca_certs_configs(cloud.distro.name)
- # If there is a remove_defaults option set to true, remove the system
+ # If there is a remove_defaults option set to true, disable the system
# default trusted CA certs first.
if "remove-defaults" in ca_cert_cfg:
- log.warning(
+ LOG.warning(
"DEPRECATION: key 'ca-certs.remove-defaults' is now deprecated."
" Use 'ca_certs.remove_defaults' instead."
)
- if ca_cert_cfg.get("remove-defaults", False):
- log.debug("Removing default certificates")
- remove_default_ca_certs(cloud.distro.name, distro_cfg)
- elif ca_cert_cfg.get("remove_defaults", False):
- log.debug("Removing default certificates")
- remove_default_ca_certs(cloud.distro.name, distro_cfg)
+ if ca_cert_cfg.get(
+ "remove_defaults", ca_cert_cfg.get("remove-defaults", False)
+ ):
+ LOG.debug("Disabling/removing default certificates")
+ disable_default_ca_certs(cloud.distro.name, distro_cfg)
# If we are given any new trusted CA certs to add, add them.
if "trusted" in ca_cert_cfg:
trusted_certs = util.get_cfg_option_list(ca_cert_cfg, "trusted")
if trusted_certs:
- log.debug("Adding %d certificates" % len(trusted_certs))
+ LOG.debug("Adding %d certificates", len(trusted_certs))
add_ca_certs(distro_cfg, trusted_certs)
# Update the system with the new cert configuration.
- log.debug("Updating certificates")
+ LOG.debug("Updating certificates")
update_ca_certs(distro_cfg)
diff --git a/doc/examples/cloud-config-ca-certs.txt b/doc/examples/cloud-config-ca-certs.txt
index 9f7beb05..427465d4 100644
--- a/doc/examples/cloud-config-ca-certs.txt
+++ b/doc/examples/cloud-config-ca-certs.txt
@@ -8,11 +8,12 @@
# It should be passed as user-data when starting the instance.
ca_certs:
- # If present and set to True, the 'remove_defaults' parameter will remove
- # all the default trusted CA certificates that are normally shipped with
- # Ubuntu.
- # This is mainly for paranoid admins - most users will not need this
- # functionality.
+ # If present and set to True, the 'remove_defaults' parameter will either
+ # disable all the trusted CA certifications normally shipped with
+ # Alpine, Debian or Ubuntu. On RedHat, this action will delete those
+ # certificates.
+ # This is mainly for very security-sensitive use cases - most users will not
+ # need this functionality.
remove_defaults: true
# If present, the 'trusted' parameter should contain a certificate (or list
diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py
index 8d18fb76..2baedda9 100644
--- a/tests/integration_tests/modules/test_ca_certs.py
+++ b/tests/integration_tests/modules/test_ca_certs.py
@@ -76,10 +76,10 @@ class TestCaCerts:
unlinked_files.append(filename)
assert ["ca-certificates.crt"] == unlinked_files
- assert "cloud-init-ca-certs.pem" == links["a535c1f3.0"]
+ assert "cloud-init-ca-cert-1.pem" == links["a535c1f3.0"]
assert (
- "/usr/share/ca-certificates/cloud-init-ca-certs.crt"
- == links["cloud-init-ca-certs.pem"]
+ "/usr/local/share/ca-certificates/cloud-init-ca-cert-1.crt"
+ == links["cloud-init-ca-cert-1.pem"]
)
def test_cert_installed(self, class_client: IntegrationInstance):
diff --git a/tests/unittests/config/test_cc_ca_certs.py b/tests/unittests/config/test_cc_ca_certs.py
index a0b402ac..5f5a5843 100644
--- a/tests/unittests/config/test_cc_ca_certs.py
+++ b/tests/unittests/config/test_cc_ca_certs.py
@@ -74,15 +74,19 @@ class TestConfig(TestCase):
mock.patch.object(cc_ca_certs, "update_ca_certs")
)
self.mock_remove = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, "remove_default_ca_certs")
+ mock.patch.object(cc_ca_certs, "disable_default_ca_certs")
)
- def test_no_trusted_list(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_no_trusted_list(self, _):
"""
Test that no certificates are written if the 'trusted' key is not
present.
"""
- config = {"ca-certs": {}}
+ config = {"ca_certs": {}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -93,9 +97,13 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_empty_trusted_list(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_empty_trusted_list(self, _):
"""Test that no certificate are written if 'trusted' list is empty."""
- config = {"ca-certs": {"trusted": []}}
+ config = {"ca_certs": {"trusted": []}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -106,9 +114,13 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_single_trusted(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_single_trusted(self, _):
"""Test that a single cert gets passed to add_ca_certs."""
- config = {"ca-certs": {"trusted": ["CERT1"]}}
+ config = {"ca_certs": {"trusted": ["CERT1"]}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -120,9 +132,13 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_multiple_trusted(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_multiple_trusted(self, _):
"""Test that multiple certs get passed to add_ca_certs."""
- config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
+ config = {"ca_certs": {"trusted": ["CERT1", "CERT2"]}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -134,7 +150,11 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_remove_default_ca_certs(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_remove_default_ca_certs(self, _):
"""Test remove_defaults works as expected."""
config = {"ca_certs": {"remove_defaults": True}}
@@ -147,7 +167,11 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 1)
- def test_no_remove_defaults_if_false(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_no_remove_defaults_if_false(self, _):
"""Test remove_defaults is not called when config value is False."""
config = {"ca_certs": {"remove_defaults": False}}
@@ -160,8 +184,14 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_correct_order_for_remove_then_add(self):
- """Test remove_defaults is not called when config value is False."""
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_correct_order_for_remove_then_add(self, _):
+ """
+ Test remove_defaults is called before add.
+ """
config = {"ca_certs": {"remove_defaults": True, "trusted": ["CERT1"]}}
for distro_name in cc_ca_certs.distros:
@@ -170,9 +200,9 @@ class TestConfig(TestCase):
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
+ self.assertEqual(self.mock_remove.call_count, 1)
self.mock_add.assert_called_once_with(conf, ["CERT1"])
self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
class TestAddCaCerts(TestCase):
@@ -200,51 +230,10 @@ class TestAddCaCerts(TestCase):
cc_ca_certs.add_ca_certs(conf, [])
self.assertEqual(mockobj.call_count, 0)
- def test_single_cert_trailing_cr(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates has trailing newline"""
- cert = "CERT1\nLINE2\nLINE3"
-
- ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
- expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
-
- self.m_stat.return_value.st_size = 1
-
- for distro_name in cc_ca_certs.distros:
- conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
-
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, "write_file")
- )
- mock_load = mocks.enter_context(
- mock.patch.object(
- util, "load_file", return_value=ca_certs_content
- )
- )
-
- cc_ca_certs.add_ca_certs(conf, [cert])
-
- mock_write.assert_has_calls(
- [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
- )
- if conf["ca_cert_config"] is not None:
- mock_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"], expected, omode="wb"
- )
- ]
- )
- mock_load.assert_called_once_with(conf["ca_cert_config"])
-
- def test_single_cert_no_trailing_cr(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates has no trailing newline"""
+ def test_single_cert(self):
+ """Test adding a single certificate to the trusted CAs."""
cert = "CERT1\nLINE2\nLINE3"
- ca_certs_content = "line1\nline2\nline3"
-
self.m_stat.return_value.st_size = 1
for distro_name in cc_ca_certs.distros:
@@ -254,65 +243,24 @@ class TestAddCaCerts(TestCase):
mock_write = mocks.enter_context(
mock.patch.object(util, "write_file")
)
- mock_load = mocks.enter_context(
- mock.patch.object(
- util, "load_file", return_value=ca_certs_content
- )
- )
cc_ca_certs.add_ca_certs(conf, [cert])
mock_write.assert_has_calls(
- [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
- )
- if conf["ca_cert_config"] is not None:
- mock_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"],
- "%s\n%s\n"
- % (ca_certs_content, conf["ca_cert_filename"]),
- omode="wb",
- )
- ]
- )
-
- mock_load.assert_called_once_with(conf["ca_cert_config"])
-
- def test_single_cert_to_empty_existing_ca_file(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates.conf is empty"""
- cert = "CERT1\nLINE2\nLINE3"
-
- expected = "cloud-init-ca-certs.crt\n"
-
- self.m_stat.return_value.st_size = 0
-
- for distro_name in cc_ca_certs.distros:
- conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- with mock.patch.object(
- util, "write_file", autospec=True
- ) as m_write:
-
- cc_ca_certs.add_ca_certs(conf, [cert])
-
- m_write.assert_has_calls(
- [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
+ [
+ mock.call(
+ conf["ca_cert_full_path"].format(cert_index=1),
+ cert,
+ mode=0o644,
+ )
+ ]
)
- if conf["ca_cert_config"] is not None:
- m_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"], expected, omode="wb"
- )
- ]
- )
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
- expected_cert_file = "\n".join(certs)
- ca_certs_content = "line1\nline2\nline3"
+ expected_cert_1_file = certs[0]
+ expected_cert_2_file = certs[1]
self.m_stat.return_value.st_size = 1
@@ -323,36 +271,23 @@ class TestAddCaCerts(TestCase):
mock_write = mocks.enter_context(
mock.patch.object(util, "write_file")
)
- mock_load = mocks.enter_context(
- mock.patch.object(
- util, "load_file", return_value=ca_certs_content
- )
- )
cc_ca_certs.add_ca_certs(conf, certs)
mock_write.assert_has_calls(
[
mock.call(
- conf["ca_cert_full_path"],
- expected_cert_file,
+ conf["ca_cert_full_path"].format(cert_index=1),
+ expected_cert_1_file,
mode=0o644,
- )
+ ),
+ mock.call(
+ conf["ca_cert_full_path"].format(cert_index=2),
+ expected_cert_2_file,
+ mode=0o644,
+ ),
]
)
- if conf["ca_cert_config"] is not None:
- mock_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"],
- "%s\n%s\n"
- % (ca_certs_content, conf["ca_cert_filename"]),
- omode="wb",
- )
- ]
- )
-
- mock_load.assert_called_once_with(conf["ca_cert_config"])
class TestUpdateCaCerts(unittest.TestCase):
@@ -378,6 +313,12 @@ class TestRemoveDefaultCaCerts(TestCase):
)
def test_commands(self):
+ ca_certs_content = "# line1\nline2\nline3\n"
+ expected = (
+ "# line1\n# Modified by cloud-init to deselect certs due to"
+ " user-data\n!line2\n!line3\n"
+ )
+
for distro_name in cc_ca_certs.distros:
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
@@ -385,33 +326,42 @@ class TestRemoveDefaultCaCerts(TestCase):
mock_delete = mocks.enter_context(
mock.patch.object(util, "delete_dir_contents")
)
- mock_write = mocks.enter_context(
- mock.patch.object(util, "write_file")
+ mock_load = mocks.enter_context(
+ mock.patch.object(
+ util, "load_file", return_value=ca_certs_content
+ )
)
mock_subp = mocks.enter_context(
mock.patch.object(subp, "subp")
)
-
- cc_ca_certs.remove_default_ca_certs(distro_name, conf)
-
- mock_delete.assert_has_calls(
- [
- mock.call(conf["ca_cert_path"]),
- mock.call(conf["ca_cert_system_path"]),
- ]
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, "write_file")
)
- if conf["ca_cert_config"] is not None:
+ cc_ca_certs.disable_default_ca_certs(distro_name, conf)
+
+ if distro_name == "rhel":
+ mock_delete.assert_has_calls(
+ [
+ mock.call(conf["ca_cert_path"]),
+ mock.call(conf["ca_cert_local_path"]),
+ ]
+ )
+ self.assertEqual([], mock_subp.call_args_list)
+ elif distro_name in ["alpine", "debian", "ubuntu"]:
+ mock_load.assert_called_once_with(conf["ca_cert_config"])
mock_write.assert_called_once_with(
- conf["ca_cert_config"], "", mode=0o644
+ conf["ca_cert_config"], expected, omode="wb"
)
- if distro_name in ["debian", "ubuntu"]:
- mock_subp.assert_called_once_with(
- ("debconf-set-selections", "-"),
- "ca-certificates ca-certificates/trust_new_crts"
- " select no",
- )
+ if distro_name in ["debian", "ubuntu"]:
+ mock_subp.assert_called_once_with(
+ ("debconf-set-selections", "-"),
+ "ca-certificates ca-certificates/trust_new_crts"
+ " select no",
+ )
+ else:
+ assert mock_subp.call_count == 0
class TestCACertsSchema: