#!/usr/bin/env python3
#
#  Copyright (C) 2016 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see http://www.gnu.org/licenses/.
#
#  Authors:
#        Tristan Van Berkom

import os
import errno
import stat
import shutil
import string
import collections
import hashlib
import pickle
from collections import OrderedDict
from . import _yaml
from . import ProgramNotFoundError


def list_relative_paths(directory, includedirs=False):
    """List relative filenames recursively

    Args:
       directory (str): The directory to list files in
       includedirs (bool): Whether to include directories in the returned list

    Returns:
       A sorted list of files in *directory*, relative to *directory*

    When *includedirs* is set, every directory visited by os.walk() is
    listed as well, including *directory* itself (which appears as ``'.'``).

    Symbolic links to directories are never listed: os.walk() reports them
    as directory names (which are ignored here) and does not descend into
    them.
    """
    filelist = []

    for dirpath, _, filenames in os.walk(directory):
        if includedirs:
            filelist.append(os.path.relpath(dirpath, directory))

        filelist.extend(
            os.path.relpath(os.path.join(dirpath, filename), directory)
            for filename in filenames
        )

    return sorted(filelist)


# _unlink_if_exists()
#
# Removes @path with os.unlink(), silently doing nothing if @path
# does not exist. Any other OSError is propagated to the caller.
#
def _unlink_if_exists(path):
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


def safe_copy(src, dest):
    """Copy a file while preserving attributes

    Args:
       src (str): The source filename
       dest (str): The destination filename

    Raises:
       OSError: If *dest* exists but cannot be removed, or the copy fails

    This is almost the same as shutil.copy2(), except that we unlink *dest*
    before overwriting it if it exists, just in case *dest* is a hardlink
    to a different file.
    """
    # First unlink the target if it exists
    _unlink_if_exists(dest)

    shutil.copy2(src, dest)


def safe_link(src, dest):
    """Try to create a hardlink, but resort to copying in the case of cross device links.

    Args:
       src (str): The source filename
       dest (str): The destination filename

    Raises:
       OSError: If *dest* exists but cannot be removed, or if linking
                fails for any reason other than a cross-device link
    """
    # First unlink the target if it exists
    _unlink_if_exists(dest)

    # If we can't link it due to cross-device hardlink, copy
    try:
        os.link(src, dest)
    except OSError as e:
        if e.errno != errno.EXDEV:
            raise
        shutil.copy2(src, dest)


def copy_files(src, dest, files=None):
    """Copy files from source to destination.

    Args:
       src (str): The source file or directory
       dest (str): The destination directory
       files (list): List of source files to copy

    If *files* is not specified, then all files in *src*
    will be copied to *dest*. An explicitly empty list copies nothing.
    """
    # Only fall back to "everything" when no list was given at all; an
    # empty list is a legitimate request to copy nothing.
    if files is None:
        files = list_relative_paths(src, includedirs=True)

    # safe_copy() uses shutil.copy2() which uses copystat() to
    # preserve attributes
    _process_list(src, dest, files, safe_copy)


def link_files(src, dest, files=None):
    """Hardlink files from source to destination.

    Args:
       src (str): The source file or directory
       dest (str): The destination directory
       files (list): List of source files to link

    If *files* is not specified, then all files in *src*
    will be linked to *dest*. An explicitly empty list links nothing.

    If the hardlink cannot be created due to crossing filesystems,
    then the file will be copied instead.
    """
    # Only fall back to "everything" when no list was given at all; an
    # empty list is a legitimate request to link nothing.
    if files is None:
        files = list_relative_paths(src, includedirs=True)

    _process_list(src, dest, files, safe_link)


def get_host_tool(name):
    """Get the full path of a host tool

    Args:
       name (str): The name of the program to search for

    Returns:
       The full path to the program, if found

    Raises:
       :class:`.ProgramNotFoundError`
    """
    search_path = os.environ.get('PATH')
    program_path = shutil.which(name, path=search_path)

    if not program_path:
        raise ProgramNotFoundError(
            "Did not find '%s' in PATH: %s" % (name, search_path))

    return program_path


def url_directory_name(url):
    """Normalizes a url into a directory name

    Args:
       url (str): A url string

    Returns:
       A string which can be used as a directory name

    Every character which is not an ASCII letter, a digit, ``%`` or ``_``
    is replaced with ``_``.
    """
    valid_chars = string.digits + string.ascii_letters + '%_'

    return ''.join(char if char in valid_chars else '_' for char in url)


# _copy_directories()
#
# Recursively make directories in target area and copy permissions
#
# @srcdir: The source root directory
# @destdir: The destination root directory
# @target: A path relative to @srcdir whose parent directories
#          should exist under @destdir
#
# Raises OSError if one of the parent components of @target is
# something other than a directory or a symlink in @srcdir.
#
def _copy_directories(srcdir, destdir, target):
    parent = os.path.dirname(target)
    new_dir = os.path.join(destdir, parent)

    if not os.path.lexists(new_dir):
        # Create the ancestors first, so that each level gets its
        # attributes copied from the corresponding source directory
        if parent:
            _copy_directories(srcdir, destdir, parent)

        old_dir = os.path.join(srcdir, parent)
        if os.path.lexists(old_dir):
            mode = os.lstat(old_dir).st_mode

            if stat.S_ISDIR(mode) or stat.S_ISLNK(mode):
                os.makedirs(new_dir)
                shutil.copystat(old_dir, new_dir)
            else:
                raise OSError('Source directory tree has file where '
                              'directory expected: %s' % parent)


