diff options
Diffstat (limited to 'src/buildstream/_frontend/complete.py')
-rw-r--r-- | src/buildstream/_frontend/complete.py | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/src/buildstream/_frontend/complete.py b/src/buildstream/_frontend/complete.py new file mode 100644 index 000000000..bf9324812 --- /dev/null +++ b/src/buildstream/_frontend/complete.py @@ -0,0 +1,338 @@ +# +# Copyright (c) 2014 by Armin Ronacher. +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# This module was forked from the python click library, Included +# original copyright notice from the Click library and following disclaimer +# as per their LICENSE requirements. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +import collections.abc +import copy +import os + +import click +from click.core import MultiCommand, Option, Argument +from click.parser import split_arg_string + +WORDBREAK = '=' + +COMPLETION_SCRIPT = ''' +%(complete_func)s() { + local IFS=$'\n' + COMPREPLY=( $( env COMP_WORDS="${COMP_WORDS[*]}" \\ + COMP_CWORD=$COMP_CWORD \\ + %(autocomplete_var)s=complete $1 ) ) + return 0 +} + +complete -F %(complete_func)s -o nospace %(script_names)s +''' + + +# An exception for our custom completion handler to +# indicate that it does not want to handle completion +# for this parameter +# +class CompleteUnhandled(Exception): + pass + + +def complete_path(path_type, incomplete, base_directory='.'): + """Helper method for implementing the completions() method + for File and Path parameter types. + """ + + # Try listing the files in the relative or absolute path + # specified in `incomplete` minus the last path component, + # otherwise list files starting from the current working directory. + entries = [] + base_path = '' + + # This is getting a bit messy + listed_base_directory = False + + if os.path.sep in incomplete: + split = incomplete.rsplit(os.path.sep, 1) + base_path = split[0] + + # If there was nothing on the left of the last separator, + # we are completing files in the filesystem root + base_path = os.path.join(base_directory, base_path) + else: + incomplete_base_path = os.path.join(base_directory, incomplete) + if os.path.isdir(incomplete_base_path): + base_path = incomplete_base_path + + try: + if base_path: + if os.path.isdir(base_path): + entries = [os.path.join(base_path, e) for e in os.listdir(base_path)] + else: + entries = os.listdir(base_directory) + listed_base_directory = True + except OSError: + # If for any reason the os reports an error from os.listdir(), just + # ignore this and avoid a stack trace + pass + + base_directory_slash = base_directory + if not base_directory_slash.endswith(os.sep): + base_directory_slash += os.sep + base_directory_len = len(base_directory_slash) + + def entry_is_dir(entry): + if listed_base_directory: + entry = os.path.join(base_directory, entry) + return os.path.isdir(entry) + + def fix_path(path): + + # Append slashes to any entries which are directories, or + # spaces for other files since they cannot be further completed + if entry_is_dir(path) and not path.endswith(os.sep): + path = path + os.sep + else: + path = path + " " + + # Remove the artificial leading path portion which + # may have been prepended for search purposes. + if path.startswith(base_directory_slash): + path = path[base_directory_len:] + + return path + + return [ + # Return an appropriate path for each entry + fix_path(e) for e in sorted(entries) + + # Filter out non directory elements when searching for a directory, + # the opposite is fine, however. + if not (path_type == 'Directory' and not entry_is_dir(e)) + ] + + +# Instead of delegating completions to the param type, +# hard code all of buildstream's completions here. +# +# This whole module should be removed in favor of more +# generic code in click once this issue is resolved: +# https://github.com/pallets/click/issues/780 +# +def get_param_type_completion(param_type, incomplete): + + if isinstance(param_type, click.Choice): + return [c + " " for c in param_type.choices] + elif isinstance(param_type, click.File): + return complete_path("File", incomplete) + elif isinstance(param_type, click.Path): + return complete_path(param_type.path_type, incomplete) + + return [] + + +def resolve_ctx(cli, prog_name, args): + """ + Parse into a hierarchy of contexts. Contexts are connected through the parent variable. + :param cli: command definition + :param prog_name: the program that is running + :param args: full list of args typed before the incomplete arg + :return: the final context/command parsed + """ + ctx = cli.make_context(prog_name, args, resilient_parsing=True) + args_remaining = ctx.protected_args + ctx.args + while ctx is not None and args_remaining: + if isinstance(ctx.command, MultiCommand): + cmd = ctx.command.get_command(ctx, args_remaining[0]) + if cmd is None: + return None + ctx = cmd.make_context(args_remaining[0], args_remaining[1:], parent=ctx, resilient_parsing=True) + args_remaining = ctx.protected_args + ctx.args + else: + ctx = ctx.parent + + return ctx + + +def start_of_option(param_str): + """ + :param param_str: param_str to check + :return: whether or not this is the start of an option declaration (i.e. starts "-" or "--") + """ + return param_str and param_str[:1] == '-' + + +def is_incomplete_option(all_args, cmd_param): + """ + :param all_args: the full original list of args supplied + :param cmd_param: the current command paramter + :return: whether or not the last option declaration (i.e. starts "-" or "--") is incomplete and + corresponds to this cmd_param. In other words whether this cmd_param option can still accept + values + """ + if cmd_param.is_flag: + return False + last_option = None + for index, arg_str in enumerate(reversed([arg for arg in all_args if arg != WORDBREAK])): + if index + 1 > cmd_param.nargs: + break + if start_of_option(arg_str): + last_option = arg_str + + return bool(last_option and last_option in cmd_param.opts) + + +def is_incomplete_argument(current_params, cmd_param): + """ + :param current_params: the current params and values for this argument as already entered + :param cmd_param: the current command parameter + :return: whether or not the last argument is incomplete and corresponds to this cmd_param. In + other words whether or not the this cmd_param argument can still accept values + """ + current_param_values = current_params[cmd_param.name] + if current_param_values is None: + return True + if cmd_param.nargs == -1: + return True + if isinstance(current_param_values, collections.abc.Iterable) \ + and cmd_param.nargs > 1 and len(current_param_values) < cmd_param.nargs: + return True + return False + + +def get_user_autocompletions(args, incomplete, cmd, cmd_param, override): + """ + :param args: full list of args typed before the incomplete arg + :param incomplete: the incomplete text of the arg to autocomplete + :param cmd_param: command definition + :param override: a callable (cmd_param, args, incomplete) that will be + called to override default completion based on parameter type. Should raise + 'CompleteUnhandled' if it could not find a completion. + :return: all the possible user-specified completions for the param + """ + + # Use the type specific default completions unless it was overridden + try: + return override(cmd=cmd, + cmd_param=cmd_param, + args=args, + incomplete=incomplete) + except CompleteUnhandled: + return get_param_type_completion(cmd_param.type, incomplete) or [] + + +def get_choices(cli, prog_name, args, incomplete, override): + """ + :param cli: command definition + :param prog_name: the program that is running + :param args: full list of args typed before the incomplete arg + :param incomplete: the incomplete text of the arg to autocomplete + :param override: a callable (cmd_param, args, incomplete) that will be + called to override default completion based on parameter type. Should raise + 'CompleteUnhandled' if it could not find a completion. + :return: all the possible completions for the incomplete + """ + all_args = copy.deepcopy(args) + + ctx = resolve_ctx(cli, prog_name, args) + if ctx is None: + return + + # In newer versions of bash long opts with '='s are partitioned, but it's easier to parse + # without the '=' + if start_of_option(incomplete) and WORDBREAK in incomplete: + partition_incomplete = incomplete.partition(WORDBREAK) + all_args.append(partition_incomplete[0]) + incomplete = partition_incomplete[2] + elif incomplete == WORDBREAK: + incomplete = '' + + choices = [] + found_param = False + if start_of_option(incomplete): + # completions for options + for param in ctx.command.params: + if isinstance(param, Option): + choices.extend([param_opt + " " for param_opt in param.opts + param.secondary_opts + if param_opt not in all_args or param.multiple]) + found_param = True + if not found_param: + # completion for option values by choices + for cmd_param in ctx.command.params: + if isinstance(cmd_param, Option) and is_incomplete_option(all_args, cmd_param): + choices.extend(get_user_autocompletions(all_args, incomplete, ctx.command, cmd_param, override)) + found_param = True + break + if not found_param: + # completion for argument values by choices + for cmd_param in ctx.command.params: + if isinstance(cmd_param, Argument) and is_incomplete_argument(ctx.params, cmd_param): + choices.extend(get_user_autocompletions(all_args, incomplete, ctx.command, cmd_param, override)) + found_param = True + break + + if not found_param and isinstance(ctx.command, MultiCommand): + # completion for any subcommands + choices.extend([cmd + " " for cmd in ctx.command.list_commands(ctx) + if not ctx.command.get_command(ctx, cmd).hidden]) + + if not start_of_option(incomplete) and ctx.parent is not None \ + and isinstance(ctx.parent.command, MultiCommand) and ctx.parent.command.chain: + # completion for chained commands + visible_commands = [cmd for cmd in ctx.parent.command.list_commands(ctx.parent) + if not ctx.parent.command.get_command(ctx.parent, cmd).hidden] + remaining_comands = set(visible_commands) - set(ctx.parent.protected_args) + choices.extend([cmd + " " for cmd in remaining_comands]) + + for item in choices: + if item.startswith(incomplete): + yield item + + +def do_complete(cli, prog_name, override): + cwords = split_arg_string(os.environ['COMP_WORDS']) + cword = int(os.environ['COMP_CWORD']) + args = cwords[1:cword] + try: + incomplete = cwords[cword] + except IndexError: + incomplete = '' + + for item in get_choices(cli, prog_name, args, incomplete, override): + click.echo(item) + + +# Main function called from main.py at startup here +# +def main_bashcomplete(cmd, prog_name, override): + """Internal handler for the bash completion support.""" + + if '_BST_COMPLETION' in os.environ: + do_complete(cmd, prog_name, override) + return True + + return False |