summaryrefslogtreecommitdiff
path: root/paste/debug/prints.py
blob: c2cfaa676c8bc76449976a9fdb030c56c7e07458 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Middleware that displays everything that is printed inline in
application pages.
"""

from cStringIO import StringIO
import re
import cgi
from paste.util import threadedprint
from paste import wsgilib
from paste import response
from paste.deploy.converters import asbool

# Process-wide flag: flipped to True by PrintDebugMiddleware.__call__ the
# first time it handles a (non-testing) request, so that
# threadedprint.install() is only invoked once.
_threadedprint_installed = False

# Only the middleware class is part of the public API of this module.
__all__ = ['PrintDebugMiddleware']

class TeeFile(object):

    """
    Minimal write-only file-like object that copies everything
    written to it into each of a list of underlying files.
    """

    def __init__(self, files):
        # The list is stored as given (not copied), so objects appended
        # to it later also receive subsequent writes.
        self.files = files

    def write(self, v):
        """
        Write ``v`` to every file currently in ``self.files``.

        Unicode values are converted with ``str()`` first.
        """
        # WSGI is picky in this case
        data = str(v) if isinstance(v, unicode) else v
        for sink in self.files:
            sink.write(data)

class PrintDebugMiddleware(object):

    """
    This middleware captures all the printed statements, and inlines
    them in HTML pages, so that you can see all the (debug-intended)
    print statements in the page itself.

    There are two keys added to the environment to control this:
    ``environ['paste.printdebug_listeners']`` is a list of file-like
    objects (anything with a ``write`` method); everything printed
    during the request is written to each of them.  Append to the list
    to receive the output as well.

    ``environ['paste.remove_printdebug']`` is a function that, if
    called, will disable printing of output for that request.

    Constructor arguments:

    ``app``
        The WSGI application being wrapped.

    ``global_conf``
        Accepted but not used.

    ``force_content_type``
        If true, the log is inlined in the response regardless of its
        content type; otherwise only ``text/html`` responses are
        modified.

    ``print_wsgi_errors``
        If true, printed output is also written to
        ``environ['wsgi.errors']``.

    Both flags are passed through ``asbool``, so string values are
    accepted as well as real booleans.
    """

    log_template = (
        '<pre style="width: 40%%; border: 2px solid #000; white-space: normal; '
        'background-color: #ffd; color: #000; float: right;">'
        '<b style="border-bottom: 1px solid #000">Log messages</b><br>'
        '%s</pre>')

    def __init__(self, app, global_conf=None, force_content_type=False,
                 print_wsgi_errors=True):
        self.app = app
        # Convert both flags the same way; a raw string such as 'false'
        # would otherwise be truthy.
        self.force_content_type = asbool(force_content_type)
        self.print_wsgi_errors = asbool(print_wsgi_errors)

    def __call__(self, environ, start_response):
        """
        WSGI entry point: run the wrapped application with printing
        captured, then return its body with the captured output
        inlined when the response qualifies.
        """
        global _threadedprint_installed
        if environ.get('paste.testing'):
            # In a testing environment this interception isn't
            # useful:
            return self.app(environ, start_response)
        if not _threadedprint_installed:
            # @@: Not strictly threadsafe
            _threadedprint_installed = True
            threadedprint.install(leave_stdout=True)
        # A non-empty list means the application asked (through
        # environ['paste.remove_printdebug']) not to have the log
        # inlined for this request.
        removed = []
        def remove_printdebug():
            removed.append(None)
        environ['paste.remove_printdebug'] = remove_printdebug
        logged = StringIO()
        listeners = [logged]
        environ['paste.printdebug_listeners'] = listeners
        if self.print_wsgi_errors:
            listeners.append(environ['wsgi.errors'])
        replacement_stdout = TeeFile(listeners)
        try:
            threadedprint.register(replacement_stdout)
            status, headers, body = wsgilib.intercept_output(
                environ, self.app)
            if status is None:
                # Some error occurred
                status = '500 Server Error'
                headers = [('Content-type', 'text/html')]
                if not body:
                    body = 'An error occurred'
                # start_response is deliberately NOT called here: both
                # exits below call it, and WSGI forbids calling it
                # twice without exc_info.
            content_type = response.header_value(headers, 'content-type')
            if not self._wants_inline_log(removed, content_type):
                if not self.print_wsgi_errors:
                    # The output was only captured in ``logged`` and is
                    # not going into the page, so the prints would be
                    # lost unless they are sent to the error stream.
                    environ['wsgi.errors'].write(logged.getvalue())
                start_response(status, headers)
                return [body]
            # The body is about to grow, so any content-length the
            # application set is no longer valid.
            response.remove_header(headers, 'content-length')
            body = self.add_log(body, logged.getvalue())
            start_response(status, headers)
            return [body]
        finally:
            threadedprint.deregister()

    def _wants_inline_log(self, removed, content_type):
        """
        Decide whether the captured output goes into the response body.

        ``removed`` is the (possibly empty) list filled in by
        ``remove_printdebug``; ``content_type`` is the response's
        content-type header value, or a false value if there is none.
        Returns a boolean.
        """
        if removed:
            return False
        if self.force_content_type:
            return True
        return bool(content_type) and content_type.startswith('text/html')

    _body_re = re.compile(r'<body[^>]*>', re.I)

    def add_log(self, html, log):
        """
        Return ``html`` with the text ``log`` inserted as a "Log
        messages" box.

        If ``log`` is empty, ``html`` is returned unchanged.  Otherwise
        the log is HTML-escaped, formatted with ``log_template``, and
        placed immediately after the first ``<body ...>`` tag, or in
        front of the whole document when no such tag is found.
        """
        if not log:
            return html
        text = cgi.escape(log)
        # Keep the line breaks and runs of spaces visible in the
        # rendered page.
        text = text.replace('\n', '<br>\n')
        text = text.replace('  ', '&nbsp; ')
        log = self.log_template % text
        match = self._body_re.search(html)
        if not match:
            return log + html
        else:
            return html[:match.end()] + log + html[match.end():]