diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-08-14 20:37:28 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-08-14 20:37:28 -0400 |
commit | f0a56bc5aa8cb0ca7b642eaa99093ade2065d4b5 (patch) | |
tree | 8e9266cead532d089dd6edee6390b82ebd914cfd | |
parent | 253523c57f76c515ade82e4db30cff2536fb2a92 (diff) | |
download | sqlalchemy-f0a56bc5aa8cb0ca7b642eaa99093ade2065d4b5.tar.gz |
pep8
-rw-r--r-- | test/orm/test_unitofworkv2.py | 1016 |
1 files changed, 530 insertions, 486 deletions
diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py index 9fedc9590..9c9296786 100644 --- a/test/orm/test_unitofworkv2.py +++ b/test/orm/test_unitofworkv2.py @@ -1,4 +1,4 @@ -from sqlalchemy.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.testing import eq_, assert_raises_message from sqlalchemy import testing from sqlalchemy.testing import engines from sqlalchemy.testing.schema import Table, Column @@ -7,12 +7,14 @@ from sqlalchemy import exc from sqlalchemy.testing import fixtures from sqlalchemy import Integer, String, ForeignKey, func from sqlalchemy.orm import mapper, relationship, backref, \ - create_session, unitofwork, attributes,\ - Session, class_mapper, sync, exc as orm_exc + create_session, unitofwork, attributes,\ + Session, exc as orm_exc + +from sqlalchemy.testing.assertsql import AllOf, CompiledSQL -from sqlalchemy.testing.assertsql import AllOf, CompiledSQL, Or class AssertsUOW(object): + def _get_test_uow(self, session): uow = unitofwork.UOWTransaction(session) deleted = set(session._deleted) @@ -24,26 +26,29 @@ class AssertsUOW(object): uow.register_object(d, isdelete=True) return uow - def _assert_uow_size(self, session, expected ): + def _assert_uow_size(self, session, expected): uow = self._get_test_uow(session) postsort_actions = uow._generate_actions() print(postsort_actions) eq_(len(postsort_actions), expected, postsort_actions) -class UOWTest(_fixtures.FixtureTest, - testing.AssertsExecutionResults, AssertsUOW): + +class UOWTest( + _fixtures.FixtureTest, + testing.AssertsExecutionResults, AssertsUOW): run_inserts = None + class RudimentaryFlushTest(UOWTest): def test_one_to_many_save(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users, properties={ - 'addresses':relationship(Address), + 'addresses': relationship(Address), }) mapper(Address, addresses) sess = create_session() @@ -53,32 +58,32 @@ class RudimentaryFlushTest(UOWTest): sess.add(u1) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL( - "INSERT INTO users (name) VALUES (:name)", - {'name': 'u1'} - ), - CompiledSQL( - "INSERT INTO addresses (user_id, email_address) " - "VALUES (:user_id, :email_address)", - lambda ctx: {'email_address': 'a1', 'user_id':u1.id} - ), - CompiledSQL( - "INSERT INTO addresses (user_id, email_address) " - "VALUES (:user_id, :email_address)", - lambda ctx: {'email_address': 'a2', 'user_id':u1.id} - ), - ) + testing.db, + sess.flush, + CompiledSQL( + "INSERT INTO users (name) VALUES (:name)", + {'name': 'u1'} + ), + CompiledSQL( + "INSERT INTO addresses (user_id, email_address) " + "VALUES (:user_id, :email_address)", + lambda ctx: {'email_address': 'a1', 'user_id': u1.id} + ), + CompiledSQL( + "INSERT INTO addresses (user_id, email_address) " + "VALUES (:user_id, :email_address)", + lambda ctx: {'email_address': 'a2', 'user_id': u1.id} + ), + ) def test_one_to_many_delete_all(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users, properties={ - 'addresses':relationship(Address), + 'addresses': relationship(Address), }) mapper(Address, addresses) sess = create_session() @@ -91,26 +96,26 @@ class RudimentaryFlushTest(UOWTest): sess.delete(a1) sess.delete(a2) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL( - "DELETE FROM addresses WHERE addresses.id = :id", - [{'id':a1.id},{'id':a2.id}] - ), - CompiledSQL( - "DELETE FROM users WHERE users.id = :id", - {'id':u1.id} - ), + testing.db, + sess.flush, + CompiledSQL( + "DELETE FROM addresses WHERE addresses.id = :id", + [{'id': a1.id}, {'id': a2.id}] + ), + CompiledSQL( + "DELETE FROM users WHERE users.id = :id", + {'id': u1.id} + ), ) def test_one_to_many_delete_parent(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users, properties={ - 'addresses':relationship(Address), + 'addresses': relationship(Address), }) mapper(Address, addresses) sess = create_session() @@ -121,76 +126,75 @@ class RudimentaryFlushTest(UOWTest): sess.delete(u1) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL( - "UPDATE addresses SET user_id=:user_id WHERE " - "addresses.id = :addresses_id", - lambda ctx: [{'addresses_id': a1.id, 'user_id': None}] - ), - CompiledSQL( - "UPDATE addresses SET user_id=:user_id WHERE " - "addresses.id = :addresses_id", - lambda ctx: [{'addresses_id': a2.id, 'user_id': None}] - ), - CompiledSQL( - "DELETE FROM users WHERE users.id = :id", - {'id':u1.id} - ), + testing.db, + sess.flush, + CompiledSQL( + "UPDATE addresses SET user_id=:user_id WHERE " + "addresses.id = :addresses_id", + lambda ctx: [{'addresses_id': a1.id, 'user_id': None}] + ), + CompiledSQL( + "UPDATE addresses SET user_id=:user_id WHERE " + "addresses.id = :addresses_id", + lambda ctx: [{'addresses_id': a2.id, 'user_id': None}] + ), + CompiledSQL( + "DELETE FROM users WHERE users.id = :id", + {'id': u1.id} + ), ) def test_many_to_one_save(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) - + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'user':relationship(User) + 'user': relationship(User) }) sess = create_session() u1 = User(name='u1') a1, a2 = Address(email_address='a1', user=u1), \ - Address(email_address='a2', user=u1) + Address(email_address='a2', user=u1) sess.add_all([a1, a2]) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL( - "INSERT INTO users (name) VALUES (:name)", - {'name': 'u1'} - ), - CompiledSQL( - "INSERT INTO addresses (user_id, email_address) " - "VALUES (:user_id, :email_address)", - lambda ctx: {'email_address': 'a1', 'user_id':u1.id} - ), - CompiledSQL( - "INSERT INTO addresses (user_id, email_address) " - "VALUES (:user_id, :email_address)", - lambda ctx: {'email_address': 'a2', 'user_id':u1.id} - ), - ) + testing.db, + sess.flush, + CompiledSQL( + "INSERT INTO users (name) VALUES (:name)", + {'name': 'u1'} + ), + CompiledSQL( + "INSERT INTO addresses (user_id, email_address) " + "VALUES (:user_id, :email_address)", + lambda ctx: {'email_address': 'a1', 'user_id': u1.id} + ), + CompiledSQL( + "INSERT INTO addresses (user_id, email_address) " + "VALUES (:user_id, :email_address)", + lambda ctx: {'email_address': 'a2', 'user_id': u1.id} + ), + ) def test_many_to_one_delete_all(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'user':relationship(User) + 'user': relationship(User) }) sess = create_session() u1 = User(name='u1') a1, a2 = Address(email_address='a1', user=u1), \ - Address(email_address='a2', user=u1) + Address(email_address='a2', user=u1) sess.add_all([a1, a2]) sess.flush() @@ -198,71 +202,71 @@ class RudimentaryFlushTest(UOWTest): sess.delete(a1) sess.delete(a2) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL( - "DELETE FROM addresses WHERE addresses.id = :id", - [{'id':a1.id},{'id':a2.id}] - ), - CompiledSQL( - "DELETE FROM users WHERE users.id = :id", - {'id':u1.id} - ), + testing.db, + sess.flush, + CompiledSQL( + "DELETE FROM addresses WHERE addresses.id = :id", + [{'id': a1.id}, {'id': a2.id}] + ), + CompiledSQL( + "DELETE FROM users WHERE users.id = :id", + {'id': u1.id} + ), ) def test_many_to_one_delete_target(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'user':relationship(User) + 'user': relationship(User) }) sess = create_session() u1 = User(name='u1') a1, a2 = Address(email_address='a1', user=u1), \ - Address(email_address='a2', user=u1) + Address(email_address='a2', user=u1) sess.add_all([a1, a2]) sess.flush() sess.delete(u1) a1.user = a2.user = None self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL( - "UPDATE addresses SET user_id=:user_id WHERE " - "addresses.id = :addresses_id", - lambda ctx: [{'addresses_id': a1.id, 'user_id': None}] - ), - CompiledSQL( - "UPDATE addresses SET user_id=:user_id WHERE " - "addresses.id = :addresses_id", - lambda ctx: [{'addresses_id': a2.id, 'user_id': None}] - ), - CompiledSQL( - "DELETE FROM users WHERE users.id = :id", - {'id':u1.id} - ), + testing.db, + sess.flush, + CompiledSQL( + "UPDATE addresses SET user_id=:user_id WHERE " + "addresses.id = :addresses_id", + lambda ctx: [{'addresses_id': a1.id, 'user_id': None}] + ), + CompiledSQL( + "UPDATE addresses SET user_id=:user_id WHERE " + "addresses.id = :addresses_id", + lambda ctx: [{'addresses_id': a2.id, 'user_id': None}] + ), + CompiledSQL( + "DELETE FROM users WHERE users.id = :id", + {'id': u1.id} + ), ) def test_many_to_one_delete_unloaded(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'parent':relationship(User) + 'parent': relationship(User) }) parent = User(name='p1') c1, c2 = Address(email_address='c1', parent=parent), \ - Address(email_address='c2', parent=parent) + Address(email_address='c2', parent=parent) session = Session() session.add_all([c1, c2]) @@ -295,16 +299,20 @@ class RudimentaryFlushTest(UOWTest): # the User row might be handled before or the addresses # are loaded so need to use AllOf CompiledSQL( - "SELECT addresses.id AS addresses_id, addresses.user_id AS " + "SELECT addresses.id AS addresses_id, " + "addresses.user_id AS " "addresses_user_id, addresses.email_address AS " - "addresses_email_address FROM addresses WHERE addresses.id = " + "addresses_email_address FROM addresses " + "WHERE addresses.id = " ":param_1", lambda ctx: {'param_1': c1id} ), CompiledSQL( - "SELECT addresses.id AS addresses_id, addresses.user_id AS " + "SELECT addresses.id AS addresses_id, " + "addresses.user_id AS " "addresses_user_id, addresses.email_address AS " - "addresses_email_address FROM addresses WHERE addresses.id = " + "addresses_email_address FROM addresses " + "WHERE addresses.id = " ":param_1", lambda ctx: {'param_1': c2id} ), @@ -326,18 +334,18 @@ class RudimentaryFlushTest(UOWTest): def test_many_to_one_delete_childonly_unloaded(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'parent':relationship(User) + 'parent': relationship(User) }) parent = User(name='p1') c1, c2 = Address(email_address='c1', parent=parent), \ - Address(email_address='c2', parent=parent) + Address(email_address='c2', parent=parent) session = Session() session.add_all([c1, c2]) @@ -345,7 +353,7 @@ class RudimentaryFlushTest(UOWTest): session.flush() - pid = parent.id + #pid = parent.id c1id = c1.id c2id = c2.id @@ -360,18 +368,23 @@ class RudimentaryFlushTest(UOWTest): session.flush, AllOf( # [ticket:2049] - we aren't deleting User, - # relationship is simple m2o, no SELECT should be emitted for it. + # relationship is simple m2o, no SELECT should be emitted for + # it. CompiledSQL( - "SELECT addresses.id AS addresses_id, addresses.user_id AS " + "SELECT addresses.id AS addresses_id, " + "addresses.user_id AS " "addresses_user_id, addresses.email_address AS " - "addresses_email_address FROM addresses WHERE addresses.id = " + "addresses_email_address FROM addresses " + "WHERE addresses.id = " ":param_1", lambda ctx: {'param_1': c1id} ), CompiledSQL( - "SELECT addresses.id AS addresses_id, addresses.user_id AS " + "SELECT addresses.id AS addresses_id, " + "addresses.user_id AS " "addresses_user_id, addresses.email_address AS " - "addresses_email_address FROM addresses WHERE addresses.id = " + "addresses_email_address FROM addresses " + "WHERE addresses.id = " ":param_1", lambda ctx: {'param_1': c2id} ), @@ -384,18 +397,18 @@ class RudimentaryFlushTest(UOWTest): def test_many_to_one_delete_childonly_unloaded_expired(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'parent':relationship(User) + 'parent': relationship(User) }) parent = User(name='p1') c1, c2 = Address(email_address='c1', parent=parent), \ - Address(email_address='c2', parent=parent) + Address(email_address='c2', parent=parent) session = Session() session.add_all([c1, c2]) @@ -403,7 +416,7 @@ class RudimentaryFlushTest(UOWTest): session.flush() - pid = parent.id + #pid = parent.id c1id = c1.id c2id = c2.id @@ -420,16 +433,20 @@ class RudimentaryFlushTest(UOWTest): AllOf( # the parent User is expired, so it gets loaded here. CompiledSQL( - "SELECT addresses.id AS addresses_id, addresses.user_id AS " + "SELECT addresses.id AS addresses_id, " + "addresses.user_id AS " "addresses_user_id, addresses.email_address AS " - "addresses_email_address FROM addresses WHERE addresses.id = " + "addresses_email_address FROM addresses " + "WHERE addresses.id = " ":param_1", lambda ctx: {'param_1': c1id} ), CompiledSQL( - "SELECT addresses.id AS addresses_id, addresses.user_id AS " + "SELECT addresses.id AS addresses_id, " + "addresses.user_id AS " "addresses_user_id, addresses.email_address AS " - "addresses_email_address FROM addresses WHERE addresses.id = " + "addresses_email_address FROM addresses " + "WHERE addresses.id = " ":param_1", lambda ctx: {'param_1': c2id} ), @@ -441,17 +458,17 @@ class RudimentaryFlushTest(UOWTest): ) def test_natural_ordering(self): - """test that unconnected items take relationship() into account regardless.""" + """test that unconnected items take relationship() + into account regardless.""" users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) - + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'parent':relationship(User) + 'parent': relationship(User) }) sess = create_session() @@ -465,7 +482,7 @@ class RudimentaryFlushTest(UOWTest): sess.flush, CompiledSQL( "INSERT INTO users (id, name) VALUES (:id, :name)", - {'id':1, 'name':'u1'}), + {'id': 1, 'name': 'u1'}), CompiledSQL( "INSERT INTO addresses (id, user_id, email_address) " "VALUES (:id, :user_id, :email_address)", @@ -489,13 +506,13 @@ class RudimentaryFlushTest(UOWTest): ) def test_natural_selfref(self): - """test that unconnected items take relationship() into account regardless.""" + """test that unconnected items take relationship() + into account regardless.""" Node, nodes = self.classes.Node, self.tables.nodes - mapper(Node, nodes, properties={ - 'children':relationship(Node) + 'children': relationship(Node) }) sess = create_session() @@ -515,20 +532,18 @@ class RudimentaryFlushTest(UOWTest): "INSERT INTO nodes (id, parent_id, data) VALUES " "(:id, :parent_id, :data)", [{'parent_id': None, 'data': None, 'id': 1}, - {'parent_id': 1, 'data': None, 'id': 2}, - {'parent_id': 2, 'data': None, 'id': 3}] - ), + {'parent_id': 1, 'data': None, 'id': 2}, + {'parent_id': 2, 'data': None, 'id': 3}] + ), ) def test_many_to_many(self): - keywords, items, item_keywords, Keyword, Item = (self.tables.keywords, - self.tables.items, - self.tables.item_keywords, - self.classes.Keyword, - self.classes.Item) + keywords, items, item_keywords, Keyword, Item = ( + self.tables.keywords, self.tables.items, self.tables.item_keywords, + self.classes.Keyword, self.classes.Item) mapper(Item, items, properties={ - 'keywords':relationship(Keyword, secondary=item_keywords) + 'keywords': relationship(Keyword, secondary=item_keywords) }) mapper(Keyword, keywords) @@ -537,45 +552,45 @@ class RudimentaryFlushTest(UOWTest): i1 = Item(description='i1', keywords=[k1]) sess.add(i1) self.assert_sql_execution( - testing.db, - sess.flush, - AllOf( - CompiledSQL( + testing.db, + sess.flush, + AllOf( + CompiledSQL( "INSERT INTO keywords (name) VALUES (:name)", - {'name':'k1'} - ), - CompiledSQL( - "INSERT INTO items (description) VALUES (:description)", - {'description':'i1'} - ), + {'name': 'k1'} ), CompiledSQL( - "INSERT INTO item_keywords (item_id, keyword_id) " - "VALUES (:item_id, :keyword_id)", - lambda ctx:{'item_id':i1.id, 'keyword_id':k1.id} - ) + "INSERT INTO items (description) VALUES (:description)", + {'description': 'i1'} + ), + ), + CompiledSQL( + "INSERT INTO item_keywords (item_id, keyword_id) " + "VALUES (:item_id, :keyword_id)", + lambda ctx: {'item_id': i1.id, 'keyword_id': k1.id} + ) ) # test that keywords collection isn't loaded sess.expire(i1, ['keywords']) i1.description = 'i2' self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL("UPDATE items SET description=:description " - "WHERE items.id = :items_id", - lambda ctx:{'description':'i2', 'items_id':i1.id}) + testing.db, + sess.flush, + CompiledSQL("UPDATE items SET description=:description " + "WHERE items.id = :items_id", + lambda ctx: {'description': 'i2', 'items_id': i1.id}) ) def test_m2o_flush_size(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users) mapper(Address, addresses, properties={ - 'user':relationship(User, passive_updates=True) + 'user': relationship(User, passive_updates=True) }) sess = create_session() u1 = User(name='ed') @@ -584,12 +599,12 @@ class RudimentaryFlushTest(UOWTest): def test_o2m_flush_size(self): users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + self.classes.Address, + self.tables.addresses, + self.classes.User) mapper(User, users, properties={ - 'addresses':relationship(Address), + 'addresses': relationship(Address), }) mapper(Address, addresses) @@ -600,7 +615,7 @@ class RudimentaryFlushTest(UOWTest): sess.flush() - u1.name='jack' + u1.name = 'jack' self._assert_uow_size(sess, 2) sess.flush() @@ -617,7 +632,7 @@ class RudimentaryFlushTest(UOWTest): sess = create_session() u1 = sess.query(User).first() - u1.name='ed' + u1.name = 'ed' self._assert_uow_size(sess, 2) u1.addresses @@ -625,6 +640,7 @@ class RudimentaryFlushTest(UOWTest): class SingleCycleTest(UOWTest): + def teardown(self): engines.testing_reaper.rollback_all() # mysql can't handle delete from nodes @@ -639,7 +655,7 @@ class SingleCycleTest(UOWTest): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'children':relationship(Node) + 'children': relationship(Node) }) sess = create_session() @@ -649,15 +665,15 @@ class SingleCycleTest(UOWTest): sess.add(n1) self.assert_sql_execution( - testing.db, - sess.flush, + testing.db, + sess.flush, - CompiledSQL( - "INSERT INTO nodes (parent_id, data) VALUES " - "(:parent_id, :data)", - {'parent_id': None, 'data': 'n1'} - ), - AllOf( + CompiledSQL( + "INSERT INTO nodes (parent_id, data) VALUES " + "(:parent_id, :data)", + {'parent_id': None, 'data': 'n1'} + ), + AllOf( CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", @@ -668,14 +684,14 @@ class SingleCycleTest(UOWTest): "(:parent_id, :data)", lambda ctx: {'parent_id': n1.id, 'data': 'n3'} ), - ) ) + ) def test_one_to_many_delete_all(self): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'children':relationship(Node) + 'children': relationship(Node) }) sess = create_session() @@ -689,19 +705,19 @@ class SingleCycleTest(UOWTest): sess.delete(n2) sess.delete(n3) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx:[{'id':n2.id}, {'id':n3.id}]), - CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx: {'id':n1.id}) + testing.db, + sess.flush, + CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", + lambda ctx: [{'id': n2.id}, {'id': n3.id}]), + CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", + lambda ctx: {'id': n1.id}) ) def test_one_to_many_delete_parent(self): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'children':relationship(Node) + 'children': relationship(Node) }) sess = create_session() @@ -713,25 +729,25 @@ class SingleCycleTest(UOWTest): sess.delete(n1) self.assert_sql_execution( - testing.db, - sess.flush, - AllOf( - CompiledSQL("UPDATE nodes SET parent_id=:parent_id " - "WHERE nodes.id = :nodes_id", - lambda ctx: {'nodes_id':n3.id, 'parent_id':None}), - CompiledSQL("UPDATE nodes SET parent_id=:parent_id " - "WHERE nodes.id = :nodes_id", - lambda ctx: {'nodes_id':n2.id, 'parent_id':None}), - ), - CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx:{'id':n1.id}) - ) + testing.db, sess.flush, AllOf( + CompiledSQL( + "UPDATE nodes SET parent_id=:parent_id " + "WHERE nodes.id = :nodes_id", lambda ctx: { + 'nodes_id': n3.id, 'parent_id': None}), + CompiledSQL( + "UPDATE nodes SET parent_id=:parent_id " + "WHERE nodes.id = :nodes_id", lambda ctx: { + 'nodes_id': n2.id, 'parent_id': None}), + ), + CompiledSQL( + "DELETE FROM nodes WHERE nodes.id = :id", lambda ctx: { + 'id': n1.id})) def test_many_to_one_save(self): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'parent':relationship(Node, remote_side=nodes.c.id) + 'parent': relationship(Node, remote_side=nodes.c.id) }) sess = create_session() @@ -741,15 +757,15 @@ class SingleCycleTest(UOWTest): sess.add_all([n2, n3]) self.assert_sql_execution( - testing.db, - sess.flush, + testing.db, + sess.flush, - CompiledSQL( - "INSERT INTO nodes (parent_id, data) VALUES " - "(:parent_id, :data)", - {'parent_id': None, 'data': 'n1'} - ), - AllOf( + CompiledSQL( + "INSERT INTO nodes (parent_id, data) VALUES " + "(:parent_id, :data)", + {'parent_id': None, 'data': 'n1'} + ), + AllOf( CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", @@ -760,14 +776,14 @@ class SingleCycleTest(UOWTest): "(:parent_id, :data)", lambda ctx: {'parent_id': n1.id, 'data': 'n3'} ), - ) ) + ) def test_many_to_one_delete_all(self): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'parent':relationship(Node, remote_side=nodes.c.id) + 'parent': relationship(Node, remote_side=nodes.c.id) }) sess = create_session() @@ -781,19 +797,19 @@ class SingleCycleTest(UOWTest): sess.delete(n2) sess.delete(n3) self.assert_sql_execution( - testing.db, - sess.flush, - CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx:[{'id':n2.id},{'id':n3.id}]), - CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx: {'id':n1.id}) + testing.db, + sess.flush, + CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", + lambda ctx: [{'id': n2.id}, {'id': n3.id}]), + CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", + lambda ctx: {'id': n1.id}) ) def test_many_to_one_set_null_unloaded(self): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'parent':relationship(Node, remote_side=nodes.c.id) + 'parent': relationship(Node, remote_side=nodes.c.id) }) sess = create_session() n1 = Node(data='n1') @@ -810,7 +826,7 @@ class SingleCycleTest(UOWTest): CompiledSQL( "UPDATE nodes SET parent_id=:parent_id WHERE " "nodes.id = :nodes_id", - lambda ctx: {"parent_id":None, "nodes_id":n2.id} + lambda ctx: {"parent_id": None, "nodes_id": n2.id} ) ) @@ -818,7 +834,7 @@ class SingleCycleTest(UOWTest): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'children':relationship(Node) + 'children': relationship(Node) }) sess = create_session() @@ -836,9 +852,9 @@ class SingleCycleTest(UOWTest): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'children':relationship(Node, - backref=backref('parent', - remote_side=nodes.c.id)) + 'children': relationship(Node, + backref=backref('parent', + remote_side=nodes.c.id)) }) sess = create_session() @@ -857,11 +873,15 @@ class SingleCycleTest(UOWTest): def test_bidirectional_multilevel_save(self): Node, nodes = self.classes.Node, self.tables.nodes - mapper(Node, nodes, properties={ - 'children':relationship(Node, - backref=backref('parent', remote_side=nodes.c.id) - ) - }) + mapper( + Node, + nodes, + properties={ + 'children': relationship( + Node, + backref=backref( + 'parent', + remote_side=nodes.c.id))}) sess = create_session() n1 = Node(data='n1') n1.children.append(Node(data='n11')) @@ -878,37 +898,37 @@ class SingleCycleTest(UOWTest): CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':None, 'data':'n1'} + lambda ctx: {'parent_id': None, 'data': 'n1'} ), CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':n1.id, 'data':'n11'} + lambda ctx: {'parent_id': n1.id, 'data': 'n11'} ), CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':n1.id, 'data':'n12'} + lambda ctx: {'parent_id': n1.id, 'data': 'n12'} ), CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':n1.id, 'data':'n13'} + lambda ctx: {'parent_id': n1.id, 'data': 'n13'} ), CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':n12.id, 'data':'n121'} + lambda ctx: {'parent_id': n12.id, 'data': 'n121'} ), CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':n12.id, 'data':'n122'} + lambda ctx: {'parent_id': n12.id, 'data': 'n122'} ), CompiledSQL( "INSERT INTO nodes (parent_id, data) VALUES " "(:parent_id, :data)", - lambda ctx:{'parent_id':n12.id, 'data':'n123'} + lambda ctx: {'parent_id': n12.id, 'data': 'n123'} ), ) @@ -916,7 +936,7 @@ class SingleCycleTest(UOWTest): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'children':relationship(Node) + 'children': relationship(Node) }) sess = create_session() n1 = Node(data='ed') @@ -925,7 +945,7 @@ class SingleCycleTest(UOWTest): sess.flush() - n1.data='jack' + n1.data = 'jack' self._assert_uow_size(sess, 2) sess.flush() @@ -942,18 +962,17 @@ class SingleCycleTest(UOWTest): sess = create_session() n1 = sess.query(Node).first() - n1.data='ed' + n1.data = 'ed' self._assert_uow_size(sess, 2) n1.children self._assert_uow_size(sess, 2) - def test_delete_unloaded_m2o(self): Node, nodes = self.classes.Node, self.tables.nodes mapper(Node, nodes, properties={ - 'parent':relationship(Node, remote_side=nodes.c.id) + 'parent': relationship(Node, remote_side=nodes.c.id) }) parent = Node() @@ -1022,35 +1041,38 @@ class SingleCycleTest(UOWTest): ) +class SingleCyclePlusAttributeTest( + fixtures.MappedTest, + testing.AssertsExecutionResults, + AssertsUOW): -class SingleCyclePlusAttributeTest(fixtures.MappedTest, - testing.AssertsExecutionResults, AssertsUOW): @classmethod def define_tables(cls, metadata): Table('nodes', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('parent_id', Integer, ForeignKey('nodes.id')), - Column('data', String(30)) - ) + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('parent_id', Integer, ForeignKey('nodes.id')), + Column('data', String(30)) + ) Table('foobars', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('parent_id', Integer, ForeignKey('nodes.id')), - ) + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('parent_id', Integer, ForeignKey('nodes.id')), + ) def test_flush_size(self): foobars, nodes = self.tables.foobars, self.tables.nodes class Node(fixtures.ComparableEntity): pass + class FooBar(fixtures.ComparableEntity): pass mapper(Node, nodes, properties={ - 'children':relationship(Node), - 'foobars':relationship(FooBar) + 'children': relationship(Node), + 'foobars': relationship(FooBar) }) mapper(FooBar, foobars) @@ -1070,25 +1092,30 @@ class SingleCyclePlusAttributeTest(fixtures.MappedTest, sess.flush() + class SingleCycleM2MTest(fixtures.MappedTest, - testing.AssertsExecutionResults, AssertsUOW): + testing.AssertsExecutionResults, AssertsUOW): @classmethod def define_tables(cls, metadata): - nodes = Table('nodes', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('data', String(30)), - Column('favorite_node_id', Integer, ForeignKey('nodes.id')) - ) + Table( + 'nodes', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column( + 'data', String(30)), Column( + 'favorite_node_id', Integer, ForeignKey('nodes.id'))) - node_to_nodes =Table('node_to_nodes', metadata, - Column('left_node_id', Integer, - ForeignKey('nodes.id'),primary_key=True), - Column('right_node_id', Integer, - ForeignKey('nodes.id'),primary_key=True), - ) + Table( + 'node_to_nodes', metadata, + Column( + 'left_node_id', Integer, + ForeignKey('nodes.id'), primary_key=True), + Column( + 'right_node_id', Integer, + ForeignKey('nodes.id'), primary_key=True), + ) def test_many_to_many_one(self): nodes, node_to_nodes = self.tables.nodes, self.tables.node_to_nodes @@ -1096,14 +1123,19 @@ class SingleCycleM2MTest(fixtures.MappedTest, class Node(fixtures.ComparableEntity): pass - mapper(Node, nodes, properties={ - 'children':relationship(Node, secondary=node_to_nodes, - primaryjoin=nodes.c.id==node_to_nodes.c.left_node_id, - secondaryjoin=nodes.c.id==node_to_nodes.c.right_node_id, - backref='parents' - ), - 'favorite':relationship(Node, remote_side=nodes.c.id) - }) + mapper( + Node, + nodes, + properties={ + 'children': relationship( + Node, + secondary=node_to_nodes, + primaryjoin=nodes.c.id == node_to_nodes.c.left_node_id, + secondaryjoin=nodes.c.id == node_to_nodes.c.right_node_id, + backref='parents'), + 'favorite': relationship( + Node, + remote_side=nodes.c.id)}) sess = create_session() n1 = Node(data='n1') @@ -1128,46 +1160,46 @@ class SingleCycleM2MTest(fixtures.MappedTest, sess.flush() eq_( sess.query(node_to_nodes.c.left_node_id, - node_to_nodes.c.right_node_id).\ - order_by(node_to_nodes.c.left_node_id, - node_to_nodes.c.right_node_id).\ - all(), + node_to_nodes.c.right_node_id). + order_by(node_to_nodes.c.left_node_id, + node_to_nodes.c.right_node_id). + all(), sorted([ - (n1.id, n2.id), (n1.id, n3.id), (n1.id, n4.id), - (n2.id, n3.id), (n2.id, n5.id), - (n3.id, n5.id), (n3.id, n4.id) - ]) + (n1.id, n2.id), (n1.id, n3.id), (n1.id, n4.id), + (n2.id, n3.id), (n2.id, n5.id), + (n3.id, n5.id), (n3.id, n4.id) + ]) ) sess.delete(n1) self.assert_sql_execution( - testing.db, - sess.flush, - # this is n1.parents firing off, as it should, since - # passive_deletes is False for n1.parents - CompiledSQL( - "SELECT nodes.id AS nodes_id, nodes.data AS nodes_data, " - "nodes.favorite_node_id AS nodes_favorite_node_id FROM " - "nodes, node_to_nodes WHERE :param_1 = " - "node_to_nodes.right_node_id AND nodes.id = " - "node_to_nodes.left_node_id" , - lambda ctx:{'param_1': n1.id}, - ), - CompiledSQL( - "DELETE FROM node_to_nodes WHERE " - "node_to_nodes.left_node_id = :left_node_id AND " - "node_to_nodes.right_node_id = :right_node_id", - lambda ctx:[ - {'right_node_id': n2.id, 'left_node_id': n1.id}, - {'right_node_id': n3.id, 'left_node_id': n1.id}, - {'right_node_id': n4.id, 'left_node_id': n1.id} - ] - ), - CompiledSQL( - "DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx:{'id': n1.id} - ), + testing.db, + sess.flush, + # this is n1.parents firing off, as it should, since + # passive_deletes is False for n1.parents + CompiledSQL( + "SELECT nodes.id AS nodes_id, nodes.data AS nodes_data, " + "nodes.favorite_node_id AS nodes_favorite_node_id FROM " + "nodes, node_to_nodes WHERE :param_1 = " + "node_to_nodes.right_node_id AND nodes.id = " + "node_to_nodes.left_node_id", + lambda ctx: {'param_1': n1.id}, + ), + CompiledSQL( + "DELETE FROM node_to_nodes WHERE " + "node_to_nodes.left_node_id = :left_node_id AND " + "node_to_nodes.right_node_id = :right_node_id", + lambda ctx: [ + {'right_node_id': n2.id, 'left_node_id': n1.id}, + {'right_node_id': n3.id, 'left_node_id': n1.id}, + {'right_node_id': n4.id, 'left_node_id': n1.id} + ] + ), + CompiledSQL( + "DELETE FROM nodes WHERE nodes.id = :id", + lambda ctx: {'id': n1.id} + ), ) for n in [n2, n3, n4, n5]: @@ -1185,7 +1217,7 @@ class SingleCycleM2MTest(fixtures.MappedTest, "DELETE FROM node_to_nodes WHERE node_to_nodes.left_node_id " "= :left_node_id AND node_to_nodes.right_node_id = " ":right_node_id", - lambda ctx:[ + lambda ctx: [ {'right_node_id': n5.id, 'left_node_id': n3.id}, {'right_node_id': n4.id, 'left_node_id': n3.id}, {'right_node_id': n3.id, 'left_node_id': n2.id}, @@ -1194,38 +1226,41 @@ class SingleCycleM2MTest(fixtures.MappedTest, ), CompiledSQL( "DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx:[{'id': n4.id}, {'id': n5.id}] + lambda ctx: [{'id': n4.id}, {'id': n5.id}] ), CompiledSQL( "DELETE FROM nodes WHERE nodes.id = :id", - lambda ctx:[{'id': n2.id}, {'id': n3.id}] + lambda ctx: [{'id': n2.id}, {'id': n3.id}] ), ) + class RowswitchAccountingTest(fixtures.MappedTest): + @classmethod def define_tables(cls, metadata): Table('parent', metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer) - ) + Column('id', Integer, primary_key=True), + Column('data', Integer) + ) Table('child', metadata, - Column('id', Integer, ForeignKey('parent.id'), primary_key=True), - Column('data', Integer) - ) + Column('id', Integer, ForeignKey('parent.id'), primary_key=True), + Column('data', Integer) + ) def _fixture(self): parent, child = self.tables.parent, self.tables.child class Parent(fixtures.BasicEntity): pass + class Child(fixtures.BasicEntity): pass mapper(Parent, parent, properties={ - 'child':relationship(Child, uselist=False, - cascade="all, delete-orphan", - backref="parent") + 'child': relationship(Child, uselist=False, + cascade="all, delete-orphan", + backref="parent") }) mapper(Child, child) return Parent, Child @@ -1274,8 +1309,10 @@ class RowswitchAccountingTest(fixtures.MappedTest): eq_(sess.scalar(self.tables.parent.count()), 0) + class RowswitchM2OTest(fixtures.MappedTest): # tests for #3060 and related issues + @classmethod def define_tables(cls, metadata): Table( @@ -1299,17 +1336,18 @@ class RowswitchM2OTest(fixtures.MappedTest): class A(fixtures.BasicEntity): pass + class B(fixtures.BasicEntity): pass + class C(fixtures.BasicEntity): pass - mapper(A, a, properties={ - 'bs': relationship(B, cascade="all, delete-orphan") + 'bs': relationship(B, cascade="all, delete-orphan") }) mapper(B, b, properties={ - 'c': relationship(C) + 'c': relationship(C) }) mapper(C, c) return A, B, C @@ -1391,29 +1429,31 @@ class RowswitchM2OTest(fixtures.MappedTest): class BasicStaleChecksTest(fixtures.MappedTest): + @classmethod def define_tables(cls, metadata): Table('parent', metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer) - ) + Column('id', Integer, primary_key=True), + Column('data', Integer) + ) Table('child', metadata, - Column('id', Integer, ForeignKey('parent.id'), primary_key=True), - Column('data', Integer) - ) + Column('id', Integer, ForeignKey('parent.id'), primary_key=True), + Column('data', Integer) + ) def _fixture(self, confirm_deleted_rows=True): parent, child = self.tables.parent, self.tables.child class Parent(fixtures.BasicEntity): pass + class Child(fixtures.BasicEntity): pass mapper(Parent, parent, properties={ - 'child':relationship(Child, uselist=False, - cascade="all, delete-orphan", - backref="parent"), + 'child': relationship(Child, uselist=False, + cascade="all, delete-orphan", + backref="parent"), }, confirm_deleted_rows=confirm_deleted_rows) mapper(Child, child) return Parent, Child @@ -1431,7 +1471,7 @@ class BasicStaleChecksTest(fixtures.MappedTest): assert_raises_message( orm_exc.StaleDataError, "UPDATE statement on table 'parent' expected to " - "update 1 row\(s\); 0 were matched.", + "update 1 row\(s\); 0 were matched.", sess.flush ) @@ -1451,7 +1491,7 @@ class BasicStaleChecksTest(fixtures.MappedTest): assert_raises_message( exc.SAWarning, "DELETE statement on table 'parent' expected to " - "delete 2 row\(s\); 0 were matched.", + "delete 2 row\(s\); 0 were matched.", sess.flush ) @@ -1471,14 +1511,15 @@ class BasicStaleChecksTest(fixtures.MappedTest): class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults): + @classmethod def define_tables(cls, metadata): Table('t', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data', String(50)), - Column('def_', String(50), server_default='def1') - ) + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(50)), + Column('def_', String(50), server_default='def1') + ) def test_batch_interaction(self): """test batching groups same-structured, primary @@ -1532,8 +1573,8 @@ class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults): ), CompiledSQL( "INSERT INTO t (id, data, def_) VALUES (:id, :data, :def_)", - [{'data': 't9', 'id': 9, 'def_':'def2'}, - {'data': 't10', 'id': 10, 'def_':'def3'}] + [{'data': 't9', 'id': 9, 'def_': 'def2'}, + {'data': 't10', 'id': 10, 'def_': 'def3'}] ), CompiledSQL( "INSERT INTO t (id, data) VALUES (:id, :data)", @@ -1541,126 +1582,129 @@ class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults): ), ) + class LoadersUsingCommittedTest(UOWTest): - """Test that events which occur within a flush() - get the same attribute loading behavior as on the outside - of the flush, and that the unit of work itself uses the - "committed" version of primary/foreign key attributes - when loading a collection for historical purposes (this typically - has importance for when primary key values change). + + """Test that events which occur within a flush() + get the same attribute loading behavior as on the outside + of the flush, and that the unit of work itself uses the + "committed" version of primary/foreign key attributes + when loading a collection for historical purposes (this typically + has importance for when primary key values change). + + """ + + def _mapper_setup(self, passive_updates=True): + users, Address, addresses, User = (self.tables.users, + self.classes.Address, + self.tables.addresses, + self.classes.User) + + mapper(User, users, properties={ + 'addresses': relationship(Address, + order_by=addresses.c.email_address, + passive_updates=passive_updates, + backref='user') + }) + mapper(Address, addresses) + return create_session(autocommit=False) + + def test_before_update_m2o(self): + """Expect normal many to one attribute load behavior + (should not get committed value) + from within public 'before_update' event""" + sess = self._mapper_setup() + + Address, User = self.classes.Address, self.classes.User + + def before_update(mapper, connection, target): + # if get committed is used to find target.user, then + # it will be still be u1 instead of u2 + assert target.user.id == target.user_id == u2.id + from sqlalchemy import event + event.listen(Address, 'before_update', before_update) + + a1 = Address(email_address='a1') + u1 = User(name='u1', addresses=[a1]) + sess.add(u1) + + u2 = User(name='u2') + sess.add(u2) + sess.commit() + + sess.expunge_all() + # lookup an address and move it to the other user + a1 = sess.query(Address).get(a1.id) + + # move address to another user's fk + assert a1.user_id == u1.id + a1.user_id = u2.id + + sess.flush() + + def test_before_update_o2m_passive(self): + """Expect normal one to many attribute load behavior + (should not get committed value) + from within public 'before_update' event""" + self._test_before_update_o2m(True) + + def test_before_update_o2m_notpassive(self): + """Expect normal one to many attribute load behavior + (should not get committed value) + from within public 'before_update' event with + passive_updates=False """ + self._test_before_update_o2m(False) - def _mapper_setup(self, passive_updates=True): - users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) - - mapper(User, users, properties={ - 'addresses': relationship(Address, - order_by=addresses.c.email_address, - passive_updates=passive_updates, - backref='user') - }) - mapper(Address, addresses) - return create_session(autocommit=False) - - def test_before_update_m2o(self): - """Expect normal many to one attribute load behavior - (should not get committed value) - from within public 'before_update' event""" - sess = self._mapper_setup() - - Address, User = self.classes.Address, self.classes.User - - def before_update(mapper, connection, target): - # if get committed is used to find target.user, then - # it will be still be u1 instead of u2 - assert target.user.id == target.user_id == u2.id - from sqlalchemy import event - event.listen(Address, 'before_update', before_update) - - a1 = Address(email_address='a1') - u1 = User(name='u1', addresses=[a1]) - sess.add(u1) - - u2 = User(name='u2') - sess.add(u2) - sess.commit() - - sess.expunge_all() - # lookup an address and move it to the other user - a1 = sess.query(Address).get(a1.id) - - # move address to another user's fk - assert a1.user_id == u1.id - a1.user_id = u2.id + def _test_before_update_o2m(self, passive_updates): + sess = self._mapper_setup(passive_updates=passive_updates) - sess.flush() + Address, User = self.classes.Address, self.classes.User - def test_before_update_o2m_passive(self): - """Expect normal one to many attribute load behavior - (should not get committed value) - from within public 'before_update' event""" - self._test_before_update_o2m(True) + class AvoidReferencialError(Exception): - def test_before_update_o2m_notpassive(self): - """Expect normal one to many attribute load behavior - (should not get committed value) - from within public 'before_update' event with - passive_updates=False + """the test here would require ON UPDATE CASCADE on FKs + for the flush to fully succeed; this exception is used + to cancel the flush before we get that far. """ - self._test_before_update_o2m(False) - - def _test_before_update_o2m(self, passive_updates): - sess = self._mapper_setup(passive_updates=passive_updates) - - Address, User = self.classes.Address, self.classes.User - - class AvoidReferencialError(Exception): - """the test here would require ON UPDATE CASCADE on FKs - for the flush to fully succeed; this exception is used - to cancel the flush before we get that far. - - """ - - def before_update(mapper, connection, target): - if passive_updates: - # we shouldn't be using committed value. - # so, having switched target's primary key, - # we expect no related items in the collection - # since we are using passive_updates - # this is a behavior change since #2350 - assert 'addresses' not in target.__dict__ - eq_(target.addresses, []) - else: - # in contrast with passive_updates=True, - # here we expect the orm to have looked up the addresses - # with the committed value (it needs to in order to - # update the foreign keys). So we expect addresses - # collection to move with the user, - # (just like they will be after the update) - - # collection is already loaded - assert 'addresses' in target.__dict__ - eq_([a.id for a in target.addresses], - [a.id for a in [a1, a2]]) - raise AvoidReferencialError() - from sqlalchemy import event - event.listen(User, 'before_update', before_update) - - a1 = Address(email_address='jack1') - a2 = Address(email_address='jack2') - u1 = User(id=1, name='jack', addresses=[a1, a2]) - sess.add(u1) - sess.commit() - - sess.expunge_all() - u1 = sess.query(User).get(u1.id) - u1.id = 2 - try: - sess.flush() - except AvoidReferencialError: - pass + + def before_update(mapper, connection, target): + if passive_updates: + # we shouldn't be using committed value. + # so, having switched target's primary key, + # we expect no related items in the collection + # since we are using passive_updates + # this is a behavior change since #2350 + assert 'addresses' not in target.__dict__ + eq_(target.addresses, []) + else: + # in contrast with passive_updates=True, + # here we expect the orm to have looked up the addresses + # with the committed value (it needs to in order to + # update the foreign keys). So we expect addresses + # collection to move with the user, + # (just like they will be after the update) + + # collection is already loaded + assert 'addresses' in target.__dict__ + eq_([a.id for a in target.addresses], + [a.id for a in [a1, a2]]) + raise AvoidReferencialError() + from sqlalchemy import event + event.listen(User, 'before_update', before_update) + + a1 = Address(email_address='jack1') + a2 = Address(email_address='jack2') + u1 = User(id=1, name='jack', addresses=[a1, a2]) + sess.add(u1) + sess.commit() + + sess.expunge_all() + u1 = sess.query(User).get(u1.id) + u1.id = 2 + try: + sess.flush() + except AvoidReferencialError: + pass |