summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-11-17 17:13:24 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2020-11-19 13:40:54 -0500
commit9b779611f9bafd6c0affafda9732cecdb8efa761 (patch)
tree41e98e407c5ca7872d3a97ce62214047c9c39bcf /lib/sqlalchemy
parent7082e4c447c664af43a6576f5749c97a9951d7dd (diff)
downloadsqlalchemy-9b779611f9bafd6c0affafda9732cecdb8efa761.tar.gz
Support pool.connect() event firing before all else
Fixed regression where a connection pool event specified with a keyword, most notably ``insert=True``, would be lost when the event were set up. This would prevent startup events that need to fire before dialect-level events from working correctly. The internal mechanics of the engine connection routine has been altered such that it's now guaranteed that a user-defined event handler for the :meth:`_pool.PoolEvents.connect` handler, when established using ``insert=True``, will allow an event handler to run that is definitely invoked **before** any dialect-specific initialization starts up, most notably when it does things like detect default schema name. Previously, this would occur in most cases but not unconditionally. A new example is added to the schema documentation illustrating how to establish the "default schema name" within an on-connect event (upcoming as part of I882edd5bbe06ee5b4d0a9c148854a57b2bcd4741) Addiional changes to support setting default schema name: The Oracle dialect now uses ``select sys_context( 'userenv', 'current_schema' ) from dual`` to get the default schema name, rather than ``SELECT USER FROM DUAL``, to accommodate for changes to the session-local schema name under Oracle. Added a read/write ``.autocommit`` attribute to the DBAPI-adaptation layer for the asyncpg dialect. This so that when working with DBAPI-specific schemes that need to use "autocommit" directly with the DBAPI connection, the same ``.autocommit`` attribute which works with both psycopg2 as well as pg8000 is available. Fixes: #5716 Fixes: #5708 Change-Id: I7dce56b4345ffc720e25e2aaccb7e42bb29e5671
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py4
-rw-r--r--lib/sqlalchemy/dialects/oracle/provision.py10
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py15
-rw-r--r--lib/sqlalchemy/dialects/postgresql/provision.py13
-rw-r--r--lib/sqlalchemy/engine/create.py19
-rw-r--r--lib/sqlalchemy/pool/events.py4
-rw-r--r--lib/sqlalchemy/testing/provision.py8
-rw-r--r--lib/sqlalchemy/testing/requirements.py7
-rw-r--r--lib/sqlalchemy/testing/suite/test_dialect.py76
-rw-r--r--lib/sqlalchemy/util/__init__.py1
-rw-r--r--lib/sqlalchemy/util/compat.py18
11 files changed, 160 insertions, 15 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index 7bdaafd82..c0f7aa5ce 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -1587,7 +1587,9 @@ class OracleDialect(default.DefaultDialect):
def _get_default_schema_name(self, connection):
return self.normalize_name(
- connection.exec_driver_sql("SELECT USER FROM DUAL").scalar()
+ connection.exec_driver_sql(
+ "select sys_context( 'userenv', 'current_schema' ) from dual"
+ ).scalar()
)
def _resolve_synonym(
diff --git a/lib/sqlalchemy/dialects/oracle/provision.py b/lib/sqlalchemy/dialects/oracle/provision.py
index 01854fdce..d19dfc9fe 100644
--- a/lib/sqlalchemy/dialects/oracle/provision.py
+++ b/lib/sqlalchemy/dialects/oracle/provision.py
@@ -7,6 +7,7 @@ from ...testing.provision import drop_db
from ...testing.provision import follower_url_from_main
from ...testing.provision import log
from ...testing.provision import run_reap_dbs
+from ...testing.provision import set_default_schema_on_connection
from ...testing.provision import temp_table_keyword_args
from ...testing.provision import update_db_opts
@@ -106,3 +107,12 @@ def _oracle_temp_table_keyword_args(cfg, eng):
"prefixes": ["GLOBAL TEMPORARY"],
"oracle_on_commit": "PRESERVE ROWS",
}
+
+
+@set_default_schema_on_connection.for_db("oracle")
+def _oracle_set_default_schema_on_connection(
+ cfg, dbapi_connection, schema_name
+):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name)
+ cursor.close()
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 569024790..412feda0f 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -491,7 +491,7 @@ class AsyncAdapt_asyncpg_connection:
def __init__(self, dbapi, connection):
self.dbapi = dbapi
self._connection = connection
- self.isolation_level = "read_committed"
+ self.isolation_level = self._isolation_setting = "read_committed"
self.readonly = False
self.deferrable = False
self._transaction = None
@@ -512,10 +512,21 @@ class AsyncAdapt_asyncpg_connection:
else:
raise error
+ @property
+ def autocommit(self):
+ return self.isolation_level == "autocommit"
+
+ @autocommit.setter
+ def autocommit(self, value):
+ if value:
+ self.isolation_level = "autocommit"
+ else:
+ self.isolation_level = self._isolation_setting
+
def set_isolation_level(self, level):
if self._started:
self.rollback()
- self.isolation_level = level
+ self.isolation_level = self._isolation_setting = level
async def _start_transaction(self):
if self.isolation_level == "autocommit":
diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py
index 6c6dc4be6..9433ec458 100644
--- a/lib/sqlalchemy/dialects/postgresql/provision.py
+++ b/lib/sqlalchemy/dialects/postgresql/provision.py
@@ -5,6 +5,7 @@ from ... import text
from ...testing.provision import create_db
from ...testing.provision import drop_db
from ...testing.provision import log
+from ...testing.provision import set_default_schema_on_connection
from ...testing.provision import temp_table_keyword_args
@@ -64,3 +65,15 @@ def _pg_drop_db(cfg, eng, ident):
@temp_table_keyword_args.for_db("postgresql")
def _postgresql_temp_table_keyword_args(cfg, eng):
return {"prefixes": ["TEMPORARY"]}
+
+
+@set_default_schema_on_connection.for_db("postgresql")
+def _postgresql_set_default_schema_on_connection(
+ cfg, dbapi_connection, schema_name
+):
+ existing_autocommit = dbapi_connection.autocommit
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
+ cursor.execute("SET SESSION search_path='%s'" % schema_name)
+ cursor.close()
+ dbapi_connection.autocommit = existing_autocommit
diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py
index 7f5b5e8f5..786f8f5d6 100644
--- a/lib/sqlalchemy/engine/create.py
+++ b/lib/sqlalchemy/engine/create.py
@@ -657,17 +657,14 @@ def create_engine(url, **kwargs):
dialect.initialize(c)
dialect.do_rollback(c.connection)
- if do_on_connect:
- event.listen(
- pool, "connect", first_connect, _once_unless_exception=True
- )
- else:
- event.listen(
- pool,
- "first_connect",
- first_connect,
- _once_unless_exception=True,
- )
+ # previously, the "first_connect" event was used here, which was then
+ # scaled back if the "on_connect" handler were present. now,
+ # since "on_connect" is virtually always present, just use
+ # "connect" event with once_unless_exception in all cases so that
+ # the connection event flow is consistent in all cases.
+ event.listen(
+ pool, "connect", first_connect, _once_unless_exception=True
+ )
dialect_cls.engine_created(engine)
if entrypoint is not dialect_cls:
diff --git a/lib/sqlalchemy/pool/events.py b/lib/sqlalchemy/pool/events.py
index 9443877a9..363afdd78 100644
--- a/lib/sqlalchemy/pool/events.py
+++ b/lib/sqlalchemy/pool/events.py
@@ -58,7 +58,9 @@ class PoolEvents(event.Events):
def _listen(cls, event_key, **kw):
target = event_key.dispatch_target
- event_key.base_listen(asyncio=target._is_asyncio)
+ kw.setdefault("asyncio", target._is_asyncio)
+
+ event_key.base_listen(**kw)
def connect(self, dbapi_connection, connection_record):
"""Called at the moment a particular DBAPI connection is first
diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py
index 678def327..9b5fa0255 100644
--- a/lib/sqlalchemy/testing/provision.py
+++ b/lib/sqlalchemy/testing/provision.py
@@ -313,3 +313,11 @@ def get_temp_table_name(cfg, eng, base_name):
use. The mssql dialect's implementation will need a "#" prepended.
"""
return base_name
+
+
+@register.init
+def set_default_schema_on_connection(cfg, dbapi_connection, schema_name):
+ raise NotImplementedError(
+ "backend does not implement a schema name set function: %s"
+ % (cfg.db.url,)
+ )
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index bd2d4eaf9..7b938c993 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -458,6 +458,13 @@ class SuiteRequirements(Requirements):
return exclusions.closed()
@property
+ def default_schema_name_switch(self):
+ """target dialect implements provisioning module including
+ set_default_schema_on_connection"""
+
+ return exclusions.closed()
+
+ @property
def server_side_cursors(self):
"""Target dialect must support server side cursors."""
diff --git a/lib/sqlalchemy/testing/suite/test_dialect.py b/lib/sqlalchemy/testing/suite/test_dialect.py
index c5ede08c6..7f697b915 100644
--- a/lib/sqlalchemy/testing/suite/test_dialect.py
+++ b/lib/sqlalchemy/testing/suite/test_dialect.py
@@ -1,12 +1,15 @@
#! coding: utf-8
+from sqlalchemy import event
from .. import assert_raises
from .. import config
+from .. import engines
from .. import eq_
from .. import fixtures
from .. import ne_
from .. import provide_metadata
from ..config import requirements
+from ..provision import set_default_schema_on_connection
from ..schema import Column
from ..schema import Table
from ... import exc
@@ -209,3 +212,76 @@ class EscapingTest(fixtures.TestBase):
),
"some %% other value",
)
+
+
+class WeCanSetDefaultSchemaWEventsTest(fixtures.TestBase):
+ __backend__ = True
+
+ __requires__ = ("default_schema_name_switch",)
+
+ def test_control_case(self):
+ default_schema_name = config.db.dialect.default_schema_name
+
+ eng = engines.testing_engine()
+ with eng.connect():
+ pass
+
+ eq_(eng.dialect.default_schema_name, default_schema_name)
+
+ def test_wont_work_wo_insert(self):
+ default_schema_name = config.db.dialect.default_schema_name
+
+ eng = engines.testing_engine()
+
+ @event.listens_for(eng, "connect")
+ def on_connect(dbapi_connection, connection_record):
+ set_default_schema_on_connection(
+ config, dbapi_connection, config.test_schema
+ )
+
+ with eng.connect() as conn:
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+
+ eq_(eng.dialect.default_schema_name, default_schema_name)
+
+ def test_schema_change_on_connect(self):
+ eng = engines.testing_engine()
+
+ @event.listens_for(eng, "connect", insert=True)
+ def on_connect(dbapi_connection, connection_record):
+ set_default_schema_on_connection(
+ config, dbapi_connection, config.test_schema
+ )
+
+ with eng.connect() as conn:
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+
+ eq_(eng.dialect.default_schema_name, config.test_schema)
+
+ def test_schema_change_works_w_transactions(self):
+ eng = engines.testing_engine()
+
+ @event.listens_for(eng, "connect", insert=True)
+ def on_connect(dbapi_connection, *arg):
+ set_default_schema_on_connection(
+ config, dbapi_connection, config.test_schema
+ )
+
+ with eng.connect() as conn:
+ trans = conn.begin()
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+ trans.rollback()
+
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+
+ eq_(eng.dialect.default_schema_name, config.test_schema)
+
+
+class FutureWeCanSetDefaultSchemaWEventsTest(
+ fixtures.FutureEngineMixin, WeCanSetDefaultSchemaWEventsTest
+):
+ pass
diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py
index 7c5257b87..d632f7fed 100644
--- a/lib/sqlalchemy/util/__init__.py
+++ b/lib/sqlalchemy/util/__init__.py
@@ -66,6 +66,7 @@ from .compat import itertools_filter # noqa
from .compat import itertools_filterfalse # noqa
from .compat import namedtuple # noqa
from .compat import next # noqa
+from .compat import nullcontext # noqa
from .compat import osx # noqa
from .compat import parse_qsl # noqa
from .compat import perf_counter # noqa
diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py
index 1480bbeee..e8c488047 100644
--- a/lib/sqlalchemy/util/compat.py
+++ b/lib/sqlalchemy/util/compat.py
@@ -47,6 +47,24 @@ FullArgSpec = collections.namedtuple(
],
)
+
+class nullcontext(object):
+ """Context manager that does no additional processing.
+
+ Vendored from Python 3.7.
+
+ """
+
+ def __init__(self, enter_result=None):
+ self.enter_result = enter_result
+
+ def __enter__(self):
+ return self.enter_result
+
+ def __exit__(self, *excinfo):
+ pass
+
+
try:
import threading
except ImportError: