#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
#     Gustavo Padovan
#     Guilherme Gallo
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect log back"""

import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from io import StringIO
from os import environ, getenv, path
from typing import Any, Optional

import fire
from lava.exceptions import (
    MesaCIException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
from lava.utils import (
    GitlabSection,
    LAVAJob,
    LogFollower,
    LogSectionType,
    call_proxy,
    fatal_err,
    generate_lava_yaml_payload,
    hide_sensitive_data,
    print_log,
    setup_lava_proxy,
)
from lavacli.utils import flow_yaml as lava_yaml

# Initialize structural logging with a defaultdict, it can be changed for more
# sophisticated dict-like data abstractions.
STRUCTURAL_LOG = defaultdict(list)

try:
    from ci.structured_logger import StructuredLogger
except ImportError as e:
    print_log(
        f"Could not import StructuredLogger library: {e}. "
        "Falling back to defaultdict based structured logger."
    )

# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5 * 60))

# How many seconds the script should wait before trying a new polling iteration
# to check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 3)
)

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)


def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors from the job metadata.
    If it finds an error, raise it as MesaCIException.

    Only metadata whose "result" is "fail" is inspected; anything else
    returns silently.
    """
    if metadata.get("result") != "fail":
        return

    if "error_type" in metadata:
        error_type = metadata["error_type"]
        if error_type == "Infrastructure":
            raise MesaCIException(
                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
            )
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # with mal-formed job definitions. As we are always validating the
            # jobs, only the former is probable to happen. E.g.: When some LAVA
            # action timed out more times than expected in job definition.
            raise MesaCIException(
                f"LAVA job {job_id} failed with JobError "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )

    if metadata.get("case") == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )


def raise_lava_error(job) -> None:
    """
    Fetch the job results from LAVA and raise on infrastructure errors.

    If no result entry triggers an exception, the job status is set to "fail".
    """
    # Look for infrastructure errors, raise them, and retry if we see them.
    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = lava_yaml.load(results_yaml)
    for res in results:
        metadata = res["metadata"]
        raise_exception_from_metadata(metadata, job.job_id)

    # If we reach this far, it means that the job ended without hwci script
    # result and no LAVA infrastructure problem was found
    job.status = "fail"


def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
    """
    Print the job details inside a collapsed GitLab section.

    Waits up to WAIT_FOR_LAVA_POST_PROCESSING_RETRIES times, sleeping
    WAIT_FOR_LAVA_POST_PROCESSING_SEC between checks, for the job to report
    itself as post-processed; the details are dumped either way.
    """
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
    ):
        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and wait_post_processing_retries > 0:
            # Wait a little until LAVA finishes processing metadata
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            wait_post_processing_retries -= 1

        if not job.is_post_processed():
            # All retries were consumed, so the total wait is the number of
            # retries times the sleep used in the loop above.
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            print_log(
                f"Waited for {waited_for_sec} seconds "
                "for LAVA to post-process the job, it haven't finished yet. "
                "Dumping it's info anyway"
            )

        details: dict[str, str] = job.show()
        for field, value in details.items():
            print(f"{field:<15}: {value}")
        job.refresh_log()


def fetch_logs(job, max_idle_time, log_follower) -> None:
    """
    Run one log polling iteration: check for a hung device, sleep
    LOG_POLLING_TIME_SEC, fetch the new log lines, parse and print them.
    """
    is_job_hanging(job, max_idle_time)

    time.sleep(LOG_POLLING_TIME_SEC)
    new_log_lines = fetch_new_log_lines(job)
    parsed_lines = parse_log_lines(job, log_follower, new_log_lines)

    for line in parsed_lines:
        print_log(line)


def is_job_hanging(job, max_idle_time):
    """
    Raise MesaCITimeoutError when the time elapsed since `job.last_log_time`
    exceeds `max_idle_time` (a timedelta).
    """
    # Poll to check for new logs, assuming that a prolonged period of
    # silence means that the device has died and we should try it again
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )


def parse_log_lines(job, log_follower, new_log_lines):
    """
    Feed the new log lines to the log follower and return its flushed lines.

    When the follower is in the TEST_CASE or LAVA_POST_PROCESSING section, the
    lines are additionally passed through `job.parse_job_result_from_log`.
    """
    if log_follower.feed(new_log_lines):
        # If we had non-empty log data, we can assure that the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the RPC scheduler.jobs.logs get, it may
    # reach the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)
    return parsed_lines


def fetch_new_log_lines(job):
    """
    Return `job.get_logs()`, retrying up to five times on
    MesaCIParseException before re-raising that exception type.
    """
    # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        # Every attempt was suppressed; surface the parse failure.
        raise MesaCIParseException
    return new_log_lines


def submit_job(job):
    """Submit the job, wrapping any failure in a MesaCIException."""
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err


def wait_for_job_get_started(job):
    """
    Block until `job.is_started()` is true, polling every
    WAIT_FOR_DEVICE_POLLING_TIME_SEC seconds.
    """
    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")


