summaryrefslogtreecommitdiff
path: root/src/tox/session/env_select.py
blob: 5bb283b3c1e27aeecef39e21a8414f5c34ac066e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
from __future__ import annotations

import logging
import re
from argparse import ArgumentParser
from collections import Counter
from dataclasses import dataclass
from itertools import chain
from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, List, cast

from tox.config.loader.str_convert import StrConvert
from tox.tox_env.api import ToxEnvCreateArgs
from tox.tox_env.register import REGISTER
from tox.tox_env.runner import RunToxEnv

from ..config.types import EnvList
from ..report import HandledError
from ..tox_env.errors import Skip
from ..tox_env.package import PackageToxEnv

if TYPE_CHECKING:
    from tox.session.state import State


LOGGER = logging.getLogger(__name__)


class CliEnv:
    """CLI tox env selection"""

    def __init__(self, value: None | list[str] | str = None):
        if isinstance(value, str):
            value = StrConvert().to(value, of_type=List[str], factory=None)
        self._names: list[str] | None = value

    def __iter__(self) -> Iterator[str]:
        if not self.is_all and self._names is not None:  # pragma: no branch
            yield from self._names

    def __bool__(self) -> bool:
        return bool(self._names)

    def __str__(self) -> str:
        return "ALL" if self.is_all else ("<env_list>" if self.is_default_list else ",".join(self))

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({'' if self.is_default_list else repr(str(self))})"

    def __eq__(self, other: Any) -> bool:
        return type(self) == type(other) and self._names == other._names

    def __ne__(self, other: Any) -> bool:
        return not (self == other)

    @property
    def is_all(self) -> bool:
        return self._names is not None and "ALL" in self._names

    @property
    def is_default_list(self) -> bool:
        return not (self._names or [])


def register_env_select_flags(
    parser: ArgumentParser,
    default: CliEnv | None,
    multiple: bool = True,
    group_only: bool = False,
) -> ArgumentParser:
    """
    Register environment selection flags.

    :param parser: the parser to register to
    :param default: the default value for env selection
    :param multiple: allow selecting multiple environments
    :param group_only:
    :return:
    """
    if multiple:
        group = parser.add_argument_group("select target environment(s)")
        add_to: ArgumentParser = group.add_mutually_exclusive_group(required=False)  # type: ignore
    else:
        add_to = parser
    if not group_only:
        if multiple:
            help_msg = "enumerate (ALL -> all environments, not set -> use <env_list> from config)"
        else:
            help_msg = "environment to run"
        add_to.add_argument("-e", dest="env", help=help_msg, default=default, type=CliEnv)
    if multiple:
        help_msg = "labels to evaluate"
        add_to.add_argument("-m", dest="labels", metavar="label", help=help_msg, default=[], type=str, nargs="+")
        help_msg = (
            "factors to evaluate (passing multiple factors means 'AND', passing this option multiple times means 'OR')"
        )
        add_to.add_argument(
            "-f",
            dest="factors",
            metavar="factor",
            help=help_msg,
            default=[],
            type=str,
            nargs="+",
            action="append",
        )
    help_msg = "exclude all environments selected that match this regular expression"
    add_to.add_argument("--skip-env", dest="skip_env", metavar="re", help=help_msg, default="", type=str)
    return add_to


@dataclass
class _ToxEnvInfo:
    """tox environment information"""

    env: PackageToxEnv | RunToxEnv  #: the tox environment
    is_active: bool  #: a flag indicating if the environment is marked as active in the current run
    package_skip: tuple[str, Skip] | None = None  #: if set the creation of the packaging environment failed


