diff options
Diffstat (limited to 'test/units/modules/system')
-rw-r--r-- | test/units/modules/system/test_java_keystore.py | 264 | ||||
-rw-r--r-- | test/units/modules/system/test_pamd.py | 372 | ||||
-rw-r--r-- | test/units/modules/system/test_parted.py | 240 | ||||
-rw-r--r-- | test/units/modules/system/test_ufw.py | 434 |
4 files changed, 0 insertions, 1310 deletions
diff --git a/test/units/modules/system/test_java_keystore.py b/test/units/modules/system/test_java_keystore.py deleted file mode 100644 index 434be518e3..0000000000 --- a/test/units/modules/system/test_java_keystore.py +++ /dev/null @@ -1,264 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2018, Ansible Project -# Copyright (c) 2018, Abhijeet Kasurde <akasurde@redhat.com> -# -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -import os - -from units.modules.utils import ModuleTestCase, set_module_args -from units.compat.mock import patch -from units.compat.mock import Mock -from ansible.module_utils.basic import AnsibleModule -from ansible.modules.system.java_keystore import create_jks, cert_changed, ArgumentSpec - - -class TestCreateJavaKeystore(ModuleTestCase): - """Test the creation of a Java keystore.""" - - def setUp(self): - """Setup.""" - super(TestCreateJavaKeystore, self).setUp() - - orig_exists = os.path.exists - self.spec = ArgumentSpec() - self.mock_create_file = patch('ansible.modules.system.java_keystore.create_file', - side_effect=lambda path, content: path) - self.mock_run_commands = patch('ansible.modules.system.java_keystore.run_commands') - self.mock_os_path_exists = patch('os.path.exists', - side_effect=lambda path: True if path == '/path/to/keystore.jks' else orig_exists(path)) - self.mock_selinux_context = patch('ansible.module_utils.basic.AnsibleModule.selinux_context', - side_effect=lambda path: ['unconfined_u', 'object_r', 'user_home_t', 's0']) - self.mock_is_special_selinux_path = patch('ansible.module_utils.basic.AnsibleModule.is_special_selinux_path', - side_effect=lambda path: (False, None)) - self.run_commands = self.mock_run_commands.start() - self.create_file = self.mock_create_file.start() - self.selinux_context = self.mock_selinux_context.start() - self.is_special_selinux_path = self.mock_is_special_selinux_path.start() - self.os_path_exists = self.mock_os_path_exists.start() - - def tearDown(self): - """Teardown.""" - super(TestCreateJavaKeystore, self).tearDown() - self.mock_create_file.stop() - self.mock_run_commands.stop() - self.mock_selinux_context.stop() - self.mock_is_special_selinux_path.stop() - self.mock_os_path_exists.stop() - - def test_create_jks_success(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - module.exit_json = Mock() - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = lambda args, kwargs: (0, '', '') - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit") - module.exit_json.assert_called_once_with( - changed=True, - cmd="keytool -importkeystore " - "-destkeystore '/path/to/keystore.jks' " - "-srckeystore '/tmp/keystore.p12' -srcstoretype pkcs12 -alias 'test' " - "-deststorepass 'changeit' -srcstorepass 'changeit' -noprompt", - msg='', - rc=0, - stdout_lines='' - ) - - def test_create_jks_fail_export_pkcs12(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - module.fail_json = Mock() - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(1, '', ''), (0, '', '')] - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit") - module.fail_json.assert_called_once_with( - cmd="openssl pkcs12 -export -name 'test' " - "-in '/tmp/foo.crt' -inkey '/tmp/foo.key' " - "-out '/tmp/keystore.p12' " - "-passout 'pass:changeit'", - msg='', - rc=1 - ) - - def test_create_jks_fail_import_key(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - module.fail_json = Mock() - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(0, '', ''), (1, '', '')] - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit") - module.fail_json.assert_called_once_with( - cmd="keytool -importkeystore " - "-destkeystore '/path/to/keystore.jks' " - "-srckeystore '/tmp/keystore.p12' -srcstoretype pkcs12 -alias 'test' " - "-deststorepass 'changeit' -srcstorepass 'changeit' -noprompt", - msg='', - rc=1 - ) - - -class TestCertChanged(ModuleTestCase): - """Test if the cert has changed.""" - - def setUp(self): - """Setup.""" - super(TestCertChanged, self).setUp() - self.spec = ArgumentSpec() - self.mock_create_file = patch('ansible.modules.system.java_keystore.create_file', - side_effect=lambda path, content: path) - self.mock_run_commands = patch('ansible.modules.system.java_keystore.run_commands') - self.run_commands = self.mock_run_commands.start() - self.create_file = self.mock_create_file.start() - - def tearDown(self): - """Teardown.""" - super(TestCertChanged, self).tearDown() - self.mock_create_file.stop() - self.mock_run_commands.stop() - - def test_cert_unchanged_same_fingerprint(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: abcd:1234:efgh', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') - self.assertFalse(result, 'Fingerprint is identical') - - def test_cert_changed_fingerprint_mismatch(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: wxyz:9876:stuv', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') - self.assertTrue(result, 'Fingerprint mismatch') - - def test_cert_changed_alias_does_not_exist(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), - (1, 'keytool error: java.lang.Exception: Alias <foo> does not exist', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') - self.assertTrue(result, 'Certificate does not exist') - - def test_cert_changed_fail_read_cert(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - module.fail_json = Mock() - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(1, '', 'Oops'), (0, 'SHA256: wxyz:9876:stuv', '')] - cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') - module.fail_json.assert_called_once_with( - cmd="openssl x509 -noout -in /tmp/foo.crt -fingerprint -sha256", - msg='', - err='Oops', - rc=1 - ) - - def test_cert_changed_fail_read_keystore(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - - module.fail_json = Mock(return_value=True) - - with patch('os.remove', return_value=True): - self.run_commands.side_effect = [(0, 'foo: wxyz:9876:stuv', ''), (1, '', 'Oops')] - cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') - module.fail_json.assert_called_with( - cmd="keytool -list -alias 'foo' -keystore '/path/to/keystore.jks' -storepass 'changeit' -v", - msg='', - err='Oops', - rc=1 - ) diff --git a/test/units/modules/system/test_pamd.py b/test/units/modules/system/test_pamd.py deleted file mode 100644 index 93c1d08ad4..0000000000 --- a/test/units/modules/system/test_pamd.py +++ /dev/null @@ -1,372 +0,0 @@ -from __future__ import (absolute_import, division, print_function) -from units.compat import unittest - -from ansible.modules.system.pamd import PamdRule -from ansible.modules.system.pamd import PamdLine -from ansible.modules.system.pamd import PamdComment -from ansible.modules.system.pamd import PamdInclude -from ansible.modules.system.pamd import PamdService - - -class PamdLineTestCase(unittest.TestCase): - - def setUp(self): - self.pamd_line = PamdLine("This is a test") - - def test_line(self): - self.assertEqual("This is a test", str(self.pamd_line)) - - def test_matches(self): - self.assertFalse(self.pamd_line.matches("test", "matches", "foo", "bar")) - - -class PamdIncludeTestCase(unittest.TestCase): - - def setUp(self): - self.good_include = PamdInclude("@include foobar") - self.bad_include = PamdInclude("include foobar") - - def test_line(self): - self.assertEqual("@include foobar", str(self.good_include)) - - def test_matches(self): - self.assertFalse(self.good_include.matches("something", "something", "dark", "side")) - - def test_valid(self): - self.assertTrue(self.good_include.is_valid) - self.assertFalse(self.bad_include.is_valid) - - -class PamdCommentTestCase(unittest.TestCase): - - def setUp(self): - self.good_comment = PamdComment("# This is a test comment") - self.bad_comment = PamdComment("This is a bad test comment") - - def test_line(self): - self.assertEqual("# This is a test comment", str(self.good_comment)) - - def test_matches(self): - self.assertFalse(self.good_comment.matches("test", "matches", "foo", "bar")) - - def test_valid(self): - self.assertTrue(self.good_comment.is_valid) - self.assertFalse(self.bad_comment.is_valid) - - -class PamdRuleTestCase(unittest.TestCase): - def setUp(self): - self.rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke') - - def test_type(self): - self.assertEqual(self.rule.rule_type, 'account') - - def test_control(self): - self.assertEqual(self.rule.rule_control, 'optional') - self.assertEqual(self.rule._control, 'optional') - - def test_path(self): - self.assertEqual(self.rule.rule_path, 'pam_keyinit.so') - - def test_args(self): - self.assertEqual(self.rule.rule_args, ['revoke']) - - def test_valid(self): - self.assertTrue(self.rule.validate()[0]) - - -class PamdRuleBadValidationTestCase(unittest.TestCase): - def setUp(self): - self.bad_type = PamdRule('foobar', 'optional', 'pam_keyinit.so', 'revoke') - self.bad_control_simple = PamdRule('account', 'foobar', 'pam_keyinit.so', 'revoke') - self.bad_control_value = PamdRule('account', '[foobar=1 default=ignore]', 'pam_keyinit.so', 'revoke') - self.bad_control_action = PamdRule('account', '[success=1 default=foobar]', 'pam_keyinit.so', 'revoke') - - def test_validate_bad_type(self): - self.assertFalse(self.bad_type.validate()[0]) - - def test_validate_bad_control_simple(self): - self.assertFalse(self.bad_control_simple.validate()[0]) - - def test_validate_bad_control_value(self): - self.assertFalse(self.bad_control_value.validate()[0]) - - def test_validate_bad_control_action(self): - self.assertFalse(self.bad_control_action.validate()[0]) - - -class PamdServiceTestCase(unittest.TestCase): - def setUp(self): - self.system_auth_string = """#%PAM-1.0 -# This file is auto-generated. -# User changes will be destroyed the next time authconfig is run. -@include common-auth -@include common-account -@include common-session -auth required pam_env.so -auth sufficient pam_unix.so nullok try_first_pass -auth requisite pam_succeed_if.so uid -auth required pam_deny.so -# Test comment -auth sufficient pam_rootok.so - -account required pam_unix.so -account sufficient pam_localuser.so -account sufficient pam_succeed_if.so uid -account [success=1 default=ignore] \ - pam_succeed_if.so user = vagrant use_uid quiet -account required pam_permit.so -account required pam_access.so listsep=, -session include system-auth - -password requisite pam_pwquality.so try_first_pass local_users_only retry=3 authtok_type= -password sufficient pam_unix.so sha512 shadow nullok try_first_pass use_authtok -password required pam_deny.so - -session optional pam_keyinit.so revoke -session required pam_limits.so --session optional pam_systemd.so -session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid -session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid -session required pam_unix.so""" - - self.simple_system_auth_string = """#%PAM-1.0 - auth required pam_env.so -""" - - self.no_header_system_auth_string = """auth required pam_env.so -auth sufficient pam_unix.so nullok try_first_pass -auth requisite pam_succeed_if.so uid -auth required pam_deny.so -""" - - self.pamd = PamdService(self.system_auth_string) - - def test_properly_parsed(self): - num_lines = len(self.system_auth_string.splitlines()) + 1 - num_lines_processed = len(str(self.pamd).splitlines()) - self.assertEqual(num_lines, num_lines_processed) - - def test_has_rule(self): - self.assertTrue(self.pamd.has_rule('account', 'required', 'pam_permit.so')) - self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so')) - - def test_doesnt_have_rule(self): - self.assertFalse(self.pamd.has_rule('account', 'requisite', 'pam_permit.so')) - - # Test Update - def test_update_rule_type(self): - self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_type='account')) - self.assertTrue(self.pamd.has_rule('account', 'optional', 'pam_keyinit.so')) - test_rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke') - self.assertIn(str(test_rule), str(self.pamd)) - - def test_update_rule_that_doesnt_exist(self): - self.assertFalse(self.pamd.update_rule('blah', 'blah', 'blah', new_type='account')) - self.assertFalse(self.pamd.has_rule('blah', 'blah', 'blah')) - test_rule = PamdRule('blah', 'blah', 'blah', 'account') - self.assertNotIn(str(test_rule), str(self.pamd)) - - def test_update_rule_type_two(self): - self.assertTrue(self.pamd.update_rule('session', '[success=1 default=ignore]', 'pam_succeed_if.so', new_type='account')) - self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so')) - test_rule = PamdRule('account', '[success=1 default=ignore]', 'pam_succeed_if.so') - self.assertIn(str(test_rule), str(self.pamd)) - - def test_update_rule_control_simple(self): - self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_control='required')) - self.assertTrue(self.pamd.has_rule('session', 'required', 'pam_keyinit.so')) - test_rule = PamdRule('session', 'required', 'pam_keyinit.so') - self.assertIn(str(test_rule), str(self.pamd)) - - def test_update_rule_control_complex(self): - self.assertTrue(self.pamd.update_rule('session', - '[success=1 default=ignore]', - 'pam_succeed_if.so', - new_control='[success=2 test=me default=ignore]')) - self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')) - test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so') - self.assertIn(str(test_rule), str(self.pamd)) - - def test_update_rule_control_more_complex(self): - - self.assertTrue(self.pamd.update_rule('session', - '[success=1 test=me default=ignore]', - 'pam_succeed_if.so', - new_control='[success=2 test=me default=ignore]')) - self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')) - test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so') - self.assertIn(str(test_rule), str(self.pamd)) - - def test_update_rule_module_path(self): - self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='pam_limits.so')) - self.assertTrue(self.pamd.has_rule('auth', 'required', 'pam_limits.so')) - - def test_update_rule_module_path_slash(self): - self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='/lib64/security/pam_duo.so')) - self.assertTrue(self.pamd.has_rule('auth', 'required', '/lib64/security/pam_duo.so')) - - def test_update_rule_module_args(self): - self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', new_args='uid uid')) - test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'uid uid') - self.assertIn(str(test_rule), str(self.pamd)) - - test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass') - self.assertNotIn(str(test_rule), str(self.pamd)) - - def test_update_first_three(self): - self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', - new_type='one', new_control='two', new_path='three')) - self.assertTrue(self.pamd.has_rule('one', 'two', 'three')) - - def test_update_first_three_with_module_args(self): - self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', - new_type='one', new_control='two', new_path='three')) - self.assertTrue(self.pamd.has_rule('one', 'two', 'three')) - test_rule = PamdRule('one', 'two', 'three') - self.assertIn(str(test_rule), str(self.pamd)) - self.assertIn(str(test_rule), str(self.pamd)) - - def test_update_all_four(self): - self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', - new_type='one', new_control='two', new_path='three', - new_args='four five')) - test_rule = PamdRule('one', 'two', 'three', 'four five') - self.assertIn(str(test_rule), str(self.pamd)) - - test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass') - self.assertNotIn(str(test_rule), str(self.pamd)) - - def test_update_rule_with_slash(self): - self.assertTrue(self.pamd.update_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so', - new_type='session', new_path='pam_access.so')) - test_rule = PamdRule('session', '[success=1 default=ignore]', 'pam_access.so') - self.assertIn(str(test_rule), str(self.pamd)) - - # Insert Before - def test_insert_before_rule(self): - - count = self.pamd.insert_before('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_limits.so') - self.assertEqual(count, 1) - - rules = self.pamd.get("account", "required", "pam_access.so") - for current_rule in rules: - self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so")) - - def test_insert_before_rule_where_rule_doesnt_exist(self): - - count = self.pamd.insert_before('account', 'sufficient', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_limits.so') - self.assertFalse(count) - - def test_insert_before_rule_with_args(self): - self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_limits.so', - new_args='uid')) - - rules = self.pamd.get("account", "required", "pam_access.so") - for current_rule in rules: - self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so", 'uid')) - - def test_insert_before_rule_test_duplicates(self): - self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_limits.so')) - - self.pamd.insert_before('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_limits.so') - - rules = self.pamd.get("account", "required", "pam_access.so") - for current_rule in rules: - previous_rule = current_rule.prev - self.assertTrue(previous_rule.matches("account", "required", "pam_limits.so")) - self.assertFalse(previous_rule.prev.matches("account", "required", "pam_limits.so")) - - def test_insert_before_first_rule(self): - self.assertTrue(self.pamd.insert_before('auth', 'required', 'pam_env.so', - new_type='account', new_control='required', new_path='pam_limits.so')) - - def test_insert_before_first_rule_simple(self): - simple_service = PamdService(self.simple_system_auth_string) - self.assertTrue(simple_service.insert_before('auth', 'required', 'pam_env.so', - new_type='account', new_control='required', new_path='pam_limits.so')) - - # Insert After - def test_insert_after_rule(self): - self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_unix.so', - new_type='account', new_control='required', new_path='pam_permit.so')) - rules = self.pamd.get("account", "required", "pam_unix.so") - for current_rule in rules: - self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so")) - - def test_insert_after_rule_with_args(self): - self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_permit.so', - new_args='uid')) - rules = self.pamd.get("account", "required", "pam_access.so") - for current_rule in rules: - self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid")) - - def test_insert_after_test_duplicates(self): - self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_permit.so', - new_args='uid')) - self.assertFalse(self.pamd.insert_after('account', 'required', 'pam_access.so', - new_type='account', new_control='required', new_path='pam_permit.so', - new_args='uid')) - - rules = self.pamd.get("account", "required", "pam_access.so") - for current_rule in rules: - self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid")) - self.assertFalse(current_rule.next.next.matches("account", "required", "pam_permit.so", "uid")) - - def test_insert_after_rule_last_rule(self): - self.assertTrue(self.pamd.insert_after('session', 'required', 'pam_unix.so', - new_type='account', new_control='required', new_path='pam_permit.so', - new_args='uid')) - rules = self.pamd.get("session", "required", "pam_unix.so") - for current_rule in rules: - self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid")) - - # Remove Module Arguments - def test_remove_module_arguments_one(self): - self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', 'nullok')) - - def test_remove_module_arguments_one_list(self): - self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', ['nullok'])) - - def test_remove_module_arguments_two(self): - self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', 'service crond')) - - def test_remove_module_arguments_two_list(self): - self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', ['service', 'crond'])) - - def test_remove_module_arguments_where_none_existed(self): - self.assertTrue(self.pamd.add_module_arguments('session', 'required', 'pam_limits.so', 'arg1 arg2= arg3=arg3')) - - def test_add_module_arguments_where_none_existed(self): - self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', 'arg1 arg2= arg3=arg3')) - - def test_add_module_arguments_where_none_existed_list(self): - self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', ['arg1', 'arg2=', 'arg3=arg3'])) - - def test_add_module_arguments_where_some_existed(self): - self.assertTrue(self.pamd.add_module_arguments('auth', 'sufficient', 'pam_unix.so', 'arg1 arg2= arg3=arg3')) - - def test_remove_rule(self): - self.assertTrue(self.pamd.remove('account', 'required', 'pam_unix.so')) - # Second run should not change anything - self.assertFalse(self.pamd.remove('account', 'required', 'pam_unix.so')) - test_rule = PamdRule('account', 'required', 'pam_unix.so') - self.assertNotIn(str(test_rule), str(self.pamd)) - - def test_remove_first_rule(self): - no_header_service = PamdService(self.no_header_system_auth_string) - self.assertTrue(no_header_service.remove('auth', 'required', 'pam_env.so')) - test_rule = PamdRule('auth', 'required', 'pam_env.so') - self.assertNotIn(str(test_rule), str(no_header_service)) - - def test_remove_last_rule(self): - self.assertTrue(self.pamd.remove('session', 'required', 'pam_unix.so')) - test_rule = PamdRule('session', 'required', 'pam_unix.so') - self.assertNotIn(str(test_rule), str(self.pamd)) diff --git a/test/units/modules/system/test_parted.py b/test/units/modules/system/test_parted.py deleted file mode 100644 index 91439ffea3..0000000000 --- a/test/units/modules/system/test_parted.py +++ /dev/null @@ -1,240 +0,0 @@ -# (c) 2017 Red Hat Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see <http://www.gnu.org/licenses/>. - -__metaclass__ = type -from units.compat.mock import patch, call -from ansible.modules.system import parted as parted_module -from ansible.modules.system.parted import parse_partition_info -from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args - -# Example of output : parted -s -m /dev/sdb -- unit 'MB' print -parted_output1 = """ -BYT; -/dev/sdb:286061MB:scsi:512:512:msdos:ATA TOSHIBA THNSFJ25:; -1:1.05MB:106MB:105MB:fat32::esp; -2:106MB:368MB:262MB:ext2::; -3:368MB:256061MB:255692MB:::;""" - -# corresponding dictionary after parsing by parse_partition_info -parted_dict1 = { - "generic": { - "dev": "/dev/sdb", - "size": 286061.0, - "unit": "mb", - "table": "msdos", - "model": "ATA TOSHIBA THNSFJ25", - "logical_block": 512, - "physical_block": 512 - }, - "partitions": [{ - "num": 1, - "begin": 1.05, - "end": 106.0, - "size": 105.0, - "fstype": "fat32", - "name": '', - "flags": ["esp"], - "unit": "mb" - }, { - "num": 2, - "begin": 106.0, - "end": 368.0, - "size": 262.0, - "fstype": "ext2", - "name": '', - "flags": [], - "unit": "mb" - }, { - "num": 3, - "begin": 368.0, - "end": 256061.0, - "size": 255692.0, - "fstype": "", - "name": '', - "flags": [], - "unit": "mb" - }] -} - -parted_output2 = """ -BYT; -/dev/sdb:286061MB:scsi:512:512:msdos:ATA TOSHIBA THNSFJ25:;""" - -# corresponding dictionary after parsing by parse_partition_info -parted_dict2 = { - "generic": { - "dev": "/dev/sdb", - "size": 286061.0, - "unit": "mb", - "table": "msdos", - "model": "ATA TOSHIBA THNSFJ25", - "logical_block": 512, - "physical_block": 512 - }, - "partitions": [] -} - - -class TestParted(ModuleTestCase): - def setUp(self): - super(TestParted, self).setUp() - - self.module = parted_module - self.mock_check_parted_label = (patch('ansible.modules.system.parted.check_parted_label', return_value=False)) - self.check_parted_label = self.mock_check_parted_label.start() - - self.mock_parted = (patch('ansible.modules.system.parted.parted')) - self.parted = self.mock_parted.start() - - self.mock_run_command = (patch('ansible.module_utils.basic.AnsibleModule.run_command')) - self.run_command = self.mock_run_command.start() - - self.mock_get_bin_path = (patch('ansible.module_utils.basic.AnsibleModule.get_bin_path')) - self.get_bin_path = self.mock_get_bin_path.start() - - def tearDown(self): - super(TestParted, self).tearDown() - self.mock_run_command.stop() - self.mock_get_bin_path.stop() - self.mock_parted.stop() - self.mock_check_parted_label.stop() - - def execute_module(self, failed=False, changed=False, script=None): - if failed: - result = self.failed() - self.assertTrue(result['failed'], result) - else: - result = self.changed(changed) - self.assertEqual(result['changed'], changed, result) - - if script: - self.assertEqual(script, result['script'], result['script']) - - return result - - def failed(self): - with self.assertRaises(AnsibleFailJson) as exc: - self.module.main() - - result = exc.exception.args[0] - self.assertTrue(result['failed'], result) - return result - - def changed(self, changed=False): - with self.assertRaises(AnsibleExitJson) as exc: - self.module.main() - - result = exc.exception.args[0] - self.assertEqual(result['changed'], changed, result) - return result - - def test_parse_partition_info(self): - """Test that the parse_partition_info returns the expected dictionary""" - self.assertEqual(parse_partition_info(parted_output1, 'MB'), parted_dict1) - self.assertEqual(parse_partition_info(parted_output2, 'MB'), parted_dict2) - - def test_partition_already_exists(self): - set_module_args({ - 'device': '/dev/sdb', - 'number': 1, - 'state': 'present', - }) - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1): - self.execute_module(changed=False) - - def test_create_new_partition(self): - set_module_args({ - 'device': '/dev/sdb', - 'number': 4, - 'state': 'present', - }) - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1): - self.execute_module(changed=True, script='unit KiB mkpart primary 0% 100%') - - def test_create_new_partition_1G(self): - set_module_args({ - 'device': '/dev/sdb', - 'number': 4, - 'state': 'present', - 'part_end': '1GiB', - }) - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1): - self.execute_module(changed=True, script='unit KiB mkpart primary 0% 1GiB') - - def test_remove_partition_number_1(self): - set_module_args({ - 'device': '/dev/sdb', - 'number': 1, - 'state': 'absent', - }) - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1): - self.execute_module(changed=True, script='rm 1') - - def test_change_flag(self): - # Flags are set in a second run of parted(). - # Between the two runs, the partition dict is updated. - # use checkmode here allow us to continue even if the dictionary is - # not updated. - set_module_args({ - 'device': '/dev/sdb', - 'number': 3, - 'state': 'present', - 'flags': ['lvm', 'boot'], - '_ansible_check_mode': True, - }) - - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1): - self.parted.reset_mock() - self.execute_module(changed=True) - # When using multiple flags: - # order of execution is non deterministic, because set() operations are used in - # the current implementation. - expected_calls_order1 = [call('unit KiB set 3 lvm on set 3 boot on ', - '/dev/sdb', 'optimal')] - expected_calls_order2 = [call('unit KiB set 3 boot on set 3 lvm on ', - '/dev/sdb', 'optimal')] - self.assertTrue(self.parted.mock_calls == expected_calls_order1 or - self.parted.mock_calls == expected_calls_order2) - - def test_create_new_primary_lvm_partition(self): - # use check_mode, see previous test comment - set_module_args({ - 'device': '/dev/sdb', - 'number': 4, - 'flags': ["boot"], - 'state': 'present', - 'part_start': '257GiB', - '_ansible_check_mode': True, - }) - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1): - self.execute_module(changed=True, script='unit KiB mkpart primary 257GiB 100% unit KiB set 4 boot on') - - def test_create_label_gpt(self): - # Like previous test, current implementation use parted to create the partition and - # then retrieve and update the dictionary. Use check_mode to force to continue even if - # dictionary is not updated. - set_module_args({ - 'device': '/dev/sdb', - 'number': 1, - 'flags': ["lvm"], - 'label': 'gpt', - 'name': 'lvmpartition', - 'state': 'present', - '_ansible_check_mode': True, - }) - with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict2): - self.execute_module(changed=True, script='unit KiB mklabel gpt mkpart primary 0% 100% unit KiB name 1 \'"lvmpartition"\' set 1 lvm on') diff --git a/test/units/modules/system/test_ufw.py b/test/units/modules/system/test_ufw.py deleted file mode 100644 index b169e94e67..0000000000 --- a/test/units/modules/system/test_ufw.py +++ /dev/null @@ -1,434 +0,0 @@ - -from units.compat import unittest -from units.compat.mock import patch -from ansible.module_utils import basic -from ansible.module_utils._text import to_bytes -import ansible.modules.system.ufw as module - -import json - - -# mock ufw messages - -ufw_version_35 = """ufw 0.35\nCopyright 2008-2015 Canonical Ltd.\n""" - -ufw_verbose_header = """Status: active -Logging: on (low) -Default: deny (incoming), allow (outgoing), deny (routed) -New profiles: skip - -To Action From --- ------ ----""" - - -ufw_status_verbose_with_port_7000 = ufw_verbose_header + """ -7000/tcp ALLOW IN Anywhere -7000/tcp (v6) ALLOW IN Anywhere (v6) -""" - -user_rules_with_port_7000 = """### tuple ### allow tcp 7000 0.0.0.0/0 any 0.0.0.0/0 in -### tuple ### allow tcp 7000 ::/0 any ::/0 in -""" - -user_rules_with_ipv6 = """### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in -### tuple ### allow udp 5353 ::/0 any ff02::fb in -""" - -ufw_status_verbose_with_ipv6 = ufw_verbose_header + """ -5353/udp ALLOW IN 224.0.0.251 -5353/udp ALLOW IN ff02::fb -""" - -ufw_status_verbose_nothing = ufw_verbose_header - -skippg_adding_existing_rules = "Skipping adding existing rule\nSkipping adding existing rule (v6)\n" - -grep_config_cli = "grep -h '^### tuple' /lib/ufw/user.rules /lib/ufw/user6.rules /etc/ufw/user.rules /etc/ufw/user6.rules " -grep_config_cli += "/var/lib/ufw/user.rules /var/lib/ufw/user6.rules" - -dry_mode_cmd_with_port_700 = { - "ufw status verbose": ufw_status_verbose_with_port_7000, - "ufw --version": ufw_version_35, - "ufw --dry-run allow from any to any port 7000 proto tcp": skippg_adding_existing_rules, - "ufw --dry-run delete allow from any to any port 7000 proto tcp": "", - "ufw --dry-run delete allow from any to any port 7001 proto tcp": user_rules_with_port_7000, - "ufw --dry-run route allow in on foo out on bar from 1.1.1.1 port 7000 to 8.8.8.8 port 7001 proto tcp": "", - "ufw --dry-run allow in on foo from any to any port 7003 proto tcp": "", - "ufw --dry-run allow in on foo from 1.1.1.1 port 7002 to 8.8.8.8 port 7003 proto tcp": "", - "ufw --dry-run allow out on foo from any to any port 7004 proto tcp": "", - "ufw --dry-run allow out on foo from 1.1.1.1 port 7003 to 8.8.8.8 port 7004 proto tcp": "", - grep_config_cli: user_rules_with_port_7000 -} - -# setup configuration : -# ufw reset -# ufw enable -# ufw allow proto udp to any port 5353 from 224.0.0.251 -# ufw allow proto udp to any port 5353 from ff02::fb -dry_mode_cmd_with_ipv6 = { - "ufw status verbose": ufw_status_verbose_with_ipv6, - "ufw --version": ufw_version_35, - # CONTENT of the command sudo ufw --dry-run delete allow in from ff02::fb port 5353 proto udp | grep -E "^### tupple" - "ufw --dry-run delete allow from ff02::fb to any port 5353 proto udp": "### tuple ### allow udp any ::/0 5353 ff02::fb in", - grep_config_cli: user_rules_with_ipv6, - "ufw --dry-run allow from ff02::fb to any port 5353 proto udp": skippg_adding_existing_rules, - "ufw --dry-run allow from 224.0.0.252 to any port 5353 proto udp": """### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in -### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.252 in -""", - "ufw --dry-run allow from 10.0.0.0/24 to any port 1577 proto udp": "### tuple ### allow udp 1577 0.0.0.0/0 any 10.0.0.0/24 in" -} - -dry_mode_cmd_nothing = { - "ufw status verbose": ufw_status_verbose_nothing, - "ufw --version": ufw_version_35, - grep_config_cli: "", - "ufw --dry-run allow from any to :: port 23": "### tuple ### allow any 23 :: any ::/0 in" -} - - -def do_nothing_func_nothing(*args, **kwarg): - return 0, dry_mode_cmd_nothing[args[0]], "" - - -def do_nothing_func_ipv6(*args, **kwarg): - return 0, dry_mode_cmd_with_ipv6[args[0]], "" - - -def do_nothing_func_port_7000(*args, **kwarg): - return 0, dry_mode_cmd_with_port_700[args[0]], "" - - -def set_module_args(args): - args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) - """prepare arguments so that they will be picked up during module creation""" - basic._ANSIBLE_ARGS = to_bytes(args) - - -class AnsibleExitJson(Exception): - """Exception class to be raised by module.exit_json and caught by the test case""" - pass - - -class AnsibleFailJson(Exception): - """Exception class to be raised by module.fail_json and caught by the test case""" - pass - - -def exit_json(*args, **kwargs): - """function to patch over exit_json; package return data into an exception""" - if 'changed' not in kwargs: - kwargs['changed'] = False - raise AnsibleExitJson(kwargs) - - -def fail_json(*args, **kwargs): - """function to patch over fail_json; package return data into an exception""" - kwargs['failed'] = True - raise AnsibleFailJson(kwargs) - - -def get_bin_path(self, arg, required=False): - """Mock AnsibleModule.get_bin_path""" - return arg - - -class TestUFW(unittest.TestCase): - - def setUp(self): - self.mock_module_helper = patch.multiple(basic.AnsibleModule, - exit_json=exit_json, - fail_json=fail_json, - get_bin_path=get_bin_path) - self.mock_module_helper.start() - self.addCleanup(self.mock_module_helper.stop) - - def test_filter_line_that_contains_ipv4(self): - reg = module.compile_ipv4_regexp() - - self.assertTrue(reg.search("### tuple ### allow udp 5353 ::/0 any ff02::fb in") is None) - self.assertTrue(reg.search("### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in") is not None) - - self.assertTrue(reg.match("ff02::fb") is None) - self.assertTrue(reg.match("224.0.0.251") is not None) - self.assertTrue(reg.match("10.0.0.0/8") is not None) - self.assertTrue(reg.match("somethingElse") is None) - self.assertTrue(reg.match("::") is None) - self.assertTrue(reg.match("any") is None) - - def test_filter_line_that_contains_ipv6(self): - reg = module.compile_ipv6_regexp() - self.assertTrue(reg.search("### tuple ### allow udp 5353 ::/0 any ff02::fb in") is not None) - self.assertTrue(reg.search("### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in") is None) - self.assertTrue(reg.search("### tuple ### allow any 23 :: any ::/0 in") is not None) - self.assertTrue(reg.match("ff02::fb") is not None) - self.assertTrue(reg.match("224.0.0.251") is None) - self.assertTrue(reg.match("::") is not None) - - def test_check_mode_add_rules(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7000', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertFalse(result.exception.args[0]['changed']) - - def test_check_mode_add_detailed_route(self): - set_module_args({ - 'rule': 'allow', - 'route': 'yes', - 'interface_in': 'foo', - 'interface_out': 'bar', - 'proto': 'tcp', - 'from_ip': '1.1.1.1', - 'to_ip': '8.8.8.8', - 'from_port': '7000', - 'to_port': '7001', - '_ansible_check_mode': True - }) - - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_add_ambiguous_route(self): - set_module_args({ - 'rule': 'allow', - 'route': 'yes', - 'interface_in': 'foo', - 'interface_out': 'bar', - 'direction': 'in', - 'interface': 'baz', - '_ansible_check_mode': True - }) - - with self.assertRaises(AnsibleFailJson) as result: - self.__getResult(do_nothing_func_port_7000) - - exc = result.exception.args[0] - self.assertTrue(exc['failed']) - self.assertIn('mutually exclusive', exc['msg']) - - def test_check_mode_add_interface_in(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7003', - 'interface_in': 'foo', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_add_interface_out(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7004', - 'interface_out': 'foo', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_add_non_route_interface_both(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7004', - 'interface_in': 'foo', - 'interface_out': 'bar', - '_ansible_check_mode': True - }) - - with self.assertRaises(AnsibleFailJson) as result: - self.__getResult(do_nothing_func_port_7000) - - exc = result.exception.args[0] - self.assertTrue(exc['failed']) - self.assertIn('combine', exc['msg']) - - def test_check_mode_add_direction_in(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7003', - 'direction': 'in', - 'interface': 'foo', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_add_direction_in_with_ip(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'from_ip': '1.1.1.1', - 'from_port': '7002', - 'to_ip': '8.8.8.8', - 'to_port': '7003', - 'direction': 'in', - 'interface': 'foo', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_add_direction_out(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7004', - 'direction': 'out', - 'interface': 'foo', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_add_direction_out_with_ip(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'from_ip': '1.1.1.1', - 'from_port': '7003', - 'to_ip': '8.8.8.8', - 'to_port': '7004', - 'direction': 'out', - 'interface': 'foo', - '_ansible_check_mode': True - }) - result = self.__getResult(do_nothing_func_port_7000) - self.assertTrue(result.exception.args[0]['changed']) - - def test_check_mode_delete_existing_rules(self): - - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7000', - 'delete': 'yes', - '_ansible_check_mode': True, - }) - - self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_check_mode_delete_not_existing_rules(self): - - set_module_args({ - 'rule': 'allow', - 'proto': 'tcp', - 'port': '7001', - 'delete': 'yes', - '_ansible_check_mode': True, - }) - - self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_enable_mode(self): - set_module_args({ - 'state': 'enabled', - '_ansible_check_mode': True - }) - - self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_disable_mode(self): - set_module_args({ - 'state': 'disabled', - '_ansible_check_mode': True - }) - - self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_logging_off(self): - set_module_args({ - 'logging': 'off', - '_ansible_check_mode': True - }) - - self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_logging_on(self): - set_module_args({ - 'logging': 'on', - '_ansible_check_mode': True - }) - - self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_default_changed(self): - set_module_args({ - 'default': 'allow', - "direction": "incoming", - '_ansible_check_mode': True - }) - self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_default_not_changed(self): - set_module_args({ - 'default': 'deny', - "direction": "incoming", - '_ansible_check_mode': True - }) - self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed']) - - def test_ipv6_remove(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'udp', - 'port': '5353', - 'from': 'ff02::fb', - 'delete': 'yes', - '_ansible_check_mode': True, - }) - self.assertTrue(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed']) - - def test_ipv6_add_existing(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'udp', - 'port': '5353', - 'from': 'ff02::fb', - '_ansible_check_mode': True, - }) - self.assertFalse(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed']) - - def test_add_not_existing_ipv4_submask(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'udp', - 'port': '1577', - 'from': '10.0.0.0/24', - '_ansible_check_mode': True, - }) - self.assertTrue(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed']) - - def test_ipv4_add_with_existing_ipv6(self): - set_module_args({ - 'rule': 'allow', - 'proto': 'udp', - 'port': '5353', - 'from': '224.0.0.252', - '_ansible_check_mode': True, - }) - self.assertTrue(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed']) - - def test_ipv6_add_from_nothing(self): - set_module_args({ - 'rule': 'allow', - 'port': '23', - 'to': '::', - '_ansible_check_mode': True, - }) - result = self.__getResult(do_nothing_func_nothing).exception.args[0] - print(result) - self.assertTrue(result['changed']) - - def __getResult(self, cmd_fun): - with patch.object(basic.AnsibleModule, 'run_command') as mock_run_command: - mock_run_command.side_effect = cmd_fun - with self.assertRaises(AnsibleExitJson) as result: - module.main() - return result |