summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToshio Kuratomi <toshio@fedoraproject.org>2015-10-19 22:32:21 -0700
committerToshio Kuratomi <toshio@fedoraproject.org>2015-10-20 12:42:36 -0700
commitc2d188baff9b0d6076187f36bf63543967c705a1 (patch)
treedd88b0568a6a50ead8b47008b4892fa4a4669715
parentb46ce47a84085da66272c6194d00f9e527a7d2a9 (diff)
downloadansible-nolog-for-return-value.tar.gz
Hide values in json returns which were given in parameters marked no_log.nolog-for-return-value
-rw-r--r--lib/ansible/module_utils/basic.py86
-rw-r--r--test/units/module_utils/basic/test_heuristic_log_sanitize.py102
-rw-r--r--test/units/module_utils/basic/test_no_log.py151
-rw-r--r--test/units/module_utils/test_basic.py63
4 files changed, 336 insertions, 66 deletions
diff --git a/lib/ansible/module_utils/basic.py b/lib/ansible/module_utils/basic.py
index 60fe3233b4..82de3d5317 100644
--- a/lib/ansible/module_utils/basic.py
+++ b/lib/ansible/module_utils/basic.py
@@ -65,7 +65,7 @@ import grp
import pwd
import platform
import errno
-from itertools import repeat
+from itertools import repeat, chain
try:
import syslog
@@ -109,9 +109,24 @@ except AttributeError:
return d.items()
else:
# Python 2
- def iteritems(d): # Python 2
+ def iteritems(d):
return d.iteritems()
+try:
+ NUMBERTYPES = (int, long, float)
+except NameError:
+ # Python 3
+ NUMBERTYPES = (int, float)
+
+# Python2 & 3 way to get NoneType
+NoneType = type(None)
+
+try:
+ from collections import Sequence, Mapping
+except ImportError:
+ # python2.5
+ Sequence = (list, tuple)
+ Mapping = (dict,)
try:
import json
@@ -408,6 +423,51 @@ def heuristic_log_sanitize(data):
return ''.join(output)
+def _return_values(obj):
+ """ Return stringified values from datastructures. For use with removing
+ sensitive values pre-jsonification."""
+ if isinstance(obj, basestring):
+ if obj:
+ yield obj
+ return
+ elif isinstance(obj, Sequence):
+ for element in obj:
+ for subelement in _return_values(element):
+ yield subelement
+ elif isinstance(obj, Mapping):
+ for element in obj.items():
+ for subelement in _return_values(element[1]):
+ yield subelement
+ elif isinstance(obj, (bool, NoneType)):
+ # This must come before int because bools are also ints
+ return
+ elif isinstance(obj, NUMBERTYPES):
+ yield str(obj)
+ else:
+ raise TypeError('Unknown parameter type: %s, %s' % (type(obj), obj))
+
+def _remove_values(value, no_log_strings):
+ """ Remove strings in no_log_strings from value. If value is a container
+ type, then remove a lot more"""
+ if isinstance(value, basestring):
+ if value in no_log_strings:
+ return 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+ for omit_me in no_log_strings:
+ value = value.replace(omit_me, '*' * 8)
+ elif isinstance(value, Sequence):
+ return [_remove_values(elem, no_log_strings) for elem in value]
+ elif isinstance(value, Mapping):
+ return dict((k, _remove_values(v, no_log_strings)) for k, v in value.items())
+ elif isinstance(value, tuple(chain(NUMBERTYPES, (bool, NoneType)))):
+ stringy_value = str(value)
+ if stringy_value in no_log_strings:
+ return 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+ for omit_me in no_log_strings:
+ if omit_me in stringy_value:
+ return 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+ else:
+ raise TypeError('Value of unknown type: %s, %s' % (type(value), value))
+ return value
def is_executable(path):
'''is the given path executable?'''
@@ -1397,7 +1457,7 @@ class AnsibleModule(object):
''' return a bool for the arg '''
if arg is None or type(arg) == bool:
return arg
- if type(arg) in types.StringTypes:
+ if isinstance(arg, basestring):
arg = arg.lower()
if arg in BOOLEANS_TRUE:
return True
@@ -1432,11 +1492,30 @@ class AnsibleModule(object):
for path in self.cleanup_files:
self.cleanup(path)
+ def remove_no_log_values(self, to_jsonify):
+ """ Strip values associated with no_log parameters from output.
+ Note: does not strip dict keys, only dict values.
+ """
+ no_log_strings = set()
+ # Use the argspec to determine which args are no_log
+ for arg_name, arg_opts in self.argument_spec.items():
+ if arg_opts.get('no_log', False):
+ # Find the value for the no_log'd param
+ no_log_object = self.params.get(arg_name, None)
+ if no_log_object:
+ no_log_strings.update(_return_values(no_log_object))
+
+ for field, value in to_jsonify.items():
+ to_jsonify[field] = _remove_values(value, no_log_strings)
+
+ return to_jsonify
+
def exit_json(self, **kwargs):
''' return from the module, without error '''
self.add_path_info(kwargs)
if not 'changed' in kwargs:
kwargs['changed'] = False
+ self.remove_no_log_values(kwargs)
self.do_cleanup_files()
print(self.jsonify(kwargs))
sys.exit(0)
@@ -1446,6 +1525,7 @@ class AnsibleModule(object):
self.add_path_info(kwargs)
assert 'msg' in kwargs, "implementation error -- msg to explain the error is required"
kwargs['failed'] = True
+ self.remove_no_log_values(kwargs)
self.do_cleanup_files()
print(self.jsonify(kwargs))
sys.exit(1)
diff --git a/test/units/module_utils/basic/test_heuristic_log_sanitize.py b/test/units/module_utils/basic/test_heuristic_log_sanitize.py
new file mode 100644
index 0000000000..2540e34ef6
--- /dev/null
+++ b/test/units/module_utils/basic/test_heuristic_log_sanitize.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division)
+__metaclass__ = type
+
+import sys
+import syslog
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.module_utils.basic import heuristic_log_sanitize
+
+class TestHeuristicLogSanitize(unittest.TestCase):
+ def setUp(self):
+ self.URL_SECRET = 'http://username:pas:word@foo.com/data'
+ self.SSH_SECRET = 'username:pas:word@foo.com/data'
+ self.clean_data = repr(self._gen_data(3, True, True, 'no_secret_here'))
+ self.url_data = repr(self._gen_data(3, True, True, self.URL_SECRET))
+ self.ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET))
+
+ def _gen_data(self, records, per_rec, top_level, secret_text):
+ hostvars = {'hostvars': {}}
+ for i in range(1, records, 1):
+ host_facts = {'host%s' % i:
+ {'pstack':
+ {'running': '875.1',
+ 'symlinked': '880.0',
+ 'tars': [],
+ 'versions': ['885.0']},
+ }}
+ if per_rec:
+ host_facts['host%s' % i]['secret'] = secret_text
+ hostvars['hostvars'].update(host_facts)
+ if top_level:
+ hostvars['secret'] = secret_text
+ return hostvars
+
+ def test_did_not_hide_too_much(self):
+ self.assertEquals(heuristic_log_sanitize(self.clean_data), self.clean_data)
+
+ def test_hides_url_secrets(self):
+ url_output = heuristic_log_sanitize(self.url_data)
+ # Basic functionality: Successfully hid the password
+ self.assertNotIn('pas:word', url_output)
+
+ # Slightly more advanced, we hid all of the password despite the ":"
+ self.assertNotIn('pas', url_output)
+
+ # In this implementation we replace the password with 8 "*" which is
+ # also the length of our password. The url fields should be able to
+ # accurately detect where the password ends so the length should be
+ # the same:
+ self.assertEqual(len(url_output), len(self.url_data))
+
+ def test_hides_ssh_secrets(self):
+ ssh_output = heuristic_log_sanitize(self.ssh_data)
+ self.assertNotIn('pas:word', ssh_output)
+
+ # Slightly more advanced, we hid all of the password despite the ":"
+ self.assertNotIn('pas', ssh_output)
+
+ # ssh checking is harder as the heuristic is overzealous in many
+ # cases. Since the input will have at least one ":" present before
+ # the password we can tell some things about the beginning and end of
+ # the data, though:
+ self.assertTrue(ssh_output.startswith("{'"))
+ self.assertTrue(ssh_output.endswith("}"))
+ self.assertIn(":********@foo.com/data'", ssh_output)
+
+
+class TestStripNoLog(unittest.TestCase):
+ def setUp(self):
+ data = ''
+
+ def test_return_strings(self):
+ pass
+
+ def test_strip_no_log(self):
+ pass
+
+class TestAnsibleModuleStripNoLogValues(unittest.TestCase):
+ pass
+
+
diff --git a/test/units/module_utils/basic/test_no_log.py b/test/units/module_utils/basic/test_no_log.py
new file mode 100644
index 0000000000..b4847fcde7
--- /dev/null
+++ b/test/units/module_utils/basic/test_no_log.py
@@ -0,0 +1,151 @@
+# -*- coding: utf-8 -*-
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division)
+__metaclass__ = type
+
+import json
+import sys
+import syslog
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.module_utils import basic
+from ansible.module_utils.basic import heuristic_log_sanitize
+from ansible.module_utils.basic import _return_values, _remove_values
+
+
+class TestReturnValues(unittest.TestCase):
+ dataset = (
+ ('string', frozenset(['string'])),
+ ('', frozenset()),
+ (1, frozenset(['1'])),
+ (1.0, frozenset(['1.0'])),
+ (False, frozenset()),
+ (['1', '2', '3'], frozenset(['1', '2', '3'])),
+ (('1', '2', '3'), frozenset(['1', '2', '3'])),
+ ({'one': 1, 'two': 'dos'}, frozenset(['1', 'dos'])),
+ ({'one': 1, 'two': 'dos',
+ 'three': ['amigos', 'musketeers', None,
+ {'ping': 'pong', 'base': ('balls', 'raquets')}]},
+ frozenset(['1', 'dos', 'amigos', 'musketeers', 'pong', 'balls', 'raquets'])),
+ )
+
+ def test_return_values(self):
+ for data, expected in self.dataset:
+ self.assertEquals(frozenset(_return_values(data)), expected)
+
+ def test_unknown_type(self):
+ self.assertRaises(TypeError, frozenset, _return_values(object()))
+
+
+class TestRemoveValues(unittest.TestCase):
+ OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+ dataset_no_remove = (
+ ('string', frozenset(['nope'])),
+ (1234, frozenset(['4321'])),
+ (False, frozenset(['4321'])),
+ (1.0, frozenset(['4321'])),
+ (['string', 'strang', 'strung'], frozenset(['nope'])),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['nope'])),
+ ({'one': 1, 'two': 'dos',
+ 'three': ['amigos', 'musketeers', None,
+ {'ping': 'pong', 'base': ['balls', 'raquets']}]},
+ frozenset(['nope'])),
+ )
+ dataset_remove = (
+ ('string', frozenset(['string']), OMIT),
+ (1234, frozenset(['1234']), OMIT),
+ (1234, frozenset(['23']), OMIT),
+ (1.0, frozenset(['1.0']), OMIT),
+ (['string', 'strang', 'strung'], frozenset(['strang']), ['string', OMIT, 'strung']),
+ (['string', 'strang', 'strung'], frozenset(['strang', 'string', 'strung']), [OMIT, OMIT, OMIT]),
+ (('string', 'strang', 'strung'), frozenset(['string', 'strung']), [OMIT, 'strang', OMIT]),
+ ((1234567890, 345678, 987654321), frozenset(['1234567890']), [OMIT, 345678, 987654321]),
+ ((1234567890, 345678, 987654321), frozenset(['345678']), [OMIT, OMIT, 987654321]),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key']),
+ {'one': 1, 'two': 'dos', 'secret': OMIT}),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key', 'dos', '1']),
+ {'one': OMIT, 'two': OMIT, 'secret': OMIT}),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key', 'dos', '1']),
+ {'one': OMIT, 'two': OMIT, 'secret': OMIT}),
+ ({'one': 1, 'two': 'dos', 'three': ['amigos', 'musketeers', None,
+ {'ping': 'pong', 'base': ['balls', 'raquets']}]},
+ frozenset(['balls', 'base', 'pong', 'amigos']),
+ {'one': 1, 'two': 'dos', 'three': [OMIT, 'musketeers',
+ None, {'ping': OMIT, 'base': [OMIT, 'raquets']}]}),
+ ('This sentence has an enigma wrapped in a mystery inside of a secret. - mr mystery',
+ frozenset(['enigma', 'mystery', 'secret']),
+ 'This sentence has an ******** wrapped in a ******** inside of a ********. - mr ********'),
+ )
+
+ def test_no_removal(self):
+ for value, no_log_strings in self.dataset_no_remove:
+ self.assertEquals(_remove_values(value, no_log_strings), value)
+
+ def test_strings_to_remove(self):
+ for value, no_log_strings, expected in self.dataset_remove:
+ self.assertEquals(_remove_values(value, no_log_strings), expected)
+
+ def test_unknown_type(self):
+ self.assertRaises(TypeError, _remove_values, object(), frozenset())
+
+
+@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
+class TestAnsibleModuleRemoveNoLogValues(unittest.TestCase):
+ OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+ dataset = (
+ (dict(username='person', password='$ecret k3y'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
+ not_secret='following the leader'),
+ dict(one=1, pwd=OMIT, url='https://username:password12345@foo.com/login/',
+ not_secret='following the leader')
+ ),
+ (dict(username='person', password='password12345'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
+ not_secret='following the leader'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:********@foo.com/login/',
+ not_secret='following the leader')
+ ),
+ (dict(username='person', password='$ecret k3y'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:$ecret k3y@foo.com/login/',
+ not_secret='following the leader'),
+ dict(one=1, pwd=OMIT, url='https://username:********@foo.com/login/',
+ not_secret='following the leader')
+ ),
+ )
+
+ def setUp(self):
+ self.COMPLEX_ARGS = basic.MODULE_COMPLEX_ARGS
+
+ def tearDown(self):
+ basic.MODULE_COMPLEX_ARGS = self.COMPLEX_ARGS
+
+ def test_remove_no_log_values(self):
+ for args, return_val, expected in self.dataset:
+ basic.MODULE_COMPLEX_ARGS = json.dumps(args)
+ module = basic.AnsibleModule(
+ argument_spec = dict(
+ username=dict(),
+ password=dict(no_log=True),
+ token=dict(no_log=True),
+ ),
+ )
+ self.assertEquals(module.remove_no_log_values(return_val), expected)
diff --git a/test/units/module_utils/test_basic.py b/test/units/module_utils/test_basic.py
index 8e3e802180..86473dd203 100644
--- a/test/units/module_utils/test_basic.py
+++ b/test/units/module_utils/test_basic.py
@@ -154,69 +154,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
self.assertEqual(test_data, res2)
- def test_module_utils_basic_heuristic_log_sanitize(self):
- from ansible.module_utils.basic import heuristic_log_sanitize
-
- URL_SECRET = 'http://username:pas:word@foo.com/data'
- SSH_SECRET = 'username:pas:word@foo.com/data'
-
- def _gen_data(records, per_rec, top_level, secret_text):
- hostvars = {'hostvars': {}}
- for i in range(1, records, 1):
- host_facts = {'host%s' % i:
- {'pstack':
- {'running': '875.1',
- 'symlinked': '880.0',
- 'tars': [],
- 'versions': ['885.0']},
- }}
- if per_rec:
- host_facts['host%s' % i]['secret'] = secret_text
- hostvars['hostvars'].update(host_facts)
- if top_level:
- hostvars['secret'] = secret_text
- return hostvars
-
- url_data = repr(_gen_data(3, True, True, URL_SECRET))
- ssh_data = repr(_gen_data(3, True, True, SSH_SECRET))
-
- url_output = heuristic_log_sanitize(url_data)
- ssh_output = heuristic_log_sanitize(ssh_data)
-
- # Basic functionality: Successfully hid the password
- try:
- self.assertNotIn('pas:word', url_output)
- self.assertNotIn('pas:word', ssh_output)
-
- # Slightly more advanced, we hid all of the password despite the ":"
- self.assertNotIn('pas', url_output)
- self.assertNotIn('pas', ssh_output)
- except AttributeError:
- # python2.6 or less's unittest
- self.assertFalse('pas:word' in url_output, '%s is present in %s' % ('"pas:word"', url_output))
- self.assertFalse('pas:word' in ssh_output, '%s is present in %s' % ('"pas:word"', ssh_output))
-
- self.assertFalse('pas' in url_output, '%s is present in %s' % ('"pas"', url_output))
- self.assertFalse('pas' in ssh_output, '%s is present in %s' % ('"pas"', ssh_output))
-
- # In this implementation we replace the password with 8 "*" which is
- # also the length of our password. The url fields should be able to
- # accurately detect where the password ends so the length should be
- # the same:
- self.assertEqual(len(url_output), len(url_data))
-
- # ssh checking is harder as the heuristic is overzealous in many
- # cases. Since the input will have at least one ":" present before
- # the password we can tell some things about the beginning and end of
- # the data, though:
- self.assertTrue(ssh_output.startswith("{'"))
- self.assertTrue(ssh_output.endswith("}"))
- try:
- self.assertIn(":********@foo.com/data'", ssh_output)
- except AttributeError:
- # python2.6 or less's unittest
- self.assertTrue(":********@foo.com/data'" in ssh_output, '%s is not present in %s' % (":********@foo.com/data'", ssh_output))
-
def test_module_utils_basic_get_module_path(self):
from ansible.module_utils.basic import get_module_path
with patch('os.path.realpath', return_value='/path/to/foo/'):