diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-03-09 13:54:07 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-03-10 15:07:27 -0500 |
| commit | 2c9796b10c3e85450afdeedc4003607abda2f2db (patch) | |
| tree | 916866e37253c920acbf6d61adafbc64c2cb159b /test/sql | |
| parent | 9c0715181de6f03543c7ac9038c481f57f773d49 (diff) | |
| download | sqlalchemy-2c9796b10c3e85450afdeedc4003607abda2f2db.tar.gz | |
repair broken lambda patch
in I4e0b627bfa187f1780dc68ec81b94db1c78f846a the 1.4 version has more
changes than the main version, which failed to get the entire change,
yet the whole thing was merged. Restore the missing mutex related
code to the main version.
Fixed regression where the fix for :ticket:`8098`, which was released in
the 1.4 series and provided a layer of concurrency-safe checks for the
lambda SQL API, included additional fixes in the patch that failed to be
applied to the main branch. These additional fixes have been applied.
Change-Id: Id172e09c421dafa6ef1d40b383aa4371de343864
References: #8098
Fixes: #9461
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_lambdas.py | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/test/sql/test_lambdas.py b/test/sql/test_lambdas.py index c3e271706..002a13db9 100644 --- a/test/sql/test_lambdas.py +++ b/test/sql/test_lambdas.py @@ -1,3 +1,10 @@ +from __future__ import annotations + +import threading +import time +from typing import List +from typing import Optional + from sqlalchemy import exc from sqlalchemy import testing from sqlalchemy.future import select as future_select @@ -2083,3 +2090,98 @@ class DeferredLambdaElementTest( eq_(e12key[0], e1key[0]) eq_(e32key[0], e3key[0]) + + +class ConcurrencyTest(fixtures.TestBase): + """test for #8098 and #9461""" + + __requires__ = ("independent_readonly_connections",) + + __only_on__ = ("+psycopg2", "+mysqldb", "+pysqlite", "+pymysql") + + THREADS = 10 + + @testing.fixture + def mapping_fixture(self, decl_base): + class A(decl_base): + __tablename__ = "a" + id = Column(Integer, primary_key=True) + col1 = Column(String(100)) + col2 = Column(String(100)) + col3 = Column(String(100)) + col4 = Column(String(100)) + + decl_base.metadata.create_all(testing.db) + + from sqlalchemy.orm import Session + + with testing.db.connect() as conn: + with Session(conn) as session: + session.add_all( + [ + A(col1=str(i), col2=str(i), col3=str(i), col4=str(i)) + for i in range(self.THREADS + 1) + ] + ) + session.commit() + + return A + + @testing.requires.timing_intensive + def test_lambda_concurrency(self, testing_engine, mapping_fixture): + A = mapping_fixture + engine = testing_engine(options={"pool_size": self.THREADS + 5}) + NUM_OF_LAMBDAS = 150 + + code = """ +from sqlalchemy import lambda_stmt, select + + +def generate_lambda_stmt(wanted): + stmt = lambda_stmt(lambda: select(A.col1, A.col2, A.col3, A.col4)) +""" + + for _ in range(NUM_OF_LAMBDAS): + code += ( + " stmt += lambda s: s.where((A.col1 == wanted) & " + "(A.col2 == wanted) & (A.col3 == wanted) & " + "(A.col4 == wanted))\n" + ) + + code += """ + return stmt +""" + + d = {"A": A, "__name__": "lambda_fake"} + exec(code, d) + generate_lambda_stmt = d["generate_lambda_stmt"] + + runs: List[Optional[int]] = [None for _ in range(self.THREADS)] + conns = [engine.connect() for _ in range(self.THREADS)] + + def run(num): + wanted = str(num) + connection = conns[num] + time.sleep(0.1) + stmt = generate_lambda_stmt(wanted) + time.sleep(0.1) + row = connection.execute(stmt).first() + if not row: + runs[num] = False + else: + runs[num] = True + + threads = [ + threading.Thread(target=run, args=(num,)) + for num in range(self.THREADS) + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join(timeout=10) + for conn in conns: + conn.close() + + fails = len([r for r in runs if r is False]) + assert not fails, f"{fails} runs failed" |
