diff options
author | Marc Abramowitz <marc@marc-abramowitz.com> | 2016-03-07 14:05:52 -0800 |
---|---|---|
committer | Marc Abramowitz <marc@marc-abramowitz.com> | 2016-03-07 14:05:52 -0800 |
commit | 42b22881290e00e06b840dee1e42f0f5ef044d47 (patch) | |
tree | b4fef928625acd3e8ee45ccaa8c7a6c9810b3601 /tests/test_registry.py | |
download | paste-git-tox_add_py35.tar.gz |
tox.ini: Add py35 to envlisttox_add_py35
Diffstat (limited to 'tests/test_registry.py')
-rw-r--r-- | tests/test_registry.py | 314 |
1 files changed, 314 insertions, 0 deletions
diff --git a/tests/test_registry.py b/tests/test_registry.py new file mode 100644 index 0000000..23cd9b6 --- /dev/null +++ b/tests/test_registry.py @@ -0,0 +1,314 @@ +# (c) 2005 Ben Bangert +# This module is part of the Python Paste Project and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php +from nose.tools import assert_raises + +from paste.fixture import * +from paste.registry import * +from paste.registry import Registry +from paste.evalexception.middleware import EvalException + +regobj = StackedObjectProxy() +secondobj = StackedObjectProxy(default=dict(hi='people')) + +def simpleapp(environ, start_response): + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + return [b'Hello world!\n'] + +def simpleapp_withregistry(environ, start_response): + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + body = 'Hello world!Value is %s\n' % regobj.keys() + if six.PY3: + body = body.encode('utf8') + return [body] + +def simpleapp_withregistry_default(environ, start_response): + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + body = 'Hello world!Value is %s\n' % secondobj + if six.PY3: + body = body.encode('utf8') + return [body] + + +class RegistryUsingApp(object): + def __init__(self, var, value, raise_exc=False): + self.var = var + self.value = value + self.raise_exc = raise_exc + + def __call__(self, environ, start_response): + if 'paste.registry' in environ: + environ['paste.registry'].register(self.var, self.value) + if self.raise_exc: + raise self.raise_exc + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + body = 'Hello world!\nThe variable is %s' % str(regobj) + if six.PY3: + body = body.encode('utf8') + return [body] + +class RegistryUsingIteratorApp(object): + def __init__(self, var, value): + self.var = var + self.value = value + + def __call__(self, environ, start_response): + if 'paste.registry' in environ: + environ['paste.registry'].register(self.var, self.value) + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + body = 'Hello world!\nThe variable is %s' % str(regobj) + if six.PY3: + body = body.encode('utf8') + return iter([body]) + +class RegistryMiddleMan(object): + def __init__(self, app, var, value, depth): + self.app = app + self.var = var + self.value = value + self.depth = depth + + def __call__(self, environ, start_response): + if 'paste.registry' in environ: + environ['paste.registry'].register(self.var, self.value) + line = ('\nInserted by middleware!\nInsertValue at depth %s is %s' + % (self.depth, str(regobj))) + if six.PY3: + line = line.encode('utf8') + app_response = [line] + app_iter = None + app_iter = self.app(environ, start_response) + if type(app_iter) in (list, tuple): + app_response.extend(app_iter) + else: + response = [] + for line in app_iter: + response.append(line) + if hasattr(app_iter, 'close'): + app_iter.close() + app_response.extend(response) + line = ('\nAppended by middleware!\nAppendValue at \ + depth %s is %s' % (self.depth, str(regobj))) + if six.PY3: + line = line.encode('utf8') + app_response.append(line) + return app_response + + +def test_simple(): + app = TestApp(simpleapp) + response = app.get('/') + assert 'Hello world' in response + +def test_solo_registry(): + obj = {'hi':'people'} + wsgiapp = RegistryUsingApp(regobj, obj) + wsgiapp = RegistryManager(wsgiapp) + app = TestApp(wsgiapp) + res = app.get('/') + assert 'Hello world' in res + assert 'The variable is' in res + assert "{'hi': 'people'}" in res + +def test_registry_no_object_error(): + app = TestApp(simpleapp_withregistry) + assert_raises(TypeError, app.get, '/') + +def test_with_default_object(): + app = TestApp(simpleapp_withregistry_default) + res = app.get('/') + print(res) + assert 'Hello world' in res + assert "Value is {'hi': 'people'}" in res + +def test_double_registry(): + obj = {'hi':'people'} + secondobj = {'bye':'friends'} + wsgiapp = RegistryUsingApp(regobj, obj) + wsgiapp = RegistryManager(wsgiapp) + wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0) + wsgiapp = RegistryManager(wsgiapp) + app = TestApp(wsgiapp) + res = app.get('/') + assert 'Hello world' in res + assert 'The variable is' in res + assert "{'hi': 'people'}" in res + assert "InsertValue at depth 0 is {'bye': 'friends'}" in res + assert "AppendValue at depth 0 is {'bye': 'friends'}" in res + +def test_really_deep_registry(): + keylist = ['fred', 'wilma', 'barney', 'homer', 'marge', 'bart', 'lisa', + 'maggie'] + valuelist = range(0, len(keylist)) + obj = {'hi':'people'} + wsgiapp = RegistryUsingApp(regobj, obj) + wsgiapp = RegistryManager(wsgiapp) + for depth in valuelist: + newobj = {keylist[depth]: depth} + wsgiapp = RegistryMiddleMan(wsgiapp, regobj, newobj, depth) + wsgiapp = RegistryManager(wsgiapp) + app = TestApp(wsgiapp) + res = app.get('/') + assert 'Hello world' in res + assert 'The variable is' in res + assert "{'hi': 'people'}" in res + for depth in valuelist: + assert "InsertValue at depth %s is {'%s': %s}" % \ + (depth, keylist[depth], depth) in res + for depth in valuelist: + assert "AppendValue at depth %s is {'%s': %s}" % \ + (depth, keylist[depth], depth) in res + +def test_iterating_response(): + obj = {'hi':'people'} + secondobj = {'bye':'friends'} + wsgiapp = RegistryUsingIteratorApp(regobj, obj) + wsgiapp = RegistryManager(wsgiapp) + wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0) + wsgiapp = RegistryManager(wsgiapp) + app = TestApp(wsgiapp) + res = app.get('/') + assert 'Hello world' in res + assert 'The variable is' in res + assert "{'hi': 'people'}" in res + assert "InsertValue at depth 0 is {'bye': 'friends'}" in res + assert "AppendValue at depth 0 is {'bye': 'friends'}" in res + +def _test_restorer(stack, data): + # We need to test the request's specific Registry. Initialize it here so we + # can use it later (RegistryManager will re-use one preexisting in the + # environ) + registry = Registry() + extra_environ={'paste.throw_errors': False, + 'paste.registry': registry} + request_id = restorer.get_request_id(extra_environ) + app = TestApp(stack) + res = app.get('/', extra_environ=extra_environ, expect_errors=True) + + # Ensure all the StackedObjectProxies are empty after the RegistryUsingApp + # raises an Exception + for stacked, proxied_obj, test_cleanup in data: + only_key = list(proxied_obj.keys())[0] + try: + assert only_key not in stacked + assert False + except TypeError: + # Definitely empty + pass + + # Ensure the StackedObjectProxies & Registry 'work' in the simulated + # EvalException context + replace = {'replace': 'dict'} + new = {'new': 'object'} + restorer.restoration_begin(request_id) + try: + for stacked, proxied_obj, test_cleanup in data: + # Ensure our original data magically re-appears in this context + only_key, only_val = list(proxied_obj.items())[0] + assert only_key in stacked and stacked[only_key] == only_val + + # Ensure the Registry still works + registry.prepare() + registry.register(stacked, new) + assert 'new' in stacked and stacked['new'] == 'object' + registry.cleanup() + + # Back to the original (pre-prepare()) + assert only_key in stacked and stacked[only_key] == only_val + + registry.replace(stacked, replace) + assert 'replace' in stacked and stacked['replace'] == 'dict' + + if test_cleanup: + registry.cleanup() + try: + stacked._current_obj() + assert False + except TypeError: + # Definitely empty + pass + finally: + restorer.restoration_end() + +def _restorer_data(): + S = StackedObjectProxy + d = [[S(name='first'), dict(top='of the registry stack'), False], + [S(name='second'), dict(middle='of the stack'), False], + [S(name='third'), dict(bottom='of the STACK.'), False]] + return d + +def _set_cleanup_test(data): + """Instruct _test_restorer to check registry cleanup at this level of the stack + """ + data[2] = True + +def test_restorer_basic(): + data = _restorer_data()[0] + wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception()) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data) + wsgiapp = EvalException(wsgiapp) + _test_restorer(wsgiapp, [data]) + +def test_restorer_basic_manager_outside(): + data = _restorer_data()[0] + wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception()) + wsgiapp = EvalException(wsgiapp) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data) + _test_restorer(wsgiapp, [data]) + +def test_restorer_middleman_nested_evalexception(): + data = _restorer_data()[:2] + wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception()) + wsgiapp = EvalException(wsgiapp) + wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data[1]) + _test_restorer(wsgiapp, data) + +def test_restorer_nested_middleman(): + data = _restorer_data()[:2] + wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception()) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data[0]) + wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0) + wsgiapp = EvalException(wsgiapp) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data[1]) + _test_restorer(wsgiapp, data) + +def test_restorer_middlemen_nested_evalexception(): + data = _restorer_data() + wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception()) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data[0]) + wsgiapp = EvalException(wsgiapp) + wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data[1]) + wsgiapp = RegistryMiddleMan(wsgiapp, data[2][0], data[2][1], 1) + wsgiapp = RegistryManager(wsgiapp) + _set_cleanup_test(data[2]) + _test_restorer(wsgiapp, data) + +def test_restorer_disabled(): + # Ensure restoration_begin/end work safely when there's no Registry + wsgiapp = TestApp(simpleapp) + wsgiapp.get('/') + try: + restorer.restoration_begin(1) + finally: + restorer.restoration_end() + # A second call should do nothing + restorer.restoration_end() |