summaryrefslogtreecommitdiff
path: root/paste/cgiapp.py
blob: e5a62f451d7efd90fd047e5cffd5035a18df6e89 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php

"""
Application that runs a CGI script.
"""
import os
import sys
import subprocess
from six.moves.urllib.parse import quote
try:
    import select
except ImportError:
    select = None
import six

from paste.util import converters

__all__ = ['CGIError', 'CGIApplication']

class CGIError(Exception):
    """
    Error signalling a problem with the CGI script itself: either the
    script could not be located, or its output did not follow the CGI
    conventions (for example, a malformed header line).
    """

class CGIApplication(object):

    """
    WSGI application that acts as a proxy to a CGI script.

    You pass in the script path (``script``) and, optionally, the
    places to search for it when the name is not absolute (``path``).
    ``path`` may be a list of directories or a single string of
    directories joined with ``os.pathsep``.  If you don't give a path,
    then ``$PATH`` will be used.

    ``script`` may carry a query string (``script?a=b``); it is split
    off and appended to the ``QUERY_STRING`` of every request.  The
    same value can be given separately as ``query_string``, but not
    both at once.

    If ``include_os_environ`` is true, the script's environment starts
    from a copy of ``os.environ``; otherwise it starts empty.

    ``global_conf`` must be empty/None; use ``make_cgi_application``
    for the Paste Deploy style interface.
    """

    def __init__(self,
                 global_conf,
                 script,
                 path=None,
                 include_os_environ=True,
                 query_string=None):
        if global_conf:
            # NotImplemented is a comparison sentinel and is not
            # callable; NotImplementedError is the exception to raise.
            raise NotImplementedError(
                "global_conf is no longer supported for CGIApplication "
                "(use make_cgi_application); please pass None instead")
        # The script exactly as given, including any '?query' suffix.
        self.script_filename = script
        if path is None:
            path = os.environ.get('PATH', '').split(os.pathsep)
        elif isinstance(path, str):
            # A plain string would otherwise be iterated character by
            # character below; treat it like $PATH.  An empty string
            # means "no directories", as before.
            path = path.split(os.pathsep) if path else []
        self.path = path
        if '?' in script:
            assert query_string is None, (
                "You cannot have '?' in your script name (%r) and also "
                "give a query_string (%r)" % (script, query_string))
            script, query_string = script.split('?', 1)
        if os.path.abspath(script) != script:
            # relative path: use the first search directory containing it
            for path_dir in self.path:
                candidate = os.path.join(path_dir, script)
                if os.path.exists(candidate):
                    self.script = candidate
                    break
            else:
                raise CGIError(
                    "Script %r not found in path %r"
                    % (script, self.path))
        else:
            self.script = script
        self.include_os_environ = include_os_environ
        self.query_string = query_string

    def __call__(self, environ, start_response):
        """
        Run the CGI script for one WSGI request.

        The script's output is fed through a ``CGIWriter``, which calls
        ``start_response`` once the CGI headers are complete and then
        streams the body through the returned write callable; for that
        reason the returned iterable is always empty.
        """
        if 'REQUEST_URI' not in environ:
            environ['REQUEST_URI'] = (
                quote(environ.get('SCRIPT_NAME', ''))
                + quote(environ.get('PATH_INFO', '')))
        if self.include_os_environ:
            cgi_environ = os.environ.copy()
        else:
            cgi_environ = {}
        # Only all-uppercase keys with string values are passed on;
        # this leaves out keys such as ``wsgi.input``.
        for name, value in environ.items():
            # Should unicode values be encoded?
            if name.upper() == name and isinstance(value, str):
                cgi_environ[name] = value
        if self.query_string is not None:
            # Append the fixed query string to whatever the request sent.
            old = cgi_environ.get('QUERY_STRING', '')
            if old:
                old += '&'
            cgi_environ['QUERY_STRING'] = old + self.query_string
        cgi_environ['SCRIPT_FILENAME'] = self.script
        proc = subprocess.Popen(
            [self.script],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=cgi_environ,
            cwd=os.path.dirname(self.script),
            )
        writer = CGIWriter(environ, start_response)
        if select and sys.platform != 'win32':
            # Stream stdin/stdout/stderr incrementally via select().
            proc_communicate(
                proc,
                stdin=StdinReader.from_environ(environ),
                stdout=writer,
                stderr=environ['wsgi.errors'])
        else:
            # No usable select(): read the whole request body, run the
            # script to completion, then hand over its output at once.
            stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read())
            if stderr:
                environ['wsgi.errors'].write(stderr)
            writer.write(stdout)
        if not writer.headers_finished:
            # The script never emitted the blank line ending its
            # headers; send whatever status/headers were collected.
            start_response(writer.status, writer.headers)
        return []

