summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--heat_cfntools/cfntools/cfn_helper.py19
-rw-r--r--heat_cfntools/tests/test_cfn_helper.py56
2 files changed, 65 insertions, 10 deletions
diff --git a/heat_cfntools/cfntools/cfn_helper.py b/heat_cfntools/cfntools/cfn_helper.py
index 7e75cf9..2351c1d 100644
--- a/heat_cfntools/cfntools/cfn_helper.py
+++ b/heat_cfntools/cfntools/cfn_helper.py
@@ -180,7 +180,23 @@ class CommandRunner(object):
self
"""
LOG.debug("Running command: %s" % self._command)
- cmd = ['su', user, '-c', self._command]
+
+ cmd = self._command
+ if isinstance(cmd, str):
+ cmd = cmd.split()
+
+ if user != 'root':
+ # lower the privileges
+ try:
+ real = pwd.getpwnam(user)
+ os.setuid(real.pw_uid)
+ except Exception as e:
+ LOG.error("Error setting privileges for user '%s': %s"
+ % (user, e))
+ # 126: command invoked cannot be executed
+ self._status = 126
+ return
+
subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, cwd=cwd, env=env)
output = subproc.communicate()
@@ -560,7 +576,6 @@ class PackagesHandler(object):
* if a version array is supplied, choose the highest version from the
array and follow same logic for version string above
"""
-
cmd = CommandRunner("which yum").run()
if cmd.status == 1:
# yum not available, use DNF if available
diff --git a/heat_cfntools/tests/test_cfn_helper.py b/heat_cfntools/tests/test_cfn_helper.py
index 59020e2..15641bc 100644
--- a/heat_cfntools/tests/test_cfn_helper.py
+++ b/heat_cfntools/tests/test_cfn_helper.py
@@ -29,7 +29,7 @@ from heat_cfntools.cfntools import cfn_helper
def popen_root_calls(calls):
kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1}
return [
- mock.call(['su', 'root', '-c', call], **kwargs)
+ mock.call(call.split(), **kwargs)
for call in calls
]
@@ -51,9 +51,9 @@ class TestCommandRunner(testtools.TestCase):
def test_command_runner(self):
def returns(*args, **kwargs):
- if args[0][3] == '/bin/command1':
+ if args[0][0] == '/bin/command1':
return FakePOpen('All good')
- elif args[0][3] == '/bin/command2':
+ elif args[0][0] == '/bin/command2':
return FakePOpen('Doing something', 'error', -1)
else:
raise Exception('This should never happen')
@@ -73,13 +73,53 @@ class TestCommandRunner(testtools.TestCase):
calls = popen_root_calls(['/bin/command1', '/bin/command2'])
mock_popen.assert_has_calls(calls)
+ def test_runner_runs_command_as_list(self):
+ with mock.patch('subprocess.Popen') as mock_popen:
+ command = '/bin/command --option=value arg1 arg2'
+ expected_cmd = ['/bin/command', '--option=value', 'arg1', 'arg2']
+ cmd = cfn_helper.CommandRunner(command)
+ cmd.run()
+ self.assertTrue(mock_popen.called)
+ actual_cmd = mock_popen.call_args[0][0]
+ self.assertTrue(isinstance(actual_cmd, list))
+ self.assertItemsEqual(expected_cmd, actual_cmd)
+
+ @mock.patch.object(cfn_helper.pwd, 'getpwnam')
+ @mock.patch.object(cfn_helper.os, 'setuid')
+ def test_privileges_are_lowered_for_non_root_user(self, mock_setuid,
+ mock_getpwnam):
+ pw_entry = mock.MagicMock()
+ pw_entry.pw_uid = 1001
+ mock_getpwnam.return_value = pw_entry
+ with mock.patch('subprocess.Popen') as mock_popen:
+ command = '/bin/command --option=value arg1 arg2'
+ cmd = cfn_helper.CommandRunner(command)
+ cmd.run(user='nonroot')
+ mock_getpwnam.assert_called_once_with('nonroot')
+ mock_setuid.assert_called_once_with(1001)
+ self.assertTrue(mock_popen.called)
+
+ @mock.patch.object(cfn_helper.pwd, 'getpwnam')
+ @mock.patch.object(cfn_helper.os, 'setuid')
+ def test_run_returns_when_cannot_set_privileges(self, mock_setuid,
+ mock_getpwnam):
+ mock_setuid.side_effect = Exception('[Error 1] Permission Denied')
+ with mock.patch('subprocess.Popen') as mock_popen:
+ command = '/bin/command2'
+ cmd = cfn_helper.CommandRunner(command)
+ cmd.run(user='nonroot')
+ self.assertTrue(mock_getpwnam.called)
+ self.assertTrue(mock_setuid.called)
+ self.assertFalse(mock_popen.called)
+ self.assertEqual(126, cmd.status)
+
class TestPackages(testtools.TestCase):
def test_yum_install(self):
def returns(*args, **kwargs):
- if args[0][3].startswith('rpm -q '):
+ if (args[0][0] == 'rpm' and args[0][1] == '-q'):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
@@ -106,8 +146,8 @@ class TestPackages(testtools.TestCase):
def test_dnf_install_yum_unavailable(self):
def returns(*args, **kwargs):
- if args[0][3].startswith('rpm -q ') \
- or args[0][3] == 'which yum':
+ if ((args[0][0] == 'rpm' and args[0][1] == '-q')
+ or (args[0][0] == 'which' and args[0][1] == 'yum')):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
@@ -134,7 +174,7 @@ class TestPackages(testtools.TestCase):
def test_dnf_install(self):
def returns(*args, **kwargs):
- if args[0][3].startswith('rpm -q '):
+ if (args[0][0] == 'rpm' and args[0][1] == '-q'):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
@@ -161,7 +201,7 @@ class TestPackages(testtools.TestCase):
def test_zypper_install(self):
def returns(*args, **kwargs):
- if args[0][3].startswith('rpm -q '):
+ if (args[0][0] == 'rpm' and args[0][1] == '-q'):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)