summaryrefslogtreecommitdiff
path: root/cloudinit/config/cc_ssh_import_id.py
blob: ed5ac4921fa4b8279a92e6baf9a76e167e30edcd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright (C) 2009-2010 Canonical Ltd.
# Copyright (C) 2012, 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Scott Moser <scott.moser@canonical.com>
# Author: Juerg Haefliger <juerg.haefliger@hp.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
"""SSH Import ID: Import SSH id"""

import pwd
from logging import Logger
from textwrap import dedent

from cloudinit import subp, util
from cloudinit.cloud import Cloud
from cloudinit.config import Config
from cloudinit.config.schema import MetaSchema, get_meta_doc
from cloudinit.distros import ug_util
from cloudinit.settings import PER_INSTANCE

# https://launchpad.net/ssh-import-id
distros = ["ubuntu", "debian", "cos"]

SSH_IMPORT_ID_BINARY = "ssh-import-id"
MODULE_DESCRIPTION = """\
This module imports SSH keys from either a public keyserver, usually launchpad
or github using ``ssh-import-id``. Keys are referenced by the username they are
associated with on the keyserver. The keyserver can be specified by prepending
either ``lp:`` for launchpad or ``gh:`` for github to the username.
"""

meta: MetaSchema = {
    "id": "cc_ssh_import_id",
    "name": "SSH Import ID",
    "title": "Import SSH id",
    "description": MODULE_DESCRIPTION,
    "distros": distros,
    "frequency": PER_INSTANCE,
    "examples": [
        dedent(
            """\
            ssh_import_id:
             - user
             - gh:user
             - lp:user
            """
        )
    ],
    "activate_by_schema_keys": [],
}

__doc__ = get_meta_doc(meta)


def handle(
    name: str, cfg: Config, cloud: Cloud, log: Logger, args: list
) -> None:

    if not is_key_in_nested_dict(cfg, "ssh_import_id"):
        log.debug(
            "Skipping module named ssh-import-id, no 'ssh_import_id'"
            " directives found."
        )
        return
    elif not subp.which(SSH_IMPORT_ID_BINARY):
        log.warning(
            "ssh-import-id is not installed, but module ssh_import_id is "
            "configured. Skipping module."
        )
        return

    # import for "user: XXXXX"
    if len(args) != 0:
        user = args[0]
        ids = []
        if len(args) > 1:
            ids = args[1:]

        import_ssh_ids(ids, user, log)
        return

    # import for cloudinit created users
    (users, _groups) = ug_util.normalize_users_groups(cfg, cloud.distro)
    elist = []
    for (user, user_cfg) in users.items():
        import_ids = []
        if user_cfg["default"]:
            import_ids = util.get_cfg_option_list(cfg, "ssh_import_id", [])
        else:
            try:
                import_ids = user_cfg["ssh_import_id"]
            except Exception:
                log.debug("User %s is not configured for ssh_import_id", user)
                continue

        try:
            import_ids = util.uniq_merge(import_ids)
            import_ids = [str(i) for i in import_ids]
        except Exception:
            log.debug(
                "User %s is not correctly configured for ssh_import_id", user
            )
            continue

        if not len(import_ids):
            continue

        try:
            import_ssh_ids(import_ids, user, log)
        except Exception as exc:
            util.logexc(
                log, "ssh-import-id failed for: %s %s", user, import_ids
            )
            elist.append(exc)

    if len(elist):
        raise elist[0]


def import_ssh_ids(ids, user, log):

    if not (user and ids):
        log.debug("empty user(%s) or ids(%s). not importing", user, ids)
        return

    try:
        pwd.getpwnam(user)
    except KeyError as exc:
        raise exc

    # TODO: We have a use case that involes setting a proxy value earlier
    # in boot and the user wants this env used when using ssh-import-id.
    # E.g.,:
    # bootcmd:
    #   - mkdir -p /etc/systemd/system/cloud-config.service.d
    #   - mkdir -p /etc/systemd/system/cloud-final.service.d
    # write_files:
    #   - content: |
    #       http_proxy=http://192.168.1.2:3128/
    #       https_proxy=http://192.168.1.2:3128/
    #     path: /etc/cloud/env
    #   - content: |
    #       [Service]
    #       EnvironmentFile=/etc/cloud/env
    #       PassEnvironment=https_proxy http_proxy
    #     path: /etc/systemd/system/cloud-config.service.d/override.conf
    #   - content: |
    #       [Service]
    #       EnvironmentFile=/etc/cloud/env
    #       PassEnvironment=https_proxy http_proxy
    #     path: /etc/systemd/system/cloud-final.service.d/override.conf
    #
    # I'm including the `--preserve-env` here as a one-off, but we should
    # have a better way of setting env earlier in boot and using it later.
    # Perhaps a 'set_env' module?
    cmd = [
        "sudo",
        "--preserve-env=https_proxy",
        "-Hu",
        user,
        SSH_IMPORT_ID_BINARY,
    ] + ids
    log.debug("Importing SSH ids for user %s.", user)

    try:
        subp.subp(cmd, capture=False)
    except subp.ProcessExecutionError as exc:
        util.logexc(log, "Failed to run command to import %s SSH ids", user)
        raise exc


def is_key_in_nested_dict(config: dict, search_key: str) -> bool:
    """Search for key nested in config.

    Note: A dict embedded in a list of lists will not be found walked - but in
    this case we don't need it.
    """
    for config_key in config.keys():
        if search_key == config_key:
            return True
        if isinstance(config[config_key], dict):
            if is_key_in_nested_dict(config[config_key], search_key):
                return True
        if isinstance(config[config_key], list):
            # this code could probably be generalized to walking the whole
            # config by iterating lists in search of dictionaries
            for item in config[config_key]:
                if isinstance(item, dict):
                    if is_key_in_nested_dict(item, search_key):
                        return True
    return False