diff options
Diffstat (limited to 'cliapp')
-rw-r--r-- | cliapp/__init__.py | 34 | ||||
-rw-r--r-- | cliapp/app.py | 671 | ||||
-rw-r--r-- | cliapp/fmt.py | 125 | ||||
-rw-r--r-- | cliapp/genman.py | 148 | ||||
-rw-r--r-- | cliapp/hook.py | 76 | ||||
-rw-r--r-- | cliapp/hookmgr.py | 46 | ||||
-rw-r--r-- | cliapp/plugin.py | 125 | ||||
-rw-r--r-- | cliapp/pluginmgr.py | 175 | ||||
-rw-r--r-- | cliapp/runcmd.py | 284 | ||||
-rw-r--r-- | cliapp/settings.py | 772 |
10 files changed, 2456 insertions, 0 deletions
diff --git a/cliapp/__init__.py b/cliapp/__init__.py new file mode 100644 index 00000000..6e4d4081 --- /dev/null +++ b/cliapp/__init__.py @@ -0,0 +1,34 @@ +# Copyright (C) 2011 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +__version__ = '1.20130808' + + +from fmt import TextFormat +from settings import (Settings, log_group_name, config_group_name, + perf_group_name) +from runcmd import runcmd, runcmd_unchecked, shell_quote, ssh_runcmd +from app import Application, AppException + +# The plugin system +from hook import Hook, FilterHook +from hookmgr import HookManager +from plugin import Plugin +from pluginmgr import PluginManager + + +__all__ = locals() diff --git a/cliapp/app.py b/cliapp/app.py new file mode 100644 index 00000000..8e280afa --- /dev/null +++ b/cliapp/app.py @@ -0,0 +1,671 @@ +# Copyright (C) 2011 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import errno +import gc +import inspect +import logging +import logging.handlers +import os +import StringIO +import sys +import traceback +import time +import platform +import textwrap + +import cliapp + + +class AppException(Exception): + + '''Base class for application specific exceptions. + + Any exceptions that are subclasses of this one get printed as + nice errors to the user. Any other exceptions cause a Python + stack trace to be written to stderr. + + ''' + + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return self.msg + + +class LogHandler(logging.handlers.RotatingFileHandler): # pragma: no cover + + '''Like RotatingFileHandler, but set permissions of new files.''' + + def __init__(self, filename, perms=0600, *args, **kwargs): + self._perms = perms + logging.handlers.RotatingFileHandler.__init__(self, filename, + *args, **kwargs) + + def _open(self): + if not os.path.exists(self.baseFilename): + flags = os.O_CREAT | os.O_WRONLY + fd = os.open(self.baseFilename, flags, self._perms) + os.close(fd) + return logging.handlers.RotatingFileHandler._open(self) + + +class Application(object): + + '''A framework for Unix-like command line programs. + + The user should subclass this base class for each application. + The subclass does not need code for the mundane, boilerplate + parts that are the same in every utility, and can concentrate on the + interesting part that is unique to it. + + To start the application, call the `run` method. + + The ``progname`` argument sets tne name of the program, which is + used for various purposes, such as determining the name of the + configuration file. + + Similarly, ``version`` sets the version number of the program. + + ``description`` and ``epilog`` are included in the output of + ``--help``. They are formatted to fit the screen. Unlike the + default behavior of ``optparse``, empty lines separate + paragraphs. + + ''' + + def __init__(self, progname=None, version='0.0.0', description=None, + epilog=None): + self.fileno = 0 + self.global_lineno = 0 + self.lineno = 0 + self._description = description + if not hasattr(self, 'arg_synopsis'): + self.arg_synopsis = '[FILE]...' + if not hasattr(self, 'cmd_synopsis'): + self.cmd_synopsis = {} + + self.subcommands = {} + self.subcommand_aliases = {} + self.hidden_subcommands = set() + for method_name in self._subcommand_methodnames(): + cmd = self._unnormalize_cmd(method_name) + self.subcommands[cmd] = getattr(self, method_name) + + self.settings = cliapp.Settings(progname, version, + usage=self._format_usage, + description=self._format_description, + epilog=epilog) + + self.plugin_subdir = 'plugins' + + # For meliae memory dumps. + self.memory_dump_counter = 0 + self.last_memory_dump = 0 + + # For process duration. + self._started = os.times()[-1] + + def add_settings(self): + '''Add application specific settings.''' + + def run(self, args=None, stderr=sys.stderr, sysargv=sys.argv, + log=logging.critical): + '''Run the application.''' + + def run_it(): + self._run(args=args, stderr=stderr, log=log) + + if self.settings.progname is None and sysargv: + self.settings.progname = os.path.basename(sysargv[0]) + envname = '%s_PROFILE' % self._envname(self.settings.progname) + profname = os.environ.get(envname, '') + if profname: # pragma: no cover + import cProfile + cProfile.runctx('run_it()', globals(), locals(), profname) + else: + run_it() + + def _envname(self, progname): + '''Create an environment variable name of the name of a program.''' + + basename = os.path.basename(progname) + if '.' in basename: + basename = basename.split('.')[0] + + ok = 'abcdefghijklmnopqrstuvwxyz0123456789' + ok += ok.upper() + + return ''.join(x.upper() if x in ok else '_' for x in basename) + + def _set_process_name(self): # pragma: no cover + comm = '/proc/self/comm' + if platform.system() == 'Linux' and os.path.exists(comm): + with open('/proc/self/comm', 'w', 0) as f: + f.write(self.settings.progname[:15]) + + def _run(self, args=None, stderr=sys.stderr, log=logging.critical): + try: + self._set_process_name() + self.add_settings() + self.setup_plugin_manager() + + # A little bit of trickery here to make --no-default-configs and + # --config=foo work right: we first parse the command line once, + # and pick up any config files. Then we read configs. Finally, + # we re-parse the command line to allow any options to override + # config file settings. + self.setup() + self.enable_plugins() + if self.subcommands: + self.add_default_subcommands() + args = sys.argv[1:] if args is None else args + self.parse_args(args, configs_only=True) + self.settings.load_configs() + args = self.parse_args(args) + + self.setup_logging() + self.log_config() + + if self.settings['output']: + self.output = open(self.settings['output'], 'w') + else: + self.output = sys.stdout + + self.process_args(args) + self.cleanup() + self.disable_plugins() + except AppException, e: + log(traceback.format_exc()) + stderr.write('ERROR: %s\n' % str(e)) + sys.exit(1) + except SystemExit, e: + if e.code is not None and type(e.code) != int: + log(str(e)) + stderr.write('ERROR: %s\n' % str(e)) + sys.exit(e.code if type(e.code) == int else 1) + except KeyboardInterrupt, e: + sys.exit(255) + except IOError, e: # pragma: no cover + if e.errno == errno.EPIPE and e.filename is None: + # We're writing to stdout, and it broke. This almost always + # happens when we're being piped to less, and the user quits + # less before we finish writing everything out. So we ignore + # the error in that case. + sys.exit(1) + log(traceback.format_exc()) + stderr.write('ERROR: %s\n' % str(e)) + sys.exit(1) + except OSError, e: # pragma: no cover + log(traceback.format_exc()) + if hasattr(e, 'filename') and e.filename: + stderr.write('ERROR: %s: %s\n' % (e.filename, e.strerror)) + else: + stderr.write('ERROR: %s\n' % e.strerror) + sys.exit(1) + except BaseException, e: # pragma: no cover + log(traceback.format_exc()) + stderr.write(traceback.format_exc()) + sys.exit(1) + + logging.info('%s version %s ends normally' % + (self.settings.progname, self.settings.version)) + + def compute_setting_values(self, settings): + '''Compute setting values after configs and options are parsed. + + You can override this method to implement a default value for + a setting that is dependent on another setting. For example, + you might have settings "url" and "protocol", where protocol + gets set based on the schema of the url, unless explicitly + set by the user. So if the user sets just the url, to + "http://www.example.com/", the protocol would be set to + "http". If the user sets both url and protocol, the protocol + does not get modified by compute_setting_values. + + ''' + + def add_subcommand( + self, name, func, arg_synopsis=None, aliases=None, hidden=False): + '''Add a subcommand. + + Normally, subcommands are defined by add ``cmd_foo`` methods + to the application class. However, sometimes it is more convenient + to have them elsewhere (e.g., in plugins). This method allows + doing that. + + The callback function must accept a list of command line + non-option arguments. + + ''' + + if name not in self.subcommands: + self.subcommands[name] = func + self.cmd_synopsis[name] = arg_synopsis + self.subcommand_aliases[name] = aliases or [] + if hidden: # pragma: no cover + self.hidden_subcommands.add(name) + + def add_default_subcommands(self): + if 'help' not in self.subcommands: + self.add_subcommand('help', self.help) + if 'help-all' not in self.subcommands: + self.add_subcommand('help-all', self.help_all) + + def _help_helper(self, args, show_all): # pragma: no cover + try: + width = int(os.environ.get('COLUMNS', '78')) + except ValueError: + width = 78 + + fmt = cliapp.TextFormat(width=width) + + if args: + usage = self._format_usage_for(args[0]) + description = fmt.format(self._format_subcommand_help(args[0])) + text = '%s\n\n%s' % (usage, description) + else: + usage = self._format_usage(all=show_all) + description = fmt.format(self._format_description(all=show_all)) + text = '%s\n\n%s' % (usage, description) + + text = self.settings.progname.join(text.split('%prog')) + self.output.write(text) + + def help(self, args): # pragma: no cover + '''Print help.''' + self._help_helper(args, False) + + def help_all(self, args): # pragma: no cover + '''Print help, including hidden subcommands.''' + self._help_helper(args, True) + + def _subcommand_methodnames(self): + return [x + for x in dir(self) + if x.startswith('cmd_') and + inspect.ismethod(getattr(self, x))] + + def _normalize_cmd(self, cmd): + return 'cmd_%s' % cmd.replace('-', '_') + + def _unnormalize_cmd(self, method): + assert method.startswith('cmd_') + return method[len('cmd_'):].replace('_', '-') + + def _format_usage(self, all=False): + '''Format usage, possibly also subcommands, if any.''' + if self.subcommands: + lines = [] + prefix = 'Usage:' + for cmd in sorted(self.subcommands.keys()): + if all or cmd not in self.hidden_subcommands: + args = self.cmd_synopsis.get(cmd, '') or '' + lines.append( + '%s %%prog [options] %s %s' % (prefix, cmd, args)) + prefix = ' ' * len(prefix) + return '\n'.join(lines) + else: + return None + + def _format_usage_for(self, cmd): # pragma: no cover + args = self.cmd_synopsis.get(cmd, '') or '' + return 'Usage: %%prog [options] %s %s' % (cmd, args) + + def _format_description(self, all=False): + '''Format OptionParser description, with subcommand support.''' + if self.subcommands: + summaries = [] + for cmd in sorted(self.subcommands.keys()): + if all or cmd not in self.hidden_subcommands: + summaries.append(self._format_subcommand_summary(cmd)) + cmd_desc = ''.join(summaries) + return '%s\n%s' % (self._description or '', cmd_desc) + else: + return self._description + + def _format_subcommand_summary(self, cmd): # pragma: no cover + method = self.subcommands[cmd] + doc = method.__doc__ or '' + lines = doc.splitlines() + if lines: + summary = lines[0].strip() + else: + summary = '' + return '* %%prog %s: %s\n' % (cmd, summary) + + def _format_subcommand_help(self, cmd): # pragma: no cover + if cmd not in self.subcommands: + raise cliapp.AppException('Unknown subcommand %s' % cmd) + method = self.subcommands[cmd] + doc = method.__doc__ or '' + t = doc.split('\n', 1) + if len(t) == 1: + return doc + else: + first, rest = t + return first + '\n' + textwrap.dedent(rest) + + def setup_logging(self): # pragma: no cover + '''Set up logging.''' + + level_name = self.settings['log-level'] + levels = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL, + 'fatal': logging.FATAL, + } + level = levels.get(level_name, logging.INFO) + + if self.settings['log'] == 'syslog': + handler = self.setup_logging_handler_for_syslog() + elif self.settings['log'] and self.settings['log'] != 'none': + handler = LogHandler( + self.settings['log'], + perms=int(self.settings['log-mode'], 8), + maxBytes=self.settings['log-max'], + backupCount=self.settings['log-keep'], + delay=False) + fmt = '%(asctime)s %(levelname)s %(message)s' + datefmt = '%Y-%m-%d %H:%M:%S' + formatter = logging.Formatter(fmt, datefmt) + handler.setFormatter(formatter) + else: + handler = self.setup_logging_handler_to_none() + # reduce amount of pointless I/O + level = logging.FATAL + + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(level) + + def setup_logging_handler_for_syslog(self): # pragma: no cover + '''Setup a logging.Handler for logging to syslog.''' + + handler = logging.handlers.SysLogHandler(address='/dev/log') + progname = '%%'.join(self.settings.progname.split('%')) + fmt = progname + ": %(levelname)s %(message)s" + formatter = logging.Formatter(fmt) + handler.setFormatter(formatter) + + return handler + + def setup_logging_handler_to_none(self): # pragma: no cover + '''Setup a logging.Handler that does not log anything anywhere.''' + + handler = logging.FileHandler('/dev/null') + return handler + + def setup_logging_handler_to_file(self): # pragma: no cover + '''Setup a logging.Handler for logging to a named file.''' + + handler = LogHandler( + self.settings['log'], + perms=int(self.settings['log-mode'], 8), + maxBytes=self.settings['log-max'], + backupCount=self.settings['log-keep'], + delay=False) + fmt = self.setup_logging_format() + datefmt = self.setup_logging_timestamp() + formatter = logging.Formatter(fmt, datefmt) + handler.setFormatter(formatter) + + return handler + + def setup_logging_format(self): # pragma: no cover + '''Return format string for log messages.''' + return '%(asctime)s %(levelname)s %(message)s' + + def setup_logging_timestamp(self): # pragma: no cover + '''Return timestamp format string for log message.''' + return '%Y-%m-%d %H:%M:%S' + + def log_config(self): + logging.info('%s version %s starts' % + (self.settings.progname, self.settings.version)) + logging.debug('sys.argv: %s' % sys.argv) + logging.debug('environment variables:') + for name in os.environ: + logging.debug('environment: %s=%s' % (name, os.environ[name])) + cp = self.settings.as_cp() + f = StringIO.StringIO() + cp.write(f) + logging.debug('Config:\n%s' % f.getvalue()) + logging.debug('Python version: %s' % sys.version) + + def app_directory(self): + '''Return the directory where the application class is defined. + + Plugins are searched relative to this directory, in the subdirectory + specified by self.plugin_subdir. + + ''' + + module_name = self.__class__.__module__ + module = sys.modules[module_name] + dirname = os.path.dirname(module.__file__) or '.' + return dirname + + def setup_plugin_manager(self): + '''Create a plugin manager.''' + self.pluginmgr = cliapp.PluginManager() + dirname = os.path.join(self.app_directory(), self.plugin_subdir) + self.pluginmgr.locations = [dirname] + + def enable_plugins(self): # pragma: no cover + '''Load plugins.''' + for plugin in self.pluginmgr.plugins: + plugin.app = self + plugin.setup() + self.pluginmgr.enable_plugins() + + def disable_plugins(self): + self.pluginmgr.disable_plugins() + + def parse_args(self, args, configs_only=False): + '''Parse the command line. + + Return list of non-option arguments. + + ''' + + return self.settings.parse_args( + args, configs_only=configs_only, arg_synopsis=self.arg_synopsis, + cmd_synopsis=self.cmd_synopsis, + compute_setting_values=self.compute_setting_values) + + def setup(self): + '''Prepare for process_args. + + This method is called just before enabling plugins. By default it + does nothing, but subclasses may override it with a suitable + implementation. This is easier than overriding process_args + itself. + + ''' + + def cleanup(self): + '''Clean up after process_args. + + This method is called just after process_args. By default it + does nothing, but subclasses may override it with a suitable + implementation. This is easier than overriding process_args + itself. + + ''' + + def process_args(self, args): + '''Process command line non-option arguments. + + The default is to call process_inputs with the argument list, + or to invoke the requested subcommand, if subcommands have + been defined. + + ''' + + + if self.subcommands: + if not args: + raise SystemExit('must give subcommand') + + cmd = args[0] + if cmd not in self.subcommands: + for name in self.subcommand_aliases: + if cmd in self.subcommand_aliases[name]: + cmd = name + break + else: + raise SystemExit('unknown subcommand %s' % args[0]) + + method = self.subcommands[cmd] + method(args[1:]) + else: + self.process_inputs(args) + + def process_inputs(self, args): + '''Process all arguments as input filenames. + + The default implementation calls process_input for each + input filename. If no filenames were given, then + process_input is called with ``-`` as the argument name. + This implements the usual Unix command line practice of + reading from stdin if no inputs are named. + + The attributes ``fileno``, ``global_lineno``, and ``lineno`` are set, + and count files and lines. The global line number is the + line number as if all input files were one. + + ''' + + for arg in args or ['-']: + self.process_input(arg) + + def open_input(self, name, mode='r'): + '''Open an input file for reading. + + The default behaviour is to open a file named on the local + filesystem. A subclass might override this behavior for URLs, + for example. + + The optional mode argument speficies the mode in which the file + gets opened. It should allow reading. Some files should perhaps + be opened in binary mode ('rb') instead of the default text mode. + + ''' + + if name == '-': + return sys.stdin + else: + return open(name, mode) + + def process_input(self, name, stdin=sys.stdin): + '''Process a particular input file. + + The ``stdin`` argument is meant for unit test only. + + ''' + + self.fileno += 1 + self.lineno = 0 + f = self.open_input(name) + for line in f: + self.global_lineno += 1 + self.lineno += 1 + self.process_input_line(name, line) + if f != stdin: + f.close() + + def process_input_line(self, filename, line): + '''Process one line of the input file. + + Applications that are line-oriented can redefine only this method in + a subclass, and should not need to care about the other methods. + + ''' + + def runcmd(self, *args, **kwargs): # pragma: no cover + return cliapp.runcmd(*args, **kwargs) + + def runcmd_unchecked(self, *args, **kwargs): # pragma: no cover + return cliapp.runcmd_unchecked(*args, **kwargs) + + def _vmrss(self): # pragma: no cover + '''Return current resident memory use, in KiB.''' + f = open('/proc/self/status') + rss = 0 + for line in f: + if line.startswith('VmRSS'): + rss = line.split()[1] + f.close() + return rss + + def dump_memory_profile(self, msg): # pragma: no cover + '''Log memory profiling information. + + Get the memory profiling method from the dump-memory-profile + setting, and log the results at DEBUG level. ``msg`` is a + message the caller provides to identify at what point the profiling + happens. + + ''' + + kind = self.settings['dump-memory-profile'] + interval = self.settings['memory-dump-interval'] + + if kind == 'none': + return + + now = time.time() + if self.last_memory_dump + interval > now: + return + self.last_memory_dump = now + + # Log wall clock and CPU times for self, children. + utime, stime, cutime, cstime, elapsed_time = os.times() + duration = elapsed_time - self._started + logging.debug('process duration: %s s' % duration) + logging.debug('CPU time, in process: %s s' % utime) + logging.debug('CPU time, in system: %s s' % stime) + logging.debug('CPU time, in children: %s s' % cutime) + logging.debug('CPU time, in system for children: %s s' % cstime) + + logging.debug('dumping memory profiling data: %s' % msg) + logging.debug('VmRSS: %s KiB' % self._vmrss()) + + if kind == 'simple': + return + + # These are fairly expensive operations, so we only log them + # if we're doing expensive stuff anyway. + logging.debug('# objects: %d' % len(gc.get_objects())) + logging.debug('# garbage: %d' % len(gc.garbage)) + + if kind == 'heapy': + from guppy import hpy + h = hpy() + logging.debug('memory profile:\n%s' % h.heap()) + elif kind == 'meliae': + filename = 'obnam-%d.meliae' % self.memory_dump_counter + logging.debug('memory profile: see %s' % filename) + from meliae import scanner + scanner.dump_all_objects(filename) + self.memory_dump_counter += 1 + diff --git a/cliapp/fmt.py b/cliapp/fmt.py new file mode 100644 index 00000000..1358b0ab --- /dev/null +++ b/cliapp/fmt.py @@ -0,0 +1,125 @@ +# Copyright (C) 2013 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +'''Simplistic text re-formatter. + +This module format text, paragraph by paragraph, so it is somewhat +nice-looking, with no line too long, and short lines joined. In +other words, like what the textwrap library does. However, it +extends textwrap by recognising bulleted lists. + +''' + + +import textwrap + + +class Paragraph(object): + + def __init__(self): + self._lines = [] + + def append(self, line): + self._lines.append(line) + + def _oneliner(self): + return ' '.join(' '.join(x.split()) for x in self._lines) + + def fill(self, width): + filled = textwrap.fill(self._oneliner(), width=width) + return filled + + +class BulletPoint(Paragraph): + + def fill(self, width): + text = self._oneliner() + assert text.startswith('* ') + filled = textwrap.fill(text[2:], width=width - 2) + lines = [' %s' % x for x in filled.splitlines(True)] + lines[0] = '* %s' % lines[0][2:] + return ''.join(lines) + + +class EmptyLine(Paragraph): + + def fill(self, width): + return '' + + +class TextFormat(object): + + def __init__(self, width=78): + self._width = width + + def format(self, text): + '''Return input string, but formatted nicely.''' + + filled_paras = [] + for para in self._paragraphs(text): + filled_paras.append(para.fill(self._width)) + filled = '\n'.join(filled_paras) + if text and not filled.endswith('\n'): + filled += '\n' + return filled + + def _paragraphs(self, text): + + def is_empty(line): + return line.strip() == '' + + def is_bullet(line): + return line.startswith('* ') + + def is_continuation(line): + return line.startswith(' ') + + current = None + in_list = False + for line in text.splitlines(True): + if in_list and is_continuation(line): + assert current is not None + current.append(line) + elif is_bullet(line): + if current: + yield current + if not in_list: + yield EmptyLine() + current = BulletPoint() + current.append(line) + in_list = True + elif is_empty(line): + if current: + yield current + yield EmptyLine() + current = None + in_list = False + else: + if in_list: + yield current + yield EmptyLine() + current = None + + if not current: + current = Paragraph() + current.append(line) + in_list = False + + if current: + yield current + + diff --git a/cliapp/genman.py b/cliapp/genman.py new file mode 100644 index 00000000..e81fcfee --- /dev/null +++ b/cliapp/genman.py @@ -0,0 +1,148 @@ +# Copyright (C) 2011 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import optparse +import re + + +class ManpageGenerator(object): + + '''Fill in a manual page template from an OptionParser instance.''' + + def __init__(self, template, parser, arg_synopsis, cmd_synopsis): + self.template = template + self.parser = parser + self.arg_synopsis = arg_synopsis + self.cmd_synopsis = cmd_synopsis + + def sort_options(self, options): + return sorted(options, + key=lambda o: (o._long_opts + o._short_opts)[0]) + + def option_list(self, container): + return self.sort_options(container.option_list) + + @property + def options(self): + return self.option_list(self.parser) + + def format_template(self): + sections = (('SYNOPSIS', self.format_synopsis()), + ('OPTIONS', self.format_options())) + text = self.template + for section, contents in sections: + pattern = '\n.SH %s\n' % section + text = text.replace(pattern, pattern + contents) + return text + + def format_synopsis(self): + lines = [] + lines += ['.nh'] + lines += ['.B %s' % self.esc_dashes(self.parser.prog)] + + all_options = self.option_list(self.parser) + for group in self.parser.option_groups: + all_options += self.option_list(group) + for option in self.sort_options(all_options): + for spec in self.format_option_for_synopsis(option): + lines += ['.RB [ %s ]' % spec] + + if self.cmd_synopsis: + lines += ['.PP'] + for cmd in sorted(self.cmd_synopsis): + lines += ['.br', + '.B %s' % self.esc_dashes(self.parser.prog), + '.RI [ options ]', + self.esc_dashes(cmd)] + lines += self.format_argspec(self.cmd_synopsis[cmd]) + elif self.arg_synopsis: + lines += self.format_argspec(self.arg_synopsis) + + lines += ['.hy'] + return ''.join('%s\n' % line for line in lines) + + def format_option_for_synopsis(self, option): + if option.metavar: + suffix = '\\fR=\\fI%s' % self.esc_dashes(option.metavar) + else: + suffix = '' + for name in option._short_opts + option._long_opts: + yield '%s%s' % (self.esc_dashes(name), suffix) + + def format_options(self): + lines = [] + + for option in self.sort_options(self.parser.option_list): + lines += self.format_option_for_options(option) + + for group in self.parser.option_groups: + lines += ['.SS "%s"' % group.title] + for option in self.sort_options(group.option_list): + lines += self.format_option_for_options(option) + + return ''.join('%s\n' % line for line in lines) + + def format_option_for_options(self, option): + lines = [] + lines += ['.TP'] + shorts = [self.esc_dashes(x) for x in option._short_opts] + if option.metavar: + longs = ['%s =\\fI%s' % (self.esc_dashes(x), option.metavar) + for x in option._long_opts] + else: + longs = ['%s' % self.esc_dashes(x) + for x in option._long_opts] + lines += ['.BR ' + ' ", " '.join(shorts + longs)] + lines += [self.esc_dots(self.expand_default(option).strip())] + return lines + + def expand_default(self, option): + default = self.parser.defaults.get(option.dest) + if default is optparse.NO_DEFAULT or default is None: + default = 'none' + else: + default = str(default) + return option.help.replace('%default', default) + + def esc_dashes(self, optname): + return '\\-'.join(optname.split('-')) + + def esc_dots(self, line): + if line.startswith('.'): + return '\\' + line + else: + return line + + def format_argspec(self, argspec): + roman = re.compile(r'[^A-Z]+') + italic = re.compile(r'[A-Z]+') + words = ['.RI'] + while argspec: + m = roman.match(argspec) + if m: + words += [self.esc_dashes(m.group(0))] + argspec = argspec[m.end():] + else: + words += ['""'] + m = italic.match(argspec) + if m: + words += [self.esc_dashes(m.group(0))] + argspec = argspec[m.end():] + else: + words += ['""'] + return [' '.join(words)] + diff --git a/cliapp/hook.py b/cliapp/hook.py new file mode 100644 index 00000000..d453f2e6 --- /dev/null +++ b/cliapp/hook.py @@ -0,0 +1,76 @@ +# Copyright (C) 2009-2012 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +'''Hooks with callbacks. + +In order to de-couple parts of the application, especially when plugins +are used, hooks can be used. A hook is a location in the application +code where plugins may want to do something. Each hook has a name and +a list of callbacks. The application defines the name and the location +where the hook will be invoked, and the plugins (or other parts of the +application) will register callbacks. + +''' + + +class Hook(object): + + '''A hook.''' + + def __init__(self): + self.callbacks = [] + + def add_callback(self, callback): + '''Add a callback to this hook. + + Return an identifier that can be used to remove this callback. + + ''' + + if callback not in self.callbacks: + self.callbacks.append(callback) + return callback + + def call_callbacks(self, *args, **kwargs): + '''Call all callbacks with the given arguments.''' + for callback in self.callbacks: + callback(*args, **kwargs) + + def remove_callback(self, callback_id): + '''Remove a specific callback.''' + if callback_id in self.callbacks: + self.callbacks.remove(callback_id) + + +class FilterHook(Hook): + + '''A hook which filters data through callbacks. + + Every hook of this type accepts a piece of data as its first argument + Each callback gets the return value of the previous one as its + argument. The caller gets the value of the final callback. + + Other arguments (with or without keywords) are passed as-is to + each callback. + + ''' + + def call_callbacks(self, data, *args, **kwargs): + for callback in self.callbacks: + data = callback(data, *args, **kwargs) + return data + diff --git a/cliapp/hookmgr.py b/cliapp/hookmgr.py new file mode 100644 index 00000000..80db2d0a --- /dev/null +++ b/cliapp/hookmgr.py @@ -0,0 +1,46 @@ +# Copyright (C) 2009-2012 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +class HookManager(object): + + '''Manage the set of hooks the application defines.''' + + def __init__(self): + self.hooks = {} + + def new(self, name, hook): + '''Add a new hook to the manager. + + If a hook with that name already exists, nothing happens. + + ''' + + if name not in self.hooks: + self.hooks[name] = hook + + def add_callback(self, name, callback): + '''Add a callback to a named hook.''' + return self.hooks[name].add_callback(callback) + + def remove_callback(self, name, callback_id): + '''Remove a specific callback from a named hook.''' + self.hooks[name].remove_callback(callback_id) + + def call(self, name, *args, **kwargs): + '''Call callbacks for a named hook, using given arguments.''' + return self.hooks[name].call_callbacks(*args, **kwargs) + diff --git a/cliapp/plugin.py b/cliapp/plugin.py new file mode 100644 index 00000000..c6ea66fe --- /dev/null +++ b/cliapp/plugin.py @@ -0,0 +1,125 @@ +# Copyright (C) 2009-2012 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +'''A generic plugin manager. + +The plugin manager finds files with plugins and loads them. It looks +for plugins in a number of locations specified by the caller. To add +a plugin to be loaded, it is enough to put it in one of the locations, +and name it *_plugin.py. (The naming convention is to allow having +other modules as well, such as unit tests, in the same locations.) + +''' + + +import imp +import inspect +import os + + +class Plugin(object): + + '''Base class for plugins. + + A plugin MUST NOT have any side effects when it is instantiated. + This is necessary so that it can be safely loaded by unit tests, + and so that a user interface can allow the user to disable it, + even if it is installed, with no ill effects. Any side effects + that would normally happen should occur in the enable() method, + and be undone by the disable() method. These methods must be + callable any number of times. + + The subclass MAY define the following attributes: + + * name + * description + * version + * required_application_version + + name is the user-visible identifier for the plugin. It defaults + to the plugin's classname. + + description is the user-visible description of the plugin. It may + be arbitrarily long, and can use pango markup language. Defaults + to the empty string. + + version is the plugin version. Defaults to '0.0.0'. It MUST be a + sequence of integers separated by periods. If several plugins with + the same name are found, the newest version is used. Versions are + compared integer by integer, starting with the first one, and a + missing integer treated as a zero. If two plugins have the same + version, either might be used. + + required_application_version gives the version of the minimal + application version the plugin is written for. The first integer + must match exactly: if the application is version 2.3.4, the + plugin's required_application_version must be at least 2 and + at most 2.3.4 to be loaded. Defaults to 0. + + ''' + + @property + def name(self): + return self.__class__.__name__ + + @property + def description(self): + return '' + + @property + def version(self): + return '0.0.0' + + @property + def required_application_version(self): + return '0.0.0' + + def setup(self): + '''Setup plugin. + + This is called at plugin load time. It should not yet enable the + plugin (the ``enable`` method does that), but it might do things + like add itself into a hook that adds command line arguments + to the application. + + ''' + + def enable_wrapper(self): + '''Enable plugin. + + The plugin manager will call this method, which then calls the + enable method. Plugins should implement the enable method. + The wrapper method is there to allow an application to provide + an extended base class that does some application specific + magic when plugins are enabled or disabled. + + ''' + + self.enable() + + def disable_wrapper(self): + '''Corresponds to enable_wrapper, but for disabling a plugin.''' + self.disable() + + def enable(self): + '''Enable the plugin.''' + raise NotImplemented() + + def disable(self): + '''Disable the plugin.''' + raise NotImplemented() + diff --git a/cliapp/pluginmgr.py b/cliapp/pluginmgr.py new file mode 100644 index 00000000..5e995236 --- /dev/null +++ b/cliapp/pluginmgr.py @@ -0,0 +1,175 @@ +# Copyright (C) 2009-2012 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +'''A generic plugin manager. + +The plugin manager finds files with plugins and loads them. It looks +for plugins in a number of locations specified by the caller. To add +a plugin to be loaded, it is enough to put it in one of the locations, +and name it *_plugin.py. (The naming convention is to allow having +other modules as well, such as unit tests, in the same locations.) + +''' + + +import imp +import inspect +import os + + +from cliapp import Plugin + + +class PluginManager(object): + + '''Manage plugins. + + This class finds and loads plugins, and keeps a list of them that + can be accessed in various ways. + + The locations are set via the locations attribute, which is a list. + + When a plugin is loaded, an instance of its class is created. This + instance is initialized using normal and keyword arguments specified + in the plugin manager attributes plugin_arguments and + plugin_keyword_arguments. + + The version of the application using the plugin manager is set via + the application_version attribute. This defaults to '0.0.0'. + + ''' + + suffix = '_plugin.py' + + def __init__(self): + self.locations = [] + self._plugins = None + self._plugin_files = None + self.plugin_arguments = [] + self.plugin_keyword_arguments = {} + self.application_version = '0.0.0' + + @property + def plugin_files(self): + if self._plugin_files is None: + self._plugin_files = self.find_plugin_files() + return self._plugin_files + + @property + def plugins(self): + if self._plugins is None: + self._plugins = self.load_plugins() + return self._plugins + + def __getitem__(self, name): + for plugin in self.plugins: + if plugin.name == name: + return plugin + raise KeyError('Plugin %s is not known' % name) + + def find_plugin_files(self): + '''Find files that may contain plugins. + + This finds all files named *_plugin.py in all locations. + The returned list is sorted. + + ''' + + pathnames = [] + + for location in self.locations: + try: + basenames = os.listdir(location) + except os.error: + continue + for basename in basenames: + s = os.path.join(location, basename) + if s.endswith(self.suffix) and os.path.exists(s): + pathnames.append(s) + + return sorted(pathnames) + + def load_plugins(self): + '''Load plugins from all plugin files.''' + + plugins = dict() + + for pathname in self.plugin_files: + for plugin in self.load_plugin_file(pathname): + if plugin.name in plugins: + p = plugins[plugin.name] + if self.is_older(p.version, plugin.version): + plugins[plugin.name] = plugin + else: + plugins[plugin.name] = plugin + + return plugins.values() + + def is_older(self, version1, version2): + '''Is version1 older than version2?''' + return self.parse_version(version1) < self.parse_version(version2) + + def load_plugin_file(self, pathname): + '''Return plugin classes in a plugin file.''' + + name, ext = os.path.splitext(os.path.basename(pathname)) + f = file(pathname, 'r') + module = imp.load_module(name, f, pathname, + ('.py', 'r', imp.PY_SOURCE)) + f.close() + + plugins = [] + for dummy, member in inspect.getmembers(module, inspect.isclass): + if issubclass(member, Plugin): + p = member(*self.plugin_arguments, + **self.plugin_keyword_arguments) + if self.compatible_version(p.required_application_version): + plugins.append(p) + + return plugins + + def compatible_version(self, required_application_version): + '''Check that the plugin is version-compatible with the application. + + This checks the plugin's required_application_version against + the declared application version and returns True if they are + compatible, and False if not. + + ''' + + req = self.parse_version(required_application_version) + app = self.parse_version(self.application_version) + + return app[0] == req[0] and app >= req + + def parse_version(self, version): + '''Parse a string represenation of a version into list of ints.''' + + return [int(s) for s in version.split('.')] + + def enable_plugins(self, plugins=None): + '''Enable all or selected plugins.''' + + for plugin in plugins or self.plugins: + plugin.enable_wrapper() + + def disable_plugins(self, plugins=None): + '''Disable all or selected plugins.''' + + for plugin in plugins or self.plugins: + plugin.disable_wrapper() + diff --git a/cliapp/runcmd.py b/cliapp/runcmd.py new file mode 100644 index 00000000..6304a488 --- /dev/null +++ b/cliapp/runcmd.py @@ -0,0 +1,284 @@ +# Copyright (C) 2011, 2012 Lars Wirzenius +# Copyright (C) 2012 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import errno +import fcntl +import logging +import os +import select +import subprocess + +import cliapp + + +def runcmd(argv, *args, **kwargs): + '''Run external command or pipeline. + + Example: ``runcmd(['grep', 'foo'], ['wc', '-l'], + feed_stdin='foo\nbar\n')`` + + Return the standard output of the command. + + Raise ``cliapp.AppException`` if external command returns + non-zero exit code. ``*args`` and ``**kwargs`` are passed + onto ``subprocess.Popen``. + + ''' + + our_options = ( + ('ignore_fail', False), + ('log_error', True), + ) + opts = {} + for name, default in our_options: + opts[name] = default + if name in kwargs: + opts[name] = kwargs[name] + del kwargs[name] + + exit, out, err = runcmd_unchecked(argv, *args, **kwargs) + if exit != 0: + msg = 'Command failed: %s\n%s' % (' '.join(argv), err) + if opts['ignore_fail']: + if opts['log_error']: + logging.info(msg) + else: + if opts['log_error']: + logging.error(msg) + raise cliapp.AppException(msg) + return out + +def runcmd_unchecked(argv, *argvs, **kwargs): + '''Run external command or pipeline. + + Return the exit code, and contents of standard output and error + of the command. + + See also ``runcmd``. + + ''' + + argvs = [argv] + list(argvs) + logging.debug('run external command: %s' % repr(argvs)) + + def pop_kwarg(name, default): + if name in kwargs: + value = kwargs[name] + del kwargs[name] + return value + else: + return default + + feed_stdin = pop_kwarg('feed_stdin', '') + pipe_stdin = pop_kwarg('stdin', subprocess.PIPE) + pipe_stdout = pop_kwarg('stdout', subprocess.PIPE) + pipe_stderr = pop_kwarg('stderr', subprocess.PIPE) + + try: + pipeline = _build_pipeline(argvs, + pipe_stdin, + pipe_stdout, + pipe_stderr, + kwargs) + return _run_pipeline(pipeline, feed_stdin, pipe_stdin, + pipe_stdout, pipe_stderr) + except OSError, e: # pragma: no cover + if e.errno == errno.ENOENT and e.filename is None: + e.filename = argv[0] + raise e + else: + raise + +def _build_pipeline(argvs, pipe_stdin, pipe_stdout, pipe_stderr, kwargs): + procs = [] + for i, argv in enumerate(argvs): + if i == 0 and i == len(argvs) - 1: + stdin = pipe_stdin + stdout = pipe_stdout + stderr = pipe_stderr + elif i == 0: + stdin = pipe_stdin + stdout = subprocess.PIPE + stderr = pipe_stderr + elif i == len(argvs) - 1: + stdin = procs[-1].stdout + stdout = pipe_stdout + stderr = pipe_stderr + else: + stdin = procs[-1].stdout + stdout = subprocess.PIPE + stderr = pipe_stderr + p = subprocess.Popen(argv, stdin=stdin, stdout=stdout, + stderr=stderr, close_fds=True, **kwargs) + procs.append(p) + + return procs + +def _run_pipeline(procs, feed_stdin, pipe_stdin, pipe_stdout, pipe_stderr): + + stdout_eof = False + stderr_eof = False + out = [] + err = [] + pos = 0 + io_size = 1024 + + def set_nonblocking(fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + + if feed_stdin and pipe_stdin == subprocess.PIPE: + set_nonblocking(procs[0].stdin.fileno()) + if pipe_stdout == subprocess.PIPE: + set_nonblocking(procs[-1].stdout.fileno()) + if pipe_stderr == subprocess.PIPE: + set_nonblocking(procs[-1].stderr.fileno()) + + def still_running(): + for p in procs: + p.poll() + for p in procs: + if p.returncode is None: + return True + if pipe_stdout == subprocess.PIPE and not stdout_eof: + return True + if pipe_stderr == subprocess.PIPE and not stderr_eof: + return True # pragma: no cover + return False + + while still_running(): + rlist = [] + if not stdout_eof and pipe_stdout == subprocess.PIPE: + rlist.append(procs[-1].stdout) + if not stderr_eof and pipe_stderr == subprocess.PIPE: + rlist.append(procs[-1].stderr) + + wlist = [] + if pipe_stdin == subprocess.PIPE and pos < len(feed_stdin): + wlist.append(procs[0].stdin) + + if rlist or wlist: + try: + r, w, x = select.select(rlist, wlist, []) + except select.error, e: # pragma: no cover + err, msg = e.args + if err == errno.EINTR: + break + raise + else: + break # Let's not busywait waiting for processes to die. + + if procs[0].stdin in w and pos < len(feed_stdin): + data = feed_stdin[pos : pos+io_size] + procs[0].stdin.write(data) + pos += len(data) + if pos >= len(feed_stdin): + procs[0].stdin.close() + + if procs[-1].stdout in r: + data = procs[-1].stdout.read(io_size) + if data: + out.append(data) + else: + stdout_eof = True + + if procs[-1].stderr in r: + data = procs[-1].stderr.read(io_size) + if data: + err.append(data) + else: + stderr_eof = True + + while still_running(): + for p in procs: + if p.returncode is None: + p.wait() + + errorcodes = [p.returncode for p in procs if p.returncode != 0] or [0] + return errorcodes[-1], ''.join(out), ''.join(err) + + + +def shell_quote(s): + '''Return a shell-quoted version of s.''' + + lower_ascii = 'abcdefghijklmnopqrstuvwxyz' + upper_ascii = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' + digits = '0123456789' + punctuation = '-_/=.,:' + safe = set(lower_ascii + upper_ascii + digits + punctuation) + + quoted = [] + for c in s: + if c in safe: + quoted.append(c) + elif c == "'": + quoted.append('"\'"') + else: + quoted.append("'%c'" % c) + + return ''.join(quoted) + + +def ssh_runcmd(target, argv, **kwargs): # pragma: no cover + '''Run command in argv on remote host target. + + This is similar to runcmd, but the command is run on the remote + machine. The command is given as an argv array; elements in the + array are automatically quoted so they get passed to the other + side correctly. + + An optional ``tty=`` parameter can be passed to ``ssh_runcmd`` in + order to force or disable pseudo-tty allocation. This is often + required to run ``sudo`` on another machine and might be useful + in other situations as well. Supported values are ``tty=True`` for + forcing tty allocation, ``tty=False`` for disabling it and + ``tty=None`` for not passing anything tty related to ssh. + + With the ``tty`` option, + ``cliapp.runcmd(['ssh', '-tt', 'user@host', '--', 'sudo', 'ls'])`` + can be written as + ``cliapp.ssh_runcmd('user@host', ['sudo', 'ls'], tty=True)`` + which is more intuitive. + + The target is given as-is to ssh, and may use any syntax ssh + accepts. + + Environment variables may or may not be passed to the remote + machine: this is dependent on the ssh and sshd configurations. + Invoke env(1) explicitly to pass in the variables you need to + exist on the other end. + + Pipelines are not supported. + + ''' + + tty = kwargs.get('tty', None) + if tty: + ssh_cmd = ['ssh', '-tt', target, '--'] + elif tty is False: + ssh_cmd = ['ssh', '-T', target, '--'] + else: + ssh_cmd = ['ssh', target, '--'] + if 'tty' in kwargs: + del kwargs['tty'] + + local_argv = ssh_cmd + map(shell_quote, argv) + return runcmd(local_argv, **kwargs) + diff --git a/cliapp/settings.py b/cliapp/settings.py new file mode 100644 index 00000000..b2d0abb0 --- /dev/null +++ b/cliapp/settings.py @@ -0,0 +1,772 @@ +# Copyright (C) 2009-2012 Lars Wirzenius +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import ConfigParser +import optparse +import os +import re +import sys + +import cliapp +from cliapp.genman import ManpageGenerator + + +log_group_name = 'Logging' +config_group_name = 'Configuration files and settings' +perf_group_name = 'Peformance' + +default_group_names = [ + log_group_name, + config_group_name, + perf_group_name, +] + + +class Setting(object): + + action = 'store' + type = 'string' + nargs = 1 + choices = None + + def __init__( + self, names, default, help, metavar=None, group=None, hidden=False): + self.names = names + self.set_value(default) + self.help = help + self.metavar = metavar or self.default_metavar() + self.group = group + self.hidden = hidden + + def default_metavar(self): + return None + + def get_value(self): + return self._string_value + + def set_value(self, value): + self._string_value = value + + def call_get_value(self): + return self.get_value() + + def call_set_value(self, value): + self.set_value(value) + + value = property(call_get_value, call_set_value) + + def has_value(self): + return self.value is not None + + def parse_value(self, string): + self.value = string + + def format(self): # pragma: no cover + return str(self.value) + + +class StringSetting(Setting): + + def default_metavar(self): + return self.names[0].upper() + + +class StringListSetting(Setting): + + action = 'append' + + def __init__( + self, names, default, help, metavar=None, group=None, hidden=False): + Setting.__init__( + self, names, [], help, metavar=metavar, group=group, hidden=hidden) + self.default = default + self.using_default_value = True + + def default_metavar(self): + return self.names[0].upper() + + def get_value(self): + if self._string_value.strip(): + return [s.strip() for s in self._string_value.split(',')] + else: + return self.default + + def set_value(self, strings): + self._string_value = ','.join(strings) + self.using_default_value = False + + def has_value(self): + return self.value != [] + + def parse_value(self, string): + self.value = [s.strip() for s in string.split(',')] + + def format(self): # pragma: no cover + return ', '.join(self.value) + + +class ChoiceSetting(Setting): + + type = 'choice' + + def __init__( + self, names, choices, help, metavar=None, group=None, hidden=False): + Setting.__init__( + self, names, choices[0], help, metavar=metavar, group=group, + hidden=hidden) + self.choices = choices + + def default_metavar(self): + return self.names[0].upper() + + +class BooleanSetting(Setting): + + action = 'store_true' + nargs = None + type = None + + _trues = ['yes', 'on', '1', 'true'] + _false = 'no' + + def get_value(self): + return self._string_value.lower() in self._trues + + def set_value(self, value): + def is_true(): + if value is True or value is False: + return value + if type(value) in [str, unicode]: + return value.lower() in self._trues + return value + if is_true(): + self._string_value = self._trues[0] + else: + self._string_value = self._false + + +class ByteSizeSetting(Setting): + + def parse_human_size(self, size): + '''Parse a size using suffix into plain bytes.''' + + m = re.match(r'''(?P<number>\d+(\.\d+)?) \s* + (?P<unit>k|ki|m|mi|g|gi|t|ti)? b? \s*$''', + size.lower(), flags=re.X) + if not m: + return 0 + else: + number = float(m.group('number')) + unit = m.group('unit') + units = { + 'k': 10**3, + 'm': 10**6, + 'g': 10**9, + 't': 10**12, + 'ki': 2**10, + 'mi': 2**20, + 'gi': 2**30, + 'ti': 2**40, + } + return long(number * units.get(unit, 1)) + + def default_metavar(self): + return 'SIZE' + + def get_value(self): + return long(self._string_value) + + def set_value(self, value): + if type(value) == str: + value = self.parse_human_size(value) + self._string_value = str(value) + + +class IntegerSetting(Setting): + + type = 'int' + + def default_metavar(self): + return self.names[0].upper() + + def get_value(self): + return long(self._string_value) + + def set_value(self, value): + self._string_value = str(value) + + +class FormatHelpParagraphs(optparse.IndentedHelpFormatter): + + def _format_text(self, text): # pragma: no cover + '''Like the default, except handle paragraphs.''' + + fmt = cliapp.TextFormat(width=self.width) + formatted = fmt.format(text) + return formatted.rstrip('\n') + + +class Settings(object): + + '''Settings for a cliapp application. + + You probably don't need to create a settings object yourself, + since ``cliapp.Application`` does it for you. + + Settings are read from configuration files, and parsed from the + command line. Every setting has a type, name, and help text, + and may have a default value as well. + + For example:: + + settings.boolean(['verbose', 'v'], 'show what is going on') + + This would create a new setting, ``verbose``, with a shorter alias + ``v``. On the command line, the options ``--verbose`` and + ``-v`` would work equally well. There can be any number of aliases. + + The help text is shown if the user uses ``--help`` or + ``--generate-manpage``. + You can use the ``metavar`` keyword argument to set the name shown + in the generated option lists; the default name is whatever + ``optparse`` decides (i.e., name of option). + + Use ``load_configs`` to read configuration files, and + ``parse_args`` to parse command line arguments. + + The current value of a setting can be accessed by indexing + the settings class:: + + settings['verbose'] + + The list of configuration files for the appliation is stored + in ``config_files``. Add or remove from the list if you wish. + The files need to exist: those that don't are silently ignored. + + ''' + + def __init__(self, progname, version, usage=None, description=None, + epilog=None): + self._settingses = dict() + self._canonical_names = list() + + self.version = version + self.progname = progname + self.usage = usage + self.description = description + self.epilog = epilog + + self._add_default_settings() + + self._config_files = None + self._cp = ConfigParser.ConfigParser() + + def _add_default_settings(self): + self.string(['output'], + 'write output to FILE, instead of standard output', + metavar='FILE') + + self.string(['log'], + 'write log entries to FILE (default is to not write log ' + 'files at all); use "syslog" to log to system log, ' + 'or "none" to disable logging', + metavar='FILE', group=log_group_name) + self.choice(['log-level'], + ['debug', 'info', 'warning', 'error', 'critical', 'fatal'], + 'log at LEVEL, one of debug, info, warning, ' + 'error, critical, fatal (default: %default)', + metavar='LEVEL', group=log_group_name) + self.bytesize(['log-max'], + 'rotate logs larger than SIZE, ' + 'zero for never (default: %default)', + metavar='SIZE', default=0, group=log_group_name) + self.integer(['log-keep'], 'keep last N logs (%default)', + metavar='N', default=10, group=log_group_name) + self.string(['log-mode'], + 'set permissions of new log files to MODE (octal; ' + 'default %default)', + metavar='MODE', default='0600', group=log_group_name) + + self.choice(['dump-memory-profile'], + ['simple', 'none', 'meliae', 'heapy'], + 'make memory profiling dumps using METHOD, which is one ' + 'of: none, simple, meliae, or heapy ' + '(default: %default)', + metavar='METHOD', + group=perf_group_name) + self.integer(['memory-dump-interval'], + 'make memory profiling dumps at least SECONDS apart', + metavar='SECONDS', + default=300, + group=perf_group_name) + + def _add_setting(self, setting): + '''Add a setting to self._cp.''' + + self._canonical_names.append(setting.names[0]) + for name in setting.names: + self._settingses[name] = setting + + def string(self, names, help, default='', **kwargs): + '''Add a setting with a string value.''' + self._add_setting(StringSetting(names, default, help, **kwargs)) + + def string_list(self, names, help, default=None, **kwargs): + '''Add a setting which have multiple string values. + + An example would be an option that can be given multiple times + on the command line, e.g., "--exclude=foo --exclude=bar". + + ''' + + self._add_setting(StringListSetting(names, default or [], help, + **kwargs)) + + def choice(self, names, possibilities, help, **kwargs): + '''Add a setting which chooses from list of acceptable values. + + An example would be an option to set debugging level to be + one of a set of accepted names: debug, info, warning, etc. + + The default value is the first possibility. + + ''' + + self._add_setting(ChoiceSetting(names, possibilities, help, **kwargs)) + + def boolean(self, names, help, default=False, **kwargs): + '''Add a setting with a boolean value.''' + self._add_setting(BooleanSetting(names, default, help, **kwargs)) + + def bytesize(self, names, help, default=0, **kwargs): + '''Add a setting with a size in bytes. + + The user can use suffixes for kilo/mega/giga/tera/kibi/mibi/gibi/tibi. + + ''' + + self._add_setting(ByteSizeSetting(names, default, help, **kwargs)) + + def integer(self, names, help, default=0, **kwargs): + '''Add an integer setting.''' + self._add_setting(IntegerSetting(names, default, help, **kwargs)) + + def __getitem__(self, name): + return self._settingses[name].value + + def __setitem__(self, name, value): + self._settingses[name].value = value + + def __contains__(self, name): + return name in self._settingses + + def __iter__(self): + '''Iterate over canonical settings names.''' + for name in self._canonical_names: + yield name + + def keys(self): + '''Return canonical settings names.''' + return self._canonical_names[:] + + def require(self, name): + '''Raise exception if setting has not been set. + + Option must have a value, and a default value is OK. + + ''' + + if not self._settingses[name].has_value(): + raise cliapp.AppException('Setting %s has no value, ' + 'but one is required' % name) + + def _option_names(self, names): + '''Turn setting names into option names. + + Names with a single letter are short options, and get prefixed + with one dash. The rest get prefixed with two dashes. + + ''' + + return ['--%s' % name if len(name) > 1 else '-%s' % name + for name in names] + + def _destname(self, name): + name = '_'.join(name.split('-')) + return name + + def build_parser(self, configs_only=False, arg_synopsis=None, + cmd_synopsis=None, deferred_last=[], all_options=False, + add_help_option=True): + '''Build OptionParser for parsing command line.''' + + # Call a callback function unless we're in configs_only mode. + maybe = lambda func: (lambda *args: None) if configs_only else func + + + # Maintain lists of callback function calls that are deferred. + # We call them ourselves rather than have OptionParser call them + # directly so that we can do things like --dump-config only + # after the whole command line is parsed. + + def defer_last(func): # pragma: no cover + def callback(*args): + deferred_last.append(lambda: func(*args)) + return callback + + # Create the command line parser. + + def getit(x): + if x is None or type(x) in [str, unicode]: + return x + else: + return x() + usage = getit(self.usage) + description = getit(self.description) + p = optparse.OptionParser(prog=self.progname, version=self.version, + formatter=FormatHelpParagraphs(), + usage=usage, + description=description, + epilog=self.epilog, + add_help_option=add_help_option) + + # Create all OptionGroup objects. This way, the user code can + # add settings to built-in option groups. + + group_names = set(default_group_names) + for name in self._canonical_names: + s = self._settingses[name] + if s.group is not None: + group_names.add(s.group) + group_names = sorted(group_names) + + option_groups = {} + for name in group_names: + group = optparse.OptionGroup(p, name) + p.add_option_group(group) + option_groups[name] = group + + config_group = option_groups[config_group_name] + + # Return help text, unless setting/option is hidden, in which + # case return optparse.SUPPRESS_HELP. + + def help_text(text, hidden): + if all_options or not hidden: + return text + else: + return optparse.SUPPRESS_HELP + + # Add --dump-setting-names. + + def dump_setting_names(*args): # pragma: no cover + for name in self._canonical_names: + sys.stdout.write('%s\n' % name) + sys.exit(0) + + config_group.add_option('--dump-setting-names', + action='callback', + nargs=0, + callback=defer_last(maybe(dump_setting_names)), + help=help_text( + 'write out all names of settings and quit', True)) + + # Add --dump-config. + + def call_dump_config(*args): # pragma: no cover + self.dump_config(sys.stdout) + sys.exit(0) + + config_group.add_option('--dump-config', + action='callback', + nargs=0, + callback=defer_last(maybe(call_dump_config)), + help='write out the entire current configuration') + + # Add --no-default-configs. + + def reset_configs(option, opt_str, value, parser): + self.config_files = [] + + config_group.add_option('--no-default-configs', + action='callback', + nargs=0, + callback=reset_configs, + help='clear list of configuration files to read') + + # Add --config. + + def append_to_configs(option, opt_str, value, parser): + self.config_files.append(value) + + config_group.add_option('--config', + action='callback', + nargs=1, + type='string', + callback=append_to_configs, + help='add FILE to config files', + metavar='FILE') + + # Add --list-config-files. + + def list_config_files(*args): # pragma: no cover + for filename in self.config_files: + print filename + sys.exit(0) + + config_group.add_option('--list-config-files', + action='callback', + nargs=0, + callback=defer_last(maybe(list_config_files)), + help=help_text('list all possible config files', True)) + + # Add --generate-manpage. + + self._arg_synopsis = arg_synopsis + self._cmd_synopsis = cmd_synopsis + p.add_option('--generate-manpage', + action='callback', + nargs=1, + type='string', + callback=maybe(self._generate_manpage), + help=help_text('fill in manual page TEMPLATE', True), + metavar='TEMPLATE') + + # Add --help-all. + + def help_all(*args): # pragma: no cover + pp = self.build_parser( + configs_only=configs_only, + arg_synopsis=arg_synopsis, + cmd_synopsis=cmd_synopsis, + all_options=True) + sys.stdout.write(pp.format_help()) + sys.exit(0) + + if add_help_option: + config_group.add_option( + '--help-all', + action='callback', + help='show all options', + callback=defer_last(maybe(help_all))) + + # Add other options, from the user-defined and built-in + # settingses. + + def set_value(option, opt_str, value, parser, setting): + if setting.action == 'append': + if setting.using_default_value: + setting.value = [value] + else: + setting.value += [value] + elif setting.action == 'store_true': + setting.value = True + else: + assert setting.action == 'store' + setting.value = value + + def set_false(option, opt_str, value, parser, setting): + setting.value = False + + def add_option(obj, s): + option_names = self._option_names(s.names) + obj.add_option(*option_names, + action='callback', + callback=maybe(set_value), + callback_args=(s,), + type=s.type, + nargs=s.nargs, + choices=s.choices, + help=help_text(s.help, s.hidden), + metavar=s.metavar) + + def add_negation_option(obj, s): + option_names = self._option_names(s.names) + long_names = [x for x in option_names if x.startswith('--')] + neg_names = ['--no-' + x[2:] for x in long_names] + unused_names = [x for x in neg_names + if x[2:] not in self._settingses] + obj.add_option(*unused_names, + action='callback', + callback=maybe(set_false), + callback_args=(s,), + type=s.type, + help=help_text('', s.hidden)) + + # Add options for every setting. + + for name in self._canonical_names: + s = self._settingses[name] + if s.group is None: + obj = p + else: + obj = option_groups[s.group] + + add_option(obj, s) + if type(s) is BooleanSetting: + add_negation_option(obj, s) + p.set_defaults(**{self._destname(name): s.value}) + + return p + + def parse_args(self, args, parser=None, suppress_errors=False, + configs_only=False, arg_synopsis=None, + cmd_synopsis=None, compute_setting_values=None, + all_options=False, add_help_option=True): + '''Parse the command line. + + Return list of non-option arguments. ``args`` would usually + be ``sys.argv[1:]``. + + ''' + + deferred_last = [] + + p = parser or self.build_parser(configs_only=configs_only, + arg_synopsis=arg_synopsis, + cmd_synopsis=cmd_synopsis, + deferred_last=deferred_last, + all_options=all_options, + add_help_option=add_help_option) + + if suppress_errors: + p.error = lambda msg: sys.exit(1) + + options, args = p.parse_args(args) + if compute_setting_values: # pragma: no cover + compute_setting_values(self) + for callback in deferred_last: # pragma: no cover + callback() + return args + + @property + def _default_config_files(self): + '''Return list of default config files to read. + + The names of the files are dependent on the name of the program, + as set in the progname attribute. + + The files may or may not exist. + + ''' + + configs = [] + + configs.append('/etc/%s.conf' % self.progname) + configs += self._listconfs('/etc/%s' % self.progname) + configs.append(os.path.expanduser('~/.%s.conf' % self.progname)) + configs += self._listconfs( + os.path.expanduser('~/.config/%s' % self.progname)) + + return configs + + def _listconfs(self, dirname, listdir=os.listdir): + '''Return list of pathnames to config files in dirname. + + Config files are expectd to have names ending in '.conf'. + + If dirname does not exist or is not a directory, + return empty list. + + ''' + + if not os.path.isdir(dirname): + return [] + + basenames = listdir(dirname) + basenames.sort(key=lambda s: [ord(c) for c in s]) + return [os.path.join(dirname, x) + for x in basenames + if x.endswith('.conf')] + + def _get_config_files(self): + if self._config_files is None: + self._config_files = self._default_config_files + return self._config_files + + def _set_config_files(self, config_files): + self._config_files = config_files + + config_files = property(_get_config_files, _set_config_files) + + def set_from_raw_string(self, name, raw_string): + '''Set value of a setting from a raw, unparsed string value.''' + s = self._settingses[name] + s.parse_value(raw_string) + return s + + def load_configs(self, open=open): + '''Load all config files in self.config_files. + + Silently ignore files that do not exist. + + ''' + + cp = ConfigParser.ConfigParser() + cp.add_section('config') + + for pathname in self.config_files: + try: + f = open(pathname) + except IOError: + pass + else: + cp.readfp(f) + f.close() + + for name in cp.options('config'): + value = cp.get('config', name) + s = self.set_from_raw_string(name, value) + if hasattr(s, 'using_default_value'): + s.using_default_value = True + + # Remember the ConfigParser for use in as_cp later on. + self._cp = cp + + def _generate_manpage(self, o, os, value, p): # pragma: no cover + template = open(value).read() + generator = ManpageGenerator(template, p, self._arg_synopsis, + self._cmd_synopsis) + sys.stdout.write(generator.format_template()) + sys.exit(0) + + def as_cp(self): + '''Return a ConfigParser instance with current values of settings. + + Any sections outside of ``[config]`` are preserved as is. This + lets the application use those as it wishes, and assign any + meanings it desires to the section names. + + ''' + cp = ConfigParser.ConfigParser() + cp.add_section('config') + for name in self._canonical_names: + cp.set('config', name, self._settingses[name].format()) + + for section in self._cp.sections(): + if section != 'config': + cp.add_section(section) + for option in self._cp.options(section): + value = self._cp.get(section, option) + cp.set(section, option, value) + + return cp + + def dump_config(self, output): # pragma: no cover + cp = self.as_cp() + cp.write(output) + |