from test.lib.testing import assert_raises, assert_raises_message, eq_
from test.lib.engines import testing_engine
from test.lib import fixtures, AssertsCompiledSQL, testing
from sqlalchemy import *
from sqlalchemy import exc as exceptions
from sqlalchemy.engine import default
from sqlalchemy.sql import table, column
from test.lib.schema import Table, Column

# Identifier length forced onto the dialects used by these tests.  It equals
# the length of the table name "table_with_exactly_29_characs".
IDENT_LENGTH = 29


class LabelTypeTest(fixtures.TestBase):
    """Check that a label exposes the type of the expression it wraps."""

    def test_type(self):
        """A label on a column or on a scalar subquery keeps the column type."""
        m = MetaData()
        t = Table('sometable', m,
            Column('col1', Integer),
            Column('col2', Float))
        assert isinstance(t.c.col1.label('hi').type, Integer)
        assert isinstance(
            select([t.c.col2]).as_scalar().label('lala').type,
            Float)


class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
    """Exercise label / alias / bind-name truncation with long identifiers.

    For the duration of the class, ``testing.db.dialect.max_identifier_length``
    is lowered to ``IDENT_LENGTH`` (see ``setup_class`` / ``teardown_class``).
    """

    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Define two tables sharing the same long column names.

        The second table's name is exactly ``IDENT_LENGTH`` characters long.
        Both tables are also published on ``cls.tables`` as ``table1`` and
        ``table2``.
        """
        table1 = Table("some_large_named_table", metadata,
            Column("this_is_the_primarykey_column", Integer,
                            primary_key=True,
                            test_needs_autoincrement=True),
            Column("this_is_the_data_column", String(30))
            )

        table2 = Table("table_with_exactly_29_characs", metadata,
            Column("this_is_the_primarykey_column", Integer,
                            primary_key=True,
                            test_needs_autoincrement=True),
            Column("this_is_the_data_column", String(30))
            )
        cls.tables.table1 = table1
        cls.tables.table2 = table2

    @classmethod
    def insert_data(cls):
        """Insert four rows into table1 and a single row into table2."""
        table1 = cls.tables.table1
        table2 = cls.tables.table2
        for data in [
            {"this_is_the_primarykey_column":1,
                        "this_is_the_data_column":"data1"},
            {"this_is_the_primarykey_column":2,
                        "this_is_the_data_column":"data2"},
            {"this_is_the_primarykey_column":3,
                        "this_is_the_data_column":"data3"},
            {"this_is_the_primarykey_column":4,
                        "this_is_the_data_column":"data4"}
        ]:
            testing.db.execute(
                table1.insert(),
                **data
            )
        # The key must spell the column name exactly
        # ("this_is_the_primarykey_column"); a misspelled key matches no
        # column of table2, so the intended primary key value would never be
        # bound.  test_table_alias_3 expects the row (1, "data").
        testing.db.execute(
            table2.insert(),
            {"this_is_the_primarykey_column":1,
            "this_is_the_data_column":"data"}
        )

    @classmethod
    def setup_class(cls):
        """Lower the test dialect's max identifier length to IDENT_LENGTH.

        The previous value is remembered in ``cls.maxlen`` so that
        ``teardown_class`` can restore it.
        """
        super(LongLabelsTest, cls).setup_class()
        cls.maxlen = testing.db.dialect.max_identifier_length
        testing.db.dialect.max_identifier_length = IDENT_LENGTH

    @classmethod
    def teardown_class(cls):
        """Restore the dialect's original max identifier length."""
        testing.db.dialect.max_identifier_length = cls.maxlen
        super(LongLabelsTest, cls).teardown_class()

    def test_too_long_name_disallowed(self):
        """Create/drop of an over-long table name raises IdentifierError."""
        m = MetaData(testing.db)
        t1 = Table("this_name_is_too_long_for_what_were_doing_in_this_test",
                        m, Column('foo', Integer))
        assert_raises(exceptions.IdentifierError, m.create_all)
        assert_raises(exceptions.IdentifierError, m.drop_all)
        assert_raises(exceptions.IdentifierError, t1.create)
        assert_raises(exceptions.IdentifierError, t1.drop)

    def test_basic_result(self):
        """Rows of a use_labels select can be indexed by the Column objects."""
        table1 = self.tables.table1
        s = table1.select(use_labels=True,
                        order_by=[table1.c.this_is_the_primarykey_column])

        result = [
            (row[table1.c.this_is_the_primarykey_column],
            row[table1.c.this_is_the_data_column])
            for row in testing.db.execute(s)
        ]
        eq_(result, [
            (1, "data1"),
            (2, "data2"),
            (3, "data3"),
            (4, "data4"),
        ])

    def test_result_limit(self):
        """Column-object row lookup still works when LIMIT is applied."""
        table1 = self.tables.table1
        # some dialects such as oracle (and possibly ms-sql
        # in a future version)
        # generate a subquery for limits/offsets.
        # ensure that the generated result map corresponds
        # to the selected table, not
        # the select query
        s = table1.select(use_labels=True,
                        order_by=[table1.c.this_is_the_primarykey_column]).\
                        limit(2)

        result = [
            (row[table1.c.this_is_the_primarykey_column],
            row[table1.c.this_is_the_data_column])
            for row in testing.db.execute(s)
        ]
        eq_(result, [
            (1, "data1"),
            (2, "data2"),
        ])

    @testing.requires.offset
    def test_result_limit_offset(self):
        """Column-object row lookup still works with LIMIT plus OFFSET."""
        table1 = self.tables.table1
        s = table1.select(use_labels=True,
                        order_by=[table1.c.this_is_the_primarykey_column]).\
                        limit(2).offset(1)

        result = [
            (row[table1.c.this_is_the_primarykey_column],
            row[table1.c.this_is_the_data_column])
            for row in testing.db.execute(s)
        ]
        eq_(result, [
            (2, "data2"),
            (3, "data3"),
        ])

    def test_table_alias_1(self):
        """An anonymous alias of the 29-character table gets a truncated name.

        The expected SQL differs only in the alias syntax: no ``AS`` keyword
        when running against oracle.
        """
        table2 = self.tables.table2
        if testing.against('oracle'):
            self.assert_compile(
                table2.alias().select(),
                "SELECT table_with_exactly_29_c_1."
                "this_is_the_primarykey_column, "
                "table_with_exactly_29_c_1.this_is_the_data_column "
                "FROM table_with_exactly_29_characs "
                "table_with_exactly_29_c_1"
            )
        else:
            self.assert_compile(
                table2.alias().select(),
                "SELECT table_with_exactly_29_c_1."
                "this_is_the_primarykey_column, "
                "table_with_exactly_29_c_1.this_is_the_data_column "
                "FROM table_with_exactly_29_characs AS "
                "table_with_exactly_29_c_1"
            )

    def test_table_alias_2(self):
        """Join against a truncated anonymous alias on the default dialect."""
        table1 = self.tables.table1
        table2 = self.tables.table2
        ta = table2.alias()
        # Compile against a standalone default dialect limited to
        # IDENT_LENGTH rather than against testing.db.
        dialect = default.DefaultDialect()
        dialect.max_identifier_length = IDENT_LENGTH
        self.assert_compile(
            select([table1, ta]).select_from(
                        table1.join(ta,
                            table1.c.this_is_the_data_column==
                            ta.c.this_is_the_data_column)).\
                        where(ta.c.this_is_the_data_column=='data3'),

            "SELECT some_large_named_table.this_is_the_primarykey_column, "
            "some_large_named_table.this_is_the_data_column, "
            "table_with_exactly_29_c_1.this_is_the_primarykey_column, "
            "table_with_exactly_29_c_1.this_is_the_data_column FROM "
            "some_large_named_table JOIN table_with_exactly_29_characs "
            "AS table_with_exactly_29_c_1 ON "
            "some_large_named_table.this_is_the_data_column = "
            "table_with_exactly_29_c_1.this_is_the_data_column "
            "WHERE table_with_exactly_29_c_1.this_is_the_data_column = "
            ":this_is_the_data_column_1",
            dialect=dialect
        )

    def test_table_alias_3(self):
        """Selecting from an anonymous alias of table2 returns its one row."""
        table2 = self.tables.table2
        eq_(
            testing.db.execute(table2.alias().select()).first(),
            (1, "data")
        )

    def test_colbinds(self):
        """Bind parameters derived from long column names execute correctly."""
        table1 = self.tables.table1
        r = table1.select(table1.c.this_is_the_primarykey_column == 4).\
                    execute()
        assert r.fetchall() == [(4, "data4")]

        r = table1.select(or_(
            table1.c.this_is_the_primarykey_column == 4,
            table1.c.this_is_the_primarykey_column == 2
        )).execute()
        assert r.fetchall() == [(2, "data2"), (4, "data4")]

    @testing.provide_metadata
    def test_insert_no_pk(self):
        """Insert a row without supplying the Sequence-backed primary key."""
        t = Table("some_other_large_named_table", self.metadata,
            Column("this_is_the_primarykey_column", Integer,
                            Sequence("this_is_some_large_seq"),
                            primary_key=True),
            Column("this_is_the_data_column", String(30))
            )
        t.create(testing.db, checkfirst=True)
        testing.db.execute(t.insert(),
                **{"this_is_the_data_column":"data1"})

    @testing.requires.subqueries
    def test_subquery(self):
        """Select from a named subquery alias and fetch the matching row."""
        table1 = self.tables.table1
        q = table1.select(table1.c.this_is_the_primarykey_column == 4).\
                        alias('foo')
        eq_(
            list(testing.db.execute(select([q]))),
            [(4, u'data4')]
        )

    @testing.requires.subqueries
    def test_anon_alias(self):
        """Labels built on an anonymous subquery alias are truncated.

        The statement is compiled against a default dialect limited to
        IDENT_LENGTH and then also executed against ``testing.db``.
        """
        table1 = self.tables.table1
        compile_dialect = default.DefaultDialect()
        compile_dialect.max_identifier_length = IDENT_LENGTH

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
        x = select([q], use_labels=True)

        self.assert_compile(x,
            "SELECT anon_1.this_is_the_primarykey_column AS "
            "anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column "
            "AS anon_1_this_is_the_data_2 "
            "FROM (SELECT some_large_named_table."
            "this_is_the_primarykey_column AS "
            "this_is_the_primarykey_column, "
            "some_large_named_table.this_is_the_data_column "
            "AS this_is_the_data_column "
            "FROM some_large_named_table "
            "WHERE some_large_named_table.this_is_the_primarykey_column "
            "= :this_is_the_primarykey__1) AS anon_1",
            dialect=compile_dialect)

        eq_(
            list(testing.db.execute(x)),
            [(4, u'data4')]
        )

    def test_adjustable(self):
        """Compiled labels follow the dialect's ``label_length`` setting.

        The same statements are compiled with ``label_length`` 10 and 4,
        first against a named alias and then against an anonymous alias
        with ``use_labels=True``.
        """
        table1 = self.tables.table1

        # named alias, no use_labels
        q = table1.select(
                table1.c.this_is_the_primarykey_column == 4).alias('foo')
        x = select([q])

        compile_dialect = default.DefaultDialect(label_length=10)
        self.assert_compile(x,
            "SELECT foo.this_1, foo.this_2 FROM "
            "(SELECT some_large_named_table."
            "this_is_the_primarykey_column AS this_1, "
            "some_large_named_table.this_is_the_data_column AS this_2 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :this_1) AS foo",
            dialect=compile_dialect)

        compile_dialect = default.DefaultDialect(label_length=4)
        self.assert_compile(x,
            "SELECT foo._1, foo._2 FROM "
            "(SELECT some_large_named_table.this_is_the_primarykey_column "
            "AS _1, some_large_named_table.this_is_the_data_column AS _2 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :_1) AS foo",
            dialect=compile_dialect)

        # anonymous alias, use_labels=True
        q = table1.select(
                table1.c.this_is_the_primarykey_column == 4).alias()
        x = select([q], use_labels=True)

        compile_dialect = default.DefaultDialect(label_length=10)
        self.assert_compile(x,
            "SELECT anon_1.this_2 AS anon_1, anon_1.this_4 AS anon_3 FROM "
            "(SELECT some_large_named_table.this_is_the_primarykey_column "
            "AS this_2, some_large_named_table.this_is_the_data_column AS this_4 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :this_1) AS anon_1",
            dialect=compile_dialect)

        compile_dialect = default.DefaultDialect(label_length=4)
        self.assert_compile(x,
            "SELECT _1._2 AS _1, _1._4 AS _3 FROM "
            "(SELECT some_large_named_table.this_is_the_primarykey_column "
            "AS _2, some_large_named_table.this_is_the_data_column AS _4 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :_1) AS _1",
            dialect=compile_dialect)

    def test_adjustable_result_schema_column(self):
        """Result rows are reachable by full column name and truncated label.

        Uses an engine created with ``label_length=10`` that shares the
        connection pool of ``testing.db``.
        """
        table1 = self.tables.table1
        q = table1.select(
                table1.c.this_is_the_primarykey_column == 4).alias('foo')
        x = select([q])

        e = testing_engine(options={"label_length":10})
        e.pool = testing.db.pool
        row = e.execute(x).first()
        eq_(row.this_is_the_primarykey_column, 4)
        eq_(row.this_1, 4)
        eq_(row['this_1'], 4)

        # NOTE(review): ``q`` is rebuilt here but ``x`` is not, so the same
        # statement object is executed a second time.  Possibly ``x`` was
        # meant to be rebuilt from the new alias - confirm intent before
        # changing.
        q = table1.select(
                table1.c.this_is_the_primarykey_column == 4).alias('foo')
        row = e.execute(x).first()
        eq_(row.this_is_the_primarykey_column, 4)
        eq_(row.this_1, 4)

    def test_adjustable_result_lightweight_column(self):
        """Same row lookups as above, using ``table()`` / ``column()``."""
        table1 = table("some_large_named_table",
            column("this_is_the_primarykey_column"),
            column("this_is_the_data_column")
        )

        q = table1.select(
                table1.c.this_is_the_primarykey_column == 4).alias('foo')
        x = select([q])

        e = testing_engine(options={"label_length":10})
        e.pool = testing.db.pool
        row = e.execute(x).first()
        eq_(row.this_is_the_primarykey_column, 4)
        eq_(row.this_1, 4)

    def test_table_plus_column_exceeds_length(self):
        """test that the truncation occurs if tablename / colname are only
        greater than the max when concatenated."""

        compile_dialect = default.DefaultDialect(label_length=30)
        m = MetaData()
        a_table = Table(
            'thirty_characters_table_xxxxxx',
            m,
            Column('id', Integer, primary_key=True)
        )

        other_table = Table(
            'other_thirty_characters_table_',
            m,
            Column('id', Integer, primary_key=True),
            Column('thirty_characters_table_id',
                Integer,
                ForeignKey('thirty_characters_table_xxxxxx.id'),
                primary_key=True
            )
        )

        anon = a_table.alias()
        self.assert_compile(
            select([other_table,anon]).
                    select_from(
                        other_table.outerjoin(anon)
                ).apply_labels(),
            "SELECT other_thirty_characters_table_.id AS "
            "other_thirty_characters__1, "
            "other_thirty_characters_table_.thirty_characters_table_id "
            "AS other_thirty_characters__2, thirty_characters_table__1.id "
            "AS thirty_characters_table__3 "
            "FROM other_thirty_characters_table_ "
            "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
            "AS thirty_characters_table__1 ON "
            "thirty_characters_table__1.id = "
            "other_thirty_characters_table_.thirty_characters_table_id",
            dialect=compile_dialect)

        # NOTE(review): this second assertion builds an equivalent statement
        # and expects the same SQL as the one above; kept as in the original.
        self.assert_compile(
            select([other_table, anon]).
                    select_from(
                        other_table.outerjoin(anon)
                ).apply_labels(),
            "SELECT other_thirty_characters_table_.id AS "
            "other_thirty_characters__1, "
            "other_thirty_characters_table_.thirty_characters_table_id "
            "AS other_thirty_characters__2, "
            "thirty_characters_table__1.id AS thirty_characters_table__3 "
            "FROM other_thirty_characters_table_ "
            "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
            "AS thirty_characters_table__1 ON "
            "thirty_characters_table__1.id = "
            "other_thirty_characters_table_.thirty_characters_table_id",
            dialect=compile_dialect
        )