diff options
Diffstat (limited to 'test/sql/test_selectable.py')
| -rw-r--r-- | test/sql/test_selectable.py | 591 |
1 files changed, 332 insertions, 259 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 2537c4896..062ed5f1b 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,6 +1,7 @@ """Test various algorithmic properties of selectables.""" -from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.test.testing import eq_, assert_raises, \ + assert_raises_message from sqlalchemy import * from sqlalchemy.test import * from sqlalchemy.sql import util as sql_util, visitors @@ -24,39 +25,46 @@ table2 = Table('table2', metadata, Column('coly', Integer), ) + class SelectableTest(TestBase, AssertsExecutionResults): + def test_distance_on_labels(self): + # same column three times - s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')]) - # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far - #assert s.corresponding_column(table1.c.col1) is s.c.col1 + s = select([table1.c.col1.label('c2'), table1.c.col1, + table1.c.col1.label('c1')]) + + # didnt do this yet...col.label().make_proxy() has same + # "distance" as col.make_proxy() so far assert + # s.corresponding_column(table1.c.col1) is s.c.col1 + assert s.corresponding_column(s.c.col1) is s.c.col1 assert s.corresponding_column(s.c.c1) is s.c.c1 def test_distance_on_aliases(self): a1 = table1.alias('a1') - - for s in ( - select([a1, table1], use_labels=True), - select([table1, a1], use_labels=True) - ): - assert s.corresponding_column(table1.c.col1) is s.c.table1_col1 + for s in (select([a1, table1], use_labels=True), + select([table1, a1], use_labels=True)): + assert s.corresponding_column(table1.c.col1) \ + is s.c.table1_col1 assert s.corresponding_column(a1.c.col1) is s.c.a1_col1 - def test_join_against_self(self): jj = select([table1.c.col1.label('bar_col1')]) - jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1) + jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) # test column directly agaisnt itself - assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + assert jjj.corresponding_column(jjj.c.table1_col1) \ + is jjj.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 # test alias of the join + j2 = jjj.alias('foo') - assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1 + assert j2.corresponding_column(table1.c.col1) \ + is j2.c.table1_col1 def test_against_cloned_non_table(self): # test that corresponding column digs across @@ -73,20 +81,27 @@ class SelectableTest(TestBase, AssertsExecutionResults): def test_select_on_table(self): sel = select([table1, table2], use_labels=True) - assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1 - assert sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1 - assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1 - assert table1.corresponding_column(sel.c.table1_col1, require_embedded=True) is None - def test_join_against_join(self): - j = outerjoin(table1, table2, table1.c.col1==table2.c.col2) - jj = select([ table1.c.col1.label('bar_col1')],from_obj=[j]).alias('foo') - jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1) - assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + assert sel.corresponding_column(table1.c.col1) \ + is sel.c.table1_col1 + assert sel.corresponding_column(table1.c.col1, + require_embedded=True) is sel.c.table1_col1 + assert table1.corresponding_column(sel.c.table1_col1) \ + is table1.c.col1 + assert table1.corresponding_column(sel.c.table1_col1, + require_embedded=True) is None - j2 = jjj.alias('foo') - assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 + def test_join_against_join(self): + j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2) + jj = select([table1.c.col1.label('bar_col1')], + from_obj=[j]).alias('foo') + jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) + assert jjj.corresponding_column(jjj.c.table1_col1) \ + is jjj.c.table1_col1 + j2 = jjj.alias('foo') + assert j2.corresponding_column(jjj.c.table1_col1) \ + is j2.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 def test_table_alias(self): @@ -97,12 +112,18 @@ class SelectableTest(TestBase, AssertsExecutionResults): criterion = a.c.col1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) + def test_union(self): - # tests that we can correspond a column in a Select statement with a certain Table, against - # a column in a Union where one of its underlying Selects matches to that same Table - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ) + + # tests that we can correspond a column in a Select statement + # with a certain Table, against a column in a Union where one of + # its underlying Selects matches to that same Table + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) c = u.corresponding_column(s1.c.table1_col2) @@ -128,19 +149,25 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert u1.corresponding_column(table1.c.colx) is u1.c.col2 assert u1.corresponding_column(table1.c.col3) is u1.c.col1 - def test_singular_union(self): - u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3])) + def test_singular_union(self): + u = union(select([table1.c.col1, table1.c.col2, + table1.c.col3]), select([table1.c.col1, + table1.c.col2, table1.c.col3])) u = union(select([table1.c.col1, table1.c.col2, table1.c.col3])) assert u.c.col1 is not None assert u.c.col2 is not None assert u.c.col3 is not None - + def test_alias_union(self): + # same as testunion, except its an alias of the union - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])).alias('analias') s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 @@ -149,10 +176,14 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly def test_select_union(self): + # like testaliasunion, but off a Select off the union. - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])).alias('analias') s = select([u]) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -160,10 +191,14 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert s.corresponding_column(s2.c.table2_col2) is s.c.col2 def test_union_against_join(self): + # same as testunion, except its an alias of the union - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])).alias('analias') j1 = table1.join(table2) assert u.corresponding_column(j1.c.table1_colx) is u.c.colx assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx @@ -191,8 +226,11 @@ class SelectableTest(TestBase, AssertsExecutionResults): criterion = a.c.table1_col1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) + def test_column_labels(self): - a = select([table1.c.col1.label('acol1'), table1.c.col2.label('acol2'), table1.c.col3.label('acol3')]) + a = select([table1.c.col1.label('acol1'), + table1.c.col2.label('acol2'), + table1.c.col3.label('acol3')]) j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) @@ -243,31 +281,31 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert_raises(exc.NoReferencedTableError, s.join, t1) + def test_join_condition(self): m = MetaData() t1 = Table('t1', m, Column('id', Integer)) - t2 = Table('t2', m, Column('id', Integer), Column('t1id', ForeignKey('t1.id'))) - t3 = Table('t3', m, Column('id', Integer), - Column('t1id', ForeignKey('t1.id')), - Column('t2id', ForeignKey('t2.id'))) - t4 = Table('t4', m, Column('id', Integer), Column('t2id', ForeignKey('t2.id'))) - + t2 = Table('t2', m, Column('id', Integer), Column('t1id', + ForeignKey('t1.id'))) + t3 = Table('t3', m, Column('id', Integer), Column('t1id', + ForeignKey('t1.id')), Column('t2id', + ForeignKey('t2.id'))) + t4 = Table('t4', m, Column('id', Integer), Column('t2id', + ForeignKey('t2.id'))) t1t2 = t1.join(t2) t2t3 = t2.join(t3) - - for left, right, a_subset, expected in [ - (t1, t2, None, t1.c.id==t2.c.t1id), - (t1t2, t3, t2, t1t2.c.t2_id==t3.c.t2id), - (t2t3, t1, t3, t1.c.id==t3.c.t1id), - (t2t3, t4, None, t2t3.c.t2_id==t4.c.t2id), - (t2t3, t4, t3, t2t3.c.t2_id==t4.c.t2id), - (t2t3.join(t1), t4, None, t2t3.c.t2_id==t4.c.t2id), - (t2t3.join(t1), t4, t1, t2t3.c.t2_id==t4.c.t2id), - (t1t2, t2t3, t2, t1t2.c.t2_id==t2t3.c.t3_t2id), - ]: - assert expected.compare( - sql_util.join_condition(left, right, a_subset=a_subset) - ) + for (left, right, a_subset, expected) in [ + (t1, t2, None, t1.c.id == t2.c.t1id), + (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id), + (t2t3, t1, t3, t1.c.id == t3.c.t1id), + (t2t3, t4, None, t2t3.c.t2_id == t4.c.t2id), + (t2t3, t4, t3, t2t3.c.t2_id == t4.c.t2id), + (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id), + (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id), + (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id), + ]: + assert expected.compare(sql_util.join_condition(left, + right, a_subset=a_subset)) # these are ambiguous, or have no joins for left, right, a_subset in [ @@ -298,35 +336,39 @@ class SelectableTest(TestBase, AssertsExecutionResults): left.join(right).onclause ) - # TODO: this raises due to right side being "grouped", - # and no longer has FKs. Did we want to make - # _FromGrouping friendlier ? - assert_raises_message( - exc.ArgumentError, - r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?", - t1t2.join, t2t3 - ) - assert_raises_message( - exc.ArgumentError, - r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?", - t1t2.join, t2t3.select(use_labels=True) - ) + + # TODO: this raises due to right side being "grouped", and no + # longer has FKs. Did we want to make _FromGrouping friendlier + # ? + + assert_raises_message(exc.ArgumentError, + "Perhaps you meant to convert the right " + "side to a subquery using alias\(\)\?", + t1t2.join, t2t3) + assert_raises_message(exc.ArgumentError, + "Perhaps you meant to convert the right " + "side to a subquery using alias\(\)\?", + t1t2.join, t2t3.select(use_labels=True)) + class PrimaryKeyTest(TestBase, AssertsExecutionResults): + def test_join_pk_collapse_implicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, - which is the root column along a chain of foreign key relationships.""" + """test that redundant columns in a join get 'collapsed' into a + minimal primary key, which is the root column along a chain of + foreign key relationships.""" meta = MetaData() a = Table('a', meta, Column('id', Integer, primary_key=True)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True)) - + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), + primary_key=True)) + c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), + primary_key=True)) + d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), + primary_key=True)) assert c.c.id.references(b.c.id) assert not d.c.id.references(a.c.id) - assert list(a.join(b).primary_key) == [a.c.id] assert list(b.join(c).primary_key) == [b.c.id] assert list(a.join(b).join(c).primary_key) == [a.c.id] @@ -334,26 +376,36 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults): assert list(d.join(c).join(b).primary_key) == [b.c.id] assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] + def test_join_pk_collapse_explicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, - which is the root column along a chain of explicit join conditions.""" + """test that redundant columns in a join get 'collapsed' into a + minimal primary key, which is the root column along a chain of + explicit join conditions.""" meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer)) - - print list(a.join(b, a.c.x==b.c.id).primary_key) - assert list(a.join(b, a.c.x==b.c.id).primary_key) == [a.c.id] - assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id] - assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [b.c.id] - assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id] - assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id] - - assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id] + a = Table('a', meta, Column('id', Integer, primary_key=True), + Column('x', Integer)) + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), + primary_key=True), Column('x', Integer)) + c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), + primary_key=True), Column('x', Integer)) + d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), + primary_key=True), Column('x', Integer)) + print list(a.join(b, a.c.x == b.c.id).primary_key) + assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id] + assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id] + assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \ + == [a.c.id] + assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \ + == [b.c.id] + assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \ + == [b.c.id] + assert list(d.join(b, d.c.id == b.c.id).join(c, b.c.id + == c.c.x).primary_key) == [b.c.id] + assert list(a.join(b).join(c, c.c.id + == b.c.x).join(d).primary_key) == [a.c.id] + assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x + == b.c.id)).primary_key) == [a.c.id] def test_init_doesnt_blowitaway(self): meta = MetaData() @@ -391,19 +443,17 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults): Column('id', Integer, primary_key= True), ) - engineer = Table( 'Engineer', metadata, - Column('id', Integer, ForeignKey( 'Employee.id', ), primary_key=True), - ) + engineer = Table('Engineer', metadata, + Column('id', Integer, + ForeignKey('Employee.id'), primary_key=True)) - eq_( - util.column_set(employee.join(engineer, employee.c.id==engineer.c.id).primary_key), - util.column_set([employee.c.id]) - ) - eq_( - util.column_set(employee.join(engineer, engineer.c.id==employee.c.id).primary_key), - util.column_set([employee.c.id]) - ) + eq_(util.column_set(employee.join(engineer, employee.c.id + == engineer.c.id).primary_key), + util.column_set([employee.c.id])) + eq_(util.column_set(employee.join(engineer, engineer.c.id + == employee.c.id).primary_key), + util.column_set([employee.c.id])) class ReduceTest(TestBase, AssertsExecutionResults): @@ -419,164 +469,171 @@ class ReduceTest(TestBase, AssertsExecutionResults): Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True), Column('t3data', String(30))) - - eq_( - util.column_set(sql_util.reduce_columns([ - t1.c.t1id, t1.c.t1data, t2.c.t2id, - t2.c.t2data, t3.c.t3id, t3.c.t3data])), - util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data]) - ) + eq_(util.column_set(sql_util.reduce_columns([ + t1.c.t1id, + t1.c.t1data, + t2.c.t2id, + t2.c.t2data, + t3.c.t3id, + t3.c.t3data, + ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, + t3.c.t3data])) + def test_reduce_selectable(self): - metadata = MetaData() - - engineers = Table('engineers', metadata, - Column('engineer_id', Integer, primary_key=True), - Column('engineer_name', String(50)), - ) - - managers = Table('managers', metadata, - Column('manager_id', Integer, primary_key=True), - Column('manager_name', String(50)) - ) - - s = select([engineers, managers]).where(engineers.c.engineer_name==managers.c.manager_name) - - eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), - util.column_set([s.c.engineer_id, s.c.engineer_name, s.c.manager_id]) - ) + metadata = MetaData() + engineers = Table('engineers', metadata, Column('engineer_id', + Integer, primary_key=True), + Column('engineer_name', String(50))) + managers = Table('managers', metadata, Column('manager_id', + Integer, primary_key=True), + Column('manager_name', String(50))) + s = select([engineers, + managers]).where(engineers.c.engineer_name + == managers.c.manager_name) + eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), + util.column_set([s.c.engineer_id, s.c.engineer_name, + s.c.manager_id])) + def test_reduce_aliased_join(self): metadata = MetaData() - people = Table('people', metadata, - Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True), - Column('name', String(50)), - Column('type', String(30))) - - engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), - Column('status', String(30)), - Column('engineer_name', String(50)), - Column('primary_language', String(50)), - ) - - managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), - Column('status', String(30)), - Column('manager_name', String(50)) - ) - - pjoin = people.outerjoin(engineers).\ - outerjoin(managers).select(use_labels=True).\ - alias('pjoin') - eq_( - util.column_set(sql_util.reduce_columns([ - pjoin.c.people_person_id, pjoin.c.engineers_person_id, - pjoin.c.managers_person_id])), - util.column_set([pjoin.c.people_person_id]) - ) + people = Table('people', metadata, Column('person_id', Integer, + Sequence('person_id_seq', optional=True), + primary_key=True), Column('name', String(50)), + Column('type', String(30))) + engineers = Table( + 'engineers', + metadata, + Column('person_id', Integer, ForeignKey('people.person_id' + ), primary_key=True), + Column('status', String(30)), + Column('engineer_name', String(50)), + Column('primary_language', String(50)), + ) + managers = Table('managers', metadata, Column('person_id', + Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', + String(30)), Column('manager_name', + String(50))) + pjoin = \ + people.outerjoin(engineers).outerjoin(managers).\ + select(use_labels=True).alias('pjoin' + ) + eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id, + pjoin.c.engineers_person_id, pjoin.c.managers_person_id])), + util.column_set([pjoin.c.people_person_id])) def test_reduce_aliased_union(self): metadata = MetaData() - item_table = Table( - 'item', metadata, - Column('id', Integer, ForeignKey('base_item.id'), primary_key=True), - Column('dummy', Integer, default=0)) - base_item_table = Table( - 'base_item', metadata, - Column('id', Integer, primary_key=True), - Column('child_name', String(255), default=None)) - + item_table = Table('item', metadata, Column('id', Integer, + ForeignKey('base_item.id'), + primary_key=True), Column('dummy', Integer, + default=0)) + base_item_table = Table('base_item', metadata, Column('id', + Integer, primary_key=True), + Column('child_name', String(255), + default=None)) from sqlalchemy.orm.util import polymorphic_union - - item_join = polymorphic_union( { - 'BaseItem':base_item_table.select(base_item_table.c.child_name=='BaseItem'), - 'Item':base_item_table.join(item_table), - }, None, 'item_join') - - eq_( - util.column_set(sql_util.reduce_columns([ - item_join.c.id, item_join.c.dummy, item_join.c.child_name - ])), - util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name]) - ) + item_join = polymorphic_union({ + 'BaseItem': + base_item_table.select( + base_item_table.c.child_name + == 'BaseItem'), + 'Item': base_item_table.join(item_table)}, + None, 'item_join') + eq_(util.column_set(sql_util.reduce_columns([item_join.c.id, + item_join.c.dummy, item_join.c.child_name])), + util.column_set([item_join.c.id, item_join.c.dummy, + item_join.c.child_name])) + def test_reduce_aliased_union_2(self): metadata = MetaData() - - page_table = Table('page', metadata, - Column('id', Integer, primary_key=True), - ) + page_table = Table('page', metadata, Column('id', Integer, + primary_key=True)) magazine_page_table = Table('magazine_page', metadata, - Column('page_id', Integer, ForeignKey('page.id'), primary_key=True), - ) + Column('page_id', Integer, + ForeignKey('page.id'), + primary_key=True)) classified_page_table = Table('classified_page', metadata, - Column('magazine_page_id', Integer, - ForeignKey('magazine_page.page_id'), primary_key=True), - ) - - # this is essentially the union formed by the ORM's polymorphic_union function. - # we define two versions with different ordering of selects. + Column('magazine_page_id', Integer, + ForeignKey('magazine_page.page_id'), primary_key=True)) + + # this is essentially the union formed by the ORM's + # polymorphic_union function. we define two versions with + # different ordering of selects. + # + # the first selectable has the "real" column + # classified_page.magazine_page_id - # the first selectable has the "real" column classified_page.magazine_page_id pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]).select_from(page_table.join(magazine_page_table).join(classified_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]).select_from(page_table.join(magazine_page_table)), - ).alias('pjoin') - - eq_( - util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id]) - ) + select([ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id + ]). + select_from( + page_table.join(magazine_page_table). + join(classified_page_table)), + + select([ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label('magazine_page_id') + ]). + select_from(page_table.join(magazine_page_table)) + ).alias('pjoin') + eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id, + pjoin.c.page_id, pjoin.c.magazine_page_id])), + util.column_set([pjoin.c.id])) # the first selectable has a CAST, which is a placeholder for - # classified_page.magazine_page_id in the second selectable. reduce_columns - # needs to take into account all foreign keys derived from pjoin.c.magazine_page_id. - # the UNION construct currently makes the external column look like that of the first - # selectable only. - pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]).select_from(page_table.join(magazine_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]).select_from(page_table.join(magazine_page_table).join(classified_page_table)) - ).alias('pjoin') - - eq_( - util.column_set(sql_util.reduce_columns([ - pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id]) - ) + # classified_page.magazine_page_id in the second selectable. + # reduce_columns needs to take into account all foreign keys + # derived from pjoin.c.magazine_page_id. the UNION construct + # currently makes the external column look like that of the + # first selectable only. + + pjoin = union(select([ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label('magazine_page_id') + ]). + select_from(page_table.join(magazine_page_table)), + + select([ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id + ]). + select_from(page_table.join(magazine_page_table). + join(classified_page_table)) + ).alias('pjoin') + eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id, + pjoin.c.page_id, pjoin.c.magazine_page_id])), + util.column_set([pjoin.c.id])) class DerivedTest(TestBase, AssertsExecutionResults): def test_table(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) + + t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) + t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) assert t1.is_derived_from(t1) assert not t2.is_derived_from(t1) + def test_alias(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) + t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) + t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) assert t1.alias().is_derived_from(t1) assert not t2.alias().is_derived_from(t1) @@ -585,8 +642,11 @@ class DerivedTest(TestBase, AssertsExecutionResults): def test_select(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) + + t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) + t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) assert t1.select().is_derived_from(t1) assert not t2.select().is_derived_from(t1) @@ -623,14 +683,23 @@ class AnnotationsTest(TestBase): t1 = s1._annotate({}) t2 = s1 - # t1 needs to share the same _make_proxy() columns as t2, even though it's - # annotated. otherwise paths will diverge once they are corresponded against "inner" below. + # t1 needs to share the same _make_proxy() columns as t2, even + # though it's annotated. otherwise paths will diverge once they + # are corresponded against "inner" below. + assert t1.c is t2.c assert t1.c.col1 is t2.c.col1 inner = select([s1]) - assert inner.corresponding_column(t2.c.col1, require_embedded=False) is inner.corresponding_column(t2.c.col1, require_embedded=True) is inner.c.col1 - assert inner.corresponding_column(t1.c.col1, require_embedded=False) is inner.corresponding_column(t1.c.col1, require_embedded=True) is inner.c.col1 + + assert inner.corresponding_column(t2.c.col1, + require_embedded=False) \ + is inner.corresponding_column(t2.c.col1, + require_embedded=True) is inner.c.col1 + assert inner.corresponding_column(t1.c.col1, + require_embedded=False) \ + is inner.corresponding_column(t1.c.col1, + require_embedded=True) is inner.c.col1 def test_annotated_visit(self): table1 = table('table1', column("col1"), column("col2")) @@ -643,8 +712,10 @@ class AnnotationsTest(TestBase): b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary}) assert str(b2) == "table1.col1 = table1.col2" - b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':visit_binary}) - assert str(b3) == "table1.col1 = table1.col2" + + b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary' + : visit_binary}) + assert str(b3) == 'table1.col1 = table1.col2' def visit_binary(b): b.left = bindparam('bar') @@ -655,19 +726,20 @@ class AnnotationsTest(TestBase): b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary}) assert str(b5) == ":bar = table1.col2" + def test_annotate_expressions(self): - table1 = table('table1', column("col1"), column("col2")) - - for expr, expected in [ - (table1.c.col1, "table1.col1"), - (table1.c.col1 == 5, "table1.col1 = :col1_1"), - (table1.c.col1.in_([2,3,4]), "table1.col1 IN (:col1_1, :col1_2, :col1_3)") - ]: + table1 = table('table1', column('col1'), column('col2')) + for expr, expected in [(table1.c.col1, 'table1.col1'), + (table1.c.col1 == 5, + 'table1.col1 = :col1_1'), + (table1.c.col1.in_([2, 3, 4]), + 'table1.col1 IN (:col1_1, :col1_2, ' + ':col1_3)')]: eq_(str(expr), expected) eq_(str(expr._annotate({})), expected) eq_(str(sql_util._deep_annotate(expr, {})), expected) - eq_(str(sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1])), expected) - + eq_(str(sql_util._deep_annotate(expr, {}, + exclude=[table1.c.col1])), expected) def test_deannotate(self): table1 = table('table1', column("col1"), column("col2")) @@ -681,9 +753,10 @@ class AnnotationsTest(TestBase): for elem in (b2._annotations, b2.left._annotations): assert '_orm_adapt' in elem - for elem in (b3._annotations, b3.left._annotations, b4._annotations, b4.left._annotations): + for elem in b3._annotations, b3.left._annotations, \ + b4._annotations, b4.left._annotations: assert elem == {} - + assert b2.left is not bin.left assert b3.left is not b2.left is not bin.left assert b4.left is bin.left # since column is immutable |
