From 60193bd8bcb4ca78637078ea79bcc66b4ae14c6c Mon Sep 17 00:00:00 2001 From: Will Thames Date: Thu, 16 Feb 2017 05:47:57 +1000 Subject: Ensure ssh hostkey checks respect server port (#20840) * Add tests for `get_fqdn_and_port` method. Currently tests verify original behavior - returning default `ssh-keyscan` port Add test around `add_host_key` to verify underlying command arguments Add some new expectations for `get_fqdn_and_port` Test that non-standard port is passed to `ssh-keyscan` command * Ensure ssh hostkey checks respect server port ssh-keyscan will default to getting the host key for port 22. If the ssh service is running on a different port, ssh-keyscan will need to know this. Tidy up minor flake8 issues * Update known_hosts tests for port being None Ensure that git urls don't try and set port when a path is specified Update known_hosts tests to meet flake8 * Fix stdin swap context for test_known_hosts Move test_known_hosts from under basic, as it is its own library. Remove module_utils.known_hosts from pep8 legacy files list (cherry picked from commit 103ede26dfe012cc2aef268ff97b73fd9b906c23) --- lib/ansible/module_utils/known_hosts.py | 56 +++++++----- test/units/module_utils/basic/test_known_hosts.py | 55 ------------ test/units/module_utils/test_known_hosts.py | 105 ++++++++++++++++++++++ 3 files changed, 139 insertions(+), 77 deletions(-) delete mode 100644 test/units/module_utils/basic/test_known_hosts.py create mode 100644 test/units/module_utils/test_known_hosts.py diff --git a/lib/ansible/module_utils/known_hosts.py b/lib/ansible/module_utils/known_hosts.py index de074551f8..578d385b4c 100644 --- a/lib/ansible/module_utils/known_hosts.py +++ b/lib/ansible/module_utils/known_hosts.py @@ -28,6 +28,7 @@ import os import hmac +import re try: import urlparse @@ -41,23 +42,26 @@ except ImportError: HASHED_KEY_MAGIC = "|1|" + def add_git_host_key(module, url, accept_hostkey=True, create_dir=True): """ idempotently add a git url hostkey """ if is_ssh_url(url): - fqdn = get_fqdn(url) + fqdn, port = get_fqdn_and_port(url) if fqdn: known_host = check_hostkey(module, fqdn) if not known_host: if accept_hostkey: - rc, out, err = add_host_key(module, fqdn, create_dir=create_dir) + rc, out, err = add_host_key(module, fqdn, port=port, create_dir=create_dir) if rc != 0: module.fail_json(msg="failed to add %s hostkey: %s" % (fqdn, out + err)) else: - module.fail_json(msg="%s has an unknown hostkey. Set accept_hostkey to True or manually add the hostkey prior to running the git module" % fqdn) + module.fail_json(msg="%s has an unknown hostkey. Set accept_hostkey to True " + "or manually add the hostkey prior to running the git module" % fqdn) + def is_ssh_url(url): @@ -70,45 +74,51 @@ def is_ssh_url(url): return True return False -def get_fqdn(repo_url): - """ chop the hostname out of a url """ +def get_fqdn_and_port(repo_url): - result = None + """ chop the hostname and port out of a url """ + + fqdn = None + port = None + ipv6_re = re.compile('(\[[^]]*\])(?::([0-9]+))?') if "@" in repo_url and "://" not in repo_url: # most likely an user@host:path or user@host/path type URL repo_url = repo_url.split("@", 1)[1] - if repo_url.startswith('['): - result = repo_url.split(']', 1)[0] + ']' + match = ipv6_re.match(repo_url) + # For this type of URL, colon specifies the path, not the port + if match: + fqdn, path = match.groups() elif ":" in repo_url: - result = repo_url.split(":")[0] + fqdn = repo_url.split(":")[0] elif "/" in repo_url: - result = repo_url.split("/")[0] + fqdn = repo_url.split("/")[0] elif "://" in repo_url: # this should be something we can parse with urlparse parts = urlparse.urlparse(repo_url) # parts[1] will be empty on python2.4 on ssh:// or git:// urls, so # ensure we actually have a parts[1] before continuing. if parts[1] != '': - result = parts[1] - if "@" in result: - result = result.split("@", 1)[1] + fqdn = parts[1] + if "@" in fqdn: + fqdn = fqdn.split("@", 1)[1] + match = ipv6_re.match(fqdn) + if match: + fqdn, port = match.groups() + elif ":" in fqdn: + fqdn, port = fqdn.split(":")[0:2] + return fqdn, port - if result[0].startswith('['): - result = result.split(']', 1)[0] + ']' - elif ":" in result: - result = result.split(":")[0] - return result def check_hostkey(module, fqdn): return not not_in_host_file(module, fqdn) + # this is a variant of code found in connection_plugins/paramiko.py and we should modify # the paramiko code to import and use this. def not_in_host_file(self, host): - if 'USER' in os.environ: user_host_file = os.path.expandvars("~${USER}/.ssh/known_hosts") else: @@ -159,7 +169,7 @@ def not_in_host_file(self, host): return True -def add_host_key(module, fqdn, key_type="rsa", create_dir=False): +def add_host_key(module, fqdn, port=22, key_type="rsa", create_dir=False): """ use ssh-keyscan to add the hostkey """ @@ -184,7 +194,10 @@ def add_host_key(module, fqdn, key_type="rsa", create_dir=False): elif not os.path.isdir(user_ssh_dir): module.fail_json(msg="%s is not a directory" % user_ssh_dir) - this_cmd = "%s -t %s %s" % (keyscan_cmd, key_type, fqdn) + if port: + this_cmd = "%s -t %s -p %s %s" % (keyscan_cmd, key_type, port, fqdn) + else: + this_cmd = "%s -t %s %s" % (keyscan_cmd, key_type, fqdn) rc, out, err = module.run_command(this_cmd) # ssh-keyscan gives a 0 exit code and prints nothins on timeout @@ -193,4 +206,3 @@ def add_host_key(module, fqdn, key_type="rsa", create_dir=False): module.append_to_file(user_host_file, out) return rc, out, err - diff --git a/test/units/module_utils/basic/test_known_hosts.py b/test/units/module_utils/basic/test_known_hosts.py deleted file mode 100644 index 515d67686d..0000000000 --- a/test/units/module_utils/basic/test_known_hosts.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- -# (c) 2015, Michael Scherer -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from ansible.compat.tests import unittest -from ansible.module_utils import known_hosts - -class TestAnsibleModuleKnownHosts(unittest.TestCase): - urls = { - 'ssh://one.example.org/example.git': - {'is_ssh_url': True, 'get_fqdn': 'one.example.org'}, - 'ssh+git://two.example.org/example.git': - {'is_ssh_url': True, 'get_fqdn': 'two.example.org'}, - 'rsync://three.example.org/user/example.git': - {'is_ssh_url': False, 'get_fqdn': 'three.example.org'}, - 'git@four.example.org:user/example.git': - {'is_ssh_url': True, 'get_fqdn': 'four.example.org'}, - 'git+ssh://five.example.org/example.git': - {'is_ssh_url': True, 'get_fqdn': 'five.example.org'}, - 'ssh://six.example.org:21/example.org': - {'is_ssh_url': True, 'get_fqdn': 'six.example.org'}, - 'ssh://[2001:DB8::abcd:abcd]/example.git': - {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]'}, - 'ssh://[2001:DB8::abcd:abcd]:22/example.git': - {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]'}, - 'username@[2001:DB8::abcd:abcd]/example.git': - {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]'}, - 'username@[2001:DB8::abcd:abcd]:22/example.git': - {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]'}, - } - - def test_is_ssh_url(self): - for u in self.urls: - self.assertEqual(known_hosts.is_ssh_url(u), self.urls[u]['is_ssh_url']) - - def test_get_fqdn(self): - for u in self.urls: - self.assertEqual(known_hosts.get_fqdn(u), self.urls[u]['get_fqdn']) - - - diff --git a/test/units/module_utils/test_known_hosts.py b/test/units/module_utils/test_known_hosts.py new file mode 100644 index 0000000000..0bc8e55916 --- /dev/null +++ b/test/units/module_utils/test_known_hosts.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# (c) 2015, Michael Scherer +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from ansible.compat.tests import unittest +from ansible.module_utils import known_hosts + +import json +import ansible.module_utils.basic +from ansible.compat.tests.mock import Mock +from units.mock.procenv import swap_stdin_and_argv + + +class TestAnsibleModuleKnownHosts(unittest.TestCase): + urls = { + 'ssh://one.example.org/example.git': + {'is_ssh_url': True, 'get_fqdn': 'one.example.org', + 'add_host_key_cmd': " -t rsa one.example.org", + 'port': None}, + 'ssh+git://two.example.org/example.git': + {'is_ssh_url': True, 'get_fqdn': 'two.example.org', + 'add_host_key_cmd': " -t rsa two.example.org", + 'port': None}, + 'rsync://three.example.org/user/example.git': + {'is_ssh_url': False, 'get_fqdn': 'three.example.org', + 'add_host_key_cmd': None, # not called for non-ssh urls + 'port': None}, + 'git@four.example.org:user/example.git': + {'is_ssh_url': True, 'get_fqdn': 'four.example.org', + 'add_host_key_cmd': " -t rsa four.example.org", + 'port': None}, + 'git+ssh://five.example.org/example.git': + {'is_ssh_url': True, 'get_fqdn': 'five.example.org', + 'add_host_key_cmd': " -t rsa five.example.org", + 'port': None}, + 'ssh://six.example.org:21/example.org': # ssh on FTP Port? + {'is_ssh_url': True, 'get_fqdn': 'six.example.org', + 'add_host_key_cmd': " -t rsa -p 21 six.example.org", + 'port': '21'}, + 'ssh://[2001:DB8::abcd:abcd]/example.git': + {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", + 'port': None}, + 'ssh://[2001:DB8::abcd:abcd]:22/example.git': + {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa -p 22 [2001:DB8::abcd:abcd]", + 'port': '22'}, + 'username@[2001:DB8::abcd:abcd]/example.git': + {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", + 'port': None}, + 'username@[2001:DB8::abcd:abcd]:path/example.git': + {'is_ssh_url': True, 'get_fqdn': '[2001:DB8::abcd:abcd]', + 'add_host_key_cmd': " -t rsa [2001:DB8::abcd:abcd]", + 'port': None}, + 'ssh://internal.git.server:7999/repos/repo.git': + {'is_ssh_url': True, 'get_fqdn': 'internal.git.server', + 'add_host_key_cmd': " -t rsa -p 7999 internal.git.server", + 'port': '7999'} + } + + def test_is_ssh_url(self): + for u in self.urls: + self.assertEqual(known_hosts.is_ssh_url(u), self.urls[u]['is_ssh_url']) + + def test_get_fqdn_and_port(self): + for u in self.urls: + self.assertEqual(known_hosts.get_fqdn_and_port(u), (self.urls[u]['get_fqdn'], self.urls[u]['port'])) + + def test_add_host_key(self): + + # Copied + args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) + # unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually + + with swap_stdin_and_argv(stdin_data=args): + ansible.module_utils.basic._ANSIBLE_ARGS = None + self.module = ansible.module_utils.basic.AnsibleModule(argument_spec=dict()) + + run_command = Mock() + run_command.return_value = (0, "Needs output, otherwise thinks ssh-keyscan timed out'", "") + self.module.run_command = run_command + + get_bin_path = Mock() + get_bin_path.return_value = keyscan_cmd = "/custom/path/ssh-keyscan" + self.module.get_bin_path = get_bin_path + + for u in self.urls: + if self.urls[u]['is_ssh_url']: + known_hosts.add_host_key(self.module, self.urls[u]['get_fqdn'], port=self.urls[u]['port']) + run_command.assert_called_with(keyscan_cmd + self.urls[u]['add_host_key_cmd']) -- cgit v1.2.1