From e3716012c535c0aeac2a8cc5a32609ed2d4197c1 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 7 Sep 2020 16:24:47 -0400 Subject: Create connection characteristics API; implement postgresql flags Added support for PostgreSQL "readonly" and "deferrable" flags for all of psycopg2, asyncpg and pg8000 dialects. This takes advantage of a newly generalized version of the "isolation level" API to support other kinds of session attributes set via execution options that are reliably reset when connections are returned to the connection pool. Fixes: #5549 Change-Id: I0ad6d7a095e49d331618274c40ce75c76afdc7dd --- lib/sqlalchemy/engine/characteristics.py | 56 ++++++++++++++++++++ lib/sqlalchemy/engine/default.py | 88 ++++++++++++++++++++++++-------- 2 files changed, 122 insertions(+), 22 deletions(-) create mode 100644 lib/sqlalchemy/engine/characteristics.py (limited to 'lib/sqlalchemy/engine') diff --git a/lib/sqlalchemy/engine/characteristics.py b/lib/sqlalchemy/engine/characteristics.py new file mode 100644 index 000000000..c00bff40d --- /dev/null +++ b/lib/sqlalchemy/engine/characteristics.py @@ -0,0 +1,56 @@ +import abc + +from ..util import ABC + + +class ConnectionCharacteristic(ABC): + """An abstract base for an object that can set, get and reset a + per-connection characteristic, typically one that gets reset when the + connection is returned to the connection pool. + + transaction isolation is the canonical example, and the + ``IsolationLevelCharacteristic`` implementation provides this for the + ``DefaultDialect``. + + The ``ConnectionCharacteristic`` class should call upon the ``Dialect`` for + the implementation of each method. The object exists strictly to serve as + a dialect visitor that can be placed into the + ``DefaultDialect.connection_characteristics`` dictionary where it will take + effect for calls to :meth:`_engine.Connection.execution_options` and + related APIs. + + .. versionadded:: 1.4 + + """ + + __slots__ = () + + transactional = False + + @abc.abstractmethod + def reset_characteristic(self, dialect, dbapi_conn): + """Reset the characteristic on the connection to its default value.""" + + @abc.abstractmethod + def set_characteristic(self, dialect, dbapi_conn, value): + """set characteristic on the connection to a given value.""" + + @abc.abstractmethod + def get_characteristic(self, dialect, dbapi_conn): + """Given a DBAPI connection, get the current value of the + characteristic. + + """ + + +class IsolationLevelCharacteristic(ConnectionCharacteristic): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + dialect.reset_isolation_level(dbapi_conn) + + def set_characteristic(self, dialect, dbapi_conn, value): + dialect.set_isolation_level(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_isolation_level(dbapi_conn) diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 564258a28..0bd4cd14c 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -14,10 +14,12 @@ as the base class for their own corresponding classes. """ import codecs +import functools import random import re import weakref +from . import characteristics from . import cursor as _cursor from . import interfaces from .. import event @@ -85,6 +87,10 @@ class DefaultDialect(interfaces.Dialect): tuple_in_values = False + connection_characteristics = util.immutabledict( + {"isolation_level": characteristics.IsolationLevelCharacteristic()} + ) + engine_config_types = util.immutabledict( [ ("convert_unicode", util.bool_or_str("force")), @@ -513,38 +519,76 @@ class DefaultDialect(interfaces.Dialect): return [[], opts] def set_engine_execution_options(self, engine, opts): - if "isolation_level" in opts: - isolation_level = opts["isolation_level"] + supported_names = set(self.connection_characteristics).intersection( + opts + ) + if supported_names: + characteristics = util.immutabledict( + (name, opts[name]) for name in supported_names + ) @event.listens_for(engine, "engine_connect") - def set_isolation(connection, branch): + def set_connection_characteristics(connection, branch): if not branch: - self._set_connection_isolation(connection, isolation_level) + self._set_connection_characteristics( + connection, characteristics + ) def set_connection_execution_options(self, connection, opts): - if "isolation_level" in opts: - self._set_connection_isolation(connection, opts["isolation_level"]) + supported_names = set(self.connection_characteristics).intersection( + opts + ) + if supported_names: + characteristics = util.immutabledict( + (name, opts[name]) for name in supported_names + ) + self._set_connection_characteristics(connection, characteristics) + + def _set_connection_characteristics(self, connection, characteristics): + + characteristic_values = [ + (name, self.connection_characteristics[name], value) + for name, value in characteristics.items() + ] - def _set_connection_isolation(self, connection, level): if connection.in_transaction(): - if connection._is_future: - raise exc.InvalidRequestError( - "This connection has already begun a transaction; " - "isolation level may not be altered until transaction end" - ) - else: - util.warn( - "Connection is already established with a Transaction; " - "setting isolation_level may implicitly rollback or " - "commit " - "the existing transaction, or have no effect until " - "next transaction" - ) - self.set_isolation_level(connection.connection, level) + trans_objs = [ + (name, obj) + for name, obj, value in characteristic_values + if obj.transactional + ] + if trans_objs: + if connection._is_future: + raise exc.InvalidRequestError( + "This connection has already begun a transaction; " + "%s may not be altered until transaction end" + % (", ".join(name for name, obj in trans_objs)) + ) + else: + util.warn( + "Connection is already established with a " + "Transaction; " + "setting %s may implicitly rollback or " + "commit " + "the existing transaction, or have no effect until " + "next transaction" + % (", ".join(name for name, obj in trans_objs)) + ) + + dbapi_connection = connection.connection.connection + for name, characteristic, value in characteristic_values: + characteristic.set_characteristic(self, dbapi_connection, value) connection.connection._connection_record.finalize_callback.append( - self.reset_isolation_level + functools.partial(self._reset_characteristics, characteristics) ) + def _reset_characteristics(self, characteristics, dbapi_connection): + for characteristic_name in characteristics: + characteristic = self.connection_characteristics[ + characteristic_name + ] + characteristic.reset_characteristic(self, dbapi_connection) + def do_begin(self, dbapi_connection): pass -- cgit v1.2.1