summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-08-14 20:37:28 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-08-14 20:37:28 -0400
commitf0a56bc5aa8cb0ca7b642eaa99093ade2065d4b5 (patch)
tree8e9266cead532d089dd6edee6390b82ebd914cfd
parent253523c57f76c515ade82e4db30cff2536fb2a92 (diff)
downloadsqlalchemy-f0a56bc5aa8cb0ca7b642eaa99093ade2065d4b5.tar.gz
pep8
-rw-r--r--test/orm/test_unitofworkv2.py1016
1 files changed, 530 insertions, 486 deletions
diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py
index 9fedc9590..9c9296786 100644
--- a/test/orm/test_unitofworkv2.py
+++ b/test/orm/test_unitofworkv2.py
@@ -1,4 +1,4 @@
-from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.testing import eq_, assert_raises_message
from sqlalchemy import testing
from sqlalchemy.testing import engines
from sqlalchemy.testing.schema import Table, Column
@@ -7,12 +7,14 @@ from sqlalchemy import exc
from sqlalchemy.testing import fixtures
from sqlalchemy import Integer, String, ForeignKey, func
from sqlalchemy.orm import mapper, relationship, backref, \
- create_session, unitofwork, attributes,\
- Session, class_mapper, sync, exc as orm_exc
+ create_session, unitofwork, attributes,\
+ Session, exc as orm_exc
+
+from sqlalchemy.testing.assertsql import AllOf, CompiledSQL
-from sqlalchemy.testing.assertsql import AllOf, CompiledSQL, Or
class AssertsUOW(object):
+
def _get_test_uow(self, session):
uow = unitofwork.UOWTransaction(session)
deleted = set(session._deleted)
@@ -24,26 +26,29 @@ class AssertsUOW(object):
uow.register_object(d, isdelete=True)
return uow
- def _assert_uow_size(self, session, expected ):
+ def _assert_uow_size(self, session, expected):
uow = self._get_test_uow(session)
postsort_actions = uow._generate_actions()
print(postsort_actions)
eq_(len(postsort_actions), expected, postsort_actions)
-class UOWTest(_fixtures.FixtureTest,
- testing.AssertsExecutionResults, AssertsUOW):
+
+class UOWTest(
+ _fixtures.FixtureTest,
+ testing.AssertsExecutionResults, AssertsUOW):
run_inserts = None
+
class RudimentaryFlushTest(UOWTest):
def test_one_to_many_save(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users, properties={
- 'addresses':relationship(Address),
+ 'addresses': relationship(Address),
})
mapper(Address, addresses)
sess = create_session()
@@ -53,32 +58,32 @@ class RudimentaryFlushTest(UOWTest):
sess.add(u1)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL(
- "INSERT INTO users (name) VALUES (:name)",
- {'name': 'u1'}
- ),
- CompiledSQL(
- "INSERT INTO addresses (user_id, email_address) "
- "VALUES (:user_id, :email_address)",
- lambda ctx: {'email_address': 'a1', 'user_id':u1.id}
- ),
- CompiledSQL(
- "INSERT INTO addresses (user_id, email_address) "
- "VALUES (:user_id, :email_address)",
- lambda ctx: {'email_address': 'a2', 'user_id':u1.id}
- ),
- )
+ testing.db,
+ sess.flush,
+ CompiledSQL(
+ "INSERT INTO users (name) VALUES (:name)",
+ {'name': 'u1'}
+ ),
+ CompiledSQL(
+ "INSERT INTO addresses (user_id, email_address) "
+ "VALUES (:user_id, :email_address)",
+ lambda ctx: {'email_address': 'a1', 'user_id': u1.id}
+ ),
+ CompiledSQL(
+ "INSERT INTO addresses (user_id, email_address) "
+ "VALUES (:user_id, :email_address)",
+ lambda ctx: {'email_address': 'a2', 'user_id': u1.id}
+ ),
+ )
def test_one_to_many_delete_all(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users, properties={
- 'addresses':relationship(Address),
+ 'addresses': relationship(Address),
})
mapper(Address, addresses)
sess = create_session()
@@ -91,26 +96,26 @@ class RudimentaryFlushTest(UOWTest):
sess.delete(a1)
sess.delete(a2)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL(
- "DELETE FROM addresses WHERE addresses.id = :id",
- [{'id':a1.id},{'id':a2.id}]
- ),
- CompiledSQL(
- "DELETE FROM users WHERE users.id = :id",
- {'id':u1.id}
- ),
+ testing.db,
+ sess.flush,
+ CompiledSQL(
+ "DELETE FROM addresses WHERE addresses.id = :id",
+ [{'id': a1.id}, {'id': a2.id}]
+ ),
+ CompiledSQL(
+ "DELETE FROM users WHERE users.id = :id",
+ {'id': u1.id}
+ ),
)
def test_one_to_many_delete_parent(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users, properties={
- 'addresses':relationship(Address),
+ 'addresses': relationship(Address),
})
mapper(Address, addresses)
sess = create_session()
@@ -121,76 +126,75 @@ class RudimentaryFlushTest(UOWTest):
sess.delete(u1)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE "
- "addresses.id = :addresses_id",
- lambda ctx: [{'addresses_id': a1.id, 'user_id': None}]
- ),
- CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE "
- "addresses.id = :addresses_id",
- lambda ctx: [{'addresses_id': a2.id, 'user_id': None}]
- ),
- CompiledSQL(
- "DELETE FROM users WHERE users.id = :id",
- {'id':u1.id}
- ),
+ testing.db,
+ sess.flush,
+ CompiledSQL(
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
+ lambda ctx: [{'addresses_id': a1.id, 'user_id': None}]
+ ),
+ CompiledSQL(
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
+ lambda ctx: [{'addresses_id': a2.id, 'user_id': None}]
+ ),
+ CompiledSQL(
+ "DELETE FROM users WHERE users.id = :id",
+ {'id': u1.id}
+ ),
)
def test_many_to_one_save(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
-
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'user':relationship(User)
+ 'user': relationship(User)
})
sess = create_session()
u1 = User(name='u1')
a1, a2 = Address(email_address='a1', user=u1), \
- Address(email_address='a2', user=u1)
+ Address(email_address='a2', user=u1)
sess.add_all([a1, a2])
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL(
- "INSERT INTO users (name) VALUES (:name)",
- {'name': 'u1'}
- ),
- CompiledSQL(
- "INSERT INTO addresses (user_id, email_address) "
- "VALUES (:user_id, :email_address)",
- lambda ctx: {'email_address': 'a1', 'user_id':u1.id}
- ),
- CompiledSQL(
- "INSERT INTO addresses (user_id, email_address) "
- "VALUES (:user_id, :email_address)",
- lambda ctx: {'email_address': 'a2', 'user_id':u1.id}
- ),
- )
+ testing.db,
+ sess.flush,
+ CompiledSQL(
+ "INSERT INTO users (name) VALUES (:name)",
+ {'name': 'u1'}
+ ),
+ CompiledSQL(
+ "INSERT INTO addresses (user_id, email_address) "
+ "VALUES (:user_id, :email_address)",
+ lambda ctx: {'email_address': 'a1', 'user_id': u1.id}
+ ),
+ CompiledSQL(
+ "INSERT INTO addresses (user_id, email_address) "
+ "VALUES (:user_id, :email_address)",
+ lambda ctx: {'email_address': 'a2', 'user_id': u1.id}
+ ),
+ )
def test_many_to_one_delete_all(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'user':relationship(User)
+ 'user': relationship(User)
})
sess = create_session()
u1 = User(name='u1')
a1, a2 = Address(email_address='a1', user=u1), \
- Address(email_address='a2', user=u1)
+ Address(email_address='a2', user=u1)
sess.add_all([a1, a2])
sess.flush()
@@ -198,71 +202,71 @@ class RudimentaryFlushTest(UOWTest):
sess.delete(a1)
sess.delete(a2)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL(
- "DELETE FROM addresses WHERE addresses.id = :id",
- [{'id':a1.id},{'id':a2.id}]
- ),
- CompiledSQL(
- "DELETE FROM users WHERE users.id = :id",
- {'id':u1.id}
- ),
+ testing.db,
+ sess.flush,
+ CompiledSQL(
+ "DELETE FROM addresses WHERE addresses.id = :id",
+ [{'id': a1.id}, {'id': a2.id}]
+ ),
+ CompiledSQL(
+ "DELETE FROM users WHERE users.id = :id",
+ {'id': u1.id}
+ ),
)
def test_many_to_one_delete_target(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'user':relationship(User)
+ 'user': relationship(User)
})
sess = create_session()
u1 = User(name='u1')
a1, a2 = Address(email_address='a1', user=u1), \
- Address(email_address='a2', user=u1)
+ Address(email_address='a2', user=u1)
sess.add_all([a1, a2])
sess.flush()
sess.delete(u1)
a1.user = a2.user = None
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE "
- "addresses.id = :addresses_id",
- lambda ctx: [{'addresses_id': a1.id, 'user_id': None}]
- ),
- CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE "
- "addresses.id = :addresses_id",
- lambda ctx: [{'addresses_id': a2.id, 'user_id': None}]
- ),
- CompiledSQL(
- "DELETE FROM users WHERE users.id = :id",
- {'id':u1.id}
- ),
+ testing.db,
+ sess.flush,
+ CompiledSQL(
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
+ lambda ctx: [{'addresses_id': a1.id, 'user_id': None}]
+ ),
+ CompiledSQL(
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
+ lambda ctx: [{'addresses_id': a2.id, 'user_id': None}]
+ ),
+ CompiledSQL(
+ "DELETE FROM users WHERE users.id = :id",
+ {'id': u1.id}
+ ),
)
def test_many_to_one_delete_unloaded(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'parent':relationship(User)
+ 'parent': relationship(User)
})
parent = User(name='p1')
c1, c2 = Address(email_address='c1', parent=parent), \
- Address(email_address='c2', parent=parent)
+ Address(email_address='c2', parent=parent)
session = Session()
session.add_all([c1, c2])
@@ -295,16 +299,20 @@ class RudimentaryFlushTest(UOWTest):
# the User row might be handled before or the addresses
# are loaded so need to use AllOf
CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
"addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE addresses.id = "
+ "addresses_email_address FROM addresses "
+ "WHERE addresses.id = "
":param_1",
lambda ctx: {'param_1': c1id}
),
CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
"addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE addresses.id = "
+ "addresses_email_address FROM addresses "
+ "WHERE addresses.id = "
":param_1",
lambda ctx: {'param_1': c2id}
),
@@ -326,18 +334,18 @@ class RudimentaryFlushTest(UOWTest):
def test_many_to_one_delete_childonly_unloaded(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'parent':relationship(User)
+ 'parent': relationship(User)
})
parent = User(name='p1')
c1, c2 = Address(email_address='c1', parent=parent), \
- Address(email_address='c2', parent=parent)
+ Address(email_address='c2', parent=parent)
session = Session()
session.add_all([c1, c2])
@@ -345,7 +353,7 @@ class RudimentaryFlushTest(UOWTest):
session.flush()
- pid = parent.id
+ #pid = parent.id
c1id = c1.id
c2id = c2.id
@@ -360,18 +368,23 @@ class RudimentaryFlushTest(UOWTest):
session.flush,
AllOf(
# [ticket:2049] - we aren't deleting User,
- # relationship is simple m2o, no SELECT should be emitted for it.
+ # relationship is simple m2o, no SELECT should be emitted for
+ # it.
CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
"addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE addresses.id = "
+ "addresses_email_address FROM addresses "
+ "WHERE addresses.id = "
":param_1",
lambda ctx: {'param_1': c1id}
),
CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
"addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE addresses.id = "
+ "addresses_email_address FROM addresses "
+ "WHERE addresses.id = "
":param_1",
lambda ctx: {'param_1': c2id}
),
@@ -384,18 +397,18 @@ class RudimentaryFlushTest(UOWTest):
def test_many_to_one_delete_childonly_unloaded_expired(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'parent':relationship(User)
+ 'parent': relationship(User)
})
parent = User(name='p1')
c1, c2 = Address(email_address='c1', parent=parent), \
- Address(email_address='c2', parent=parent)
+ Address(email_address='c2', parent=parent)
session = Session()
session.add_all([c1, c2])
@@ -403,7 +416,7 @@ class RudimentaryFlushTest(UOWTest):
session.flush()
- pid = parent.id
+ #pid = parent.id
c1id = c1.id
c2id = c2.id
@@ -420,16 +433,20 @@ class RudimentaryFlushTest(UOWTest):
AllOf(
# the parent User is expired, so it gets loaded here.
CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
"addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE addresses.id = "
+ "addresses_email_address FROM addresses "
+ "WHERE addresses.id = "
":param_1",
lambda ctx: {'param_1': c1id}
),
CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
"addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE addresses.id = "
+ "addresses_email_address FROM addresses "
+ "WHERE addresses.id = "
":param_1",
lambda ctx: {'param_1': c2id}
),
@@ -441,17 +458,17 @@ class RudimentaryFlushTest(UOWTest):
)
def test_natural_ordering(self):
- """test that unconnected items take relationship() into account regardless."""
+ """test that unconnected items take relationship()
+ into account regardless."""
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
-
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'parent':relationship(User)
+ 'parent': relationship(User)
})
sess = create_session()
@@ -465,7 +482,7 @@ class RudimentaryFlushTest(UOWTest):
sess.flush,
CompiledSQL(
"INSERT INTO users (id, name) VALUES (:id, :name)",
- {'id':1, 'name':'u1'}),
+ {'id': 1, 'name': 'u1'}),
CompiledSQL(
"INSERT INTO addresses (id, user_id, email_address) "
"VALUES (:id, :user_id, :email_address)",
@@ -489,13 +506,13 @@ class RudimentaryFlushTest(UOWTest):
)
def test_natural_selfref(self):
- """test that unconnected items take relationship() into account regardless."""
+ """test that unconnected items take relationship()
+ into account regardless."""
Node, nodes = self.classes.Node, self.tables.nodes
-
mapper(Node, nodes, properties={
- 'children':relationship(Node)
+ 'children': relationship(Node)
})
sess = create_session()
@@ -515,20 +532,18 @@ class RudimentaryFlushTest(UOWTest):
"INSERT INTO nodes (id, parent_id, data) VALUES "
"(:id, :parent_id, :data)",
[{'parent_id': None, 'data': None, 'id': 1},
- {'parent_id': 1, 'data': None, 'id': 2},
- {'parent_id': 2, 'data': None, 'id': 3}]
- ),
+ {'parent_id': 1, 'data': None, 'id': 2},
+ {'parent_id': 2, 'data': None, 'id': 3}]
+ ),
)
def test_many_to_many(self):
- keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
- self.tables.items,
- self.tables.item_keywords,
- self.classes.Keyword,
- self.classes.Item)
+ keywords, items, item_keywords, Keyword, Item = (
+ self.tables.keywords, self.tables.items, self.tables.item_keywords,
+ self.classes.Keyword, self.classes.Item)
mapper(Item, items, properties={
- 'keywords':relationship(Keyword, secondary=item_keywords)
+ 'keywords': relationship(Keyword, secondary=item_keywords)
})
mapper(Keyword, keywords)
@@ -537,45 +552,45 @@ class RudimentaryFlushTest(UOWTest):
i1 = Item(description='i1', keywords=[k1])
sess.add(i1)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- AllOf(
- CompiledSQL(
+ testing.db,
+ sess.flush,
+ AllOf(
+ CompiledSQL(
"INSERT INTO keywords (name) VALUES (:name)",
- {'name':'k1'}
- ),
- CompiledSQL(
- "INSERT INTO items (description) VALUES (:description)",
- {'description':'i1'}
- ),
+ {'name': 'k1'}
),
CompiledSQL(
- "INSERT INTO item_keywords (item_id, keyword_id) "
- "VALUES (:item_id, :keyword_id)",
- lambda ctx:{'item_id':i1.id, 'keyword_id':k1.id}
- )
+ "INSERT INTO items (description) VALUES (:description)",
+ {'description': 'i1'}
+ ),
+ ),
+ CompiledSQL(
+ "INSERT INTO item_keywords (item_id, keyword_id) "
+ "VALUES (:item_id, :keyword_id)",
+ lambda ctx: {'item_id': i1.id, 'keyword_id': k1.id}
+ )
)
# test that keywords collection isn't loaded
sess.expire(i1, ['keywords'])
i1.description = 'i2'
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL("UPDATE items SET description=:description "
- "WHERE items.id = :items_id",
- lambda ctx:{'description':'i2', 'items_id':i1.id})
+ testing.db,
+ sess.flush,
+ CompiledSQL("UPDATE items SET description=:description "
+ "WHERE items.id = :items_id",
+ lambda ctx: {'description': 'i2', 'items_id': i1.id})
)
def test_m2o_flush_size(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={
- 'user':relationship(User, passive_updates=True)
+ 'user': relationship(User, passive_updates=True)
})
sess = create_session()
u1 = User(name='ed')
@@ -584,12 +599,12 @@ class RudimentaryFlushTest(UOWTest):
def test_o2m_flush_size(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users, properties={
- 'addresses':relationship(Address),
+ 'addresses': relationship(Address),
})
mapper(Address, addresses)
@@ -600,7 +615,7 @@ class RudimentaryFlushTest(UOWTest):
sess.flush()
- u1.name='jack'
+ u1.name = 'jack'
self._assert_uow_size(sess, 2)
sess.flush()
@@ -617,7 +632,7 @@ class RudimentaryFlushTest(UOWTest):
sess = create_session()
u1 = sess.query(User).first()
- u1.name='ed'
+ u1.name = 'ed'
self._assert_uow_size(sess, 2)
u1.addresses
@@ -625,6 +640,7 @@ class RudimentaryFlushTest(UOWTest):
class SingleCycleTest(UOWTest):
+
def teardown(self):
engines.testing_reaper.rollback_all()
# mysql can't handle delete from nodes
@@ -639,7 +655,7 @@ class SingleCycleTest(UOWTest):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'children':relationship(Node)
+ 'children': relationship(Node)
})
sess = create_session()
@@ -649,15 +665,15 @@ class SingleCycleTest(UOWTest):
sess.add(n1)
self.assert_sql_execution(
- testing.db,
- sess.flush,
+ testing.db,
+ sess.flush,
- CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES "
- "(:parent_id, :data)",
- {'parent_id': None, 'data': 'n1'}
- ),
- AllOf(
+ CompiledSQL(
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
+ {'parent_id': None, 'data': 'n1'}
+ ),
+ AllOf(
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
@@ -668,14 +684,14 @@ class SingleCycleTest(UOWTest):
"(:parent_id, :data)",
lambda ctx: {'parent_id': n1.id, 'data': 'n3'}
),
- )
)
+ )
def test_one_to_many_delete_all(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'children':relationship(Node)
+ 'children': relationship(Node)
})
sess = create_session()
@@ -689,19 +705,19 @@ class SingleCycleTest(UOWTest):
sess.delete(n2)
sess.delete(n3)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx:[{'id':n2.id}, {'id':n3.id}]),
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx: {'id':n1.id})
+ testing.db,
+ sess.flush,
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: [{'id': n2.id}, {'id': n3.id}]),
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id': n1.id})
)
def test_one_to_many_delete_parent(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'children':relationship(Node)
+ 'children': relationship(Node)
})
sess = create_session()
@@ -713,25 +729,25 @@ class SingleCycleTest(UOWTest):
sess.delete(n1)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- AllOf(
- CompiledSQL("UPDATE nodes SET parent_id=:parent_id "
- "WHERE nodes.id = :nodes_id",
- lambda ctx: {'nodes_id':n3.id, 'parent_id':None}),
- CompiledSQL("UPDATE nodes SET parent_id=:parent_id "
- "WHERE nodes.id = :nodes_id",
- lambda ctx: {'nodes_id':n2.id, 'parent_id':None}),
- ),
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx:{'id':n1.id})
- )
+ testing.db, sess.flush, AllOf(
+ CompiledSQL(
+ "UPDATE nodes SET parent_id=:parent_id "
+ "WHERE nodes.id = :nodes_id", lambda ctx: {
+ 'nodes_id': n3.id, 'parent_id': None}),
+ CompiledSQL(
+ "UPDATE nodes SET parent_id=:parent_id "
+ "WHERE nodes.id = :nodes_id", lambda ctx: {
+ 'nodes_id': n2.id, 'parent_id': None}),
+ ),
+ CompiledSQL(
+ "DELETE FROM nodes WHERE nodes.id = :id", lambda ctx: {
+ 'id': n1.id}))
def test_many_to_one_save(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'parent':relationship(Node, remote_side=nodes.c.id)
+ 'parent': relationship(Node, remote_side=nodes.c.id)
})
sess = create_session()
@@ -741,15 +757,15 @@ class SingleCycleTest(UOWTest):
sess.add_all([n2, n3])
self.assert_sql_execution(
- testing.db,
- sess.flush,
+ testing.db,
+ sess.flush,
- CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES "
- "(:parent_id, :data)",
- {'parent_id': None, 'data': 'n1'}
- ),
- AllOf(
+ CompiledSQL(
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
+ {'parent_id': None, 'data': 'n1'}
+ ),
+ AllOf(
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
@@ -760,14 +776,14 @@ class SingleCycleTest(UOWTest):
"(:parent_id, :data)",
lambda ctx: {'parent_id': n1.id, 'data': 'n3'}
),
- )
)
+ )
def test_many_to_one_delete_all(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'parent':relationship(Node, remote_side=nodes.c.id)
+ 'parent': relationship(Node, remote_side=nodes.c.id)
})
sess = create_session()
@@ -781,19 +797,19 @@ class SingleCycleTest(UOWTest):
sess.delete(n2)
sess.delete(n3)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx:[{'id':n2.id},{'id':n3.id}]),
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx: {'id':n1.id})
+ testing.db,
+ sess.flush,
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: [{'id': n2.id}, {'id': n3.id}]),
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id': n1.id})
)
def test_many_to_one_set_null_unloaded(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'parent':relationship(Node, remote_side=nodes.c.id)
+ 'parent': relationship(Node, remote_side=nodes.c.id)
})
sess = create_session()
n1 = Node(data='n1')
@@ -810,7 +826,7 @@ class SingleCycleTest(UOWTest):
CompiledSQL(
"UPDATE nodes SET parent_id=:parent_id WHERE "
"nodes.id = :nodes_id",
- lambda ctx: {"parent_id":None, "nodes_id":n2.id}
+ lambda ctx: {"parent_id": None, "nodes_id": n2.id}
)
)
@@ -818,7 +834,7 @@ class SingleCycleTest(UOWTest):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'children':relationship(Node)
+ 'children': relationship(Node)
})
sess = create_session()
@@ -836,9 +852,9 @@ class SingleCycleTest(UOWTest):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'children':relationship(Node,
- backref=backref('parent',
- remote_side=nodes.c.id))
+ 'children': relationship(Node,
+ backref=backref('parent',
+ remote_side=nodes.c.id))
})
sess = create_session()
@@ -857,11 +873,15 @@ class SingleCycleTest(UOWTest):
def test_bidirectional_multilevel_save(self):
Node, nodes = self.classes.Node, self.tables.nodes
- mapper(Node, nodes, properties={
- 'children':relationship(Node,
- backref=backref('parent', remote_side=nodes.c.id)
- )
- })
+ mapper(
+ Node,
+ nodes,
+ properties={
+ 'children': relationship(
+ Node,
+ backref=backref(
+ 'parent',
+ remote_side=nodes.c.id))})
sess = create_session()
n1 = Node(data='n1')
n1.children.append(Node(data='n11'))
@@ -878,37 +898,37 @@ class SingleCycleTest(UOWTest):
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':None, 'data':'n1'}
+ lambda ctx: {'parent_id': None, 'data': 'n1'}
),
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':n1.id, 'data':'n11'}
+ lambda ctx: {'parent_id': n1.id, 'data': 'n11'}
),
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':n1.id, 'data':'n12'}
+ lambda ctx: {'parent_id': n1.id, 'data': 'n12'}
),
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':n1.id, 'data':'n13'}
+ lambda ctx: {'parent_id': n1.id, 'data': 'n13'}
),
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':n12.id, 'data':'n121'}
+ lambda ctx: {'parent_id': n12.id, 'data': 'n121'}
),
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':n12.id, 'data':'n122'}
+ lambda ctx: {'parent_id': n12.id, 'data': 'n122'}
),
CompiledSQL(
"INSERT INTO nodes (parent_id, data) VALUES "
"(:parent_id, :data)",
- lambda ctx:{'parent_id':n12.id, 'data':'n123'}
+ lambda ctx: {'parent_id': n12.id, 'data': 'n123'}
),
)
@@ -916,7 +936,7 @@ class SingleCycleTest(UOWTest):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'children':relationship(Node)
+ 'children': relationship(Node)
})
sess = create_session()
n1 = Node(data='ed')
@@ -925,7 +945,7 @@ class SingleCycleTest(UOWTest):
sess.flush()
- n1.data='jack'
+ n1.data = 'jack'
self._assert_uow_size(sess, 2)
sess.flush()
@@ -942,18 +962,17 @@ class SingleCycleTest(UOWTest):
sess = create_session()
n1 = sess.query(Node).first()
- n1.data='ed'
+ n1.data = 'ed'
self._assert_uow_size(sess, 2)
n1.children
self._assert_uow_size(sess, 2)
-
def test_delete_unloaded_m2o(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
- 'parent':relationship(Node, remote_side=nodes.c.id)
+ 'parent': relationship(Node, remote_side=nodes.c.id)
})
parent = Node()
@@ -1022,35 +1041,38 @@ class SingleCycleTest(UOWTest):
)
+class SingleCyclePlusAttributeTest(
+ fixtures.MappedTest,
+ testing.AssertsExecutionResults,
+ AssertsUOW):
-class SingleCyclePlusAttributeTest(fixtures.MappedTest,
- testing.AssertsExecutionResults, AssertsUOW):
@classmethod
def define_tables(cls, metadata):
Table('nodes', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('parent_id', Integer, ForeignKey('nodes.id')),
- Column('data', String(30))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')),
+ Column('data', String(30))
+ )
Table('foobars', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('parent_id', Integer, ForeignKey('nodes.id')),
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')),
+ )
def test_flush_size(self):
foobars, nodes = self.tables.foobars, self.tables.nodes
class Node(fixtures.ComparableEntity):
pass
+
class FooBar(fixtures.ComparableEntity):
pass
mapper(Node, nodes, properties={
- 'children':relationship(Node),
- 'foobars':relationship(FooBar)
+ 'children': relationship(Node),
+ 'foobars': relationship(FooBar)
})
mapper(FooBar, foobars)
@@ -1070,25 +1092,30 @@ class SingleCyclePlusAttributeTest(fixtures.MappedTest,
sess.flush()
+
class SingleCycleM2MTest(fixtures.MappedTest,
- testing.AssertsExecutionResults, AssertsUOW):
+ testing.AssertsExecutionResults, AssertsUOW):
@classmethod
def define_tables(cls, metadata):
- nodes = Table('nodes', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(30)),
- Column('favorite_node_id', Integer, ForeignKey('nodes.id'))
- )
+ Table(
+ 'nodes', metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column(
+ 'data', String(30)), Column(
+ 'favorite_node_id', Integer, ForeignKey('nodes.id')))
- node_to_nodes =Table('node_to_nodes', metadata,
- Column('left_node_id', Integer,
- ForeignKey('nodes.id'),primary_key=True),
- Column('right_node_id', Integer,
- ForeignKey('nodes.id'),primary_key=True),
- )
+ Table(
+ 'node_to_nodes', metadata,
+ Column(
+ 'left_node_id', Integer,
+ ForeignKey('nodes.id'), primary_key=True),
+ Column(
+ 'right_node_id', Integer,
+ ForeignKey('nodes.id'), primary_key=True),
+ )
def test_many_to_many_one(self):
nodes, node_to_nodes = self.tables.nodes, self.tables.node_to_nodes
@@ -1096,14 +1123,19 @@ class SingleCycleM2MTest(fixtures.MappedTest,
class Node(fixtures.ComparableEntity):
pass
- mapper(Node, nodes, properties={
- 'children':relationship(Node, secondary=node_to_nodes,
- primaryjoin=nodes.c.id==node_to_nodes.c.left_node_id,
- secondaryjoin=nodes.c.id==node_to_nodes.c.right_node_id,
- backref='parents'
- ),
- 'favorite':relationship(Node, remote_side=nodes.c.id)
- })
+ mapper(
+ Node,
+ nodes,
+ properties={
+ 'children': relationship(
+ Node,
+ secondary=node_to_nodes,
+ primaryjoin=nodes.c.id == node_to_nodes.c.left_node_id,
+ secondaryjoin=nodes.c.id == node_to_nodes.c.right_node_id,
+ backref='parents'),
+ 'favorite': relationship(
+ Node,
+ remote_side=nodes.c.id)})
sess = create_session()
n1 = Node(data='n1')
@@ -1128,46 +1160,46 @@ class SingleCycleM2MTest(fixtures.MappedTest,
sess.flush()
eq_(
sess.query(node_to_nodes.c.left_node_id,
- node_to_nodes.c.right_node_id).\
- order_by(node_to_nodes.c.left_node_id,
- node_to_nodes.c.right_node_id).\
- all(),
+ node_to_nodes.c.right_node_id).
+ order_by(node_to_nodes.c.left_node_id,
+ node_to_nodes.c.right_node_id).
+ all(),
sorted([
- (n1.id, n2.id), (n1.id, n3.id), (n1.id, n4.id),
- (n2.id, n3.id), (n2.id, n5.id),
- (n3.id, n5.id), (n3.id, n4.id)
- ])
+ (n1.id, n2.id), (n1.id, n3.id), (n1.id, n4.id),
+ (n2.id, n3.id), (n2.id, n5.id),
+ (n3.id, n5.id), (n3.id, n4.id)
+ ])
)
sess.delete(n1)
self.assert_sql_execution(
- testing.db,
- sess.flush,
- # this is n1.parents firing off, as it should, since
- # passive_deletes is False for n1.parents
- CompiledSQL(
- "SELECT nodes.id AS nodes_id, nodes.data AS nodes_data, "
- "nodes.favorite_node_id AS nodes_favorite_node_id FROM "
- "nodes, node_to_nodes WHERE :param_1 = "
- "node_to_nodes.right_node_id AND nodes.id = "
- "node_to_nodes.left_node_id" ,
- lambda ctx:{'param_1': n1.id},
- ),
- CompiledSQL(
- "DELETE FROM node_to_nodes WHERE "
- "node_to_nodes.left_node_id = :left_node_id AND "
- "node_to_nodes.right_node_id = :right_node_id",
- lambda ctx:[
- {'right_node_id': n2.id, 'left_node_id': n1.id},
- {'right_node_id': n3.id, 'left_node_id': n1.id},
- {'right_node_id': n4.id, 'left_node_id': n1.id}
- ]
- ),
- CompiledSQL(
- "DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx:{'id': n1.id}
- ),
+ testing.db,
+ sess.flush,
+ # this is n1.parents firing off, as it should, since
+ # passive_deletes is False for n1.parents
+ CompiledSQL(
+ "SELECT nodes.id AS nodes_id, nodes.data AS nodes_data, "
+ "nodes.favorite_node_id AS nodes_favorite_node_id FROM "
+ "nodes, node_to_nodes WHERE :param_1 = "
+ "node_to_nodes.right_node_id AND nodes.id = "
+ "node_to_nodes.left_node_id",
+ lambda ctx: {'param_1': n1.id},
+ ),
+ CompiledSQL(
+ "DELETE FROM node_to_nodes WHERE "
+ "node_to_nodes.left_node_id = :left_node_id AND "
+ "node_to_nodes.right_node_id = :right_node_id",
+ lambda ctx: [
+ {'right_node_id': n2.id, 'left_node_id': n1.id},
+ {'right_node_id': n3.id, 'left_node_id': n1.id},
+ {'right_node_id': n4.id, 'left_node_id': n1.id}
+ ]
+ ),
+ CompiledSQL(
+ "DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id': n1.id}
+ ),
)
for n in [n2, n3, n4, n5]:
@@ -1185,7 +1217,7 @@ class SingleCycleM2MTest(fixtures.MappedTest,
"DELETE FROM node_to_nodes WHERE node_to_nodes.left_node_id "
"= :left_node_id AND node_to_nodes.right_node_id = "
":right_node_id",
- lambda ctx:[
+ lambda ctx: [
{'right_node_id': n5.id, 'left_node_id': n3.id},
{'right_node_id': n4.id, 'left_node_id': n3.id},
{'right_node_id': n3.id, 'left_node_id': n2.id},
@@ -1194,38 +1226,41 @@ class SingleCycleM2MTest(fixtures.MappedTest,
),
CompiledSQL(
"DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx:[{'id': n4.id}, {'id': n5.id}]
+ lambda ctx: [{'id': n4.id}, {'id': n5.id}]
),
CompiledSQL(
"DELETE FROM nodes WHERE nodes.id = :id",
- lambda ctx:[{'id': n2.id}, {'id': n3.id}]
+ lambda ctx: [{'id': n2.id}, {'id': n3.id}]
),
)
+
class RowswitchAccountingTest(fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table('parent', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer)
- )
+ Column('id', Integer, primary_key=True),
+ Column('data', Integer)
+ )
Table('child', metadata,
- Column('id', Integer, ForeignKey('parent.id'), primary_key=True),
- Column('data', Integer)
- )
+ Column('id', Integer, ForeignKey('parent.id'), primary_key=True),
+ Column('data', Integer)
+ )
def _fixture(self):
parent, child = self.tables.parent, self.tables.child
class Parent(fixtures.BasicEntity):
pass
+
class Child(fixtures.BasicEntity):
pass
mapper(Parent, parent, properties={
- 'child':relationship(Child, uselist=False,
- cascade="all, delete-orphan",
- backref="parent")
+ 'child': relationship(Child, uselist=False,
+ cascade="all, delete-orphan",
+ backref="parent")
})
mapper(Child, child)
return Parent, Child
@@ -1274,8 +1309,10 @@ class RowswitchAccountingTest(fixtures.MappedTest):
eq_(sess.scalar(self.tables.parent.count()), 0)
+
class RowswitchM2OTest(fixtures.MappedTest):
# tests for #3060 and related issues
+
@classmethod
def define_tables(cls, metadata):
Table(
@@ -1299,17 +1336,18 @@ class RowswitchM2OTest(fixtures.MappedTest):
class A(fixtures.BasicEntity):
pass
+
class B(fixtures.BasicEntity):
pass
+
class C(fixtures.BasicEntity):
pass
-
mapper(A, a, properties={
- 'bs': relationship(B, cascade="all, delete-orphan")
+ 'bs': relationship(B, cascade="all, delete-orphan")
})
mapper(B, b, properties={
- 'c': relationship(C)
+ 'c': relationship(C)
})
mapper(C, c)
return A, B, C
@@ -1391,29 +1429,31 @@ class RowswitchM2OTest(fixtures.MappedTest):
class BasicStaleChecksTest(fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table('parent', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer)
- )
+ Column('id', Integer, primary_key=True),
+ Column('data', Integer)
+ )
Table('child', metadata,
- Column('id', Integer, ForeignKey('parent.id'), primary_key=True),
- Column('data', Integer)
- )
+ Column('id', Integer, ForeignKey('parent.id'), primary_key=True),
+ Column('data', Integer)
+ )
def _fixture(self, confirm_deleted_rows=True):
parent, child = self.tables.parent, self.tables.child
class Parent(fixtures.BasicEntity):
pass
+
class Child(fixtures.BasicEntity):
pass
mapper(Parent, parent, properties={
- 'child':relationship(Child, uselist=False,
- cascade="all, delete-orphan",
- backref="parent"),
+ 'child': relationship(Child, uselist=False,
+ cascade="all, delete-orphan",
+ backref="parent"),
}, confirm_deleted_rows=confirm_deleted_rows)
mapper(Child, child)
return Parent, Child
@@ -1431,7 +1471,7 @@ class BasicStaleChecksTest(fixtures.MappedTest):
assert_raises_message(
orm_exc.StaleDataError,
"UPDATE statement on table 'parent' expected to "
- "update 1 row\(s\); 0 were matched.",
+ "update 1 row\(s\); 0 were matched.",
sess.flush
)
@@ -1451,7 +1491,7 @@ class BasicStaleChecksTest(fixtures.MappedTest):
assert_raises_message(
exc.SAWarning,
"DELETE statement on table 'parent' expected to "
- "delete 2 row\(s\); 0 were matched.",
+ "delete 2 row\(s\); 0 were matched.",
sess.flush
)
@@ -1471,14 +1511,15 @@ class BasicStaleChecksTest(fixtures.MappedTest):
class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults):
+
@classmethod
def define_tables(cls, metadata):
Table('t', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(50)),
- Column('def_', String(50), server_default='def1')
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', String(50)),
+ Column('def_', String(50), server_default='def1')
+ )
def test_batch_interaction(self):
"""test batching groups same-structured, primary
@@ -1532,8 +1573,8 @@ class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults):
),
CompiledSQL(
"INSERT INTO t (id, data, def_) VALUES (:id, :data, :def_)",
- [{'data': 't9', 'id': 9, 'def_':'def2'},
- {'data': 't10', 'id': 10, 'def_':'def3'}]
+ [{'data': 't9', 'id': 9, 'def_': 'def2'},
+ {'data': 't10', 'id': 10, 'def_': 'def3'}]
),
CompiledSQL(
"INSERT INTO t (id, data) VALUES (:id, :data)",
@@ -1541,126 +1582,129 @@ class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults):
),
)
+
class LoadersUsingCommittedTest(UOWTest):
- """Test that events which occur within a flush()
- get the same attribute loading behavior as on the outside
- of the flush, and that the unit of work itself uses the
- "committed" version of primary/foreign key attributes
- when loading a collection for historical purposes (this typically
- has importance for when primary key values change).
+
+ """Test that events which occur within a flush()
+ get the same attribute loading behavior as on the outside
+ of the flush, and that the unit of work itself uses the
+ "committed" version of primary/foreign key attributes
+ when loading a collection for historical purposes (this typically
+ has importance for when primary key values change).
+
+ """
+
+ def _mapper_setup(self, passive_updates=True):
+ users, Address, addresses, User = (self.tables.users,
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
+
+ mapper(User, users, properties={
+ 'addresses': relationship(Address,
+ order_by=addresses.c.email_address,
+ passive_updates=passive_updates,
+ backref='user')
+ })
+ mapper(Address, addresses)
+ return create_session(autocommit=False)
+
+ def test_before_update_m2o(self):
+ """Expect normal many to one attribute load behavior
+ (should not get committed value)
+ from within public 'before_update' event"""
+ sess = self._mapper_setup()
+
+ Address, User = self.classes.Address, self.classes.User
+
+ def before_update(mapper, connection, target):
+ # if get committed is used to find target.user, then
+ # it will be still be u1 instead of u2
+ assert target.user.id == target.user_id == u2.id
+ from sqlalchemy import event
+ event.listen(Address, 'before_update', before_update)
+
+ a1 = Address(email_address='a1')
+ u1 = User(name='u1', addresses=[a1])
+ sess.add(u1)
+
+ u2 = User(name='u2')
+ sess.add(u2)
+ sess.commit()
+
+ sess.expunge_all()
+ # lookup an address and move it to the other user
+ a1 = sess.query(Address).get(a1.id)
+
+ # move address to another user's fk
+ assert a1.user_id == u1.id
+ a1.user_id = u2.id
+
+ sess.flush()
+
+ def test_before_update_o2m_passive(self):
+ """Expect normal one to many attribute load behavior
+ (should not get committed value)
+ from within public 'before_update' event"""
+ self._test_before_update_o2m(True)
+
+ def test_before_update_o2m_notpassive(self):
+ """Expect normal one to many attribute load behavior
+ (should not get committed value)
+ from within public 'before_update' event with
+ passive_updates=False
"""
+ self._test_before_update_o2m(False)
- def _mapper_setup(self, passive_updates=True):
- users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
-
- mapper(User, users, properties={
- 'addresses': relationship(Address,
- order_by=addresses.c.email_address,
- passive_updates=passive_updates,
- backref='user')
- })
- mapper(Address, addresses)
- return create_session(autocommit=False)
-
- def test_before_update_m2o(self):
- """Expect normal many to one attribute load behavior
- (should not get committed value)
- from within public 'before_update' event"""
- sess = self._mapper_setup()
-
- Address, User = self.classes.Address, self.classes.User
-
- def before_update(mapper, connection, target):
- # if get committed is used to find target.user, then
- # it will be still be u1 instead of u2
- assert target.user.id == target.user_id == u2.id
- from sqlalchemy import event
- event.listen(Address, 'before_update', before_update)
-
- a1 = Address(email_address='a1')
- u1 = User(name='u1', addresses=[a1])
- sess.add(u1)
-
- u2 = User(name='u2')
- sess.add(u2)
- sess.commit()
-
- sess.expunge_all()
- # lookup an address and move it to the other user
- a1 = sess.query(Address).get(a1.id)
-
- # move address to another user's fk
- assert a1.user_id == u1.id
- a1.user_id = u2.id
+ def _test_before_update_o2m(self, passive_updates):
+ sess = self._mapper_setup(passive_updates=passive_updates)
- sess.flush()
+ Address, User = self.classes.Address, self.classes.User
- def test_before_update_o2m_passive(self):
- """Expect normal one to many attribute load behavior
- (should not get committed value)
- from within public 'before_update' event"""
- self._test_before_update_o2m(True)
+ class AvoidReferencialError(Exception):
- def test_before_update_o2m_notpassive(self):
- """Expect normal one to many attribute load behavior
- (should not get committed value)
- from within public 'before_update' event with
- passive_updates=False
+ """the test here would require ON UPDATE CASCADE on FKs
+ for the flush to fully succeed; this exception is used
+ to cancel the flush before we get that far.
"""
- self._test_before_update_o2m(False)
-
- def _test_before_update_o2m(self, passive_updates):
- sess = self._mapper_setup(passive_updates=passive_updates)
-
- Address, User = self.classes.Address, self.classes.User
-
- class AvoidReferencialError(Exception):
- """the test here would require ON UPDATE CASCADE on FKs
- for the flush to fully succeed; this exception is used
- to cancel the flush before we get that far.
-
- """
-
- def before_update(mapper, connection, target):
- if passive_updates:
- # we shouldn't be using committed value.
- # so, having switched target's primary key,
- # we expect no related items in the collection
- # since we are using passive_updates
- # this is a behavior change since #2350
- assert 'addresses' not in target.__dict__
- eq_(target.addresses, [])
- else:
- # in contrast with passive_updates=True,
- # here we expect the orm to have looked up the addresses
- # with the committed value (it needs to in order to
- # update the foreign keys). So we expect addresses
- # collection to move with the user,
- # (just like they will be after the update)
-
- # collection is already loaded
- assert 'addresses' in target.__dict__
- eq_([a.id for a in target.addresses],
- [a.id for a in [a1, a2]])
- raise AvoidReferencialError()
- from sqlalchemy import event
- event.listen(User, 'before_update', before_update)
-
- a1 = Address(email_address='jack1')
- a2 = Address(email_address='jack2')
- u1 = User(id=1, name='jack', addresses=[a1, a2])
- sess.add(u1)
- sess.commit()
-
- sess.expunge_all()
- u1 = sess.query(User).get(u1.id)
- u1.id = 2
- try:
- sess.flush()
- except AvoidReferencialError:
- pass
+
+ def before_update(mapper, connection, target):
+ if passive_updates:
+ # we shouldn't be using committed value.
+ # so, having switched target's primary key,
+ # we expect no related items in the collection
+ # since we are using passive_updates
+ # this is a behavior change since #2350
+ assert 'addresses' not in target.__dict__
+ eq_(target.addresses, [])
+ else:
+ # in contrast with passive_updates=True,
+ # here we expect the orm to have looked up the addresses
+ # with the committed value (it needs to in order to
+ # update the foreign keys). So we expect addresses
+ # collection to move with the user,
+ # (just like they will be after the update)
+
+ # collection is already loaded
+ assert 'addresses' in target.__dict__
+ eq_([a.id for a in target.addresses],
+ [a.id for a in [a1, a2]])
+ raise AvoidReferencialError()
+ from sqlalchemy import event
+ event.listen(User, 'before_update', before_update)
+
+ a1 = Address(email_address='jack1')
+ a2 = Address(email_address='jack2')
+ u1 = User(id=1, name='jack', addresses=[a1, a2])
+ sess.add(u1)
+ sess.commit()
+
+ sess.expunge_all()
+ u1 = sess.query(User).get(u1.id)
+ u1.id = 2
+ try:
+ sess.flush()
+ except AvoidReferencialError:
+ pass