diff options
| author | David Lord <davidism@gmail.com> | 2019-02-13 11:44:18 -0800 |
|---|---|---|
| committer | David Lord <davidism@gmail.com> | 2019-03-08 08:01:31 -0800 |
| commit | ab6150fa49afc61b0c5eed6d9545d03d1958e384 (patch) | |
| tree | ad5f13c9c2775ca59cc8e82ec124c4e065a65d1b /examples | |
| parent | 048d707d25685e6aea675c53945ceb7619e60344 (diff) | |
| download | werkzeug-code-style.tar.gz | |
apply code stylecode-style
* reorder-python-imports
* line fixers
* black
* flake8
Diffstat (limited to 'examples')
55 files changed, 1242 insertions, 880 deletions
diff --git a/examples/README.rst b/examples/README.rst index 593394a1..2b9df866 100644 --- a/examples/README.rst +++ b/examples/README.rst @@ -23,7 +23,7 @@ find in real life :-) A simple Wiki implementation. Requirements: - + - SQLAlchemy - Creoleparser >= 0.7 - genshi @@ -50,7 +50,7 @@ find in real life :-) A planet called plnt, pronounce plant. Requirements: - + - SQLAlchemy - Jinja2 - feedparser @@ -76,7 +76,7 @@ find in real life :-) A tinyurl clone for the Werkzeug tutorial. Requirements: - + - SQLAlchemy - Jinja2 @@ -98,7 +98,7 @@ find in real life :-) Like shorty, but implemented using CouchDB. Requirements : - + - werkzeug : http://werkzeug.pocoo.org - jinja : http://jinja.pocoo.org - couchdb 0.72 & above : https://couchdb.apache.org/ diff --git a/examples/contrib/securecookie.py b/examples/contrib/securecookie.py index a0a566d1..2f6544d3 100644 --- a/examples/contrib/securecookie.py +++ b/examples/contrib/securecookie.py @@ -9,15 +9,16 @@ :license: BSD-3-Clause """ from time import asctime -from werkzeug.serving import run_simple -from werkzeug.wrappers import BaseRequest, BaseResponse + from werkzeug.contrib.securecookie import SecureCookie +from werkzeug.serving import run_simple +from werkzeug.wrappers import BaseRequest +from werkzeug.wrappers import BaseResponse -SECRET_KEY = 'V\x8a$m\xda\xe9\xc3\x0f|f\x88\xbccj>\x8bI^3+' +SECRET_KEY = "V\x8a$m\xda\xe9\xc3\x0f|f\x88\xbccj>\x8bI^3+" class Request(BaseRequest): - def __init__(self, environ): BaseRequest.__init__(self, environ) self.session = SecureCookie.load_cookie(self, secret_key=SECRET_KEY) @@ -28,23 +29,23 @@ def index(request): def get_time(request): - return 'Time: %s' % request.session.get('time', 'not set') + return "Time: %s" % request.session.get("time", "not set") def set_time(request): - request.session['time'] = time = asctime() - return 'Time set to %s' % time + request.session["time"] = time = asctime() + return "Time set to %s" % time def application(environ, start_response): request = Request(environ) - response = BaseResponse({ - 'get': get_time, - 'set': set_time - }.get(request.path.strip('/'), index)(request), mimetype='text/html') + response = BaseResponse( + {"get": get_time, "set": set_time}.get(request.path.strip("/"), index)(request), + mimetype="text/html", + ) request.session.save_cookie(response) return response(environ, start_response) -if __name__ == '__main__': - run_simple('localhost', 5000, application) +if __name__ == "__main__": + run_simple("localhost", 5000, application) diff --git a/examples/contrib/sessions.py b/examples/contrib/sessions.py index ecf464a2..c5eef576 100644 --- a/examples/contrib/sessions.py +++ b/examples/contrib/sessions.py @@ -1,11 +1,11 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from werkzeug.contrib.sessions import SessionMiddleware +from werkzeug.contrib.sessions import SessionStore from werkzeug.serving import run_simple -from werkzeug.contrib.sessions import SessionStore, SessionMiddleware class MemorySessionStore(SessionStore): - def __init__(self, session_class=None): SessionStore.__init__(self, session_class=None) self.sessions = {} @@ -23,21 +23,22 @@ class MemorySessionStore(SessionStore): def application(environ, start_response): - session = environ['werkzeug.session'] - session['visit_count'] = session.get('visit_count', 0) + 1 + session = environ["werkzeug.session"] + session["visit_count"] = session.get("visit_count", 0) + 1 - start_response('200 OK', [('Content-Type', 'text/html')]) - return [''' - <!doctype html> + start_response("200 OK", [("Content-Type", "text/html")]) + return [ + """<!doctype html> <title>Session Example</title> <h1>Session Example</h1> - <p>You visited this page %d times.</p> - ''' % session['visit_count']] + <p>You visited this page %d times.</p>""" + % session["visit_count"] + ] def make_app(): return SessionMiddleware(application, MemorySessionStore()) -if __name__ == '__main__': - run_simple('localhost', 5000, make_app()) +if __name__ == "__main__": + run_simple("localhost", 5000, make_app()) diff --git a/examples/cookieauth.py b/examples/cookieauth.py index 64b5ae0d..ba23bda4 100644 --- a/examples/cookieauth.py +++ b/examples/cookieauth.py @@ -10,25 +10,25 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from werkzeug.serving import run_simple -from werkzeug.utils import cached_property, escape, redirect -from werkzeug.wrappers import Request, Response from werkzeug.contrib.securecookie import SecureCookie +from werkzeug.serving import run_simple +from werkzeug.utils import cached_property +from werkzeug.utils import escape +from werkzeug.utils import redirect +from werkzeug.wrappers import Request +from werkzeug.wrappers import Response # don't use this key but a different one; you could just use # os.unrandom(20) to get something random. Changing this key # invalidates all sessions at once. -SECRET_KEY = '\xfa\xdd\xb8z\xae\xe0}4\x8b\xea' +SECRET_KEY = "\xfa\xdd\xb8z\xae\xe0}4\x8b\xea" # the cookie name for the session -COOKIE_NAME = 'session' +COOKIE_NAME = "session" # the users that may access -USERS = { - 'admin': 'default', - 'user1': 'default' -} +USERS = {"admin": "default", "user1": "default"} class AppRequest(Request): @@ -36,11 +36,11 @@ class AppRequest(Request): def logout(self): """Log the user out.""" - self.session.pop('username', None) + self.session.pop("username", None) def login(self, username): """Log the user in.""" - self.session['username'] = username + self.session["username"] = username @property def logged_in(self): @@ -50,7 +50,7 @@ class AppRequest(Request): @property def user(self): """The user that is logged in.""" - return self.session.get('username') + return self.session.get("username") @cached_property def session(self): @@ -61,16 +61,16 @@ class AppRequest(Request): def login_form(request): - error = '' - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') + error = "" + if request.method == "POST": + username = request.form.get("username") + password = request.form.get("password") if password and USERS.get(username) == password: request.login(username) - return redirect('') - error = '<p>Invalid credentials' - return Response(''' - <title>Login</title><h1>Login</h1> + return redirect("") + error = "<p>Invalid credentials" + return Response( + """<title>Login</title><h1>Login</h1> <p>Not logged in. %s <form action="" method="post"> @@ -79,23 +79,28 @@ def login_form(request): <input type="text" name="username" size=20> <input type="password" name="password", size=20> <input type="submit" value="Login"> - </form>''' % error, mimetype='text/html') + </form>""" + % error, + mimetype="text/html", + ) def index(request): - return Response(''' - <title>Logged in</title> + return Response( + """<title>Logged in</title> <h1>Logged in</h1> <p>Logged in as %s - <p><a href="/?do=logout">Logout</a> - ''' % escape(request.user), mimetype='text/html') + <p><a href="/?do=logout">Logout</a>""" + % escape(request.user), + mimetype="text/html", + ) @AppRequest.application def application(request): - if request.args.get('do') == 'logout': + if request.args.get("do") == "logout": request.logout() - response = redirect('.') + response = redirect(".") elif request.logged_in: response = index(request) else: @@ -104,5 +109,5 @@ def application(request): return response -if __name__ == '__main__': - run_simple('localhost', 4000, application) +if __name__ == "__main__": + run_simple("localhost", 4000, application) diff --git a/examples/coolmagic/__init__.py b/examples/coolmagic/__init__.py index 88d5db80..0526f1d6 100644 --- a/examples/coolmagic/__init__.py +++ b/examples/coolmagic/__init__.py @@ -8,4 +8,4 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from coolmagic.application import make_app +from .application import make_app diff --git a/examples/coolmagic/application.py b/examples/coolmagic/application.py index 10ffd2a2..730f4d83 100644 --- a/examples/coolmagic/application.py +++ b/examples/coolmagic/application.py @@ -12,11 +12,18 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from os import path, listdir -from coolmagic.utils import Request, local_manager +from os import listdir +from os import path + +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import NotFound from werkzeug.middleware.shared_data import SharedDataMiddleware -from werkzeug.routing import Map, Rule, RequestRedirect -from werkzeug.exceptions import HTTPException, NotFound +from werkzeug.routing import Map +from werkzeug.routing import RequestRedirect +from werkzeug.routing import Rule + +from .utils import local_manager +from .utils import Request class CoolMagicApplication(object): @@ -27,17 +34,17 @@ class CoolMagicApplication(object): def __init__(self, config): self.config = config - for fn in listdir(path.join(path.dirname(__file__), 'views')): - if fn.endswith('.py') and fn != '__init__.py': - __import__('coolmagic.views.' + fn[:-3]) + for fn in listdir(path.join(path.dirname(__file__), "views")): + if fn.endswith(".py") and fn != "__init__.py": + __import__("coolmagic.views." + fn[:-3]) from coolmagic.utils import exported_views + rules = [ # url for shared data. this will always be unmatched # because either the middleware or the webserver # handles that request first. - Rule('/public/<path:file>', - endpoint='shared_data') + Rule("/public/<path:file>", endpoint="shared_data") ] self.views = {} for endpoint, (func, rule, extra) in exported_views.items(): @@ -53,7 +60,7 @@ class CoolMagicApplication(object): endpoint, args = urls.match(req.path) resp = self.views[endpoint](**args) except NotFound: - resp = self.views['static.not_found']() + resp = self.views["static.not_found"]() except (HTTPException, RequestRedirect) as e: resp = e return resp(environ, start_response) @@ -68,9 +75,9 @@ def make_app(config=None): app = CoolMagicApplication(config) # static stuff - app = SharedDataMiddleware(app, { - '/public': path.join(path.dirname(__file__), 'public') - }) + app = SharedDataMiddleware( + app, {"/public": path.join(path.dirname(__file__), "public")} + ) # clean up locals app = local_manager.make_middleware(app) diff --git a/examples/coolmagic/helpers.py b/examples/coolmagic/helpers.py index 54638335..4cd4ac45 100644 --- a/examples/coolmagic/helpers.py +++ b/examples/coolmagic/helpers.py @@ -8,7 +8,7 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from coolmagic.utils import ThreadedRequest +from .utils import ThreadedRequest #: a thread local proxy request object diff --git a/examples/coolmagic/utils.py b/examples/coolmagic/utils.py index b69f95a4..f4cf20d5 100644 --- a/examples/coolmagic/utils.py +++ b/examples/coolmagic/utils.py @@ -11,17 +11,21 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from os.path import dirname, join -from jinja2 import Environment, FileSystemLoader -from werkzeug.local import Local, LocalManager -from werkzeug.wrappers import BaseRequest, BaseResponse +from os.path import dirname +from os.path import join + +from jinja2 import Environment +from jinja2 import FileSystemLoader +from werkzeug.local import Local +from werkzeug.local import LocalManager +from werkzeug.wrappers import BaseRequest +from werkzeug.wrappers import BaseResponse local = Local() local_manager = LocalManager([local]) template_env = Environment( - loader=FileSystemLoader(join(dirname(__file__), 'templates'), - use_memcache=False) + loader=FileSystemLoader(join(dirname(__file__), "templates"), use_memcache=False) ) exported_views = {} @@ -31,19 +35,23 @@ def export(string, template=None, **extra): Decorator for registering view functions and adding templates to it. """ + def wrapped(f): - endpoint = (f.__module__ + '.' + f.__name__)[16:] + endpoint = (f.__module__ + "." + f.__name__)[16:] if template is not None: old_f = f + def f(**kwargs): rv = old_f(**kwargs) if not isinstance(rv, Response): rv = TemplateResponse(template, **(rv or {})) return rv + f.__name__ = old_f.__name__ f.__doc__ = old_f.__doc__ exported_views[endpoint] = (f, string, extra) return f + return wrapped @@ -59,7 +67,8 @@ class Request(BaseRequest): The concrete request object used in the WSGI application. It has some helper functions that can be used to build URLs. """ - charset = 'utf-8' + + charset = "utf-8" def __init__(self, environ, url_adapter): BaseRequest.__init__(self, environ) @@ -74,9 +83,8 @@ class ThreadedRequest(object): """ def __getattr__(self, name): - if name == '__members__': - return [x for x in dir(local.request) if not - x.startswith('_')] + if name == "__members__": + return [x for x in dir(local.request) if not x.startswith("_")] return getattr(local.request, name) def __setattr__(self, name, value): @@ -87,8 +95,9 @@ class Response(BaseResponse): """ The concrete response object for the WSGI application. """ - charset = 'utf-8' - default_mimetype = 'text/html' + + charset = "utf-8" + default_mimetype = "text/html" class TemplateResponse(Response): @@ -98,9 +107,7 @@ class TemplateResponse(Response): def __init__(self, template_name, **values): from coolmagic import helpers - values.update( - request=local.request, - h=helpers - ) + + values.update(request=local.request, h=helpers) template = template_env.get_template(template_name) Response.__init__(self, template.render(values)) diff --git a/examples/coolmagic/views/static.py b/examples/coolmagic/views/static.py index edb09372..f4f95409 100644 --- a/examples/coolmagic/views/static.py +++ b/examples/coolmagic/views/static.py @@ -11,22 +11,22 @@ from coolmagic.utils import export -@export('/', template='static/index.html') +@export("/", template="static/index.html") def index(): pass -@export('/about', template='static/about.html') +@export("/about", template="static/about.html") def about(): pass -@export('/broken') +@export("/broken") def broken(): - raise RuntimeError('that\'s really broken') + raise RuntimeError("that's really broken") -@export(None, template='static/not_found.html') +@export(None, template="static/not_found.html") def not_found(): """ This function is always executed if an url does not diff --git a/examples/couchy/README b/examples/couchy/README index 1821ed11..24960448 100644 --- a/examples/couchy/README +++ b/examples/couchy/README @@ -5,4 +5,3 @@ Requirements : - jinja : http://jinja.pocoo.org - couchdb 0.72 & above : https://couchdb.apache.org/ - couchdb-python 0.3 & above : https://github.com/djc/couchdb-python - diff --git a/examples/couchy/application.py b/examples/couchy/application.py index f3a56a63..b958dcbc 100644 --- a/examples/couchy/application.py +++ b/examples/couchy/application.py @@ -1,27 +1,28 @@ from couchdb.client import Server -from couchy.utils import STATIC_PATH, local, local_manager, \ - url_map +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import NotFound from werkzeug.middleware.shared_data import SharedDataMiddleware from werkzeug.wrappers import Request from werkzeug.wsgi import ClosingIterator -from werkzeug.exceptions import HTTPException, NotFound -from couchy import views -from couchy.models import URL +from . import views +from .models import URL +from .utils import local +from .utils import local_manager +from .utils import STATIC_PATH +from .utils import url_map -class Couchy(object): +class Couchy(object): def __init__(self, db_uri): local.application = self server = Server(db_uri) try: - db = server.create('urls') - except: - db = server['urls'] - self.dispatch = SharedDataMiddleware(self.dispatch, { - '/static': STATIC_PATH - }) + db = server.create("urls") + except Exception: + db = server["urls"] + self.dispatch = SharedDataMiddleware(self.dispatch, {"/static": STATIC_PATH}) URL.db = db @@ -38,8 +39,9 @@ class Couchy(object): response.status_code = 404 except HTTPException as e: response = e - return ClosingIterator(response(environ, start_response), - [local_manager.cleanup]) + return ClosingIterator( + response(environ, start_response), [local_manager.cleanup] + ) def __call__(self, environ, start_response): return self.dispatch(environ, start_response) diff --git a/examples/couchy/models.py b/examples/couchy/models.py index 4621a744..a0b50ca1 100644 --- a/examples/couchy/models.py +++ b/examples/couchy/models.py @@ -1,6 +1,12 @@ from datetime import datetime -from couchdb.mapping import Document, TextField, BooleanField, DateTimeField -from couchy.utils import url_for, get_random_uid + +from couchdb.mapping import BooleanField +from couchdb.mapping import DateTimeField +from couchdb.mapping import Document +from couchdb.mapping import TextField + +from .utils import get_random_uid +from .utils import url_for class URL(Document): @@ -19,13 +25,15 @@ class URL(Document): return URL.db.query(code) def store(self): - if getattr(self._data, 'id', None) is None: + if getattr(self._data, "id", None) is None: new_id = self.shorty_id if self.shorty_id else None while 1: id = new_id if new_id else get_random_uid() try: - docid = URL.db.resource.put(content=self._data, path='/%s/' % str(id))['id'] - except: + docid = URL.db.resource.put( + content=self._data, path="/%s/" % str(id) + )["id"] + except Exception: continue if docid: break @@ -36,7 +44,7 @@ class URL(Document): @property def short_url(self): - return url_for('link', uid=self.id, _external=True) + return url_for("link", uid=self.id, _external=True) def __repr__(self): - return '<URL %r>' % self.id + return "<URL %r>" % self.id diff --git a/examples/couchy/utils.py b/examples/couchy/utils.py index 4fe666a3..571a7ed9 100644 --- a/examples/couchy/utils.py +++ b/examples/couchy/utils.py @@ -1,49 +1,62 @@ from os import path -from random import sample, randrange -from jinja2 import Environment, FileSystemLoader -from werkzeug.local import Local, LocalManager +from random import randrange +from random import sample + +from jinja2 import Environment +from jinja2 import FileSystemLoader +from werkzeug.local import Local +from werkzeug.local import LocalManager +from werkzeug.routing import Map +from werkzeug.routing import Rule from werkzeug.urls import url_parse from werkzeug.utils import cached_property from werkzeug.wrappers import Response -from werkzeug.routing import Map, Rule -TEMPLATE_PATH = path.join(path.dirname(__file__), 'templates') -STATIC_PATH = path.join(path.dirname(__file__), 'static') -ALLOWED_SCHEMES = frozenset(['http', 'https', 'ftp', 'ftps']) -URL_CHARS = 'abcdefghijkmpqrstuvwxyzABCDEFGHIJKLMNPQRST23456789' +TEMPLATE_PATH = path.join(path.dirname(__file__), "templates") +STATIC_PATH = path.join(path.dirname(__file__), "static") +ALLOWED_SCHEMES = frozenset(["http", "https", "ftp", "ftps"]) +URL_CHARS = "abcdefghijkmpqrstuvwxyzABCDEFGHIJKLMNPQRST23456789" local = Local() local_manager = LocalManager([local]) -application = local('application') +application = local("application") -url_map = Map([Rule('/static/<file>', endpoint='static', build_only=True)]) +url_map = Map([Rule("/static/<file>", endpoint="static", build_only=True)]) jinja_env = Environment(loader=FileSystemLoader(TEMPLATE_PATH)) def expose(rule, **kw): def decorate(f): - kw['endpoint'] = f.__name__ + kw["endpoint"] = f.__name__ url_map.add(Rule(rule, **kw)) return f + return decorate + def url_for(endpoint, _external=False, **values): return local.url_adapter.build(endpoint, values, force_external=_external) -jinja_env.globals['url_for'] = url_for + + +jinja_env.globals["url_for"] = url_for + def render_template(template, **context): - return Response(jinja_env.get_template(template).render(**context), - mimetype='text/html') + return Response( + jinja_env.get_template(template).render(**context), mimetype="text/html" + ) + def validate_url(url): return url_parse(url)[0] in ALLOWED_SCHEMES + def get_random_uid(): - return ''.join(sample(URL_CHARS, randrange(3, 9))) + return "".join(sample(URL_CHARS, randrange(3, 9))) -class Pagination(object): +class Pagination(object): def __init__(self, results, per_page, page, endpoint): self.results = results self.per_page = per_page @@ -56,7 +69,11 @@ class Pagination(object): @cached_property def entries(self): - return self.results[((self.page - 1) * self.per_page):(((self.page - 1) * self.per_page)+self.per_page)] + return self.results[ + ((self.page - 1) * self.per_page) : ( + ((self.page - 1) * self.per_page) + self.per_page + ) + ] has_previous = property(lambda self: self.page > 1) has_next = property(lambda self: self.page < self.pages) diff --git a/examples/couchy/views.py b/examples/couchy/views.py index 39c8ea29..c1547e7d 100644 --- a/examples/couchy/views.py +++ b/examples/couchy/views.py @@ -1,61 +1,73 @@ -from werkzeug.utils import redirect from werkzeug.exceptions import NotFound -from couchy.utils import render_template, expose, \ - validate_url, url_for, Pagination -from couchy.models import URL +from werkzeug.utils import redirect +from .models import URL +from .utils import expose +from .utils import Pagination +from .utils import render_template +from .utils import url_for +from .utils import validate_url -@expose('/') + +@expose("/") def new(request): - error = url = '' - if request.method == 'POST': - url = request.form.get('url') - alias = request.form.get('alias') + error = url = "" + if request.method == "POST": + url = request.form.get("url") + alias = request.form.get("alias") if not validate_url(url): error = "I'm sorry but you cannot shorten this URL." elif alias: if len(alias) > 140: - error = 'Your alias is too long' - elif '/' in alias: - error = 'Your alias might not include a slash' + error = "Your alias is too long" + elif "/" in alias: + error = "Your alias might not include a slash" elif URL.load(alias): - error = 'The alias you have requested exists already' + error = "The alias you have requested exists already" if not error: - url = URL(target=url, public='private' not in request.form, shorty_id=alias if alias else None) + url = URL( + target=url, + public="private" not in request.form, + shorty_id=alias if alias else None, + ) url.store() uid = url.id - return redirect(url_for('display', uid=uid)) - return render_template('new.html', error=error, url=url) + return redirect(url_for("display", uid=uid)) + return render_template("new.html", error=error, url=url) + -@expose('/display/<uid>') +@expose("/display/<uid>") def display(request, uid): url = URL.load(uid) if not url: raise NotFound() - return render_template('display.html', url=url) + return render_template("display.html", url=url) -@expose('/u/<uid>') + +@expose("/u/<uid>") def link(request, uid): url = URL.load(uid) if not url: raise NotFound() return redirect(url.target, 301) -@expose('/list/', defaults={'page': 1}) -@expose('/list/<int:page>') + +@expose("/list/", defaults={"page": 1}) +@expose("/list/<int:page>") def list(request, page): def wrap(doc): data = doc.value - data['_id'] = doc.id + data["_id"] = doc.id return URL.wrap(data) - code = '''function(doc) { if (doc.public){ map([doc._id], doc); }}''' + code = """function(doc) { if (doc.public){ map([doc._id], doc); }}""" docResults = URL.query(code) results = [wrap(doc) for doc in docResults] - pagination = Pagination(results, 1, page, 'list') + pagination = Pagination(results, 1, page, "list") if pagination.page > 1 and not pagination.entries: raise NotFound() - return render_template('list.html', pagination=pagination) + return render_template("list.html", pagination=pagination) + def not_found(request): - return render_template('not_found.html') + return render_template("not_found.html") diff --git a/examples/cupoftee/__init__.py b/examples/cupoftee/__init__.py index f4a9d04d..184c5d0d 100644 --- a/examples/cupoftee/__init__.py +++ b/examples/cupoftee/__init__.py @@ -8,4 +8,4 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from cupoftee.application import make_app +from .application import make_app diff --git a/examples/cupoftee/application.py b/examples/cupoftee/application.py index 140b6b74..540e3f59 100644 --- a/examples/cupoftee/application.py +++ b/examples/cupoftee/application.py @@ -9,44 +9,58 @@ :license: BSD-3-Clause """ import time - -from jinja2 import Environment, PackageLoader from os import path from threading import Thread -from cupoftee.db import Database -from cupoftee.network import ServerBrowser +from jinja2 import Environment +from jinja2 import PackageLoader +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import NotFound from werkzeug.middleware.shared_data import SharedDataMiddleware -from werkzeug.wrappers import Request, Response -from werkzeug.exceptions import HTTPException, NotFound -from werkzeug.routing import Map, Rule +from werkzeug.routing import Map +from werkzeug.routing import Rule +from werkzeug.wrappers import Request +from werkzeug.wrappers import Response + +from .db import Database +from .network import ServerBrowser -templates = path.join(path.dirname(__file__), 'templates') +templates = path.join(path.dirname(__file__), "templates") pages = {} -url_map = Map([Rule('/shared/<file>', endpoint='shared')]) +url_map = Map([Rule("/shared/<file>", endpoint="shared")]) def make_app(database, interval=120): - return SharedDataMiddleware(Cup(database, interval), { - '/shared': path.join(path.dirname(__file__), 'shared') - }) + return SharedDataMiddleware( + Cup(database, interval), + {"/shared": path.join(path.dirname(__file__), "shared")}, + ) class PageMeta(type): - def __init__(cls, name, bases, d): type.__init__(cls, name, bases, d) - if d.get('url_rule') is not None: + if d.get("url_rule") is not None: pages[cls.identifier] = cls - url_map.add(Rule(cls.url_rule, endpoint=cls.identifier, - **cls.url_arguments)) + url_map.add( + Rule(cls.url_rule, endpoint=cls.identifier, **cls.url_arguments) + ) identifier = property(lambda self: self.__name__.lower()) -class Page(object): - __metaclass__ = PageMeta +def _with_metaclass(meta, *bases): + """Create a base class with a metaclass.""" + + class metaclass(type): + def __new__(metacls, name, this_bases, d): + return meta(name, bases, d) + + return type.__new__(metaclass, "temporary_class", (), {}) + + +class Page(_with_metaclass(PageMeta, object)): url_arguments = {} def __init__(self, cup, request, url_adapter): @@ -62,17 +76,16 @@ class Page(object): def render_template(self, template=None): if template is None: - template = self.__class__.identifier + '.html' + template = self.__class__.identifier + ".html" context = dict(self.__dict__) context.update(url_for=self.url_for, self=self) return self.cup.render_template(template, context) def get_response(self): - return Response(self.render_template(), mimetype='text/html') + return Response(self.render_template(), mimetype="text/html") class Cup(object): - def __init__(self, database, interval=120): self.jinja_env = Environment(loader=PackageLoader("cupoftee"), autoescape=True) self.interval = interval @@ -111,4 +124,5 @@ class Cup(object): template = self.jinja_env.get_template(name) return template.render(context) + from cupoftee.pages import MissingPage diff --git a/examples/cupoftee/db.py b/examples/cupoftee/db.py index 9db97c3d..7f041220 100644 --- a/examples/cupoftee/db.py +++ b/examples/cupoftee/db.py @@ -9,8 +9,9 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ +from pickle import dumps +from pickle import loads from threading import Lock -from pickle import dumps, loads try: import dbm @@ -19,10 +20,9 @@ except ImportError: class Database(object): - def __init__(self, filename): self.filename = filename - self._fs = dbm.open(filename, 'cf') + self._fs = dbm.open(filename, "cf") self._local = {} self._lock = Lock() @@ -43,7 +43,7 @@ class Database(object): def __delitem__(self, key, value): with self._lock: self._local.pop(key, None) - if self._fs.has_key(key): + if key in self._fs: del self._fs[key] def __del__(self): @@ -75,5 +75,5 @@ class Database(object): try: self.sync() self._fs.close() - except: + except Exception: pass diff --git a/examples/cupoftee/network.py b/examples/cupoftee/network.py index 0e472c1b..74c775aa 100644 --- a/examples/cupoftee/network.py +++ b/examples/cupoftee/network.py @@ -9,9 +9,10 @@ :license: BSD-3-Clause """ import socket -from math import log from datetime import datetime -from cupoftee.utils import unicodecmp +from math import log + +from .utils import unicodecmp class ServerError(Exception): @@ -31,15 +32,14 @@ class Syncable(object): class ServerBrowser(Syncable): - def __init__(self, cup): self.cup = cup - self.servers = cup.db.setdefault('servers', dict) + self.servers = cup.db.setdefault("servers", dict) def _sync(self): to_delete = set(self.servers) for x in range(1, 17): - addr = ('master%d.teeworlds.com' % x, 8300) + addr = ("master%d.teeworlds.com" % x, 8300) print(addr) try: self._sync_master(addr, to_delete) @@ -48,20 +48,22 @@ class ServerBrowser(Syncable): for server_id in to_delete: self.servers.pop(server_id, None) if not self.servers: - raise IOError('no servers found') + raise IOError("no servers found") self.cup.db.sync() def _sync_master(self, addr, to_delete): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(5) - s.sendto(b'\x20\x00\x00\x00\x00\x48\xff\xff\xff\xffreqt', addr) + s.sendto(b"\x20\x00\x00\x00\x00\x48\xff\xff\xff\xffreqt", addr) data = s.recvfrom(1024)[0][14:] s.close() for n in range(0, len(data) // 6): - addr = ('.'.join(map(str, map(ord, data[n * 6:n * 6 + 4]))), - ord(data[n * 6 + 5]) * 256 + ord(data[n * 6 + 4])) - server_id = '%s:%d' % addr + addr = ( + ".".join(map(str, map(ord, data[n * 6 : n * 6 + 4]))), + ord(data[n * 6 + 5]) * 256 + ord(data[n * 6 + 4]), + ) + server_id = "%s:%d" % addr if server_id in self.servers: if not self.servers[server_id].sync(): continue @@ -74,31 +76,31 @@ class ServerBrowser(Syncable): class Server(Syncable): - def __init__(self, addr, server_id): self.addr = addr self.id = server_id self.players = [] if not self.sync(): - raise ServerError('server not responding in time') + raise ServerError("server not responding in time") def _sync(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(1) - s.sendto(b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xffgief', self.addr) - bits = s.recvfrom(1024)[0][14:].split(b'\x00') + s.sendto(b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xffgief", self.addr) + bits = s.recvfrom(1024)[0][14:].split(b"\x00") s.close() self.version, server_name, map_name = bits[:3] - self.name = server_name.decode('latin1') - self.map = map_name.decode('latin1') + self.name = server_name.decode("latin1") + self.map = map_name.decode("latin1") self.gametype = bits[3] - self.flags, self.progression, player_count, \ - self.max_players = map(int, bits[4:8]) + self.flags, self.progression, player_count, self.max_players = map( + int, bits[4:8] + ) # sync the player stats players = dict((p.name, p) for p in self.players) for i in range(player_count): - name = bits[8 + i * 2].decode('latin1') + name = bits[8 + i * 2].decode("latin1") score = int(bits[9 + i * 2]) # update existing player @@ -112,7 +114,7 @@ class Server(Syncable): for player in players.values(): try: self.players.remove(player) - except: + except Exception: pass # sort the player list and count them @@ -124,7 +126,6 @@ class Server(Syncable): class Player(object): - def __init__(self, server, name, score): self.server = server self.name = name diff --git a/examples/cupoftee/pages.py b/examples/cupoftee/pages.py index 7d799c25..c1a823b7 100644 --- a/examples/cupoftee/pages.py +++ b/examples/cupoftee/pages.py @@ -10,41 +10,42 @@ """ from functools import reduce -from werkzeug.utils import redirect from werkzeug.exceptions import NotFound -from cupoftee.application import Page -from cupoftee.utils import unicodecmp +from werkzeug.utils import redirect + +from .application import Page +from .utils import unicodecmp class ServerList(Page): - url_rule = '/' + url_rule = "/" def order_link(self, name, title): - cls = '' - link = '?order_by=' + name + cls = "" + link = "?order_by=" + name desc = False if name == self.order_by: desc = not self.order_desc - cls = ' class="%s"' % ('down' if desc else 'up') + cls = ' class="%s"' % ("down" if desc else "up") if desc: - link += '&dir=desc' + link += "&dir=desc" return '<a href="%s"%s>%s</a>' % (link, cls, title) def process(self): - self.order_by = self.request.args.get('order_by') or 'name' + self.order_by = self.request.args.get("order_by") or "name" sort_func = { - 'name': lambda x: x, - 'map': lambda x: x.map, - 'gametype': lambda x: x.gametype, - 'players': lambda x: x.player_count, - 'progression': lambda x: x.progression, + "name": lambda x: x, + "map": lambda x: x.map, + "gametype": lambda x: x.gametype, + "players": lambda x: x.player_count, + "progression": lambda x: x.progression, }.get(self.order_by) if sort_func is None: - return redirect(self.url_for('serverlist')) + return redirect(self.url_for("serverlist")) self.servers = self.cup.master.servers.values() self.servers.sort(key=sort_func) - if self.request.args.get('dir') == 'desc': + if self.request.args.get("dir") == "desc": self.servers.reverse() self.order_desc = True else: @@ -55,7 +56,7 @@ class ServerList(Page): class Server(Page): - url_rule = '/server/<id>' + url_rule = "/server/<id>" def process(self, id): try: @@ -65,10 +66,10 @@ class Server(Page): class Search(Page): - url_rule = '/search' + url_rule = "/search" def process(self): - self.user = self.request.args.get('user') + self.user = self.request.args.get("user") if self.user: self.results = [] for server in self.cup.master.servers.values(): @@ -78,7 +79,6 @@ class Search(Page): class MissingPage(Page): - def get_response(self): response = super(MissingPage, self).get_response() response.status_code = 404 diff --git a/examples/cupoftee/utils.py b/examples/cupoftee/utils.py index f13e2f3d..91717e85 100644 --- a/examples/cupoftee/utils.py +++ b/examples/cupoftee/utils.py @@ -11,7 +11,7 @@ import re -_sort_re = re.compile(r'\w+', re.UNICODE) +_sort_re = re.compile(r"\w+", re.UNICODE) def unicodecmp(a, b): diff --git a/examples/httpbasicauth.py b/examples/httpbasicauth.py index 5a774f3f..21d39300 100644 --- a/examples/httpbasicauth.py +++ b/examples/httpbasicauth.py @@ -10,12 +10,12 @@ :license: BSD-3-Clause """ from werkzeug.serving import run_simple -from werkzeug.wrappers import Request, Response +from werkzeug.wrappers import Request +from werkzeug.wrappers import Response class Application(object): - - def __init__(self, users, realm='login required'): + def __init__(self, users, realm="login required"): self.users = users self.realm = realm @@ -23,12 +23,15 @@ class Application(object): return username in self.users and self.users[username] == password def auth_required(self, request): - return Response('Could not verify your access level for that URL.\n' - 'You have to login with proper credentials', 401, - {'WWW-Authenticate': 'Basic realm="%s"' % self.realm}) + return Response( + "Could not verify your access level for that URL.\n" + "You have to login with proper credentials", + 401, + {"WWW-Authenticate": 'Basic realm="%s"' % self.realm}, + ) def dispatch_request(self, request): - return Response('Logged in as %s' % request.authorization.username) + return Response("Logged in as %s" % request.authorization.username) def __call__(self, environ, start_response): request = Request(environ) @@ -40,6 +43,6 @@ class Application(object): return response(environ, start_response) -if __name__ == '__main__': - application = Application({'user1': 'password', 'user2': 'password'}) - run_simple('localhost', 5000, application) +if __name__ == "__main__": + application = Application({"user1": "password", "user2": "password"}) + run_simple("localhost", 5000, application) diff --git a/examples/i18nurls/__init__.py b/examples/i18nurls/__init__.py index 393d270c..f5f5c6ed 100644 --- a/examples/i18nurls/__init__.py +++ b/examples/i18nurls/__init__.py @@ -1 +1 @@ -from i18nurls.application import Application as make_app +from .application import Application as make_app diff --git a/examples/i18nurls/application.py b/examples/i18nurls/application.py index 54f12353..103f6001 100644 --- a/examples/i18nurls/application.py +++ b/examples/i18nurls/application.py @@ -1,33 +1,39 @@ -from jinja2 import Environment, PackageLoader from os import path -from werkzeug.wrappers import Request as _Request, BaseResponse + +from jinja2 import Environment +from jinja2 import PackageLoader +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import NotFound from werkzeug.routing import RequestRedirect -from werkzeug.exceptions import HTTPException, NotFound -from i18nurls.urls import map +from werkzeug.wrappers import BaseResponse +from werkzeug.wrappers import Request as _Request + +from .urls import map -TEMPLATES = path.join(path.dirname(__file__), 'templates') +TEMPLATES = path.join(path.dirname(__file__), "templates") views = {} def expose(name): """Register the function as view.""" + def wrapped(f): views[name] = f return f + return wrapped class Request(_Request): - def __init__(self, environ, urls): super(Request, self).__init__(environ) self.urls = urls self.matched_url = None def url_for(self, endpoint, **args): - if not 'lang_code' in args: - args['lang_code'] = self.language - if endpoint == 'this': + if "lang_code" not in args: + args["lang_code"] = self.language + if endpoint == "this": endpoint = self.matched_url[0] tmp = self.matched_url[1].copy() tmp.update(args) @@ -45,12 +51,12 @@ class TemplateResponse(Response): def __init__(self, template_name, **values): self.template_name = template_name self.template_values = values - Response.__init__(self, mimetype='text/html') + Response.__init__(self, mimetype="text/html") def __call__(self, environ, start_response): - req = environ['werkzeug.request'] + req = environ["werkzeug.request"] values = self.template_values.copy() - values['req'] = req + values["req"] = req self.data = self.render_template(self.template_name, values) return super(TemplateResponse, self).__call__(environ, start_response) @@ -60,9 +66,9 @@ class TemplateResponse(Response): class Application(object): - def __init__(self): from i18nurls import views + self.not_found = views.page_not_found def __call__(self, environ, start_response): @@ -71,14 +77,14 @@ class Application(object): try: endpoint, args = urls.match(req.path) req.matched_url = (endpoint, args) - if endpoint == '#language_select': + if endpoint == "#language_select": lng = req.accept_languages.best - lng = lng and lng.split('-')[0].lower() or 'en' - index_url = urls.build('index', {'lang_code': lng}) - resp = Response('Moved to %s' % index_url, status=302) - resp.headers['Location'] = index_url + lng = lng and lng.split("-")[0].lower() or "en" + index_url = urls.build("index", {"lang_code": lng}) + resp = Response("Moved to %s" % index_url, status=302) + resp.headers["Location"] = index_url else: - req.language = args.pop('lang_code', None) + req.language = args.pop("lang_code", None) resp = views[endpoint](req, **args) except NotFound: resp = self.not_found(req) diff --git a/examples/i18nurls/urls.py b/examples/i18nurls/urls.py index 57ff68a2..3dd54a00 100644 --- a/examples/i18nurls/urls.py +++ b/examples/i18nurls/urls.py @@ -1,11 +1,18 @@ -from werkzeug.routing import Map, Rule, Submount +from werkzeug.routing import Map +from werkzeug.routing import Rule +from werkzeug.routing import Submount -map = Map([ - Rule('/', endpoint='#language_select'), - Submount('/<string(length=2):lang_code>', [ - Rule('/', endpoint='index'), - Rule('/about', endpoint='about'), - Rule('/blog/', endpoint='blog/index'), - Rule('/blog/<int:post_id>', endpoint='blog/show') - ]) -]) +map = Map( + [ + Rule("/", endpoint="#language_select"), + Submount( + "/<string(length=2):lang_code>", + [ + Rule("/", endpoint="index"), + Rule("/about", endpoint="about"), + Rule("/blog/", endpoint="blog/index"), + Rule("/blog/<int:post_id>", endpoint="blog/show"), + ], + ), + ] +) diff --git a/examples/i18nurls/views.py b/examples/i18nurls/views.py index 7b8bdb70..6c0ca895 100644 --- a/examples/i18nurls/views.py +++ b/examples/i18nurls/views.py @@ -1,22 +1,29 @@ -from i18nurls.application import TemplateResponse, Response, expose +from .application import expose +from .application import Response +from .application import TemplateResponse -@expose('index') +@expose("index") def index(req): - return TemplateResponse('index.html', title='Index') + return TemplateResponse("index.html", title="Index") -@expose('about') + +@expose("about") def about(req): - return TemplateResponse('about.html', title='About') + return TemplateResponse("about.html", title="About") + -@expose('blog/index') +@expose("blog/index") def blog_index(req): - return TemplateResponse('blog.html', title='Blog Index', mode='index') + return TemplateResponse("blog.html", title="Blog Index", mode="index") -@expose('blog/show') + +@expose("blog/show") def blog_show(req, post_id): - return TemplateResponse('blog.html', title='Blog Post #%d' % post_id, - post_id=post_id, mode='show') + return TemplateResponse( + "blog.html", title="Blog Post #%d" % post_id, post_id=post_id, mode="show" + ) + def page_not_found(req): - return Response('<h1>Page Not Found</h1>', mimetype='text/html') + return Response("<h1>Page Not Found</h1>", mimetype="text/html") diff --git a/examples/manage-coolmagic.py b/examples/manage-coolmagic.py index 8059003f..f6abe80e 100755 --- a/examples/manage-coolmagic.py +++ b/examples/manage-coolmagic.py @@ -10,9 +10,10 @@ :license: BSD-3-Clause """ import click -from coolmagic import make_app from werkzeug.serving import run_simple +from coolmagic import make_app + @click.group() def cli(): @@ -20,36 +21,45 @@ def cli(): @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" app = make_app() reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' + banner = "Interactive Werkzeug Shell" namespace = dict() if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -57,7 +67,9 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/manage-couchy.py b/examples/manage-couchy.py index 1bbf4a67..8a00a400 100755 --- a/examples/manage-couchy.py +++ b/examples/manage-couchy.py @@ -5,17 +5,15 @@ from werkzeug.serving import run_simple def make_app(): from couchy.application import Couchy - return Couchy('http://localhost:5984') + + return Couchy("http://localhost:5984") def make_shell(): from couchy import models, utils + application = make_app() - return { - "application": application, - "models": models, - "utils": utils, - } + return {"application": application, "models": models, "utils": utils} @click.group() @@ -26,40 +24,50 @@ def cli(): @cli.command() def initdb(): from couchy.application import Couchy - Couchy('http://localhost:5984').init_database() + + Couchy("http://localhost:5984").init_database() @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" app = make_app() reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' + banner = "Interactive Werkzeug Shell" namespace = make_shell() if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -67,7 +75,9 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/manage-cupoftee.py b/examples/manage-cupoftee.py index e905f06b..a787f324 100755 --- a/examples/manage-cupoftee.py +++ b/examples/manage-cupoftee.py @@ -14,7 +14,8 @@ from werkzeug.serving import run_simple def make_app(): from cupoftee import make_app - return make_app('/tmp/cupoftee.db') + + return make_app("/tmp/cupoftee.db") @click.group() @@ -23,20 +24,27 @@ def cli(): @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, reloader, debugger, evalex, threaded, processes): """Start a new development server.""" app = make_app() - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) - - -if __name__ == '__main__': + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) + + +if __name__ == "__main__": cli() diff --git a/examples/manage-i18nurls.py b/examples/manage-i18nurls.py index 04a5068e..a2b746a2 100755 --- a/examples/manage-i18nurls.py +++ b/examples/manage-i18nurls.py @@ -10,9 +10,10 @@ :license: BSD-3-Clause """ import click -from i18nurls import make_app from werkzeug.serving import run_simple +from i18nurls import make_app + @click.group() def cli(): @@ -20,36 +21,45 @@ def cli(): @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" app = make_app() reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' + banner = "Interactive Werkzeug Shell" namespace = dict() if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -57,7 +67,9 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/manage-plnt.py b/examples/manage-plnt.py index 8ce5b229..80a03f78 100755 --- a/examples/manage-plnt.py +++ b/examples/manage-plnt.py @@ -9,16 +9,18 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -import click import os + +import click from werkzeug.serving import run_simple def make_app(): """Helper function that creates a plnt app.""" from plnt import Plnt - database_uri = os.environ.get('PLNT_DATABASE_URI') - app = Plnt(database_uri or 'sqlite:////tmp/plnt.db') + + database_uri = os.environ.get("PLNT_DATABASE_URI") + app = Plnt(database_uri or "sqlite:////tmp/plnt.db") app.bind_to_context() return app @@ -32,62 +34,88 @@ def cli(): def initdb(): """Initialize the database""" from plnt.database import Blog, session + make_app().init_database() # and now fill in some python blogs everybody should read (shamelessly # added my own blog too) blogs = [ - Blog('Armin Ronacher', 'http://lucumr.pocoo.org/', - 'http://lucumr.pocoo.org/cogitations/feed/'), - Blog('Georg Brandl', 'http://pyside.blogspot.com/', - 'http://pyside.blogspot.com/feeds/posts/default'), - Blog('Ian Bicking', 'http://blog.ianbicking.org/', - 'http://blog.ianbicking.org/feed/'), - Blog('Amir Salihefendic', 'http://amix.dk/', - 'http://feeds.feedburner.com/amixdk'), - Blog('Christopher Lenz', 'http://www.cmlenz.net/blog/', - 'http://www.cmlenz.net/blog/atom.xml'), - Blog('Frederick Lundh', 'http://online.effbot.org/', - 'http://online.effbot.org/rss.xml') + Blog( + "Armin Ronacher", + "http://lucumr.pocoo.org/", + "http://lucumr.pocoo.org/cogitations/feed/", + ), + Blog( + "Georg Brandl", + "http://pyside.blogspot.com/", + "http://pyside.blogspot.com/feeds/posts/default", + ), + Blog( + "Ian Bicking", + "http://blog.ianbicking.org/", + "http://blog.ianbicking.org/feed/", + ), + Blog( + "Amir Salihefendic", "http://amix.dk/", "http://feeds.feedburner.com/amixdk" + ), + Blog( + "Christopher Lenz", + "http://www.cmlenz.net/blog/", + "http://www.cmlenz.net/blog/atom.xml", + ), + Blog( + "Frederick Lundh", + "http://online.effbot.org/", + "http://online.effbot.org/rss.xml", + ), ] # okay. got tired here. if someone feels that he is missing, drop me # a line ;-) for blog in blogs: session.add(blog) session.commit() - click.echo('Initialized database, now run manage-plnt.py sync to get the posts') + click.echo("Initialized database, now run manage-plnt.py sync to get the posts") @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" app = make_app() reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' - namespace = {'app': make_app()} + banner = "Interactive Werkzeug Shell" + namespace = {"app": make_app()} if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -95,6 +123,7 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) @@ -102,8 +131,10 @@ def shell(no_ipython): def sync(): """Sync the blogs in the planet. Call this from a cronjob.""" from plnt.sync import sync + make_app().bind_to_context() sync() -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/manage-shorty.py b/examples/manage-shorty.py index d80eccab..80fb3731 100755 --- a/examples/manage-shorty.py +++ b/examples/manage-shorty.py @@ -1,24 +1,23 @@ #!/usr/bin/env python -import click import os import tempfile + +import click from werkzeug.serving import run_simple def make_app(): from shorty.application import Shorty + filename = os.path.join(tempfile.gettempdir(), "shorty.db") - return Shorty('sqlite:///{0}'.format(filename)) + return Shorty("sqlite:///{0}".format(filename)) def make_shell(): from shorty import models, utils + application = make_app() - return { - "application": application, - "models": models, - "utils": utils, - } + return {"application": application, "models": models, "utils": utils} @click.group() @@ -32,36 +31,45 @@ def initdb(): @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" app = make_app() reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' + banner = "Interactive Werkzeug Shell" namespace = make_shell() if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -69,7 +77,9 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/manage-simplewiki.py b/examples/manage-simplewiki.py index 3e23ceb1..5fd66b85 100755 --- a/examples/manage-simplewiki.py +++ b/examples/manage-simplewiki.py @@ -9,26 +9,26 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -import click import os + +import click from werkzeug.serving import run_simple def make_wiki(): """Helper function that creates a new wiki instance.""" from simplewiki import SimpleWiki - database_uri = os.environ.get('SIMPLEWIKI_DATABASE_URI') - return SimpleWiki(database_uri or 'sqlite:////tmp/simplewiki.db') + + database_uri = os.environ.get("SIMPLEWIKI_DATABASE_URI") + return SimpleWiki(database_uri or "sqlite:////tmp/simplewiki.db") def make_shell(): from simplewiki import database + wiki = make_wiki() wiki.bind_to_context() - return { - 'wiki': wiki, - 'db': database - } + return {"wiki": wiki, "db": database} @click.group() @@ -42,36 +42,45 @@ def initdb(): @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" app = make_wiki() reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' + banner = "Interactive Werkzeug Shell" namespace = make_shell() if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -79,7 +88,9 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/manage-webpylike.py b/examples/manage-webpylike.py index 2e77714d..010bdcee 100755 --- a/examples/manage-webpylike.py +++ b/examples/manage-webpylike.py @@ -13,13 +13,16 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -import click import os import sys -sys.path.append(os.path.join(os.path.dirname(__file__), 'webpylike')) -from webpylike.example import app + +import click from werkzeug.serving import run_simple +from webpylike.example import app + +sys.path.append(os.path.join(os.path.dirname(__file__), "webpylike")) + @click.group() def cli(): @@ -27,35 +30,44 @@ def cli(): @cli.command() -@click.option('-h', '--hostname', type=str, default='localhost', help="localhost") -@click.option('-p', '--port', type=int, default=5000, help="5000") -@click.option('--no-reloader', is_flag=True, default=False) -@click.option('--debugger', is_flag=True) -@click.option('--no-evalex', is_flag=True, default=False) -@click.option('--threaded', is_flag=True) -@click.option('--processes', type=int, default=1, help="1") +@click.option("-h", "--hostname", type=str, default="localhost", help="localhost") +@click.option("-p", "--port", type=int, default=5000, help="5000") +@click.option("--no-reloader", is_flag=True, default=False) +@click.option("--debugger", is_flag=True) +@click.option("--no-evalex", is_flag=True, default=False) +@click.option("--threaded", is_flag=True) +@click.option("--processes", type=int, default=1, help="1") def runserver(hostname, port, no_reloader, debugger, no_evalex, threaded, processes): """Start a new development server.""" reloader = not no_reloader evalex = not no_evalex - run_simple(hostname, port, app, - use_reloader=reloader, use_debugger=debugger, - use_evalex=evalex, threaded=threaded, processes=processes) + run_simple( + hostname, + port, + app, + use_reloader=reloader, + use_debugger=debugger, + use_evalex=evalex, + threaded=threaded, + processes=processes, + ) @cli.command() -@click.option('--no-ipython', is_flag=True, default=False) +@click.option("--no-ipython", is_flag=True, default=False) def shell(no_ipython): """Start a new interactive python session.""" - banner = 'Interactive Werkzeug Shell' + banner = "Interactive Werkzeug Shell" namespace = dict() if not no_ipython: try: try: from IPython.frontend.terminal.embed import InteractiveShellEmbed + sh = InteractiveShellEmbed.instance(banner1=banner) except ImportError: from IPython.Shell import IPShellEmbed + sh = IPShellEmbed(banner=banner) except ImportError: pass @@ -63,7 +75,9 @@ def shell(no_ipython): sh(local_ns=namespace) return from code import interact + interact(banner, local=namespace) -if __name__ == '__main__': + +if __name__ == "__main__": cli() diff --git a/examples/partial/complex_routing.py b/examples/partial/complex_routing.py index 18ce7b0a..596d00e4 100644 --- a/examples/partial/complex_routing.py +++ b/examples/partial/complex_routing.py @@ -1,19 +1,43 @@ -from werkzeug.routing import Map, Rule, Subdomain, Submount, EndpointPrefix +from werkzeug.routing import EndpointPrefix +from werkzeug.routing import Map +from werkzeug.routing import Rule +from werkzeug.routing import Subdomain +from werkzeug.routing import Submount -m = Map([ - # Static URLs - EndpointPrefix('static/', [ - Rule('/', endpoint='index'), - Rule('/about', endpoint='about'), - Rule('/help', endpoint='help'), - ]), - # Knowledge Base - Subdomain('kb', [EndpointPrefix('kb/', [ - Rule('/', endpoint='index'), - Submount('/browse', [ - Rule('/', endpoint='browse'), - Rule('/<int:id>/', defaults={'page': 1}, endpoint='browse'), - Rule('/<int:id>/<int:page>', endpoint='browse') - ]) - ])]) -]) +m = Map( + [ + # Static URLs + EndpointPrefix( + "static/", + [ + Rule("/", endpoint="index"), + Rule("/about", endpoint="about"), + Rule("/help", endpoint="help"), + ], + ), + # Knowledge Base + Subdomain( + "kb", + [ + EndpointPrefix( + "kb/", + [ + Rule("/", endpoint="index"), + Submount( + "/browse", + [ + Rule("/", endpoint="browse"), + Rule( + "/<int:id>/", + defaults={"page": 1}, + endpoint="browse", + ), + Rule("/<int:id>/<int:page>", endpoint="browse"), + ], + ), + ], + ) + ], + ), + ] +) diff --git a/examples/plnt/__init__.py b/examples/plnt/__init__.py index f1bb94f7..7ade4b72 100644 --- a/examples/plnt/__init__.py +++ b/examples/plnt/__init__.py @@ -8,4 +8,4 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from plnt.webapp import Plnt +from .webapp import Plnt diff --git a/examples/plnt/database.py b/examples/plnt/database.py index c74abb84..78e4ab6a 100644 --- a/examples/plnt/database.py +++ b/examples/plnt/database.py @@ -8,62 +8,73 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from sqlalchemy import MetaData, Table, Column, ForeignKey, \ - Integer, String, DateTime -from sqlalchemy.orm import dynamic_loader, scoped_session, create_session, \ - mapper -from plnt.utils import application, local_manager +from sqlalchemy import Column +from sqlalchemy import DateTime +from sqlalchemy import ForeignKey +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.orm import create_session +from sqlalchemy.orm import dynamic_loader +from sqlalchemy.orm import mapper +from sqlalchemy.orm import scoped_session + +from .utils import application +from .utils import local_manager def new_db_session(): - return create_session(application.database_engine, autoflush=True, - autocommit=False) + return create_session(application.database_engine, autoflush=True, autocommit=False) + metadata = MetaData() session = scoped_session(new_db_session, local_manager.get_ident) -blog_table = Table('blogs', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(120)), - Column('description', String), - Column('url', String(200)), - Column('feed_url', String(250)) +blog_table = Table( + "blogs", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(120)), + Column("description", String), + Column("url", String(200)), + Column("feed_url", String(250)), ) -entry_table = Table('entries', metadata, - Column('id', Integer, primary_key=True), - Column('blog_id', Integer, ForeignKey('blogs.id')), - Column('guid', String(200), unique=True), - Column('title', String(140)), - Column('url', String(200)), - Column('text', String), - Column('pub_date', DateTime), - Column('last_update', DateTime) +entry_table = Table( + "entries", + metadata, + Column("id", Integer, primary_key=True), + Column("blog_id", Integer, ForeignKey("blogs.id")), + Column("guid", String(200), unique=True), + Column("title", String(140)), + Column("url", String(200)), + Column("text", String), + Column("pub_date", DateTime), + Column("last_update", DateTime), ) class Blog(object): query = session.query_property() - def __init__(self, name, url, feed_url, description=u''): + def __init__(self, name, url, feed_url, description=u""): self.name = name self.url = url self.feed_url = feed_url self.description = description def __repr__(self): - return '<%s %r>' % (self.__class__.__name__, self.url) + return "<%s %r>" % (self.__class__.__name__, self.url) class Entry(object): query = session.query_property() def __repr__(self): - return '<%s %r>' % (self.__class__.__name__, self.guid) + return "<%s %r>" % (self.__class__.__name__, self.guid) mapper(Entry, entry_table) -mapper(Blog, blog_table, properties=dict( - entries=dynamic_loader(Entry, backref='blog') -)) +mapper(Blog, blog_table, properties=dict(entries=dynamic_loader(Entry, backref="blog"))) diff --git a/examples/plnt/sync.py b/examples/plnt/sync.py index b0265d85..e12ab844 100644 --- a/examples/plnt/sync.py +++ b/examples/plnt/sync.py @@ -8,14 +8,19 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -import feedparser from datetime import datetime + +import feedparser from werkzeug.utils import escape -from plnt.database import Blog, Entry, session -from plnt.utils import strip_tags, nl2p + +from .database import Blog +from .database import Entry +from .database import session +from .utils import nl2p +from .utils import strip_tags -HTML_MIMETYPES = {'text/html', 'application/xhtml+xml'} +HTML_MIMETYPES = {"text/html", "application/xhtml+xml"} def sync(): @@ -31,7 +36,7 @@ def sync(): for entry in feed.entries: # get the guid. either the id if specified, otherwise the link. # if none is available we skip the entry. - guid = entry.get('id') or entry.get('link') + guid = entry.get("id") or entry.get("link") if not guid: continue @@ -41,17 +46,18 @@ def sync(): # get title, url and text. skip if no title or no text is # given. if the link is missing we use the blog link. - if 'title_detail' in entry: - title = entry.title_detail.get('value') or '' - if entry.title_detail.get('type') in HTML_MIMETYPES: + if "title_detail" in entry: + title = entry.title_detail.get("value") or "" + if entry.title_detail.get("type") in HTML_MIMETYPES: title = strip_tags(title) else: title = escape(title) else: - title = entry.get('title') - url = entry.get('link') or blog.blog_url - text = 'content' in entry and entry.content[0] or \ - entry.get('summary_detail') + title = entry.get("title") + url = entry.get("link") or blog.blog_url + text = ( + "content" in entry and entry.content[0] or entry.get("summary_detail") + ) if not title or not text: continue @@ -59,10 +65,10 @@ def sync(): # if we have an html text we use that, otherwise we HTML # escape the text and use that one. We also handle XHTML # with our tag soup parser for the moment. - if text.get('type') not in HTML_MIMETYPES: - text = escape(nl2p(text.get('value') or '')) + if text.get("type") not in HTML_MIMETYPES: + text = escape(nl2p(text.get("value") or "")) else: - text = text.get('value') or '' + text = text.get("value") or "" # no text? continue if not text.strip(): @@ -70,10 +76,12 @@ def sync(): # get the pub date and updated date. This is rather complex # because different feeds do different stuff - pub_date = entry.get('published_parsed') or \ - entry.get('created_parsed') or \ - entry.get('date_parsed') - updated = entry.get('updated_parsed') or pub_date + pub_date = ( + entry.get("published_parsed") + or entry.get("created_parsed") + or entry.get("date_parsed") + ) + updated = entry.get("updated_parsed") or pub_date pub_date = pub_date or updated # if we don't have a pub_date we skip. diff --git a/examples/plnt/utils.py b/examples/plnt/utils.py index 957c28da..5c6f0d0b 100644 --- a/examples/plnt/utils.py +++ b/examples/plnt/utils.py @@ -10,13 +10,16 @@ """ import re from os import path -from jinja2 import Environment, FileSystemLoader +from jinja2 import Environment +from jinja2 import FileSystemLoader from werkzeug._compat import unichr -from werkzeug.local import Local, LocalManager +from werkzeug.local import Local +from werkzeug.local import LocalManager +from werkzeug.routing import Map +from werkzeug.routing import Rule from werkzeug.utils import cached_property from werkzeug.wrappers import Response -from werkzeug.routing import Map, Rule # context locals. these two objects are use by the application to @@ -29,24 +32,24 @@ local_manager = LocalManager([local]) # proxy objects -request = local('request') -application = local('application') -url_adapter = local('url_adapter') +request = local("request") +application = local("application") +url_adapter = local("url_adapter") # let's use jinja for templates this time -template_path = path.join(path.dirname(__file__), 'templates') +template_path = path.join(path.dirname(__file__), "templates") jinja_env = Environment(loader=FileSystemLoader(template_path)) # the collected url patterns -url_map = Map([Rule('/shared/<path:file>', endpoint='shared')]) +url_map = Map([Rule("/shared/<path:file>", endpoint="shared")]) endpoints = {} -_par_re = re.compile(r'\n{2,}') -_entity_re = re.compile(r'&([^;]+);') -_striptags_re = re.compile(r'(<!--.*-->|<[^>]*>)') +_par_re = re.compile(r"\n{2,}") +_entity_re = re.compile(r"&([^;]+);") +_striptags_re = re.compile(r"(<!--.*-->|<[^>]*>)") try: from html.entities import name2codepoint @@ -54,30 +57,32 @@ except ImportError: from htmlentitydefs import name2codepoint html_entities = name2codepoint.copy() -html_entities['apos'] = 39 +html_entities["apos"] = 39 del name2codepoint def expose(url_rule, endpoint=None, **kwargs): """Expose this function to the web layer.""" + def decorate(f): e = endpoint or f.__name__ endpoints[e] = f url_map.add(Rule(url_rule, endpoint=e, **kwargs)) return f + return decorate def render_template(template_name, **context): """Render a template into a response.""" tmpl = jinja_env.get_template(template_name) - context['url_for'] = url_for - return Response(tmpl.render(context), mimetype='text/html') + context["url_for"] = url_for + return Response(tmpl.render(context), mimetype="text/html") def nl2p(s): """Add paragraphs to a text.""" - return u'\n'.join(u'<p>%s</p>' % p for p in _par_re.split(s)) + return u"\n".join(u"<p>%s</p>" % p for p in _par_re.split(s)) def url_for(endpoint, **kw): @@ -87,22 +92,24 @@ def url_for(endpoint, **kw): def strip_tags(s): """Resolve HTML entities and remove tags from a string.""" + def handle_match(m): name = m.group(1) if name in html_entities: return unichr(html_entities[name]) - if name[:2] in ('#x', '#X'): + if name[:2] in ("#x", "#X"): try: return unichr(int(name[2:], 16)) except ValueError: - return u'' - elif name.startswith('#'): + return u"" + elif name.startswith("#"): try: return unichr(int(name[1:])) except ValueError: - return u'' - return u'' - return _entity_re.sub(handle_match, _striptags_re.sub('', s)) + return u"" + return u"" + + return _entity_re.sub(handle_match, _striptags_re.sub("", s)) class Pagination(object): @@ -118,8 +125,11 @@ class Pagination(object): @cached_property def entries(self): - return self.query.offset((self.page - 1) * self.per_page) \ - .limit(self.per_page).all() + return ( + self.query.offset((self.page - 1) * self.per_page) + .limit(self.per_page) + .all() + ) @cached_property def count(self): diff --git a/examples/plnt/views.py b/examples/plnt/views.py index 7397531c..d64e98e3 100644 --- a/examples/plnt/views.py +++ b/examples/plnt/views.py @@ -9,32 +9,35 @@ :license: BSD-3-Clause """ from datetime import date -from plnt.database import Entry -from plnt.utils import Pagination, expose, render_template + +from .database import Entry +from .utils import expose +from .utils import Pagination +from .utils import render_template #: number of items per page PER_PAGE = 30 -@expose('/', defaults={'page': 1}) -@expose('/page/<int:page>') +@expose("/", defaults={"page": 1}) +@expose("/page/<int:page>") def index(request, page): """Show the index page or any an offset of it.""" days = [] days_found = set() query = Entry.query.order_by(Entry.pub_date.desc()) - pagination = Pagination(query, PER_PAGE, page, 'index') + pagination = Pagination(query, PER_PAGE, page, "index") for entry in pagination.entries: day = date(*entry.pub_date.timetuple()[:3]) if day not in days_found: days_found.add(day) - days.append({'date': day, 'entries': []}) - days[-1]['entries'].append(entry) - return render_template('index.html', days=days, pagination=pagination) + days.append({"date": day, "entries": []}) + days[-1]["entries"].append(entry) + return render_template("index.html", days=days, pagination=pagination) -@expose('/about') +@expose("/about") def about(request): """Show the about page, so that we have another view func ;-)""" - return render_template('about.html') + return render_template("about.html") diff --git a/examples/plnt/webapp.py b/examples/plnt/webapp.py index b571841a..dee7336b 100644 --- a/examples/plnt/webapp.py +++ b/examples/plnt/webapp.py @@ -9,31 +9,31 @@ :license: BSD-3-Clause """ from os import path -from sqlalchemy import create_engine +from sqlalchemy import create_engine +from werkzeug.exceptions import HTTPException from werkzeug.middleware.shared_data import SharedDataMiddleware from werkzeug.wrappers import Request from werkzeug.wsgi import ClosingIterator -from werkzeug.exceptions import HTTPException -from plnt.utils import local, local_manager, url_map, endpoints -from plnt.database import session, metadata -# import the views module because it contains setup code -import plnt.views +from . import views # noqa: F401 +from .database import metadata +from .database import session +from .utils import endpoints +from .utils import local +from .utils import local_manager +from .utils import url_map #: path to shared data -SHARED_DATA = path.join(path.dirname(__file__), 'shared') +SHARED_DATA = path.join(path.dirname(__file__), "shared") class Plnt(object): - def __init__(self, database_uri): self.database_engine = create_engine(database_uri) self._dispatch = local_manager.middleware(self.dispatch_request) - self._dispatch = SharedDataMiddleware(self._dispatch, { - '/shared': SHARED_DATA - }) + self._dispatch = SharedDataMiddleware(self._dispatch, {"/shared": SHARED_DATA}) def init_database(self): metadata.create_all(self.database_engine) @@ -50,8 +50,7 @@ class Plnt(object): response = endpoints[endpoint](request, **values) except HTTPException as e: response = e - return ClosingIterator(response(environ, start_response), - session.remove) + return ClosingIterator(response(environ, start_response), session.remove) def __call__(self, environ, start_response): return self._dispatch(environ, start_response) diff --git a/examples/shortly/shortly.py b/examples/shortly/shortly.py index 3446de0f..a3ff9f7f 100644 --- a/examples/shortly/shortly.py +++ b/examples/shortly/shortly.py @@ -9,32 +9,35 @@ :license: BSD-3-Clause """ import os -import redis +import redis +from jinja2 import Environment +from jinja2 import FileSystemLoader +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import NotFound from werkzeug.middleware.shared_data import SharedDataMiddleware +from werkzeug.routing import Map +from werkzeug.routing import Rule from werkzeug.urls import url_parse -from werkzeug.wrappers import Request, Response -from werkzeug.routing import Map, Rule -from werkzeug.exceptions import HTTPException, NotFound from werkzeug.utils import redirect - -from jinja2 import Environment, FileSystemLoader +from werkzeug.wrappers import Request +from werkzeug.wrappers import Response def base36_encode(number): - assert number >= 0, 'positive integer required' + assert number >= 0, "positive integer required" if number == 0: - return '0' + return "0" base36 = [] while number != 0: number, i = divmod(number, 36) - base36.append('0123456789abcdefghijklmnopqrstuvwxyz'[i]) - return ''.join(reversed(base36)) + base36.append("0123456789abcdefghijklmnopqrstuvwxyz"[i]) + return "".join(reversed(base36)) def is_valid_url(url): parts = url_parse(url) - return parts.scheme in ('http', 'https') + return parts.scheme in ("http", "https") def get_hostname(url): @@ -42,74 +45,77 @@ def get_hostname(url): class Shortly(object): - def __init__(self, config): - self.redis = redis.Redis(config['redis_host'], config['redis_port']) - template_path = os.path.join(os.path.dirname(__file__), 'templates') - self.jinja_env = Environment(loader=FileSystemLoader(template_path), - autoescape=True) - self.jinja_env.filters['hostname'] = get_hostname - - self.url_map = Map([ - Rule('/', endpoint='new_url'), - Rule('/<short_id>', endpoint='follow_short_link'), - Rule('/<short_id>+', endpoint='short_link_details') - ]) + self.redis = redis.Redis(config["redis_host"], config["redis_port"]) + template_path = os.path.join(os.path.dirname(__file__), "templates") + self.jinja_env = Environment( + loader=FileSystemLoader(template_path), autoescape=True + ) + self.jinja_env.filters["hostname"] = get_hostname + + self.url_map = Map( + [ + Rule("/", endpoint="new_url"), + Rule("/<short_id>", endpoint="follow_short_link"), + Rule("/<short_id>+", endpoint="short_link_details"), + ] + ) def on_new_url(self, request): error = None - url = '' - if request.method == 'POST': - url = request.form['url'] + url = "" + if request.method == "POST": + url = request.form["url"] if not is_valid_url(url): - error = 'Please enter a valid URL' + error = "Please enter a valid URL" else: short_id = self.insert_url(url) - return redirect('/%s+' % short_id) - return self.render_template('new_url.html', error=error, url=url) + return redirect("/%s+" % short_id) + return self.render_template("new_url.html", error=error, url=url) def on_follow_short_link(self, request, short_id): - link_target = self.redis.get('url-target:' + short_id) + link_target = self.redis.get("url-target:" + short_id) if link_target is None: raise NotFound() - self.redis.incr('click-count:' + short_id) + self.redis.incr("click-count:" + short_id) return redirect(link_target) def on_short_link_details(self, request, short_id): - link_target = self.redis.get('url-target:' + short_id) + link_target = self.redis.get("url-target:" + short_id) if link_target is None: raise NotFound() - click_count = int(self.redis.get('click-count:' + short_id) or 0) - return self.render_template('short_link_details.html', + click_count = int(self.redis.get("click-count:" + short_id) or 0) + return self.render_template( + "short_link_details.html", link_target=link_target, short_id=short_id, - click_count=click_count + click_count=click_count, ) def error_404(self): - response = self.render_template('404.html') + response = self.render_template("404.html") response.status_code = 404 return response def insert_url(self, url): - short_id = self.redis.get('reverse-url:' + url) + short_id = self.redis.get("reverse-url:" + url) if short_id is not None: return short_id - url_num = self.redis.incr('last-url-id') + url_num = self.redis.incr("last-url-id") short_id = base36_encode(url_num) - self.redis.set('url-target:' + short_id, url) - self.redis.set('reverse-url:' + url, short_id) + self.redis.set("url-target:" + short_id, url) + self.redis.set("reverse-url:" + url, short_id) return short_id def render_template(self, template_name, **context): t = self.jinja_env.get_template(template_name) - return Response(t.render(context), mimetype='text/html') + return Response(t.render(context), mimetype="text/html") def dispatch_request(self, request): adapter = self.url_map.bind_to_environ(request.environ) try: endpoint, values = adapter.match() - return getattr(self, 'on_' + endpoint)(request, **values) + return getattr(self, "on_" + endpoint)(request, **values) except NotFound: return self.error_404() except HTTPException as e: @@ -124,19 +130,17 @@ class Shortly(object): return self.wsgi_app(environ, start_response) -def create_app(redis_host='localhost', redis_port=6379, with_static=True): - app = Shortly({ - 'redis_host': redis_host, - 'redis_port': redis_port - }) +def create_app(redis_host="localhost", redis_port=6379, with_static=True): + app = Shortly({"redis_host": redis_host, "redis_port": redis_port}) if with_static: - app.wsgi_app = SharedDataMiddleware(app.wsgi_app, { - '/static': os.path.join(os.path.dirname(__file__), 'static') - }) + app.wsgi_app = SharedDataMiddleware( + app.wsgi_app, {"/static": os.path.join(os.path.dirname(__file__), "static")} + ) return app -if __name__ == '__main__': +if __name__ == "__main__": from werkzeug.serving import run_simple + app = create_app() - run_simple('127.0.0.1', 5000, app, use_debugger=True, use_reloader=True) + run_simple("127.0.0.1", 5000, app, use_debugger=True, use_reloader=True) diff --git a/examples/shorty/application.py b/examples/shorty/application.py index cfec075f..af793444 100644 --- a/examples/shorty/application.py +++ b/examples/shorty/application.py @@ -1,24 +1,25 @@ from sqlalchemy import create_engine - +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import NotFound from werkzeug.middleware.shared_data import SharedDataMiddleware from werkzeug.wrappers import Request from werkzeug.wsgi import ClosingIterator -from werkzeug.exceptions import HTTPException, NotFound -from shorty.utils import STATIC_PATH, session, local, local_manager, \ - metadata, url_map -from shorty import views +from . import views +from .utils import local +from .utils import local_manager +from .utils import metadata +from .utils import session +from .utils import STATIC_PATH +from .utils import url_map class Shorty(object): - def __init__(self, db_uri): local.application = self self.database_engine = create_engine(db_uri, convert_unicode=True) - self.dispatch = SharedDataMiddleware(self.dispatch, { - '/static': STATIC_PATH - }) + self.dispatch = SharedDataMiddleware(self.dispatch, {"/static": STATIC_PATH}) def init_database(self): metadata.create_all(self.database_engine) @@ -36,8 +37,9 @@ class Shorty(object): response.status_code = 404 except HTTPException as e: response = e - return ClosingIterator(response(environ, start_response), - [session.remove, local_manager.cleanup]) + return ClosingIterator( + response(environ, start_response), [session.remove, local_manager.cleanup] + ) def __call__(self, environ, start_response): return self.dispatch(environ, start_response) diff --git a/examples/shorty/models.py b/examples/shorty/models.py index 78b5f1df..7d0df5bd 100644 --- a/examples/shorty/models.py +++ b/examples/shorty/models.py @@ -1,15 +1,27 @@ from datetime import datetime -from sqlalchemy import Table, Column, String, Boolean, DateTime + +from sqlalchemy import Boolean +from sqlalchemy import Column +from sqlalchemy import DateTime +from sqlalchemy import String +from sqlalchemy import Table from sqlalchemy.orm import mapper -from shorty.utils import session, metadata, url_for, get_random_uid -url_table = Table('urls', metadata, - Column('uid', String(140), primary_key=True), - Column('target', String(500)), - Column('added', DateTime), - Column('public', Boolean) +from .utils import get_random_uid +from .utils import metadata +from .utils import session +from .utils import url_for + +url_table = Table( + "urls", + metadata, + Column("uid", String(140), primary_key=True), + Column("target", String(500)), + Column("added", DateTime), + Column("public", Boolean), ) + class URL(object): query = session.query_property() @@ -27,9 +39,10 @@ class URL(object): @property def short_url(self): - return url_for('link', uid=self.uid, _external=True) + return url_for("link", uid=self.uid, _external=True) def __repr__(self): - return '<URL %r>' % self.uid + return "<URL %r>" % self.uid + mapper(URL, url_table) diff --git a/examples/shorty/utils.py b/examples/shorty/utils.py index bbcb0f20..f61f9ef9 100644 --- a/examples/shorty/utils.py +++ b/examples/shorty/utils.py @@ -1,57 +1,72 @@ from os import path -from random import sample, randrange -from jinja2 import Environment, FileSystemLoader -from werkzeug.local import Local, LocalManager +from random import randrange +from random import sample + +from jinja2 import Environment +from jinja2 import FileSystemLoader +from sqlalchemy import MetaData +from sqlalchemy.orm import create_session +from sqlalchemy.orm import scoped_session +from werkzeug.local import Local +from werkzeug.local import LocalManager +from werkzeug.routing import Map +from werkzeug.routing import Rule from werkzeug.urls import url_parse from werkzeug.utils import cached_property from werkzeug.wrappers import Response -from werkzeug.routing import Map, Rule -from sqlalchemy import MetaData -from sqlalchemy.orm import create_session, scoped_session -TEMPLATE_PATH = path.join(path.dirname(__file__), 'templates') -STATIC_PATH = path.join(path.dirname(__file__), 'static') -ALLOWED_SCHEMES = frozenset(['http', 'https', 'ftp', 'ftps']) -URL_CHARS = 'abcdefghijkmpqrstuvwxyzABCDEFGHIJKLMNPQRST23456789' +TEMPLATE_PATH = path.join(path.dirname(__file__), "templates") +STATIC_PATH = path.join(path.dirname(__file__), "static") +ALLOWED_SCHEMES = frozenset(["http", "https", "ftp", "ftps"]) +URL_CHARS = "abcdefghijkmpqrstuvwxyzABCDEFGHIJKLMNPQRST23456789" local = Local() local_manager = LocalManager([local]) -application = local('application') +application = local("application") metadata = MetaData() -url_map = Map([Rule('/static/<file>', endpoint='static', build_only=True)]) +url_map = Map([Rule("/static/<file>", endpoint="static", build_only=True)]) -session = scoped_session(lambda: create_session(application.database_engine, - autocommit=False, - autoflush=False)) +session = scoped_session( + lambda: create_session( + application.database_engine, autocommit=False, autoflush=False + ) +) jinja_env = Environment(loader=FileSystemLoader(TEMPLATE_PATH)) def expose(rule, **kw): def decorate(f): - kw['endpoint'] = f.__name__ + kw["endpoint"] = f.__name__ url_map.add(Rule(rule, **kw)) return f + return decorate + def url_for(endpoint, _external=False, **values): return local.url_adapter.build(endpoint, values, force_external=_external) -jinja_env.globals['url_for'] = url_for + + +jinja_env.globals["url_for"] = url_for + def render_template(template, **context): - return Response(jinja_env.get_template(template).render(**context), - mimetype='text/html') + return Response( + jinja_env.get_template(template).render(**context), mimetype="text/html" + ) + def validate_url(url): return url_parse(url)[0] in ALLOWED_SCHEMES + def get_random_uid(): - return ''.join(sample(URL_CHARS, randrange(3, 9))) + return "".join(sample(URL_CHARS, randrange(3, 9))) class Pagination(object): - def __init__(self, query, per_page, page, endpoint): self.query = query self.per_page = per_page @@ -64,8 +79,11 @@ class Pagination(object): @cached_property def entries(self): - return self.query.offset((self.page - 1) * self.per_page) \ - .limit(self.per_page).all() + return ( + self.query.offset((self.page - 1) * self.per_page) + .limit(self.per_page) + .all() + ) has_previous = property(lambda self: self.page > 1) has_next = property(lambda self: self.page < self.pages) diff --git a/examples/shorty/views.py b/examples/shorty/views.py index fa4269a3..7a1ee20b 100644 --- a/examples/shorty/views.py +++ b/examples/shorty/views.py @@ -1,52 +1,62 @@ -from werkzeug.utils import redirect from werkzeug.exceptions import NotFound -from shorty.utils import session, Pagination, render_template, expose, \ - validate_url, url_for -from shorty.models import URL +from werkzeug.utils import redirect + +from .models import URL +from .utils import expose +from .utils import Pagination +from .utils import render_template +from .utils import session +from .utils import url_for +from .utils import validate_url + -@expose('/') +@expose("/") def new(request): - error = url = '' - if request.method == 'POST': - url = request.form.get('url') - alias = request.form.get('alias') + error = url = "" + if request.method == "POST": + url = request.form.get("url") + alias = request.form.get("alias") if not validate_url(url): error = "I'm sorry but you cannot shorten this URL." elif alias: if len(alias) > 140: - error = 'Your alias is too long' - elif '/' in alias: - error = 'Your alias might not include a slash' + error = "Your alias is too long" + elif "/" in alias: + error = "Your alias might not include a slash" elif URL.query.get(alias): - error = 'The alias you have requested exists already' + error = "The alias you have requested exists already" if not error: - uid = URL(url, 'private' not in request.form, alias).uid + uid = URL(url, "private" not in request.form, alias).uid session.commit() - return redirect(url_for('display', uid=uid)) - return render_template('new.html', error=error, url=url) + return redirect(url_for("display", uid=uid)) + return render_template("new.html", error=error, url=url) -@expose('/display/<uid>') + +@expose("/display/<uid>") def display(request, uid): url = URL.query.get(uid) if not url: raise NotFound() - return render_template('display.html', url=url) + return render_template("display.html", url=url) + -@expose('/u/<uid>') +@expose("/u/<uid>") def link(request, uid): url = URL.query.get(uid) if not url: raise NotFound() return redirect(url.target, 301) -@expose('/list/', defaults={'page': 1}) -@expose('/list/<int:page>') + +@expose("/list/", defaults={"page": 1}) +@expose("/list/<int:page>") def list(request, page): query = URL.query.filter_by(public=True) - pagination = Pagination(query, 30, page, 'list') + pagination = Pagination(query, 30, page, "list") if pagination.page > 1 and not pagination.entries: raise NotFound() - return render_template('list.html', pagination=pagination) + return render_template("list.html", pagination=pagination) + def not_found(request): - return render_template('not_found.html') + return render_template("not_found.html") diff --git a/examples/simplewiki/__init__.py b/examples/simplewiki/__init__.py index c74c2cf6..591a06e9 100644 --- a/examples/simplewiki/__init__.py +++ b/examples/simplewiki/__init__.py @@ -9,4 +9,4 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from simplewiki.application import SimpleWiki +from .application import SimpleWiki diff --git a/examples/simplewiki/actions.py b/examples/simplewiki/actions.py index 29b2ff5b..c9a88b41 100644 --- a/examples/simplewiki/actions.py +++ b/examples/simplewiki/actions.py @@ -13,15 +13,22 @@ :license: BSD-3-Clause """ from difflib import unified_diff -from simplewiki.utils import Response, generate_template, \ - href, format_datetime -from simplewiki.database import RevisionedPage, Page, Revision, session + from werkzeug.utils import redirect +from .database import Page +from .database import Revision +from .database import RevisionedPage +from .database import session +from .utils import format_datetime +from .utils import generate_template +from .utils import href +from .utils import Response + def on_show(request, page_name): """Displays the page the user requests.""" - revision_id = request.args.get('rev', type=int) + revision_id = request.args.get("rev", type=int) query = RevisionedPage.query.filter_by(name=page_name) if revision_id: query = query.filter_by(revision_id=revision_id) @@ -32,32 +39,32 @@ def on_show(request, page_name): page = query.first() if page is None: return page_missing(request, page_name, revision_requested) - return Response(generate_template('action_show.html', - page=page - )) + return Response(generate_template("action_show.html", page=page)) def on_edit(request, page_name): """Edit the current revision of a page.""" - change_note = error = '' - revision = Revision.query.filter( - (Page.name == page_name) & - (Page.page_id == Revision.page_id) - ).order_by(Revision.revision_id.desc()).first() + change_note = error = "" + revision = ( + Revision.query.filter( + (Page.name == page_name) & (Page.page_id == Revision.page_id) + ) + .order_by(Revision.revision_id.desc()) + .first() + ) if revision is None: page = None else: page = revision.page - if request.method == 'POST': - text = request.form.get('text') - if request.form.get('cancel') or \ - revision and revision.text == text: + if request.method == "POST": + text = request.form.get("text") + if request.form.get("cancel") or revision and revision.text == text: return redirect(href(page.name)) elif not text: - error = 'You cannot save empty revisions.' + error = "You cannot save empty revisions." else: - change_note = request.form.get('change_note', '') + change_note = request.form.get("change_note", "") if page is None: page = Page(page_name) session.add(page) @@ -65,14 +72,17 @@ def on_edit(request, page_name): session.commit() return redirect(href(page.name)) - return Response(generate_template('action_edit.html', - revision=revision, - page=page, - new=page is None, - page_name=page_name, - change_note=change_note, - error=error - )) + return Response( + generate_template( + "action_edit.html", + revision=revision, + page=page, + new=page is None, + page_name=page_name, + change_note=change_note, + error=error, + ) + ) def on_log(request, page_name): @@ -80,109 +90,117 @@ def on_log(request, page_name): page = Page.query.filter_by(name=page_name).first() if page is None: return page_missing(request, page_name, False) - return Response(generate_template('action_log.html', - page=page - )) + return Response(generate_template("action_log.html", page=page)) def on_diff(request, page_name): """Show the diff between two revisions.""" - old = request.args.get('old', type=int) - new = request.args.get('new', type=int) - error = '' + old = request.args.get("old", type=int) + new = request.args.get("new", type=int) + error = "" diff = page = old_rev = new_rev = None if not (old and new): - error = 'No revisions specified.' + error = "No revisions specified." else: - revisions = dict((x.revision_id, x) for x in Revision.query.filter( - (Revision.revision_id.in_((old, new))) & - (Revision.page_id == Page.page_id) & - (Page.name == page_name) - )) + revisions = dict( + (x.revision_id, x) + for x in Revision.query.filter( + (Revision.revision_id.in_((old, new))) + & (Revision.page_id == Page.page_id) + & (Page.name == page_name) + ) + ) if len(revisions) != 2: - error = 'At least one of the revisions requested ' \ - 'does not exist.' + error = "At least one of the revisions requested does not exist." else: new_rev = revisions[new] old_rev = revisions[old] page = old_rev.page diff = unified_diff( - (old_rev.text + '\n').splitlines(True), - (new_rev.text + '\n').splitlines(True), - page.name, page.name, + (old_rev.text + "\n").splitlines(True), + (new_rev.text + "\n").splitlines(True), + page.name, + page.name, format_datetime(old_rev.timestamp), format_datetime(new_rev.timestamp), - 3 + 3, ) - return Response(generate_template('action_diff.html', - error=error, - old_revision=old_rev, - new_revision=new_rev, - page=page, - diff=diff - )) + return Response( + generate_template( + "action_diff.html", + error=error, + old_revision=old_rev, + new_revision=new_rev, + page=page, + diff=diff, + ) + ) def on_revert(request, page_name): """Revert an old revision.""" - rev_id = request.args.get('rev', type=int) + rev_id = request.args.get("rev", type=int) old_revision = page = None - error = 'No such revision' + error = "No such revision" - if request.method == 'POST' and request.form.get('cancel'): + if request.method == "POST" and request.form.get("cancel"): return redirect(href(page_name)) if rev_id: old_revision = Revision.query.filter( - (Revision.revision_id == rev_id) & - (Revision.page_id == Page.page_id) & - (Page.name == page_name) + (Revision.revision_id == rev_id) + & (Revision.page_id == Page.page_id) + & (Page.name == page_name) ).first() if old_revision: - new_revision = Revision.query.filter( - (Revision.page_id == Page.page_id) & - (Page.name == page_name) - ).order_by(Revision.revision_id.desc()).first() + new_revision = ( + Revision.query.filter( + (Revision.page_id == Page.page_id) & (Page.name == page_name) + ) + .order_by(Revision.revision_id.desc()) + .first() + ) if old_revision == new_revision: - error = 'You tried to revert the current active ' \ - 'revision.' + error = "You tried to revert the current active revision." elif old_revision.text == new_revision.text: - error = 'There are no changes between the current ' \ - 'revision and the revision you want to ' \ - 'restore.' + error = ( + "There are no changes between the current " + "revision and the revision you want to " + "restore." + ) else: - error = '' + error = "" page = old_revision.page - if request.method == 'POST': - change_note = request.form.get('change_note', '') - change_note = 'revert' + (change_note and ': ' + - change_note or '') - session.add(Revision(page, old_revision.text, - change_note)) + if request.method == "POST": + change_note = request.form.get("change_note", "") + change_note = "revert" + (change_note and ": " + change_note or "") + session.add(Revision(page, old_revision.text, change_note)) session.commit() return redirect(href(page_name)) - return Response(generate_template('action_revert.html', - error=error, - old_revision=old_revision, - page=page - )) + return Response( + generate_template( + "action_revert.html", error=error, old_revision=old_revision, page=page + ) + ) def page_missing(request, page_name, revision_requested, protected=False): """Displayed if page or revision does not exist.""" - return Response(generate_template('page_missing.html', - page_name=page_name, - revision_requested=revision_requested, - protected=protected - ), status=404) + return Response( + generate_template( + "page_missing.html", + page_name=page_name, + revision_requested=revision_requested, + protected=protected, + ), + status=404, + ) def missing_action(request, action): """Displayed if a user tried to access a action that does not exist.""" - return Response(generate_template('missing_action.html', - action=action - ), status=404) + return Response(generate_template("missing_action.html", action=action), status=404) diff --git a/examples/simplewiki/application.py b/examples/simplewiki/application.py index b994b17d..5f85eef3 100644 --- a/examples/simplewiki/application.py +++ b/examples/simplewiki/application.py @@ -11,19 +11,25 @@ :license: BSD-3-Clause """ from os import path -from sqlalchemy import create_engine +from sqlalchemy import create_engine from werkzeug.middleware.shared_data import SharedDataMiddleware from werkzeug.utils import redirect from werkzeug.wsgi import ClosingIterator -from simplewiki.utils import Request, local, local_manager, href -from simplewiki.database import session, metadata -from simplewiki import actions -from simplewiki.specialpages import pages, page_not_found + +from . import actions +from .database import metadata +from .database import session +from .specialpages import page_not_found +from .specialpages import pages +from .utils import href +from .utils import local +from .utils import local_manager +from .utils import Request #: path to shared data -SHARED_DATA = path.join(path.dirname(__file__), 'shared') +SHARED_DATA = path.join(path.dirname(__file__), "shared") class SimpleWiki(object): @@ -37,9 +43,9 @@ class SimpleWiki(object): # apply our middlewares. we apply the middlewars *inside* the # application and not outside of it so that we never lose the # reference to the `SimpleWiki` object. - self._dispatch = SharedDataMiddleware(self.dispatch_request, { - '/_shared': SHARED_DATA - }) + self._dispatch = SharedDataMiddleware( + self.dispatch_request, {"/_shared": SHARED_DATA} + ) # free the context locals at the end of the request self._dispatch = local_manager.make_middleware(self._dispatch) @@ -66,16 +72,15 @@ class SimpleWiki(object): # get the current action from the url and normalize the page name # which is just the request path - action_name = request.args.get('action') or 'show' - page_name = u'_'.join([x for x in request.path.strip('/') - .split() if x]) + action_name = request.args.get("action") or "show" + page_name = u"_".join([x for x in request.path.strip("/").split() if x]) # redirect to the Main_Page if the user requested the index if not page_name: - response = redirect(href('Main_Page')) + response = redirect(href("Main_Page")) # check special pages - elif page_name.startswith('Special:'): + elif page_name.startswith("Special:"): if page_name[8:] not in pages: response = page_not_found(request, page_name) else: @@ -85,15 +90,14 @@ class SimpleWiki(object): # action module. It's "on_" + the action name. If it doesn't # exists call the missing_action method from the same module. else: - action = getattr(actions, 'on_' + action_name, None) + action = getattr(actions, "on_" + action_name, None) if action is None: response = actions.missing_action(request, action_name) else: response = action(request, page_name) # make sure the session is removed properly - return ClosingIterator(response(environ, start_response), - session.remove) + return ClosingIterator(response(environ, start_response), session.remove) def __call__(self, environ, start_response): """Just forward a WSGI call to the first internal middleware.""" diff --git a/examples/simplewiki/database.py b/examples/simplewiki/database.py index d5df7387..f0cec34e 100644 --- a/examples/simplewiki/database.py +++ b/examples/simplewiki/database.py @@ -9,11 +9,23 @@ :license: BSD-3-Clause """ from datetime import datetime -from sqlalchemy import Table, Column, Integer, String, DateTime, \ - ForeignKey, MetaData, join -from sqlalchemy.orm import relation, create_session, scoped_session, \ - mapper -from simplewiki.utils import application, local_manager, parse_creole + +from sqlalchemy import Column +from sqlalchemy import DateTime +from sqlalchemy import ForeignKey +from sqlalchemy import Integer +from sqlalchemy import join +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.orm import create_session +from sqlalchemy.orm import mapper +from sqlalchemy.orm import relation +from sqlalchemy.orm import scoped_session + +from .utils import application +from .utils import local_manager +from .utils import parse_creole # create a global metadata @@ -28,8 +40,7 @@ def new_db_session(): application. If there is no application bound to the context it raises an exception. """ - return create_session(application.database_engine, autoflush=True, - autocommit=False) + return create_session(application.database_engine, autoflush=True, autocommit=False) # and create a new global session factory. Calling this object gives @@ -38,17 +49,21 @@ session = scoped_session(new_db_session, local_manager.get_ident) # our database tables. -page_table = Table('pages', metadata, - Column('page_id', Integer, primary_key=True), - Column('name', String(60), unique=True) +page_table = Table( + "pages", + metadata, + Column("page_id", Integer, primary_key=True), + Column("name", String(60), unique=True), ) -revision_table = Table('revisions', metadata, - Column('revision_id', Integer, primary_key=True), - Column('page_id', Integer, ForeignKey('pages.page_id')), - Column('timestamp', DateTime), - Column('text', String), - Column('change_note', String(200)) +revision_table = Table( + "revisions", + metadata, + Column("revision_id", Integer, primary_key=True), + Column("page_id", Integer, ForeignKey("pages.page_id")), + Column("timestamp", DateTime), + Column("text", String), + Column("change_note", String(200)), ) @@ -59,9 +74,10 @@ class Revision(object): new revisions. It's also used for the diff system and the revision log. """ + query = session.query_property() - def __init__(self, page, text, change_note='', timestamp=None): + def __init__(self, page, text, change_note="", timestamp=None): if isinstance(page, int): self.page_id = page else: @@ -75,11 +91,7 @@ class Revision(object): return parse_creole(self.text) def __repr__(self): - return '<%s %r:%r>' % ( - self.__class__.__name__, - self.page_id, - self.revision_id - ) + return "<%s %r:%r>" % (self.__class__.__name__, self.page_id, self.revision_id) class Page(object): @@ -87,6 +99,7 @@ class Page(object): Represents a simple page without any revisions. This is for example used in the page index where the page contents are not relevant. """ + query = session.query_property() def __init__(self, name): @@ -94,10 +107,10 @@ class Page(object): @property def title(self): - return self.name.replace('_', ' ') + return self.name.replace("_", " ") def __repr__(self): - return '<%s %r>' % (self.__class__.__name__, self.name) + return "<%s %r>" % (self.__class__.__name__, self.name) class RevisionedPage(Page, Revision): @@ -106,26 +119,32 @@ class RevisionedPage(Page, Revision): and the ability of SQLAlchemy to map to joins we can combine `Page` and `Revision` into one class here. """ + query = session.query_property() def __init__(self): - raise TypeError('cannot create WikiPage instances, use the Page and ' - 'Revision classes for data manipulation.') + raise TypeError( + "cannot create WikiPage instances, use the Page and " + "Revision classes for data manipulation." + ) def __repr__(self): - return '<%s %r:%r>' % ( - self.__class__.__name__, - self.name, - self.revision_id - ) + return "<%s %r:%r>" % (self.__class__.__name__, self.name, self.revision_id) # setup mappers mapper(Revision, revision_table) -mapper(Page, page_table, properties=dict( - revisions=relation(Revision, backref='page', - order_by=Revision.revision_id.desc()) -)) -mapper(RevisionedPage, join(page_table, revision_table), properties=dict( - page_id=[page_table.c.page_id, revision_table.c.page_id], -)) +mapper( + Page, + page_table, + properties=dict( + revisions=relation( + Revision, backref="page", order_by=Revision.revision_id.desc() + ) + ), +) +mapper( + RevisionedPage, + join(page_table, revision_table), + properties=dict(page_id=[page_table.c.page_id, revision_table.c.page_id]), +) diff --git a/examples/simplewiki/specialpages.py b/examples/simplewiki/specialpages.py index 9717d6c1..9636f7ca 100644 --- a/examples/simplewiki/specialpages.py +++ b/examples/simplewiki/specialpages.py @@ -9,10 +9,12 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from simplewiki.utils import Response, Pagination, generate_template -from simplewiki.database import RevisionedPage, Page -from simplewiki.actions import page_missing - +from .actions import page_missing +from .database import Page +from .database import RevisionedPage +from .utils import generate_template +from .utils import Pagination +from .utils import Response def page_index(request): @@ -20,19 +22,21 @@ def page_index(request): letters = {} for page in Page.query.order_by(Page.name): letters.setdefault(page.name.capitalize()[0], []).append(page) - return Response(generate_template('page_index.html', - letters=sorted(letters.items()) - )) + return Response( + generate_template("page_index.html", letters=sorted(letters.items())) + ) def recent_changes(request): """Display the recent changes.""" - page = max(1, request.args.get('page', type=int)) - query = RevisionedPage.query \ - .order_by(RevisionedPage.revision_id.desc()) - return Response(generate_template('recent_changes.html', - pagination=Pagination(query, 20, page, 'Special:Recent_Changes') - )) + page = max(1, request.args.get("page", type=int)) + query = RevisionedPage.query.order_by(RevisionedPage.revision_id.desc()) + return Response( + generate_template( + "recent_changes.html", + pagination=Pagination(query, 20, page, "Special:Recent_Changes"), + ) + ) def page_not_found(request, page_name): @@ -43,7 +47,4 @@ def page_not_found(request, page_name): return page_missing(request, page_name, True) -pages = { - 'Index': page_index, - 'Recent_Changes': recent_changes -} +pages = {"Index": page_index, "Recent_Changes": recent_changes} diff --git a/examples/simplewiki/templates/macros.xml b/examples/simplewiki/templates/macros.xml index 14bebd83..28ce06b9 100644 --- a/examples/simplewiki/templates/macros.xml +++ b/examples/simplewiki/templates/macros.xml @@ -1,6 +1,6 @@ <div xmlns="http://www.w3.org/1999/xhtml" xmlns:py="http://genshi.edgewall.org/" py:strip=""> - + <py:def function="render_pagination(pagination)"> <div class="pagination" py:if="pagination.pages > 1"> <py:choose test="pagination.has_previous"> diff --git a/examples/simplewiki/utils.py b/examples/simplewiki/utils.py index f6f11d29..66289e8b 100644 --- a/examples/simplewiki/utils.py +++ b/examples/simplewiki/utils.py @@ -9,20 +9,25 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -import creoleparser from os import path + +import creoleparser from genshi import Stream from genshi.template import TemplateLoader -from werkzeug.local import Local, LocalManager -from werkzeug.urls import url_encode, url_quote +from werkzeug.local import Local +from werkzeug.local import LocalManager +from werkzeug.urls import url_encode +from werkzeug.urls import url_quote from werkzeug.utils import cached_property -from werkzeug.wrappers import BaseRequest, BaseResponse +from werkzeug.wrappers import BaseRequest +from werkzeug.wrappers import BaseResponse # calculate the path to the templates an create the template loader -TEMPLATE_PATH = path.join(path.dirname(__file__), 'templates') -template_loader = TemplateLoader(TEMPLATE_PATH, auto_reload=True, - variable_lookup='lenient') +TEMPLATE_PATH = path.join(path.dirname(__file__), "templates") +template_loader = TemplateLoader( + TEMPLATE_PATH, auto_reload=True, variable_lookup="lenient" +) # context locals. these two objects are use by the application to @@ -30,27 +35,25 @@ template_loader = TemplateLoader(TEMPLATE_PATH, auto_reload=True, # current thread and the current greenlet if there is greenlet support. local = Local() local_manager = LocalManager([local]) -request = local('request') -application = local('application') +request = local("request") +application = local("application") # create a new creole parser creole_parser = creoleparser.Parser( - dialect=creoleparser.create_dialect(creoleparser.creole10_base, - wiki_links_base_url='', + dialect=creoleparser.create_dialect( + creoleparser.creole10_base, + wiki_links_base_url="", wiki_links_path_func=lambda page_name: href(page_name), - wiki_links_space_char='_', - no_wiki_monospace=True + wiki_links_space_char="_", + no_wiki_monospace=True, ), - method='html' + method="html", ) def generate_template(template_name, **context): """Load and generate a template.""" - context.update( - href=href, - format_datetime=format_datetime - ) + context.update(href=href, format_datetime=format_datetime) return template_loader.load(template_name).generate(**context) @@ -64,17 +67,17 @@ def href(*args, **kw): Simple function for URL generation. Position arguments are used for the URL path and keyword arguments are used for the url parameters. """ - result = [(request.script_root if request else '') + '/'] + result = [(request.script_root if request else "") + "/"] for idx, arg in enumerate(args): - result.append(('/' if idx else '') + url_quote(arg)) + result.append(("/" if idx else "") + url_quote(arg)) if kw: - result.append('?' + url_encode(kw)) - return ''.join(result) + result.append("?" + url_encode(kw)) + return "".join(result) def format_datetime(obj): """Format a datetime object.""" - return obj.strftime('%Y-%m-%d %H:%M') + return obj.strftime("%Y-%m-%d %H:%M") class Request(BaseRequest): @@ -94,14 +97,14 @@ class Response(BaseResponse): to html. This makes it possible to switch to xhtml or html5 easily. """ - default_mimetype = 'text/html' + default_mimetype = "text/html" - def __init__(self, response=None, status=200, headers=None, mimetype=None, - content_type=None): + def __init__( + self, response=None, status=200, headers=None, mimetype=None, content_type=None + ): if isinstance(response, Stream): - response = response.render('html', encoding=None, doctype='html') - BaseResponse.__init__(self, response, status, headers, mimetype, - content_type) + response = response.render("html", encoding=None, doctype="html") + BaseResponse.__init__(self, response, status, headers, mimetype, content_type) class Pagination(object): @@ -118,8 +121,11 @@ class Pagination(object): @cached_property def entries(self): - return self.query.offset((self.page - 1) * self.per_page) \ - .limit(self.per_page).all() + return ( + self.query.offset((self.page - 1) * self.per_page) + .limit(self.per_page) + .all() + ) @property def has_previous(self): diff --git a/examples/upload.py b/examples/upload.py index 88af63aa..6f520b42 100644 --- a/examples/upload.py +++ b/examples/upload.py @@ -9,36 +9,39 @@ :license: BSD-3-Clause """ from werkzeug.serving import run_simple -from werkzeug.wrappers import BaseRequest, BaseResponse +from werkzeug.wrappers import BaseRequest +from werkzeug.wrappers import BaseResponse from werkzeug.wsgi import wrap_file def view_file(req): - if not 'uploaded_file' in req.files: - return BaseResponse('no file uploaded') - f = req.files['uploaded_file'] - return BaseResponse(wrap_file(req.environ, f), mimetype=f.content_type, - direct_passthrough=True) + if "uploaded_file" not in req.files: + return BaseResponse("no file uploaded") + f = req.files["uploaded_file"] + return BaseResponse( + wrap_file(req.environ, f), mimetype=f.content_type, direct_passthrough=True + ) def upload_file(req): - return BaseResponse(''' - <h1>Upload File</h1> - <form action="" method="post" enctype="multipart/form-data"> - <input type="file" name="uploaded_file"> - <input type="submit" value="Upload"> - </form> - ''', mimetype='text/html') + return BaseResponse( + """<h1>Upload File</h1> + <form action="" method="post" enctype="multipart/form-data"> + <input type="file" name="uploaded_file"> + <input type="submit" value="Upload"> + </form>""", + mimetype="text/html", + ) def application(environ, start_response): req = BaseRequest(environ) - if req.method == 'POST': + if req.method == "POST": resp = view_file(req) else: resp = upload_file(req) return resp(environ, start_response) -if __name__ == '__main__': - run_simple('localhost', 5000, application, use_debugger=True) +if __name__ == "__main__": + run_simple("localhost", 5000, application, use_debugger=True) diff --git a/examples/webpylike/example.py b/examples/webpylike/example.py index 7dc52a87..0bb6896d 100644 --- a/examples/webpylike/example.py +++ b/examples/webpylike/example.py @@ -8,23 +8,22 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -from webpylike.webpylike import WebPyApp, View, Response +from .webpylike import Response +from .webpylike import View +from .webpylike import WebPyApp -urls = ( - '/', 'index', - '/about', 'about' -) +urls = ("/", "index", "/about", "about") class index(View): def GET(self): - return Response('Hello World') + return Response("Hello World") class about(View): def GET(self): - return Response('This is the about page') + return Response("This is the about page") app = WebPyApp(urls, globals()) diff --git a/examples/webpylike/webpylike.py b/examples/webpylike/webpylike.py index 6669ec63..0a7325ca 100644 --- a/examples/webpylike/webpylike.py +++ b/examples/webpylike/webpylike.py @@ -11,9 +11,13 @@ :license: BSD-3-Clause """ import re -from werkzeug.wrappers import BaseRequest, BaseResponse -from werkzeug.exceptions import HTTPException, MethodNotAllowed, \ - NotImplemented, NotFound + +from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import MethodNotAllowed +from werkzeug.exceptions import NotFound +from werkzeug.exceptions import NotImplemented +from werkzeug.wrappers import BaseRequest +from werkzeug.wrappers import BaseResponse class Request(BaseRequest): @@ -33,6 +37,7 @@ class View(object): def GET(self): raise MethodNotAllowed() + POST = DELETE = PUT = GET def HEAD(self): @@ -46,8 +51,9 @@ class WebPyApp(object): """ def __init__(self, urls, views): - self.urls = [(re.compile('^%s$' % urls[i]), urls[i + 1]) - for i in range(0, len(urls), 2)] + self.urls = [ + (re.compile("^%s$" % urls[i]), urls[i + 1]) for i in range(0, len(urls), 2) + ] self.views = views def __call__(self, environ, start_response): @@ -57,9 +63,8 @@ class WebPyApp(object): match = regex.match(req.path) if match is not None: view = self.views[view](self, req) - if req.method not in ('GET', 'HEAD', 'POST', - 'DELETE', 'PUT'): - raise NotImplemented() + if req.method not in ("GET", "HEAD", "POST", "DELETE", "PUT"): + raise NotImplemented() # noqa: F901 resp = getattr(view, req.method)(*match.groups()) break else: |
