diff options
Diffstat (limited to 'wheel/install.py')
-rw-r--r-- | wheel/install.py | 481 |
1 file changed, 0 insertions, 481 deletions
"""
Operations on existing wheel files, including basic installation.
"""
# XXX see patched pip to install

import sys
import warnings
import os.path
import re
import zipfile
import hashlib
import csv

import shutil

# sys.maxsize exists on Python 2.6+/3.x; very old Python 2 only has
# sys.maxint.  Used as a sentinel rank for "not compatible at all".
try:
    _big_number = sys.maxsize
except NameError:
    _big_number = sys.maxint

from .decorator import reify
from .util import (urlsafe_b64encode, from_json, urlsafe_b64decode,
                   native, binary, HashingFile)
from . import signatures
from .pkginfo import read_pkg_info_bytes
from .util import open_for_csv

from .pep425tags import get_supported
from .paths import get_install_paths

# The next major version after this version of the 'wheel' tool:
VERSION_TOO_HIGH = (1, 0)

# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
WHEEL_INFO_RE = re.compile(
    r"""^(?P<namever>(?P<name>.+?)(-(?P<ver>\d.+?))?)
    ((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
    \.whl|\.dist-info)$""",
    re.VERBOSE).match


def parse_version(version):
    """Use parse_version from pkg_resources or distutils as available.

    On first call this rebinds the module-level name ``parse_version`` to
    the imported implementation, so later calls bypass the import attempt.
    """
    global parse_version
    try:
        from pkg_resources import parse_version
    except ImportError:
        from distutils.version import LooseVersion as parse_version
    return parse_version(version)


class BadWheelFile(ValueError):
    """Raised for invalid wheel filenames and failed content verification."""
    pass


