summaryrefslogtreecommitdiff
path: root/tests/unit/test_utils_subprocess.py
blob: b0de2bf578deaa3fe1856eb3565bbf58f7818ac7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# -*- coding: utf-8 -*-
import locale
import sys
from logging import DEBUG, ERROR, INFO, WARNING
from textwrap import dedent

import pytest

from pip._internal.cli.spinners import SpinnerInterface
from pip._internal.exceptions import InstallationError
from pip._internal.utils.misc import hide_value
from pip._internal.utils.subprocess import (
    call_subprocess,
    format_command_args,
    make_command,
    make_subprocess_output_error,
)


@pytest.mark.parametrize('args, expected', [
    (['pip', 'list'], 'pip list'),
    (['foo', 'space space', 'new\nline', 'double"quote', "single'quote"],
     """foo 'space space' 'new\nline' 'double"quote' 'single'"'"'quote'"""),
    # Test HiddenText arguments.
    (make_command(hide_value('secret1'), 'foo', hide_value('secret2')),
        "'****' foo '****'"),
])
def test_format_command_args(args, expected):
    actual = format_command_args(args)
    assert actual == expected


def test_make_subprocess_output_error():
    cmd_args = ['test', 'has space']
    cwd = '/path/to/cwd'
    lines = ['line1\n', 'line2\n', 'line3\n']
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd=cwd,
        lines=lines,
        exit_status=3,
    )
    expected = dedent("""\
    Command errored out with exit status 3:
     command: test 'has space'
         cwd: /path/to/cwd
    Complete output (3 lines):
    line1
    line2
    line3
    ----------------------------------------""")
    assert actual == expected, 'actual: {}'.format(actual)


def test_make_subprocess_output_error__non_ascii_command_arg(monkeypatch):
    """
    Test a command argument with a non-ascii character.
    """
    cmd_args = ['foo', 'déf']
    if sys.version_info[0] == 2:
        # Check in Python 2 that the str (bytes object) with the non-ascii
        # character has the encoding we expect. (This comes from the source
        # code encoding at the top of the file.)
        assert cmd_args[1].decode('utf-8') == u'déf'

    # We need to monkeypatch so the encoding will be correct on Windows.
    monkeypatch.setattr(locale, 'getpreferredencoding', lambda: 'utf-8')
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd='/path/to/cwd',
        lines=[],
        exit_status=1,
    )
    expected = dedent(u"""\
    Command errored out with exit status 1:
     command: foo 'déf'
         cwd: /path/to/cwd
    Complete output (0 lines):
    ----------------------------------------""")
    assert actual == expected, u'actual: {}'.format(actual)


@pytest.mark.skipif("sys.version_info < (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_3(monkeypatch):
    """
    Test a str (text) cwd with a non-ascii character in Python 3.
    """
    cmd_args = ['test']
    cwd = '/path/to/cwd/déf'
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd=cwd,
        lines=[],
        exit_status=1,
    )
    expected = dedent("""\
    Command errored out with exit status 1:
     command: test
         cwd: /path/to/cwd/déf
    Complete output (0 lines):
    ----------------------------------------""")
    assert actual == expected, 'actual: {}'.format(actual)


@pytest.mark.parametrize('encoding', [
    'utf-8',
    # Test a Windows encoding.
    'cp1252',
])
@pytest.mark.skipif("sys.version_info >= (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_2(
    monkeypatch, encoding,
):
    """
    Test a str (bytes object) cwd with a non-ascii character in Python 2.
    """
    cmd_args = ['test']
    cwd = u'/path/to/cwd/déf'.encode(encoding)
    monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: encoding)
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd=cwd,
        lines=[],
        exit_status=1,
    )
    expected = dedent(u"""\
    Command errored out with exit status 1:
     command: test
         cwd: /path/to/cwd/déf
    Complete output (0 lines):
    ----------------------------------------""")
    assert actual == expected, u'actual: {}'.format(actual)


