diff options
author | Scott Moser <smoser@ubuntu.com> | 2017-08-30 21:18:05 -0400 |
---|---|---|
committer | git-ubuntu importer <ubuntu-devel-discuss@lists.ubuntu.com> | 2017-08-31 01:23:11 +0000 |
commit | 291d42c3f8f396a16bc129e3e3b7ae262090a86e (patch) | |
tree | 36417030288770586ed936ac30b792f0dc202d69 /tests | |
parent | 622629b8856284b75d66b2fd5a30976cd8464479 (diff) | |
download | cloud-init-git-291d42c3f8f396a16bc129e3e3b7ae262090a86e.tar.gz |
0.7.9-259-g7e76c57b-0ubuntu1 (patches unapplied)
Imported using git-ubuntu import.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test_cli.py | 36 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_ec2.py | 97 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_debian.py | 66 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_generic.py | 16 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_opensuse.py | 12 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_sles.py | 12 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_debug.py | 11 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_landscape.py | 129 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_locale.py | 58 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_puppet.py | 142 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_runcmd.py | 108 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_set_hostname.py | 5 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_schema.py | 157 | ||||
-rw-r--r-- | tests/unittests/test_log.py | 58 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 13 |
15 files changed, 874 insertions, 46 deletions
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py index 7780f164..12f01852 100644 --- a/tests/unittests/test_cli.py +++ b/tests/unittests/test_cli.py @@ -46,7 +46,7 @@ class TestCLI(test_helpers.FilesystemMockingTestCase): self._call_main() error = self.stderr.getvalue() expected_subcommands = ['analyze', 'init', 'modules', 'single', - 'dhclient-hook', 'features'] + 'dhclient-hook', 'features', 'devel'] for subcommand in expected_subcommands: self.assertIn(subcommand, error) @@ -70,6 +70,21 @@ class TestCLI(test_helpers.FilesystemMockingTestCase): self.assertEqual('modules', parseargs.action[0]) self.assertEqual('main_modules', parseargs.action[1].__name__) + def test_conditional_subcommands_from_entry_point_sys_argv(self): + """Subcommands from entry-point are properly parsed from sys.argv.""" + expected_errors = [ + 'usage: cloud-init analyze', 'usage: cloud-init devel'] + conditional_subcommands = ['analyze', 'devel'] + # The cloud-init entrypoint calls main without passing sys_argv + for subcommand in conditional_subcommands: + with mock.patch('sys.argv', ['cloud-init', subcommand]): + try: + cli.main() + except SystemExit as e: + self.assertEqual(2, e.code) # exit 2 on proper usage docs + for error_message in expected_errors: + self.assertIn(error_message, self.stderr.getvalue()) + def test_analyze_subcommand_parser(self): """The subcommand cloud-init analyze calls the correct subparser.""" self._call_main(['cloud-init', 'analyze']) @@ -79,6 +94,25 @@ class TestCLI(test_helpers.FilesystemMockingTestCase): for subcommand in expected_subcommands: self.assertIn(subcommand, error) + def test_devel_subcommand_parser(self): + """The subcommand cloud-init devel calls the correct subparser.""" + self._call_main(['cloud-init', 'devel']) + # These subcommands only valid for cloud-init schema script + expected_subcommands = ['schema'] + error = self.stderr.getvalue() + for subcommand in expected_subcommands: + self.assertIn(subcommand, error) + + @mock.patch('cloudinit.config.schema.handle_schema_args') + def test_wb_devel_schema_subcommand_parser(self, m_schema): + """The subcommand cloud-init schema calls the correct subparser.""" + exit_code = self._call_main(['cloud-init', 'devel', 'schema']) + self.assertEqual(1, exit_code) + # Known whitebox output from schema subcommand + self.assertEqual( + 'Expected either --config-file argument or --doc\n', + self.stderr.getvalue()) + @mock.patch('cloudinit.cmd.main.main_single') def test_single_subcommand(self, m_main_single): """The subcommand 'single' calls main_single with valid args.""" diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py index 33d02619..e1ce6446 100644 --- a/tests/unittests/test_datasource/test_ec2.py +++ b/tests/unittests/test_datasource/test_ec2.py @@ -1,5 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. +import copy import httpretty import mock @@ -195,6 +196,34 @@ class TestEc2(test_helpers.HttprettyTestCase): return ds @httpretty.activate + def test_network_config_property_returns_version_1_network_data(self): + """network_config property returns network version 1 for metadata.""" + ds = self._setup_ds( + platform_data=self.valid_platform_data, + sys_cfg={'datasource': {'Ec2': {'strict_id': True}}}, + md=DEFAULT_METADATA) + ds.get_data() + mac1 = '06:17:04:d7:26:09' # Defined in DEFAULT_METADATA + expected = {'version': 1, 'config': [ + {'mac_address': '06:17:04:d7:26:09', 'name': 'eth9', + 'subnets': [{'type': 'dhcp4'}, {'type': 'dhcp6'}], + 'type': 'physical'}]} + patch_path = ( + 'cloudinit.sources.DataSourceEc2.net.get_interfaces_by_mac') + with mock.patch(patch_path) as m_get_interfaces_by_mac: + m_get_interfaces_by_mac.return_value = {mac1: 'eth9'} + self.assertEqual(expected, ds.network_config) + + def test_network_config_property_is_cached_in_datasource(self): + """network_config property is cached in DataSourceEc2.""" + ds = self._setup_ds( + platform_data=self.valid_platform_data, + sys_cfg={'datasource': {'Ec2': {'strict_id': True}}}, + md=DEFAULT_METADATA) + ds._network_config = {'cached': 'data'} + self.assertEqual({'cached': 'data'}, ds.network_config) + + @httpretty.activate @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery') def test_valid_platform_with_strict_true(self, m_dhcp): """Valid platform data should return true with strict_id true.""" @@ -287,4 +316,72 @@ class TestEc2(test_helpers.HttprettyTestCase): self.assertIn('Crawl of metadata service took', self.logs.getvalue()) +class TestConvertEc2MetadataNetworkConfig(test_helpers.CiTestCase): + + def setUp(self): + super(TestConvertEc2MetadataNetworkConfig, self).setUp() + self.mac1 = '06:17:04:d7:26:09' + self.network_metadata = { + 'network': {'interfaces': {'macs': { + self.mac1: {'public-ipv4s': '172.31.2.16'}}}}} + + def test_convert_ec2_metadata_network_config_skips_absent_macs(self): + """Any mac absent from metadata is skipped by network config.""" + macs_to_nics = {self.mac1: 'eth9', 'DE:AD:BE:EF:FF:FF': 'vitualnic2'} + + # DE:AD:BE:EF:FF:FF represented by OS but not in metadata + expected = {'version': 1, 'config': [ + {'mac_address': self.mac1, 'type': 'physical', + 'name': 'eth9', 'subnets': [{'type': 'dhcp4'}]}]} + self.assertEqual( + expected, + ec2.convert_ec2_metadata_network_config( + self.network_metadata, macs_to_nics)) + + def test_convert_ec2_metadata_network_config_handles_only_dhcp6(self): + """Config dhcp6 when ipv6s is in metadata for a mac.""" + macs_to_nics = {self.mac1: 'eth9'} + network_metadata_ipv6 = copy.deepcopy(self.network_metadata) + nic1_metadata = ( + network_metadata_ipv6['network']['interfaces']['macs'][self.mac1]) + nic1_metadata['ipv6s'] = '2620:0:1009:fd00:e442:c88d:c04d:dc85/64' + nic1_metadata.pop('public-ipv4s') + expected = {'version': 1, 'config': [ + {'mac_address': self.mac1, 'type': 'physical', + 'name': 'eth9', 'subnets': [{'type': 'dhcp6'}]}]} + self.assertEqual( + expected, + ec2.convert_ec2_metadata_network_config( + network_metadata_ipv6, macs_to_nics)) + + def test_convert_ec2_metadata_network_config_handles_dhcp4_and_dhcp6(self): + """Config both dhcp4 and dhcp6 when both vpc-ipv6 and ipv4 exists.""" + macs_to_nics = {self.mac1: 'eth9'} + network_metadata_both = copy.deepcopy(self.network_metadata) + nic1_metadata = ( + network_metadata_both['network']['interfaces']['macs'][self.mac1]) + nic1_metadata['ipv6s'] = '2620:0:1009:fd00:e442:c88d:c04d:dc85/64' + expected = {'version': 1, 'config': [ + {'mac_address': self.mac1, 'type': 'physical', + 'name': 'eth9', + 'subnets': [{'type': 'dhcp4'}, {'type': 'dhcp6'}]}]} + self.assertEqual( + expected, + ec2.convert_ec2_metadata_network_config( + network_metadata_both, macs_to_nics)) + + def test_convert_ec2_metadata_gets_macs_from_get_interfaces_by_mac(self): + """Convert Ec2 Metadata calls get_interfaces_by_mac by default.""" + expected = {'version': 1, 'config': [ + {'mac_address': self.mac1, 'type': 'physical', + 'name': 'eth9', + 'subnets': [{'type': 'dhcp4'}]}]} + patch_path = ( + 'cloudinit.sources.DataSourceEc2.net.get_interfaces_by_mac') + with mock.patch(patch_path) as m_get_interfaces_by_mac: + m_get_interfaces_by_mac.return_value = {self.mac1: 'eth9'} + self.assertEqual( + expected, + ec2.convert_ec2_metadata_network_config(self.network_metadata)) + # vi: ts=4 expandtab diff --git a/tests/unittests/test_distros/test_debian.py b/tests/unittests/test_distros/test_debian.py index 2330ad52..72d3aad6 100644 --- a/tests/unittests/test_distros/test_debian.py +++ b/tests/unittests/test_distros/test_debian.py @@ -1,67 +1,85 @@ # This file is part of cloud-init. See LICENSE file for license information. -from ..helpers import (CiTestCase, mock) - -from cloudinit.distros.debian import apply_locale +from cloudinit import distros from cloudinit import util +from ..helpers import (FilesystemMockingTestCase, mock) @mock.patch("cloudinit.distros.debian.util.subp") -class TestDebianApplyLocale(CiTestCase): +class TestDebianApplyLocale(FilesystemMockingTestCase): + + def setUp(self): + super(TestDebianApplyLocale, self).setUp() + self.new_root = self.tmp_dir() + self.patchOS(self.new_root) + self.patchUtils(self.new_root) + self.spath = self.tmp_path('etc/default/locale', self.new_root) + cls = distros.fetch("debian") + self.distro = cls("debian", {}, None) + def test_no_rerun(self, m_subp): """If system has defined locale, no re-run is expected.""" - spath = self.tmp_path("default-locale") m_subp.return_value = (None, None) locale = 'en_US.UTF-8' - util.write_file(spath, 'LANG=%s\n' % locale, omode="w") - apply_locale(locale, sys_path=spath) + util.write_file(self.spath, 'LANG=%s\n' % locale, omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) m_subp.assert_not_called() + def test_no_regen_on_c_utf8(self, m_subp): + """If locale is set to C.UTF8, do not attempt to call locale-gen""" + m_subp.return_value = (None, None) + locale = 'C.UTF-8' + util.write_file(self.spath, 'LANG=%s\n' % 'en_US.UTF-8', omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) + self.assertEqual( + [['update-locale', '--locale-file=' + self.spath, + 'LANG=%s' % locale]], + [p[0][0] for p in m_subp.call_args_list]) + def test_rerun_if_different(self, m_subp): """If system has different locale, locale-gen should be called.""" - spath = self.tmp_path("default-locale") m_subp.return_value = (None, None) locale = 'en_US.UTF-8' - util.write_file(spath, 'LANG=fr_FR.UTF-8', omode="w") - apply_locale(locale, sys_path=spath) + util.write_file(self.spath, 'LANG=fr_FR.UTF-8', omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( [['locale-gen', locale], - ['update-locale', '--locale-file=' + spath, 'LANG=%s' % locale]], + ['update-locale', '--locale-file=' + self.spath, + 'LANG=%s' % locale]], [p[0][0] for p in m_subp.call_args_list]) def test_rerun_if_no_file(self, m_subp): """If system has no locale file, locale-gen should be called.""" - spath = self.tmp_path("default-locale") m_subp.return_value = (None, None) locale = 'en_US.UTF-8' - apply_locale(locale, sys_path=spath) + self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( [['locale-gen', locale], - ['update-locale', '--locale-file=' + spath, 'LANG=%s' % locale]], + ['update-locale', '--locale-file=' + self.spath, + 'LANG=%s' % locale]], [p[0][0] for p in m_subp.call_args_list]) def test_rerun_on_unset_system_locale(self, m_subp): """If system has unset locale, locale-gen should be called.""" m_subp.return_value = (None, None) - spath = self.tmp_path("default-locale") locale = 'en_US.UTF-8' - util.write_file(spath, 'LANG=', omode="w") - apply_locale(locale, sys_path=spath) + util.write_file(self.spath, 'LANG=', omode="w") + self.distro.apply_locale(locale, out_fn=self.spath) self.assertEqual( [['locale-gen', locale], - ['update-locale', '--locale-file=' + spath, 'LANG=%s' % locale]], + ['update-locale', '--locale-file=' + self.spath, + 'LANG=%s' % locale]], [p[0][0] for p in m_subp.call_args_list]) def test_rerun_on_mismatched_keys(self, m_subp): """If key is LC_ALL and system has only LANG, rerun is expected.""" m_subp.return_value = (None, None) - spath = self.tmp_path("default-locale") locale = 'en_US.UTF-8' - util.write_file(spath, 'LANG=', omode="w") - apply_locale(locale, sys_path=spath, keyname='LC_ALL') + util.write_file(self.spath, 'LANG=', omode="w") + self.distro.apply_locale(locale, out_fn=self.spath, keyname='LC_ALL') self.assertEqual( [['locale-gen', locale], - ['update-locale', '--locale-file=' + spath, + ['update-locale', '--locale-file=' + self.spath, 'LC_ALL=%s' % locale]], [p[0][0] for p in m_subp.call_args_list]) @@ -69,14 +87,14 @@ class TestDebianApplyLocale(CiTestCase): """locale as None or "" is invalid and should raise ValueError.""" with self.assertRaises(ValueError) as ctext_m: - apply_locale(None) + self.distro.apply_locale(None) m_subp.assert_not_called() self.assertEqual( 'Failed to provide locale value.', str(ctext_m.exception)) with self.assertRaises(ValueError) as ctext_m: - apply_locale("") + self.distro.apply_locale("") m_subp.assert_not_called() self.assertEqual( 'Failed to provide locale value.', str(ctext_m.exception)) diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index c9be277e..b355a19e 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -228,5 +228,21 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): os.symlink('/', '/run/systemd/system') self.assertFalse(d.uses_systemd()) + @mock.patch('cloudinit.distros.debian.read_system_locale') + def test_get_locale_ubuntu(self, m_locale): + """Test ubuntu distro returns locale set to C.UTF-8""" + m_locale.return_value = 'C.UTF-8' + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + locale = d.get_locale() + self.assertEqual('C.UTF-8', locale) + + def test_get_locale_rhel(self): + """Test rhel distro returns NotImplementedError exception""" + cls = distros.fetch("rhel") + d = cls("rhel", {}, None) + with self.assertRaises(NotImplementedError): + d.get_locale() + # vi: ts=4 expandtab diff --git a/tests/unittests/test_distros/test_opensuse.py b/tests/unittests/test_distros/test_opensuse.py new file mode 100644 index 00000000..bdb1d633 --- /dev/null +++ b/tests/unittests/test_distros/test_opensuse.py @@ -0,0 +1,12 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from ..helpers import CiTestCase + +from . import _get_distro + + +class TestopenSUSE(CiTestCase): + + def test_get_distro(self): + distro = _get_distro("opensuse") + self.assertEqual(distro.osfamily, 'suse') diff --git a/tests/unittests/test_distros/test_sles.py b/tests/unittests/test_distros/test_sles.py new file mode 100644 index 00000000..c656aacc --- /dev/null +++ b/tests/unittests/test_distros/test_sles.py @@ -0,0 +1,12 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from ..helpers import CiTestCase + +from . import _get_distro + + +class TestSLES(CiTestCase): + + def test_get_distro(self): + distro = _get_distro("sles") + self.assertEqual(distro.osfamily, 'suse') diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py index 929f786e..1873c3e1 100644 --- a/tests/unittests/test_handler/test_handler_debug.py +++ b/tests/unittests/test_handler/test_handler_debug.py @@ -11,7 +11,7 @@ from cloudinit import util from cloudinit.sources import DataSourceNone -from .. import helpers as t_help +from ..helpers import (FilesystemMockingTestCase, mock) import logging import shutil @@ -20,7 +20,8 @@ import tempfile LOG = logging.getLogger(__name__) -class TestDebug(t_help.FilesystemMockingTestCase): +@mock.patch('cloudinit.distros.debian.read_system_locale') +class TestDebug(FilesystemMockingTestCase): def setUp(self): super(TestDebug, self).setUp() self.new_root = tempfile.mkdtemp() @@ -36,7 +37,8 @@ class TestDebug(t_help.FilesystemMockingTestCase): ds.metadata.update(metadata) return cloud.Cloud(ds, paths, {}, d, None) - def test_debug_write(self): + def test_debug_write(self, m_locale): + m_locale.return_value = 'en_US.UTF-8' cfg = { 'abc': '123', 'c': u'\u20a0', @@ -54,7 +56,8 @@ class TestDebug(t_help.FilesystemMockingTestCase): for k in cfg.keys(): self.assertIn(k, contents) - def test_debug_no_write(self): + def test_debug_no_write(self, m_locale): + m_locale.return_value = 'en_US.UTF-8' cfg = { 'abc': '123', 'debug': { diff --git a/tests/unittests/test_handler/test_handler_landscape.py b/tests/unittests/test_handler/test_handler_landscape.py new file mode 100644 index 00000000..7c247fa9 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_landscape.py @@ -0,0 +1,129 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.config import cc_landscape +from cloudinit.sources import DataSourceNone +from cloudinit import (distros, helpers, cloud, util) +from ..helpers import FilesystemMockingTestCase, mock, wrap_and_call + +from configobj import ConfigObj +import logging + + +LOG = logging.getLogger(__name__) + + +class TestLandscape(FilesystemMockingTestCase): + + with_logs = True + + def setUp(self): + super(TestLandscape, self).setUp() + self.new_root = self.tmp_dir() + self.conf = self.tmp_path('client.conf', self.new_root) + self.default_file = self.tmp_path('default_landscape', self.new_root) + + def _get_cloud(self, distro): + self.patchUtils(self.new_root) + paths = helpers.Paths({'templates_dir': self.new_root}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def test_handler_skips_empty_landscape_cloudconfig(self): + """Empty landscape cloud-config section does no work.""" + mycloud = self._get_cloud('ubuntu') + mycloud.distro = mock.MagicMock() + cfg = {'landscape': {}} + cc_landscape.handle('notimportant', cfg, mycloud, LOG, None) + self.assertFalse(mycloud.distro.install_packages.called) + + def test_handler_error_on_invalid_landscape_type(self): + """Raise an error when landscape configuraiton option is invalid.""" + mycloud = self._get_cloud('ubuntu') + cfg = {'landscape': 'wrongtype'} + with self.assertRaises(RuntimeError) as context_manager: + cc_landscape.handle('notimportant', cfg, mycloud, LOG, None) + self.assertIn( + "'landscape' key existed in config, but not a dict", + str(context_manager.exception)) + + @mock.patch('cloudinit.config.cc_landscape.util') + def test_handler_restarts_landscape_client(self, m_util): + """handler restarts lansdscape-client after install.""" + mycloud = self._get_cloud('ubuntu') + cfg = {'landscape': {'client': {}}} + wrap_and_call( + 'cloudinit.config.cc_landscape', + {'LSC_CLIENT_CFG_FILE': {'new': self.conf}}, + cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None) + self.assertEqual( + [mock.call(['service', 'landscape-client', 'restart'])], + m_util.subp.call_args_list) + + def test_handler_installs_client_and_creates_config_file(self): + """Write landscape client.conf and install landscape-client.""" + mycloud = self._get_cloud('ubuntu') + cfg = {'landscape': {'client': {}}} + expected = {'client': { + 'log_level': 'info', + 'url': 'https://landscape.canonical.com/message-system', + 'ping_url': 'http://landscape.canonical.com/ping', + 'data_path': '/var/lib/landscape/client'}} + mycloud.distro = mock.MagicMock() + wrap_and_call( + 'cloudinit.config.cc_landscape', + {'LSC_CLIENT_CFG_FILE': {'new': self.conf}, + 'LS_DEFAULT_FILE': {'new': self.default_file}}, + cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None) + self.assertEqual( + [mock.call('landscape-client')], + mycloud.distro.install_packages.call_args) + self.assertEqual(expected, dict(ConfigObj(self.conf))) + self.assertIn( + 'Wrote landscape config file to {0}'.format(self.conf), + self.logs.getvalue()) + default_content = util.load_file(self.default_file) + self.assertEqual('RUN=1\n', default_content) + + def test_handler_writes_merged_client_config_file_with_defaults(self): + """Merge and write options from LSC_CLIENT_CFG_FILE with defaults.""" + # Write existing sparse client.conf file + util.write_file(self.conf, '[client]\ncomputer_title = My PC\n') + mycloud = self._get_cloud('ubuntu') + cfg = {'landscape': {'client': {}}} + expected = {'client': { + 'log_level': 'info', + 'url': 'https://landscape.canonical.com/message-system', + 'ping_url': 'http://landscape.canonical.com/ping', + 'data_path': '/var/lib/landscape/client', + 'computer_title': 'My PC'}} + wrap_and_call( + 'cloudinit.config.cc_landscape', + {'LSC_CLIENT_CFG_FILE': {'new': self.conf}}, + cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None) + self.assertEqual(expected, dict(ConfigObj(self.conf))) + self.assertIn( + 'Wrote landscape config file to {0}'.format(self.conf), + self.logs.getvalue()) + + def test_handler_writes_merged_provided_cloudconfig_with_defaults(self): + """Merge and write options from cloud-config options with defaults.""" + # Write empty sparse client.conf file + util.write_file(self.conf, '') + mycloud = self._get_cloud('ubuntu') + cfg = {'landscape': {'client': {'computer_title': 'My PC'}}} + expected = {'client': { + 'log_level': 'info', + 'url': 'https://landscape.canonical.com/message-system', + 'ping_url': 'http://landscape.canonical.com/ping', + 'data_path': '/var/lib/landscape/client', + 'computer_title': 'My PC'}} + wrap_and_call( + 'cloudinit.config.cc_landscape', + {'LSC_CLIENT_CFG_FILE': {'new': self.conf}}, + cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None) + self.assertEqual(expected, dict(ConfigObj(self.conf))) + self.assertIn( + 'Wrote landscape config file to {0}'.format(self.conf), + self.logs.getvalue()) diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py index e9a810c5..a789db32 100644 --- a/tests/unittests/test_handler/test_handler_locale.py +++ b/tests/unittests/test_handler/test_handler_locale.py @@ -20,6 +20,8 @@ from configobj import ConfigObj from six import BytesIO import logging +import mock +import os import shutil import tempfile @@ -27,6 +29,9 @@ LOG = logging.getLogger(__name__) class TestLocale(t_help.FilesystemMockingTestCase): + + with_logs = True + def setUp(self): super(TestLocale, self).setUp() self.new_root = tempfile.mkdtemp() @@ -49,9 +54,58 @@ class TestLocale(t_help.FilesystemMockingTestCase): } cc = self._get_cloud('sles') cc_locale.handle('cc_locale', cfg, cc, LOG, []) + if cc.distro.uses_systemd(): + locale_conf = cc.distro.systemd_locale_conf_fn + else: + locale_conf = cc.distro.locale_conf_fn + contents = util.load_file(locale_conf, decode=False) + n_cfg = ConfigObj(BytesIO(contents)) + if cc.distro.uses_systemd(): + self.assertEqual({'LANG': cfg['locale']}, dict(n_cfg)) + else: + self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg)) + + def test_set_locale_sles_default(self): + cfg = {} + cc = self._get_cloud('sles') + cc_locale.handle('cc_locale', cfg, cc, LOG, []) - contents = util.load_file('/etc/sysconfig/language', decode=False) + if cc.distro.uses_systemd(): + locale_conf = cc.distro.systemd_locale_conf_fn + keyname = 'LANG' + else: + locale_conf = cc.distro.locale_conf_fn + keyname = 'RC_LANG' + + contents = util.load_file(locale_conf, decode=False) n_cfg = ConfigObj(BytesIO(contents)) - self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg)) + self.assertEqual({keyname: 'en_US.UTF-8'}, dict(n_cfg)) + + def test_locale_update_config_if_different_than_default(self): + """Test cc_locale writes updates conf if different than default""" + locale_conf = os.path.join(self.new_root, "etc/default/locale") + util.write_file(locale_conf, 'LANG="en_US.UTF-8"\n') + cfg = {'locale': 'C.UTF-8'} + cc = self._get_cloud('ubuntu') + with mock.patch('cloudinit.distros.debian.util.subp') as m_subp: + with mock.patch('cloudinit.distros.debian.LOCALE_CONF_FN', + locale_conf): + cc_locale.handle('cc_locale', cfg, cc, LOG, []) + m_subp.assert_called_with(['update-locale', + '--locale-file=%s' % locale_conf, + 'LANG=C.UTF-8'], capture=False) + + def test_locale_rhel_defaults_en_us_utf8(self): + """Test cc_locale gets en_US.UTF-8 from distro get_locale fallback""" + cfg = {} + cc = self._get_cloud('rhel') + update_sysconfig = 'cloudinit.distros.rhel_util.update_sysconfig_file' + with mock.patch.object(cc.distro, 'uses_systemd') as m_use_sd: + m_use_sd.return_value = True + with mock.patch(update_sysconfig) as m_update_syscfg: + cc_locale.handle('cc_locale', cfg, cc, LOG, []) + m_update_syscfg.assert_called_with('/etc/locale.conf', + {'LANG': 'en_US.UTF-8'}) + # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_puppet.py b/tests/unittests/test_handler/test_handler_puppet.py new file mode 100644 index 00000000..805c76ba --- /dev/null +++ b/tests/unittests/test_handler/test_handler_puppet.py @@ -0,0 +1,142 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.config import cc_puppet +from cloudinit.sources import DataSourceNone +from cloudinit import (distros, helpers, cloud, util) +from ..helpers import CiTestCase, mock + +import logging + + +LOG = logging.getLogger(__name__) + + +@mock.patch('cloudinit.config.cc_puppet.util') +@mock.patch('cloudinit.config.cc_puppet.os') +class TestAutostartPuppet(CiTestCase): + + with_logs = True + + def test_wb_autostart_puppet_updates_puppet_default(self, m_os, m_util): + """Update /etc/default/puppet to autostart if it exists.""" + + def _fake_exists(path): + return path == '/etc/default/puppet' + + m_os.path.exists.side_effect = _fake_exists + cc_puppet._autostart_puppet(LOG) + self.assertEqual( + [mock.call(['sed', '-i', '-e', 's/^START=.*/START=yes/', + '/etc/default/puppet'], capture=False)], + m_util.subp.call_args_list) + + def test_wb_autostart_pupppet_enables_puppet_systemctl(self, m_os, m_util): + """If systemctl is present, enable puppet via systemctl.""" + + def _fake_exists(path): + return path == '/bin/systemctl' + + m_os.path.exists.side_effect = _fake_exists + cc_puppet._autostart_puppet(LOG) + expected_calls = [mock.call( + ['/bin/systemctl', 'enable', 'puppet.service'], capture=False)] + self.assertEqual(expected_calls, m_util.subp.call_args_list) + + def test_wb_autostart_pupppet_enables_puppet_chkconfig(self, m_os, m_util): + """If chkconfig is present, enable puppet via checkcfg.""" + + def _fake_exists(path): + return path == '/sbin/chkconfig' + + m_os.path.exists.side_effect = _fake_exists + cc_puppet._autostart_puppet(LOG) + expected_calls = [mock.call( + ['/sbin/chkconfig', 'puppet', 'on'], capture=False)] + self.assertEqual(expected_calls, m_util.subp.call_args_list) + + +@mock.patch('cloudinit.config.cc_puppet._autostart_puppet') +class TestPuppetHandle(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestPuppetHandle, self).setUp() + self.new_root = self.tmp_dir() + self.conf = self.tmp_path('puppet.conf') + + def _get_cloud(self, distro): + paths = helpers.Paths({'templates_dir': self.new_root}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def test_handler_skips_missing_puppet_key_in_cloudconfig(self, m_auto): + """Cloud-config containing no 'puppet' key is skipped.""" + mycloud = self._get_cloud('ubuntu') + cfg = {} + cc_puppet.handle('notimportant', cfg, mycloud, LOG, None) + self.assertIn( + "no 'puppet' configuration found", self.logs.getvalue()) + self.assertEqual(0, m_auto.call_count) + + @mock.patch('cloudinit.config.cc_puppet.util.subp') + def test_handler_puppet_config_starts_puppet_service(self, m_subp, m_auto): + """Cloud-config 'puppet' configuration starts puppet.""" + mycloud = self._get_cloud('ubuntu') + cfg = {'puppet': {'install': False}} + cc_puppet.handle('notimportant', cfg, mycloud, LOG, None) + self.assertEqual(1, m_auto.call_count) + self.assertEqual( + [mock.call(['service', 'puppet', 'start'], capture=False)], + m_subp.call_args_list) + + @mock.patch('cloudinit.config.cc_puppet.util.subp') + def test_handler_empty_puppet_config_installs_puppet(self, m_subp, m_auto): + """Cloud-config empty 'puppet' configuration installs latest puppet.""" + mycloud = self._get_cloud('ubuntu') + mycloud.distro = mock.MagicMock() + cfg = {'puppet': {}} + cc_puppet.handle('notimportant', cfg, mycloud, LOG, None) + self.assertEqual( + [mock.call(('puppet', None))], + mycloud.distro.install_packages.call_args_list) + + @mock.patch('cloudinit.config.cc_puppet.util.subp') + def test_handler_puppet_config_installs_puppet_on_true(self, m_subp, _): + """Cloud-config with 'puppet' key installs when 'install' is True.""" + mycloud = self._get_cloud('ubuntu') + mycloud.distro = mock.MagicMock() + cfg = {'puppet': {'install': True}} + cc_puppet.handle('notimportant', cfg, mycloud, LOG, None) + self.assertEqual( + [mock.call(('puppet', None))], + mycloud.distro.install_packages.call_args_list) + + @mock.patch('cloudinit.config.cc_puppet.util.subp') + def test_handler_puppet_config_installs_puppet_version(self, m_subp, _): + """Cloud-config 'puppet' configuration can specify a version.""" + mycloud = self._get_cloud('ubuntu') + mycloud.distro = mock.MagicMock() + cfg = {'puppet': {'version': '3.8'}} + cc_puppet.handle('notimportant', cfg, mycloud, LOG, None) + self.assertEqual( + [mock.call(('puppet', '3.8'))], + mycloud.distro.install_packages.call_args_list) + + @mock.patch('cloudinit.config.cc_puppet.util.subp') + def test_handler_puppet_config_updates_puppet_conf(self, m_subp, m_auto): + """When 'conf' is provided update values in PUPPET_CONF_PATH.""" + mycloud = self._get_cloud('ubuntu') + cfg = { + 'puppet': { + 'conf': {'agent': {'server': 'puppetmaster.example.org'}}}} + util.write_file(self.conf, '[agent]\nserver = origpuppet\nother = 3') + puppet_conf_path = 'cloudinit.config.cc_puppet.PUPPET_CONF_PATH' + mycloud.distro = mock.MagicMock() + with mock.patch(puppet_conf_path, self.conf): + cc_puppet.handle('notimportant', cfg, mycloud, LOG, None) + content = util.load_file(self.conf) + expected = '[agent]\nserver = puppetmaster.example.org\nother = 3\n\n' + self.assertEqual(expected, content) diff --git a/tests/unittests/test_handler/test_handler_runcmd.py b/tests/unittests/test_handler/test_handler_runcmd.py new file mode 100644 index 00000000..7880ee72 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_runcmd.py @@ -0,0 +1,108 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.config import cc_runcmd +from cloudinit.sources import DataSourceNone +from cloudinit import (distros, helpers, cloud, util) +from ..helpers import FilesystemMockingTestCase, skipIf + +import logging +import os +import stat + +try: + import jsonschema + assert jsonschema # avoid pyflakes error F401: import unused + _missing_jsonschema_dep = False +except ImportError: + _missing_jsonschema_dep = True + +LOG = logging.getLogger(__name__) + + +class TestRuncmd(FilesystemMockingTestCase): + + with_logs = True + + def setUp(self): + super(TestRuncmd, self).setUp() + self.subp = util.subp + self.new_root = self.tmp_dir() + + def _get_cloud(self, distro): + self.patchUtils(self.new_root) + paths = helpers.Paths({'scripts': self.new_root}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + paths.datasource = myds + return cloud.Cloud(myds, paths, {}, mydist, None) + + def test_handler_skip_if_no_runcmd(self): + """When the provided config doesn't contain runcmd, skip it.""" + cfg = {} + mycloud = self._get_cloud('ubuntu') + cc_runcmd.handle('notimportant', cfg, mycloud, LOG, None) + self.assertIn( + "Skipping module named notimportant, no 'runcmd' key", + self.logs.getvalue()) + + def test_handler_invalid_command_set(self): + """Commands which can't be converted to shell will raise errors.""" + invalid_config = {'runcmd': 1} + cc = self._get_cloud('ubuntu') + cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, []) + self.assertIn( + 'Failed to shellify 1 into file' + ' /var/lib/cloud/instances/iid-datasource-none/scripts/runcmd', + self.logs.getvalue()) + + @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency") + def test_handler_schema_validation_warns_non_array_type(self): + """Schema validation warns of non-array type for runcmd key. + + Schema validation is not strict, so runcmd attempts to shellify the + invalid content. + """ + invalid_config = {'runcmd': 1} + cc = self._get_cloud('ubuntu') + cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, []) + self.assertIn( + 'Invalid config:\nruncmd: 1 is not of type \'array\'', + self.logs.getvalue()) + self.assertIn('Failed to shellify', self.logs.getvalue()) + + @skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency') + def test_handler_schema_validation_warns_non_array_item_type(self): + """Schema validation warns of non-array or string runcmd items. + + Schema validation is not strict, so runcmd attempts to shellify the + invalid content. + """ + invalid_config = { + 'runcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]} + cc = self._get_cloud('ubuntu') + cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, []) + expected_warnings = [ + 'runcmd.1: 20 is not valid under any of the given schemas', + 'runcmd.3: {\'a\': \'n\'} is not valid under any of the given' + ' schema' + ] + logs = self.logs.getvalue() + for warning in expected_warnings: + self.assertIn(warning, logs) + self.assertIn('Failed to shellify', logs) + + def test_handler_write_valid_runcmd_schema_to_file(self): + """Valid runcmd schema is written to a runcmd shell script.""" + valid_config = {'runcmd': [['ls', '/']]} + cc = self._get_cloud('ubuntu') + cc_runcmd.handle('cc_runcmd', valid_config, cc, LOG, []) + runcmd_file = os.path.join( + self.new_root, + 'var/lib/cloud/instances/iid-datasource-none/scripts/runcmd') + self.assertEqual("#!/bin/sh\n'ls' '/'\n", util.load_file(runcmd_file)) + file_stat = os.stat(runcmd_file) + self.assertEqual(0o700, stat.S_IMODE(file_stat.st_mode)) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py index 4b18de75..8165bf9a 100644 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -70,7 +70,8 @@ class TestHostname(t_help.FilesystemMockingTestCase): cc = cloud.Cloud(ds, paths, {}, distro, None) self.patchUtils(self.tmp) cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) - contents = util.load_file("/etc/HOSTNAME") - self.assertEqual('blah', contents.strip()) + if not distro.uses_systemd(): + contents = util.load_file(distro.hostname_conf_fn) + self.assertEqual('blah', contents.strip()) # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py index eda4802a..640f11d4 100644 --- a/tests/unittests/test_handler/test_schema.py +++ b/tests/unittests/test_handler/test_schema.py @@ -1,9 +1,9 @@ # This file is part of cloud-init. See LICENSE file for license information. from cloudinit.config.schema import ( - CLOUD_CONFIG_HEADER, SchemaValidationError, get_schema_doc, - validate_cloudconfig_file, validate_cloudconfig_schema, - main) + CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file, + get_schema_doc, get_schema, validate_cloudconfig_file, + validate_cloudconfig_schema, main) from cloudinit.util import write_file from ..helpers import CiTestCase, mock, skipIf @@ -11,6 +11,7 @@ from ..helpers import CiTestCase, mock, skipIf from copy import copy from six import StringIO from textwrap import dedent +from yaml import safe_load try: import jsonschema @@ -20,6 +21,29 @@ except ImportError: _missing_jsonschema_dep = True +class GetSchemaTest(CiTestCase): + + def test_get_schema_coalesces_known_schema(self): + """Every cloudconfig module with schema is listed in allOf keyword.""" + schema = get_schema() + self.assertItemsEqual( + ['cc_ntp', 'cc_runcmd'], + [subschema['id'] for subschema in schema['allOf']]) + self.assertEqual('cloud-config-schema', schema['id']) + self.assertEqual( + 'http://json-schema.org/draft-04/schema#', + schema['$schema']) + # FULL_SCHEMA is updated by the get_schema call + from cloudinit.config.schema import FULL_SCHEMA + self.assertItemsEqual(['id', '$schema', 'allOf'], FULL_SCHEMA.keys()) + + def test_get_schema_returns_global_when_set(self): + """When FULL_SCHEMA global is already set, get_schema returns it.""" + m_schema_path = 'cloudinit.config.schema.FULL_SCHEMA' + with mock.patch(m_schema_path, {'here': 'iam'}): + self.assertEqual({'here': 'iam'}, get_schema()) + + class SchemaValidationErrorTest(CiTestCase): """Test validate_cloudconfig_schema""" @@ -151,11 +175,11 @@ class GetSchemaDocTest(CiTestCase): full_schema.update( {'properties': { 'prop1': {'type': 'array', 'description': 'prop-description', - 'items': {'type': 'int'}}}}) + 'items': {'type': 'integer'}}}}) self.assertEqual( dedent(""" name - --- + ---- **Summary:** title description @@ -167,27 +191,71 @@ class GetSchemaDocTest(CiTestCase): **Supported distros:** debian, rhel **Config schema**: - **prop1:** (array of int) prop-description\n\n"""), + **prop1:** (array of integer) prop-description\n\n"""), + get_schema_doc(full_schema)) + + def test_get_schema_doc_handles_multiple_types(self): + """get_schema_doc delimits multiple property types with a '/'.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'type': ['string', 'integer'], + 'description': 'prop-description'}}}) + self.assertIn( + '**prop1:** (string/integer) prop-description', + get_schema_doc(full_schema)) + + def test_get_schema_doc_handles_nested_oneof_property_types(self): + """get_schema_doc describes array items oneOf declarations in type.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'type': 'array', + 'items': { + 'oneOf': [{'type': 'string'}, + {'type': 'integer'}]}, + 'description': 'prop-description'}}}) + self.assertIn( + '**prop1:** (array of (string)/(integer)) prop-description', get_schema_doc(full_schema)) def test_get_schema_doc_returns_restructured_text_with_examples(self): """get_schema_doc returns indented examples when present in schema.""" full_schema = copy(self.required_schema) full_schema.update( - {'examples': {'ex1': [1, 2, 3]}, + {'examples': [{'ex1': [1, 2, 3]}], 'properties': { 'prop1': {'type': 'array', 'description': 'prop-description', - 'items': {'type': 'int'}}}}) + 'items': {'type': 'integer'}}}}) self.assertIn( dedent(""" **Config schema**: - **prop1:** (array of int) prop-description + **prop1:** (array of integer) prop-description **Examples**:: ex1"""), get_schema_doc(full_schema)) + def test_get_schema_doc_handles_unstructured_examples(self): + """get_schema_doc properly indented examples which as just strings.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'examples': ['My example:\n [don\'t, expand, "this"]'], + 'properties': { + 'prop1': {'type': 'array', 'description': 'prop-description', + 'items': {'type': 'integer'}}}}) + self.assertIn( + dedent(""" + **Config schema**: + **prop1:** (array of integer) prop-description + + **Examples**:: + + My example: + [don't, expand, "this"]"""), + get_schema_doc(full_schema)) + def test_get_schema_doc_raises_key_errors(self): """get_schema_doc raises KeyErrors on missing keys.""" for key in self.required_schema: @@ -198,13 +266,78 @@ class GetSchemaDocTest(CiTestCase): self.assertIn(key, str(context_mgr.exception)) +class AnnotatedCloudconfigFileTest(CiTestCase): + maxDiff = None + + def test_annotated_cloudconfig_file_no_schema_errors(self): + """With no schema_errors, print the original content.""" + content = b'ntp:\n pools: [ntp1.pools.com]\n' + self.assertEqual( + content, + annotated_cloudconfig_file({}, content, schema_errors=[])) + + def test_annotated_cloudconfig_file_schema_annotates_and_adds_footer(self): + """With schema_errors, error lines are annotated and a footer added.""" + content = dedent("""\ + #cloud-config + # comment + ntp: + pools: [-99, 75] + """).encode() + expected = dedent("""\ + #cloud-config + # comment + ntp: # E1 + pools: [-99, 75] # E2,E3 + + # Errors: ------------- + # E1: Some type error + # E2: -99 is not a string + # E3: 75 is not a string + + """) + parsed_config = safe_load(content[13:]) + schema_errors = [ + ('ntp', 'Some type error'), ('ntp.pools.0', '-99 is not a string'), + ('ntp.pools.1', '75 is not a string')] + self.assertEqual( + expected, + annotated_cloudconfig_file(parsed_config, content, schema_errors)) + + def test_annotated_cloudconfig_file_annotates_separate_line_items(self): + """Errors are annotated for lists with items on separate lines.""" + content = dedent("""\ + #cloud-config + # comment + ntp: + pools: + - -99 + - 75 + """).encode() + expected = dedent("""\ + ntp: + pools: + - -99 # E1 + - 75 # E2 + """) + parsed_config = safe_load(content[13:]) + schema_errors = [ + ('ntp.pools.0', '-99 is not a string'), + ('ntp.pools.1', '75 is not a string')] + self.assertIn( + expected, + annotated_cloudconfig_file(parsed_config, content, schema_errors)) + + class MainTest(CiTestCase): def test_main_missing_args(self): """Main exits non-zero and reports an error on missing parameters.""" with mock.patch('sys.argv', ['mycmd']): with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: - self.assertEqual(1, main(), 'Expected non-zero exit code') + with self.assertRaises(SystemExit) as context_manager: + main() + self.assertEqual('1', str(context_manager.exception)) self.assertEqual( 'Expected either --config-file argument or --doc\n', m_stderr.getvalue()) @@ -216,13 +349,13 @@ class MainTest(CiTestCase): with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: self.assertEqual(0, main(), 'Expected 0 exit code') self.assertIn('\nNTP\n---\n', m_stdout.getvalue()) + self.assertIn('\nRuncmd\n------\n', m_stdout.getvalue()) def test_main_validates_config_file(self): """When --config-file parameter is provided, main validates schema.""" myyaml = self.tmp_path('my.yaml') myargs = ['mycmd', '--config-file', myyaml] - with open(myyaml, 'wb') as stream: - stream.write(b'#cloud-config\nntp:') # shortest ntp schema + write_file(myyaml, b'#cloud-config\nntp:') # shortest ntp schema with mock.patch('sys.argv', myargs): with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: self.assertEqual(0, main(), 'Expected 0 exit code') diff --git a/tests/unittests/test_log.py b/tests/unittests/test_log.py new file mode 100644 index 00000000..68fb4b8d --- /dev/null +++ b/tests/unittests/test_log.py @@ -0,0 +1,58 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests for cloudinit.log """ + +from .helpers import CiTestCase +from cloudinit.analyze.dump import CLOUD_INIT_ASCTIME_FMT +from cloudinit import log as ci_logging +import datetime +import logging +import six +import time + + +class TestCloudInitLogger(CiTestCase): + + def setUp(self): + # set up a logger like cloud-init does in setupLogging, but instead + # of sys.stderr, we'll plug in a StringIO() object so we can see + # what gets logged + logging.Formatter.converter = time.gmtime + self.ci_logs = six.StringIO() + self.ci_root = logging.getLogger() + console = logging.StreamHandler(self.ci_logs) + console.setFormatter(logging.Formatter(ci_logging.DEF_CON_FORMAT)) + console.setLevel(ci_logging.DEBUG) + self.ci_root.addHandler(console) + self.ci_root.setLevel(ci_logging.DEBUG) + self.LOG = logging.getLogger('test_cloudinit_logger') + + def test_logger_uses_gmtime(self): + """Test that log message have timestamp in UTC (gmtime)""" + + # Log a message, extract the timestamp from the log entry + # convert to datetime, and compare to a utc timestamp before + # and after the logged message. + + # Due to loss of precision in the LOG timestamp, subtract and add + # time to the utc stamps for comparison + # + # utc_before: 2017-08-23 14:19:42.569299 + # parsed dt : 2017-08-23 14:19:43.069000 + # utc_after : 2017-08-23 14:19:43.570064 + + utc_before = datetime.datetime.utcnow() - datetime.timedelta(0, 0.5) + self.LOG.error('Test message') + utc_after = datetime.datetime.utcnow() + datetime.timedelta(0, 0.5) + + # extract timestamp from log: + # 2017-08-23 14:19:43,069 - test_log.py[ERROR]: Test message + logstr = self.ci_logs.getvalue().splitlines()[0] + timestampstr = logstr.split(' - ')[0] + parsed_dt = datetime.datetime.strptime(timestampstr, + CLOUD_INIT_ASCTIME_FMT) + + self.assertLess(utc_before, parsed_dt) + self.assertLess(parsed_dt, utc_after) + self.assertLess(utc_before, utc_after) + self.assertGreater(utc_after, parsed_dt) diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index f38a664c..5f11c88f 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -568,7 +568,8 @@ class TestReadSeeded(helpers.TestCase): self.assertEqual(found_ud, ud) -class TestSubp(helpers.TestCase): +class TestSubp(helpers.CiTestCase): + with_logs = True stdin2err = [BASH, '-c', 'cat >&2'] stdin2out = ['cat'] @@ -650,6 +651,16 @@ class TestSubp(helpers.TestCase): self.assertEqual( ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines()) + def test_subp_warn_missing_shebang(self): + """Warn on no #! in script""" + noshebang = self.tmp_path('noshebang') + util.write_file(noshebang, 'true\n') + + os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC) + self.assertRaisesRegexp(util.ProcessExecutionError, + 'Missing #! in script\?', + util.subp, (noshebang,)) + def test_returns_none_if_no_capture(self): (out, err) = util.subp(self.stdin2out, data=b'', capture=False) self.assertIsNone(err) |