summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-09-07 16:24:47 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-09-08 11:16:53 -0400
commite3716012c535c0aeac2a8cc5a32609ed2d4197c1 (patch)
treefb78685f17fd16260487b9036a8c250f7719f667 /lib/sqlalchemy
parent71fa1db1384b437e9d39817f5612f5dca6a28b87 (diff)
downloadsqlalchemy-e3716012c535c0aeac2a8cc5a32609ed2d4197c1.tar.gz
Create connection characteristics API; implement postgresql flags
Added support for PostgreSQL "readonly" and "deferrable" flags for all of psycopg2, asyncpg and pg8000 dialects. This takes advantage of a newly generalized version of the "isolation level" API to support other kinds of session attributes set via execution options that are reliably reset when connections are returned to the connection pool. Fixes: #5549 Change-Id: I0ad6d7a095e49d331618274c40ce75c76afdc7dd
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py20
-rw-r--r--lib/sqlalchemy/dialects/postgresql/base.py85
-rw-r--r--lib/sqlalchemy/dialects/postgresql/pg8000.py42
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py12
-rw-r--r--lib/sqlalchemy/engine/characteristics.py56
-rw-r--r--lib/sqlalchemy/engine/default.py88
-rw-r--r--lib/sqlalchemy/util/__init__.py1
-rw-r--r--lib/sqlalchemy/util/compat.py8
8 files changed, 289 insertions, 23 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 515ef6e28..eb87249b4 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -465,6 +465,8 @@ class AsyncAdapt_asyncpg_connection:
"dbapi",
"_connection",
"isolation_level",
+ "readonly",
+ "deferrable",
"_transaction",
"_started",
)
@@ -475,6 +477,8 @@ class AsyncAdapt_asyncpg_connection:
self.dbapi = dbapi
self._connection = connection
self.isolation_level = "read_committed"
+ self.readonly = False
+ self.deferrable = False
self._transaction = None
self._started = False
self.await_(self._setup_type_codecs())
@@ -530,7 +534,9 @@ class AsyncAdapt_asyncpg_connection:
try:
self._transaction = self._connection.transaction(
- isolation=self.isolation_level
+ isolation=self.isolation_level,
+ readonly=self.readonly,
+ deferrable=self.deferrable,
)
await self._transaction.start()
except Exception as error:
@@ -763,6 +769,18 @@ class PGDialect_asyncpg(PGDialect):
connection.set_isolation_level(level)
+ def set_readonly(self, connection, value):
+ connection.readonly = value
+
+ def get_readonly(self, connection):
+ return connection.readonly
+
+ def set_deferrable(self, connection, value):
+ connection.deferrable = value
+
+ def get_deferrable(self, connection):
+ return connection.deferrable
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
if "port" in opts:
diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py
index 8eb116111..6550dd20d 100644
--- a/lib/sqlalchemy/dialects/postgresql/base.py
+++ b/lib/sqlalchemy/dialects/postgresql/base.py
@@ -152,12 +152,44 @@ Valid values for ``isolation_level`` include:
.. seealso::
+ :ref:`postgresql_readonly_deferrable`
+
:ref:`dbapi_autocommit`
:ref:`psycopg2_isolation_level`
:ref:`pg8000_isolation_level`
+.. _postgresql_readonly_deferrable:
+
+Setting READ ONLY / DEFERRABLE
+------------------------------
+
+Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
+characteristics of the transaction, which is in addition to the isolation level
+setting. These two attributes can be established either in conjunction with or
+independently of the isolation level by passing the ``postgresql_readonly`` and
+``postgresql_deferrable`` flags with
+:meth:`_engine.Connection.execution_options`. The example below illustrates
+passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
+"READ ONLY" and "DEFERRABLE"::
+
+ with engine.connect() as conn:
+ conn = conn.execution_options(
+ isolation_level="SERIALIZABLE",
+ postgresql_readonly=True,
+ postgresql_deferrable=True
+ )
+ with conn.begin():
+ # ... work with transaction
+
+Note that some DBAPIs such as asyncpg only support "readonly" with
+SERIALIZABLE isolation.
+
+.. versionadded:: 1.4 added support for the ``postgresql_readonly``
+ and ``postgresql_deferrable`` execution options.
+
+
.. _postgresql_schema_reflection:
Remote-Schema Table Introspection and PostgreSQL search_path
@@ -1037,6 +1069,7 @@ from ... import exc
from ... import schema
from ... import sql
from ... import util
+from ...engine import characteristics
from ...engine import default
from ...engine import reflection
from ...sql import coercions
@@ -2610,6 +2643,36 @@ class PGExecutionContext(default.DefaultExecutionContext):
return AUTOCOMMIT_REGEXP.match(statement)
+class PGReadOnlyConnectionCharacteristic(
+ characteristics.ConnectionCharacteristic
+):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.set_readonly(dbapi_conn, False)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_readonly(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_readonly(dbapi_conn)
+
+
+class PGDeferrableConnectionCharacteristic(
+ characteristics.ConnectionCharacteristic
+):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.set_deferrable(dbapi_conn, False)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_deferrable(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_deferrable(dbapi_conn)
+
+
class PGDialect(default.DefaultDialect):
name = "postgresql"
supports_alter = True
@@ -2645,6 +2708,16 @@ class PGDialect(default.DefaultDialect):
implicit_returning = True
full_returning = True
+ connection_characteristics = (
+ default.DefaultDialect.connection_characteristics
+ )
+ connection_characteristics = connection_characteristics.union(
+ {
+ "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
+ "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
+ }
+ )
+
construct_arguments = [
(
schema.Index,
@@ -2762,6 +2835,18 @@ class PGDialect(default.DefaultDialect):
cursor.close()
return val.upper()
+ def set_readonly(self, connection, value):
+ raise NotImplementedError()
+
+ def get_readonly(self, connection):
+ raise NotImplementedError()
+
+ def set_deferrable(self, connection, value):
+ raise NotImplementedError()
+
+ def get_deferrable(self, connection):
+ raise NotImplementedError()
+
def do_begin_twophase(self, connection, xid):
self.do_begin(connection.connection)
diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py
index e08332a57..fd70828ff 100644
--- a/lib/sqlalchemy/dialects/postgresql/pg8000.py
+++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py
@@ -359,6 +359,48 @@ class PGDialect_pg8000(PGDialect):
% (level, self.name, ", ".join(self._isolation_lookup))
)
+ def set_readonly(self, connection, value):
+ cursor = connection.cursor()
+ try:
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+ % ("READ ONLY" if value else "READ WRITE")
+ )
+ cursor.execute("COMMIT")
+ finally:
+ cursor.close()
+
+ def get_readonly(self, connection):
+ cursor = connection.cursor()
+ try:
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+
+ return val == "yes"
+
+ def set_deferrable(self, connection, value):
+ cursor = connection.cursor()
+ try:
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+ % ("DEFERRABLE" if value else "NOT DEFERRABLE")
+ )
+ cursor.execute("COMMIT")
+ finally:
+ cursor.close()
+
+ def get_deferrable(self, connection):
+ cursor = connection.cursor()
+ try:
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+
+ return val == "yes"
+
def set_client_encoding(self, connection, client_encoding):
# adjust for ConnectionFairy possibly being present
if hasattr(connection, "connection"):
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index 2161b24fc..3cc62fc93 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -803,6 +803,18 @@ class PGDialect_psycopg2(PGDialect):
connection.set_isolation_level(level)
+ def set_readonly(self, connection, value):
+ connection.readonly = value
+
+ def get_readonly(self, connection):
+ return connection.readonly
+
+ def set_deferrable(self, connection, value):
+ connection.deferrable = value
+
+ def get_deferrable(self, connection):
+ return connection.deferrable
+
def on_connect(self):
extras = self._psycopg2_extras()
extensions = self._psycopg2_extensions()
diff --git a/lib/sqlalchemy/engine/characteristics.py b/lib/sqlalchemy/engine/characteristics.py
new file mode 100644
index 000000000..c00bff40d
--- /dev/null
+++ b/lib/sqlalchemy/engine/characteristics.py
@@ -0,0 +1,56 @@
+import abc
+
+from ..util import ABC
+
+
+class ConnectionCharacteristic(ABC):
+ """An abstract base for an object that can set, get and reset a
+ per-connection characteristic, typically one that gets reset when the
+ connection is returned to the connection pool.
+
+ transaction isolation is the canonical example, and the
+ ``IsolationLevelCharacteristic`` implementation provides this for the
+ ``DefaultDialect``.
+
+ The ``ConnectionCharacteristic`` class should call upon the ``Dialect`` for
+ the implementation of each method. The object exists strictly to serve as
+ a dialect visitor that can be placed into the
+ ``DefaultDialect.connection_characteristics`` dictionary where it will take
+ effect for calls to :meth:`_engine.Connection.execution_options` and
+ related APIs.
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ()
+
+ transactional = False
+
+ @abc.abstractmethod
+ def reset_characteristic(self, dialect, dbapi_conn):
+ """Reset the characteristic on the connection to its default value."""
+
+ @abc.abstractmethod
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ """set characteristic on the connection to a given value."""
+
+ @abc.abstractmethod
+ def get_characteristic(self, dialect, dbapi_conn):
+ """Given a DBAPI connection, get the current value of the
+ characteristic.
+
+ """
+
+
+class IsolationLevelCharacteristic(ConnectionCharacteristic):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.reset_isolation_level(dbapi_conn)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_isolation_level(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_isolation_level(dbapi_conn)
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 564258a28..0bd4cd14c 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -14,10 +14,12 @@ as the base class for their own corresponding classes.
"""
import codecs
+import functools
import random
import re
import weakref
+from . import characteristics
from . import cursor as _cursor
from . import interfaces
from .. import event
@@ -85,6 +87,10 @@ class DefaultDialect(interfaces.Dialect):
tuple_in_values = False
+ connection_characteristics = util.immutabledict(
+ {"isolation_level": characteristics.IsolationLevelCharacteristic()}
+ )
+
engine_config_types = util.immutabledict(
[
("convert_unicode", util.bool_or_str("force")),
@@ -513,38 +519,76 @@ class DefaultDialect(interfaces.Dialect):
return [[], opts]
def set_engine_execution_options(self, engine, opts):
- if "isolation_level" in opts:
- isolation_level = opts["isolation_level"]
+ supported_names = set(self.connection_characteristics).intersection(
+ opts
+ )
+ if supported_names:
+ characteristics = util.immutabledict(
+ (name, opts[name]) for name in supported_names
+ )
@event.listens_for(engine, "engine_connect")
- def set_isolation(connection, branch):
+ def set_connection_characteristics(connection, branch):
if not branch:
- self._set_connection_isolation(connection, isolation_level)
+ self._set_connection_characteristics(
+ connection, characteristics
+ )
def set_connection_execution_options(self, connection, opts):
- if "isolation_level" in opts:
- self._set_connection_isolation(connection, opts["isolation_level"])
+ supported_names = set(self.connection_characteristics).intersection(
+ opts
+ )
+ if supported_names:
+ characteristics = util.immutabledict(
+ (name, opts[name]) for name in supported_names
+ )
+ self._set_connection_characteristics(connection, characteristics)
+
+ def _set_connection_characteristics(self, connection, characteristics):
+
+ characteristic_values = [
+ (name, self.connection_characteristics[name], value)
+ for name, value in characteristics.items()
+ ]
- def _set_connection_isolation(self, connection, level):
if connection.in_transaction():
- if connection._is_future:
- raise exc.InvalidRequestError(
- "This connection has already begun a transaction; "
- "isolation level may not be altered until transaction end"
- )
- else:
- util.warn(
- "Connection is already established with a Transaction; "
- "setting isolation_level may implicitly rollback or "
- "commit "
- "the existing transaction, or have no effect until "
- "next transaction"
- )
- self.set_isolation_level(connection.connection, level)
+ trans_objs = [
+ (name, obj)
+ for name, obj, value in characteristic_values
+ if obj.transactional
+ ]
+ if trans_objs:
+ if connection._is_future:
+ raise exc.InvalidRequestError(
+ "This connection has already begun a transaction; "
+ "%s may not be altered until transaction end"
+ % (", ".join(name for name, obj in trans_objs))
+ )
+ else:
+ util.warn(
+ "Connection is already established with a "
+ "Transaction; "
+ "setting %s may implicitly rollback or "
+ "commit "
+ "the existing transaction, or have no effect until "
+ "next transaction"
+ % (", ".join(name for name, obj in trans_objs))
+ )
+
+ dbapi_connection = connection.connection.connection
+ for name, characteristic, value in characteristic_values:
+ characteristic.set_characteristic(self, dbapi_connection, value)
connection.connection._connection_record.finalize_callback.append(
- self.reset_isolation_level
+ functools.partial(self._reset_characteristics, characteristics)
)
+ def _reset_characteristics(self, characteristics, dbapi_connection):
+ for characteristic_name in characteristics:
+ characteristic = self.connection_characteristics[
+ characteristic_name
+ ]
+ characteristic.reset_characteristic(self, dbapi_connection)
+
def do_begin(self, dbapi_connection):
pass
diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py
index 1e3eb9a29..1d92084cc 100644
--- a/lib/sqlalchemy/util/__init__.py
+++ b/lib/sqlalchemy/util/__init__.py
@@ -44,6 +44,7 @@ from ._collections import UniqueAppender # noqa
from ._collections import update_copy # noqa
from ._collections import WeakPopulateDict # noqa
from ._collections import WeakSequence # noqa
+from .compat import ABC # noqa
from .compat import arm # noqa
from .compat import b # noqa
from .compat import b64decode # noqa
diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py
index caa97f72b..e1d0e6444 100644
--- a/lib/sqlalchemy/util/compat.py
+++ b/lib/sqlalchemy/util/compat.py
@@ -193,6 +193,9 @@ if py3k:
# Unused. Kept for backwards compatibility.
callable = callable # noqa
+
+ from abc import ABC
+
else:
import base64
import ConfigParser as configparser # noqa
@@ -208,6 +211,11 @@ else:
from urllib import unquote_plus # noqa
from urlparse import parse_qsl # noqa
+ from abc import ABCMeta
+
+ class ABC(object):
+ __metaclass__ = ABCMeta
+
try:
import cPickle as pickle
except ImportError: