#
# Copyright (C) 2016-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom
# Jürg Billeter
# Tristan Maat

import os
import itertools
from operator import itemgetter
from collections import OrderedDict

from pyroaring import BitMap  # pylint: disable=no-name-in-module

from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._profile import Topics, PROFILER
from . import Scope, Consistency
from ._project import ProjectRefStorage


# PipelineSelection()
#
# Defines the kind of pipeline selection to make when the pipeline
# is provided a list of targets, for whichever purpose.
#
# These values correspond to the CLI `--deps` arguments for convenience.
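#
# For example, a hypothetical `bst show --deps plan target.bst`
# invocation would select elements according to PipelineSelection.PLAN.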
#
class PipelineSelection():

    # Select only the target elements in the associated targets
    NONE = 'none'

    # As NONE, but redirect elements that are capable of it
    REDIRECT = 'redirect'

    # Select elements which must be built for the associated targets to be built
    PLAN = 'plan'

    # All dependencies of all targets, including the targets
    ALL = 'all'

    # All direct build dependencies and their recursive runtime dependencies,
    # excluding the targets
    BUILD = 'build'

    # All direct runtime dependencies and their recursive runtime dependencies,
    # including the targets
    RUN = 'run'


# Pipeline()
#
# Args:
#    project (Project): The Project object
#    context (Context): The Context object
#    artifacts (ArtifactCache): The ArtifactCache object
#
class Pipeline():

    def __init__(self, context, project, artifacts):

        self._context = context     # The Context
        self._project = project     # The toplevel project

        #
        # Private members
        #
        self._artifacts = artifacts

    # load()
    #
    # Loads elements from target names.
    #
    # This function is called with a list of lists, such that multiple
    # target groups may be specified. Element names specified in `targets`
    # are allowed to be redundant.
    #
    # Args:
    #    target_groups (list of lists): Groups of toplevel targets to load
    #    rewritable (bool): Whether the loaded files should be rewritable;
    #                       this is a bit more expensive due to deep copies
    #
    # Returns:
    #    (tuple of lists): A tuple of grouped Element objects corresponding to target_groups
    #
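    # A minimal sketch of the grouping behaviour (hypothetical element
    # names, for illustration):
    #
    #   pipeline.load([['app.bst', 'lib.bst'], ['doc.bst']])
    #
    # loads all three elements in a single loader invocation and returns
    # ([app, lib], [doc]), preserving the shape of `target_groups`.
    #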
    def load(self, target_groups, *, rewritable=False):

        # First concatenate all the lists for the loader's sake
        targets = list(itertools.chain(*target_groups))

        with PROFILER.profile(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, "-") for t in targets)):
            elements = self._project.load_elements(targets, rewritable=rewritable)

            # Now create element groups to match the input target groups
            elt_iter = iter(elements)
            element_groups = [
                [next(elt_iter) for _ in range(len(group))]
                for group in target_groups
            ]

            return tuple(element_groups)

    # load_artifacts()
    #
    # Loads ArtifactElements from target artifacts.
    #
    # Args:
    #    targets (list [str]): Target artifacts to load
    #
    # Returns:
    #    (list [ArtifactElement]): A list of ArtifactElement objects
    #
def load_artifacts(self, targets):
# XXX: This is not included as part of the "load-pipeline" profiler, we could move
# the profiler to Stream?
return self._project.load_artifacts(targets)

    # resolve_elements()
    #
    # Resolve element state and cache keys.
    #
    # Args:
    #    targets (list of Element): The list of toplevel element targets
    #
    def resolve_elements(self, targets):
        with self._context.messenger.timed_activity("Resolving cached state", silent_nested=True):
            # XXX: Now that Element._update_state() can trigger recursive update_state calls
            # it is possible that we could get a RecursionError. However, this is unlikely
            # to happen, even for large projects (tested with the Debian stack). If it
            # does become a problem, we may have to raise the recursion limit.
            for element in self.dependencies(targets, Scope.ALL):

                # Determine initial element state.
                if not element._resolved_initial_state:
                    element._update_state()

                # We may already have Elements which are cached and have their runtimes
                # cached; if this is the case, we should immediately notify their reverse
                # dependencies.
                element._update_ready_for_runtime_and_cached()

    # check_remotes()
    #
    # Check if the target artifacts are cached in any of the available remotes
    #
    # Args:
    #    targets (list [Element]): The list of element targets
    #
    def check_remotes(self, targets):
        with self._context.messenger.timed_activity("Querying remotes for cached status", silent_nested=True):
            for element in targets:
                element._cached_remotely()

    # dependencies()
    #
    # Generator function to iterate over the dependencies of the
    # given targets in the given scope.
    #
    # Args:
    #    targets (list of Element): The target Elements to loop over
    #    scope (Scope): The scope to iterate over
    #    recurse (bool): Whether to recurse into dependencies
    #
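    # A minimal usage sketch (hypothetical element, for illustration):
    #
    #   build_deps = list(pipeline.dependencies([element], Scope.BUILD))
    #
    # The shared `visited` bitmaps below ensure that an element reachable
    # from several targets is only yielded once.
    #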
    def dependencies(self, targets, scope, *, recurse=True):
        # Keep track of 'visited' in this scope, so that all targets
        # share the same context.
        visited = (BitMap(), BitMap())

        for target in targets:
            for element in target.dependencies(scope, recurse=recurse, visited=visited):
                yield element

    # plan()
    #
    # Computes the elements which are required to build the given
    # targets, omitting already cached elements. The elements are
    # returned in a depth sorted ordering for optimal build plans.
    #
    # Args:
    #    elements (list of Element): List of target elements to plan
    #
    # Returns:
    #    (list of Element): A depth sorted list of the build plan
    #
    def plan(self, elements):
        # Keep locally cached elements in the plan if a remote artifact cache is used,
        # to allow pulling artifacts with the strict cache key, if available.
        plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes()

        return _Planner().plan(elements, plan_cached)

    # get_selection()
    #
    # Gets a full list of elements based on a toplevel
    # list of element targets.
    #
    # Various commands define a --deps option to specify what elements to
    # use in the result; this function reports a list that is appropriate for
    # the selected option.
    #
    # Args:
    #    targets (list of Element): The target Elements
    #    mode (PipelineSelection): The PipelineSelection mode
    #    silent (bool): Whether to silence informative messages
    #
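    # For example (hypothetical call, for illustration):
    #
    #   selected = pipeline.get_selection(targets, PipelineSelection.BUILD)
    #
    # reports the direct build dependencies of `targets` together with
    # their recursive runtime dependencies, excluding the targets.
    #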
    def get_selection(self, targets, mode, *, silent=True):

        elements = None
        if mode == PipelineSelection.NONE:
            elements = targets
        elif mode == PipelineSelection.REDIRECT:

            # Redirect and log if permitted
            elements = []
            for t in targets:
                new_elm = t._get_source_element()
                if new_elm != t and not silent:
                    self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
                                  .format(t.name, new_elm.name))
                if new_elm not in elements:
                    elements.append(new_elm)
        elif mode == PipelineSelection.PLAN:
            elements = self.plan(targets)
        else:
            if mode == PipelineSelection.ALL:
                scope = Scope.ALL
            elif mode == PipelineSelection.BUILD:
                scope = Scope.BUILD
            elif mode == PipelineSelection.RUN:
                scope = Scope.RUN

            elements = list(self.dependencies(targets, scope))

        return elements

    # except_elements()
    #
    # Removes the excepted elements from the given list, along with any
    # of their dependencies which are not also required by other target
    # elements.
    #
    # Args:
    #    targets (list of Element): List of toplevel targeted elements
    #    elements (list of Element): The list to remove elements from
    #    except_targets (list of Element): List of toplevel except targets
    #
    # Returns:
    #    (list of Element): The elements list with the intersected
    #                       exceptions removed
    #
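    # A minimal sketch (hypothetical dependency graph, for illustration):
    #
    #   app -> libx -> basex
    #   app -> liby -> base
    #
    # With targets=[app] and except_targets=[libx], libx is an
    # 'intersection' element; the traversal from app skips it, so both
    # libx and its unique dependency basex are removed, while liby and
    # base remain in the result.
    #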
    def except_elements(self, targets, elements, except_targets):
        if not except_targets:
            return elements

        targeted = list(self.dependencies(targets, Scope.ALL))
        visited = []

        def find_intersection(element):
            if element in visited:
                return
            visited.append(element)

            # Intersection elements are those that are also in
            # 'targeted', as long as we don't recurse into them.
            if element in targeted:
                yield element
            else:
                for dep in element.dependencies(Scope.ALL, recurse=False):
                    yield from find_intersection(dep)

        # Build a list of 'intersection' elements, i.e. the set of
        # elements that lie on the border closest to excepted elements
        # between excepted and target elements.
        intersection = list(itertools.chain.from_iterable(
            find_intersection(element) for element in except_targets
        ))

        # Now use this set of elements to traverse the targeted
        # elements, except 'intersection' elements and their unique
        # dependencies.
        queue = []
        visited = []

        queue.extend(targets)
        while queue:
            element = queue.pop()
            if element in visited or element in intersection:
                continue
            visited.append(element)

            queue.extend(element.dependencies(Scope.ALL, recurse=False))

        # That looks like a lot, but overall we only traverse (part
        # of) the graph twice. This could be reduced to once if we
        # kept track of parent elements, but is probably not
        # significant.

        # Ensure that we return elements in the same order they were
        # in before.
        return [element for element in elements if element in visited]

    # targets_include()
    #
    # Checks whether the given targets are, or depend on, the given elements.
    #
    # Args:
    #    targets (list of Element): A list of targets
    #    elements (list of Element): List of elements to check
    #
    # Returns:
    #    (bool): True if every element in `elements` is either in
    #            `targets` or is depended on by `targets`.
    #
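    # For example (hypothetical elements, for illustration): if `app`
    # depends on `lib`, then targets_include([app], [lib, app]) is True,
    # while targets_include([lib], [app]) is False.
    #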
def targets_include(self, targets, elements):
target_element_set = set(self.dependencies(targets, Scope.ALL))
element_set = set(elements)
return element_set.issubset(target_element_set)

    # subtract_elements()
#
# Subtract a subset of elements
#
# Args:
# elements (list of Element): The element list
# subtract (list of Element): List of elements to subtract from elements
#
# Returns:
# (list): The original elements list, with elements in subtract removed
#
def subtract_elements(self, elements, subtract):
subtract_set = set(subtract)
return [
e for e in elements
if e not in subtract_set
]

    # add_elements()
    #
    # Add to a list of elements all elements that are not already in it
    #
    # Args:
    #    elements (list of Element): The element list
    #    add (list of Element): List of elements to add
    #
    # Returns:
    #    (list): The original elements list, with elements in add that weren't
    #            already in it added.
    #
    def add_elements(self, elements, add):
ret = elements[:]
ret.extend(e for e in add if e not in ret)
return ret

    # track_cross_junction_filter()
    #
    # Filters out elements which are across junction boundaries when
    # cross junction tracking was not requested, and asserts that
    # tracking is possible for the elements that remain.
    #
    # This is currently assumed to be only relevant for element
    # lists targeted at tracking.
    #
    # Args:
    #    project (Project): Project used for cross_junction filtering.
    #                       All elements are expected to belong to that project.
    #    elements (list of Element): The list of elements to filter
    #    cross_junction_requested (bool): Whether the user requested
    #                                     cross junction tracking
    #
    # Returns:
    #    (list of Element): The filtered or asserted result
    #
    def track_cross_junction_filter(self, project, elements, cross_junction_requested):
        # Filter out cross junctioned elements
        if not cross_junction_requested:
            elements = self._filter_cross_junctions(project, elements)
        self._assert_junction_tracking(elements)

        return elements

    # assert_consistent()
    #
    # Asserts that the given list of elements are in a consistent state, that
    # is to say that all sources are consistent and can at least be fetched.
    #
    # Consequently it also means that cache keys can be resolved.
    #
    # Args:
    #    elements (list of Element): The elements to check
    #
    def assert_consistent(self, elements):
        inconsistent = []
        inconsistent_workspaced = []
        with self._context.messenger.timed_activity("Checking sources"):
            for element in elements:
                if element._get_consistency() == Consistency.INCONSISTENT:
                    if element._get_workspace():
                        inconsistent_workspaced.append(element)
                    else:
                        inconsistent.append(element)

        if inconsistent:
            detail = "Exact versions are missing for the following elements:\n\n"
            for element in inconsistent:
                detail += "  Element: {} is inconsistent\n".format(element._get_full_name())
                for source in element.sources():
                    if source._get_consistency() == Consistency.INCONSISTENT:
                        detail += "    {} is missing ref\n".format(source)
                detail += '\n'
            detail += "Try tracking these elements first with `bst source track`\n"

            raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")

        if inconsistent_workspaced:
            detail = "Some workspaces do not exist but are not closed\n" + \
                     "Try closing them with `bst workspace close`\n\n"
            for element in inconsistent_workspaced:
                detail += "  " + element._get_full_name() + "\n"
            raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")

    # assert_sources_cached()
    #
    # Asserts that sources for the given list of elements are cached.
    #
    # Args:
    #    elements (list): The list of elements
    #
    def assert_sources_cached(self, elements):
        uncached = []
        with self._context.messenger.timed_activity("Checking sources"):
            for element in elements:
                if element._get_consistency() < Consistency.CACHED and \
                        not element._source_cached():
                    uncached.append(element)

        if uncached:
            detail = "Sources are not cached for the following elements:\n\n"
            for element in uncached:
                detail += "  Following sources for element: {} are not cached:\n".format(element._get_full_name())
                for source in element.sources():
                    if source._get_consistency() < Consistency.CACHED:
                        detail += "    {}\n".format(source)
                detail += '\n'
            detail += "Try fetching these elements first with `bst source fetch`,\n" + \
                      "or run this command with the `--fetch` option\n"

            raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")

    #############################################################
    #                     Private Methods                       #
    #############################################################

    # _filter_cross_junctions()
    #
    # Filters out cross junction elements from the elements
    #
    # Args:
    #    project (Project): The project to which allowed elements must belong
    #    elements (list of Element): The list of elements to be tracked
    #
    # Returns:
    #    (list): A filtered list of `elements` which does
    #            not contain any cross junction elements.
    #
def _filter_cross_junctions(self, project, elements):
return [
element for element in elements
if element._get_project() is project
]

    # _assert_junction_tracking()
#
# Raises an error if tracking is attempted on junctioned elements and
# a project.refs file is not enabled for the toplevel project.
#
# Args:
# elements (list of Element): The list of elements to be tracked
#
    def _assert_junction_tracking(self, elements):

        # We can track anything if the toplevel project uses project.refs
        #
        if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS:
            return

        # Ideally, we would want to report every cross junction element but not
        # their dependencies, unless those cross junction elements' dependencies
        # were also explicitly requested on the command line.
        #
        # But this is too hard, let's shoot for a simple error.
        for element in elements:
            element_project = element._get_project()
            if element_project is not self._project:
                detail = "Requested to track sources across junction boundaries\n" + \
                         "in a project which does not use project.refs ref-storage."

                raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")

    # _message()
    #
    # Local message propagator
    #
    def _message(self, message_type, message, **kwargs):
        self._context.messenger.message(
            Message(message_type, message, **kwargs))


# _Planner()
#
# An internal object used for constructing a build plan
# from a given resolved toplevel element, while considering what
# parts need to be built depending on build only dependencies
# being cached, and depth sorting for more efficient processing.
#
class _Planner():
    def __init__(self):
        self.depth_map = OrderedDict()
        self.visiting_elements = set()

    # Here we want to traverse the same element more than once when
    # it is reachable from multiple places, with the interest of finding
    # the deepest occurrence of every element
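    #
    # A small sketch of the depth assignment (hypothetical graph with no
    # cached elements, for illustration):
    #
    #   app  --build-->  libfoo  --build-->  gcc
    #   app  --build-->  gcc
    #
    # Planning 'app' reaches gcc at depth 1 (directly) and at depth 2
    # (via libfoo); the deeper value wins, so gcc sorts before libfoo,
    # which sorts before app in the resulting plan.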
    def plan_element(self, element, depth):
        if element in self.visiting_elements:
            # circular dependency, already being processed
            return

        prev_depth = self.depth_map.get(element)
        if prev_depth is not None and prev_depth >= depth:
            # element and dependencies already processed at equal or greater depth
            return

        self.visiting_elements.add(element)
        for dep in element.dependencies(Scope.RUN, recurse=False):
            self.plan_element(dep, depth)

        # Don't try to plan builds of elements that are cached already
        if not element._cached_success():
            for dep in element.dependencies(Scope.BUILD, recurse=False):
                self.plan_element(dep, depth + 1)

        self.depth_map[element] = depth
        self.visiting_elements.remove(element)

    def plan(self, roots, plan_cached):
        for root in roots:
            self.plan_element(root, 0)

        depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)

        # Record the depth sorted position of each element
        for index, item in enumerate(depth_sorted):
            item[0]._set_depth(index)

        return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]