1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
import random
import threading
import time
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.testing import fixtures
class ConcurrentUseDeclMappingTest(fixtures.TestBase):
    """Race a declarative class definition against a query that needs it.

    Each round maps ``A`` (whose ``bs`` relationship names ``"B"`` as a
    string), then runs two threads concurrently: one declares ``B`` and the
    other builds a query joining ``A`` to ``A.bs``.  The test fails only if
    the querying thread sees ``UnmappedClassError``.
    """

    def teardown(self):
        """Clear all mappers (presumably run by the harness after a test)."""
        clear_mappers()

    @classmethod
    def make_a(cls, Base):
        """Declare mapped class ``A`` on ``Base`` and store it as ``cls.A``."""

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            data = Column(String)
            # "B" is referenced by name only; it is declared later, by make_b().
            bs = relationship("B")

        # need a strong ref so that the class is not gc'ed
        cls.A = A

    @classmethod
    def query_a(cls, Base, result):
        """Build a query joining ``A`` to ``A.bs`` and record the outcome.

        ``result[0]`` is set to the ``UnmappedClassError`` instance if that
        error is raised, and to ``True`` in every other handled case.
        ``Base`` is accepted for signature symmetry with ``make_b`` and is
        not used here.
        """
        s = Session()
        # Random sub-10ms delay so the query lands at varying points relative
        # to the thread running make_b().
        time.sleep(random.random() / 100)
        A = cls.A
        try:
            s.query(A).join(A.bs)
        except orm_exc.UnmappedClassError as oe:
            # this is the failure mode, where B is being handled by
            # declarative and is in the registry but not mapped yet.
            # NOTE(review): this handler must stay ahead of the
            # InvalidRequestError one; UnmappedClassError is understood to
            # derive from it in SQLAlchemy -- confirm before reordering.
            result[0] = oe
        except exc.InvalidRequestError:
            # if make_b() starts too slowly, we can reach here, because
            # B isn't in the registry yet.  We can't guard against this
            # case in the library because a class can refer to a name that
            # doesn't exist and that has to raise.
            result[0] = True
        else:
            # no conflict
            result[0] = True
        finally:
            # Release the session on every path instead of leaving it to gc.
            s.close()

    @classmethod
    def make_b(cls, Base):
        """Declare mapped class ``B`` on ``Base`` and store it as ``cls.B``."""

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)

            @declared_attr
            def data(cls):
                # Deliberate delay while the attribute is produced; presumably
                # this widens the window in which B is only partly set up.
                time.sleep(0.001)
                return Column(String)

            a_id = Column(ForeignKey("a.id"))

        # Strong reference, mirroring make_a().
        cls.B = B

    def test_concurrent_create(self):
        """Run 50 rounds of the declare-vs-query race.

        Raises:
            sqlalchemy.orm.exc.UnmappedClassError: re-raised from the query
                thread if any round recorded it.
        """
        for _ in range(50):
            # Fresh declarative base and mapper state for each round.
            Base = declarative_base()
            clear_mappers()

            self.make_a(Base)
            # Single-slot list used as an out-parameter for the query thread.
            result = [False]
            threads = [
                threading.Thread(target=self.make_b, args=(Base,)),
                threading.Thread(target=self.query_a, args=(Base, result)),
            ]

            for t in threads:
                t.start()

            for t in threads:
                t.join()

            if isinstance(result[0], orm_exc.UnmappedClassError):
                raise result[0]
|