summaryrefslogtreecommitdiff
path: root/test/units/modules/system
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/modules/system')
-rw-r--r--test/units/modules/system/test_java_keystore.py264
-rw-r--r--test/units/modules/system/test_pamd.py372
-rw-r--r--test/units/modules/system/test_parted.py240
-rw-r--r--test/units/modules/system/test_ufw.py434
4 files changed, 0 insertions, 1310 deletions
diff --git a/test/units/modules/system/test_java_keystore.py b/test/units/modules/system/test_java_keystore.py
deleted file mode 100644
index 434be518e3..0000000000
--- a/test/units/modules/system/test_java_keystore.py
+++ /dev/null
@@ -1,264 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (c) 2018, Ansible Project
-# Copyright (c) 2018, Abhijeet Kasurde <akasurde@redhat.com>
-#
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-import os
-
-from units.modules.utils import ModuleTestCase, set_module_args
-from units.compat.mock import patch
-from units.compat.mock import Mock
-from ansible.module_utils.basic import AnsibleModule
-from ansible.modules.system.java_keystore import create_jks, cert_changed, ArgumentSpec
-
-
-class TestCreateJavaKeystore(ModuleTestCase):
- """Test the creation of a Java keystore."""
-
- def setUp(self):
- """Setup."""
- super(TestCreateJavaKeystore, self).setUp()
-
- orig_exists = os.path.exists
- self.spec = ArgumentSpec()
- self.mock_create_file = patch('ansible.modules.system.java_keystore.create_file',
- side_effect=lambda path, content: path)
- self.mock_run_commands = patch('ansible.modules.system.java_keystore.run_commands')
- self.mock_os_path_exists = patch('os.path.exists',
- side_effect=lambda path: True if path == '/path/to/keystore.jks' else orig_exists(path))
- self.mock_selinux_context = patch('ansible.module_utils.basic.AnsibleModule.selinux_context',
- side_effect=lambda path: ['unconfined_u', 'object_r', 'user_home_t', 's0'])
- self.mock_is_special_selinux_path = patch('ansible.module_utils.basic.AnsibleModule.is_special_selinux_path',
- side_effect=lambda path: (False, None))
- self.run_commands = self.mock_run_commands.start()
- self.create_file = self.mock_create_file.start()
- self.selinux_context = self.mock_selinux_context.start()
- self.is_special_selinux_path = self.mock_is_special_selinux_path.start()
- self.os_path_exists = self.mock_os_path_exists.start()
-
- def tearDown(self):
- """Teardown."""
- super(TestCreateJavaKeystore, self).tearDown()
- self.mock_create_file.stop()
- self.mock_run_commands.stop()
- self.mock_selinux_context.stop()
- self.mock_is_special_selinux_path.stop()
- self.mock_os_path_exists.stop()
-
- def test_create_jks_success(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- module.exit_json = Mock()
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = lambda args, kwargs: (0, '', '')
- create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit")
- module.exit_json.assert_called_once_with(
- changed=True,
- cmd="keytool -importkeystore "
- "-destkeystore '/path/to/keystore.jks' "
- "-srckeystore '/tmp/keystore.p12' -srcstoretype pkcs12 -alias 'test' "
- "-deststorepass 'changeit' -srcstorepass 'changeit' -noprompt",
- msg='',
- rc=0,
- stdout_lines=''
- )
-
- def test_create_jks_fail_export_pkcs12(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- module.fail_json = Mock()
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(1, '', ''), (0, '', '')]
- create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit")
- module.fail_json.assert_called_once_with(
- cmd="openssl pkcs12 -export -name 'test' "
- "-in '/tmp/foo.crt' -inkey '/tmp/foo.key' "
- "-out '/tmp/keystore.p12' "
- "-passout 'pass:changeit'",
- msg='',
- rc=1
- )
-
- def test_create_jks_fail_import_key(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- module.fail_json = Mock()
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(0, '', ''), (1, '', '')]
- create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit")
- module.fail_json.assert_called_once_with(
- cmd="keytool -importkeystore "
- "-destkeystore '/path/to/keystore.jks' "
- "-srckeystore '/tmp/keystore.p12' -srcstoretype pkcs12 -alias 'test' "
- "-deststorepass 'changeit' -srcstorepass 'changeit' -noprompt",
- msg='',
- rc=1
- )
-
-
-class TestCertChanged(ModuleTestCase):
- """Test if the cert has changed."""
-
- def setUp(self):
- """Setup."""
- super(TestCertChanged, self).setUp()
- self.spec = ArgumentSpec()
- self.mock_create_file = patch('ansible.modules.system.java_keystore.create_file',
- side_effect=lambda path, content: path)
- self.mock_run_commands = patch('ansible.modules.system.java_keystore.run_commands')
- self.run_commands = self.mock_run_commands.start()
- self.create_file = self.mock_create_file.start()
-
- def tearDown(self):
- """Teardown."""
- super(TestCertChanged, self).tearDown()
- self.mock_create_file.stop()
- self.mock_run_commands.stop()
-
- def test_cert_unchanged_same_fingerprint(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: abcd:1234:efgh', '')]
- result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
- self.assertFalse(result, 'Fingerprint is identical')
-
- def test_cert_changed_fingerprint_mismatch(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: wxyz:9876:stuv', '')]
- result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
- self.assertTrue(result, 'Fingerprint mismatch')
-
- def test_cert_changed_alias_does_not_exist(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''),
- (1, 'keytool error: java.lang.Exception: Alias <foo> does not exist', '')]
- result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
- self.assertTrue(result, 'Certificate does not exist')
-
- def test_cert_changed_fail_read_cert(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- module.fail_json = Mock()
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(1, '', 'Oops'), (0, 'SHA256: wxyz:9876:stuv', '')]
- cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
- module.fail_json.assert_called_once_with(
- cmd="openssl x509 -noout -in /tmp/foo.crt -fingerprint -sha256",
- msg='',
- err='Oops',
- rc=1
- )
-
- def test_cert_changed_fail_read_keystore(self):
- set_module_args(dict(
- certificate='cert-foo',
- private_key='private-foo',
- dest='/path/to/keystore.jks',
- name='foo',
- password='changeit'
- ))
-
- module = AnsibleModule(
- argument_spec=self.spec.argument_spec,
- supports_check_mode=self.spec.supports_check_mode
- )
-
- module.fail_json = Mock(return_value=True)
-
- with patch('os.remove', return_value=True):
- self.run_commands.side_effect = [(0, 'foo: wxyz:9876:stuv', ''), (1, '', 'Oops')]
- cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
- module.fail_json.assert_called_with(
- cmd="keytool -list -alias 'foo' -keystore '/path/to/keystore.jks' -storepass 'changeit' -v",
- msg='',
- err='Oops',
- rc=1
- )
diff --git a/test/units/modules/system/test_pamd.py b/test/units/modules/system/test_pamd.py
deleted file mode 100644
index 93c1d08ad4..0000000000
--- a/test/units/modules/system/test_pamd.py
+++ /dev/null
@@ -1,372 +0,0 @@
-from __future__ import (absolute_import, division, print_function)
-from units.compat import unittest
-
-from ansible.modules.system.pamd import PamdRule
-from ansible.modules.system.pamd import PamdLine
-from ansible.modules.system.pamd import PamdComment
-from ansible.modules.system.pamd import PamdInclude
-from ansible.modules.system.pamd import PamdService
-
-
-class PamdLineTestCase(unittest.TestCase):
-
- def setUp(self):
- self.pamd_line = PamdLine("This is a test")
-
- def test_line(self):
- self.assertEqual("This is a test", str(self.pamd_line))
-
- def test_matches(self):
- self.assertFalse(self.pamd_line.matches("test", "matches", "foo", "bar"))
-
-
-class PamdIncludeTestCase(unittest.TestCase):
-
- def setUp(self):
- self.good_include = PamdInclude("@include foobar")
- self.bad_include = PamdInclude("include foobar")
-
- def test_line(self):
- self.assertEqual("@include foobar", str(self.good_include))
-
- def test_matches(self):
- self.assertFalse(self.good_include.matches("something", "something", "dark", "side"))
-
- def test_valid(self):
- self.assertTrue(self.good_include.is_valid)
- self.assertFalse(self.bad_include.is_valid)
-
-
-class PamdCommentTestCase(unittest.TestCase):
-
- def setUp(self):
- self.good_comment = PamdComment("# This is a test comment")
- self.bad_comment = PamdComment("This is a bad test comment")
-
- def test_line(self):
- self.assertEqual("# This is a test comment", str(self.good_comment))
-
- def test_matches(self):
- self.assertFalse(self.good_comment.matches("test", "matches", "foo", "bar"))
-
- def test_valid(self):
- self.assertTrue(self.good_comment.is_valid)
- self.assertFalse(self.bad_comment.is_valid)
-
-
-class PamdRuleTestCase(unittest.TestCase):
- def setUp(self):
- self.rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke')
-
- def test_type(self):
- self.assertEqual(self.rule.rule_type, 'account')
-
- def test_control(self):
- self.assertEqual(self.rule.rule_control, 'optional')
- self.assertEqual(self.rule._control, 'optional')
-
- def test_path(self):
- self.assertEqual(self.rule.rule_path, 'pam_keyinit.so')
-
- def test_args(self):
- self.assertEqual(self.rule.rule_args, ['revoke'])
-
- def test_valid(self):
- self.assertTrue(self.rule.validate()[0])
-
-
-class PamdRuleBadValidationTestCase(unittest.TestCase):
- def setUp(self):
- self.bad_type = PamdRule('foobar', 'optional', 'pam_keyinit.so', 'revoke')
- self.bad_control_simple = PamdRule('account', 'foobar', 'pam_keyinit.so', 'revoke')
- self.bad_control_value = PamdRule('account', '[foobar=1 default=ignore]', 'pam_keyinit.so', 'revoke')
- self.bad_control_action = PamdRule('account', '[success=1 default=foobar]', 'pam_keyinit.so', 'revoke')
-
- def test_validate_bad_type(self):
- self.assertFalse(self.bad_type.validate()[0])
-
- def test_validate_bad_control_simple(self):
- self.assertFalse(self.bad_control_simple.validate()[0])
-
- def test_validate_bad_control_value(self):
- self.assertFalse(self.bad_control_value.validate()[0])
-
- def test_validate_bad_control_action(self):
- self.assertFalse(self.bad_control_action.validate()[0])
-
-
-class PamdServiceTestCase(unittest.TestCase):
- def setUp(self):
- self.system_auth_string = """#%PAM-1.0
-# This file is auto-generated.
-# User changes will be destroyed the next time authconfig is run.
-@include common-auth
-@include common-account
-@include common-session
-auth required pam_env.so
-auth sufficient pam_unix.so nullok try_first_pass
-auth requisite pam_succeed_if.so uid
-auth required pam_deny.so
-# Test comment
-auth sufficient pam_rootok.so
-
-account required pam_unix.so
-account sufficient pam_localuser.so
-account sufficient pam_succeed_if.so uid
-account [success=1 default=ignore] \
- pam_succeed_if.so user = vagrant use_uid quiet
-account required pam_permit.so
-account required pam_access.so listsep=,
-session include system-auth
-
-password requisite pam_pwquality.so try_first_pass local_users_only retry=3 authtok_type=
-password sufficient pam_unix.so sha512 shadow nullok try_first_pass use_authtok
-password required pam_deny.so
-
-session optional pam_keyinit.so revoke
-session required pam_limits.so
--session optional pam_systemd.so
-session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid
-session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid
-session required pam_unix.so"""
-
- self.simple_system_auth_string = """#%PAM-1.0
- auth required pam_env.so
-"""
-
- self.no_header_system_auth_string = """auth required pam_env.so
-auth sufficient pam_unix.so nullok try_first_pass
-auth requisite pam_succeed_if.so uid
-auth required pam_deny.so
-"""
-
- self.pamd = PamdService(self.system_auth_string)
-
- def test_properly_parsed(self):
- num_lines = len(self.system_auth_string.splitlines()) + 1
- num_lines_processed = len(str(self.pamd).splitlines())
- self.assertEqual(num_lines, num_lines_processed)
-
- def test_has_rule(self):
- self.assertTrue(self.pamd.has_rule('account', 'required', 'pam_permit.so'))
- self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so'))
-
- def test_doesnt_have_rule(self):
- self.assertFalse(self.pamd.has_rule('account', 'requisite', 'pam_permit.so'))
-
- # Test Update
- def test_update_rule_type(self):
- self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_type='account'))
- self.assertTrue(self.pamd.has_rule('account', 'optional', 'pam_keyinit.so'))
- test_rule = PamdRule('account', 'optional', 'pam_keyinit.so', 'revoke')
- self.assertIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_that_doesnt_exist(self):
- self.assertFalse(self.pamd.update_rule('blah', 'blah', 'blah', new_type='account'))
- self.assertFalse(self.pamd.has_rule('blah', 'blah', 'blah'))
- test_rule = PamdRule('blah', 'blah', 'blah', 'account')
- self.assertNotIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_type_two(self):
- self.assertTrue(self.pamd.update_rule('session', '[success=1 default=ignore]', 'pam_succeed_if.so', new_type='account'))
- self.assertTrue(self.pamd.has_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so'))
- test_rule = PamdRule('account', '[success=1 default=ignore]', 'pam_succeed_if.so')
- self.assertIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_control_simple(self):
- self.assertTrue(self.pamd.update_rule('session', 'optional', 'pam_keyinit.so', new_control='required'))
- self.assertTrue(self.pamd.has_rule('session', 'required', 'pam_keyinit.so'))
- test_rule = PamdRule('session', 'required', 'pam_keyinit.so')
- self.assertIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_control_complex(self):
- self.assertTrue(self.pamd.update_rule('session',
- '[success=1 default=ignore]',
- 'pam_succeed_if.so',
- new_control='[success=2 test=me default=ignore]'))
- self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so'))
- test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')
- self.assertIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_control_more_complex(self):
-
- self.assertTrue(self.pamd.update_rule('session',
- '[success=1 test=me default=ignore]',
- 'pam_succeed_if.so',
- new_control='[success=2 test=me default=ignore]'))
- self.assertTrue(self.pamd.has_rule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so'))
- test_rule = PamdRule('session', '[success=2 test=me default=ignore]', 'pam_succeed_if.so')
- self.assertIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_module_path(self):
- self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='pam_limits.so'))
- self.assertTrue(self.pamd.has_rule('auth', 'required', 'pam_limits.so'))
-
- def test_update_rule_module_path_slash(self):
- self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so', new_path='/lib64/security/pam_duo.so'))
- self.assertTrue(self.pamd.has_rule('auth', 'required', '/lib64/security/pam_duo.so'))
-
- def test_update_rule_module_args(self):
- self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so', new_args='uid uid'))
- test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'uid uid')
- self.assertIn(str(test_rule), str(self.pamd))
-
- test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass')
- self.assertNotIn(str(test_rule), str(self.pamd))
-
- def test_update_first_three(self):
- self.assertTrue(self.pamd.update_rule('auth', 'required', 'pam_env.so',
- new_type='one', new_control='two', new_path='three'))
- self.assertTrue(self.pamd.has_rule('one', 'two', 'three'))
-
- def test_update_first_three_with_module_args(self):
- self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so',
- new_type='one', new_control='two', new_path='three'))
- self.assertTrue(self.pamd.has_rule('one', 'two', 'three'))
- test_rule = PamdRule('one', 'two', 'three')
- self.assertIn(str(test_rule), str(self.pamd))
- self.assertIn(str(test_rule), str(self.pamd))
-
- def test_update_all_four(self):
- self.assertTrue(self.pamd.update_rule('auth', 'sufficient', 'pam_unix.so',
- new_type='one', new_control='two', new_path='three',
- new_args='four five'))
- test_rule = PamdRule('one', 'two', 'three', 'four five')
- self.assertIn(str(test_rule), str(self.pamd))
-
- test_rule = PamdRule('auth', 'sufficient', 'pam_unix.so', 'nullok try_first_pass')
- self.assertNotIn(str(test_rule), str(self.pamd))
-
- def test_update_rule_with_slash(self):
- self.assertTrue(self.pamd.update_rule('account', '[success=1 default=ignore]', 'pam_succeed_if.so',
- new_type='session', new_path='pam_access.so'))
- test_rule = PamdRule('session', '[success=1 default=ignore]', 'pam_access.so')
- self.assertIn(str(test_rule), str(self.pamd))
-
- # Insert Before
- def test_insert_before_rule(self):
-
- count = self.pamd.insert_before('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_limits.so')
- self.assertEqual(count, 1)
-
- rules = self.pamd.get("account", "required", "pam_access.so")
- for current_rule in rules:
- self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so"))
-
- def test_insert_before_rule_where_rule_doesnt_exist(self):
-
- count = self.pamd.insert_before('account', 'sufficient', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_limits.so')
- self.assertFalse(count)
-
- def test_insert_before_rule_with_args(self):
- self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_limits.so',
- new_args='uid'))
-
- rules = self.pamd.get("account", "required", "pam_access.so")
- for current_rule in rules:
- self.assertTrue(current_rule.prev.matches("account", "required", "pam_limits.so", 'uid'))
-
- def test_insert_before_rule_test_duplicates(self):
- self.assertTrue(self.pamd.insert_before('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_limits.so'))
-
- self.pamd.insert_before('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_limits.so')
-
- rules = self.pamd.get("account", "required", "pam_access.so")
- for current_rule in rules:
- previous_rule = current_rule.prev
- self.assertTrue(previous_rule.matches("account", "required", "pam_limits.so"))
- self.assertFalse(previous_rule.prev.matches("account", "required", "pam_limits.so"))
-
- def test_insert_before_first_rule(self):
- self.assertTrue(self.pamd.insert_before('auth', 'required', 'pam_env.so',
- new_type='account', new_control='required', new_path='pam_limits.so'))
-
- def test_insert_before_first_rule_simple(self):
- simple_service = PamdService(self.simple_system_auth_string)
- self.assertTrue(simple_service.insert_before('auth', 'required', 'pam_env.so',
- new_type='account', new_control='required', new_path='pam_limits.so'))
-
- # Insert After
- def test_insert_after_rule(self):
- self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_unix.so',
- new_type='account', new_control='required', new_path='pam_permit.so'))
- rules = self.pamd.get("account", "required", "pam_unix.so")
- for current_rule in rules:
- self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so"))
-
- def test_insert_after_rule_with_args(self):
- self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_permit.so',
- new_args='uid'))
- rules = self.pamd.get("account", "required", "pam_access.so")
- for current_rule in rules:
- self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid"))
-
- def test_insert_after_test_duplicates(self):
- self.assertTrue(self.pamd.insert_after('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_permit.so',
- new_args='uid'))
- self.assertFalse(self.pamd.insert_after('account', 'required', 'pam_access.so',
- new_type='account', new_control='required', new_path='pam_permit.so',
- new_args='uid'))
-
- rules = self.pamd.get("account", "required", "pam_access.so")
- for current_rule in rules:
- self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid"))
- self.assertFalse(current_rule.next.next.matches("account", "required", "pam_permit.so", "uid"))
-
- def test_insert_after_rule_last_rule(self):
- self.assertTrue(self.pamd.insert_after('session', 'required', 'pam_unix.so',
- new_type='account', new_control='required', new_path='pam_permit.so',
- new_args='uid'))
- rules = self.pamd.get("session", "required", "pam_unix.so")
- for current_rule in rules:
- self.assertTrue(current_rule.next.matches("account", "required", "pam_permit.so", "uid"))
-
- # Remove Module Arguments
- def test_remove_module_arguments_one(self):
- self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', 'nullok'))
-
- def test_remove_module_arguments_one_list(self):
- self.assertTrue(self.pamd.remove_module_arguments('auth', 'sufficient', 'pam_unix.so', ['nullok']))
-
- def test_remove_module_arguments_two(self):
- self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', 'service crond'))
-
- def test_remove_module_arguments_two_list(self):
- self.assertTrue(self.pamd.remove_module_arguments('session', '[success=1 default=ignore]', 'pam_succeed_if.so', ['service', 'crond']))
-
- def test_remove_module_arguments_where_none_existed(self):
- self.assertTrue(self.pamd.add_module_arguments('session', 'required', 'pam_limits.so', 'arg1 arg2= arg3=arg3'))
-
- def test_add_module_arguments_where_none_existed(self):
- self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', 'arg1 arg2= arg3=arg3'))
-
- def test_add_module_arguments_where_none_existed_list(self):
- self.assertTrue(self.pamd.add_module_arguments('account', 'required', 'pam_unix.so', ['arg1', 'arg2=', 'arg3=arg3']))
-
- def test_add_module_arguments_where_some_existed(self):
- self.assertTrue(self.pamd.add_module_arguments('auth', 'sufficient', 'pam_unix.so', 'arg1 arg2= arg3=arg3'))
-
- def test_remove_rule(self):
- self.assertTrue(self.pamd.remove('account', 'required', 'pam_unix.so'))
- # Second run should not change anything
- self.assertFalse(self.pamd.remove('account', 'required', 'pam_unix.so'))
- test_rule = PamdRule('account', 'required', 'pam_unix.so')
- self.assertNotIn(str(test_rule), str(self.pamd))
-
- def test_remove_first_rule(self):
- no_header_service = PamdService(self.no_header_system_auth_string)
- self.assertTrue(no_header_service.remove('auth', 'required', 'pam_env.so'))
- test_rule = PamdRule('auth', 'required', 'pam_env.so')
- self.assertNotIn(str(test_rule), str(no_header_service))
-
- def test_remove_last_rule(self):
- self.assertTrue(self.pamd.remove('session', 'required', 'pam_unix.so'))
- test_rule = PamdRule('session', 'required', 'pam_unix.so')
- self.assertNotIn(str(test_rule), str(self.pamd))
diff --git a/test/units/modules/system/test_parted.py b/test/units/modules/system/test_parted.py
deleted file mode 100644
index 91439ffea3..0000000000
--- a/test/units/modules/system/test_parted.py
+++ /dev/null
@@ -1,240 +0,0 @@
-# (c) 2017 Red Hat Inc.
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-__metaclass__ = type
-from units.compat.mock import patch, call
-from ansible.modules.system import parted as parted_module
-from ansible.modules.system.parted import parse_partition_info
-from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
-
-# Example of output : parted -s -m /dev/sdb -- unit 'MB' print
-parted_output1 = """
-BYT;
-/dev/sdb:286061MB:scsi:512:512:msdos:ATA TOSHIBA THNSFJ25:;
-1:1.05MB:106MB:105MB:fat32::esp;
-2:106MB:368MB:262MB:ext2::;
-3:368MB:256061MB:255692MB:::;"""
-
-# corresponding dictionary after parsing by parse_partition_info
-parted_dict1 = {
- "generic": {
- "dev": "/dev/sdb",
- "size": 286061.0,
- "unit": "mb",
- "table": "msdos",
- "model": "ATA TOSHIBA THNSFJ25",
- "logical_block": 512,
- "physical_block": 512
- },
- "partitions": [{
- "num": 1,
- "begin": 1.05,
- "end": 106.0,
- "size": 105.0,
- "fstype": "fat32",
- "name": '',
- "flags": ["esp"],
- "unit": "mb"
- }, {
- "num": 2,
- "begin": 106.0,
- "end": 368.0,
- "size": 262.0,
- "fstype": "ext2",
- "name": '',
- "flags": [],
- "unit": "mb"
- }, {
- "num": 3,
- "begin": 368.0,
- "end": 256061.0,
- "size": 255692.0,
- "fstype": "",
- "name": '',
- "flags": [],
- "unit": "mb"
- }]
-}
-
-parted_output2 = """
-BYT;
-/dev/sdb:286061MB:scsi:512:512:msdos:ATA TOSHIBA THNSFJ25:;"""
-
-# corresponding dictionary after parsing by parse_partition_info
-parted_dict2 = {
- "generic": {
- "dev": "/dev/sdb",
- "size": 286061.0,
- "unit": "mb",
- "table": "msdos",
- "model": "ATA TOSHIBA THNSFJ25",
- "logical_block": 512,
- "physical_block": 512
- },
- "partitions": []
-}
-
-
-class TestParted(ModuleTestCase):
- def setUp(self):
- super(TestParted, self).setUp()
-
- self.module = parted_module
- self.mock_check_parted_label = (patch('ansible.modules.system.parted.check_parted_label', return_value=False))
- self.check_parted_label = self.mock_check_parted_label.start()
-
- self.mock_parted = (patch('ansible.modules.system.parted.parted'))
- self.parted = self.mock_parted.start()
-
- self.mock_run_command = (patch('ansible.module_utils.basic.AnsibleModule.run_command'))
- self.run_command = self.mock_run_command.start()
-
- self.mock_get_bin_path = (patch('ansible.module_utils.basic.AnsibleModule.get_bin_path'))
- self.get_bin_path = self.mock_get_bin_path.start()
-
- def tearDown(self):
- super(TestParted, self).tearDown()
- self.mock_run_command.stop()
- self.mock_get_bin_path.stop()
- self.mock_parted.stop()
- self.mock_check_parted_label.stop()
-
- def execute_module(self, failed=False, changed=False, script=None):
- if failed:
- result = self.failed()
- self.assertTrue(result['failed'], result)
- else:
- result = self.changed(changed)
- self.assertEqual(result['changed'], changed, result)
-
- if script:
- self.assertEqual(script, result['script'], result['script'])
-
- return result
-
- def failed(self):
- with self.assertRaises(AnsibleFailJson) as exc:
- self.module.main()
-
- result = exc.exception.args[0]
- self.assertTrue(result['failed'], result)
- return result
-
- def changed(self, changed=False):
- with self.assertRaises(AnsibleExitJson) as exc:
- self.module.main()
-
- result = exc.exception.args[0]
- self.assertEqual(result['changed'], changed, result)
- return result
-
- def test_parse_partition_info(self):
- """Test that the parse_partition_info returns the expected dictionary"""
- self.assertEqual(parse_partition_info(parted_output1, 'MB'), parted_dict1)
- self.assertEqual(parse_partition_info(parted_output2, 'MB'), parted_dict2)
-
- def test_partition_already_exists(self):
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 1,
- 'state': 'present',
- })
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1):
- self.execute_module(changed=False)
-
- def test_create_new_partition(self):
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 4,
- 'state': 'present',
- })
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1):
- self.execute_module(changed=True, script='unit KiB mkpart primary 0% 100%')
-
- def test_create_new_partition_1G(self):
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 4,
- 'state': 'present',
- 'part_end': '1GiB',
- })
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1):
- self.execute_module(changed=True, script='unit KiB mkpart primary 0% 1GiB')
-
- def test_remove_partition_number_1(self):
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 1,
- 'state': 'absent',
- })
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1):
- self.execute_module(changed=True, script='rm 1')
-
- def test_change_flag(self):
- # Flags are set in a second run of parted().
- # Between the two runs, the partition dict is updated.
- # use checkmode here allow us to continue even if the dictionary is
- # not updated.
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 3,
- 'state': 'present',
- 'flags': ['lvm', 'boot'],
- '_ansible_check_mode': True,
- })
-
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1):
- self.parted.reset_mock()
- self.execute_module(changed=True)
- # When using multiple flags:
- # order of execution is non deterministic, because set() operations are used in
- # the current implementation.
- expected_calls_order1 = [call('unit KiB set 3 lvm on set 3 boot on ',
- '/dev/sdb', 'optimal')]
- expected_calls_order2 = [call('unit KiB set 3 boot on set 3 lvm on ',
- '/dev/sdb', 'optimal')]
- self.assertTrue(self.parted.mock_calls == expected_calls_order1 or
- self.parted.mock_calls == expected_calls_order2)
-
- def test_create_new_primary_lvm_partition(self):
- # use check_mode, see previous test comment
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 4,
- 'flags': ["boot"],
- 'state': 'present',
- 'part_start': '257GiB',
- '_ansible_check_mode': True,
- })
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict1):
- self.execute_module(changed=True, script='unit KiB mkpart primary 257GiB 100% unit KiB set 4 boot on')
-
- def test_create_label_gpt(self):
- # Like previous test, current implementation use parted to create the partition and
- # then retrieve and update the dictionary. Use check_mode to force to continue even if
- # dictionary is not updated.
- set_module_args({
- 'device': '/dev/sdb',
- 'number': 1,
- 'flags': ["lvm"],
- 'label': 'gpt',
- 'name': 'lvmpartition',
- 'state': 'present',
- '_ansible_check_mode': True,
- })
- with patch('ansible.modules.system.parted.get_device_info', return_value=parted_dict2):
- self.execute_module(changed=True, script='unit KiB mklabel gpt mkpart primary 0% 100% unit KiB name 1 \'"lvmpartition"\' set 1 lvm on')
diff --git a/test/units/modules/system/test_ufw.py b/test/units/modules/system/test_ufw.py
deleted file mode 100644
index b169e94e67..0000000000
--- a/test/units/modules/system/test_ufw.py
+++ /dev/null
@@ -1,434 +0,0 @@
-
-from units.compat import unittest
-from units.compat.mock import patch
-from ansible.module_utils import basic
-from ansible.module_utils._text import to_bytes
-import ansible.modules.system.ufw as module
-
-import json
-
-
-# mock ufw messages
-
-ufw_version_35 = """ufw 0.35\nCopyright 2008-2015 Canonical Ltd.\n"""
-
-ufw_verbose_header = """Status: active
-Logging: on (low)
-Default: deny (incoming), allow (outgoing), deny (routed)
-New profiles: skip
-
-To Action From
--- ------ ----"""
-
-
-ufw_status_verbose_with_port_7000 = ufw_verbose_header + """
-7000/tcp ALLOW IN Anywhere
-7000/tcp (v6) ALLOW IN Anywhere (v6)
-"""
-
-user_rules_with_port_7000 = """### tuple ### allow tcp 7000 0.0.0.0/0 any 0.0.0.0/0 in
-### tuple ### allow tcp 7000 ::/0 any ::/0 in
-"""
-
-user_rules_with_ipv6 = """### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in
-### tuple ### allow udp 5353 ::/0 any ff02::fb in
-"""
-
-ufw_status_verbose_with_ipv6 = ufw_verbose_header + """
-5353/udp ALLOW IN 224.0.0.251
-5353/udp ALLOW IN ff02::fb
-"""
-
-ufw_status_verbose_nothing = ufw_verbose_header
-
-skippg_adding_existing_rules = "Skipping adding existing rule\nSkipping adding existing rule (v6)\n"
-
-grep_config_cli = "grep -h '^### tuple' /lib/ufw/user.rules /lib/ufw/user6.rules /etc/ufw/user.rules /etc/ufw/user6.rules "
-grep_config_cli += "/var/lib/ufw/user.rules /var/lib/ufw/user6.rules"
-
-dry_mode_cmd_with_port_700 = {
- "ufw status verbose": ufw_status_verbose_with_port_7000,
- "ufw --version": ufw_version_35,
- "ufw --dry-run allow from any to any port 7000 proto tcp": skippg_adding_existing_rules,
- "ufw --dry-run delete allow from any to any port 7000 proto tcp": "",
- "ufw --dry-run delete allow from any to any port 7001 proto tcp": user_rules_with_port_7000,
- "ufw --dry-run route allow in on foo out on bar from 1.1.1.1 port 7000 to 8.8.8.8 port 7001 proto tcp": "",
- "ufw --dry-run allow in on foo from any to any port 7003 proto tcp": "",
- "ufw --dry-run allow in on foo from 1.1.1.1 port 7002 to 8.8.8.8 port 7003 proto tcp": "",
- "ufw --dry-run allow out on foo from any to any port 7004 proto tcp": "",
- "ufw --dry-run allow out on foo from 1.1.1.1 port 7003 to 8.8.8.8 port 7004 proto tcp": "",
- grep_config_cli: user_rules_with_port_7000
-}
-
-# setup configuration :
-# ufw reset
-# ufw enable
-# ufw allow proto udp to any port 5353 from 224.0.0.251
-# ufw allow proto udp to any port 5353 from ff02::fb
-dry_mode_cmd_with_ipv6 = {
- "ufw status verbose": ufw_status_verbose_with_ipv6,
- "ufw --version": ufw_version_35,
- # CONTENT of the command sudo ufw --dry-run delete allow in from ff02::fb port 5353 proto udp | grep -E "^### tupple"
- "ufw --dry-run delete allow from ff02::fb to any port 5353 proto udp": "### tuple ### allow udp any ::/0 5353 ff02::fb in",
- grep_config_cli: user_rules_with_ipv6,
- "ufw --dry-run allow from ff02::fb to any port 5353 proto udp": skippg_adding_existing_rules,
- "ufw --dry-run allow from 224.0.0.252 to any port 5353 proto udp": """### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in
-### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.252 in
-""",
- "ufw --dry-run allow from 10.0.0.0/24 to any port 1577 proto udp": "### tuple ### allow udp 1577 0.0.0.0/0 any 10.0.0.0/24 in"
-}
-
-dry_mode_cmd_nothing = {
- "ufw status verbose": ufw_status_verbose_nothing,
- "ufw --version": ufw_version_35,
- grep_config_cli: "",
- "ufw --dry-run allow from any to :: port 23": "### tuple ### allow any 23 :: any ::/0 in"
-}
-
-
-def do_nothing_func_nothing(*args, **kwarg):
- return 0, dry_mode_cmd_nothing[args[0]], ""
-
-
-def do_nothing_func_ipv6(*args, **kwarg):
- return 0, dry_mode_cmd_with_ipv6[args[0]], ""
-
-
-def do_nothing_func_port_7000(*args, **kwarg):
- return 0, dry_mode_cmd_with_port_700[args[0]], ""
-
-
-def set_module_args(args):
- args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
- """prepare arguments so that they will be picked up during module creation"""
- basic._ANSIBLE_ARGS = to_bytes(args)
-
-
-class AnsibleExitJson(Exception):
- """Exception class to be raised by module.exit_json and caught by the test case"""
- pass
-
-
-class AnsibleFailJson(Exception):
- """Exception class to be raised by module.fail_json and caught by the test case"""
- pass
-
-
-def exit_json(*args, **kwargs):
- """function to patch over exit_json; package return data into an exception"""
- if 'changed' not in kwargs:
- kwargs['changed'] = False
- raise AnsibleExitJson(kwargs)
-
-
-def fail_json(*args, **kwargs):
- """function to patch over fail_json; package return data into an exception"""
- kwargs['failed'] = True
- raise AnsibleFailJson(kwargs)
-
-
-def get_bin_path(self, arg, required=False):
- """Mock AnsibleModule.get_bin_path"""
- return arg
-
-
-class TestUFW(unittest.TestCase):
-
- def setUp(self):
- self.mock_module_helper = patch.multiple(basic.AnsibleModule,
- exit_json=exit_json,
- fail_json=fail_json,
- get_bin_path=get_bin_path)
- self.mock_module_helper.start()
- self.addCleanup(self.mock_module_helper.stop)
-
- def test_filter_line_that_contains_ipv4(self):
- reg = module.compile_ipv4_regexp()
-
- self.assertTrue(reg.search("### tuple ### allow udp 5353 ::/0 any ff02::fb in") is None)
- self.assertTrue(reg.search("### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in") is not None)
-
- self.assertTrue(reg.match("ff02::fb") is None)
- self.assertTrue(reg.match("224.0.0.251") is not None)
- self.assertTrue(reg.match("10.0.0.0/8") is not None)
- self.assertTrue(reg.match("somethingElse") is None)
- self.assertTrue(reg.match("::") is None)
- self.assertTrue(reg.match("any") is None)
-
- def test_filter_line_that_contains_ipv6(self):
- reg = module.compile_ipv6_regexp()
- self.assertTrue(reg.search("### tuple ### allow udp 5353 ::/0 any ff02::fb in") is not None)
- self.assertTrue(reg.search("### tuple ### allow udp 5353 0.0.0.0/0 any 224.0.0.251 in") is None)
- self.assertTrue(reg.search("### tuple ### allow any 23 :: any ::/0 in") is not None)
- self.assertTrue(reg.match("ff02::fb") is not None)
- self.assertTrue(reg.match("224.0.0.251") is None)
- self.assertTrue(reg.match("::") is not None)
-
- def test_check_mode_add_rules(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7000',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertFalse(result.exception.args[0]['changed'])
-
- def test_check_mode_add_detailed_route(self):
- set_module_args({
- 'rule': 'allow',
- 'route': 'yes',
- 'interface_in': 'foo',
- 'interface_out': 'bar',
- 'proto': 'tcp',
- 'from_ip': '1.1.1.1',
- 'to_ip': '8.8.8.8',
- 'from_port': '7000',
- 'to_port': '7001',
- '_ansible_check_mode': True
- })
-
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_add_ambiguous_route(self):
- set_module_args({
- 'rule': 'allow',
- 'route': 'yes',
- 'interface_in': 'foo',
- 'interface_out': 'bar',
- 'direction': 'in',
- 'interface': 'baz',
- '_ansible_check_mode': True
- })
-
- with self.assertRaises(AnsibleFailJson) as result:
- self.__getResult(do_nothing_func_port_7000)
-
- exc = result.exception.args[0]
- self.assertTrue(exc['failed'])
- self.assertIn('mutually exclusive', exc['msg'])
-
- def test_check_mode_add_interface_in(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7003',
- 'interface_in': 'foo',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_add_interface_out(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7004',
- 'interface_out': 'foo',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_add_non_route_interface_both(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7004',
- 'interface_in': 'foo',
- 'interface_out': 'bar',
- '_ansible_check_mode': True
- })
-
- with self.assertRaises(AnsibleFailJson) as result:
- self.__getResult(do_nothing_func_port_7000)
-
- exc = result.exception.args[0]
- self.assertTrue(exc['failed'])
- self.assertIn('combine', exc['msg'])
-
- def test_check_mode_add_direction_in(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7003',
- 'direction': 'in',
- 'interface': 'foo',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_add_direction_in_with_ip(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'from_ip': '1.1.1.1',
- 'from_port': '7002',
- 'to_ip': '8.8.8.8',
- 'to_port': '7003',
- 'direction': 'in',
- 'interface': 'foo',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_add_direction_out(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7004',
- 'direction': 'out',
- 'interface': 'foo',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_add_direction_out_with_ip(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'from_ip': '1.1.1.1',
- 'from_port': '7003',
- 'to_ip': '8.8.8.8',
- 'to_port': '7004',
- 'direction': 'out',
- 'interface': 'foo',
- '_ansible_check_mode': True
- })
- result = self.__getResult(do_nothing_func_port_7000)
- self.assertTrue(result.exception.args[0]['changed'])
-
- def test_check_mode_delete_existing_rules(self):
-
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7000',
- 'delete': 'yes',
- '_ansible_check_mode': True,
- })
-
- self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_check_mode_delete_not_existing_rules(self):
-
- set_module_args({
- 'rule': 'allow',
- 'proto': 'tcp',
- 'port': '7001',
- 'delete': 'yes',
- '_ansible_check_mode': True,
- })
-
- self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_enable_mode(self):
- set_module_args({
- 'state': 'enabled',
- '_ansible_check_mode': True
- })
-
- self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_disable_mode(self):
- set_module_args({
- 'state': 'disabled',
- '_ansible_check_mode': True
- })
-
- self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_logging_off(self):
- set_module_args({
- 'logging': 'off',
- '_ansible_check_mode': True
- })
-
- self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_logging_on(self):
- set_module_args({
- 'logging': 'on',
- '_ansible_check_mode': True
- })
-
- self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_default_changed(self):
- set_module_args({
- 'default': 'allow',
- "direction": "incoming",
- '_ansible_check_mode': True
- })
- self.assertTrue(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_default_not_changed(self):
- set_module_args({
- 'default': 'deny',
- "direction": "incoming",
- '_ansible_check_mode': True
- })
- self.assertFalse(self.__getResult(do_nothing_func_port_7000).exception.args[0]['changed'])
-
- def test_ipv6_remove(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'udp',
- 'port': '5353',
- 'from': 'ff02::fb',
- 'delete': 'yes',
- '_ansible_check_mode': True,
- })
- self.assertTrue(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed'])
-
- def test_ipv6_add_existing(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'udp',
- 'port': '5353',
- 'from': 'ff02::fb',
- '_ansible_check_mode': True,
- })
- self.assertFalse(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed'])
-
- def test_add_not_existing_ipv4_submask(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'udp',
- 'port': '1577',
- 'from': '10.0.0.0/24',
- '_ansible_check_mode': True,
- })
- self.assertTrue(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed'])
-
- def test_ipv4_add_with_existing_ipv6(self):
- set_module_args({
- 'rule': 'allow',
- 'proto': 'udp',
- 'port': '5353',
- 'from': '224.0.0.252',
- '_ansible_check_mode': True,
- })
- self.assertTrue(self.__getResult(do_nothing_func_ipv6).exception.args[0]['changed'])
-
- def test_ipv6_add_from_nothing(self):
- set_module_args({
- 'rule': 'allow',
- 'port': '23',
- 'to': '::',
- '_ansible_check_mode': True,
- })
- result = self.__getResult(do_nothing_func_nothing).exception.args[0]
- print(result)
- self.assertTrue(result['changed'])
-
- def __getResult(self, cmd_fun):
- with patch.object(basic.AnsibleModule, 'run_command') as mock_run_command:
- mock_run_command.side_effect = cmd_fun
- with self.assertRaises(AnsibleExitJson) as result:
- module.main()
- return result