def bootstrap_log_follower() -> LogFollower:
    """
    Open the collapsed "LAVA boot" GitLab section and return a LogFollower
    that starts in it.
    """
    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    return LogFollower(starting_section=gl)


def follow_job_execution(job, log_follower):
    """
    Poll the job logs until `job.is_finished`, recording the section phases on
    every iteration, then check for LAVA errors if no pass/fail was reached.
    """
    with log_follower:
        max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start to check job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa Developers expect to have a simple pass/fail job result.
    # If this does not happen, it probably means a LAVA infrastructure error
    # happened.
    if job.status not in ["pass", "fail"]:
        raise_lava_error(job)


def structural_log_phases(job, log_follower):
    """
    Store in `job.log["dut_job_phases"]` the stringified start and end time of
    every section in the follower's history, keyed by the part of the section
    header that precedes " - ".
    """
    phases: dict[str, Any] = {
        s.header.split(" - ")[0]: {
            k: str(getattr(s, k)) for k in ("start_time", "end_time")
        }
        for s in log_follower.section_history
    }
    job.log["dut_job_phases"] = phases


def print_job_final_status(job):
    """
    Print the final job status (a job still "running" is relabelled "hung")
    and dump the final job data.
    """
    if job.status == "running":
        job.status = "hung"

    # Unknown statuses fall back to red.
    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )

    job.refresh_log()
    show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")


def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
    """
    Run the job up to `retry_count + 1` times.

    Returns the first job whose execution completes without a MesaCIException
    or KeyboardInterrupt; otherwise returns the last attempted job (or None
    when no attempt was made). One dict per attempt is appended to `jobs_log`.
    """
    last_failed_job = None
    for attempt_no in range(1, retry_count + 2):
        # Need to get the logger value from its object to enable autosave
        # features, if AutoSaveDict is enabled from StructuredLogging module
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
        try:
            job_log["submitter_start_time"] = datetime.now().isoformat()
            submit_job(job)
            wait_for_job_get_started(job)
            log_follower: LogFollower = bootstrap_log_follower()
            follow_job_execution(job, log_follower)
            return job

        except (MesaCIException, KeyboardInterrupt) as exception:
            job.handle_exception(exception)

        finally:
            # Runs on both the success (return) and the failure path.
            print_job_final_status(job)
            # If LAVA takes too long to post process the job, the submitter
            # gives up and proceeds.
            job_log["submitter_end_time"] = datetime.now().isoformat()
            last_failed_job = job
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in the attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )

    return last_failed_job


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    """
    Execute the job with NUMBER_OF_RETRIES_TIMEOUT_DETECTION retries.

    Raises MesaCIRetryError (carrying the last job) when the last attempted
    job still has an exception recorded; otherwise returns that job.
    """
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    last_attempted_job = execute_job_with_retries(
        proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
    )

    if last_attempted_job.exception is not None:
        # Infra failed in all attempts
        raise MesaCIRetryError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_RED']}"
            "Job failed after it exceeded the number of "
            f"{number_of_retries} retries."
            f"{CONSOLE_LOG['RESET']}",
            retry_count=number_of_retries,
            last_job=last_attempted_job,
        )

    return last_attempted_job


