summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/orm
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-04-27 12:58:12 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-05-25 13:56:37 -0400
commit6930dfc032c3f9f474e71ab4e021c0ef8384930e (patch)
tree34b919a3c34edaffda1750f161a629fc5b9a8020 /lib/sqlalchemy/orm
parentdce8c7a125cb99fad62c76cd145752d5afefae36 (diff)
downloadsqlalchemy-6930dfc032c3f9f474e71ab4e021c0ef8384930e.tar.gz
Convert execution to move through Session
This patch replaces the ORM execution flow with a single pathway through Session.execute() for all queries, including Core and ORM. Currently included is full support for ORM Query, Query.from_statement(), select(), as well as the baked query and horizontal shard systems. Initial changes have also been made to the dogpile caching example, which like baked query makes use of a new ORM-specific execution hook that replaces the use of both QueryEvents.before_compile() as well as Query._execute_and_instances() as the central ORM interception hooks. select() and Query() constructs alike can be passed to Session.execute() where they will return ORM results in a Results object. This API is currently used internally by Query. Full support for Session.execute()->results to behave in a fully 2.0 fashion will be in later changesets. Bulk update/delete with ORM support will also be delivered via the update() and delete() constructs, however these have not yet been adapted to the new system and may follow in a subsequent update. Performance is also beginning to lag as of this commit and some previous ones. It is hoped that a few central functions such as the coercions functions can be rewritten in C to regain performance. Additionally, query caching is now available and some subsequent patches will attempt to cache more of the per-execution work from the ORM layer, e.g. column getters and adapters. This patch also contains initial "turn on" of the caching system engine-wide via the query_cache_size parameter to create_engine(). Still defaulting at zero for "no caching". The caching system still needs adjustments in order to gain adequate performance. Change-Id: I047a7ebb26aa85dc01f6789fac2bff561dcd555d
Diffstat (limited to 'lib/sqlalchemy/orm')
-rw-r--r--lib/sqlalchemy/orm/__init__.py3
-rw-r--r--lib/sqlalchemy/orm/attributes.py4
-rw-r--r--lib/sqlalchemy/orm/context.py619
-rw-r--r--lib/sqlalchemy/orm/events.py37
-rw-r--r--lib/sqlalchemy/orm/interfaces.py105
-rw-r--r--lib/sqlalchemy/orm/loading.py231
-rw-r--r--lib/sqlalchemy/orm/mapper.py2
-rw-r--r--lib/sqlalchemy/orm/path_registry.py19
-rw-r--r--lib/sqlalchemy/orm/persistence.py7
-rw-r--r--lib/sqlalchemy/orm/properties.py27
-rw-r--r--lib/sqlalchemy/orm/query.py312
-rw-r--r--lib/sqlalchemy/orm/relationships.py4
-rw-r--r--lib/sqlalchemy/orm/session.py319
-rw-r--r--lib/sqlalchemy/orm/strategies.py73
-rw-r--r--lib/sqlalchemy/orm/util.py31
15 files changed, 1255 insertions, 538 deletions
diff --git a/lib/sqlalchemy/orm/__init__.py b/lib/sqlalchemy/orm/__init__.py
index 0a353f81c..110c27811 100644
--- a/lib/sqlalchemy/orm/__init__.py
+++ b/lib/sqlalchemy/orm/__init__.py
@@ -30,6 +30,7 @@ from .mapper import reconstructor # noqa
from .mapper import validates # noqa
from .properties import ColumnProperty # noqa
from .query import AliasOption # noqa
+from .query import FromStatement # noqa
from .query import Query # noqa
from .relationships import foreign # noqa
from .relationships import RelationshipProperty # noqa
@@ -39,8 +40,10 @@ from .session import close_all_sessions # noqa
from .session import make_transient # noqa
from .session import make_transient_to_detached # noqa
from .session import object_session # noqa
+from .session import ORMExecuteState # noqa
from .session import Session # noqa
from .session import sessionmaker # noqa
+from .session import SessionTransaction # noqa
from .strategy_options import Load # noqa
from .util import aliased # noqa
from .util import Bundle # noqa
diff --git a/lib/sqlalchemy/orm/attributes.py b/lib/sqlalchemy/orm/attributes.py
index 7b4415bfe..262a1efc9 100644
--- a/lib/sqlalchemy/orm/attributes.py
+++ b/lib/sqlalchemy/orm/attributes.py
@@ -207,6 +207,10 @@ class QueryableAttribute(
def __clause_element__(self):
return self.expression
+ @property
+ def _from_objects(self):
+ return self.expression._from_objects
+
def _bulk_update_tuples(self, value):
"""Return setter tuples for a bulk UPDATE."""
diff --git a/lib/sqlalchemy/orm/context.py b/lib/sqlalchemy/orm/context.py
index 0a3701134..3acab7df7 100644
--- a/lib/sqlalchemy/orm/context.py
+++ b/lib/sqlalchemy/orm/context.py
@@ -18,19 +18,21 @@ from .util import Bundle
from .util import join as orm_join
from .util import ORMAdapter
from .. import exc as sa_exc
+from .. import future
from .. import inspect
from .. import sql
from .. import util
-from ..future.selectable import Select as FutureSelect
from ..sql import coercions
from ..sql import expression
from ..sql import roles
from ..sql import util as sql_util
from ..sql import visitors
from ..sql.base import CacheableOptions
+from ..sql.base import CompileState
from ..sql.base import Options
+from ..sql.selectable import LABEL_STYLE_DISAMBIGUATE_ONLY
+from ..sql.selectable import LABEL_STYLE_NONE
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
-from ..sql.selectable import Select
from ..sql.selectable import SelectState
from ..sql.visitors import ExtendedInternalTraversal
from ..sql.visitors import InternalTraversal
@@ -44,6 +46,8 @@ class QueryContext(object):
"orm_query",
"query",
"load_options",
+ "bind_arguments",
+ "execution_options",
"session",
"autoflush",
"populate_existing",
@@ -51,7 +55,7 @@ class QueryContext(object):
"version_check",
"refresh_state",
"create_eager_joins",
- "propagate_options",
+ "propagated_loader_options",
"attributes",
"runid",
"partials",
@@ -70,20 +74,30 @@ class QueryContext(object):
_yield_per = None
_refresh_state = None
_lazy_loaded_from = None
+ _orm_query = None
_params = util.immutabledict()
- def __init__(self, compile_state, session):
- query = compile_state.query
+ def __init__(
+ self,
+ compile_state,
+ session,
+ load_options,
+ execution_options=None,
+ bind_arguments=None,
+ ):
+ self.load_options = load_options
+ self.execution_options = execution_options or {}
+ self.bind_arguments = bind_arguments or {}
self.compile_state = compile_state
self.orm_query = compile_state.orm_query
- self.query = compile_state.query
+ self.query = query = compile_state.query
self.session = session
- self.load_options = load_options = query.load_options
- self.propagate_options = set(
+ self.propagated_loader_options = {
o for o in query._with_options if o.propagate_to_loaders
- )
+ }
+
self.attributes = dict(compile_state.attributes)
self.autoflush = load_options._autoflush
@@ -92,11 +106,7 @@ class QueryContext(object):
self.version_check = load_options._version_check
self.refresh_state = load_options._refresh_state
self.yield_per = load_options._yield_per
-
- if self.refresh_state is not None:
- self.identity_token = load_options._refresh_identity_token
- else:
- self.identity_token = None
+ self.identity_token = load_options._refresh_identity_token
if self.yield_per and compile_state._no_yield_pers:
raise sa_exc.InvalidRequestError(
@@ -119,25 +129,10 @@ class QueryContext(object):
)
-class QueryCompileState(sql.base.CompileState):
- _joinpath = _joinpoint = util.immutabledict()
- _from_obj_alias = None
- _has_mapper_entities = False
-
- _has_orm_entities = False
- multi_row_eager_loaders = False
- compound_eager_adapter = None
- loaders_require_buffering = False
- loaders_require_uniquing = False
-
- correlate = None
- _where_criteria = ()
- _having_criteria = ()
-
- orm_query = None
-
+class ORMCompileState(CompileState):
class default_compile_options(CacheableOptions):
_cache_key_traversal = [
+ ("_orm_results", InternalTraversal.dp_boolean),
("_bake_ok", InternalTraversal.dp_boolean),
(
"_with_polymorphic_adapt_map",
@@ -153,136 +148,310 @@ class QueryCompileState(sql.base.CompileState):
("_for_refresh_state", InternalTraversal.dp_boolean),
]
+ _orm_results = True
_bake_ok = True
_with_polymorphic_adapt_map = ()
_current_path = _path_registry
_enable_single_crit = True
- _statement = None
_enable_eagerloads = True
_orm_only_from_obj_alias = True
_only_load_props = None
_set_base_alias = False
_for_refresh_state = False
+ # non-cache-key elements mostly for legacy use
+ _statement = None
+ _orm_query = None
+
+ @classmethod
+ def merge(cls, other):
+ return cls + other._state_dict()
+
+ orm_query = None
+ current_path = _path_registry
+
def __init__(self, *arg, **kw):
raise NotImplementedError()
@classmethod
- def _create_for_select(cls, statement, compiler, **kw):
- if not statement._is_future:
- return SelectState(statement, compiler, **kw)
+ def create_for_statement(cls, statement_container, compiler, **kw):
+ raise NotImplementedError()
- self = cls.__new__(cls)
+ @classmethod
+ def _create_for_legacy_query(cls, query, for_statement=False):
+ stmt = query._statement_20(orm_results=not for_statement)
- if not isinstance(
- statement.compile_options, cls.default_compile_options
- ):
- statement.compile_options = cls.default_compile_options
- orm_state = self._create_for_legacy_query_via_either(statement)
- compile_state = SelectState(orm_state.statement, compiler, **kw)
- compile_state._orm_state = orm_state
- return compile_state
+ if query.compile_options._statement is not None:
+ compile_state_cls = ORMFromStatementCompileState
+ else:
+ compile_state_cls = ORMSelectCompileState
+
+ # true in all cases except for two tests in test/orm/test_events.py
+ # assert stmt.compile_options._orm_query is query
+ return compile_state_cls._create_for_statement_or_query(
+ stmt, for_statement=for_statement
+ )
@classmethod
- def _create_future_select_from_query(cls, query):
- stmt = FutureSelect.__new__(FutureSelect)
-
- # the internal state of Query is now a mirror of that of
- # Select which can be transferred directly. The Select
- # supports compilation into its correct form taking all ORM
- # features into account via the plugin and the compile options.
- # however it does not export its columns or other attributes
- # correctly if deprecated ORM features that adapt plain mapped
- # elements are used; for this reason the Select() returned here
- # can always support direct execution, but for composition in a larger
- # select only works if it does not represent legacy ORM adaption
- # features.
- stmt.__dict__.update(
- dict(
- _raw_columns=query._raw_columns,
- _compile_state_plugin="orm", # ;)
- _where_criteria=query._where_criteria,
- _from_obj=query._from_obj,
- _legacy_setup_joins=query._legacy_setup_joins,
- _order_by_clauses=query._order_by_clauses,
- _group_by_clauses=query._group_by_clauses,
- _having_criteria=query._having_criteria,
- _distinct=query._distinct,
- _distinct_on=query._distinct_on,
- _with_options=query._with_options,
- _with_context_options=query._with_context_options,
- _hints=query._hints,
- _statement_hints=query._statement_hints,
- _correlate=query._correlate,
- _auto_correlate=query._auto_correlate,
- _limit_clause=query._limit_clause,
- _offset_clause=query._offset_clause,
- _for_update_arg=query._for_update_arg,
- _prefixes=query._prefixes,
- _suffixes=query._suffixes,
- _label_style=query._label_style,
- compile_options=query.compile_options,
- # this will be moving but for now make it work like orm.Query
- load_options=query.load_options,
+ def _create_for_statement_or_query(
+ cls, statement_container, for_statement=False,
+ ):
+ raise NotImplementedError()
+
+ @classmethod
+ def orm_pre_session_exec(
+ cls, session, statement, execution_options, bind_arguments
+ ):
+ if execution_options:
+ # TODO: will have to provide public API to set some load
+ # options and also extract them from that API here, likely
+ # execution options
+ load_options = execution_options.get(
+ "_sa_orm_load_options", QueryContext.default_load_options
)
+ else:
+ load_options = QueryContext.default_load_options
+
+ bind_arguments["clause"] = statement
+
+ # new in 1.4 - the coercions system is leveraged to allow the
+ # "subject" mapper of a statement be propagated to the top
+ # as the statement is built. "subject" mapper is the generally
+ # standard object used as an identifier for multi-database schemes.
+
+ if "plugin_subject" in statement._propagate_attrs:
+ bind_arguments["mapper"] = statement._propagate_attrs[
+ "plugin_subject"
+ ].mapper
+
+ if load_options._autoflush:
+ session._autoflush()
+
+ @classmethod
+ def orm_setup_cursor_result(cls, session, bind_arguments, result):
+ execution_context = result.context
+ compile_state = execution_context.compiled.compile_state
+
+ # cover edge case where ORM entities used in legacy select
+ # were passed to session.execute:
+ # session.execute(legacy_select([User.id, User.name]))
+ # see test_query->test_legacy_tuple_old_select
+ if not execution_context.compiled.statement._is_future:
+ return result
+
+ execution_options = execution_context.execution_options
+
+ # we are getting these right above in orm_pre_session_exec(),
+ # then getting them again right here.
+ load_options = execution_options.get(
+ "_sa_orm_load_options", QueryContext.default_load_options
+ )
+ querycontext = QueryContext(
+ compile_state,
+ session,
+ load_options,
+ execution_options,
+ bind_arguments,
)
+ return loading.instances(result, querycontext)
- return stmt
+ @property
+ def _mapper_entities(self):
+ return (
+ ent for ent in self._entities if isinstance(ent, _MapperEntity)
+ )
+
+ def _create_with_polymorphic_adapter(self, ext_info, selectable):
+ if (
+ not ext_info.is_aliased_class
+ and ext_info.mapper.persist_selectable
+ not in self._polymorphic_adapters
+ ):
+ self._mapper_loads_polymorphically_with(
+ ext_info.mapper,
+ sql_util.ColumnAdapter(
+ selectable, ext_info.mapper._equivalent_columns
+ ),
+ )
+
+ def _mapper_loads_polymorphically_with(self, mapper, adapter):
+ for m2 in mapper._with_polymorphic_mappers or [mapper]:
+ self._polymorphic_adapters[m2] = adapter
+ for m in m2.iterate_to_root():
+ self._polymorphic_adapters[m.local_table] = adapter
+
+
+@sql.base.CompileState.plugin_for("orm", "grouping")
+class ORMFromStatementCompileState(ORMCompileState):
+ _aliased_generations = util.immutabledict()
+ _from_obj_alias = None
+ _has_mapper_entities = False
+
+ _has_orm_entities = False
+ multi_row_eager_loaders = False
+ compound_eager_adapter = None
+ loaders_require_buffering = False
+ loaders_require_uniquing = False
+
+ @classmethod
+ def create_for_statement(cls, statement_container, compiler, **kw):
+ compiler._rewrites_selected_columns = True
+ return cls._create_for_statement_or_query(statement_container)
@classmethod
- def _create_for_legacy_query(
- cls, query, for_statement=False, entities_only=False
+ def _create_for_statement_or_query(
+ cls, statement_container, for_statement=False,
):
- # as we are seeking to use Select() with ORM state as the
- # primary executable element, have all Query objects that are not
- # from_statement() convert to a Select() first, then run on that.
+ # from .query import FromStatement
- if query.compile_options._statement is not None:
- return cls._create_for_legacy_query_via_either(
- query,
- for_statement=for_statement,
- entities_only=entities_only,
- orm_query=query,
- )
+ # assert isinstance(statement_container, FromStatement)
+
+ self = cls.__new__(cls)
+ self._primary_entity = None
+
+ self.orm_query = statement_container.compile_options._orm_query
+
+ self.statement_container = self.query = statement_container
+ self.requested_statement = statement_container.element
+
+ self._entities = []
+ self._with_polymorphic_adapt_map = {}
+ self._polymorphic_adapters = {}
+ self._no_yield_pers = set()
+
+ _QueryEntity.to_compile_state(self, statement_container._raw_columns)
+
+ self.compile_options = statement_container.compile_options
+
+ self.current_path = statement_container.compile_options._current_path
+
+ if statement_container._with_options:
+ self.attributes = {"_unbound_load_dedupes": set()}
+
+ for opt in statement_container._with_options:
+ if opt._is_compile_state:
+ opt.process_compile_state(self)
+ else:
+ self.attributes = {}
+
+ if statement_container._with_context_options:
+ for fn, key in statement_container._with_context_options:
+ fn(self)
+
+ self.primary_columns = []
+ self.secondary_columns = []
+ self.eager_joins = {}
+ self.single_inh_entities = {}
+ self.create_eager_joins = []
+ self._fallback_from_clauses = []
+ self._setup_for_statement()
+
+ return self
+
+ def _setup_for_statement(self):
+ statement = self.requested_statement
+ if (
+ isinstance(statement, expression.SelectBase)
+ and not statement._is_textual
+ and not statement.use_labels
+ ):
+ self.statement = statement.apply_labels()
else:
- assert query.compile_options._statement is None
+ self.statement = statement
+ self.order_by = None
- stmt = cls._create_future_select_from_query(query)
+ if isinstance(self.statement, expression.TextClause):
+ # setup for all entities. Currently, this is not useful
+ # for eager loaders, as the eager loaders that work are able
+ # to do their work entirely in row_processor.
+ for entity in self._entities:
+ entity.setup_compile_state(self)
- return cls._create_for_legacy_query_via_either(
- stmt,
- for_statement=for_statement,
- entities_only=entities_only,
- orm_query=query,
+ # we did the setup just to get primary columns.
+ self.statement = expression.TextualSelect(
+ self.statement, self.primary_columns, positional=False
)
+ else:
+ # allow TextualSelect with implicit columns as well
+ # as select() with ad-hoc columns, see test_query::TextTest
+ self._from_obj_alias = sql.util.ColumnAdapter(
+ self.statement, adapt_on_names=True
+ )
+ # set up for eager loaders, however if we fix subqueryload
+ # it should not need to do this here. the model of eager loaders
+ # that can work entirely in row_processor might be interesting
+ # here though subqueryloader has a lot of upfront work to do
+ # see test/orm/test_query.py -> test_related_eagerload_against_text
+ # for where this part makes a difference. would rather have
+ # subqueryload figure out what it needs more intelligently.
+ # for entity in self._entities:
+ # entity.setup_compile_state(self)
+
+ def _adapt_col_list(self, cols, current_adapter):
+ return cols
+
+ def _get_current_adapter(self):
+ return None
+
+
+@sql.base.CompileState.plugin_for("orm", "select")
+class ORMSelectCompileState(ORMCompileState, SelectState):
+ _joinpath = _joinpoint = util.immutabledict()
+ _from_obj_alias = None
+ _has_mapper_entities = False
+
+ _has_orm_entities = False
+ multi_row_eager_loaders = False
+ compound_eager_adapter = None
+ loaders_require_buffering = False
+ loaders_require_uniquing = False
+
+ correlate = None
+ _where_criteria = ()
+ _having_criteria = ()
+
+ orm_query = None
+
+ @classmethod
+ def create_for_statement(cls, statement, compiler, **kw):
+ if not statement._is_future:
+ return SelectState(statement, compiler, **kw)
+
+ compiler._rewrites_selected_columns = True
+
+ orm_state = cls._create_for_statement_or_query(
+ statement, for_statement=True
+ )
+ SelectState.__init__(orm_state, orm_state.statement, compiler, **kw)
+ return orm_state
@classmethod
- def _create_for_legacy_query_via_either(
- cls, query, for_statement=False, entities_only=False, orm_query=None
+ def _create_for_statement_or_query(
+ cls, query, for_statement=False, _entities_only=False,
):
+ assert isinstance(query, future.Select)
+
+ query.compile_options = cls.default_compile_options.merge(
+ query.compile_options
+ )
self = cls.__new__(cls)
self._primary_entity = None
- self.has_select = isinstance(query, Select)
+ self.orm_query = query.compile_options._orm_query
- if orm_query:
- self.orm_query = orm_query
- self.query = query
- self.has_orm_query = True
- else:
- self.query = query
- if not self.has_select:
- self.orm_query = query
- self.has_orm_query = True
- else:
- self.orm_query = None
- self.has_orm_query = False
+ self.query = query
self.select_statement = select_statement = query
+ if not hasattr(select_statement.compile_options, "_orm_results"):
+ select_statement.compile_options = cls.default_compile_options
+ select_statement.compile_options += {"_orm_results": for_statement}
+ else:
+ for_statement = not select_statement.compile_options._orm_results
+
self.query = query
self._entities = []
@@ -300,19 +469,28 @@ class QueryCompileState(sql.base.CompileState):
_QueryEntity.to_compile_state(self, select_statement._raw_columns)
- if entities_only:
+ if _entities_only:
return self
self.compile_options = query.compile_options
+
+ # TODO: the name of this flag "for_statement" has to change,
+ # as it is difficult to distinguish from the "query._statement" use
+ # case which is something totally different
self.for_statement = for_statement
- if self.has_orm_query and not for_statement:
- self.label_style = LABEL_STYLE_TABLENAME_PLUS_COL
+ # determine label style. we can make different decisions here.
+ # at the moment, trying to see if we can always use DISAMBIGUATE_ONLY
+ # rather than LABEL_STYLE_NONE, and if we can use disambiguate style
+ # for new style ORM selects too.
+ if self.select_statement._label_style is LABEL_STYLE_NONE:
+ if self.orm_query and not for_statement:
+ self.label_style = LABEL_STYLE_TABLENAME_PLUS_COL
+ else:
+ self.label_style = LABEL_STYLE_DISAMBIGUATE_ONLY
else:
self.label_style = self.select_statement._label_style
- self.labels = self.label_style is LABEL_STYLE_TABLENAME_PLUS_COL
-
self.current_path = select_statement.compile_options._current_path
self.eager_order_by = ()
@@ -321,7 +499,7 @@ class QueryCompileState(sql.base.CompileState):
self.attributes = {"_unbound_load_dedupes": set()}
for opt in self.select_statement._with_options:
- if not opt._is_legacy_option:
+ if opt._is_compile_state:
opt.process_compile_state(self)
else:
self.attributes = {}
@@ -341,13 +519,50 @@ class QueryCompileState(sql.base.CompileState):
info.selectable for info in select_statement._from_obj
]
- if self.compile_options._statement is not None:
- self._setup_for_statement()
- else:
- self._setup_for_generate()
+ self._setup_for_generate()
return self
+ @classmethod
+ def _create_entities_collection(cls, query):
+ """Creates a partial ORMSelectCompileState that includes
+ the full collection of _MapperEntity and other _QueryEntity objects.
+
+ Supports a few remaining use cases that are pre-compilation
+ but still need to gather some of the column / adaption information.
+
+ """
+ self = cls.__new__(cls)
+
+ self._entities = []
+ self._primary_entity = None
+ self._aliased_generations = {}
+ self._polymorphic_adapters = {}
+
+ # legacy: only for query.with_polymorphic()
+ self._with_polymorphic_adapt_map = wpam = dict(
+ query.compile_options._with_polymorphic_adapt_map
+ )
+ if wpam:
+ self._setup_with_polymorphics()
+
+ _QueryEntity.to_compile_state(self, query._raw_columns)
+ return self
+
+ @classmethod
+ def determine_last_joined_entity(cls, statement):
+ setup_joins = statement._setup_joins
+
+ if not setup_joins:
+ return None
+
+ (target, onclause, from_, flags) = setup_joins[-1]
+
+ if isinstance(target, interfaces.PropComparator):
+ return target.entity
+ else:
+ return target
+
def _setup_with_polymorphics(self):
# legacy: only for query.with_polymorphic()
for ext_info, wp in self._with_polymorphic_adapt_map.items():
@@ -404,34 +619,6 @@ class QueryCompileState(sql.base.CompileState):
return None
- def _deep_entity_zero(self):
- """Return a 'deep' entity; this is any entity we can find associated
- with the first entity / column experssion. this is used only for
- session.get_bind().
-
- it is hoped this concept can be removed in an upcoming change
- to the ORM execution model.
-
- """
- for ent in self.from_clauses:
- if "parententity" in ent._annotations:
- return ent._annotations["parententity"].mapper
- for ent in self._entities:
- ezero = ent._deep_entity_zero()
- if ezero is not None:
- return ezero.mapper
- else:
- return None
-
- @property
- def _mapper_entities(self):
- for ent in self._entities:
- if isinstance(ent, _MapperEntity):
- yield ent
-
- def _bind_mapper(self):
- return self._deep_entity_zero()
-
def _only_full_mapper_zero(self, methname):
if self._entities != [self._primary_entity]:
raise sa_exc.InvalidRequestError(
@@ -490,7 +677,7 @@ class QueryCompileState(sql.base.CompileState):
else query._order_by_clauses
)
- if query._having_criteria is not None:
+ if query._having_criteria:
self._having_criteria = tuple(
current_adapter(crit, True, True) if current_adapter else crit
for crit in query._having_criteria
@@ -527,7 +714,7 @@ class QueryCompileState(sql.base.CompileState):
for s in query._correlate
)
)
- elif self.has_select and not query._auto_correlate:
+ elif not query._auto_correlate:
self.correlate = (None,)
# PART II
@@ -582,33 +769,6 @@ class QueryCompileState(sql.base.CompileState):
{"deepentity": ezero}
)
- def _setup_for_statement(self):
- compile_options = self.compile_options
-
- if (
- isinstance(compile_options._statement, expression.SelectBase)
- and not compile_options._statement._is_textual
- and not compile_options._statement.use_labels
- ):
- self.statement = compile_options._statement.apply_labels()
- else:
- self.statement = compile_options._statement
- self.order_by = None
-
- if isinstance(self.statement, expression.TextClause):
- # setup for all entities, including contains_eager entities.
- for entity in self._entities:
- entity.setup_compile_state(self)
- self.statement = expression.TextualSelect(
- self.statement, self.primary_columns, positional=False
- )
- else:
- # allow TextualSelect with implicit columns as well
- # as select() with ad-hoc columns, see test_query::TextTest
- self._from_obj_alias = sql.util.ColumnAdapter(
- self.statement, adapt_on_names=True
- )
-
def _compound_eager_statement(self):
# for eager joins present and LIMIT/OFFSET/DISTINCT,
# wrap the query inside a select,
@@ -659,9 +819,10 @@ class QueryCompileState(sql.base.CompileState):
self.compound_eager_adapter = sql_util.ColumnAdapter(inner, equivs)
- statement = sql.select(
- [inner] + self.secondary_columns, use_labels=self.labels
+ statement = future.select(
+ *([inner] + self.secondary_columns) # use_labels=self.labels
)
+ statement._label_style = self.label_style
# Oracle however does not allow FOR UPDATE on the subquery,
# and the Oracle dialect ignores it, plus for PostgreSQL, MySQL
@@ -752,6 +913,7 @@ class QueryCompileState(sql.base.CompileState):
group_by,
):
+ Select = future.Select
statement = Select.__new__(Select)
statement._raw_columns = raw_columns
statement._from_obj = from_obj
@@ -794,25 +956,6 @@ class QueryCompileState(sql.base.CompileState):
return statement
- def _create_with_polymorphic_adapter(self, ext_info, selectable):
- if (
- not ext_info.is_aliased_class
- and ext_info.mapper.persist_selectable
- not in self._polymorphic_adapters
- ):
- self._mapper_loads_polymorphically_with(
- ext_info.mapper,
- sql_util.ColumnAdapter(
- selectable, ext_info.mapper._equivalent_columns
- ),
- )
-
- def _mapper_loads_polymorphically_with(self, mapper, adapter):
- for m2 in mapper._with_polymorphic_mappers or [mapper]:
- self._polymorphic_adapters[m2] = adapter
- for m in m2.iterate_to_root():
- self._polymorphic_adapters[m.local_table] = adapter
-
def _adapt_polymorphic_element(self, element):
if "parententity" in element._annotations:
search = element._annotations["parententity"]
@@ -924,6 +1067,8 @@ class QueryCompileState(sql.base.CompileState):
#
onclause = right
right = None
+ elif "parententity" in right._annotations:
+ right = right._annotations["parententity"].entity
if onclause is None:
r_info = inspect(right)
@@ -932,7 +1077,6 @@ class QueryCompileState(sql.base.CompileState):
"Expected mapped entity or "
"selectable/table as join target"
)
-
if isinstance(onclause, interfaces.PropComparator):
of_type = getattr(onclause, "_of_type", None)
else:
@@ -1584,7 +1728,7 @@ class QueryCompileState(sql.base.CompileState):
"aliased_generation": aliased_generation,
}
- return right, inspect(right), onclause
+ return inspect(right), right, onclause
def _update_joinpoint(self, jp):
self._joinpoint = jp
@@ -1668,14 +1812,8 @@ class QueryCompileState(sql.base.CompileState):
def _column_descriptions(query_or_select_stmt):
- # TODO: this is a hack for now, as it is a little bit non-performant
- # to build up QueryEntity for every entity right now.
- ctx = QueryCompileState._create_for_legacy_query_via_either(
- query_or_select_stmt,
- entities_only=True,
- orm_query=query_or_select_stmt
- if not isinstance(query_or_select_stmt, Select)
- else None,
+ ctx = ORMSelectCompileState._create_entities_collection(
+ query_or_select_stmt
)
return [
{
@@ -1731,23 +1869,6 @@ def _entity_from_pre_ent_zero(query_or_augmented_select):
return ent
-@sql.base.CompileState.plugin_for(
- "orm", "select", "determine_last_joined_entity"
-)
-def _determine_last_joined_entity(statement):
- setup_joins = statement._setup_joins
-
- if not setup_joins:
- return None
-
- (target, onclause, from_, flags) = setup_joins[-1]
-
- if isinstance(target, interfaces.PropComparator):
- return target.entity
- else:
- return target
-
-
def _legacy_determine_last_joined_entity(setup_joins, entity_zero):
"""given the legacy_setup_joins collection at a point in time,
figure out what the "filter by entity" would be in terms
@@ -1929,9 +2050,6 @@ class _MapperEntity(_QueryEntity):
def entity_zero_or_selectable(self):
return self.entity_zero
- def _deep_entity_zero(self):
- return self.entity_zero
-
def corresponds_to(self, entity):
return _entity_corresponds_to(self.entity_zero, entity)
@@ -2093,14 +2211,6 @@ class _BundleEntity(_QueryEntity):
else:
return None
- def _deep_entity_zero(self):
- for ent in self._entities:
- ezero = ent._deep_entity_zero()
- if ezero is not None:
- return ezero
- else:
- return None
-
def setup_compile_state(self, compile_state):
for ent in self._entities:
ent.setup_compile_state(compile_state)
@@ -2175,17 +2285,6 @@ class _RawColumnEntity(_ColumnEntity):
)
self._extra_entities = (self.expr, self.column)
- def _deep_entity_zero(self):
- for obj in visitors.iterate(
- self.column, {"column_tables": True, "column_collections": False},
- ):
- if "parententity" in obj._annotations:
- return obj._annotations["parententity"]
- elif "deepentity" in obj._annotations:
- return obj._annotations["deepentity"]
- else:
- return None
-
def corresponds_to(self, entity):
return False
@@ -2276,9 +2375,6 @@ class _ORMColumnEntity(_ColumnEntity):
ezero, ezero.selectable
)
- def _deep_entity_zero(self):
- return self.mapper
-
def corresponds_to(self, entity):
if _is_aliased_class(entity):
# TODO: polymorphic subclasses ?
@@ -2342,8 +2438,3 @@ class _ORMColumnEntity(_ColumnEntity):
compile_state.primary_columns.append(column)
compile_state.attributes[("fetch_column", self)] = column
-
-
-sql.base.CompileState.plugin_for("orm", "select")(
- QueryCompileState._create_for_select
-)
diff --git a/lib/sqlalchemy/orm/events.py b/lib/sqlalchemy/orm/events.py
index f5d191860..be7aa272e 100644
--- a/lib/sqlalchemy/orm/events.py
+++ b/lib/sqlalchemy/orm/events.py
@@ -1397,6 +1397,43 @@ class SessionEvents(event.Events):
event_key.base_listen(**kw)
+ def do_orm_execute(self, orm_execute_state):
+ """Intercept statement executions that occur on behalf of a :class:`.Session`.
+
+ This event is invoked for all top-level SQL statements invoked
+ from the :meth:`_orm.Session.execute` method. As of SQLAlchemy 1.4,
+ all ORM queries emitted on behalf of a :class:`_orm.Session` will
+ flow through this method, so this event hook provides the single
+ point at which ORM queries of all types may be intercepted before
+ they are invoked, and additionally to replace their execution with
+ a different process.
+
+ This event is a ``do_`` event, meaning it has the capability to replace
+ the operation that the :meth:`_orm.Session.execute` method normally
+ performs. The intended use for this includes sharding and
+ result-caching schemes which may seek to invoke the same statement
+ across multiple database connections, returning a result that is
+ merged from each of them, or which don't invoke the statement at all,
+ instead returning data from a cache.
+
+ The hook intends to replace the use of the
+ ``Query._execute_and_instances`` method that could be subclassed prior
+ to SQLAlchemy 1.4.
+
+ :param orm_execute_state: an instance of :class:`.ORMExecuteState`
+ which contains all information about the current execution, as well
+ as helper functions used to derive other commonly required
+ information. See that object for details.
+
+ .. seealso::
+
+ :class:`.ORMExecuteState`
+
+
+ .. versionadded:: 1.4
+
+ """
+
def after_transaction_create(self, session, transaction):
"""Execute when a new :class:`.SessionTransaction` is created.
diff --git a/lib/sqlalchemy/orm/interfaces.py b/lib/sqlalchemy/orm/interfaces.py
index 313f2fda8..6c0f5d3ef 100644
--- a/lib/sqlalchemy/orm/interfaces.py
+++ b/lib/sqlalchemy/orm/interfaces.py
@@ -64,6 +64,12 @@ __all__ = (
)
+class ORMStatementRole(roles.CoerceTextStatementRole):
+ _role_name = (
+ "Executable SQL or text() construct, including ORM " "aware objects"
+ )
+
+
class ORMColumnsClauseRole(roles.ColumnsClauseRole):
_role_name = "ORM mapped entity, aliased entity, or Column expression"
@@ -662,8 +668,15 @@ class StrategizedProperty(MapperProperty):
)
-class LoaderOption(HasCacheKey):
- """Describe a modification to an ORM statement at compilation time.
+class ORMOption(object):
+ """Base class for option objects that are passed to ORM queries.
+
+ These options may be consumed by :meth:`.Query.options`,
+ :meth:`.Select.options`, or in a more general sense by any
+ :meth:`.Executable.options` method. They are interpreted at
+ statement compile time or execution time in modern use. The
+ deprecated :class:`.MapperOption` is consumed at ORM query construction
+ time.
.. versionadded:: 1.4
@@ -680,6 +693,18 @@ class LoaderOption(HasCacheKey):
"""
+ _is_compile_state = False
+
+
+class LoaderOption(HasCacheKey, ORMOption):
+ """Describe a loader modification to an ORM statement at compilation time.
+
+ .. versionadded:: 1.4
+
+ """
+
+ _is_compile_state = True
+
def process_compile_state(self, compile_state):
"""Apply a modification to a given :class:`.CompileState`."""
@@ -693,18 +718,39 @@ class LoaderOption(HasCacheKey):
return False
+class UserDefinedOption(ORMOption):
+ """Base class for a user-defined option that can be consumed from the
+ :meth:`.SessionEvents.do_orm_execute` event hook.
+
+ """
+
+ _is_legacy_option = False
+
+ propagate_to_loaders = False
+ """if True, indicate this option should be carried along
+ to "secondary" Query objects produced during lazy loads
+ or refresh operations.
+
+ """
+
+ def __init__(self, payload=None):
+ self.payload = payload
+
+ def _gen_cache_key(self, *arg, **kw):
+ return ()
+
+
@util.deprecated_cls(
"1.4",
"The :class:`.MapperOption` class is deprecated and will be removed "
- "in a future release. ORM options now run within the compilation "
- "phase and are based on the :class:`.LoaderOption` class which is "
- "intended for internal consumption only. For "
- "modifications to queries on a per-execution basis, the "
- ":meth:`.before_execute` hook will now intercept ORM :class:`.Query` "
- "objects before they are invoked",
+ "in a future release. For "
+ "modifications to queries on a per-execution basis, use the "
+ ":class:`.UserDefinedOption` class to establish state within a "
+ ":class:`.Query` or other Core statement, then use the "
+ ":meth:`.SessionEvents.do_orm_execute` hook to consume them.",
constructor=None,
)
-class MapperOption(object):
+class MapperOption(ORMOption):
"""Describe a modification to a Query"""
_is_legacy_option = True
@@ -735,23 +781,6 @@ class MapperOption(object):
def _generate_path_cache_key(self, path):
"""Used by the "baked lazy loader" to see if this option can be cached.
- The "baked lazy loader" refers to the :class:`_query.Query` that is
- produced during a lazy load operation for a mapped relationship.
- It does not yet apply to the "lazy" load operation for deferred
- or expired column attributes, however this may change in the future.
-
- This loader generates SQL for a query only once and attempts to cache
- it; from that point on, if the SQL has been cached it will no longer
- run the :meth:`_query.Query.options` method of the
- :class:`_query.Query`. The
- :class:`.MapperOption` object that wishes to participate within a lazy
- load operation therefore needs to tell the baked loader that it either
- needs to forego this caching, or that it needs to include the state of
- the :class:`.MapperOption` itself as part of its cache key, otherwise
- SQL or other query state that has been affected by the
- :class:`.MapperOption` may be cached in place of a query that does not
- include these modifications, or the option may not be invoked at all.
-
By default, this method returns the value ``False``, which means
the :class:`.BakedQuery` generated by the lazy loader will
not cache the SQL when this :class:`.MapperOption` is present.
@@ -760,26 +789,10 @@ class MapperOption(object):
an unlimited number of :class:`_query.Query` objects for an unlimited
number of :class:`.MapperOption` objects.
- .. versionchanged:: 1.2.8 the default return value of
- :meth:`.MapperOption._generate_cache_key` is False; previously it
- was ``None`` indicating "safe to cache, don't include as part of
- the cache key"
-
- To enable caching of :class:`_query.Query` objects within lazy loaders
- , a
- given :class:`.MapperOption` that returns a cache key must return a key
- that uniquely identifies the complete state of this option, which will
- match any other :class:`.MapperOption` that itself retains the
- identical state. This includes path options, flags, etc. It should
- be a state that is repeatable and part of a limited set of possible
- options.
-
- If the :class:`.MapperOption` does not apply to the given path and
- would not affect query results on such a path, it should return None,
- indicating the :class:`_query.Query` is safe to cache for this given
- loader path and that this :class:`.MapperOption` need not be
- part of the cache key.
-
+ For caching support it is recommended to use the
+ :class:`.UserDefinedOption` class in conjunction with
+ the :meth:`.SessionEvents.do_orm_execute` event hook so that statements may
+ be modified before they are cached.
"""
return False
diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py
index 48641685e..616e757a3 100644
--- a/lib/sqlalchemy/orm/loading.py
+++ b/lib/sqlalchemy/orm/loading.py
@@ -26,6 +26,7 @@ from .base import _SET_DEFERRED_EXPIRED
from .util import _none_set
from .util import state_str
from .. import exc as sa_exc
+from .. import future
from .. import util
from ..engine import result_tuple
from ..engine.result import ChunkedIteratorResult
@@ -36,8 +37,20 @@ from ..sql import util as sql_util
_new_runid = util.counter()
-def instances(query, cursor, context):
- """Return an ORM result as an iterator."""
+def instances(cursor, context):
+ """Return a :class:`.Result` given an ORM query context.
+
+ :param cursor: a :class:`.CursorResult`, generated by a statement
+ which came from :class:`.ORMCompileState`
+
+ :param context: a :class:`.QueryContext` object
+
+ :return: a :class:`.Result` object representing ORM results
+
+ .. versionchanged:: 1.4 The instances() function now uses
+ :class:`.Result` objects and has an all new interface.
+
+ """
context.runid = _new_runid()
context.post_load_paths = {}
@@ -80,7 +93,7 @@ def instances(query, cursor, context):
],
)
- def chunks(size, as_tuples):
+ def chunks(size):
while True:
yield_per = size
@@ -94,7 +107,7 @@ def instances(query, cursor, context):
else:
fetch = cursor.fetchall()
- if not as_tuples:
+ if single_entity:
proc = process[0]
rows = [proc(row) for row in fetch]
else:
@@ -111,20 +124,62 @@ def instances(query, cursor, context):
break
result = ChunkedIteratorResult(
- row_metadata, chunks, source_supports_scalars=single_entity
+ row_metadata, chunks, source_supports_scalars=single_entity, raw=cursor
+ )
+
+ result._attributes = result._attributes.union(
+ dict(filtered=filtered, is_single_entity=single_entity)
)
+
if context.yield_per:
result.yield_per(context.yield_per)
- if single_entity:
- result = result.scalars()
+ return result
- filtered = context.compile_state._has_mapper_entities
- if filtered:
- result = result.unique()
+@util.preload_module("sqlalchemy.orm.context")
+def merge_frozen_result(session, statement, frozen_result, load=True):
+ querycontext = util.preloaded.orm_context
- return result
+ if load:
+ # flush current contents if we expect to load data
+ session._autoflush()
+
+ ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+ statement
+ )
+
+ autoflush = session.autoflush
+ try:
+ session.autoflush = False
+ mapped_entities = [
+ i
+ for i, e in enumerate(ctx._entities)
+ if isinstance(e, querycontext._MapperEntity)
+ ]
+ keys = [ent._label_name for ent in ctx._entities]
+
+ keyed_tuple = result_tuple(
+ keys, [ent._extra_entities for ent in ctx._entities]
+ )
+
+ result = []
+ for newrow in frozen_result.rewrite_rows():
+ for i in mapped_entities:
+ if newrow[i] is not None:
+ newrow[i] = session._merge(
+ attributes.instance_state(newrow[i]),
+ attributes.instance_dict(newrow[i]),
+ load=load,
+ _recursive={},
+ _resolve_conflict_map={},
+ )
+
+ result.append(keyed_tuple(newrow))
+
+ return frozen_result.with_new_rows(result)
+ finally:
+ session.autoflush = autoflush
@util.preload_module("sqlalchemy.orm.context")
@@ -145,9 +200,7 @@ def merge_result(query, iterator, load=True):
else:
frozen_result = None
- ctx = querycontext.QueryCompileState._create_for_legacy_query(
- query, entities_only=True
- )
+ ctx = querycontext.ORMSelectCompileState._create_entities_collection(query)
autoflush = session.autoflush
try:
@@ -235,12 +288,15 @@ def get_from_identity(session, mapper, key, passive):
def load_on_ident(
- query,
+ session,
+ statement,
key,
+ load_options=None,
refresh_state=None,
with_for_update=None,
only_load_props=None,
no_autoflush=False,
+ bind_arguments=util.immutabledict(),
):
"""Load the given identity key from the database."""
if key is not None:
@@ -249,38 +305,59 @@ def load_on_ident(
else:
ident = identity_token = None
- if no_autoflush:
- query = query.autoflush(False)
-
return load_on_pk_identity(
- query,
+ session,
+ statement,
ident,
+ load_options=load_options,
refresh_state=refresh_state,
with_for_update=with_for_update,
only_load_props=only_load_props,
identity_token=identity_token,
+ no_autoflush=no_autoflush,
+ bind_arguments=bind_arguments,
)
def load_on_pk_identity(
- query,
+ session,
+ statement,
primary_key_identity,
+ load_options=None,
refresh_state=None,
with_for_update=None,
only_load_props=None,
identity_token=None,
+ no_autoflush=False,
+ bind_arguments=util.immutabledict(),
):
"""Load the given primary key identity from the database."""
+ query = statement
+ q = query._clone()
+
+ # TODO: fix these imports ....
+ from .context import QueryContext, ORMCompileState
+
+ if load_options is None:
+ load_options = QueryContext.default_load_options
+
+ compile_options = ORMCompileState.default_compile_options.merge(
+ q.compile_options
+ )
+
+ # checking that query doesn't have criteria on it
+ # just delete it here w/ optional assertion? since we are setting a
+ # where clause also
if refresh_state is None:
- q = query._clone()
- q._get_condition()
- else:
- q = query._clone()
+ _no_criterion_assertion(q, "get", order_by=False, distinct=False)
if primary_key_identity is not None:
- mapper = query._only_full_mapper_zero("load_on_pk_identity")
+ # mapper = query._only_full_mapper_zero("load_on_pk_identity")
+
+ # TODO: error checking?
+ mapper = query._raw_columns[0]._annotations["parententity"]
(_get_clause, _get_params) = mapper._get_clause
@@ -320,9 +397,8 @@ def load_on_pk_identity(
]
)
- q.load_options += {"_params": params}
+ load_options += {"_params": params}
- # with_for_update needs to be query.LockmodeArg()
if with_for_update is not None:
version_check = True
q._for_update_arg = with_for_update
@@ -333,11 +409,15 @@ def load_on_pk_identity(
version_check = False
if refresh_state and refresh_state.load_options:
- # if refresh_state.load_path.parent:
- q = q._with_current_path(refresh_state.load_path.parent)
- q = q.options(refresh_state.load_options)
+ compile_options += {"_current_path": refresh_state.load_path.parent}
+ q = q.options(*refresh_state.load_options)
- q._get_options(
+ # TODO: most of the compile_options that are not legacy only involve this
+ # function, so try to see if handling of them can mostly be local to here
+
+ q.compile_options, load_options = _set_get_options(
+ compile_options,
+ load_options,
populate_existing=bool(refresh_state),
version_check=version_check,
only_load_props=only_load_props,
@@ -346,12 +426,76 @@ def load_on_pk_identity(
)
q._order_by = None
+ if no_autoflush:
+ load_options += {"_autoflush": False}
+
+ result = (
+ session.execute(
+ q,
+ params=load_options._params,
+ execution_options={"_sa_orm_load_options": load_options},
+ bind_arguments=bind_arguments,
+ )
+ .unique()
+ .scalars()
+ )
+
try:
- return q.one()
+ return result.one()
except orm_exc.NoResultFound:
return None
+def _no_criterion_assertion(stmt, meth, order_by=True, distinct=True):
+ if (
+ stmt._where_criteria
+ or stmt.compile_options._statement is not None
+ or stmt._from_obj
+ or stmt._legacy_setup_joins
+ or stmt._limit_clause is not None
+ or stmt._offset_clause is not None
+ or stmt._group_by_clauses
+ or (order_by and stmt._order_by_clauses)
+ or (distinct and stmt._distinct)
+ ):
+ raise sa_exc.InvalidRequestError(
+ "Query.%s() being called on a "
+ "Query with existing criterion. " % meth
+ )
+
+
+def _set_get_options(
+ compile_opt,
+ load_opt,
+ populate_existing=None,
+ version_check=None,
+ only_load_props=None,
+ refresh_state=None,
+ identity_token=None,
+):
+
+ compile_options = {}
+ load_options = {}
+ if version_check:
+ load_options["_version_check"] = version_check
+ if populate_existing:
+ load_options["_populate_existing"] = populate_existing
+ if refresh_state:
+ load_options["_refresh_state"] = refresh_state
+ compile_options["_for_refresh_state"] = True
+ if only_load_props:
+ compile_options["_only_load_props"] = frozenset(only_load_props)
+ if identity_token:
+ load_options["_refresh_identity_token"] = identity_token
+
+ if load_options:
+ load_opt += load_options
+ if compile_options:
+ compile_opt += compile_options
+
+ return compile_opt, load_opt
+
+
def _setup_entity_query(
compile_state,
mapper,
@@ -487,7 +631,7 @@ def _instance_processor(
context, path, mapper, result, adapter, populators
)
- propagate_options = context.propagate_options
+ propagated_loader_options = context.propagated_loader_options
load_path = (
context.compile_state.current_path + path
if context.compile_state.current_path.path
@@ -639,8 +783,8 @@ def _instance_processor(
# be conservative about setting load_path when populate_existing
# is in effect; want to maintain options from the original
# load. see test_expire->test_refresh_maintains_deferred_options
- if isnew and (propagate_options or not populate_existing):
- state.load_options = propagate_options
+ if isnew and (propagated_loader_options or not populate_existing):
+ state.load_options = propagated_loader_options
state.load_path = load_path
_populate_full(
@@ -1055,7 +1199,7 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
result = False
- no_autoflush = passive & attributes.NO_AUTOFLUSH
+ no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
# in the case of inheritance, particularly concrete and abstract
# concrete inheritance, the class manager might have some keys
@@ -1080,10 +1224,16 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
# note: using from_statement() here means there is an adaption
# with adapt_on_names set up. the other option is to make the
# aliased() against a subquery which affects the SQL.
+
+ from .query import FromStatement
+
+ stmt = FromStatement(mapper, statement).options(
+ strategy_options.Load(mapper).undefer("*")
+ )
+
result = load_on_ident(
- session.query(mapper)
- .options(strategy_options.Load(mapper).undefer("*"))
- .from_statement(statement),
+ session,
+ stmt,
None,
only_load_props=attribute_names,
refresh_state=state,
@@ -1121,7 +1271,8 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
return
result = load_on_ident(
- session.query(mapper),
+ session,
+ future.select(mapper).apply_labels(),
identity_key,
refresh_state=state,
only_load_props=attribute_names,
diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py
index a6fb1039f..7bfe70c36 100644
--- a/lib/sqlalchemy/orm/mapper.py
+++ b/lib/sqlalchemy/orm/mapper.py
@@ -2237,6 +2237,8 @@ class Mapper(
"parentmapper": self,
"compile_state_plugin": "orm",
}
+ )._set_propagate_attrs(
+ {"compile_state_plugin": "orm", "plugin_subject": self}
)
@property
diff --git a/lib/sqlalchemy/orm/path_registry.py b/lib/sqlalchemy/orm/path_registry.py
index 1698a5181..2e5941713 100644
--- a/lib/sqlalchemy/orm/path_registry.py
+++ b/lib/sqlalchemy/orm/path_registry.py
@@ -228,10 +228,29 @@ class RootRegistry(PathRegistry):
PathRegistry.root = RootRegistry()
+class PathToken(HasCacheKey, str):
+ """cacheable string token"""
+
+ _intern = {}
+
+ def _gen_cache_key(self, anon_map, bindparams):
+ return (str(self),)
+
+ @classmethod
+ def intern(cls, strvalue):
+ if strvalue in cls._intern:
+ return cls._intern[strvalue]
+ else:
+ cls._intern[strvalue] = result = PathToken(strvalue)
+ return result
+
+
class TokenRegistry(PathRegistry):
__slots__ = ("token", "parent", "path", "natural_path")
def __init__(self, parent, token):
+ token = PathToken.intern(token)
+
self.token = token
self.parent = parent
self.path = parent.path + (token,)
diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py
index d14f6c27b..163ebf22a 100644
--- a/lib/sqlalchemy/orm/persistence.py
+++ b/lib/sqlalchemy/orm/persistence.py
@@ -25,6 +25,7 @@ from . import loading
from . import sync
from .base import state_str
from .. import exc as sa_exc
+from .. import future
from .. import sql
from .. import util
from ..sql import coercions
@@ -1424,8 +1425,10 @@ def _finalize_insert_update_commands(base_mapper, uowtransaction, states):
if toload_now:
state.key = base_mapper._identity_key_from_state(state)
+ stmt = future.select(mapper).apply_labels()
loading.load_on_ident(
- uowtransaction.session.query(mapper),
+ uowtransaction.session,
+ stmt,
state.key,
refresh_state=state,
only_load_props=toload_now,
@@ -1723,7 +1726,7 @@ class BulkUD(object):
self.context
) = compile_state = query._compile_state()
- self.mapper = compile_state._bind_mapper()
+ self.mapper = compile_state._entity_zero()
if isinstance(
compile_state._entities[0], query_context._RawColumnEntity,
diff --git a/lib/sqlalchemy/orm/properties.py b/lib/sqlalchemy/orm/properties.py
index 027786c19..4cf501e3f 100644
--- a/lib/sqlalchemy/orm/properties.py
+++ b/lib/sqlalchemy/orm/properties.py
@@ -346,14 +346,20 @@ class ColumnProperty(StrategizedProperty):
pe = self._parententity
# no adapter, so we aren't aliased
# assert self._parententity is self._parentmapper
- return self.prop.columns[0]._annotate(
- {
- "entity_namespace": pe,
- "parententity": pe,
- "parentmapper": pe,
- "orm_key": self.prop.key,
- "compile_state_plugin": "orm",
- }
+ return (
+ self.prop.columns[0]
+ ._annotate(
+ {
+ "entity_namespace": pe,
+ "parententity": pe,
+ "parentmapper": pe,
+ "orm_key": self.prop.key,
+ "compile_state_plugin": "orm",
+ }
+ )
+ ._set_propagate_attrs(
+ {"compile_state_plugin": "orm", "plugin_subject": pe}
+ )
)
def _memoized_attr_info(self):
@@ -388,6 +394,11 @@ class ColumnProperty(StrategizedProperty):
"orm_key": self.prop.key,
"compile_state_plugin": "orm",
}
+ )._set_propagate_attrs(
+ {
+ "compile_state_plugin": "orm",
+ "plugin_subject": self._parententity,
+ }
)
for col in self.prop.columns
]
diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
index 8a861c3dc..25d6f4736 100644
--- a/lib/sqlalchemy/orm/query.py
+++ b/lib/sqlalchemy/orm/query.py
@@ -18,6 +18,7 @@ ORM session, whereas the ``Select`` construct interacts directly with the
database to return iterable result sets.
"""
+import itertools
from . import attributes
from . import exc as orm_exc
@@ -28,7 +29,8 @@ from .base import _assertions
from .context import _column_descriptions
from .context import _legacy_determine_last_joined_entity
from .context import _legacy_filter_by_entity_zero
-from .context import QueryCompileState
+from .context import ORMCompileState
+from .context import ORMFromStatementCompileState
from .context import QueryContext
from .interfaces import ORMColumnsClauseRole
from .util import aliased
@@ -42,18 +44,22 @@ from .. import inspection
from .. import log
from .. import sql
from .. import util
+from ..future.selectable import Select as FutureSelect
from ..sql import coercions
from ..sql import expression
from ..sql import roles
from ..sql import util as sql_util
+from ..sql.annotation import SupportsCloneAnnotations
from ..sql.base import _generative
from ..sql.base import Executable
+from ..sql.selectable import _SelectFromElements
from ..sql.selectable import ForUpdateArg
from ..sql.selectable import HasHints
from ..sql.selectable import HasPrefixes
from ..sql.selectable import HasSuffixes
from ..sql.selectable import LABEL_STYLE_NONE
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
+from ..sql.selectable import SelectStatementGrouping
from ..sql.util import _entity_namespace_key
from ..util import collections_abc
@@ -62,7 +68,15 @@ __all__ = ["Query", "QueryContext", "aliased"]
@inspection._self_inspects
@log.class_logger
-class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
+class Query(
+ _SelectFromElements,
+ SupportsCloneAnnotations,
+ HasPrefixes,
+ HasSuffixes,
+ HasHints,
+ Executable,
+):
+
"""ORM-level SQL construction object.
:class:`_query.Query`
@@ -105,7 +119,7 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
_legacy_setup_joins = ()
_label_style = LABEL_STYLE_NONE
- compile_options = QueryCompileState.default_compile_options
+ compile_options = ORMCompileState.default_compile_options
load_options = QueryContext.default_load_options
@@ -115,6 +129,11 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
_enable_assertions = True
_last_joined_entity = None
+ # mirrors that of ClauseElement, used to propagate the "orm"
+ # plugin as well as the "subject" of the plugin, e.g. the mapper
+ # we are querying against.
+ _propagate_attrs = util.immutabledict()
+
def __init__(self, entities, session=None):
"""Construct a :class:`_query.Query` directly.
@@ -148,7 +167,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
def _set_entities(self, entities):
self._raw_columns = [
- coercions.expect(roles.ColumnsClauseRole, ent)
+ coercions.expect(
+ roles.ColumnsClauseRole, ent, apply_propagate_attrs=self
+ )
for ent in util.to_list(entities)
]
@@ -183,7 +204,10 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
def _set_select_from(self, obj, set_base_alias):
fa = [
coercions.expect(
- roles.StrictFromClauseRole, elem, allow_select=True
+ roles.StrictFromClauseRole,
+ elem,
+ allow_select=True,
+ apply_propagate_attrs=self,
)
for elem in obj
]
@@ -332,15 +356,13 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
if (
not self.compile_options._set_base_alias
and not self.compile_options._with_polymorphic_adapt_map
- and self.compile_options._statement is None
+ # and self.compile_options._statement is None
):
# if we don't have legacy top level aliasing features in use
# then convert to a future select() directly
stmt = self._statement_20()
else:
- stmt = QueryCompileState._create_for_legacy_query(
- self, for_statement=True
- ).statement
+ stmt = self._compile_state(for_statement=True).statement
if self.load_options._params:
# this is the search and replace thing. this is kind of nuts
@@ -349,8 +371,67 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
return stmt
- def _statement_20(self):
- return QueryCompileState._create_future_select_from_query(self)
+ def _statement_20(self, orm_results=False):
+ # TODO: this event needs to be deprecated, as it currently applies
+ # only to ORM query and occurs at this spot that is now more
+ # or less an artificial spot
+ if self.dispatch.before_compile:
+ for fn in self.dispatch.before_compile:
+ new_query = fn(self)
+ if new_query is not None and new_query is not self:
+ self = new_query
+ if not fn._bake_ok:
+ self.compile_options += {"_bake_ok": False}
+
+ if self.compile_options._statement is not None:
+ stmt = FromStatement(
+ self._raw_columns, self.compile_options._statement
+ )
+ # TODO: once SubqueryLoader uses select(), we can remove
+ # "_orm_query" from this structure
+ stmt.__dict__.update(
+ _with_options=self._with_options,
+ _with_context_options=self._with_context_options,
+ compile_options=self.compile_options
+ + {"_orm_query": self.with_session(None)},
+ _execution_options=self._execution_options,
+ )
+ stmt._propagate_attrs = self._propagate_attrs
+ else:
+ stmt = FutureSelect.__new__(FutureSelect)
+
+ stmt.__dict__.update(
+ _raw_columns=self._raw_columns,
+ _where_criteria=self._where_criteria,
+ _from_obj=self._from_obj,
+ _legacy_setup_joins=self._legacy_setup_joins,
+ _order_by_clauses=self._order_by_clauses,
+ _group_by_clauses=self._group_by_clauses,
+ _having_criteria=self._having_criteria,
+ _distinct=self._distinct,
+ _distinct_on=self._distinct_on,
+ _with_options=self._with_options,
+ _with_context_options=self._with_context_options,
+ _hints=self._hints,
+ _statement_hints=self._statement_hints,
+ _correlate=self._correlate,
+ _auto_correlate=self._auto_correlate,
+ _limit_clause=self._limit_clause,
+ _offset_clause=self._offset_clause,
+ _for_update_arg=self._for_update_arg,
+ _prefixes=self._prefixes,
+ _suffixes=self._suffixes,
+ _label_style=self._label_style,
+ compile_options=self.compile_options
+ + {"_orm_query": self.with_session(None)},
+ _execution_options=self._execution_options,
+ )
+
+ if not orm_results:
+ stmt.compile_options += {"_orm_results": False}
+
+ stmt._propagate_attrs = self._propagate_attrs
+ return stmt
def subquery(self, name=None, with_labels=False, reduce_columns=False):
"""return the full SELECT statement represented by
@@ -879,7 +960,17 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
elif instance is attributes.PASSIVE_CLASS_MISMATCH:
return None
- return db_load_fn(self, primary_key_identity)
+ # apply_labels() not strictly necessary, however this will ensure that
+ # tablename_colname style is used which at the moment is asserted
+ # in a lot of unit tests :)
+
+ statement = self._statement_20(orm_results=True).apply_labels()
+ return db_load_fn(
+ self.session,
+ statement,
+ primary_key_identity,
+ load_options=self.load_options,
+ )
@property
def lazy_loaded_from(self):
@@ -1059,7 +1150,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
self._raw_columns = list(self._raw_columns)
self._raw_columns.append(
- coercions.expect(roles.ColumnsClauseRole, entity)
+ coercions.expect(
+ roles.ColumnsClauseRole, entity, apply_propagate_attrs=self
+ )
)
@_generative
@@ -1397,7 +1490,10 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
self._raw_columns = list(self._raw_columns)
self._raw_columns.extend(
- coercions.expect(roles.ColumnsClauseRole, c) for c in column
+ coercions.expect(
+ roles.ColumnsClauseRole, c, apply_propagate_attrs=self
+ )
+ for c in column
)
@util.deprecated(
@@ -1584,7 +1680,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
"""
for criterion in list(criterion):
- criterion = coercions.expect(roles.WhereHavingRole, criterion)
+ criterion = coercions.expect(
+ roles.WhereHavingRole, criterion, apply_propagate_attrs=self
+ )
# legacy vvvvvvvvvvvvvvvvvvvvvvvvvvv
if self._aliased_generation:
@@ -1742,7 +1840,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
"""
self._having_criteria += (
- coercions.expect(roles.WhereHavingRole, criterion),
+ coercions.expect(
+ roles.WhereHavingRole, criterion, apply_propagate_attrs=self
+ ),
)
def _set_op(self, expr_fn, *q):
@@ -2177,7 +2277,12 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
self._legacy_setup_joins += tuple(
(
- coercions.expect(roles.JoinTargetRole, prop[0], legacy=True),
+ coercions.expect(
+ roles.JoinTargetRole,
+ prop[0],
+ legacy=True,
+ apply_propagate_attrs=self,
+ ),
prop[1] if len(prop) == 2 else None,
None,
{
@@ -2605,7 +2710,9 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
ORM tutorial
"""
- statement = coercions.expect(roles.SelectStatementRole, statement)
+ statement = coercions.expect(
+ roles.SelectStatementRole, statement, apply_propagate_attrs=self
+ )
self.compile_options += {"_statement": statement}
def first(self):
@@ -2711,76 +2818,50 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
def __iter__(self):
return self._iter().__iter__()
- # TODO: having _iter(), _execute_and_instances, _connection_from_session,
- # etc., is all too much.
+ def _iter(self):
+ # new style execution.
+ params = self.load_options._params
+ statement = self._statement_20(orm_results=True)
+ result = self.session.execute(
+ statement,
+ params,
+ execution_options={"_sa_orm_load_options": self.load_options},
+ )
- # new recipes / extensions should be based on an event hook of some kind,
- # can allow an execution that would return a Result to take in all the
- # information and return a different Result. this has to be at
- # the session / connection .execute() level, and can perhaps be
- # before_execute() but needs to be focused around rewriting of results.
+ # legacy: automatically set scalars, unique
+ if result._attributes.get("is_single_entity", False):
+ result = result.scalars()
- # the dialect do_execute() *may* be this but that seems a bit too low
- # level. it may need to be ORM session based and be a session event,
- # becasue it might not invoke the cursor, might invoke for multiple
- # connections, etc. OK really has to be a session level event in this
- # case to support horizontal sharding.
+ if result._attributes.get("filtered", False):
+ result = result.unique()
- def _iter(self):
- context = self._compile_context()
+ return result
+
+ def _execute_crud(self, stmt, mapper):
+ conn = self.session.connection(
+ mapper=mapper, clause=stmt, close_with_result=True
+ )
- if self.load_options._autoflush:
- self.session._autoflush()
- return self._execute_and_instances(context)
+ return conn._execute_20(
+ stmt, self.load_options._params, self._execution_options
+ )
def __str__(self):
- compile_state = self._compile_state()
+ statement = self._statement_20(orm_results=True)
+
try:
bind = (
- self._get_bind_args(compile_state, self.session.get_bind)
+ self._get_bind_args(statement, self.session.get_bind)
if self.session
else None
)
except sa_exc.UnboundExecutionError:
bind = None
- return str(compile_state.statement.compile(bind))
-
- def _connection_from_session(self, **kw):
- conn = self.session.connection(**kw)
- if self._execution_options:
- conn = conn.execution_options(**self._execution_options)
- return conn
-
- def _execute_and_instances(self, querycontext, params=None):
- conn = self._get_bind_args(
- querycontext.compile_state,
- self._connection_from_session,
- close_with_result=True,
- )
- if params is None:
- params = querycontext.load_options._params
+ return str(statement.compile(bind))
- result = conn._execute_20(
- querycontext.compile_state.statement,
- params,
- # execution_options=self.session._orm_execution_options(),
- )
- return loading.instances(querycontext.query, result, querycontext)
-
- def _execute_crud(self, stmt, mapper):
- conn = self._connection_from_session(
- mapper=mapper, clause=stmt, close_with_result=True
- )
-
- return conn.execute(stmt, self.load_options._params)
-
- def _get_bind_args(self, compile_state, fn, **kw):
- return fn(
- mapper=compile_state._bind_mapper(),
- clause=compile_state.statement,
- **kw
- )
+ def _get_bind_args(self, statement, fn, **kw):
+ return fn(clause=statement, **kw)
@property
def column_descriptions(self):
@@ -2837,10 +2918,21 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
"for linking ORM results to arbitrary select constructs.",
version="1.4",
)
- compile_state = QueryCompileState._create_for_legacy_query(self)
- context = QueryContext(compile_state, self.session)
+ compile_state = ORMCompileState._create_for_legacy_query(self)
+ context = QueryContext(
+ compile_state, self.session, self.load_options
+ )
+
+ result = loading.instances(result_proxy, context)
+
+ # legacy: automatically set scalars, unique
+ if result._attributes.get("is_single_entity", False):
+ result = result.scalars()
+
+ if result._attributes.get("filtered", False):
+ result = result.unique()
- return loading.instances(self, result_proxy, context)
+ return result
def merge_result(self, iterator, load=True):
"""Merge a result into this :class:`_query.Query` object's Session.
@@ -3239,36 +3331,62 @@ class Query(HasPrefixes, HasSuffixes, HasHints, Executable):
return update_op.rowcount
def _compile_state(self, for_statement=False, **kw):
- # TODO: this needs to become a general event for all
- # Executable objects as well (all ClauseElement?)
- # but then how do we clarify that this event is only for
- # *top level* compile, not as an embedded element is visted?
- # how does that even work because right now a Query that does things
- # like from_self() will in fact invoke before_compile for each
- # inner element.
- # OK perhaps with 2.0 style folks will continue using before_execute()
- # as they can now, as a select() with ORM elements will be delivered
- # there, OK. sort of fixes the "bake_ok" problem too.
- if self.dispatch.before_compile:
- for fn in self.dispatch.before_compile:
- new_query = fn(self)
- if new_query is not None and new_query is not self:
- self = new_query
- if not fn._bake_ok:
- self.compile_options += {"_bake_ok": False}
-
- compile_state = QueryCompileState._create_for_legacy_query(
+ return ORMCompileState._create_for_legacy_query(
self, for_statement=for_statement, **kw
)
- return compile_state
def _compile_context(self, for_statement=False):
compile_state = self._compile_state(for_statement=for_statement)
- context = QueryContext(compile_state, self.session)
+ context = QueryContext(compile_state, self.session, self.load_options)
return context
+class FromStatement(SelectStatementGrouping, Executable):
+ """Core construct that represents a load of ORM objects from a finished
+ select or text construct.
+
+ """
+
+ compile_options = ORMFromStatementCompileState.default_compile_options
+
+ _compile_state_factory = ORMFromStatementCompileState.create_for_statement
+
+ _is_future = True
+
+ _for_update_arg = None
+
+ def __init__(self, entities, element):
+ self._raw_columns = [
+ coercions.expect(
+ roles.ColumnsClauseRole, ent, apply_propagate_attrs=self
+ )
+ for ent in util.to_list(entities)
+ ]
+ super(FromStatement, self).__init__(element)
+
+ def _compiler_dispatch(self, compiler, **kw):
+ compile_state = self._compile_state_factory(self, self, **kw)
+
+ toplevel = not compiler.stack
+
+ if toplevel:
+ compiler.compile_state = compile_state
+
+ return compiler.process(compile_state.statement, **kw)
+
+ def _ensure_disambiguated_names(self):
+ return self
+
+ def get_children(self, **kw):
+ for elem in itertools.chain.from_iterable(
+ element._from_objects for element in self._raw_columns
+ ):
+ yield elem
+ for elem in super(FromStatement, self).get_children(**kw):
+ yield elem
+
+
class AliasOption(interfaces.LoaderOption):
@util.deprecated(
"1.4",
diff --git a/lib/sqlalchemy/orm/relationships.py b/lib/sqlalchemy/orm/relationships.py
index f539e968f..e82cd174f 100644
--- a/lib/sqlalchemy/orm/relationships.py
+++ b/lib/sqlalchemy/orm/relationships.py
@@ -2737,12 +2737,12 @@ class JoinCondition(object):
def replace(element):
if "remote" in element._annotations:
- v = element._annotations.copy()
+ v = dict(element._annotations)
del v["remote"]
v["local"] = True
return element._with_annotations(v)
elif "local" in element._annotations:
- v = element._annotations.copy()
+ v = dict(element._annotations)
del v["local"]
v["remote"] = True
return element._with_annotations(v)
diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py
index 6cb8a0062..8d2f13df3 100644
--- a/lib/sqlalchemy/orm/session.py
+++ b/lib/sqlalchemy/orm/session.py
@@ -12,6 +12,7 @@ import sys
import weakref
from . import attributes
+from . import context
from . import exc
from . import identity
from . import loading
@@ -28,13 +29,12 @@ from .base import state_str
from .unitofwork import UOWTransaction
from .. import engine
from .. import exc as sa_exc
-from .. import sql
+from .. import future
from .. import util
from ..inspection import inspect
from ..sql import coercions
from ..sql import roles
-from ..sql import util as sql_util
-
+from ..sql import visitors
__all__ = ["Session", "SessionTransaction", "sessionmaker"]
@@ -98,6 +98,160 @@ DEACTIVE = util.symbol("DEACTIVE")
CLOSED = util.symbol("CLOSED")
+class ORMExecuteState(object):
+ """Stateful object used for the :meth:`.SessionEvents.do_orm_execute` event.
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = (
+ "session",
+ "statement",
+ "parameters",
+ "execution_options",
+ "bind_arguments",
+ )
+
+ def __init__(
+ self, session, statement, parameters, execution_options, bind_arguments
+ ):
+ self.session = session
+ self.statement = statement
+ self.parameters = parameters
+ self.execution_options = execution_options
+ self.bind_arguments = bind_arguments
+
+ def invoke_statement(
+ self,
+ statement=None,
+ params=None,
+ execution_options=None,
+ bind_arguments=None,
+ ):
+ """Execute the statement represented by this
+ :class:`.ORMExecuteState`, without re-invoking events.
+
+ This method essentially performs a re-entrant execution of the
+ current statement for which the :meth:`.SessionEvents.do_orm_execute`
+ event is being currently invoked. The use case for this is
+ for event handlers that want to override how the ultimate results
+ object is returned, such as for schemes that retrieve results from
+ an offline cache or which concatenate results from multiple executions.
+
+ :param statement: optional statement to be invoked, in place of the
+ statement currently represented by :attr:`.ORMExecuteState.statement`.
+
+ :param params: optional dictionary of parameters which will be merged
+ into the existing :attr:`.ORMExecuteState.parameters` of this
+ :class:`.ORMExecuteState`.
+
+ :param execution_options: optional dictionary of execution options,
+ which will be merged into the existing
+ :attr:`.ORMExecuteState.execution_options` of this
+ :class:`.ORMExecuteState`.
+
+ :param bind_arguments: optional dictionary of bind_arguments
+ which will be merged amongst the current
+ :attr:`.ORMExecuteState.bind_arguments`
+ of this :class:`.ORMExecuteState`.
+
+ :return: a :class:`_engine.Result` object with ORM-level results.
+
+ .. seealso::
+
+ :ref:`examples_caching` - includes example use of the
+ :meth:`.SessionEvents.do_orm_execute` hook as well as the
+ :meth:`.ORMExecuteState.invoke_statement` method.
+
+
+ """
+
+ if statement is None:
+ statement = self.statement
+
+ _bind_arguments = dict(self.bind_arguments)
+ if bind_arguments:
+ _bind_arguments.update(bind_arguments)
+ _bind_arguments["_sa_skip_events"] = True
+
+ if params:
+ _params = dict(self.parameters)
+ _params.update(params)
+ else:
+ _params = self.parameters
+
+ if execution_options:
+ _execution_options = dict(self.execution_options)
+ _execution_options.update(execution_options)
+ else:
+ _execution_options = self.execution_options
+
+ return self.session.execute(
+ statement, _params, _execution_options, _bind_arguments
+ )
+
+ @property
+ def orm_query(self):
+ """Return the :class:`_orm.Query` object associated with this
+ execution.
+
+ For SQLAlchemy-2.0 style usage, the :class:`_orm.Query` object
+ is not used at all, and this attribute will return None.
+
+ """
+ load_opts = self.load_options
+ if load_opts._orm_query:
+ return load_opts._orm_query
+
+ opts = self._orm_compile_options()
+ if opts is not None:
+ return opts._orm_query
+ else:
+ return None
+
+ def _orm_compile_options(self):
+ opts = self.statement.compile_options
+ if isinstance(opts, context.ORMCompileState.default_compile_options):
+ return opts
+ else:
+ return None
+
+ @property
+ def loader_strategy_path(self):
+ """Return the :class:`.PathRegistry` for the current load path.
+
+ This object represents the "path" in a query along relationships
+ when a particular object or collection is being loaded.
+
+ """
+ opts = self._orm_compile_options()
+ if opts is not None:
+ return opts._current_path
+ else:
+ return None
+
+ @property
+ def load_options(self):
+ """Return the load_options that will be used for this execution."""
+
+ return self.execution_options.get(
+ "_sa_orm_load_options", context.QueryContext.default_load_options
+ )
+
+ @property
+ def user_defined_options(self):
+ """The sequence of :class:`.UserDefinedOptions` that have been
+ associated with the statement being invoked.
+
+ """
+ return [
+ opt
+ for opt in self.statement._with_options
+ if not opt._is_compile_state and not opt._is_legacy_option
+ ]
+
+
class SessionTransaction(object):
"""A :class:`.Session`-level transaction.
@@ -1032,9 +1186,7 @@ class Session(_SessionClassMethods):
def connection(
self,
- mapper=None,
- clause=None,
- bind=None,
+ bind_arguments=None,
close_with_result=False,
execution_options=None,
**kw
@@ -1059,23 +1211,18 @@ class Session(_SessionClassMethods):
resolved through any of the optional keyword arguments. This
ultimately makes usage of the :meth:`.get_bind` method for resolution.
+ :param bind_arguments: dictionary of bind arguments. May include
+ "mapper", "bind", "clause", and other custom arguments that are passed
+ to :meth:`.Session.get_bind`.
+
:param bind:
- Optional :class:`_engine.Engine` to be used as the bind. If
- this engine is already involved in an ongoing transaction,
- that connection will be used. This argument takes precedence
- over ``mapper``, ``clause``.
+ deprecated; use bind_arguments
:param mapper:
- Optional :func:`.mapper` mapped class, used to identify
- the appropriate bind. This argument takes precedence over
- ``clause``.
+ deprecated; use bind_arguments
:param clause:
- A :class:`_expression.ClauseElement` (i.e.
- :func:`_expression.select`,
- :func:`_expression.text`,
- etc.) which will be used to locate a bind, if a bind
- cannot otherwise be identified.
+ deprecated; use bind_arguments
:param close_with_result: Passed to :meth:`_engine.Engine.connect`,
indicating the :class:`_engine.Connection` should be considered
@@ -1097,13 +1244,16 @@ class Session(_SessionClassMethods):
:ref:`session_transaction_isolation`
:param \**kw:
- Additional keyword arguments are sent to :meth:`get_bind()`,
- allowing additional arguments to be passed to custom
- implementations of :meth:`get_bind`.
+ deprecated; use bind_arguments
"""
+
+ if not bind_arguments:
+ bind_arguments = kw
+
+ bind = bind_arguments.pop("bind", None)
if bind is None:
- bind = self.get_bind(mapper, clause=clause, **kw)
+ bind = self.get_bind(**bind_arguments)
return self._connection_for_bind(
bind,
@@ -1124,7 +1274,14 @@ class Session(_SessionClassMethods):
conn = conn.execution_options(**execution_options)
return conn
- def execute(self, clause, params=None, mapper=None, bind=None, **kw):
+ def execute(
+ self,
+ statement,
+ params=None,
+ execution_options=util.immutabledict(),
+ bind_arguments=None,
+ **kw
+ ):
r"""Execute a SQL expression construct or string statement within
the current transaction.
@@ -1222,22 +1379,19 @@ class Session(_SessionClassMethods):
"executemany" will be invoked. The keys in each dictionary
must correspond to parameter names present in the statement.
+ :param bind_arguments: dictionary of additional arguments to determine
+ the bind. May include "mapper", "bind", or other custom arguments.
+ Contents of this dictionary are passed to the
+ :meth:`.Session.get_bind` method.
+
:param mapper:
- Optional :func:`.mapper` or mapped class, used to identify
- the appropriate bind. This argument takes precedence over
- ``clause`` when locating a bind. See :meth:`.Session.get_bind`
- for more details.
+ deprecated; use the bind_arguments dictionary
:param bind:
- Optional :class:`_engine.Engine` to be used as the bind. If
- this engine is already involved in an ongoing transaction,
- that connection will be used. This argument takes
- precedence over ``mapper`` and ``clause`` when locating
- a bind.
+ deprecated; use the bind_arguments dictionary
:param \**kw:
- Additional keyword arguments are sent to :meth:`.Session.get_bind()`
- to allow extensibility of "bind" schemes.
+ deprecated; use the bind_arguments dictionary
.. seealso::
@@ -1253,20 +1407,63 @@ class Session(_SessionClassMethods):
in order to execute the statement.
"""
- clause = coercions.expect(roles.CoerceTextStatementRole, clause)
- if bind is None:
- bind = self.get_bind(mapper, clause=clause, **kw)
+ statement = coercions.expect(roles.CoerceTextStatementRole, statement)
- return self._connection_for_bind(
- bind, close_with_result=True
- )._execute_20(clause, params,)
+ if not bind_arguments:
+ bind_arguments = kw
+ elif kw:
+ bind_arguments.update(kw)
+
+ compile_state_cls = statement._get_plugin_compile_state_cls("orm")
+ if compile_state_cls:
+ compile_state_cls.orm_pre_session_exec(
+ self, statement, execution_options, bind_arguments
+ )
+ else:
+ bind_arguments.setdefault("clause", statement)
+ if statement._is_future:
+ execution_options = util.immutabledict().merge_with(
+ execution_options, {"future_result": True}
+ )
+
+ if self.dispatch.do_orm_execute:
+ skip_events = bind_arguments.pop("_sa_skip_events", False)
+
+ if not skip_events:
+ orm_exec_state = ORMExecuteState(
+ self, statement, params, execution_options, bind_arguments
+ )
+ for fn in self.dispatch.do_orm_execute:
+ result = fn(orm_exec_state)
+ if result:
+ return result
+
+ bind = self.get_bind(**bind_arguments)
+
+ conn = self._connection_for_bind(bind, close_with_result=True)
+ result = conn._execute_20(statement, params or {}, execution_options)
- def scalar(self, clause, params=None, mapper=None, bind=None, **kw):
+ if compile_state_cls:
+ result = compile_state_cls.orm_setup_cursor_result(
+ self, bind_arguments, result
+ )
+
+ return result
+
+ def scalar(
+ self,
+ statement,
+ params=None,
+ execution_options=None,
+ mapper=None,
+ bind=None,
+ **kw
+ ):
"""Like :meth:`~.Session.execute` but return a scalar result."""
return self.execute(
- clause, params=params, mapper=mapper, bind=bind, **kw
+ statement, params=params, mapper=mapper, bind=bind, **kw
).scalar()
def close(self):
@@ -1422,7 +1619,7 @@ class Session(_SessionClassMethods):
"""
self._add_bind(table, bind)
- def get_bind(self, mapper=None, clause=None):
+ def get_bind(self, mapper=None, clause=None, bind=None):
"""Return a "bind" to which this :class:`.Session` is bound.
The "bind" is usually an instance of :class:`_engine.Engine`,
@@ -1497,6 +1694,8 @@ class Session(_SessionClassMethods):
:meth:`.Session.bind_table`
"""
+ if bind:
+ return bind
if mapper is clause is None:
if self.bind:
@@ -1520,6 +1719,8 @@ class Session(_SessionClassMethods):
raise
if self.__binds:
+ # matching mappers and selectables to entries in the
+ # binds dictionary; supported use case.
if mapper:
for cls in mapper.class_.__mro__:
if cls in self.__binds:
@@ -1528,18 +1729,32 @@ class Session(_SessionClassMethods):
clause = mapper.persist_selectable
if clause is not None:
- for t in sql_util.find_tables(clause, include_crud=True):
- if t in self.__binds:
- return self.__binds[t]
+ for obj in visitors.iterate(clause):
+ if obj in self.__binds:
+ return self.__binds[obj]
+ # session has a single bind; supported use case.
if self.bind:
return self.bind
- if isinstance(clause, sql.expression.ClauseElement) and clause.bind:
- return clause.bind
+ # now we are in legacy territory. looking for "bind" on tables
+ # that are via bound metadata. this goes away in 2.0.
+ if mapper and clause is None:
+ clause = mapper.persist_selectable
- if mapper and mapper.persist_selectable.bind:
- return mapper.persist_selectable.bind
+ if clause is not None:
+ if clause.bind:
+ return clause.bind
+ # for obj in visitors.iterate(clause):
+ # if obj.bind:
+ # return obj.bind
+
+ if mapper:
+ if mapper.persist_selectable.bind:
+ return mapper.persist_selectable.bind
+ # for obj in visitors.iterate(mapper.persist_selectable):
+ # if obj.bind:
+ # return obj.bind
context = []
if mapper is not None:
@@ -1722,9 +1937,11 @@ class Session(_SessionClassMethods):
else:
with_for_update = None
+ stmt = future.select(object_mapper(instance))
if (
loading.load_on_ident(
- self.query(object_mapper(instance)),
+ self,
+ stmt,
state.key,
refresh_state=state,
with_for_update=with_for_update,
diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py
index c0c090b3d..a7d501b53 100644
--- a/lib/sqlalchemy/orm/strategies.py
+++ b/lib/sqlalchemy/orm/strategies.py
@@ -33,6 +33,7 @@ from .util import _none_set
from .util import aliased
from .. import event
from .. import exc as sa_exc
+from .. import future
from .. import inspect
from .. import log
from .. import sql
@@ -440,10 +441,13 @@ class DeferredColumnLoader(LoaderStrategy):
if self.raiseload:
self._invoke_raise_load(state, passive, "raise")
- query = session.query(localparent)
if (
loading.load_on_ident(
- query, state.key, only_load_props=group, refresh_state=state
+ session,
+ future.select(localparent).apply_labels(),
+ state.key,
+ only_load_props=group,
+ refresh_state=state,
)
is None
):
@@ -897,7 +901,7 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
q(session)
.with_post_criteria(lambda q: q._set_lazyload_from(state))
._load_on_pk_identity(
- session.query(self.mapper), primary_key_identity
+ session, session.query(self.mapper), primary_key_identity
)
)
@@ -1090,7 +1094,6 @@ class SubqueryLoader(PostLoader):
parentmapper=None,
**kwargs
):
-
if (
not compile_state.compile_options._enable_eagerloads
or compile_state.compile_options._for_refresh_state
@@ -1146,6 +1149,7 @@ class SubqueryLoader(PostLoader):
# generate a new Query from the original, then
# produce a subquery from it.
left_alias = self._generate_from_original_query(
+ compile_state,
orig_query,
leftmost_mapper,
leftmost_attr,
@@ -1164,7 +1168,9 @@ class SubqueryLoader(PostLoader):
def set_state_options(compile_state):
compile_state.attributes.update(
{
- ("orig_query", SubqueryLoader): orig_query,
+ ("orig_query", SubqueryLoader): orig_query.with_session(
+ None
+ ),
("subquery_path", None): subq_path,
}
)
@@ -1188,6 +1194,7 @@ class SubqueryLoader(PostLoader):
# by create_row_processor
# NOTE: be sure to consult baked.py for some hardcoded logic
# about this structure as well
+ assert q.session is None
path.set(
compile_state.attributes, "subqueryload_data", {"query": q},
)
@@ -1218,6 +1225,7 @@ class SubqueryLoader(PostLoader):
def _generate_from_original_query(
self,
+ orig_compile_state,
orig_query,
leftmost_mapper,
leftmost_attr,
@@ -1243,11 +1251,18 @@ class SubqueryLoader(PostLoader):
}
)
- cs = q._clone()
+ # NOTE: keystone has a test which is counting before_compile
+ # events. That test is in one case dependent on an extra
+ # call that was occurring here within the subqueryloader setup
+ # process, probably when the subquery() method was called.
+ # Ultimately that call will not be occurring here.
+ # the event has already been called on the original query when
+ # we are here in any case, so keystone will need to adjust that
+ # test.
- # using the _compile_state method so that the before_compile()
- # event is hit here. keystone is testing for this.
- compile_state = cs._compile_state(entities_only=True)
+ # for column information, look to the compile state that is
+ # already being passed through
+ compile_state = orig_compile_state
# select from the identity columns of the outer (specifically, these
# are the 'local_cols' of the property). This will remove
@@ -1260,7 +1275,6 @@ class SubqueryLoader(PostLoader):
],
compile_state._get_current_adapter(),
)
- # q.add_columns.non_generative(q, target_cols)
q._set_entities(target_cols)
distinct_target_key = leftmost_relationship.distinct_target_key
@@ -1428,10 +1442,20 @@ class SubqueryLoader(PostLoader):
"""
- __slots__ = ("subq_info", "subq", "_data")
+ __slots__ = (
+ "session",
+ "execution_options",
+ "load_options",
+ "subq",
+ "_data",
+ )
- def __init__(self, subq_info):
- self.subq_info = subq_info
+ def __init__(self, context, subq_info):
+ # copy out only the attributes of the context that we need, rather
+ # than storing the context itself, to avoid creating a reference cycle
+ self.session = context.session
+ self.execution_options = context.execution_options
+ self.load_options = context.load_options
self.subq = subq_info["query"]
self._data = None
@@ -1443,7 +1467,17 @@ class SubqueryLoader(PostLoader):
def _load(self):
self._data = collections.defaultdict(list)
- rows = list(self.subq)
+ q = self.subq
+ assert q.session is None
+ if "compiled_cache" in self.execution_options:
+ q = q.execution_options(
+ compiled_cache=self.execution_options["compiled_cache"]
+ )
+ q = q.with_session(self.session)
+
+ # to work with baked query, the parameters may have been
+ # updated since this query was created, so take these into account
+ rows = list(q.params(self.load_options._params))
for k, v in itertools.groupby(rows, lambda x: x[1:]):
self._data[k].extend(vv[0] for vv in v)
@@ -1474,14 +1508,7 @@ class SubqueryLoader(PostLoader):
subq = subq_info["query"]
- if subq.session is None:
- subq.session = context.session
- assert subq.session is context.session, (
- "Subquery session doesn't refer to that of "
- "our context. Are there broken context caching "
- "schemes being used?"
- )
-
+ assert subq.session is None
local_cols = self.parent_property.local_columns
# cache the loaded collections in the context
@@ -1489,7 +1516,7 @@ class SubqueryLoader(PostLoader):
# call upon create_row_processor again
collections = path.get(context.attributes, "collections")
if collections is None:
- collections = self._SubqCollections(subq_info)
+ collections = self._SubqCollections(context, subq_info)
path.set(context.attributes, "collections", collections)
if adapter:
diff --git a/lib/sqlalchemy/orm/util.py b/lib/sqlalchemy/orm/util.py
index 1e415e49c..ce37d962e 100644
--- a/lib/sqlalchemy/orm/util.py
+++ b/lib/sqlalchemy/orm/util.py
@@ -41,6 +41,7 @@ from ..sql import expression
from ..sql import roles
from ..sql import util as sql_util
from ..sql import visitors
+from ..sql.annotation import SupportsCloneAnnotations
from ..sql.base import ColumnCollection
@@ -694,6 +695,8 @@ class AliasedInsp(
"entity_namespace": self,
"compile_state_plugin": "orm",
}
+ )._set_propagate_attrs(
+ {"compile_state_plugin": "orm", "plugin_subject": self}
)
@property
@@ -748,10 +751,20 @@ class AliasedInsp(
)
def _adapt_element(self, elem, key=None):
- d = {"parententity": self, "parentmapper": self.mapper}
+ d = {
+ "parententity": self,
+ "parentmapper": self.mapper,
+ "compile_state_plugin": "orm",
+ }
if key:
d["orm_key"] = key
- return self._adapter.traverse(elem)._annotate(d)
+ return (
+ self._adapter.traverse(elem)
+ ._annotate(d)
+ ._set_propagate_attrs(
+ {"compile_state_plugin": "orm", "plugin_subject": self}
+ )
+ )
def _entity_for_mapper(self, mapper):
self_poly = self.with_polymorphic_mappers
@@ -1037,7 +1050,7 @@ def with_polymorphic(
@inspection._self_inspects
-class Bundle(ORMColumnsClauseRole, InspectionAttr):
+class Bundle(ORMColumnsClauseRole, SupportsCloneAnnotations, InspectionAttr):
"""A grouping of SQL expressions that are returned by a :class:`.Query`
under one namespace.
@@ -1070,6 +1083,8 @@ class Bundle(ORMColumnsClauseRole, InspectionAttr):
is_bundle = True
+ _propagate_attrs = util.immutabledict()
+
def __init__(self, name, *exprs, **kw):
r"""Construct a new :class:`.Bundle`.
@@ -1090,7 +1105,10 @@ class Bundle(ORMColumnsClauseRole, InspectionAttr):
"""
self.name = self._label = name
self.exprs = exprs = [
- coercions.expect(roles.ColumnsClauseRole, expr) for expr in exprs
+ coercions.expect(
+ roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
+ )
+ for expr in exprs
]
self.c = self.columns = ColumnCollection(
@@ -1145,11 +1163,14 @@ class Bundle(ORMColumnsClauseRole, InspectionAttr):
return cloned
def __clause_element__(self):
+ annotations = self._annotations.union(
+ {"bundle": self, "entity_namespace": self}
+ )
return expression.ClauseList(
_literal_as_text_role=roles.ColumnsClauseRole,
group=False,
*[e._annotations.get("bundle", e) for e in self.exprs]
- )._annotate({"bundle": self, "entity_namespace": self})
+ )._annotate(annotations)
@property
def clauses(self):