1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
"""Test MySQL FOR UPDATE behavior.
See #4246
"""
import contextlib
from sqlalchemy import Column, Integer, ForeignKey, update
from sqlalchemy.orm import relationship, Session, joinedload
from sqlalchemy import exc
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
    """Check which tables a ``FOR UPDATE`` query locks on MySQL.

    Each test runs a query inside an open transaction (see ``run_test``)
    and then, from a second connection, attempts an UPDATE against table
    ``a`` and/or ``b``.  If that UPDATE fails with "Lock wait timeout
    exceeded", the table's rows are considered locked by the first
    transaction.
    """

    # Directives read by the sqlalchemy.testing runner; their exact
    # semantics are defined by that framework, not in this file.
    __backend__ = True
    __only_on__ = "mysql"
    __requires__ = ("mysql_for_update",)

    @classmethod
    def setup_classes(cls):
        """Declare the mapped classes ``A`` and ``B`` (one-to-many A -> B)."""
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            x = Column(Integer)
            y = Column(Integer)
            bs = relationship("B")
            # The tests set innodb_lock_wait_timeout, so the tables are
            # created explicitly as InnoDB.
            __table_args__ = {"mysql_engine": "InnoDB"}

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            x = Column(Integer)
            y = Column(Integer)
            __table_args__ = {"mysql_engine": "InnoDB"}

    @classmethod
    def insert_data(cls):
        """Insert two ``A`` rows, each with a handful of related ``B`` rows."""
        A = cls.classes.A
        B = cls.classes.B

        # all the x/y are < 10
        s = Session()
        s.add_all(
            [
                A(x=5, y=5, bs=[B(x=4, y=4), B(x=2, y=8), B(x=7, y=1)]),
                A(x=7, y=5, bs=[B(x=4, y=4), B(x=5, y=8)]),
            ]
        )
        s.commit()

    @contextlib.contextmanager
    def run_test(self):
        """Yield a Session bound to a connection with an open transaction.

        The connection's ``innodb_lock_wait_timeout`` is set to 1 before
        the transaction begins.  On exit the transaction is rolled back
        and the connection is closed, so any locks taken by the test body
        are released.
        """
        connection = testing.db.connect()
        try:
            # Everything after connect() is inside the outer try so the
            # connection is closed even if the SET or BEGIN itself fails.
            connection.execute("set innodb_lock_wait_timeout=1")
            main_trans = connection.begin()
            try:
                yield Session(bind=connection)
            finally:
                main_trans.rollback()
        finally:
            connection.close()

    def _assert_is_locked(self, entity, should_be_locked):
        """Assert whether the table mapped by ``entity`` is locked.

        Opens a separate transaction via ``testing.db.begin()`` and tries
        to UPDATE every row of ``entity``.  A "Lock wait timeout exceeded"
        error means the rows are locked elsewhere; a successful UPDATE
        means they are not.

        :param entity: mapped class whose table is probed.
        :param should_be_locked: expected lock state.
        :raises AssertionError: if the observed lock state differs from
         ``should_be_locked``, or the UPDATE fails with some other
         InternalError / OperationalError message.
        """
        with testing.db.begin() as alt_trans:
            alt_trans.execute("set innodb_lock_wait_timeout=1")
            # set x/y > 10
            try:
                alt_trans.execute(update(entity).values(x=15, y=19))
            except (exc.InternalError, exc.OperationalError) as err:
                assert "Lock wait timeout exceeded" in str(err)
                assert should_be_locked, (
                    "table %r is locked but was expected to be unlocked"
                    % entity.__tablename__
                )
            else:
                assert not should_be_locked, (
                    "table %r is unlocked but was expected to be locked"
                    % entity.__tablename__
                )

    def _assert_a_is_locked(self, should_be_locked):
        """Assert that table ``a`` is (or is not) locked."""
        self._assert_is_locked(self.classes.A, should_be_locked)

    def _assert_b_is_locked(self, should_be_locked):
        """Assert that table ``b`` is (or is not) locked."""
        self._assert_is_locked(self.classes.B, should_be_locked)

    def test_basic_lock(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).with_for_update().all()
            # test our fixture
            self._assert_a_is_locked(True)

    def test_basic_not_lock(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).all()
            # test our fixture
            self._assert_a_is_locked(False)

    def test_joined_lock_subquery(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).options(joinedload(A.bs)).with_for_update().first()
            # test for issue #4246, should be locked
            self._assert_a_is_locked(True)
            self._assert_b_is_locked(True)

    def test_joined_lock_subquery_inner_for_update(self):
        A = self.classes.A
        B = self.classes.B
        with self.run_test() as s:
            q = s.query(A).with_for_update().subquery()
            s.query(q).join(B).all()
            # FOR UPDATE is inside the subquery, should be locked
            self._assert_a_is_locked(True)
            # FOR UPDATE is inside the subquery, B is not locked
            self._assert_b_is_locked(False)

    def test_joined_lock_subquery_inner_for_update_outer(self):
        A = self.classes.A
        B = self.classes.B
        with self.run_test() as s:
            q = s.query(A).with_for_update().subquery()
            s.query(q).join(B).with_for_update().all()
            # FOR UPDATE is inside the subquery, should be locked
            self._assert_a_is_locked(True)
            # FOR UPDATE is also outside the subquery, B is locked
            self._assert_b_is_locked(True)

    def test_joined_lock_subquery_order_for_update_outer(self):
        A = self.classes.A
        B = self.classes.B
        with self.run_test() as s:
            q = s.query(A).order_by(A.id).subquery()
            s.query(q).join(B).with_for_update().all()
            # FOR UPDATE is only on the outer query, not inside the
            # ordered subquery; A is expected not to be locked
            self._assert_a_is_locked(False)
            # B is joined directly in the outer FOR UPDATE query
            self._assert_b_is_locked(True)

    def test_joined_lock_no_subquery(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).options(joinedload(A.bs)).with_for_update().all()
            # no subquery, should be locked
            self._assert_a_is_locked(True)
            self._assert_b_is_locked(True)
|