1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
|
# orm/dynamic.py
# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
"""Dynamic collection API.
Dynamic collections act like Query() objects for read operations and support
basic add/delete mutation.
"""
from __future__ import annotations
from typing import Any
from typing import Iterable
from typing import Iterator
from typing import TYPE_CHECKING
from typing import TypeVar
from . import attributes
from . import exc as orm_exc
from . import relationships
from . import util as orm_util
from .query import Query
from .session import object_session
from .writeonly import AbstractCollectionWriter
from .writeonly import WriteOnlyAttributeImpl
from .writeonly import WriteOnlyHistory
from .writeonly import WriteOnlyLoader
from .. import util
from ..engine import result
if TYPE_CHECKING:
from .session import Session
# Type variable for the element type held by a dynamic collection.  It
# parametrizes ``AppenderMixin`` / ``AppenderQuery`` and appears in the
# signatures of their item-level methods (``add()``, ``remove()``, ...).
_T = TypeVar("_T", bound=Any)
class DynamicCollectionHistory(WriteOnlyHistory):
    """Pending-change history used by ``lazy="dynamic"`` attributes.

    Each instance carries three ``OrderedIdentitySet`` collections,
    ``added_items``, ``deleted_items`` and ``unchanged_items``, plus the
    ``_reconcile_collection`` flag.

    """

    def __init__(self, attr, state, passive, apply_to=None):
        """Create an empty history, or one derived from ``apply_to``.

        :param attr: attribute implementation; handed to
         :class:`.AppenderQuery` when ``apply_to`` is given.
        :param state: instance state of the parent object; handed to
         :class:`.AppenderQuery` when ``apply_to`` is given.
        :param passive: accepted as part of the constructor signature;
         this constructor does not consult it.
        :param apply_to: optional existing history.  When truthy, its
         ``added_items`` and ``deleted_items`` are adopted by reference
         (not copied) and ``unchanged_items`` is populated from an
         :class:`.AppenderQuery` that has autoflush turned off.
        """
        if not apply_to:
            # No prior history to build on: start with three empty sets
            # and nothing to reconcile.
            self.deleted_items = util.OrderedIdentitySet()
            self.added_items = util.OrderedIdentitySet()
            self.unchanged_items = util.OrderedIdentitySet()
            self._reconcile_collection = False
            return

        # autoflush(False) is applied to the query that feeds
        # ``unchanged_items`` before the set is built from it.
        current_members = AppenderQuery(attr, state).autoflush(False)
        self.unchanged_items = util.OrderedIdentitySet(current_members)
        self.added_items = apply_to.added_items
        self.deleted_items = apply_to.deleted_items
        self._reconcile_collection = True
class DynamicAttributeImpl(WriteOnlyAttributeImpl):
    """Attribute implementation behind ``lazy="dynamic"`` relationships.

    On top of its write-only base, it records which query class should be
    used to represent the collection and uses
    :class:`.DynamicCollectionHistory` as its history type.

    """

    _supports_dynamic_iteration = True
    collection_history_cls = DynamicCollectionHistory

    def __init__(
        self,
        class_,
        key,
        typecallable,
        dispatch,
        target_mapper,
        order_by,
        query_class=None,
        **kw,
    ):
        """Set up the attribute and resolve its appender query class.

        :param class_: forwarded to ``AttributeImpl.__init__``.
        :param key: forwarded to ``AttributeImpl.__init__``.
        :param typecallable: forwarded to ``AttributeImpl.__init__``.
        :param dispatch: forwarded to ``AttributeImpl.__init__``.
        :param target_mapper: stored as ``self.target_mapper``.
        :param order_by: when truthy, stored as a tuple on
         ``self.order_by``; when falsy the attribute is not assigned here.
        :param query_class: optional query class.  A falsy value selects
         :class:`.AppenderQuery`; a class that already has
         :class:`.AppenderMixin` in its MRO is used unchanged; any other
         class is wrapped with :func:`.mixin_user_query`.
        :param \\**kw: forwarded to ``AttributeImpl.__init__``.
        """
        # ``AttributeImpl.__init__`` is invoked directly rather than
        # through ``super()``.
        attributes.AttributeImpl.__init__(
            self, class_, key, typecallable, dispatch, **kw
        )
        self.target_mapper = target_mapper
        if order_by:
            self.order_by = tuple(order_by)

        if not query_class:
            resolved_query_class = AppenderQuery
        elif AppenderMixin in query_class.mro():
            # already appender-capable; no wrapping needed
            resolved_query_class = query_class
        else:
            resolved_query_class = mixin_user_query(query_class)
        self.query_class = resolved_query_class
