summaryrefslogtreecommitdiff
path: root/.gitlab-ci/lava/lava_job_submitter.py
blob: 0ba515a1bf5355114bb50a818189ff0fdd584ffb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect log back"""


import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from io import StringIO
from os import getenv
from typing import Any, Optional

import fire
from lava.exceptions import (
    MesaCIException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
from lava.utils import (
    GitlabSection,
    LAVAJob,
    LogFollower,
    LogSectionType,
    call_proxy,
    fatal_err,
    generate_lava_yaml_payload,
    hide_sensitive_data,
    print_log,
    setup_lava_proxy,
)
from lavacli.utils import flow_yaml as lava_yaml

# Initialize structural logging with a defaultdict, it can be changed for more
# sophisticated dict-like data abstractions.
STRUCTURAL_LOG = defaultdict(list)

# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC: int = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC",  5*60))

# How many seconds the script should wait before try a new polling iteration to
# check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC: int = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
# NOTE(review): the env var name lacks the "FOR_" used in the constant name
# (LAVA_WAIT_LAVA_... vs WAIT_FOR_LAVA_...) — presumably legacy; confirm
# against CI configs before renaming either side.
WAIT_FOR_LAVA_POST_PROCESSING_SEC: int = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES: int = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 3)
)

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC: int = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happen.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION: int = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)

def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors from the job metadata.

    Raises MesaCIException when the metadata describes a retryable problem
    (infrastructure error, LAVA job error, or a failed validation step);
    returns silently for anything else.
    """
    # Only failed results can carry an error worth inspecting.
    if metadata.get("result") != "fail":
        return

    error_type = metadata.get("error_type")
    if error_type == "Infrastructure":
        raise MesaCIException(
            f"LAVA job {job_id} failed with Infrastructure Error. Retry."
        )
    if error_type == "Job":
        # This happens when LAVA assumes that the job cannot terminate or
        # with mal-formed job definitions. As we are always validating the
        # jobs, only the former is probable to happen. E.g.: When some LAVA
        # action timed out more times than expected in job definition.
        raise MesaCIException(
            f"LAVA job {job_id} failed with JobError "
            "(possible LAVA timeout misconfiguration/bug). Retry."
        )

    if metadata.get("case") == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )


def raise_lava_error(job) -> None:
    """Inspect a finished job's results for infra errors; mark it failed otherwise."""
    # Look for infrastructure errors, raise them, and retry if we see them.
    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    for result in lava_yaml.load(results_yaml):
        raise_exception_from_metadata(result["metadata"], job.job_id)

    # If we reach this far, it means that the job ended without hwci script
    # result and no LAVA infrastructure problem was found
    job.status = "fail"


def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
    """
    Print the LAVA job's details inside a collapsed GitLab section.

    Waits a bounded amount of time for LAVA to finish post-processing the
    job metadata; if post-processing never completes, dumps whatever
    information is available anyway.
    """
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
    ):
        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and wait_post_processing_retries > 0:
            # Wait a little until LAVA finishes processing metadata
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            wait_post_processing_retries -= 1

        if not job.is_post_processed():
            # Bug fix: the loop above sleeps WAIT_FOR_LAVA_POST_PROCESSING_SEC
            # per retry, so the total waited time must be derived from that
            # constant, not from the device polling interval.
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            # Bug fix: the adjacent f-string parts were concatenated without a
            # separating space ("secondsfor"); grammar fixed as well.
            print_log(
                f"Waited for {waited_for_sec} seconds "
                "for LAVA to post-process the job, it hasn't finished yet. "
                "Dumping its info anyway."
            )

        details: dict[str, str] = job.show()
        for field, value in details.items():
            print(f"{field:<15}: {value}")
        job.refresh_log()


def fetch_logs(job, max_idle_time, log_follower) -> None:
    """Poll one batch of LAVA log output, parse it and print the result."""
    # Bail out (with a retryable exception) if the device looks dead.
    is_job_hanging(job, max_idle_time)

    time.sleep(LOG_POLLING_TIME_SEC)
    fresh_lines = fetch_new_log_lines(job)
    for parsed_line in parse_log_lines(job, log_follower, fresh_lines):
        print_log(parsed_line)


def is_job_hanging(job, max_idle_time):
    """
    Raise MesaCITimeoutError when the job produced no new log output for
    longer than max_idle_time.

    A prolonged period of silence is assumed to mean the device has died
    and the job should be retried. Despite the name, this returns None
    rather than a boolean when the job is healthy.
    """
    idle_time = datetime.now() - job.last_log_time
    if idle_time <= max_idle_time:
        return

    max_idle_time_min = max_idle_time.total_seconds() / 60
    raise MesaCITimeoutError(
        f"{CONSOLE_LOG['BOLD']}"
        f"{CONSOLE_LOG['FG_YELLOW']}"
        f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
        "minutes. Retry."
        f"{CONSOLE_LOG['RESET']}",
        timeout_duration=max_idle_time,
    )


