diff options
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/__init__.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 131 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 417 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/reflection.py | 1475 |
4 files changed, 1737 insertions, 288 deletions
diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py index afba17075..77c2fea40 100644 --- a/lib/sqlalchemy/engine/__init__.py +++ b/lib/sqlalchemy/engine/__init__.py @@ -38,6 +38,8 @@ from .interfaces import ExecutionContext as ExecutionContext from .interfaces import TypeCompiler as TypeCompiler from .mock import create_mock_engine as create_mock_engine from .reflection import Inspector as Inspector +from .reflection import ObjectKind as ObjectKind +from .reflection import ObjectScope as ObjectScope from .result import ChunkedIteratorResult as ChunkedIteratorResult from .result import FrozenResult as FrozenResult from .result import IteratorResult as IteratorResult diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index df35e7128..40af06252 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -45,6 +45,8 @@ from .interfaces import CacheStats from .interfaces import DBAPICursor from .interfaces import Dialect from .interfaces import ExecutionContext +from .reflection import ObjectKind +from .reflection import ObjectScope from .. import event from .. import exc from .. import pool @@ -508,15 +510,22 @@ class DefaultDialect(Dialect): """ return type_api.adapt_type(typeobj, self.colspecs) - def has_index(self, connection, table_name, index_name, schema=None): - if not self.has_table(connection, table_name, schema=schema): + def has_index(self, connection, table_name, index_name, schema=None, **kw): + if not self.has_table(connection, table_name, schema=schema, **kw): return False - for idx in self.get_indexes(connection, table_name, schema=schema): + for idx in self.get_indexes( + connection, table_name, schema=schema, **kw + ): if idx["name"] == index_name: return True else: return False + def has_schema( + self, connection: Connection, schema_name: str, **kw: Any + ) -> bool: + return schema_name in self.get_schema_names(connection, **kw) + def validate_identifier(self, ident): if len(ident) > self.max_identifier_length: raise exc.IdentifierError( @@ -769,6 +778,122 @@ class DefaultDialect(Dialect): def get_driver_connection(self, connection): return connection + def _overrides_default(self, method): + return ( + getattr(type(self), method).__code__ + is not getattr(DefaultDialect, method).__code__ + ) + + def _default_multi_reflect( + self, + single_tbl_method, + connection, + kind, + schema, + filter_names, + scope, + **kw, + ): + + names_fns = [] + temp_names_fns = [] + if ObjectKind.TABLE in kind: + names_fns.append(self.get_table_names) + temp_names_fns.append(self.get_temp_table_names) + if ObjectKind.VIEW in kind: + names_fns.append(self.get_view_names) + temp_names_fns.append(self.get_temp_view_names) + if ObjectKind.MATERIALIZED_VIEW in kind: + names_fns.append(self.get_materialized_view_names) + # no temp materialized view at the moment + # temp_names_fns.append(self.get_temp_materialized_view_names) + + unreflectable = kw.pop("unreflectable", {}) + + if ( + filter_names + and scope is ObjectScope.ANY + and kind is ObjectKind.ANY + ): + # if names are given and no qualification on type of table + # (i.e. the Table(..., autoload) case), take the names as given, + # don't run names queries. If a table does not exit + # NoSuchTableError is raised and it's skipped + + # this also suits the case for mssql where we can reflect + # individual temp tables but there's no temp_names_fn + names = filter_names + else: + names = [] + name_kw = {"schema": schema, **kw} + fns = [] + if ObjectScope.DEFAULT in scope: + fns.extend(names_fns) + if ObjectScope.TEMPORARY in scope: + fns.extend(temp_names_fns) + + for fn in fns: + try: + names.extend(fn(connection, **name_kw)) + except NotImplementedError: + pass + + if filter_names: + filter_names = set(filter_names) + + # iterate over all the tables/views and call the single table method + for table in names: + if not filter_names or table in filter_names: + key = (schema, table) + try: + yield ( + key, + single_tbl_method( + connection, table, schema=schema, **kw + ), + ) + except exc.UnreflectableTableError as err: + if key not in unreflectable: + unreflectable[key] = err + except exc.NoSuchTableError: + pass + + def get_multi_table_options(self, connection, **kw): + return self._default_multi_reflect( + self.get_table_options, connection, **kw + ) + + def get_multi_columns(self, connection, **kw): + return self._default_multi_reflect(self.get_columns, connection, **kw) + + def get_multi_pk_constraint(self, connection, **kw): + return self._default_multi_reflect( + self.get_pk_constraint, connection, **kw + ) + + def get_multi_foreign_keys(self, connection, **kw): + return self._default_multi_reflect( + self.get_foreign_keys, connection, **kw + ) + + def get_multi_indexes(self, connection, **kw): + return self._default_multi_reflect(self.get_indexes, connection, **kw) + + def get_multi_unique_constraints(self, connection, **kw): + return self._default_multi_reflect( + self.get_unique_constraints, connection, **kw + ) + + def get_multi_check_constraints(self, connection, **kw): + return self._default_multi_reflect( + self.get_check_constraints, connection, **kw + ) + + def get_multi_table_comment(self, connection, **kw): + return self._default_multi_reflect( + self.get_table_comment, connection, **kw + ) + class StrCompileDialect(DefaultDialect): diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index b8e85b646..28ed03f99 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -15,7 +15,9 @@ from typing import Any from typing import Awaitable from typing import Callable from typing import ClassVar +from typing import Collection from typing import Dict +from typing import Iterable from typing import List from typing import Mapping from typing import MutableMapping @@ -324,7 +326,7 @@ class ReflectedColumn(TypedDict): nullable: bool """column nullability""" - default: str + default: Optional[str] """column default expression as a SQL string""" autoincrement: NotRequired[bool] @@ -343,11 +345,11 @@ class ReflectedColumn(TypedDict): comment: NotRequired[Optional[str]] """comment for the column, if present""" - computed: NotRequired[Optional[ReflectedComputed]] + computed: NotRequired[ReflectedComputed] """indicates this column is computed at insert (possibly update) time by the database.""" - identity: NotRequired[Optional[ReflectedIdentity]] + identity: NotRequired[ReflectedIdentity] """indicates this column is an IDENTITY column""" dialect_options: NotRequired[Dict[str, Any]] @@ -390,6 +392,9 @@ class ReflectedUniqueConstraint(TypedDict): column_names: List[str] """column names which comprise the constraint""" + duplicates_index: NotRequired[Optional[str]] + "Indicates if this unique constraint duplicates an index with this name" + dialect_options: NotRequired[Dict[str, Any]] """Additional dialect-specific options detected for this reflected object""" @@ -439,7 +444,7 @@ class ReflectedForeignKeyConstraint(TypedDict): referred_columns: List[str] """referenced column names""" - dialect_options: NotRequired[Dict[str, Any]] + options: NotRequired[Dict[str, Any]] """Additional dialect-specific options detected for this reflected object""" @@ -462,9 +467,8 @@ class ReflectedIndex(TypedDict): unique: bool """whether or not the index has a unique flag""" - duplicates_constraint: NotRequired[bool] - """boolean indicating this index mirrors a unique constraint of the same - name""" + duplicates_constraint: NotRequired[Optional[str]] + "Indicates if this index mirrors a unique constraint with this name" include_columns: NotRequired[List[str]] """columns to include in the INCLUDE clause for supporting databases. @@ -472,7 +476,7 @@ class ReflectedIndex(TypedDict): .. deprecated:: 2.0 Legacy value, will be replaced with - ``d["dialect_options"][<dialect name>]["include"]`` + ``d["dialect_options"]["<dialect name>_include"]`` """ @@ -494,7 +498,7 @@ class ReflectedTableComment(TypedDict): """ - text: str + text: Optional[str] """text of the comment""" @@ -547,6 +551,7 @@ class BindTyping(Enum): VersionInfoType = Tuple[Union[int, str], ...] +TableKey = Tuple[Optional[str], str] class Dialect(EventTarget): @@ -1040,7 +1045,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def initialize(self, connection: "Connection") -> None: + def initialize(self, connection: Connection) -> None: """Called during strategized creation of the dialect with a connection. @@ -1060,9 +1065,14 @@ class Dialect(EventTarget): pass + if TYPE_CHECKING: + + def _overrides_default(self, method_name: str) -> bool: + ... + def get_columns( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1074,13 +1084,40 @@ class Dialect(EventTarget): information as a list of dictionaries corresponding to the :class:`.ReflectedColumn` dictionary. + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_columns`. + """ + + def get_multi_columns( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, List[ReflectedColumn]]]: + """Return information about columns in all tables in the + given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_multi_columns`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + """ raise NotImplementedError() def get_pk_constraint( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1093,13 +1130,41 @@ class Dialect(EventTarget): key information as a dictionary corresponding to the :class:`.ReflectedPrimaryKeyConstraint` dictionary. + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_pk_constraint`. + + """ + raise NotImplementedError() + + def get_multi_pk_constraint( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, ReflectedPrimaryKeyConstraint]]: + """Return information about primary key constraints in + all tables in the given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_multi_pk_constraint`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 """ raise NotImplementedError() def get_foreign_keys( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1111,42 +1176,104 @@ class Dialect(EventTarget): key information as a list of dicts corresponding to the :class:`.ReflectedForeignKeyConstraint` dictionary. + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_foreign_keys`. + """ + + raise NotImplementedError() + + def get_multi_foreign_keys( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, List[ReflectedForeignKeyConstraint]]]: + """Return information about foreign_keys in all tables + in the given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_multi_foreign_keys`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + """ raise NotImplementedError() def get_table_names( - self, connection: "Connection", schema: Optional[str] = None, **kw: Any + self, connection: Connection, schema: Optional[str] = None, **kw: Any ) -> List[str]: - """Return a list of table names for ``schema``.""" + """Return a list of table names for ``schema``. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_table_names`. + + """ raise NotImplementedError() def get_temp_table_names( - self, connection: "Connection", schema: Optional[str] = None, **kw: Any + self, connection: Connection, schema: Optional[str] = None, **kw: Any ) -> List[str]: """Return a list of temporary table names on the given connection, if supported by the underlying backend. + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_temp_table_names`. + """ raise NotImplementedError() def get_view_names( - self, connection: "Connection", schema: Optional[str] = None, **kw: Any + self, connection: Connection, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + """Return a list of all non-materialized view names available in the + database. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_view_names`. + + :param schema: schema name to query, if not the default schema. + + """ + + raise NotImplementedError() + + def get_materialized_view_names( + self, connection: Connection, schema: Optional[str] = None, **kw: Any ) -> List[str]: - """Return a list of all view names available in the database. + """Return a list of all materialized view names available in the + database. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_materialized_view_names`. :param schema: schema name to query, if not the default schema. + + .. versionadded:: 2.0 + """ raise NotImplementedError() def get_sequence_names( - self, connection: "Connection", schema: Optional[str] = None, **kw: Any + self, connection: Connection, schema: Optional[str] = None, **kw: Any ) -> List[str]: """Return a list of all sequence names available in the database. + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_sequence_names`. + :param schema: schema name to query, if not the default schema. .. versionadded:: 1.4 @@ -1155,26 +1282,40 @@ class Dialect(EventTarget): raise NotImplementedError() def get_temp_view_names( - self, connection: "Connection", schema: Optional[str] = None, **kw: Any + self, connection: Connection, schema: Optional[str] = None, **kw: Any ) -> List[str]: """Return a list of temporary view names on the given connection, if supported by the underlying backend. + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_temp_view_names`. + """ raise NotImplementedError() + def get_schema_names(self, connection: Connection, **kw: Any) -> List[str]: + """Return a list of all schema names available in the database. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_schema_names`. + """ + raise NotImplementedError() + def get_view_definition( self, - connection: "Connection", + connection: Connection, view_name: str, schema: Optional[str] = None, **kw: Any, ) -> str: - """Return view definition. + """Return plain or materialized view definition. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_view_definition`. Given a :class:`_engine.Connection`, a string - `view_name`, and an optional string ``schema``, return the view + ``view_name``, and an optional string ``schema``, return the view definition. """ @@ -1182,7 +1323,7 @@ class Dialect(EventTarget): def get_indexes( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1194,13 +1335,42 @@ class Dialect(EventTarget): information as a list of dictionaries corresponding to the :class:`.ReflectedIndex` dictionary. + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_indexes`. + """ + + raise NotImplementedError() + + def get_multi_indexes( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, List[ReflectedIndex]]]: + """Return information about indexes in in all tables + in the given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_multi_indexes`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + """ raise NotImplementedError() def get_unique_constraints( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1211,13 +1381,42 @@ class Dialect(EventTarget): unique constraint information as a list of dicts corresponding to the :class:`.ReflectedUniqueConstraint` dictionary. + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_unique_constraints`. + """ + + raise NotImplementedError() + + def get_multi_unique_constraints( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, List[ReflectedUniqueConstraint]]]: + """Return information about unique constraints in all tables + in the given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_multi_unique_constraints`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + """ raise NotImplementedError() def get_check_constraints( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1228,26 +1427,86 @@ class Dialect(EventTarget): check constraint information as a list of dicts corresponding to the :class:`.ReflectedCheckConstraint` dictionary. + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_check_constraints`. + + .. versionadded:: 1.1.0 + + """ + + raise NotImplementedError() + + def get_multi_check_constraints( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, List[ReflectedCheckConstraint]]]: + """Return information about check constraints in all tables + in the given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_multi_check_constraints`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + """ raise NotImplementedError() def get_table_options( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, - ) -> Optional[Dict[str, Any]]: - r"""Return the "options" for the table identified by ``table_name`` - as a dictionary. + ) -> Dict[str, Any]: + """Return a dictionary of options specified when ``table_name`` + was created. + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_table_options`. """ - return None + raise NotImplementedError() + + def get_multi_table_options( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, Dict[str, Any]]]: + """Return a dictionary of options specified when the tables in the + given schema were created. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_multi_table_options`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + + """ + raise NotImplementedError() def get_table_comment( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1258,6 +1517,8 @@ class Dialect(EventTarget): table comment information as a dictionary corresponding to the :class:`.ReflectedTableComment` dictionary. + This is an internal dialect method. Applications should use + :meth:`.Inspector.get_table_comment`. :raise: ``NotImplementedError`` for dialects that don't support comments. @@ -1268,6 +1529,33 @@ class Dialect(EventTarget): raise NotImplementedError() + def get_multi_table_comment( + self, + connection: Connection, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + **kw: Any, + ) -> Iterable[Tuple[TableKey, ReflectedTableComment]]: + """Return information about the table comment in all tables + in the given ``schema``. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.get_multi_table_comment`. + + .. note:: The :class:`_engine.DefaultDialect` provides a default + implementation that will call the single table method for + each object returned by :meth:`Dialect.get_table_names`, + :meth:`Dialect.get_view_names` or + :meth:`Dialect.get_materialized_view_names` depending on the + provided ``kind``. Dialects that want to support a faster + implementation should implement this method. + + .. versionadded:: 2.0 + + """ + + raise NotImplementedError() + def normalize_name(self, name: str) -> str: """convert the given name to lowercase if it is detected as case insensitive. @@ -1290,7 +1578,7 @@ class Dialect(EventTarget): def has_table( self, - connection: "Connection", + connection: Connection, table_name: str, schema: Optional[str] = None, **kw: Any, @@ -1327,21 +1615,24 @@ class Dialect(EventTarget): def has_index( self, - connection: "Connection", + connection: Connection, table_name: str, index_name: str, schema: Optional[str] = None, + **kw: Any, ) -> bool: """Check the existence of a particular index name in the database. Given a :class:`_engine.Connection` object, a string - ``table_name`` and string index name, return True if an index of the - given name on the given table exists, false otherwise. + ``table_name`` and string index name, return ``True`` if an index of + the given name on the given table exists, ``False`` otherwise. The :class:`.DefaultDialect` implements this in terms of the :meth:`.Dialect.has_table` and :meth:`.Dialect.get_indexes` methods, however dialects can implement a more performant version. + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.has_index`. .. versionadded:: 1.4 @@ -1351,7 +1642,7 @@ class Dialect(EventTarget): def has_sequence( self, - connection: "Connection", + connection: Connection, sequence_name: str, schema: Optional[str] = None, **kw: Any, @@ -1359,13 +1650,39 @@ class Dialect(EventTarget): """Check the existence of a particular sequence in the database. Given a :class:`_engine.Connection` object and a string - `sequence_name`, return True if the given sequence exists in - the database, False otherwise. + `sequence_name`, return ``True`` if the given sequence exists in + the database, ``False`` otherwise. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.has_sequence`. + """ + + raise NotImplementedError() + + def has_schema( + self, connection: Connection, schema_name: str, **kw: Any + ) -> bool: + """Check the existence of a particular schema name in the database. + + Given a :class:`_engine.Connection` object, a string + ``schema_name``, return ``True`` if a schema of the + given exists, ``False`` otherwise. + + The :class:`.DefaultDialect` implements this by checking + the presence of ``schema_name`` among the schemas returned by + :meth:`.Dialect.get_schema_names`, + however dialects can implement a more performant version. + + This is an internal dialect method. Applications should use + :meth:`_engine.Inspector.has_schema`. + + .. versionadded:: 2.0 + """ raise NotImplementedError() - def _get_server_version_info(self, connection: "Connection") -> Any: + def _get_server_version_info(self, connection: Connection) -> Any: """Retrieve the server version info from the given connection. This is used by the default implementation to populate the @@ -1376,7 +1693,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def _get_default_schema_name(self, connection: "Connection") -> str: + def _get_default_schema_name(self, connection: Connection) -> str: """Return the string name of the currently selected schema from the given connection. @@ -1481,7 +1798,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def do_savepoint(self, connection: "Connection", name: str) -> None: + def do_savepoint(self, connection: Connection, name: str) -> None: """Create a savepoint with the given name. :param connection: a :class:`_engine.Connection`. @@ -1492,7 +1809,7 @@ class Dialect(EventTarget): raise NotImplementedError() def do_rollback_to_savepoint( - self, connection: "Connection", name: str + self, connection: Connection, name: str ) -> None: """Rollback a connection to the named savepoint. @@ -1503,9 +1820,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def do_release_savepoint( - self, connection: "Connection", name: str - ) -> None: + def do_release_savepoint(self, connection: Connection, name: str) -> None: """Release the named savepoint on a connection. :param connection: a :class:`_engine.Connection`. @@ -1514,7 +1829,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def do_begin_twophase(self, connection: "Connection", xid: Any) -> None: + def do_begin_twophase(self, connection: Connection, xid: Any) -> None: """Begin a two phase transaction on the given connection. :param connection: a :class:`_engine.Connection`. @@ -1524,7 +1839,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def do_prepare_twophase(self, connection: "Connection", xid: Any) -> None: + def do_prepare_twophase(self, connection: Connection, xid: Any) -> None: """Prepare a two phase transaction on the given connection. :param connection: a :class:`_engine.Connection`. @@ -1536,7 +1851,7 @@ class Dialect(EventTarget): def do_rollback_twophase( self, - connection: "Connection", + connection: Connection, xid: Any, is_prepared: bool = True, recover: bool = False, @@ -1555,7 +1870,7 @@ class Dialect(EventTarget): def do_commit_twophase( self, - connection: "Connection", + connection: Connection, xid: Any, is_prepared: bool = True, recover: bool = False, @@ -1573,7 +1888,7 @@ class Dialect(EventTarget): raise NotImplementedError() - def do_recover_twophase(self, connection: "Connection") -> List[Any]: + def do_recover_twophase(self, connection: Connection) -> List[Any]: """Recover list of uncommitted prepared two phase transaction identifiers on the given connection. diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py index 4fc57d5f4..32c89106b 100644 --- a/lib/sqlalchemy/engine/reflection.py +++ b/lib/sqlalchemy/engine/reflection.py @@ -27,39 +27,148 @@ methods such as get_table_names, get_columns, etc. from __future__ import annotations import contextlib +from dataclasses import dataclass +from enum import auto +from enum import Flag +from enum import unique +from typing import Any +from typing import Callable +from typing import Collection +from typing import Dict +from typing import Generator +from typing import Iterable from typing import List from typing import Optional +from typing import Sequence +from typing import Set +from typing import Tuple +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union from .base import Connection from .base import Engine -from .interfaces import ReflectedColumn from .. import exc from .. import inspection from .. import sql from .. import util from ..sql import operators from ..sql import schema as sa_schema +from ..sql.cache_key import _ad_hoc_cache_key_from_args +from ..sql.elements import TextClause from ..sql.type_api import TypeEngine +from ..sql.visitors import InternalTraversal from ..util import topological +from ..util.typing import final + +if TYPE_CHECKING: + from .interfaces import Dialect + from .interfaces import ReflectedCheckConstraint + from .interfaces import ReflectedColumn + from .interfaces import ReflectedForeignKeyConstraint + from .interfaces import ReflectedIndex + from .interfaces import ReflectedPrimaryKeyConstraint + from .interfaces import ReflectedTableComment + from .interfaces import ReflectedUniqueConstraint + from .interfaces import TableKey + +_R = TypeVar("_R") @util.decorator -def cache(fn, self, con, *args, **kw): +def cache( + fn: Callable[..., _R], + self: Dialect, + con: Connection, + *args: Any, + **kw: Any, +) -> _R: info_cache = kw.get("info_cache", None) if info_cache is None: return fn(self, con, *args, **kw) + exclude = {"info_cache", "unreflectable"} key = ( fn.__name__, tuple(a for a in args if isinstance(a, str)), - tuple((k, v) for k, v in kw.items() if k != "info_cache"), + tuple((k, v) for k, v in kw.items() if k not in exclude), ) - ret = info_cache.get(key) + ret: _R = info_cache.get(key) if ret is None: ret = fn(self, con, *args, **kw) info_cache[key] = ret return ret +def flexi_cache( + *traverse_args: Tuple[str, InternalTraversal] +) -> Callable[[Callable[..., _R]], Callable[..., _R]]: + @util.decorator + def go( + fn: Callable[..., _R], + self: Dialect, + con: Connection, + *args: Any, + **kw: Any, + ) -> _R: + info_cache = kw.get("info_cache", None) + if info_cache is None: + return fn(self, con, *args, **kw) + key = _ad_hoc_cache_key_from_args((fn.__name__,), traverse_args, args) + ret: _R = info_cache.get(key) + if ret is None: + ret = fn(self, con, *args, **kw) + info_cache[key] = ret + return ret + + return go + + +@unique +class ObjectKind(Flag): + """Enumerator that indicates which kind of object to return when calling + the ``get_multi`` methods. + + This is a Flag enum, so custom combinations can be passed. For example, + to reflect tables and plain views ``ObjectKind.TABLE | ObjectKind.VIEW`` + may be used. + + .. note:: + Not all dialect may support all kind of object. If a dialect does + not support a particular object an empty dict is returned. + In case a dialect supports an object, but the requested method + is not applicable for the specified kind the default value + will be returned for each reflected object. For example reflecting + check constraints of view return a dict with all the views with + empty lists as values. + """ + + TABLE = auto() + "Reflect table objects" + VIEW = auto() + "Reflect plain view objects" + MATERIALIZED_VIEW = auto() + "Reflect materialized view object" + + ANY_VIEW = VIEW | MATERIALIZED_VIEW + "Reflect any kind of view objects" + ANY = TABLE | VIEW | MATERIALIZED_VIEW + "Reflect all type of objects" + + +@unique +class ObjectScope(Flag): + """Enumerator that indicates which scope to use when calling + the ``get_multi`` methods. + """ + + DEFAULT = auto() + "Include default scope" + TEMPORARY = auto() + "Include only temp scope" + ANY = DEFAULT | TEMPORARY + "Include both default and temp scope" + + @inspection._self_inspects class Inspector(inspection.Inspectable["Inspector"]): """Performs database schema inspection. @@ -85,6 +194,12 @@ class Inspector(inspection.Inspectable["Inspector"]): """ + bind: Union[Engine, Connection] + engine: Engine + _op_context_requires_connect: bool + dialect: Dialect + info_cache: Dict[Any, Any] + @util.deprecated( "1.4", "The __init__() method on :class:`_reflection.Inspector` " @@ -96,7 +211,7 @@ class Inspector(inspection.Inspectable["Inspector"]): "in order to " "acquire an :class:`_reflection.Inspector`.", ) - def __init__(self, bind): + def __init__(self, bind: Union[Engine, Connection]): """Initialize a new :class:`_reflection.Inspector`. :param bind: a :class:`~sqlalchemy.engine.Connection`, @@ -108,38 +223,51 @@ class Inspector(inspection.Inspectable["Inspector"]): :meth:`_reflection.Inspector.from_engine` """ - return self._init_legacy(bind) + self._init_legacy(bind) @classmethod - def _construct(cls, init, bind): + def _construct( + cls, init: Callable[..., Any], bind: Union[Engine, Connection] + ) -> Inspector: if hasattr(bind.dialect, "inspector"): - cls = bind.dialect.inspector + cls = bind.dialect.inspector # type: ignore[attr-defined] self = cls.__new__(cls) init(self, bind) return self - def _init_legacy(self, bind): + def _init_legacy(self, bind: Union[Engine, Connection]) -> None: if hasattr(bind, "exec_driver_sql"): - self._init_connection(bind) + self._init_connection(bind) # type: ignore[arg-type] else: - self._init_engine(bind) + self._init_engine(bind) # type: ignore[arg-type] - def _init_engine(self, engine): + def _init_engine(self, engine: Engine) -> None: self.bind = self.engine = engine engine.connect().close() self._op_context_requires_connect = True self.dialect = self.engine.dialect self.info_cache = {} - def _init_connection(self, connection): + def _init_connection(self, connection: Connection) -> None: self.bind = connection self.engine = connection.engine self._op_context_requires_connect = False self.dialect = self.engine.dialect self.info_cache = {} + def clear_cache(self) -> None: + """reset the cache for this :class:`.Inspector`. + + Inspection methods that have data cached will emit SQL queries + when next called to get new data. + + .. versionadded:: 2.0 + + """ + self.info_cache.clear() + @classmethod @util.deprecated( "1.4", @@ -152,7 +280,7 @@ class Inspector(inspection.Inspectable["Inspector"]): "in order to " "acquire an :class:`_reflection.Inspector`.", ) - def from_engine(cls, bind): + def from_engine(cls, bind: Engine) -> Inspector: """Construct a new dialect-specific Inspector object from the given engine or connection. @@ -172,15 +300,15 @@ class Inspector(inspection.Inspectable["Inspector"]): return cls._construct(cls._init_legacy, bind) @inspection._inspects(Engine) - def _engine_insp(bind): + def _engine_insp(bind: Engine) -> Inspector: # type: ignore[misc] return Inspector._construct(Inspector._init_engine, bind) @inspection._inspects(Connection) - def _connection_insp(bind): + def _connection_insp(bind: Connection) -> Inspector: # type: ignore[misc] return Inspector._construct(Inspector._init_connection, bind) @contextlib.contextmanager - def _operation_context(self): + def _operation_context(self) -> Generator[Connection, None, None]: """Return a context that optimizes for multiple operations on a single transaction. @@ -189,10 +317,11 @@ class Inspector(inspection.Inspectable["Inspector"]): :class:`_engine.Connection`. """ + conn: Connection if self._op_context_requires_connect: - conn = self.bind.connect() + conn = self.bind.connect() # type: ignore[union-attr] else: - conn = self.bind + conn = self.bind # type: ignore[assignment] try: yield conn finally: @@ -200,7 +329,7 @@ class Inspector(inspection.Inspectable["Inspector"]): conn.close() @contextlib.contextmanager - def _inspection_context(self): + def _inspection_context(self) -> Generator[Inspector, None, None]: """Return an :class:`_reflection.Inspector` from this one that will run all operations on a single connection. @@ -213,7 +342,7 @@ class Inspector(inspection.Inspectable["Inspector"]): yield sub_insp @property - def default_schema_name(self): + def default_schema_name(self) -> Optional[str]: """Return the default schema name presented by the dialect for the current engine's database user. @@ -223,30 +352,38 @@ class Inspector(inspection.Inspectable["Inspector"]): """ return self.dialect.default_schema_name - def get_schema_names(self): - """Return all schema names.""" + def get_schema_names(self, **kw: Any) -> List[str]: + r"""Return all schema names. - if hasattr(self.dialect, "get_schema_names"): - with self._operation_context() as conn: - return self.dialect.get_schema_names( - conn, info_cache=self.info_cache - ) - return [] + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + """ - def get_table_names(self, schema=None): - """Return all table names in referred to within a particular schema. + with self._operation_context() as conn: + return self.dialect.get_schema_names( + conn, info_cache=self.info_cache, **kw + ) + + def get_table_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all table names within a particular schema. The names are expected to be real tables only, not views. Views are instead returned using the - :meth:`_reflection.Inspector.get_view_names` - method. - + :meth:`_reflection.Inspector.get_view_names` and/or + :meth:`_reflection.Inspector.get_materialized_view_names` + methods. :param schema: Schema name. If ``schema`` is left at ``None``, the database's default schema is used, else the named schema is searched. If the database does not support named schemas, behavior is undefined if ``schema`` is not passed as ``None``. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. seealso:: @@ -258,43 +395,105 @@ class Inspector(inspection.Inspectable["Inspector"]): with self._operation_context() as conn: return self.dialect.get_table_names( - conn, schema, info_cache=self.info_cache + conn, schema, info_cache=self.info_cache, **kw ) - def has_table(self, table_name, schema=None): - """Return True if the backend has a table or view of the given name. + def has_table( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> bool: + r"""Return True if the backend has a table or view of the given name. :param table_name: name of the table to check :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.4 - the :meth:`.Inspector.has_table` method replaces the :meth:`_engine.Engine.has_table` method. - .. versionchanged:: 2.0:: The method checks also for views. + .. versionchanged:: 2.0:: The method checks also for any type of + views (plain or materialized). In previous version this behaviour was dialect specific. New dialect suite tests were added to ensure all dialect conform with this behaviour. """ - # TODO: info_cache? with self._operation_context() as conn: - return self.dialect.has_table(conn, table_name, schema) + return self.dialect.has_table( + conn, table_name, schema, info_cache=self.info_cache, **kw + ) - def has_sequence(self, sequence_name, schema=None): - """Return True if the backend has a table of the given name. + def has_sequence( + self, sequence_name: str, schema: Optional[str] = None, **kw: Any + ) -> bool: + r"""Return True if the backend has a sequence with the given name. - :param sequence_name: name of the table to check + :param sequence_name: name of the sequence to check :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.4 """ - # TODO: info_cache? with self._operation_context() as conn: - return self.dialect.has_sequence(conn, sequence_name, schema) + return self.dialect.has_sequence( + conn, sequence_name, schema, info_cache=self.info_cache, **kw + ) + + def has_index( + self, + table_name: str, + index_name: str, + schema: Optional[str] = None, + **kw: Any, + ) -> bool: + r"""Check the existence of a particular index name in the database. + + :param table_name: the name of the table the index belongs to + :param index_name: the name of the index to check + :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + .. versionadded:: 2.0 + + """ + with self._operation_context() as conn: + return self.dialect.has_index( + conn, + table_name, + index_name, + schema, + info_cache=self.info_cache, + **kw, + ) + + def has_schema(self, schema_name: str, **kw: Any) -> bool: + r"""Return True if the backend has a schema with the given name. + + :param schema_name: name of the schema to check + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + .. versionadded:: 2.0 + + """ + with self._operation_context() as conn: + return self.dialect.has_schema( + conn, schema_name, info_cache=self.info_cache, **kw + ) - def get_sorted_table_and_fkc_names(self, schema=None): - """Return dependency-sorted table and foreign key constraint names in + def get_sorted_table_and_fkc_names( + self, + schema: Optional[str] = None, + **kw: Any, + ) -> List[Tuple[Optional[str], List[Tuple[str, Optional[str]]]]]: + r"""Return dependency-sorted table and foreign key constraint names in referred to within a particular schema. This will yield 2-tuples of @@ -309,6 +508,11 @@ class Inspector(inspection.Inspectable["Inspector"]): .. versionadded:: 1.0.- + :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + .. seealso:: :meth:`_reflection.Inspector.get_table_names` @@ -317,24 +521,74 @@ class Inspector(inspection.Inspectable["Inspector"]): with an already-given :class:`_schema.MetaData`. """ - with self._operation_context() as conn: - tnames = self.dialect.get_table_names( - conn, schema, info_cache=self.info_cache + + return [ + ( + table_key[1] if table_key else None, + [(tname, fks) for (_, tname), fks in fk_collection], ) + for ( + table_key, + fk_collection, + ) in self.sort_tables_on_foreign_key_dependency( + consider_schemas=(schema,) + ) + ] - tuples = set() - remaining_fkcs = set() + def sort_tables_on_foreign_key_dependency( + self, + consider_schemas: Collection[Optional[str]] = (None,), + **kw: Any, + ) -> List[ + Tuple[ + Optional[Tuple[Optional[str], str]], + List[Tuple[Tuple[Optional[str], str], Optional[str]]], + ] + ]: + r"""Return dependency-sorted table and foreign key constraint names + referred to within multiple schemas. + + This method may be compared to + :meth:`.Inspector.get_sorted_table_and_fkc_names`, which + works on one schema at a time; here, the method is a generalization + that will consider multiple schemas at once including that it will + resolve for cross-schema foreign keys. + + .. versionadded:: 2.0 - fknames_for_table = {} - for tname in tnames: - fkeys = self.get_foreign_keys(tname, schema) - fknames_for_table[tname] = set([fk["name"] for fk in fkeys]) - for fkey in fkeys: - if tname != fkey["referred_table"]: - tuples.add((fkey["referred_table"], tname)) + """ + SchemaTab = Tuple[Optional[str], str] + + tuples: Set[Tuple[SchemaTab, SchemaTab]] = set() + remaining_fkcs: Set[Tuple[SchemaTab, Optional[str]]] = set() + fknames_for_table: Dict[SchemaTab, Set[Optional[str]]] = {} + tnames: List[SchemaTab] = [] + + for schname in consider_schemas: + schema_fkeys = self.get_multi_foreign_keys(schname, **kw) + tnames.extend(schema_fkeys) + for (_, tname), fkeys in schema_fkeys.items(): + fknames_for_table[(schname, tname)] = set( + [fk["name"] for fk in fkeys] + ) + for fkey in fkeys: + if ( + tname != fkey["referred_table"] + or schname != fkey["referred_schema"] + ): + tuples.add( + ( + ( + fkey["referred_schema"], + fkey["referred_table"], + ), + (schname, tname), + ) + ) try: candidate_sort = list(topological.sort(tuples, tnames)) except exc.CircularDependencyError as err: + edge: Tuple[SchemaTab, SchemaTab] for edge in err.edges: tuples.remove(edge) remaining_fkcs.update( @@ -342,16 +596,32 @@ class Inspector(inspection.Inspectable["Inspector"]): ) candidate_sort = list(topological.sort(tuples, tnames)) - return [ - (tname, fknames_for_table[tname].difference(remaining_fkcs)) - for tname in candidate_sort - ] + [(None, list(remaining_fkcs))] + ret: List[ + Tuple[Optional[SchemaTab], List[Tuple[SchemaTab, Optional[str]]]] + ] + ret = [ + ( + (schname, tname), + [ + ((schname, tname), fk) + for fk in fknames_for_table[(schname, tname)].difference( + name for _, name in remaining_fkcs + ) + ], + ) + for (schname, tname) in candidate_sort + ] + return ret + [(None, list(remaining_fkcs))] - def get_temp_table_names(self): - """Return a list of temporary table names for the current bind. + def get_temp_table_names(self, **kw: Any) -> List[str]: + r"""Return a list of temporary table names for the current bind. This method is unsupported by most dialects; currently - only SQLite implements it. + only Oracle, PostgreSQL and SQLite implements it. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.0.0 @@ -359,28 +629,35 @@ class Inspector(inspection.Inspectable["Inspector"]): with self._operation_context() as conn: return self.dialect.get_temp_table_names( - conn, info_cache=self.info_cache + conn, info_cache=self.info_cache, **kw ) - def get_temp_view_names(self): - """Return a list of temporary view names for the current bind. + def get_temp_view_names(self, **kw: Any) -> List[str]: + r"""Return a list of temporary view names for the current bind. This method is unsupported by most dialects; currently - only SQLite implements it. + only PostgreSQL and SQLite implements it. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.0.0 """ with self._operation_context() as conn: return self.dialect.get_temp_view_names( - conn, info_cache=self.info_cache + conn, info_cache=self.info_cache, **kw ) - def get_table_options(self, table_name, schema=None, **kw): - """Return a dictionary of options specified when the table of the + def get_table_options( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> Dict[str, Any]: + r"""Return a dictionary of options specified when the table of the given name was created. - This currently includes some options that apply to MySQL tables. + This currently includes some options that apply to MySQL and Oracle + tables. :param table_name: string name of the table. For special quoting, use :class:`.quoted_name`. @@ -389,60 +666,172 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dict with the table options. The returned keys depend on the + dialect in use. Each one is prefixed with the dialect name. + """ - if hasattr(self.dialect, "get_table_options"): - with self._operation_context() as conn: - return self.dialect.get_table_options( - conn, table_name, schema, info_cache=self.info_cache, **kw - ) - return {} + with self._operation_context() as conn: + return self.dialect.get_table_options( + conn, table_name, schema, info_cache=self.info_cache, **kw + ) + + def get_multi_table_options( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, Dict[str, Any]]: + r"""Return a dictionary of options specified when the tables in the + given schema were created. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + This currently includes some options that apply to MySQL and Oracle + tables. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if options of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are dictionaries with the table options. + The returned keys in each dict depend on the + dialect in use. Each one is prefixed with the dialect name. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + with self._operation_context() as conn: + res = self.dialect.get_multi_table_options( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + return dict(res) - def get_view_names(self, schema=None): - """Return all view names in `schema`. + def get_view_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all non-materialized view names in `schema`. :param schema: Optional, retrieve names from a non-default schema. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + + .. versionchanged:: 2.0 For those dialects that previously included + the names of materialized views in this list (currently PostgreSQL), + this method no longer returns the names of materialized views. + the :meth:`.Inspector.get_materialized_view_names` method should + be used instead. + + .. seealso:: + + :meth:`.Inspector.get_materialized_view_names` """ with self._operation_context() as conn: return self.dialect.get_view_names( - conn, schema, info_cache=self.info_cache + conn, schema, info_cache=self.info_cache, **kw + ) + + def get_materialized_view_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all materialized view names in `schema`. + + :param schema: Optional, retrieve names from a non-default schema. + For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + .. versionadded:: 2.0 + + .. seealso:: + + :meth:`.Inspector.get_view_names` + + """ + + with self._operation_context() as conn: + return self.dialect.get_materialized_view_names( + conn, schema, info_cache=self.info_cache, **kw ) - def get_sequence_names(self, schema=None): - """Return all sequence names in `schema`. + def get_sequence_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all sequence names in `schema`. :param schema: Optional, retrieve names from a non-default schema. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. """ with self._operation_context() as conn: return self.dialect.get_sequence_names( - conn, schema, info_cache=self.info_cache + conn, schema, info_cache=self.info_cache, **kw ) - def get_view_definition(self, view_name, schema=None): - """Return definition for `view_name`. + def get_view_definition( + self, view_name: str, schema: Optional[str] = None, **kw: Any + ) -> str: + r"""Return definition for the plain or materialized view called + ``view_name``. + :param view_name: Name of the view. :param schema: Optional, retrieve names from a non-default schema. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. """ with self._operation_context() as conn: return self.dialect.get_view_definition( - conn, view_name, schema, info_cache=self.info_cache + conn, view_name, schema, info_cache=self.info_cache, **kw ) def get_columns( - self, table_name: str, schema: Optional[str] = None, **kw + self, table_name: str, schema: Optional[str] = None, **kw: Any ) -> List[ReflectedColumn]: - """Return information about columns in `table_name`. + r"""Return information about columns in ``table_name``. - Given a string `table_name` and an optional string `schema`, return - column information as a list of dicts with these keys: + Given a string ``table_name`` and an optional string ``schema``, + return column information as a list of dicts with these keys: * ``name`` - the column's name @@ -487,6 +876,10 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + :return: list of dictionaries, each representing the definition of a database column. @@ -496,17 +889,83 @@ class Inspector(inspection.Inspectable["Inspector"]): col_defs = self.dialect.get_columns( conn, table_name, schema, info_cache=self.info_cache, **kw ) - for col_def in col_defs: - # make this easy and only return instances for coltype - coltype = col_def["type"] - if not isinstance(coltype, TypeEngine): - col_def["type"] = coltype() + if col_defs: + self._instantiate_types([col_defs]) return col_defs - def get_pk_constraint(self, table_name, schema=None, **kw): - """Return information about primary key constraint on `table_name`. + def _instantiate_types( + self, data: Iterable[List[ReflectedColumn]] + ) -> None: + # make this easy and only return instances for coltype + for col_defs in data: + for col_def in col_defs: + coltype = col_def["type"] + if not isinstance(coltype, TypeEngine): + col_def["type"] = coltype() + + def get_multi_columns( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedColumn]]: + r"""Return information about columns in all objects in the given schema. + + The objects can be filtered by passing the names to use to + ``filter_names``. + + The column information is as described in + :meth:`Inspector.get_columns`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if columns of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of a database column. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + table_col_defs = dict( + self.dialect.get_multi_columns( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + self._instantiate_types(table_col_defs.values()) + return table_col_defs + + def get_pk_constraint( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> ReflectedPrimaryKeyConstraint: + r"""Return information about primary key constraint in ``table_name``. - Given a string `table_name`, and an optional string `schema`, return + Given a string ``table_name``, and an optional string `schema`, return primary key information as a dictionary with these keys: * ``constrained_columns`` - @@ -522,16 +981,80 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary representing the definition of + a primary key constraint. + """ with self._operation_context() as conn: return self.dialect.get_pk_constraint( conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_foreign_keys(self, table_name, schema=None, **kw): - """Return information about foreign_keys in `table_name`. + def get_multi_pk_constraint( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, ReflectedPrimaryKeyConstraint]: + r"""Return information about primary key constraints in + all tables in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The primary key information is as described in + :meth:`Inspector.get_pk_constraint`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if primary keys of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are dictionaries, each representing the + definition of a primary key constraint. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_pk_constraint( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_foreign_keys( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedForeignKeyConstraint]: + r"""Return information about foreign_keys in ``table_name``. - Given a string `table_name`, and an optional string `schema`, return + Given a string ``table_name``, and an optional string `schema`, return foreign key information as a list of dicts with these keys: * ``constrained_columns`` - @@ -557,6 +1080,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + a foreign key definition. + """ with self._operation_context() as conn: @@ -564,10 +1094,68 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_indexes(self, table_name, schema=None, **kw): - """Return information about indexes in `table_name`. + def get_multi_foreign_keys( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedForeignKeyConstraint]]: + r"""Return information about foreign_keys in all tables + in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The foreign key informations as described in + :meth:`Inspector.get_foreign_keys`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if foreign keys of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing + a foreign key definition. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_foreign_keys( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) - Given a string `table_name` and an optional string `schema`, return + def get_indexes( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedIndex]: + r"""Return information about indexes in ``table_name``. + + Given a string ``table_name`` and an optional string `schema`, return index information as a list of dicts with these keys: * ``name`` - @@ -598,6 +1186,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + definition of an index. + """ with self._operation_context() as conn: @@ -605,10 +1200,71 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_unique_constraints(self, table_name, schema=None, **kw): - """Return information about unique constraints in `table_name`. + def get_multi_indexes( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedIndex]]: + r"""Return information about indexes in in all objects + in the given schema. + + The objects can be filtered by passing the names to use to + ``filter_names``. + + The foreign key information is as described in + :meth:`Inspector.get_foreign_keys`. + + The indexes information as described in + :meth:`Inspector.get_indexes`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if indexes of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of an index. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_indexes( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_unique_constraints( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedUniqueConstraint]: + r"""Return information about unique constraints in ``table_name``. - Given a string `table_name` and an optional string `schema`, return + Given a string ``table_name`` and an optional string `schema`, return unique constraint information as a list of dicts with these keys: * ``name`` - @@ -624,6 +1280,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + definition of an unique constraint. + """ with self._operation_context() as conn: @@ -631,8 +1294,66 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_table_comment(self, table_name, schema=None, **kw): - """Return information about the table comment for ``table_name``. + def get_multi_unique_constraints( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedUniqueConstraint]]: + r"""Return information about unique constraints in all tables + in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The unique constraint information is as described in + :meth:`Inspector.get_unique_constraints`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if constraints of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of an unique constraint. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_unique_constraints( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_table_comment( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> ReflectedTableComment: + r"""Return information about the table comment for ``table_name``. Given a string ``table_name`` and an optional string ``schema``, return table comment information as a dictionary with these keys: @@ -643,8 +1364,20 @@ class Inspector(inspection.Inspectable["Inspector"]): Raises ``NotImplementedError`` for a dialect that does not support comments. - .. versionadded:: 1.2 + :param table_name: string name of the table. For special quoting, + use :class:`.quoted_name`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary, with the table comment. + .. versionadded:: 1.2 """ with self._operation_context() as conn: @@ -652,10 +1385,71 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_check_constraints(self, table_name, schema=None, **kw): - """Return information about check constraints in `table_name`. + def get_multi_table_comment( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, ReflectedTableComment]: + r"""Return information about the table comment in all objects + in the given schema. + + The objects can be filtered by passing the names to use to + ``filter_names``. + + The comment information is as described in + :meth:`Inspector.get_table_comment`. + + Raises ``NotImplementedError`` for a dialect that does not support + comments. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if comments of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are dictionaries, representing the + table comments. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_table_comment( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_check_constraints( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedCheckConstraint]: + r"""Return information about check constraints in ``table_name``. - Given a string `table_name` and an optional string `schema`, return + Given a string ``table_name`` and an optional string `schema`, return check constraint information as a list of dicts with these keys: * ``name`` - @@ -677,6 +1471,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + definition of a check constraints. + .. versionadded:: 1.1.0 """ @@ -686,14 +1487,71 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) + def get_multi_check_constraints( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedCheckConstraint]]: + r"""Return information about check constraints in all tables + in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The check constraint information is as described in + :meth:`Inspector.get_check_constraints`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if constraints of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of a check constraints. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_check_constraints( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + def reflect_table( self, - table, - include_columns, - exclude_columns=(), - resolve_fks=True, - _extend_on=None, - ): + table: sa_schema.Table, + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str] = (), + resolve_fks: bool = True, + _extend_on: Optional[Set[sa_schema.Table]] = None, + _reflect_info: Optional[_ReflectionInfo] = None, + ) -> None: """Given a :class:`_schema.Table` object, load its internal constructs based on introspection. @@ -741,21 +1599,34 @@ class Inspector(inspection.Inspectable["Inspector"]): if k in table.dialect_kwargs ) + table_key = (schema, table_name) + if _reflect_info is None or table_key not in _reflect_info.columns: + _reflect_info = self._get_reflection_info( + schema, + filter_names=[table_name], + kind=ObjectKind.ANY, + scope=ObjectScope.ANY, + _reflect_info=_reflect_info, + **table.dialect_kwargs, + ) + if table_key in _reflect_info.unreflectable: + raise _reflect_info.unreflectable[table_key] + + if table_key not in _reflect_info.columns: + raise exc.NoSuchTableError(table_name) + # reflect table options, like mysql_engine - tbl_opts = self.get_table_options( - table_name, schema, **table.dialect_kwargs - ) - if tbl_opts: - # add additional kwargs to the Table if the dialect - # returned them - table._validate_dialect_kwargs(tbl_opts) + if _reflect_info.table_options: + tbl_opts = _reflect_info.table_options.get(table_key) + if tbl_opts: + # add additional kwargs to the Table if the dialect + # returned them + table._validate_dialect_kwargs(tbl_opts) found_table = False - cols_by_orig_name = {} + cols_by_orig_name: Dict[str, sa_schema.Column[Any]] = {} - for col_d in self.get_columns( - table_name, schema, **table.dialect_kwargs - ): + for col_d in _reflect_info.columns[table_key]: found_table = True self._reflect_column( @@ -771,12 +1642,12 @@ class Inspector(inspection.Inspectable["Inspector"]): raise exc.NoSuchTableError(table_name) self._reflect_pk( - table_name, schema, table, cols_by_orig_name, exclude_columns + _reflect_info, table_key, table, cols_by_orig_name, exclude_columns ) self._reflect_fk( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -787,8 +1658,8 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_indexes( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -797,8 +1668,8 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_unique_constraints( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -807,8 +1678,8 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_check_constraints( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -817,17 +1688,27 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_table_comment( - table_name, schema, table, reflection_options + _reflect_info, + table_key, + table, + reflection_options, ) def _reflect_column( - self, table, col_d, include_columns, exclude_columns, cols_by_orig_name - ): + self, + table: sa_schema.Table, + col_d: ReflectedColumn, + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + ) -> None: orig_name = col_d["name"] table.metadata.dispatch.column_reflect(self, table, col_d) - table.dispatch.column_reflect(self, table, col_d) + table.dispatch.column_reflect( # type: ignore[attr-defined] + self, table, col_d + ) # fetch name again as column_reflect is allowed to # change it @@ -840,7 +1721,7 @@ class Inspector(inspection.Inspectable["Inspector"]): coltype = col_d["type"] col_kw = dict( - (k, col_d[k]) + (k, col_d[k]) # type: ignore[literal-required] for k in [ "nullable", "autoincrement", @@ -856,15 +1737,20 @@ class Inspector(inspection.Inspectable["Inspector"]): col_kw.update(col_d["dialect_options"]) colargs = [] + default: Any if col_d.get("default") is not None: - default = col_d["default"] - if isinstance(default, sql.elements.TextClause): - default = sa_schema.DefaultClause(default, _reflected=True) - elif not isinstance(default, sa_schema.FetchedValue): + default_text = col_d["default"] + assert default_text is not None + if isinstance(default_text, TextClause): default = sa_schema.DefaultClause( - sql.text(col_d["default"]), _reflected=True + default_text, _reflected=True ) - + elif not isinstance(default_text, sa_schema.FetchedValue): + default = sa_schema.DefaultClause( + sql.text(default_text), _reflected=True + ) + else: + default = default_text colargs.append(default) if "computed" in col_d: @@ -872,11 +1758,8 @@ class Inspector(inspection.Inspectable["Inspector"]): colargs.append(computed) if "identity" in col_d: - computed = sa_schema.Identity(**col_d["identity"]) - colargs.append(computed) - - if "sequence" in col_d: - self._reflect_col_sequence(col_d, colargs) + identity = sa_schema.Identity(**col_d["identity"]) + colargs.append(identity) cols_by_orig_name[orig_name] = col = sa_schema.Column( name, coltype, *colargs, **col_kw @@ -886,23 +1769,15 @@ class Inspector(inspection.Inspectable["Inspector"]): col.primary_key = True table.append_column(col, replace_existing=True) - def _reflect_col_sequence(self, col_d, colargs): - if "sequence" in col_d: - # TODO: mssql is using this. - seq = col_d["sequence"] - sequence = sa_schema.Sequence(seq["name"], 1, 1) - if "start" in seq: - sequence.start = seq["start"] - if "increment" in seq: - sequence.increment = seq["increment"] - colargs.append(sequence) - def _reflect_pk( - self, table_name, schema, table, cols_by_orig_name, exclude_columns - ): - pk_cons = self.get_pk_constraint( - table_name, schema, **table.dialect_kwargs - ) + self, + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + exclude_columns: Collection[str], + ) -> None: + pk_cons = _reflect_info.pk_constraint.get(table_key) if pk_cons: pk_cols = [ cols_by_orig_name[pk] @@ -919,19 +1794,17 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_fk( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - resolve_fks, - _extend_on, - reflection_options, - ): - fkeys = self.get_foreign_keys( - table_name, schema, **table.dialect_kwargs - ) + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + resolve_fks: bool, + _extend_on: Optional[Set[sa_schema.Table]], + reflection_options: Dict[str, Any], + ) -> None: + fkeys = _reflect_info.foreign_keys.get(table_key, []) for fkey_d in fkeys: conname = fkey_d["name"] # look for columns by orig name in cols_by_orig_name, @@ -963,6 +1836,7 @@ class Inspector(inspection.Inspectable["Inspector"]): schema=referred_schema, autoload_with=self.bind, _extend_on=_extend_on, + _reflect_info=_reflect_info, **reflection_options, ) for column in referred_columns: @@ -977,6 +1851,7 @@ class Inspector(inspection.Inspectable["Inspector"]): autoload_with=self.bind, schema=sa_schema.BLANK_SCHEMA, _extend_on=_extend_on, + _reflect_info=_reflect_info, **reflection_options, ) for column in referred_columns: @@ -1005,16 +1880,16 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_indexes( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - reflection_options, - ): + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + reflection_options: Dict[str, Any], + ) -> None: # Indexes - indexes = self.get_indexes(table_name, schema) + indexes = _reflect_info.indexes.get(table_key, []) for index_d in indexes: name = index_d["name"] columns = index_d["column_names"] @@ -1034,6 +1909,7 @@ class Inspector(inspection.Inspectable["Inspector"]): continue # look for columns by orig name in cols_by_orig_name, # but support columns that are in-Python only as fallback + idx_col: Any idx_cols = [] for c in columns: try: @@ -1045,7 +1921,7 @@ class Inspector(inspection.Inspectable["Inspector"]): except KeyError: util.warn( "%s key '%s' was not located in " - "columns for table '%s'" % (flavor, c, table_name) + "columns for table '%s'" % (flavor, c, table.name) ) continue c_sorting = column_sorting.get(c, ()) @@ -1063,22 +1939,16 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_unique_constraints( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - reflection_options, - ): - + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + reflection_options: Dict[str, Any], + ) -> None: + constraints = _reflect_info.unique_constraints.get(table_key, []) # Unique Constraints - try: - constraints = self.get_unique_constraints(table_name, schema) - except NotImplementedError: - # optional dialect feature - return - for const_d in constraints: conname = const_d["name"] columns = const_d["column_names"] @@ -1104,7 +1974,7 @@ class Inspector(inspection.Inspectable["Inspector"]): except KeyError: util.warn( "unique constraint key '%s' was not located in " - "columns for table '%s'" % (c, table_name) + "columns for table '%s'" % (c, table.name) ) else: constrained_cols.append(constrained_col) @@ -1114,29 +1984,166 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_check_constraints( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - reflection_options, - ): - try: - constraints = self.get_check_constraints(table_name, schema) - except NotImplementedError: - # optional dialect feature - return - + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + reflection_options: Dict[str, Any], + ) -> None: + constraints = _reflect_info.check_constraints.get(table_key, []) for const_d in constraints: table.append_constraint(sa_schema.CheckConstraint(**const_d)) def _reflect_table_comment( - self, table_name, schema, table, reflection_options - ): - try: - comment_dict = self.get_table_comment(table_name, schema) - except NotImplementedError: - return + self, + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + reflection_options: Dict[str, Any], + ) -> None: + comment_dict = _reflect_info.table_comment.get(table_key) + if comment_dict: + table.comment = comment_dict["text"] + + def _get_reflection_info( + self, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + available: Optional[Collection[str]] = None, + _reflect_info: Optional[_ReflectionInfo] = None, + **kw: Any, + ) -> _ReflectionInfo: + kw["schema"] = schema + + if filter_names and available and len(filter_names) > 100: + fraction = len(filter_names) / len(available) + else: + fraction = None + + unreflectable: Dict[TableKey, exc.UnreflectableTableError] + kw["unreflectable"] = unreflectable = {} + + has_result: bool = True + + def run( + meth: Any, + *, + optional: bool = False, + check_filter_names_from_meth: bool = False, + ) -> Any: + nonlocal has_result + # simple heuristic to improve reflection performance if a + # dialect implements multi_reflection: + # if more than 50% of the tables in the db are in filter_names + # load all the tables, since it's most likely faster to avoid + # a filter on that many tables. + if ( + fraction is None + or fraction <= 0.5 + or not self.dialect._overrides_default(meth.__name__) + ): + _fn = filter_names + else: + _fn = None + try: + if has_result: + res = meth(filter_names=_fn, **kw) + if check_filter_names_from_meth and not res: + # method returned no result data. + # skip any future call methods + has_result = False + else: + res = {} + except NotImplementedError: + if not optional: + raise + res = {} + return res + + info = _ReflectionInfo( + columns=run( + self.get_multi_columns, check_filter_names_from_meth=True + ), + pk_constraint=run(self.get_multi_pk_constraint), + foreign_keys=run(self.get_multi_foreign_keys), + indexes=run(self.get_multi_indexes), + unique_constraints=run( + self.get_multi_unique_constraints, optional=True + ), + table_comment=run(self.get_multi_table_comment, optional=True), + check_constraints=run( + self.get_multi_check_constraints, optional=True + ), + table_options=run(self.get_multi_table_options, optional=True), + unreflectable=unreflectable, + ) + if _reflect_info: + _reflect_info.update(info) + return _reflect_info else: - table.comment = comment_dict.get("text", None) + return info + + +@final +class ReflectionDefaults: + """provides blank default values for reflection methods.""" + + @classmethod + def columns(cls) -> List[ReflectedColumn]: + return [] + + @classmethod + def pk_constraint(cls) -> ReflectedPrimaryKeyConstraint: + return { # type: ignore # pep-655 not supported + "name": None, + "constrained_columns": [], + } + + @classmethod + def foreign_keys(cls) -> List[ReflectedForeignKeyConstraint]: + return [] + + @classmethod + def indexes(cls) -> List[ReflectedIndex]: + return [] + + @classmethod + def unique_constraints(cls) -> List[ReflectedUniqueConstraint]: + return [] + + @classmethod + def check_constraints(cls) -> List[ReflectedCheckConstraint]: + return [] + + @classmethod + def table_options(cls) -> Dict[str, Any]: + return {} + + @classmethod + def table_comment(cls) -> ReflectedTableComment: + return {"text": None} + + +@dataclass +class _ReflectionInfo: + columns: Dict[TableKey, List[ReflectedColumn]] + pk_constraint: Dict[TableKey, Optional[ReflectedPrimaryKeyConstraint]] + foreign_keys: Dict[TableKey, List[ReflectedForeignKeyConstraint]] + indexes: Dict[TableKey, List[ReflectedIndex]] + # optionals + unique_constraints: Dict[TableKey, List[ReflectedUniqueConstraint]] + table_comment: Dict[TableKey, Optional[ReflectedTableComment]] + check_constraints: Dict[TableKey, List[ReflectedCheckConstraint]] + table_options: Dict[TableKey, Dict[str, Any]] + unreflectable: Dict[TableKey, exc.UnreflectableTableError] + + def update(self, other: _ReflectionInfo) -> None: + for k, v in self.__dict__.items(): + ov = getattr(other, k) + if ov is not None: + if v is None: + setattr(self, k, ov) + else: + v.update(ov) |