@relationships.RelationshipProperty.strategy_for(lazy="dynamic")
class DynaLoader(WriteOnlyLoader):
    """Loader strategy registered for ``lazy="dynamic"`` relationships.

    The only thing overridden from :class:`.WriteOnlyLoader` is
    ``impl_class``, which points at :class:`.DynamicAttributeImpl`.

    """

    impl_class = DynamicAttributeImpl
class AppenderMixin(AbstractCollectionWriter[_T]):
    """Mixin that layers collection mutation methods over a Query class.

    The mixin expects to be combined with :class:`_orm.Query` (or a
    subclass of it), as :class:`.AppenderQuery` does.  Read operations go
    to the database through the parent instance's session when one is
    usable; otherwise they are answered from the items pending in the
    attribute's collection history.

    """

    # Query class instantiated by ``_generate()``; when falsy,
    # ``sess.query()`` is used instead.  ``mixin_user_query()`` sets this
    # to the user-supplied class it wraps.
    query_class = None

    def __init__(self, attr, state):
        """Initialize the Query side, then the collection-writer side.

        :param attr: attribute implementation; its ``target_mapper`` is
         the entity this query selects from.
        :param state: instance state of the parent object.
        """
        Query.__init__(self, attr.target_mapper, None)
        super().__init__(attr, state)

    @property
    def session(self) -> Session:
        """Return the session to query with, or ``None``.

        The session is looked up from the parent instance with
        ``object_session()``.  If one is found, this object's
        ``autoflush`` and the session's ``autoflush`` are both truthy, and
        the instance is contained in the session, the session is flushed
        first.  ``None`` is returned when the parent instance has no
        identity according to ``orm_util.has_identity()``; otherwise the
        looked-up session is returned (which may itself be ``None``).

        NOTE(review): if ``self.autoflush`` resolves to the Query
        ``autoflush()`` method rather than a flag, that term of the
        condition is always truthy -- confirm against ``Query``.
        """
        parent = self.instance
        sess = object_session(parent)
        flush_first = (
            sess is not None
            and self.autoflush
            and sess.autoflush
            and parent in sess
        )
        if flush_first:
            sess.flush()
        return sess if orm_util.has_identity(parent) else None

    @session.setter
    def session(self, session: Session) -> None:
        """Store the given session on ``self.sess``.

        The getter does not read ``self.sess``; it always derives the
        session from the parent instance.
        """
        self.sess = session

    def _iter(self):
        """Return an iterable result over the collection's members.

        With a usable session the work is delegated to the query built by
        ``_generate()``.  Without one, a warning is emitted if the parent
        is detached, and the result wraps only the ``added_items`` of the
        attribute's collection history.
        """
        sess = self.session
        if sess is not None:
            return self._generate(sess)._iter()

        state = attributes.instance_state(self.instance)
        if state.detached:
            util.warn(
                "Instance %s is detached, dynamic relationship cannot "
                "return a correct result. This warning will become "
                "a DetachedInstanceError in a future release."
                % (orm_util.state_str(state))
            )

        # No database access is possible here; serve the pending
        # additions through a scalar result instead.
        metadata = result.SimpleResultMetaData([self.attr.class_.__name__])
        pending = self.attr._get_collection_history(
            state,
            attributes.PASSIVE_NO_INITIALIZE,
        ).added_items
        return result.IteratorResult(
            metadata,
            pending,
            _source_supports_scalars=True,
        ).scalars()

    if TYPE_CHECKING:

        def __iter__(self) -> Iterator[_T]:
            ...

    def __getitem__(self, index: Any) -> _T:
        """Return the item(s) at ``index``.

        Delegates to the generated query when a session is usable;
        otherwise to ``indexed()`` on the attribute's collection history.
        """
        sess = self.session
        if sess is not None:
            return self._generate(sess).__getitem__(index)

        history = self.attr._get_collection_history(
            attributes.instance_state(self.instance),
            attributes.PASSIVE_NO_INITIALIZE,
        )
        return history.indexed(index)

    def count(self) -> int:
        """Return the number of items in the collection.

        Delegates to the generated query's ``count()`` when a session is
        usable; otherwise returns how many items are in the collection
        history's ``added_items``.
        """
        sess = self.session
        if sess is not None:
            return self._generate(sess).count()

        history = self.attr._get_collection_history(
            attributes.instance_state(self.instance),
            attributes.PASSIVE_NO_INITIALIZE,
        )
        return len(history.added_items)

    def _generate(self, sess=None):
        """Build a new query object carrying this object's criteria.

        :param sess: session to bind the query to; when ``None`` it is
         looked up from the parent instance with ``object_session()``.
        :return: an instance of ``query_class`` if that is set, else the
         result of ``sess.query()``, with ``_where_criteria``,
         ``_from_obj`` and ``_order_by_clauses`` copied from ``self``.
        :raises sqlalchemy.orm.exc.DetachedInstanceError: if no session
         was given and none can be found for the parent instance.
        """
        # What comes back is a brand new query object without any of the
        # collection-assignment capabilities of this class; when
        # ``query_class`` is unset, its class is whatever the session
        # produces.
        parent = self.instance
        if sess is None:
            sess = object_session(parent)
        if sess is None:
            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session, and no "
                "contextual session is established; lazy load operation "
                "of attribute '%s' cannot proceed"
                % (orm_util.instance_str(parent), self.attr.key)
            )

        query = (
            self.query_class(self.attr.target_mapper, session=sess)
            if self.query_class
            else sess.query(self.attr.target_mapper)
        )

        query._where_criteria = self._where_criteria
        query._from_obj = self._from_obj
        query._order_by_clauses = self._order_by_clauses
        return query

    def add_all(self, iterator: Iterable[_T]) -> None:
        """Add every item of an iterable to this
        :class:`_orm.AppenderQuery`.

        The items become part of the parent instance's collection in the
        database on the next flush.

        Provided for forwards-compatibility with the
        :class:`_orm.WriteOnlyCollection` collection class.

        .. versionadded:: 2.0

        """
        self._add_all_impl(iterator)

    def add(self, item: _T) -> None:
        """Add a single item to this :class:`_orm.AppenderQuery`.

        The item becomes part of the parent instance's collection in the
        database on the next flush.

        Provided for forwards-compatibility with the
        :class:`_orm.WriteOnlyCollection` collection class.

        .. versionadded:: 2.0

        """
        self._add_all_impl([item])

    def extend(self, iterator: Iterable[_T]) -> None:
        """Add every item of an iterable to this
        :class:`_orm.AppenderQuery`.

        The items become part of the parent instance's collection in the
        database on the next flush.

        """
        self._add_all_impl(iterator)

    def append(self, item: _T) -> None:
        """Append a single item to this :class:`_orm.AppenderQuery`.

        The item becomes part of the parent instance's collection in the
        database on the next flush.

        """
        self._add_all_impl([item])

    def remove(self, item: _T) -> None:
        """Remove a single item from this :class:`_orm.AppenderQuery`.

        The item is taken out of the parent instance's collection in the
        database on the next flush.

        """
        self._remove_impl(item)
class AppenderQuery(AppenderMixin[_T], Query[_T]):
    """A dynamic query that supports basic collection storage operations.

    Methods on :class:`.AppenderQuery` include all methods of
    :class:`_orm.Query`, plus additional methods used for collection
    persistence.

    The class body defines nothing of its own; all behavior comes from
    combining :class:`.AppenderMixin` with :class:`_orm.Query`.

    """
def mixin_user_query(cls):
    """Build an appender-enabled subclass of a user-supplied query class.

    :param cls: the query class to wrap.
    :return: a new class named ``"Appender" + cls.__name__`` whose bases
     are ``(AppenderMixin, cls)`` and whose ``query_class`` attribute is
     ``cls`` itself.
    """
    appender_name = "Appender" + cls.__name__
    bases = (AppenderMixin, cls)
    namespace = {"query_class": cls}
    return type(appender_name, bases, namespace)
|