summaryrefslogtreecommitdiff
path: root/test/orm/declarative/test_concurrency.py
blob: ecddc2e5fa2a5ed3b19422533ad87260227c93b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import random
import threading
import time

from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import relationship
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.fixtures import fixture_session


class ConcurrentUseDeclMappingTest(fixtures.TestBase):
    """Race a declarative class definition against a query that refers to it.

    One thread declares class ``B`` while another thread builds a query
    joining from ``A`` along ``A.bs`` (a relationship naming ``"B"``).  The
    test fails only if the querying thread observes ``UnmappedClassError``.
    """

    def teardown_test(self):
        """Discard all mappers once each test has finished."""
        clear_mappers()

    @classmethod
    def make_a(cls, Base):
        """Declare mapped class ``A`` on ``Base`` and store it as ``cls.A``."""

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            data = Column(String)
            # refers to "B" by name; B is declared later, from another thread
            bs = relationship("B")

        # hold a strong reference on the test class so that A is not
        # garbage collected
        cls.A = A

    @classmethod
    def query_a(cls, Base, result):
        """Build a query joining ``A`` to ``A.bs`` and record the outcome.

        ``result[0]`` receives the ``UnmappedClassError`` instance if that
        error was raised, otherwise ``True``.  ``Base`` is accepted but
        not used here.
        """
        session = fixture_session()
        # small random delay so the interleaving with make_b() varies
        time.sleep(random.random() / 100)
        mapped_a = cls.A

        # True covers both "no conflict" and the tolerated
        # InvalidRequestError case below.
        outcome = True
        try:
            session.query(mapped_a).join(mapped_a.bs)
        except orm_exc.UnmappedClassError as err:
            # the failure mode: B is being handled by declarative and is
            # already in the registry, but is not mapped yet.
            outcome = err
        except exc.InvalidRequestError:
            # reachable when make_b() starts too slowly, since B isn't in
            # the registry yet.  The library can't guard against this,
            # because a class can refer to a name that doesn't exist and
            # that has to raise.
            pass

        result[0] = outcome

    @classmethod
    def make_b(cls, Base):
        """Declare mapped class ``B`` on ``Base`` and store it as ``cls.B``."""

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)

            @declared_attr
            def data(cls):
                # sleep briefly while this attribute is being evaluated
                time.sleep(0.001)
                return Column(String)

            a_id = Column(ForeignKey("a.id"))

        cls.B = B

    def test_concurrent_create(self):
        """Run the declare-vs-query race 50 times; raise on the failure mode."""
        for _ in range(50):
            Base = declarative_base()
            clear_mappers()

            self.make_a(Base)
            result = [False]

            declare_b = threading.Thread(target=self.make_b, args=(Base,))
            run_query = threading.Thread(
                target=self.query_a, args=(Base, result)
            )

            declare_b.start()
            run_query.start()

            declare_b.join()
            run_query.join()

            outcome = result[0]
            if isinstance(outcome, orm_exc.UnmappedClassError):
                raise outcome