summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2023-03-09 13:54:07 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2023-03-10 15:07:27 -0500
commit2c9796b10c3e85450afdeedc4003607abda2f2db (patch)
tree916866e37253c920acbf6d61adafbc64c2cb159b /test/sql
parent9c0715181de6f03543c7ac9038c481f57f773d49 (diff)
downloadsqlalchemy-2c9796b10c3e85450afdeedc4003607abda2f2db.tar.gz
repair broken lambda patch
in I4e0b627bfa187f1780dc68ec81b94db1c78f846a the 1.4 version has more changes than the main version, which failed to get the entire change, yet the whole thing was merged. Restore the missing mutex related code to the main version. Fixed regression where the fix for :ticket:`8098`, which was released in the 1.4 series and provided a layer of concurrency-safe checks for the lambda SQL API, included additional fixes in the patch that failed to be applied to the main branch. These additional fixes have been applied. Change-Id: Id172e09c421dafa6ef1d40b383aa4371de343864 References: #8098 Fixes: #9461
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_lambdas.py102
1 files changed, 102 insertions, 0 deletions
diff --git a/test/sql/test_lambdas.py b/test/sql/test_lambdas.py
index c3e271706..002a13db9 100644
--- a/test/sql/test_lambdas.py
+++ b/test/sql/test_lambdas.py
@@ -1,3 +1,10 @@
+from __future__ import annotations
+
+import threading
+import time
+from typing import List
+from typing import Optional
+
from sqlalchemy import exc
from sqlalchemy import testing
from sqlalchemy.future import select as future_select
@@ -2083,3 +2090,98 @@ class DeferredLambdaElementTest(
eq_(e12key[0], e1key[0])
eq_(e32key[0], e3key[0])
+
+
+class ConcurrencyTest(fixtures.TestBase):
+ """test for #8098 and #9461"""
+
+ __requires__ = ("independent_readonly_connections",)
+
+ __only_on__ = ("+psycopg2", "+mysqldb", "+pysqlite", "+pymysql")
+
+ THREADS = 10
+
+ @testing.fixture
+ def mapping_fixture(self, decl_base):
+ class A(decl_base):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ col1 = Column(String(100))
+ col2 = Column(String(100))
+ col3 = Column(String(100))
+ col4 = Column(String(100))
+
+ decl_base.metadata.create_all(testing.db)
+
+ from sqlalchemy.orm import Session
+
+ with testing.db.connect() as conn:
+ with Session(conn) as session:
+ session.add_all(
+ [
+ A(col1=str(i), col2=str(i), col3=str(i), col4=str(i))
+ for i in range(self.THREADS + 1)
+ ]
+ )
+ session.commit()
+
+ return A
+
+ @testing.requires.timing_intensive
+ def test_lambda_concurrency(self, testing_engine, mapping_fixture):
+ A = mapping_fixture
+ engine = testing_engine(options={"pool_size": self.THREADS + 5})
+ NUM_OF_LAMBDAS = 150
+
+ code = """
+from sqlalchemy import lambda_stmt, select
+
+
+def generate_lambda_stmt(wanted):
+ stmt = lambda_stmt(lambda: select(A.col1, A.col2, A.col3, A.col4))
+"""
+
+ for _ in range(NUM_OF_LAMBDAS):
+ code += (
+ " stmt += lambda s: s.where((A.col1 == wanted) & "
+ "(A.col2 == wanted) & (A.col3 == wanted) & "
+ "(A.col4 == wanted))\n"
+ )
+
+ code += """
+ return stmt
+"""
+
+ d = {"A": A, "__name__": "lambda_fake"}
+ exec(code, d)
+ generate_lambda_stmt = d["generate_lambda_stmt"]
+
+ runs: List[Optional[int]] = [None for _ in range(self.THREADS)]
+ conns = [engine.connect() for _ in range(self.THREADS)]
+
+ def run(num):
+ wanted = str(num)
+ connection = conns[num]
+ time.sleep(0.1)
+ stmt = generate_lambda_stmt(wanted)
+ time.sleep(0.1)
+ row = connection.execute(stmt).first()
+ if not row:
+ runs[num] = False
+ else:
+ runs[num] = True
+
+ threads = [
+ threading.Thread(target=run, args=(num,))
+ for num in range(self.THREADS)
+ ]
+
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join(timeout=10)
+ for conn in conns:
+ conn.close()
+
+ fails = len([r for r in runs if r is False])
+ assert not fails, f"{fails} runs failed"