summaryrefslogtreecommitdiff
path: root/test/sql/test_selectable.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-07-10 14:50:13 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2010-07-10 14:50:13 -0400
commit5cce6bf2a86cb318d80953cd3712fcb77bbc52db (patch)
tree3e667a37d6efe7b8e0a328665b5bad1583fb95b4 /test/sql/test_selectable.py
parent63249b6560a65920cb6ce12829e54a58ef3bd155 (diff)
downloadsqlalchemy-5cce6bf2a86cb318d80953cd3712fcb77bbc52db.tar.gz
- experimenting with pytidy with mods as a textmate plugin along
the path to 78 chars. eh
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r--test/sql/test_selectable.py591
1 files changed, 332 insertions, 259 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 2537c4896..062ed5f1b 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -1,6 +1,7 @@
"""Test various algorithmic properties of selectables."""
-from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.test.testing import eq_, assert_raises, \
+ assert_raises_message
from sqlalchemy import *
from sqlalchemy.test import *
from sqlalchemy.sql import util as sql_util, visitors
@@ -24,39 +25,46 @@ table2 = Table('table2', metadata,
Column('coly', Integer),
)
+
class SelectableTest(TestBase, AssertsExecutionResults):
+
def test_distance_on_labels(self):
+
# same column three times
- s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')])
- # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far
- #assert s.corresponding_column(table1.c.col1) is s.c.col1
+ s = select([table1.c.col1.label('c2'), table1.c.col1,
+ table1.c.col1.label('c1')])
+
+ # didnt do this yet...col.label().make_proxy() has same
+ # "distance" as col.make_proxy() so far assert
+ # s.corresponding_column(table1.c.col1) is s.c.col1
+
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
def test_distance_on_aliases(self):
a1 = table1.alias('a1')
-
- for s in (
- select([a1, table1], use_labels=True),
- select([table1, a1], use_labels=True)
- ):
- assert s.corresponding_column(table1.c.col1) is s.c.table1_col1
+ for s in (select([a1, table1], use_labels=True),
+ select([table1, a1], use_labels=True)):
+ assert s.corresponding_column(table1.c.col1) \
+ is s.c.table1_col1
assert s.corresponding_column(a1.c.col1) is s.c.a1_col1
-
def test_join_against_self(self):
jj = select([table1.c.col1.label('bar_col1')])
- jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
+ jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
# test column directly agaisnt itself
- assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+ assert jjj.corresponding_column(jjj.c.table1_col1) \
+ is jjj.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
# test alias of the join
+
j2 = jjj.alias('foo')
- assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1
+ assert j2.corresponding_column(table1.c.col1) \
+ is j2.c.table1_col1
def test_against_cloned_non_table(self):
# test that corresponding column digs across
@@ -73,20 +81,27 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_select_on_table(self):
sel = select([table1, table2], use_labels=True)
- assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1
- assert sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1
- assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1
- assert table1.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
- def test_join_against_join(self):
- j = outerjoin(table1, table2, table1.c.col1==table2.c.col2)
- jj = select([ table1.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
- jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
- assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+ assert sel.corresponding_column(table1.c.col1) \
+ is sel.c.table1_col1
+ assert sel.corresponding_column(table1.c.col1,
+ require_embedded=True) is sel.c.table1_col1
+ assert table1.corresponding_column(sel.c.table1_col1) \
+ is table1.c.col1
+ assert table1.corresponding_column(sel.c.table1_col1,
+ require_embedded=True) is None
- j2 = jjj.alias('foo')
- assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
+ def test_join_against_join(self):
+ j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2)
+ jj = select([table1.c.col1.label('bar_col1')],
+ from_obj=[j]).alias('foo')
+ jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
+ assert jjj.corresponding_column(jjj.c.table1_col1) \
+ is jjj.c.table1_col1
+ j2 = jjj.alias('foo')
+ assert j2.corresponding_column(jjj.c.table1_col1) \
+ is j2.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
def test_table_alias(self):
@@ -97,12 +112,18 @@ class SelectableTest(TestBase, AssertsExecutionResults):
criterion = a.c.col1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
+
def test_union(self):
- # tests that we can correspond a column in a Select statement with a certain Table, against
- # a column in a Union where one of its underlying Selects matches to that same Table
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- )
+
+ # tests that we can correspond a column in a Select statement
+ # with a certain Table, against a column in a Union where one of
+ # its underlying Selects matches to that same Table
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly]))
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
c = u.corresponding_column(s1.c.table1_col2)
@@ -128,19 +149,25 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert u1.corresponding_column(table1.c.colx) is u1.c.col2
assert u1.corresponding_column(table1.c.col3) is u1.c.col1
- def test_singular_union(self):
- u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3]))
+ def test_singular_union(self):
+ u = union(select([table1.c.col1, table1.c.col2,
+ table1.c.col3]), select([table1.c.col1,
+ table1.c.col2, table1.c.col3]))
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
assert u.c.col1 is not None
assert u.c.col2 is not None
assert u.c.col3 is not None
-
+
def test_alias_union(self):
+
# same as testunion, except its an alias of the union
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- ).alias('analias')
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly])).alias('analias')
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
@@ -149,10 +176,14 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly
def test_select_union(self):
+
# like testaliasunion, but off a Select off the union.
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- ).alias('analias')
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly])).alias('analias')
s = select([u])
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
@@ -160,10 +191,14 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
def test_union_against_join(self):
+
# same as testunion, except its an alias of the union
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- ).alias('analias')
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly])).alias('analias')
j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
@@ -191,8 +226,11 @@ class SelectableTest(TestBase, AssertsExecutionResults):
criterion = a.c.table1_col1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
+
def test_column_labels(self):
- a = select([table1.c.col1.label('acol1'), table1.c.col2.label('acol2'), table1.c.col3.label('acol3')])
+ a = select([table1.c.col1.label('acol1'),
+ table1.c.col2.label('acol2'),
+ table1.c.col3.label('acol3')])
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
@@ -243,31 +281,31 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert_raises(exc.NoReferencedTableError, s.join, t1)
+
def test_join_condition(self):
m = MetaData()
t1 = Table('t1', m, Column('id', Integer))
- t2 = Table('t2', m, Column('id', Integer), Column('t1id', ForeignKey('t1.id')))
- t3 = Table('t3', m, Column('id', Integer),
- Column('t1id', ForeignKey('t1.id')),
- Column('t2id', ForeignKey('t2.id')))
- t4 = Table('t4', m, Column('id', Integer), Column('t2id', ForeignKey('t2.id')))
-
+ t2 = Table('t2', m, Column('id', Integer), Column('t1id',
+ ForeignKey('t1.id')))
+ t3 = Table('t3', m, Column('id', Integer), Column('t1id',
+ ForeignKey('t1.id')), Column('t2id',
+ ForeignKey('t2.id')))
+ t4 = Table('t4', m, Column('id', Integer), Column('t2id',
+ ForeignKey('t2.id')))
t1t2 = t1.join(t2)
t2t3 = t2.join(t3)
-
- for left, right, a_subset, expected in [
- (t1, t2, None, t1.c.id==t2.c.t1id),
- (t1t2, t3, t2, t1t2.c.t2_id==t3.c.t2id),
- (t2t3, t1, t3, t1.c.id==t3.c.t1id),
- (t2t3, t4, None, t2t3.c.t2_id==t4.c.t2id),
- (t2t3, t4, t3, t2t3.c.t2_id==t4.c.t2id),
- (t2t3.join(t1), t4, None, t2t3.c.t2_id==t4.c.t2id),
- (t2t3.join(t1), t4, t1, t2t3.c.t2_id==t4.c.t2id),
- (t1t2, t2t3, t2, t1t2.c.t2_id==t2t3.c.t3_t2id),
- ]:
- assert expected.compare(
- sql_util.join_condition(left, right, a_subset=a_subset)
- )
+ for (left, right, a_subset, expected) in [
+ (t1, t2, None, t1.c.id == t2.c.t1id),
+ (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id),
+ (t2t3, t1, t3, t1.c.id == t3.c.t1id),
+ (t2t3, t4, None, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3, t4, t3, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id),
+ (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id),
+ ]:
+ assert expected.compare(sql_util.join_condition(left,
+ right, a_subset=a_subset))
# these are ambiguous, or have no joins
for left, right, a_subset in [
@@ -298,35 +336,39 @@ class SelectableTest(TestBase, AssertsExecutionResults):
left.join(right).onclause
)
- # TODO: this raises due to right side being "grouped",
- # and no longer has FKs. Did we want to make
- # _FromGrouping friendlier ?
- assert_raises_message(
- exc.ArgumentError,
- r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?",
- t1t2.join, t2t3
- )
- assert_raises_message(
- exc.ArgumentError,
- r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?",
- t1t2.join, t2t3.select(use_labels=True)
- )
+
+ # TODO: this raises due to right side being "grouped", and no
+ # longer has FKs. Did we want to make _FromGrouping friendlier
+ # ?
+
+ assert_raises_message(exc.ArgumentError,
+ "Perhaps you meant to convert the right "
+ "side to a subquery using alias\(\)\?",
+ t1t2.join, t2t3)
+ assert_raises_message(exc.ArgumentError,
+ "Perhaps you meant to convert the right "
+ "side to a subquery using alias\(\)\?",
+ t1t2.join, t2t3.select(use_labels=True))
+
class PrimaryKeyTest(TestBase, AssertsExecutionResults):
+
def test_join_pk_collapse_implicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
- which is the root column along a chain of foreign key relationships."""
+ """test that redundant columns in a join get 'collapsed' into a
+ minimal primary key, which is the root column along a chain of
+ foreign key relationships."""
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True))
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True))
-
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
+ primary_key=True))
+ c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
+ primary_key=True))
+ d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
+ primary_key=True))
assert c.c.id.references(b.c.id)
assert not d.c.id.references(a.c.id)
-
assert list(a.join(b).primary_key) == [a.c.id]
assert list(b.join(c).primary_key) == [b.c.id]
assert list(a.join(b).join(c).primary_key) == [a.c.id]
@@ -334,26 +376,36 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults):
assert list(d.join(c).join(b).primary_key) == [b.c.id]
assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
+
def test_join_pk_collapse_explicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
- which is the root column along a chain of explicit join conditions."""
+ """test that redundant columns in a join get 'collapsed' into a
+ minimal primary key, which is the root column along a chain of
+ explicit join conditions."""
meta = MetaData()
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer))
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))
-
- print list(a.join(b, a.c.x==b.c.id).primary_key)
- assert list(a.join(b, a.c.x==b.c.id).primary_key) == [a.c.id]
- assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]
- assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]
- assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [b.c.id]
- assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
- assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [b.c.id]
- assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
-
- assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
+ a = Table('a', meta, Column('id', Integer, primary_key=True),
+ Column('x', Integer))
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
+ primary_key=True), Column('x', Integer))
+ c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
+ primary_key=True), Column('x', Integer))
+ d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
+ primary_key=True), Column('x', Integer))
+ print list(a.join(b, a.c.x == b.c.id).primary_key)
+ assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id]
+ assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \
+ == [a.c.id]
+ assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \
+ == [b.c.id]
+ assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \
+ == [b.c.id]
+ assert list(d.join(b, d.c.id == b.c.id).join(c, b.c.id
+ == c.c.x).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c, c.c.id
+ == b.c.x).join(d).primary_key) == [a.c.id]
+ assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x
+ == b.c.id)).primary_key) == [a.c.id]
def test_init_doesnt_blowitaway(self):
meta = MetaData()
@@ -391,19 +443,17 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults):
Column('id', Integer, primary_key= True),
)
- engineer = Table( 'Engineer', metadata,
- Column('id', Integer, ForeignKey( 'Employee.id', ), primary_key=True),
- )
+ engineer = Table('Engineer', metadata,
+ Column('id', Integer,
+ ForeignKey('Employee.id'), primary_key=True))
- eq_(
- util.column_set(employee.join(engineer, employee.c.id==engineer.c.id).primary_key),
- util.column_set([employee.c.id])
- )
- eq_(
- util.column_set(employee.join(engineer, engineer.c.id==employee.c.id).primary_key),
- util.column_set([employee.c.id])
- )
+ eq_(util.column_set(employee.join(engineer, employee.c.id
+ == engineer.c.id).primary_key),
+ util.column_set([employee.c.id]))
+ eq_(util.column_set(employee.join(engineer, engineer.c.id
+ == employee.c.id).primary_key),
+ util.column_set([employee.c.id]))
class ReduceTest(TestBase, AssertsExecutionResults):
@@ -419,164 +469,171 @@ class ReduceTest(TestBase, AssertsExecutionResults):
Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True),
Column('t3data', String(30)))
-
- eq_(
- util.column_set(sql_util.reduce_columns([
- t1.c.t1id, t1.c.t1data, t2.c.t2id,
- t2.c.t2data, t3.c.t3id, t3.c.t3data])),
- util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data])
- )
+ eq_(util.column_set(sql_util.reduce_columns([
+ t1.c.t1id,
+ t1.c.t1data,
+ t2.c.t2id,
+ t2.c.t2data,
+ t3.c.t3id,
+ t3.c.t3data,
+ ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data,
+ t3.c.t3data]))
+
def test_reduce_selectable(self):
- metadata = MetaData()
-
- engineers = Table('engineers', metadata,
- Column('engineer_id', Integer, primary_key=True),
- Column('engineer_name', String(50)),
- )
-
- managers = Table('managers', metadata,
- Column('manager_id', Integer, primary_key=True),
- Column('manager_name', String(50))
- )
-
- s = select([engineers, managers]).where(engineers.c.engineer_name==managers.c.manager_name)
-
- eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
- util.column_set([s.c.engineer_id, s.c.engineer_name, s.c.manager_id])
- )
+ metadata = MetaData()
+ engineers = Table('engineers', metadata, Column('engineer_id',
+ Integer, primary_key=True),
+ Column('engineer_name', String(50)))
+ managers = Table('managers', metadata, Column('manager_id',
+ Integer, primary_key=True),
+ Column('manager_name', String(50)))
+ s = select([engineers,
+ managers]).where(engineers.c.engineer_name
+ == managers.c.manager_name)
+ eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
+ util.column_set([s.c.engineer_id, s.c.engineer_name,
+ s.c.manager_id]))
+
def test_reduce_aliased_join(self):
metadata = MetaData()
- people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
- Column('name', String(50)),
- Column('type', String(30)))
-
- engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
- Column('status', String(30)),
- Column('engineer_name', String(50)),
- Column('primary_language', String(50)),
- )
-
- managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
- Column('status', String(30)),
- Column('manager_name', String(50))
- )
-
- pjoin = people.outerjoin(engineers).\
- outerjoin(managers).select(use_labels=True).\
- alias('pjoin')
- eq_(
- util.column_set(sql_util.reduce_columns([
- pjoin.c.people_person_id, pjoin.c.engineers_person_id,
- pjoin.c.managers_person_id])),
- util.column_set([pjoin.c.people_person_id])
- )
+ people = Table('people', metadata, Column('person_id', Integer,
+ Sequence('person_id_seq', optional=True),
+ primary_key=True), Column('name', String(50)),
+ Column('type', String(30)))
+ engineers = Table(
+ 'engineers',
+ metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'
+ ), primary_key=True),
+ Column('status', String(30)),
+ Column('engineer_name', String(50)),
+ Column('primary_language', String(50)),
+ )
+ managers = Table('managers', metadata, Column('person_id',
+ Integer, ForeignKey('people.person_id'),
+ primary_key=True), Column('status',
+ String(30)), Column('manager_name',
+ String(50)))
+ pjoin = \
+ people.outerjoin(engineers).outerjoin(managers).\
+ select(use_labels=True).alias('pjoin'
+ )
+ eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id,
+ pjoin.c.engineers_person_id, pjoin.c.managers_person_id])),
+ util.column_set([pjoin.c.people_person_id]))
def test_reduce_aliased_union(self):
metadata = MetaData()
- item_table = Table(
- 'item', metadata,
- Column('id', Integer, ForeignKey('base_item.id'), primary_key=True),
- Column('dummy', Integer, default=0))
- base_item_table = Table(
- 'base_item', metadata,
- Column('id', Integer, primary_key=True),
- Column('child_name', String(255), default=None))
-
+ item_table = Table('item', metadata, Column('id', Integer,
+ ForeignKey('base_item.id'),
+ primary_key=True), Column('dummy', Integer,
+ default=0))
+ base_item_table = Table('base_item', metadata, Column('id',
+ Integer, primary_key=True),
+ Column('child_name', String(255),
+ default=None))
from sqlalchemy.orm.util import polymorphic_union
-
- item_join = polymorphic_union( {
- 'BaseItem':base_item_table.select(base_item_table.c.child_name=='BaseItem'),
- 'Item':base_item_table.join(item_table),
- }, None, 'item_join')
-
- eq_(
- util.column_set(sql_util.reduce_columns([
- item_join.c.id, item_join.c.dummy, item_join.c.child_name
- ])),
- util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name])
- )
+ item_join = polymorphic_union({
+ 'BaseItem':
+ base_item_table.select(
+ base_item_table.c.child_name
+ == 'BaseItem'),
+ 'Item': base_item_table.join(item_table)},
+ None, 'item_join')
+ eq_(util.column_set(sql_util.reduce_columns([item_join.c.id,
+ item_join.c.dummy, item_join.c.child_name])),
+ util.column_set([item_join.c.id, item_join.c.dummy,
+ item_join.c.child_name]))
+
def test_reduce_aliased_union_2(self):
metadata = MetaData()
-
- page_table = Table('page', metadata,
- Column('id', Integer, primary_key=True),
- )
+ page_table = Table('page', metadata, Column('id', Integer,
+ primary_key=True))
magazine_page_table = Table('magazine_page', metadata,
- Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
- )
+ Column('page_id', Integer,
+ ForeignKey('page.id'),
+ primary_key=True))
classified_page_table = Table('classified_page', metadata,
- Column('magazine_page_id', Integer,
- ForeignKey('magazine_page.page_id'), primary_key=True),
- )
-
- # this is essentially the union formed by the ORM's polymorphic_union function.
- # we define two versions with different ordering of selects.
+ Column('magazine_page_id', Integer,
+ ForeignKey('magazine_page.page_id'), primary_key=True))
+
+ # this is essentially the union formed by the ORM's
+ # polymorphic_union function. we define two versions with
+ # different ordering of selects.
+ #
+ # the first selectable has the "real" column
+ # classified_page.magazine_page_id
- # the first selectable has the "real" column classified_page.magazine_page_id
pjoin = union(
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).select_from(page_table.join(magazine_page_table).join(classified_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).select_from(page_table.join(magazine_page_table)),
- ).alias('pjoin')
-
- eq_(
- util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id])
- )
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id
+ ]).
+ select_from(
+ page_table.join(magazine_page_table).
+ join(classified_page_table)),
+
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label('magazine_page_id')
+ ]).
+ select_from(page_table.join(magazine_page_table))
+ ).alias('pjoin')
+ eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id,
+ pjoin.c.page_id, pjoin.c.magazine_page_id])),
+ util.column_set([pjoin.c.id]))
# the first selectable has a CAST, which is a placeholder for
- # classified_page.magazine_page_id in the second selectable. reduce_columns
- # needs to take into account all foreign keys derived from pjoin.c.magazine_page_id.
- # the UNION construct currently makes the external column look like that of the first
- # selectable only.
- pjoin = union(
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).select_from(page_table.join(magazine_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).select_from(page_table.join(magazine_page_table).join(classified_page_table))
- ).alias('pjoin')
-
- eq_(
- util.column_set(sql_util.reduce_columns([
- pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id])
- )
+ # classified_page.magazine_page_id in the second selectable.
+ # reduce_columns needs to take into account all foreign keys
+ # derived from pjoin.c.magazine_page_id. the UNION construct
+ # currently makes the external column look like that of the
+ # first selectable only.
+
+ pjoin = union(select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label('magazine_page_id')
+ ]).
+ select_from(page_table.join(magazine_page_table)),
+
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id
+ ]).
+ select_from(page_table.join(magazine_page_table).
+ join(classified_page_table))
+ ).alias('pjoin')
+ eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id,
+ pjoin.c.page_id, pjoin.c.magazine_page_id])),
+ util.column_set([pjoin.c.id]))
class DerivedTest(TestBase, AssertsExecutionResults):
def test_table(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
+
+ t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
+ t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
assert t1.is_derived_from(t1)
assert not t2.is_derived_from(t1)
+
def test_alias(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
+ t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
+ t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
assert t1.alias().is_derived_from(t1)
assert not t2.alias().is_derived_from(t1)
@@ -585,8 +642,11 @@ class DerivedTest(TestBase, AssertsExecutionResults):
def test_select(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
+
+ t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
+ t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
assert t1.select().is_derived_from(t1)
assert not t2.select().is_derived_from(t1)
@@ -623,14 +683,23 @@ class AnnotationsTest(TestBase):
t1 = s1._annotate({})
t2 = s1
- # t1 needs to share the same _make_proxy() columns as t2, even though it's
- # annotated. otherwise paths will diverge once they are corresponded against "inner" below.
+ # t1 needs to share the same _make_proxy() columns as t2, even
+ # though it's annotated. otherwise paths will diverge once they
+ # are corresponded against "inner" below.
+
assert t1.c is t2.c
assert t1.c.col1 is t2.c.col1
inner = select([s1])
- assert inner.corresponding_column(t2.c.col1, require_embedded=False) is inner.corresponding_column(t2.c.col1, require_embedded=True) is inner.c.col1
- assert inner.corresponding_column(t1.c.col1, require_embedded=False) is inner.corresponding_column(t1.c.col1, require_embedded=True) is inner.c.col1
+
+ assert inner.corresponding_column(t2.c.col1,
+ require_embedded=False) \
+ is inner.corresponding_column(t2.c.col1,
+ require_embedded=True) is inner.c.col1
+ assert inner.corresponding_column(t1.c.col1,
+ require_embedded=False) \
+ is inner.corresponding_column(t1.c.col1,
+ require_embedded=True) is inner.c.col1
def test_annotated_visit(self):
table1 = table('table1', column("col1"), column("col2"))
@@ -643,8 +712,10 @@ class AnnotationsTest(TestBase):
b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
- b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':visit_binary})
- assert str(b3) == "table1.col1 = table1.col2"
+
+ b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary'
+ : visit_binary})
+ assert str(b3) == 'table1.col1 = table1.col2'
def visit_binary(b):
b.left = bindparam('bar')
@@ -655,19 +726,20 @@ class AnnotationsTest(TestBase):
b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
assert str(b5) == ":bar = table1.col2"
+
def test_annotate_expressions(self):
- table1 = table('table1', column("col1"), column("col2"))
-
- for expr, expected in [
- (table1.c.col1, "table1.col1"),
- (table1.c.col1 == 5, "table1.col1 = :col1_1"),
- (table1.c.col1.in_([2,3,4]), "table1.col1 IN (:col1_1, :col1_2, :col1_3)")
- ]:
+ table1 = table('table1', column('col1'), column('col2'))
+ for expr, expected in [(table1.c.col1, 'table1.col1'),
+ (table1.c.col1 == 5,
+ 'table1.col1 = :col1_1'),
+ (table1.c.col1.in_([2, 3, 4]),
+ 'table1.col1 IN (:col1_1, :col1_2, '
+ ':col1_3)')]:
eq_(str(expr), expected)
eq_(str(expr._annotate({})), expected)
eq_(str(sql_util._deep_annotate(expr, {})), expected)
- eq_(str(sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1])), expected)
-
+ eq_(str(sql_util._deep_annotate(expr, {},
+ exclude=[table1.c.col1])), expected)
def test_deannotate(self):
table1 = table('table1', column("col1"), column("col2"))
@@ -681,9 +753,10 @@ class AnnotationsTest(TestBase):
for elem in (b2._annotations, b2.left._annotations):
assert '_orm_adapt' in elem
- for elem in (b3._annotations, b3.left._annotations, b4._annotations, b4.left._annotations):
+ for elem in b3._annotations, b3.left._annotations, \
+ b4._annotations, b4.left._annotations:
assert elem == {}
-
+
assert b2.left is not bin.left
assert b3.left is not b2.left is not bin.left
assert b4.left is bin.left # since column is immutable