# # Copyright (C) 2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # # Authors: # Tristan Van Berkom import os from contextlib import contextmanager from typing import TYPE_CHECKING, Optional, List, Tuple from .plugin import Plugin from .types import CoreWarnings, OverlapAction from .utils import FileListResult if TYPE_CHECKING: from typing import Dict # pylint: disable=cyclic-import from .element import Element # pylint: enable=cyclic-import # OverlapCollector() # # Collects results of Element.stage_artifact() and saves # them in order to raise a proper overlap error at the end # of staging. # # Args: # element (Element): The element for which we are staging artifacts # class OverlapCollector: def __init__(self, element: "Element"): # The Element we are staging for, on which we'll issue warnings self._element = element # type: Element # The list of sessions self._sessions = [] # type: List[OverlapCollectorSession] # The active session, if any self._session = None # type: Optional[OverlapCollectorSession] # session() # # Create a session for collecting overlaps, calls to OverlapCollector.collect_stage_result() # are expected to always occur within the context of a session (this context manager). # # Upon exiting this context, warnings and/or errors will be issued for any overlaps # which occurred either as a result of overlapping files within this session, or # as a result of files staged during this session, overlapping with files staged in # previous sessions in this OverlapCollector. # # Args: # action (OverlapAction): The action to take for this overall session's overlaps with other sessions # location (str): The Sandbox relative location this session was created for # @contextmanager def session(self, action: str, location: Optional[str]): assert self._session is None, "Stage session already started" if location is None: location = "/" self._session = OverlapCollectorSession(self._element, action, location) # Run code body where staging results can be collected. yield # Issue warnings for the current session, passing along previously completed sessions self._session.warnings(self._sessions) # Store the newly ended session and end the session self._sessions.append(self._session) self._session = None # collect_stage_result() # # Collect and accumulate results of Element.stage_artifact() # # Args: # element (Element): The name of the element staged # result (FileListResult): The result of Element.stage_artifact() # def collect_stage_result(self, element: "Element", result: FileListResult): assert self._session is not None, "Staging files outside of staging session" self._session.collect_stage_result(element, result) # OverlapCollectorSession() # # Collect the results of a single session # # Args: # element (Element): The element for which we are staging artifacts # action (OverlapAction): The action to take for this overall session's overlaps with other sessions # location (str): The Sandbox relative location this session was created for # class OverlapCollectorSession: def __init__(self, element: "Element", action: str, location: str): # The Element we are staging for, on which we'll issue warnings self._element = element # type: Element # The OverlapAction for this session self._action = action # type: str # The Sandbox relative directory this session was created for self._location = location # type: str # Dictionary of files which were ignored (See FileListResult()), keyed by element unique ID self._ignored = {} # type: Dict[int, List[str]] # Dictionary of files which were staged, keyed by element unique ID self._files_written = {} # type: Dict[int, List[str]] # Dictionary of element IDs which overlapped, keyed by the file they overlap on self._overlaps = {} # type: Dict[str, List[int]] # collect_stage_result() # # Collect and accumulate results of Element.stage_artifact() # # Args: # element (Element): The name of the element staged # result (FileListResult): The result of Element.stage_artifact() # def collect_stage_result(self, element: "Element", result: FileListResult): for overwritten_file in result.overwritten: overlap_list = None try: overlap_list = self._overlaps[overwritten_file] except KeyError: # Create a fresh list # self._overlaps[overwritten_file] = overlap_list = [] # Search files which were staged in this session, start the # list off with the bottom most element # for element_id, staged_files in self._files_written.items(): if overwritten_file in staged_files: overlap_list.append(element_id) break # Add the currently staged element to the overlap list, it might be # the only element in the list if it overlaps with a file staged # from a previous session. # overlap_list.append(element._unique_id) # Record written files and ignored files. # self._files_written[element._unique_id] = result.files_written if result.ignored: self._ignored[element._unique_id] = result.ignored # warnings() # # Issue any warnings as a batch as a result of staging artifacts, # based on the results collected with collect_stage_result(). # # Args: # sessions (list): List of previously completed sessions # def warnings(self, sessions: List["OverlapCollectorSession"]): # Collect a table of filenames which overlapped something from outside of this session. # external_overlaps = {} # type: Dict[str, int] # # First issue the warnings for this session # if self._overlaps: overlap_warning = False detail = "Staged files overwrite existing files in staging area: {}\n".format(self._location) for filename, element_ids in self._overlaps.items(): # If there is only one element in the overlap list, it means it has # overlapped a file from a previous session. # # Ignore it and handle the warning below # if len(element_ids) == 1: external_overlaps[filename] = element_ids[0] continue # Filter whitelisted elements out of the list of overlapping elements # # Ignore the bottom-most element as it does not overlap anything. # overlapping_element_ids = element_ids[1:] warning_elements = self._filter_whitelisted(filename, overlapping_element_ids) if warning_elements: overlap_warning = True detail += self._overlap_detail(filename, warning_elements, element_ids) if overlap_warning: self._element.warn( "Non-whitelisted overlaps detected", detail=detail, warning_token=CoreWarnings.OVERLAPS ) if self._ignored: detail = "Not staging files which would replace non-empty directories in staging area: {}\n".format( self._location ) for element_id, ignored_filenames in self._ignored.items(): element = Plugin._lookup(element_id) detail += "\nFrom {}:\n".format(element._get_full_name()) detail += " " + " ".join( ["{}\n".format(os.path.join(self._location, filename)) for filename in ignored_filenames] ) self._element.warn( "Not staging files which would have replaced non-empty directories", detail=detail, warning_token=CoreWarnings.UNSTAGED_FILES, ) if external_overlaps and self._action != OverlapAction.IGNORE: detail = "Detected file overlaps while staging elements into: {}\n".format(self._location) # Find the session responsible for the overlap # for filename, element_id in external_overlaps.items(): absolute_filename = os.path.join(self._location, filename) overlapped_id, location = self._search_stage_element(absolute_filename, sessions) element = Plugin._lookup(element_id) overlapped = Plugin._lookup(overlapped_id) detail += "{}: {} overlaps files previously staged by {} in: {}\n".format( absolute_filename, element._get_full_name(), overlapped._get_full_name(), location ) if self._action == OverlapAction.WARNING: self._element.warn("Overlaps detected", detail=detail, warning_token=CoreWarnings.OVERLAPS) else: from .element import ElementError raise ElementError("Overlaps detected", detail=detail, reason="overlaps") # _search_stage_element() # # Search the sessions list for the element responsible for staging the given file # # Args: # filename (str): The sandbox relative file which was overwritten # sessions (List[OverlapCollectorSession]) # # Returns: # element_id (int): The unique ID of the element responsible # location (str): The sandbox relative staging location where element_id was staged # def _search_stage_element(self, filename: str, sessions: List["OverlapCollectorSession"]) -> Tuple[int, str]: for session in reversed(sessions): for element_id, staged_files in session._files_written.items(): if any( staged_file for staged_file in staged_files if os.path.join(session._location, staged_file) == filename ): return element_id, session._location assert False, "Could not find element responsible for staging: {}".format(filename) # Silence the linter with an unreachable return statement return None, None # _filter_whitelisted() # # Args: # filename (str): The staging session relative filename # element_ids (List[int]): Ordered list of elements # # Returns: # (List[Element]): The list of element objects which are not whitelisted # def _filter_whitelisted(self, filename: str, element_ids: List[int]): overlap_elements = [] for element_id in element_ids: element = Plugin._lookup(element_id) if not element._file_is_whitelisted(filename): overlap_elements.append(element) return overlap_elements # _overlap_detail() # # Get a string to describe overlaps on a filename # # Args: # filename (str): The filename being overlapped # overlap_elements (List[Element]): A list of Elements overlapping # element_ids (List[int]): The ordered ID list of elements which staged this file # def _overlap_detail(self, filename, overlap_elements, element_ids): filename = os.path.join(self._location, filename) if overlap_elements: overlap_element_names = [element._get_full_name() for element in overlap_elements] overlap_order_elements = [Plugin._lookup(element_id) for element_id in element_ids] overlap_order_names = [element._get_full_name() for element in overlap_order_elements] return "{}: {} {} not permitted to overlap other elements, order {} \n".format( filename, " and ".join(overlap_element_names), "is" if len(overlap_element_names) == 1 else "are", " above ".join(reversed(overlap_order_names)), ) else: return ""