class EnvSelector:
    def __init__(self, state: State) -> None:
        # needs core to load the default tox environment list
        # to load the package environments of a run environments we need the run environment builder
        # to load labels we need core + the run environment
        self.on_empty_fallback_py = True
        self._warned_about: set[str] = set()  #: shared set of skipped environments that were already warned about
        self._state = state
        self._defined_envs_: None | dict[str, _ToxEnvInfo] = None
        self._pkg_env_counter: Counter[str] = Counter()
        from tox.plugin.manager import MANAGER

        self._manager = MANAGER
        self._log_handler = self._state._options.log_handler
        self._journal = self._state._journal
        self._provision: None | tuple[bool, str] = None

        self._state.conf.core.add_config("labels", Dict[str, EnvList], {}, "core labels")
        tox_env_filter_regex = getattr(state.conf.options, "skip_env", "").strip()
        self._filter_re = re.compile(tox_env_filter_regex) if tox_env_filter_regex else None

    @property
    def _cli_envs(self) -> CliEnv | None:
        return getattr(self._state.conf.options, "env", None)

    def _collect_names(self) -> Iterator[tuple[Iterable[str], bool]]:
        """:return: sources of tox environments defined with name and if is marked as target to run"""
        if self._provision is not None:  # pragma: no branch
            yield (self._provision[1],), False
        env_list, everything_active = self._state.conf.core["env_list"], False
        if self._cli_envs is None or self._cli_envs.is_default_list:
            yield env_list, True
        elif self._cli_envs.is_all:
            everything_active = True
        else:
            yield self._cli_envs, True
        yield self._state.conf, everything_active
        label_envs = dict.fromkeys(chain.from_iterable(self._state.conf.core["labels"].values()))
        if label_envs:
            yield label_envs.keys(), False

    def _env_name_to_active(self) -> dict[str, bool]:
        env_name_to_active_map = {}
        for a_collection, is_active in self._collect_names():
            for name in a_collection:
                if name not in env_name_to_active_map:
                    env_name_to_active_map[name] = is_active
        # for factor/label selection update the active flag
        if not (getattr(self._state.conf.options, "labels", []) or getattr(self._state.conf.options, "factors", [])):
            # if no active environment is defined fallback to py
            if self.on_empty_fallback_py and not any(env_name_to_active_map.values()):
                env_name_to_active_map["py"] = True
        return env_name_to_active_map

    @property
    def _defined_envs(self) -> dict[str, _ToxEnvInfo]:
        # The problem of classifying run/package environments:
        # There can be two type of tox environments: run or package. Given a tox environment name there's no easy way to
        # find out which it is.  Intuitively a run environment is any environment that's not used for packaging by
        # another run environment. To find out what are the packaging environments for a run environment you have to
        # first construct it. This implies a two phase solution: construct all environments and query their packaging
        # environments. The run environments are the ones not marked as of packaging type. This requires being able
        # to change tox environments type, if it was earlier discovered as a run environment and is marked as packaging
        # we need to redefine it, e.g. when it shows up in config as [testenv:.package] and afterwards by a run env is
        # marked as package_env.

        if self._defined_envs_ is None:
            self._defined_envs_ = {}
            failed: dict[str, Exception] = {}
            env_name_to_active = self._env_name_to_active()
            for name, is_active in env_name_to_active.items():
                if name in self._pkg_env_counter:  # already marked as packaging, nothing to do here
                    continue
                with self._log_handler.with_context(name):
                    run_env = self._build_run_env(name)
                    if run_env is None:
                        continue
                    self._defined_envs_[name] = _ToxEnvInfo(run_env, is_active)
                    pkg_name_type = run_env.get_package_env_types()
                if pkg_name_type is not None:
                    # build package env and assign it, then register the run environment which can trigger generation
                    # of additional run environments
                    start_package_env_use_counter = self._pkg_env_counter.copy()
                    try:
                        run_env.package_env = self._build_pkg_env(pkg_name_type, name, env_name_to_active)
                    except Exception as exception:
                        # if it's not a run environment,  wait to see if ends up being a packaging one -> rollback
                        failed[name] = exception
                        for key in self._pkg_env_counter - start_package_env_use_counter:
                            del self._defined_envs_[key]
                            self._state.conf.clear_env(key)
                        self._pkg_env_counter = start_package_env_use_counter
                        del self._defined_envs_[name]
                        self._state.conf.clear_env(name)
                    else:
                        try:
                            for env in run_env.package_envs:
                                # check if any packaging envs are already run and remove them
                                other_env_info = self._defined_envs_.get(env.name)
                                if other_env_info is not None and isinstance(other_env_info.env, RunToxEnv):
                                    del self._defined_envs_[env.name]  # pragma: no cover
                                    for _pkg_env in other_env_info.env.package_envs:  # pragma: no cover
                                        self._pkg_env_counter[_pkg_env.name] -= 1  # pragma: no cover
                        except Exception:
                            assert self._defined_envs_[name].package_skip is not None
            failed_to_create = failed.keys() - self._defined_envs_.keys()
            if failed_to_create:
                raise failed[next(iter(failed_to_create))]
            for name, count in self._pkg_env_counter.items():
                if not count:
                    self._defined_envs_.pop(name)  # pragma: no cover

            # reorder to as defined rather as found
            order = chain(env_name_to_active, (i for i in self._defined_envs_ if i not in env_name_to_active))
            self._defined_envs_ = {name: self._defined_envs_[name] for name in order if name in self._defined_envs_}
            self._finalize_config()
            self._mark_active()
        return self._defined_envs_

    def _finalize_config(self) -> None:
        assert self._defined_envs_ is not None
        for tox_env in self._defined_envs_.values():
            tox_env.env.conf.mark_finalized()
        self._state.conf.core.mark_finalized()

    def _build_run_env(self, name: str) -> RunToxEnv | None:
        if self._provision is not None and self._provision[0] is False and name == self._provision[1]:
            # ignore provision env unless this is a provision run
            return None
        if self._provision is not None and self._provision[0] and name != self._provision[1]:
            # ignore other envs when this is a provision run
            return None
        env_conf = self._state.conf.get_env(name, package=False)
        desc = "the tox execute used to evaluate this environment"
        env_conf.add_config(keys="runner", desc=desc, of_type=str, default=self._state.conf.options.default_runner)
        runner = REGISTER.runner(cast(str, env_conf["runner"]))
        journal = self._journal.get_env_journal(name)
        args = ToxEnvCreateArgs(env_conf, self._state.conf.core, self._state.conf.options, journal, self._log_handler)
        run_env = runner(args)
        self._manager.tox_add_env_config(env_conf, self._state)
        return run_env

    def _build_pkg_env(self, name_type: tuple[str, str], run_env_name: str, active: dict[str, bool]) -> PackageToxEnv:
        name, core_type = name_type
        with self._log_handler.with_context(name):
            if run_env_name == name:
                raise HandledError(f"{run_env_name} cannot self-package")
            missing_active = self._cli_envs is not None and self._cli_envs.is_all
            try:
                package_tox_env = self._get_package_env(core_type, name, active.get(name, missing_active))
                self._pkg_env_counter[name] += 1
                run_env: RunToxEnv = self._defined_envs_[run_env_name].env  # type: ignore
                child_package_envs = package_tox_env.register_run_env(run_env)
                try:
                    name_type = next(child_package_envs)
                    while True:
                        child_pkg_env = self._build_pkg_env(name_type, run_env_name, active)
                        self._pkg_env_counter[name_type[0]] += 1
                        name_type = child_package_envs.send(child_pkg_env)
                except StopIteration:
                    pass
            except Skip as exception:
                assert self._defined_envs_ is not None
                self._defined_envs_[run_env_name].package_skip = (name_type[0], exception)
            return package_tox_env

    def _get_package_env(self, packager: str, name: str, is_active: bool) -> PackageToxEnv:
        assert self._defined_envs_ is not None
        if name in self._defined_envs_:
            env = self._defined_envs_[name].env
            if isinstance(env, PackageToxEnv):
                if env.id() != packager:  # pragma: no branch # same env name is used by different packaging
                    msg = f"{name} is already defined as a {env.id()}, cannot be {packager} too"  # pragma: no cover
                    raise HandledError(msg)  # pragma: no cover
                return env
            else:
                self._state.conf.clear_env(name)
        package_type = REGISTER.package(packager)
        pkg_conf = self._state.conf.get_env(name, package=True)
        journal = self._journal.get_env_journal(name)
        args = ToxEnvCreateArgs(pkg_conf, self._state.conf.core, self._state.conf.options, journal, self._log_handler)
        pkg_env: PackageToxEnv = package_type(args)
        self._defined_envs_[name] = _ToxEnvInfo(pkg_env, is_active)
        self._manager.tox_add_env_config(pkg_conf, self._state)
        return pkg_env

    def _parse_factors(self) -> tuple[set[str], ...]:
        # factors is a list of lists, from the combination of nargs="+" and action="append"
        # also parse hyphenated factors into lists of factors
        # so that `-f foo-bar` and `-f foo bar` are treated equivalently
        raw_factors = getattr(self._state.conf.options, "factors", [])
        return tuple({f for factor in factor_list for f in factor.split("-")} for factor_list in raw_factors)

    def _mark_active(self) -> None:
        labels = set(getattr(self._state.conf.options, "labels", []))
        factors = self._parse_factors()

        assert self._defined_envs_ is not None
        if labels or factors:
            for env_info in self._defined_envs_.values():
                env_info.is_active = False  # if any was selected reset
            # ignore labels when provisioning will occur
            if labels and (self._provision is None or not self._provision[0]):
                for label in labels:
                    for env_name in self._state.conf.core["labels"].get(label, []):
                        self._defined_envs_[env_name].is_active = True
                for env_info in self._defined_envs_.values():
                    if labels.intersection(env_info.env.conf["labels"]):
                        env_info.is_active = True
            if factors:  # if matches mark it active
                for name, env_info in self._defined_envs_.items():
                    for factor_set in factors:
                        if factor_set.issubset(set(name.split("-"))):
                            env_info.is_active = True
                            break

    def __getitem__(self, item: str) -> RunToxEnv | PackageToxEnv:
        """
        :param item: the name of the environment
        :return: the tox environment
        """
        return self._defined_envs[item].env

    def iter(
        self,
        *,
        only_active: bool = True,
        package: bool = False,
    ) -> Iterator[str]:
        """
        Get tox environments.

        :param only_active: active environments are marked to be executed in the current target
        :param package: return package environments

        :return: an iteration of tox environments
        """
        for name, env_info in self._defined_envs.items():
            if only_active and not env_info.is_active:
                continue
            if not package and not isinstance(env_info.env, RunToxEnv):
                continue
            if self._filter_re is not None and self._filter_re.match(name):
                if name not in self._warned_about:
                    self._warned_about.add(name)
                    LOGGER.warning("skip environment %s, matches filter %r", name, self._filter_re.pattern)
                continue
            yield name

    def ensure_only_run_env_is_active(self) -> None:
        envs, active = self._defined_envs, self._env_name_to_active()
        invalid = [n for n, a in active.items() if a and isinstance(envs[n].env, PackageToxEnv)]
        if invalid:
            raise HandledError(f"cannot run packaging environment(s) {','.join(invalid)}")

    def _mark_provision(self, on: bool, provision_tox_env: str) -> None:
        self._provision = on, provision_tox_env


__all__ = [
    "register_env_select_flags",
    "EnvSelector",
    "CliEnv",
]