# This test is mainly important for checking unicode in Python 2.
def test_make_subprocess_output_error__non_ascii_line():
    """
    Test a line with a non-ascii character.
    """
    lines = [u'curly-quote: \u2018\n']
    actual = make_subprocess_output_error(
        cmd_args=['test'],
        cwd='/path/to/cwd',
        lines=lines,
        exit_status=1,
    )
    expected = dedent(u"""\
    Command errored out with exit status 1:
     command: test
         cwd: /path/to/cwd
    Complete output (1 lines):
    curly-quote: \u2018
    ----------------------------------------""")
    assert actual == expected, u'actual: {}'.format(actual)


class FakeSpinner(SpinnerInterface):

    def __init__(self):
        self.spin_count = 0
        self.final_status = None

    def spin(self):
        self.spin_count += 1

    def finish(self, final_status):
        self.final_status = final_status


class TestCallSubprocess(object):

    """
    Test call_subprocess().
    """

    def check_result(
        self, capfd, caplog, log_level, spinner, result, expected,
        expected_spinner,
    ):
        """
        Check the result of calling call_subprocess().

        :param log_level: the logging level that caplog was set to.
        :param spinner: the FakeSpinner object passed to call_subprocess()
            to be checked.
        :param result: the call_subprocess() return value to be checked.
        :param expected: a pair (expected_proc, expected_records), where
            1) `expected_proc` is the expected return value of
              call_subprocess() as a list of lines, or None if the return
              value is expected to be None;
            2) `expected_records` is the expected value of
              caplog.record_tuples.
        :param expected_spinner: a 2-tuple of the spinner's expected
            (spin_count, final_status).
        """
        expected_proc, expected_records = expected

        if expected_proc is None:
            assert result is None
        else:
            assert result.splitlines() == expected_proc

        # Confirm that stdout and stderr haven't been written to.
        captured = capfd.readouterr()
        assert (captured.out, captured.err) == ('', '')

        records = caplog.record_tuples
        if len(records) != len(expected_records):
            raise RuntimeError('{} != {}'.format(records, expected_records))

        for record, expected_record in zip(records, expected_records):
            # Check the logger_name and log level parts exactly.
            assert record[:2] == expected_record[:2]
            # For the message portion, check only a substring.  Also, we
            # can't use startswith() since the order of stdout and stderr
            # isn't guaranteed in cases where stderr is also present.
            # For example, we observed the stderr lines coming before stdout
            # in CI for PyPy 2.7 even though stdout happens first
            # chronologically.
            assert expected_record[2] in record[2]

        assert (spinner.spin_count, spinner.final_status) == expected_spinner

    def prepare_call(self, caplog, log_level, command=None):
        if command is None:
            command = 'print("Hello"); print("world")'

        caplog.set_level(log_level)
        spinner = FakeSpinner()
        args = [sys.executable, '-c', command]

        return (args, spinner)

    def test_debug_logging(self, capfd, caplog):
        """
        Test DEBUG logging (and without passing show_stdout=True).
        """
        log_level = DEBUG
        args, spinner = self.prepare_call(caplog, log_level)
        result = call_subprocess(args, spinner=spinner)

        expected = (['Hello', 'world'], [
            ('pip.subprocessor', DEBUG, 'Running command '),
            ('pip.subprocessor', DEBUG, 'Hello'),
            ('pip.subprocessor', DEBUG, 'world'),
        ])
        # The spinner shouldn't spin in this case since the subprocess
        # output is already being logged to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(0, None),
        )

    def test_info_logging(self, capfd, caplog):
        """
        Test INFO logging (and without passing show_stdout=True).
        """
        log_level = INFO
        args, spinner = self.prepare_call(caplog, log_level)
        result = call_subprocess(args, spinner=spinner)

        expected = (['Hello', 'world'], [])
        # The spinner should spin twice in this case since the subprocess
        # output isn't being written to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(2, 'done'),
        )

    def test_info_logging__subprocess_error(self, capfd, caplog):
        """
        Test INFO logging of a subprocess with an error (and without passing
        show_stdout=True).
        """
        log_level = INFO
        command = 'print("Hello"); print("world"); exit("fail")'
        args, spinner = self.prepare_call(caplog, log_level, command=command)

        with pytest.raises(InstallationError) as exc:
            call_subprocess(args, spinner=spinner)
        result = None
        exc_message = str(exc.value)
        assert exc_message.startswith(
            'Command errored out with exit status 1: '
        )
        assert exc_message.endswith('Check the logs for full command output.')

        expected = (None, [
            ('pip.subprocessor', ERROR, 'Complete output (3 lines):\n'),
        ])
        # The spinner should spin three times in this case since the
        # subprocess output isn't being written to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(3, 'error'),
        )

        # Do some further checking on the captured log records to confirm
        # that the subprocess output was logged.
        last_record = caplog.record_tuples[-1]
        last_message = last_record[2]
        lines = last_message.splitlines()

        # We have to sort before comparing the lines because we can't
        # guarantee the order in which stdout and stderr will appear.
        # For example, we observed the stderr lines coming before stdout
        # in CI for PyPy 2.7 even though stdout happens first chronologically.
        actual = sorted(lines)
        # Test the "command" line separately because we can't test an
        # exact match.
        command_line = actual.pop(1)
        assert actual == [
            '     cwd: None',
            '----------------------------------------',
            'Command errored out with exit status 1:',
            'Complete output (3 lines):',
            'Hello',
            'fail',
            'world',
        ], 'lines: {}'.format(actual)  # Show the full output on failure.

        assert command_line.startswith(' command: ')
        assert command_line.endswith('print("world"); exit("fail")\'')

    def test_info_logging_with_show_stdout_true(self, capfd, caplog):
        """
        Test INFO logging with show_stdout=True.
        """
        log_level = INFO
        args, spinner = self.prepare_call(caplog, log_level)
        result = call_subprocess(args, spinner=spinner, show_stdout=True)

        expected = (['Hello', 'world'], [
            ('pip.subprocessor', INFO, 'Running command '),
            ('pip.subprocessor', INFO, 'Hello'),
            ('pip.subprocessor', INFO, 'world'),
        ])
        # The spinner shouldn't spin in this case since the subprocess
        # output is already being written to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(0, None),
        )

    @pytest.mark.parametrize((
        'exit_status', 'show_stdout', 'extra_ok_returncodes', 'log_level',
        'expected'),
        [
            # The spinner should show here because show_stdout=False means
            # the subprocess should get logged at DEBUG level, but the passed
            # log level is only INFO.
            (0, False, None, INFO, (None, 'done', 2)),
            # Test some cases where the spinner should not be shown.
            (0, False, None, DEBUG, (None, None, 0)),
            # Test show_stdout=True.
            (0, True, None, DEBUG, (None, None, 0)),
            (0, True, None, INFO, (None, None, 0)),
            # The spinner should show here because show_stdout=True means
            # the subprocess should get logged at INFO level, but the passed
            # log level is only WARNING.
            (0, True, None, WARNING, (None, 'done', 2)),
            # Test a non-zero exit status.
            (3, False, None, INFO, (InstallationError, 'error', 2)),
            # Test a non-zero exit status also in extra_ok_returncodes.
            (3, False, (3, ), INFO, (None, 'done', 2)),
    ])
    def test_spinner_finish(
        self, exit_status, show_stdout, extra_ok_returncodes, log_level,
        caplog, expected,
    ):
        """
        Test that the spinner finishes correctly.
        """
        expected_exc_type = expected[0]
        expected_final_status = expected[1]
        expected_spin_count = expected[2]

        command = (
            'print("Hello"); print("world"); exit({})'.format(exit_status)
        )
        args, spinner = self.prepare_call(caplog, log_level, command=command)
        try:
            call_subprocess(
                args,
                show_stdout=show_stdout,
                extra_ok_returncodes=extra_ok_returncodes,
                spinner=spinner,
            )
        except Exception as exc:
            exc_type = type(exc)
        else:
            exc_type = None

        assert exc_type == expected_exc_type
        assert spinner.final_status == expected_final_status
        assert spinner.spin_count == expected_spin_count

    def test_closes_stdin(self):
        with pytest.raises(InstallationError):
            call_subprocess(
                [sys.executable, '-c', 'input()'],
                show_stdout=True,
            )