summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r--lib/sqlalchemy/engine/base.py698
1 files changed, 445 insertions, 253 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 8c99f6309..5ce531338 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -10,13 +10,24 @@ import contextlib
import sys
import typing
from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import Iterator
+from typing import List
from typing import Mapping
+from typing import MutableMapping
+from typing import NoReturn
from typing import Optional
+from typing import Tuple
+from typing import Type
from typing import Union
from .interfaces import BindTyping
from .interfaces import ConnectionEventsTarget
+from .interfaces import DBAPICursor
from .interfaces import ExceptionContext
+from .interfaces import ExecutionContext
from .util import _distill_params_20
from .util import _distill_raw_params
from .util import TransactionalContext
@@ -26,22 +37,48 @@ from .. import log
from .. import util
from ..sql import compiler
from ..sql import util as sql_util
-from ..sql._typing import _ExecuteOptions
-from ..sql._typing import _ExecuteParams
+
+_CompiledCacheType = MutableMapping[Any, Any]
if typing.TYPE_CHECKING:
+ from . import Result
+ from . import ScalarResult
+ from .interfaces import _AnyExecuteParams
+ from .interfaces import _AnyMultiExecuteParams
+ from .interfaces import _AnySingleExecuteParams
+ from .interfaces import _CoreAnyExecuteParams
+ from .interfaces import _CoreMultiExecuteParams
+ from .interfaces import _CoreSingleExecuteParams
+ from .interfaces import _DBAPIAnyExecuteParams
+ from .interfaces import _DBAPIMultiExecuteParams
+ from .interfaces import _DBAPISingleExecuteParams
+ from .interfaces import _ExecuteOptions
+ from .interfaces import _ExecuteOptionsParameter
+ from .interfaces import _SchemaTranslateMapType
from .interfaces import Dialect
from .reflection import Inspector # noqa
from .url import URL
+ from ..event import dispatcher
+ from ..log import _EchoFlagType
+ from ..pool import _ConnectionFairy
from ..pool import Pool
from ..pool import PoolProxiedConnection
+ from ..sql import Executable
+ from ..sql.base import SchemaVisitor
+ from ..sql.compiler import Compiled
+ from ..sql.ddl import DDLElement
+ from ..sql.ddl import SchemaDropper
+ from ..sql.ddl import SchemaGenerator
+ from ..sql.functions import FunctionElement
+ from ..sql.schema import ColumnDefault
+ from ..sql.schema import HasSchemaAttr
"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.
"""
-_EMPTY_EXECUTION_OPTS = util.immutabledict()
-NO_OPTIONS = util.immutabledict()
+_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.immutabledict()
+NO_OPTIONS: Mapping[str, Any] = util.immutabledict()
class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
@@ -69,23 +106,32 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
+ dispatch: dispatcher[ConnectionEventsTarget]
+
_sqla_logger_namespace = "sqlalchemy.engine.Connection"
# used by sqlalchemy.engine.util.TransactionalContext
- _trans_context_manager = None
+ _trans_context_manager: Optional[TransactionalContext] = None
# legacy as of 2.0, should be eventually deprecated and
# removed. was used in the "pre_ping" recipe that's been in the docs
# a long time
should_close_with_result = False
+ _dbapi_connection: Optional[PoolProxiedConnection]
+
+ _execution_options: _ExecuteOptions
+
+ _transaction: Optional[RootTransaction]
+ _nested_transaction: Optional[NestedTransaction]
+
def __init__(
self,
- engine,
- connection=None,
- _has_events=None,
- _allow_revalidate=True,
- _allow_autobegin=True,
+ engine: Engine,
+ connection: Optional[PoolProxiedConnection] = None,
+ _has_events: Optional[bool] = None,
+ _allow_revalidate: bool = True,
+ _allow_autobegin: bool = True,
):
"""Construct a new Connection."""
self.engine = engine
@@ -125,14 +171,14 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.dispatch.engine_connect(self)
@util.memoized_property
- def _message_formatter(self):
+ def _message_formatter(self) -> Any:
if "logging_token" in self._execution_options:
token = self._execution_options["logging_token"]
return lambda msg: "[%s] %s" % (token, msg)
else:
return None
- def _log_info(self, message, *arg, **kw):
+ def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
fmt = self._message_formatter
if fmt:
@@ -143,7 +189,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.engine.logger.info(message, *arg, **kw)
- def _log_debug(self, message, *arg, **kw):
+ def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
fmt = self._message_formatter
if fmt:
@@ -155,19 +201,19 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.engine.logger.debug(message, *arg, **kw)
@property
- def _schema_translate_map(self):
+ def _schema_translate_map(self) -> Optional[_SchemaTranslateMapType]:
return self._execution_options.get("schema_translate_map", None)
- def schema_for_object(self, obj):
+ def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
"""Return the schema name for the given schema item taking into
account current schema translate map.
"""
name = obj.schema
- schema_translate_map = self._execution_options.get(
- "schema_translate_map", None
- )
+ schema_translate_map: Optional[
+ Mapping[Optional[str], str]
+ ] = self._execution_options.get("schema_translate_map", None)
if (
schema_translate_map
@@ -178,13 +224,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
else:
return name
- def __enter__(self):
+ def __enter__(self) -> Connection:
return self
- def __exit__(self, type_, value, traceback):
+ def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
self.close()
- def execution_options(self, **opt):
+ def execution_options(self, **opt: Any) -> Connection:
r"""Set non-SQL options for the connection which take effect
during execution.
@@ -346,13 +392,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
ORM-specific execution options
""" # noqa
- self._execution_options = self._execution_options.union(opt)
if self._has_events or self.engine._has_events:
self.dispatch.set_connection_execution_options(self, opt)
+ self._execution_options = self._execution_options.union(opt)
self.dialect.set_connection_execution_options(self, opt)
return self
- def get_execution_options(self):
+ def get_execution_options(self) -> _ExecuteOptions:
"""Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
@@ -364,14 +410,27 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return self._execution_options
@property
- def closed(self):
+ def _still_open_and_dbapi_connection_is_valid(self) -> bool:
+ pool_proxied_connection = self._dbapi_connection
+ return (
+ pool_proxied_connection is not None
+ and pool_proxied_connection.is_valid
+ )
+
+ @property
+ def closed(self) -> bool:
"""Return True if this connection is closed."""
return self._dbapi_connection is None and not self.__can_reconnect
@property
- def invalidated(self):
- """Return True if this connection was invalidated."""
+ def invalidated(self) -> bool:
+ """Return True if this connection was invalidated.
+
+ This does not indicate whether or not the connection was
+ invalidated at the pool level, however
+
+ """
# prior to 1.4, "invalid" was stored as a state independent of
# "closed", meaning an invalidated connection could be "closed",
@@ -382,10 +441,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
# "closed" does not need to be "invalid". So the state is now
# represented by the two facts alone.
- return self._dbapi_connection is None and not self.closed
+ pool_proxied_connection = self._dbapi_connection
+ return pool_proxied_connection is None and self.__can_reconnect
@property
- def connection(self) -> "PoolProxiedConnection":
+ def connection(self) -> PoolProxiedConnection:
"""The underlying DB-API connection managed by this Connection.
This is a SQLAlchemy connection-pool proxied connection
@@ -410,7 +470,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
else:
return self._dbapi_connection
- def get_isolation_level(self):
+ def get_isolation_level(self) -> str:
"""Return the current isolation level assigned to this
:class:`_engine.Connection`.
@@ -442,15 +502,15 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
- set per :class:`_engine.Connection` isolation level
"""
+ dbapi_connection = self.connection.dbapi_connection
+ assert dbapi_connection is not None
try:
- return self.dialect.get_isolation_level(
- self.connection.dbapi_connection
- )
+ return self.dialect.get_isolation_level(dbapi_connection)
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
@property
- def default_isolation_level(self):
+ def default_isolation_level(self) -> str:
"""The default isolation level assigned to this
:class:`_engine.Connection`.
@@ -482,7 +542,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
return self.dialect.default_isolation_level
- def _invalid_transaction(self):
+ def _invalid_transaction(self) -> NoReturn:
raise exc.PendingRollbackError(
"Can't reconnect until invalid %stransaction is rolled "
"back. Please rollback() fully before proceeding"
@@ -490,7 +550,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
code="8s2b",
)
- def _revalidate_connection(self):
+ def _revalidate_connection(self) -> PoolProxiedConnection:
if self.__can_reconnect and self.invalidated:
if self._transaction is not None:
self._invalid_transaction()
@@ -499,13 +559,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
raise exc.ResourceClosedError("This Connection is closed")
@property
- def _still_open_and_dbapi_connection_is_valid(self):
- return self._dbapi_connection is not None and getattr(
- self._dbapi_connection, "is_valid", False
- )
-
- @property
- def info(self):
+ def info(self) -> Dict[str, Any]:
"""Info dictionary associated with the underlying DBAPI connection
referred to by this :class:`_engine.Connection`, allowing user-defined
data to be associated with the connection.
@@ -518,7 +572,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return self.connection.info
- def invalidate(self, exception=None):
+ def invalidate(self, exception: Optional[BaseException] = None) -> None:
"""Invalidate the underlying DBAPI connection associated with
this :class:`_engine.Connection`.
@@ -567,14 +621,18 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self.invalidated:
return
+ # MARKMARK
if self.closed:
raise exc.ResourceClosedError("This Connection is closed")
if self._still_open_and_dbapi_connection_is_valid:
- self._dbapi_connection.invalidate(exception)
+ pool_proxied_connection = self._dbapi_connection
+ assert pool_proxied_connection is not None
+ pool_proxied_connection.invalidate(exception)
+
self._dbapi_connection = None
- def detach(self):
+ def detach(self) -> None:
"""Detach the underlying DB-API connection from its connection pool.
E.g.::
@@ -600,13 +658,21 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
- self._dbapi_connection.detach()
+ if self.closed:
+ raise exc.ResourceClosedError("This Connection is closed")
- def _autobegin(self):
- if self._allow_autobegin:
+ pool_proxied_connection = self._dbapi_connection
+ if pool_proxied_connection is None:
+ raise exc.InvalidRequestError(
+ "Can't detach an invalidated Connection"
+ )
+ pool_proxied_connection.detach()
+
+ def _autobegin(self) -> None:
+ if self._allow_autobegin and not self.__in_begin:
self.begin()
- def begin(self):
+ def begin(self) -> RootTransaction:
"""Begin a transaction prior to autobegin occurring.
E.g.::
@@ -671,14 +737,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
:class:`_engine.Engine`
"""
- if self.__in_begin:
- # for dialects that emit SQL within the process of
- # dialect.do_begin() or dialect.do_begin_twophase(), this
- # flag prevents "autobegin" from being emitted within that
- # process, while allowing self._transaction to remain at None
- # until it's complete.
- return
- elif self._transaction is None:
+ if self._transaction is None:
self._transaction = RootTransaction(self)
return self._transaction
else:
@@ -689,7 +748,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"is called first."
)
- def begin_nested(self):
+ def begin_nested(self) -> NestedTransaction:
"""Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
handle that controls the scope of the SAVEPOINT.
@@ -765,7 +824,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return NestedTransaction(self)
- def begin_twophase(self, xid=None):
+ def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
"""Begin a two-phase or XA transaction and return a transaction
handle.
@@ -794,7 +853,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
xid = self.engine.dialect.create_xid()
return TwoPhaseTransaction(self, xid)
- def commit(self):
+ def commit(self) -> None:
"""Commit the transaction that is currently in progress.
This method commits the current transaction if one has been started.
@@ -819,7 +878,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self._transaction:
self._transaction.commit()
- def rollback(self):
+ def rollback(self) -> None:
"""Roll back the transaction that is currently in progress.
This method rolls back the current transaction if one has been started.
@@ -845,33 +904,33 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self._transaction:
self._transaction.rollback()
- def recover_twophase(self):
+ def recover_twophase(self) -> List[Any]:
return self.engine.dialect.do_recover_twophase(self)
- def rollback_prepared(self, xid, recover=False):
+ def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
- def commit_prepared(self, xid, recover=False):
+ def commit_prepared(self, xid: Any, recover: bool = False) -> None:
self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
- def in_transaction(self):
+ def in_transaction(self) -> bool:
"""Return True if a transaction is in progress."""
return self._transaction is not None and self._transaction.is_active
- def in_nested_transaction(self):
+ def in_nested_transaction(self) -> bool:
"""Return True if a transaction is in progress."""
return (
self._nested_transaction is not None
and self._nested_transaction.is_active
)
- def _is_autocommit(self):
- return (
+ def _is_autocommit_isolation(self) -> bool:
+ return bool(
self._execution_options.get("isolation_level", None)
== "AUTOCOMMIT"
)
- def get_transaction(self):
+ def get_transaction(self) -> Optional[RootTransaction]:
"""Return the current root transaction in progress, if any.
.. versionadded:: 1.4
@@ -880,7 +939,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return self._transaction
- def get_nested_transaction(self):
+ def get_nested_transaction(self) -> Optional[NestedTransaction]:
"""Return the current nested transaction in progress, if any.
.. versionadded:: 1.4
@@ -888,7 +947,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
return self._nested_transaction
- def _begin_impl(self, transaction):
+ def _begin_impl(self, transaction: RootTransaction) -> None:
if self._echo:
self._log_info("BEGIN (implicit)")
@@ -904,13 +963,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
finally:
self.__in_begin = False
- def _rollback_impl(self):
+ def _rollback_impl(self) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.rollback(self)
if self._still_open_and_dbapi_connection_is_valid:
if self._echo:
- if self._is_autocommit():
+ if self._is_autocommit_isolation():
self._log_info(
"ROLLBACK using DBAPI connection.rollback(), "
"DBAPI should ignore due to autocommit mode"
@@ -922,13 +981,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- def _commit_impl(self):
+ def _commit_impl(self) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.commit(self)
if self._echo:
- if self._is_autocommit():
+ if self._is_autocommit_isolation():
self._log_info(
"COMMIT using DBAPI connection.commit(), "
"DBAPI should ignore due to autocommit mode"
@@ -940,58 +999,54 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- def _savepoint_impl(self, name=None):
+ def _savepoint_impl(self, name: Optional[str] = None) -> str:
if self._has_events or self.engine._has_events:
self.dispatch.savepoint(self, name)
if name is None:
self.__savepoint_seq += 1
name = "sa_savepoint_%s" % self.__savepoint_seq
- if self._still_open_and_dbapi_connection_is_valid:
- self.engine.dialect.do_savepoint(self, name)
- return name
+ self.engine.dialect.do_savepoint(self, name)
+ return name
- def _rollback_to_savepoint_impl(self, name):
+ def _rollback_to_savepoint_impl(self, name: str) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.rollback_savepoint(self, name, None)
if self._still_open_and_dbapi_connection_is_valid:
self.engine.dialect.do_rollback_to_savepoint(self, name)
- def _release_savepoint_impl(self, name):
+ def _release_savepoint_impl(self, name: str) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.release_savepoint(self, name, None)
- if self._still_open_and_dbapi_connection_is_valid:
- self.engine.dialect.do_release_savepoint(self, name)
+ self.engine.dialect.do_release_savepoint(self, name)
- def _begin_twophase_impl(self, transaction):
+ def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
if self._echo:
self._log_info("BEGIN TWOPHASE (implicit)")
if self._has_events or self.engine._has_events:
self.dispatch.begin_twophase(self, transaction.xid)
- if self._still_open_and_dbapi_connection_is_valid:
- self.__in_begin = True
- try:
- self.engine.dialect.do_begin_twophase(self, transaction.xid)
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
- finally:
- self.__in_begin = False
+ self.__in_begin = True
+ try:
+ self.engine.dialect.do_begin_twophase(self, transaction.xid)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
+ finally:
+ self.__in_begin = False
- def _prepare_twophase_impl(self, xid):
+ def _prepare_twophase_impl(self, xid: Any) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.prepare_twophase(self, xid)
- if self._still_open_and_dbapi_connection_is_valid:
- assert isinstance(self._transaction, TwoPhaseTransaction)
- try:
- self.engine.dialect.do_prepare_twophase(self, xid)
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
+ try:
+ self.engine.dialect.do_prepare_twophase(self, xid)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
- def _rollback_twophase_impl(self, xid, is_prepared):
+ def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.rollback_twophase(self, xid, is_prepared)
@@ -1004,18 +1059,17 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- def _commit_twophase_impl(self, xid, is_prepared):
+ def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.commit_twophase(self, xid, is_prepared)
- if self._still_open_and_dbapi_connection_is_valid:
- assert isinstance(self._transaction, TwoPhaseTransaction)
- try:
- self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
+ try:
+ self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
- def close(self):
+ def close(self) -> None:
"""Close this :class:`_engine.Connection`.
This results in a release of the underlying database
@@ -1050,7 +1104,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
# as we just closed the transaction, close the connection
# pool connection without doing an additional reset
if skip_reset:
- conn._close_no_reset()
+ cast("_ConnectionFairy", conn)._close_no_reset()
else:
conn.close()
@@ -1061,7 +1115,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self._dbapi_connection = None
self.__can_reconnect = False
- def scalar(self, statement, parameters=None, execution_options=None):
+ def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Any:
r"""Executes a SQL statement construct and returns a scalar object.
This method is shorthand for invoking the
@@ -1074,7 +1133,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
return self.execute(statement, parameters, execution_options).scalar()
- def scalars(self, statement, parameters=None, execution_options=None):
+ def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult:
"""Executes and returns a scalar result set, which yields scalar values
from the first column of each row.
@@ -1093,10 +1157,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
def execute(
self,
- statement,
- parameters: Optional[_ExecuteParams] = None,
- execution_options: Optional[_ExecuteOptions] = None,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Result:
r"""Executes a SQL statement construct and returns a
:class:`_engine.Result`.
@@ -1140,7 +1204,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
execution_options or NO_OPTIONS,
)
- def _execute_function(self, func, distilled_parameters, execution_options):
+ def _execute_function(
+ self,
+ func: FunctionElement[Any],
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Result:
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(
@@ -1148,14 +1217,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
)
def _execute_default(
- self, default, distilled_parameters, execution_options
- ):
+ self,
+ default: ColumnDefault,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Any:
"""Execute a schema.ColumnDefault object."""
execution_options = self._execution_options.merge_with(
execution_options
)
+ event_multiparams: Optional[_CoreMultiExecuteParams]
+ event_params: Optional[_CoreAnyExecuteParams]
+
# note for event handlers, the "distilled parameters" which is always
# a list of dicts is broken out into separate "multiparams" and
# "params" collections, which allows the handler to distinguish
@@ -1169,6 +1244,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
) = self._invoke_before_exec_event(
default, distilled_parameters, execution_options
)
+ else:
+ event_multiparams = event_params = None
try:
conn = self._dbapi_connection
@@ -1198,13 +1275,21 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return ret
- def _execute_ddl(self, ddl, distilled_parameters, execution_options):
+ def _execute_ddl(
+ self,
+ ddl: DDLElement,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Result:
"""Execute a schema.DDL object."""
execution_options = ddl._execution_options.merge_with(
self._execution_options, execution_options
)
+ event_multiparams: Optional[_CoreMultiExecuteParams]
+ event_params: Optional[_CoreSingleExecuteParams]
+
if self._has_events or self.engine._has_events:
(
ddl,
@@ -1214,6 +1299,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
) = self._invoke_before_exec_event(
ddl, distilled_parameters, execution_options
)
+ else:
+ event_multiparams = event_params = None
exec_opts = self._execution_options.merge_with(execution_options)
schema_translate_map = exec_opts.get("schema_translate_map", None)
@@ -1243,8 +1330,19 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return ret
def _invoke_before_exec_event(
- self, elem, distilled_params, execution_options
- ):
+ self,
+ elem: Any,
+ distilled_params: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Tuple[
+ Any,
+ _CoreMultiExecuteParams,
+ _CoreMultiExecuteParams,
+ _CoreSingleExecuteParams,
+ ]:
+
+ event_multiparams: _CoreMultiExecuteParams
+ event_params: _CoreSingleExecuteParams
if len(distilled_params) == 1:
event_multiparams, event_params = [], distilled_params[0]
@@ -1275,8 +1373,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return elem, distilled_params, event_multiparams, event_params
def _execute_clauseelement(
- self, elem, distilled_parameters, execution_options
- ):
+ self,
+ elem: Executable,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Result:
"""Execute a sql.ClauseElement object."""
execution_options = elem._execution_options.merge_with(
@@ -1309,7 +1410,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"schema_translate_map", None
)
- compiled_cache = execution_options.get(
+ compiled_cache: _CompiledCacheType = execution_options.get(
"compiled_cache", self.engine._compiled_cache
)
@@ -1346,10 +1447,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
def _execute_compiled(
self,
- compiled,
- distilled_parameters,
- execution_options=_EMPTY_EXECUTION_OPTS,
- ):
+ compiled: Compiled,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
+ ) -> Result:
"""Execute a sql.Compiled object.
TODO: why do we have this? likely deprecate or remove
@@ -1395,8 +1496,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return ret
def exec_driver_sql(
- self, statement, parameters=None, execution_options=None
- ):
+ self,
+ statement: str,
+ parameters: Optional[_DBAPIAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptions] = None,
+ ) -> Result:
r"""Executes a SQL statement construct and returns a
:class:`_engine.CursorResult`.
@@ -1456,7 +1560,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
- distilled_parameters,
+ None,
execution_options,
statement,
distilled_parameters,
@@ -1466,14 +1570,14 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
def _execute_context(
self,
- dialect,
- constructor,
- statement,
- parameters,
- execution_options,
- *args,
- **kw,
- ):
+ dialect: Dialect,
+ constructor: Callable[..., ExecutionContext],
+ statement: Union[str, Compiled],
+ parameters: Optional[_AnyMultiExecuteParams],
+ execution_options: _ExecuteOptions,
+ *args: Any,
+ **kw: Any,
+ ) -> Result:
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
@@ -1491,7 +1595,6 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self._handle_dbapi_exception(
e, str(statement), parameters, None, None
)
- return # not reached
if (
self._transaction
@@ -1514,29 +1617,33 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if dialect.bind_typing is BindTyping.SETINPUTSIZES:
context._set_input_sizes()
- cursor, statement, parameters = (
+ cursor, str_statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
+ effective_parameters: Optional[_AnyExecuteParams]
+
if not context.executemany:
- parameters = parameters[0]
+ effective_parameters = parameters[0]
+ else:
+ effective_parameters = parameters
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
- statement, parameters = fn(
+ str_statement, effective_parameters = fn(
self,
cursor,
- statement,
- parameters,
+ str_statement,
+ effective_parameters,
context,
context.executemany,
)
if self._echo:
- self._log_info(statement)
+ self._log_info(str_statement)
stats = context._get_cache_stats()
@@ -1545,7 +1652,9 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"[%s] %r",
stats,
sql_util._repr_params(
- parameters, batches=10, ismulti=context.executemany
+ effective_parameters,
+ batches=10,
+ ismulti=context.executemany,
),
)
else:
@@ -1554,45 +1663,61 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
% (stats,)
)
- evt_handled = False
+ evt_handled: bool = False
try:
if context.executemany:
+ effective_parameters = cast(
+ "_CoreMultiExecuteParams", effective_parameters
+ )
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
- if fn(cursor, statement, parameters, context):
+ if fn(
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ ):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
- cursor, statement, parameters, context
+ cursor, str_statement, effective_parameters, context
)
- elif not parameters and context.no_parameters:
+ elif not effective_parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
- if fn(cursor, statement, context):
+ if fn(cursor, str_statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
- cursor, statement, context
+ cursor, str_statement, context
)
else:
+ effective_parameters = cast(
+ "_CoreSingleExecuteParams", effective_parameters
+ )
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
- if fn(cursor, statement, parameters, context):
+ if fn(
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ ):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute(
- cursor, statement, parameters, context
+ cursor, str_statement, effective_parameters, context
)
if self._has_events or self.engine._has_events:
self.dispatch.after_cursor_execute(
self,
cursor,
- statement,
- parameters,
+ str_statement,
+ effective_parameters,
context,
context.executemany,
)
@@ -1603,12 +1728,18 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(
- e, statement, parameters, cursor, context
+ e, str_statement, effective_parameters, cursor, context
)
return result
- def _cursor_execute(self, cursor, statement, parameters, context=None):
+ def _cursor_execute(
+ self,
+ cursor: DBAPICursor,
+ statement: str,
+ parameters: _DBAPISingleExecuteParams,
+ context: Optional[ExecutionContext] = None,
+ ) -> None:
"""Execute a statement + params on the given cursor.
Adds appropriate logging and exception handling.
@@ -1648,7 +1779,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self, cursor, statement, parameters, context, False
)
- def _safe_close_cursor(self, cursor):
+ def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
"""Close the given cursor, catching exceptions
and turning into log warnings.
@@ -1665,8 +1796,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
_is_disconnect = False
def _handle_dbapi_exception(
- self, e, statement, parameters, cursor, context
- ):
+ self,
+ e: BaseException,
+ statement: Optional[str],
+ parameters: Optional[_AnyExecuteParams],
+ cursor: Optional[DBAPICursor],
+ context: Optional[ExecutionContext],
+ ) -> NoReturn:
exc_info = sys.exc_info()
is_exit_exception = util.is_exit_exception(e)
@@ -1708,7 +1844,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
sqlalchemy_exception = exc.DBAPIError.instance(
statement,
parameters,
- e,
+ cast(Exception, e),
self.dialect.dbapi.Error,
hide_parameters=self.engine.hide_parameters,
connection_invalidated=self._is_disconnect,
@@ -1784,8 +1920,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if newraise:
raise newraise.with_traceback(exc_info[2]) from e
elif should_wrap:
+ assert sqlalchemy_exception is not None
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
else:
+ assert exc_info[1] is not None
raise exc_info[1].with_traceback(exc_info[2])
finally:
del self._reentrant_error
@@ -1793,15 +1931,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
del self._is_disconnect
if not self.invalidated:
dbapi_conn_wrapper = self._dbapi_connection
+ assert dbapi_conn_wrapper is not None
if invalidate_pool_on_disconnect:
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
@classmethod
- def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
+ def _handle_dbapi_exception_noconnection(
+ cls, e: BaseException, dialect: Dialect, engine: Engine
+ ) -> NoReturn:
exc_info = sys.exc_info()
- is_disconnect = dialect.is_disconnect(e, None, None)
+ is_disconnect = isinstance(
+ e, dialect.dbapi.Error
+ ) and dialect.is_disconnect(e, None, None)
should_wrap = isinstance(e, dialect.dbapi.Error)
@@ -1809,7 +1952,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
sqlalchemy_exception = exc.DBAPIError.instance(
None,
None,
- e,
+ cast(Exception, e),
dialect.dbapi.Error,
hide_parameters=engine.hide_parameters,
connection_invalidated=is_disconnect,
@@ -1852,11 +1995,18 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if newraise:
raise newraise.with_traceback(exc_info[2]) from e
elif should_wrap:
+ assert sqlalchemy_exception is not None
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
else:
+ assert exc_info[1] is not None
raise exc_info[1].with_traceback(exc_info[2])
- def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ def _run_ddl_visitor(
+ self,
+ visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
+ element: DDLElement,
+ **kwargs: Any,
+ ) -> None:
"""run a DDL visitor.
This method is only here so that the MockConnection can change the
@@ -1871,16 +2021,16 @@ class ExceptionContextImpl(ExceptionContext):
def __init__(
self,
- exception,
- sqlalchemy_exception,
- engine,
- connection,
- cursor,
- statement,
- parameters,
- context,
- is_disconnect,
- invalidate_pool_on_disconnect,
+ exception: BaseException,
+ sqlalchemy_exception: Optional[exc.StatementError],
+ engine: Optional[Engine],
+ connection: Optional[Connection],
+ cursor: Optional[DBAPICursor],
+ statement: Optional[str],
+ parameters: Optional[_DBAPIAnyExecuteParams],
+ context: Optional[ExecutionContext],
+ is_disconnect: bool,
+ invalidate_pool_on_disconnect: bool,
):
self.engine = engine
self.connection = connection
@@ -1932,33 +2082,35 @@ class Transaction(TransactionalContext):
__slots__ = ()
- _is_root = False
+ _is_root: bool = False
+ is_active: bool
+ connection: Connection
- def __init__(self, connection):
+ def __init__(self, connection: Connection):
raise NotImplementedError()
@property
- def _deactivated_from_connection(self):
+ def _deactivated_from_connection(self) -> bool:
"""True if this transaction is totally deactivated from the connection
and therefore can no longer affect its state.
"""
raise NotImplementedError()
- def _do_close(self):
+ def _do_close(self) -> None:
raise NotImplementedError()
- def _do_rollback(self):
+ def _do_rollback(self) -> None:
raise NotImplementedError()
- def _do_commit(self):
+ def _do_commit(self) -> None:
raise NotImplementedError()
@property
- def is_valid(self):
+ def is_valid(self) -> bool:
return self.is_active and not self.connection.invalidated
- def close(self):
+ def close(self) -> None:
"""Close this :class:`.Transaction`.
If this transaction is the base transaction in a begin/commit
@@ -1974,7 +2126,7 @@ class Transaction(TransactionalContext):
finally:
assert not self.is_active
- def rollback(self):
+ def rollback(self) -> None:
"""Roll back this :class:`.Transaction`.
The implementation of this may vary based on the type of transaction in
@@ -1996,7 +2148,7 @@ class Transaction(TransactionalContext):
finally:
assert not self.is_active
- def commit(self):
+ def commit(self) -> None:
"""Commit this :class:`.Transaction`.
The implementation of this may vary based on the type of transaction in
@@ -2017,16 +2169,16 @@ class Transaction(TransactionalContext):
finally:
assert not self.is_active
- def _get_subject(self):
+ def _get_subject(self) -> Connection:
return self.connection
- def _transaction_is_active(self):
+ def _transaction_is_active(self) -> bool:
return self.is_active
- def _transaction_is_closed(self):
+ def _transaction_is_closed(self) -> bool:
return not self._deactivated_from_connection
- def _rollback_can_be_called(self):
+ def _rollback_can_be_called(self) -> bool:
# for RootTransaction / NestedTransaction, it's safe to call
# rollback() even if the transaction is deactive and no warnings
# will be emitted. tested in
@@ -2060,7 +2212,7 @@ class RootTransaction(Transaction):
__slots__ = ("connection", "is_active")
- def __init__(self, connection):
+ def __init__(self, connection: Connection):
assert connection._transaction is None
if connection._trans_context_manager:
TransactionalContext._trans_ctx_check(connection)
@@ -2070,7 +2222,7 @@ class RootTransaction(Transaction):
self.is_active = True
- def _deactivate_from_connection(self):
+ def _deactivate_from_connection(self) -> None:
if self.is_active:
assert self.connection._transaction is self
self.is_active = False
@@ -2079,19 +2231,19 @@ class RootTransaction(Transaction):
util.warn("transaction already deassociated from connection")
@property
- def _deactivated_from_connection(self):
+ def _deactivated_from_connection(self) -> bool:
return self.connection._transaction is not self
- def _connection_begin_impl(self):
+ def _connection_begin_impl(self) -> None:
self.connection._begin_impl(self)
- def _connection_rollback_impl(self):
+ def _connection_rollback_impl(self) -> None:
self.connection._rollback_impl()
- def _connection_commit_impl(self):
+ def _connection_commit_impl(self) -> None:
self.connection._commit_impl()
- def _close_impl(self, try_deactivate=False):
+ def _close_impl(self, try_deactivate: bool = False) -> None:
try:
if self.is_active:
self._connection_rollback_impl()
@@ -2107,13 +2259,13 @@ class RootTransaction(Transaction):
assert not self.is_active
assert self.connection._transaction is not self
- def _do_close(self):
+ def _do_close(self) -> None:
self._close_impl()
- def _do_rollback(self):
+ def _do_rollback(self) -> None:
self._close_impl(try_deactivate=True)
- def _do_commit(self):
+ def _do_commit(self) -> None:
if self.is_active:
assert self.connection._transaction is self
@@ -2176,7 +2328,9 @@ class NestedTransaction(Transaction):
__slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
- def __init__(self, connection):
+ _savepoint: str
+
+ def __init__(self, connection: Connection):
assert connection._transaction is not None
if connection._trans_context_manager:
TransactionalContext._trans_ctx_check(connection)
@@ -2186,7 +2340,7 @@ class NestedTransaction(Transaction):
self._previous_nested = connection._nested_transaction
connection._nested_transaction = self
- def _deactivate_from_connection(self, warn=True):
+ def _deactivate_from_connection(self, warn: bool = True) -> None:
if self.connection._nested_transaction is self:
self.connection._nested_transaction = self._previous_nested
elif warn:
@@ -2195,10 +2349,10 @@ class NestedTransaction(Transaction):
)
@property
- def _deactivated_from_connection(self):
+ def _deactivated_from_connection(self) -> bool:
return self.connection._nested_transaction is not self
- def _cancel(self):
+ def _cancel(self) -> None:
# called by RootTransaction when the outer transaction is
# committed, rolled back, or closed to cancel all savepoints
# without any action being taken
@@ -2207,9 +2361,15 @@ class NestedTransaction(Transaction):
if self._previous_nested:
self._previous_nested._cancel()
- def _close_impl(self, deactivate_from_connection, warn_already_deactive):
+ def _close_impl(
+ self, deactivate_from_connection: bool, warn_already_deactive: bool
+ ) -> None:
try:
- if self.is_active and self.connection._transaction.is_active:
+ if (
+ self.is_active
+ and self.connection._transaction
+ and self.connection._transaction.is_active
+ ):
self.connection._rollback_to_savepoint_impl(self._savepoint)
finally:
self.is_active = False
@@ -2221,13 +2381,13 @@ class NestedTransaction(Transaction):
if deactivate_from_connection:
assert self.connection._nested_transaction is not self
- def _do_close(self):
+ def _do_close(self) -> None:
self._close_impl(True, False)
- def _do_rollback(self):
+ def _do_rollback(self) -> None:
self._close_impl(True, True)
- def _do_commit(self):
+ def _do_commit(self) -> None:
if self.is_active:
try:
self.connection._release_savepoint_impl(self._savepoint)
@@ -2261,12 +2421,14 @@ class TwoPhaseTransaction(RootTransaction):
__slots__ = ("xid", "_is_prepared")
- def __init__(self, connection, xid):
+ xid: Any
+
+ def __init__(self, connection: Connection, xid: Any):
self._is_prepared = False
self.xid = xid
super(TwoPhaseTransaction, self).__init__(connection)
- def prepare(self):
+ def prepare(self) -> None:
"""Prepare this :class:`.TwoPhaseTransaction`.
After a PREPARE, the transaction can be committed.
@@ -2277,13 +2439,13 @@ class TwoPhaseTransaction(RootTransaction):
self.connection._prepare_twophase_impl(self.xid)
self._is_prepared = True
- def _connection_begin_impl(self):
+ def _connection_begin_impl(self) -> None:
self.connection._begin_twophase_impl(self)
- def _connection_rollback_impl(self):
+ def _connection_rollback_impl(self) -> None:
self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
- def _connection_commit_impl(self):
+ def _connection_commit_impl(self) -> None:
self.connection._commit_twophase_impl(self.xid, self._is_prepared)
@@ -2310,17 +2472,23 @@ class Engine(
"""
- _execution_options = _EMPTY_EXECUTION_OPTS
- _has_events = False
- _connection_cls = Connection
- _sqla_logger_namespace = "sqlalchemy.engine.Engine"
- _is_future = False
+ dispatch: dispatcher[ConnectionEventsTarget]
- _schema_translate_map = None
+ _compiled_cache: Optional[_CompiledCacheType]
+
+ _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
+ _has_events: bool = False
+ _connection_cls: Type[Connection] = Connection
+ _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
+ _is_future: bool = False
+
+ _schema_translate_map: Optional[_SchemaTranslateMapType] = None
+ _option_cls: Type[OptionEngine]
dialect: Dialect
pool: Pool
url: URL
+ hide_parameters: bool
def __init__(
self,
@@ -2328,7 +2496,7 @@ class Engine(
dialect: Dialect,
url: URL,
logging_name: Optional[str] = None,
- echo: Union[None, str, bool] = None,
+ echo: Optional[_EchoFlagType] = None,
query_cache_size: int = 500,
execution_options: Optional[Mapping[str, Any]] = None,
hide_parameters: bool = False,
@@ -2350,7 +2518,7 @@ class Engine(
if execution_options:
self.update_execution_options(**execution_options)
- def _lru_size_alert(self, cache):
+ def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
if self._should_log_info:
self.logger.info(
"Compiled cache size pruning from %d items to %d. "
@@ -2360,10 +2528,10 @@ class Engine(
)
@property
- def engine(self):
+ def engine(self) -> Engine:
return self
- def clear_compiled_cache(self):
+ def clear_compiled_cache(self) -> None:
"""Clear the compiled cache associated with the dialect.
This applies **only** to the built-in cache that is established
@@ -2377,7 +2545,7 @@ class Engine(
if self._compiled_cache:
self._compiled_cache.clear()
- def update_execution_options(self, **opt):
+ def update_execution_options(self, **opt: Any) -> None:
r"""Update the default execution_options dictionary
of this :class:`_engine.Engine`.
@@ -2394,11 +2562,11 @@ class Engine(
:meth:`_engine.Engine.execution_options`
"""
- self._execution_options = self._execution_options.union(opt)
self.dispatch.set_engine_execution_options(self, opt)
+ self._execution_options = self._execution_options.union(opt)
self.dialect.set_engine_execution_options(self, opt)
- def execution_options(self, **opt):
+ def execution_options(self, **opt: Any) -> OptionEngine:
"""Return a new :class:`_engine.Engine` that will provide
:class:`_engine.Connection` objects with the given execution options.
@@ -2478,7 +2646,7 @@ class Engine(
""" # noqa E501
return self._option_cls(self, opt)
- def get_execution_options(self):
+ def get_execution_options(self) -> _ExecuteOptions:
"""Get the non-SQL options which will take effect during execution.
.. versionadded: 1.3
@@ -2490,14 +2658,14 @@ class Engine(
return self._execution_options
@property
- def name(self):
+ def name(self) -> str:
"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
in use by this :class:`Engine`."""
return self.dialect.name
@property
- def driver(self):
+ def driver(self) -> str:
"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
in use by this :class:`Engine`."""
@@ -2505,10 +2673,10 @@ class Engine(
echo = log.echo_property()
- def __repr__(self):
+ def __repr__(self) -> str:
return "Engine(%r)" % (self.url,)
- def dispose(self):
+ def dispose(self) -> None:
"""Dispose of the connection pool used by this
:class:`_engine.Engine`.
@@ -2538,7 +2706,9 @@ class Engine(
self.dispatch.engine_disposed(self)
@contextlib.contextmanager
- def _optional_conn_ctx_manager(self, connection=None):
+ def _optional_conn_ctx_manager(
+ self, connection: Optional[Connection] = None
+ ) -> Iterator[Connection]:
if connection is None:
with self.connect() as conn:
yield conn
@@ -2546,7 +2716,7 @@ class Engine(
yield connection
@contextlib.contextmanager
- def begin(self):
+ def begin(self) -> Iterator[Connection]:
"""Return a context manager delivering a :class:`_engine.Connection`
with a :class:`.Transaction` established.
@@ -2576,11 +2746,16 @@ class Engine(
with conn.begin():
yield conn
- def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ def _run_ddl_visitor(
+ self,
+ visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
+ element: DDLElement,
+ **kwargs: Any,
+ ) -> None:
with self.begin() as conn:
conn._run_ddl_visitor(visitorcallable, element, **kwargs)
- def connect(self):
+ def connect(self) -> Connection:
"""Return a new :class:`_engine.Connection` object.
The :class:`_engine.Connection` acts as a Python context manager, so
@@ -2605,7 +2780,7 @@ class Engine(
return self._connection_cls(self)
- def raw_connection(self):
+ def raw_connection(self) -> PoolProxiedConnection:
"""Return a "raw" DBAPI connection from the connection pool.
The returned object is a proxied version of the DBAPI
@@ -2630,10 +2805,20 @@ class Engine(
return self.pool.connect()
-class OptionEngineMixin:
+class OptionEngineMixin(log.Identified):
_sa_propagate_class_events = False
- def __init__(self, proxied, execution_options):
+ dispatch: dispatcher[ConnectionEventsTarget]
+ _compiled_cache: Optional[_CompiledCacheType]
+ dialect: Dialect
+ pool: Pool
+ url: URL
+ hide_parameters: bool
+ echo: log.echo_property
+
+ def __init__(
+ self, proxied: Engine, execution_options: _ExecuteOptionsParameter
+ ):
self._proxied = proxied
self.url = proxied.url
self.dialect = proxied.dialect
@@ -2660,27 +2845,34 @@ class OptionEngineMixin:
self._execution_options = proxied._execution_options
self.update_execution_options(**execution_options)
- def _get_pool(self):
- return self._proxied.pool
+ def update_execution_options(self, **opt: Any) -> None:
+ raise NotImplementedError()
- def _set_pool(self, pool):
- self._proxied.pool = pool
+ if not typing.TYPE_CHECKING:
+ # https://github.com/python/typing/discussions/1095
- pool = property(_get_pool, _set_pool)
+ @property
+ def pool(self) -> Pool:
+ return self._proxied.pool
- def _get_has_events(self):
- return self._proxied._has_events or self.__dict__.get(
- "_has_events", False
- )
+ @pool.setter
+ def pool(self, pool: Pool) -> None:
+ self._proxied.pool = pool
- def _set_has_events(self, value):
- self.__dict__["_has_events"] = value
+ @property
+ def _has_events(self) -> bool:
+ return self._proxied._has_events or self.__dict__.get(
+ "_has_events", False
+ )
- _has_events = property(_get_has_events, _set_has_events)
+ @_has_events.setter
+ def _has_events(self, value: bool) -> None:
+ self.__dict__["_has_events"] = value
class OptionEngine(OptionEngineMixin, Engine):
- pass
+ def update_execution_options(self, **opt: Any) -> None:
+ Engine.update_execution_options(self, **opt)
Engine._option_cls = OptionEngine