From d1660231d257bdccaed9c01497b0ca79390e8f6f Mon Sep 17 00:00:00 2001 From: Ronny Pfannschmidt Date: Wed, 22 Mar 2023 16:27:02 +0100 Subject: breaking: purge do/do_ex --- src/setuptools_scm/git.py | 149 ++++++++++++++++++++------------------ src/setuptools_scm/hg.py | 10 ++- src/setuptools_scm/hg_git.py | 73 +++++++++++-------- src/setuptools_scm/scm_workdir.py | 13 +--- src/setuptools_scm/utils.py | 21 +----- 5 files changed, 133 insertions(+), 133 deletions(-) (limited to 'src') diff --git a/src/setuptools_scm/git.py b/src/setuptools_scm/git.py index aa82538..dc80fa3 100644 --- a/src/setuptools_scm/git.py +++ b/src/setuptools_scm/git.py @@ -1,23 +1,25 @@ from __future__ import annotations +import dataclasses import logging import os import re +import shlex import warnings from datetime import date from datetime import datetime -from os.path import isfile -from os.path import join from os.path import samefile +from pathlib import Path +from subprocess import CompletedProcess from typing import Callable +from typing import Sequence from typing import TYPE_CHECKING from . import _types as _t from . import Configuration +from ._run_cmd import run from .scm_workdir import Workdir -from .utils import _CmdResult from .utils import data_from_mime -from .utils import do_ex from .utils import require_command from .version import meta from .version import ScmVersion @@ -44,6 +46,12 @@ DEFAULT_DESCRIBE = [ ] +def run_git( + args: Sequence[str | os.PathLike[str]], rootdir: Path, *, check: bool = False +) -> CompletedProcess[str]: + return run(["git", "--git-dir", rootdir / ".git", *args], cwd=rootdir, check=check) + + class GitWorkdir(Workdir): """experimental, may change at any time""" @@ -53,12 +61,10 @@ class GitWorkdir(Workdir): def from_potential_worktree(cls, wd: _t.PathT) -> GitWorkdir | None: require_command(cls.COMMAND) wd = os.path.abspath(wd) - git_dir = join(wd, ".git") - real_wd, _, ret = do_ex( - ["git", "--git-dir", git_dir, "rev-parse", "--show-prefix"], wd - ) - real_wd = real_wd[:-1] # remove the trailing pathsep - if ret: + res = run_git(["rev-parse", "--show-prefix"], Path(wd)) + + real_wd = res.stdout[:-1] # remove the trailing pathsep + if res.returncode: return None if not real_wd: real_wd = wd @@ -74,59 +80,52 @@ class GitWorkdir(Workdir): return cls(real_wd) - def do_ex_git(self, cmd: list[str]) -> _CmdResult: - return self.do_ex(["git", "--git-dir", join(self.path, ".git")] + cmd) - def is_dirty(self) -> bool: - out, _, _ = self.do_ex_git(["status", "--porcelain", "--untracked-files=no"]) - return bool(out) + res = run_git(["status", "--porcelain", "--untracked-files=no"], self.path) + return bool(res.stdout) def get_branch(self) -> str | None: - branch, err, ret = self.do_ex_git(["rev-parse", "--abbrev-ref", "HEAD"]) - if ret: - log.info("branch= err %s %s %s", branch, err, ret) - branch, err, ret = self.do_ex_git(["symbolic-ref", "--short", "HEAD"]) - if ret: - log.warning("branch err (symbolic-ref): %s %s %s", branch, err, ret) - return None - return branch + res = run_git(["rev-parse", "--abbrev-ref", "HEAD"], self.path) + if res.returncode: + log.info("branch err (abbrev-err) %s", res) + res = run_git(["symbolic-ref", "--short", "HEAD"], self.path) + if res.returncode: + log.warning("branch err (symbolic-ref): %s", res) + return None + return res.stdout def get_head_date(self) -> date | None: - timestamp, err, ret = self.do_ex_git( - ["-c", "log.showSignature=false", "log", "-n", "1", "HEAD", "--format=%cI"] + res = run_git( + ["-c", "log.showSignature=false", "log", "-n", "1", "HEAD", "--format=%cI"], + self.path, ) - if ret: - log.warning("timestamp err %s %s %s", timestamp, err, ret) + if res.returncode: + log.warning("timestamp err %s", res) return None - # TODO, when dropping python3.6 use fromiso - date_part = timestamp.split("T")[0] - if "%c" in date_part: - log.warning("git too old -> timestamp is %s", timestamp) + if "%c" in res.stdout: + log.warning("git too old -> timestamp is %s", res.stdout) return None - return datetime.strptime(date_part, r"%Y-%m-%d").date() + return datetime.fromisoformat(res.stdout).date() def is_shallow(self) -> bool: - return isfile(join(self.path, ".git/shallow")) + return self.path.joinpath(".git/shallow").is_file() def fetch_shallow(self) -> None: - self.do_ex_git(["fetch", "--unshallow"]) + run_git(["fetch", "--unshallow"], self.path, check=True) def node(self) -> str | None: - node, _, ret = self.do_ex_git(["rev-parse", "--verify", "--quiet", "HEAD"]) - if not ret: - return node[:7] + res = run_git(["rev-parse", "--verify", "--quiet", "HEAD"], self.path) + if not res.returncode: + return res.stdout[:7] else: return None def count_all_nodes(self) -> int: - revs, _, _ = self.do_ex_git(["rev-list", "HEAD"]) - return revs.count("\n") + 1 + res = run_git(["rev-list", "HEAD"], self.path) + return res.stdout.count("\n") + 1 - def default_describe(self) -> _CmdResult: - git_dir = join(self.path, ".git") - return self.do_ex( - DEFAULT_DESCRIBE[:1] + ["--git-dir", git_dir] + DEFAULT_DESCRIBE[1:] - ) + def default_describe(self) -> CompletedProcess[str]: + return run_git(DEFAULT_DESCRIBE[1:], self.path) def warn_on_shallow(wd: GitWorkdir) -> None: @@ -150,7 +149,7 @@ def fail_on_shallow(wd: GitWorkdir) -> None: ) -def get_working_directory(config: Configuration, root: str) -> GitWorkdir | None: +def get_working_directory(config: Configuration, root: _t.PathT) -> GitWorkdir | None: """ Return the working directory (``GitWorkdir``). """ @@ -165,7 +164,7 @@ def get_working_directory(config: Configuration, root: str) -> GitWorkdir | None def parse( - root: str, + root: _t.PathT, config: Configuration, describe_command: str | list[str] | None = None, pre_parse: Callable[[GitWorkdir], None] = warn_on_shallow, @@ -182,31 +181,49 @@ def parse( return None -def _git_parse_inner( - config: Configuration, +def version_from_describe( wd: GitWorkdir | hg_git.GitWorkdirHgClient, - pre_parse: None | (Callable[[GitWorkdir | hg_git.GitWorkdirHgClient], None]) = None, - describe_command: _t.CMD_TYPE | None = None, -) -> ScmVersion: - if pre_parse: - pre_parse(wd) + config: Configuration, + describe_command: _t.CMD_TYPE | None, +) -> ScmVersion | None: + pass if config.git_describe_command is not None: describe_command = config.git_describe_command if describe_command is not None: - out, _, ret = wd.do_ex(describe_command) + if isinstance(describe_command, str): + describe_command = shlex.split(describe_command) + # todo: figure how ot ensure git with gitdir gets correctly invoked + assert describe_command[0] == "git", describe_command + describe_res = run_git(describe_command[1:], wd.path) else: - out, _, ret = wd.default_describe() + describe_res = wd.default_describe() + distance: int | None node: str | None - if ret == 0: - tag, distance, node, dirty = _git_parse_describe(out) + if describe_res.returncode == 0: + tag, distance, node, dirty = _git_parse_describe(describe_res.stdout) if distance == 0 and not dirty: distance = None - else: + return meta(tag=tag, distance=distance, dirty=dirty, node=node, config=config) + return None + + +def _git_parse_inner( + config: Configuration, + wd: GitWorkdir | hg_git.GitWorkdirHgClient, + pre_parse: None | (Callable[[GitWorkdir | hg_git.GitWorkdirHgClient], None]) = None, + describe_command: _t.CMD_TYPE | None = None, +) -> ScmVersion: + if pre_parse: + pre_parse(wd) + + version = version_from_describe(wd, config, describe_command) + + if version is None: # If 'git git_describe_command' failed, try to get the information otherwise. - tag = "0.0" + tag = config.version_cls("0.0") node = wd.node() if node is None: distance = 0 @@ -214,19 +231,13 @@ def _git_parse_inner( distance = wd.count_all_nodes() node = "g" + node dirty = wd.is_dirty() + version = meta( + tag=tag, distance=distance, dirty=dirty, node=node, config=config + ) branch = wd.get_branch() node_date = wd.get_head_date() or date.today() - - return meta( - tag, - branch=branch, - node=node, - node_date=node_date, - distance=distance, - dirty=dirty, - config=config, - ) + return dataclasses.replace(version, branch=branch, node_date=node_date) def _git_parse_describe( diff --git a/src/setuptools_scm/hg.py b/src/setuptools_scm/hg.py index d1f6102..a3b7d59 100644 --- a/src/setuptools_scm/hg.py +++ b/src/setuptools_scm/hg.py @@ -29,7 +29,7 @@ class HgWorkdir(Workdir): @classmethod def from_potential_worktree(cls, wd: _t.PathT) -> HgWorkdir | None: require_command(cls.COMMAND) - res = _run("hg root", wd) + res = _run(["hg", "root"], wd) if res.returncode: return None return cls(res.stdout) @@ -47,9 +47,11 @@ class HgWorkdir(Workdir): # mainly used to emulate Git branches, which is already supported with # the dedicated class GitWorkdirHgClient) - branch, dirty_str, dirty_date = self.do( - ["hg", "id", "-T", "{branch}\n{if(dirty, 1, 0)}\n{date|shortdate}"] - ).split("\n") + branch, dirty_str, dirty_date = _run( + ["hg", "id", "-T", "{branch}\n{if(dirty, 1, 0)}\n{date|shortdate}"], + cwd=self.path, + check=True, + ).stdout.split("\n") dirty = bool(int(dirty_str)) node_date = datetime.date.fromisoformat(dirty_date if dirty else node_date_str) diff --git a/src/setuptools_scm/hg_git.py b/src/setuptools_scm/hg_git.py index 461215b..884c290 100644 --- a/src/setuptools_scm/hg_git.py +++ b/src/setuptools_scm/hg_git.py @@ -5,17 +5,21 @@ import os from contextlib import suppress from datetime import date from datetime import datetime +from subprocess import CompletedProcess from . import _types as _t +from ._run_cmd import run as _run from .git import GitWorkdir from .hg import HgWorkdir -from .utils import _CmdResult -from .utils import do_ex from .utils import require_command log = logging.getLogger(__name__) -_FAKE_GIT_DESCRIBE_ERROR = _CmdResult("<>hg git failed", "", 1) +_FAKE_GIT_DESCRIBE_ERROR = CompletedProcess( + "fake git describe output for hg", + 1, + "<>hg git failed to describe", +) class GitWorkdirHgClient(GitWorkdir, HgWorkdir): @@ -24,28 +28,31 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): @classmethod def from_potential_worktree(cls, wd: _t.PathT) -> GitWorkdirHgClient | None: require_command(cls.COMMAND) - root, _, ret = do_ex(["hg", "root"], wd) - if ret: + res = _run(["hg", "root"], cwd=wd) + if res.returncode: return None - return cls(root) + return cls(res.stdout) def is_dirty(self) -> bool: - out, _, _ = self.do_ex('hg id -T "{dirty}"') - return bool(out) + res = _run(["hg", "id", "-T", "{dirty}"], cwd=self.path, check=True) + return bool(res.stdout) def get_branch(self) -> str | None: - res = self.do_ex('hg id -T "{bookmarks}"') + res = _run(["hg", "id", "-T", "{bookmarks}"], cwd=self.path) if res.returncode: log.info("branch err %s", res) return None - return res.out + return res.stdout def get_head_date(self) -> date | None: - date_part, err, ret = self.do_ex('hg log -r . -T "{shortdate(date)}"') - if ret: - log.info("head date err %s %s %s", date_part, err, ret) + res = _run('hg log -r . -T "{shortdate(date)}"', cwd=self.path) + if res.returncode: + log.info( + "head date err %s", + res, + ) return None - return datetime.strptime(date_part, r"%Y-%m-%d").date() + return datetime.strptime(res.stdout, r"%Y-%m-%d").date() def is_shallow(self) -> bool: return False @@ -54,11 +61,11 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): pass def get_hg_node(self) -> str | None: - node, _, ret = self.do_ex('hg log -r . -T "{node}"') - if not ret: - return node - else: + res = _run('hg log -r . -T "{node}"', cwd=self.path) + if res.returncode: return None + else: + return res.stdout def _hg2git(self, hg_node: str) -> str | None: with suppress(FileNotFoundError): @@ -78,7 +85,7 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): if git_node is None: # trying again after hg -> git - self.do_ex("hg gexport") + _run(["hg", "gexport"], cwd=self.path) git_node = self._hg2git(hg_node) if git_node is None: @@ -93,17 +100,17 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): return git_node[:7] def count_all_nodes(self) -> int: - revs, _, _ = self.do_ex(["hg", "log", "-r", "ancestors(.)", "-T", "."]) - return len(revs) + res = _run(["hg", "log", "-r", "ancestors(.)", "-T", "."], cwd=self.path) + return len(res.stdout) - def default_describe(self) -> _CmdResult: + def default_describe(self) -> CompletedProcess[str]: """ Tentative to reproduce the output of `git describe --dirty --tags --long --match *[0-9]*` """ - hg_tags_str, _, ret = self.do_ex( + res = _run( [ "hg", "log", @@ -111,11 +118,12 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): "(reverse(ancestors(.)) and tag(r're:v?[0-9].*'))", "-T", "{tags}{if(tags, ' ', '')}", - ] + ], + cwd=self.path, ) - if ret: + if res.returncode: return _FAKE_GIT_DESCRIBE_ERROR - hg_tags: list[str] = hg_tags_str.split() + hg_tags: list[str] = res.stdout.split() if not hg_tags: return _FAKE_GIT_DESCRIBE_ERROR @@ -132,10 +140,10 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): logging.warning("tag not found hg=%s git=%s", hg_tags, git_tags) return _FAKE_GIT_DESCRIBE_ERROR - out, _, ret = self.do_ex(["hg", "log", "-r", f"'{tag}'::.", "-T", "."]) - if ret: + res = _run(["hg", "log", "-r", f"'{tag}'::.", "-T", "."], cwd=self.path) + if res.returncode: return _FAKE_GIT_DESCRIBE_ERROR - distance = len(out) - 1 + distance = len(res.stdout) - 1 node = self.node() assert node is not None @@ -144,4 +152,9 @@ class GitWorkdirHgClient(GitWorkdir, HgWorkdir): if self.is_dirty(): desc += "-dirty" log.debug("faked describe %r", desc) - return _CmdResult(desc, "", 0) + return CompletedProcess( + ["setuptools-scm", "faked", "describe"], + returncode=0, + stdout=desc, + stderr="", + ) diff --git a/src/setuptools_scm/scm_workdir.py b/src/setuptools_scm/scm_workdir.py index 113f68a..12935f1 100644 --- a/src/setuptools_scm/scm_workdir.py +++ b/src/setuptools_scm/scm_workdir.py @@ -1,11 +1,9 @@ from __future__ import annotations +from pathlib import Path from typing import ClassVar from typing import TYPE_CHECKING -from .utils import _CmdResult -from .utils import do -from .utils import do_ex from .utils import require_command if TYPE_CHECKING: @@ -14,13 +12,8 @@ if TYPE_CHECKING: class Workdir: COMMAND: ClassVar[str] + path: Path def __init__(self, path: _t.PathT): require_command(self.COMMAND) - self.path = path - - def do_ex(self, cmd: _t.CMD_TYPE) -> _CmdResult: - return do_ex(cmd, cwd=self.path) - - def do(self, cmd: _t.CMD_TYPE) -> str: - return do(cmd, cwd=self.path) + self.path = Path(path) diff --git a/src/setuptools_scm/utils.py b/src/setuptools_scm/utils.py index 6233a3a..9f800e4 100644 --- a/src/setuptools_scm/utils.py +++ b/src/setuptools_scm/utils.py @@ -10,7 +10,6 @@ import warnings from pathlib import Path from types import CodeType from types import FunctionType -from typing import NamedTuple from typing import Sequence from typing import TYPE_CHECKING @@ -22,24 +21,6 @@ if TYPE_CHECKING: log = logging.getLogger(__name__) -class _CmdResult(NamedTuple): - out: str - err: str - returncode: int - - -def do_ex(cmd: _t.CMD_TYPE, cwd: _t.PathT = ".") -> _CmdResult: - res = _run_cmd.run(cmd, cwd) - return _CmdResult(res.stdout, res.stderr, res.returncode) - - -def do(cmd: _t.CMD_TYPE, cwd: _t.PathT = ".") -> str: - out, err, ret = do_ex(cmd, cwd) - if ret and log.getEffectiveLevel() > logging.DEBUG: - print(err) - return out - - def data_from_mime(path: _t.PathT) -> dict[str, str]: content = Path(path).read_text(encoding="utf-8") log.debug("mime %s content:\n%s", path, textwrap.indent(content, " ")) @@ -60,7 +41,7 @@ def has_command(name: str, args: Sequence[str] = ["help"], warn: bool = True) -> try: p = _run_cmd.run([name, *args], cwd=".", timeout=5) except OSError as e: - log.exception("command %s missing: %s", name, e) + log.warning("command %s missing: %s", name, e) res = False except subprocess.TimeoutExpired as e: log.warning("command %s timed out %s", name, e) -- cgit v1.2.1