class CGIWriter(object):
    """
    File-like sink for the raw output of a CGI script.

    Bytes passed to ``write`` are buffered and parsed line by line as
    CGI headers until a blank line is seen.  At that point
    ``start_response`` is called with the collected status and headers,
    and everything after it is forwarded to the write callable that
    ``start_response`` returned.
    """

    def __init__(self, environ, start_response):
        self.environ = environ
        self.start_response = start_response
        # Write callable from start_response; set once headers are done.
        self.writer = None
        self.headers_finished = False
        # Defaults used until the script sends a ``Status`` header.
        self.status = '200 OK'
        self.headers = []
        # Unparsed header bytes received so far.
        self.buffer = b''

    def write(self, data):
        """
        Accept a chunk of script output.

        Raises ``CGIError`` when a header line contains no colon.
        """
        if self.headers_finished:
            # Body phase: pass straight through.
            self.writer(data)
            return

        self.buffer += data
        while b'\n' in self.buffer:
            # The line ends at whichever comes first: CRLF or bare LF.
            lf_index = self.buffer.find(b'\n')
            crlf_index = self.buffer.find(b'\r\n')
            if crlf_index != -1 and crlf_index < lf_index:
                terminator = b'\r\n'
            else:
                terminator = b'\n'
            header_line, _, self.buffer = self.buffer.partition(terminator)

            if not header_line:
                # Blank line: headers are complete, the rest is body.
                self.headers_finished = True
                self.writer = self.start_response(self.status, self.headers)
                self.writer(self.buffer)
                del self.buffer, self.headers, self.status
                break

            raw_name, colon, raw_value = header_line.partition(b':')
            if not colon:
                raise CGIError(
                    "Bad header line: %r" % header_line)

            value = raw_value.lstrip()
            name = raw_name.strip()
            if six.PY3:
                name = name.decode('utf8')
                value = value.decode('utf8')

            if name.lower() != 'status':
                self.headers.append((name, value))
                continue

            # ``Status`` sets the response status instead of a header.
            if ' ' not in value:
                # WSGI requires this space, sometimes CGI scripts don't set it:
                value = '%s General' % value
            self.status = value

class StdinReader(object):
    """
    Reader that wraps a request body stream and never reads more than
    ``content_length`` bytes from it in total.
    """

    def __init__(self, stdin, content_length):
        self.stdin = stdin
        # Number of body bytes still allowed to be read.
        self.content_length = content_length

    @classmethod
    def from_environ(cls, environ):
        """
        Build a reader over ``environ['wsgi.input']`` limited to
        ``CONTENT_LENGTH`` bytes (0 when that key is missing or empty).

        Raises ``ValueError`` if ``CONTENT_LENGTH`` is not an integer.
        """
        length = environ.get('CONTENT_LENGTH')
        if length:
            length = int(length)
        else:
            length = 0
        return cls(environ['wsgi.input'], length)

    def read(self, size=None):
        """
        Read up to ``size`` bytes, or everything that remains of the
        body when ``size`` is None or negative.  Returns ``b''`` once
        the declared content length has been consumed.
        """
        if not self.content_length:
            return b''
        if size is None or size < 0:
            # A negative size means "read it all" for file-like objects;
            # it must not be forwarded, or the underlying stream would
            # be read past the declared content length.
            size = self.content_length
        text = self.stdin.read(min(self.content_length, size))
        self.content_length -= len(text)
        return text

