summaryrefslogtreecommitdiff
path: root/src/pip/_internal/operations/install/wheel.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pip/_internal/operations/install/wheel.py')
-rw-r--r--src/pip/_internal/operations/install/wheel.py387
1 files changed, 161 insertions, 226 deletions
diff --git a/src/pip/_internal/operations/install/wheel.py b/src/pip/_internal/operations/install/wheel.py
index b5eafda98..1650d59a3 100644
--- a/src/pip/_internal/operations/install/wheel.py
+++ b/src/pip/_internal/operations/install/wheel.py
@@ -22,6 +22,7 @@ from typing import (
BinaryIO,
Callable,
Dict,
+ Generator,
Iterable,
Iterator,
List,
@@ -38,11 +39,14 @@ from zipfile import ZipFile, ZipInfo
from pip._vendor.distlib.scripts import ScriptMaker
from pip._vendor.distlib.util import get_export_entry
from pip._vendor.packaging.utils import canonicalize_name
-from pip._vendor.six import ensure_str, ensure_text, reraise
from pip._internal.exceptions import InstallationError
from pip._internal.locations import get_major_minor_version
-from pip._internal.metadata import BaseDistribution, get_wheel_distribution
+from pip._internal.metadata import (
+ BaseDistribution,
+ FilesystemWheel,
+ get_wheel_distribution,
+)
from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
from pip._internal.models.scheme import SCHEME_KEYS, Scheme
from pip._internal.utils.filesystem import adjacent_tmp_file, replace
@@ -59,62 +63,55 @@ if TYPE_CHECKING:
from typing import Protocol
class File(Protocol):
- src_record_path = None # type: RecordPath
- dest_path = None # type: str
- changed = None # type: bool
+ src_record_path: "RecordPath"
+ dest_path: str
+ changed: bool
- def save(self):
- # type: () -> None
+ def save(self) -> None:
pass
logger = logging.getLogger(__name__)
-RecordPath = NewType('RecordPath', str)
+RecordPath = NewType("RecordPath", str)
InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
-def rehash(path, blocksize=1 << 20):
- # type: (str, int) -> Tuple[str, str]
+def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]:
"""Return (encoded_digest, length) for path using hashlib.sha256()"""
h, length = hash_file(path, blocksize)
- digest = 'sha256=' + urlsafe_b64encode(
- h.digest()
- ).decode('latin1').rstrip('=')
+ digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=")
return (digest, str(length))
-def csv_io_kwargs(mode):
- # type: (str) -> Dict[str, Any]
+def csv_io_kwargs(mode: str) -> Dict[str, Any]:
"""Return keyword arguments to properly open a CSV file
in the given mode.
"""
- return {'mode': mode, 'newline': '', 'encoding': 'utf-8'}
+ return {"mode": mode, "newline": "", "encoding": "utf-8"}
-def fix_script(path):
- # type: (str) -> bool
+def fix_script(path: str) -> bool:
"""Replace #!python with #!/path/to/python
Return True if file was changed.
"""
# XXX RECORD hashes will need to be updated
assert os.path.isfile(path)
- with open(path, 'rb') as script:
+ with open(path, "rb") as script:
firstline = script.readline()
- if not firstline.startswith(b'#!python'):
+ if not firstline.startswith(b"#!python"):
return False
exename = sys.executable.encode(sys.getfilesystemencoding())
- firstline = b'#!' + exename + os.linesep.encode("ascii")
+ firstline = b"#!" + exename + os.linesep.encode("ascii")
rest = script.read()
- with open(path, 'wb') as script:
+ with open(path, "wb") as script:
script.write(firstline)
script.write(rest)
return True
-def wheel_root_is_purelib(metadata):
- # type: (Message) -> bool
+def wheel_root_is_purelib(metadata: Message) -> bool:
return metadata.get("Root-Is-Purelib", "").lower() == "true"
@@ -129,8 +126,7 @@ def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, s
return console_scripts, gui_scripts
-def message_about_scripts_not_on_PATH(scripts):
- # type: (Sequence[str]) -> Optional[str]
+def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]:
"""Determine if any scripts are not on PATH and format a warning.
Returns a warning message if one or more scripts are not on PATH,
otherwise None.
@@ -139,7 +135,7 @@ def message_about_scripts_not_on_PATH(scripts):
return None
# Group scripts by the path they were installed in
- grouped_by_dir = collections.defaultdict(set) # type: Dict[str, Set[str]]
+ grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set)
for destfile in scripts:
parent_dir = os.path.dirname(destfile)
script_name = os.path.basename(destfile)
@@ -147,23 +143,24 @@ def message_about_scripts_not_on_PATH(scripts):
# We don't want to warn for directories that are on PATH.
not_warn_dirs = [
- os.path.normcase(i).rstrip(os.sep) for i in
- os.environ.get("PATH", "").split(os.pathsep)
+ os.path.normcase(i).rstrip(os.sep)
+ for i in os.environ.get("PATH", "").split(os.pathsep)
]
# If an executable sits with sys.executable, we don't warn for it.
# This covers the case of venv invocations without activating the venv.
not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))
- warn_for = {
- parent_dir: scripts for parent_dir, scripts in grouped_by_dir.items()
+ warn_for: Dict[str, Set[str]] = {
+ parent_dir: scripts
+ for parent_dir, scripts in grouped_by_dir.items()
if os.path.normcase(parent_dir) not in not_warn_dirs
- } # type: Dict[str, Set[str]]
+ }
if not warn_for:
return None
# Format a message
msg_lines = []
for parent_dir, dir_scripts in warn_for.items():
- sorted_scripts = sorted(dir_scripts) # type: List[str]
+ sorted_scripts: List[str] = sorted(dir_scripts)
if len(sorted_scripts) == 1:
start_text = "script {} is".format(sorted_scripts[0])
else:
@@ -172,8 +169,9 @@ def message_about_scripts_not_on_PATH(scripts):
)
msg_lines.append(
- "The {} installed in '{}' which is not on PATH."
- .format(start_text, parent_dir)
+ "The {} installed in '{}' which is not on PATH.".format(
+ start_text, parent_dir
+ )
)
last_line_fmt = (
@@ -200,8 +198,9 @@ def message_about_scripts_not_on_PATH(scripts):
return "\n".join(msg_lines)
-def _normalized_outrows(outrows):
- # type: (Iterable[InstalledCSVRow]) -> List[Tuple[str, str, str]]
+def _normalized_outrows(
+ outrows: Iterable[InstalledCSVRow],
+) -> List[Tuple[str, str, str]]:
"""Normalize the given rows of a RECORD file.
Items in each row are converted into str. Rows are then sorted to make
@@ -221,69 +220,57 @@ def _normalized_outrows(outrows):
# For additional background, see--
# https://github.com/pypa/pip/issues/5868
return sorted(
- (ensure_str(record_path, encoding='utf-8'), hash_, str(size))
- for record_path, hash_, size in outrows
+ (record_path, hash_, str(size)) for record_path, hash_, size in outrows
)
-def _record_to_fs_path(record_path):
- # type: (RecordPath) -> str
- return record_path
+def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
+ return os.path.join(lib_dir, record_path)
-def _fs_to_record_path(path, relative_to=None):
- # type: (str, Optional[str]) -> RecordPath
- if relative_to is not None:
- # On Windows, do not handle relative paths if they belong to different
- # logical disks
- if os.path.splitdrive(path)[0].lower() == \
- os.path.splitdrive(relative_to)[0].lower():
- path = os.path.relpath(path, relative_to)
- path = path.replace(os.path.sep, '/')
- return cast('RecordPath', path)
+def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
+ # On Windows, do not handle relative paths if they belong to different
+ # logical disks
+ if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
+ path = os.path.relpath(path, lib_dir)
-
-def _parse_record_path(record_column):
- # type: (str) -> RecordPath
- p = ensure_text(record_column, encoding='utf-8')
- return cast('RecordPath', p)
+ path = path.replace(os.path.sep, "/")
+ return cast("RecordPath", path)
def get_csv_rows_for_installed(
- old_csv_rows, # type: List[List[str]]
- installed, # type: Dict[RecordPath, RecordPath]
- changed, # type: Set[RecordPath]
- generated, # type: List[str]
- lib_dir, # type: str
-):
- # type: (...) -> List[InstalledCSVRow]
+ old_csv_rows: List[List[str]],
+ installed: Dict[RecordPath, RecordPath],
+ changed: Set[RecordPath],
+ generated: List[str],
+ lib_dir: str,
+) -> List[InstalledCSVRow]:
"""
:param installed: A map from archive RECORD path to installation RECORD
path.
"""
- installed_rows = [] # type: List[InstalledCSVRow]
+ installed_rows: List[InstalledCSVRow] = []
for row in old_csv_rows:
if len(row) > 3:
- logger.warning('RECORD line has more than three elements: %s', row)
- old_record_path = _parse_record_path(row[0])
+ logger.warning("RECORD line has more than three elements: %s", row)
+ old_record_path = cast("RecordPath", row[0])
new_record_path = installed.pop(old_record_path, old_record_path)
if new_record_path in changed:
- digest, length = rehash(_record_to_fs_path(new_record_path))
+ digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
else:
- digest = row[1] if len(row) > 1 else ''
- length = row[2] if len(row) > 2 else ''
+ digest = row[1] if len(row) > 1 else ""
+ length = row[2] if len(row) > 2 else ""
installed_rows.append((new_record_path, digest, length))
for f in generated:
path = _fs_to_record_path(f, lib_dir)
digest, length = rehash(f)
installed_rows.append((path, digest, length))
for installed_record_path in installed.values():
- installed_rows.append((installed_record_path, '', ''))
+ installed_rows.append((installed_record_path, "", ""))
return installed_rows
-def get_console_script_specs(console):
- # type: (Dict[str, str]) -> List[str]
+def get_console_script_specs(console: Dict[str, str]) -> List[str]:
"""
Given the mapping from entrypoint name to callable, return the relevant
console script specs.
@@ -326,62 +313,57 @@ def get_console_script_specs(console):
# DEFAULT
# - The default behavior is to install pip, pipX, pipX.Y, easy_install
# and easy_install-X.Y.
- pip_script = console.pop('pip', None)
+ pip_script = console.pop("pip", None)
if pip_script:
if "ENSUREPIP_OPTIONS" not in os.environ:
- scripts_to_generate.append('pip = ' + pip_script)
+ scripts_to_generate.append("pip = " + pip_script)
if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
scripts_to_generate.append(
- 'pip{} = {}'.format(sys.version_info[0], pip_script)
+ "pip{} = {}".format(sys.version_info[0], pip_script)
)
- scripts_to_generate.append(
- f'pip{get_major_minor_version()} = {pip_script}'
- )
+ scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}")
# Delete any other versioned pip entry points
- pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)]
+ pip_ep = [k for k in console if re.match(r"pip(\d(\.\d)?)?$", k)]
for k in pip_ep:
del console[k]
- easy_install_script = console.pop('easy_install', None)
+ easy_install_script = console.pop("easy_install", None)
if easy_install_script:
if "ENSUREPIP_OPTIONS" not in os.environ:
- scripts_to_generate.append(
- 'easy_install = ' + easy_install_script
- )
+ scripts_to_generate.append("easy_install = " + easy_install_script)
scripts_to_generate.append(
- 'easy_install-{} = {}'.format(
+ "easy_install-{} = {}".format(
get_major_minor_version(), easy_install_script
)
)
# Delete any other versioned easy_install entry points
easy_install_ep = [
- k for k in console if re.match(r'easy_install(-\d\.\d)?$', k)
+ k for k in console if re.match(r"easy_install(-\d\.\d)?$", k)
]
for k in easy_install_ep:
del console[k]
# Generate the console entry points specified in the wheel
- scripts_to_generate.extend(starmap('{} = {}'.format, console.items()))
+ scripts_to_generate.extend(starmap("{} = {}".format, console.items()))
return scripts_to_generate
class ZipBackedFile:
- def __init__(self, src_record_path, dest_path, zip_file):
- # type: (RecordPath, str, ZipFile) -> None
+ def __init__(
+ self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
+ ) -> None:
self.src_record_path = src_record_path
self.dest_path = dest_path
self._zip_file = zip_file
self.changed = False
- def _getinfo(self):
- # type: () -> ZipInfo
+ def _getinfo(self) -> ZipInfo:
return self._zip_file.getinfo(self.src_record_path)
- def save(self):
- # type: () -> None
+ def save(self) -> None:
# directory creation is lazy and after file filtering
# to ensure we don't install empty dirs; empty dirs can't be
# uninstalled.
@@ -410,22 +392,19 @@ class ZipBackedFile:
class ScriptFile:
- def __init__(self, file):
- # type: (File) -> None
+ def __init__(self, file: "File") -> None:
self._file = file
self.src_record_path = self._file.src_record_path
self.dest_path = self._file.dest_path
self.changed = False
- def save(self):
- # type: () -> None
+ def save(self) -> None:
self._file.save()
self.changed = fix_script(self.dest_path)
class MissingCallableSuffix(InstallationError):
- def __init__(self, entry_point):
- # type: (str) -> None
+ def __init__(self, entry_point: str) -> None:
super().__init__(
"Invalid script entry point: {} - A callable "
"suffix is required. Cf https://packaging.python.org/"
@@ -434,31 +413,30 @@ class MissingCallableSuffix(InstallationError):
)
-def _raise_for_invalid_entrypoint(specification):
- # type: (str) -> None
+def _raise_for_invalid_entrypoint(specification: str) -> None:
entry = get_export_entry(specification)
if entry is not None and entry.suffix is None:
raise MissingCallableSuffix(str(entry))
class PipScriptMaker(ScriptMaker):
- def make(self, specification, options=None):
- # type: (str, Dict[str, Any]) -> List[str]
+ def make(
+ self, specification: str, options: Optional[Dict[str, Any]] = None
+ ) -> List[str]:
_raise_for_invalid_entrypoint(specification)
return super().make(specification, options)
def _install_wheel(
- name, # type: str
- wheel_zip, # type: ZipFile
- wheel_path, # type: str
- scheme, # type: Scheme
- pycompile=True, # type: bool
- warn_script_location=True, # type: bool
- direct_url=None, # type: Optional[DirectUrl]
- requested=False, # type: bool
-):
- # type: (...) -> None
+ name: str,
+ wheel_zip: ZipFile,
+ wheel_path: str,
+ scheme: Scheme,
+ pycompile: bool = True,
+ warn_script_location: bool = True,
+ direct_url: Optional[DirectUrl] = None,
+ requested: bool = False,
+) -> None:
"""Install a wheel.
:param name: Name of the project to install
@@ -485,33 +463,23 @@ def _install_wheel(
# installed = files copied from the wheel to the destination
# changed = files changed while installing (scripts #! line typically)
# generated = files newly generated during the install (script wrappers)
- installed = {} # type: Dict[RecordPath, RecordPath]
- changed = set() # type: Set[RecordPath]
- generated = [] # type: List[str]
+ installed: Dict[RecordPath, RecordPath] = {}
+ changed: Set[RecordPath] = set()
+ generated: List[str] = []
- def record_installed(srcfile, destfile, modified=False):
- # type: (RecordPath, str, bool) -> None
+ def record_installed(
+ srcfile: RecordPath, destfile: str, modified: bool = False
+ ) -> None:
"""Map archive RECORD paths to installation RECORD paths."""
newpath = _fs_to_record_path(destfile, lib_dir)
installed[srcfile] = newpath
if modified:
- changed.add(_fs_to_record_path(destfile))
-
- def all_paths():
- # type: () -> Iterable[RecordPath]
- names = wheel_zip.namelist()
- # If a flag is set, names may be unicode in Python 2. We convert to
- # text explicitly so these are valid for lookup in RECORD.
- decoded_names = map(ensure_text, names)
- for name in decoded_names:
- yield cast("RecordPath", name)
-
- def is_dir_path(path):
- # type: (RecordPath) -> bool
+ changed.add(newpath)
+
+ def is_dir_path(path: RecordPath) -> bool:
return path.endswith("/")
- def assert_no_path_traversal(dest_dir_path, target_path):
- # type: (str, str) -> None
+ def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
if not is_within_directory(dest_dir_path, target_path):
message = (
"The wheel {!r} has a file {!r} trying to install"
@@ -521,10 +489,10 @@ def _install_wheel(
message.format(wheel_path, target_path, dest_dir_path)
)
- def root_scheme_file_maker(zip_file, dest):
- # type: (ZipFile, str) -> Callable[[RecordPath], File]
- def make_root_scheme_file(record_path):
- # type: (RecordPath) -> File
+ def root_scheme_file_maker(
+ zip_file: ZipFile, dest: str
+ ) -> Callable[[RecordPath], "File"]:
+ def make_root_scheme_file(record_path: RecordPath) -> "File":
normed_path = os.path.normpath(record_path)
dest_path = os.path.join(dest, normed_path)
assert_no_path_traversal(dest, dest_path)
@@ -532,17 +500,12 @@ def _install_wheel(
return make_root_scheme_file
- def data_scheme_file_maker(zip_file, scheme):
- # type: (ZipFile, Scheme) -> Callable[[RecordPath], File]
- scheme_paths = {}
- for key in SCHEME_KEYS:
- encoded_key = ensure_text(key)
- scheme_paths[encoded_key] = ensure_text(
- getattr(scheme, key), encoding=sys.getfilesystemencoding()
- )
+ def data_scheme_file_maker(
+ zip_file: ZipFile, scheme: Scheme
+ ) -> Callable[[RecordPath], "File"]:
+ scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}
- def make_data_scheme_file(record_path):
- # type: (RecordPath) -> File
+ def make_data_scheme_file(record_path: RecordPath) -> "File":
normed_path = os.path.normpath(record_path)
try:
_, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
@@ -561,9 +524,7 @@ def _install_wheel(
"Unknown scheme key used in {}: {} (for file {!r}). .data"
" directory contents should be in subdirectories named"
" with a valid scheme key ({})"
- ).format(
- wheel_path, scheme_key, record_path, valid_scheme_keys
- )
+ ).format(wheel_path, scheme_key, record_path, valid_scheme_keys)
raise InstallationError(message)
dest_path = os.path.join(scheme_path, dest_subpath)
@@ -572,30 +533,19 @@ def _install_wheel(
return make_data_scheme_file
- def is_data_scheme_path(path):
- # type: (RecordPath) -> bool
+ def is_data_scheme_path(path: RecordPath) -> bool:
return path.split("/", 1)[0].endswith(".data")
- paths = all_paths()
+ paths = cast(List[RecordPath], wheel_zip.namelist())
file_paths = filterfalse(is_dir_path, paths)
- root_scheme_paths, data_scheme_paths = partition(
- is_data_scheme_path, file_paths
- )
+ root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)
- make_root_scheme_file = root_scheme_file_maker(
- wheel_zip,
- ensure_text(lib_dir, encoding=sys.getfilesystemencoding()),
- )
- files = map(make_root_scheme_file, root_scheme_paths)
+ make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
+ files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)
- def is_script_scheme_path(path):
- # type: (RecordPath) -> bool
+ def is_script_scheme_path(path: RecordPath) -> bool:
parts = path.split("/", 2)
- return (
- len(parts) > 2 and
- parts[0].endswith(".data") and
- parts[1] == "scripts"
- )
+ return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"
other_scheme_paths, script_scheme_paths = partition(
is_script_scheme_path, data_scheme_paths
@@ -606,30 +556,32 @@ def _install_wheel(
files = chain(files, other_scheme_files)
# Get the defined entry points
- distribution = get_wheel_distribution(wheel_path, canonicalize_name(name))
+ distribution = get_wheel_distribution(
+ FilesystemWheel(wheel_path),
+ canonicalize_name(name),
+ )
console, gui = get_entrypoints(distribution)
- def is_entrypoint_wrapper(file):
- # type: (File) -> bool
+ def is_entrypoint_wrapper(file: "File") -> bool:
# EP, EP.exe and EP-script.py are scripts generated for
# entry point EP by setuptools
path = file.dest_path
name = os.path.basename(path)
- if name.lower().endswith('.exe'):
+ if name.lower().endswith(".exe"):
matchname = name[:-4]
- elif name.lower().endswith('-script.py'):
+ elif name.lower().endswith("-script.py"):
matchname = name[:-10]
elif name.lower().endswith(".pya"):
matchname = name[:-4]
else:
matchname = name
# Ignore setuptools-generated scripts
- return (matchname in console or matchname in gui)
+ return matchname in console or matchname in gui
- script_scheme_files = map(make_data_scheme_file, script_scheme_paths)
- script_scheme_files = filterfalse(
- is_entrypoint_wrapper, script_scheme_files
+ script_scheme_files: Iterator[File] = map(
+ make_data_scheme_file, script_scheme_paths
)
+ script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
script_scheme_files = map(ScriptFile, script_scheme_files)
files = chain(files, script_scheme_files)
@@ -637,8 +589,7 @@ def _install_wheel(
file.save()
record_installed(file.src_record_path, file.dest_path, file.changed)
- def pyc_source_file_paths():
- # type: () -> Iterator[str]
+ def pyc_source_file_paths() -> Generator[str, None, None]:
# We de-duplicate installation paths, since there can be overlap (e.g.
# file in .data maps to same location as file in wheel root).
# Sorting installation paths makes it easier to reproduce and debug
@@ -647,30 +598,21 @@ def _install_wheel(
full_installed_path = os.path.join(lib_dir, installed_path)
if not os.path.isfile(full_installed_path):
continue
- if not full_installed_path.endswith('.py'):
+ if not full_installed_path.endswith(".py"):
continue
yield full_installed_path
- def pyc_output_path(path):
- # type: (str) -> str
- """Return the path the pyc file would have been written to.
- """
+ def pyc_output_path(path: str) -> str:
+ """Return the path the pyc file would have been written to."""
return importlib.util.cache_from_source(path)
# Compile all of the pyc files for the installed files
if pycompile:
with captured_stdout() as stdout:
with warnings.catch_warnings():
- warnings.filterwarnings('ignore')
+ warnings.filterwarnings("ignore")
for path in pyc_source_file_paths():
- # Python 2's `compileall.compile_file` requires a str in
- # error cases, so we must convert to the native type.
- path_arg = ensure_str(
- path, encoding=sys.getfilesystemencoding()
- )
- success = compileall.compile_file(
- path_arg, force=True, quiet=True
- )
+ success = compileall.compile_file(path, force=True, quiet=True)
if success:
pyc_path = pyc_output_path(path)
assert os.path.exists(pyc_path)
@@ -689,7 +631,7 @@ def _install_wheel(
# Ensure we don't generate any variants for scripts because this is almost
# never what somebody wants.
# See https://bitbucket.org/pypa/distlib/issue/35/
- maker.variants = {''}
+ maker.variants = {""}
# This is required because otherwise distlib creates scripts that are not
# executable.
@@ -699,14 +641,12 @@ def _install_wheel(
# Generate the console and GUI entry points specified in the wheel
scripts_to_generate = get_console_script_specs(console)
- gui_scripts_to_generate = list(starmap('{} = {}'.format, gui.items()))
+ gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))
generated_console_scripts = maker.make_multiple(scripts_to_generate)
generated.extend(generated_console_scripts)
- generated.extend(
- maker.make_multiple(gui_scripts_to_generate, {'gui': True})
- )
+ generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))
if warn_script_location:
msg = message_about_scripts_not_on_PATH(generated_console_scripts)
@@ -716,8 +656,7 @@ def _install_wheel(
generated_file_mode = 0o666 & ~current_umask()
@contextlib.contextmanager
- def _generate_file(path, **kwargs):
- # type: (str, **Any) -> Iterator[BinaryIO]
+ def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
with adjacent_tmp_file(path, **kwargs) as f:
yield f
os.chmod(f.name, generated_file_mode)
@@ -726,9 +665,9 @@ def _install_wheel(
dest_info_dir = os.path.join(lib_dir, info_dir)
# Record pip as the installer
- installer_path = os.path.join(dest_info_dir, 'INSTALLER')
+ installer_path = os.path.join(dest_info_dir, "INSTALLER")
with _generate_file(installer_path) as installer_file:
- installer_file.write(b'pip\n')
+ installer_file.write(b"pip\n")
generated.append(installer_path)
# Record the PEP 610 direct URL reference
@@ -740,12 +679,12 @@ def _install_wheel(
# Record the REQUESTED file
if requested:
- requested_path = os.path.join(dest_info_dir, 'REQUESTED')
+ requested_path = os.path.join(dest_info_dir, "REQUESTED")
with open(requested_path, "wb"):
pass
generated.append(requested_path)
- record_text = distribution.read_text('RECORD')
+ record_text = distribution.read_text("RECORD")
record_rows = list(csv.reader(record_text.splitlines()))
rows = get_csv_rows_for_installed(
@@ -753,42 +692,38 @@ def _install_wheel(
installed=installed,
changed=changed,
generated=generated,
- lib_dir=lib_dir)
+ lib_dir=lib_dir,
+ )
# Record details of all files installed
- record_path = os.path.join(dest_info_dir, 'RECORD')
+ record_path = os.path.join(dest_info_dir, "RECORD")
- with _generate_file(record_path, **csv_io_kwargs('w')) as record_file:
- # The type mypy infers for record_file is different for Python 3
- # (typing.IO[Any]) and Python 2 (typing.BinaryIO). We explicitly
- # cast to typing.IO[str] as a workaround.
- writer = csv.writer(cast('IO[str]', record_file))
+ with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
+ # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
+ # "writer" has incompatible type "BinaryIO"; expected "_Writer"
+ writer = csv.writer(cast("IO[str]", record_file))
writer.writerows(_normalized_outrows(rows))
@contextlib.contextmanager
-def req_error_context(req_description):
- # type: (str) -> Iterator[None]
+def req_error_context(req_description: str) -> Generator[None, None, None]:
try:
yield
except InstallationError as e:
message = "For req: {}. {}".format(req_description, e.args[0])
- reraise(
- InstallationError, InstallationError(message), sys.exc_info()[2]
- )
+ raise InstallationError(message) from e
def install_wheel(
- name, # type: str
- wheel_path, # type: str
- scheme, # type: Scheme
- req_description, # type: str
- pycompile=True, # type: bool
- warn_script_location=True, # type: bool
- direct_url=None, # type: Optional[DirectUrl]
- requested=False, # type: bool
-):
- # type: (...) -> None
+ name: str,
+ wheel_path: str,
+ scheme: Scheme,
+ req_description: str,
+ pycompile: bool = True,
+ warn_script_location: bool = True,
+ direct_url: Optional[DirectUrl] = None,
+ requested: bool = False,
+) -> None:
with ZipFile(wheel_path, allowZip64=True) as z:
with req_error_context(req_description):
_install_wheel(