diff options
author | mike bayer <mike_mp@zzzcomputing.com> | 2020-09-08 15:17:37 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2020-09-08 15:17:37 +0000 |
commit | 0d56a62f721ee6c91d8a8b6a407b959c9215b3b6 (patch) | |
tree | b87a0be641b46d466daf55d6bc7d550cee9a153a | |
parent | 6dc8d1dc6955db8107b683f2c2f3e4b62aad574b (diff) | |
parent | e3716012c535c0aeac2a8cc5a32609ed2d4197c1 (diff) | |
download | sqlalchemy-0d56a62f721ee6c91d8a8b6a407b959c9215b3b6.tar.gz |
Merge "Create connection characteristics API; implement postgresql flags"
-rw-r--r-- | doc/build/changelog/unreleased_14/5549.rst | 13 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/asyncpg.py | 20 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/base.py | 85 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/pg8000.py | 42 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 12 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/characteristics.py | 56 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/default.py | 88 | ||||
-rw-r--r-- | lib/sqlalchemy/util/__init__.py | 1 | ||||
-rw-r--r-- | lib/sqlalchemy/util/compat.py | 8 | ||||
-rw-r--r-- | test/dialect/postgresql/test_dialect.py | 106 | ||||
-rw-r--r-- | test/engine/test_transaction.py | 153 |
11 files changed, 559 insertions, 25 deletions
diff --git a/doc/build/changelog/unreleased_14/5549.rst b/doc/build/changelog/unreleased_14/5549.rst new file mode 100644 index 000000000..a555b82ce --- /dev/null +++ b/doc/build/changelog/unreleased_14/5549.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: usecase, postgresql + :tickets: 5549 + + Added support for PostgreSQL "readonly" and "deferrable" flags for all of + psycopg2, asyncpg and pg8000 dialects. This takes advantage of a newly + generalized version of the "isolation level" API to support other kinds of + session attributes set via execution options that are reliably reset + when connections are returned to the connection pool. + + .. seealso:: + + :ref:`postgresql_readonly_deferrable`
\ No newline at end of file diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 515ef6e28..eb87249b4 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -465,6 +465,8 @@ class AsyncAdapt_asyncpg_connection: "dbapi", "_connection", "isolation_level", + "readonly", + "deferrable", "_transaction", "_started", ) @@ -475,6 +477,8 @@ class AsyncAdapt_asyncpg_connection: self.dbapi = dbapi self._connection = connection self.isolation_level = "read_committed" + self.readonly = False + self.deferrable = False self._transaction = None self._started = False self.await_(self._setup_type_codecs()) @@ -530,7 +534,9 @@ class AsyncAdapt_asyncpg_connection: try: self._transaction = self._connection.transaction( - isolation=self.isolation_level + isolation=self.isolation_level, + readonly=self.readonly, + deferrable=self.deferrable, ) await self._transaction.start() except Exception as error: @@ -763,6 +769,18 @@ class PGDialect_asyncpg(PGDialect): connection.set_isolation_level(level) + def set_readonly(self, connection, value): + connection.readonly = value + + def get_readonly(self, connection): + return connection.readonly + + def set_deferrable(self, connection, value): + connection.deferrable = value + + def get_deferrable(self, connection): + return connection.deferrable + def create_connect_args(self, url): opts = url.translate_connect_args(username="user") if "port" in opts: diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 105e93c9d..84247d046 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -160,12 +160,44 @@ Valid values for ``isolation_level`` on most PostgreSQL dialects include: .. seealso:: + :ref:`postgresql_readonly_deferrable` + :ref:`dbapi_autocommit` :ref:`psycopg2_isolation_level` :ref:`pg8000_isolation_level` +.. _postgresql_readonly_deferrable: + +Setting READ ONLY / DEFERRABLE +------------------------------ + +Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" +characteristics of the transaction, which is in addition to the isolation level +setting. These two attributes can be established either in conjunction with or +independently of the isolation level by passing the ``postgresql_readonly`` and +``postgresql_deferrable`` flags with +:meth:`_engine.Connection.execution_options`. The example below illustrates +passing the ``"SERIALIZABLE"`` isolation level at the same time as setting +"READ ONLY" and "DEFERRABLE":: + + with engine.connect() as conn: + conn = conn.execution_options( + isolation_level="SERIALIZABLE", + postgresql_readonly=True, + postgresql_deferrable=True + ) + with conn.begin(): + # ... work with transaction + +Note that some DBAPIs such as asyncpg only support "readonly" with +SERIALIZABLE isolation. + +.. versionadded:: 1.4 added support for the ``postgresql_readonly`` + and ``postgresql_deferrable`` execution options. + + .. _postgresql_schema_reflection: Remote-Schema Table Introspection and PostgreSQL search_path @@ -1045,6 +1077,7 @@ from ... import exc from ... import schema from ... import sql from ... import util +from ...engine import characteristics from ...engine import default from ...engine import reflection from ...sql import coercions @@ -2618,6 +2651,36 @@ class PGExecutionContext(default.DefaultExecutionContext): return AUTOCOMMIT_REGEXP.match(statement) +class PGReadOnlyConnectionCharacteristic( + characteristics.ConnectionCharacteristic +): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + dialect.set_readonly(dbapi_conn, False) + + def set_characteristic(self, dialect, dbapi_conn, value): + dialect.set_readonly(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_readonly(dbapi_conn) + + +class PGDeferrableConnectionCharacteristic( + characteristics.ConnectionCharacteristic +): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + dialect.set_deferrable(dbapi_conn, False) + + def set_characteristic(self, dialect, dbapi_conn, value): + dialect.set_deferrable(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_deferrable(dbapi_conn) + + class PGDialect(default.DefaultDialect): name = "postgresql" supports_alter = True @@ -2653,6 +2716,16 @@ class PGDialect(default.DefaultDialect): implicit_returning = True full_returning = True + connection_characteristics = ( + default.DefaultDialect.connection_characteristics + ) + connection_characteristics = connection_characteristics.union( + { + "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), + "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), + } + ) + construct_arguments = [ ( schema.Index, @@ -2774,6 +2847,18 @@ class PGDialect(default.DefaultDialect): cursor.close() return val.upper() + def set_readonly(self, connection, value): + raise NotImplementedError() + + def get_readonly(self, connection): + raise NotImplementedError() + + def set_deferrable(self, connection, value): + raise NotImplementedError() + + def get_deferrable(self, connection): + raise NotImplementedError() + def do_begin_twophase(self, connection, xid): self.do_begin(connection.connection) diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py index e08332a57..fd70828ff 100644 --- a/lib/sqlalchemy/dialects/postgresql/pg8000.py +++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py @@ -359,6 +359,48 @@ class PGDialect_pg8000(PGDialect): % (level, self.name, ", ".join(self._isolation_lookup)) ) + def set_readonly(self, connection, value): + cursor = connection.cursor() + try: + cursor.execute( + "SET SESSION CHARACTERISTICS AS TRANSACTION %s" + % ("READ ONLY" if value else "READ WRITE") + ) + cursor.execute("COMMIT") + finally: + cursor.close() + + def get_readonly(self, connection): + cursor = connection.cursor() + try: + cursor.execute("show transaction_read_only") + val = cursor.fetchone()[0] + finally: + cursor.close() + + return val == "yes" + + def set_deferrable(self, connection, value): + cursor = connection.cursor() + try: + cursor.execute( + "SET SESSION CHARACTERISTICS AS TRANSACTION %s" + % ("DEFERRABLE" if value else "NOT DEFERRABLE") + ) + cursor.execute("COMMIT") + finally: + cursor.close() + + def get_deferrable(self, connection): + cursor = connection.cursor() + try: + cursor.execute("show transaction_deferrable") + val = cursor.fetchone()[0] + finally: + cursor.close() + + return val == "yes" + def set_client_encoding(self, connection, client_encoding): # adjust for ConnectionFairy possibly being present if hasattr(connection, "connection"): diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 2161b24fc..3cc62fc93 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -803,6 +803,18 @@ class PGDialect_psycopg2(PGDialect): connection.set_isolation_level(level) + def set_readonly(self, connection, value): + connection.readonly = value + + def get_readonly(self, connection): + return connection.readonly + + def set_deferrable(self, connection, value): + connection.deferrable = value + + def get_deferrable(self, connection): + return connection.deferrable + def on_connect(self): extras = self._psycopg2_extras() extensions = self._psycopg2_extensions() diff --git a/lib/sqlalchemy/engine/characteristics.py b/lib/sqlalchemy/engine/characteristics.py new file mode 100644 index 000000000..c00bff40d --- /dev/null +++ b/lib/sqlalchemy/engine/characteristics.py @@ -0,0 +1,56 @@ +import abc + +from ..util import ABC + + +class ConnectionCharacteristic(ABC): + """An abstract base for an object that can set, get and reset a + per-connection characteristic, typically one that gets reset when the + connection is returned to the connection pool. + + transaction isolation is the canonical example, and the + ``IsolationLevelCharacteristic`` implementation provides this for the + ``DefaultDialect``. + + The ``ConnectionCharacteristic`` class should call upon the ``Dialect`` for + the implementation of each method. The object exists strictly to serve as + a dialect visitor that can be placed into the + ``DefaultDialect.connection_characteristics`` dictionary where it will take + effect for calls to :meth:`_engine.Connection.execution_options` and + related APIs. + + .. versionadded:: 1.4 + + """ + + __slots__ = () + + transactional = False + + @abc.abstractmethod + def reset_characteristic(self, dialect, dbapi_conn): + """Reset the characteristic on the connection to its default value.""" + + @abc.abstractmethod + def set_characteristic(self, dialect, dbapi_conn, value): + """set characteristic on the connection to a given value.""" + + @abc.abstractmethod + def get_characteristic(self, dialect, dbapi_conn): + """Given a DBAPI connection, get the current value of the + characteristic. + + """ + + +class IsolationLevelCharacteristic(ConnectionCharacteristic): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + dialect.reset_isolation_level(dbapi_conn) + + def set_characteristic(self, dialect, dbapi_conn, value): + dialect.set_isolation_level(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_isolation_level(dbapi_conn) diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 564258a28..0bd4cd14c 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -14,10 +14,12 @@ as the base class for their own corresponding classes. """ import codecs +import functools import random import re import weakref +from . import characteristics from . import cursor as _cursor from . import interfaces from .. import event @@ -85,6 +87,10 @@ class DefaultDialect(interfaces.Dialect): tuple_in_values = False + connection_characteristics = util.immutabledict( + {"isolation_level": characteristics.IsolationLevelCharacteristic()} + ) + engine_config_types = util.immutabledict( [ ("convert_unicode", util.bool_or_str("force")), @@ -513,38 +519,76 @@ class DefaultDialect(interfaces.Dialect): return [[], opts] def set_engine_execution_options(self, engine, opts): - if "isolation_level" in opts: - isolation_level = opts["isolation_level"] + supported_names = set(self.connection_characteristics).intersection( + opts + ) + if supported_names: + characteristics = util.immutabledict( + (name, opts[name]) for name in supported_names + ) @event.listens_for(engine, "engine_connect") - def set_isolation(connection, branch): + def set_connection_characteristics(connection, branch): if not branch: - self._set_connection_isolation(connection, isolation_level) + self._set_connection_characteristics( + connection, characteristics + ) def set_connection_execution_options(self, connection, opts): - if "isolation_level" in opts: - self._set_connection_isolation(connection, opts["isolation_level"]) + supported_names = set(self.connection_characteristics).intersection( + opts + ) + if supported_names: + characteristics = util.immutabledict( + (name, opts[name]) for name in supported_names + ) + self._set_connection_characteristics(connection, characteristics) + + def _set_connection_characteristics(self, connection, characteristics): + + characteristic_values = [ + (name, self.connection_characteristics[name], value) + for name, value in characteristics.items() + ] - def _set_connection_isolation(self, connection, level): if connection.in_transaction(): - if connection._is_future: - raise exc.InvalidRequestError( - "This connection has already begun a transaction; " - "isolation level may not be altered until transaction end" - ) - else: - util.warn( - "Connection is already established with a Transaction; " - "setting isolation_level may implicitly rollback or " - "commit " - "the existing transaction, or have no effect until " - "next transaction" - ) - self.set_isolation_level(connection.connection, level) + trans_objs = [ + (name, obj) + for name, obj, value in characteristic_values + if obj.transactional + ] + if trans_objs: + if connection._is_future: + raise exc.InvalidRequestError( + "This connection has already begun a transaction; " + "%s may not be altered until transaction end" + % (", ".join(name for name, obj in trans_objs)) + ) + else: + util.warn( + "Connection is already established with a " + "Transaction; " + "setting %s may implicitly rollback or " + "commit " + "the existing transaction, or have no effect until " + "next transaction" + % (", ".join(name for name, obj in trans_objs)) + ) + + dbapi_connection = connection.connection.connection + for name, characteristic, value in characteristic_values: + characteristic.set_characteristic(self, dbapi_connection, value) connection.connection._connection_record.finalize_callback.append( - self.reset_isolation_level + functools.partial(self._reset_characteristics, characteristics) ) + def _reset_characteristics(self, characteristics, dbapi_connection): + for characteristic_name in characteristics: + characteristic = self.connection_characteristics[ + characteristic_name + ] + characteristic.reset_characteristic(self, dbapi_connection) + def do_begin(self, dbapi_connection): pass diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py index 1e3eb9a29..1d92084cc 100644 --- a/lib/sqlalchemy/util/__init__.py +++ b/lib/sqlalchemy/util/__init__.py @@ -44,6 +44,7 @@ from ._collections import UniqueAppender # noqa from ._collections import update_copy # noqa from ._collections import WeakPopulateDict # noqa from ._collections import WeakSequence # noqa +from .compat import ABC # noqa from .compat import arm # noqa from .compat import b # noqa from .compat import b64decode # noqa diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py index caa97f72b..e1d0e6444 100644 --- a/lib/sqlalchemy/util/compat.py +++ b/lib/sqlalchemy/util/compat.py @@ -193,6 +193,9 @@ if py3k: # Unused. Kept for backwards compatibility. callable = callable # noqa + + from abc import ABC + else: import base64 import ConfigParser as configparser # noqa @@ -208,6 +211,11 @@ else: from urllib import unquote_plus # noqa from urlparse import parse_qsl # noqa + from abc import ABCMeta + + class ABC(object): + __metaclass__ = ABCMeta + try: import cPickle as pickle except ImportError: diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 6eaa3295b..83bc6cac8 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -735,6 +735,112 @@ class MiscBackendTest( ".".join(str(x) for x in v) ) + def test_readonly_flag_connection(self): + with testing.db.connect() as conn: + # asyncpg requires serializable for readonly.. + conn = conn.execution_options( + isolation_level="SERIALIZABLE", postgresql_readonly=True + ) + + dbapi_conn = conn.connection.connection + + cursor = dbapi_conn.cursor() + cursor.execute("show transaction_read_only") + val = cursor.fetchone()[0] + cursor.close() + eq_(val, "on") + + cursor = dbapi_conn.cursor() + try: + cursor.execute("show transaction_read_only") + val = cursor.fetchone()[0] + finally: + cursor.close() + dbapi_conn.rollback() + eq_(val, "off") + + def test_deferrable_flag_connection(self): + with testing.db.connect() as conn: + # asyncpg but not for deferrable? which the PG docs actually + # state. weird + conn = conn.execution_options( + isolation_level="SERIALIZABLE", postgresql_deferrable=True + ) + + dbapi_conn = conn.connection.connection + + cursor = dbapi_conn.cursor() + cursor.execute("show transaction_deferrable") + val = cursor.fetchone()[0] + cursor.close() + eq_(val, "on") + + cursor = dbapi_conn.cursor() + try: + cursor.execute("show transaction_deferrable") + val = cursor.fetchone()[0] + finally: + cursor.close() + dbapi_conn.rollback() + eq_(val, "off") + + def test_readonly_flag_engine(self): + engine = engines.testing_engine( + options={ + "execution_options": dict( + isolation_level="SERIALIZABLE", postgresql_readonly=True + ) + } + ) + for i in range(2): + with engine.connect() as conn: + dbapi_conn = conn.connection.connection + + cursor = dbapi_conn.cursor() + cursor.execute("show transaction_read_only") + val = cursor.fetchone()[0] + cursor.close() + eq_(val, "on") + + cursor = dbapi_conn.cursor() + try: + cursor.execute("show transaction_read_only") + val = cursor.fetchone()[0] + finally: + cursor.close() + dbapi_conn.rollback() + eq_(val, "off") + + def test_deferrable_flag_engine(self): + engine = engines.testing_engine( + options={ + "execution_options": dict( + isolation_level="SERIALIZABLE", postgresql_deferrable=True + ) + } + ) + + for i in range(2): + with engine.connect() as conn: + # asyncpg but not for deferrable? which the PG docs actually + # state. weird + dbapi_conn = conn.connection.connection + + cursor = dbapi_conn.cursor() + cursor.execute("show transaction_deferrable") + val = cursor.fetchone()[0] + cursor.close() + eq_(val, "on") + + cursor = dbapi_conn.cursor() + try: + cursor.execute("show transaction_deferrable") + val = cursor.fetchone()[0] + finally: + cursor.close() + dbapi_conn.rollback() + eq_(val, "off") + @testing.requires.psycopg2_compatibility def test_psycopg2_non_standard_err(self): # note that psycopg2 is sometimes called psycopg2cffi diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index cd144e45f..26ccfdfd3 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -7,12 +7,17 @@ from sqlalchemy import func from sqlalchemy import INT from sqlalchemy import Integer from sqlalchemy import MetaData +from sqlalchemy import pool as _pool from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import util from sqlalchemy import VARCHAR +from sqlalchemy.engine import base +from sqlalchemy.engine import characteristics +from sqlalchemy.engine import default +from sqlalchemy.engine import url from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ @@ -1374,6 +1379,150 @@ class IsolationLevelTest(fixtures.TestBase): eq_(c2.get_isolation_level(), self._non_default_isolation_level()) +class ConnectionCharacteristicTest(fixtures.TestBase): + @testing.fixture + def characteristic_fixture(self): + class FooCharacteristic(characteristics.ConnectionCharacteristic): + transactional = True + + def reset_characteristic(self, dialect, dbapi_conn): + + dialect.reset_foo(dbapi_conn) + + def set_characteristic(self, dialect, dbapi_conn, value): + + dialect.set_foo(dbapi_conn, value) + + def get_characteristic(self, dialect, dbapi_conn): + return dialect.get_foo(dbapi_conn) + + class FooDialect(default.DefaultDialect): + connection_characteristics = util.immutabledict( + {"foo": FooCharacteristic()} + ) + + def reset_foo(self, dbapi_conn): + dbapi_conn.foo = "original_value" + + def set_foo(self, dbapi_conn, value): + dbapi_conn.foo = value + + def get_foo(self, dbapi_conn): + return dbapi_conn.foo + + connection = mock.Mock() + + def creator(): + connection.foo = "original_value" + return connection + + pool = _pool.SingletonThreadPool(creator=creator) + u = url.make_url("foo://") + return base.Engine(pool, FooDialect(), u), connection + + def test_engine_param_stays(self, characteristic_fixture): + + engine, connection = characteristic_fixture + + foo_level = engine.dialect.get_foo(engine.connect().connection) + + new_level = "new_level" + + ne_(foo_level, new_level) + + eng = engine.execution_options(foo=new_level) + eq_(eng.dialect.get_foo(eng.connect().connection), new_level) + + # check that it stays + conn = eng.connect() + eq_(eng.dialect.get_foo(conn.connection), new_level) + conn.close() + + conn = eng.connect() + eq_(eng.dialect.get_foo(conn.connection), new_level) + conn.close() + + def test_default_level(self, characteristic_fixture): + engine, connection = characteristic_fixture + + eq_( + engine.dialect.get_foo(engine.connect().connection), + "original_value", + ) + + def test_connection_invalidated(self, characteristic_fixture): + engine, connection = characteristic_fixture + + conn = engine.connect() + c2 = conn.execution_options(foo="new_value") + eq_(connection.foo, "new_value") + c2.invalidate() + c2.connection + + eq_(connection.foo, "original_value") + + def test_warning_in_transaction(self, characteristic_fixture): + engine, connection = characteristic_fixture + + c1 = engine.connect() + with expect_warnings( + "Connection is already established with a Transaction; " + "setting foo may implicitly rollback or commit " + "the existing transaction, or have no effect until next " + "transaction" + ): + with c1.begin(): + c1 = c1.execution_options(foo="new_foo") + + eq_( + engine.dialect.get_foo(c1.connection), "new_foo", + ) + # stays outside of transaction + eq_(engine.dialect.get_foo(c1.connection), "new_foo") + + @testing.fails("no error is raised yet here.") + def test_per_statement_bzzt(self, characteristic_fixture): + engine, connection = characteristic_fixture + + # this would need some on-execute mechanism to look inside of + # the characteristics list. unfortunately this would + # add some latency. + + assert_raises_message( + exc.ArgumentError, + r"'foo' execution option may only be specified " + r"on Connection.execution_options\(\), or " + r"per-engine using the isolation_level " + r"argument to create_engine\(\).", + connection.execute, + select([1]).execution_options(foo="bar"), + ) + + def test_per_engine(self, characteristic_fixture): + + engine, connection = characteristic_fixture + + pool, dialect, url = engine.pool, engine.dialect, engine.url + + eng = base.Engine( + pool, dialect, url, execution_options={"foo": "new_value"} + ) + + conn = eng.connect() + eq_(eng.dialect.get_foo(conn.connection), "new_value") + + def test_per_option_engine(self, characteristic_fixture): + + engine, connection = characteristic_fixture + + eng = engine.execution_options(foo="new_value") + + conn = eng.connect() + eq_( + eng.dialect.get_foo(conn.connection), "new_value", + ) + + class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): """Still some debate over if the "reset agent" should apply to the future connection or not. @@ -1586,7 +1735,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "This connection has already begun a transaction; " - "isolation level may not be altered until transaction end", + "isolation_level may not be altered until transaction end", conn.execution_options, isolation_level="AUTOCOMMIT", ) @@ -1600,7 +1749,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "This connection has already begun a transaction; " - "isolation level may not be altered until transaction end", + "isolation_level may not be altered until transaction end", conn.execution_options, isolation_level="AUTOCOMMIT", ) |