diff options
| -rw-r--r-- | doc/build/changelog/unreleased_14/5735.rst | 12 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/dependency.py | 1 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/mapper.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/unitofwork.py | 32 | ||||
| -rw-r--r-- | lib/sqlalchemy/sql/ddl.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/util/topological.py | 29 | ||||
| -rw-r--r-- | test/base/test_dependency.py | 4 | ||||
| -rw-r--r-- | test/orm/test_unitofwork.py | 18 | ||||
| -rw-r--r-- | test/orm/test_unitofworkv2.py | 32 |
9 files changed, 80 insertions, 54 deletions
diff --git a/doc/build/changelog/unreleased_14/5735.rst b/doc/build/changelog/unreleased_14/5735.rst new file mode 100644 index 000000000..8c9bc7883 --- /dev/null +++ b/doc/build/changelog/unreleased_14/5735.rst @@ -0,0 +1,12 @@ +.. change:: + :tags: bug, orm, unitofwork + :tickets: 5735 + + Improved the unit of work topological sorting system such that the + toplogical sort is now deterministic based on the sorting of the input set, + which itself is now sorted at the level of mappers, so that the same inputs + of affected mappers should produce the same output every time, among + mappers / tables that don't have any dependency on each other. This further + reduces the chance of deadlocks as can be observed in a flush that UPDATEs + among multiple, unrelated tables such that row locks are generated. + diff --git a/lib/sqlalchemy/orm/dependency.py b/lib/sqlalchemy/orm/dependency.py index d4680e394..9c2c5ade3 100644 --- a/lib/sqlalchemy/orm/dependency.py +++ b/lib/sqlalchemy/orm/dependency.py @@ -43,6 +43,7 @@ class DependencyProcessor(object): else: self._passive_update_flag = attributes.PASSIVE_OFF + self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key) self.key = prop.key if not self.prop.synchronize_pairs: raise sa_exc.ArgumentError( diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index e73993e50..e50183894 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -574,6 +574,10 @@ class Mapper( """ self.class_ = util.assert_arg_type(class_, type, "class_") + self._sort_key = "%s.%s" % ( + self.class_.__module__, + self.class_.__name__, + ) self.class_manager = None diff --git a/lib/sqlalchemy/orm/unitofwork.py b/lib/sqlalchemy/orm/unitofwork.py index 9c67130ce..868f8e087 100644 --- a/lib/sqlalchemy/orm/unitofwork.py +++ b/lib/sqlalchemy/orm/unitofwork.py @@ -422,6 +422,10 @@ class UOWTransaction(object): def execute(self): postsort_actions = self._generate_actions() + postsort_actions = sorted( + postsort_actions, + key=lambda item: item.sort_key, + ) # sort = topological.sort(self.dependencies, postsort_actions) # print "--------------" # print "\ndependencies:", self.dependencies @@ -431,9 +435,10 @@ class UOWTransaction(object): # execute if self.cycles: - for set_ in topological.sort_as_subsets( + for subset in topological.sort_as_subsets( self.dependencies, postsort_actions ): + set_ = set(subset) while set_: n = set_.pop() n.execute_aggregate(self, set_) @@ -542,10 +547,15 @@ class PostSortRec(object): class ProcessAll(IterateMappersMixin, PostSortRec): - __slots__ = "dependency_processor", "isdelete", "fromparent" + __slots__ = "dependency_processor", "isdelete", "fromparent", "sort_key" def __init__(self, uow, dependency_processor, isdelete, fromparent): self.dependency_processor = dependency_processor + self.sort_key = ( + "ProcessAll", + self.dependency_processor.sort_key, + isdelete, + ) self.isdelete = isdelete self.fromparent = fromparent uow.deps[dependency_processor.parent.base_mapper].add( @@ -582,11 +592,12 @@ class ProcessAll(IterateMappersMixin, PostSortRec): class PostUpdateAll(PostSortRec): - __slots__ = "mapper", "isdelete" + __slots__ = "mapper", "isdelete", "sort_key" def __init__(self, uow, mapper, isdelete): self.mapper = mapper self.isdelete = isdelete + self.sort_key = ("PostUpdateAll", mapper._sort_key, isdelete) @util.preload_module("sqlalchemy.orm.persistence") def execute(self, uow): @@ -598,10 +609,11 @@ class PostUpdateAll(PostSortRec): class SaveUpdateAll(PostSortRec): - __slots__ = ("mapper",) + __slots__ = ("mapper", "sort_key") def __init__(self, uow, mapper): self.mapper = mapper + self.sort_key = ("SaveUpdateAll", mapper._sort_key) assert mapper is mapper.base_mapper @util.preload_module("sqlalchemy.orm.persistence") @@ -634,10 +646,11 @@ class SaveUpdateAll(PostSortRec): class DeleteAll(PostSortRec): - __slots__ = ("mapper",) + __slots__ = ("mapper", "sort_key") def __init__(self, uow, mapper): self.mapper = mapper + self.sort_key = ("DeleteAll", mapper._sort_key) assert mapper is mapper.base_mapper @util.preload_module("sqlalchemy.orm.persistence") @@ -670,10 +683,11 @@ class DeleteAll(PostSortRec): class ProcessState(PostSortRec): - __slots__ = "dependency_processor", "isdelete", "state" + __slots__ = "dependency_processor", "isdelete", "state", "sort_key" def __init__(self, uow, dependency_processor, isdelete, state): self.dependency_processor = dependency_processor + self.sort_key = ("ProcessState", dependency_processor.sort_key) self.isdelete = isdelete self.state = state @@ -705,11 +719,12 @@ class ProcessState(PostSortRec): class SaveUpdateState(PostSortRec): - __slots__ = "state", "mapper" + __slots__ = "state", "mapper", "sort_key" def __init__(self, uow, state): self.state = state self.mapper = state.mapper.base_mapper + self.sort_key = ("ProcessState", self.mapper._sort_key) @util.preload_module("sqlalchemy.orm.persistence") def execute_aggregate(self, uow, recs): @@ -732,11 +747,12 @@ class SaveUpdateState(PostSortRec): class DeleteState(PostSortRec): - __slots__ = "state", "mapper" + __slots__ = "state", "mapper", "sort_key" def __init__(self, uow, state): self.state = state self.mapper = state.mapper.base_mapper + self.sort_key = ("DeleteState", self.mapper._sort_key) @util.preload_module("sqlalchemy.orm.persistence") def execute_aggregate(self, uow, recs): diff --git a/lib/sqlalchemy/sql/ddl.py b/lib/sqlalchemy/sql/ddl.py index f1012292b..e0dd6faf7 100644 --- a/lib/sqlalchemy/sql/ddl.py +++ b/lib/sqlalchemy/sql/ddl.py @@ -1171,7 +1171,6 @@ def sort_tables_and_constraints( topological.sort( fixed_dependencies.union(mutable_dependencies), tables, - deterministic_order=True, ) ) except exc.CircularDependencyError as err: @@ -1203,7 +1202,6 @@ def sort_tables_and_constraints( topological.sort( fixed_dependencies.union(mutable_dependencies), tables, - deterministic_order=True, ) ) diff --git a/lib/sqlalchemy/util/topological.py b/lib/sqlalchemy/util/topological.py index 4d6ef22ec..b009a8ce2 100644 --- a/lib/sqlalchemy/util/topological.py +++ b/lib/sqlalchemy/util/topological.py @@ -10,25 +10,23 @@ from .. import util from ..exc import CircularDependencyError - __all__ = ["sort", "sort_as_subsets", "find_cycles"] -def sort_as_subsets(tuples, allitems, deterministic_order=False): +def sort_as_subsets(tuples, allitems): edges = util.defaultdict(set) for parent, child in tuples: edges[child].add(parent) - Set = util.OrderedSet if deterministic_order else set - - todo = Set(allitems) + todo = list(allitems) + todo_set = set(allitems) - while todo: - output = Set() + while todo_set: + output = [] for node in todo: - if todo.isdisjoint(edges[node]): - output.add(node) + if todo_set.isdisjoint(edges[node]): + output.append(node) if not output: raise CircularDependencyError( @@ -37,18 +35,23 @@ def sort_as_subsets(tuples, allitems, deterministic_order=False): _gen_edges(edges), ) - todo.difference_update(output) + todo_set.difference_update(output) + todo = [t for t in todo if t in todo_set] yield output -def sort(tuples, allitems, deterministic_order=False): +def sort(tuples, allitems, deterministic_order=True): """sort the given list of items by dependency. 'tuples' is a list of tuples representing a partial ordering. - 'deterministic_order' keeps items within a dependency tier in list order. + + deterministic_order is no longer used, the order is now always + deterministic given the order of "allitems". the flag is there + for backwards compatibility with Alembic. + """ - for set_ in sort_as_subsets(tuples, allitems, deterministic_order): + for set_ in sort_as_subsets(tuples, allitems): for s in set_: yield s diff --git a/test/base/test_dependency.py b/test/base/test_dependency.py index 7af7fdc71..9250c3334 100644 --- a/test/base/test_dependency.py +++ b/test/base/test_dependency.py @@ -16,9 +16,7 @@ class DependencySortTest(fixtures.TestBase): assert conforms_partial_ordering(tuples, result) def assert_sort_deterministic(self, tuples, allitems, expected): - result = list( - topological.sort(tuples, allitems, deterministic_order=True) - ) + result = list(topological.sort(tuples, allitems)) assert conforms_partial_ordering(tuples, result) assert result == expected diff --git a/test/orm/test_unitofwork.py b/test/orm/test_unitofwork.py index eb5530ef4..7583b9d22 100644 --- a/test/orm/test_unitofwork.py +++ b/test/orm/test_unitofwork.py @@ -2531,16 +2531,14 @@ class ManyToManyTest(_fixtures.FixtureTest): self.assert_sql_execution( testing.db, session.flush, - AllOf( - CompiledSQL( - "UPDATE items SET description=:description " - "WHERE items.id = :items_id", - {"description": "item4updated", "items_id": objects[4].id}, - ), - CompiledSQL( - "INSERT INTO keywords (name) " "VALUES (:name)", - {"name": "yellow"}, - ), + CompiledSQL( + "UPDATE items SET description=:description " + "WHERE items.id = :items_id", + {"description": "item4updated", "items_id": objects[4].id}, + ), + CompiledSQL( + "INSERT INTO keywords (name) " "VALUES (:name)", + {"name": "yellow"}, ), CompiledSQL( "INSERT INTO item_keywords (item_id, keyword_id) " diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py index 31386b07f..97b7b9edd 100644 --- a/test/orm/test_unitofworkv2.py +++ b/test/orm/test_unitofworkv2.py @@ -681,15 +681,13 @@ class RudimentaryFlushTest(UOWTest): self.assert_sql_execution( testing.db, sess.flush, - AllOf( - CompiledSQL( - "INSERT INTO keywords (name) VALUES (:name)", - {"name": "k1"}, - ), - CompiledSQL( - "INSERT INTO items (description) VALUES (:description)", - {"description": "i1"}, - ), + CompiledSQL( + "INSERT INTO items (description) VALUES (:description)", + {"description": "i1"}, + ), + CompiledSQL( + "INSERT INTO keywords (name) VALUES (:name)", + {"name": "k1"}, ), CompiledSQL( "INSERT INTO item_keywords (item_id, keyword_id) " @@ -874,15 +872,13 @@ class SingleCycleTest(UOWTest): self.assert_sql_execution( testing.db, sess.flush, - AllOf( - CompiledSQL( - "UPDATE nodes SET parent_id=:parent_id " - "WHERE nodes.id = :nodes_id", - lambda ctx: [ - {"nodes_id": n3.id, "parent_id": None}, - {"nodes_id": n2.id, "parent_id": None}, - ], - ) + CompiledSQL( + "UPDATE nodes SET parent_id=:parent_id " + "WHERE nodes.id = :nodes_id", + lambda ctx: [ + {"nodes_id": n3.id, "parent_id": None}, + {"nodes_id": n2.id, "parent_id": None}, + ], ), CompiledSQL( "DELETE FROM nodes WHERE nodes.id = :id", |
