summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2017-08-30 21:18:05 -0400
committergit-ubuntu importer <ubuntu-devel-discuss@lists.ubuntu.com>2017-08-31 01:23:11 +0000
commit291d42c3f8f396a16bc129e3e3b7ae262090a86e (patch)
tree36417030288770586ed936ac30b792f0dc202d69 /tests
parent622629b8856284b75d66b2fd5a30976cd8464479 (diff)
downloadcloud-init-git-291d42c3f8f396a16bc129e3e3b7ae262090a86e.tar.gz
0.7.9-259-g7e76c57b-0ubuntu1 (patches unapplied)
Imported using git-ubuntu import.
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test_cli.py36
-rw-r--r--tests/unittests/test_datasource/test_ec2.py97
-rw-r--r--tests/unittests/test_distros/test_debian.py66
-rw-r--r--tests/unittests/test_distros/test_generic.py16
-rw-r--r--tests/unittests/test_distros/test_opensuse.py12
-rw-r--r--tests/unittests/test_distros/test_sles.py12
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py11
-rw-r--r--tests/unittests/test_handler/test_handler_landscape.py129
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py58
-rw-r--r--tests/unittests/test_handler/test_handler_puppet.py142
-rw-r--r--tests/unittests/test_handler/test_handler_runcmd.py108
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py5
-rw-r--r--tests/unittests/test_handler/test_schema.py157
-rw-r--r--tests/unittests/test_log.py58
-rw-r--r--tests/unittests/test_util.py13
15 files changed, 874 insertions, 46 deletions
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py
index 7780f164..12f01852 100644
--- a/tests/unittests/test_cli.py
+++ b/tests/unittests/test_cli.py
@@ -46,7 +46,7 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
self._call_main()
error = self.stderr.getvalue()
expected_subcommands = ['analyze', 'init', 'modules', 'single',
- 'dhclient-hook', 'features']
+ 'dhclient-hook', 'features', 'devel']
for subcommand in expected_subcommands:
self.assertIn(subcommand, error)
@@ -70,6 +70,21 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
self.assertEqual('modules', parseargs.action[0])
self.assertEqual('main_modules', parseargs.action[1].__name__)
+ def test_conditional_subcommands_from_entry_point_sys_argv(self):
+ """Subcommands from entry-point are properly parsed from sys.argv."""
+ expected_errors = [
+ 'usage: cloud-init analyze', 'usage: cloud-init devel']
+ conditional_subcommands = ['analyze', 'devel']
+ # The cloud-init entrypoint calls main without passing sys_argv
+ for subcommand in conditional_subcommands:
+ with mock.patch('sys.argv', ['cloud-init', subcommand]):
+ try:
+ cli.main()
+ except SystemExit as e:
+ self.assertEqual(2, e.code) # exit 2 on proper usage docs
+ for error_message in expected_errors:
+ self.assertIn(error_message, self.stderr.getvalue())
+
def test_analyze_subcommand_parser(self):
"""The subcommand cloud-init analyze calls the correct subparser."""
self._call_main(['cloud-init', 'analyze'])
@@ -79,6 +94,25 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
for subcommand in expected_subcommands:
self.assertIn(subcommand, error)
+ def test_devel_subcommand_parser(self):
+ """The subcommand cloud-init devel calls the correct subparser."""
+ self._call_main(['cloud-init', 'devel'])
+ # These subcommands only valid for cloud-init schema script
+ expected_subcommands = ['schema']
+ error = self.stderr.getvalue()
+ for subcommand in expected_subcommands:
+ self.assertIn(subcommand, error)
+
+ @mock.patch('cloudinit.config.schema.handle_schema_args')
+ def test_wb_devel_schema_subcommand_parser(self, m_schema):
+ """The subcommand cloud-init schema calls the correct subparser."""
+ exit_code = self._call_main(['cloud-init', 'devel', 'schema'])
+ self.assertEqual(1, exit_code)
+ # Known whitebox output from schema subcommand
+ self.assertEqual(
+ 'Expected either --config-file argument or --doc\n',
+ self.stderr.getvalue())
+
@mock.patch('cloudinit.cmd.main.main_single')
def test_single_subcommand(self, m_main_single):
"""The subcommand 'single' calls main_single with valid args."""
diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py
index 33d02619..e1ce6446 100644
--- a/tests/unittests/test_datasource/test_ec2.py
+++ b/tests/unittests/test_datasource/test_ec2.py
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import copy
import httpretty
import mock
@@ -195,6 +196,34 @@ class TestEc2(test_helpers.HttprettyTestCase):
return ds
@httpretty.activate
+ def test_network_config_property_returns_version_1_network_data(self):
+ """network_config property returns network version 1 for metadata."""
+ ds = self._setup_ds(
+ platform_data=self.valid_platform_data,
+ sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
+ md=DEFAULT_METADATA)
+ ds.get_data()
+ mac1 = '06:17:04:d7:26:09' # Defined in DEFAULT_METADATA
+ expected = {'version': 1, 'config': [
+ {'mac_address': '06:17:04:d7:26:09', 'name': 'eth9',
+ 'subnets': [{'type': 'dhcp4'}, {'type': 'dhcp6'}],
+ 'type': 'physical'}]}
+ patch_path = (
+ 'cloudinit.sources.DataSourceEc2.net.get_interfaces_by_mac')
+ with mock.patch(patch_path) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.return_value = {mac1: 'eth9'}
+ self.assertEqual(expected, ds.network_config)
+
+ def test_network_config_property_is_cached_in_datasource(self):
+ """network_config property is cached in DataSourceEc2."""
+ ds = self._setup_ds(
+ platform_data=self.valid_platform_data,
+ sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
+ md=DEFAULT_METADATA)
+ ds._network_config = {'cached': 'data'}
+ self.assertEqual({'cached': 'data'}, ds.network_config)
+
+ @httpretty.activate
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
def test_valid_platform_with_strict_true(self, m_dhcp):
"""Valid platform data should return true with strict_id true."""
@@ -287,4 +316,72 @@ class TestEc2(test_helpers.HttprettyTestCase):
self.assertIn('Crawl of metadata service took', self.logs.getvalue())
+class TestConvertEc2MetadataNetworkConfig(test_helpers.CiTestCase):
+
+ def setUp(self):
+ super(TestConvertEc2MetadataNetworkConfig, self).setUp()
+ self.mac1 = '06:17:04:d7:26:09'
+ self.network_metadata = {
+ 'network': {'interfaces': {'macs': {
+ self.mac1: {'public-ipv4s': '172.31.2.16'}}}}}
+
+ def test_convert_ec2_metadata_network_config_skips_absent_macs(self):
+ """Any mac absent from metadata is skipped by network config."""
+ macs_to_nics = {self.mac1: 'eth9', 'DE:AD:BE:EF:FF:FF': 'vitualnic2'}
+
+ # DE:AD:BE:EF:FF:FF represented by OS but not in metadata
+ expected = {'version': 1, 'config': [
+ {'mac_address': self.mac1, 'type': 'physical',
+ 'name': 'eth9', 'subnets': [{'type': 'dhcp4'}]}]}
+ self.assertEqual(
+ expected,
+ ec2.convert_ec2_metadata_network_config(
+ self.network_metadata, macs_to_nics))
+
+ def test_convert_ec2_metadata_network_config_handles_only_dhcp6(self):
+ """Config dhcp6 when ipv6s is in metadata for a mac."""
+ macs_to_nics = {self.mac1: 'eth9'}
+ network_metadata_ipv6 = copy.deepcopy(self.network_metadata)
+ nic1_metadata = (
+ network_metadata_ipv6['network']['interfaces']['macs'][self.mac1])
+ nic1_metadata['ipv6s'] = '2620:0:1009:fd00:e442:c88d:c04d:dc85/64'
+ nic1_metadata.pop('public-ipv4s')
+ expected = {'version': 1, 'config': [
+ {'mac_address': self.mac1, 'type': 'physical',
+ 'name': 'eth9', 'subnets': [{'type': 'dhcp6'}]}]}
+ self.assertEqual(
+ expected,
+ ec2.convert_ec2_metadata_network_config(
+ network_metadata_ipv6, macs_to_nics))
+
+ def test_convert_ec2_metadata_network_config_handles_dhcp4_and_dhcp6(self):
+ """Config both dhcp4 and dhcp6 when both vpc-ipv6 and ipv4 exists."""
+ macs_to_nics = {self.mac1: 'eth9'}
+ network_metadata_both = copy.deepcopy(self.network_metadata)
+ nic1_metadata = (
+ network_metadata_both['network']['interfaces']['macs'][self.mac1])
+ nic1_metadata['ipv6s'] = '2620:0:1009:fd00:e442:c88d:c04d:dc85/64'
+ expected = {'version': 1, 'config': [
+ {'mac_address': self.mac1, 'type': 'physical',
+ 'name': 'eth9',
+ 'subnets': [{'type': 'dhcp4'}, {'type': 'dhcp6'}]}]}
+ self.assertEqual(
+ expected,
+ ec2.convert_ec2_metadata_network_config(
+ network_metadata_both, macs_to_nics))
+
+ def test_convert_ec2_metadata_gets_macs_from_get_interfaces_by_mac(self):
+ """Convert Ec2 Metadata calls get_interfaces_by_mac by default."""
+ expected = {'version': 1, 'config': [
+ {'mac_address': self.mac1, 'type': 'physical',
+ 'name': 'eth9',
+ 'subnets': [{'type': 'dhcp4'}]}]}
+ patch_path = (
+ 'cloudinit.sources.DataSourceEc2.net.get_interfaces_by_mac')
+ with mock.patch(patch_path) as m_get_interfaces_by_mac:
+ m_get_interfaces_by_mac.return_value = {self.mac1: 'eth9'}
+ self.assertEqual(
+ expected,
+ ec2.convert_ec2_metadata_network_config(self.network_metadata))
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_distros/test_debian.py b/tests/unittests/test_distros/test_debian.py
index 2330ad52..72d3aad6 100644
--- a/tests/unittests/test_distros/test_debian.py
+++ b/tests/unittests/test_distros/test_debian.py
@@ -1,67 +1,85 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from ..helpers import (CiTestCase, mock)
-
-from cloudinit.distros.debian import apply_locale
+from cloudinit import distros
from cloudinit import util
+from ..helpers import (FilesystemMockingTestCase, mock)
@mock.patch("cloudinit.distros.debian.util.subp")
-class TestDebianApplyLocale(CiTestCase):
+class TestDebianApplyLocale(FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestDebianApplyLocale, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.patchOS(self.new_root)
+ self.patchUtils(self.new_root)
+ self.spath = self.tmp_path('etc/default/locale', self.new_root)
+ cls = distros.fetch("debian")
+ self.distro = cls("debian", {}, None)
+
def test_no_rerun(self, m_subp):
"""If system has defined locale, no re-run is expected."""
- spath = self.tmp_path("default-locale")
m_subp.return_value = (None, None)
locale = 'en_US.UTF-8'
- util.write_file(spath, 'LANG=%s\n' % locale, omode="w")
- apply_locale(locale, sys_path=spath)
+ util.write_file(self.spath, 'LANG=%s\n' % locale, omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
m_subp.assert_not_called()
+ def test_no_regen_on_c_utf8(self, m_subp):
+ """If locale is set to C.UTF8, do not attempt to call locale-gen"""
+ m_subp.return_value = (None, None)
+ locale = 'C.UTF-8'
+ util.write_file(self.spath, 'LANG=%s\n' % 'en_US.UTF-8', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
+ self.assertEqual(
+ [['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
+ [p[0][0] for p in m_subp.call_args_list])
+
def test_rerun_if_different(self, m_subp):
"""If system has different locale, locale-gen should be called."""
- spath = self.tmp_path("default-locale")
m_subp.return_value = (None, None)
locale = 'en_US.UTF-8'
- util.write_file(spath, 'LANG=fr_FR.UTF-8', omode="w")
- apply_locale(locale, sys_path=spath)
+ util.write_file(self.spath, 'LANG=fr_FR.UTF-8', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
[['locale-gen', locale],
- ['update-locale', '--locale-file=' + spath, 'LANG=%s' % locale]],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
[p[0][0] for p in m_subp.call_args_list])
def test_rerun_if_no_file(self, m_subp):
"""If system has no locale file, locale-gen should be called."""
- spath = self.tmp_path("default-locale")
m_subp.return_value = (None, None)
locale = 'en_US.UTF-8'
- apply_locale(locale, sys_path=spath)
+ self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
[['locale-gen', locale],
- ['update-locale', '--locale-file=' + spath, 'LANG=%s' % locale]],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
[p[0][0] for p in m_subp.call_args_list])
def test_rerun_on_unset_system_locale(self, m_subp):
"""If system has unset locale, locale-gen should be called."""
m_subp.return_value = (None, None)
- spath = self.tmp_path("default-locale")
locale = 'en_US.UTF-8'
- util.write_file(spath, 'LANG=', omode="w")
- apply_locale(locale, sys_path=spath)
+ util.write_file(self.spath, 'LANG=', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath)
self.assertEqual(
[['locale-gen', locale],
- ['update-locale', '--locale-file=' + spath, 'LANG=%s' % locale]],
+ ['update-locale', '--locale-file=' + self.spath,
+ 'LANG=%s' % locale]],
[p[0][0] for p in m_subp.call_args_list])
def test_rerun_on_mismatched_keys(self, m_subp):
"""If key is LC_ALL and system has only LANG, rerun is expected."""
m_subp.return_value = (None, None)
- spath = self.tmp_path("default-locale")
locale = 'en_US.UTF-8'
- util.write_file(spath, 'LANG=', omode="w")
- apply_locale(locale, sys_path=spath, keyname='LC_ALL')
+ util.write_file(self.spath, 'LANG=', omode="w")
+ self.distro.apply_locale(locale, out_fn=self.spath, keyname='LC_ALL')
self.assertEqual(
[['locale-gen', locale],
- ['update-locale', '--locale-file=' + spath,
+ ['update-locale', '--locale-file=' + self.spath,
'LC_ALL=%s' % locale]],
[p[0][0] for p in m_subp.call_args_list])
@@ -69,14 +87,14 @@ class TestDebianApplyLocale(CiTestCase):
"""locale as None or "" is invalid and should raise ValueError."""
with self.assertRaises(ValueError) as ctext_m:
- apply_locale(None)
+ self.distro.apply_locale(None)
m_subp.assert_not_called()
self.assertEqual(
'Failed to provide locale value.', str(ctext_m.exception))
with self.assertRaises(ValueError) as ctext_m:
- apply_locale("")
+ self.distro.apply_locale("")
m_subp.assert_not_called()
self.assertEqual(
'Failed to provide locale value.', str(ctext_m.exception))
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index c9be277e..b355a19e 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -228,5 +228,21 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
os.symlink('/', '/run/systemd/system')
self.assertFalse(d.uses_systemd())
+ @mock.patch('cloudinit.distros.debian.read_system_locale')
+ def test_get_locale_ubuntu(self, m_locale):
+ """Test ubuntu distro returns locale set to C.UTF-8"""
+ m_locale.return_value = 'C.UTF-8'
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ locale = d.get_locale()
+ self.assertEqual('C.UTF-8', locale)
+
+ def test_get_locale_rhel(self):
+ """Test rhel distro returns NotImplementedError exception"""
+ cls = distros.fetch("rhel")
+ d = cls("rhel", {}, None)
+ with self.assertRaises(NotImplementedError):
+ d.get_locale()
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_distros/test_opensuse.py b/tests/unittests/test_distros/test_opensuse.py
new file mode 100644
index 00000000..bdb1d633
--- /dev/null
+++ b/tests/unittests/test_distros/test_opensuse.py
@@ -0,0 +1,12 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from ..helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestopenSUSE(CiTestCase):
+
+ def test_get_distro(self):
+ distro = _get_distro("opensuse")
+ self.assertEqual(distro.osfamily, 'suse')
diff --git a/tests/unittests/test_distros/test_sles.py b/tests/unittests/test_distros/test_sles.py
new file mode 100644
index 00000000..c656aacc
--- /dev/null
+++ b/tests/unittests/test_distros/test_sles.py
@@ -0,0 +1,12 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from ..helpers import CiTestCase
+
+from . import _get_distro
+
+
+class TestSLES(CiTestCase):
+
+ def test_get_distro(self):
+ distro = _get_distro("sles")
+ self.assertEqual(distro.osfamily, 'suse')
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
index 929f786e..1873c3e1 100644
--- a/tests/unittests/test_handler/test_handler_debug.py
+++ b/tests/unittests/test_handler/test_handler_debug.py
@@ -11,7 +11,7 @@ from cloudinit import util
from cloudinit.sources import DataSourceNone
-from .. import helpers as t_help
+from ..helpers import (FilesystemMockingTestCase, mock)
import logging
import shutil
@@ -20,7 +20,8 @@ import tempfile
LOG = logging.getLogger(__name__)
-class TestDebug(t_help.FilesystemMockingTestCase):
+@mock.patch('cloudinit.distros.debian.read_system_locale')
+class TestDebug(FilesystemMockingTestCase):
def setUp(self):
super(TestDebug, self).setUp()
self.new_root = tempfile.mkdtemp()
@@ -36,7 +37,8 @@ class TestDebug(t_help.FilesystemMockingTestCase):
ds.metadata.update(metadata)
return cloud.Cloud(ds, paths, {}, d, None)
- def test_debug_write(self):
+ def test_debug_write(self, m_locale):
+ m_locale.return_value = 'en_US.UTF-8'
cfg = {
'abc': '123',
'c': u'\u20a0',
@@ -54,7 +56,8 @@ class TestDebug(t_help.FilesystemMockingTestCase):
for k in cfg.keys():
self.assertIn(k, contents)
- def test_debug_no_write(self):
+ def test_debug_no_write(self, m_locale):
+ m_locale.return_value = 'en_US.UTF-8'
cfg = {
'abc': '123',
'debug': {
diff --git a/tests/unittests/test_handler/test_handler_landscape.py b/tests/unittests/test_handler/test_handler_landscape.py
new file mode 100644
index 00000000..7c247fa9
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_landscape.py
@@ -0,0 +1,129 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_landscape
+from cloudinit.sources import DataSourceNone
+from cloudinit import (distros, helpers, cloud, util)
+from ..helpers import FilesystemMockingTestCase, mock, wrap_and_call
+
+from configobj import ConfigObj
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class TestLandscape(FilesystemMockingTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestLandscape, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.conf = self.tmp_path('client.conf', self.new_root)
+ self.default_file = self.tmp_path('default_landscape', self.new_root)
+
+ def _get_cloud(self, distro):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({'templates_dir': self.new_root})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skips_empty_landscape_cloudconfig(self):
+ """Empty landscape cloud-config section does no work."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'landscape': {}}
+ cc_landscape.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertFalse(mycloud.distro.install_packages.called)
+
+ def test_handler_error_on_invalid_landscape_type(self):
+ """Raise an error when landscape configuraiton option is invalid."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': 'wrongtype'}
+ with self.assertRaises(RuntimeError) as context_manager:
+ cc_landscape.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "'landscape' key existed in config, but not a dict",
+ str(context_manager.exception))
+
+ @mock.patch('cloudinit.config.cc_landscape.util')
+ def test_handler_restarts_landscape_client(self, m_util):
+ """handler restarts lansdscape-client after install."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {}}}
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(['service', 'landscape-client', 'restart'])],
+ m_util.subp.call_args_list)
+
+ def test_handler_installs_client_and_creates_config_file(self):
+ """Write landscape client.conf and install landscape-client."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {}}}
+ expected = {'client': {
+ 'log_level': 'info',
+ 'url': 'https://landscape.canonical.com/message-system',
+ 'ping_url': 'http://landscape.canonical.com/ping',
+ 'data_path': '/var/lib/landscape/client'}}
+ mycloud.distro = mock.MagicMock()
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf},
+ 'LS_DEFAULT_FILE': {'new': self.default_file}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call('landscape-client')],
+ mycloud.distro.install_packages.call_args)
+ self.assertEqual(expected, dict(ConfigObj(self.conf)))
+ self.assertIn(
+ 'Wrote landscape config file to {0}'.format(self.conf),
+ self.logs.getvalue())
+ default_content = util.load_file(self.default_file)
+ self.assertEqual('RUN=1\n', default_content)
+
+ def test_handler_writes_merged_client_config_file_with_defaults(self):
+ """Merge and write options from LSC_CLIENT_CFG_FILE with defaults."""
+ # Write existing sparse client.conf file
+ util.write_file(self.conf, '[client]\ncomputer_title = My PC\n')
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {}}}
+ expected = {'client': {
+ 'log_level': 'info',
+ 'url': 'https://landscape.canonical.com/message-system',
+ 'ping_url': 'http://landscape.canonical.com/ping',
+ 'data_path': '/var/lib/landscape/client',
+ 'computer_title': 'My PC'}}
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(expected, dict(ConfigObj(self.conf)))
+ self.assertIn(
+ 'Wrote landscape config file to {0}'.format(self.conf),
+ self.logs.getvalue())
+
+ def test_handler_writes_merged_provided_cloudconfig_with_defaults(self):
+ """Merge and write options from cloud-config options with defaults."""
+ # Write empty sparse client.conf file
+ util.write_file(self.conf, '')
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {'computer_title': 'My PC'}}}
+ expected = {'client': {
+ 'log_level': 'info',
+ 'url': 'https://landscape.canonical.com/message-system',
+ 'ping_url': 'http://landscape.canonical.com/ping',
+ 'data_path': '/var/lib/landscape/client',
+ 'computer_title': 'My PC'}}
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(expected, dict(ConfigObj(self.conf)))
+ self.assertIn(
+ 'Wrote landscape config file to {0}'.format(self.conf),
+ self.logs.getvalue())
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
index e9a810c5..a789db32 100644
--- a/tests/unittests/test_handler/test_handler_locale.py
+++ b/tests/unittests/test_handler/test_handler_locale.py
@@ -20,6 +20,8 @@ from configobj import ConfigObj
from six import BytesIO
import logging
+import mock
+import os
import shutil
import tempfile
@@ -27,6 +29,9 @@ LOG = logging.getLogger(__name__)
class TestLocale(t_help.FilesystemMockingTestCase):
+
+ with_logs = True
+
def setUp(self):
super(TestLocale, self).setUp()
self.new_root = tempfile.mkdtemp()
@@ -49,9 +54,58 @@ class TestLocale(t_help.FilesystemMockingTestCase):
}
cc = self._get_cloud('sles')
cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+ if cc.distro.uses_systemd():
+ locale_conf = cc.distro.systemd_locale_conf_fn
+ else:
+ locale_conf = cc.distro.locale_conf_fn
+ contents = util.load_file(locale_conf, decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
+ if cc.distro.uses_systemd():
+ self.assertEqual({'LANG': cfg['locale']}, dict(n_cfg))
+ else:
+ self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg))
+
+ def test_set_locale_sles_default(self):
+ cfg = {}
+ cc = self._get_cloud('sles')
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/language', decode=False)
+ if cc.distro.uses_systemd():
+ locale_conf = cc.distro.systemd_locale_conf_fn
+ keyname = 'LANG'
+ else:
+ locale_conf = cc.distro.locale_conf_fn
+ keyname = 'RC_LANG'
+
+ contents = util.load_file(locale_conf, decode=False)
n_cfg = ConfigObj(BytesIO(contents))
- self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg))
+ self.assertEqual({keyname: 'en_US.UTF-8'}, dict(n_cfg))
+
+ def test_locale_update_config_if_different_than_default(self):
+ """Test cc_locale writes updates conf if different than default"""
+ locale_conf = os.path.join(self.new_root, "etc/default/locale")
+ util.write_file(locale_conf, 'LANG="en_US.UTF-8"\n')
+ cfg = {'locale': 'C.UTF-8'}
+ cc = self._get_cloud('ubuntu')
+ with mock.patch('cloudinit.distros.debian.util.subp') as m_subp:
+ with mock.patch('cloudinit.distros.debian.LOCALE_CONF_FN',
+ locale_conf):
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+ m_subp.assert_called_with(['update-locale',
+ '--locale-file=%s' % locale_conf,
+ 'LANG=C.UTF-8'], capture=False)
+
+ def test_locale_rhel_defaults_en_us_utf8(self):
+ """Test cc_locale gets en_US.UTF-8 from distro get_locale fallback"""
+ cfg = {}
+ cc = self._get_cloud('rhel')
+ update_sysconfig = 'cloudinit.distros.rhel_util.update_sysconfig_file'
+ with mock.patch.object(cc.distro, 'uses_systemd') as m_use_sd:
+ m_use_sd.return_value = True
+ with mock.patch(update_sysconfig) as m_update_syscfg:
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+ m_update_syscfg.assert_called_with('/etc/locale.conf',
+ {'LANG': 'en_US.UTF-8'})
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_puppet.py b/tests/unittests/test_handler/test_handler_puppet.py
new file mode 100644
index 00000000..805c76ba
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_puppet.py
@@ -0,0 +1,142 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_puppet
+from cloudinit.sources import DataSourceNone
+from cloudinit import (distros, helpers, cloud, util)
+from ..helpers import CiTestCase, mock
+
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+@mock.patch('cloudinit.config.cc_puppet.util')
+@mock.patch('cloudinit.config.cc_puppet.os')
+class TestAutostartPuppet(CiTestCase):
+
+ with_logs = True
+
+ def test_wb_autostart_puppet_updates_puppet_default(self, m_os, m_util):
+ """Update /etc/default/puppet to autostart if it exists."""
+
+ def _fake_exists(path):
+ return path == '/etc/default/puppet'
+
+ m_os.path.exists.side_effect = _fake_exists
+ cc_puppet._autostart_puppet(LOG)
+ self.assertEqual(
+ [mock.call(['sed', '-i', '-e', 's/^START=.*/START=yes/',
+ '/etc/default/puppet'], capture=False)],
+ m_util.subp.call_args_list)
+
+ def test_wb_autostart_pupppet_enables_puppet_systemctl(self, m_os, m_util):
+ """If systemctl is present, enable puppet via systemctl."""
+
+ def _fake_exists(path):
+ return path == '/bin/systemctl'
+
+ m_os.path.exists.side_effect = _fake_exists
+ cc_puppet._autostart_puppet(LOG)
+ expected_calls = [mock.call(
+ ['/bin/systemctl', 'enable', 'puppet.service'], capture=False)]
+ self.assertEqual(expected_calls, m_util.subp.call_args_list)
+
+ def test_wb_autostart_pupppet_enables_puppet_chkconfig(self, m_os, m_util):
+ """If chkconfig is present, enable puppet via checkcfg."""
+
+ def _fake_exists(path):
+ return path == '/sbin/chkconfig'
+
+ m_os.path.exists.side_effect = _fake_exists
+ cc_puppet._autostart_puppet(LOG)
+ expected_calls = [mock.call(
+ ['/sbin/chkconfig', 'puppet', 'on'], capture=False)]
+ self.assertEqual(expected_calls, m_util.subp.call_args_list)
+
+
+@mock.patch('cloudinit.config.cc_puppet._autostart_puppet')
+class TestPuppetHandle(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestPuppetHandle, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.conf = self.tmp_path('puppet.conf')
+
+ def _get_cloud(self, distro):
+ paths = helpers.Paths({'templates_dir': self.new_root})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skips_missing_puppet_key_in_cloudconfig(self, m_auto):
+ """Cloud-config containing no 'puppet' key is skipped."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "no 'puppet' configuration found", self.logs.getvalue())
+ self.assertEqual(0, m_auto.call_count)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_starts_puppet_service(self, m_subp, m_auto):
+ """Cloud-config 'puppet' configuration starts puppet."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'puppet': {'install': False}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(1, m_auto.call_count)
+ self.assertEqual(
+ [mock.call(['service', 'puppet', 'start'], capture=False)],
+ m_subp.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_empty_puppet_config_installs_puppet(self, m_subp, m_auto):
+ """Cloud-config empty 'puppet' configuration installs latest puppet."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'puppet': {}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(('puppet', None))],
+ mycloud.distro.install_packages.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_installs_puppet_on_true(self, m_subp, _):
+ """Cloud-config with 'puppet' key installs when 'install' is True."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'puppet': {'install': True}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(('puppet', None))],
+ mycloud.distro.install_packages.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_installs_puppet_version(self, m_subp, _):
+ """Cloud-config 'puppet' configuration can specify a version."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'puppet': {'version': '3.8'}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(('puppet', '3.8'))],
+ mycloud.distro.install_packages.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_updates_puppet_conf(self, m_subp, m_auto):
+ """When 'conf' is provided update values in PUPPET_CONF_PATH."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {
+ 'puppet': {
+ 'conf': {'agent': {'server': 'puppetmaster.example.org'}}}}
+ util.write_file(self.conf, '[agent]\nserver = origpuppet\nother = 3')
+ puppet_conf_path = 'cloudinit.config.cc_puppet.PUPPET_CONF_PATH'
+ mycloud.distro = mock.MagicMock()
+ with mock.patch(puppet_conf_path, self.conf):
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ content = util.load_file(self.conf)
+ expected = '[agent]\nserver = puppetmaster.example.org\nother = 3\n\n'
+ self.assertEqual(expected, content)
diff --git a/tests/unittests/test_handler/test_handler_runcmd.py b/tests/unittests/test_handler/test_handler_runcmd.py
new file mode 100644
index 00000000..7880ee72
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_runcmd.py
@@ -0,0 +1,108 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_runcmd
+from cloudinit.sources import DataSourceNone
+from cloudinit import (distros, helpers, cloud, util)
+from ..helpers import FilesystemMockingTestCase, skipIf
+
+import logging
+import os
+import stat
+
+try:
+ import jsonschema
+ assert jsonschema # avoid pyflakes error F401: import unused
+ _missing_jsonschema_dep = False
+except ImportError:
+ _missing_jsonschema_dep = True
+
+LOG = logging.getLogger(__name__)
+
+
+class TestRuncmd(FilesystemMockingTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestRuncmd, self).setUp()
+ self.subp = util.subp
+ self.new_root = self.tmp_dir()
+
+ def _get_cloud(self, distro):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({'scripts': self.new_root})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ paths.datasource = myds
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skip_if_no_runcmd(self):
+ """When the provided config doesn't contain runcmd, skip it."""
+ cfg = {}
+ mycloud = self._get_cloud('ubuntu')
+ cc_runcmd.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "Skipping module named notimportant, no 'runcmd' key",
+ self.logs.getvalue())
+
+ def test_handler_invalid_command_set(self):
+ """Commands which can't be converted to shell will raise errors."""
+ invalid_config = {'runcmd': 1}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, [])
+ self.assertIn(
+ 'Failed to shellify 1 into file'
+ ' /var/lib/cloud/instances/iid-datasource-none/scripts/runcmd',
+ self.logs.getvalue())
+
+ @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency")
+ def test_handler_schema_validation_warns_non_array_type(self):
+ """Schema validation warns of non-array type for runcmd key.
+
+ Schema validation is not strict, so runcmd attempts to shellify the
+ invalid content.
+ """
+ invalid_config = {'runcmd': 1}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, [])
+ self.assertIn(
+ 'Invalid config:\nruncmd: 1 is not of type \'array\'',
+ self.logs.getvalue())
+ self.assertIn('Failed to shellify', self.logs.getvalue())
+
+ @skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency')
+ def test_handler_schema_validation_warns_non_array_item_type(self):
+ """Schema validation warns of non-array or string runcmd items.
+
+ Schema validation is not strict, so runcmd attempts to shellify the
+ invalid content.
+ """
+ invalid_config = {
+ 'runcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, [])
+ expected_warnings = [
+ 'runcmd.1: 20 is not valid under any of the given schemas',
+ 'runcmd.3: {\'a\': \'n\'} is not valid under any of the given'
+ ' schema'
+ ]
+ logs = self.logs.getvalue()
+ for warning in expected_warnings:
+ self.assertIn(warning, logs)
+ self.assertIn('Failed to shellify', logs)
+
+ def test_handler_write_valid_runcmd_schema_to_file(self):
+ """Valid runcmd schema is written to a runcmd shell script."""
+ valid_config = {'runcmd': [['ls', '/']]}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', valid_config, cc, LOG, [])
+ runcmd_file = os.path.join(
+ self.new_root,
+ 'var/lib/cloud/instances/iid-datasource-none/scripts/runcmd')
+ self.assertEqual("#!/bin/sh\n'ls' '/'\n", util.load_file(runcmd_file))
+ file_stat = os.stat(runcmd_file)
+ self.assertEqual(0o700, stat.S_IMODE(file_stat.st_mode))
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index 4b18de75..8165bf9a 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -70,7 +70,8 @@ class TestHostname(t_help.FilesystemMockingTestCase):
cc = cloud.Cloud(ds, paths, {}, distro, None)
self.patchUtils(self.tmp)
cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, [])
- contents = util.load_file("/etc/HOSTNAME")
- self.assertEqual('blah', contents.strip())
+ if not distro.uses_systemd():
+ contents = util.load_file(distro.hostname_conf_fn)
+ self.assertEqual('blah', contents.strip())
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
index eda4802a..640f11d4 100644
--- a/tests/unittests/test_handler/test_schema.py
+++ b/tests/unittests/test_handler/test_schema.py
@@ -1,9 +1,9 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.config.schema import (
- CLOUD_CONFIG_HEADER, SchemaValidationError, get_schema_doc,
- validate_cloudconfig_file, validate_cloudconfig_schema,
- main)
+ CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file,
+ get_schema_doc, get_schema, validate_cloudconfig_file,
+ validate_cloudconfig_schema, main)
from cloudinit.util import write_file
from ..helpers import CiTestCase, mock, skipIf
@@ -11,6 +11,7 @@ from ..helpers import CiTestCase, mock, skipIf
from copy import copy
from six import StringIO
from textwrap import dedent
+from yaml import safe_load
try:
import jsonschema
@@ -20,6 +21,29 @@ except ImportError:
_missing_jsonschema_dep = True
+class GetSchemaTest(CiTestCase):
+
+ def test_get_schema_coalesces_known_schema(self):
+ """Every cloudconfig module with schema is listed in allOf keyword."""
+ schema = get_schema()
+ self.assertItemsEqual(
+ ['cc_ntp', 'cc_runcmd'],
+ [subschema['id'] for subschema in schema['allOf']])
+ self.assertEqual('cloud-config-schema', schema['id'])
+ self.assertEqual(
+ 'http://json-schema.org/draft-04/schema#',
+ schema['$schema'])
+ # FULL_SCHEMA is updated by the get_schema call
+ from cloudinit.config.schema import FULL_SCHEMA
+ self.assertItemsEqual(['id', '$schema', 'allOf'], FULL_SCHEMA.keys())
+
+ def test_get_schema_returns_global_when_set(self):
+ """When FULL_SCHEMA global is already set, get_schema returns it."""
+ m_schema_path = 'cloudinit.config.schema.FULL_SCHEMA'
+ with mock.patch(m_schema_path, {'here': 'iam'}):
+ self.assertEqual({'here': 'iam'}, get_schema())
+
+
class SchemaValidationErrorTest(CiTestCase):
"""Test validate_cloudconfig_schema"""
@@ -151,11 +175,11 @@ class GetSchemaDocTest(CiTestCase):
full_schema.update(
{'properties': {
'prop1': {'type': 'array', 'description': 'prop-description',
- 'items': {'type': 'int'}}}})
+ 'items': {'type': 'integer'}}}})
self.assertEqual(
dedent("""
name
- ---
+ ----
**Summary:** title
description
@@ -167,27 +191,71 @@ class GetSchemaDocTest(CiTestCase):
**Supported distros:** debian, rhel
**Config schema**:
- **prop1:** (array of int) prop-description\n\n"""),
+ **prop1:** (array of integer) prop-description\n\n"""),
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_handles_multiple_types(self):
+ """get_schema_doc delimits multiple property types with a '/'."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'properties': {
+ 'prop1': {'type': ['string', 'integer'],
+ 'description': 'prop-description'}}})
+ self.assertIn(
+ '**prop1:** (string/integer) prop-description',
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_handles_nested_oneof_property_types(self):
+ """get_schema_doc describes array items oneOf declarations in type."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'properties': {
+ 'prop1': {'type': 'array',
+ 'items': {
+ 'oneOf': [{'type': 'string'},
+ {'type': 'integer'}]},
+ 'description': 'prop-description'}}})
+ self.assertIn(
+ '**prop1:** (array of (string)/(integer)) prop-description',
get_schema_doc(full_schema))
def test_get_schema_doc_returns_restructured_text_with_examples(self):
"""get_schema_doc returns indented examples when present in schema."""
full_schema = copy(self.required_schema)
full_schema.update(
- {'examples': {'ex1': [1, 2, 3]},
+ {'examples': [{'ex1': [1, 2, 3]}],
'properties': {
'prop1': {'type': 'array', 'description': 'prop-description',
- 'items': {'type': 'int'}}}})
+ 'items': {'type': 'integer'}}}})
self.assertIn(
dedent("""
**Config schema**:
- **prop1:** (array of int) prop-description
+ **prop1:** (array of integer) prop-description
**Examples**::
ex1"""),
get_schema_doc(full_schema))
+ def test_get_schema_doc_handles_unstructured_examples(self):
+ """get_schema_doc properly indented examples which as just strings."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'examples': ['My example:\n [don\'t, expand, "this"]'],
+ 'properties': {
+ 'prop1': {'type': 'array', 'description': 'prop-description',
+ 'items': {'type': 'integer'}}}})
+ self.assertIn(
+ dedent("""
+ **Config schema**:
+ **prop1:** (array of integer) prop-description
+
+ **Examples**::
+
+ My example:
+ [don't, expand, "this"]"""),
+ get_schema_doc(full_schema))
+
def test_get_schema_doc_raises_key_errors(self):
"""get_schema_doc raises KeyErrors on missing keys."""
for key in self.required_schema:
@@ -198,13 +266,78 @@ class GetSchemaDocTest(CiTestCase):
self.assertIn(key, str(context_mgr.exception))
+class AnnotatedCloudconfigFileTest(CiTestCase):
+ maxDiff = None
+
+ def test_annotated_cloudconfig_file_no_schema_errors(self):
+ """With no schema_errors, print the original content."""
+ content = b'ntp:\n pools: [ntp1.pools.com]\n'
+ self.assertEqual(
+ content,
+ annotated_cloudconfig_file({}, content, schema_errors=[]))
+
+ def test_annotated_cloudconfig_file_schema_annotates_and_adds_footer(self):
+ """With schema_errors, error lines are annotated and a footer added."""
+ content = dedent("""\
+ #cloud-config
+ # comment
+ ntp:
+ pools: [-99, 75]
+ """).encode()
+ expected = dedent("""\
+ #cloud-config
+ # comment
+ ntp: # E1
+ pools: [-99, 75] # E2,E3
+
+ # Errors: -------------
+ # E1: Some type error
+ # E2: -99 is not a string
+ # E3: 75 is not a string
+
+ """)
+ parsed_config = safe_load(content[13:])
+ schema_errors = [
+ ('ntp', 'Some type error'), ('ntp.pools.0', '-99 is not a string'),
+ ('ntp.pools.1', '75 is not a string')]
+ self.assertEqual(
+ expected,
+ annotated_cloudconfig_file(parsed_config, content, schema_errors))
+
+ def test_annotated_cloudconfig_file_annotates_separate_line_items(self):
+ """Errors are annotated for lists with items on separate lines."""
+ content = dedent("""\
+ #cloud-config
+ # comment
+ ntp:
+ pools:
+ - -99
+ - 75
+ """).encode()
+ expected = dedent("""\
+ ntp:
+ pools:
+ - -99 # E1
+ - 75 # E2
+ """)
+ parsed_config = safe_load(content[13:])
+ schema_errors = [
+ ('ntp.pools.0', '-99 is not a string'),
+ ('ntp.pools.1', '75 is not a string')]
+ self.assertIn(
+ expected,
+ annotated_cloudconfig_file(parsed_config, content, schema_errors))
+
+
class MainTest(CiTestCase):
def test_main_missing_args(self):
"""Main exits non-zero and reports an error on missing parameters."""
with mock.patch('sys.argv', ['mycmd']):
with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- self.assertEqual(1, main(), 'Expected non-zero exit code')
+ with self.assertRaises(SystemExit) as context_manager:
+ main()
+ self.assertEqual('1', str(context_manager.exception))
self.assertEqual(
'Expected either --config-file argument or --doc\n',
m_stderr.getvalue())
@@ -216,13 +349,13 @@ class MainTest(CiTestCase):
with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
self.assertEqual(0, main(), 'Expected 0 exit code')
self.assertIn('\nNTP\n---\n', m_stdout.getvalue())
+ self.assertIn('\nRuncmd\n------\n', m_stdout.getvalue())
def test_main_validates_config_file(self):
"""When --config-file parameter is provided, main validates schema."""
myyaml = self.tmp_path('my.yaml')
myargs = ['mycmd', '--config-file', myyaml]
- with open(myyaml, 'wb') as stream:
- stream.write(b'#cloud-config\nntp:') # shortest ntp schema
+ write_file(myyaml, b'#cloud-config\nntp:') # shortest ntp schema
with mock.patch('sys.argv', myargs):
with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
self.assertEqual(0, main(), 'Expected 0 exit code')
diff --git a/tests/unittests/test_log.py b/tests/unittests/test_log.py
new file mode 100644
index 00000000..68fb4b8d
--- /dev/null
+++ b/tests/unittests/test_log.py
@@ -0,0 +1,58 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests for cloudinit.log """
+
+from .helpers import CiTestCase
+from cloudinit.analyze.dump import CLOUD_INIT_ASCTIME_FMT
+from cloudinit import log as ci_logging
+import datetime
+import logging
+import six
+import time
+
+
+class TestCloudInitLogger(CiTestCase):
+
+ def setUp(self):
+ # set up a logger like cloud-init does in setupLogging, but instead
+ # of sys.stderr, we'll plug in a StringIO() object so we can see
+ # what gets logged
+ logging.Formatter.converter = time.gmtime
+ self.ci_logs = six.StringIO()
+ self.ci_root = logging.getLogger()
+ console = logging.StreamHandler(self.ci_logs)
+ console.setFormatter(logging.Formatter(ci_logging.DEF_CON_FORMAT))
+ console.setLevel(ci_logging.DEBUG)
+ self.ci_root.addHandler(console)
+ self.ci_root.setLevel(ci_logging.DEBUG)
+ self.LOG = logging.getLogger('test_cloudinit_logger')
+
+ def test_logger_uses_gmtime(self):
+ """Test that log message have timestamp in UTC (gmtime)"""
+
+ # Log a message, extract the timestamp from the log entry
+ # convert to datetime, and compare to a utc timestamp before
+ # and after the logged message.
+
+ # Due to loss of precision in the LOG timestamp, subtract and add
+ # time to the utc stamps for comparison
+ #
+ # utc_before: 2017-08-23 14:19:42.569299
+ # parsed dt : 2017-08-23 14:19:43.069000
+ # utc_after : 2017-08-23 14:19:43.570064
+
+ utc_before = datetime.datetime.utcnow() - datetime.timedelta(0, 0.5)
+ self.LOG.error('Test message')
+ utc_after = datetime.datetime.utcnow() + datetime.timedelta(0, 0.5)
+
+ # extract timestamp from log:
+ # 2017-08-23 14:19:43,069 - test_log.py[ERROR]: Test message
+ logstr = self.ci_logs.getvalue().splitlines()[0]
+ timestampstr = logstr.split(' - ')[0]
+ parsed_dt = datetime.datetime.strptime(timestampstr,
+ CLOUD_INIT_ASCTIME_FMT)
+
+ self.assertLess(utc_before, parsed_dt)
+ self.assertLess(parsed_dt, utc_after)
+ self.assertLess(utc_before, utc_after)
+ self.assertGreater(utc_after, parsed_dt)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index f38a664c..5f11c88f 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -568,7 +568,8 @@ class TestReadSeeded(helpers.TestCase):
self.assertEqual(found_ud, ud)
-class TestSubp(helpers.TestCase):
+class TestSubp(helpers.CiTestCase):
+ with_logs = True
stdin2err = [BASH, '-c', 'cat >&2']
stdin2out = ['cat']
@@ -650,6 +651,16 @@ class TestSubp(helpers.TestCase):
self.assertEqual(
['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())
+ def test_subp_warn_missing_shebang(self):
+ """Warn on no #! in script"""
+ noshebang = self.tmp_path('noshebang')
+ util.write_file(noshebang, 'true\n')
+
+ os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
+ self.assertRaisesRegexp(util.ProcessExecutionError,
+ 'Missing #! in script\?',
+ util.subp, (noshebang,))
+
def test_returns_none_if_no_capture(self):
(out, err) = util.subp(self.stdin2out, data=b'', capture=False)
self.assertIsNone(err)