summaryrefslogtreecommitdiff
path: root/fixtures/tests/_fixtures/test_popen.py
blob: e9ab0741492392468787c957a6eb48fe66912909 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#  fixtures: Fixtures with cleanups for testing and convenience.
#
# Copyright (c) 2010, 2011, Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# license you chose for the specific language governing permissions and
# limitations under that license.

import inspect
import io
import subprocess
import sys

import testtools

from fixtures import FakePopen, TestWithFixtures
from fixtures._fixtures.popen import FakeProcess


class TestFakePopen(testtools.TestCase, TestWithFixtures):
    """Tests for the FakePopen fixture as a stand-in for subprocess.Popen."""

    def test_installs_restores_global(self):
        """setUp installs the fixture as subprocess.Popen; cleanUp undoes it."""
        fake = FakePopen()
        original_popen = subprocess.Popen
        fake.setUp()
        try:
            self.assertEqual(subprocess.Popen, fake)
        finally:
            fake.cleanUp()
            self.assertEqual(subprocess.Popen, original_popen)

    def test___call___is_recorded(self):
        """Calling the fixture records one proc holding the call arguments."""
        fake = self.useFixture(FakePopen())
        proc = fake(['foo', 'bar'], 1, None, 'in', 'out', 'err')
        expected_args = {
            'args': ['foo', 'bar'],
            'bufsize': 1,
            'executable': None,
            'stdin': 'in',
            'stdout': 'out',
            'stderr': 'err',
        }
        self.assertEqual(1, len(fake.procs))
        self.assertEqual(expected_args, proc._args)

    def test_inject_content_stdout(self):
        """A 'stdout' entry returned by get_info is exposed as proc.stdout."""
        fake = self.useFixture(FakePopen(lambda args: {'stdout': 'stdout'}))
        proc = fake(['foo'])
        self.assertEqual('stdout', proc.stdout)

    def test_handles_all_Popen_args(self):
        """Every Popen keyword for this Python version reaches get_info."""
        # Each argument is given its own name as a placeholder value.
        always_available = (
            "args", "bufsize", "executable", "stdin", "stdout", "stderr",
            "preexec_fn", "close_fds", "shell", "cwd", "env",
            "universal_newlines", "startupinfo", "creationflags",
            "restore_signals", "start_new_session", "pass_fds",
            "encoding", "errors",
        )
        all_args = {name: name for name in always_available}

        # Keywords that are only passed from a given Python version onwards.
        version_gated = (
            ((3, 7), ("text",)),
            ((3, 9), ("group", "extra_groups", "user", "umask")),
            ((3, 10), ("pipesize",)),
            ((3, 11), ("process_group",)),
        )
        for minimum_version, names in version_gated:
            if sys.version_info >= minimum_version:
                for name in names:
                    all_args[name] = name

        def check_args(proc_args):
            self.assertEqual(all_args, proc_args)
            return {}

        fake = self.useFixture(FakePopen(check_args))
        fake(**all_args)

    @testtools.skipUnless(
        sys.version_info < (3, 7), "only relevant on Python <3.7")
    def test_rejects_3_7_args_on_older_versions(self):
        """Passing 'text' is expected to raise TypeError before 3.7."""
        fake = self.useFixture(FakePopen(lambda proc_args: {}))
        expectation = testtools.ExpectedException(
            TypeError, r".* got an unexpected keyword argument 'text'")
        with expectation:
            fake(args="args", text=True)

    @testtools.skipUnless(
        sys.version_info < (3, 9), "only relevant on Python <3.9")
    def test_rejects_3_9_args_on_older_versions(self):
        """Each keyword added in 3.9 is expected to raise TypeError earlier."""
        fake = self.useFixture(FakePopen(lambda proc_args: {}))
        for arg_name in ("group", "extra_groups", "user", "umask"):
            pattern = (
                r".* got an unexpected keyword argument '{}'".format(arg_name))
            with testtools.ExpectedException(TypeError, pattern):
                fake(args="args", **{arg_name: arg_name})

    @testtools.skipUnless(
        sys.version_info < (3, 10), "only relevant on Python <3.10")
    def test_rejects_3_10_args_on_older_versions(self):
        """Passing 'pipesize' is expected to raise TypeError before 3.10."""
        fake = self.useFixture(FakePopen(lambda proc_args: {}))
        expectation = testtools.ExpectedException(
            TypeError, r".* got an unexpected keyword argument 'pipesize'")
        with expectation:
            fake(args="args", pipesize=1024)

    @testtools.skipUnless(
        sys.version_info < (3, 11), "only relevant on Python <3.11")
    def test_rejects_3_11_args_on_older_versions(self):
        """Passing 'process_group' is expected to raise TypeError pre-3.11."""
        fake = self.useFixture(FakePopen(lambda proc_args: {}))
        expectation = testtools.ExpectedException(
            TypeError,
            r".* got an unexpected keyword argument 'process_group'")
        with expectation:
            fake(args="args", process_group=42)

    def test_function_signature(self):
        """FakePopen.__call__ mirrors the signature of subprocess.Popen."""
        fake_spec = inspect.getfullargspec(FakePopen.__call__)
        real_spec = inspect.getfullargspec(subprocess.Popen)
        mismatch = (
            "Function signature of FakePopen doesn't match subprocess.Popen")

        # Positional arguments must agree name-for-name and in order.
        self.assertListEqual(fake_spec.args, real_spec.args, mismatch)

        # Keyword-only arguments are compared as sets: order is irrelevant.
        fake_kwonly = set(fake_spec.kwonlyargs)
        real_kwonly = set(real_spec.kwonlyargs)

        # Names the fake declares that are discounted when running on a
        # Python older than the listed version, newest first.
        discounted_before = (
            ((3, 11), ('process_group',)),
            ((3, 10), ('pipesize',)),
            ((3, 9), ('group', 'extra_groups', 'user', 'umask')),
            ((3, 7), ('text',)),
        )
        for introduced_in, names in discounted_before:
            if sys.version_info < introduced_in:
                for name in names:
                    fake_kwonly.remove(name)

        self.assertSetEqual(fake_kwonly, real_kwonly, mismatch)

    def test_custom_returncode(self):
        """A returncode from get_info only shows up once wait() is called."""
        fake = self.useFixture(FakePopen(lambda proc_args: dict(returncode=1)))
        proc = fake(['foo'])
        self.assertEqual(None, proc.returncode)
        self.assertEqual(1, proc.wait())
        self.assertEqual(1, proc.returncode)

    def test_with_popen_custom(self):
        """The object returned via subprocess.Popen works in a with block."""
        self.useFixture(FakePopen())
        command = ['ls -lh']
        with subprocess.Popen(['ls -lh']) as proc:
            self.assertEqual(None, proc.returncode)
            self.assertEqual(command, proc.args)


