diff options
author | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
---|---|---|
committer | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
commit | e9c6edfe510f4ed407f8d2d84b4b931a382b48b3 (patch) | |
tree | 94bbd6a34bcf09e99f7ae1be88b19960192d6adb /wsme/tests/test_restjson.py | |
parent | 1d73d6e50411ebc45fb96a6ed3c63ca91a500323 (diff) | |
download | wsme-master.tar.gz |
Diffstat (limited to 'wsme/tests/test_restjson.py')
-rw-r--r-- | wsme/tests/test_restjson.py | 779 |
1 files changed, 0 insertions, 779 deletions
diff --git a/wsme/tests/test_restjson.py b/wsme/tests/test_restjson.py deleted file mode 100644 index 7afbb86..0000000 --- a/wsme/tests/test_restjson.py +++ /dev/null @@ -1,779 +0,0 @@ -import base64 -import datetime -import decimal - -import wsme.tests.protocol - -try: - import simplejson as json -except ImportError: - import json # noqa - -from wsme.rest.json import fromjson, tojson, parse -from wsme.utils import parse_isodatetime, parse_isotime, parse_isodate -from wsme.types import isarray, isdict, isusertype, register_type -from wsme.types import UserType, ArrayType, DictType -from wsme.rest import expose, validate -from wsme.exc import ClientSideError, InvalidInput - - -import six -from six import b, u - -if six.PY3: - from urllib.parse import urlencode -else: - from urllib import urlencode # noqa - - -def prepare_value(value, datatype): - if isinstance(datatype, list): - return [prepare_value(item, datatype[0]) for item in value] - if isinstance(datatype, dict): - key_type, value_type = list(datatype.items())[0] - return dict(( - (prepare_value(item[0], key_type), - prepare_value(item[1], value_type)) - for item in value.items() - )) - if datatype in (datetime.date, datetime.time, datetime.datetime): - return value.isoformat() - if datatype == decimal.Decimal: - return str(value) - if datatype == wsme.types.binary: - return base64.encodestring(value).decode('ascii') - if datatype == wsme.types.bytes: - return value.decode('ascii') - return value - - -def prepare_result(value, datatype): - print(value, datatype) - if value is None: - return None - if datatype == wsme.types.binary: - return base64.decodestring(value.encode('ascii')) - if isusertype(datatype): - datatype = datatype.basetype - if isinstance(datatype, list): - return [prepare_result(item, datatype[0]) for item in value] - if isarray(datatype): - return [prepare_result(item, datatype.item_type) for item in value] - if isinstance(datatype, dict): - return dict(( - (prepare_result(item[0], list(datatype.keys())[0]), - prepare_result(item[1], list(datatype.values())[0])) - for item in value.items() - )) - if isdict(datatype): - return dict(( - (prepare_result(item[0], datatype.key_type), - prepare_result(item[1], datatype.value_type)) - for item in value.items() - )) - if datatype == datetime.date: - return parse_isodate(value) - if datatype == datetime.time: - return parse_isotime(value) - if datatype == datetime.datetime: - return parse_isodatetime(value) - if hasattr(datatype, '_wsme_attributes'): - for attr in datatype._wsme_attributes: - if attr.key not in value: - continue - value[attr.key] = prepare_result(value[attr.key], attr.datatype) - return value - if datatype == wsme.types.bytes: - return value.encode('ascii') - if type(value) != datatype: - print(type(value), datatype) - return datatype(value) - return value - - -class CustomInt(UserType): - basetype = int - name = "custom integer" - - -class Obj(wsme.types.Base): - id = int - name = wsme.types.text - - -class NestedObj(wsme.types.Base): - o = Obj - - -class CRUDResult(object): - data = Obj - message = wsme.types.text - - def __init__(self, data=wsme.types.Unset, message=wsme.types.Unset): - self.data = data - self.message = message - - -class MiniCrud(object): - @expose(CRUDResult, method='PUT') - @validate(Obj) - def create(self, data): - print(repr(data)) - return CRUDResult(data, u('create')) - - @expose(CRUDResult, method='GET', ignore_extra_args=True) - @validate(Obj) - def read(self, ref): - print(repr(ref)) - if ref.id == 1: - ref.name = u('test') - return CRUDResult(ref, u('read')) - - @expose(CRUDResult, method='POST') - @validate(Obj) - def update(self, data): - print(repr(data)) - return CRUDResult(data, u('update')) - - @expose(CRUDResult, wsme.types.text, body=Obj) - def update_with_body(self, msg, data): - print(repr(data)) - return CRUDResult(data, msg) - - @expose(CRUDResult, method='DELETE') - @validate(Obj) - def delete(self, ref): - print(repr(ref)) - if ref.id == 1: - ref.name = u('test') - return CRUDResult(ref, u('delete')) - - -wsme.tests.protocol.WSTestRoot.crud = MiniCrud() - - -class TestRestJson(wsme.tests.protocol.RestOnlyProtocolTestCase): - protocol = 'restjson' - - def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False, - body=None, **kw): - if body: - if isinstance(body, tuple): - body, datatype = body - else: - datatype = type(body) - body = prepare_value(body, datatype) - content = json.dumps(body) - else: - for key in kw: - if isinstance(kw[key], tuple): - value, datatype = kw[key] - else: - value = kw[key] - datatype = type(value) - kw[key] = prepare_value(value, datatype) - content = json.dumps(kw) - headers = { - 'Content-Type': 'application/json', - } - if _accept is not None: - headers["Accept"] = _accept - res = self.app.post( - '/' + fpath, - content, - headers=headers, - expect_errors=True) - print("Received:", res.body) - - if _no_result_decode: - return res - - r = json.loads(res.text) - if res.status_int == 200: - if _rt and r: - r = prepare_result(r, _rt) - return r - else: - raise wsme.tests.protocol.CallException( - r['faultcode'], - r['faultstring'], - r.get('debuginfo') - ) - - return json.loads(res.text) - - def test_fromjson(self): - assert fromjson(str, None) is None - - def test_keyargs(self): - r = self.app.get('/argtypes/setint.json?value=2') - print(r) - assert json.loads(r.text) == 2 - - nestedarray = 'value[0].inner.aint=54&value[1].inner.aint=55' - r = self.app.get('/argtypes/setnestedarray.json?' + nestedarray) - print(r) - assert json.loads(r.text) == [ - {'inner': {'aint': 54}}, - {'inner': {'aint': 55}}] - - def test_form_urlencoded_args(self): - params = { - 'value[0].inner.aint': 54, - 'value[1].inner.aint': 55 - } - body = urlencode(params) - r = self.app.post( - '/argtypes/setnestedarray.json', - body, - headers={'Content-Type': 'application/x-www-form-urlencoded'} - ) - print(r) - - assert json.loads(r.text) == [ - {'inner': {'aint': 54}}, - {'inner': {'aint': 55}}] - - def test_body_and_params(self): - r = self.app.post('/argtypes/setint.json?value=2', '{"value": 2}', - headers={"Content-Type": "application/json"}, - expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'] == \ - "Parameter value was given several times" - - def test_inline_body(self): - params = urlencode({'__body__': '{"value": 4}'}) - r = self.app.get('/argtypes/setint.json?' + params) - print(r) - assert json.loads(r.text) == 4 - - def test_empty_body(self): - params = urlencode({'__body__': ''}) - r = self.app.get('/returntypes/getint.json?' + params) - print(r) - assert json.loads(r.text) == 2 - - def test_invalid_content_type_body(self): - r = self.app.post('/argtypes/setint.json', '{"value": 2}', - headers={"Content-Type": "application/invalid"}, - expect_errors=True) - print(r) - assert r.status_int == 415 - assert json.loads(r.text)['faultstring'] == \ - "Unknown mimetype: application/invalid" - - def test_invalid_json_body(self): - r = self.app.post('/argtypes/setint.json', '{"value": 2', - headers={"Content-Type": "application/json"}, - expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'] == \ - "Request is not in valid JSON format" - - def test_unknown_arg(self): - r = self.app.post('/returntypes/getint.json', '{"a": 2}', - headers={"Content-Type": "application/json"}, - expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'].startswith( - "Unknown argument:" - ) - - r = self.app.get('/returntypes/getint.json?a=2', expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'].startswith( - "Unknown argument:" - ) - - def test_set_custom_object(self): - r = self.app.post( - '/argtypes/setcustomobject', - '{"value": {"aint": 2, "name": "test"}}', - headers={"Content-Type": "application/json"} - ) - self.assertEqual(r.status_int, 200) - self.assertEqual(r.json, {'aint': 2, 'name': 'test'}) - - def test_set_extended_int(self): - r = self.app.post( - '/argtypes/setextendedint', - '{"value": 3}', - headers={"Content-Type": "application/json"} - ) - self.assertEqual(r.status_int, 200) - self.assertEqual(r.json, 3) - - def test_unset_attrs(self): - class AType(object): - attr = int - - wsme.types.register_type(AType) - - j = tojson(AType, AType()) - assert j == {} - - def test_array_tojson(self): - assert tojson([int], None) is None - assert tojson([int], []) == [] - assert tojson([str], ['1', '4']) == ['1', '4'] - - def test_dict_tojson(self): - assert tojson({int: str}, None) is None - assert tojson({int: str}, {5: '5'}) == {5: '5'} - - def test_None_tojson(self): - for dt in (datetime.date, datetime.time, datetime.datetime, - decimal.Decimal): - assert tojson(dt, None) is None - - def test_None_fromjson(self): - for dt in (str, int, datetime.date, datetime.time, datetime.datetime, - decimal.Decimal, [int], {int: int}): - assert fromjson(dt, None) is None - - def test_parse_valid_date(self): - j = parse('{"a": "2011-01-01"}', {'a': datetime.date}, False) - assert isinstance(j['a'], datetime.date) - - def test_invalid_root_dict_fromjson(self): - try: - parse('["invalid"]', {'a': ArrayType(str)}, False) - assert False - except Exception as e: - assert isinstance(e, ClientSideError) - assert e.msg == "Request must be a JSON dict" - - def test_invalid_list_fromjson(self): - jlist = "invalid" - try: - parse('{"a": "%s"}' % jlist, {'a': ArrayType(str)}, False) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jlist - assert e.msg == "Value not a valid list: %s" % jlist - - def test_invalid_dict_fromjson(self): - jdict = "invalid" - try: - parse('{"a": "%s"}' % jdict, {'a': DictType(str, str)}, False) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jdict - assert e.msg == "Value not a valid dict: %s" % jdict - - def test_invalid_date_fromjson(self): - jdate = "2015-01-invalid" - try: - parse('{"a": "%s"}' % jdate, {'a': datetime.date}, False) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jdate - assert e.msg == "'%s' is not a legal date value" % jdate - - def test_parse_valid_date_bodyarg(self): - j = parse('"2011-01-01"', {'a': datetime.date}, True) - assert isinstance(j['a'], datetime.date) - - def test_invalid_date_fromjson_bodyarg(self): - jdate = "2015-01-invalid" - try: - parse('"%s"' % jdate, {'a': datetime.date}, True) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jdate - assert e.msg == "'%s' is not a legal date value" % jdate - - def test_valid_str_to_builtin_fromjson(self): - types = six.integer_types + (bool, float) - value = '2' - for t in types: - for ba in True, False: - jd = '%s' if ba else '{"a": %s}' - i = parse(jd % value, {'a': t}, ba) - self.assertEqual( - i, {'a': t(value)}, - "Parsed value does not correspond for %s: " - "%s != {'a': %s}" % ( - t, repr(i), repr(t(value)) - ) - ) - self.assertIsInstance(i['a'], t) - - def test_valid_int_fromjson(self): - value = 2 - for ba in True, False: - jd = '%d' if ba else '{"a": %d}' - i = parse(jd % value, {'a': int}, ba) - self.assertEqual(i, {'a': 2}) - self.assertIsInstance(i['a'], int) - - def test_valid_num_to_float_fromjson(self): - values = 2, 2.3 - for v in values: - for ba in True, False: - jd = '%f' if ba else '{"a": %f}' - i = parse(jd % v, {'a': float}, ba) - self.assertEqual(i, {'a': float(v)}) - self.assertIsInstance(i['a'], float) - - def test_invalid_str_to_buitin_fromjson(self): - types = six.integer_types + (float, bool) - value = '2a' - for t in types: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - try: - parse(jd % value, {'a': t}, ba) - assert False, ( - "Value '%s' should not parse correctly for %s." % - (value, t) - ) - except ClientSideError as e: - self.assertIsInstance(e, InvalidInput) - self.assertEqual(e.fieldname, 'a') - self.assertEqual(e.value, value) - - def test_ambiguous_to_bool(self): - amb_values = ('', 'randomstring', '2', '-32', 'not true') - for value in amb_values: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - try: - parse(jd % value, {'a': bool}, ba) - assert False, ( - "Value '%s' should not parse correctly for %s." % - (value, bool) - ) - except ClientSideError as e: - self.assertIsInstance(e, InvalidInput) - self.assertEqual(e.fieldname, 'a') - self.assertEqual(e.value, value) - - def test_true_strings_to_bool(self): - true_values = ('true', 't', 'yes', 'y', 'on', '1') - for value in true_values: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertTrue(i['a']) - - def test_false_strings_to_bool(self): - false_values = ('false', 'f', 'no', 'n', 'off', '0') - for value in false_values: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertFalse(i['a']) - - def test_true_ints_to_bool(self): - true_values = (1, 5, -3) - for value in true_values: - for ba in True, False: - jd = '%d' if ba else '{"a": %d}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertTrue(i['a']) - - def test_false_ints_to_bool(self): - value = 0 - for ba in True, False: - jd = '%d' if ba else '{"a": %d}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertFalse(i['a']) - - def test_valid_simple_custom_type_fromjson(self): - value = 2 - for ba in True, False: - jd = '"%d"' if ba else '{"a": "%d"}' - i = parse(jd % value, {'a': CustomInt()}, ba) - self.assertEqual(i, {'a': 2}) - self.assertIsInstance(i['a'], int) - - def test_invalid_simple_custom_type_fromjson(self): - value = '2b' - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - try: - i = parse(jd % value, {'a': CustomInt()}, ba) - self.assertEqual(i, {'a': 2}) - except ClientSideError as e: - self.assertIsInstance(e, InvalidInput) - self.assertEqual(e.fieldname, 'a') - self.assertEqual(e.value, value) - self.assertEqual( - e.msg, - "invalid literal for int() with base 10: '%s'" % value - ) - - def test_parse_unexpected_attribute(self): - o = { - "id": "1", - "name": "test", - "other": "unknown", - "other2": "still unknown", - } - for ba in True, False: - jd = o if ba else {"o": o} - try: - parse(json.dumps(jd), {'o': Obj}, ba) - raise AssertionError("Object should not parse correcty.") - except wsme.exc.UnknownAttribute as e: - self.assertEqual(e.attributes, set(['other', 'other2'])) - - def test_parse_unexpected_nested_attribute(self): - no = { - "o": { - "id": "1", - "name": "test", - "other": "unknown", - }, - } - for ba in False, True: - jd = no if ba else {"no": no} - try: - parse(json.dumps(jd), {'no': NestedObj}, ba) - except wsme.exc.UnknownAttribute as e: - self.assertEqual(e.attributes, set(['other'])) - self.assertEqual(e.fieldname, "no.o") - - def test_nest_result(self): - self.root.protocols[0].nest_result = True - r = self.app.get('/returntypes/getint.json') - print(r) - assert json.loads(r.text) == {"result": 2} - - def test_encode_sample_value(self): - class MyType(object): - aint = int - astr = str - - register_type(MyType) - - v = MyType() - v.aint = 4 - v.astr = 's' - - r = wsme.rest.json.encode_sample_value(MyType, v, True) - print(r) - assert r[0] == ('javascript') - assert r[1] == json.dumps({'aint': 4, 'astr': 's'}, ensure_ascii=False, - indent=4, sort_keys=True) - - def test_bytes_tojson(self): - assert tojson(wsme.types.bytes, None) is None - assert tojson(wsme.types.bytes, b('ascii')) == u('ascii') - - def test_encode_sample_params(self): - r = wsme.rest.json.encode_sample_params( - [('a', int, 2)], True - ) - assert r[0] == 'javascript', r[0] - assert r[1] == '''{ - "a": 2 -}''', r[1] - - def test_encode_sample_result(self): - r = wsme.rest.json.encode_sample_result( - int, 2, True - ) - assert r[0] == 'javascript', r[0] - assert r[1] == '''2''' - - def test_PUT(self): - data = {"id": 1, "name": u("test")} - content = json.dumps(dict(data=data)) - headers = { - 'Content-Type': 'application/json', - } - res = self.app.put( - '/crud', - content, - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "create" - - def test_GET(self): - headers = { - 'Accept': 'application/json', - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - - def test_GET_complex_accept(self): - headers = { - 'Accept': 'text/html,application/xml;q=0.9,*/*;q=0.8' - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - - def test_GET_complex_choose_xml(self): - headers = { - 'Accept': 'text/html,text/xml;q=0.9,*/*;q=0.8' - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - expect_errors=False) - print("Received:", res.body) - assert res.content_type == 'text/xml' - - def test_GET_complex_accept_no_match(self): - headers = { - 'Accept': 'text/html,application/xml;q=0.9' - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - status=406) - print("Received:", res.body) - assert res.body == b("Unacceptable Accept type: " - "text/html, application/xml;q=0.9 not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_GET_bad_simple_accept(self): - headers = { - 'Accept': 'text/plain', - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - status=406) - print("Received:", res.body) - assert res.body == b("Unacceptable Accept type: text/plain not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_POST(self): - headers = { - 'Content-Type': 'application/json', - } - res = self.app.post( - '/crud', - json.dumps(dict(data=dict(id=1, name=u('test')))), - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "update" - - def test_POST_bad_content_type(self): - headers = { - 'Content-Type': 'text/plain', - } - res = self.app.post( - '/crud', - json.dumps(dict(data=dict(id=1, name=u('test')))), - headers=headers, - status=415) - print("Received:", res.body) - assert res.body == b("Unacceptable Content-Type: text/plain not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_DELETE(self): - res = self.app.delete( - '/crud.json?ref.id=1', - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "delete" - - def test_extra_arguments(self): - headers = { - 'Accept': 'application/json', - } - res = self.app.get( - '/crud?ref.id=1&extraarg=foo', - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "read" - - def test_unexpected_extra_arg(self): - headers = { - 'Content-Type': 'application/json', - } - data = {"id": 1, "name": "test"} - content = json.dumps({"data": data, "other": "unexpected"}) - res = self.app.put( - '/crud', - content, - headers=headers, - expect_errors=True) - self.assertEqual(res.status_int, 400) - - def test_unexpected_extra_attribute(self): - """Expect a failure if we send an unexpected object attribute.""" - headers = { - 'Content-Type': 'application/json', - } - data = {"id": 1, "name": "test", "other": "unexpected"} - content = json.dumps({"data": data}) - res = self.app.put( - '/crud', - content, - headers=headers, - expect_errors=True) - self.assertEqual(res.status_int, 400) - - def test_body_arg(self): - headers = { - 'Content-Type': 'application/json', - } - res = self.app.post( - '/crud/update_with_body?msg=hello', - json.dumps(dict(id=1, name=u('test'))), - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "hello" |