def proc_communicate(proc, stdin=None, stdout=None, stderr=None):
    """
    Run the given process, piping input/output/errors to the given
    file-like objects (which need not be actual file objects, unlike
    the arguments passed to Popen).  Wait for process to terminate.

    ``stdin`` needs a ``read(size)`` method; ``stdout`` and ``stderr``
    need a ``write(data)`` method.  If ``proc`` has a stdin pipe but
    ``stdin`` is None, the pipe is closed right away so the child sees
    end-of-file.

    Note: this is taken from the posix version of
    subprocess.Popen.communicate, but made more general through the
    use of file-like objects.
    """
    import errno

    read_set = []
    write_set = []
    # Bytes read from ``stdin`` that a short write left unsent.
    input_buffer = b''
    trans_nl = proc.universal_newlines and hasattr(open, 'newlines')

    if proc.stdin:
        # Flush stdio buffer.  This might block, if the user has
        # been writing to .stdin in an uncontrolled fashion.
        proc.stdin.flush()
        if stdin is not None:
            write_set.append(proc.stdin)
        else:
            # Nothing to feed the child.
            proc.stdin.close()
    else:
        assert stdin is None
    if proc.stdout:
        read_set.append(proc.stdout)
    else:
        assert stdout is None
    if proc.stderr:
        read_set.append(proc.stderr)
    else:
        assert stderr is None

    while read_set or write_set:
        rlist, wlist, xlist = select.select(read_set, write_set, [])

        if proc.stdin in wlist:
            # When select has indicated that the file is writable,
            # we can write up to PIPE_BUF bytes without risk
            # blocking.  POSIX defines PIPE_BUF >= 512
            chunk, input_buffer = input_buffer, b''
            room = 512 - len(chunk)
            if room:
                chunk += stdin.read(room)
            if not chunk:
                # Input exhausted: signal end-of-file to the child.
                proc.stdin.close()
                write_set.remove(proc.stdin)
            else:
                try:
                    bytes_written = os.write(proc.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The child closed its stdin without reading all
                    # of the input; stop feeding it.
                    proc.stdin.close()
                    write_set.remove(proc.stdin)
                else:
                    if bytes_written < len(chunk):
                        input_buffer = chunk[bytes_written:]

        if proc.stdout in rlist:
            data = os.read(proc.stdout.fileno(), 1024)
            if data == b"":
                # End-of-file on the child's stdout.
                proc.stdout.close()
                read_set.remove(proc.stdout)
            if trans_nl:
                data = proc._translate_newlines(data)
            stdout.write(data)

        if proc.stderr in rlist:
            data = os.read(proc.stderr.fileno(), 1024)
            if data == b"":
                # End-of-file on the child's stderr.
                proc.stderr.close()
                read_set.remove(proc.stderr)
            if trans_nl:
                data = proc._translate_newlines(data)
            stderr.write(data)

    try:
        proc.wait()
    except OSError as e:
        # ECHILD ("no child processes") is ignored; anything else is
        # a real error.
        if e.errno != errno.ECHILD:
            raise

def make_cgi_application(global_conf, script, path=None, include_os_environ=None,
                         query_string=None):
    """
    Paste Deploy interface for :class:`CGIApplication`

    This object acts as a proxy to a CGI application.  You pass in the
    script path (``script``), an optional path to search for the
    script (if the name isn't absolute) (``path``).  If you don't give
    a path, the ``path`` (or ``PATH``) entry of ``global_conf`` is
    used, and failing that ``$PATH``.

    A ``path`` given as a single string is split on ``os.pathsep``
    into a list of directories.  ``include_os_environ`` is converted
    with ``converters.asbool`` before being passed on.
    """
    if path is None:
        path = global_conf.get('path') or global_conf.get('PATH')
    if isinstance(path, str):
        # Split here so the directory search iterates over directories
        # rather than over the characters of the string.  An empty
        # string means "no directories".
        path = path.split(os.pathsep) if path else []
    include_os_environ = converters.asbool(include_os_environ)
    return CGIApplication(
        None,
        script, path=path, include_os_environ=include_os_environ,
        query_string=query_string)