1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
from __future__ import annotations
import logging
import os
import subprocess
import tarfile
from typing import IO
from . import _types as _t
from ._run_cmd import run as _run
from ._trace import trace
from .file_finder import is_toplevel_acceptable
from .file_finder import scm_find_files
from .utils import data_from_mime
log = logging.getLogger(__name__)
def _git_toplevel(path: str) -> str | None:
try:
cwd = os.path.abspath(path or ".")
res = _run(["git", "rev-parse", "HEAD"], cwd=cwd)
if res.returncode:
# BAIL if there is no commit
log.error("listing git files failed - pretending there aren't any")
return None
res = _run(
["git", "rev-parse", "--show-prefix"],
cwd=cwd,
)
if res.returncode:
return None
out = res.stdout[:-1] # remove the trailing pathsep
if not out:
out = cwd
else:
# Here, ``out`` is a relative path to root of git.
# ``cwd`` is absolute path to current working directory.
# the below method removes the length of ``out`` from
# ``cwd``, which gives the git toplevel
assert cwd.replace("\\", "/").endswith(out), f"cwd={cwd!r}\nout={out!r}"
# In windows cwd contains ``\`` which should be replaced by ``/``
# for this assertion to work. Length of string isn't changed by replace
# ``\\`` is just and escape for `\`
out = cwd[: -len(out)]
trace("find files toplevel", out)
return os.path.normcase(os.path.realpath(out.strip()))
except subprocess.CalledProcessError:
# git returned error, we are not in a git repo
return None
except OSError:
# git command not found, probably
return None
def _git_interpret_archive(fd: IO[bytes], toplevel: str) -> tuple[set[str], set[str]]:
with tarfile.open(fileobj=fd, mode="r|*") as tf:
git_files = set()
git_dirs = {toplevel}
for member in tf.getmembers():
name = os.path.normcase(member.name).replace("/", os.path.sep)
if member.type == tarfile.DIRTYPE:
git_dirs.add(name)
else:
git_files.add(name)
return git_files, git_dirs
def _git_ls_files_and_dirs(toplevel: str) -> tuple[set[str], set[str]]:
# use git archive instead of git ls-file to honor
# export-ignore git attribute
cmd = ["git", "archive", "--prefix", toplevel + os.path.sep, "HEAD"]
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, cwd=toplevel, stderr=subprocess.DEVNULL
)
assert proc.stdout is not None
try:
try:
return _git_interpret_archive(proc.stdout, toplevel)
finally:
# ensure we avoid resource warnings by cleaning up the process
proc.stdout.close()
proc.terminate()
except Exception:
if proc.wait() != 0:
log.error("listing git files failed - pretending there aren't any")
return set(), set()
def git_find_files(path: _t.PathT = "") -> list[str]:
toplevel = _git_toplevel(os.fspath(path))
if not is_toplevel_acceptable(toplevel):
return []
fullpath = os.path.abspath(os.path.normpath(path))
if not fullpath.startswith(toplevel):
trace("toplevel mismatch", toplevel, fullpath)
git_files, git_dirs = _git_ls_files_and_dirs(toplevel)
return scm_find_files(path, git_files, git_dirs)
def git_archive_find_files(path: _t.PathT = "") -> list[str]:
# This function assumes that ``path`` is obtained from a git archive
# and therefore all the files that should be ignored were already removed.
archival = os.path.join(path, ".git_archival.txt")
if not os.path.exists(archival):
return []
data = data_from_mime(archival)
if "$Format" in data.get("node", ""):
# Substitutions have not been performed, so not a reliable archive
return []
trace("git archive detected - fallback to listing all files")
return scm_find_files(path, set(), set(), force_all_files=True)
|