summaryrefslogtreecommitdiff
path: root/test/lib/ansible_test/_internal/coverage/analyze/targets/filter.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/coverage/analyze/targets/filter.py')
-rw-r--r--test/lib/ansible_test/_internal/coverage/analyze/targets/filter.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/coverage/analyze/targets/filter.py
new file mode 100644
index 0000000000..95bec3760b
--- /dev/null
+++ b/test/lib/ansible_test/_internal/coverage/analyze/targets/filter.py
@@ -0,0 +1,104 @@
+"""Filter an aggregated coverage file, keeping only the specified targets."""
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import re
+
+from .... import types as t
+
+from . import (
+ CoverageAnalyzeTargetsConfig,
+ expand_indexes,
+ generate_indexes,
+ make_report,
+ read_report,
+ write_report,
+)
+
+if t.TYPE_CHECKING:
+ from . import (
+ NamedPoints,
+ TargetIndexes,
+ )
+
+
+def command_coverage_analyze_targets_filter(args): # type: (CoverageAnalyzeTargetsFilterConfig) -> None
+ """Filter target names in an aggregated coverage file."""
+ covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
+
+ filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, lambda v: v)
+ filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, lambda v: v)
+
+ include_targets = set(args.include_targets) if args.include_targets else None
+ exclude_targets = set(args.exclude_targets) if args.exclude_targets else None
+
+ include_path = re.compile(args.include_path) if args.include_path else None
+ exclude_path = re.compile(args.exclude_path) if args.exclude_path else None
+
+ def path_filter_func(path):
+ if include_path and not re.search(include_path, path):
+ return False
+
+ if exclude_path and re.search(exclude_path, path):
+ return False
+
+ return True
+
+ def target_filter_func(targets):
+ if include_targets:
+ targets &= include_targets
+
+ if exclude_targets:
+ targets -= exclude_targets
+
+ return targets
+
+ filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func)
+ filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func)
+
+ target_indexes = {} # type: TargetIndexes
+ indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs)
+ indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines)
+
+ report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines)
+
+ write_report(args, report, args.output_file)
+
+
+def filter_data(
+ data, # type: NamedPoints
+ path_filter_func, # type: t.Callable[[str], bool]
+ target_filter_func, # type: t.Callable[[t.Set[str]], t.Set[str]]
+): # type: (...) -> NamedPoints
+ """Filter the data set using the specified filter function."""
+ result = {} # type: NamedPoints
+
+ for src_path, src_points in data.items():
+ if not path_filter_func(src_path):
+ continue
+
+ dst_points = {}
+
+ for src_point, src_targets in src_points.items():
+ dst_targets = target_filter_func(src_targets)
+
+ if dst_targets:
+ dst_points[src_point] = dst_targets
+
+ if dst_points:
+ result[src_path] = dst_points
+
+ return result
+
+
+class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
+ """Configuration for the `coverage analyze targets filter` command."""
+ def __init__(self, args): # type: (t.Any) -> None
+ super(CoverageAnalyzeTargetsFilterConfig, self).__init__(args)
+
+ self.input_file = args.input_file # type: str
+ self.output_file = args.output_file # type: str
+ self.include_targets = args.include_targets # type: t.List[str]
+ self.exclude_targets = args.exclude_targets # type: t.List[str]
+ self.include_path = args.include_path # type: t.Optional[str]
+ self.exclude_path = args.exclude_path # type: t.Optional[str]