"""https://docs.python.org/3/library/zipapp.html"""
from __future__ import annotations
import argparse
import io
import json
import os
import shutil
import subprocess
import sys
import zipapp
import zipfile
from collections import defaultdict, deque
from email import message_from_string
from pathlib import Path, PurePosixPath
from shlex import quote
from stat import S_IWUSR
from tempfile import TemporaryDirectory
from packaging.markers import Marker
from packaging.requirements import Requirement
HERE = Path(__file__).parent.absolute()
VERSIONS = [f"3.{i}" for i in range(10, 6, -1)]
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dest", default="virtualenv.pyz")
args = parser.parse_args()
with TemporaryDirectory() as folder:
packages = get_wheels_for_support_versions(Path(folder))
create_zipapp(os.path.abspath(args.dest), packages)
def create_zipapp(dest, packages):
bio = io.BytesIO()
base = PurePosixPath("__virtualenv__")
modules = defaultdict(lambda: defaultdict(dict))
dist = defaultdict(lambda: defaultdict(dict))
with zipfile.ZipFile(bio, "w") as zip_app:
write_packages_to_zipapp(base, dist, modules, packages, zip_app)
modules_json = json.dumps(modules, indent=2)
zip_app.writestr("modules.json", modules_json)
distributions_json = json.dumps(dist, indent=2)
zip_app.writestr("distributions.json", distributions_json)
zip_app.writestr("__main__.py", (HERE / "__main__zipapp.py").read_bytes())
bio.seek(0)
zipapp.create_archive(bio, dest)
print(f"zipapp created at {dest}")
def write_packages_to_zipapp(base, dist, modules, packages, zip_app):
has = set()
for name, p_w_v in packages.items():
for platform, w_v in p_w_v.items():
for wheel_data in w_v.values():
wheel = wheel_data.wheel
with zipfile.ZipFile(str(wheel)) as wheel_zip:
for filename in wheel_zip.namelist():
if name in ("virtualenv",):
dest = PurePosixPath(filename)
else:
dest = base / wheel.stem / filename
if dest.suffix in (".so", ".pyi"):
continue
if dest.suffix == ".py":
key = filename[:-3].replace("/", ".").replace("__init__", "").rstrip(".")
for version in wheel_data.versions:
modules[version][platform][key] = str(dest)
if dest.parent.suffix == ".dist-info":
for version in wheel_data.versions:
dist[version][platform][dest.parent.stem.split("-")[0]] = str(dest.parent)
dest_str = str(dest)
if dest_str in has:
continue
has.add(dest_str)
if "/tests/" in dest_str or "/docs/" in dest_str:
continue
print(dest_str)
content = wheel_zip.read(filename)
zip_app.writestr(dest_str, content)
del content
class WheelDownloader:
def __init__(self, into):
if into.exists():
shutil.rmtree(into)
into.mkdir(parents=True)
self.into = into
self.collected = defaultdict(lambda: defaultdict(dict))
self.pip_cmd = [str(Path(sys.executable).parent / "pip")]
self._cmd = self.pip_cmd + ["download", "-q", "--no-deps", "--dest", str(self.into)]
def run(self, target, versions):
whl = self.build_sdist(target)
todo = deque((version, None, whl) for version in versions)
wheel_store = {}
while todo:
version, platform, dep = todo.popleft()
dep_str = dep.name.split("-")[0] if isinstance(dep, Path) else dep.name
if dep_str in self.collected[version] and platform in self.collected[version][dep_str]:
continue
whl = self._get_wheel(dep, platform[2:] if platform and platform.startswith("==") else None, version)
if whl is None:
if dep_str not in wheel_store:
raise RuntimeError(f"failed to get {dep_str}, have {wheel_store}")
whl = wheel_store[dep_str]
else:
wheel_store[dep_str] = whl
self.collected[version][dep_str][platform] = whl
todo.extend(self.get_dependencies(whl, version))
def _get_wheel(self, dep, platform, version):
if isinstance(dep, Requirement):
before = set(self.into.iterdir())
if self._download(platform, False, "--python-version", version, "--only-binary", ":all:", str(dep)):
self._download(platform, True, "--python-version", version, str(dep))
after = set(self.into.iterdir())
new_files = after - before
# print(dep, new_files)
assert len(new_files) <= 1
if not len(new_files):
return None
new_file = next(iter(new_files))
if new_file.suffix == ".whl":
return new_file
dep = new_file
new_file = self.build_sdist(dep)
assert new_file.suffix == ".whl"
return new_file
def _download(self, platform, stop_print_on_fail, *args):
exe_cmd = self._cmd + list(args)
if platform is not None:
exe_cmd.extend(["--platform", platform])
return run_suppress_output(exe_cmd, stop_print_on_fail=stop_print_on_fail)
@staticmethod
def get_dependencies(whl, version):
with zipfile.ZipFile(str(whl), "r") as zip_file:
name = "/".join([f"{'-'.join(whl.name.split('-')[0:2])}.dist-info", "METADATA"])
with zip_file.open(name) as file_handler:
metadata = message_from_string(file_handler.read().decode("utf-8"))
deps = metadata.get_all("Requires-Dist")
if deps is None:
return
for dep in deps:
req = Requirement(dep)
markers = getattr(req.marker, "_markers", ()) or ()
if any(m for m in markers if isinstance(m, tuple) and len(m) == 3 and m[0].value == "extra"):
continue
py_versions = WheelDownloader._marker_at(markers, "python_version")
if py_versions:
marker = Marker('python_version < "1"')
marker._markers = [
markers[ver] for ver in sorted(i for i in set(py_versions) | {i - 1 for i in py_versions} if i >= 0)
]
matches_python = marker.evaluate({"python_version": version})
if not matches_python:
continue
deleted = 0
for ver in py_versions:
deleted += WheelDownloader._del_marker_at(markers, ver - deleted)
platforms = []
platform_positions = WheelDownloader._marker_at(markers, "sys_platform")
deleted = 0
for pos in platform_positions: # can only be ore meaningfully
platform = f"{markers[pos][1].value}{markers[pos][2].value}"
deleted += WheelDownloader._del_marker_at(markers, pos - deleted)
platforms.append(platform)
if not platforms:
platforms.append(None)
for platform in platforms:
yield version, platform, req
@staticmethod
def _marker_at(markers, key):
positions = []
for i, m in enumerate(markers):
if isinstance(m, tuple) and len(m) == 3 and m[0].value == key:
positions.append(i)
return positions
@staticmethod
def _del_marker_at(markers, at):
del markers[at]
deleted = 1
op = max(at - 1, 0)
if markers and isinstance(markers[op], str):
del markers[op]
deleted += 1
return deleted
def build_sdist(self, target):
if target.is_dir():
# pip 20.1 no longer guarantees this to be parallel safe, need to copy/lock
with TemporaryDirectory() as temp_folder:
folder = Path(temp_folder) / target.name
shutil.copytree(
str(target),
str(folder),
ignore=shutil.ignore_patterns(".tox", ".tox4", "venv", "__pycache__", "*.pyz"),
)
try:
return self._build_sdist(self.into, folder)
finally:
# permission error on Windows <3.7 https://bugs.python.org/issue26660
def onerror(func, path, exc_info): # noqa: U100
os.chmod(path, S_IWUSR)
func(path)
shutil.rmtree(str(folder), onerror=onerror)
else:
return self._build_sdist(target.parent / target.stem, target)
def _build_sdist(self, folder, target):
if not folder.exists() or not list(folder.iterdir()):
cmd = self.pip_cmd + ["wheel", "-w", str(folder), "--no-deps", str(target), "-q"]
run_suppress_output(cmd, stop_print_on_fail=True)
return list(folder.iterdir())[0]
def run_suppress_output(cmd, stop_print_on_fail=False):
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
encoding="utf-8",
)
out, err = process.communicate()
if stop_print_on_fail and process.returncode != 0:
print(f"exit with {process.returncode} of {' '.join(quote(i) for i in cmd)}", file=sys.stdout)
if out:
print(out, file=sys.stdout)
if err:
print(err, file=sys.stderr)
raise SystemExit(process.returncode)
return process.returncode
def get_wheels_for_support_versions(folder):
downloader = WheelDownloader(folder / "wheel-store")
downloader.run(HERE.parent, VERSIONS)
packages = defaultdict(lambda: defaultdict(lambda: defaultdict(WheelForVersion)))
for version, collected in downloader.collected.items():
for pkg, platform_to_wheel in collected.items():
name = Requirement(pkg).name
for platform, wheel in platform_to_wheel.items():
platform = platform or "==any"
wheel_versions = packages[name][platform][wheel.name]
wheel_versions.versions.append(version)
wheel_versions.wheel = wheel
for name, p_w_v in packages.items():
for platform, w_v in p_w_v.items():
print(f"{name} - {platform}")
for wheel, wheel_versions in w_v.items():
print(f"{' '.join(wheel_versions.versions)} of {wheel} (use {wheel_versions.wheel})")
return packages
class WheelForVersion:
def __init__(self, wheel=None, versions=None):
self.wheel = wheel
self.versions = versions if versions else []
def __repr__(self):
return f"{self.__class__.__name__}({self.wheel!r}, {self.versions!r})"
if __name__ == "__main__":
main()