diff options
| author | Bernát Gábor <bgabor8@bloomberg.net> | 2020-01-21 12:20:43 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-01-21 12:20:43 +0000 |
| commit | 886ba275110f0a1f4777445dab4757b90f7ac6da (patch) | |
| tree | d2d65198ee8cda7cd42b6033e84a8879adf00277 /src/virtualenv/discovery | |
| parent | 8d6af57d76edcf425beab6d53d4c14f1e49f7ca5 (diff) | |
| download | virtualenv-886ba275110f0a1f4777445dab4757b90f7ac6da.tar.gz | |
separate describe/create - check upfront if can create (#1502)
* start
Signed-off-by: Bernat Gabor <bgabor8@bloomberg.net>
* test for Windows
Signed-off-by: Bernat Gabor <bgabor8@bloomberg.net>
* test for Windows
Signed-off-by: Bernat Gabor <bgabor8@bloomberg.net>
* fix Windows symlink cache
Diffstat (limited to 'src/virtualenv/discovery')
| -rw-r--r-- | src/virtualenv/discovery/__init__.py | 1 | ||||
| -rw-r--r-- | src/virtualenv/discovery/builtin.py | 144 | ||||
| -rw-r--r-- | src/virtualenv/discovery/discover.py | 27 | ||||
| -rw-r--r-- | src/virtualenv/discovery/py_info.py | 387 | ||||
| -rw-r--r-- | src/virtualenv/discovery/py_spec.py | 123 | ||||
| -rw-r--r-- | src/virtualenv/discovery/windows/__init__.py | 21 | ||||
| -rw-r--r-- | src/virtualenv/discovery/windows/pep514.py | 162 |
7 files changed, 865 insertions, 0 deletions
diff --git a/src/virtualenv/discovery/__init__.py b/src/virtualenv/discovery/__init__.py new file mode 100644 index 0000000..01e6d4f --- /dev/null +++ b/src/virtualenv/discovery/__init__.py @@ -0,0 +1 @@ +from __future__ import absolute_import, unicode_literals diff --git a/src/virtualenv/discovery/builtin.py b/src/virtualenv/discovery/builtin.py new file mode 100644 index 0000000..8b705f0 --- /dev/null +++ b/src/virtualenv/discovery/builtin.py @@ -0,0 +1,144 @@ +from __future__ import absolute_import, unicode_literals + +import logging +import os +import sys + +import six + +from virtualenv.info import IS_WIN + +from .discover import Discover +from .py_info import CURRENT, PythonInfo +from .py_spec import PythonSpec + + +class Builtin(Discover): + def __init__(self, options): + super(Builtin, self).__init__() + self.python_spec = options.python + + @classmethod + def add_parser_arguments(cls, parser): + parser.add_argument( + "-p", + "--python", + dest="python", + metavar="py", + help="target interpreter for which to create a virtual (either absolute path or identifier string)", + default=sys.executable, + ) + + def run(self): + return get_interpreter(self.python_spec) + + def __repr__(self): + return six.ensure_str(self.__unicode__()) + + def __unicode__(self): + return "{} discover of python_spec={!r}".format(self.__class__.__name__, self.python_spec) + + +def get_interpreter(key): + spec = PythonSpec.from_string_spec(key) + logging.info("find interpreter for spec %r", spec) + proposed_paths = set() + for interpreter, impl_must_match in propose_interpreters(spec): + if interpreter.executable not in proposed_paths: + logging.info("proposed %s", interpreter) + if interpreter.satisfies(spec, impl_must_match): + logging.info("accepted target interpreter %s", interpreter) + return interpreter + proposed_paths.add(interpreter.executable) + + +def propose_interpreters(spec): + # 1. we always try with the lowest hanging fruit first, the current interpreter + yield CURRENT, True + + # 2. if it's an absolute path and exists, use that + if spec.is_abs and os.path.exists(spec.path): + yield PythonInfo.from_exe(spec.path), True + + # 3. otherwise fallback to platform default logic + if IS_WIN: + from .windows import propose_interpreters + + for interpreter in propose_interpreters(spec): + yield interpreter, True + + paths = get_paths() + # find on path, the path order matters (as the candidates are less easy to control by end user) + tested_exes = set() + for pos, path in enumerate(paths): + path = six.ensure_text(path) + logging.debug(LazyPathDump(pos, path)) + for candidate, match in possible_specs(spec): + found = check_path(candidate, path) + if found is not None: + exe = os.path.abspath(found) + if exe not in tested_exes: + tested_exes.add(exe) + interpreter = PathPythonInfo.from_exe(exe, raise_on_error=False) + if interpreter is not None: + yield interpreter, match + + +def get_paths(): + path = os.environ.get(str("PATH"), None) + if path is None: + try: + path = os.confstr("CS_PATH") + except (AttributeError, ValueError): + path = os.defpath + if not path: + paths = [] + else: + paths = [p for p in path.split(os.pathsep) if os.path.exists(p)] + return paths + + +class LazyPathDump(object): + def __init__(self, pos, path): + self.pos = pos + self.path = path + + def __repr__(self): + return six.ensure_str(self.__unicode__()) + + def __unicode__(self): + content = "discover from PATH[{}]:{} with =>".format(self.pos, self.path) + for file_name in os.listdir(self.path): + try: + file_path = os.path.join(self.path, file_name) + if os.path.isdir(file_path) or not os.access(file_path, os.X_OK): + continue + except OSError: + pass + content += " " + content += file_name + return content + + +def check_path(candidate, path): + _, ext = os.path.splitext(candidate) + if sys.platform == "win32" and ext != ".exe": + candidate = candidate + ".exe" + if os.path.isfile(candidate): + return candidate + candidate = os.path.join(path, candidate) + if os.path.isfile(candidate): + return candidate + return None + + +def possible_specs(spec): + # 4. then maybe it's something exact on PATH - if it was direct lookup implementation no longer counts + yield spec.str_spec, False + # 5. or from the spec we can deduce a name on path that matches + for exe, match in spec.generate_names(): + yield exe, match + + +class PathPythonInfo(PythonInfo): + """""" diff --git a/src/virtualenv/discovery/discover.py b/src/virtualenv/discovery/discover.py new file mode 100644 index 0000000..13e258f --- /dev/null +++ b/src/virtualenv/discovery/discover.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import, unicode_literals + +from abc import ABCMeta, abstractmethod + +import six + + +@six.add_metaclass(ABCMeta) +class Discover(object): + def __init__(self): + self._has_run = False + self._interpreter = None + + @classmethod + def add_parser_arguments(cls, parser): + raise NotImplementedError + + @abstractmethod + def run(self): + raise NotImplementedError + + @property + def interpreter(self): + if self._has_run is False: + self._interpreter = self.run() + self._has_run = True + return self._interpreter diff --git a/src/virtualenv/discovery/py_info.py b/src/virtualenv/discovery/py_info.py new file mode 100644 index 0000000..57da58d --- /dev/null +++ b/src/virtualenv/discovery/py_info.py @@ -0,0 +1,387 @@ +""" +The PythonInfo contains information about a concrete instance of a Python interpreter + +Note: this file is also used to query target interpreters, so can only use standard library methods +""" +from __future__ import absolute_import, print_function + +import json +import logging +import os +import pipes +import platform +import re +import sys +import sysconfig +from collections import OrderedDict, namedtuple +from distutils.command.install import SCHEME_KEYS +from distutils.dist import Distribution + +VersionInfo = namedtuple("VersionInfo", ["major", "minor", "micro", "releaselevel", "serial"]) + + +def _get_path_extensions(): + return list(OrderedDict.fromkeys([""] + os.environ.get("PATHEXT", "").lower().split(os.pathsep))) + + +EXTENSIONS = _get_path_extensions() +_CONF_VAR_RE = re.compile(r"\{\w+\}") + + +class PythonInfo(object): + """Contains information for a Python interpreter""" + + def __init__(self): + def u(v): + return v.decode("utf-8") if isinstance(v, bytes) else v + + # qualifies the python + self.platform = u(sys.platform) + self.implementation = u(platform.python_implementation()) + if self.implementation == "PyPy": + self.pypy_version_info = tuple(u(i) for i in sys.pypy_version_info) + + # this is a tuple in earlier, struct later, unify to our own named tuple + self.version_info = VersionInfo(*list(u(i) for i in sys.version_info)) + self.architecture = 64 if sys.maxsize > 2 ** 32 else 32 + + self.executable = u(sys.executable) # executable we were called with + self.original_executable = u(self.executable) + self.base_executable = u(getattr(sys, "_base_executable", None)) # some platforms may set this + + self.version = u(sys.version) + self.os = u(os.name) + + # information about the prefix - determines python home + self.prefix = u(getattr(sys, "prefix", None)) # prefix we think + self.base_prefix = u(getattr(sys, "base_prefix", None)) # venv + self.real_prefix = u(getattr(sys, "real_prefix", None)) # old virtualenv + + # information about the exec prefix - dynamic stdlib modules + self.base_exec_prefix = u(getattr(sys, "base_exec_prefix", None)) + self.exec_prefix = u(getattr(sys, "exec_prefix", None)) + + try: + __import__("venv") + has = True + except ImportError: + has = False + self.has_venv = has + self.path = [u(i) for i in sys.path] + self.file_system_encoding = u(sys.getfilesystemencoding()) + self.stdout_encoding = u(getattr(sys.stdout, "encoding", None)) + + self.sysconfig_paths = {u(i): u(sysconfig.get_path(i, expand=False)) for i in sysconfig.get_path_names()} + config_var_keys = set() + for element in self.sysconfig_paths.values(): + for k in _CONF_VAR_RE.findall(element): + config_var_keys.add(u(k[1:-1])) + + self.sysconfig_vars = {u(i): u(sysconfig.get_config_var(i)) for i in config_var_keys} + if self.implementation == "PyPy" and sys.version_info.major == 2: + self.sysconfig_vars[u"implementation_lower"] = u"python" + + self.distutils_install = {u(k): u(v) for k, v in self._distutils_install().items()} + self.system_stdlib = self.sysconfig_path( + "stdlib", + {k: (self.system_prefix if v.startswith(self.prefix) else v) for k, v in self.sysconfig_vars.items()}, + ) + self._creators = None + + @staticmethod + def _distutils_install(): + # follow https://github.com/pypa/pip/blob/master/src/pip/_internal/locations.py#L95 + d = Distribution({"script_args": "--no-user-cfg"}) + d.parse_config_files() + i = d.get_command_obj("install", create=True) + i.prefix = "a" + i.finalize_options() + result = {key: (getattr(i, "install_{}".format(key))[1:]).lstrip(os.sep) for key in SCHEME_KEYS} + return result + + @property + def version_str(self): + return ".".join(str(i) for i in self.version_info[0:3]) + + @property + def version_release_str(self): + return ".".join(str(i) for i in self.version_info[0:2]) + + @property + def python_name(self): + version_info = self.version_info + return "python{}.{}".format(version_info.major, version_info.minor) + + @property + def is_old_virtualenv(self): + return self.real_prefix is not None + + @property + def is_venv(self): + return self.base_prefix is not None and self.version_info.major == 3 + + def __unicode__(self): + content = repr(self) + if sys.version_info == 2: + content = content.decode("utf-8") + return content + + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.__dict__) + + def __str__(self): + content = "{}({})".format( + self.__class__.__name__, + ", ".join( + "{}={}".format(k, v) + for k, v in ( + ( + "spec", + "{}{}-{}".format( + self.implementation, ".".join(str(i) for i in self.version_info), self.architecture + ), + ), + ("exe", self.executable), + ("original" if self.original_executable != self.executable else None, self.original_executable), + ( + "base" + if self.base_executable is not None and self.base_executable != self.executable + else None, + self.base_executable, + ), + ("platform", self.platform), + ("version", repr(self.version)), + ("encoding_fs_io", "{}-{}".format(self.file_system_encoding, self.stdout_encoding)), + ) + if k is not None + ), + ) + return content + + def to_json(self): + # don't save calculated paths, as these are non primitive types + data = {var: (getattr(self, var) if var not in ("_creators",) else None) for var in vars(self)} + # noinspection PyProtectedMember + data["version_info"] = data["version_info"]._asdict() # namedtuple to dictionary + return json.dumps(data, indent=2) + + @classmethod + def from_json(cls, payload): + data = json.loads(payload) + data["version_info"] = VersionInfo(**data["version_info"]) # restore this to a named tuple structure + result = cls() + result.__dict__ = {k: v for k, v in data.items()} + return result + + @property + def system_prefix(self): + return self.real_prefix or self.base_prefix or self.prefix + + @property + def system_exec_prefix(self): + return self.real_prefix or self.base_exec_prefix or self.exec_prefix + + @property + def system_executable(self): + env_prefix = self.real_prefix or self.base_prefix + if env_prefix: # if this is a virtual environment + if self.real_prefix is None and self.base_executable is not None: # use the saved host if present + return self.base_executable + # otherwise fallback to discovery mechanism + return self.find_exe_based_of(inside_folder=env_prefix) + else: + # need original executable here, as if we need to copy we want to copy the interpreter itself, not the + # setup script things may be wrapped up in + return self.original_executable + + def find_exe_based_of(self, inside_folder): + # we don't know explicitly here, do some guess work - our executable name should tell + possible_names = self._find_possible_exe_names() + possible_folders = self._find_possible_folders(inside_folder) + for folder in possible_folders: + for name in possible_names: + candidate = os.path.join(folder, name) + if os.path.exists(candidate): + info = PythonInfo.from_exe(candidate) + keys = {"implementation", "architecture", "version_info"} + if all(getattr(info, k) == getattr(self, k) for k in keys): + return candidate + what = "|".join(possible_names) # pragma: no cover + raise RuntimeError( + "failed to detect {} in {}".format(what, os.pathsep.join(possible_folders)) + ) # pragma: no cover + + def _find_possible_folders(self, inside_folder): + candidate_folder = OrderedDict() + executables = OrderedDict() + executables[self.executable] = None + executables[self.original_executable] = None + for exe in executables.keys(): + base = os.path.dirname(exe) + # following path pattern of the current + if base.startswith(self.prefix): + relative = base[len(self.prefix) :] + candidate_folder["{}{}".format(inside_folder, relative)] = None + + # or at root level + candidate_folder[inside_folder] = None + + return list(candidate_folder.keys()) + + def _find_possible_exe_names(self): + name_candidate = OrderedDict() + for name in self._possible_base(): + for at in (3, 2, 1, 0): + version = ".".join(str(i) for i in self.version_info[:at]) + for arch in ["-{}".format(self.architecture), ""]: + for ext in EXTENSIONS: + candidate = "{}{}{}{}".format(name, version, arch, ext) + name_candidate[candidate] = None + return list(name_candidate.keys()) + + def _possible_base(self): + possible_base = OrderedDict() + possible_base[os.path.splitext(os.path.basename(self.executable))[0]] = None + possible_base[self.implementation] = None + # python is always the final option as in practice is used by multiple implementation as exe name + if "python" in possible_base: + del possible_base["python"] + possible_base["python"] = None + for base in possible_base: + lower = base.lower() + yield lower + from virtualenv.info import fs_is_case_sensitive + + if fs_is_case_sensitive(): + if base != lower: + yield base + upper = base.upper() + if upper != base: + yield upper + + _cache_from_exe = {} + + @classmethod + def clear_cache(cls): + cls._cache_from_exe.clear() + + @classmethod + def from_exe(cls, exe, raise_on_error=True): + # this method is not used by itself, so here and called functions can import stuff locally + from virtualenv.util.path import Path + + path = Path(exe).resolve() + if path in cls._cache_from_exe: + result, failure = cls._cache_from_exe[path] + else: + failure, result = cls._load_for_exe(exe) + cls._cache_from_exe[path] = result, failure + if failure is not None: + if raise_on_error: + raise failure + else: + logging.warning("%s", str(failure)) + return result + + @classmethod + def _load_for_exe(cls, exe): + from virtualenv.util.subprocess import subprocess, Popen + + cmd = cls._get_exe_cmd(exe) + # noinspection DuplicatedCode + # this is duplicated here because this file is executed on its own, so cannot be refactored otherwise + logging.debug(u"get interpreter info via cmd: %s", Cmd(cmd)) + try: + process = Popen( + cmd, universal_newlines=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE + ) + out, err = process.communicate() + code = process.returncode + except OSError as os_error: + out, err, code = "", os_error.strerror, os_error.errno + result, failure = None, None + if code == 0: + result = cls.from_json(out) + result.executable = exe # keep original executable as this may contain initialization code + else: + msg = "failed to query {} with code {}{}{}".format( + exe, code, " out: {!r}".format(out) if out else "", " err: {!r}".format(err) if err else "" + ) + failure = RuntimeError(msg) + return failure, result + + @classmethod + def _get_exe_cmd(cls, exe): + cmd = [exe, "-s"] + from virtualenv.info import IS_ZIPAPP + + self_path = os.path.abspath(__file__) + if IS_ZIPAPP: + from virtualenv.util.zipapp import extract_to_app_data + from virtualenv.util.path import Path + + path = str(extract_to_app_data(Path(self_path))) + else: + path = "{}.py".format(os.path.splitext(self_path)[0]) + cmd.append(path) + return cmd + + def satisfies(self, spec, impl_must_match): + """check if a given specification can be satisfied by the this python interpreter instance""" + if self.executable == spec.path: # if the path is a our own executable path we're done + return True + + if spec.path is not None: # if path set, and is not our original executable name, this does not match + root, _ = os.path.splitext(os.path.basename(self.original_executable)) + if root != spec.path: + return False + + if impl_must_match: + if spec.implementation is not None and spec.implementation != self.implementation: + return False + + if spec.architecture is not None and spec.architecture != self.architecture: + return False + + for our, req in zip(self.version_info[0:3], (spec.major, spec.minor, spec.patch)): + if req is not None and our is not None and our != req: + return False + return True + + def sysconfig_path(self, key, config_var=None, sep=os.sep): + pattern = self.sysconfig_paths[key] + if config_var is None: + config_var = self.sysconfig_vars + else: + base = {k: v for k, v in self.sysconfig_vars.items()} + base.update(config_var) + config_var = base + return pattern.format(**config_var).replace(u"/", sep) + + def creators(self, refresh=False): + if self._creators is None or refresh is True: + from virtualenv.run.plugin.creators import CreatorSelector + + self._creators = CreatorSelector.for_interpreter(self) + return self._creators + + +class Cmd(object): + def __init__(self, cmd, env=None): + self.cmd = cmd + self.env = env + + def __repr__(self): + def e(v): + return v.decode("utf-8") if isinstance(v, bytes) else v + + cmd_repr = e(" ").join(pipes.quote(e(c)) for c in self.cmd) + if self.env is not None: + cmd_repr += e(" env of {!r}").format(self.env) + return cmd_repr + + +CURRENT = PythonInfo() + +if __name__ == "__main__": + print(CURRENT.to_json()) diff --git a/src/virtualenv/discovery/py_spec.py b/src/virtualenv/discovery/py_spec.py new file mode 100644 index 0000000..62d7920 --- /dev/null +++ b/src/virtualenv/discovery/py_spec.py @@ -0,0 +1,123 @@ +"""A Python specification is an abstract requirement definition of a interpreter""" +from __future__ import absolute_import, unicode_literals + +import os +import re +import sys +from collections import OrderedDict + +import six + +from virtualenv.info import fs_is_case_sensitive + +PATTERN = re.compile(r"^(?P<impl>[a-zA-Z]+)?(?P<version>[0-9.]+)?(?:-(?P<arch>32|64))?$") +IS_WIN = sys.platform == "win32" + + +class PythonSpec(object): + """Contains specification about a Python Interpreter""" + + def __init__(self, str_spec, implementation, major, minor, patch, architecture, path): + self.str_spec = str_spec + self.implementation = implementation + self.major = major + self.minor = minor + self.patch = patch + self.architecture = architecture + self.path = path + + @classmethod + def from_string_spec(cls, string_spec): + impl, major, minor, patch, arch, path = None, None, None, None, None, None + if os.path.isabs(string_spec): + path = string_spec + else: + ok = False + match = re.match(PATTERN, string_spec) + if match: + + def _int_or_none(val): + return None if val is None else int(val) + + try: + groups = match.groupdict() + version = groups["version"] + if version is not None: + versions = tuple(int(i) for i in version.split(".") if i) + if len(versions) > 3: + raise ValueError + if len(versions) == 3: + major, minor, patch = versions + elif len(versions) == 2: + major, minor = versions + elif len(versions) == 1: + version_data = versions[0] + major = int(str(version_data)[0]) # first digit major + if version_data > 9: + minor = int(str(version_data)[1:]) + ok = True + except ValueError: + pass + else: + impl = groups["impl"] + if impl == "py" or impl == "python": + impl = "CPython" + arch = _int_or_none(groups["arch"]) + + if not ok: + path = string_spec + + return cls(string_spec, impl, major, minor, patch, arch, path) + + def generate_names(self): + impls = OrderedDict() + if self.implementation: + # first consider implementation as it is + impls[self.implementation] = False + if fs_is_case_sensitive(): + # for case sensitive file systems consider lower and upper case versions too + # trivia: MacBooks and all pre 2018 Windows-es were case insensitive by default + impls[self.implementation.lower()] = False + impls[self.implementation.upper()] = False + impls["python"] = True # finally consider python as alias, implementation must match now + version = self.major, self.minor, self.patch + try: + version = version[: version.index(None)] + except ValueError: + pass + for impl, match in impls.items(): + for at in range(len(version), -1, -1): + cur_ver = version[0:at] + spec = "{}{}".format(impl, ".".join(str(i) for i in cur_ver)) + yield spec, match + + @property + def is_abs(self): + return self.path is not None and os.path.isabs(self.path) + + def satisfies(self, spec): + """called when there's a candidate metadata spec to see if compatible - e.g. PEP-514 on Windows""" + if spec.is_abs and self.is_abs and self.path != spec.path: + return False + if spec.implementation is not None and spec.implementation != self.implementation: + return False + if spec.architecture is not None and spec.architecture != self.architecture: + return False + + for our, req in zip((self.major, self.minor, self.patch), (spec.major, spec.minor, spec.patch)): + if req is not None and our is not None and our != req: + return False + return True + + def __unicode__(self): + return "{}({})".format( + type(self).__name__, + ", ".join( + "{}={}".format(k, getattr(self, k)) + for k in ("str_spec", "implementation", "major", "minor", "patch", "architecture", "path") + if getattr(self, k) is not None + ), + ) + + def __repr__(self): + return six.ensure_str(self.__unicode__()) diff --git a/src/virtualenv/discovery/windows/__init__.py b/src/virtualenv/discovery/windows/__init__.py new file mode 100644 index 0000000..f321d7f --- /dev/null +++ b/src/virtualenv/discovery/windows/__init__.py @@ -0,0 +1,21 @@ +from __future__ import absolute_import, unicode_literals + +from ..py_info import PythonInfo +from ..py_spec import PythonSpec +from .pep514 import discover_pythons + + +class Pep514PythonInfo(PythonInfo): + """""" + + +def propose_interpreters(spec): + # see if PEP-514 entries are good + for name, major, minor, arch, exe, _ in discover_pythons(): + # pre-filter + registry_spec = PythonSpec(None, name, major, minor, None, arch, exe) + if registry_spec.satisfies(spec): + interpreter = Pep514PythonInfo.from_exe(exe, raise_on_error=False) + if interpreter is not None: + if interpreter.satisfies(spec, impl_must_match=True): + yield interpreter diff --git a/src/virtualenv/discovery/windows/pep514.py b/src/virtualenv/discovery/windows/pep514.py new file mode 100644 index 0000000..8bdd30c --- /dev/null +++ b/src/virtualenv/discovery/windows/pep514.py @@ -0,0 +1,162 @@ +"""Implement https://www.python.org/dev/peps/pep-0514/ to discover interpreters - Windows only""" +from __future__ import absolute_import, print_function, unicode_literals + +import os +import re +from logging import basicConfig, getLogger + +import six + +if six.PY3: + import winreg +else: + # noinspection PyUnresolvedReferences + import _winreg as winreg + +LOGGER = getLogger(__name__) + + +def enum_keys(key): + at = 0 + while True: + try: + yield winreg.EnumKey(key, at) + except OSError: + break + at += 1 + + +def get_value(key, value_name): + try: + return winreg.QueryValueEx(key, value_name)[0] + except OSError: + return None + + +def discover_pythons(): + for hive, hive_name, key, flags, default_arch in [ + (winreg.HKEY_CURRENT_USER, "HKEY_CURRENT_USER", r"Software\Python", 0, 64), + (winreg.HKEY_LOCAL_MACHINE, "HKEY_LOCAL_MACHINE", r"Software\Python", winreg.KEY_WOW64_64KEY, 64), + (winreg.HKEY_LOCAL_MACHINE, "HKEY_LOCAL_MACHINE", r"Software\Python", winreg.KEY_WOW64_32KEY, 32), + ]: + for spec in process_set(hive, hive_name, key, flags, default_arch): + yield spec + + +def process_set(hive, hive_name, key, flags, default_arch): + try: + with winreg.OpenKeyEx(hive, key, 0, winreg.KEY_READ | flags) as root_key: + for company in enum_keys(root_key): + if company == "PyLauncher": # reserved + continue + for spec in process_company(hive_name, company, root_key, default_arch): + yield spec + except OSError: + pass + + +def process_company(hive_name, company, root_key, default_arch): + with winreg.OpenKeyEx(root_key, company) as company_key: + for tag in enum_keys(company_key): + spec = process_tag(hive_name, company, company_key, tag, default_arch) + if spec is not None: + yield spec + + +def process_tag(hive_name, company, company_key, tag, default_arch): + with winreg.OpenKeyEx(company_key, tag) as tag_key: + version = load_version_data(hive_name, company, tag, tag_key) + if version is not None: # if failed to get version bail + major, minor, _ = version + arch = load_arch_data(hive_name, company, tag, tag_key, default_arch) + if arch is not None: + exe_data = load_exe(hive_name, company, company_key, tag) + if exe_data is not None: + exe, args = exe_data + name = str("python") if company == "PythonCore" else company + return name, major, minor, arch, exe, args + + +def load_exe(hive_name, company, company_key, tag): + key_path = "{}/{}/{}".format(hive_name, company, tag) + try: + with winreg.OpenKeyEx(company_key, r"{}\InstallPath".format(tag)) as ip_key: + with ip_key: + exe = get_value(ip_key, "ExecutablePath") + if exe is None: + ip = get_value(ip_key, None) + if ip is None: + msg(key_path, "no ExecutablePath or default for it") + + else: + exe = os.path.join(ip, str("python.exe")) + if exe is not None and os.path.exists(exe): + args = get_value(ip_key, "ExecutableArguments") + return exe, args + else: + msg(key_path, "exe does not exists {}".format(key_path, exe)) + except OSError: + msg("{}/{}".format(key_path, "InstallPath"), "missing") + return None + + +def load_arch_data(hive_name, company, tag, tag_key, default_arch): + arch_str = get_value(tag_key, "SysArchitecture") + if arch_str is not None: + key_path = "{}/{}/{}/SysArchitecture".format(hive_name, company, tag) + try: + return parse_arch(arch_str) + except ValueError as sys_arch: + msg(key_path, sys_arch) + return default_arch + + +def parse_arch(arch_str): + if isinstance(arch_str, six.string_types): + match = re.match(r"^(\d+)bit$", arch_str) + if match: + return int(next(iter(match.groups()))) + error = "invalid format {}".format(arch_str) + else: + error = "arch is not string: {}".format(repr(arch_str)) + raise ValueError(error) + + +def load_version_data(hive_name, company, tag, tag_key): + for candidate, key_path in [ + (get_value(tag_key, "SysVersion"), "{}/{}/{}/SysVersion".format(hive_name, company, tag)), + (tag, "{}/{}/{}".format(hive_name, company, tag)), + ]: + if candidate is not None: + try: + return parse_version(candidate) + except ValueError as sys_version: + msg(key_path, sys_version) + return None + + +def parse_version(version_str): + if isinstance(version_str, six.string_types): + match = re.match(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?$", version_str) + if match: + return tuple(int(i) if i is not None else None for i in match.groups()) + error = "invalid format {}".format(version_str) + else: + error = "version is not string: {}".format(repr(version_str)) + raise ValueError(error) + + +def msg(path, what): + LOGGER.warning("PEP-514 violation in Windows Registry at {} error: {}".format(path, what)) + + +def _run(): + basicConfig() + interpreters = [] + for spec in discover_pythons(): + interpreters.append(repr(spec)) + print("\n".join(sorted(interpreters))) + + +if __name__ == "__main__": + _run() |
