diff options
Diffstat (limited to 'test/sql/test_generative.py')
-rw-r--r-- | test/sql/test_generative.py | 293 |
1 files changed, 157 insertions, 136 deletions
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index e868cbe88..8b2abef0e 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -590,13 +590,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def test_correlated_select(self): s = select(['*'], t1.c.col1 == t2.c.col1, from_obj=[t1, t2]).correlate(t2) + class Vis(CloningVisitor): def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) - self.assert_compile(Vis().traverse(s), - "SELECT * FROM table1 WHERE table1.col1 = table2.col1 " - "AND table1.col2 = :col2_1") + self.assert_compile( + select([t2]).where(t2.c.col1 == Vis().traverse(s)), + "SELECT table2.col1, table2.col2, table2.col3 " + "FROM table2 WHERE table2.col1 = " + "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 " + "AND table1.col2 = :col2_1)" + ) def test_this_thing(self): s = select([t1]).where(t1.c.col1 == 'foo').alias() @@ -616,35 +621,49 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): 'AS table1_1 WHERE table1_1.col1 = ' ':col1_1) AS anon_1') - def test_select_fromtwice(self): + def test_select_fromtwice_one(self): t1a = t1.alias() - s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1) + s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a) + s = select([t1]).where(t1.c.col1 == s) self.assert_compile(s, - 'SELECT 1 FROM table1 AS table1_1 WHERE ' - 'table1.col1 = table1_1.col1') - + "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " + "WHERE table1.col1 = " + "(SELECT 1 FROM table1, table1 AS table1_1 " + "WHERE table1.col1 = table1_1.col1)" + ) s = CloningVisitor().traverse(s) self.assert_compile(s, - 'SELECT 1 FROM table1 AS table1_1 WHERE ' - 'table1.col1 = table1_1.col1') + "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " + "WHERE table1.col1 = " + "(SELECT 1 FROM table1, table1 AS table1_1 " + "WHERE table1.col1 = table1_1.col1)") + def test_select_fromtwice_two(self): s = select([t1]).where(t1.c.col1 == 'foo').alias() s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1) - self.assert_compile(s2, - 'SELECT 1 FROM (SELECT table1.col1 AS ' - 'col1, table1.col2 AS col2, table1.col3 AS ' - 'col3 FROM table1 WHERE table1.col1 = ' - ':col1_1) AS anon_1 WHERE table1.col1 = ' - 'anon_1.col1') - s2 = ReplacingCloningVisitor().traverse(s2) - self.assert_compile(s2, - 'SELECT 1 FROM (SELECT table1.col1 AS ' - 'col1, table1.col2 AS col2, table1.col3 AS ' - 'col3 FROM table1 WHERE table1.col1 = ' - ':col1_1) AS anon_1 WHERE table1.col1 = ' - 'anon_1.col1') + s3 = select([t1]).where(t1.c.col1 == s2) + self.assert_compile(s3, + "SELECT table1.col1, table1.col2, table1.col3 " + "FROM table1 WHERE table1.col1 = " + "(SELECT 1 FROM " + "(SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 " + "WHERE table1.col1 = :col1_1) " + "AS anon_1 WHERE table1.col1 = anon_1.col1)" + ) + + s4 = ReplacingCloningVisitor().traverse(s3) + self.assert_compile(s4, + "SELECT table1.col1, table1.col2, table1.col3 " + "FROM table1 WHERE table1.col1 = " + "(SELECT 1 FROM " + "(SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 " + "WHERE table1.col1 = :col1_1) " + "AS anon_1 WHERE table1.col1 = anon_1.col1)" + ) class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -763,67 +782,125 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 'FROM addresses WHERE users_1.id = ' 'addresses.user_id') - def test_table_to_alias(self): - + def test_table_to_alias_1(self): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) ff = vis.traverse(func.count(t1.c.col1).label('foo')) assert list(_from_objects(ff)) == [t1alias] + def test_table_to_alias_2(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(select(['*'], from_obj=[t1])), 'SELECT * FROM table1 AS t1alias') + + def test_table_to_alias_3(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(select(['*'], t1.c.col1 == t2.c.col2), 'SELECT * FROM table1, table2 WHERE ' 'table1.col1 = table2.col2') + + def test_table_to_alias_4(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(select(['*'], t1.c.col1 == t2.c.col2)), 'SELECT * FROM table1 AS t1alias, table2 ' 'WHERE t1alias.col1 = table2.col2') + + def test_table_to_alias_5(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2])), 'SELECT * FROM table1 AS t1alias, table2 ' 'WHERE t1alias.col1 = table2.col2') - self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2, from_obj=[t1, - t2]).correlate(t1)), - 'SELECT * FROM table2 WHERE t1alias.col1 = ' - 'table2.col2') - self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2, from_obj=[t1, - t2]).correlate(t2)), - 'SELECT * FROM table1 AS t1alias WHERE ' - 't1alias.col1 = table2.col2') + + def test_table_to_alias_6(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) + self.assert_compile( + select([t1alias, t2]).where(t1alias.c.col1 == + vis.traverse(select(['*'], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2]).correlate(t1))), + "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " + "table2.col1, table2.col2, table2.col3 " + "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = " + "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)" + ) + + def test_table_to_alias_7(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) + self.assert_compile( + select([t1alias, t2]).where(t1alias.c.col1 == + vis.traverse(select(['*'], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2]).correlate(t2))), + "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " + "table2.col1, table2.col2, table2.col3 " + "FROM table1 AS t1alias, table2 " + "WHERE t1alias.col1 = " + "(SELECT * FROM table1 AS t1alias " + "WHERE t1alias.col1 = table2.col2)") + + def test_table_to_alias_8(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)), 'CASE WHEN (t1alias.col1 = :col1_1) THEN ' 't1alias.col2 ELSE t1alias.col1 END') + + def test_table_to_alias_9(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1)), 'CASE t1alias.col1 WHEN :param_1 THEN ' 't1alias.col2 ELSE t1alias.col1 END') + def test_table_to_alias_10(self): s = select(['*'], from_obj=[t1]).alias('foo') self.assert_compile(s.select(), 'SELECT foo.* FROM (SELECT * FROM table1) ' 'AS foo') + + def test_table_to_alias_11(self): + s = select(['*'], from_obj=[t1]).alias('foo') + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(s.select()), 'SELECT foo.* FROM (SELECT * FROM table1 ' 'AS t1alias) AS foo') + + def test_table_to_alias_12(self): + s = select(['*'], from_obj=[t1]).alias('foo') self.assert_compile(s.select(), 'SELECT foo.* FROM (SELECT * FROM table1) ' 'AS foo') + + def test_table_to_alias_13(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) ff = vis.traverse(func.count(t1.c.col1).label('foo')) self.assert_compile(select([ff]), 'SELECT count(t1alias.col1) AS foo FROM ' 'table1 AS t1alias') assert list(_from_objects(ff)) == [t1alias] + #def test_table_to_alias_2(self): # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c # .col1).l abel('foo')]), clone=True), "SELECT # count(t1alias.col1) AS foo FROM table1 AS t1alias") + def test_table_to_alias_14(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) t2alias = t2.alias('t2alias') vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile(vis.traverse(select(['*'], t1.c.col1 @@ -831,28 +908,59 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 'SELECT * FROM table1 AS t1alias, table2 ' 'AS t2alias WHERE t1alias.col1 = ' 't2alias.col2') + + def test_table_to_alias_15(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) + t2alias = t2.alias('t2alias') + vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile(vis.traverse(select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2])), 'SELECT * FROM table1 AS t1alias, table2 ' 'AS t2alias WHERE t1alias.col1 = ' 't2alias.col2') - self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2, from_obj=[t1, - t2]).correlate(t1)), - 'SELECT * FROM table2 AS t2alias WHERE ' - 't1alias.col1 = t2alias.col2') - self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2, from_obj=[t1, - t2]).correlate(t2)), - 'SELECT * FROM table1 AS t1alias WHERE ' - 't1alias.col1 = t2alias.col2') + + def test_table_to_alias_16(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) + t2alias = t2.alias('t2alias') + vis.chain(sql_util.ClauseAdapter(t2alias)) + self.assert_compile( + select([t1alias, t2alias]).where( + t1alias.c.col1 == + vis.traverse(select(['*'], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2]).correlate(t1)) + ), + "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " + "t2alias.col1, t2alias.col2, t2alias.col3 " + "FROM table1 AS t1alias, table2 AS t2alias " + "WHERE t1alias.col1 = " + "(SELECT * FROM table2 AS t2alias " + "WHERE t1alias.col1 = t2alias.col2)" + ) + + def test_table_to_alias_17(self): + t1alias = t1.alias('t1alias') + vis = sql_util.ClauseAdapter(t1alias) + t2alias = t2.alias('t2alias') + vis.chain(sql_util.ClauseAdapter(t2alias)) + self.assert_compile( + t2alias.select().where(t2alias.c.col2 == + vis.traverse(select(['*'], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2]).correlate(t2))), + 'SELECT t2alias.col1, t2alias.col2, t2alias.col3 ' + 'FROM table2 AS t2alias WHERE t2alias.col2 = ' + '(SELECT * FROM table1 AS t1alias WHERE ' + 't1alias.col1 = t2alias.col2)') def test_include_exclude(self): m = MetaData() - a=Table( 'a',m, - Column( 'id', Integer, primary_key=True), - Column( 'xxx_id', Integer, - ForeignKey( 'a.id', name='adf',use_alter=True ) + a = Table('a', m, + Column('id', Integer, primary_key=True), + Column('xxx_id', Integer, + ForeignKey('a.id', name='adf', use_alter=True) ) ) @@ -1167,93 +1275,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'SELECT table1.col1, table1.col2, ' 'table1.col3 FROM table1') - def test_correlation(self): - s = select([t2], t1.c.col1 == t2.c.col1) - self.assert_compile(s, - 'SELECT table2.col1, table2.col2, ' - 'table2.col3 FROM table2, table1 WHERE ' - 'table1.col1 = table2.col1') - s2 = select([t1], t1.c.col2 == s.c.col2) - # dont correlate in a FROM entry - self.assert_compile(s2, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2, table1 WHERE ' - 'table1.col1 = table2.col1) WHERE ' - 'table1.col2 = col2') - s3 = s.correlate(None) - self.assert_compile(select([t1], t1.c.col2 == s3.c.col2), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2, table1 ' - 'WHERE table1.col1 = table2.col1) WHERE ' - 'table1.col2 = col2') - # dont correlate in a FROM entry - self.assert_compile(select([t1], t1.c.col2 == s.c.col2), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2, table1 WHERE ' - 'table1.col1 = table2.col1) WHERE ' - 'table1.col2 = col2') - - # but correlate in a WHERE entry - s_w = select([t2.c.col1]).where(t1.c.col1 == t2.c.col1) - self.assert_compile(select([t1], t1.c.col2 == s_w), - 'SELECT table1.col1, table1.col2, table1.col3 ' - 'FROM table1 WHERE table1.col2 = ' - '(SELECT table2.col1 FROM table2 ' - 'WHERE table1.col1 = table2.col1)' - ) - - - s4 = s3.correlate(t1) - self.assert_compile(select([t1], t1.c.col2 == s4.c.col2), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2 WHERE ' - 'table1.col1 = table2.col1) WHERE ' - 'table1.col2 = col2') - - self.assert_compile(select([t1], t1.c.col2 == s3.c.col2), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2, table1 ' - 'WHERE table1.col1 = table2.col1) WHERE ' - 'table1.col2 = col2') - - self.assert_compile(t1.select().where(t1.c.col1 - == 5).order_by(t1.c.col3), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1 WHERE table1.col1 ' - '= :col1_1 ORDER BY table1.col3') - - # dont correlate in FROM - self.assert_compile(t1.select().select_from(select([t2], - t2.c.col1 - == t1.c.col1)).order_by(t1.c.col3), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2, table1 WHERE ' - 'table2.col1 = table1.col1) ORDER BY ' - 'table1.col3') - - # still works if you actually add that table to correlate() - s = select([t2], t2.c.col1 == t1.c.col1) - s = s.correlate(t1).order_by(t2.c.col3) - - self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, (SELECT ' - 'table2.col1 AS col1, table2.col2 AS col2, ' - 'table2.col3 AS col3 FROM table2 WHERE ' - 'table2.col1 = table1.col1 ORDER BY ' - 'table2.col3) ORDER BY table1.col3') def test_prefixes(self): s = t1.select() |