diff options
author | Tony Locke <tlocke@tlocke.org.uk> | 2014-07-09 21:47:04 +0100 |
---|---|---|
committer | Tony Locke <tlocke@tlocke.org.uk> | 2014-07-19 20:20:12 +0100 |
commit | ebf74bd659340480bb37216f1e64d4b79aba4939 (patch) | |
tree | 995cbd1e04347dd2f585a8b606de5d2ed42ae311 | |
parent | 827329a0cca5351094a1a86b6b2be2b9182f0ae2 (diff) | |
download | sqlalchemy-ebf74bd659340480bb37216f1e64d4b79aba4939.tar.gz |
PEP8 tidy for test/orm/test_versioning.py
-rw-r--r-- | test/orm/test_versioning.py | 327 |
1 files changed, 179 insertions, 148 deletions
diff --git a/test/orm/test_versioning.py b/test/orm/test_versioning.py index 7a6dc106e..614909f6a 100644 --- a/test/orm/test_versioning.py +++ b/test/orm/test_versioning.py @@ -2,21 +2,22 @@ import datetime import sqlalchemy as sa from sqlalchemy.testing import engines from sqlalchemy import testing -from sqlalchemy import Integer, String, Date, ForeignKey, literal_column, \ - orm, exc, select, TypeDecorator +from sqlalchemy import ( + Integer, String, Date, ForeignKey, orm, exc, select, TypeDecorator) from sqlalchemy.testing.schema import Table, Column -from sqlalchemy.orm import mapper, relationship, Session, \ - create_session, column_property, sessionmaker,\ - exc as orm_exc -from sqlalchemy.testing import eq_, ne_, assert_raises, assert_raises_message -from sqlalchemy.testing import fixtures -from test.orm import _fixtures -from sqlalchemy.testing.assertsql import AllOf, CompiledSQL +from sqlalchemy.orm import ( + mapper, relationship, Session, create_session, sessionmaker, + exc as orm_exc) +from sqlalchemy.testing import ( + eq_, assert_raises, assert_raises_message, fixtures) +from sqlalchemy.testing.assertsql import CompiledSQL import uuid + def make_uuid(): return uuid.uuid4().hex + class VersioningTest(fixtures.MappedTest): __backend__ = True @@ -36,8 +37,7 @@ class VersioningTest(fixtures.MappedTest): def _fixture(self): Foo, version_table = self.classes.Foo, self.tables.version_table - mapper(Foo, version_table, - version_id_col=version_table.c.version_id) + mapper(Foo, version_table, version_id_col=version_table.c.version_id) s1 = Session() return s1 @@ -54,12 +54,13 @@ class VersioningTest(fixtures.MappedTest): s1.add_all((f1, f2)) s1.commit() - f1.value='f1rev2' + f1.value = 'f1rev2' assert_raises(sa.exc.SAWarning, s1.commit) finally: testing.db.dialect.supports_sane_rowcount = save - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_basic(self): Foo = self.classes.Foo @@ -69,23 +70,23 @@ class VersioningTest(fixtures.MappedTest): s1.add_all((f1, f2)) s1.commit() - f1.value='f1rev2' + f1.value = 'f1rev2' s1.commit() s2 = create_session(autocommit=False) f1_s = s2.query(Foo).get(f1.id) - f1_s.value='f1rev3' + f1_s.value = 'f1rev3' s2.commit() - f1.value='f1rev3mine' + f1.value = 'f1rev3mine' # Only dialects with a sane rowcount can detect the # StaleDataError if testing.db.dialect.supports_sane_rowcount: - assert_raises_message(sa.orm.exc.StaleDataError, - r"UPDATE statement on table 'version_table' expected " - r"to update 1 row\(s\); 0 were matched.", - s1.commit), + assert_raises_message( + sa.orm.exc.StaleDataError, + r"UPDATE statement on table 'version_table' expected " + r"to update 1 row\(s\); 0 were matched.", s1.commit), s1.rollback() else: s1.commit() @@ -94,7 +95,7 @@ class VersioningTest(fixtures.MappedTest): f1 = s1.query(Foo).get(f1.id) f2 = s1.query(Foo).get(f2.id) - f1_s.value='f1rev4' + f1_s.value = 'f1rev4' s2.commit() s1.delete(f1) @@ -109,7 +110,8 @@ class VersioningTest(fixtures.MappedTest): else: s1.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_bump_version(self): """test that version number can be bumped. @@ -145,11 +147,11 @@ class VersioningTest(fixtures.MappedTest): @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections def test_versioncheck(self): - """query.with_lockmode performs a 'version check' on an already loaded instance""" + """query.with_lockmode performs a 'version check' on an already loaded + instance""" Foo = self.classes.Foo - s1 = self._fixture() f1s1 = Foo(value='f1 value') s1.add(f1s1) @@ -157,16 +159,16 @@ class VersioningTest(fixtures.MappedTest): s2 = create_session(autocommit=False) f1s2 = s2.query(Foo).get(f1s1.id) - f1s2.value='f1 new value' + f1s2.value = 'f1 new value' s2.commit() # load, version is wrong assert_raises_message( - sa.orm.exc.StaleDataError, - r"Instance .* has version id '\d+' which does not " - r"match database-loaded version id '\d+'", - s1.query(Foo).with_lockmode('read').get, f1s1.id - ) + sa.orm.exc.StaleDataError, + r"Instance .* has version id '\d+' which does not " + r"match database-loaded version id '\d+'", + s1.query(Foo).with_lockmode('read').get, f1s1.id + ) # reload it - this expires the old version first s1.refresh(f1s1, lockmode='read') @@ -178,16 +180,15 @@ class VersioningTest(fixtures.MappedTest): s1.close() s1.query(Foo).with_lockmode('read').get(f1s1.id) - @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections @testing.requires.update_nowait def test_versioncheck_for_update(self): - """query.with_lockmode performs a 'version check' on an already loaded instance""" + """query.with_lockmode performs a 'version check' on an already loaded + instance""" Foo = self.classes.Foo - s1 = self._fixture() f1s1 = Foo(value='f1 value') s1.add(f1s1) @@ -196,7 +197,7 @@ class VersioningTest(fixtures.MappedTest): s2 = create_session(autocommit=False) f1s2 = s2.query(Foo).get(f1s1.id) s2.refresh(f1s2, lockmode='update') - f1s2.value='f1 new value' + f1s2.value = 'f1 new value' assert_raises( exc.DBAPIError, @@ -211,7 +212,8 @@ class VersioningTest(fixtures.MappedTest): @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections def test_noversioncheck(self): - """test query.with_lockmode works when the mapper has no version id col""" + """test query.with_lockmode works when the mapper has no version id + col""" Foo, version_table = self.classes.Foo, self.tables.version_table @@ -226,7 +228,8 @@ class VersioningTest(fixtures.MappedTest): assert f1s2.id == f1s1.id assert f1s2.value == f1s1.value - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_no_version(self): Foo = self.classes.Foo @@ -244,7 +247,8 @@ class VersioningTest(fixtures.MappedTest): s1.commit() eq_(f3.version_id, 3) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_correct_version(self): Foo = self.classes.Foo @@ -262,7 +266,8 @@ class VersioningTest(fixtures.MappedTest): s1.commit() eq_(f3.version_id, 3) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_incorrect_version(self): Foo = self.classes.Foo @@ -284,7 +289,8 @@ class VersioningTest(fixtures.MappedTest): s1.merge, f2 ) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_incorrect_version_not_in_session(self): Foo = self.classes.Foo @@ -308,6 +314,7 @@ class VersioningTest(fixtures.MappedTest): s1.merge, f2 ) + class ColumnTypeTest(fixtures.MappedTest): __backend__ = True @@ -315,6 +322,7 @@ class ColumnTypeTest(fixtures.MappedTest): def define_tables(cls, metadata): class SpecialType(TypeDecorator): impl = Date + def process_bind_param(self, value, dialect): assert isinstance(value, datetime.date) return value @@ -332,8 +340,7 @@ class ColumnTypeTest(fixtures.MappedTest): def _fixture(self): Foo, version_table = self.classes.Foo, self.tables.version_table - mapper(Foo, version_table, - version_id_col=version_table.c.version_id) + mapper(Foo, version_table, version_id_col=version_table.c.version_id) s1 = Session() return s1 @@ -346,19 +353,23 @@ class ColumnTypeTest(fixtures.MappedTest): s1.add(f1) s1.commit() - f1.value='f1rev2' + f1.value = 'f1rev2' s1.commit() + class RowSwitchTest(fixtures.MappedTest): __backend__ = True + @classmethod def define_tables(cls, metadata): - Table('p', metadata, + Table( + 'p', metadata, Column('id', String(10), primary_key=True), Column('version_id', Integer, default=1, nullable=False), Column('data', String(50)) ) - Table('c', metadata, + Table( + 'c', metadata, Column('id', String(10), ForeignKey('p.id'), primary_key=True), Column('version_id', Integer, default=1, nullable=False), Column('data', String(50)) @@ -366,25 +377,25 @@ class RowSwitchTest(fixtures.MappedTest): @classmethod def setup_classes(cls): + class P(cls.Basic): pass + class C(cls.Basic): pass @classmethod def setup_mappers(cls): - p, c, C, P = (cls.tables.p, - cls.tables.c, - cls.classes.C, - cls.classes.P) + p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P - mapper(P, p, version_id_col=p.c.version_id, - properties={ - 'c':relationship(C, uselist=False, cascade='all, delete-orphan') - }) + mapper( + P, p, version_id_col=p.c.version_id, properties={ + 'c': relationship( + C, uselist=False, cascade='all, delete-orphan')}) mapper(C, c, version_id_col=c.c.version_id) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_row_switch(self): P = self.classes.P @@ -398,7 +409,8 @@ class RowSwitchTest(fixtures.MappedTest): session.add(P(id='P1', data="really a row-switch")) session.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_child_row_switch(self): P, C = self.classes.P, self.classes.C @@ -417,16 +429,20 @@ class RowSwitchTest(fixtures.MappedTest): p.c = C(data='child row-switch') session.commit() + class AlternateGeneratorTest(fixtures.MappedTest): __backend__ = True + @classmethod def define_tables(cls, metadata): - Table('p', metadata, + Table( + 'p', metadata, Column('id', String(10), primary_key=True), Column('version_id', String(32), nullable=False), Column('data', String(50)) ) - Table('c', metadata, + Table( + 'c', metadata, Column('id', String(10), ForeignKey('p.id'), primary_key=True), Column('version_id', String(32), nullable=False), Column('data', String(50)) @@ -434,28 +450,31 @@ class AlternateGeneratorTest(fixtures.MappedTest): @classmethod def setup_classes(cls): + class P(cls.Basic): pass + class C(cls.Basic): pass @classmethod def setup_mappers(cls): - p, c, C, P = (cls.tables.p, - cls.tables.c, - cls.classes.C, - cls.classes.P) + p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P - mapper(P, p, version_id_col=p.c.version_id, + mapper( + P, p, version_id_col=p.c.version_id, version_id_generator=lambda x: make_uuid(), properties={ - 'c': relationship(C, uselist=False, cascade='all, delete-orphan') - }) - mapper(C, c, version_id_col=c.c.version_id, - version_id_generator=lambda x: make_uuid(), + 'c': relationship( + C, uselist=False, cascade='all, delete-orphan') + }) + mapper( + C, c, version_id_col=c.c.version_id, + version_id_generator=lambda x: make_uuid(), ) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_row_switch(self): P = self.classes.P @@ -469,7 +488,8 @@ class AlternateGeneratorTest(fixtures.MappedTest): session.add(P(id='P1', data="really a row-switch")) session.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_child_row_switch_one(self): P, C = self.classes.P, self.classes.C @@ -488,7 +508,8 @@ class AlternateGeneratorTest(fixtures.MappedTest): p.c = C(data='child row-switch') session.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_child_row_switch_two(self): P = self.classes.P @@ -525,20 +546,26 @@ class AlternateGeneratorTest(fixtures.MappedTest): else: sess2.commit + class InheritanceTwoVersionIdsTest(fixtures.MappedTest): """Test versioning where both parent/child table have a versioning column. """ __backend__ = True + @classmethod def define_tables(cls, metadata): - Table('base', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table( + 'base', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('version_id', Integer, nullable=True), Column('data', String(50)) ) - Table('sub', metadata, + Table( + 'sub', metadata, Column('id', Integer, ForeignKey('base.id'), primary_key=True), Column('version_id', Integer, nullable=False), Column('sub_data', String(50)) @@ -546,19 +573,19 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): @classmethod def setup_classes(cls): + class Base(cls.Basic): pass + class Sub(Base): pass def test_base_both(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) - mapper(Base, base, - version_id_col=base.c.version_id) + mapper(Base, base, version_id_col=base.c.version_id) mapper(Sub, sub, inherits=Base) session = Session() @@ -570,13 +597,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): eq_(select([base.c.version_id]).scalar(), 1) def test_sub_both(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) - mapper(Base, base, - version_id_col=base.c.version_id) + mapper(Base, base, version_id_col=base.c.version_id) mapper(Sub, sub, inherits=Base) session = Session() @@ -591,14 +616,12 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): eq_(select([base.c.version_id]).scalar(), 1) def test_sub_only(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) mapper(Base, base) - mapper(Sub, sub, inherits=Base, - version_id_col=sub.c.version_id) + mapper(Sub, sub, inherits=Base, version_id_col=sub.c.version_id) session = Session() s1 = Sub(data='s1', sub_data='s1') @@ -612,13 +635,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): eq_(select([base.c.version_id]).scalar(), None) def test_mismatch_version_col_warning(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) - mapper(Base, base, - version_id_col=base.c.version_id) + mapper(Base, base, version_id_col=base.c.version_id) assert_raises_message( exc.SAWarning, @@ -627,9 +648,8 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): "automatically populate the inherited versioning column. " "version_id_col should only be specified on " "the base-most mapper that includes versioning.", - mapper, - Sub, sub, inherits=Base, - version_id_col=sub.c.version_id) + mapper, Sub, sub, inherits=Base, + version_id_col=sub.c.version_id) class ServerVersioningTest(fixtures.MappedTest): @@ -659,27 +679,32 @@ class ServerVersioningTest(fixtures.MappedTest): stmt._counter = str(next(counter)) return stmt._counter - Table('version_table', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('version_id', Integer, nullable=False, - default=IncDefault(), onupdate=IncDefault()), - Column('value', String(40), nullable=False)) + Table( + 'version_table', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column( + 'version_id', Integer, nullable=False, + default=IncDefault(), onupdate=IncDefault()), + Column('value', String(40), nullable=False)) @classmethod def setup_classes(cls): + class Foo(cls.Basic): pass + class Bar(cls.Basic): pass def _fixture(self, expire_on_commit=True): Foo, version_table = self.classes.Foo, self.tables.version_table - mapper(Foo, version_table, - version_id_col=version_table.c.version_id, - version_id_generator=False, - ) + mapper( + Foo, version_table, version_id_col=version_table.c.version_id, + version_id_generator=False, + ) s1 = Session(expire_on_commit=expire_on_commit) return s1 @@ -691,21 +716,22 @@ class ServerVersioningTest(fixtures.MappedTest): sess.add(f1) statements = [ - # note that the assertsql tests the rule against - # "default" - on a "returning" backend, the statement - # includes "RETURNING" - CompiledSQL( - "INSERT INTO version_table (version_id, value) " - "VALUES (1, :value)", - lambda ctx: [{'value': 'f1'}] - ) + # note that the assertsql tests the rule against + # "default" - on a "returning" backend, the statement + # includes "RETURNING" + CompiledSQL( + "INSERT INTO version_table (version_id, value) " + "VALUES (1, :value)", + lambda ctx: [{'value': 'f1'}] + ) ] if not testing.db.dialect.implicit_returning: # DBs without implicit returning, we must immediately # SELECT for the new version id statements.append( CompiledSQL( - "SELECT version_table.version_id AS version_table_version_id " + "SELECT version_table.version_id " + " AS version_table_version_id " "FROM version_table WHERE version_table.id = :param_1", lambda ctx: [{"param_1": 1}] ) @@ -722,30 +748,32 @@ class ServerVersioningTest(fixtures.MappedTest): f1.value = 'f2' statements = [ - # note that the assertsql tests the rule against - # "default" - on a "returning" backend, the statement - # includes "RETURNING" - CompiledSQL( - "UPDATE version_table SET version_id=2, value=:value " - "WHERE version_table.id = :version_table_id AND " - "version_table.version_id = :version_table_version_id", - lambda ctx: [{"version_table_id": 1, - "version_table_version_id": 1, "value": "f2"}] - ) + # note that the assertsql tests the rule against + # "default" - on a "returning" backend, the statement + # includes "RETURNING" + CompiledSQL( + "UPDATE version_table SET version_id=2, value=:value " + "WHERE version_table.id = :version_table_id AND " + "version_table.version_id = :version_table_version_id", + lambda ctx: [ + { + "version_table_id": 1, + "version_table_version_id": 1, "value": "f2"}] + ) ] if not testing.db.dialect.implicit_returning: # DBs without implicit returning, we must immediately # SELECT for the new version id statements.append( CompiledSQL( - "SELECT version_table.version_id AS version_table_version_id " + "SELECT version_table.version_id " + " AS version_table_version_id " "FROM version_table WHERE version_table.id = :param_1", lambda ctx: [{"param_1": 1}] ) ) self.assert_sql_execution(testing.db, sess.flush, *statements) - def test_delete_col(self): sess = self._fixture() @@ -756,15 +784,15 @@ class ServerVersioningTest(fixtures.MappedTest): sess.delete(f1) statements = [ - # note that the assertsql tests the rule against - # "default" - on a "returning" backend, the statement - # includes "RETURNING" - CompiledSQL( - "DELETE FROM version_table " - "WHERE version_table.id = :id AND " - "version_table.version_id = :version_id", - lambda ctx: [{"id": 1, "version_id": 1}] - ) + # note that the assertsql tests the rule against + # "default" - on a "returning" backend, the statement + # includes "RETURNING" + CompiledSQL( + "DELETE FROM version_table " + "WHERE version_table.id = :id AND " + "version_table.version_id = :version_id", + lambda ctx: [{"id": 1, "version_id": 1}] + ) ] self.assert_sql_execution(testing.db, sess.flush, *statements) @@ -817,29 +845,32 @@ class ServerVersioningTest(fixtures.MappedTest): sess.commit ) + class ManualVersionTest(fixtures.MappedTest): run_define_tables = 'each' __backend__ = True @classmethod def define_tables(cls, metadata): - Table("a", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(30)), - Column('vid', Integer) - ) + Table( + "a", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(30)), + Column('vid', Integer) + ) @classmethod def setup_classes(cls): class A(cls.Basic): pass - @classmethod def setup_mappers(cls): - mapper(cls.classes.A, cls.tables.a, - version_id_col=cls.tables.a.c.vid, - version_id_generator=False) + mapper( + cls.classes.A, cls.tables.a, version_id_col=cls.tables.a.c.vid, + version_id_generator=False) def test_insert(self): sess = Session() @@ -904,4 +935,4 @@ class ManualVersionTest(fixtures.MappedTest): a1.vid = 2 sess.commit() - eq_(a1.vid, 2)
\ No newline at end of file + eq_(a1.vid, 2) |