diff options
author | Marc Abramowitz <marc@marc-abramowitz.com> | 2015-04-30 16:42:17 -0700 |
---|---|---|
committer | Marc Abramowitz <marc@marc-abramowitz.com> | 2015-04-30 16:42:17 -0700 |
commit | 12a3f1f4cfa7f88478dc1b0e949fcc095b9fc804 (patch) | |
tree | ddb8079523d846f0b074437fc33fa5e28b508183 /paste/cgiapp.py | |
download | paste-git-eliminate_cgi_parse_qsl.tar.gz |
Replace cgi.parse_qsl w/ six.moves.urllib.parse.parse_sqleliminate_cgi_parse_qsl_2eliminate_cgi_parse_qsl
because `cgi.parse_qsl` is deprecated, according to
https://docs.python.org/2/library/cgi.html#cgi.parse_qsl
Diffstat (limited to 'paste/cgiapp.py')
-rw-r--r-- | paste/cgiapp.py | 280 |
1 files changed, 280 insertions, 0 deletions
diff --git a/paste/cgiapp.py b/paste/cgiapp.py new file mode 100644 index 0000000..e5a62f4 --- /dev/null +++ b/paste/cgiapp.py @@ -0,0 +1,280 @@ +# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) +# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php + +""" +Application that runs a CGI script. +""" +import os +import sys +import subprocess +from six.moves.urllib.parse import quote +try: + import select +except ImportError: + select = None +import six + +from paste.util import converters + +__all__ = ['CGIError', 'CGIApplication'] + +class CGIError(Exception): + """ + Raised when the CGI script can't be found or doesn't + act like a proper CGI script. + """ + +class CGIApplication(object): + + """ + This object acts as a proxy to a CGI application. You pass in the + script path (``script``), an optional path to search for the + script (if the name isn't absolute) (``path``). If you don't give + a path, then ``$PATH`` will be used. + """ + + def __init__(self, + global_conf, + script, + path=None, + include_os_environ=True, + query_string=None): + if global_conf: + raise NotImplemented( + "global_conf is no longer supported for CGIApplication " + "(use make_cgi_application); please pass None instead") + self.script_filename = script + if path is None: + path = os.environ.get('PATH', '').split(':') + self.path = path + if '?' in script: + assert query_string is None, ( + "You cannot have '?' in your script name (%r) and also " + "give a query_string (%r)" % (script, query_string)) + script, query_string = script.split('?', 1) + if os.path.abspath(script) != script: + # relative path + for path_dir in self.path: + if os.path.exists(os.path.join(path_dir, script)): + self.script = os.path.join(path_dir, script) + break + else: + raise CGIError( + "Script %r not found in path %r" + % (script, self.path)) + else: + self.script = script + self.include_os_environ = include_os_environ + self.query_string = query_string + + def __call__(self, environ, start_response): + if 'REQUEST_URI' not in environ: + environ['REQUEST_URI'] = ( + quote(environ.get('SCRIPT_NAME', '')) + + quote(environ.get('PATH_INFO', ''))) + if self.include_os_environ: + cgi_environ = os.environ.copy() + else: + cgi_environ = {} + for name in environ: + # Should unicode values be encoded? + if (name.upper() == name + and isinstance(environ[name], str)): + cgi_environ[name] = environ[name] + if self.query_string is not None: + old = cgi_environ.get('QUERY_STRING', '') + if old: + old += '&' + cgi_environ['QUERY_STRING'] = old + self.query_string + cgi_environ['SCRIPT_FILENAME'] = self.script + proc = subprocess.Popen( + [self.script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=cgi_environ, + cwd=os.path.dirname(self.script), + ) + writer = CGIWriter(environ, start_response) + if select and sys.platform != 'win32': + proc_communicate( + proc, + stdin=StdinReader.from_environ(environ), + stdout=writer, + stderr=environ['wsgi.errors']) + else: + stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read()) + if stderr: + environ['wsgi.errors'].write(stderr) + writer.write(stdout) + if not writer.headers_finished: + start_response(writer.status, writer.headers) + return [] + +class CGIWriter(object): + + def __init__(self, environ, start_response): + self.environ = environ + self.start_response = start_response + self.status = '200 OK' + self.headers = [] + self.headers_finished = False + self.writer = None + self.buffer = b'' + + def write(self, data): + if self.headers_finished: + self.writer(data) + return + self.buffer += data + while b'\n' in self.buffer: + if b'\r\n' in self.buffer and self.buffer.find(b'\r\n') < self.buffer.find(b'\n'): + line1, self.buffer = self.buffer.split(b'\r\n', 1) + else: + line1, self.buffer = self.buffer.split(b'\n', 1) + if not line1: + self.headers_finished = True + self.writer = self.start_response( + self.status, self.headers) + self.writer(self.buffer) + del self.buffer + del self.headers + del self.status + break + elif b':' not in line1: + raise CGIError( + "Bad header line: %r" % line1) + else: + name, value = line1.split(b':', 1) + value = value.lstrip() + name = name.strip() + if six.PY3: + name = name.decode('utf8') + value = value.decode('utf8') + if name.lower() == 'status': + if ' ' not in value: + # WSGI requires this space, sometimes CGI scripts don't set it: + value = '%s General' % value + self.status = value + else: + self.headers.append((name, value)) + +class StdinReader(object): + + def __init__(self, stdin, content_length): + self.stdin = stdin + self.content_length = content_length + + @classmethod + def from_environ(cls, environ): + length = environ.get('CONTENT_LENGTH') + if length: + length = int(length) + else: + length = 0 + return cls(environ['wsgi.input'], length) + + def read(self, size=None): + if not self.content_length: + return b'' + if size is None: + text = self.stdin.read(self.content_length) + else: + text = self.stdin.read(min(self.content_length, size)) + self.content_length -= len(text) + return text + +def proc_communicate(proc, stdin=None, stdout=None, stderr=None): + """ + Run the given process, piping input/output/errors to the given + file-like objects (which need not be actual file objects, unlike + the arguments passed to Popen). Wait for process to terminate. + + Note: this is taken from the posix version of + subprocess.Popen.communicate, but made more general through the + use of file-like objects. + """ + read_set = [] + write_set = [] + input_buffer = b'' + trans_nl = proc.universal_newlines and hasattr(open, 'newlines') + + if proc.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + proc.stdin.flush() + if input: + write_set.append(proc.stdin) + else: + proc.stdin.close() + else: + assert stdin is None + if proc.stdout: + read_set.append(proc.stdout) + else: + assert stdout is None + if proc.stderr: + read_set.append(proc.stderr) + else: + assert stderr is None + + while read_set or write_set: + rlist, wlist, xlist = select.select(read_set, write_set, []) + + if proc.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + next, input_buffer = input_buffer, b'' + next_len = 512-len(next) + if next_len: + next += stdin.read(next_len) + if not next: + proc.stdin.close() + write_set.remove(proc.stdin) + else: + bytes_written = os.write(proc.stdin.fileno(), next) + if bytes_written < len(next): + input_buffer = next[bytes_written:] + + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) + if data == b"": + proc.stdout.close() + read_set.remove(proc.stdout) + if trans_nl: + data = proc._translate_newlines(data) + stdout.write(data) + + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == b"": + proc.stderr.close() + read_set.remove(proc.stderr) + if trans_nl: + data = proc._translate_newlines(data) + stderr.write(data) + + try: + proc.wait() + except OSError as e: + if e.errno != 10: + raise + +def make_cgi_application(global_conf, script, path=None, include_os_environ=None, + query_string=None): + """ + Paste Deploy interface for :class:`CGIApplication` + + This object acts as a proxy to a CGI application. You pass in the + script path (``script``), an optional path to search for the + script (if the name isn't absolute) (``path``). If you don't give + a path, then ``$PATH`` will be used. + """ + if path is None: + path = global_conf.get('path') or global_conf.get('PATH') + include_os_environ = converters.asbool(include_os_environ) + return CGIApplication( + None, + script, path=path, include_os_environ=include_os_environ, + query_string=query_string) |