1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
# mapper/util.py
# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from sqlalchemy import sql, util, exceptions
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql.util import row_adapter as create_row_adapter
from sqlalchemy.sql import visitors
from sqlalchemy.orm.interfaces import MapperExtension, EXT_CONTINUE, build_path
all_cascades = util.Set(["delete", "delete-orphan", "all", "merge",
"expunge", "save-update", "refresh-expire", "none"])
class CascadeOptions(object):
"""Keeps track of the options sent to relation().cascade"""
def __init__(self, arg=""):
values = util.Set([c.strip() for c in arg.split(',')])
self.delete_orphan = "delete-orphan" in values
self.delete = "delete" in values or "all" in values
self.save_update = "save-update" in values or "all" in values
self.merge = "merge" in values or "all" in values
self.expunge = "expunge" in values or "all" in values
self.refresh_expire = "refresh-expire" in values or "all" in values
for x in values:
if x not in all_cascades:
raise exceptions.ArgumentError("Invalid cascade option '%s'" % x)
def __contains__(self, item):
return getattr(self, item.replace("-", "_"), False)
def __repr__(self):
return "CascadeOptions(arg=%s)" % repr(",".join(
[x for x in ['delete', 'save_update', 'merge', 'expunge',
'delete_orphan', 'refresh-expire']
if getattr(self, x, False) is True]))
def polymorphic_union(table_map, typecolname, aliasname='p_union'):
"""Create a ``UNION`` statement used by a polymorphic mapper.
See the `SQLAlchemy` advanced mapping docs for an example of how
this is used.
"""
colnames = util.Set()
colnamemaps = {}
types = {}
for key in table_map.keys():
table = table_map[key]
# mysql doesnt like selecting from a select; make it an alias of the select
if isinstance(table, sql.Select):
table = table.alias()
table_map[key] = table
m = {}
for c in table.c:
colnames.add(c.name)
m[c.name] = c
types[c.name] = c.type
colnamemaps[table] = m
def col(name, table):
try:
return colnamemaps[table][name]
except KeyError:
return sql.cast(sql.null(), types[name]).label(name)
result = []
for type, table in table_map.iteritems():
if typecolname is not None:
result.append(sql.select([col(name, table) for name in colnames] +
[sql.literal_column("'%s'" % type).label(typecolname)],
from_obj=[table]))
else:
result.append(sql.select([col(name, table) for name in colnames], from_obj=[table]))
return sql.union_all(*result).alias(aliasname)
class ExtensionCarrier(object):
"""stores a collection of MapperExtension objects.
allows an extension methods to be called on contained MapperExtensions
in the order they were added to this object. Also includes a 'methods' dictionary
accessor which allows for a quick check if a particular method
is overridden on any contained MapperExtensions.
"""
def __init__(self, _elements=None):
self.methods = {}
if _elements is not None:
self.__elements = [self.__inspect(e) for e in _elements]
else:
self.__elements = []
def copy(self):
return ExtensionCarrier(list(self.__elements))
def __iter__(self):
return iter(self.__elements)
def insert(self, extension):
"""Insert a MapperExtension at the beginning of this ExtensionCarrier's list."""
self.__elements.insert(0, self.__inspect(extension))
def append(self, extension):
"""Append a MapperExtension at the end of this ExtensionCarrier's list."""
self.__elements.append(self.__inspect(extension))
def __inspect(self, extension):
for meth in MapperExtension.__dict__.keys():
if meth not in self.methods and hasattr(extension, meth) and getattr(extension, meth) is not getattr(MapperExtension, meth):
self.methods[meth] = self.__create_do(meth)
return extension
def __create_do(self, funcname):
def _do(*args, **kwargs):
for elem in self.__elements:
ret = getattr(elem, funcname)(*args, **kwargs)
if ret is not EXT_CONTINUE:
return ret
else:
return EXT_CONTINUE
try:
_do.__name__ = funcname
except:
# cant set __name__ in py 2.3
pass
return _do
def _pass(self, *args, **kwargs):
return EXT_CONTINUE
def __getattr__(self, key):
return self.methods.get(key, self._pass)
class AliasedClauses(object):
"""Creates aliases of a mapped tables for usage in ORM queries.
"""
def __init__(self, mapped_table, alias=None):
if alias:
self.alias = alias
else:
self.alias = mapped_table.alias()
self.mapped_table = mapped_table
self.row_decorator = self._create_row_adapter()
def aliased_column(self, column):
"""return the aliased version of the given column, creating a new label for it if not already
present in this AliasedClauses."""
conv = self.alias.corresponding_column(column)
if conv:
return conv
aliased_column = column
# for column-level subqueries, swap out its selectable with our
# eager version as appropriate, and manually build the
# "correlation" list of the subquery.
class ModifySubquery(visitors.ClauseVisitor):
def visit_select(s, select):
select._should_correlate = False
select.append_correlation(self.alias)
aliased_column = sql_util.ClauseAdapter(self.alias).chain(ModifySubquery()).traverse(aliased_column, clone=True)
aliased_column = aliased_column.label(None)
self.row_decorator.map[column] = aliased_column
return aliased_column
def adapt_clause(self, clause):
return sql_util.ClauseAdapter(self.alias).traverse(clause, clone=True)
def adapt_list(self, clauses):
return sql_util.ClauseAdapter(self.alias).copy_and_process(clauses)
def _create_row_adapter(self):
"""Return a callable which,
when passed a RowProxy, will return a new dict-like object
that translates Column objects to that of this object's Alias before calling upon the row.
This allows a regular Table to be used to target columns in a row that was in reality generated from an alias
of that table, in such a way that the row can be passed to logic which knows nothing about the aliased form
of the table.
"""
return create_row_adapter(self.alias, self.mapped_table)
class PropertyAliasedClauses(AliasedClauses):
"""extends AliasedClauses to add support for primary/secondary joins on a relation()."""
def __init__(self, prop, primaryjoin, secondaryjoin, parentclauses=None, alias=None):
super(PropertyAliasedClauses, self).__init__(prop.select_table, alias=alias)
self.parentclauses = parentclauses
self.prop = prop
if prop.secondary:
self.secondary = prop.secondary.alias()
if parentclauses is not None:
primary_aliasizer = sql_util.ClauseAdapter(self.secondary).chain(sql_util.ClauseAdapter(parentclauses.alias))
secondary_aliasizer = sql_util.ClauseAdapter(self.alias).chain(sql_util.ClauseAdapter(self.secondary))
else:
primary_aliasizer = sql_util.ClauseAdapter(self.secondary)
secondary_aliasizer = sql_util.ClauseAdapter(self.alias).chain(sql_util.ClauseAdapter(self.secondary))
self.secondaryjoin = secondary_aliasizer.traverse(secondaryjoin, clone=True)
self.primaryjoin = primary_aliasizer.traverse(primaryjoin, clone=True)
else:
if parentclauses is not None:
aliasizer = sql_util.ClauseAdapter(self.alias, exclude=prop.local_side)
aliasizer.chain(sql_util.ClauseAdapter(parentclauses.alias, exclude=prop.remote_side))
else:
aliasizer = sql_util.ClauseAdapter(self.alias, exclude=prop.local_side)
self.primaryjoin = aliasizer.traverse(primaryjoin, clone=True)
self.secondary = None
self.secondaryjoin = None
if prop.order_by:
self.order_by = sql_util.ClauseAdapter(self.alias).copy_and_process(util.to_list(prop.order_by))
else:
self.order_by = None
mapper = property(lambda self:self.prop.mapper)
table = property(lambda self:self.prop.select_table)
def instance_str(instance):
"""Return a string describing an instance."""
return instance.__class__.__name__ + "@" + hex(id(instance))
def state_str(state):
"""Return a string describing an instance."""
if state is None:
return "None"
else:
return state.class_.__name__ + "@" + hex(id(state.obj()))
def attribute_str(instance, attribute):
return instance_str(instance) + "." + attribute
def identity_equal(a, b):
if a is b:
return True
id_a = getattr(a, '_instance_key', None)
id_b = getattr(b, '_instance_key', None)
if id_a is None or id_b is None:
return False
return id_a == id_b
|