class TestFakeProcess(testtools.TestCase):
    """Tests for FakeProcess, the process object handed out by FakePopen."""

    def test_wait(self):
        """wait() hands back whatever returncode was set on the process."""
        proc = FakeProcess({}, {})
        proc.returncode = 45
        exit_status = proc.wait()
        self.assertEqual(45, exit_status)

    def test_communicate(self):
        """With no streams, communicate() yields two empty strings."""
        proc = FakeProcess({}, {})
        streams = proc.communicate()
        self.assertEqual(('', ''), streams)
        self.assertEqual(0, proc.returncode)

    def test_communicate_with_out(self):
        """communicate() returns the content of a supplied stdout stream."""
        info = {'stdout': io.BytesIO(b'foo')}
        proc = FakeProcess({}, info)
        streams = proc.communicate()
        self.assertEqual((b'foo', ''), streams)
        self.assertEqual(0, proc.returncode)

    def test_communicate_with_input(self):
        """Passing input without a stdin stream still returns stdout."""
        info = {'stdout': io.BytesIO(b'foo')}
        proc = FakeProcess({}, info)
        streams = proc.communicate(input=b'bar')
        self.assertEqual((b'foo', ''), streams)

    def test_communicate_with_input_and_stdin(self):
        """Input given to communicate() ends up in the stdin stream."""
        fake_stdin = io.BytesIO()
        proc = FakeProcess({}, {'stdin': fake_stdin})
        proc.communicate(input=b'hello')
        self.assertEqual(b'hello', fake_stdin.getvalue())

    def test_communicate_with_timeout(self):
        """communicate() accepts a timeout keyword and still returns stdout."""
        info = {'stdout': io.BytesIO(b'foo')}
        proc = FakeProcess({}, info)
        streams = proc.communicate(timeout=10)
        self.assertEqual((b'foo', ''), streams)

    def test_args(self):
        """The 'args' entry of the call arguments is exposed as proc.args."""
        call_args = {"args": ["ls", "-lh"]}
        proc = FakeProcess(call_args, {})
        proc.returncode = 45
        self.assertEqual(45, proc.wait())
        self.assertEqual(proc.args, ["ls", "-lh"])

    def test_kill(self):
        """kill() returns None."""
        proc = FakeProcess({}, {})
        outcome = proc.kill()
        self.assertIs(None, outcome)

    def test_poll(self):
        """poll() is None before communicate() and 0 afterwards."""
        proc = FakeProcess({}, {})
        before = proc.poll()
        self.assertIs(None, before)
        proc.communicate()
        after = proc.poll()
        self.assertEqual(0, after)

    def test_poll_with_returncode(self):
        """After communicate(), poll() reports a returncode of 0."""
        proc = FakeProcess({}, {})
        proc.communicate()
        status = proc.poll()
        self.assertEqual(0, status)

    def test_wait_with_timeout_and_endtime(self):
        """wait() accepts timeout and endtime keywords and returns 0."""
        proc = FakeProcess({}, {})
        exit_status = proc.wait(timeout=4, endtime=7)
        self.assertEqual(0, exit_status)