diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-27 12:58:12 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-25 13:56:37 -0400 |
| commit | 6930dfc032c3f9f474e71ab4e021c0ef8384930e (patch) | |
| tree | 34b919a3c34edaffda1750f161a629fc5b9a8020 /lib/sqlalchemy/orm | |
| parent | dce8c7a125cb99fad62c76cd145752d5afefae36 (diff) | |
| download | sqlalchemy-6930dfc032c3f9f474e71ab4e021c0ef8384930e.tar.gz | |
Convert execution to move through Session
This patch replaces the ORM execution flow with a
single pathway through Session.execute() for all queries,
including Core and ORM.
Currently included is full support for ORM Query,
Query.from_statement(), select(), as well as the
baked query and horizontal shard systems. Initial
changes have also been made to the dogpile caching
example, which like baked query makes use of a
new ORM-specific execution hook that replaces the
use of both QueryEvents.before_compile() as well
as Query._execute_and_instances() as the central
ORM interception hooks.
select() and Query() constructs alike can be passed to
Session.execute() where they will return ORM
results in a Results object. This API is currently
used internally by Query. Full support for
Session.execute()->results to behave in a fully
2.0 fashion will be in later changesets.
bulk update/delete with ORM support will also
be delivered via the update() and delete()
constructs, however these have not yet been adapted
to the new system and may follow in a subsequent
update.
Performance is also beginning to lag as of this
commit and some previous ones. It is hoped that
a few central functions such as the coercions
functions can be rewritten in C to re-gain
performance. Additionally, query caching
is now available and some subsequent patches
will attempt to cache more of the per-execution
work from the ORM layer, e.g. column getters
and adapters.
This patch also contains initial "turn on" of the
caching system enginewide via the query_cache_size
parameter to create_engine(). Still defaulting at
zero for "no caching". The caching system still
needs adjustments in order to gain adequate performance.
Change-Id: I047a7ebb26aa85dc01f6789fac2bff561dcd555d
Diffstat (limited to 'lib/sqlalchemy/orm')
| -rw-r--r-- | lib/sqlalchemy/orm/__init__.py | 3 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/attributes.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/context.py | 619 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/events.py | 37 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/interfaces.py | 105 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/loading.py | 231 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/mapper.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/path_registry.py | 19 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/persistence.py | 7 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/properties.py | 27 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/query.py | 312 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/relationships.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/session.py | 319 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/strategies.py | 73 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/util.py | 31 |
15 files changed, 1255 insertions, 538 deletions
diff --git a/lib/sqlalchemy/orm/__init__.py b/lib/sqlalchemy/orm/__init__.py index 0a353f81c..110c27811 100644 --- a/lib/sqlalchemy/orm/__init__.py +++ b/lib/sqlalchemy/orm/__init__.py @@ -30,6 +30,7 @@ from .mapper import reconstructor # noqa from .mapper import validates # noqa from .properties import ColumnProperty # noqa from .query import AliasOption # noqa +from .query import FromStatement # noqa from .query import Query # noqa from .relationships import foreign # noqa from .relationships import RelationshipProperty # noqa @@ -39,8 +40,10 @@ from .session import close_all_sessions # noqa from .session import make_transient # noqa from .session import make_transient_to_detached # noqa from .session import object_session # noqa +from .session import ORMExecuteState # noqa from .session import Session # noqa from .session import sessionmaker # noqa +from .session import SessionTransaction # noqa from .strategy_options import Load # noqa from .util import aliased # noqa from .util import Bundle # noqa diff --git a/lib/sqlalchemy/orm/attributes.py b/lib/sqlalchemy/orm/attributes.py index 7b4415bfe..262a1efc9 100644 --- a/lib/sqlalchemy/orm/attributes.py +++ b/lib/sqlalchemy/orm/attributes.py @@ -207,6 +207,10 @@ class QueryableAttribute( def __clause_element__(self): return self.expression + @property + def _from_objects(self): + return self.expression._from_objects + def _bulk_update_tuples(self, value): """Return setter tuples for a bulk UPDATE.""" diff --git a/lib/sqlalchemy/orm/context.py b/lib/sqlalchemy/orm/context.py index 0a3701134..3acab7df7 100644 --- a/lib/sqlalchemy/orm/context.py +++ b/lib/sqlalchemy/orm/context.py @@ -18,19 +18,21 @@ from .util import Bundle from .util import join as orm_join from .util import ORMAdapter from .. import exc as sa_exc +from .. import future from .. import inspect from .. import sql from .. import util -from ..future.selectable import Select as FutureSelect from ..sql import coercions from ..sql import expression from ..sql import roles from ..sql import util as sql_util from ..sql import visitors from ..sql.base import CacheableOptions +from ..sql.base import CompileState from ..sql.base import Options +from ..sql.selectable import LABEL_STYLE_DISAMBIGUATE_ONLY +from ..sql.selectable import LABEL_STYLE_NONE from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL -from ..sql.selectable import Select from ..sql.selectable import SelectState from ..sql.visitors import ExtendedInternalTraversal from ..sql.visitors import InternalTraversal @@ -44,6 +46,8 @@ class QueryContext(object): "orm_query", "query", "load_options", + "bind_arguments", + "execution_options", "session", "autoflush", "populate_existing", @@ -51,7 +55,7 @@ class QueryContext(object): "version_check", "refresh_state", "create_eager_joins", - "propagate_options", + "propagated_loader_options", "attributes", "runid", "partials", @@ -70,20 +74,30 @@ class QueryContext(object): _yield_per = None _refresh_state = None _lazy_loaded_from = None + _orm_query = None _params = util.immutabledict() - def __init__(self, compile_state, session): - query = compile_state.query + def __init__( + self, + compile_state, + session, + load_options, + execution_options=None, + bind_arguments=None, + ): + self.load_options = load_options + self.execution_options = execution_options or {} + self.bind_arguments = bind_arguments or {} self.compile_state = compile_state self.orm_query = compile_state.orm_query - self.query = compile_state.query + self.query = query = compile_state.query self.session = session - self.load_options = load_options = query.load_options - self.propagate_options = set( + self.propagated_loader_options = { o for o in query._with_options if o.propagate_to_loaders - ) + } + self.attributes = dict(compile_state.attributes) self.autoflush = load_options._autoflush @@ -92,11 +106,7 @@ class QueryContext(object): self.version_check = load_options._version_check self.refresh_state = load_options._refresh_state self.yield_per = load_options._yield_per - - if self.refresh_state is not None: - self.identity_token = load_options._refresh_identity_token - else: - self.identity_token = None + self.identity_token = load_options._refresh_identity_token if self.yield_per and compile_state._no_yield_pers: raise sa_exc.InvalidRequestError( @@ -119,25 +129,10 @@ class QueryContext(object): ) -class QueryCompileState(sql.base.CompileState): - _joinpath = _joinpoint = util.immutabledict() - _from_obj_alias = None - _has_mapper_entities = False - - _has_orm_entities = False - multi_row_eager_loaders = False - compound_eager_adapter = None - loaders_require_buffering = False - loaders_require_uniquing = False - - correlate = None - _where_criteria = () - _having_criteria = () - - orm_query = None - +class ORMCompileState(CompileState): class default_compile_options(CacheableOptions): _cache_key_traversal = [ + ("_orm_results", InternalTraversal.dp_boolean), ("_bake_ok", InternalTraversal.dp_boolean), ( "_with_polymorphic_adapt_map", @@ -153,136 +148,310 @@ class QueryCompileState(sql.base.CompileState): ("_for_refresh_state", InternalTraversal.dp_boolean), ] + _orm_results = True _bake_ok = True _with_polymorphic_adapt_map = () _current_path = _path_registry _enable_single_crit = True - _statement = None _enable_eagerloads = True _orm_only_from_obj_alias = True _only_load_props = None _set_base_alias = False _for_refresh_state = False + # non-cache-key elements mostly for legacy use + _statement = None + _orm_query = None + + @classmethod + def merge(cls, other): + return cls + other._state_dict() + + orm_query = None + current_path = _path_registry + def __init__(self, *arg, **kw): raise NotImplementedError() @classmethod - def _create_for_select(cls, statement, compiler, **kw): - if not statement._is_future: - return SelectState(statement, compiler, **kw) + def create_for_statement(cls, statement_container, compiler, **kw): + raise NotImplementedError() - self = cls.__new__(cls) + @classmethod + def _create_for_legacy_query(cls, query, for_statement=False): + stmt = query._statement_20(orm_results=not for_statement) - if not isinstance( - statement.compile_options, cls.default_compile_options - ): - statement.compile_options = cls.default_compile_options - orm_state = self._create_for_legacy_query_via_either(statement) - compile_state = SelectState(orm_state.statement, compiler, **kw) - compile_state._orm_state = orm_state - return compile_state + if query.compile_options._statement is not None: + compile_state_cls = ORMFromStatementCompileState + else: + compile_state_cls = ORMSelectCompileState + + # true in all cases except for two tests in test/orm/test_events.py + # assert stmt.compile_options._orm_query is query + return compile_state_cls._create_for_statement_or_query( + stmt, for_statement=for_statement + ) @classmethod - def _create_future_select_from_query(cls, query): - stmt = FutureSelect.__new__(FutureSelect) - - # the internal state of Query is now a mirror of that of - # Select which can be transferred directly. The Select - # supports compilation into its correct form taking all ORM - # features into account via the plugin and the compile options. - # however it does not export its columns or other attributes - # correctly if deprecated ORM features that adapt plain mapped - # elements are used; for this reason the Select() returned here - # can always support direct execution, but for composition in a larger - # select only works if it does not represent legacy ORM adaption - # features. - stmt.__dict__.update( - dict( - _raw_columns=query._raw_columns, - _compile_state_plugin="orm", # ;) - _where_criteria=query._where_criteria, - _from_obj=query._from_obj, - _legacy_setup_joins=query._legacy_setup_joins, - _order_by_clauses=query._order_by_clauses, - _group_by_clauses=query._group_by_clauses, - _having_criteria=query._having_criteria, - _distinct=query._distinct, - _distinct_on=query._distinct_on, - _with_options=query._with_options, - _with_context_options=query._with_context_options, - _hints=query._hints, - _statement_hints=query._statement_hints, - _correlate=query._correlate, - _auto_correlate=query._auto_correlate, - _limit_clause=query._limit_clause, - _offset_clause=query._offset_clause, - _for_update_arg=query._for_update_arg, - _prefixes=query._prefixes, - _suffixes=query._suffixes, - _label_style=query._label_style, - compile_options=query.compile_options, - # this will be moving but for now make it work like orm.Query - load_options=query.load_options, + def _create_for_statement_or_query( + cls, statement_container, for_statement=False, + ): + raise NotImplementedError() + + @classmethod + def orm_pre_session_exec( + cls, session, statement, execution_options, bind_arguments + ): + if execution_options: + # TODO: will have to provide public API to set some load + # options and also extract them from that API here, likely + # execution options + load_options = execution_options.get( + "_sa_orm_load_options", QueryContext.default_load_options ) + else: + load_options = QueryContext.default_load_options + + bind_arguments["clause"] = statement + + # new in 1.4 - the coercions system is leveraged to allow the + # "subject" mapper of a statement be propagated to the top + # as the statement is built. "subject" mapper is the generally + # standard object used as an identifier for multi-database schemes. + + if "plugin_subject" in statement._propagate_attrs: + bind_arguments["mapper"] = statement._propagate_attrs[ + "plugin_subject" + ].mapper + + if load_options._autoflush: + session._autoflush() + + @classmethod + def orm_setup_cursor_result(cls, session, bind_arguments, result): + execution_context = result.context + compile_state = execution_context.compiled.compile_state + + # cover edge case where ORM entities used in legacy select + # were passed to session.execute: + # session.execute(legacy_select([User.id, User.name])) + # see test_query->test_legacy_tuple_old_select + if not execution_context.compiled.statement._is_future: + return result + + execution_options = execution_context.execution_options + + # we are getting these right above in orm_pre_session_exec(), + # then getting them again right here. + load_options = execution_options.get( + "_sa_orm_load_options", QueryContext.default_load_options + ) + querycontext = QueryContext( + compile_state, + session, + load_options, + execution_options, + bind_arguments, ) + return loading.instances(result, querycontext) - return stmt + @property + def _mapper_entities(self): + return ( + ent for ent in self._entities if isinstance(ent, _MapperEntity) + ) + + def _create_with_polymorphic_adapter(self, ext_info, selectable): + if ( + not ext_info.is_aliased_class + and ext_info.mapper.persist_selectable + not in self._polymorphic_adapters + ): + self._mapper_loads_polymorphically_with( + ext_info.mapper, + sql_util.ColumnAdapter( + selectable, ext_info.mapper._equivalent_columns + ), + ) + + def _mapper_loads_polymorphically_with(self, mapper, adapter): + for m2 in mapper._with_polymorphic_mappers or [mapper]: + self._polymorphic_adapters[m2] = adapter + for m in m2.iterate_to_root(): + self._polymorphic_adapters[m.local_table] = adapter + + +@sql.base.CompileState.plugin_for("orm", "grouping") +class ORMFromStatementCompileState(ORMCompileState): + _aliased_generations = util.immutabledict() + _from_obj_alias = None + _has_mapper_entities = False + + _has_orm_entities = False + multi_row_eager_loaders = False + compound_eager_adapter = None + loaders_require_buffering = False + loaders_require_uniquing = False + + @classmethod + def create_for_statement(cls, statement_container, compiler, **kw): + compiler._rewrites_selected_columns = True + return cls._create_for_statement_or_query(statement_container) @classmethod - def _create_for_legacy_query( - cls, query, for_statement=False, entities_only=False + def _create_for_statement_or_query( + cls, statement_container, for_statement=False, ): - # as we are seeking to use Select() with ORM state as the - # primary executable element, have all Query objects that are not - # from_statement() convert to a Select() first, then run on that. + # from .query import FromStatement - if query.compile_options._statement is not None: - return cls._create_for_legacy_query_via_either( - query, - for_statement=for_statement, - entities_only=entities_only, - orm_query=query, - ) + # assert isinstance(statement_container, FromStatement) + + self = cls.__new__(cls) + self._primary_entity = None + + self.orm_query = statement_container.compile_options._orm_query + + self.statement_container = self.query = statement_container + self.requested_statement = statement_container.element + + self._entities = [] + self._with_polymorphic_adapt_map = {} + self._polymorphic_adapters = {} + self._no_yield_pers = set() + + _QueryEntity.to_compile_state(self, statement_container._raw_columns) + + self.compile_options = statement_container.compile_options + + self.current_path = statement_container.compile_options._current_path + + if statement_container._with_options: + self.attributes = {"_unbound_load_dedupes": set()} + + for opt in statement_container._with_options: + if opt._is_compile_state: + opt.process_compile_state(self) + else: + self.attributes = {} + + if statement_container._with_context_options: + for fn, key in statement_container._with_context_options: + fn(self) + + self.primary_columns = [] + self.secondary_columns = [] + self.eager_joins = {} + self.single_inh_entities = {} + self.create_eager_joins = [] + self._fallback_from_clauses = [] + self._setup_for_statement() + + return self + + def _setup_for_statement(self): + statement = self.requested_statement + if ( + isinstance(statement, expression.SelectBase) + and not statement._is_textual + and not statement.use_labels + ): + self.statement = statement.apply_labels() else: - assert query.compile_options._statement is None + self.statement = statement + self.order_by = None - stmt = cls._create_future_select_from_query(query) + if isinstance(self.statement, expression.TextClause): + # setup for all entities. Currently, this is not useful + # for eager loaders, as the eager loaders that work are able + # to do their work entirely in row_processor. + for entity in self._entities: + entity.setup_compile_state(self) - return cls._create_for_legacy_query_via_either( - stmt, - for_statement=for_statement, - entities_only=entities_only, - orm_query=query, + # we did the setup just to get primary columns. + self.statement = expression.TextualSelect( + self.statement, self.primary_columns, positional=False ) + else: + # allow TextualSelect with implicit columns as well + # as select() with ad-hoc columns, see test_query::TextTest + self._from_obj_alias = sql.util.ColumnAdapter( + self.statement, adapt_on_names=True + ) + # set up for eager loaders, however if we fix subqueryload + # it should not need to do this here. the model of eager loaders + # that can work entirely in row_processor might be interesting + # here though subqueryloader has a lot of upfront work to do + # see test/orm/test_query.py -> test_related_eagerload_against_text + # for where this part makes a difference. would rather have + # subqueryload figure out what it needs more intelligently. + # for entity in self._entities: + # entity.setup_compile_state(self) + + def _adapt_col_list(self, cols, current_adapter): + return cols + + def _get_current_adapter(self): + return None + + +@sql.base.CompileState.plugin_for("orm", "select") +class ORMSelectCompileState(ORMCompileState, SelectState): + _joinpath = _joinpoint = util.immutabledict() + _from_obj_alias = None + _has_mapper_entities = False + + _has_orm_entities = False + multi_row_eager_loaders = False + compound_eager_adapter = None + loaders_require_buffering = False + loaders_require_uniquing = False + + correlate = None + _where_criteria = () + _having_criteria = () + + orm_query = None + + @classmethod + def create_for_statement(cls, statement, compiler, **kw): + if not statement._is_future: + return SelectState(statement, compiler, **kw) + + compiler._rewrites_selected_columns = True + + orm_state = cls._create_for_statement_or_query( + statement, for_statement=True + ) + SelectState.__init__(orm_state, orm_state.statement, compiler, **kw) + return orm_state @classmethod - def _create_for_legacy_query_via_either( - cls, query, for_statement=False, entities_only=False, orm_query=None + def _create_for_statement_or_query( + cls, query, for_statement=False, _entities_only=False, ): + assert isinstance(query, future.Select) + + query.compile_options = cls.default_compile_options.merge( + query.compile_options + ) self = cls.__new__(cls) self._primary_entity = None - self.has_select = isinstance(query, Select) + self.orm_query = query.compile_options._orm_query - if orm_query: - self.orm_query = orm_query - self.query = query - self.has_orm_query = True - else: - self.query = query - if not self.has_select: - self.orm_query = query - self.has_orm_query = True - else: - self.orm_query = None - self.has_orm_query = False + self.query = query self.select_statement = select_statement = query + if not hasattr(select_statement.compile_options, "_orm_results"): + select_statement.compile_options = cls.default_compile_options + select_statement.compile_options += {"_orm_results": for_statement} + else: + for_statement = not select_statement.compile_options._orm_results + self.query = query self._entities = [] @@ -300,19 +469,28 @@ class QueryCompileState(sql.base.CompileState): _QueryEntity.to_compile_state(self, select_statement._raw_columns) - if entities_only: + if _entities_only: return self self.compile_options = query.compile_options + + # TODO: the name of this flag "for_statement" has to change, + # as it is difficult to distinguish from the "query._statement" use + # case which is something totally different self.for_statement = for_statement - if self.has_orm_query and not for_statement: - self.label_style = LABEL_STYLE_TABLENAME_PLUS_COL + # determine label style. we can make different decisions here. + # at the moment, trying to see if we can always use DISAMBIGUATE_ONLY + # rather than LABEL_STYLE_NONE, and if we can use disambiguate style + # for new style ORM selects too. + if self.select_statement._label_style is LABEL_STYLE_NONE: + if self.orm_query and not for_statement: + self.label_style = LABEL_STYLE_TABLENAME_PLUS_COL + else: + self.label_style = LABEL_STYLE_DISAMBIGUATE_ONLY else: self.label_style = self.select_statement._label_style - self.labels = self.label_style is LABEL_STYLE_TABLENAME_PLUS_COL - self.current_path = select_statement.compile_options._current_path self.eager_order_by = () @@ -321,7 +499,7 @@ class QueryCompileState(sql.base.CompileState): self.attributes = {"_unbound_load_dedupes": set()} for opt in self.select_statement._with_options: - if not opt._is_legacy_option: + if opt._is_compile_state: opt.process_compile_state(self) else: self.attributes = {} @@ -341,13 +519,50 @@ class QueryCompileState(sql.base.CompileState): info.selectable for info in select_statement._from_obj ] - if self.compile_options._statement is not None: - self._setup_for_statement() - else: - self._setup_for_generate() + self._setup_for_generate() return self + @classmethod + def _create_entities_collection(cls, query): + """Creates a partial ORMSelectCompileState that includes + the full collection of _MapperEntity and other _QueryEntity objects. + + Supports a few remaining use cases that are pre-compilation + but still need to gather some of the column / adaption information. + + """ + self = cls.__new__(cls) + + self._entities = [] + self._primary_entity = None + self._aliased_generations = {} + self._polymorphic_adapters = {} + + # legacy: only for query.with_polymorphic() + self._with_polymorphic_adapt_map = wpam = dict( + query.compile_options._with_polymorphic_adapt_map + ) + if wpam: + self._setup_with_polymorphics() + + _QueryEntity.to_compile_state(self, query._raw_columns) + return self + + @classmethod + def determine_last_joined_entity(cls, statement): + setup_joins = statement._setup_joins + + if not setup_joins: + return None + + (target, onclause, from_, flags) = setup_joins[-1] + + if isinstance(target, interfaces.PropComparator): + return target.entity + else: + return target + def _setup_with_polymorphics(self): # legacy: only for query.with_polymorphic() for ext_info, wp in self._with_polymorphic_adapt_map.items(): @@ -404,34 +619,6 @@ class QueryCompileState(sql.base.CompileState): return None - def _deep_entity_zero(self): - """Return a 'deep' entity; this is any entity we can find associated - with the first entity / column experssion. this is used only for - session.get_bind(). - - it is hoped this concept can be removed in an upcoming change - to the ORM execution model. - - """ - for ent in self.from_clauses: - if "parententity" in ent._annotations: - return ent._annotations["parententity"].mapper - for ent in self._entities: - ezero = ent._deep_entity_zero() - if ezero is not None: - return ezero.mapper - else: - return None - - @property - def _mapper_entities(self): - for ent in self._entities: - if isinstance(ent, _MapperEntity): - yield ent - - def _bind_mapper(self): - return self._deep_entity_zero() - def _only_full_mapper_zero(self, methname): if self._entities != [self._primary_entity]: raise sa_exc.InvalidRequestError( @@ -490,7 +677,7 @@ class QueryCompileState(sql.base.CompileState): else query._order_by_clauses ) - if query._having_criteria is not None: + if query._having_criteria: self._having_criteria = tuple( current_adapter(crit, True, True) if current_adapter else crit for crit in query._having_criteria @@ -527,7 +714,7 @@ class QueryCompileState(sql.base.CompileState): for s in query._correlate ) ) - elif self.has_select and not query._auto_correlate: + elif not query._auto_correlate: self.correlate = (None,) # PART II @@ -582,33 +769,6 @@ class QueryCompileState(sql.base.CompileState): {"deepentity": ezero} ) - def _setup_for_statement(self): - compile_options = self.compile_options - - if ( - isinstance(compile_options._statement, expression.SelectBase) - and not compile_options._statement._is_textual - and not compile_options._statement.use_labels - ): - self.statement = compile_options._statement.apply_labels() - else: - self.statement = compile_options._statement - self.order_by = None - - if isinstance(self.statement, expression.TextClause): - # setup for all entities, including contains_eager entities. - for entity in self._entities: - entity.setup_compile_state(self) - self.statement = expression.TextualSelect( - self.statement, self.primary_columns, positional=False - ) - else: - # allow TextualSelect with implicit columns as well - # as select() with ad-hoc columns, see test_query::TextTest - self._from_obj_alias = sql.util.ColumnAdapter( - self.statement, adapt_on_names=True - ) - def _compound_eager_statement(self): # for eager joins present and LIMIT/OFFSET/DISTINCT, # wrap the query inside a select, @@ -659,9 +819,10 @@ class QueryCompileState(sql.base.CompileState): self.compound_eager_adapter = sql_util.ColumnAdapter(inner, equivs) - statement = sql.select( - [inner] + self.secondary_columns, use_labels=self.labels + statement = future.select( + *([inner] + self.secondary_columns) # use_labels=self.labels ) + statement._label_style = self.label_style # Oracle however does not allow FOR UPDATE on the subquery, # and the Oracle dialect ignores it, plus for PostgreSQL, MySQL @@ -752,6 +913,7 @@ class QueryCompileState(sql.base.CompileState): group_by, ): + Select = future.Select statement = Select.__new__(Select) statement._raw_columns = raw_columns statement._from_obj = from_obj @@ -794,25 +956,6 @@ class QueryCompileState(sql.base.CompileState): return statement - def _create_with_polymorphic_adapter(self, ext_info, selectable): - if ( - not ext_info.is_aliased_class - and ext_info.mapper.persist_selectable - not in self._polymorphic_adapters - ): - self._mapper_loads_polymorphically_with( - ext_info.mapper, - sql_util.ColumnAdapter( - selectable, ext_info.mapper._equivalent_columns - ), - ) - - def _mapper_loads_polymorphically_with(self, mapper, adapter): - for m2 in mapper._with_polymorphic_mappers or [mapper]: - self._polymorphic_adapters[m2] = adapter - for m in m2.iterate_to_root(): - self._polymorphic_adapters[m.local_table] = adapter - def _adapt_polymorphic_element(self, element): if "parententity" in element._annotations: search = element._annotations["parententity"] @@ -924,6 +1067,8 @@ class QueryCompileState(sql.base.CompileState): # onclause = right right = None + elif "parententity" in right._annotations: + right = right._annotations["parententity"].entity if onclause is None: r_info = inspect(right) @@ -932,7 +1077,6 @@ class QueryCompileState(sql.base.CompileState): "Expected mapped entity or " "selectable/table as join target" ) - if isinstance(onclause, interfaces.PropComparator): of_type = getattr(onclause, "_of_type", None) else: @@ -1584,7 +1728,7 @@ class QueryCompileState(sql.base.CompileState): "aliased_generation": aliased_generation, } - return right, inspect(right), onclause + return inspect(right), right, onclause def _update_joinpoint(self, jp): self._joinpoint = jp @@ -1668,14 +1812,8 @@ class QueryCompileState(sql.base.CompileState): def _column_descriptions(query_or_select_stmt): - # TODO: this is a hack for now, as it is a little bit non-performant - # to build up QueryEntity for every entity right now. - ctx = QueryCompileState._create_for_legacy_query_via_either( - query_or_select_stmt, - entities_only=True, - orm_query=query_or_select_stmt - if not isinstance(query_or_select_stmt, Select) - else None, + ctx = ORMSelectCompileState._create_entities_collection( + query_or_select_stmt ) return [ { @@ -1731,23 +1869,6 @@ def _entity_from_pre_ent_zero(query_or_augmented_select): return ent -@sql.base.CompileState.plugin_for( - "orm", "select", "determine_last_joined_entity" -) -def _determine_last_joined_entity(statement): - setup_joins = statement._setup_joins - - if not setup_joins: - return None - - (target, onclause, from_, flags) = setup_joins[-1] - - if isinstance(target, interfaces.PropComparator): - return target.entity - else: - return target - - def _legacy_determine_last_joined_entity(setup_joins, entity_zero): """given the legacy_setup_joins collection at a point in time, figure out what the "filter by entity" would be in terms @@ -1929,9 +2050,6 @@ class _MapperEntity(_QueryEntity): def entity_zero_or_selectable(self): return self.entity_zero - def _deep_entity_zero(self): - return self.entity_zero - def corresponds_to(self, entity): return _entity_corresponds_to(self.entity_zero, entity) @@ -2093,14 +2211,6 @@ class _BundleEntity(_QueryEntity): else: return None - def _deep_entity_zero(self): - for ent in self._entities: - ezero = ent._deep_entity_zero() - if ezero is not None: - return ezero - else: - return None - def setup_compile_state(self, compile_state): for ent in self._entities: ent.setup_compile_state(compile_state) @@ -2175,17 +2285,6 @@ class _RawColumnEntity(_ColumnEntity): ) self._extra_entities = (self.expr, self.column) - def _deep_entity_zero(self): - for obj in visitors.iterate( - self.column, {"column_tables": True, "column_collections": False}, - ): - if "parententity" in obj._annotations: - return obj._annotations["parententity"] - elif "deepentity" in obj._annotations: - return obj._annotations["deepentity"] - else: - return None - def corresponds_to(self, entity): return False @@ -2276,9 +2375,6 @@ class _ORMColumnEntity(_ColumnEntity): ezero, ezero.selectable ) - def _deep_entity_zero(self): - return self.mapper - def corresponds_to(self, entity): if _is_aliased_class(entity): # TODO: polymorphic subclasses ? @@ -2342,8 +2438,3 @@ class _ORMColumnEntity(_ColumnEntity): compile_state.primary_columns.append(column) compile_state.attributes[("fetch_column", self)] = column - - -sql.base.CompileState.plugin_for("orm", "select")( - QueryCompileState._create_for_select -) diff --git a/lib/sqlalchemy/orm/events.py b/lib/sqlalchemy/orm/events.py index f5d191860..be7aa272e 100644 --- a/lib/sqlalchemy/orm/events.py +++ b/lib/sqlalchemy/orm/events.py @@ -1397,6 +1397,43 @@ class SessionEvents(event.Events): event_key.base_listen(**kw) + def do_orm_execute(self, orm_execute_state): + """Intercept statement executions that occur in terms of a :class:`.Session`. + + This event is invoked for all top-level SQL statements invoked + from the :meth:`_orm.Session.execute` method. As of SQLAlchemy 1.4, + all ORM queries emitted on behalf of a :class:`_orm.Session` will + flow through this method, so this event hook provides the single + point at which ORM queries of all types may be intercepted before + they are invoked, and additionally to replace their execution with + a different process. + + This event is a ``do_`` event, meaning it has the capability to replace + the operation that the :meth:`_orm.Session.execute` method normally + performs. The intended use for this includes sharding and + result-caching schemes which may seek to invoke the same statement + across multiple database connections, returning a result that is + merged from each of them, or which don't invoke the statement at all, + instead returning data from a cache. + + The hook intends to replace the use of the + ``Query._execute_and_instances`` method that could be subclassed prior + to SQLAlchemy 1.4. + + :param orm_execute_state: an instance of :class:`.ORMExecuteState` + which contains all information about the current execution, as well + as helper functions used to derive other commonly required + information. See that object for details. + + .. seealso:: + + :class:`.ORMExecuteState` + + + .. versionadded:: 1.4 + + """ + def after_transaction_create(self, session, transaction): """Execute when a new :class:`.SessionTransaction` is created. diff --git a/lib/sqlalchemy/orm/interfaces.py b/lib/sqlalchemy/orm/interfaces.py index 313f2fda8..6c0f5d3ef 100644 --- a/lib/sqlalchemy/orm/interfaces.py +++ b/lib/sqlalchemy/orm/interfaces.py @@ -64,6 +64,12 @@ __all__ = ( ) +class ORMStatementRole(roles.CoerceTextStatementRole): + _role_name = ( + "Executable SQL or text() construct, including ORM " "aware objects" + ) + + class ORMColumnsClauseRole(roles.ColumnsClauseRole): _role_name = "ORM mapped entity, aliased entity, or Column expression" @@ -662,8 +668,15 @@ class StrategizedProperty(MapperProperty): ) -class LoaderOption(HasCacheKey): - """Describe a modification to an ORM statement at compilation time. +class ORMOption(object): + """Base class for option objects that are passed to ORM queries. + + These options may be consumed by :meth:`.Query.options`, + :meth:`.Select.options`, or in a more general sense by any + :meth:`.Executable.options` method. They are interpreted at + statement compile time or execution time in modern use. The + deprecated :class:`.MapperOption` is consumed at ORM query construction + time. .. versionadded:: 1.4 @@ -680,6 +693,18 @@ class LoaderOption(HasCacheKey): """ + _is_compile_state = False + + +class LoaderOption(HasCacheKey, ORMOption): + """Describe a loader modification to an ORM statement at compilation time. + + .. versionadded:: 1.4 + + """ + + _is_compile_state = True + def process_compile_state(self, compile_state): """Apply a modification to a given :class:`.CompileState`.""" @@ -693,18 +718,39 @@ class LoaderOption(HasCacheKey): return False +class UserDefinedOption(ORMOption): + """Base class for a user-defined option that can be consumed from the + :meth:`.SessionEvents.do_orm_execute` event hook. + + """ + + _is_legacy_option = False + + propagate_to_loaders = False + """if True, indicate this option should be carried along + to "secondary" Query objects produced during lazy loads + or refresh operations. + + """ + + def __init__(self, payload=None): + self.payload = payload + + def _gen_cache_key(self, *arg, **kw): + return () + + @util.deprecated_cls( "1.4", "The :class:`.MapperOption class is deprecated and will be removed " - "in a future release. ORM options now run within the compilation " - "phase and are based on the :class:`.LoaderOption` class which is " - "intended for internal consumption only. For " - "modifications to queries on a per-execution basis, the " - ":meth:`.before_execute` hook will now intercept ORM :class:`.Query` " - "objects before they are invoked", + "in a future release. For " + "modifications to queries on a per-execution basis, use the " + ":class:`.UserDefinedOption` class to establish state within a " + ":class:`.Query` or other Core statement, then use the " + ":meth:`.SessionEvents.before_orm_execute` hook to consume them.", constructor=None, ) -class MapperOption(object): +class MapperOption(ORMOption): """Describe a modification to a Query""" _is_legacy_option = True @@ -735,23 +781,6 @@ class MapperOption(object): def _generate_path_cache_key(self, path): """Used by the "baked lazy loader" to see if this option can be cached. - The "baked lazy loader" refers to the :class:`_query.Query` that is - produced during a lazy load operation for a mapped relationship. - It does not yet apply to the "lazy" load operation for deferred - or expired column attributes, however this may change in the future. - - This loader generates SQL for a query only once and attempts to cache - it; from that point on, if the SQL has been cached it will no longer - run the :meth:`_query.Query.options` method of the - :class:`_query.Query`. The - :class:`.MapperOption` object that wishes to participate within a lazy - load operation therefore needs to tell the baked loader that it either - needs to forego this caching, or that it needs to include the state of - the :class:`.MapperOption` itself as part of its cache key, otherwise - SQL or other query state that has been affected by the - :class:`.MapperOption` may be cached in place of a query that does not - include these modifications, or the option may not be invoked at all. - By default, this method returns the value ``False``, which means the :class:`.BakedQuery` generated by the lazy loader will not cache the SQL when this :class:`.MapperOption` is present. @@ -760,26 +789,10 @@ class MapperOption(object): an unlimited number of :class:`_query.Query` objects for an unlimited number of :class:`.MapperOption` objects. - .. versionchanged:: 1.2.8 the default return value of - :meth:`.MapperOption._generate_cache_key` is False; previously it - was ``None`` indicating "safe to cache, don't include as part of - the cache key" - - To enable caching of :class:`_query.Query` objects within lazy loaders - , a - given :class:`.MapperOption` that returns a cache key must return a key - that uniquely identifies the complete state of this option, which will - match any other :class:`.MapperOption` that itself retains the - identical state. This includes path options, flags, etc. It should - be a state that is repeatable and part of a limited set of possible - options. - - If the :class:`.MapperOption` does not apply to the given path and - would not affect query results on such a path, it should return None, - indicating the :class:`_query.Query` is safe to cache for this given - loader path and that this :class:`.MapperOption` need not be - part of the cache key. - + For caching support it is recommended to use the + :class:`.UserDefinedOption` class in conjunction with + the :meth:`.Session.do_orm_execute` method so that statements may + be modified before they are cached. """ return False diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py index 48641685e..616e757a3 100644 --- a/lib/sqlalchemy/orm/loading.py +++ b/lib/sqlalchemy/orm/loading.py @@ -26,6 +26,7 @@ from .base import _SET_DEFERRED_EXPIRED from .util import _none_set from .util import state_str from .. import exc as sa_exc +from .. import future from .. import util from ..engine import result_tuple from ..engine.result import ChunkedIteratorResult @@ -36,8 +37,20 @@ from ..sql import util as sql_util _new_runid = util.counter() -def instances(query, cursor, context): - """Return an ORM result as an iterator.""" +def instances(cursor, context): + """Return a :class:`.Result` given an ORM query context. + + :param cursor: a :class:`.CursorResult`, generated by a statement + which came from :class:`.ORMCompileState` + + :param context: a :class:`.QueryContext` object + + :return: a :class:`.Result` object representing ORM results + + .. versionchanged:: 1.4 The instances() function now uses + :class:`.Result` objects and has an all new interface. + + """ context.runid = _new_runid() context.post_load_paths = {} @@ -80,7 +93,7 @@ def instances(query, cursor, context): ], ) - def chunks(size, as_tuples): + def chunks(size): while True: yield_per = size @@ -94,7 +107,7 @@ def instances(query, cursor, context): else: fetch = cursor.fetchall() - if not as_tuples: + if single_entity: proc = process[0] rows = [proc(row) for row in fetch] else: @@ -111,20 +124,62 @@ def instances(query, cursor, context): break result = ChunkedIteratorResult( - row_metadata, chunks, source_supports_scalars=single_entity + row_metadata, chunks, source_supports_scalars=single_entity, raw=cursor + ) + + result._attributes = result._attributes.union( + dict(filtered=filtered, is_single_entity=single_entity) ) + if context.yield_per: result.yield_per(context.yield_per) - if single_entity: - result = result.scalars() + return result - filtered = context.compile_state._has_mapper_entities - if filtered: - result = result.unique() +@util.preload_module("sqlalchemy.orm.context") +def merge_frozen_result(session, statement, frozen_result, load=True): + querycontext = util.preloaded.orm_context - return result + if load: + # flush current contents if we expect to load data + session._autoflush() + + ctx = querycontext.ORMSelectCompileState._create_entities_collection( + statement + ) + + autoflush = session.autoflush + try: + session.autoflush = False + mapped_entities = [ + i + for i, e in enumerate(ctx._entities) + if isinstance(e, querycontext._MapperEntity) + ] + keys = [ent._label_name for ent in ctx._entities] + + keyed_tuple = result_tuple( + keys, [ent._extra_entities for ent in ctx._entities] + ) + + result = [] + for newrow in frozen_result.rewrite_rows(): + for i in mapped_entities: + if newrow[i] is not None: + newrow[i] = session._merge( + attributes.instance_state(newrow[i]), + attributes.instance_dict(newrow[i]), + load=load, + _recursive={}, + _resolve_conflict_map={}, + ) + + result.append(keyed_tuple(newrow)) + + return frozen_result.with_new_rows(result) + finally: + session.autoflush = autoflush @util.preload_module("sqlalchemy.orm.context") @@ -145,9 +200,7 @@ def merge_result(query, iterator, load=True): else: frozen_result = None - ctx = querycontext.QueryCompileState._create_for_legacy_query( - query, entities_only=True - ) + ctx = querycontext.ORMSelectCompileState._create_entities_collection(query) autoflush = session.autoflush try: @@ -235,12 +288,15 @@ def get_from_identity(session, mapper, key, passive): def load_on_ident( - query, + session, + statement, key, + load_options=None, refresh_state=None, with_for_update=None, only_load_props=None, no_autoflush=False, + bind_arguments=util.immutabledict(), ): """Load the given identity key from the database.""" if key is not None: @@ -249,38 +305,59 @@ def load_on_ident( else: ident = identity_token = None - if no_autoflush: - query = query.autoflush(False) - return load_on_pk_identity( - query, + session, + statement, ident, + load_options=load_options, refresh_state=refresh_state, with_for_update=with_for_update, only_load_props=only_load_props, identity_token=identity_token, + no_autoflush=no_autoflush, + bind_arguments=bind_arguments, ) def load_on_pk_identity( - query, + session, + statement, primary_key_identity, + load_options=None, refresh_state=None, with_for_update=None, only_load_props=None, identity_token=None, + no_autoflush=False, + bind_arguments=util.immutabledict(), ): """Load the given primary key identity from the database.""" + query = statement + q = query._clone() + + # TODO: fix these imports .... + from .context import QueryContext, ORMCompileState + + if load_options is None: + load_options = QueryContext.default_load_options + + compile_options = ORMCompileState.default_compile_options.merge( + q.compile_options + ) + + # checking that query doesnt have criteria on it + # just delete it here w/ optional assertion? since we are setting a + # where clause also if refresh_state is None: - q = query._clone() - q._get_condition() - else: - q = query._clone() + _no_criterion_assertion(q, "get", order_by=False, distinct=False) if primary_key_identity is not None: - mapper = query._only_full_mapper_zero("load_on_pk_identity") + # mapper = query._only_full_mapper_zero("load_on_pk_identity") + + # TODO: error checking? + mapper = query._raw_columns[0]._annotations["parententity"] (_get_clause, _get_params) = mapper._get_clause @@ -320,9 +397,8 @@ def load_on_pk_identity( ] ) - q.load_options += {"_params": params} + load_options += {"_params": params} - # with_for_update needs to be query.LockmodeArg() if with_for_update is not None: version_check = True q._for_update_arg = with_for_update @@ -333,11 +409,15 @@ def load_on_pk_identity( version_check = False if refresh_state and refresh_state.load_options: - # if refresh_state.load_path.parent: - q = q._with_current_path(refresh_state.load_path.parent) - q = q.options(refresh_state.load_options) + compile_options += {"_current_path": refresh_state.load_path.parent} + q = q.options(*refresh_state.load_options) - q._get_options( + # TODO: most of the compile_options that are not legacy only involve this + # function, so try to see if handling of them can mostly be local to here + + q.compile_options, load_options = _set_get_options( + compile_options, + load_options, populate_existing=bool(refresh_state), version_check=version_check, only_load_props=only_load_props, @@ -346,12 +426,76 @@ def load_on_pk_identity( ) q._order_by = None + if no_autoflush: + load_options += {"_autoflush": False} + + result = ( + session.execute( + q, + params=load_options._params, + execution_options={"_sa_orm_load_options": load_options}, + bind_arguments=bind_arguments, + ) + .unique() + .scalars() + ) + try: - return q.one() + return result.one() except orm_exc.NoResultFound: return None +def _no_criterion_assertion(stmt, meth, order_by=True, distinct=True): + if ( + stmt._where_criteria + or stmt.compile_options._statement is not None + or stmt._from_obj + or stmt._legacy_setup_joins + or stmt._limit_clause is not None + or stmt._offset_clause is not None + or stmt._group_by_clauses + or (order_by and stmt._order_by_clauses) + or (distinct and stmt._distinct) + ): + raise sa_exc.InvalidRequestError( + "Query.%s() being called on a " + "Query with existing criterion. " % meth + ) + + +def _set_get_options( + compile_opt, + load_opt, + populate_existing=None, + version_check=None, + only_load_props=None, + refresh_state=None, + identity_token=None, +): + + compile_options = {} + load_options = {} + if version_check: + load_options["_version_check"] = version_check + if populate_existing: + load_options["_populate_existing"] = populate_existing + if refresh_state: + load_options["_refresh_state"] = refresh_state + compile_options["_for_refresh_state"] = True + if only_load_props: + compile_options["_only_load_props"] = frozenset(only_load_props) + if identity_token: + load_options["_refresh_identity_token"] = identity_token + + if load_options: + load_opt += load_options + if compile_options: + compile_opt += compile_options + + return compile_opt, load_opt + + def _setup_entity_query( compile_state, mapper, @@ -487,7 +631,7 @@ def _instance_processor( context, path, mapper, result, adapter, populators ) - propagate_options = context.propagate_options + propagated_loader_options = context.propagated_loader_options load_path = ( context.compile_state.current_path + path if context.compile_state.current_path.path @@ -639,8 +783,8 @@ def _instance_processor( # be conservative about setting load_path when populate_existing # is in effect; want to maintain options from the original # load. see test_expire->test_refresh_maintains_deferred_options - if isnew and (propagate_options or not populate_existing): - state.load_options = propagate_options + if isnew and (propagated_loader_options or not populate_existing): + state.load_options = propagated_loader_options state.load_path = load_path _populate_full( @@ -1055,7 +1199,7 @@ def load_scalar_attributes(mapper, state, attribute_names, passive): result = False - no_autoflush = passive & attributes.NO_AUTOFLUSH + no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) # in the case of inheritance, particularly concrete and abstract # concrete inheritance, the class manager might have some keys @@ -1080,10 +1224,16 @@ def load_scalar_attributes(mapper, state, attribute_names, passive): # note: using from_statement() here means there is an adaption # with adapt_on_names set up. the other option is to make the # aliased() against a subquery which affects the SQL. + + from .query import FromStatement + + stmt = FromStatement(mapper, statement).options( + strategy_options.Load(mapper).undefer("*") + ) + result = load_on_ident( - session.query(mapper) - .options(strategy_options.Load(mapper).undefer("*")) - .from_statement(statement), + session, + stmt, None, only_load_props=attribute_names, refresh_state=state, @@ -1121,7 +1271,8 @@ def load_scalar_attributes(mapper, state, attribute_names, passive): return result = load_on_ident( - session.query(mapper), + session, + future.select(mapper).apply_labels(), identity_key, refresh_state=state, only_load_props=attribute_names, diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index a6fb1039f..7bfe70c36 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -2237,6 +2237,8 @@ class Mapper( "parentmapper": self, "compile_state_plugin": "orm", } + )._set_propagate_attrs( + {"compile_state_plugin": "orm", "plugin_subject": self} ) @property diff --git a/lib/sqlalchemy/orm/path_registry.py b/lib/sqlalchemy/orm/path_registry.py index 1698a5181..2e5941713 100644 --- a/lib/sqlalchemy/orm/path_registry.py +++ b/lib/sqlalchemy/orm/path_registry.py @@ -228,10 +228,29 @@ class RootRegistry(PathRegistry): PathRegistry.root = RootRegistry() +class PathToken(HasCacheKey, str): + """cacheable string token""" + + _intern = {} + + def _gen_cache_key(self, anon_map, bindparams): + return (str(self),) + + @classmethod + def intern(cls, strvalue): + if strvalue in cls._intern: + return cls._intern[strvalue] + else: + cls._intern[strvalue] = result = PathToken(strvalue) + return result + + class TokenRegistry(PathRegistry): __slots__ = ("token", "parent", "path", "natural_path") def __init__(self, parent, token): + token = PathToken.intern(token) + self.token = token self.parent = parent self.path = parent.path + (token,) diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py index d14f6c27b..163ebf22a 100644 --- a/lib/sqlalchemy/orm/persistence.py +++ b/lib/sqlalchemy/orm/persistence.py @@ -25,6 +25,7 @@ from . import loading from . import sync from .base import state_str from .. import exc as sa_exc +from .. import future from .. import sql from .. import util from ..sql import coercions @@ -1424,8 +1425,10 @@ def _finalize_insert_update_commands(base_mapper, uowtransaction, states): if toload_now: state.key = base_mapper._identity_key_from_state(state) + stmt = future.select(mapper).apply_labels() loading.load_on_ident( - uowtransaction.session.query(mapper), + uowtransaction.session, + stmt, state.key, refresh_state=state, only_load_props=toload_now, @@ -1723,7 +1726,7 @@ class BulkUD(object): self.context ) = compile_state = query._compile_state() - self.mapper = compile_state._bind_mapper() + self.mapper = compile_state._entity_zero() if isinstance( compile_state._entities[0], query_context._RawColumnEntity, diff --git a/lib/sqlalchemy/orm/properties.py b/lib/sqlalchemy/orm/properties.py index 027786c19..4cf501e3f 100644 --- a/lib/sqlalchemy/orm/properties.py +++ b/lib/sqlalchemy/orm/properties.py @@ -346,14 +346,20 @@ class ColumnProperty(StrategizedProperty): pe = self._parententity # no adapter, so we aren't aliased # assert self._parententity is self._parentmapper - return self.prop.columns[0]._annotate( - { - "entity_namespace": pe, - "parententity": pe, - "parentmapper": pe, - "orm_key": self.prop.key, - "compile_state_plugin": "orm", - } + return ( + self.prop.columns[0] + ._annotate( + { + "entity_namespace": pe, + "parententity": pe, + "parentmapper": pe, + "orm_key": self.prop.key, + "compile_state_plugin": "orm", + } + ) + ._set_propagate_attrs( + {"compile_state_plugin": "orm", "plugin_subject": pe} + ) ) def _memoized_attr_info(self): @@ -388,6 +394,11 @@ class ColumnProperty(StrategizedProperty): "orm_key": self.prop.key, "compile_state_plugin": "orm", } + )._set_propagate_attrs( + { + "compile_state_plugin": "orm", + "plugin_subject": self._parententity, + } ) for col in self.prop.columns ] diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 8a861c3dc..25d6f4736 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -18,6 +18,7 @@ ORM session, whereas the ``Select`` construct interacts directly with the database to return iterable result sets. """ +import itertools from . import attributes from . import exc as orm_exc @@ -28,7 +29,8 @@ from .base import _assertions from .context import _column_descriptions from .context import _legacy_determine_last_joined_entity from .context import _legacy_filter_by_entity_zero -from .context import QueryCompileState +from .context import ORMCompileState +from .context import ORMFromStatementCompileState from .context import QueryContext from .interfaces import ORMColumnsClauseRole from .util import aliased @@ -42,18 +44,22 @@ from .. import inspection from .. import log from .. import sql from .. import util +from ..future.selectable import Select as FutureSelect from ..sql import coercions from ..sql import expression from ..sql import roles from ..sql import util as sql_util +from ..sql.annotation import SupportsCloneAnnotations from ..sql.base import _generative from ..sql.base import Executable +from ..sql.selectable import _SelectFromElements from ..sql.selectable import ForUpdateArg from ..sql.selectable import HasHints from ..sql.selectable import HasPrefixes from ..sql.selectable import HasSuffixes from ..sql.selectable import LABEL_STYLE_NONE from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL +from ..sql.selectable import SelectStatementGrouping from ..sql.util import _entity_namespace_key from ..util import collections_abc @@ -62,7 +68,15 @@ __all__ = ["Query", "QueryContext", "aliased"] @inspection._self_inspects @log.class_logger -class Query(HasPrefixes, HasSuffixes, HasHints, Executable): +class Query( + _SelectFromElements, + SupportsCloneAnnotations, + HasPrefixes, + HasSuffixes, + HasHints, + Executable, +): + """ORM-level SQL construction object. :class:`_query.Query` @@ -105,7 +119,7 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): _legacy_setup_joins = () _label_style = LABEL_STYLE_NONE - compile_options = QueryCompileState.default_compile_options + compile_options = ORMCompileState.default_compile_options load_options = QueryContext.default_load_options @@ -115,6 +129,11 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): _enable_assertions = True _last_joined_entity = None + # mirrors that of ClauseElement, used to propagate the "orm" + # plugin as well as the "subject" of the plugin, e.g. the mapper + # we are querying against. + _propagate_attrs = util.immutabledict() + def __init__(self, entities, session=None): """Construct a :class:`_query.Query` directly. @@ -148,7 +167,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): def _set_entities(self, entities): self._raw_columns = [ - coercions.expect(roles.ColumnsClauseRole, ent) + coercions.expect( + roles.ColumnsClauseRole, ent, apply_propagate_attrs=self + ) for ent in util.to_list(entities) ] @@ -183,7 +204,10 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): def _set_select_from(self, obj, set_base_alias): fa = [ coercions.expect( - roles.StrictFromClauseRole, elem, allow_select=True + roles.StrictFromClauseRole, + elem, + allow_select=True, + apply_propagate_attrs=self, ) for elem in obj ] @@ -332,15 +356,13 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): if ( not self.compile_options._set_base_alias and not self.compile_options._with_polymorphic_adapt_map - and self.compile_options._statement is None + # and self.compile_options._statement is None ): # if we don't have legacy top level aliasing features in use # then convert to a future select() directly stmt = self._statement_20() else: - stmt = QueryCompileState._create_for_legacy_query( - self, for_statement=True - ).statement + stmt = self._compile_state(for_statement=True).statement if self.load_options._params: # this is the search and replace thing. this is kind of nuts @@ -349,8 +371,67 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): return stmt - def _statement_20(self): - return QueryCompileState._create_future_select_from_query(self) + def _statement_20(self, orm_results=False): + # TODO: this event needs to be deprecated, as it currently applies + # only to ORM query and occurs at this spot that is now more + # or less an artificial spot + if self.dispatch.before_compile: + for fn in self.dispatch.before_compile: + new_query = fn(self) + if new_query is not None and new_query is not self: + self = new_query + if not fn._bake_ok: + self.compile_options += {"_bake_ok": False} + + if self.compile_options._statement is not None: + stmt = FromStatement( + self._raw_columns, self.compile_options._statement + ) + # TODO: once SubqueryLoader uses select(), we can remove + # "_orm_query" from this structure + stmt.__dict__.update( + _with_options=self._with_options, + _with_context_options=self._with_context_options, + compile_options=self.compile_options + + {"_orm_query": self.with_session(None)}, + _execution_options=self._execution_options, + ) + stmt._propagate_attrs = self._propagate_attrs + else: + stmt = FutureSelect.__new__(FutureSelect) + + stmt.__dict__.update( + _raw_columns=self._raw_columns, + _where_criteria=self._where_criteria, + _from_obj=self._from_obj, + _legacy_setup_joins=self._legacy_setup_joins, + _order_by_clauses=self._order_by_clauses, + _group_by_clauses=self._group_by_clauses, + _having_criteria=self._having_criteria, + _distinct=self._distinct, + _distinct_on=self._distinct_on, + _with_options=self._with_options, + _with_context_options=self._with_context_options, + _hints=self._hints, + _statement_hints=self._statement_hints, + _correlate=self._correlate, + _auto_correlate=self._auto_correlate, + _limit_clause=self._limit_clause, + _offset_clause=self._offset_clause, + _for_update_arg=self._for_update_arg, + _prefixes=self._prefixes, + _suffixes=self._suffixes, + _label_style=self._label_style, + compile_options=self.compile_options + + {"_orm_query": self.with_session(None)}, + _execution_options=self._execution_options, + ) + + if not orm_results: + stmt.compile_options += {"_orm_results": False} + + stmt._propagate_attrs = self._propagate_attrs + return stmt def subquery(self, name=None, with_labels=False, reduce_columns=False): """return the full SELECT statement represented by @@ -879,7 +960,17 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): elif instance is attributes.PASSIVE_CLASS_MISMATCH: return None - return db_load_fn(self, primary_key_identity) + # apply_labels() not strictly necessary, however this will ensure that + # tablename_colname style is used which at the moment is asserted + # in a lot of unit tests :) + + statement = self._statement_20(orm_results=True).apply_labels() + return db_load_fn( + self.session, + statement, + primary_key_identity, + load_options=self.load_options, + ) @property def lazy_loaded_from(self): @@ -1059,7 +1150,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): self._raw_columns = list(self._raw_columns) self._raw_columns.append( - coercions.expect(roles.ColumnsClauseRole, entity) + coercions.expect( + roles.ColumnsClauseRole, entity, apply_propagate_attrs=self + ) ) @_generative @@ -1397,7 +1490,10 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): self._raw_columns = list(self._raw_columns) self._raw_columns.extend( - coercions.expect(roles.ColumnsClauseRole, c) for c in column + coercions.expect( + roles.ColumnsClauseRole, c, apply_propagate_attrs=self + ) + for c in column ) @util.deprecated( @@ -1584,7 +1680,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): """ for criterion in list(criterion): - criterion = coercions.expect(roles.WhereHavingRole, criterion) + criterion = coercions.expect( + roles.WhereHavingRole, criterion, apply_propagate_attrs=self + ) # legacy vvvvvvvvvvvvvvvvvvvvvvvvvvv if self._aliased_generation: @@ -1742,7 +1840,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): """ self._having_criteria += ( - coercions.expect(roles.WhereHavingRole, criterion), + coercions.expect( + roles.WhereHavingRole, criterion, apply_propagate_attrs=self + ), ) def _set_op(self, expr_fn, *q): @@ -2177,7 +2277,12 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): self._legacy_setup_joins += tuple( ( - coercions.expect(roles.JoinTargetRole, prop[0], legacy=True), + coercions.expect( + roles.JoinTargetRole, + prop[0], + legacy=True, + apply_propagate_attrs=self, + ), prop[1] if len(prop) == 2 else None, None, { @@ -2605,7 +2710,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): ORM tutorial """ - statement = coercions.expect(roles.SelectStatementRole, statement) + statement = coercions.expect( + roles.SelectStatementRole, statement, apply_propagate_attrs=self + ) self.compile_options += {"_statement": statement} def first(self): @@ -2711,76 +2818,50 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): def __iter__(self): return self._iter().__iter__() - # TODO: having _iter(), _execute_and_instances, _connection_from_session, - # etc., is all too much. + def _iter(self): + # new style execution. + params = self.load_options._params + statement = self._statement_20(orm_results=True) + result = self.session.execute( + statement, + params, + execution_options={"_sa_orm_load_options": self.load_options}, + ) - # new recipes / extensions should be based on an event hook of some kind, - # can allow an execution that would return a Result to take in all the - # information and return a different Result. this has to be at - # the session / connection .execute() level, and can perhaps be - # before_execute() but needs to be focused around rewriting of results. + # legacy: automatically set scalars, unique + if result._attributes.get("is_single_entity", False): + result = result.scalars() - # the dialect do_execute() *may* be this but that seems a bit too low - # level. it may need to be ORM session based and be a session event, - # becasue it might not invoke the cursor, might invoke for multiple - # connections, etc. OK really has to be a session level event in this - # case to support horizontal sharding. + if result._attributes.get("filtered", False): + result = result.unique() - def _iter(self): - context = self._compile_context() + return result + + def _execute_crud(self, stmt, mapper): + conn = self.session.connection( + mapper=mapper, clause=stmt, close_with_result=True + ) - if self.load_options._autoflush: - self.session._autoflush() - return self._execute_and_instances(context) + return conn._execute_20( + stmt, self.load_options._params, self._execution_options + ) def __str__(self): - compile_state = self._compile_state() + statement = self._statement_20(orm_results=True) + try: bind = ( - self._get_bind_args(compile_state, self.session.get_bind) + self._get_bind_args(statement, self.session.get_bind) if self.session else None ) except sa_exc.UnboundExecutionError: bind = None - return str(compile_state.statement.compile(bind)) - - def _connection_from_session(self, **kw): - conn = self.session.connection(**kw) - if self._execution_options: - conn = conn.execution_options(**self._execution_options) - return conn - - def _execute_and_instances(self, querycontext, params=None): - conn = self._get_bind_args( - querycontext.compile_state, - self._connection_from_session, - close_with_result=True, - ) - if params is None: - params = querycontext.load_options._params + return str(statement.compile(bind)) - result = conn._execute_20( - querycontext.compile_state.statement, - params, - # execution_options=self.session._orm_execution_options(), - ) - return loading.instances(querycontext.query, result, querycontext) - - def _execute_crud(self, stmt, mapper): - conn = self._connection_from_session( - mapper=mapper, clause=stmt, close_with_result=True - ) - - return conn.execute(stmt, self.load_options._params) - - def _get_bind_args(self, compile_state, fn, **kw): - return fn( - mapper=compile_state._bind_mapper(), - clause=compile_state.statement, - **kw - ) + def _get_bind_args(self, statement, fn, **kw): + return fn(clause=statement, **kw) @property def column_descriptions(self): @@ -2837,10 +2918,21 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): "for linking ORM results to arbitrary select constructs.", version="1.4", ) - compile_state = QueryCompileState._create_for_legacy_query(self) - context = QueryContext(compile_state, self.session) + compile_state = ORMCompileState._create_for_legacy_query(self) + context = QueryContext( + compile_state, self.session, self.load_options + ) + + result = loading.instances(result_proxy, context) + + # legacy: automatically set scalars, unique + if result._attributes.get("is_single_entity", False): + result = result.scalars() + + if result._attributes.get("filtered", False): + result = result.unique() - return loading.instances(self, result_proxy, context) + return result def merge_result(self, iterator, load=True): """Merge a result into this :class:`_query.Query` object's Session. @@ -3239,36 +3331,62 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable): return update_op.rowcount def _compile_state(self, for_statement=False, **kw): - # TODO: this needs to become a general event for all - # Executable objects as well (all ClauseElement?) - # but then how do we clarify that this event is only for - # *top level* compile, not as an embedded element is visted? - # how does that even work because right now a Query that does things - # like from_self() will in fact invoke before_compile for each - # inner element. - # OK perhaps with 2.0 style folks will continue using before_execute() - # as they can now, as a select() with ORM elements will be delivered - # there, OK. sort of fixes the "bake_ok" problem too. - if self.dispatch.before_compile: - for fn in self.dispatch.before_compile: - new_query = fn(self) - if new_query is not None and new_query is not self: - self = new_query - if not fn._bake_ok: - self.compile_options += {"_bake_ok": False} - - compile_state = QueryCompileState._create_for_legacy_query( + return ORMCompileState._create_for_legacy_query( self, for_statement=for_statement, **kw ) - return compile_state def _compile_context(self, for_statement=False): compile_state = self._compile_state(for_statement=for_statement) - context = QueryContext(compile_state, self.session) + context = QueryContext(compile_state, self.session, self.load_options) return context +class FromStatement(SelectStatementGrouping, Executable): + """Core construct that represents a load of ORM objects from a finished + select or text construct. + + """ + + compile_options = ORMFromStatementCompileState.default_compile_options + + _compile_state_factory = ORMFromStatementCompileState.create_for_statement + + _is_future = True + + _for_update_arg = None + + def __init__(self, entities, element): + self._raw_columns = [ + coercions.expect( + roles.ColumnsClauseRole, ent, apply_propagate_attrs=self + ) + for ent in util.to_list(entities) + ] + super(FromStatement, self).__init__(element) + + def _compiler_dispatch(self, compiler, **kw): + compile_state = self._compile_state_factory(self, self, **kw) + + toplevel = not compiler.stack + + if toplevel: + compiler.compile_state = compile_state + + return compiler.process(compile_state.statement, **kw) + + def _ensure_disambiguated_names(self): + return self + + def get_children(self, **kw): + for elem in itertools.chain.from_iterable( + element._from_objects for element in self._raw_columns + ): + yield elem + for elem in super(FromStatement, self).get_children(**kw): + yield elem + + class AliasOption(interfaces.LoaderOption): @util.deprecated( "1.4", diff --git a/lib/sqlalchemy/orm/relationships.py b/lib/sqlalchemy/orm/relationships.py index f539e968f..e82cd174f 100644 --- a/lib/sqlalchemy/orm/relationships.py +++ b/lib/sqlalchemy/orm/relationships.py @@ -2737,12 +2737,12 @@ class JoinCondition(object): def replace(element): if "remote" in element._annotations: - v = element._annotations.copy() + v = dict(element._annotations) del v["remote"] v["local"] = True return element._with_annotations(v) elif "local" in element._annotations: - v = element._annotations.copy() + v = dict(element._annotations) del v["local"] v["remote"] = True return element._with_annotations(v) diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index 6cb8a0062..8d2f13df3 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -12,6 +12,7 @@ import sys import weakref from . import attributes +from . import context from . import exc from . import identity from . import loading @@ -28,13 +29,12 @@ from .base import state_str from .unitofwork import UOWTransaction from .. import engine from .. import exc as sa_exc -from .. import sql +from .. import future from .. import util from ..inspection import inspect from ..sql import coercions from ..sql import roles -from ..sql import util as sql_util - +from ..sql import visitors __all__ = ["Session", "SessionTransaction", "sessionmaker"] @@ -98,6 +98,160 @@ DEACTIVE = util.symbol("DEACTIVE") CLOSED = util.symbol("CLOSED") +class ORMExecuteState(object): + """Stateful object used for the :meth:`.SessionEvents.do_orm_execute` + + .. versionadded:: 1.4 + + """ + + __slots__ = ( + "session", + "statement", + "parameters", + "execution_options", + "bind_arguments", + ) + + def __init__( + self, session, statement, parameters, execution_options, bind_arguments + ): + self.session = session + self.statement = statement + self.parameters = parameters + self.execution_options = execution_options + self.bind_arguments = bind_arguments + + def invoke_statement( + self, + statement=None, + params=None, + execution_options=None, + bind_arguments=None, + ): + """Execute the statement represented by this + :class:`.ORMExecuteState`, without re-invoking events. + + This method essentially performs a re-entrant execution of the + current statement for which the :meth:`.SessionEvents.do_orm_execute` + event is being currently invoked. The use case for this is + for event handlers that want to override how the ultimate results + object is returned, such as for schemes that retrieve results from + an offline cache or which concatenate results from multiple executions. + + :param statement: optional statement to be invoked, in place of the + statement currently represented by :attr:`.ORMExecuteState.statement`. + + :param params: optional dictionary of parameters which will be merged + into the existing :attr:`.ORMExecuteState.parameters` of this + :class:`.ORMExecuteState`. + + :param execution_options: optional dictionary of execution options + will be merged into the existing + :attr:`.ORMExecuteState.execution_options` of this + :class:`.ORMExecuteState`. + + :param bind_arguments: optional dictionary of bind_arguments + which will be merged amongst the current + :attr:`.ORMExecuteState.bind_arguments` + of this :class:`.ORMExecuteState`. + + :return: a :class:`_engine.Result` object with ORM-level results. + + .. seealso:: + + :ref:`examples_caching` - includes example use of the + :meth:`.SessionEvents.do_orm_execute` hook as well as the + :meth:`.ORMExecuteState.invoke_query` method. + + + """ + + if statement is None: + statement = self.statement + + _bind_arguments = dict(self.bind_arguments) + if bind_arguments: + _bind_arguments.update(bind_arguments) + _bind_arguments["_sa_skip_events"] = True + + if params: + _params = dict(self.parameters) + _params.update(params) + else: + _params = self.parameters + + if execution_options: + _execution_options = dict(self.execution_options) + _execution_options.update(execution_options) + else: + _execution_options = self.execution_options + + return self.session.execute( + statement, _params, _execution_options, _bind_arguments + ) + + @property + def orm_query(self): + """Return the :class:`_orm.Query` object associated with this + execution. + + For SQLAlchemy-2.0 style usage, the :class:`_orm.Query` object + is not used at all, and this attribute will return None. + + """ + load_opts = self.load_options + if load_opts._orm_query: + return load_opts._orm_query + + opts = self._orm_compile_options() + if opts is not None: + return opts._orm_query + else: + return None + + def _orm_compile_options(self): + opts = self.statement.compile_options + if isinstance(opts, context.ORMCompileState.default_compile_options): + return opts + else: + return None + + @property + def loader_strategy_path(self): + """Return the :class:`.PathRegistry` for the current load path. + + This object represents the "path" in a query along relationships + when a particular object or collection is being loaded. + + """ + opts = self._orm_compile_options() + if opts is not None: + return opts._current_path + else: + return None + + @property + def load_options(self): + """Return the load_options that will be used for this execution.""" + + return self.execution_options.get( + "_sa_orm_load_options", context.QueryContext.default_load_options + ) + + @property + def user_defined_options(self): + """The sequence of :class:`.UserDefinedOptions` that have been + associated with the statement being invoked. + + """ + return [ + opt + for opt in self.statement._with_options + if not opt._is_compile_state and not opt._is_legacy_option + ] + + class SessionTransaction(object): """A :class:`.Session`-level transaction. @@ -1032,9 +1186,7 @@ class Session(_SessionClassMethods): def connection( self, - mapper=None, - clause=None, - bind=None, + bind_arguments=None, close_with_result=False, execution_options=None, **kw @@ -1059,23 +1211,18 @@ class Session(_SessionClassMethods): resolved through any of the optional keyword arguments. This ultimately makes usage of the :meth:`.get_bind` method for resolution. + :param bind_arguments: dictionary of bind arguments. may include + "mapper", "bind", "clause", other custom arguments that are passed + to :meth:`.Session.get_bind`. + :param bind: - Optional :class:`_engine.Engine` to be used as the bind. If - this engine is already involved in an ongoing transaction, - that connection will be used. This argument takes precedence - over ``mapper``, ``clause``. + deprecated; use bind_arguments :param mapper: - Optional :func:`.mapper` mapped class, used to identify - the appropriate bind. This argument takes precedence over - ``clause``. + deprecated; use bind_arguments :param clause: - A :class:`_expression.ClauseElement` (i.e. - :func:`_expression.select`, - :func:`_expression.text`, - etc.) which will be used to locate a bind, if a bind - cannot otherwise be identified. + deprecated; use bind_arguments :param close_with_result: Passed to :meth:`_engine.Engine.connect`, indicating the :class:`_engine.Connection` should be considered @@ -1097,13 +1244,16 @@ class Session(_SessionClassMethods): :ref:`session_transaction_isolation` :param \**kw: - Additional keyword arguments are sent to :meth:`get_bind()`, - allowing additional arguments to be passed to custom - implementations of :meth:`get_bind`. + deprecated; use bind_arguments """ + + if not bind_arguments: + bind_arguments = kw + + bind = bind_arguments.pop("bind", None) if bind is None: - bind = self.get_bind(mapper, clause=clause, **kw) + bind = self.get_bind(**bind_arguments) return self._connection_for_bind( bind, @@ -1124,7 +1274,14 @@ class Session(_SessionClassMethods): conn = conn.execution_options(**execution_options) return conn - def execute(self, clause, params=None, mapper=None, bind=None, **kw): + def execute( + self, + statement, + params=None, + execution_options=util.immutabledict(), + bind_arguments=None, + **kw + ): r"""Execute a SQL expression construct or string statement within the current transaction. @@ -1222,22 +1379,19 @@ class Session(_SessionClassMethods): "executemany" will be invoked. The keys in each dictionary must correspond to parameter names present in the statement. + :param bind_arguments: dictionary of additional arguments to determine + the bind. may include "mapper", "bind", or other custom arguments. + Contents of this dictionary are passed to the + :meth:`.Session.get_bind` method. + :param mapper: - Optional :func:`.mapper` or mapped class, used to identify - the appropriate bind. This argument takes precedence over - ``clause`` when locating a bind. See :meth:`.Session.get_bind` - for more details. + deprecated; use the bind_arguments dictionary :param bind: - Optional :class:`_engine.Engine` to be used as the bind. If - this engine is already involved in an ongoing transaction, - that connection will be used. This argument takes - precedence over ``mapper`` and ``clause`` when locating - a bind. + deprecated; use the bind_arguments dictionary :param \**kw: - Additional keyword arguments are sent to :meth:`.Session.get_bind()` - to allow extensibility of "bind" schemes. + deprecated; use the bind_arguments dictionary .. seealso:: @@ -1253,20 +1407,63 @@ class Session(_SessionClassMethods): in order to execute the statement. """ - clause = coercions.expect(roles.CoerceTextStatementRole, clause) - if bind is None: - bind = self.get_bind(mapper, clause=clause, **kw) + statement = coercions.expect(roles.CoerceTextStatementRole, statement) - return self._connection_for_bind( - bind, close_with_result=True - )._execute_20(clause, params,) + if not bind_arguments: + bind_arguments = kw + elif kw: + bind_arguments.update(kw) + + compile_state_cls = statement._get_plugin_compile_state_cls("orm") + if compile_state_cls: + compile_state_cls.orm_pre_session_exec( + self, statement, execution_options, bind_arguments + ) + else: + bind_arguments.setdefault("clause", statement) + if statement._is_future: + execution_options = util.immutabledict().merge_with( + execution_options, {"future_result": True} + ) + + if self.dispatch.do_orm_execute: + skip_events = bind_arguments.pop("_sa_skip_events", False) + + if not skip_events: + orm_exec_state = ORMExecuteState( + self, statement, params, execution_options, bind_arguments + ) + for fn in self.dispatch.do_orm_execute: + result = fn(orm_exec_state) + if result: + return result + + bind = self.get_bind(**bind_arguments) + + conn = self._connection_for_bind(bind, close_with_result=True) + result = conn._execute_20(statement, params or {}, execution_options) - def scalar(self, clause, params=None, mapper=None, bind=None, **kw): + if compile_state_cls: + result = compile_state_cls.orm_setup_cursor_result( + self, bind_arguments, result + ) + + return result + + def scalar( + self, + statement, + params=None, + execution_options=None, + mapper=None, + bind=None, + **kw + ): """Like :meth:`~.Session.execute` but return a scalar result.""" return self.execute( - clause, params=params, mapper=mapper, bind=bind, **kw + statement, params=params, mapper=mapper, bind=bind, **kw ).scalar() def close(self): @@ -1422,7 +1619,7 @@ class Session(_SessionClassMethods): """ self._add_bind(table, bind) - def get_bind(self, mapper=None, clause=None): + def get_bind(self, mapper=None, clause=None, bind=None): """Return a "bind" to which this :class:`.Session` is bound. The "bind" is usually an instance of :class:`_engine.Engine`, @@ -1497,6 +1694,8 @@ class Session(_SessionClassMethods): :meth:`.Session.bind_table` """ + if bind: + return bind if mapper is clause is None: if self.bind: @@ -1520,6 +1719,8 @@ class Session(_SessionClassMethods): raise if self.__binds: + # matching mappers and selectables to entries in the + # binds dictionary; supported use case. if mapper: for cls in mapper.class_.__mro__: if cls in self.__binds: @@ -1528,18 +1729,32 @@ class Session(_SessionClassMethods): clause = mapper.persist_selectable if clause is not None: - for t in sql_util.find_tables(clause, include_crud=True): - if t in self.__binds: - return self.__binds[t] + for obj in visitors.iterate(clause): + if obj in self.__binds: + return self.__binds[obj] + # session has a single bind; supported use case. if self.bind: return self.bind - if isinstance(clause, sql.expression.ClauseElement) and clause.bind: - return clause.bind + # now we are in legacy territory. looking for "bind" on tables + # that are via bound metadata. this goes away in 2.0. + if mapper and clause is None: + clause = mapper.persist_selectable - if mapper and mapper.persist_selectable.bind: - return mapper.persist_selectable.bind + if clause is not None: + if clause.bind: + return clause.bind + # for obj in visitors.iterate(clause): + # if obj.bind: + # return obj.bind + + if mapper: + if mapper.persist_selectable.bind: + return mapper.persist_selectable.bind + # for obj in visitors.iterate(mapper.persist_selectable): + # if obj.bind: + # return obj.bind context = [] if mapper is not None: @@ -1722,9 +1937,11 @@ class Session(_SessionClassMethods): else: with_for_update = None + stmt = future.select(object_mapper(instance)) if ( loading.load_on_ident( - self.query(object_mapper(instance)), + self, + stmt, state.key, refresh_state=state, with_for_update=with_for_update, diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index c0c090b3d..a7d501b53 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -33,6 +33,7 @@ from .util import _none_set from .util import aliased from .. import event from .. import exc as sa_exc +from .. import future from .. import inspect from .. import log from .. import sql @@ -440,10 +441,13 @@ class DeferredColumnLoader(LoaderStrategy): if self.raiseload: self._invoke_raise_load(state, passive, "raise") - query = session.query(localparent) if ( loading.load_on_ident( - query, state.key, only_load_props=group, refresh_state=state + session, + future.select(localparent).apply_labels(), + state.key, + only_load_props=group, + refresh_state=state, ) is None ): @@ -897,7 +901,7 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots): q(session) .with_post_criteria(lambda q: q._set_lazyload_from(state)) ._load_on_pk_identity( - session.query(self.mapper), primary_key_identity + session, session.query(self.mapper), primary_key_identity ) ) @@ -1090,7 +1094,6 @@ class SubqueryLoader(PostLoader): parentmapper=None, **kwargs ): - if ( not compile_state.compile_options._enable_eagerloads or compile_state.compile_options._for_refresh_state @@ -1146,6 +1149,7 @@ class SubqueryLoader(PostLoader): # generate a new Query from the original, then # produce a subquery from it. left_alias = self._generate_from_original_query( + compile_state, orig_query, leftmost_mapper, leftmost_attr, @@ -1164,7 +1168,9 @@ class SubqueryLoader(PostLoader): def set_state_options(compile_state): compile_state.attributes.update( { - ("orig_query", SubqueryLoader): orig_query, + ("orig_query", SubqueryLoader): orig_query.with_session( + None + ), ("subquery_path", None): subq_path, } ) @@ -1188,6 +1194,7 @@ class SubqueryLoader(PostLoader): # by create_row_processor # NOTE: be sure to consult baked.py for some hardcoded logic # about this structure as well + assert q.session is None path.set( compile_state.attributes, "subqueryload_data", {"query": q}, ) @@ -1218,6 +1225,7 @@ class SubqueryLoader(PostLoader): def _generate_from_original_query( self, + orig_compile_state, orig_query, leftmost_mapper, leftmost_attr, @@ -1243,11 +1251,18 @@ class SubqueryLoader(PostLoader): } ) - cs = q._clone() + # NOTE: keystone has a test which is counting before_compile + # events. That test is in one case dependent on an extra + # call that was occurring here within the subqueryloader setup + # process, probably when the subquery() method was called. + # Ultimately that call will not be occurring here. + # the event has already been called on the original query when + # we are here in any case, so keystone will need to adjust that + # test. - # using the _compile_state method so that the before_compile() - # event is hit here. keystone is testing for this. - compile_state = cs._compile_state(entities_only=True) + # for column information, look to the compile state that is + # already being passed through + compile_state = orig_compile_state # select from the identity columns of the outer (specifically, these # are the 'local_cols' of the property). This will remove @@ -1260,7 +1275,6 @@ class SubqueryLoader(PostLoader): ], compile_state._get_current_adapter(), ) - # q.add_columns.non_generative(q, target_cols) q._set_entities(target_cols) distinct_target_key = leftmost_relationship.distinct_target_key @@ -1428,10 +1442,20 @@ class SubqueryLoader(PostLoader): """ - __slots__ = ("subq_info", "subq", "_data") + __slots__ = ( + "session", + "execution_options", + "load_options", + "subq", + "_data", + ) - def __init__(self, subq_info): - self.subq_info = subq_info + def __init__(self, context, subq_info): + # avoid creating a cycle by storing context + # even though that's preferable + self.session = context.session + self.execution_options = context.execution_options + self.load_options = context.load_options self.subq = subq_info["query"] self._data = None @@ -1443,7 +1467,17 @@ class SubqueryLoader(PostLoader): def _load(self): self._data = collections.defaultdict(list) - rows = list(self.subq) + q = self.subq + assert q.session is None + if "compiled_cache" in self.execution_options: + q = q.execution_options( + compiled_cache=self.execution_options["compiled_cache"] + ) + q = q.with_session(self.session) + + # to work with baked query, the parameters may have been + # updated since this query was created, so take these into account + rows = list(q.params(self.load_options._params)) for k, v in itertools.groupby(rows, lambda x: x[1:]): self._data[k].extend(vv[0] for vv in v) @@ -1474,14 +1508,7 @@ class SubqueryLoader(PostLoader): subq = subq_info["query"] - if subq.session is None: - subq.session = context.session - assert subq.session is context.session, ( - "Subquery session doesn't refer to that of " - "our context. Are there broken context caching " - "schemes being used?" - ) - + assert subq.session is None local_cols = self.parent_property.local_columns # cache the loaded collections in the context @@ -1489,7 +1516,7 @@ class SubqueryLoader(PostLoader): # call upon create_row_processor again collections = path.get(context.attributes, "collections") if collections is None: - collections = self._SubqCollections(subq_info) + collections = self._SubqCollections(context, subq_info) path.set(context.attributes, "collections", collections) if adapter: diff --git a/lib/sqlalchemy/orm/util.py b/lib/sqlalchemy/orm/util.py index 1e415e49c..ce37d962e 100644 --- a/lib/sqlalchemy/orm/util.py +++ b/lib/sqlalchemy/orm/util.py @@ -41,6 +41,7 @@ from ..sql import expression from ..sql import roles from ..sql import util as sql_util from ..sql import visitors +from ..sql.annotation import SupportsCloneAnnotations from ..sql.base import ColumnCollection @@ -694,6 +695,8 @@ class AliasedInsp( "entity_namespace": self, "compile_state_plugin": "orm", } + )._set_propagate_attrs( + {"compile_state_plugin": "orm", "plugin_subject": self} ) @property @@ -748,10 +751,20 @@ class AliasedInsp( ) def _adapt_element(self, elem, key=None): - d = {"parententity": self, "parentmapper": self.mapper} + d = { + "parententity": self, + "parentmapper": self.mapper, + "compile_state_plugin": "orm", + } if key: d["orm_key"] = key - return self._adapter.traverse(elem)._annotate(d) + return ( + self._adapter.traverse(elem) + ._annotate(d) + ._set_propagate_attrs( + {"compile_state_plugin": "orm", "plugin_subject": self} + ) + ) def _entity_for_mapper(self, mapper): self_poly = self.with_polymorphic_mappers @@ -1037,7 +1050,7 @@ def with_polymorphic( @inspection._self_inspects -class Bundle(ORMColumnsClauseRole, InspectionAttr): +class Bundle(ORMColumnsClauseRole, SupportsCloneAnnotations, InspectionAttr): """A grouping of SQL expressions that are returned by a :class:`.Query` under one namespace. @@ -1070,6 +1083,8 @@ class Bundle(ORMColumnsClauseRole, InspectionAttr): is_bundle = True + _propagate_attrs = util.immutabledict() + def __init__(self, name, *exprs, **kw): r"""Construct a new :class:`.Bundle`. @@ -1090,7 +1105,10 @@ class Bundle(ORMColumnsClauseRole, InspectionAttr): """ self.name = self._label = name self.exprs = exprs = [ - coercions.expect(roles.ColumnsClauseRole, expr) for expr in exprs + coercions.expect( + roles.ColumnsClauseRole, expr, apply_propagate_attrs=self + ) + for expr in exprs ] self.c = self.columns = ColumnCollection( @@ -1145,11 +1163,14 @@ class Bundle(ORMColumnsClauseRole, InspectionAttr): return cloned def __clause_element__(self): + annotations = self._annotations.union( + {"bundle": self, "entity_namespace": self} + ) return expression.ClauseList( _literal_as_text_role=roles.ColumnsClauseRole, group=False, *[e._annotations.get("bundle", e) for e in self.exprs] - )._annotate({"bundle": self, "entity_namespace": self}) + )._annotate(annotations) @property def clauses(self): |
