diff options
Diffstat (limited to 'test/sql/test_lambdas.py')
-rw-r--r-- | test/sql/test_lambdas.py | 679 |
1 files changed, 679 insertions, 0 deletions
diff --git a/test/sql/test_lambdas.py b/test/sql/test_lambdas.py new file mode 100644 index 000000000..53f6a9544 --- /dev/null +++ b/test/sql/test_lambdas.py @@ -0,0 +1,679 @@ +from sqlalchemy import exc +from sqlalchemy import testing +from sqlalchemy.future import select as future_select +from sqlalchemy.schema import Column +from sqlalchemy.schema import ForeignKey +from sqlalchemy.schema import Table +from sqlalchemy.sql import and_ +from sqlalchemy.sql import coercions +from sqlalchemy.sql import column +from sqlalchemy.sql import join +from sqlalchemy.sql import lambda_stmt +from sqlalchemy.sql import lambdas +from sqlalchemy.sql import roles +from sqlalchemy.sql import select +from sqlalchemy.sql import table +from sqlalchemy.sql import util as sql_util +from sqlalchemy.testing import assert_raises_message +from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing import eq_ +from sqlalchemy.testing import fixtures +from sqlalchemy.testing import is_ +from sqlalchemy.testing.assertsql import CompiledSQL +from sqlalchemy.types import Integer +from sqlalchemy.types import String + + +class DeferredLambdaTest( + fixtures.TestBase, testing.AssertsExecutionResults, AssertsCompiledSQL +): + __dialect__ = "default" + + def test_select_whereclause(self): + t1 = table("t1", column("q"), column("p")) + + x = 10 + y = 5 + + def go(): + return select([t1]).where(lambda: and_(t1.c.q == x, t1.c.p == y)) + + self.assert_compile( + go(), "SELECT t1.q, t1.p FROM t1 WHERE t1.q = :x_1 AND t1.p = :y_1" + ) + + self.assert_compile( + go(), "SELECT t1.q, t1.p FROM t1 WHERE t1.q = :x_1 AND t1.p = :y_1" + ) + + def test_stale_checker_embedded(self): + def go(x): + + stmt = select([lambda: x]) + return stmt + + c1 = column("x") + s1 = go(c1) + s2 = go(c1) + + self.assert_compile(s1, "SELECT x") + self.assert_compile(s2, "SELECT x") + + c1 = column("q") + + s3 = go(c1) + self.assert_compile(s3, "SELECT q") + + def test_stale_checker_statement(self): + def go(x): + + stmt = lambdas.lambda_stmt(lambda: select([x])) + return stmt + + c1 = column("x") + s1 = go(c1) + s2 = go(c1) + + self.assert_compile(s1, "SELECT x") + self.assert_compile(s2, "SELECT x") + + c1 = column("q") + + s3 = go(c1) + self.assert_compile(s3, "SELECT q") + + def test_stale_checker_linked(self): + def go(x, y): + + stmt = lambdas.lambda_stmt(lambda: select([x])) + ( + lambda s: s.where(y > 5) + ) + return stmt + + c1 = column("x") + c2 = column("y") + s1 = go(c1, c2) + s2 = go(c1, c2) + + self.assert_compile(s1, "SELECT x WHERE y > :y_1") + self.assert_compile(s2, "SELECT x WHERE y > :y_1") + + c1 = column("q") + c2 = column("p") + + s3 = go(c1, c2) + self.assert_compile(s3, "SELECT q WHERE p > :p_1") + + def test_coercion_cols_clause(self): + assert_raises_message( + exc.ArgumentError, + "Textual column expression 'f' should be explicitly declared", + select, + [lambda: "foo"], + ) + + def test_coercion_where_clause(self): + assert_raises_message( + exc.ArgumentError, + "SQL expression for WHERE/HAVING role expected, got 5", + select([column("q")]).where, + 5, + ) + + def test_propagate_attrs_full_stmt(self): + col = column("q") + col._propagate_attrs = col._propagate_attrs.union( + {"compile_state_plugin": "x", "plugin_subject": "y"} + ) + + stmt = lambdas.lambda_stmt(lambda: select([col])) + + eq_( + stmt._propagate_attrs, + {"compile_state_plugin": "x", "plugin_subject": "y"}, + ) + + def test_propagate_attrs_cols_clause(self): + col = column("q") + col._propagate_attrs = col._propagate_attrs.union( + {"compile_state_plugin": "x", "plugin_subject": "y"} + ) + + stmt = select([lambda: col]) + + eq_( + stmt._propagate_attrs, + {"compile_state_plugin": "x", "plugin_subject": "y"}, + ) + + def test_propagate_attrs_from_clause(self): + col = column("q") + + t = table("t", column("y")) + + t._propagate_attrs = t._propagate_attrs.union( + {"compile_state_plugin": "x", "plugin_subject": "y"} + ) + + stmt = future_select(lambda: col).join(t) + + eq_( + stmt._propagate_attrs, + {"compile_state_plugin": "x", "plugin_subject": "y"}, + ) + + def test_select_legacy_expanding_columns(self): + q, p, r = column("q"), column("p"), column("r") + + stmt = select([lambda: (q, p, r)]) + + self.assert_compile(stmt, "SELECT q, p, r") + + def test_select_future_expanding_columns(self): + q, p, r = column("q"), column("p"), column("r") + + stmt = future_select(lambda: (q, p, r)) + + self.assert_compile(stmt, "SELECT q, p, r") + + def test_select_fromclause(self): + t1 = table("t1", column("q"), column("p")) + t2 = table("t2", column("y")) + + def go(): + return select([t1]).select_from( + lambda: join(t1, t2, lambda: t1.c.q == t2.c.y) + ) + + self.assert_compile( + go(), "SELECT t1.q, t1.p FROM t1 JOIN t2 ON t1.q = t2.y" + ) + + self.assert_compile( + go(), "SELECT t1.q, t1.p FROM t1 JOIN t2 ON t1.q = t2.y" + ) + + def test_in_parameters_one(self): + + expr1 = select([1]).where(column("q").in_(["a", "b", "c"])) + self.assert_compile(expr1, "SELECT 1 WHERE q IN ([POSTCOMPILE_q_1])") + + self.assert_compile( + expr1, + "SELECT 1 WHERE q IN (:q_1_1, :q_1_2, :q_1_3)", + render_postcompile=True, + checkparams={"q_1_1": "a", "q_1_2": "b", "q_1_3": "c"}, + ) + + def test_in_parameters_two(self): + expr2 = select([1]).where(lambda: column("q").in_(["a", "b", "c"])) + self.assert_compile(expr2, "SELECT 1 WHERE q IN ([POSTCOMPILE_q_1])") + self.assert_compile( + expr2, + "SELECT 1 WHERE q IN (:q_1_1, :q_1_2, :q_1_3)", + render_postcompile=True, + checkparams={"q_1_1": "a", "q_1_2": "b", "q_1_3": "c"}, + ) + + def test_in_parameters_three(self): + expr3 = lambdas.lambda_stmt( + lambda: select([1]).where(column("q").in_(["a", "b", "c"])) + ) + self.assert_compile(expr3, "SELECT 1 WHERE q IN ([POSTCOMPILE_q_1])") + self.assert_compile( + expr3, + "SELECT 1 WHERE q IN (:q_1_1, :q_1_2, :q_1_3)", + render_postcompile=True, + checkparams={"q_1_1": "a", "q_1_2": "b", "q_1_3": "c"}, + ) + + def test_in_parameters_four(self): + def go(names): + return lambdas.lambda_stmt( + lambda: select([1]).where(column("q").in_(names)) + ) + + expr4 = go(["a", "b", "c"]) + self.assert_compile( + expr4, "SELECT 1 WHERE q IN ([POSTCOMPILE_names_1])" + ) + self.assert_compile( + expr4, + "SELECT 1 WHERE q IN (:names_1_1, :names_1_2, :names_1_3)", + render_postcompile=True, + checkparams={"names_1_1": "a", "names_1_2": "b", "names_1_3": "c"}, + ) + + def test_in_parameters_five(self): + def go(n1, n2): + stmt = lambdas.lambda_stmt( + lambda: select([1]).where(column("q").in_(n1)) + ) + stmt += lambda s: s.where(column("y").in_(n2)) + return stmt + + expr = go(["a", "b", "c"], ["d", "e", "f"]) + self.assert_compile( + expr, + "SELECT 1 WHERE q IN (:n1_1_1, :n1_1_2, :n1_1_3) " + "AND y IN (:n2_1_1, :n2_1_2, :n2_1_3)", + render_postcompile=True, + checkparams={ + "n1_1_1": "a", + "n1_1_2": "b", + "n1_1_3": "c", + "n2_1_1": "d", + "n2_1_2": "e", + "n2_1_3": "f", + }, + ) + + def test_select_columns_clause(self): + t1 = table("t1", column("q"), column("p")) + + g = 5 + + def go(): + return select([lambda: t1.c.q, lambda: t1.c.p + g]) + + stmt = go() + self.assert_compile( + stmt, + "SELECT t1.q, t1.p + :g_1 AS anon_1 FROM t1", + checkparams={"g_1": 5}, + ) + eq_(stmt._generate_cache_key()._generate_param_dict(), {"g_1": 5}) + + g = 10 + stmt = go() + self.assert_compile( + stmt, + "SELECT t1.q, t1.p + :g_1 AS anon_1 FROM t1", + checkparams={"g_1": 10}, + ) + eq_(stmt._generate_cache_key()._generate_param_dict(), {"g_1": 10}) + + @testing.metadata_fixture() + def user_address_fixture(self, metadata): + users = Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + ) + addresses = Table( + "addresses", + metadata, + Column("id", Integer), + Column("user_id", ForeignKey("users.id")), + Column("email", String(50)), + ) + return users, addresses + + def test_adapt_select(self, user_address_fixture): + users, addresses = user_address_fixture + + stmt = ( + select([users]) + .select_from( + users.join( + addresses, lambda: users.c.id == addresses.c.user_id + ) + ) + .where(lambda: users.c.name == "ed") + ) + + self.assert_compile( + stmt, + "SELECT users.id, users.name FROM users " + "JOIN addresses ON users.id = addresses.user_id " + "WHERE users.name = :name_1", + ) + + u1 = users.alias() + adapter = sql_util.ClauseAdapter(u1) + + s2 = adapter.traverse(stmt) + + self.assert_compile( + s2, + "SELECT users_1.id, users_1.name FROM users AS users_1 " + "JOIN addresses ON users_1.id = addresses.user_id " + "WHERE users_1.name = :name_1", + ) + + def test_no_var_dict_keys(self, user_address_fixture): + users, addresses = user_address_fixture + + names = {"x": "some name"} + foo = "x" + expr = lambda: users.c.name == names[foo] # noqa + + assert_raises_message( + exc.InvalidRequestError, + "Dictionary keys / list indexes inside of a cached " + "lambda must be Python literals only", + coercions.expect, + roles.WhereHavingRole, + expr, + ) + + def test_dict_literal_keys(self, user_address_fixture): + users, addresses = user_address_fixture + + names = {"x": "some name"} + lmb = lambda: users.c.name == names["x"] # noqa + + expr = coercions.expect(roles.WhereHavingRole, lmb) + + self.assert_compile( + expr, + "users.name = :x_1", + params=expr._param_dict(), + checkparams={"x_1": "some name"}, + ) + + def test_assignment_one(self, user_address_fixture): + users, addresses = user_address_fixture + + x = 5 + + def my_lambda(): + + y = 10 + z = y + 18 + + expr1 = users.c.name > x + expr2 = users.c.name < z + return and_(expr1, expr2) + + expr = coercions.expect(roles.WhereHavingRole, my_lambda) + self.assert_compile( + expr, + "users.name > :x_1 AND users.name < :name_1", + params=expr._param_dict(), + checkparams={"name_1": 28, "x_1": 5}, + ) + + expr = coercions.expect(roles.WhereHavingRole, my_lambda) + self.assert_compile( + expr, + "users.name > :x_1 AND users.name < :name_1", + params=expr._param_dict(), + checkparams={"name_1": 28, "x_1": 5}, + ) + + def test_assignment_two(self, user_address_fixture): + users, addresses = user_address_fixture + + x = 5 + z = 10 + + def my_lambda(): + + y = x + z + + expr1 = users.c.name > x + expr2 = users.c.name < y + return and_(expr1, expr2) + + expr = coercions.expect(roles.WhereHavingRole, my_lambda) + self.assert_compile( + expr, + "users.name > :x_1 AND users.name < :x_1 + :z_1", + params=expr._param_dict(), + checkparams={"x_1": 5, "z_1": 10}, + ) + + x = 15 + z = 18 + + expr = coercions.expect(roles.WhereHavingRole, my_lambda) + self.assert_compile( + expr, + "users.name > :x_1 AND users.name < :x_1 + :z_1", + params=expr._param_dict(), + checkparams={"x_1": 15, "z_1": 18}, + ) + + def test_assignment_three(self, user_address_fixture): + users, addresses = user_address_fixture + + x = 5 + z = 10 + + def my_lambda(): + + y = 10 + z + + expr1 = users.c.name > x + expr2 = users.c.name < y + return and_(expr1, expr2) + + expr = coercions.expect(roles.WhereHavingRole, my_lambda) + self.assert_compile( + expr, + "users.name > :x_1 AND users.name < :param_1 + :z_1", + params=expr._param_dict(), + checkparams={"x_1": 5, "z_1": 10, "param_1": 10}, + ) + + x = 15 + z = 18 + + expr = coercions.expect(roles.WhereHavingRole, my_lambda) + self.assert_compile( + expr, + "users.name > :x_1 AND users.name < :param_1 + :z_1", + params=expr._param_dict(), + checkparams={"x_1": 15, "z_1": 18, "param_1": 10}, + ) + + def test_op_reverse(self, user_address_fixture): + user, addresses = user_address_fixture + + x = "foo" + + def mylambda(): + return x + user.c.name + + expr = coercions.expect(roles.WhereHavingRole, mylambda) + self.assert_compile( + expr, ":x_1 || users.name", checkparams={"x_1": "foo"} + ) + + x = "bar" + expr = coercions.expect(roles.WhereHavingRole, mylambda) + self.assert_compile( + expr, ":x_1 || users.name", checkparams={"x_1": "bar"} + ) + + def test_op_forwards(self, user_address_fixture): + user, addresses = user_address_fixture + + x = "foo" + + def mylambda(): + return user.c.name + x + + expr = coercions.expect(roles.WhereHavingRole, mylambda) + self.assert_compile( + expr, "users.name || :x_1", checkparams={"x_1": "foo"} + ) + + x = "bar" + expr = coercions.expect(roles.WhereHavingRole, mylambda) + self.assert_compile( + expr, "users.name || :x_1", checkparams={"x_1": "bar"} + ) + + def test_execute_constructed_uncached(self, user_address_fixture): + users, addresses = user_address_fixture + + def go(name): + stmt = select([lambda: users.c.id]).where( + lambda: users.c.name == name + ) + with testing.db.connect().execution_options( + compiled_cache=None + ) as conn: + conn.execute(stmt) + + with self.sql_execution_asserter(testing.db) as asserter: + go("name1") + go("name2") + go("name1") + go("name3") + + asserter.assert_( + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name2"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name3"}], + ), + ) + + def test_execute_full_uncached(self, user_address_fixture): + users, addresses = user_address_fixture + + def go(name): + stmt = lambda_stmt( + lambda: select([users.c.id]).where( # noqa + users.c.name == name + ) + ) + + with testing.db.connect().execution_options( + compiled_cache=None + ) as conn: + conn.execute(stmt) + + with self.sql_execution_asserter(testing.db) as asserter: + go("name1") + go("name2") + go("name1") + go("name3") + + asserter.assert_( + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name2"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name3"}], + ), + ) + + def test_execute_constructed_cached(self, user_address_fixture): + users, addresses = user_address_fixture + + cache = {} + + def go(name): + stmt = select([lambda: users.c.id]).where( + lambda: users.c.name == name + ) + + with testing.db.connect().execution_options( + compiled_cache=cache + ) as conn: + conn.execute(stmt) + + with self.sql_execution_asserter(testing.db) as asserter: + go("name1") + go("name2") + go("name1") + go("name3") + + asserter.assert_( + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name2"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name3"}], + ), + ) + + def test_execute_full_cached(self, user_address_fixture): + users, addresses = user_address_fixture + + cache = {} + + def go(name): + stmt = lambda_stmt( + lambda: select([users.c.id]).where( # noqa + users.c.name == name + ) + ) + + with testing.db.connect().execution_options( + compiled_cache=cache + ) as conn: + conn.execute(stmt) + + with self.sql_execution_asserter(testing.db) as asserter: + go("name1") + go("name2") + go("name1") + go("name3") + + asserter.assert_( + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name2"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name1"}], + ), + CompiledSQL( + "SELECT users.id FROM users WHERE users.name = :name_1", + lambda ctx: [{"name_1": "name3"}], + ), + ) + + def test_cache_key_thing(self): + t1 = table("t1", column("q"), column("p")) + + def go(x): + return coercions.expect(roles.WhereHavingRole, lambda: t1.c.q == x) + + expr1 = go(5) + expr2 = go(10) + + is_(expr1._generate_cache_key().bindparams[0], expr1._resolved.right) + is_(expr2._generate_cache_key().bindparams[0], expr2._resolved.right) |