summaryrefslogtreecommitdiff
path: root/test/orm
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm')
-rw-r--r--test/orm/test_cycles.py14
-rw-r--r--test/orm/test_query.py48
2 files changed, 55 insertions, 7 deletions
diff --git a/test/orm/test_cycles.py b/test/orm/test_cycles.py
index c95b8d152..56386e8d2 100644
--- a/test/orm/test_cycles.py
+++ b/test/orm/test_cycles.py
@@ -1181,9 +1181,10 @@ class PostUpdateBatchingTest(fixtures.MappedTest):
testing.db,
sess.flush,
CompiledSQL(
- "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, "
- "c3_id=:c3_id WHERE parent.id = :parent_id",
- lambda ctx: {'c2_id': c23.id, 'parent_id': p1.id, 'c1_id': c12.id, 'c3_id': c31.id}
+ "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, c3_id=:c3_id "
+ "WHERE parent.id = :parent_id",
+ lambda ctx: {'c2_id': c23.id, 'parent_id': p1.id,
+ 'c1_id': c12.id, 'c3_id': c31.id}
)
)
@@ -1193,8 +1194,9 @@ class PostUpdateBatchingTest(fixtures.MappedTest):
testing.db,
sess.flush,
CompiledSQL(
- "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, "
- "c3_id=:c3_id WHERE parent.id = :parent_id",
- lambda ctx: {'c2_id': None, 'parent_id': p1.id, 'c1_id': None, 'c3_id': None}
+ "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, c3_id=:c3_id "
+ "WHERE parent.id = :parent_id",
+ lambda ctx: {'c2_id': None, 'parent_id': p1.id,
+ 'c1_id': None, 'c3_id': None}
)
)
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index a373f1482..458637453 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -3867,7 +3867,7 @@ class SessionBindTest(QueryTest):
def _assert_bind_args(self, session):
get_bind = mock.Mock(side_effect=session.get_bind)
with mock.patch.object(session, "get_bind", get_bind):
- yield
+ yield get_bind
for call_ in get_bind.mock_calls:
is_(call_[1][0], inspect(self.classes.User))
is_not_(call_[2]['clause'], None)
@@ -3903,6 +3903,52 @@ class SessionBindTest(QueryTest):
session.query(User).filter(User.id == 15).update(
{"name": "foob"}, synchronize_session=False)
+ def test_bulk_update_unordered_dict(self):
+ User = self.classes.User
+ session = Session()
+
+ # Do an update using unordered dict and check that the parametes used
+ # are unordered
+ with self._assert_bind_args(session) as mock_args:
+ session.query(User).filter(User.id == 15).update(
+ {'name': 'foob', 'id': 123})
+ # Confirm that parameters are a dict instead of tuple or list
+ params_type = type(mock_args.mock_calls[0][2]['clause'].parameters)
+ assert params_type is dict
+
+ def test_bulk_update_ordered_dict(self):
+ User = self.classes.User
+ session = Session()
+
+ # Do an update using an ordered dict and check that the parametes used
+ # are unordered
+ with self._assert_bind_args(session) as mock_args:
+ session.query(User).filter(User.id == 15).update(
+ util.OrderedDict((('name', 'foob'), ('id', 123))))
+ params_type = type(mock_args.mock_calls[0][2]['clause'].parameters)
+ assert params_type is dict
+
+ def test_bulk_update_with_order(self):
+ User = self.classes.User
+ session = Session()
+
+ # Do update using a tuple and check that order is preserved
+ with self._assert_bind_args(session) as mock_args:
+ session.query(User).filter(User.id == 15).update(
+ (('id', 123), ('name', 'foob')))
+ cols = [c[0].name for c
+ in mock_args.mock_calls[0][2]['clause'].parameters]
+ assert ['id', 'name'] == cols
+
+ # Now invert the order and use a list instead, and check that order is
+ # also preserved
+ with self._assert_bind_args(session) as mock_args:
+ session.query(User).filter(User.id == 15).update(
+ [('id', 123), ('name', 'foob')])
+ cols = [c[0].name for c
+ in mock_args.mock_calls[0][2]['clause'].parameters]
+ assert ['id', 'name'] == cols
+
def test_bulk_delete_no_sync(self):
User = self.classes.User
session = Session()