summaryrefslogtreecommitdiff
path: root/test/dialect/mysql/test_for_update.py
blob: 197c9d6937b307f8967cac5c56eb0e2ad26b3bb6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Test MySQL FOR UPDATE behavior.

See #4246

"""
import contextlib

from sqlalchemy import Column, Integer, ForeignKey, update
from sqlalchemy.orm import relationship, Session, joinedload
from sqlalchemy import exc

from sqlalchemy.testing import fixtures
from sqlalchemy import testing


class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
    __backend__ = True
    __only_on__ = "mysql"
    __requires__ = ("mysql_for_update",)

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            x = Column(Integer)
            y = Column(Integer)
            bs = relationship("B")
            __table_args__ = {"mysql_engine": "InnoDB"}

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            x = Column(Integer)
            y = Column(Integer)
            __table_args__ = {"mysql_engine": "InnoDB"}

    @classmethod
    def insert_data(cls):
        A = cls.classes.A
        B = cls.classes.B

        # all the x/y are < 10
        s = Session()
        s.add_all(
            [
                A(x=5, y=5, bs=[B(x=4, y=4), B(x=2, y=8), B(x=7, y=1)]),
                A(x=7, y=5, bs=[B(x=4, y=4), B(x=5, y=8)]),
            ]
        )
        s.commit()

    @contextlib.contextmanager
    def run_test(self):
        connection = testing.db.connect()
        connection.execute("set innodb_lock_wait_timeout=1")
        main_trans = connection.begin()
        try:
            yield Session(bind=connection)
        finally:
            main_trans.rollback()
            connection.close()

    def _assert_a_is_locked(self, should_be_locked):
        A = self.classes.A
        with testing.db.begin() as alt_trans:
            alt_trans.execute("set innodb_lock_wait_timeout=1")
            # set x/y > 10
            try:
                alt_trans.execute(update(A).values(x=15, y=19))
            except (exc.InternalError, exc.OperationalError) as err:
                assert "Lock wait timeout exceeded" in str(err)
                assert should_be_locked
            else:
                assert not should_be_locked

    def _assert_b_is_locked(self, should_be_locked):
        B = self.classes.B
        with testing.db.begin() as alt_trans:
            alt_trans.execute("set innodb_lock_wait_timeout=1")
            # set x/y > 10
            try:
                alt_trans.execute(update(B).values(x=15, y=19))
            except (exc.InternalError, exc.OperationalError) as err:
                assert "Lock wait timeout exceeded" in str(err)
                assert should_be_locked
            else:
                assert not should_be_locked

    def test_basic_lock(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).with_for_update().all()
            # test our fixture
            self._assert_a_is_locked(True)

    def test_basic_not_lock(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).all()
            # test our fixture
            self._assert_a_is_locked(False)

    def test_joined_lock_subquery(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).options(joinedload(A.bs)).with_for_update().first()

            # test for issue #4246, should be locked
            self._assert_a_is_locked(True)
            self._assert_b_is_locked(True)

    def test_joined_lock_subquery_inner_for_update(self):
        A = self.classes.A
        B = self.classes.B
        with self.run_test() as s:
            q = s.query(A).with_for_update().subquery()
            s.query(q).join(B).all()

            # FOR UPDATE is inside the subquery, should be locked
            self._assert_a_is_locked(True)

            # FOR UPDATE is inside the subquery, B is not locked
            self._assert_b_is_locked(False)

    def test_joined_lock_subquery_inner_for_update_outer(self):
        A = self.classes.A
        B = self.classes.B
        with self.run_test() as s:
            q = s.query(A).with_for_update().subquery()
            s.query(q).join(B).with_for_update().all()

            # FOR UPDATE is inside the subquery, should be locked
            self._assert_a_is_locked(True)

            # FOR UPDATE is also outside the subquery, B is locked
            self._assert_b_is_locked(True)

    def test_joined_lock_subquery_order_for_update_outer(self):
        A = self.classes.A
        B = self.classes.B
        with self.run_test() as s:
            q = s.query(A).order_by(A.id).subquery()
            s.query(q).join(B).with_for_update().all()
            # FOR UPDATE is inside the subquery, should not be locked
            self._assert_a_is_locked(False)
            self._assert_b_is_locked(True)

    def test_joined_lock_no_subquery(self):
        A = self.classes.A
        with self.run_test() as s:
            s.query(A).options(joinedload(A.bs)).with_for_update().all()
            # no subquery, should be locked
            self._assert_a_is_locked(True)
            self._assert_b_is_locked(True)