diff options
-rw-r--r-- | heat_cfntools/cfntools/cfn_helper.py | 19 | ||||
-rw-r--r-- | heat_cfntools/tests/test_cfn_helper.py | 56 |
2 files changed, 65 insertions, 10 deletions
diff --git a/heat_cfntools/cfntools/cfn_helper.py b/heat_cfntools/cfntools/cfn_helper.py index 7e75cf9..2351c1d 100644 --- a/heat_cfntools/cfntools/cfn_helper.py +++ b/heat_cfntools/cfntools/cfn_helper.py @@ -180,7 +180,23 @@ class CommandRunner(object): self """ LOG.debug("Running command: %s" % self._command) - cmd = ['su', user, '-c', self._command] + + cmd = self._command + if isinstance(cmd, str): + cmd = cmd.split() + + if user != 'root': + # lower the privileges + try: + real = pwd.getpwnam(user) + os.setuid(real.pw_uid) + except Exception as e: + LOG.error("Error setting privileges for user '%s': %s" + % (user, e)) + # 126: command invoked cannot be executed + self._status = 126 + return + subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd, env=env) output = subproc.communicate() @@ -560,7 +576,6 @@ class PackagesHandler(object): * if a version array is supplied, choose the highest version from the array and follow same logic for version string above """ - cmd = CommandRunner("which yum").run() if cmd.status == 1: # yum not available, use DNF if available diff --git a/heat_cfntools/tests/test_cfn_helper.py b/heat_cfntools/tests/test_cfn_helper.py index 59020e2..15641bc 100644 --- a/heat_cfntools/tests/test_cfn_helper.py +++ b/heat_cfntools/tests/test_cfn_helper.py @@ -29,7 +29,7 @@ from heat_cfntools.cfntools import cfn_helper def popen_root_calls(calls): kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1} return [ - mock.call(['su', 'root', '-c', call], **kwargs) + mock.call(call.split(), **kwargs) for call in calls ] @@ -51,9 +51,9 @@ class TestCommandRunner(testtools.TestCase): def test_command_runner(self): def returns(*args, **kwargs): - if args[0][3] == '/bin/command1': + if args[0][0] == '/bin/command1': return FakePOpen('All good') - elif args[0][3] == '/bin/command2': + elif args[0][0] == '/bin/command2': return FakePOpen('Doing something', 'error', -1) else: raise Exception('This should never happen') @@ -73,13 +73,53 @@ class TestCommandRunner(testtools.TestCase): calls = popen_root_calls(['/bin/command1', '/bin/command2']) mock_popen.assert_has_calls(calls) + def test_runner_runs_command_as_list(self): + with mock.patch('subprocess.Popen') as mock_popen: + command = '/bin/command --option=value arg1 arg2' + expected_cmd = ['/bin/command', '--option=value', 'arg1', 'arg2'] + cmd = cfn_helper.CommandRunner(command) + cmd.run() + self.assertTrue(mock_popen.called) + actual_cmd = mock_popen.call_args[0][0] + self.assertTrue(isinstance(actual_cmd, list)) + self.assertItemsEqual(expected_cmd, actual_cmd) + + @mock.patch.object(cfn_helper.pwd, 'getpwnam') + @mock.patch.object(cfn_helper.os, 'setuid') + def test_privileges_are_lowered_for_non_root_user(self, mock_setuid, + mock_getpwnam): + pw_entry = mock.MagicMock() + pw_entry.pw_uid = 1001 + mock_getpwnam.return_value = pw_entry + with mock.patch('subprocess.Popen') as mock_popen: + command = '/bin/command --option=value arg1 arg2' + cmd = cfn_helper.CommandRunner(command) + cmd.run(user='nonroot') + mock_getpwnam.assert_called_once_with('nonroot') + mock_setuid.assert_called_once_with(1001) + self.assertTrue(mock_popen.called) + + @mock.patch.object(cfn_helper.pwd, 'getpwnam') + @mock.patch.object(cfn_helper.os, 'setuid') + def test_run_returns_when_cannot_set_privileges(self, mock_setuid, + mock_getpwnam): + mock_setuid.side_effect = Exception('[Error 1] Permission Denied') + with mock.patch('subprocess.Popen') as mock_popen: + command = '/bin/command2' + cmd = cfn_helper.CommandRunner(command) + cmd.run(user='nonroot') + self.assertTrue(mock_getpwnam.called) + self.assertTrue(mock_setuid.called) + self.assertFalse(mock_popen.called) + self.assertEqual(126, cmd.status) + class TestPackages(testtools.TestCase): def test_yum_install(self): def returns(*args, **kwargs): - if args[0][3].startswith('rpm -q '): + if (args[0][0] == 'rpm' and args[0][1] == '-q'): return FakePOpen(returncode=1) else: return FakePOpen(returncode=0) @@ -106,8 +146,8 @@ class TestPackages(testtools.TestCase): def test_dnf_install_yum_unavailable(self): def returns(*args, **kwargs): - if args[0][3].startswith('rpm -q ') \ - or args[0][3] == 'which yum': + if ((args[0][0] == 'rpm' and args[0][1] == '-q') + or (args[0][0] == 'which' and args[0][1] == 'yum')): return FakePOpen(returncode=1) else: return FakePOpen(returncode=0) @@ -134,7 +174,7 @@ class TestPackages(testtools.TestCase): def test_dnf_install(self): def returns(*args, **kwargs): - if args[0][3].startswith('rpm -q '): + if (args[0][0] == 'rpm' and args[0][1] == '-q'): return FakePOpen(returncode=1) else: return FakePOpen(returncode=0) @@ -161,7 +201,7 @@ class TestPackages(testtools.TestCase): def test_zypper_install(self): def returns(*args, **kwargs): - if args[0][3].startswith('rpm -q '): + if (args[0][0] == 'rpm' and args[0][1] == '-q'): return FakePOpen(returncode=1) else: return FakePOpen(returncode=0) |