summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-08-18 10:02:24 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2019-08-30 17:38:09 -0400
commit36e8fe48b2332ecc44b506d1f86cc6ab3bb65f07 (patch)
tree38a218519ba4618fb6290c6851a4510b0ffed0a3 /test/sql
parentf499671ccc30cd42d6e3beb6ddec60e104bff9c5 (diff)
downloadsqlalchemy-36e8fe48b2332ecc44b506d1f86cc6ab3bb65f07.tar.gz
Render LIMIT/OFFSET conditions after compile on select dialects
Added new "post compile parameters" feature. This feature allows a :func:`.bindparam` construct to have its value rendered into the SQL string before being passed to the DBAPI driver, but after the compilation step, using the "literal render" feature of the compiler. The immediate rationale for this feature is to support LIMIT/OFFSET schemes that don't work or perform well as bound parameters handled by the database driver, while still allowing for SQLAlchemy SQL constructs to be cacheable in their compiled form. The immediate targets for the new feature are the "TOP N" clause used by SQL Server (and Sybase) which does not support a bound parameter, as well as the "ROWNUM" and optional "FIRST_ROWS()" schemes used by the Oracle dialect, the former of which has been known to perform better without bound parameters and the latter of which does not support a bound parameter. The feature builds upon the mechanisms first developed to support "expanding" parameters for IN expressions. As part of this feature, the Oracle ``use_binds_for_limits`` feature is turned on unconditionally and this flag is now deprecated. - adds limited support for "unique" bound parameters within a text() construct. - adds an additional int() check within the literal render function of the Integer datatype and tests that non-int values raise ValueError. Fixes: #4808 Change-Id: Iace97d544d1a7351ee07db970c6bc06a19c712c6
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_compiler.py961
-rw-r--r--test/sql/test_roles.py15
-rw-r--r--test/sql/test_text.py13
-rw-r--r--test/sql/test_types.py27
4 files changed, 556 insertions, 460 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index 03e18e921..ebc0fc631 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -311,20 +311,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
checkparams=params,
)
- def test_limit_offset_select_literal_binds(self):
- stmt = select([1]).limit(5).offset(6)
- self.assert_compile(
- stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
- )
-
- def test_limit_offset_compound_select_literal_binds(self):
- stmt = select([1]).union(select([2])).limit(5).offset(6)
- self.assert_compile(
- stmt,
- "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
- literal_binds=True,
- )
-
def test_select_precol_compile_ordering(self):
s1 = (
select([column("x")])
@@ -1304,20 +1290,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid FROM mytable",
)
- def test_multiple_col_binds(self):
- self.assert_compile(
- select(
- [literal_column("*")],
- or_(
- table1.c.myid == 12,
- table1.c.myid == "asdf",
- table1.c.myid == "foo",
- ),
- ),
- "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
- "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
- )
-
def test_order_by_nulls(self):
self.assert_compile(
table2.select(
@@ -1631,71 +1603,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=mysql.dialect(),
)
- def test_render_binds_as_literal(self):
- """test a compiler that renders binds inline into
- SQL in the columns clause."""
-
- dialect = default.DefaultDialect()
-
- class Compiler(dialect.statement_compiler):
- ansi_bind_rules = True
-
- dialect.statement_compiler = Compiler
-
- self.assert_compile(
- select([literal("someliteral")]),
- "SELECT 'someliteral' AS anon_1",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([table1.c.myid + 3]),
- "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([table1.c.myid.in_([4, 5, 6])]),
- "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([func.mod(table1.c.myid, 5)]),
- "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([literal("foo").in_([])]),
- "SELECT 1 != 1 AS anon_1",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([literal(util.b("foo"))]),
- "SELECT 'foo' AS anon_1",
- dialect=dialect,
- )
-
- # test callable
- self.assert_compile(
- select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
- "SELECT mytable.myid = 5 AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
- empty_in_dialect.statement_compiler = Compiler
-
- assert_raises_message(
- exc.CompileError,
- "Bind parameter 'foo' without a "
- "renderable value not allowed here.",
- bindparam("foo").in_([]).compile,
- dialect=empty_in_dialect,
- )
-
def test_collate(self):
# columns clause
self.assert_compile(
@@ -2214,373 +2121,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
" LIMIT -1 OFFSET :param_2) AS anon_2",
)
- def test_binds(self):
- for (
- stmt,
- expected_named_stmt,
- expected_positional_stmt,
- expected_default_params_dict,
- expected_default_params_list,
- test_param_dict,
- expected_test_params_dict,
- expected_test_params_list,
- ) in [
- (
- select(
- [table1, table2],
- and_(
- table1.c.myid == table2.c.otherid,
- table1.c.name == bindparam("mytablename"),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable WHERE mytable.myid = myothertable.otherid "
- "AND mytable.name = :mytablename",
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable WHERE mytable.myid = myothertable.otherid AND "
- "mytable.name = ?",
- {"mytablename": None},
- [None],
- {"mytablename": 5},
- {"mytablename": 5},
- [5],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myid"),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable, myothertable WHERE mytable.myid = :myid "
- "OR myothertable.otherid = :myid",
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable, myothertable WHERE mytable.myid = ? "
- "OR myothertable.otherid = ?",
- {"myid": None},
- [None, None],
- {"myid": 5},
- {"myid": 5},
- [5, 5],
- ),
- (
- text(
- "SELECT mytable.myid, mytable.name, "
- "mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid"
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? OR "
- "myothertable.otherid = ?",
- {"myid": None},
- [None, None],
- {"myid": 5},
- {"myid": 5},
- [5, 5],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid", unique=True),
- table2.c.otherid == bindparam("myid", unique=True),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid_1 OR myothertable.otherid = :myid_2",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? "
- "OR myothertable.otherid = ?",
- {"myid_1": None, "myid_2": None},
- [None, None],
- {"myid_1": 5, "myid_2": 6},
- {"myid_1": 5, "myid_2": 6},
- [5, 6],
- ),
- (
- bindparam("test", type_=String, required=False) + text("'hi'"),
- ":test || 'hi'",
- "? || 'hi'",
- {"test": None},
- [None],
- {},
- {"test": None},
- [None],
- ),
- (
- # testing select.params() here - bindparam() objects
- # must get required flag set to False
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myotherid"),
- ),
- ).params({"myid": 8, "myotherid": 7}),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid OR myothertable.otherid = :myotherid",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- "? OR myothertable.otherid = ?",
- {"myid": 8, "myotherid": 7},
- [8, 7],
- {"myid": 5},
- {"myid": 5, "myotherid": 7},
- [5, 7],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid
- == bindparam("myid", value=7, unique=True),
- table2.c.otherid
- == bindparam("myid", value=8, unique=True),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid_1 OR myothertable.otherid = :myid_2",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- "? OR myothertable.otherid = ?",
- {"myid_1": 7, "myid_2": 8},
- [7, 8],
- {"myid_1": 5, "myid_2": 6},
- {"myid_1": 5, "myid_2": 6},
- [5, 6],
- ),
- ]:
-
- self.assert_compile(
- stmt, expected_named_stmt, params=expected_default_params_dict
- )
- self.assert_compile(
- stmt, expected_positional_stmt, dialect=sqlite.dialect()
- )
- nonpositional = stmt.compile()
- positional = stmt.compile(dialect=sqlite.dialect())
- pp = positional.params
- eq_(
- [pp[k] for k in positional.positiontup],
- expected_default_params_list,
- )
-
- eq_(
- nonpositional.construct_params(test_param_dict),
- expected_test_params_dict,
- )
- pp = positional.construct_params(test_param_dict)
- eq_(
- [pp[k] for k in positional.positiontup],
- expected_test_params_list,
- )
-
- # check that params() doesn't modify original statement
- s = select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myotherid"),
- ),
- )
- s2 = s.params({"myid": 8, "myotherid": 7})
- s3 = s2.params({"myid": 9})
- assert s.compile().params == {"myid": None, "myotherid": None}
- assert s2.compile().params == {"myid": 8, "myotherid": 7}
- assert s3.compile().params == {"myid": 9, "myotherid": 7}
-
- # test using same 'unique' param object twice in one compile
- s = (
- select([table1.c.myid])
- .where(table1.c.myid == 12)
- .scalar_subquery()
- )
- s2 = select([table1, s], table1.c.myid == s)
- self.assert_compile(
- s2,
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
- ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
- )
- positional = s2.compile(dialect=sqlite.dialect())
-
- pp = positional.params
- assert [pp[k] for k in positional.positiontup] == [12, 12]
-
- # check that conflicts with "unique" params are caught
- s = select(
- [table1],
- or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
- )
- assert_raises_message(
- exc.CompileError,
- "conflicts with unique bind parameter " "of the same name",
- str,
- s,
- )
-
- s = select(
- [table1],
- or_(
- table1.c.myid == 7,
- table1.c.myid == 8,
- table1.c.myid == bindparam("myid_1"),
- ),
- )
- assert_raises_message(
- exc.CompileError,
- "conflicts with unique bind parameter " "of the same name",
- str,
- s,
- )
-
- def _test_binds_no_hash_collision(self):
- """test that construct_params doesn't corrupt dict
- due to hash collisions"""
-
- total_params = 100000
-
- in_clause = [":in%d" % i for i in range(total_params)]
- params = dict(("in%d" % i, i) for i in range(total_params))
- t = text("text clause %s" % ", ".join(in_clause))
- eq_(len(t.bindparams), total_params)
- c = t.compile()
- pp = c.construct_params(params)
- eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
- eq_(len(set(pp.values())), total_params)
-
- def test_bind_as_col(self):
- t = table("foo", column("id"))
-
- s = select([t, literal("lala").label("hoho")])
- self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
-
- assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
-
- def test_bind_callable(self):
- expr = column("x") == bindparam("key", callable_=lambda: 12)
- self.assert_compile(expr, "x = :key", {"x": 12})
-
- def test_bind_params_missing(self):
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x'",
- select([table1])
- .where(
- and_(
- table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True),
- )
- )
- .compile()
- .construct_params,
- params=dict(y=5),
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x'",
- select([table1])
- .where(table1.c.myid == bindparam("x", required=True))
- .compile()
- .construct_params,
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x', "
- "in parameter group 2",
- select([table1])
- .where(
- and_(
- table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True),
- )
- )
- .compile()
- .construct_params,
- params=dict(y=5),
- _group_number=2,
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x', "
- "in parameter group 2",
- select([table1])
- .where(table1.c.myid == bindparam("x", required=True))
- .compile()
- .construct_params,
- _group_number=2,
- )
-
- def test_tuple(self):
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
- "(mytable.myid, mytable.name) IN "
- "((:param_1, :param_2), (:param_3, :param_4))",
- )
-
- dialect = default.DefaultDialect()
- dialect.tuple_in_values = True
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
- "(mytable.myid, mytable.name) IN "
- "(VALUES (:param_1, :param_2), (:param_3, :param_4))",
- dialect=dialect,
- )
-
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- [tuple_(table2.c.otherid, table2.c.othername)]
- ),
- "(mytable.myid, mytable.name) IN "
- "((myothertable.otherid, myothertable.othername))",
- )
-
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- select([table2.c.otherid, table2.c.othername])
- ),
- "(mytable.myid, mytable.name) IN (SELECT "
- "myothertable.otherid, myothertable.othername FROM myothertable)",
- )
-
- def test_expanding_parameter(self):
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- bindparam("foo", expanding=True)
- ),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
- )
-
- dialect = default.DefaultDialect()
- dialect.tuple_in_values = True
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- bindparam("foo", expanding=True)
- ),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
- dialect=dialect,
- )
-
- self.assert_compile(
- table1.c.myid.in_(bindparam("foo", expanding=True)),
- "mytable.myid IN ([EXPANDING_foo])",
- )
-
def test_cast(self):
tbl = table(
"casttest",
@@ -3250,6 +2790,507 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(exc.ArgumentError, and_, ("a",), ("b",))
+class BindParameterTest(AssertsCompiledSQL, fixtures.TestBase):
+ __dialect__ = "default"
+
+ def test_binds(self):
+ for (
+ stmt,
+ expected_named_stmt,
+ expected_positional_stmt,
+ expected_default_params_dict,
+ expected_default_params_list,
+ test_param_dict,
+ expected_test_params_dict,
+ expected_test_params_list,
+ ) in [
+ (
+ select(
+ [table1, table2],
+ and_(
+ table1.c.myid == table2.c.otherid,
+ table1.c.name == bindparam("mytablename"),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable WHERE mytable.myid = myothertable.otherid "
+ "AND mytable.name = :mytablename",
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable WHERE mytable.myid = myothertable.otherid AND "
+ "mytable.name = ?",
+ {"mytablename": None},
+ [None],
+ {"mytablename": 5},
+ {"mytablename": 5},
+ [5],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myid"),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, myothertable WHERE mytable.myid = :myid "
+ "OR myothertable.otherid = :myid",
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, myothertable WHERE mytable.myid = ? "
+ "OR myothertable.otherid = ?",
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
+ ),
+ (
+ text(
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid"
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = ? OR "
+ "myothertable.otherid = ?",
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid", unique=True),
+ table2.c.otherid == bindparam("myid", unique=True),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid_1 OR myothertable.otherid = :myid_2",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = ? "
+ "OR myothertable.otherid = ?",
+ {"myid_1": None, "myid_2": None},
+ [None, None],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
+ ),
+ (
+ bindparam("test", type_=String, required=False) + text("'hi'"),
+ ":test || 'hi'",
+ "? || 'hi'",
+ {"test": None},
+ [None],
+ {},
+ {"test": None},
+ [None],
+ ),
+ (
+ # testing select.params() here - bindparam() objects
+ # must get required flag set to False
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ ).params({"myid": 8, "myotherid": 7}),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid OR myothertable.otherid = :myotherid",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ "? OR myothertable.otherid = ?",
+ {"myid": 8, "myotherid": 7},
+ [8, 7],
+ {"myid": 5},
+ {"myid": 5, "myotherid": 7},
+ [5, 7],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid
+ == bindparam("myid", value=7, unique=True),
+ table2.c.otherid
+ == bindparam("myid", value=8, unique=True),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid_1 OR myothertable.otherid = :myid_2",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ "? OR myothertable.otherid = ?",
+ {"myid_1": 7, "myid_2": 8},
+ [7, 8],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
+ ),
+ ]:
+
+ self.assert_compile(
+ stmt, expected_named_stmt, params=expected_default_params_dict
+ )
+ self.assert_compile(
+ stmt, expected_positional_stmt, dialect=sqlite.dialect()
+ )
+ nonpositional = stmt.compile()
+ positional = stmt.compile(dialect=sqlite.dialect())
+ pp = positional.params
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_default_params_list,
+ )
+
+ eq_(
+ nonpositional.construct_params(test_param_dict),
+ expected_test_params_dict,
+ )
+ pp = positional.construct_params(test_param_dict)
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_test_params_list,
+ )
+
+ # check that params() doesn't modify original statement
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ )
+ s2 = s.params({"myid": 8, "myotherid": 7})
+ s3 = s2.params({"myid": 9})
+ assert s.compile().params == {"myid": None, "myotherid": None}
+ assert s2.compile().params == {"myid": 8, "myotherid": 7}
+ assert s3.compile().params == {"myid": 9, "myotherid": 7}
+
+ # test using same 'unique' param object twice in one compile
+ s = (
+ select([table1.c.myid])
+ .where(table1.c.myid == 12)
+ .scalar_subquery()
+ )
+ s2 = select([table1, s], table1.c.myid == s)
+ self.assert_compile(
+ s2,
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
+ ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
+ )
+ positional = s2.compile(dialect=sqlite.dialect())
+
+ pp = positional.params
+ assert [pp[k] for k in positional.positiontup] == [12, 12]
+
+ # check that conflicts with "unique" params are caught
+ s = select(
+ [table1],
+ or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == 7,
+ table1.c.myid == 8,
+ table1.c.myid == bindparam("myid_1"),
+ ),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ def _test_binds_no_hash_collision(self):
+ """test that construct_params doesn't corrupt dict
+ due to hash collisions"""
+
+ total_params = 100000
+
+ in_clause = [":in%d" % i for i in range(total_params)]
+ params = dict(("in%d" % i, i) for i in range(total_params))
+ t = text("text clause %s" % ", ".join(in_clause))
+ eq_(len(t.bindparams), total_params)
+ c = t.compile()
+ pp = c.construct_params(params)
+ eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
+ eq_(len(set(pp.values())), total_params)
+
+ def test_bind_as_col(self):
+ t = table("foo", column("id"))
+
+ s = select([t, literal("lala").label("hoho")])
+ self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
+
+ assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
+
+ def test_bind_callable(self):
+ expr = column("x") == bindparam("key", callable_=lambda: 12)
+ self.assert_compile(expr, "x = :key", {"x": 12})
+
+ def test_bind_params_missing(self):
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x'",
+ select([table1])
+ .where(
+ and_(
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True),
+ )
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x'",
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x', "
+ "in parameter group 2",
+ select([table1])
+ .where(
+ and_(
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True),
+ )
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ _group_number=2,
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x', "
+ "in parameter group 2",
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ _group_number=2,
+ )
+
+ def test_tuple(self):
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
+ "(mytable.myid, mytable.name) IN "
+ "((:param_1, :param_2), (:param_3, :param_4))",
+ )
+
+ dialect = default.DefaultDialect()
+ dialect.tuple_in_values = True
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
+ "(mytable.myid, mytable.name) IN "
+ "(VALUES (:param_1, :param_2), (:param_3, :param_4))",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ [tuple_(table2.c.otherid, table2.c.othername)]
+ ),
+ "(mytable.myid, mytable.name) IN "
+ "((myothertable.otherid, myothertable.othername))",
+ )
+
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ select([table2.c.otherid, table2.c.othername])
+ ),
+ "(mytable.myid, mytable.name) IN (SELECT "
+ "myothertable.otherid, myothertable.othername FROM myothertable)",
+ )
+
+ def test_expanding_parameter(self):
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
+ )
+
+ dialect = default.DefaultDialect()
+ dialect.tuple_in_values = True
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ table1.c.myid.in_(bindparam("foo", expanding=True)),
+ "mytable.myid IN ([POSTCOMPILE_foo])",
+ )
+
+ def test_limit_offset_select_literal_binds(self):
+ stmt = select([1]).limit(5).offset(6)
+ self.assert_compile(
+ stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
+ )
+
+ def test_limit_offset_compound_select_literal_binds(self):
+ stmt = select([1]).union(select([2])).limit(5).offset(6)
+ self.assert_compile(
+ stmt,
+ "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
+ literal_binds=True,
+ )
+
+ def test_multiple_col_binds(self):
+ self.assert_compile(
+ select(
+ [literal_column("*")],
+ or_(
+ table1.c.myid == 12,
+ table1.c.myid == "asdf",
+ table1.c.myid == "foo",
+ ),
+ ),
+ "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
+ "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
+ )
+
+ def test_render_binds_as_literal(self):
+ """test a compiler that renders binds inline into
+ SQL in the columns clause."""
+
+ dialect = default.DefaultDialect()
+
+ class Compiler(dialect.statement_compiler):
+ ansi_bind_rules = True
+
+ dialect.statement_compiler = Compiler
+
+ self.assert_compile(
+ select([literal("someliteral")]),
+ "SELECT 'someliteral' AS anon_1",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([table1.c.myid + 3]),
+ "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([table1.c.myid.in_([4, 5, 6])]),
+ "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([func.mod(table1.c.myid, 5)]),
+ "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([literal("foo").in_([])]),
+ "SELECT 1 != 1 AS anon_1",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([literal(util.b("foo"))]),
+ "SELECT 'foo' AS anon_1",
+ dialect=dialect,
+ )
+
+ # test callable
+ self.assert_compile(
+ select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
+ "SELECT mytable.myid = 5 AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
+ empty_in_dialect.statement_compiler = Compiler
+
+ assert_raises_message(
+ exc.CompileError,
+ "Bind parameter 'foo' without a "
+ "renderable value not allowed here.",
+ bindparam("foo").in_([]).compile,
+ dialect=empty_in_dialect,
+ )
+
+ def test_render_literal_execute_parameter(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid == bindparam("foo", 5, literal_execute=True)
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = [POSTCOMPILE_foo]",
+ )
+
+ def test_render_literal_execute_parameter_literal_binds(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid == bindparam("foo", 5, literal_execute=True)
+ ),
+ "SELECT mytable.myid FROM mytable " "WHERE mytable.myid = 5",
+ literal_binds=True,
+ )
+
+ def test_render_expanding_parameter(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid.in_(bindparam("foo", expanding=True))
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid IN ([POSTCOMPILE_foo])",
+ )
+
+ def test_render_expanding_parameter_literal_binds(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid.in_(bindparam("foo", [1, 2, 3], expanding=True))
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid IN (1, 2, 3)",
+ literal_binds=True,
+ )
+
+
class UnsupportedTest(fixtures.TestBase):
def test_unsupported_element_str_visit_name(self):
from sqlalchemy.sql.expression import ClauseElement
diff --git a/test/sql/test_roles.py b/test/sql/test_roles.py
index de5951d20..617f8c786 100644
--- a/test/sql/test_roles.py
+++ b/test/sql/test_roles.py
@@ -1,3 +1,4 @@
+from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import Integer
@@ -22,6 +23,7 @@ from sqlalchemy.sql.elements import _truncated_label
from sqlalchemy.sql.elements import Null
from sqlalchemy.sql.selectable import FromGrouping
from sqlalchemy.sql.selectable import SelectStatementGrouping
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -201,6 +203,19 @@ class RoleTest(fixtures.TestBase):
)
)
+ def test_offset_or_limit_role_only_ints_or_clauseelement(self):
+ assert_raises(ValueError, select([t]).limit, "some limit")
+
+ assert_raises(ValueError, select([t]).offset, "some offset")
+
+ def test_offset_or_limit_role_clauseelement(self):
+ bind = bindparam("x")
+ stmt = select([t]).limit(bind)
+ is_(stmt._limit_clause, bind)
+
+ stmt = select([t]).offset(bind)
+ is_(stmt._offset_clause, bind)
+
def test_from_clause_is_not_a_select(self):
assert_raises_message(
exc.ArgumentError,
diff --git a/test/sql/test_text.py b/test/sql/test_text.py
index 35d909ef8..9483d10b0 100644
--- a/test/sql/test_text.py
+++ b/test/sql/test_text.py
@@ -282,6 +282,19 @@ class BindParamTest(fixtures.TestBase, AssertsCompiledSQL):
dialect="postgresql",
)
+ def test_unique_binds(self):
+ # unique binds can be used in text() however they uniquify across
+ # multiple text() constructs only, not within a single text
+
+ t1 = text("select :foo").bindparams(bindparam("foo", 5, unique=True))
+ t2 = text("select :foo").bindparams(bindparam("foo", 10, unique=True))
+ stmt = select([t1, t2])
+ self.assert_compile(
+ stmt,
+ "SELECT select :foo_1, select :foo_2",
+ checkparams={"foo_1": 5, "foo_2": 10},
+ )
+
def test_binds_compiled_positional(self):
self.assert_compile(
text(
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 584bdf6a5..7bf83b461 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -62,6 +62,7 @@ from sqlalchemy.schema import AddConstraint
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.sql import column
from sqlalchemy.sql import ddl
+from sqlalchemy.sql import elements
from sqlalchemy.sql import null
from sqlalchemy.sql import operators
from sqlalchemy.sql import sqltypes
@@ -2363,6 +2364,20 @@ class ExpressionTest(
],
)
+ def test_grouped_bind_adapt(self):
+ expr = test_table.c.atimestamp == elements.Grouping(
+ bindparam("thedate")
+ )
+ eq_(expr.right.type._type_affinity, Date)
+ eq_(expr.right.element.type._type_affinity, Date)
+
+ expr = test_table.c.atimestamp == elements.Grouping(
+ elements.Grouping(bindparam("thedate"))
+ )
+ eq_(expr.right.type._type_affinity, Date)
+ eq_(expr.right.element.type._type_affinity, Date)
+ eq_(expr.right.element.element.type._type_affinity, Date)
+
def test_bind_adapt_update(self):
bp = bindparam("somevalue")
stmt = test_table.update().values(avalue=bp)
@@ -2883,6 +2898,18 @@ class IntervalTest(fixtures.TestBase, AssertsExecutionResults):
eq_(row["non_native_interval"], None)
+class IntegerTest(fixtures.TestBase):
+ def test_integer_literal_processor(self):
+ typ = Integer()
+ eq_(typ._cached_literal_processor(testing.db.dialect)(5), "5")
+
+ assert_raises(
+ ValueError,
+ typ._cached_literal_processor(testing.db.dialect),
+ "notanint",
+ )
+
+
class BooleanTest(
fixtures.TablesTest, AssertsExecutionResults, AssertsCompiledSQL
):