Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/asyncpg.py')
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/asyncpg.py  78
1 file changed, 67 insertions, 11 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 6fa1dd78b..1f988153c 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -36,11 +36,21 @@ in conjunction with :func:`_sa.create_engine`::
.. versionadded:: 1.4
+.. note::
+
+    By default asyncpg does not decode the ``json`` and ``jsonb`` types and
+    returns them as strings. SQLAlchemy sets a default type decoder for the
+    ``json`` and ``jsonb`` types using the Python builtin ``json.loads``
+    function. The JSON implementation used can be changed by setting the
+    ``json_deserializer`` parameter when creating the engine with
+    :func:`create_engine` or :func:`create_async_engine`.
+
""" # noqa
import collections
import decimal
import itertools
+import json as _py_json
import re
from . import json
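As an aside, a minimal sketch of how the ``json_deserializer`` parameter described in the note above can be supplied to :func:`create_async_engine`; the ``orjson`` library and the connection URL are placeholders, any callable accepting a JSON string works:

    # illustration only, not part of this patch; "orjson" is an assumed
    # third-party library and the URL is a placeholder
    import orjson
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        json_deserializer=orjson.loads,
    )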
@@ -123,11 +133,17 @@ class AsyncpgJSON(json.JSON):
    def get_dbapi_type(self, dbapi):
        return dbapi.JSON

+    def result_processor(self, dialect, coltype):
+        # asyncpg hands back json values already deserialized by the type
+        # codec registered in on_connect(); no SQLAlchemy-level result
+        # processing is needed
+        return None
+

class AsyncpgJSONB(json.JSONB):
    def get_dbapi_type(self, dbapi):
        return dbapi.JSONB

+    def result_processor(self, dialect, coltype):
+        # jsonb is likewise decoded at the driver level
+        return None
+

class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
    def get_dbapi_type(self, dbapi):
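For context, a standalone sketch (not part of this change) of the result_processor contract these overrides rely on: returning a callable makes SQLAlchemy convert every fetched value, while returning None passes the DBAPI value through untouched. The ``DecodedJSON`` type below is invented purely for illustration:

    from sqlalchemy import types

    class DecodedJSON(types.UserDefinedType):
        # hypothetical type used only to show the contract
        def get_col_spec(self, **kw):
            return "JSONB"

        def result_processor(self, dialect, coltype):
            # None => no per-row conversion; the driver's own decoding
            # (here, asyncpg's type codec) is trusted as-is
            return None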
@@ -481,17 +497,6 @@ class AsyncAdapt_asyncpg_connection:
        self.deferrable = False
        self._transaction = None
        self._started = False
-        self.await_(self._setup_type_codecs())
-
-    async def _setup_type_codecs(self):
-        """set up type decoders at the asyncpg level.
-
-        these are set_type_codec() calls to normalize
-        There was a tentative decoder for the "char" datatype here
-        to have it return strings however this type is actually a binary
-        type that other drivers are likely mis-interpreting.
-
-        """

    def _handle_exception(self, error):
        if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
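The removal above moves the codec setup out of the adapted connection's constructor and into the dialect's on_connect() hook, added in the next hunk. A minimal sketch of that SQLAlchemy contract, with illustrative names only: on_connect() returns a callable that is invoked once for each newly created DBAPI connection.

    from sqlalchemy.engine.default import DefaultDialect

    class ExampleDialect(DefaultDialect):
        name = "example"  # illustrative

        def on_connect(self):
            def connect(dbapi_connection):
                # per-connection setup runs here rather than in the
                # connection wrapper's __init__
                pass

            return connect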
@@ -781,5 +786,56 @@ class PGDialect_asyncpg(PGDialect):
                e, self.dbapi.InterfaceError
            ) and "connection is closed" in str(e)

+    def on_connect(self):
+        super_connect = super(PGDialect_asyncpg, self).on_connect()
+
+        def _jsonb_encoder(str_value):
+            # \x01 is the prefix for jsonb used by PostgreSQL.
+            # asyncpg requires it when format='binary'
+            return b"\x01" + str_value.encode()
+
+        deserializer = self._json_deserializer or _py_json.loads
+
+        def _json_decoder(bin_value):
+            return deserializer(bin_value.decode())
+
+        def _jsonb_decoder(bin_value):
+            # the first byte is the \x01 prefix for jsonb used by PostgreSQL.
+            # asyncpg returns it when format='binary'
+            return deserializer(bin_value[1:].decode())
+
+        async def _setup_type_codecs(conn):
+            """set up type decoders at the asyncpg level.
+
+            these are set_type_codec() calls to normalize JSON handling.
+            There was a tentative decoder for the "char" datatype here
+            to have it return strings; however this type is actually a
+            binary type that other drivers are likely mis-interpreting.
+
+            See https://github.com/MagicStack/asyncpg/issues/623 for
+            reference on why it's set up this way.
+            """
+            await conn._connection.set_type_codec(
+                "json",
+                encoder=str.encode,
+                decoder=_json_decoder,
+                schema="pg_catalog",
+                format="binary",
+            )
+            await conn._connection.set_type_codec(
+                "jsonb",
+                encoder=_jsonb_encoder,
+                decoder=_jsonb_decoder,
+                schema="pg_catalog",
+                format="binary",
+            )
+
+        def connect(conn):
+            conn.await_(_setup_type_codecs(conn))
+            # chain to any on_connect handler established by the base dialect
+            if super_connect is not None:
+                super_connect(conn)
+
+        return connect
+
dialect = PGDialect_asyncpg
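To make the binary jsonb handling above concrete, a hedged, standalone sketch using asyncpg directly: with format='binary', jsonb travels as a \x01 version byte followed by UTF-8 JSON text, so the encoder prepends that byte and the decoder strips it. The DSN and helper names are placeholders; Connection.set_type_codec() and fetchval() are real asyncpg APIs.

    # illustration only; the DSN is a placeholder
    import asyncio
    import json

    import asyncpg

    def jsonb_encoder(value: str) -> bytes:
        # binary jsonb on the wire = version byte \x01 + UTF-8 JSON text
        return b"\x01" + value.encode()

    def jsonb_decoder(value: bytes):
        # strip the \x01 version byte before decoding the JSON text
        return json.loads(value[1:].decode())

    async def main():
        conn = await asyncpg.connect("postgresql://scott:tiger@localhost/test")
        await conn.set_type_codec(
            "jsonb",
            encoder=jsonb_encoder,
            decoder=jsonb_decoder,
            schema="pg_catalog",
            format="binary",
        )
        row = await conn.fetchval("SELECT '{\"a\": 1}'::jsonb")
        assert row == {"a": 1}
        await conn.close()

    asyncio.run(main())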