def parse_log_lines(job, log_follower, new_log_lines):
    """Feed raw LAVA log lines to the follower and return the parsed lines."""
    # Non-empty log data proves that the device is still alive.
    if log_follower.feed(new_log_lines):
        job.heartbeat()

    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the RPC scheduler.jobs.logs get, it may
    # reach the LAVA_POST_PROCESSING phase.
    final_sections = (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    )
    if log_follower.current_section.type in final_sections:
        parsed_lines = job.parse_job_result_from_log(parsed_lines)

    return parsed_lines


def fetch_new_log_lines(job):
    """
    Fetch a fresh batch of log lines from LAVA, retrying on parse errors.

    The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
    Retry the log fetching several times before exposing the error. Unlike a
    plain ``contextlib.suppress`` loop, the last failure is kept and chained
    to the raised exception so the real cause is visible in the traceback.
    """
    last_error = None
    for _ in range(5):
        try:
            return job.get_logs()
        except MesaCIParseException as err:
            last_error = err
    # All attempts failed: surface the problem, preserving the original cause.
    raise MesaCIParseException from last_error


def submit_job(job):
    """Hand the job definition over to LAVA for execution.

    Any submission failure is re-raised as a retryable MesaCIException
    with the original error chained as its cause.
    """
    try:
        job.submit()
    except Exception as submission_error:
        message = f"Could not submit LAVA job. Reason: {submission_error}"
        raise MesaCIException(message) from submission_error


def wait_for_job_get_started(job):
    """Block until LAVA reports that the job has left the queue and started."""
    print_log(f"Waiting for job {job.job_id} to start.")
    # Poll the scheduler until a device picks the job up.
    while True:
        if job.is_started():
            break
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")


def bootstrap_log_follower() -> LogFollower:
    """Open the "LAVA boot" GitLab section and return a follower starting in it."""
    boot_section = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(boot_section.start())
    return LogFollower(starting_section=boot_section)


def follow_job_execution(job, log_follower):
    """Stream the job's log output until LAVA reports it as finished.

    Once the log stream closes, jobs that ended without a plain pass/fail
    status are inspected for LAVA infrastructure errors (which raise).
    """
    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
    with log_follower:
        # Start to check job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa Developers expect to have a simple pass/fail job result.
    # If this does not happen, it probably means a LAVA infrastructure error
    # happened.
    if job.status != "pass" and job.status != "fail":
        raise_lava_error(job)


def structural_log_phases(job, log_follower):
    """Record the start/end timestamps of every seen log section on the job log."""
    phases: dict[str, Any] = {}
    for section in log_follower.section_history:
        # Section headers look like "<phase> - <details>"; only the phase
        # part names the entry.
        phase_name = section.header.split(" - ")[0]
        phases[phase_name] = {
            "start_time": str(section.start_time),
            "end_time": str(section.end_time),
        }
    job.log["dut_job_phases"] = phases


def print_job_final_status(job):
    """Print the job's colour-coded final status and dump its LAVA details."""
    # A job still marked "running" at this point never terminated on its own.
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    status_message = (
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )
    print_log(status_message)

    job.refresh_log()
    show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")


def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
    """
    Submit and follow a LAVA job, retrying after MesaCI-known failures.

    Makes one initial attempt plus up to ``retry_count`` retries (hence
    ``range(1, retry_count + 2)``). Returns the finished LAVAJob on the
    first successful attempt, or None when every attempt failed.

    Each attempt appends its own dict to ``jobs_log``, giving the
    structural log a per-attempt record.
    """
    for attempt_no in range(1, retry_count + 2):
        # Need to get the logger value from its object to enable autosave
        # features
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no

        try:
            submit_job(job)
            wait_for_job_get_started(job)
            log_follower: LogFollower = bootstrap_log_follower()
            follow_job_execution(job, log_follower)
            return job

        except (MesaCIException, KeyboardInterrupt) as exception:
            # Known-retryable failure (or user interrupt): hand it to the
            # job's handler, then fall through to the next attempt, if any
            # remain (assumes handle_exception does not re-raise — confirm).
            job.handle_exception(exception)
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in the attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )

        finally:
            # Runs after the successful `return` above as well, so every
            # attempt — including the winning one — gets a finished
            # timestamp and a final status printout.
            job_log["finished_time"] = datetime.now().isoformat()
            print_job_final_status(job)


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    """Run the job with the configured retry budget; raise when all attempts fail."""
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    finished_job = execute_job_with_retries(
        proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
    )
    if finished_job is not None:
        return finished_job

    # Job failed in all attempts
    raise MesaCIRetryError(
        f"{CONSOLE_LOG['BOLD']}"
        f"{CONSOLE_LOG['FG_RED']}"
        "Job failed after it exceeded the number of "
        f"{number_of_retries} retries."
        f"{CONSOLE_LOG['RESET']}",
        retry_count=number_of_retries,
    )