class WheelFile(object):
    """Parse wheel-specific attributes from a wheel (.whl) file and offer
    basic installation and verification support.

    WheelFile can be used to simply parse a wheel filename by avoiding the
    methods that require the actual file contents."""

    WHEEL_INFO = "WHEEL"
    RECORD = "RECORD"

    def __init__(self,
                 filename,
                 fp=None,
                 append=False,
                 context=get_supported):
        """
        :param fp: A seekable file-like object or None to open(filename).
        :param append: Open archive in append mode.
        :param context: Function returning list of supported tags. Wheels
            must have the same context to be sortable.
        :raises BadWheelFile: if the basename does not end in ``.whl`` or
            does not match the wheel filename convention.
        """
        self.filename = filename
        self.fp = fp
        self.append = append
        self.context = context
        basename = os.path.basename(filename)
        self.parsed_filename = WHEEL_INFO_RE(basename)
        if not basename.endswith('.whl') or self.parsed_filename is None:
            raise BadWheelFile("Bad filename '%s'" % filename)

    def __repr__(self):
        return self.filename

    @property
    def distinfo_name(self):
        # Name of the .dist-info directory inside the archive.
        return "%s.dist-info" % self.parsed_filename.group('namever')

    @property
    def datadir_name(self):
        # Name of the .data directory holding files installed outside root.
        return "%s.data" % self.parsed_filename.group('namever')

    @property
    def record_name(self):
        # Archive-relative path of the RECORD manifest.
        return "%s/%s" % (self.distinfo_name, self.RECORD)

    @property
    def wheelinfo_name(self):
        # Archive-relative path of the WHEEL metadata file.
        return "%s/%s" % (self.distinfo_name, self.WHEEL_INFO)

    @property
    def tags(self):
        """A wheel file is compatible with the Cartesian product of the
        period-delimited tags in its filename.
        To choose a wheel file among several candidates having the same
        distribution version 'ver', an installer ranks each triple of
        (pyver, abi, plat) that its Python installation can run, sorting
        the wheels by the best-ranked tag it supports and then by their
        arity which is just len(list(compatibility_tags)).
        """
        tags = self.parsed_filename.groupdict()
        for pyver in tags['pyver'].split('.'):
            for abi in tags['abi'].split('.'):
                for plat in tags['plat'].split('.'):
                    yield (pyver, abi, plat)

    compatibility_tags = tags

    @property
    def arity(self):
        """The number of compatibility tags the wheel declares."""
        return len(list(self.compatibility_tags))

    @property
    def rank(self):
        """
        Lowest index of any of this wheel's tags in self.context(), and the
        arity e.g. (0, 1)
        """
        return self.compatibility_rank(self.context())

    @property
    def compatible(self):
        # _big_number is the "no supported tag found" sentinel rank.
        return self.rank[0] != _big_number  # bad API!

    # deprecated:
    def compatibility_rank(self, supported):
        """Rank the wheel against the supported tags. Smaller ranks are more
        compatible!

        :param supported: A list of compatibility tags that the current
            Python implemenation can run.
        """
        preferences = []
        for tag in self.compatibility_tags:
            try:
                preferences.append(supported.index(tag))
            # Tag not present
            except ValueError:
                pass
        if preferences:
            return (min(preferences), self.arity)
        return (_big_number, 0)

    # deprecated
    def supports_current_python(self, x):
        assert self.context == x, 'context mismatch'
        return self.compatible

    # Comparability.
    # Wheels are equal if they refer to the same file.
    # If two wheels are not equal, compare based on (in this order):
    #   1. Name
    #   2. Version
    #   3. Compatibility rank
    #   4. Filename (as a tiebreaker)
    @property
    def _sort_key(self):
        return (self.parsed_filename.group('name'),
                parse_version(self.parsed_filename.group('ver')),
                tuple(-x for x in self.rank),
                self.filename)

    def __lt__(self, other):
        # NOTE: the unreachable hand-rolled comparison that previously
        # followed the return (self-marked "XXX prune") has been removed;
        # _sort_key implements the documented ordering.
        if self.context != other.context:
            raise TypeError("{0}.context != {1}.context".format(self, other))

        return self._sort_key < other._sort_key

    def __eq__(self, other):
        return self.filename == other.filename

    def __ne__(self, other):
        return self.filename != other.filename

    def __gt__(self, other):
        return other < self

    def __le__(self, other):
        return self == other or self < other

    def __ge__(self, other):
        return self == other or other < self

    #
    # Methods using the file's contents:
    #

    @reify
    def zipfile(self):
        # Lazily open (and, unless appending, verify) the underlying archive.
        mode = "r"
        if self.append:
            mode = "a"
        vzf = VerifyingZipFile(self.fp if self.fp else self.filename, mode)
        if not self.append:
            self.verify(vzf)
        return vzf

    @reify
    def parsed_wheel_info(self):
        """Parse wheel metadata (the .data/WHEEL file)"""
        return read_pkg_info_bytes(self.zipfile.read(self.wheelinfo_name))

    def check_version(self):
        """Raise ValueError if the wheel's Wheel-Version is too new for us."""
        version = self.parsed_wheel_info['Wheel-Version']
        if tuple(map(int, version.split('.'))) >= VERSION_TOO_HIGH:
            raise ValueError("Wheel version is too high")

    @reify
    def install_paths(self):
        """
        Consult distutils to get the install paths for our dist. A dict with
        ('purelib', 'platlib', 'headers', 'scripts', 'data').

        We use the name from our filename as the dist name, which means headers
        could be installed in the wrong place if the filesystem-escaped name
        is different than the Name. Who cares?
        """
        name = self.parsed_filename.group('name')
        return get_install_paths(name)

    def install(self, force=False, overrides=None):
        """
        Install the wheel into site-packages.

        :param force: When False, raise ValueError rather than overwrite any
            existing file.
        :param overrides: Optional mapping overriding the install directory
            for a given scheme key ('purelib', 'platlib', 'scripts', ...).
        """
        # BUG FIX: the default used to be ``overrides={}`` — a mutable
        # default argument shared across calls.
        if overrides is None:
            overrides = {}

        # Utility to get the target directory for a particular key
        def get_path(key):
            return overrides.get(key) or self.install_paths[key]

        # The base target location is either purelib or platlib
        if self.parsed_wheel_info['Root-Is-Purelib'] == 'true':
            root = get_path('purelib')
        else:
            root = get_path('platlib')

        # Parse all the names in the archive
        name_trans = {}
        for info in self.zipfile.infolist():
            name = info.filename
            # Zip files can contain entries representing directories.
            # These end in a '/'.
            # We ignore these, as we create directories on demand.
            if name.endswith('/'):
                continue

            # Pathnames in a zipfile namelist are always /-separated.
            # In theory, paths could start with ./ or have other oddities
            # but this won't happen in practical cases of well-formed wheels.
            # We'll cover the simple case of an initial './' as it's both easy
            # to do and more common than most other oddities.
            if name.startswith('./'):
                name = name[2:]

            # Split off the base directory to identify files that are to be
            # installed in non-root locations
            basedir, sep, filename = name.partition('/')
            if sep and basedir == self.datadir_name:
                # Data file. Target destination is elsewhere
                key, sep, filename = filename.partition('/')
                if not sep:
                    raise ValueError("Invalid filename in wheel: {0}".format(name))
                target = get_path(key)
            else:
                # Normal file. Target destination is root
                key = ''
                target = root
                filename = name

            # Map the actual filename from the zipfile to its intended target
            # directory and the pathname relative to that directory.
            dest = os.path.normpath(os.path.join(target, filename))
            name_trans[info] = (key, target, filename, dest)

        # We're now ready to start processing the actual install. The process
        # is as follows:
        #   1. Prechecks - is the wheel valid, is its declared architecture
        #      OK, etc. [[Responsibility of the caller]]
        #   2. Overwrite check - do any of the files to be installed already
        #      exist?
        #   3. Actual install - put the files in their target locations.
        #   4. Update RECORD - write a suitably modified RECORD file to
        #      reflect the actual installed paths.

        if not force:
            for info, (key, target, filename, dest) in name_trans.items():
                if os.path.exists(dest):
                    raise ValueError("Wheel file {0} would overwrite {1}. Use force if this is intended".format(info.filename, dest))

        # Get the name of our executable, for use when replacing script
        # wrapper hashbang lines.
        # We encode it using getfilesystemencoding, as that is "the name of
        # the encoding used to convert Unicode filenames into system file
        # names".
        exename = sys.executable.encode(sys.getfilesystemencoding())
        record_data = []
        record_name = self.distinfo_name + '/RECORD'
        for info, (key, target, filename, dest) in name_trans.items():
            name = info.filename
            # Skip the RECORD file; it is rewritten below.  (Checked before
            # opening the member so we don't leak an open file handle, as
            # the original code did.)
            if name == record_name:
                continue
            source = self.zipfile.open(info)
            ddir = os.path.dirname(dest)
            if not os.path.isdir(ddir):
                os.makedirs(ddir)
            destination = HashingFile(open(dest, 'wb'))
            if key == 'scripts':
                # Rewrite the generic '#!python' hashbang to point at the
                # interpreter performing the install.
                hashbang = source.readline()
                if hashbang.startswith(b'#!python'):
                    hashbang = b'#!' + exename + binary(os.linesep)
                destination.write(hashbang)
            shutil.copyfileobj(source, destination)
            reldest = os.path.relpath(dest, root)
            # BUG FIX: str.replace returns a new string; the original code
            # discarded the result, so RECORD kept os.sep separators on
            # Windows instead of the '/' the wheel format requires.
            reldest = reldest.replace(os.sep, '/')
            record_data.append((reldest, destination.digest(), destination.length))
            destination.close()
            source.close()
            # preserve attributes (especially +x bit for scripts)
            attrs = info.external_attr >> 16
            if attrs:  # tends to be 0 if Windows.
                os.chmod(dest, attrs)

        record_path = os.path.join(root, self.record_name)
        with open_for_csv(record_path, 'w+') as record_file:
            writer = csv.writer(record_file)
            for reldest, digest, length in sorted(record_data):
                writer.writerow((reldest, digest, length))
            # RECORD lists itself with no hash, per convention.
            writer.writerow((self.record_name, '', ''))

    def verify(self, zipfile=None):
        """Configure the VerifyingZipFile `zipfile` by verifying its signature
        and setting expected hashes for every hash in RECORD.
        Caller must complete the verification process by completely reading
        every file in the archive (e.g. with extractall)."""
        sig = None
        if zipfile is None:
            zipfile = self.zipfile
        zipfile.strict = True

        record_name = '/'.join((self.distinfo_name, 'RECORD'))
        sig_name = '/'.join((self.distinfo_name, 'RECORD.jws'))
        # tolerate s/mime signatures:
        smime_sig_name = '/'.join((self.distinfo_name, 'RECORD.p7s'))
        # RECORD and its signatures have no expected hash of their own.
        zipfile.set_expected_hash(record_name, None)
        zipfile.set_expected_hash(sig_name, None)
        zipfile.set_expected_hash(smime_sig_name, None)
        record = zipfile.read(record_name)

        record_digest = urlsafe_b64encode(hashlib.sha256(record).digest())
        try:
            sig = from_json(native(zipfile.read(sig_name)))
        except KeyError:  # no signature
            pass
        if sig:
            headers, payload = signatures.verify(sig)
            if payload['hash'] != "sha256=" + native(record_digest):
                msg = "RECORD.sig claimed RECORD hash {0} != computed hash {1}."
                raise BadWheelFile(msg.format(payload['hash'],
                                              native(record_digest)))

        reader = csv.reader((native(r) for r in record.splitlines()))

        for row in reader:
            filename = row[0]
            hash_ = row[1]  # renamed from 'hash' to stop shadowing the builtin
            if not hash_:
                if filename not in (record_name, sig_name):
                    sys.stderr.write("%s has no hash!\n" % filename)
                continue
            algo, data = hash_.split('=', 1)
            assert algo == "sha256", "Unsupported hash algorithm"
            zipfile.set_expected_hash(filename, urlsafe_b64decode(binary(data)))


