diff options
Diffstat (limited to 'lib/sqlalchemy/orm/util.py')
| -rw-r--r-- | lib/sqlalchemy/orm/util.py | 466 | 
1 files changed, 170 insertions, 296 deletions
| diff --git a/lib/sqlalchemy/orm/util.py b/lib/sqlalchemy/orm/util.py index 75f711007..45c578355 100644 --- a/lib/sqlalchemy/orm/util.py +++ b/lib/sqlalchemy/orm/util.py @@ -5,13 +5,22 @@  # This module is part of SQLAlchemy and is released under  # the MIT License: https://www.opensource.org/licenses/mit-license.php -  import re  import types +import typing +from typing import Any +from typing import Generic +from typing import Optional +from typing import overload +from typing import Tuple +from typing import Type +from typing import TypeVar +from typing import Union  import weakref  from . import attributes  # noqa  from .base import _class_to_mapper  # noqa +from .base import _IdentityKeyType  from .base import _never_set  # noqa  from .base import _none_set  # noqa  from .base import attribute_str  # noqa @@ -45,8 +54,17 @@ from ..sql import util as sql_util  from ..sql import visitors  from ..sql.annotation import SupportsCloneAnnotations  from ..sql.base import ColumnCollection +from ..sql.selectable import FromClause  from ..util.langhelpers import MemoizedSlots +from ..util.typing import de_stringify_annotation +from ..util.typing import is_origin_of + +if typing.TYPE_CHECKING: +    from .mapper import Mapper +    from ..engine import Row +    from ..sql.selectable import Alias +_T = TypeVar("_T", bound=Any)  all_cascades = frozenset(      ( @@ -276,7 +294,28 @@ def polymorphic_union(      return sql.union_all(*result).alias(aliasname) -def identity_key(*args, **kwargs): +@overload +def identity_key( +    class_: type, ident: Tuple[Any, ...], *, identity_token: Optional[str] +) -> _IdentityKeyType: +    ... + + +@overload +def identity_key(*, instance: Any) -> _IdentityKeyType: +    ... + + +@overload +def identity_key( +    class_: type, *, row: "Row", identity_token: Optional[str] +) -> _IdentityKeyType: +    ... + + +def identity_key( +    class_=None, ident=None, *, instance=None, row=None, identity_token=None +) -> _IdentityKeyType:      r"""Generate "identity key" tuples, as are used as keys in the      :attr:`.Session.identity_map` dictionary. @@ -340,29 +379,11 @@ def identity_key(*args, **kwargs):          .. versionadded:: 1.2 added identity_token      """ -    if args: -        row = None -        largs = len(args) -        if largs == 1: -            class_ = args[0] -            try: -                row = kwargs.pop("row") -            except KeyError: -                ident = kwargs.pop("ident") -        elif largs in (2, 3): -            class_, ident = args -        else: -            raise sa_exc.ArgumentError( -                "expected up to three positional arguments, " "got %s" % largs -            ) - -        identity_token = kwargs.pop("identity_token", None) -        if kwargs: -            raise sa_exc.ArgumentError( -                "unknown keyword arguments: %s" % ", ".join(kwargs) -            ) +    if class_ is not None:          mapper = class_mapper(class_)          if row is None: +            if ident is None: +                raise sa_exc.ArgumentError("ident or row is required")              return mapper.identity_key_from_primary_key(                  util.to_list(ident), identity_token=identity_token              ) @@ -370,14 +391,11 @@ def identity_key(*args, **kwargs):              return mapper.identity_key_from_row(                  row, identity_token=identity_token              ) -    else: -        instance = kwargs.pop("instance") -        if kwargs: -            raise sa_exc.ArgumentError( -                "unknown keyword arguments: %s" % ", ".join(kwargs.keys) -            ) +    elif instance is not None:          mapper = object_mapper(instance)          return mapper.identity_key_from_instance(instance) +    else: +        raise sa_exc.ArgumentError("class or instance is required")  class ORMAdapter(sql_util.ColumnAdapter): @@ -420,7 +438,7 @@ class ORMAdapter(sql_util.ColumnAdapter):          return not entity or entity.isa(self.mapper) -class AliasedClass: +class AliasedClass(inspection.Inspectable["AliasedInsp"], Generic[_T]):      r"""Represents an "aliased" form of a mapped class for usage with Query.      The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias` @@ -481,7 +499,7 @@ class AliasedClass:      def __init__(          self, -        mapped_class_or_ac, +        mapped_class_or_ac: Union[Type[_T], "Mapper[_T]", "AliasedClass[_T]"],          alias=None,          name=None,          flat=False, @@ -611,6 +629,7 @@ class AliasedInsp(      ORMEntityColumnsClauseRole,      ORMFromClauseRole,      sql_base.HasCacheKey, +    roles.HasFromClauseElement,      InspectionAttr,      MemoizedSlots,  ): @@ -747,6 +766,73 @@ class AliasedInsp(          self._target = mapped_class_or_ac          # self._target = mapper.class_  # mapped_class_or_ac +    @classmethod +    def _alias_factory( +        cls, +        element: Union[ +            Type[_T], "Mapper[_T]", "FromClause", "AliasedClass[_T]" +        ], +        alias=None, +        name=None, +        flat=False, +        adapt_on_names=False, +    ) -> Union["AliasedClass[_T]", "Alias"]: + +        if isinstance(element, FromClause): +            if adapt_on_names: +                raise sa_exc.ArgumentError( +                    "adapt_on_names only applies to ORM elements" +                ) +            if name: +                return element.alias(name=name, flat=flat) +            else: +                return coercions.expect( +                    roles.AnonymizedFromClauseRole, element, flat=flat +                ) +        else: +            return AliasedClass( +                element, +                alias=alias, +                flat=flat, +                name=name, +                adapt_on_names=adapt_on_names, +            ) + +    @classmethod +    def _with_polymorphic_factory( +        cls, +        base, +        classes, +        selectable=False, +        flat=False, +        polymorphic_on=None, +        aliased=False, +        innerjoin=False, +        _use_mapper_path=False, +    ): + +        primary_mapper = _class_to_mapper(base) + +        if selectable not in (None, False) and flat: +            raise sa_exc.ArgumentError( +                "the 'flat' and 'selectable' arguments cannot be passed " +                "simultaneously to with_polymorphic()" +            ) + +        mappers, selectable = primary_mapper._with_polymorphic_args( +            classes, selectable, innerjoin=innerjoin +        ) +        if aliased or flat: +            selectable = selectable._anonymous_fromclause(flat=flat) +        return AliasedClass( +            base, +            selectable, +            with_polymorphic_mappers=mappers, +            with_polymorphic_discriminator=polymorphic_on, +            use_mapper_path=_use_mapper_path, +            represents_outer_join=not innerjoin, +        ) +      @property      def entity(self):          # to eliminate reference cycles, the AliasedClass is held weakly. @@ -1107,215 +1193,6 @@ inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)  inspection._inspects(AliasedInsp)(lambda target: target) -def aliased(element, alias=None, name=None, flat=False, adapt_on_names=False): -    """Produce an alias of the given element, usually an :class:`.AliasedClass` -    instance. - -    E.g.:: - -        my_alias = aliased(MyClass) - -        session.query(MyClass, my_alias).filter(MyClass.id > my_alias.id) - -    The :func:`.aliased` function is used to create an ad-hoc mapping of a -    mapped class to a new selectable.  By default, a selectable is generated -    from the normally mapped selectable (typically a :class:`_schema.Table` -    ) using the -    :meth:`_expression.FromClause.alias` method. However, :func:`.aliased` -    can also be -    used to link the class to a new :func:`_expression.select` statement. -    Also, the :func:`.with_polymorphic` function is a variant of -    :func:`.aliased` that is intended to specify a so-called "polymorphic -    selectable", that corresponds to the union of several joined-inheritance -    subclasses at once. - -    For convenience, the :func:`.aliased` function also accepts plain -    :class:`_expression.FromClause` constructs, such as a -    :class:`_schema.Table` or -    :func:`_expression.select` construct.   In those cases, the -    :meth:`_expression.FromClause.alias` -    method is called on the object and the new -    :class:`_expression.Alias` object returned.  The returned -    :class:`_expression.Alias` is not -    ORM-mapped in this case. - -    .. seealso:: - -        :ref:`tutorial_orm_entity_aliases` - in the :ref:`unified_tutorial` - -        :ref:`orm_queryguide_orm_aliases` - in the :ref:`queryguide_toplevel` - -    :ref:`ormtutorial_aliases` - in the legacy :ref:`ormtutorial_toplevel` - -    :param element: element to be aliased.  Is normally a mapped class, -     but for convenience can also be a :class:`_expression.FromClause` -     element. - -    :param alias: Optional selectable unit to map the element to.  This is -     usually used to link the object to a subquery, and should be an aliased -     select construct as one would produce from the -     :meth:`_query.Query.subquery` method or -     the :meth:`_expression.Select.subquery` or -     :meth:`_expression.Select.alias` methods of the :func:`_expression.select` -     construct. - -    :param name: optional string name to use for the alias, if not specified -     by the ``alias`` parameter.  The name, among other things, forms the -     attribute name that will be accessible via tuples returned by a -     :class:`_query.Query` object.  Not supported when creating aliases -     of :class:`_sql.Join` objects. - -    :param flat: Boolean, will be passed through to the -     :meth:`_expression.FromClause.alias` call so that aliases of -     :class:`_expression.Join` objects will alias the individual tables -     inside the join, rather than creating a subquery.  This is generally -     supported by all modern databases with regards to right-nested joins -     and generally produces more efficient queries. - -    :param adapt_on_names: if True, more liberal "matching" will be used when -     mapping the mapped columns of the ORM entity to those of the -     given selectable - a name-based match will be performed if the -     given selectable doesn't otherwise have a column that corresponds -     to one on the entity.  The use case for this is when associating -     an entity with some derived selectable such as one that uses -     aggregate functions:: - -        class UnitPrice(Base): -            __tablename__ = 'unit_price' -            ... -            unit_id = Column(Integer) -            price = Column(Numeric) - -        aggregated_unit_price = Session.query( -                                    func.sum(UnitPrice.price).label('price') -                                ).group_by(UnitPrice.unit_id).subquery() - -        aggregated_unit_price = aliased(UnitPrice, -                    alias=aggregated_unit_price, adapt_on_names=True) - -     Above, functions on ``aggregated_unit_price`` which refer to -     ``.price`` will return the -     ``func.sum(UnitPrice.price).label('price')`` column, as it is -     matched on the name "price".  Ordinarily, the "price" function -     wouldn't have any "column correspondence" to the actual -     ``UnitPrice.price`` column as it is not a proxy of the original. - -    """ -    if isinstance(element, expression.FromClause): -        if adapt_on_names: -            raise sa_exc.ArgumentError( -                "adapt_on_names only applies to ORM elements" -            ) -        if name: -            return element.alias(name=name, flat=flat) -        else: -            return coercions.expect( -                roles.AnonymizedFromClauseRole, element, flat=flat -            ) -    else: -        return AliasedClass( -            element, -            alias=alias, -            flat=flat, -            name=name, -            adapt_on_names=adapt_on_names, -        ) - - -def with_polymorphic( -    base, -    classes, -    selectable=False, -    flat=False, -    polymorphic_on=None, -    aliased=False, -    innerjoin=False, -    _use_mapper_path=False, -): -    """Produce an :class:`.AliasedClass` construct which specifies -    columns for descendant mappers of the given base. - -    Using this method will ensure that each descendant mapper's -    tables are included in the FROM clause, and will allow filter() -    criterion to be used against those tables.  The resulting -    instances will also have those columns already loaded so that -    no "post fetch" of those columns will be required. - -    .. seealso:: - -        :ref:`with_polymorphic` - full discussion of -        :func:`_orm.with_polymorphic`. - -    :param base: Base class to be aliased. - -    :param classes: a single class or mapper, or list of -        class/mappers, which inherit from the base class. -        Alternatively, it may also be the string ``'*'``, in which case -        all descending mapped classes will be added to the FROM clause. - -    :param aliased: when True, the selectable will be aliased.   For a -        JOIN, this means the JOIN will be SELECTed from inside of a subquery -        unless the :paramref:`_orm.with_polymorphic.flat` flag is set to -        True, which is recommended for simpler use cases. - -    :param flat: Boolean, will be passed through to the -     :meth:`_expression.FromClause.alias` call so that aliases of -     :class:`_expression.Join` objects will alias the individual tables -     inside the join, rather than creating a subquery.  This is generally -     supported by all modern databases with regards to right-nested joins -     and generally produces more efficient queries.  Setting this flag is -     recommended as long as the resulting SQL is functional. - -    :param selectable: a table or subquery that will -        be used in place of the generated FROM clause. This argument is -        required if any of the desired classes use concrete table -        inheritance, since SQLAlchemy currently cannot generate UNIONs -        among tables automatically. If used, the ``selectable`` argument -        must represent the full set of tables and columns mapped by every -        mapped class. Otherwise, the unaccounted mapped columns will -        result in their table being appended directly to the FROM clause -        which will usually lead to incorrect results. - -        When left at its default value of ``False``, the polymorphic -        selectable assigned to the base mapper is used for selecting rows. -        However, it may also be passed as ``None``, which will bypass the -        configured polymorphic selectable and instead construct an ad-hoc -        selectable for the target classes given; for joined table inheritance -        this will be a join that includes all target mappers and their -        subclasses. - -    :param polymorphic_on: a column to be used as the "discriminator" -        column for the given selectable. If not given, the polymorphic_on -        attribute of the base classes' mapper will be used, if any. This -        is useful for mappings that don't have polymorphic loading -        behavior by default. - -    :param innerjoin: if True, an INNER JOIN will be used.  This should -       only be specified if querying for one specific subtype only -    """ -    primary_mapper = _class_to_mapper(base) - -    if selectable not in (None, False) and flat: -        raise sa_exc.ArgumentError( -            "the 'flat' and 'selectable' arguments cannot be passed " -            "simultaneously to with_polymorphic()" -        ) - -    mappers, selectable = primary_mapper._with_polymorphic_args( -        classes, selectable, innerjoin=innerjoin -    ) -    if aliased or flat: -        selectable = selectable._anonymous_fromclause(flat=flat) -    return AliasedClass( -        base, -        selectable, -        with_polymorphic_mappers=mappers, -        with_polymorphic_discriminator=polymorphic_on, -        use_mapper_path=_use_mapper_path, -        represents_outer_join=not innerjoin, -    ) - -  @inspection._self_inspects  class Bundle(      ORMColumnsClauseRole, @@ -1667,62 +1544,6 @@ class _ORMJoin(expression.Join):          return _ORMJoin(self, right, onclause, isouter=True, full=full) -def join( -    left, right, onclause=None, isouter=False, full=False, join_to_left=None -): -    r"""Produce an inner join between left and right clauses. - -    :func:`_orm.join` is an extension to the core join interface -    provided by :func:`_expression.join()`, where the -    left and right selectables may be not only core selectable -    objects such as :class:`_schema.Table`, but also mapped classes or -    :class:`.AliasedClass` instances.   The "on" clause can -    be a SQL expression, or an attribute or string name -    referencing a configured :func:`_orm.relationship`. - -    :func:`_orm.join` is not commonly needed in modern usage, -    as its functionality is encapsulated within that of the -    :meth:`_query.Query.join` method, which features a -    significant amount of automation beyond :func:`_orm.join` -    by itself.  Explicit usage of :func:`_orm.join` -    with :class:`_query.Query` involves usage of the -    :meth:`_query.Query.select_from` method, as in:: - -        from sqlalchemy.orm import join -        session.query(User).\ -            select_from(join(User, Address, User.addresses)).\ -            filter(Address.email_address=='foo@bar.com') - -    In modern SQLAlchemy the above join can be written more -    succinctly as:: - -        session.query(User).\ -                join(User.addresses).\ -                filter(Address.email_address=='foo@bar.com') - -    See :meth:`_query.Query.join` for information on modern usage -    of ORM level joins. - -    .. deprecated:: 0.8 - -        the ``join_to_left`` parameter is deprecated, and will be removed -        in a future release.  The parameter has no effect. - -    """ -    return _ORMJoin(left, right, onclause, isouter, full) - - -def outerjoin(left, right, onclause=None, full=False, join_to_left=None): -    """Produce a left outer join between left and right clauses. - -    This is the "outer join" version of the :func:`_orm.join` function, -    featuring the same behavior except that an OUTER JOIN is generated. -    See that function's documentation for other usage details. - -    """ -    return _ORMJoin(left, right, onclause, True, full) - -  def with_parent(instance, prop, from_entity=None):      """Create filtering criterion that relates this query's primary entity      to the given related instance, using established @@ -1964,3 +1785,56 @@ def _getitem(iterable_query, item):              return list(iterable_query)[-1]          else:              return list(iterable_query[item : item + 1])[0] + + +def _is_mapped_annotation(raw_annotation: Union[type, str], cls: type): +    annotated = de_stringify_annotation(cls, raw_annotation) +    return is_origin_of(annotated, "Mapped", module="sqlalchemy.orm") + + +def _extract_mapped_subtype( +    raw_annotation: Union[type, str], +    cls: type, +    key: str, +    attr_cls: type, +    required: bool, +    is_dataclass_field: bool, +) -> Optional[Union[type, str]]: + +    if raw_annotation is None: + +        if required: +            raise sa_exc.ArgumentError( +                f"Python typing annotation is required for attribute " +                f'"{cls.__name__}.{key}" when primary argument(s) for ' +                f'"{attr_cls.__name__}" construct are None or not present' +            ) +        return None + +    annotated = de_stringify_annotation(cls, raw_annotation) + +    if is_dataclass_field: +        return annotated +    else: +        if ( +            not hasattr(annotated, "__origin__") +            or not issubclass(annotated.__origin__, attr_cls) +            and not issubclass(attr_cls, annotated.__origin__) +        ): +            our_annotated_str = ( +                annotated.__name__ +                if not isinstance(annotated, str) +                else repr(annotated) +            ) +            raise sa_exc.ArgumentError( +                f'Type annotation for "{cls.__name__}.{key}" should use the ' +                f'syntax "Mapped[{our_annotated_str}]" or ' +                f'"{attr_cls.__name__}[{our_annotated_str}]".' +            ) + +        if len(annotated.__args__) != 1: +            raise sa_exc.ArgumentError( +                "Expected sub-type for Mapped[] annotation" +            ) + +        return annotated.__args__[0] | 
