1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# Copyright 2019 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.
import os
import random
import re
import string
import tempfile
from ..logger import log
class CloudInitData():
disable = None
root_password_generate = None
root_password_file = None
generated_root_password = None
ssh_key = None
user_data = None
meta_data = None
network_config = None
def _generate_password(self):
if not self.generated_root_password:
self.generated_root_password = ""
for dummy in range(16):
self.generated_root_password += random.choice(
string.ascii_letters + string.digits)
return self.generated_root_password
def _get_password(self, pwdfile):
with open(pwdfile, "r") as fobj:
return fobj.readline().rstrip("\n\r")
def get_password_if_generated(self):
if self.root_password_generate:
return self._generate_password()
def get_root_password(self):
if self.root_password_file:
return self._get_password(self.root_password_file)
return self.get_password_if_generated()
def get_ssh_key(self):
if self.ssh_key:
return self._get_password(self.ssh_key)
def _create_metadata_content(cloudinit_data):
content = ""
if cloudinit_data.meta_data:
log.debug("Using meta-data content from path=%s",
cloudinit_data.meta_data)
content = open(cloudinit_data.meta_data).read()
return content
def _create_userdata_content(cloudinit_data):
if cloudinit_data.user_data:
log.debug("Using user-data content from path=%s",
cloudinit_data.user_data)
return open(cloudinit_data.user_data).read()
content = "#cloud-config\n"
if cloudinit_data.root_password_generate or cloudinit_data.root_password_file:
rootpass = cloudinit_data.get_root_password()
content += "chpasswd:\n"
content += " list: |\n"
content += " root:%s\n" % rootpass
if cloudinit_data.root_password_generate:
content += " expire: True\n"
elif cloudinit_data.root_password_file:
content += " expire: False\n"
if cloudinit_data.ssh_key:
rootpass = cloudinit_data.get_ssh_key()
content += "users:\n"
content += " - name: root\n"
content += " ssh-authorized-keys:\n"
content += " - %s\n" % rootpass
if cloudinit_data.disable:
content += "runcmd:\n"
content += "- [ sudo, touch, /etc/cloud/cloud-init.disabled ]\n"
log.debug("Generated cloud-init userdata: \n%s",
re.sub(r"root:(.*)", 'root:[SCRUBBLED]', content))
return content
def _create_network_config_content(cloudinit_data):
content = ""
if cloudinit_data.network_config:
log.debug("Using network-config content from path=%s",
cloudinit_data.network_config)
content = open(cloudinit_data.network_config).read()
return content
def create_files(scratchdir, cloudinit_data):
metadata = _create_metadata_content(cloudinit_data)
userdata = _create_userdata_content(cloudinit_data)
datas = [(metadata, "meta-data"), (userdata, "user-data")]
network_config = _create_network_config_content(cloudinit_data)
if network_config:
datas.append((network_config, 'network-config'))
filepairs = []
try:
for content, destfile in datas:
fileobj = tempfile.NamedTemporaryFile(
prefix="virtinst-", suffix=("-%s" % destfile),
dir=scratchdir, delete=False)
filename = fileobj.name
filepairs.append((filename, destfile))
with open(filename, "w+") as f:
f.write(content)
except Exception: # pragma: no cover
for filepair in filepairs:
os.unlink(filepair[0])
return filepairs
|