summaryrefslogtreecommitdiff
path: root/tests/integration_tests/util.py
blob: 1c2a9284a31dc133c4cc683a5ebfa7d4eff74b7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import logging
import multiprocessing
import os
import re
import time
from collections import namedtuple
from contextlib import contextmanager
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import Set

import pytest

from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance

# Shared logger for the helpers in this module (used by the Travis
# keep-alive context manager below).
log = logging.getLogger("integration_testing")
# Simple record pairing the text of a public key with its private key;
# returned by get_test_rsa_keypair().
key_pair = namedtuple("key_pair", "public_key private_key")

# Relative paths: they resolve against the current working directory of the
# process running the tests.
ASSETS_DIR = Path("tests/integration_tests/assets")
# Directory holding the id_rsa.<name> / id_rsa.<name>.pub test key files.
KEY_PATH = ASSETS_DIR / "keys"


def verify_ordered_items_in_text(to_verify: list, text: str):
    """Assert all items in list appear in order in text.

    Each item is first tried as a regular expression. If it is not a valid
    pattern (re.error), it is matched as a literal string instead.

    The search for each item begins at the position where the previous item
    matched, so a later item can never be satisfied by text that occurs
    before an earlier item's match.

    Examples:
      verify_ordered_items_in_text(['a', '1'], 'ab1')  # passes
      verify_ordered_items_in_text(['1', 'a'], 'ab1')  # raises AssertionError

    Raises AssertionError naming the first item that cannot be found at or
    after the previous item's position.
    """
    index = 0
    for item in to_verify:
        remaining = text[index:]
        try:
            matched = re.search(item, remaining)
        except re.error:
            # Not a valid regex (e.g. an unbalanced parenthesis in a log
            # message): fall back to a literal match.
            matched = re.search(re.escape(item), remaining)
        assert matched, "Expected item not found: '{}'".format(item)
        # matched.start() is relative to the slice that was searched, so it
        # has to be accumulated to stay an absolute offset into text.
        # Assigning it directly would let the search window jump backwards.
        index += matched.start()


def verify_clean_log(log: str, ignore_deprecations: bool = True):
    """Assert no unexpected tracebacks or warnings in logs.

    Checks are applied in this order:

    1. When ignore_deprecations is true, every line containing "deprecat"
       (case-insensitive) is removed before any other check.
    2. Any remaining "CRITICAL" or "ERROR" text raises AssertionError
       listing the offending fragments.
    3. The number of "WARN" occurrences must equal the number of
       occurrences of the known, tolerated warning messages.
    4. The number of "Traceback" occurrences must equal the number of
       occurrences of the known, tolerated traceback messages.

    Extra tolerated messages are added when the text "oracle" appears
    anywhere in the log.

    Raises AssertionError when any of the checks fails.
    """
    if ignore_deprecations:
        is_deprecated = re.compile("deprecat", flags=re.IGNORECASE)
        log = "\n".join(
            line for line in log.split("\n") if not is_deprecated.search(line)
        )

    error_logs = re.findall("CRITICAL.*", log) + re.findall("ERROR.*", log)
    if error_logs:
        raise AssertionError(
            "Found unexpected errors: %s" % "\n".join(error_logs)
        )

    warning_count = log.count("WARN")
    traceback_count = log.count("Traceback")

    warning_texts = [
        # Consistently on all Azure launches:
        # azure.py[WARNING]: No lease found; using default endpoint
        "No lease found; using default endpoint",
        # Ubuntu lxd storage
        "thinpool by default on Ubuntu due to LP #1982780",
        "WARNING]: Could not match supplied host pattern, ignoring:",
    ]
    traceback_texts = []
    if "oracle" in log:
        # LP: #1842752
        lease_exists_text = "Stderr: RTNETLINK answers: File exists"
        warning_texts.append(lease_exists_text)
        traceback_texts.append(lease_exists_text)
        # LP: #1833446
        fetch_error_text = (
            "UrlError: 404 Client Error: Not Found for url: "
            "http://169.254.169.254/latest/meta-data/"
        )
        warning_texts.append(fetch_error_text)
        traceback_texts.append(fetch_error_text)
        # Oracle has a file in /etc/cloud/cloud.cfg.d that contains
        # users:
        # - default
        # - name: opc
        #   ssh_redirect_user: true
        # This can trigger a warning about opc having no public key
        warning_texts.append(
            "Unable to disable SSH logins for opc given ssh_redirect_user"
        )

    # Each tolerated message accounts for one "WARN" / "Traceback" hit per
    # occurrence; anything left over is unexpected.
    expected_warnings = sum(log.count(known) for known in warning_texts)
    expected_tracebacks = sum(log.count(known) for known in traceback_texts)

    assert warning_count == expected_warnings, (
        f"Unexpected warning count != {expected_warnings}. Found: "
        f"{re.findall('WARNING.*', log)}"
    )
    # Give the traceback check a diagnostic too; a bare assert fails with
    # no hint about which check tripped.
    assert traceback_count == expected_tracebacks, (
        f"Unexpected traceback count: found {traceback_count}, "
        f"expected {expected_tracebacks}"
    )


