author     Eric Snow <ericsnowcurrently@gmail.com>  2020-10-22 18:42:51 -0600
committer  GitHub <noreply@github.com>              2020-10-22 18:42:51 -0600
commit     345cd37abe324ad4f60f80e2c3133b8849e54e9b (patch)
tree       5d965e662dca9dcac19e7eddd63a3d9d0b816fed /Tools/c-analyzer/c_common/scriptutil.py
parent     ec388cfb4ede56dace2bb78851ff6f38fa2a6abe (diff)
download   cpython-git-345cd37abe324ad4f60f80e2c3133b8849e54e9b.tar.gz
bpo-36876: Fix the C analyzer tool. (GH-22841)
The original tool wasn't working right and it was simpler to create a new one, partially re-using some of the old code. At this point the tool runs properly on master. (Try: ./python Tools/c-analyzer/c-analyzer.py analyze.) It takes ~40 seconds on my machine to analyze the full CPython code base. Note that we'll need to iron out some OS-specific stuff (e.g. preprocessor). We're okay though since this tool isn't used yet in our workflow. We will also need to verify the analysis results in detail before activating the check in CI, though I'm pretty sure it's close. https://bugs.python.org/issue36876
Diffstat (limited to 'Tools/c-analyzer/c_common/scriptutil.py')
-rw-r--r--  Tools/c-analyzer/c_common/scriptutil.py  577
1 file changed, 577 insertions, 0 deletions
diff --git a/Tools/c-analyzer/c_common/scriptutil.py b/Tools/c-analyzer/c_common/scriptutil.py
new file mode 100644
index 0000000000..939a85003b
--- /dev/null
+++ b/Tools/c-analyzer/c_common/scriptutil.py
@@ -0,0 +1,577 @@
+import argparse
+import contextlib
+import fnmatch
+import logging
+import os
+import os.path
+import shutil
+import sys
+
+from . import fsutil, strutil, iterutil, logging as loggingutil
+
+
+def get_prog(spec=None, *, absolute=False, allowsuffix=True):
+ if spec is None:
+ _, spec = _find_script()
+ # This is more natural for prog than __file__ would be.
+ filename = sys.argv[0]
+ elif isinstance(spec, str):
+ filename = os.path.normpath(spec)
+ spec = None
+ else:
+ filename = spec.origin
+ if _is_standalone(filename):
+ # Check if "installed".
+ if allowsuffix or not filename.endswith('.py'):
+ basename = os.path.basename(filename)
+ found = shutil.which(basename)
+ if found:
+ script = os.path.abspath(filename)
+ found = os.path.abspath(found)
+ if os.path.normcase(script) == os.path.normcase(found):
+ return basename
+ # It is only "standalone".
+ if absolute:
+ filename = os.path.abspath(filename)
+ return filename
+ elif spec is not None:
+ module = spec.name
+ if module.endswith('.__main__'):
+ module = module[:-9]
+ return f'{sys.executable} -m {module}'
+ else:
+ if absolute:
+ filename = os.path.abspath(filename)
+ return f'{sys.executable} {filename}'
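+
+# Illustrative usage (not exercised here): the returned string is meant to be
+# handed to argparse, e.g.
+#
+#     parser = argparse.ArgumentParser(prog=get_prog())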
+
+
+def _find_script():
+ frame = sys._getframe(2)
+ while frame.f_globals['__name__'] != '__main__':
+ frame = frame.f_back
+
+ # This should match sys.argv[0].
+ filename = frame.f_globals['__file__']
+ # This will be None if -m wasn't used.
+ spec = frame.f_globals['__spec__']
+ return filename, spec
+
+
+def is_installed(filename, *, allowsuffix=True):
+ if not allowsuffix and filename.endswith('.py'):
+ return False
+ filename = os.path.abspath(os.path.normpath(filename))
+ found = shutil.which(os.path.basename(filename))
+ if not found:
+ return False
+ if found != filename:
+ return False
+ return _is_standalone(filename)
+
+
+def is_standalone(filename):
+ filename = os.path.abspath(os.path.normpath(filename))
+ return _is_standalone(filename)
+
+
+def _is_standalone(filename):
+ return fsutil.is_executable(filename)
+
+
+##################################
+# logging
+
+VERBOSITY = 3
+
+TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
+TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
+
+
+logger = logging.getLogger(__name__)
+
+
+def configure_logger(verbosity, logger=None, **kwargs):
+ if logger is None:
+ # Configure the root logger.
+ logger = logging.getLogger()
+ loggingutil.configure_logger(logger, verbosity, **kwargs)
+
+
+##################################
+# selections
+
+class UnsupportedSelectionError(Exception):
+ def __init__(self, values, possible):
+ self.values = tuple(values)
+ self.possible = tuple(possible)
+ super().__init__(f'unsupported selections {self.unique}')
+
+ @property
+ def unique(self):
+ return tuple(sorted(set(self.values)))
+
+
+def normalize_selection(selected: str, *, possible=None):
+ if selected in (None, True, False):
+ return selected
+ elif isinstance(selected, str):
+ selected = [selected]
+ elif not selected:
+ return ()
+
+ unsupported = []
+ _selected = set()
+ for item in selected:
+ if not item:
+ continue
+ for value in item.strip().replace(',', ' ').split():
+ if not value:
+ continue
+ # XXX Handle subtraction (leading "-").
+ if possible and value not in possible and value != 'all':
+ unsupported.append(value)
+ _selected.add(value)
+ if unsupported:
+ raise UnsupportedSelectionError(unsupported, tuple(possible))
+ if 'all' in _selected:
+ return True
+ return frozenset(_selected)
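+
+# Illustrative behavior, assuming possible = {'spam', 'eggs', 'ham'}:
+#
+#     normalize_selection('spam,eggs', possible=possible)  # frozenset({'eggs', 'spam'})
+#     normalize_selection('all', possible=possible)        # True
+#     normalize_selection('bacon', possible=possible)      # UnsupportedSelectionError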
+
+
+##################################
+# CLI parsing helpers
+
+class CLIArgSpec(tuple):
+ def __new__(cls, *args, **kwargs):
+ return super().__new__(cls, (args, kwargs))
+
+ def __repr__(self):
+ args, kwargs = self
+ args = [repr(arg) for arg in args]
+ for name, value in kwargs.items():
+ args.append(f'{name}={value!r}')
+ return f'{type(self).__name__}({", ".join(args)})'
+
+ def __call__(self, parser, *, _noop=(lambda a: None)):
+ self.apply(parser)
+ return _noop
+
+ def apply(self, parser):
+ args, kwargs = self
+ parser.add_argument(*args, **kwargs)
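+
+# Illustrative usage: a spec is declared once and applied to a parser later.
+#
+#     spec = CLIArgSpec('--spam', action='store_true')
+#     spec(parser)  # calls parser.add_argument('--spam', action='store_true')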
+
+
+def apply_cli_argspecs(parser, specs):
+ processors = []
+ for spec in specs:
+ if callable(spec):
+ procs = spec(parser)
+ _add_procs(processors, procs)
+ else:
+ args, kwargs = spec
+ parser.add_argument(*args, **kwargs)
+ return processors
+
+
+def _add_procs(flattened, procs):
+ # XXX Fail on non-empty, non-callable procs?
+ if not procs:
+ return
+ if callable(procs):
+ flattened.append(procs)
+ else:
+ #processors.extend(p for p in procs if callable(p))
+ for proc in procs:
+ _add_procs(flattened, proc)
+
+
+def add_verbosity_cli(parser):
+ parser.add_argument('-q', '--quiet', action='count', default=0)
+ parser.add_argument('-v', '--verbose', action='count', default=0)
+
+ def process_args(args):
+ ns = vars(args)
+ key = 'verbosity'
+ if key in ns:
+ parser.error(f'duplicate arg {key!r}')
+ ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
+ return key
+ return process_args
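+
+# Illustrative usage (with the default VERBOSITY of 3):
+#
+#     process_verbosity = add_verbosity_cli(parser)
+#     args = parser.parse_args(['-vv'])
+#     process_verbosity(args)  # sets args.verbosity to 5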
+
+
+def add_traceback_cli(parser):
+ parser.add_argument('--traceback', '--tb', action='store_true',
+ default=TRACEBACK)
+ parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
+ action='store_const', const=False)
+
+ def process_args(args):
+ ns = vars(args)
+ key = 'traceback_cm'
+ if key in ns:
+ parser.error(f'duplicate arg {key!r}')
+ showtb = ns.pop('traceback')
+
+ @contextlib.contextmanager
+ def traceback_cm():
+ restore = loggingutil.hide_emit_errors()
+ try:
+ yield
+ except BrokenPipeError:
+ # It was piped to "head" or something similar.
+ pass
+ except NotImplementedError:
+ raise # re-raise
+ except Exception as exc:
+ if not showtb:
+ sys.exit(f'ERROR: {exc}')
+ raise # re-raise
+ except KeyboardInterrupt:
+ if not showtb:
+ sys.exit('\nINTERRUPTED')
+ raise # re-raise
+ except BaseException as exc:
+ if not showtb:
+ sys.exit(f'{type(exc).__name__}: {exc}')
+ raise # re-raise
+ finally:
+ restore()
+ ns[key] = traceback_cm()
+ return key
+ return process_args
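+
+# Illustrative usage: the resulting context manager wraps the command being run,
+# so unexpected exceptions become short error messages unless --traceback is set.
+#
+#     process_tb = add_traceback_cli(parser)
+#     args = parser.parse_args([])
+#     process_tb(args)
+#     with args.traceback_cm:
+#         ...  # run the selected command here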
+
+
+def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
+# if opt is True:
+# parser.add_argument(f'--{dest}', action='append', **kwargs)
+# elif isinstance(opt, str) and opt.startswith('-'):
+# parser.add_argument(opt, dest=dest, action='append', **kwargs)
+# else:
+# arg = dest if not opt else opt
+# kwargs.setdefault('nargs', '+')
+# parser.add_argument(arg, dest=dest, action='append', **kwargs)
+ if not isinstance(opt, str):
+ parser.error(f'opt must be a string, got {opt!r}')
+ elif opt.startswith('-'):
+ parser.add_argument(opt, dest=dest, action='append', **kwargs)
+ else:
+ kwargs.setdefault('nargs', '+')
+ #kwargs.setdefault('metavar', opt.upper())
+ parser.add_argument(opt, dest=dest, action='append', **kwargs)
+
+ def process_args(args):
+ ns = vars(args)
+
+ # XXX Use normalize_selection()?
+ if isinstance(ns[dest], str):
+ ns[dest] = [ns[dest]]
+ selections = []
+ for many in ns[dest] or ():
+ for value in many.split(sep):
+ if value not in choices:
+ parser.error(f'unknown {dest} {value!r}')
+ selections.append(value)
+ ns[dest] = selections
+ return process_args
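+
+# Illustrative usage, assuming the choices {'spam', 'eggs'}:
+#
+#     process_groups = add_sepval_cli(parser, '--groups', 'groups', {'spam', 'eggs'})
+#     args = parser.parse_args(['--groups', 'spam,eggs'])
+#     process_groups(args)  # args.groups == ['spam', 'eggs']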
+
+
+def add_files_cli(parser, *, excluded=None, nargs=None):
+ process_files = add_file_filtering_cli(parser, excluded=excluded)
+ parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
+ return [
+ process_files,
+ ]
+
+
+def add_file_filtering_cli(parser, *, excluded=None):
+ parser.add_argument('--start')
+ parser.add_argument('--include', action='append')
+ parser.add_argument('--exclude', action='append')
+
+ excluded = tuple(excluded or ())
+
+ def process_args(args):
+ ns = vars(args)
+ key = 'iter_filenames'
+ if key in ns:
+ parser.error(f'duplicate arg {key!r}')
+
+ _include = tuple(ns.pop('include') or ())
+ _exclude = excluded + tuple(ns.pop('exclude') or ())
+ kwargs = dict(
+ start=ns.pop('start'),
+ include=tuple(_parse_files(_include)),
+ exclude=tuple(_parse_files(_exclude)),
+ # We use the default for "show_header"
+ )
+ ns[key] = (lambda files: fsutil.iter_filenames(files, **kwargs))
+ return process_args
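+
+# Illustrative usage: the processor folds --start/--include/--exclude into a
+# single callable stored on the namespace (the include value is illustrative).
+#
+#     process_files = add_file_filtering_cli(parser)
+#     args = parser.parse_args(['--include', 'Python/thread.c'])
+#     process_files(args)  # sets args.iter_filenames for use with filter_filenames()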
+
+
+def _parse_files(filenames):
+ for filename, _ in strutil.parse_entries(filenames):
+ yield filename.strip()
+
+
+def add_failure_filtering_cli(parser, pool, *, default=False):
+ parser.add_argument('--fail', action='append',
+ metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
+ parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
+
+ def process_args(args):
+ ns = vars(args)
+
+ fail = ns.pop('fail')
+ try:
+ fail = normalize_selection(fail, possible=pool)
+ except UnsupportedSelectionError as exc:
+ parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
+ else:
+ if fail is None:
+ fail = default
+
+ if fail is True:
+ def ignore_exc(_exc):
+ return False
+ elif fail is False:
+ def ignore_exc(_exc):
+ return True
+ else:
+ def ignore_exc(exc):
+ for err in fail:
+ if type(exc) == pool[err]:
+ return False
+ else:
+ return True
+ args.ignore_exc = ignore_exc
+ return process_args
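+
+# Illustrative usage, with a hypothetical pool mapping names to exception types:
+#
+#     pool = {'parse': SyntaxError, 'io': OSError}
+#     process_fail = add_failure_filtering_cli(parser, pool)
+#     args = parser.parse_args(['--fail', 'parse'])
+#     process_fail(args)  # args.ignore_exc(SyntaxError()) -> False, others -> True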
+
+
+def add_kind_filtering_cli(parser, *, default=None):
+ parser.add_argument('--kinds', action='append')
+
+ def process_args(args):
+ ns = vars(args)
+
+ kinds = []
+ for kind in ns.pop('kinds') or default or ():
+ kinds.extend(kind.strip().replace(',', ' ').split())
+
+ if not kinds:
+ match_kind = (lambda k: True)
+ else:
+ included = set()
+ excluded = set()
+ for kind in kinds:
+ if kind.startswith('-'):
+ kind = kind[1:]
+ excluded.add(kind)
+ if kind in included:
+ included.remove(kind)
+ else:
+ included.add(kind)
+ if kind in excluded:
+ excluded.remove(kind)
+ if excluded:
+ if included:
+ ... # XXX fail?
+ def match_kind(kind, *, _excluded=excluded):
+ return kind not in _excluded
+ else:
+ def match_kind(kind, *, _included=included):
+ return kind in _included
+ args.match_kind = match_kind
+ return process_args
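+
+# Illustrative usage (kind names are hypothetical); a leading "-" excludes a kind:
+#
+#     process_kinds = add_kind_filtering_cli(parser)
+#     args = parser.parse_args(['--kinds', 'typedef'])
+#     process_kinds(args)  # args.match_kind('typedef') -> True, others -> False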
+
+
+COMMON_CLI = [
+ add_verbosity_cli,
+ add_traceback_cli,
+ #add_dryrun_cli,
+]
+
+
+def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
+ arg_processors = {}
+ if isinstance(subset, str):
+ cmdname = subset
+ try:
+ _, argspecs, _ = commands[cmdname]
+ except KeyError:
+ raise ValueError(f'unsupported subset {subset!r}')
+ parser.set_defaults(cmd=cmdname)
+ arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
+ else:
+ if subset is None:
+ cmdnames = subset = list(commands)
+ elif not subset:
+ raise NotImplementedError
+ elif isinstance(subset, set):
+ cmdnames = [k for k in commands if k in subset]
+ subset = sorted(subset)
+ else:
+ cmdnames = [n for n in subset if n in commands]
+ if len(cmdnames) < len(subset):
+ bad = tuple(n for n in subset if n not in commands)
+ raise ValueError(f'unsupported subset {bad}')
+
+ common = argparse.ArgumentParser(add_help=False)
+ common_processors = apply_cli_argspecs(common, commonspecs)
+ subs = parser.add_subparsers(dest='cmd')
+ for cmdname in cmdnames:
+ description, argspecs, _ = commands[cmdname]
+ sub = subs.add_parser(
+ cmdname,
+ description=description,
+ parents=[common],
+ )
+ cmd_processors = _add_cmd_cli(sub, (), argspecs)
+ arg_processors[cmdname] = common_processors + cmd_processors
+ return arg_processors
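+
+# Illustrative usage (command name and description are hypothetical): each entry
+# maps a command name to (description, argspecs, <unused here>).
+#
+#     commands = {
+#         'check': ('Check the given files.', [add_files_cli], None),
+#     }
+#     processors = add_commands_cli(parser, commands)
+#     # processors['check'] holds the common (verbosity/traceback) and per-command processors.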
+
+
+def _add_cmd_cli(parser, commonspecs, argspecs):
+ processors = []
+ argspecs = list(commonspecs or ()) + list(argspecs or ())
+ for argspec in argspecs:
+ if callable(argspec):
+ procs = argspec(parser)
+ _add_procs(processors, procs)
+ else:
+ if not argspec:
+ raise NotImplementedError
+ args = list(argspec)
+ if not isinstance(args[-1], str):
+ kwargs = args.pop()
+ if not isinstance(args[0], str):
+ try:
+ args, = args
+ except (TypeError, ValueError):
+ parser.error(f'invalid cmd args {argspec!r}')
+ else:
+ kwargs = {}
+ parser.add_argument(*args, **kwargs)
+ # There will be nothing to process.
+ return processors
+
+
+def _flatten_processors(processors):
+ for proc in processors:
+ if proc is None:
+ continue
+ if callable(proc):
+ yield proc
+ else:
+ yield from _flatten_processors(proc)
+
+
+def process_args(args, processors, *, keys=None):
+ processors = _flatten_processors(processors)
+ ns = vars(args)
+ extracted = {}
+ if keys is None:
+ for process_args in processors:
+ for key in process_args(args):
+ extracted[key] = ns.pop(key)
+ else:
+ remainder = set(keys)
+ for process_args in processors:
+ hanging = process_args(args)
+ if isinstance(hanging, str):
+ hanging = [hanging]
+ for key in hanging or ():
+ if key not in remainder:
+ raise NotImplementedError(key)
+ extracted[key] = ns.pop(key)
+ remainder.remove(key)
+ if remainder:
+ raise NotImplementedError(sorted(remainder))
+ return extracted
+
+
+def process_args_by_key(args, processors, keys):
+ extracted = process_args(args, processors, keys=keys)
+ return [extracted[key] for key in keys]
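+
+# Illustrative usage, given parsed args and the processors for the chosen command:
+#
+#     verbosity, traceback_cm = process_args_by_key(
+#         args, processors[args.cmd], ['verbosity', 'traceback_cm'])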
+
+
+##################################
+# commands
+
+def set_command(name, add_cli):
+ """A decorator factory to set CLI info."""
+ def decorator(func):
+ if hasattr(func, '__cli__'):
+ raise Exception('already set')
+ func.__cli__ = (name, add_cli)
+ return func
+ return decorator
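+
+# Illustrative usage (the command function and CLI helper are hypothetical):
+#
+#     @set_command('check', add_check_cli)
+#     def cmd_check(filenames, **kwargs):
+#         ...
+#
+#     # cmd_check.__cli__ == ('check', add_check_cli)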
+
+
+##################################
+# main() helpers
+
+def filter_filenames(filenames, iter_filenames=None):
+ for filename, check, _ in _iter_filenames(filenames, iter_filenames):
+ if (reason := check()):
+ logger.debug(f'{filename}: {reason}')
+ continue
+ yield filename
+
+
+def main_for_filenames(filenames, iter_filenames=None):
+ for filename, check, show in _iter_filenames(filenames, iter_filenames):
+ if show:
+ print()
+ print('-------------------------------------------')
+ print(filename)
+ if (reason := check()):
+ print(reason)
+ continue
+ yield filename
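+
+# Illustrative usage: a command implementation might simply iterate, e.g.
+#
+#     for filename in main_for_filenames(args.filenames, args.iter_filenames):
+#         check_one(filename)  # hypothetical per-file handler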
+
+
+def _iter_filenames(filenames, iter_files):
+ if iter_files is None:
+ iter_files = fsutil.iter_filenames
+ yield from iter_files(filenames)
+ return
+
+ onempty = Exception('no filenames provided')
+ items = iter_files(filenames)
+ items, peeked = iterutil.peek_and_iter(items)
+ if not items:
+ raise onempty
+ if isinstance(peeked, str):
+ check = (lambda: True)
+ for filename, ismany in iterutil.iter_many(items, onempty):
+ yield filename, check, ismany
+ elif len(peeked) == 3:
+ yield from items
+ else:
+ raise NotImplementedError
+
+
+def iter_marks(mark='.', *, group=5, groups=2, lines=10, sep=' '):
+ mark = mark or ''
+ sep = f'{mark}{sep}' if sep else mark
+ end = f'{mark}{os.linesep}'
+ div = os.linesep
+ perline = group * groups
+ perlines = perline * lines
+
+ if perline == 1:
+ yield end
+ elif group == 1:
+ yield sep
+
+ count = 1
+ while True:
+ if count % perline == 0:
+ yield end
+ if count % perlines == 0:
+ yield div
+ elif count % group == 0:
+ yield sep
+ else:
+ yield mark
+ count += 1
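+
+# Illustrative usage: printing one mark per processed item gives grouped progress output.
+#
+#     marks = iter_marks()
+#     for filename in filenames:
+#         print(next(marks), end='', flush=True)
+#         ...  # process the file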