diff options
Diffstat (limited to 'src/pip/_internal/operations/install/wheel.py')
-rw-r--r-- | src/pip/_internal/operations/install/wheel.py | 387 |
1 files changed, 161 insertions, 226 deletions
diff --git a/src/pip/_internal/operations/install/wheel.py b/src/pip/_internal/operations/install/wheel.py index b5eafda98..1650d59a3 100644 --- a/src/pip/_internal/operations/install/wheel.py +++ b/src/pip/_internal/operations/install/wheel.py @@ -22,6 +22,7 @@ from typing import ( BinaryIO, Callable, Dict, + Generator, Iterable, Iterator, List, @@ -38,11 +39,14 @@ from zipfile import ZipFile, ZipInfo from pip._vendor.distlib.scripts import ScriptMaker from pip._vendor.distlib.util import get_export_entry from pip._vendor.packaging.utils import canonicalize_name -from pip._vendor.six import ensure_str, ensure_text, reraise from pip._internal.exceptions import InstallationError from pip._internal.locations import get_major_minor_version -from pip._internal.metadata import BaseDistribution, get_wheel_distribution +from pip._internal.metadata import ( + BaseDistribution, + FilesystemWheel, + get_wheel_distribution, +) from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl from pip._internal.models.scheme import SCHEME_KEYS, Scheme from pip._internal.utils.filesystem import adjacent_tmp_file, replace @@ -59,62 +63,55 @@ if TYPE_CHECKING: from typing import Protocol class File(Protocol): - src_record_path = None # type: RecordPath - dest_path = None # type: str - changed = None # type: bool + src_record_path: "RecordPath" + dest_path: str + changed: bool - def save(self): - # type: () -> None + def save(self) -> None: pass logger = logging.getLogger(__name__) -RecordPath = NewType('RecordPath', str) +RecordPath = NewType("RecordPath", str) InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]] -def rehash(path, blocksize=1 << 20): - # type: (str, int) -> Tuple[str, str] +def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]: """Return (encoded_digest, length) for path using hashlib.sha256()""" h, length = hash_file(path, blocksize) - digest = 'sha256=' + urlsafe_b64encode( - h.digest() - ).decode('latin1').rstrip('=') + digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=") return (digest, str(length)) -def csv_io_kwargs(mode): - # type: (str) -> Dict[str, Any] +def csv_io_kwargs(mode: str) -> Dict[str, Any]: """Return keyword arguments to properly open a CSV file in the given mode. """ - return {'mode': mode, 'newline': '', 'encoding': 'utf-8'} + return {"mode": mode, "newline": "", "encoding": "utf-8"} -def fix_script(path): - # type: (str) -> bool +def fix_script(path: str) -> bool: """Replace #!python with #!/path/to/python Return True if file was changed. """ # XXX RECORD hashes will need to be updated assert os.path.isfile(path) - with open(path, 'rb') as script: + with open(path, "rb") as script: firstline = script.readline() - if not firstline.startswith(b'#!python'): + if not firstline.startswith(b"#!python"): return False exename = sys.executable.encode(sys.getfilesystemencoding()) - firstline = b'#!' + exename + os.linesep.encode("ascii") + firstline = b"#!" + exename + os.linesep.encode("ascii") rest = script.read() - with open(path, 'wb') as script: + with open(path, "wb") as script: script.write(firstline) script.write(rest) return True -def wheel_root_is_purelib(metadata): - # type: (Message) -> bool +def wheel_root_is_purelib(metadata: Message) -> bool: return metadata.get("Root-Is-Purelib", "").lower() == "true" @@ -129,8 +126,7 @@ def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, s return console_scripts, gui_scripts -def message_about_scripts_not_on_PATH(scripts): - # type: (Sequence[str]) -> Optional[str] +def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]: """Determine if any scripts are not on PATH and format a warning. Returns a warning message if one or more scripts are not on PATH, otherwise None. @@ -139,7 +135,7 @@ def message_about_scripts_not_on_PATH(scripts): return None # Group scripts by the path they were installed in - grouped_by_dir = collections.defaultdict(set) # type: Dict[str, Set[str]] + grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set) for destfile in scripts: parent_dir = os.path.dirname(destfile) script_name = os.path.basename(destfile) @@ -147,23 +143,24 @@ def message_about_scripts_not_on_PATH(scripts): # We don't want to warn for directories that are on PATH. not_warn_dirs = [ - os.path.normcase(i).rstrip(os.sep) for i in - os.environ.get("PATH", "").split(os.pathsep) + os.path.normcase(i).rstrip(os.sep) + for i in os.environ.get("PATH", "").split(os.pathsep) ] # If an executable sits with sys.executable, we don't warn for it. # This covers the case of venv invocations without activating the venv. not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable))) - warn_for = { - parent_dir: scripts for parent_dir, scripts in grouped_by_dir.items() + warn_for: Dict[str, Set[str]] = { + parent_dir: scripts + for parent_dir, scripts in grouped_by_dir.items() if os.path.normcase(parent_dir) not in not_warn_dirs - } # type: Dict[str, Set[str]] + } if not warn_for: return None # Format a message msg_lines = [] for parent_dir, dir_scripts in warn_for.items(): - sorted_scripts = sorted(dir_scripts) # type: List[str] + sorted_scripts: List[str] = sorted(dir_scripts) if len(sorted_scripts) == 1: start_text = "script {} is".format(sorted_scripts[0]) else: @@ -172,8 +169,9 @@ def message_about_scripts_not_on_PATH(scripts): ) msg_lines.append( - "The {} installed in '{}' which is not on PATH." - .format(start_text, parent_dir) + "The {} installed in '{}' which is not on PATH.".format( + start_text, parent_dir + ) ) last_line_fmt = ( @@ -200,8 +198,9 @@ def message_about_scripts_not_on_PATH(scripts): return "\n".join(msg_lines) -def _normalized_outrows(outrows): - # type: (Iterable[InstalledCSVRow]) -> List[Tuple[str, str, str]] +def _normalized_outrows( + outrows: Iterable[InstalledCSVRow], +) -> List[Tuple[str, str, str]]: """Normalize the given rows of a RECORD file. Items in each row are converted into str. Rows are then sorted to make @@ -221,69 +220,57 @@ def _normalized_outrows(outrows): # For additional background, see-- # https://github.com/pypa/pip/issues/5868 return sorted( - (ensure_str(record_path, encoding='utf-8'), hash_, str(size)) - for record_path, hash_, size in outrows + (record_path, hash_, str(size)) for record_path, hash_, size in outrows ) -def _record_to_fs_path(record_path): - # type: (RecordPath) -> str - return record_path +def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str: + return os.path.join(lib_dir, record_path) -def _fs_to_record_path(path, relative_to=None): - # type: (str, Optional[str]) -> RecordPath - if relative_to is not None: - # On Windows, do not handle relative paths if they belong to different - # logical disks - if os.path.splitdrive(path)[0].lower() == \ - os.path.splitdrive(relative_to)[0].lower(): - path = os.path.relpath(path, relative_to) - path = path.replace(os.path.sep, '/') - return cast('RecordPath', path) +def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath: + # On Windows, do not handle relative paths if they belong to different + # logical disks + if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower(): + path = os.path.relpath(path, lib_dir) - -def _parse_record_path(record_column): - # type: (str) -> RecordPath - p = ensure_text(record_column, encoding='utf-8') - return cast('RecordPath', p) + path = path.replace(os.path.sep, "/") + return cast("RecordPath", path) def get_csv_rows_for_installed( - old_csv_rows, # type: List[List[str]] - installed, # type: Dict[RecordPath, RecordPath] - changed, # type: Set[RecordPath] - generated, # type: List[str] - lib_dir, # type: str -): - # type: (...) -> List[InstalledCSVRow] + old_csv_rows: List[List[str]], + installed: Dict[RecordPath, RecordPath], + changed: Set[RecordPath], + generated: List[str], + lib_dir: str, +) -> List[InstalledCSVRow]: """ :param installed: A map from archive RECORD path to installation RECORD path. """ - installed_rows = [] # type: List[InstalledCSVRow] + installed_rows: List[InstalledCSVRow] = [] for row in old_csv_rows: if len(row) > 3: - logger.warning('RECORD line has more than three elements: %s', row) - old_record_path = _parse_record_path(row[0]) + logger.warning("RECORD line has more than three elements: %s", row) + old_record_path = cast("RecordPath", row[0]) new_record_path = installed.pop(old_record_path, old_record_path) if new_record_path in changed: - digest, length = rehash(_record_to_fs_path(new_record_path)) + digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir)) else: - digest = row[1] if len(row) > 1 else '' - length = row[2] if len(row) > 2 else '' + digest = row[1] if len(row) > 1 else "" + length = row[2] if len(row) > 2 else "" installed_rows.append((new_record_path, digest, length)) for f in generated: path = _fs_to_record_path(f, lib_dir) digest, length = rehash(f) installed_rows.append((path, digest, length)) for installed_record_path in installed.values(): - installed_rows.append((installed_record_path, '', '')) + installed_rows.append((installed_record_path, "", "")) return installed_rows -def get_console_script_specs(console): - # type: (Dict[str, str]) -> List[str] +def get_console_script_specs(console: Dict[str, str]) -> List[str]: """ Given the mapping from entrypoint name to callable, return the relevant console script specs. @@ -326,62 +313,57 @@ def get_console_script_specs(console): # DEFAULT # - The default behavior is to install pip, pipX, pipX.Y, easy_install # and easy_install-X.Y. - pip_script = console.pop('pip', None) + pip_script = console.pop("pip", None) if pip_script: if "ENSUREPIP_OPTIONS" not in os.environ: - scripts_to_generate.append('pip = ' + pip_script) + scripts_to_generate.append("pip = " + pip_script) if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall": scripts_to_generate.append( - 'pip{} = {}'.format(sys.version_info[0], pip_script) + "pip{} = {}".format(sys.version_info[0], pip_script) ) - scripts_to_generate.append( - f'pip{get_major_minor_version()} = {pip_script}' - ) + scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}") # Delete any other versioned pip entry points - pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)] + pip_ep = [k for k in console if re.match(r"pip(\d(\.\d)?)?$", k)] for k in pip_ep: del console[k] - easy_install_script = console.pop('easy_install', None) + easy_install_script = console.pop("easy_install", None) if easy_install_script: if "ENSUREPIP_OPTIONS" not in os.environ: - scripts_to_generate.append( - 'easy_install = ' + easy_install_script - ) + scripts_to_generate.append("easy_install = " + easy_install_script) scripts_to_generate.append( - 'easy_install-{} = {}'.format( + "easy_install-{} = {}".format( get_major_minor_version(), easy_install_script ) ) # Delete any other versioned easy_install entry points easy_install_ep = [ - k for k in console if re.match(r'easy_install(-\d\.\d)?$', k) + k for k in console if re.match(r"easy_install(-\d\.\d)?$", k) ] for k in easy_install_ep: del console[k] # Generate the console entry points specified in the wheel - scripts_to_generate.extend(starmap('{} = {}'.format, console.items())) + scripts_to_generate.extend(starmap("{} = {}".format, console.items())) return scripts_to_generate class ZipBackedFile: - def __init__(self, src_record_path, dest_path, zip_file): - # type: (RecordPath, str, ZipFile) -> None + def __init__( + self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile + ) -> None: self.src_record_path = src_record_path self.dest_path = dest_path self._zip_file = zip_file self.changed = False - def _getinfo(self): - # type: () -> ZipInfo + def _getinfo(self) -> ZipInfo: return self._zip_file.getinfo(self.src_record_path) - def save(self): - # type: () -> None + def save(self) -> None: # directory creation is lazy and after file filtering # to ensure we don't install empty dirs; empty dirs can't be # uninstalled. @@ -410,22 +392,19 @@ class ZipBackedFile: class ScriptFile: - def __init__(self, file): - # type: (File) -> None + def __init__(self, file: "File") -> None: self._file = file self.src_record_path = self._file.src_record_path self.dest_path = self._file.dest_path self.changed = False - def save(self): - # type: () -> None + def save(self) -> None: self._file.save() self.changed = fix_script(self.dest_path) class MissingCallableSuffix(InstallationError): - def __init__(self, entry_point): - # type: (str) -> None + def __init__(self, entry_point: str) -> None: super().__init__( "Invalid script entry point: {} - A callable " "suffix is required. Cf https://packaging.python.org/" @@ -434,31 +413,30 @@ class MissingCallableSuffix(InstallationError): ) -def _raise_for_invalid_entrypoint(specification): - # type: (str) -> None +def _raise_for_invalid_entrypoint(specification: str) -> None: entry = get_export_entry(specification) if entry is not None and entry.suffix is None: raise MissingCallableSuffix(str(entry)) class PipScriptMaker(ScriptMaker): - def make(self, specification, options=None): - # type: (str, Dict[str, Any]) -> List[str] + def make( + self, specification: str, options: Optional[Dict[str, Any]] = None + ) -> List[str]: _raise_for_invalid_entrypoint(specification) return super().make(specification, options) def _install_wheel( - name, # type: str - wheel_zip, # type: ZipFile - wheel_path, # type: str - scheme, # type: Scheme - pycompile=True, # type: bool - warn_script_location=True, # type: bool - direct_url=None, # type: Optional[DirectUrl] - requested=False, # type: bool -): - # type: (...) -> None + name: str, + wheel_zip: ZipFile, + wheel_path: str, + scheme: Scheme, + pycompile: bool = True, + warn_script_location: bool = True, + direct_url: Optional[DirectUrl] = None, + requested: bool = False, +) -> None: """Install a wheel. :param name: Name of the project to install @@ -485,33 +463,23 @@ def _install_wheel( # installed = files copied from the wheel to the destination # changed = files changed while installing (scripts #! line typically) # generated = files newly generated during the install (script wrappers) - installed = {} # type: Dict[RecordPath, RecordPath] - changed = set() # type: Set[RecordPath] - generated = [] # type: List[str] + installed: Dict[RecordPath, RecordPath] = {} + changed: Set[RecordPath] = set() + generated: List[str] = [] - def record_installed(srcfile, destfile, modified=False): - # type: (RecordPath, str, bool) -> None + def record_installed( + srcfile: RecordPath, destfile: str, modified: bool = False + ) -> None: """Map archive RECORD paths to installation RECORD paths.""" newpath = _fs_to_record_path(destfile, lib_dir) installed[srcfile] = newpath if modified: - changed.add(_fs_to_record_path(destfile)) - - def all_paths(): - # type: () -> Iterable[RecordPath] - names = wheel_zip.namelist() - # If a flag is set, names may be unicode in Python 2. We convert to - # text explicitly so these are valid for lookup in RECORD. - decoded_names = map(ensure_text, names) - for name in decoded_names: - yield cast("RecordPath", name) - - def is_dir_path(path): - # type: (RecordPath) -> bool + changed.add(newpath) + + def is_dir_path(path: RecordPath) -> bool: return path.endswith("/") - def assert_no_path_traversal(dest_dir_path, target_path): - # type: (str, str) -> None + def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None: if not is_within_directory(dest_dir_path, target_path): message = ( "The wheel {!r} has a file {!r} trying to install" @@ -521,10 +489,10 @@ def _install_wheel( message.format(wheel_path, target_path, dest_dir_path) ) - def root_scheme_file_maker(zip_file, dest): - # type: (ZipFile, str) -> Callable[[RecordPath], File] - def make_root_scheme_file(record_path): - # type: (RecordPath) -> File + def root_scheme_file_maker( + zip_file: ZipFile, dest: str + ) -> Callable[[RecordPath], "File"]: + def make_root_scheme_file(record_path: RecordPath) -> "File": normed_path = os.path.normpath(record_path) dest_path = os.path.join(dest, normed_path) assert_no_path_traversal(dest, dest_path) @@ -532,17 +500,12 @@ def _install_wheel( return make_root_scheme_file - def data_scheme_file_maker(zip_file, scheme): - # type: (ZipFile, Scheme) -> Callable[[RecordPath], File] - scheme_paths = {} - for key in SCHEME_KEYS: - encoded_key = ensure_text(key) - scheme_paths[encoded_key] = ensure_text( - getattr(scheme, key), encoding=sys.getfilesystemencoding() - ) + def data_scheme_file_maker( + zip_file: ZipFile, scheme: Scheme + ) -> Callable[[RecordPath], "File"]: + scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS} - def make_data_scheme_file(record_path): - # type: (RecordPath) -> File + def make_data_scheme_file(record_path: RecordPath) -> "File": normed_path = os.path.normpath(record_path) try: _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2) @@ -561,9 +524,7 @@ def _install_wheel( "Unknown scheme key used in {}: {} (for file {!r}). .data" " directory contents should be in subdirectories named" " with a valid scheme key ({})" - ).format( - wheel_path, scheme_key, record_path, valid_scheme_keys - ) + ).format(wheel_path, scheme_key, record_path, valid_scheme_keys) raise InstallationError(message) dest_path = os.path.join(scheme_path, dest_subpath) @@ -572,30 +533,19 @@ def _install_wheel( return make_data_scheme_file - def is_data_scheme_path(path): - # type: (RecordPath) -> bool + def is_data_scheme_path(path: RecordPath) -> bool: return path.split("/", 1)[0].endswith(".data") - paths = all_paths() + paths = cast(List[RecordPath], wheel_zip.namelist()) file_paths = filterfalse(is_dir_path, paths) - root_scheme_paths, data_scheme_paths = partition( - is_data_scheme_path, file_paths - ) + root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths) - make_root_scheme_file = root_scheme_file_maker( - wheel_zip, - ensure_text(lib_dir, encoding=sys.getfilesystemencoding()), - ) - files = map(make_root_scheme_file, root_scheme_paths) + make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir) + files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths) - def is_script_scheme_path(path): - # type: (RecordPath) -> bool + def is_script_scheme_path(path: RecordPath) -> bool: parts = path.split("/", 2) - return ( - len(parts) > 2 and - parts[0].endswith(".data") and - parts[1] == "scripts" - ) + return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts" other_scheme_paths, script_scheme_paths = partition( is_script_scheme_path, data_scheme_paths @@ -606,30 +556,32 @@ def _install_wheel( files = chain(files, other_scheme_files) # Get the defined entry points - distribution = get_wheel_distribution(wheel_path, canonicalize_name(name)) + distribution = get_wheel_distribution( + FilesystemWheel(wheel_path), + canonicalize_name(name), + ) console, gui = get_entrypoints(distribution) - def is_entrypoint_wrapper(file): - # type: (File) -> bool + def is_entrypoint_wrapper(file: "File") -> bool: # EP, EP.exe and EP-script.py are scripts generated for # entry point EP by setuptools path = file.dest_path name = os.path.basename(path) - if name.lower().endswith('.exe'): + if name.lower().endswith(".exe"): matchname = name[:-4] - elif name.lower().endswith('-script.py'): + elif name.lower().endswith("-script.py"): matchname = name[:-10] elif name.lower().endswith(".pya"): matchname = name[:-4] else: matchname = name # Ignore setuptools-generated scripts - return (matchname in console or matchname in gui) + return matchname in console or matchname in gui - script_scheme_files = map(make_data_scheme_file, script_scheme_paths) - script_scheme_files = filterfalse( - is_entrypoint_wrapper, script_scheme_files + script_scheme_files: Iterator[File] = map( + make_data_scheme_file, script_scheme_paths ) + script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files) script_scheme_files = map(ScriptFile, script_scheme_files) files = chain(files, script_scheme_files) @@ -637,8 +589,7 @@ def _install_wheel( file.save() record_installed(file.src_record_path, file.dest_path, file.changed) - def pyc_source_file_paths(): - # type: () -> Iterator[str] + def pyc_source_file_paths() -> Generator[str, None, None]: # We de-duplicate installation paths, since there can be overlap (e.g. # file in .data maps to same location as file in wheel root). # Sorting installation paths makes it easier to reproduce and debug @@ -647,30 +598,21 @@ def _install_wheel( full_installed_path = os.path.join(lib_dir, installed_path) if not os.path.isfile(full_installed_path): continue - if not full_installed_path.endswith('.py'): + if not full_installed_path.endswith(".py"): continue yield full_installed_path - def pyc_output_path(path): - # type: (str) -> str - """Return the path the pyc file would have been written to. - """ + def pyc_output_path(path: str) -> str: + """Return the path the pyc file would have been written to.""" return importlib.util.cache_from_source(path) # Compile all of the pyc files for the installed files if pycompile: with captured_stdout() as stdout: with warnings.catch_warnings(): - warnings.filterwarnings('ignore') + warnings.filterwarnings("ignore") for path in pyc_source_file_paths(): - # Python 2's `compileall.compile_file` requires a str in - # error cases, so we must convert to the native type. - path_arg = ensure_str( - path, encoding=sys.getfilesystemencoding() - ) - success = compileall.compile_file( - path_arg, force=True, quiet=True - ) + success = compileall.compile_file(path, force=True, quiet=True) if success: pyc_path = pyc_output_path(path) assert os.path.exists(pyc_path) @@ -689,7 +631,7 @@ def _install_wheel( # Ensure we don't generate any variants for scripts because this is almost # never what somebody wants. # See https://bitbucket.org/pypa/distlib/issue/35/ - maker.variants = {''} + maker.variants = {""} # This is required because otherwise distlib creates scripts that are not # executable. @@ -699,14 +641,12 @@ def _install_wheel( # Generate the console and GUI entry points specified in the wheel scripts_to_generate = get_console_script_specs(console) - gui_scripts_to_generate = list(starmap('{} = {}'.format, gui.items())) + gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items())) generated_console_scripts = maker.make_multiple(scripts_to_generate) generated.extend(generated_console_scripts) - generated.extend( - maker.make_multiple(gui_scripts_to_generate, {'gui': True}) - ) + generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True})) if warn_script_location: msg = message_about_scripts_not_on_PATH(generated_console_scripts) @@ -716,8 +656,7 @@ def _install_wheel( generated_file_mode = 0o666 & ~current_umask() @contextlib.contextmanager - def _generate_file(path, **kwargs): - # type: (str, **Any) -> Iterator[BinaryIO] + def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]: with adjacent_tmp_file(path, **kwargs) as f: yield f os.chmod(f.name, generated_file_mode) @@ -726,9 +665,9 @@ def _install_wheel( dest_info_dir = os.path.join(lib_dir, info_dir) # Record pip as the installer - installer_path = os.path.join(dest_info_dir, 'INSTALLER') + installer_path = os.path.join(dest_info_dir, "INSTALLER") with _generate_file(installer_path) as installer_file: - installer_file.write(b'pip\n') + installer_file.write(b"pip\n") generated.append(installer_path) # Record the PEP 610 direct URL reference @@ -740,12 +679,12 @@ def _install_wheel( # Record the REQUESTED file if requested: - requested_path = os.path.join(dest_info_dir, 'REQUESTED') + requested_path = os.path.join(dest_info_dir, "REQUESTED") with open(requested_path, "wb"): pass generated.append(requested_path) - record_text = distribution.read_text('RECORD') + record_text = distribution.read_text("RECORD") record_rows = list(csv.reader(record_text.splitlines())) rows = get_csv_rows_for_installed( @@ -753,42 +692,38 @@ def _install_wheel( installed=installed, changed=changed, generated=generated, - lib_dir=lib_dir) + lib_dir=lib_dir, + ) # Record details of all files installed - record_path = os.path.join(dest_info_dir, 'RECORD') + record_path = os.path.join(dest_info_dir, "RECORD") - with _generate_file(record_path, **csv_io_kwargs('w')) as record_file: - # The type mypy infers for record_file is different for Python 3 - # (typing.IO[Any]) and Python 2 (typing.BinaryIO). We explicitly - # cast to typing.IO[str] as a workaround. - writer = csv.writer(cast('IO[str]', record_file)) + with _generate_file(record_path, **csv_io_kwargs("w")) as record_file: + # Explicitly cast to typing.IO[str] as a workaround for the mypy error: + # "writer" has incompatible type "BinaryIO"; expected "_Writer" + writer = csv.writer(cast("IO[str]", record_file)) writer.writerows(_normalized_outrows(rows)) @contextlib.contextmanager -def req_error_context(req_description): - # type: (str) -> Iterator[None] +def req_error_context(req_description: str) -> Generator[None, None, None]: try: yield except InstallationError as e: message = "For req: {}. {}".format(req_description, e.args[0]) - reraise( - InstallationError, InstallationError(message), sys.exc_info()[2] - ) + raise InstallationError(message) from e def install_wheel( - name, # type: str - wheel_path, # type: str - scheme, # type: Scheme - req_description, # type: str - pycompile=True, # type: bool - warn_script_location=True, # type: bool - direct_url=None, # type: Optional[DirectUrl] - requested=False, # type: bool -): - # type: (...) -> None + name: str, + wheel_path: str, + scheme: Scheme, + req_description: str, + pycompile: bool = True, + warn_script_location: bool = True, + direct_url: Optional[DirectUrl] = None, + requested: bool = False, +) -> None: with ZipFile(wheel_path, allowZip64=True) as z: with req_error_context(req_description): _install_wheel( |