1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
|
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import eq_
class OnDuplicateTest(fixtures.TablesTest):
    """Round-trip tests for ``insert().on_duplicate_key_update()``.

    Every test works against the ``foos`` table declared in
    :meth:`define_tables`, using the MySQL-dialect ``insert()`` construct.
    """

    # Only the dialects named here are targeted by this suite.
    __only_on__ = ("mysql", "mariadb")
    __backend__ = True
    # NOTE(review): presumably makes the fixture rebuild "foos" for every
    # test, so each test starts from an empty table -- confirm against
    # fixtures.TablesTest.
    run_define_tables = "each"

    @classmethod
    def define_tables(cls, metadata):
        """Declare the ``foos`` table on ``metadata``."""
        columns = (
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("bar", String(10)),
            Column("baz", String(10)),
            Column("updated_once", Boolean, default=False),
        )
        Table("foos", metadata, *columns)

    def test_bad_args(self):
        """Invalid argument combinations are rejected.

        Covers three calls: no arguments at all, a positional dict mixed
        with keyword arguments, and two positional dicts.
        """
        row_a = {"id": 1, "bar": "b"}
        row_b = {"id": 2, "bar": "baz"}
        # (expected exception, positional args, keyword args)
        cases = [
            (ValueError, (), {}),
            (exc.ArgumentError, (row_a,), {"id": 1, "bar": "b"}),
            (exc.ArgumentError, (row_a, row_b), {}),
        ]
        for expected_error, args, kwargs in cases:
            # Build a fresh statement for each case.
            odku = insert(self.tables.foos).values({}).on_duplicate_key_update
            assert_raises(expected_error, odku, *args, **kwargs)

    def test_on_duplicate_key_update_multirow(self, connection):
        """A two-row upsert overwrites ``bar`` on the colliding row."""
        foos = self.tables.foos
        seed = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        connection.execute(seed)

        upsert = insert(foos).values(
            [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        )
        upsert = upsert.on_duplicate_key_update(bar=upsert.inserted.bar)
        outcome = connection.execute(upsert)

        # With several rows in the INSERT the primary key is ambiguous;
        # reporting (None,) here is a behavioral change in 1.4.
        eq_(outcome.inserted_primary_key, (None,))

        row_one = foos.select().where(foos.c.id == 1)
        eq_(
            connection.execute(row_one).fetchall(),
            [(1, "ab", "bz", False)],
        )

    def test_on_duplicate_key_update_singlerow(self, connection):
        """A single-row upsert with a new id reports that id."""
        foos = self.tables.foos
        seed = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        connection.execute(seed)

        upsert = insert(foos).values({"id": 2, "bar": "b"})
        upsert = upsert.on_duplicate_key_update(bar=upsert.inserted.bar)
        outcome = connection.execute(upsert)

        # Exactly one row in the INSERT, so inserted_primary_key is given.
        eq_(outcome.inserted_primary_key, (2,))

        # The pre-existing row with id 1 is expected to be unchanged.
        row_one = foos.select().where(foos.c.id == 1)
        eq_(
            connection.execute(row_one).fetchall(),
            [(1, "b", "bz", False)],
        )

    def test_on_duplicate_key_update_null_multirow(self, connection):
        """``None`` in the update clause stores NULL on the colliding row."""
        foos = self.tables.foos
        seed = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        connection.execute(seed)

        upsert = insert(foos).values(
            [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        )
        upsert = upsert.on_duplicate_key_update(updated_once=None)
        outcome = connection.execute(upsert)

        # Multi-row INSERT: the primary key is ambiguous.
        eq_(outcome.inserted_primary_key, (None,))

        # Only updated_once is in the update clause, so bar keeps "b".
        row_one = foos.select().where(foos.c.id == 1)
        eq_(
            connection.execute(row_one).fetchall(),
            [(1, "b", "bz", None)],
        )

    def test_on_duplicate_key_update_expression_multirow(self, connection):
        """SQL expressions may be used as update values in a multi-row upsert."""
        foos = self.tables.foos
        seed = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        connection.execute(seed)

        upsert = insert(foos).values(
            [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        )
        new_bar = func.concat(upsert.inserted.bar, "_foo")
        new_baz = func.concat(upsert.inserted.bar, "_", foos.c.baz)
        upsert = upsert.on_duplicate_key_update(bar=new_bar, baz=new_baz)
        outcome = connection.execute(upsert)

        eq_(outcome.inserted_primary_key, (None,))

        all_rows = connection.execute(foos.select()).fetchall()
        expected_rows = [
            # id 1 already existed, so ON DUPLICATE applies to it
            (1, "ab_foo", "ab_bz", False),
            # id 2 did not exist, so it must be a plain insert
            (2, "b", None, False),
        ]
        eq_(all_rows, expected_rows)

    def test_on_duplicate_key_update_preserve_order(self, connection):
        """The order of (column, value) pairs in the update clause matters."""
        foos = self.tables.foos
        seed_rows = [
            {"id": 1, "bar": "b", "baz": "bz"},
            {"id": 2, "bar": "b", "baz": "bz2"},
        ]
        connection.execute(insert(foos).values(seed_rows))

        base_insert = insert(foos)
        # "== False" is deliberate: it builds a SQL comparison expression.
        not_yet_updated = foos.c.updated_once == False  # noqa: E712

        # Take the incoming "bar" only while the row is still flagged as
        # not updated; otherwise keep the stored value.
        assign_bar = (
            "bar",
            func.if_(not_yet_updated, func.values(foos.c.bar), foos.c.bar),
        )
        # Flip the flag to True the first time through.
        assign_flag = (
            "updated_once",
            func.if_(not_yet_updated, True, foos.c.updated_once),
        )

        # Columns referenced in the UPDATE clause are replaced by their new
        # values one at a time, left to right, so the two orderings below
        # give different results.
        bar_then_flag = base_insert.on_duplicate_key_update(
            [assign_bar, assign_flag]
        )
        flag_then_bar = base_insert.on_duplicate_key_update(
            [assign_flag, assign_bar]
        )

        # "bar" is assigned while the flag is still False: it gets updated.
        connection.execute(bar_then_flag, {"id": 1, "bar": "ab"})
        row_one = foos.select().where(foos.c.id == 1)
        eq_(
            connection.execute(row_one).fetchall(),
            [(1, "ab", "bz", True)],
        )

        # The flag is set first, so the "bar" assignment becomes a no-op.
        connection.execute(flag_then_bar, {"id": 2, "bar": "ab"})
        row_two = foos.select().where(foos.c.id == 2)
        eq_(
            connection.execute(row_two).fetchall(),
            [(2, "b", "bz2", True)],
        )

    def test_last_inserted_id(self, connection):
        """``inserted_primary_key`` is ``(1,)`` for both upserts issued."""
        foos = self.tables.foos
        value_sets = (
            # no explicit id given
            {"bar": "b", "baz": "bz"},
            # explicit id 1, the key the first statement is expected to use
            {"id": 1, "bar": "b", "baz": "bz"},
        )
        for values in value_sets:
            stmt = insert(foos).values(values)
            upsert = stmt.on_duplicate_key_update(
                bar=stmt.inserted.bar, baz="newbz"
            )
            outcome = connection.execute(upsert)
            eq_(outcome.inserted_primary_key, (1,))
|