@dataclass
class PathResolver:
    """
    Dataclass mixin that converts every truthy field annotated exactly as
    `pathlib.Path` into a resolved `pathlib.Path`.
    """

    def __post_init__(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if not value:
                continue
            # The comparison is against the exact annotation object, so the
            # field must be annotated as plain `pathlib.Path`.
            if field.type == pathlib.Path:
                value = pathlib.Path(value)
                setattr(self, field.name, value.resolve())


@dataclass
class LAVAJobSubmitter(PathResolver):
    """
    Command-line entry point (exposed through `fire`) that validates, submits
    and follows a LAVA job built from the given parameters.
    """

    boot_method: str
    ci_project_dir: str
    device_type: str
    job_timeout_min: int  # The job timeout in minutes
    build_url: str = None
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    first_stage_init: str = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    lava_tags: str = ""  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url_prefix: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    job_rootfs_overlay_url: str = None
    structured_log_file: pathlib.Path = None  # Log file path with structured LAVA log

    __structured_log_context = contextlib.nullcontext()  # Structured Logger context

    def __post_init__(self) -> None:
        super().__post_init__()
        # Remove mesa job names with spaces, which breaks the lava-test-case command
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]

        if self.structured_log_file:
            self.__structured_log_context = StructuredLoggerWrapper(
                self
            ).logger_context()

        # The proxy is required for validation and submission whether or not
        # structured logging is enabled, so it must always be created.
        self.proxy = setup_lava_proxy()

    def __prepare_submission(self) -> str:
        """
        Generate the LAVA job definition, optionally dump it, and validate it.

        Returns the job definition as a YAML string; validation errors are
        reported through `fatal_err`.
        """
        # Overwrite the timeout for the testcases with the value offered by the
        # user. The testcase running time should be at least 4 times greater than
        # the other sections (boot and setup), so we can safely ignore them.
        # If LAVA fails to stop the job at this stage, it will fall back to the
        # script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition_stream = StringIO()
        lava_yaml.dump(generate_lava_yaml_payload(self), job_definition_stream)
        job_definition = job_definition_stream.getvalue()

        if self.dump_yaml:
            self.dump_job_definition(job_definition)

        validation_job = LAVAJob(self.proxy, job_definition)
        if errors := validation_job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")
        print_log("LAVA job definition validated successfully")

        return job_definition

    @classmethod
    def is_under_ci(cls):
        """Return True when the `CI` environment variable is "true" (any case)."""
        ci_envvar: str = getenv("CI", "false")
        return ci_envvar.lower() == "true"

    def dump_job_definition(self, job_definition) -> None:
        """Print the job definition, with sensitive data hidden, in a collapsed section."""
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))

    def submit(self) -> None:
        """
        Prepares and submits the LAVA job.
        If `validate_only` is True, it validates the job without submitting it.
        If the job finishes with a non-pass status or encounters an exception,
        the program exits with a non-zero return code.
        """
        job_definition: str = self.__prepare_submission()

        if self.validate_only:
            return

        with self.__structured_log_context:
            last_attempt_job = None
            try:
                last_attempt_job = retriable_follow_job(self.proxy, job_definition)

            except MesaCIRetryError as retry_exception:
                # All retries failed; report on the last job that was tried.
                last_attempt_job = retry_exception.last_job

            except Exception as exception:
                STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
                raise

            finally:
                self.finish_script(last_attempt_job)

    def print_log_artifact_url(self):
        """
        Print the artifact URL of the structured log file, built from the
        CI_PROJECT_ROOT_NAMESPACE, CI_PROJECT_NAME and CI_JOB_ID environment
        variables and the log path relative to the current working directory.
        """
        base_url = "https://$CI_PROJECT_ROOT_NAMESPACE.pages.freedesktop.org/"
        artifacts_path = "-/$CI_PROJECT_NAME/-/jobs/$CI_JOB_ID/artifacts/"
        relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
        full_path = f"{base_url}{artifacts_path}{relative_log_path}"
        artifact_url = path.expandvars(full_path)

        print_log(f"Structural Logging data available at: {artifact_url}")

    def finish_script(self, last_attempt_job):
        """
        Record the combined job status in the structured log and exit with
        status 1 unless the last attempted job passed.
        """
        if self.is_under_ci() and self.structured_log_file:
            self.print_log_artifact_url()

        if not last_attempt_job:
            # No job was run, something bad happened
            STRUCTURAL_LOG["job_combined_status"] = "script_crash"
            current_exception = str(sys.exc_info()[0])
            STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
            raise SystemExit(1)

        STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status

        if last_attempt_job.status != "pass":
            raise SystemExit(1)


class StructuredLoggerWrapper:
    """
    Select and initialise the structured logging backend for a submitter:
    `StructuredLogger` when it was importable, otherwise a plain JSON dump of
    STRUCTURAL_LOG.
    """

    def __init__(self, submitter: LAVAJobSubmitter) -> None:
        self.__submitter: LAVAJobSubmitter = submitter

    def _init_logger(self):
        """Seed STRUCTURAL_LOG with the initial submission fields."""
        STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
        STRUCTURAL_LOG["dut_attempt_counter"] = 0

        # Initialize dut_jobs list to enable appends
        STRUCTURAL_LOG["dut_jobs"] = []

    @contextlib.contextmanager
    def _simple_logger_context(self):
        """
        Context manager that truncates the log file on entry and writes
        STRUCTURAL_LOG to it as JSON on exit, even if the body raised.
        """
        log_file = pathlib.Path(self.__submitter.structured_log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            # Truncate the file
            log_file.write_text("")
            yield
        finally:
            log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))

    def logger_context(self):
        """
        Return the context manager under which the job should run.

        Rebinds the module-level STRUCTURAL_LOG to the StructuredLogger data
        when that class is available; a NameError (failed import) selects the
        JSON-dump fallback instead.
        """
        context = contextlib.nullcontext()
        try:
            global STRUCTURAL_LOG
            STRUCTURAL_LOG = StructuredLogger(
                self.__submitter.structured_log_file, truncate=True
            ).data
        except NameError:
            # StructuredLogger was never bound because its import failed.
            context = self._simple_logger_context()

        self._init_logger()
        return context


if __name__ == "__main__":
    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
    # more buffering
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
    # LAVA farm is giving datetime in UTC timezone, let's set it locally for the
    # script run.
    # Setting environ here will not affect the system time, as the os.environ
    # lifetime follows the script one.
    environ["TZ"] = "UTC"
    time.tzset()

    fire.Fire(LAVAJobSubmitter)