# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Application that runs a CGI script.
"""
import errno
import os
import sys
import subprocess
from six.moves.urllib.parse import quote
try:
    import select
except ImportError:
    select = None
import six

from paste.util import converters

__all__ = ['CGIError', 'CGIApplication']


class CGIError(Exception):
    """
    Raised when the CGI script can't be found or doesn't
    act like a proper CGI script.
    """


class CGIApplication(object):
    """
    WSGI application that acts as a proxy to a CGI script.

    You pass in the script path (``script``) and an optional search
    path (``path``) used when the script name isn't absolute.  ``path``
    may be a list of directories or a single string of directories
    joined with ``os.pathsep``.  If you don't give a path, then
    ``$PATH`` will be used.

    ``script`` may carry a query string after a ``?``; it is split off
    and appended to the request's ``QUERY_STRING`` on every call (the
    same effect as passing ``query_string`` explicitly).

    If ``include_os_environ`` is true, the script's environment starts
    from a copy of ``os.environ``; otherwise it starts empty.
    """

    def __init__(self,
                 global_conf,
                 script,
                 path=None,
                 include_os_environ=True,
                 query_string=None):
        """
        Resolve ``script`` to a runnable path and store the settings.

        Raises ``NotImplementedError`` if a non-empty ``global_conf`` is
        given, and ``CGIError`` if a relative ``script`` cannot be found
        in any directory of ``path``.
        """
        if global_conf:
            # NotImplemented is a comparison sentinel, not an exception;
            # calling it would raise an unrelated TypeError.
            raise NotImplementedError(
                "global_conf is no longer supported for CGIApplication "
                "(use make_cgi_application); please pass None instead")
        self.script_filename = script
        if path is None:
            path = os.environ.get('PATH', '')
        if isinstance(path, six.string_types):
            # A plain string (from $PATH or a config file) is a list of
            # directories joined by the platform separator; iterating it
            # directly would yield single characters.
            path = path.split(os.pathsep)
        self.path = path
        if '?' in script:
            assert query_string is None, (
                "You cannot have '?' in your script name (%r) and also "
                "give a query_string (%r)" % (script, query_string))
            script, query_string = script.split('?', 1)
        if os.path.abspath(script) != script:
            # Relative path: take the first search directory containing it.
            for path_dir in self.path:
                candidate = os.path.join(path_dir, script)
                if os.path.exists(candidate):
                    self.script = candidate
                    break
            else:
                raise CGIError(
                    "Script %r not found in path %r"
                    % (script, self.path))
        else:
            self.script = script
        self.include_os_environ = include_os_environ
        self.query_string = query_string

    def __call__(self, environ, start_response):
        """
        Run the CGI script for one WSGI request.

        The script's environment is built from (optionally) a copy of
        ``os.environ`` plus every WSGI ``environ`` entry whose key is
        all upper-case and whose value is a ``str``.  ``REQUEST_URI`` is
        filled in on ``environ`` itself when missing, the configured
        query string is appended to ``QUERY_STRING``, and
        ``SCRIPT_FILENAME`` is set to the resolved script path.

        The request body is piped to the script's stdin, its stderr goes
        to ``environ['wsgi.errors']`` and its stdout is parsed by
        ``CGIWriter``, which sends the body through the ``write``
        callable returned by ``start_response``.  For that reason the
        returned iterable is always empty.
        """
        if 'REQUEST_URI' not in environ:
            environ['REQUEST_URI'] = (
                quote(environ.get('SCRIPT_NAME', ''))
                + quote(environ.get('PATH_INFO', '')))
        if self.include_os_environ:
            cgi_environ = os.environ.copy()
        else:
            cgi_environ = {}
        for name in environ:
            # Should unicode values be encoded?
            if (name.upper() == name
                    and isinstance(environ[name], str)):
                cgi_environ[name] = environ[name]
        if self.query_string is not None:
            old = cgi_environ.get('QUERY_STRING', '')
            if old:
                old += '&'
            cgi_environ['QUERY_STRING'] = old + self.query_string
        cgi_environ['SCRIPT_FILENAME'] = self.script
        proc = subprocess.Popen(
            [self.script],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=cgi_environ,
            cwd=os.path.dirname(self.script),
            )
        writer = CGIWriter(environ, start_response)
        if select and sys.platform != 'win32':
            # Stream data in both directions using select().
            proc_communicate(
                proc,
                stdin=StdinReader.from_environ(environ),
                stdout=writer,
                stderr=environ['wsgi.errors'])
        else:
            # Fallback without a usable select(): buffer everything.
            stdout, stderr = proc.communicate(
                StdinReader.from_environ(environ).read())
            if stderr:
                environ['wsgi.errors'].write(stderr)
            writer.write(stdout)
        if not writer.headers_finished:
            # The script never emitted the blank line ending its headers.
            start_response(writer.status, writer.headers)
        return []


class CGIWriter(object):
    """
    File-like sink that turns raw CGI output into a WSGI response.

    Bytes passed to ``write`` are buffered and parsed as header lines
    until a blank line is seen.  At that point ``start_response`` is
    called with the collected status and headers, and all further data
    is forwarded to the ``write`` callable it returned.
    """

    def __init__(self, environ, start_response):
        self.environ = environ
        self.start_response = start_response
        # Used unless the script sends a ``Status:`` header.
        self.status = '200 OK'
        self.headers = []
        self.headers_finished = False
        # The WSGI write callable; set once the headers are complete.
        self.writer = None
        # Holds not-yet-parsed header bytes.
        self.buffer = b''

    def write(self, data):
        """
        Accept a chunk of the script's stdout (bytes).

        Raises ``CGIError`` if a non-blank header line has no ``:``.
        """
        if self.headers_finished:
            self.writer(data)
            return
        self.buffer += data
        while b'\n' in self.buffer:
            # Split on whichever line terminator comes first so that a
            # CRLF ending does not leave a stray '\r' on the line.
            if (b'\r\n' in self.buffer
                    and self.buffer.find(b'\r\n') < self.buffer.find(b'\n')):
                line1, self.buffer = self.buffer.split(b'\r\n', 1)
            else:
                line1, self.buffer = self.buffer.split(b'\n', 1)
            if not line1:
                # Blank line: headers are done, the rest is body.
                self.headers_finished = True
                self.writer = self.start_response(
                    self.status, self.headers)
                self.writer(self.buffer)
                del self.buffer
                del self.headers
                del self.status
                break
            elif b':' not in line1:
                raise CGIError(
                    "Bad header line: %r" % line1)
            else:
                name, value = line1.split(b':', 1)
                value = value.lstrip()
                name = name.strip()
                if six.PY3:
                    name = name.decode('utf8')
                    value = value.decode('utf8')
                if name.lower() == 'status':
                    if ' ' not in value:
                        # WSGI requires this space, sometimes CGI scripts don't set it:
                        value = '%s General' % value
                    self.status = value
                else:
                    self.headers.append((name, value))


class StdinReader(object):
    """
    Reader over ``wsgi.input`` that never reads past ``CONTENT_LENGTH``.
    """

    def __init__(self, stdin, content_length):
        self.stdin = stdin
        # Number of body bytes still allowed to be read.
        self.content_length = content_length

    @classmethod
    def from_environ(cls, environ):
        """
        Build a reader from a WSGI ``environ``.

        A missing or empty ``CONTENT_LENGTH`` is treated as zero.
        """
        length = environ.get('CONTENT_LENGTH')
        if length:
            length = int(length)
        else:
            length = 0
        return cls(environ['wsgi.input'], length)

    def read(self, size=None):
        """
        Read up to ``size`` bytes (everything remaining when ``None``).

        Returns ``b''`` once the declared content length is used up.
        """
        if not self.content_length:
            return b''
        if size is None:
            text = self.stdin.read(self.content_length)
        else:
            text = self.stdin.read(min(self.content_length, size))
        self.content_length -= len(text)
        return text


def _forward_output(proc, pipe, sink, read_set, trans_nl):
    """
    Copy one chunk from ``pipe`` to ``sink``; at EOF close the pipe and
    drop it from ``read_set``.
    """
    data = os.read(pipe.fileno(), 1024)
    if not data:
        pipe.close()
        read_set.remove(pipe)
        return
    if trans_nl:
        data = proc._translate_newlines(data)
    sink.write(data)


def proc_communicate(proc, stdin=None, stdout=None, stderr=None):
    """
    Run the given process, piping input/output/errors to the given
    file-like objects (which need not be actual file objects, unlike
    the arguments passed to Popen).  Wait for process to terminate.

    ``stdin`` needs a ``read(size)`` method returning bytes; ``stdout``
    and ``stderr`` need a ``write(data)`` method.  If ``stdin`` is None
    the child's stdin pipe is closed immediately.

    Note: this is taken from the posix version of
    subprocess.Popen.communicate, but made more general through the
    use of file-like objects.
    """
    read_set = []
    write_set = []
    # Bytes read from ``stdin`` that a short write could not deliver yet.
    input_buffer = b''
    trans_nl = proc.universal_newlines and hasattr(open, 'newlines')

    if proc.stdin:
        # Flush stdio buffer.  This might block, if the user has
        # been writing to .stdin in an uncontrolled fashion.
        proc.stdin.flush()
        if stdin is not None:
            write_set.append(proc.stdin)
        else:
            # Nothing to send: let the child see EOF right away.
            proc.stdin.close()
    else:
        assert stdin is None
    if proc.stdout:
        read_set.append(proc.stdout)
    else:
        assert stdout is None
    if proc.stderr:
        read_set.append(proc.stderr)
    else:
        assert stderr is None

    while read_set or write_set:
        rlist, wlist, xlist = select.select(read_set, write_set, [])

        if proc.stdin in wlist:
            # When select has indicated that the file is writable,
            # we can write up to PIPE_BUF bytes without risk
            # blocking.  POSIX defines PIPE_BUF >= 512
            chunk, input_buffer = input_buffer, b''
            wanted = 512 - len(chunk)
            if wanted:
                chunk += stdin.read(wanted)
            if not chunk:
                # Input exhausted.
                proc.stdin.close()
                write_set.remove(proc.stdin)
            else:
                try:
                    bytes_written = os.write(proc.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The child stopped reading (e.g. exited without
                    # consuming its input); stop feeding it but keep
                    # collecting whatever it wrote.
                    proc.stdin.close()
                    write_set.remove(proc.stdin)
                else:
                    if bytes_written < len(chunk):
                        input_buffer = chunk[bytes_written:]

        if proc.stdout in rlist:
            _forward_output(proc, proc.stdout, stdout, read_set, trans_nl)

        if proc.stderr in rlist:
            _forward_output(proc, proc.stderr, stderr, read_set, trans_nl)

    try:
        proc.wait()
    except OSError as e:
        # ECHILD: the child has already been reaped elsewhere.
        if e.errno != errno.ECHILD:
            raise


def make_cgi_application(global_conf, script, path=None,
                         include_os_environ=None, query_string=None):
    """
    Paste Deploy interface for :class:`CGIApplication`

    This object acts as a proxy to a CGI application.  You pass in the
    script path (``script``), an optional path to search for the
    script (if the name isn't absolute) (``path``).  If you don't give
    a path, the ``path`` (or ``PATH``) entry of ``global_conf`` is used,
    and failing that ``$PATH``.

    ``include_os_environ`` is converted with ``converters.asbool``
    before being passed on.
    """
    if path is None:
        path = global_conf.get('path') or global_conf.get('PATH')
    include_os_environ = converters.asbool(include_os_environ)
    return CGIApplication(
        None,
        script,
        path=path,
        include_os_environ=include_os_environ,
        query_string=query_string)