# _remove_if_exists()
#
# Removes whatever is found at @file_or_directory, if anything,
# printing a warning when something is actually removed.
#
# Real directories are removed recursively, anything else (including
# symlinks to directories) is unlinked.
#
def _remove_if_exists(file_or_directory):
    if not os.path.lexists(file_or_directory):
        return

    # XXX We need to collect these to issue a report as a status message
    print("WARNING: Removing: {}".format(file_or_directory))

    # Check for a directory up front instead of relying on the errno
    # of a failed unlink(): Linux reports EISDIR but POSIX specifies
    # EPERM, which is what some other platforms return.
    is_real_dir = (os.path.isdir(file_or_directory) and
                   not os.path.islink(file_or_directory))
    if is_real_dir:
        shutil.rmtree(file_or_directory)
    else:
        os.unlink(file_or_directory)


# _process_list()
#
# Stages every path of @filelist from @srcdir into @destdir
#
# @srcdir: The source root directory
# @destdir: The destination root directory
# @filelist: Paths to process, relative to @srcdir
# @actionfunc: Called as actionfunc(srcpath, destpath) for regular files
#
# Directories are created with their attributes copied, symlinks are
# recreated with the same target, device nodes are recreated with
# mknod and regular files are handed to @actionfunc. Anything already
# present at the destination of a non directory is removed first.
#
# Raises OSError for unsupported file types, or if a source directory
# maps to a destination which is not a directory.
#
def _process_list(srcdir, destdir, filelist, actionfunc):

    for path in filelist:
        srcpath = os.path.join(srcdir, path)
        destpath = os.path.join(destdir, path)

        # The destination directory may not have been created separately
        _copy_directories(srcdir, destdir, path)

        # XXX os.lstat is known to raise UnicodeEncodeError
        file_stat = os.lstat(srcpath)
        mode = file_stat.st_mode

        if stat.S_ISDIR(mode):
            # Ensure directory exists in destination and carries the
            # attributes of the source directory.
            if not os.path.lexists(destpath):
                os.makedirs(destpath)

            dest_stat = os.stat(os.path.realpath(destpath))
            if not stat.S_ISDIR(dest_stat.st_mode):
                raise OSError('Destination not a directory. source has %s'
                              ' destination has %s' % (srcpath, destpath))
            shutil.copystat(srcpath, destpath)

        elif stat.S_ISLNK(mode):
            # Should we really nuke directories which symlinks replace ?
            # Should it be an error condition or just a warning ?
            # If a warning, should we drop the symlink instead ?
            _remove_if_exists(destpath)

            target = os.readlink(srcpath)
            os.symlink(target, destpath)

        elif stat.S_ISREG(mode):
            # Process the file.
            _remove_if_exists(destpath)
            actionfunc(srcpath, destpath)

        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            # Block or character device. Recreate the node with the
            # device number found in st_rdev.
            _remove_if_exists(destpath)
            os.mknod(destpath, file_stat.st_mode, file_stat.st_rdev)

            # mknod() applies the umask, so set the exact mode explicitly
            os.chmod(destpath, file_stat.st_mode)

        else:
            # Unsupported type.
            raise OSError('Cannot extract %s into staging-area. Unsupported type.' % srcpath)


# _relative_symlink_target()
#
# Fetches a relative path for symlink with an absolute target
#
# @root: The staging area root location
# @symlink: Location of the symlink in staging area (including the root path)
# @target: The symbolic link target, which may be an absolute path
#
# If @target is an absolute path, a relative path from the symbolic link
# location will be returned, otherwise if @target is a relative path, it will
# be returned unchanged.
#
# Using relative symlinks helps to keep the target self contained when staging
# files onto the target.
#
def _relative_symlink_target(root, symlink, target):

    # Relative targets are already self contained
    if not os.path.isabs(target):
        return target

    # First fix the input a little, the symlink itself must not have a
    # trailing slash, otherwise we fail to remove the symlink filename
    # from its directory components in os.path.dirname()
    #
    # The absolute target filename must have its leading separator
    # removed, otherwise os.path.join() will discard the prefix
    symlink = symlink.rstrip(os.path.sep)
    target = target.lstrip(os.path.sep)

    # We want a relative path from the directory in which symlink
    # is located, not from the symlink itself.
    symlinkdir = os.path.dirname(symlink)

    # Create a full path to the target, including the leading staging
    # directory
    fulltarget = os.path.join(root, target)

    # now get the relative path from the directory where the symlink
    # is located within the staging root, to the target within the same
    # staging root
    return os.path.relpath(fulltarget, symlinkdir)


# _generate_key()
#
# Generate an sha256 hex digest from the given value. The value
# can be a simple value or recursive dictionary with lists etc,
# anything simple enough to serialize.
#
# Args:
#    value: A value to get a key for
#
# Returns:
#    (str): An sha256 hex digest of the given value
#
def _generate_key(value):
    ordered = _node_sanitize(value)

    # NOTE(review): pickle.dumps() is called with the default protocol,
    # which is not the same in every Python release, so the digest may
    # differ between interpreter versions -- confirm whether keys need
    # to be stable across them.
    serialized = pickle.dumps(ordered)
    return hashlib.sha256(serialized).hexdigest()


# _node_sanitize()
#
# Returns an alphabetically ordered recursive copy
# of the source node with internal provenance information stripped.
#
# Only dicts are ordered, list elements are left in order.
#
def _node_sanitize(node):
    # The ABC aliases in the toplevel `collections` module were removed
    # in Python 3.10, they must be taken from `collections.abc`. This is
    # imported locally as the module only imports `collections` itself.
    from collections.abc import Mapping

    if isinstance(node, Mapping):
        result = OrderedDict()

        for key in sorted(node):
            # Provenance is internal bookkeeping, it must not
            # influence the sanitized result
            if key == _yaml.PROVENANCE_KEY:
                continue
            result[key] = _node_sanitize(node[key])

        return result

    elif isinstance(node, list):
        return [_node_sanitize(elt) for elt in node]

    return node