summaryrefslogtreecommitdiff
path: root/test/lib/ansible_test/_internal/coverage/analyze/targets/__init__.py
blob: a01b804f260afd7be4704eba02da679d340755c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Analyze integration test target code coverage."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os

from .... import types as t

from ....io import (
    read_json_file,
    write_json_file,
)

from ....util import (
    ApplicationError,
    display,
)

from .. import (
    CoverageAnalyzeConfig,
)

if t.TYPE_CHECKING:
    Arcs = t.Dict[str, t.Dict[t.Tuple[int, int], t.Set[int]]]
    Lines = t.Dict[str, t.Dict[int, t.Set[int]]]
    TargetIndexes = t.Dict[str, int]
    TargetSetIndexes = t.Dict[t.FrozenSet[int], int]


def make_report(target_indexes, arcs, lines):  # type: (TargetIndexes, Arcs, Lines) -> t.Dict[str, t.Any]
    """Condense target indexes, arcs and lines into a compact report."""
    set_indexes = {}
    arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items())
    line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items())

    report = dict(
        targets=[name for name, index in sorted(target_indexes.items(), key=lambda kvp: kvp[1])],
        target_sets=[sorted(data) for data, index in sorted(set_indexes.items(), key=lambda kvp: kvp[1])],
        arcs=arc_refs,
        lines=line_refs,
    )

    return report


def load_report(report):  # type: (t.Dict[str, t.Any]) -> t.Tuple[t.List[str], Arcs, Lines]
    """Extract target indexes, arcs and lines from an existing report."""
    try:
        target_indexes = report['targets']  # type: t.List[str]
        target_sets = report['target_sets']  # type: t.List[t.List[int]]
        arc_data = report['arcs']  # type: t.Dict[str, t.Dict[str, int]]
        line_data = report['lines']  # type: t.Dict[str, t.Dict[int, int]]
    except KeyError as ex:
        raise ApplicationError('Document is missing key "%s".' % ex.args)
    except TypeError:
        raise ApplicationError('Document is type "%s" instead of "dict".' % type(report).__name__)

    arcs = dict((path, dict((parse_arc(arc), set(target_sets[index])) for arc, index in data.items())) for path, data in arc_data.items())
    lines = dict((path, dict((int(line), set(target_sets[index])) for line, index in data.items())) for path, data in line_data.items())

    return target_indexes, arcs, lines


def read_report(path):  # type: (str) -> t.Tuple[t.List[str], Arcs, Lines]
    """Read a JSON report from disk."""
    try:
        report = read_json_file(path)
    except Exception as ex:
        raise ApplicationError('File "%s" is not valid JSON: %s' % (path, ex))

    try:
        return load_report(report)
    except ApplicationError as ex:
        raise ApplicationError('File "%s" is not an aggregated coverage data file. %s' % (path, ex))


def write_report(args, report, path):  # type: (CoverageAnalyzeTargetsConfig, t.Dict[str, t.Any], str) -> None
    """Write a JSON report to disk."""
    if args.explain:
        return

    write_json_file(path, report, formatted=False)

    display.info('Generated %d byte report with %d targets covering %d files.' % (
        os.path.getsize(path), len(report['targets']), len(set(report['arcs'].keys()) | set(report['lines'].keys())),
    ), verbosity=1)


def format_arc(value):  # type: (t.Tuple[int, int]) -> str
    """Format an arc tuple as a string."""
    return '%d:%d' % value


def parse_arc(value):  # type: (str) -> t.Tuple[int, int]
    """Parse an arc string into a tuple."""
    first, last = tuple(map(int, value.split(':')))
    return first, last


def get_target_set_index(data, target_set_indexes):  # type: (t.Set[int], TargetSetIndexes) -> int
    """Find or add the target set in the result set and return the target set index."""
    return target_set_indexes.setdefault(frozenset(data), len(target_set_indexes))


def get_target_index(name, target_indexes):  # type: (str, TargetIndexes) -> int
    """Find or add the target in the result set and return the target index."""
    return target_indexes.setdefault(name, len(target_indexes))


class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
    """Configuration for the `coverage analyze targets` command."""
    def __init__(self, args):  # type: (t.Any) -> None
        super(CoverageAnalyzeTargetsConfig, self).__init__(args)

        self.info_stderr = True