summaryrefslogtreecommitdiff
path: root/ceilometer/openstack/common/processutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'ceilometer/openstack/common/processutils.py')
-rw-r--r--ceilometer/openstack/common/processutils.py285
1 files changed, 285 insertions, 0 deletions
diff --git a/ceilometer/openstack/common/processutils.py b/ceilometer/openstack/common/processutils.py
new file mode 100644
index 00000000..ebb3cb74
--- /dev/null
+++ b/ceilometer/openstack/common/processutils.py
@@ -0,0 +1,285 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import errno
+import logging
+import multiprocessing
+import os
+import random
+import shlex
+import signal
+
+from eventlet.green import subprocess
+from eventlet import greenthread
+import six
+
+from ceilometer.openstack.common.gettextutils import _
+from ceilometer.openstack.common import strutils
+
+
+LOG = logging.getLogger(__name__)
+
+
+class InvalidArgumentError(Exception):
+ def __init__(self, message=None):
+ super(InvalidArgumentError, self).__init__(message)
+
+
+class UnknownArgumentError(Exception):
+ def __init__(self, message=None):
+ super(UnknownArgumentError, self).__init__(message)
+
+
+class ProcessExecutionError(Exception):
+ def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+ description=None):
+ self.exit_code = exit_code
+ self.stderr = stderr
+ self.stdout = stdout
+ self.cmd = cmd
+ self.description = description
+
+ if description is None:
+ description = _("Unexpected error while running command.")
+ if exit_code is None:
+ exit_code = '-'
+ message = _('%(description)s\n'
+ 'Command: %(cmd)s\n'
+ 'Exit code: %(exit_code)s\n'
+ 'Stdout: %(stdout)r\n'
+ 'Stderr: %(stderr)r') % {'description': description,
+ 'cmd': cmd,
+ 'exit_code': exit_code,
+ 'stdout': stdout,
+ 'stderr': stderr}
+ super(ProcessExecutionError, self).__init__(message)
+
+
+class NoRootWrapSpecified(Exception):
+ def __init__(self, message=None):
+ super(NoRootWrapSpecified, self).__init__(message)
+
+
+def _subprocess_setup():
+ # Python installs a SIGPIPE handler by default. This is usually not what
+ # non-Python subprocesses expect.
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def execute(*cmd, **kwargs):
+ """Helper method to shell out and execute a command through subprocess.
+
+ Allows optional retry.
+
+ :param cmd: Passed to subprocess.Popen.
+ :type cmd: string
+ :param process_input: Send to opened process.
+ :type process_input: string
+ :param env_variables: Environment variables and their values that
+ will be set for the process.
+ :type env_variables: dict
+ :param check_exit_code: Single bool, int, or list of allowed exit
+ codes. Defaults to [0]. Raise
+ :class:`ProcessExecutionError` unless
+ program exits with one of these code.
+ :type check_exit_code: boolean, int, or [int]
+ :param delay_on_retry: True | False. Defaults to True. If set to True,
+ wait a short amount of time before retrying.
+ :type delay_on_retry: boolean
+ :param attempts: How many times to retry cmd.
+ :type attempts: int
+ :param run_as_root: True | False. Defaults to False. If set to True,
+ the command is prefixed by the command specified
+ in the root_helper kwarg.
+ :type run_as_root: boolean
+ :param root_helper: command to prefix to commands called with
+ run_as_root=True
+ :type root_helper: string
+ :param shell: whether or not there should be a shell used to
+ execute this command. Defaults to false.
+ :type shell: boolean
+ :param loglevel: log level for execute commands.
+ :type loglevel: int. (Should be logging.DEBUG or logging.INFO)
+ :returns: (stdout, stderr) from process execution
+ :raises: :class:`UnknownArgumentError` on
+ receiving unknown arguments
+ :raises: :class:`ProcessExecutionError`
+ """
+
+ process_input = kwargs.pop('process_input', None)
+ env_variables = kwargs.pop('env_variables', None)
+ check_exit_code = kwargs.pop('check_exit_code', [0])
+ ignore_exit_code = False
+ delay_on_retry = kwargs.pop('delay_on_retry', True)
+ attempts = kwargs.pop('attempts', 1)
+ run_as_root = kwargs.pop('run_as_root', False)
+ root_helper = kwargs.pop('root_helper', '')
+ shell = kwargs.pop('shell', False)
+ loglevel = kwargs.pop('loglevel', logging.DEBUG)
+
+ if isinstance(check_exit_code, bool):
+ ignore_exit_code = not check_exit_code
+ check_exit_code = [0]
+ elif isinstance(check_exit_code, int):
+ check_exit_code = [check_exit_code]
+
+ if kwargs:
+ raise UnknownArgumentError(_('Got unknown keyword args: %r') % kwargs)
+
+ if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
+ if not root_helper:
+ raise NoRootWrapSpecified(
+ message=_('Command requested root, but did not '
+ 'specify a root helper.'))
+ cmd = shlex.split(root_helper) + list(cmd)
+
+ cmd = map(str, cmd)
+ sanitized_cmd = strutils.mask_password(' '.join(cmd))
+
+ while attempts > 0:
+ attempts -= 1
+ try:
+ LOG.log(loglevel, _('Running cmd (subprocess): %s'), sanitized_cmd)
+ _PIPE = subprocess.PIPE # pylint: disable=E1101
+
+ if os.name == 'nt':
+ preexec_fn = None
+ close_fds = False
+ else:
+ preexec_fn = _subprocess_setup
+ close_fds = True
+
+ obj = subprocess.Popen(cmd,
+ stdin=_PIPE,
+ stdout=_PIPE,
+ stderr=_PIPE,
+ close_fds=close_fds,
+ preexec_fn=preexec_fn,
+ shell=shell,
+ env=env_variables)
+ result = None
+ for _i in six.moves.range(20):
+ # NOTE(russellb) 20 is an arbitrary number of retries to
+ # prevent any chance of looping forever here.
+ try:
+ if process_input is not None:
+ result = obj.communicate(process_input)
+ else:
+ result = obj.communicate()
+ except OSError as e:
+ if e.errno in (errno.EAGAIN, errno.EINTR):
+ continue
+ raise
+ break
+ obj.stdin.close() # pylint: disable=E1101
+ _returncode = obj.returncode # pylint: disable=E1101
+ LOG.log(loglevel, 'Result was %s' % _returncode)
+ if not ignore_exit_code and _returncode not in check_exit_code:
+ (stdout, stderr) = result
+ sanitized_stdout = strutils.mask_password(stdout)
+ sanitized_stderr = strutils.mask_password(stderr)
+ raise ProcessExecutionError(exit_code=_returncode,
+ stdout=sanitized_stdout,
+ stderr=sanitized_stderr,
+ cmd=sanitized_cmd)
+ return result
+ except ProcessExecutionError:
+ if not attempts:
+ raise
+ else:
+ LOG.log(loglevel, _('%r failed. Retrying.'), sanitized_cmd)
+ if delay_on_retry:
+ greenthread.sleep(random.randint(20, 200) / 100.0)
+ finally:
+ # NOTE(termie): this appears to be necessary to let the subprocess
+ # call clean something up in between calls, without
+ # it two execute calls in a row hangs the second one
+ greenthread.sleep(0)
+
+
+def trycmd(*args, **kwargs):
+ """A wrapper around execute() to more easily handle warnings and errors.
+
+ Returns an (out, err) tuple of strings containing the output of
+ the command's stdout and stderr. If 'err' is not empty then the
+ command can be considered to have failed.
+
+ :discard_warnings True | False. Defaults to False. If set to True,
+ then for succeeding commands, stderr is cleared
+
+ """
+ discard_warnings = kwargs.pop('discard_warnings', False)
+
+ try:
+ out, err = execute(*args, **kwargs)
+ failed = False
+ except ProcessExecutionError as exn:
+ out, err = '', six.text_type(exn)
+ failed = True
+
+ if not failed and discard_warnings and err:
+ # Handle commands that output to stderr but otherwise succeed
+ err = ''
+
+ return out, err
+
+
+def ssh_execute(ssh, cmd, process_input=None,
+ addl_env=None, check_exit_code=True):
+ LOG.debug('Running cmd (SSH): %s', cmd)
+ if addl_env:
+ raise InvalidArgumentError(_('Environment not supported over SSH'))
+
+ if process_input:
+ # This is (probably) fixable if we need it...
+ raise InvalidArgumentError(_('process_input not supported over SSH'))
+
+ stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
+ channel = stdout_stream.channel
+
+ # NOTE(justinsb): This seems suspicious...
+ # ...other SSH clients have buffering issues with this approach
+ stdout = stdout_stream.read()
+ stderr = stderr_stream.read()
+ stdin_stream.close()
+
+ exit_status = channel.recv_exit_status()
+
+ # exit_status == -1 if no exit code was returned
+ if exit_status != -1:
+ LOG.debug('Result was %s' % exit_status)
+ if check_exit_code and exit_status != 0:
+ raise ProcessExecutionError(exit_code=exit_status,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=cmd)
+
+ return (stdout, stderr)
+
+
+def get_worker_count():
+ """Utility to get the default worker count.
+
+ @return: The number of CPUs if that can be determined, else a default
+ worker count of 1 is returned.
+ """
+ try:
+ return multiprocessing.cpu_count()
+ except NotImplementedError:
+ return 1