def get_inactive_modules(log: str) -> Set[str]:
    """Collect the module names the log reports as skipped for lack of config.

    Every "Skipping modules '<names>' because no applicable config is
    provided." message is located, its comma-separated <names> list is
    split, and each name is stripped of surrounding whitespace. The result
    is the union over all such messages; it is empty when there are none.
    """
    skipped_lists = re.findall(
        r"Skipping modules '(.*)' because no applicable config is provided.",
        log,
    )
    return {
        name.strip()
        for module_list in skipped_lists
        for name in module_list.split(",")
    }


@contextmanager
def emit_dots_on_travis():
    """Keep a Travis job alive while the wrapped block runs.

    Travis kills jobs that stay silent for too long. When the TRAVIS
    environment variable equals "true", a background process is started that
    logs a "." at INFO level through the module logger every 60 seconds; the
    process is terminated when the block exits, whether normally or through
    an exception. In any other environment this context manager does nothing.

    Wrap it selectively around operations known to take a long time.
    """

    def keep_alive():
        # Runs in the child process until the parent terminates it.
        while True:
            log.info(".")
            time.sleep(60)

    if os.environ.get("TRAVIS") == "true":
        heartbeat = multiprocessing.Process(target=keep_alive)
        heartbeat.start()
        try:
            yield
        finally:
            heartbeat.terminate()
    else:
        # Not on Travis: just run the wrapped block.
        yield


def get_test_rsa_keypair(key_name: str = "test1") -> key_pair:
    """Load a test RSA key pair from the keys asset directory.

    Reads KEY_PATH/id_rsa.<key_name>.pub and KEY_PATH/id_rsa.<key_name> as
    text and returns their contents as key_pair(public_key, private_key).
    The usual file-open errors propagate if either file is missing.
    """
    private_name = "id_rsa.{}".format(key_name)
    public_name = "id_rsa.{}.pub".format(key_name)
    # The public key is read first, then the private key.
    public_key = (KEY_PATH / public_name).read_text()
    private_key = (KEY_PATH / private_name).read_text()
    return key_pair(public_key=public_key, private_key=private_key)


def get_console_log(client: IntegrationInstance):
    """Return the console log of the client's instance.

    The current test is skipped (pytest.skip) when requesting the console
    log raises NotImplementedError, and failed (pytest.fail) when the
    returned text starts with "no console output" (case-insensitive).
    """
    try:
        output = client.instance.console_log()
    except NotImplementedError:
        pytest.skip("NotImplementedError when requesting console log")
    has_no_output = output.lower().startswith("no console output")
    if has_no_output:
        pytest.fail("no console output")
    return output


@lru_cache()
def lxd_has_nocloud(client: IntegrationInstance) -> bool:
    """Tell whether the instance's LXD metadata mentions the NoCloud seed dir.

    Runs `lxc config metadata show <instance name>` through subp and checks
    its stdout for "/var/lib/cloud/seed/nocloud". The answer is memoized per
    client argument by lru_cache.
    """
    # Bionic or Focal may be detected as NoCloud rather than LXD
    show_metadata_cmd = [
        "lxc",
        "config",
        "metadata",
        "show",
        client.instance.name,
    ]
    metadata = subp(show_metadata_cmd)
    nocloud_seed_dir = "/var/lib/cloud/seed/nocloud"
    return nocloud_seed_dir in metadata.stdout


def get_feature_flag_value(client: IntegrationInstance, key):
    """Return the printed value of cloudinit.features.<key> on the client.

    Executes a python3 one-liner on the client that imports
    cloudinit.features and prints the attribute named by key, then returns
    the stripped output.

    Raises NameError when the output contains the text "NameError".
    """
    command = (
        'python3 -c "from cloudinit import features; '
        f'print(features.{key})"'
    )
    value = client.execute(command).strip()
    if "NameError" not in value:
        return value
    raise NameError(f"name '{key}' is not defined")