diff options
author | Federico Caselli <cfederico87@gmail.com> | 2020-09-21 19:59:00 +0200 |
---|---|---|
committer | Federico Caselli <cfederico87@gmail.com> | 2020-09-23 21:29:56 +0200 |
commit | 219c717e2357439e719464add9f86dc2f40ae667 (patch) | |
tree | dc2290870ae8bc31fa8ea1a595130c39bb328359 /lib/sqlalchemy/dialects/postgresql/asyncpg.py | |
parent | 73ab000007bd25ac86ca2081868615c6c4820531 (diff) | |
download | sqlalchemy-219c717e2357439e719464add9f86dc2f40ae667.tar.gz |
Improve Asyncpg json handling
Set default type codec for ``json`` and ``jsonb`` types when using
the asyncpg driver. By default asyncpg will not decode them and return
strings instead.
Fixes: #5584
Change-Id: I41348eff8096ccf87b952d7e797c0694c6c4b5c4
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/asyncpg.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/asyncpg.py | 78 |
1 files changed, 67 insertions, 11 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 6fa1dd78b..1f988153c 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -36,11 +36,21 @@ in conjunction with :func:`_sa.craete_engine`:: .. versionadded:: 1.4 +.. note:: + + By default asyncpg does not decode the ``json`` and ``jsonb`` types and + returns them as strings. SQLAlchemy sets default type decoder for ``json`` + and ``jsonb`` types using the python builtin ``json.loads`` function. + The json implementation used can be changed by setting the attribute + ``json_deserializer`` when creating the engine with + :func:`create_engine` or :func:`create_async_engine`. + """ # noqa import collections import decimal import itertools +import json as _py_json import re from . import json @@ -123,11 +133,17 @@ class AsyncpgJSON(json.JSON): def get_dbapi_type(self, dbapi): return dbapi.JSON + def result_processor(self, dialect, coltype): + return None + class AsyncpgJSONB(json.JSONB): def get_dbapi_type(self, dbapi): return dbapi.JSONB + def result_processor(self, dialect, coltype): + return None + class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType): def get_dbapi_type(self, dbapi): @@ -481,17 +497,6 @@ class AsyncAdapt_asyncpg_connection: self.deferrable = False self._transaction = None self._started = False - self.await_(self._setup_type_codecs()) - - async def _setup_type_codecs(self): - """set up type decoders at the asyncpg level. - - these are set_type_codec() calls to normalize - There was a tentative decoder for the "char" datatype here - to have it return strings however this type is actually a binary - type that other drivers are likely mis-interpreting. - - """ def _handle_exception(self, error): if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error): @@ -781,5 +786,56 @@ class PGDialect_asyncpg(PGDialect): e, self.dbapi.InterfaceError ) and "connection is closed" in str(e) + def on_connect(self): + super_connect = super(PGDialect_asyncpg, self).on_connect() + + def _jsonb_encoder(str_value): + # \x01 is the prefix for jsonb used by PostgreSQL. + # asyncpg requires it when format='binary' + return b"\x01" + str_value.encode() + + deserializer = self._json_deserializer or _py_json.loads + + def _json_decoder(bin_value): + return deserializer(bin_value.decode()) + + def _jsonb_decoder(bin_value): + # the byte is the \x01 prefix for jsonb used by PostgreSQL. + # asyncpg returns it when format='binary' + return deserializer(bin_value[1:].decode()) + + async def _setup_type_codecs(conn): + """set up type decoders at the asyncpg level. + + these are set_type_codec() calls to normalize + There was a tentative decoder for the "char" datatype here + to have it return strings however this type is actually a binary + type that other drivers are likely mis-interpreting. + + See https://github.com/MagicStack/asyncpg/issues/623 for reference + on why it's set up this way. + """ + await conn._connection.set_type_codec( + "json", + encoder=str.encode, + decoder=_json_decoder, + schema="pg_catalog", + format="binary", + ) + await conn._connection.set_type_codec( + "jsonb", + encoder=_jsonb_encoder, + decoder=_jsonb_decoder, + schema="pg_catalog", + format="binary", + ) + + def connect(conn): + conn.await_(_setup_type_codecs(conn)) + if super_connect is not None: + super_connect(conn) + + return connect + dialect = PGDialect_asyncpg |