summaryrefslogtreecommitdiff
path: root/fixtures/_fixtures/popen.py
blob: a099854b5b4b1c46fd122597d960bb0b2fba674a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#  fixtures: Fixtures with cleanups for testing and convenience.
#
# Copyright (c) 2010, 2011, Robert Collins <robertc@robertcollins.net>
# 
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# license you chose for the specific language governing permissions and
# limitations under that license.

__all__ = [
    'FakePopen',
    'PopenFixture'
    ]

import random
import subprocess
import sys

from fixtures import Fixture


class FakeProcess(object):
    """A test double process, roughly meeting subprocess.Popen's contract."""

    def __init__(self, args, info):
        self._args = args
        self.stdin = info.get('stdin')
        self.stdout = info.get('stdout')
        self.stderr = info.get('stderr')
        self.pid = random.randint(0, 65536)
        self._returncode = info.get('returncode', 0)
        self.returncode = None

    @property
    def args(self):
        return self._args["args"]

    def poll(self):
        """Get the current value of FakeProcess.returncode.

        The returncode is None before communicate() and/or wait() are called,
        and it's set to the value provided by the 'info' dictionary otherwise
        (or 0 in case 'info' doesn't specify a value).
        """
        return self.returncode

    def communicate(self, input=None, timeout=None):
        self.returncode = self._returncode
        if self.stdin and input:
            self.stdin.write(input)
        if self.stdout:
            out = self.stdout.getvalue()
        else:
            out = ''
        if self.stderr:
            err = self.stderr.getvalue()
        else:
            err = ''
        return out, err

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.wait()

    def kill(self):
        pass

    def wait(self, timeout=None, endtime=None):
        if self.returncode is None:
            self.communicate()
        return self.returncode


class FakePopen(Fixture):
    """Replace subprocess.Popen.

    Primarily useful for testing, this fixture replaces subprocess.Popen with a
    test double.

    :ivar procs: A list of the processes created by the fixture.
    """

    _unpassed = object()

    def __init__(self, get_info=lambda _:{}):
        """Create a PopenFixture

        :param get_info: Optional callback to control the behaviour of the
            created process. This callback takes a kwargs dict for the Popen
            call, and should return a dict with any desired attributes.
            Only parameters that are supplied to the Popen call are in the
            dict, making it possible to detect the difference between 'passed
            with a default value' and 'not passed at all'.

            e.g. 
            def get_info(proc_args):
                self.assertEqual(subprocess.PIPE, proc_args['stdin'])
                return {'stdin': StringIO('foobar')}

            The default behaviour if no get_info is supplied is for the return
            process to have returncode of None, empty streams and a random pid.

            After communicate() or wait() are called on the process object,
            the returncode is set to whatever get_info returns (or 0 if
            get_info is not supplied or doesn't return a dict with an explicit
            'returncode' key).
        """
        super(FakePopen, self).__init__()
        self.get_info = get_info

    def _setUp(self):
        self.addCleanup(setattr, subprocess, 'Popen', subprocess.Popen)
        subprocess.Popen = self
        self.procs = []

    # The method has the correct signature so we error appropriately if called
    # wrongly.
    def __call__(self, args, bufsize=_unpassed, executable=_unpassed,
        stdin=_unpassed, stdout=_unpassed, stderr=_unpassed,
        preexec_fn=_unpassed, close_fds=_unpassed, shell=_unpassed,
        cwd=_unpassed, env=_unpassed, universal_newlines=_unpassed,
        startupinfo=_unpassed, creationflags=_unpassed,
        restore_signals=_unpassed, start_new_session=_unpassed,
        pass_fds=_unpassed, *, group=_unpassed, extra_groups=_unpassed,
        user=_unpassed, umask=_unpassed, encoding=_unpassed,
        errors=_unpassed, text=_unpassed, pipesize=_unpassed,
        process_group=_unpassed):
        # Reject arguments introduced by newer versions of Python in older
        # versions; this makes it harder to accidentally hide compatibility
        # problems using test doubles.
        if sys.version_info < (3, 7) and text is not FakePopen._unpassed:
            raise TypeError(
                "FakePopen.__call__() got an unexpected keyword argument "
                "'text'")
        if sys.version_info < (3, 9):
            for arg_name in "group", "extra_groups", "user", "umask":
                if locals()[arg_name] is not FakePopen._unpassed:
                    raise TypeError(
                        "FakePopen.__call__() got an unexpected keyword "
                        "argument '{}'".format(arg_name))
        if sys.version_info < (3, 10) and pipesize is not FakePopen._unpassed:
            raise TypeError(
                "FakePopen.__call__() got an unexpected keyword argument "
                "'pipesize'")
        if sys.version_info < (3, 11) and process_group is not FakePopen._unpassed:
            raise TypeError(
                "FakePopen.__call__() got an unexpected keyword argument "
                "'process_group'")

        proc_args = dict(args=args)
        local = locals()
        for param in [
            "bufsize", "executable", "stdin", "stdout", "stderr",
            "preexec_fn", "close_fds", "shell", "cwd", "env",
            "universal_newlines", "startupinfo", "creationflags",
            "restore_signals", "start_new_session", "pass_fds", "group",
            "extra_groups", "user", "umask", "encoding", "errors", "text",
            "pipesize", "process_group"]:
            if local[param] is not FakePopen._unpassed:
                proc_args[param] = local[param]
        proc_info = self.get_info(proc_args)
        result = FakeProcess(proc_args, proc_info)
        self.procs.append(result)
        return result


PopenFixture = FakePopen