@dataclass
class PathResolver:
    """Dataclass mixin that resolves every ``pathlib.Path``-annotated field.

    After initialisation, any truthy field annotated as ``pathlib.Path`` is
    converted to an absolute, resolved ``Path``; falsy (unset) fields are
    left untouched.
    """

    def __post_init__(self):
        for dc_field in fields(self):
            raw_value = getattr(self, dc_field.name)
            # Skip fields that were never set (None, "", etc.).
            if not raw_value:
                continue
            if dc_field.type == pathlib.Path:
                resolved = pathlib.Path(raw_value).resolve()
                setattr(self, dc_field.name, resolved)


@dataclass
class LAVAJobSubmitter(PathResolver):
    """CLI entry point (driven by python-fire) that generates, validates,
    submits and follows a LAVA job, exiting with the job's pass/fail status.

    Fields annotated as ``pathlib.Path`` are resolved to absolute paths by
    the PathResolver base class.
    """

    boot_method: str
    ci_project_dir: str
    device_type: str
    job_timeout_min: int  # The job timeout in minutes
    build_url: str = None
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    first_stage_init: str = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    lava_tags: str = ""  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url_prefix: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    job_rootfs_overlay_url: str = None
    structured_log_file: pathlib.Path = None  # Log file path with structured LAVA log

    def __post_init__(self) -> None:
        super().__post_init__()
        # Remove mesa job names with spaces, which breaks the lava-test-case command
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]

        if self.structured_log_file:
            self.setup_structured_logger()

    def dump(self, job_definition) -> None:
        """Print the generated job definition (secrets redacted) when requested."""
        if self.dump_yaml:
            with GitlabSection(
                "yaml_dump",
                "LAVA job definition (YAML)",
                type=LogSectionType.LAVA_BOOT,
                start_collapsed=True,
            ):
                print(hide_sensitive_data(job_definition))

    def submit(self):
        """Generate, validate and (unless validate_only) run the LAVA job.

        Exits the process with code 0 when the finished job passed and 1
        otherwise; returns early without exiting in validate_only mode.
        """
        proxy = setup_lava_proxy()

        # Overwrite the timeout for the testcases with the value offered by the
        # user. The testcase running time should be at least 4 times greater than
        # the other sections (boot and setup), so we can safely ignore them.
        # If LAVA fails to stop the job at this stage, it will fall back to the
        # script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition_stream = StringIO()
        lava_yaml.dump(generate_lava_yaml_payload(self), job_definition_stream)
        job_definition = job_definition_stream.getvalue()

        self.dump(job_definition)

        job = LAVAJob(proxy, job_definition)
        if errors := job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")
        print_log("LAVA job definition validated successfully")

        if self.validate_only:
            return

        try:
            finished_job = retriable_follow_job(proxy, job_definition)
        except Exception as exception:
            STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
            # Bare raise keeps the original traceback intact.
            raise
        exit_code = 0 if finished_job.status == "pass" else 1
        # Bug fix: record the status of the job that actually ran to the end
        # (the same object that decides exit_code), not the initial
        # validation-only LAVAJob instance created above.
        STRUCTURAL_LOG["job_combined_status"] = finished_job.status
        sys.exit(exit_code)

    def setup_structured_logger(self) -> None:
        """Swap STRUCTURAL_LOG for a file-backed StructuredLogger, if available.

        NOTE(review): StructuredLogger is not imported anywhere in this file;
        the NameError fallback presumably covers environments where the
        optional library (imported elsewhere) is missing — confirm against
        the repository before relying on the file-backed path.
        """
        try:
            global STRUCTURAL_LOG
            STRUCTURAL_LOG = StructuredLogger(
                self.structured_log_file, truncate=True
            ).data
        except NameError as e:
            print(
                f"Could not import StructuredLogger library: {e}. "
                "Falling back to DummyLogger"
            )

        STRUCTURAL_LOG["fixed_tags"] = self.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.device_type
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"


if __name__ == "__main__":
    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
    # more buffering
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)

    # python-fire exposes LAVAJobSubmitter's dataclass fields as CLI flags
    # and its public methods (submit, dump, ...) as subcommands.
    fire.Fire(LAVAJobSubmitter)