diff options
Diffstat (limited to 'test/sql/test_metadata.py')
| -rw-r--r-- | test/sql/test_metadata.py | 944 |
1 files changed, 521 insertions, 423 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 8bd419964..3a252c646 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -17,12 +17,14 @@ from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL from sqlalchemy.testing import eq_, is_, mock from contextlib import contextmanager + class MetaDataTest(fixtures.TestBase, ComparesTables): + def test_metadata_connect(self): metadata = MetaData() t1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20))) + Column('col1', Integer, primary_key=True), + Column('col2', String(20))) metadata.bind = testing.db metadata.create_all() try: @@ -52,16 +54,16 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('baz', String(), unique=True), Column(Integer(), primary_key=True), Column('bar', Integer(), Sequence('foo_seq'), primary_key=True, - key='bar'), + key='bar'), Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"), Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True, - key='bar'), + key='bar'), Column('bar', Integer(), info={'foo': 'bar'}), ]: c2 = col.copy() for attr in ('name', 'type', 'nullable', - 'primary_key', 'key', 'unique', 'info', - 'doc'): + 'primary_key', 'key', 'unique', 'info', + 'doc'): eq_(getattr(col, attr), getattr(c2, attr)) eq_(len(col.foreign_keys), len(c2.foreign_keys)) if col.default: @@ -72,6 +74,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_col_subclass_copy(self): class MyColumn(schema.Column): + def __init__(self, *args, **kw): self.widget = kw.pop('widget', None) super(MyColumn, self).__init__(*args, **kw) @@ -87,6 +90,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_uninitialized_column_copy_events(self): msgs = [] + def write(c, t): msgs.append("attach %s.%s" % (t.name, c.name)) c1 = Column('foo', String()) @@ -150,21 +154,22 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_dupe_tables(self): metadata = self.metadata Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20))) + Column('col1', Integer, primary_key=True), + Column('col2', String(20))) metadata.create_all() Table('table1', metadata, autoload=True) + def go(): Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20))) + Column('col1', Integer, primary_key=True), + Column('col2', String(20))) assert_raises_message( tsa.exc.InvalidRequestError, - "Table 'table1' is already defined for this " - "MetaData instance. Specify 'extend_existing=True' " - "to redefine options and columns on an existing " - "Table object.", + "Table 'table1' is already defined for this " + "MetaData instance. Specify 'extend_existing=True' " + "to redefine options and columns on an existing " + "Table object.", go ) @@ -175,8 +180,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): t1 = Table('t', m, c1, c2) kw = dict(onupdate="X", - ondelete="Y", use_alter=True, name='f1', - deferrable="Z", initially="Q", link_to_name=True) + ondelete="Y", use_alter=True, name='f1', + deferrable="Z", initially="Q", link_to_name=True) fk1 = ForeignKey(c1, **kw) fk2 = ForeignKeyConstraint((c1,), (c2,), **kw) @@ -248,6 +253,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_fk_given_non_col_clauseelem(self): class Foo(object): + def __clause_element__(self): return bindparam('x') assert_raises_message( @@ -267,7 +273,9 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_fk_given_col_non_table_clauseelem(self): t = Table('t', MetaData(), Column('x', Integer)) + class Foo(object): + def __clause_element__(self): return t.alias().c.x @@ -306,50 +314,46 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): getattr, list(a.foreign_keys)[0], "column" ) - - def test_pickle_metadata_sequence_restated(self): m1 = MetaData() Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq"))) + Column('id', Integer, primary_key=True), + Column('x', Integer, Sequence("x_seq"))) m2 = pickle.loads(pickle.dumps(m1)) s2 = Sequence("x_seq") t2 = Table('a', m2, - Column('id', Integer, primary_key=True), - Column('x', Integer, s2), - extend_existing=True) + Column('id', Integer, primary_key=True), + Column('x', Integer, s2), + extend_existing=True) assert m2._sequences['x_seq'] is t2.c.x.default assert m2._sequences['x_seq'] is s2 - def test_sequence_restated_replaced(self): """Test restatement of Sequence replaces.""" m1 = MetaData() s1 = Sequence("x_seq") t = Table('a', m1, - Column('x', Integer, s1) - ) + Column('x', Integer, s1) + ) assert m1._sequences['x_seq'] is s1 s2 = Sequence('x_seq') Table('a', m1, - Column('x', Integer, s2), - extend_existing=True - ) + Column('x', Integer, s2), + extend_existing=True + ) assert t.c.x.default is s2 assert m1._sequences['x_seq'] is s2 - def test_pickle_metadata_sequence_implicit(self): m1 = MetaData() Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq"))) + Column('id', Integer, primary_key=True), + Column('x', Integer, Sequence("x_seq"))) m2 = pickle.loads(pickle.dumps(m1)) @@ -360,14 +364,14 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_pickle_metadata_schema(self): m1 = MetaData() Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq")), - schema='y') + Column('id', Integer, primary_key=True), + Column('x', Integer, Sequence("x_seq")), + schema='y') m2 = pickle.loads(pickle.dumps(m1)) Table('a', m2, schema='y', - extend_existing=True) + extend_existing=True) eq_(m2._schemas, m1._schemas) @@ -378,24 +382,24 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): m4 = MetaData() for i, (name, metadata, schema, quote_schema, - exp_schema, exp_quote_schema) in enumerate([ - ('t1', m1, None, None, 'sch1', None), - ('t2', m1, 'sch2', None, 'sch2', None), - ('t3', m1, 'sch2', True, 'sch2', True), - ('t4', m1, 'sch1', None, 'sch1', None), - ('t1', m2, None, None, 'sch1', True), - ('t2', m2, 'sch2', None, 'sch2', None), - ('t3', m2, 'sch2', True, 'sch2', True), - ('t4', m2, 'sch1', None, 'sch1', None), - ('t1', m3, None, None, 'sch1', False), - ('t2', m3, 'sch2', None, 'sch2', None), - ('t3', m3, 'sch2', True, 'sch2', True), - ('t4', m3, 'sch1', None, 'sch1', None), - ('t1', m4, None, None, None, None), - ('t2', m4, 'sch2', None, 'sch2', None), - ('t3', m4, 'sch2', True, 'sch2', True), - ('t4', m4, 'sch1', None, 'sch1', None), - ]): + exp_schema, exp_quote_schema) in enumerate([ + ('t1', m1, None, None, 'sch1', None), + ('t2', m1, 'sch2', None, 'sch2', None), + ('t3', m1, 'sch2', True, 'sch2', True), + ('t4', m1, 'sch1', None, 'sch1', None), + ('t1', m2, None, None, 'sch1', True), + ('t2', m2, 'sch2', None, 'sch2', None), + ('t3', m2, 'sch2', True, 'sch2', True), + ('t4', m2, 'sch1', None, 'sch1', None), + ('t1', m3, None, None, 'sch1', False), + ('t2', m3, 'sch2', None, 'sch2', None), + ('t3', m3, 'sch2', True, 'sch2', True), + ('t4', m3, 'sch1', None, 'sch1', None), + ('t1', m4, None, None, None, None), + ('t2', m4, 'sch2', None, 'sch2', None), + ('t3', m4, 'sch2', True, 'sch2', True), + ('t4', m4, 'sch1', None, 'sch1', None), + ]): kw = {} if schema is not None: kw['schema'] = schema @@ -404,13 +408,13 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): t = Table(name, metadata, **kw) eq_(t.schema, exp_schema, "test %d, table schema" % i) eq_(t.schema.quote if t.schema is not None else None, - exp_quote_schema, - "test %d, table quote_schema" % i) + exp_quote_schema, + "test %d, table quote_schema" % i) seq = Sequence(name, metadata=metadata, **kw) eq_(seq.schema, exp_schema, "test %d, seq schema" % i) eq_(seq.schema.quote if seq.schema is not None else None, - exp_quote_schema, - "test %d, seq quote_schema" % i) + exp_quote_schema, + "test %d, seq quote_schema" % i) def test_manual_dependencies(self): meta = MetaData() @@ -432,8 +436,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): def test_nonexistent(self): assert_raises(tsa.exc.NoSuchTableError, Table, - 'fake_table', - MetaData(testing.db), autoload=True) + 'fake_table', + MetaData(testing.db), autoload=True) def test_assorted_repr(self): t1 = Table("foo", MetaData(), Column("x", Integer)) @@ -456,9 +460,9 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): (i1, "Index('bar', Column('x', Integer(), table=<foo>))"), (schema.FetchedValue(), "FetchedValue()"), (ck, - "CheckConstraint(" - "%s" - ", name='someconstraint')" % repr(ck.sqltext)), + "CheckConstraint(" + "%s" + ", name='someconstraint')" % repr(ck.sqltext)), ): eq_( repr(const), @@ -472,26 +476,51 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): from sqlalchemy.testing.schema import Table meta = MetaData() - table = Table('mytable', meta, - Column('myid', Integer, Sequence('foo_id_seq'), primary_key=True), - Column('name', String(40), nullable=True), - Column('foo', String(40), nullable=False, server_default='x', - server_onupdate='q'), - Column('bar', String(40), nullable=False, default='y', - onupdate='z'), - Column('description', String(30), - CheckConstraint("description='hi'")), + table = Table( + 'mytable', + meta, + Column( + 'myid', + Integer, + Sequence('foo_id_seq'), + primary_key=True), + Column( + 'name', + String(40), + nullable=True), + Column( + 'foo', + String(40), + nullable=False, + server_default='x', + server_onupdate='q'), + Column( + 'bar', + String(40), + nullable=False, + default='y', + onupdate='z'), + Column( + 'description', + String(30), + CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True - ) - - table2 = Table('othertable', meta, - Column('id', Integer, Sequence('foo_seq'), primary_key=True), - Column('myid', Integer, - ForeignKey('mytable.myid'), - ), - test_needs_fk=True - ) + test_needs_fk=True) + + table2 = Table( + 'othertable', + meta, + Column( + 'id', + Integer, + Sequence('foo_seq'), + primary_key=True), + Column( + 'myid', + Integer, + ForeignKey('mytable.myid'), + ), + test_needs_fk=True) def test_to_metadata(): meta2 = MetaData() @@ -560,7 +589,6 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): assert c.columns.contains_column(table_c.c.name) assert not c.columns.contains_column(table.c.name) - finally: meta.drop_all(testing.db) @@ -576,29 +604,28 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): a2 = a.tometadata(m2) assert b2.c.y.references(a2.c.x) - def test_change_schema(self): meta = MetaData() table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - ) + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + ) table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('mytable.myid')), - ) + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('mytable.myid')), + ) meta2 = MetaData() table_c = table.tometadata(meta2, schema='someschema') table2_c = table2.tometadata(meta2, schema='someschema') eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid - == table2_c.c.myid)) + == table2_c.c.myid)) eq_(str(table_c.join(table2_c).onclause), 'someschema.mytable.myid = someschema.othertable.myid') @@ -606,26 +633,34 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): meta = MetaData() table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + schema='myschema', + ) + + table2 = Table( + 'othertable', + meta, + Column( + 'id', + Integer, + primary_key=True), + Column( + 'myid', + Integer, + ForeignKey('myschema.mytable.myid')), schema='myschema', ) - table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('myschema.mytable.myid')), - schema='myschema', - ) - meta2 = MetaData() table_c = table.tometadata(meta2) table2_c = table2.tometadata(meta2) eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid - == table2_c.c.myid)) + == table2_c.c.myid)) eq_(str(table_c.join(table2_c).onclause), 'myschema.mytable.myid = myschema.othertable.myid') @@ -634,7 +669,7 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): existing_schema = t2.schema if schema: t2c = t2.tometadata(m2, schema=schema, - referred_schema_fn=referred_schema_fn) + referred_schema_fn=referred_schema_fn) eq_(t2c.schema, schema) else: t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn) @@ -679,7 +714,7 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): m = MetaData() t2 = Table('t2', m, Column('y', Integer, - ForeignKey('q.t1.x')), schema="q") + ForeignKey('q.t1.x')), schema="q") self._assert_fk(t2, None, "q.t1.x") @@ -690,7 +725,7 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): m = MetaData() t2 = Table('t2', m, Column('y', Integer, - ForeignKey('q.t1.x')), schema="q") + ForeignKey('q.t1.x')), schema="q") self._assert_fk(t2, "z", "z.t1.x") @@ -702,23 +737,22 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): t1 = Table('t1', m, Column('x', Integer), schema='q') t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') + ForeignKey(t1.c.x)), schema='q') self._assert_fk(t2, None, "q.t1.x") - def test_fk_and_referent_has_same_schema_col_new_schema(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer), schema='q') t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') + ForeignKey(t1.c.x)), schema='q') self._assert_fk(t2, 'z', "z.t1.x") def test_fk_and_referent_has_diff_schema_string_retain_schema(self): m = MetaData() t2 = Table('t2', m, Column('y', Integer, - ForeignKey('p.t1.x')), schema="q") + ForeignKey('p.t1.x')), schema="q") self._assert_fk(t2, None, "p.t1.x") @@ -729,7 +763,7 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): m = MetaData() t2 = Table('t2', m, Column('y', Integer, - ForeignKey('p.t1.x')), schema="q") + ForeignKey('p.t1.x')), schema="q") self._assert_fk(t2, "z", "p.t1.x") @@ -741,22 +775,21 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): t1 = Table('t1', m, Column('x', Integer), schema='p') t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') + ForeignKey(t1.c.x)), schema='q') self._assert_fk(t2, None, "p.t1.x") - def test_fk_and_referent_has_diff_schema_col_new_schema(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer), schema='p') t2 = Table('t2', m, Column('y', Integer, - ForeignKey(t1.c.x)), schema='q') + ForeignKey(t1.c.x)), schema='q') self._assert_fk(t2, 'z', "p.t1.x") def test_fk_custom_system(self): m = MetaData() t2 = Table('t2', m, Column('y', Integer, - ForeignKey('p.t1.x')), schema='q') + ForeignKey('p.t1.x')), schema='q') def ref_fn(table, to_schema, constraint, referred_schema): assert table is t2 @@ -765,7 +798,6 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): return "h" self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn) - def test_copy_info(self): m = MetaData() fk = ForeignKey('t2.id') @@ -780,7 +812,7 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): t.info['tinfo'] = True t.primary_key.info['pkinfo'] = True fkc = [const for const in t.constraints if - isinstance(const, ForeignKeyConstraint)][0] + isinstance(const, ForeignKeyConstraint)][0] fkc.info['fkcinfo'] = True m2 = MetaData() @@ -800,21 +832,20 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): eq_(t2.primary_key.info, {"pkinfo": True}) fkc2 = [const for const in t2.constraints - if isinstance(const, ForeignKeyConstraint)][0] + if isinstance(const, ForeignKeyConstraint)][0] eq_(fkc2.info, {"fkcinfo": True}) ck2 = [const for const in - t2.constraints if isinstance(const, CheckConstraint)][0] + t2.constraints if isinstance(const, CheckConstraint)][0] eq_(ck2.info, {"ckinfo": True}) - def test_dialect_kwargs(self): meta = MetaData() table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - mysql_engine='InnoDB', - ) + Column('myid', Integer, primary_key=True), + mysql_engine='InnoDB', + ) meta2 = MetaData() table_c = table.tometadata(meta2) @@ -827,10 +858,10 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): meta = MetaData() table = Table('mytable', meta, - Column('id', Integer, primary_key=True), - Column('data1', Integer, index=True), - Column('data2', Integer), - ) + Column('id', Integer, primary_key=True), + Column('data1', Integer, index=True), + Column('data2', Integer), + ) Index('multi', table.c.data1, table.c.data2), meta2 = MetaData() @@ -838,8 +869,8 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def _get_key(i): return [i.name, i.unique] + \ - sorted(i.kwargs.items()) + \ - list(i.columns.keys()) + sorted(i.kwargs.items()) + \ + list(i.columns.keys()) eq_( sorted([_get_key(i) for i in table.indexes]), @@ -851,12 +882,12 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): meta1 = MetaData() table1 = Table('mytable', meta1, - Column('myid', Integer, primary_key=True), - ) + Column('myid', Integer, primary_key=True), + ) meta2 = MetaData() table2 = Table('mytable', meta2, - Column('yourid', Integer, primary_key=True), - ) + Column('yourid', Integer, primary_key=True), + ) table_c = table1.tometadata(meta2) table_d = table2.tometadata(meta2) @@ -867,59 +898,72 @@ class ToMetaDataTest(fixtures.TestBase, ComparesTables): def test_default_schema_metadata(self): meta = MetaData(schema='myschema') - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), CheckConstraint("description='hi'")), + table = Table( + 'mytable', + meta, + Column( + 'myid', + Integer, + primary_key=True), + Column( + 'name', + String(40), + nullable=True), + Column( + 'description', + String(30), + CheckConstraint("description='hi'")), UniqueConstraint('name'), ) - table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('myschema.mytable.myid')), - ) + table2 = Table( + 'othertable', meta, Column( + 'id', Integer, primary_key=True), Column( + 'myid', Integer, ForeignKey('myschema.mytable.myid')), ) meta2 = MetaData(schema='someschema') table_c = table.tometadata(meta2, schema=None) table2_c = table2.tometadata(meta2, schema=None) eq_(str(table_c.join(table2_c).onclause), - str(table_c.c.myid == table2_c.c.myid)) + str(table_c.c.myid == table2_c.c.myid)) eq_(str(table_c.join(table2_c).onclause), - "someschema.mytable.myid = someschema.othertable.myid") + "someschema.mytable.myid = someschema.othertable.myid") def test_strip_schema(self): meta = MetaData() table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=True), - Column('description', String(30), - CheckConstraint("description='hi'")), - UniqueConstraint('name'), - ) + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + ) table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('mytable.myid')), - ) + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('mytable.myid')), + ) meta2 = MetaData() table_c = table.tometadata(meta2, schema=None) table2_c = table2.tometadata(meta2, schema=None) eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid - == table2_c.c.myid)) + == table2_c.c.myid)) eq_(str(table_c.join(table2_c).onclause), 'mytable.myid = othertable.myid') + class TableTest(fixtures.TestBase, AssertsCompiledSQL): + @testing.skip_if('mssql', 'different col format') def test_prefixes(self): from sqlalchemy import Table table1 = Table("temporary_table_1", MetaData(), - Column("col1", Integer), - prefixes=["TEMPORARY"]) + Column("col1", Integer), + prefixes=["TEMPORARY"]) self.assert_compile( schema.CreateTable(table1), @@ -927,8 +971,8 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): ) table2 = Table("temporary_table_2", MetaData(), - Column("col1", Integer), - prefixes=["VIRTUAL"]) + Column("col1", Integer), + prefixes=["VIRTUAL"]) self.assert_compile( schema.CreateTable(table2), "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)" @@ -972,23 +1016,23 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): m = MetaData() t = Table('t', m, - Column('id', Integer, primary_key=True) - ) + Column('id', Integer, primary_key=True) + ) is_(t._autoincrement_column, t.c.id) t = Table('t', m, - Column('id', Integer, primary_key=True), - extend_existing=True - ) + Column('id', Integer, primary_key=True), + extend_existing=True + ) is_(t._autoincrement_column, t.c.id) def test_pk_args_standalone(self): m = MetaData() t = Table('t', m, - Column('x', Integer, primary_key=True), - PrimaryKeyConstraint(mssql_clustered=True) - ) + Column('x', Integer, primary_key=True), + PrimaryKeyConstraint(mssql_clustered=True) + ) eq_( list(t.primary_key), [t.c.x] ) @@ -999,11 +1043,11 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): def test_pk_cols_sets_flags(self): m = MetaData() t = Table('t', m, - Column('x', Integer), - Column('y', Integer), - Column('z', Integer), - PrimaryKeyConstraint('x', 'y') - ) + Column('x', Integer), + Column('y', Integer), + Column('z', Integer), + PrimaryKeyConstraint('x', 'y') + ) eq_(t.c.x.primary_key, True) eq_(t.c.y.primary_key, True) eq_(t.c.z.primary_key, False) @@ -1013,11 +1057,11 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.SAWarning, "Table 't' specifies columns 'x' as primary_key=True, " - "not matching locally specified columns 'q'", + "not matching locally specified columns 'q'", Table, 't', m, - Column('x', Integer, primary_key=True), - Column('q', Integer), - PrimaryKeyConstraint('q') + Column('x', Integer, primary_key=True), + Column('q', Integer), + PrimaryKeyConstraint('q') ) def test_pk_col_mismatch_two(self): @@ -1025,33 +1069,33 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.SAWarning, "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " - "not matching locally specified columns 'b', 'c'", + "not matching locally specified columns 'b', 'c'", Table, 't', m, - Column('a', Integer, primary_key=True), - Column('b', Integer, primary_key=True), - Column('c', Integer, primary_key=True), - PrimaryKeyConstraint('b', 'c') + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True), + Column('c', Integer, primary_key=True), + PrimaryKeyConstraint('b', 'c') ) @testing.emits_warning("Table 't'") def test_pk_col_mismatch_three(self): m = MetaData() t = Table('t', m, - Column('x', Integer, primary_key=True), - Column('q', Integer), - PrimaryKeyConstraint('q') - ) + Column('x', Integer, primary_key=True), + Column('q', Integer), + PrimaryKeyConstraint('q') + ) eq_(list(t.primary_key), [t.c.q]) @testing.emits_warning("Table 't'") def test_pk_col_mismatch_four(self): m = MetaData() t = Table('t', m, - Column('a', Integer, primary_key=True), - Column('b', Integer, primary_key=True), - Column('c', Integer, primary_key=True), - PrimaryKeyConstraint('b', 'c') - ) + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True), + Column('c', Integer, primary_key=True), + PrimaryKeyConstraint('b', 'c') + ) eq_(list(t.primary_key), [t.c.b, t.c.c]) def test_pk_always_flips_nullable(self): @@ -1073,6 +1117,7 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): class SchemaTypeTest(fixtures.TestBase): + class MyType(sqltypes.SchemaType, sqltypes.TypeEngine): column = None table = None @@ -1094,6 +1139,7 @@ class SchemaTypeTest(fixtures.TestBase): self.evt_targets += (target,) class MyTypeWImpl(MyType): + def _gen_dialect_impl(self, dialect): return self.adapt(SchemaTypeTest.MyTypeImpl) @@ -1236,7 +1282,14 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): def test_ad_hoc_schema_equiv_fk(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer), schema="foo") - t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), schema="foo") + t2 = Table( + 't2', + m, + Column( + 'x', + Integer, + ForeignKey('t1.x')), + schema="foo") assert_raises( exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x) @@ -1246,7 +1299,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): m = MetaData(schema="foo") t1 = Table('t1', m, Column('x', Integer)) t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), - schema="bar") + schema="bar") assert t2.c.x.references(t1.c.x) def test_default_schema_metadata_fk_alt_local_raises(self): @@ -1281,12 +1334,26 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): def test_iteration(self): metadata = MetaData() - table1 = Table('table1', metadata, Column('col1', Integer, - primary_key=True), schema='someschema') - table2 = Table('table2', metadata, Column('col1', Integer, - primary_key=True), Column('col2', Integer, - ForeignKey('someschema.table1.col1')), - schema='someschema') + table1 = Table( + 'table1', + metadata, + Column( + 'col1', + Integer, + primary_key=True), + schema='someschema') + table2 = Table( + 'table2', + metadata, + Column( + 'col1', + Integer, + primary_key=True), + Column( + 'col2', + Integer, + ForeignKey('someschema.table1.col1')), + schema='someschema') t1 = str(schema.CreateTable(table1).compile(bind=testing.db)) t2 = str(schema.CreateTable(table2).compile(bind=testing.db)) @@ -1299,11 +1366,12 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): class UseExistingTest(fixtures.TablesTest): + @classmethod def define_tables(cls, metadata): Table('users', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(30))) + Column('id', Integer, primary_key=True), + Column('name', String(30))) def _useexisting_fixture(self): meta2 = MetaData(testing.db) @@ -1315,23 +1383,23 @@ class UseExistingTest(fixtures.TablesTest): def test_exception_no_flags(self): meta2 = self._useexisting_fixture() + def go(): Table('users', meta2, Column('name', - Unicode), autoload=True) + Unicode), autoload=True) assert_raises_message( exc.InvalidRequestError, - "Table 'users' is already defined for this "\ - "MetaData instance.", + "Table 'users' is already defined for this " + "MetaData instance.", go ) - def test_keep_plus_existing_raises(self): meta2 = self._useexisting_fixture() assert_raises( exc.ArgumentError, Table, 'users', meta2, keep_existing=True, - extend_existing=True + extend_existing=True ) @testing.uses_deprecated() @@ -1340,47 +1408,47 @@ class UseExistingTest(fixtures.TablesTest): assert_raises( exc.ArgumentError, Table, 'users', meta2, useexisting=True, - extend_existing=True + extend_existing=True ) def test_keep_existing_no_dupe_constraints(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - keep_existing=True - ) + Column('id', Integer), + Column('name', Unicode), + UniqueConstraint('name'), + keep_existing=True + ) assert 'name' in users.c assert 'id' in users.c eq_(len(users.constraints), 2) u2 = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - keep_existing=True - ) + Column('id', Integer), + Column('name', Unicode), + UniqueConstraint('name'), + keep_existing=True + ) eq_(len(u2.constraints), 2) def test_extend_existing_dupes_constraints(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - extend_existing=True - ) + Column('id', Integer), + Column('name', Unicode), + UniqueConstraint('name'), + extend_existing=True + ) assert 'name' in users.c assert 'id' in users.c eq_(len(users.constraints), 2) u2 = Table('users', meta2, - Column('id', Integer), - Column('name', Unicode), - UniqueConstraint('name'), - extend_existing=True - ) + Column('id', Integer), + Column('name', Unicode), + UniqueConstraint('name'), + extend_existing=True + ) # constraint got duped eq_(len(u2.constraints), 3) @@ -1399,8 +1467,8 @@ class UseExistingTest(fixtures.TablesTest): def test_keep_existing_add_column(self): meta2 = self._useexisting_fixture() users = Table('users', meta2, - Column('foo', Integer), - autoload=True, + Column('foo', Integer), + autoload=True, keep_existing=True) assert "foo" not in users.c @@ -1411,20 +1479,20 @@ class UseExistingTest(fixtures.TablesTest): assert isinstance(users.c.name.type, Unicode) @testing.skip_if( - lambda: testing.db.dialect.requires_name_normalize, - "test depends on lowercase as case insensitive") + lambda: testing.db.dialect.requires_name_normalize, + "test depends on lowercase as case insensitive") def test_keep_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, quote=True, - autoload=True, + autoload=True, keep_existing=True) assert users.name.quote def test_keep_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, - Column('foo', Integer), - autoload=True, + Column('foo', Integer), + autoload=True, keep_existing=True) assert "foo" in users.c @@ -1443,7 +1511,7 @@ class UseExistingTest(fixtures.TablesTest): def test_keep_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() users = Table('users', meta2, - Column('foo', Integer), + Column('foo', Integer), keep_existing=True) assert "foo" not in users.c @@ -1459,14 +1527,14 @@ class UseExistingTest(fixtures.TablesTest): tsa.exc.ArgumentError, "Can't redefine 'quote' or 'quote_schema' arguments", Table, 'users', meta2, quote=True, autoload=True, - extend_existing=True + extend_existing=True ) def test_extend_existing_add_column(self): meta2 = self._useexisting_fixture() users = Table('users', meta2, - Column('foo', Integer), - autoload=True, + Column('foo', Integer), + autoload=True, extend_existing=True) assert "foo" in users.c @@ -1477,20 +1545,20 @@ class UseExistingTest(fixtures.TablesTest): assert isinstance(users.c.name.type, Unicode) @testing.skip_if( - lambda: testing.db.dialect.requires_name_normalize, - "test depends on lowercase as case insensitive") + lambda: testing.db.dialect.requires_name_normalize, + "test depends on lowercase as case insensitive") def test_extend_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, quote=True, - autoload=True, + autoload=True, extend_existing=True) assert users.name.quote def test_extend_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, - Column('foo', Integer), - autoload=True, + Column('foo', Integer), + autoload=True, extend_existing=True) assert "foo" in users.c @@ -1506,17 +1574,19 @@ class UseExistingTest(fixtures.TablesTest): tsa.exc.ArgumentError, "Can't redefine 'quote' or 'quote_schema' arguments", Table, 'users', meta2, quote=True, - extend_existing=True + extend_existing=True ) def test_extend_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() users = Table('users', meta2, - Column('foo', Integer), + Column('foo', Integer), extend_existing=True) assert "foo" in users.c + class IndexTest(fixtures.TestBase): + def _assert(self, t, i, columns=True): eq_(t.indexes, set([i])) if columns: @@ -1584,21 +1654,22 @@ class IndexTest(fixtures.TestBase): class ConstraintTest(fixtures.TestBase): + def _single_fixture(self): m = MetaData() t1 = Table('t1', m, - Column('a', Integer), - Column('b', Integer) - ) + Column('a', Integer), + Column('b', Integer) + ) t2 = Table('t2', m, - Column('a', Integer, ForeignKey('t1.a')) - ) + Column('a', Integer, ForeignKey('t1.a')) + ) t3 = Table('t3', m, - Column('a', Integer) - ) + Column('a', Integer) + ) return t1, t2, t3 def test_table_references(self): @@ -1633,13 +1704,13 @@ class ConstraintTest(fixtures.TestBase): def test_related_column_not_present_atfirst_ok(self): m = MetaData() base_table = Table("base", m, - Column("id", Integer, primary_key=True) - ) + Column("id", Integer, primary_key=True) + ) fk = ForeignKey('base.q') derived_table = Table("derived", m, - Column("id", None, fk, - primary_key=True), - ) + Column("id", None, fk, + primary_key=True), + ) base_table.append_column(Column('q', Integer)) assert fk.column is base_table.c.q @@ -1651,7 +1722,7 @@ class ConstraintTest(fixtures.TestBase): assert_raises_message( exc.ArgumentError, r"ForeignKeyConstraint on t1\(x, y\) refers to " - "multiple remote tables: t2 and t3", + "multiple remote tables: t2 and t3", Table, 't1', m, Column('x', Integer), Column('y', Integer), ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']) @@ -1666,7 +1737,7 @@ class ConstraintTest(fixtures.TestBase): assert_raises_message( exc.ArgumentError, r"ForeignKeyConstraint on t1\(x, y\) refers to " - "multiple remote tables: t2 and t3", + "multiple remote tables: t2 and t3", Table, 't1', m, Column('x', Integer), Column('y', Integer), ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y]) @@ -1680,17 +1751,17 @@ class ConstraintTest(fixtures.TestBase): # no error is raised for this one right now. # which is a minor bug. Table('t1', m, Column('x', Integer), Column('y', Integer), - ForeignKeyConstraint(['x', 'y'], [x, y]) - ) + ForeignKeyConstraint(['x', 'y'], [x, y]) + ) - t2 = Table('t2', m, x) - t3 = Table('t3', m, y) + Table('t2', m, x) + Table('t3', m, y) def test_constraint_copied_to_proxy_ok(self): m = MetaData() - t1 = Table('t1', m, Column('id', Integer, primary_key=True)) + Table('t1', m, Column('id', Integer, primary_key=True)) t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'), - primary_key=True)) + primary_key=True)) s = tsa.select([t2]) t2fk = list(t2.c.id.foreign_keys)[0] @@ -1711,9 +1782,10 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_composite_fk_string(self): metadata = MetaData() - a = Table('a', metadata, - Column('key1', Integer, primary_key=True), - Column('key2', String(40), primary_key=True)) + Table( + 'a', metadata, + Column('key1', Integer, primary_key=True), + Column('key2', String(40), primary_key=True)) b = Table('b', metadata, Column('a_key1', None), @@ -1745,8 +1817,9 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_standalone_fk_string(self): metadata = MetaData() - a = Table('a', metadata, - Column('key1', Integer, primary_key=True)) + Table( + 'a', metadata, + Column('key1', Integer, primary_key=True)) b = Table('b', metadata, Column('a_key1', None, ForeignKey("a.key1")), @@ -1767,8 +1840,10 @@ class ConstraintTest(fixtures.TestBase): def test_type_propagate_chained_string_source_first(self): metadata = MetaData() - a = Table('a', metadata, - Column('key1', Integer, primary_key=True)) + Table( + 'a', metadata, + Column('key1', Integer, primary_key=True) + ) b = Table('b', metadata, Column('a_key1', None, ForeignKey("a.key1")), @@ -1792,8 +1867,9 @@ class ConstraintTest(fixtures.TestBase): Column('b_key1', None, ForeignKey("b.a_key1")), ) - a = Table('a', metadata, - Column('key1', Integer, primary_key=True)) + Table( + 'a', metadata, + Column('key1', Integer, primary_key=True)) assert isinstance(b.c.a_key1.type, Integer) assert isinstance(c.c.b_key1.type, Integer) @@ -1823,8 +1899,10 @@ class ConstraintTest(fixtures.TestBase): c1 = Column('x', Integer) class CThing(object): + def __init__(self, c): self.c = c + def __clause_element__(self): return self.c @@ -1842,7 +1920,7 @@ class ConstraintTest(fixtures.TestBase): def test_column_accessor_string_no_parent_table(self): fk = ForeignKey("sometable.somecol") - c1 = Column('x', fk) + Column('x', fk) assert_raises_message( exc.InvalidRequestError, "this ForeignKey's parent column is not yet " @@ -1853,7 +1931,7 @@ class ConstraintTest(fixtures.TestBase): def test_column_accessor_string_no_target_table(self): fk = ForeignKey("sometable.somecol") c1 = Column('x', fk) - t1 = Table('t', MetaData(), c1) + Table('t', MetaData(), c1) assert_raises_message( exc.NoReferencedTableError, "Foreign key associated with column 't.x' could not find " @@ -1866,8 +1944,8 @@ class ConstraintTest(fixtures.TestBase): fk = ForeignKey("sometable.somecol") c1 = Column('x', fk) m = MetaData() - t1 = Table('t', m, c1) - t2 = Table("sometable", m, Column('notsomecol', Integer)) + Table('t', m, c1) + Table("sometable", m, Column('notsomecol', Integer)) assert_raises_message( exc.NoReferencedColumnError, "Could not initialize target column for ForeignKey " @@ -1914,20 +1992,21 @@ class ConstraintTest(fixtures.TestBase): class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): + """Test Column() construction.""" __dialect__ = 'default' def columns(self): return [Column(Integer), - Column('b', Integer), - Column(Integer), - Column('d', Integer), - Column(Integer, name='e'), - Column(type_=Integer), - Column(Integer()), - Column('h', Integer()), - Column(type_=Integer())] + Column('b', Integer), + Column(Integer), + Column('d', Integer), + Column(Integer, name='e'), + Column(type_=Integer), + Column(Integer()), + Column('h', Integer()), + Column(type_=Integer())] def test_basic(self): c = self.columns() @@ -2006,7 +2085,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): def test_bogus(self): assert_raises(exc.ArgumentError, Column, 'foo', name='bar') assert_raises(exc.ArgumentError, Column, 'foo', Integer, - type_=Integer()) + type_=Integer()) def test_custom_subclass_proxy(self): """test proxy generation of a Column subclass, can be compiled.""" @@ -2016,6 +2095,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.sql import select class MyColumn(Column): + def _constructor(self, name, type, **kw): kw['name'] = name return MyColumn(type, **kw) @@ -2036,9 +2116,9 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): name = MyColumn(String) name.name = 'name' t1 = Table('foo', MetaData(), - id, - name - ) + id, + name + ) # goofy thing eq_(t1.c.name.my_goofy_thing(), "hi") @@ -2062,6 +2142,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.sql import select class MyColumn(Column): + def __init__(self, type, **kw): Column.__init__(self, type, **kw) @@ -2070,9 +2151,9 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): name = MyColumn(String) name.name = 'name' t1 = Table('foo', MetaData(), - id, - name - ) + id, + name + ) assert_raises_message( TypeError, "Could not create a copy of this <class " @@ -2092,9 +2173,9 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): return compiler.visit_create_column(element, **kw) text = "%s SPECIAL DIRECTIVE %s" % ( - column.name, - compiler.type_compiler.process(column.type) - ) + column.name, + compiler.type_compiler.process(column.type) + ) default = compiler.get_column_default_string(column) if default is not None: text += " DEFAULT " + default @@ -2104,26 +2185,30 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): if column.constraints: text += " ".join( - compiler.process(const) - for const in column.constraints) + compiler.process(const) + for const in column.constraints) return text - t = Table('mytable', MetaData(), - Column('x', Integer, info={"special": True}, primary_key=True), - Column('y', String(50)), - Column('z', String(20), info={"special": True}) - ) + t = Table( + 'mytable', MetaData(), + Column('x', Integer, info={ + "special": True}, primary_key=True), + Column('y', String(50)), + Column('z', String(20), info={ + "special": True})) self.assert_compile( schema.CreateTable(t), "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER " - "NOT NULL, y VARCHAR(50), " - "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))" + "NOT NULL, y VARCHAR(50), " + "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))" ) deregister(schema.CreateColumn) + class ColumnDefaultsTest(fixtures.TestBase): + """test assignment of default fixures to columns""" def _fixture(self, *arg, **kw): @@ -2232,6 +2317,7 @@ class ColumnDefaultsTest(fixtures.TestBase): assert c.onupdate.arg == target assert c.onupdate.column is c + class ColumnOptionsTest(fixtures.TestBase): def test_default_generators(self): @@ -2283,7 +2369,6 @@ class ColumnOptionsTest(fixtures.TestBase): self._no_error(Column("foo", ForeignKey('bar.id'), default="foo")) self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a"))) - def test_column_info(self): c1 = Column('foo', String, info={'x': 'y'}) @@ -2297,28 +2382,36 @@ class ColumnOptionsTest(fixtures.TestBase): c.info['bar'] = 'zip' assert c.info['bar'] == 'zip' + class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): def test_all_events(self): canary = [] + def before_attach(obj, parent): canary.append("%s->%s" % (obj.__class__.__name__, - parent.__class__.__name__)) + parent.__class__.__name__)) def after_attach(obj, parent): canary.append("%s->%s" % (obj.__class__.__name__, parent)) - self.event_listen(schema.SchemaItem, "before_parent_attach", before_attach) - self.event_listen(schema.SchemaItem, "after_parent_attach", after_attach) + self.event_listen( + schema.SchemaItem, + "before_parent_attach", + before_attach) + self.event_listen( + schema.SchemaItem, + "after_parent_attach", + after_attach) m = MetaData() Table('t1', m, - Column('id', Integer, Sequence('foo_id'), primary_key=True), - Column('bar', String, ForeignKey('t2.id')) - ) + Column('id', Integer, Sequence('foo_id'), primary_key=True), + Column('bar', String, ForeignKey('t2.id')) + ) Table('t2', m, - Column('id', Integer, primary_key=True), - ) + Column('id', Integer, primary_key=True), + ) eq_( canary, @@ -2339,7 +2432,7 @@ class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): def evt(target): def before_attach(obj, parent): canary.append("%s->%s" % (target.__name__, - parent.__class__.__name__)) + parent.__class__.__name__)) def after_attach(obj, parent): assert hasattr(obj, 'name') # so we can change it @@ -2357,18 +2450,18 @@ class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): m = MetaData() Table('t1', m, - Column('id', Integer, Sequence('foo_id'), primary_key=True), - Column('bar', String, ForeignKey('t2.id'), index=True), - Column('bat', Integer, unique=True), - ) + Column('id', Integer, Sequence('foo_id'), primary_key=True), + Column('bar', String, ForeignKey('t2.id'), index=True), + Column('bat', Integer, unique=True), + ) Table('t2', m, - Column('id', Integer, primary_key=True), - Column('bar', Integer), - Column('bat', Integer), - CheckConstraint("bar>5"), - UniqueConstraint('bar', 'bat'), - Index(None, 'bar', 'bat') - ) + Column('id', Integer, primary_key=True), + Column('bar', Integer), + Column('bat', Integer), + CheckConstraint("bar>5"), + UniqueConstraint('bar', 'bat'), + Index(None, 'bar', 'bat') + ) eq_( canary, [ @@ -2383,10 +2476,13 @@ class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase): ] ) + class DialectKWArgTest(fixtures.TestBase): + @contextmanager def _fixture(self): from sqlalchemy.engine.default import DefaultDialect + class ParticipatingDialect(DefaultDialect): construct_arguments = [ (schema.Index, { @@ -2445,7 +2541,12 @@ class DialectKWArgTest(fixtures.TestBase): def test_nonparticipating(self): with self._fixture(): - idx = Index('a', 'b', 'c', nonparticipating_y=True, nonparticipating_q=5) + idx = Index( + 'a', + 'b', + 'c', + nonparticipating_y=True, + nonparticipating_q=5) eq_( idx.dialect_kwargs, { @@ -2459,9 +2560,10 @@ class DialectKWArgTest(fixtures.TestBase): assert_raises_message( TypeError, "Additional arguments should be named " - "<dialectname>_<argument>, got 'foobar'", + "<dialectname>_<argument>, got 'foobar'", Index, 'a', 'b', 'c', foobar=True ) + def test_unknown_dialect_warning(self): with self._fixture(): assert_raises_message( @@ -2503,7 +2605,7 @@ class DialectKWArgTest(fixtures.TestBase): def test_unknown_dialect_warning_still_populates_multiple(self): with self._fixture(): idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5, - otherunknown_foo='bar', participating_y=8) + otherunknown_foo='bar', participating_y=8) eq_( idx.dialect_options, { @@ -2514,14 +2616,14 @@ class DialectKWArgTest(fixtures.TestBase): ) eq_(idx.dialect_kwargs, {'unknown_z': 5, 'participating_y': 8, - 'unknown_y': True, - 'otherunknown_foo': 'bar'} - ) # still populates + 'unknown_y': True, + 'otherunknown_foo': 'bar'} + ) # still populates def test_runs_safekwarg(self): with mock.patch("sqlalchemy.util.safe_kwarg", - lambda arg: "goofy_%s" % arg): + lambda arg: "goofy_%s" % arg): with self._fixture(): idx = Index('a', 'b') idx.kwargs[u'participating_x'] = 7 @@ -2534,7 +2636,7 @@ class DialectKWArgTest(fixtures.TestBase): def test_combined(self): with self._fixture(): idx = Index('a', 'b', 'c', participating_x=7, - nonparticipating_y=True) + nonparticipating_y=True) eq_( idx.dialect_options, @@ -2557,7 +2659,7 @@ class DialectKWArgTest(fixtures.TestBase): participating_x=7, participating2_x=15, participating2_y="lazy" - ) + ) eq_( idx.dialect_options, { @@ -2579,7 +2681,10 @@ class DialectKWArgTest(fixtures.TestBase): m = MetaData() fk = ForeignKey('t2.id', participating_foobar=True) t = Table('t', m, Column('id', Integer, fk)) - fkc = [c for c in t.constraints if isinstance(c, ForeignKeyConstraint)][0] + fkc = [ + c for c in t.constraints if isinstance( + c, + ForeignKeyConstraint)][0] eq_( fkc.dialect_kwargs, {'participating_foobar': True} @@ -2602,9 +2707,9 @@ class DialectKWArgTest(fixtures.TestBase): with self._fixture(): m = MetaData() t = Table('x', m, Column('x', Integer), - participating2_xyz='foo', - participating2_engine='InnoDB', - ) + participating2_xyz='foo', + participating2_engine='InnoDB', + ) eq_( t.dialect_kwargs, { @@ -2636,48 +2741,47 @@ class DialectKWArgTest(fixtures.TestBase): t = Table('x', m, Column('x', Integer), participating2_foobar=5) assert 'foobar' in t.dialect_options['participating2'] - def test_update(self): with self._fixture(): idx = Index('a', 'b', 'c', participating_x=20) eq_(idx.dialect_kwargs, { - "participating_x": 20, - }) + "participating_x": 20, + }) idx._validate_dialect_kwargs({ - "participating_x": 25, - "participating_z_one": "default"}) + "participating_x": 25, + "participating_z_one": "default"}) eq_(idx.dialect_options, { - "participating": {"x": 25, "y": False, "z_one": "default"} - }) + "participating": {"x": 25, "y": False, "z_one": "default"} + }) eq_(idx.dialect_kwargs, { - "participating_x": 25, - 'participating_z_one': "default" - }) + "participating_x": 25, + 'participating_z_one': "default" + }) idx._validate_dialect_kwargs({ - "participating_x": 25, - "participating_z_one": "default"}) + "participating_x": 25, + "participating_z_one": "default"}) eq_(idx.dialect_options, { - "participating": {"x": 25, "y": False, "z_one": "default"} - }) + "participating": {"x": 25, "y": False, "z_one": "default"} + }) eq_(idx.dialect_kwargs, { - "participating_x": 25, - 'participating_z_one': "default" - }) + "participating_x": 25, + 'participating_z_one': "default" + }) idx._validate_dialect_kwargs({ - "participating_y": True, - 'participating2_y': "p2y"}) + "participating_y": True, + 'participating2_y': "p2y"}) eq_(idx.dialect_options, { - "participating": {"x": 25, "y": True, "z_one": "default"}, - "participating2": {"y": "p2y", "pp": "default", "x": 9} - }) + "participating": {"x": 25, "y": True, "z_one": "default"}, + "participating2": {"y": "p2y", "pp": "default", "x": 9} + }) eq_(idx.dialect_kwargs, { - "participating_x": 25, - "participating_y": True, - 'participating2_y': "p2y", - "participating_z_one": "default"}) + "participating_x": 25, + "participating_y": True, + 'participating2_y': "p2y", + "participating_z_one": "default"}) def test_key_error_kwargs_no_dialect(self): with self._fixture(): @@ -2718,8 +2822,8 @@ class DialectKWArgTest(fixtures.TestBase): assert_raises( KeyError, - idx.dialect_options['nonparticipating'].__getitem__, 'asdfaso890' - ) + idx.dialect_options['nonparticipating'].__getitem__, + 'asdfaso890') def test_ad_hoc_participating_via_opt(self): with self._fixture(): @@ -2798,11 +2902,10 @@ class DialectKWArgTest(fixtures.TestBase): assert_raises_message( exc.ArgumentError, "Dialect 'nonparticipating' does have keyword-argument " - "validation and defaults enabled configured", + "validation and defaults enabled configured", Index.argument_for, "nonparticipating", "xyzqpr", False ) - def test_add_new_arguments_invalid_dialect(self): with self._fixture(): assert_raises_message( @@ -2819,18 +2922,18 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): m1 = MetaData(naming_convention=naming_convention) u1 = Table('user', m1, - Column('id', Integer, primary_key=True), - Column('version', Integer, primary_key=True), - Column('data', String(30)), - schema=table_schema - ) + Column('id', Integer, primary_key=True), + Column('version', Integer, primary_key=True), + Column('data', String(30)), + schema=table_schema + ) return u1 def test_uq_name(self): u1 = self._fixture(naming_convention={ - "uq": "uq_%(table_name)s_%(column_0_name)s" - }) + "uq": "uq_%(table_name)s_%(column_0_name)s" + }) uq = UniqueConstraint(u1.c.data) eq_(uq.name, "uq_user_data") @@ -2863,16 +2966,16 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): def test_column_attached_ck_name(self): m = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) + "ck": "ck_%(table_name)s_%(constraint_name)s" + }) ck = CheckConstraint('x > 5', name='x1') Table('t', m, Column('x', ck)) eq_(ck.name, "ck_t_x1") def test_table_attached_ck_name(self): m = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s" - }) + "ck": "ck_%(table_name)s_%(constraint_name)s" + }) ck = CheckConstraint('x > 5', name='x1') Table('t', m, Column('x', Integer), ck) eq_(ck.name, "ck_t_x1") @@ -2888,67 +2991,64 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): t.append_constraint(uq) eq_(uq.name, "my_special_key") - def test_fk_name_schema(self): u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0_name)s_" - "%(referred_table_name)s_%(referred_column_0_name)s" - }, table_schema="foo") + "fk": "fk_%(table_name)s_%(column_0_name)s_" + "%(referred_table_name)s_%(referred_column_0_name)s" + }, table_schema="foo") m1 = u1.metadata a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('user_id', Integer), - Column('user_version_id', Integer) - ) + Column('id', Integer, primary_key=True), + Column('user_id', Integer), + Column('user_version_id', Integer) + ) fk = ForeignKeyConstraint(['user_id', 'user_version_id'], - ['foo.user.id', 'foo.user.version']) + ['foo.user.id', 'foo.user.version']) a1.append_constraint(fk) eq_(fk.name, "fk_address_user_id_user_id") - def test_fk_attrs(self): u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(column_0_name)s_" - "%(referred_table_name)s_%(referred_column_0_name)s" - }) + "fk": "fk_%(table_name)s_%(column_0_name)s_" + "%(referred_table_name)s_%(referred_column_0_name)s" + }) m1 = u1.metadata a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('user_id', Integer), - Column('user_version_id', Integer) - ) + Column('id', Integer, primary_key=True), + Column('user_id', Integer), + Column('user_version_id', Integer) + ) fk = ForeignKeyConstraint(['user_id', 'user_version_id'], - ['user.id', 'user.version']) + ['user.id', 'user.version']) a1.append_constraint(fk) eq_(fk.name, "fk_address_user_id_user_id") - def test_custom(self): def key_hash(const, table): return "HASH_%s" % table.name u1 = self._fixture(naming_convention={ - "fk": "fk_%(table_name)s_%(key_hash)s", - "key_hash": key_hash - }) + "fk": "fk_%(table_name)s_%(key_hash)s", + "key_hash": key_hash + }) m1 = u1.metadata a1 = Table('address', m1, - Column('id', Integer, primary_key=True), - Column('user_id', Integer), - Column('user_version_id', Integer) - ) + Column('id', Integer, primary_key=True), + Column('user_id', Integer), + Column('user_version_id', Integer) + ) fk = ForeignKeyConstraint(['user_id', 'user_version_id'], - ['user.id', 'user.version']) + ['user.id', 'user.version']) a1.append_constraint(fk) eq_(fk.name, "fk_address_HASH_address") def test_schematype_ck_name_boolean(self): m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + "ck": "ck_%(table_name)s_%(constraint_name)s"}) u1 = Table('user', m1, - Column('x', Boolean(name='foo')) - ) + Column('x', Boolean(name='foo')) + ) # constraint is not hit eq_( [c for c in u1.constraints @@ -2965,11 +3065,11 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): def test_schematype_ck_name_enum(self): m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + "ck": "ck_%(table_name)s_%(constraint_name)s"}) u1 = Table('user', m1, - Column('x', Enum('a', 'b', name='foo')) - ) + Column('x', Enum('a', 'b', name='foo')) + ) eq_( [c for c in u1.constraints if isinstance(c, CheckConstraint)][0].name, "foo" @@ -2978,18 +3078,18 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( schema.CreateTable(u1), 'CREATE TABLE "user" (' - "x VARCHAR(1), " - "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))" + "x VARCHAR(1), " + "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))" ")" ) def test_schematype_ck_name_propagate_conv(self): m1 = MetaData(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + "ck": "ck_%(table_name)s_%(constraint_name)s"}) u1 = Table('user', m1, - Column('x', Enum('a', 'b', name=naming.conv('foo'))) - ) + Column('x', Enum('a', 'b', name=naming.conv('foo'))) + ) eq_( [c for c in u1.constraints if isinstance(c, CheckConstraint)][0].name, "foo" @@ -2998,8 +3098,8 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( schema.CreateTable(u1), 'CREATE TABLE "user" (' - "x VARCHAR(1), " - "CONSTRAINT foo CHECK (x IN ('a', 'b'))" + "x VARCHAR(1), " + "CONSTRAINT foo CHECK (x IN ('a', 'b'))" ")" ) @@ -3051,10 +3151,9 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))' ) - def test_ck_constraint_redundant_event(self): u1 = self._fixture(naming_convention={ - "ck": "ck_%(table_name)s_%(constraint_name)s"}) + "ck": "ck_%(table_name)s_%(constraint_name)s"}) ck1 = CheckConstraint(u1.c.version > 3, name='foo') u1.append_constraint(ck1) @@ -3062,4 +3161,3 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): u1.append_constraint(ck1) eq_(ck1.name, "ck_user_foo") - |
