diff options
Diffstat (limited to 'Tools/c-analyzer/c_common/scriptutil.py')
-rw-r--r-- | Tools/c-analyzer/c_common/scriptutil.py | 577 |
1 files changed, 577 insertions, 0 deletions
diff --git a/Tools/c-analyzer/c_common/scriptutil.py b/Tools/c-analyzer/c_common/scriptutil.py new file mode 100644 index 0000000000..939a85003b --- /dev/null +++ b/Tools/c-analyzer/c_common/scriptutil.py @@ -0,0 +1,577 @@ +import argparse +import contextlib +import fnmatch +import logging +import os +import os.path +import shutil +import sys + +from . import fsutil, strutil, iterutil, logging as loggingutil + + +def get_prog(spec=None, *, absolute=False, allowsuffix=True): + if spec is None: + _, spec = _find_script() + # This is more natural for prog than __file__ would be. + filename = sys.argv[0] + elif isinstance(spec, str): + filename = os.path.normpath(spec) + spec = None + else: + filename = spec.origin + if _is_standalone(filename): + # Check if "installed". + if allowsuffix or not filename.endswith('.py'): + basename = os.path.basename(filename) + found = shutil.which(basename) + if found: + script = os.path.abspath(filename) + found = os.path.abspath(found) + if os.path.normcase(script) == os.path.normcase(found): + return basename + # It is only "standalone". + if absolute: + filename = os.path.abspath(filename) + return filename + elif spec is not None: + module = spec.name + if module.endswith('.__main__'): + module = module[:-9] + return f'{sys.executable} -m {module}' + else: + if absolute: + filename = os.path.abspath(filename) + return f'{sys.executable} {filename}' + + +def _find_script(): + frame = sys._getframe(2) + while frame.f_globals['__name__'] != '__main__': + frame = frame.f_back + + # This should match sys.argv[0]. + filename = frame.f_globals['__file__'] + # This will be None if -m wasn't used.. + spec = frame.f_globals['__spec__'] + return filename, spec + + +def is_installed(filename, *, allowsuffix=True): + if not allowsuffix and filename.endswith('.py'): + return False + filename = os.path.abspath(os.path.normalize(filename)) + found = shutil.which(os.path.basename(filename)) + if not found: + return False + if found != filename: + return False + return _is_standalone(filename) + + +def is_standalone(filename): + filename = os.path.abspath(os.path.normalize(filename)) + return _is_standalone(filename) + + +def _is_standalone(filename): + return fsutil.is_executable(filename) + + +################################## +# logging + +VERBOSITY = 3 + +TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip() +TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO')) + + +logger = logging.getLogger(__name__) + + +def configure_logger(verbosity, logger=None, **kwargs): + if logger is None: + # Configure the root logger. + logger = logging.getLogger() + loggingutil.configure_logger(logger, verbosity, **kwargs) + + +################################## +# selections + +class UnsupportedSelectionError(Exception): + def __init__(self, values, possible): + self.values = tuple(values) + self.possible = tuple(possible) + super().__init__(f'unsupported selections {self.unique}') + + @property + def unique(self): + return tuple(sorted(set(self.values))) + + +def normalize_selection(selected: str, *, possible=None): + if selected in (None, True, False): + return selected + elif isinstance(selected, str): + selected = [selected] + elif not selected: + return () + + unsupported = [] + _selected = set() + for item in selected: + if not item: + continue + for value in item.strip().replace(',', ' ').split(): + if not value: + continue + # XXX Handle subtraction (leading "-"). + if possible and value not in possible and value != 'all': + unsupported.append(value) + _selected.add(value) + if unsupported: + raise UnsupportedSelectionError(unsupported, tuple(possible)) + if 'all' in _selected: + return True + return frozenset(selected) + + +################################## +# CLI parsing helpers + +class CLIArgSpec(tuple): + def __new__(cls, *args, **kwargs): + return super().__new__(cls, (args, kwargs)) + + def __repr__(self): + args, kwargs = self + args = [repr(arg) for arg in args] + for name, value in kwargs.items(): + args.append(f'{name}={value!r}') + return f'{type(self).__name__}({", ".join(args)})' + + def __call__(self, parser, *, _noop=(lambda a: None)): + self.apply(parser) + return _noop + + def apply(self, parser): + args, kwargs = self + parser.add_argument(*args, **kwargs) + + +def apply_cli_argspecs(parser, specs): + processors = [] + for spec in specs: + if callable(spec): + procs = spec(parser) + _add_procs(processors, procs) + else: + args, kwargs = spec + parser.add_argument(args, kwargs) + return processors + + +def _add_procs(flattened, procs): + # XXX Fail on non-empty, non-callable procs? + if not procs: + return + if callable(procs): + flattened.append(procs) + else: + #processors.extend(p for p in procs if callable(p)) + for proc in procs: + _add_procs(flattened, proc) + + +def add_verbosity_cli(parser): + parser.add_argument('-q', '--quiet', action='count', default=0) + parser.add_argument('-v', '--verbose', action='count', default=0) + + def process_args(args): + ns = vars(args) + key = 'verbosity' + if key in ns: + parser.error(f'duplicate arg {key!r}') + ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet')) + return key + return process_args + + +def add_traceback_cli(parser): + parser.add_argument('--traceback', '--tb', action='store_true', + default=TRACEBACK) + parser.add_argument('--no-traceback', '--no-tb', dest='traceback', + action='store_const', const=False) + + def process_args(args): + ns = vars(args) + key = 'traceback_cm' + if key in ns: + parser.error(f'duplicate arg {key!r}') + showtb = ns.pop('traceback') + + @contextlib.contextmanager + def traceback_cm(): + restore = loggingutil.hide_emit_errors() + try: + yield + except BrokenPipeError: + # It was piped to "head" or something similar. + pass + except NotImplementedError: + raise # re-raise + except Exception as exc: + if not showtb: + sys.exit(f'ERROR: {exc}') + raise # re-raise + except KeyboardInterrupt: + if not showtb: + sys.exit('\nINTERRUPTED') + raise # re-raise + except BaseException as exc: + if not showtb: + sys.exit(f'{type(exc).__name__}: {exc}') + raise # re-raise + finally: + restore() + ns[key] = traceback_cm() + return key + return process_args + + +def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs): +# if opt is True: +# parser.add_argument(f'--{dest}', action='append', **kwargs) +# elif isinstance(opt, str) and opt.startswith('-'): +# parser.add_argument(opt, dest=dest, action='append', **kwargs) +# else: +# arg = dest if not opt else opt +# kwargs.setdefault('nargs', '+') +# parser.add_argument(arg, dest=dest, action='append', **kwargs) + if not isinstance(opt, str): + parser.error(f'opt must be a string, got {opt!r}') + elif opt.startswith('-'): + parser.add_argument(opt, dest=dest, action='append', **kwargs) + else: + kwargs.setdefault('nargs', '+') + #kwargs.setdefault('metavar', opt.upper()) + parser.add_argument(opt, dest=dest, action='append', **kwargs) + + def process_args(args): + ns = vars(args) + + # XXX Use normalize_selection()? + if isinstance(ns[dest], str): + ns[dest] = [ns[dest]] + selections = [] + for many in ns[dest] or (): + for value in many.split(sep): + if value not in choices: + parser.error(f'unknown {dest} {value!r}') + selections.append(value) + ns[dest] = selections + return process_args + + +def add_files_cli(parser, *, excluded=None, nargs=None): + process_files = add_file_filtering_cli(parser, excluded=excluded) + parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME') + return [ + process_files, + ] + + +def add_file_filtering_cli(parser, *, excluded=None): + parser.add_argument('--start') + parser.add_argument('--include', action='append') + parser.add_argument('--exclude', action='append') + + excluded = tuple(excluded or ()) + + def process_args(args): + ns = vars(args) + key = 'iter_filenames' + if key in ns: + parser.error(f'duplicate arg {key!r}') + + _include = tuple(ns.pop('include') or ()) + _exclude = excluded + tuple(ns.pop('exclude') or ()) + kwargs = dict( + start=ns.pop('start'), + include=tuple(_parse_files(_include)), + exclude=tuple(_parse_files(_exclude)), + # We use the default for "show_header" + ) + ns[key] = (lambda files: fsutil.iter_filenames(files, **kwargs)) + return process_args + + +def _parse_files(filenames): + for filename, _ in strutil.parse_entries(filenames): + yield filename.strip() + + +def add_failure_filtering_cli(parser, pool, *, default=False): + parser.add_argument('--fail', action='append', + metavar=f'"{{all|{"|".join(sorted(pool))}}},..."') + parser.add_argument('--no-fail', dest='fail', action='store_const', const=()) + + def process_args(args): + ns = vars(args) + + fail = ns.pop('fail') + try: + fail = normalize_selection(fail, possible=pool) + except UnsupportedSelectionError as exc: + parser.error(f'invalid --fail values: {", ".join(exc.unique)}') + else: + if fail is None: + fail = default + + if fail is True: + def ignore_exc(_exc): + return False + elif fail is False: + def ignore_exc(_exc): + return True + else: + def ignore_exc(exc): + for err in fail: + if type(exc) == pool[err]: + return False + else: + return True + args.ignore_exc = ignore_exc + return process_args + + +def add_kind_filtering_cli(parser, *, default=None): + parser.add_argument('--kinds', action='append') + + def process_args(args): + ns = vars(args) + + kinds = [] + for kind in ns.pop('kinds') or default or (): + kinds.extend(kind.strip().replace(',', ' ').split()) + + if not kinds: + match_kind = (lambda k: True) + else: + included = set() + excluded = set() + for kind in kinds: + if kind.startswith('-'): + kind = kind[1:] + excluded.add(kind) + if kind in included: + included.remove(kind) + else: + included.add(kind) + if kind in excluded: + excluded.remove(kind) + if excluded: + if included: + ... # XXX fail? + def match_kind(kind, *, _excluded=excluded): + return kind not in _excluded + else: + def match_kind(kind, *, _included=included): + return kind in _included + args.match_kind = match_kind + return process_args + + +COMMON_CLI = [ + add_verbosity_cli, + add_traceback_cli, + #add_dryrun_cli, +] + + +def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None): + arg_processors = {} + if isinstance(subset, str): + cmdname = subset + try: + _, argspecs, _ = commands[cmdname] + except KeyError: + raise ValueError(f'unsupported subset {subset!r}') + parser.set_defaults(cmd=cmdname) + arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs) + else: + if subset is None: + cmdnames = subset = list(commands) + elif not subset: + raise NotImplementedError + elif isinstance(subset, set): + cmdnames = [k for k in commands if k in subset] + subset = sorted(subset) + else: + cmdnames = [n for n in subset if n in commands] + if len(cmdnames) < len(subset): + bad = tuple(n for n in subset if n not in commands) + raise ValueError(f'unsupported subset {bad}') + + common = argparse.ArgumentParser(add_help=False) + common_processors = apply_cli_argspecs(common, commonspecs) + subs = parser.add_subparsers(dest='cmd') + for cmdname in cmdnames: + description, argspecs, _ = commands[cmdname] + sub = subs.add_parser( + cmdname, + description=description, + parents=[common], + ) + cmd_processors = _add_cmd_cli(sub, (), argspecs) + arg_processors[cmdname] = common_processors + cmd_processors + return arg_processors + + +def _add_cmd_cli(parser, commonspecs, argspecs): + processors = [] + argspecs = list(commonspecs or ()) + list(argspecs or ()) + for argspec in argspecs: + if callable(argspec): + procs = argspec(parser) + _add_procs(processors, procs) + else: + if not argspec: + raise NotImplementedError + args = list(argspec) + if not isinstance(args[-1], str): + kwargs = args.pop() + if not isinstance(args[0], str): + try: + args, = args + except (TypeError, ValueError): + parser.error(f'invalid cmd args {argspec!r}') + else: + kwargs = {} + parser.add_argument(*args, **kwargs) + # There will be nothing to process. + return processors + + +def _flatten_processors(processors): + for proc in processors: + if proc is None: + continue + if callable(proc): + yield proc + else: + yield from _flatten_processors(proc) + + +def process_args(args, processors, *, keys=None): + processors = _flatten_processors(processors) + ns = vars(args) + extracted = {} + if keys is None: + for process_args in processors: + for key in process_args(args): + extracted[key] = ns.pop(key) + else: + remainder = set(keys) + for process_args in processors: + hanging = process_args(args) + if isinstance(hanging, str): + hanging = [hanging] + for key in hanging or (): + if key not in remainder: + raise NotImplementedError(key) + extracted[key] = ns.pop(key) + remainder.remove(key) + if remainder: + raise NotImplementedError(sorted(remainder)) + return extracted + + +def process_args_by_key(args, processors, keys): + extracted = process_args(args, processors, keys=keys) + return [extracted[key] for key in keys] + + +################################## +# commands + +def set_command(name, add_cli): + """A decorator factory to set CLI info.""" + def decorator(func): + if hasattr(func, '__cli__'): + raise Exception(f'already set') + func.__cli__ = (name, add_cli) + return func + return decorator + + +################################## +# main() helpers + +def filter_filenames(filenames, iter_filenames=None): + for filename, check, _ in _iter_filenames(filenames, iter_filenames): + if (reason := check()): + logger.debug(f'{filename}: {reason}') + continue + yield filename + + +def main_for_filenames(filenames, iter_filenames=None): + for filename, check, show in _iter_filenames(filenames, iter_filenames): + if show: + print() + print('-------------------------------------------') + print(filename) + if (reason := check()): + print(reason) + continue + yield filename + + +def _iter_filenames(filenames, iter_files): + if iter_files is None: + iter_files = fsutil.iter_filenames + yield from iter_files(filenames) + return + + onempty = Exception('no filenames provided') + items = iter_files(filenames) + items, peeked = iterutil.peek_and_iter(items) + if not items: + raise onempty + if isinstance(peeked, str): + check = (lambda: True) + for filename, ismany in iterutil.iter_many(items, onempty): + yield filename, check, ismany + elif len(peeked) == 3: + yield from items + else: + raise NotImplementedError + + +def iter_marks(mark='.', *, group=5, groups=2, lines=10, sep=' '): + mark = mark or '' + sep = f'{mark}{sep}' if sep else mark + end = f'{mark}{os.linesep}' + div = os.linesep + perline = group * groups + perlines = perline * lines + + if perline == 1: + yield end + elif group == 1: + yield sep + + count = 1 + while True: + if count % perline == 0: + yield end + if count % perlines == 0: + yield div + elif count % group == 0: + yield sep + else: + yield mark + count += 1 |