diff options
Diffstat (limited to 'test/sql/test_select.py')
| -rw-r--r-- | test/sql/test_select.py | 211 |
1 files changed, 160 insertions, 51 deletions
diff --git a/test/sql/test_select.py b/test/sql/test_select.py index 33bbe5ff4..d27819c18 100644 --- a/test/sql/test_select.py +++ b/test/sql/test_select.py @@ -684,58 +684,94 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def test_orderby_groupby(self): self.assert_compile( table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]), - "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC" + "SELECT myothertable.otherid, myothertable.othername FROM " + "myothertable ORDER BY myothertable.otherid, myothertable.othername ASC" ) self.assert_compile( table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]), - "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" + "SELECT myothertable.otherid, myothertable.othername FROM " + "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" ) # generative order_by self.assert_compile( table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()), - "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" + "SELECT myothertable.otherid, myothertable.othername FROM " + "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" ) self.assert_compile( - table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None), + table2.select().order_by(table2.c.otherid). + order_by(table2.c.othername.desc()).order_by(None), "SELECT myothertable.otherid, myothertable.othername FROM myothertable" ) self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]), - "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername" + select( + [table2.c.othername, func.count(table2.c.otherid)], + group_by = [table2.c.othername]), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 " + "FROM myothertable GROUP BY myothertable.othername" ) # generative group by self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername), - "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername" + select([table2.c.othername, func.count(table2.c.otherid)]). + group_by(table2.c.othername), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 " + "FROM myothertable GROUP BY myothertable.othername" ) self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None), - "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable" + select([table2.c.othername, func.count(table2.c.otherid)]). + group_by(table2.c.othername).group_by(None), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 " + "FROM myothertable" ) self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]), - "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername" + select([table2.c.othername, func.count(table2.c.otherid)], + group_by = [table2.c.othername], + order_by = [table2.c.othername]), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 " + "FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername" ) def test_for_update(self): - self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + self.assert_compile( + table1.select(table1.c.myid==7, for_update=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + self.assert_compile( + table1.select(table1.c.myid==7, for_update="nowait"), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect()) + self.assert_compile( + table1.select(table1.c.myid==7, for_update="nowait"), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", + dialect=oracle.dialect()) - self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect()) + self.assert_compile( + table1.select(table1.c.myid==7, for_update="read"), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", + dialect=mysql.dialect()) - self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect()) + self.assert_compile( + table1.select(table1.c.myid==7, for_update=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %s FOR UPDATE", + dialect=mysql.dialect()) - self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", dialect=oracle.dialect()) + self.assert_compile( + table1.select(table1.c.myid==7, for_update=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", + dialect=oracle.dialect()) def test_alias(self): # test the alias for a table1. column names stay the same, table name "changes" to "foo". @@ -750,32 +786,42 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ,dialect=dialect) self.assert_compile( - select([table1.alias()]) - ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1") + select([table1.alias()]), + "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " + "FROM mytable AS mytable_1") - # create a select for a join of two tables. use_labels means the column names will have - # labels tablename_columnname, which become the column keys accessible off the Selectable object. - # also, only use one column from the second table and all columns from the first table1. - q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True) + # create a select for a join of two tables. use_labels + # means the column names will have labels tablename_columnname, + # which become the column keys accessible off the Selectable object. + # also, only use one column from the second table and all columns + # from the first table1. + q = select( + [table1, table2.c.otherid], + table1.c.myid == table2.c.otherid, use_labels = True + ) - # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view". + # make an alias of the "selectable". column names + # stay the same (i.e. the labels), table name "changes" to "t2view". a = alias(q, 't2view') # select from that alias, also using labels. two levels of labels should produce two underscores. # also, reference the column "mytable_myid" off of the t2view alias. self.assert_compile( a.select(a.c.mytable_myid == 9, use_labels = True), - "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \ -t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \ -(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \ -myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \ -WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1" + "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name " + "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, " + "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM " + "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, " + "mytable.description AS mytable_description, myothertable.otherid AS " + "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = " + "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1" ) def test_prefixes(self): self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), - "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable" + "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING " + "mytable.myid, mytable.name, mytable.description FROM mytable" ) def test_text(self): @@ -789,16 +835,20 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = ["foobar(a)", "pk_foo_bar(syslaal)"], "a = 12", from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") + ), + "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " + "left outer join lala on foobar.foo = lala.foo WHERE a = 12" + ) # test unicode self.assert_compile(select( [u"foobar(a)", u"pk_foo_bar(syslaal)"], u"a = 12", from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"] - ), - u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") + ), + "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " + "left outer join lala on foobar.foo = lala.foo WHERE a = 12" + ) # test building a select query programmatically with text s = select() @@ -808,11 +858,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = s.append_whereclause("column2=19") s = s.order_by("column1") s.append_from("table1") - self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1") + self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE " + "column1=12 AND column2=19 ORDER BY column1") self.assert_compile( select(["column1", "column2"], from_obj=table1).alias('somealias').select(), - "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias" + "SELECT somealias.column1, somealias.column2 FROM " + "(SELECT column1, column2 FROM mytable) AS somealias" ) # test that use_labels doesnt interfere with literal columns @@ -827,14 +879,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable" ) - print "---------------------------------------------" s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]) - print "---------------------------------------------" # test that "auto-labeling of subquery columns" doesnt interfere with literal columns, # exported columns dont get quoted self.assert_compile( select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(), - "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)" + "SELECT column1 AS foobar, column2 AS hoho, myid FROM " + "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)" ) self.assert_compile( @@ -844,7 +895,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = def test_binds_in_text(self): self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), + text("select * from foo where lala=:bar and hoho=:whee", + bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), "select * from foo where lala=:bar and hoho=:whee", checkparams={'bar':4, 'whee': 7}, ) @@ -858,7 +910,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = dialect = postgresql.dialect() self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), + text("select * from foo where lala=:bar and hoho=:whee", + bindparams=[bindparam('bar',4), bindparam('whee',7)]), "select * from foo where lala=%(bar)s and hoho=%(whee)s", checkparams={'bar':4, 'whee': 7}, dialect=dialect @@ -875,7 +928,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = dialect = sqlite.dialect() self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), + text("select * from foo where lala=:bar and hoho=:whee", + bindparams=[bindparam('bar',4), bindparam('whee',7)]), "select * from foo where lala=? and hoho=?", checkparams={'bar':4, 'whee':7}, dialect=dialect @@ -889,25 +943,80 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = table1.c.myid == table2.c.otherid, ) ), - "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \ -FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid") + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, sysdate(), foo, bar, lala " + "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND " + "datetime(foo) = Today AND mytable.myid = myothertable.otherid") self.assert_compile(select( [alias(table1, 't'), "foo.f"], "foo.f = t.id", from_obj = ["(select f from bar where lala=heyhey) foo"] ), - "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id") + "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, " + "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id") # test Text embedded within select_from(), using binds - generate_series = text("generate_series(:x, :y, :z) as s(a)", bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]) - - s =select([(func.current_date() + literal_column("s.a")).label("dates")]).select_from(generate_series) - self.assert_compile(s, "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': None, 'x': None, 'z': None}) + generate_series = text( + "generate_series(:x, :y, :z) as s(a)", + bindparams=[bindparam('x'), bindparam('y'), bindparam('z')] + ) + + s =select([ + (func.current_date() + literal_column("s.a")).label("dates") + ]).select_from(generate_series) + self.assert_compile( + s, + "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", + checkparams={'y': None, 'x': None, 'z': None} + ) + + self.assert_compile( + s.params(x=5, y=6, z=7), + "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", + checkparams={'y': 6, 'x': 5, 'z': 7} + ) + + @testing.emits_warning('.*empty sequence.*') + def test_render_binds_as_literal(self): + """test a compiler that renders binds inline into + SQL in the columns clause.""" - self.assert_compile(s.params(x=5, y=6, z=7), "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': 6, 'x': 5, 'z': 7}) + dialect = default.DefaultDialect() + class Compiler(dialect.statement_compiler): + ansi_bind_rules = True + dialect.statement_compiler = Compiler + self.assert_compile( + select([literal("someliteral")]), + "SELECT 'someliteral'", + dialect=dialect + ) + + self.assert_compile( + select([table1.c.myid + 3]), + "SELECT mytable.myid + 3 AS anon_1 FROM mytable", + dialect=dialect + ) + self.assert_compile( + select([table1.c.myid.in_([4, 5, 6])]), + "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable", + dialect=dialect + ) + + self.assert_compile( + select([literal("foo").in_([])]), + "SELECT 'foo' != 'foo' AS anon_1", + dialect=dialect + ) + + assert_raises( + exc.CompileError, + bindparam("foo").in_([]).compile, dialect=dialect + ) + + def test_literal(self): self.assert_compile(select([literal('foo')]), "SELECT :param_1") |
