summaryrefslogtreecommitdiff
path: root/lib/ansible/modules/crypto/openssh_cert.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/modules/crypto/openssh_cert.py')
-rw-r--r--lib/ansible/modules/crypto/openssh_cert.py590
1 files changed, 0 insertions, 590 deletions
diff --git a/lib/ansible/modules/crypto/openssh_cert.py b/lib/ansible/modules/crypto/openssh_cert.py
deleted file mode 100644
index e22e27afa2..0000000000
--- a/lib/ansible/modules/crypto/openssh_cert.py
+++ /dev/null
@@ -1,590 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# (c) 2018, David Kainz <dkainz@mgit.at> <dave.jokain@gmx.at>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-
-ANSIBLE_METADATA = {
- 'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'
-}
-
-DOCUMENTATION = '''
----
-module: openssh_cert
-author: "David Kainz (@lolcube)"
-version_added: "2.8"
-short_description: Generate OpenSSH host or user certificates.
-description:
- - Generate and regenerate OpenSSH host or user certificates.
-requirements:
- - "ssh-keygen"
-options:
- state:
- description:
- - Whether the host or user certificate should exist or not, taking action if the state is different from what is stated.
- type: str
- default: "present"
- choices: [ 'present', 'absent' ]
- type:
- description:
- - Whether the module should generate a host or a user certificate.
- - Required if I(state) is C(present).
- type: str
- choices: ['host', 'user']
- force:
- description:
- - Should the certificate be regenerated even if it already exists and is valid.
- type: bool
- default: false
- path:
- description:
- - Path of the file containing the certificate.
- type: path
- required: true
- signing_key:
- description:
- - The path to the private openssh key that is used for signing the public key in order to generate the certificate.
- - Required if I(state) is C(present).
- type: path
- public_key:
- description:
- - The path to the public key that will be signed with the signing key in order to generate the certificate.
- - Required if I(state) is C(present).
- type: path
- valid_from:
- description:
- - "The point in time the certificate is valid from. Time can be specified either as relative time or as absolute timestamp.
- Time will always be interpreted as UTC. Valid formats are: C([+-]timespec | YYYY-MM-DD | YYYY-MM-DDTHH:MM:SS | YYYY-MM-DD HH:MM:SS | always)
- where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- Note that if using relative time this module is NOT idempotent."
- - Required if I(state) is C(present).
- type: str
- valid_to:
- description:
- - "The point in time the certificate is valid to. Time can be specified either as relative time or as absolute timestamp.
- Time will always be interpreted as UTC. Valid formats are: C([+-]timespec | YYYY-MM-DD | YYYY-MM-DDTHH:MM:SS | YYYY-MM-DD HH:MM:SS | forever)
- where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
- Note that if using relative time this module is NOT idempotent."
- - Required if I(state) is C(present).
- type: str
- valid_at:
- description:
- - "Check if the certificate is valid at a certain point in time. If it is not the certificate will be regenerated.
- Time will always be interpreted as UTC. Mainly to be used with relative timespec for I(valid_from) and / or I(valid_to).
- Note that if using relative time this module is NOT idempotent."
- type: str
- principals:
- description:
- - "Certificates may be limited to be valid for a set of principal (user/host) names.
- By default, generated certificates are valid for all users or hosts."
- type: list
- elements: str
- options:
- description:
- - "Specify certificate options when signing a key. The option that are valid for user certificates are:"
- - "C(clear): Clear all enabled permissions. This is useful for clearing the default set of permissions so permissions may be added individually."
- - "C(force-command=command): Forces the execution of command instead of any shell or
- command specified by the user when the certificate is used for authentication."
- - "C(no-agent-forwarding): Disable ssh-agent forwarding (permitted by default)."
- - "C(no-port-forwarding): Disable port forwarding (permitted by default)."
- - "C(no-pty Disable): PTY allocation (permitted by default)."
- - "C(no-user-rc): Disable execution of C(~/.ssh/rc) by sshd (permitted by default)."
- - "C(no-x11-forwarding): Disable X11 forwarding (permitted by default)"
- - "C(permit-agent-forwarding): Allows ssh-agent forwarding."
- - "C(permit-port-forwarding): Allows port forwarding."
- - "C(permit-pty): Allows PTY allocation."
- - "C(permit-user-rc): Allows execution of C(~/.ssh/rc) by sshd."
- - "C(permit-x11-forwarding): Allows X11 forwarding."
- - "C(source-address=address_list): Restrict the source addresses from which the certificate is considered valid.
- The C(address_list) is a comma-separated list of one or more address/netmask pairs in CIDR format."
- - "At present, no options are valid for host keys."
- type: list
- elements: str
- identifier:
- description:
- - Specify the key identity when signing a public key. The identifier that is logged by the server when the certificate is used for authentication.
- type: str
- serial_number:
- description:
- - "Specify the certificate serial number.
- The serial number is logged by the server when the certificate is used for authentication.
- The certificate serial number may be used in a KeyRevocationList.
- The serial number may be omitted for checks, but must be specified again for a new certificate.
- Note: The default value set by ssh-keygen is 0."
- type: int
-
-extends_documentation_fragment: files
-'''
-
-EXAMPLES = '''
-# Generate an OpenSSH user certificate that is valid forever and for all users
-- openssh_cert:
- type: user
- signing_key: /path/to/private_key
- public_key: /path/to/public_key.pub
- path: /path/to/certificate
- valid_from: always
- valid_to: forever
-
-# Generate an OpenSSH host certificate that is valid for 32 weeks from now and will be regenerated
-# if it is valid for less than 2 weeks from the time the module is being run
-- openssh_cert:
- type: host
- signing_key: /path/to/private_key
- public_key: /path/to/public_key.pub
- path: /path/to/certificate
- valid_from: +0s
- valid_to: +32w
- valid_at: +2w
-
-# Generate an OpenSSH host certificate that is valid forever and only for example.com and examplehost
-- openssh_cert:
- type: host
- signing_key: /path/to/private_key
- public_key: /path/to/public_key.pub
- path: /path/to/certificate
- valid_from: always
- valid_to: forever
- principals:
- - example.com
- - examplehost
-
-# Generate an OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019
-- openssh_cert:
- type: host
- signing_key: /path/to/private_key
- public_key: /path/to/public_key.pub
- path: /path/to/certificate
- valid_from: "2001-01-21"
- valid_to: "2019-01-21"
-
-# Generate an OpenSSH user Certificate with clear and force-command option:
-- openssh_cert:
- type: user
- signing_key: /path/to/private_key
- public_key: /path/to/public_key.pub
- path: /path/to/certificate
- valid_from: always
- valid_to: forever
- options:
- - "clear"
- - "force-command=/tmp/bla/foo"
-
-'''
-
-RETURN = '''
-type:
- description: type of the certificate (host or user)
- returned: changed or success
- type: str
- sample: host
-filename:
- description: path to the certificate
- returned: changed or success
- type: str
- sample: /tmp/certificate-cert.pub
-info:
- description: Information about the certificate. Output of C(ssh-keygen -L -f).
- returned: change or success
- type: list
- elements: str
-
-'''
-
-import os
-import errno
-import re
-import tempfile
-
-from datetime import datetime
-from datetime import MINYEAR, MAXYEAR
-from shutil import copy2
-from shutil import rmtree
-from ansible.module_utils.basic import AnsibleModule
-from ansible.module_utils.crypto import convert_relative_to_datetime
-from ansible.module_utils._text import to_native
-
-
-class CertificateError(Exception):
- pass
-
-
-class Certificate(object):
-
- def __init__(self, module):
- self.state = module.params['state']
- self.force = module.params['force']
- self.type = module.params['type']
- self.signing_key = module.params['signing_key']
- self.public_key = module.params['public_key']
- self.path = module.params['path']
- self.identifier = module.params['identifier']
- self.serial_number = module.params['serial_number']
- self.valid_from = module.params['valid_from']
- self.valid_to = module.params['valid_to']
- self.valid_at = module.params['valid_at']
- self.principals = module.params['principals']
- self.options = module.params['options']
- self.changed = False
- self.check_mode = module.check_mode
- self.cert_info = {}
-
- if self.state == 'present':
-
- if self.options and self.type == "host":
- module.fail_json(msg="Options can only be used with user certificates.")
-
- if self.valid_at:
- self.valid_at = self.valid_at.lstrip()
-
- self.valid_from = self.valid_from.lstrip()
- self.valid_to = self.valid_to.lstrip()
-
- self.ssh_keygen = module.get_bin_path('ssh-keygen', True)
-
- def generate(self, module):
-
- if not self.is_valid(module, perms_required=False) or self.force:
- args = [
- self.ssh_keygen,
- '-s', self.signing_key
- ]
-
- validity = ""
-
- if not (self.valid_from == "always" and self.valid_to == "forever"):
-
- if not self.valid_from == "always":
- timeobj = self.convert_to_datetime(module, self.valid_from)
- validity += (
- str(timeobj.year).zfill(4) +
- str(timeobj.month).zfill(2) +
- str(timeobj.day).zfill(2) +
- str(timeobj.hour).zfill(2) +
- str(timeobj.minute).zfill(2) +
- str(timeobj.second).zfill(2)
- )
- else:
- validity += "19700101010101"
-
- validity += ":"
-
- if self.valid_to == "forever":
- # on ssh-keygen versions that have the year 2038 bug this will cause the datetime to be 2038-01-19T04:14:07
- timeobj = datetime(MAXYEAR, 12, 31)
- else:
- timeobj = self.convert_to_datetime(module, self.valid_to)
-
- validity += (
- str(timeobj.year).zfill(4) +
- str(timeobj.month).zfill(2) +
- str(timeobj.day).zfill(2) +
- str(timeobj.hour).zfill(2) +
- str(timeobj.minute).zfill(2) +
- str(timeobj.second).zfill(2)
- )
-
- args.extend(["-V", validity])
-
- if self.type == 'host':
- args.extend(['-h'])
-
- if self.identifier:
- args.extend(['-I', self.identifier])
- else:
- args.extend(['-I', ""])
-
- if self.serial_number is not None:
- args.extend(['-z', str(self.serial_number)])
-
- if self.principals:
- args.extend(['-n', ','.join(self.principals)])
-
- if self.options:
- for option in self.options:
- args.extend(['-O'])
- args.extend([option])
-
- args.extend(['-P', ''])
-
- try:
- temp_directory = tempfile.mkdtemp()
- copy2(self.public_key, temp_directory)
- args.extend([temp_directory + "/" + os.path.basename(self.public_key)])
- module.run_command(args, environ_update=dict(TZ="UTC"), check_rc=True)
- copy2(temp_directory + "/" + os.path.splitext(os.path.basename(self.public_key))[0] + "-cert.pub", self.path)
- rmtree(temp_directory, ignore_errors=True)
- proc = module.run_command([self.ssh_keygen, '-L', '-f', self.path])
- self.cert_info = proc[1].split()
- self.changed = True
- except Exception as e:
- try:
- self.remove()
- rmtree(temp_directory, ignore_errors=True)
- except OSError as exc:
- if exc.errno != errno.ENOENT:
- raise CertificateError(exc)
- else:
- pass
- module.fail_json(msg="%s" % to_native(e))
-
- file_args = module.load_file_common_arguments(module.params)
- if module.set_fs_attributes_if_different(file_args, False):
- self.changed = True
-
- def convert_to_datetime(self, module, timestring):
-
- if self.is_relative(timestring):
- result = convert_relative_to_datetime(timestring)
- if result is None:
- module.fail_json(
- msg="'%s' is not a valid time format." % timestring)
- else:
- return result
- else:
- formats = ["%Y-%m-%d",
- "%Y-%m-%d %H:%M:%S",
- "%Y-%m-%dT%H:%M:%S",
- ]
- for fmt in formats:
- try:
- return datetime.strptime(timestring, fmt)
- except ValueError:
- pass
- module.fail_json(msg="'%s' is not a valid time format" % timestring)
-
- def is_relative(self, timestr):
- if timestr.startswith("+") or timestr.startswith("-"):
- return True
- return False
-
- def is_same_datetime(self, datetime_one, datetime_two):
-
- # This function is for backwards compatibility only because .total_seconds() is new in python2.7
- def timedelta_total_seconds(time_delta):
- return (time_delta.microseconds + 0.0 + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
- # try to use .total_ seconds() from python2.7
- try:
- return (datetime_one - datetime_two).total_seconds() == 0.0
- except AttributeError:
- return timedelta_total_seconds(datetime_one - datetime_two) == 0.0
-
- def is_valid(self, module, perms_required=True):
-
- def _check_state():
- return os.path.exists(self.path)
-
- if _check_state():
- proc = module.run_command([self.ssh_keygen, '-L', '-f', self.path], environ_update=dict(TZ="UTC"), check_rc=False)
- if proc[0] != 0:
- return False
- self.cert_info = proc[1].split()
- principals = re.findall("(?<=Principals:)(.*)(?=Critical)", proc[1], re.S)[0].split()
- principals = list(map(str.strip, principals))
- if principals == ["(none)"]:
- principals = None
- cert_type = re.findall("( user | host )", proc[1])[0].strip()
- serial_number = re.search(r"Serial: (\d+)", proc[1]).group(1)
- validity = re.findall("(from (\\d{4}-\\d{2}-\\d{2}T\\d{2}(:\\d{2}){2}) to (\\d{4}-\\d{2}-\\d{2}T\\d{2}(:\\d{2}){2}))", proc[1])
- if validity:
- if validity[0][1]:
- cert_valid_from = self.convert_to_datetime(module, validity[0][1])
- if self.is_same_datetime(cert_valid_from, self.convert_to_datetime(module, "1970-01-01 01:01:01")):
- cert_valid_from = datetime(MINYEAR, 1, 1)
- else:
- cert_valid_from = datetime(MINYEAR, 1, 1)
-
- if validity[0][3]:
- cert_valid_to = self.convert_to_datetime(module, validity[0][3])
- if self.is_same_datetime(cert_valid_to, self.convert_to_datetime(module, "2038-01-19 03:14:07")):
- cert_valid_to = datetime(MAXYEAR, 12, 31)
- else:
- cert_valid_to = datetime(MAXYEAR, 12, 31)
- else:
- cert_valid_from = datetime(MINYEAR, 1, 1)
- cert_valid_to = datetime(MAXYEAR, 12, 31)
- else:
- return False
-
- def _check_perms(module):
- file_args = module.load_file_common_arguments(module.params)
- return not module.set_fs_attributes_if_different(file_args, False)
-
- def _check_serial_number():
- if self.serial_number is None:
- return True
- return self.serial_number == int(serial_number)
-
- def _check_type():
- return self.type == cert_type
-
- def _check_principals():
- if not principals or not self.principals:
- return self.principals == principals
- return set(self.principals) == set(principals)
-
- def _check_validity(module):
- if self.valid_from == "always":
- earliest_time = datetime(MINYEAR, 1, 1)
- elif self.is_relative(self.valid_from):
- earliest_time = None
- else:
- earliest_time = self.convert_to_datetime(module, self.valid_from)
-
- if self.valid_to == "forever":
- last_time = datetime(MAXYEAR, 12, 31)
- elif self.is_relative(self.valid_to):
- last_time = None
- else:
- last_time = self.convert_to_datetime(module, self.valid_to)
-
- if earliest_time:
- if not self.is_same_datetime(earliest_time, cert_valid_from):
- return False
- if last_time:
- if not self.is_same_datetime(last_time, cert_valid_to):
- return False
-
- if self.valid_at:
- if cert_valid_from <= self.convert_to_datetime(module, self.valid_at) <= cert_valid_to:
- return True
-
- if earliest_time and last_time:
- return True
-
- return False
-
- if perms_required and not _check_perms(module):
- return False
-
- return _check_type() and _check_principals() and _check_validity(module) and _check_serial_number()
-
- def dump(self):
-
- """Serialize the object into a dictionary."""
-
- def filter_keywords(arr, keywords):
- concated = []
- string = ""
- for word in arr:
- if word in keywords:
- concated.append(string)
- string = word
- else:
- string += " " + word
- concated.append(string)
- # drop the certificate path
- concated.pop(0)
- return concated
-
- def format_cert_info():
- return filter_keywords(self.cert_info, [
- "Type:",
- "Public",
- "Signing",
- "Key",
- "Serial:",
- "Valid:",
- "Principals:",
- "Critical",
- "Extensions:"])
-
- if self.state == 'present':
- result = {
- 'changed': self.changed,
- 'type': self.type,
- 'filename': self.path,
- 'info': format_cert_info(),
- }
- else:
- result = {
- 'changed': self.changed,
- }
-
- return result
-
- def remove(self):
- """Remove the resource from the filesystem."""
-
- try:
- os.remove(self.path)
- self.changed = True
- except OSError as exc:
- if exc.errno != errno.ENOENT:
- raise CertificateError(exc)
- else:
- pass
-
-
-def main():
-
- module = AnsibleModule(
- argument_spec=dict(
- state=dict(type='str', default='present', choices=['absent', 'present']),
- force=dict(type='bool', default=False),
- type=dict(type='str', choices=['host', 'user']),
- signing_key=dict(type='path'),
- public_key=dict(type='path'),
- path=dict(type='path', required=True),
- identifier=dict(type='str'),
- serial_number=dict(type='int'),
- valid_from=dict(type='str'),
- valid_to=dict(type='str'),
- valid_at=dict(type='str'),
- principals=dict(type='list', elements='str'),
- options=dict(type='list', elements='str'),
- ),
- supports_check_mode=True,
- add_file_common_args=True,
- required_if=[('state', 'present', ['type', 'signing_key', 'public_key', 'valid_from', 'valid_to'])],
- )
-
- def isBaseDir(path):
- base_dir = os.path.dirname(path) or '.'
- if not os.path.isdir(base_dir):
- module.fail_json(
- name=base_dir,
- msg='The directory %s does not exist or the file is not a directory' % base_dir
- )
- if module.params['state'] == "present":
- isBaseDir(module.params['signing_key'])
- isBaseDir(module.params['public_key'])
-
- isBaseDir(module.params['path'])
-
- certificate = Certificate(module)
-
- if certificate.state == 'present':
-
- if module.check_mode:
- certificate.changed = module.params['force'] or not certificate.is_valid(module)
- else:
- try:
- certificate.generate(module)
- except Exception as exc:
- module.fail_json(msg=to_native(exc))
-
- else:
-
- if module.check_mode:
- certificate.changed = os.path.exists(module.params['path'])
- if certificate.changed:
- certificate.cert_info = {}
- else:
- try:
- certificate.remove()
- except Exception as exc:
- module.fail_json(msg=to_native(exc))
-
- result = certificate.dump()
- module.exit_json(**result)
-
-
-if __name__ == '__main__':
- main()