class VerifyingZipFile(zipfile.ZipFile):
    """ZipFile that can assert that each of its extracted contents matches
    an expected sha256 hash. Note that each file must be completly read in
    order for its hash to be checked."""

    def __init__(self, file, mode="r",
                 compression=zipfile.ZIP_STORED,
                 allowZip64=False):
        zipfile.ZipFile.__init__(self, file, mode, compression, allowZip64)

        # strict: when True, open() refuses members with no registered hash.
        self.strict = False
        self._expected_hashes = {}
        self._hash_algorithm = hashlib.sha256

    def set_expected_hash(self, name, hash):
        """
        :param name: name of zip entry
        :param hash: bytes of hash (or None for "don't care")
        """
        self._expected_hashes[name] = hash

    def open(self, name_or_info, mode="r", pwd=None):
        """Return file-like object for 'name'."""
        # A non-monkey-patched version would contain most of zipfile.py
        ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
        if isinstance(name_or_info, zipfile.ZipInfo):
            name = name_or_info.filename
        else:
            name = name_or_info
        if (name in self._expected_hashes
                and self._expected_hashes[name] is not None):
            expected_hash = self._expected_hashes[name]
            try:
                _update_crc_orig = ef._update_crc
            except AttributeError:
                warnings.warn('Need ZipExtFile._update_crc to implement '
                              'file hash verification (in Python >= 2.7)')
                return ef
            running_hash = self._hash_algorithm()
            # Piggy-back on the ZipExtFile CRC callback so the hash is
            # updated (and finally checked) as the member is read.
            if hasattr(ef, '_eof'):  # py33
                def _update_crc(data):
                    _update_crc_orig(data)
                    running_hash.update(data)
                    if ef._eof and running_hash.digest() != expected_hash:
                        raise BadWheelFile("Bad hash for file %r" % ef.name)
            else:
                def _update_crc(data, eof=None):
                    _update_crc_orig(data, eof=eof)
                    running_hash.update(data)
                    if eof and running_hash.digest() != expected_hash:
                        raise BadWheelFile("Bad hash for file %r" % ef.name)
            ef._update_crc = _update_crc
        elif self.strict and name not in self._expected_hashes:
            raise BadWheelFile("No expected hash for file %r" % ef.name)
        return ef

    def pop(self):
        """Truncate the last file off this zipfile.
        Assumes infolist() is in the same order as the files (true for
        ordinary zip files created by Python)"""
        if not self.fp:
            raise RuntimeError(
                "Attempt to pop from ZIP archive that was already closed")
        last = self.infolist().pop()
        del self.NameToInfo[last.filename]
        self.fp.seek(last.header_offset, os.SEEK_SET)
        self.fp.truncate()
        self._didModify = True