import copy from sqlalchemy import exc as sa_exc from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy import Integer from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy.orm import attributes from sqlalchemy.orm import backref from sqlalchemy.orm import CascadeOptions from sqlalchemy.orm import class_mapper from sqlalchemy.orm import configure_mappers from sqlalchemy.orm import exc as orm_exc from sqlalchemy.orm import foreign from sqlalchemy.orm import object_mapper from sqlalchemy.orm import relationship from sqlalchemy.orm import Session from sqlalchemy.orm import util as orm_util from sqlalchemy.orm import with_parent from sqlalchemy.orm.attributes import instance_state from sqlalchemy.orm.collections import attribute_keyed_dict from sqlalchemy.orm.decl_api import declarative_base from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assert_warns_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ from sqlalchemy.testing import not_in from sqlalchemy.testing.assertsql import CompiledSQL from sqlalchemy.testing.fixtures import fixture_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table from test.orm import _fixtures class CascadeArgTest(fixtures.MappedTest): run_inserts = None run_create_tables = None run_deletes = None @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30), nullable=False), ) Table( "addresses", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", Integer, ForeignKey("users.id")), Column("email_address", String(50), nullable=False), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Address(cls.Comparable): pass def test_delete_with_passive_deletes_all(self): User, Address = self.classes.User, self.classes.Address users, addresses = self.tables.users, self.tables.addresses self.mapper_registry.map_imperatively( User, users, properties={ "addresses": relationship( Address, passive_deletes="all", cascade="all, delete-orphan", ) }, ) self.mapper_registry.map_imperatively(Address, addresses) assert_raises_message( sa_exc.ArgumentError, "On User.addresses, can't set passive_deletes='all' " "in conjunction with 'delete' or 'delete-orphan' cascade", configure_mappers, ) def test_delete_orphan_without_delete(self): Address = self.classes.Address assert_warns_message( sa_exc.SAWarning, "The 'delete-orphan' cascade option requires 'delete'.", relationship, Address, cascade="save-update, delete-orphan", ) def test_bad_cascade(self): addresses, Address = self.tables.addresses, self.classes.Address self.mapper_registry.map_imperatively(Address, addresses) assert_raises_message( sa_exc.ArgumentError, r"Invalid cascade option\(s\): 'fake', 'fake2'", relationship, Address, cascade="fake, all, delete-orphan, fake2", ) def test_cascade_repr(self): eq_( repr(orm_util.CascadeOptions("all, delete-orphan")), "CascadeOptions('delete,delete-orphan,expunge," "merge,refresh-expire,save-update')", ) def test_cascade_immutable(self): assert isinstance( orm_util.CascadeOptions("all, delete-orphan"), frozenset ) def test_cascade_deepcopy(self): old = orm_util.CascadeOptions("all, delete-orphan") new = copy.deepcopy(old) eq_(old, new) def test_cascade_assignable(self): User, Address = self.classes.User, self.classes.Address users, addresses = self.tables.users, self.tables.addresses rel = relationship(Address) eq_(rel.cascade, {"save-update", "merge"}) rel.cascade = "save-update, merge, expunge" eq_(rel.cascade, {"save-update", "merge", "expunge"}) self.mapper_registry.map_imperatively( User, users, properties={"addresses": rel} ) am = self.mapper_registry.map_imperatively(Address, addresses) configure_mappers() eq_(rel.cascade, {"save-update", "merge", "expunge"}) assert ("addresses", User) not in am._delete_orphans rel.cascade = "all, delete, delete-orphan" assert ("addresses", User) in am._delete_orphans eq_( rel.cascade, { "delete", "delete-orphan", "expunge", "merge", "refresh-expire", "save-update", }, ) def test_cascade_unicode(self): Address = self.classes.Address rel = relationship(Address) rel.cascade = "save-update, merge, expunge" eq_(rel.cascade, {"save-update", "merge", "expunge"}) class CasadeWithRaiseloadTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30), nullable=False), ) Table( "addresses", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", Integer, ForeignKey("users.id")), Column("email_address", String(50), nullable=False), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Address(cls.Comparable): pass def test_delete_skips_lazy_raise(self): User, Address = self.classes.User, self.classes.Address users, addresses = self.tables.users, self.tables.addresses self.mapper_registry.map_imperatively( User, users, properties={ "addresses": relationship( Address, cascade="all, delete-orphan", lazy="raise" ) }, ) self.mapper_registry.map_imperatively(Address, addresses) self.mapper_registry.metadata.create_all(testing.db) sess = fixture_session() u1 = User( name="u1", addresses=[ Address(email_address="e1"), Address(email_address="e2"), ], ) sess.add(u1) sess.commit() eq_( sess.scalars( select(Address).order_by(Address.email_address) ).all(), [Address(email_address="e1"), Address(email_address="e2")], ) sess.close() sess.delete(u1) sess.commit() eq_(sess.scalars(select(Address)).all(), []) class O2MCascadeDeleteOrphanTest(fixtures.MappedTest): run_inserts = None @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30), nullable=False), ) Table( "addresses", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", Integer, ForeignKey("users.id")), Column("email_address", String(50), nullable=False), ) Table( "orders", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", Integer, ForeignKey("users.id"), nullable=False), Column("description", String(30)), ) Table( "dingalings", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("address_id", Integer, ForeignKey("addresses.id")), Column("data", String(30)), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Address(cls.Comparable): pass class Order(cls.Comparable): pass class Dingaling(cls.Comparable): pass @classmethod def setup_mappers(cls): ( users, Dingaling, Order, User, dingalings, Address, orders, addresses, ) = ( cls.tables.users, cls.classes.Dingaling, cls.classes.Order, cls.classes.User, cls.tables.dingalings, cls.classes.Address, cls.tables.orders, cls.tables.addresses, ) cls.mapper_registry.map_imperatively(Address, addresses) cls.mapper_registry.map_imperatively(Order, orders) cls.mapper_registry.map_imperatively( User, users, properties={ "addresses": relationship( Address, cascade="all, delete-orphan", backref="user" ), "orders": relationship( Order, cascade="all, delete-orphan", order_by=orders.c.id ), }, ) cls.mapper_registry.map_imperatively( Dingaling, dingalings, properties={"address": relationship(Address)}, ) def test_list_assignment_new(self): User, Order = self.classes.User, self.classes.Order with fixture_session() as sess: u = User( name="jack", orders=[ Order(description="order 1"), Order(description="order 2"), ], ) sess.add(u) sess.commit() eq_( u, User( name="jack", orders=[ Order(description="order 1"), Order(description="order 2"), ], ), ) def test_list_assignment_replace(self): User, Order = self.classes.User, self.classes.Order with fixture_session() as sess: u = User( name="jack", orders=[ Order(description="someorder"), Order(description="someotherorder"), ], ) sess.add(u) u.orders = [ Order(description="order 3"), Order(description="order 4"), ] sess.commit() eq_( u, User( name="jack", orders=[ Order(description="order 3"), Order(description="order 4"), ], ), ) # order 1, order 2 have been deleted eq_( sess.query(Order).order_by(Order.id).all(), [Order(description="order 3"), Order(description="order 4")], ) def test_standalone_orphan(self): Order = self.classes.Order with fixture_session() as sess: o5 = Order(description="order 5") sess.add(o5) assert_raises(sa_exc.DBAPIError, sess.flush) def test_save_update_sends_pending(self): """test that newly added and deleted collection items are cascaded on save-update""" Order, User = self.classes.Order, self.classes.User sess = fixture_session(expire_on_commit=False) o1, o2, o3 = ( Order(description="o1"), Order(description="o2"), Order(description="o3"), ) u = User(name="jack", orders=[o1, o2]) sess.add(u) sess.commit() sess.close() u.orders.append(o3) u.orders.remove(o1) sess.add(u) assert o1 in sess assert o2 in sess assert o3 in sess sess.commit() def test_remove_pending_from_collection(self): User, Order = self.classes.User, self.classes.Order with fixture_session() as sess: u = User(name="jack") sess.add(u) sess.commit() o1 = Order() u.orders.append(o1) assert o1 in sess u.orders.remove(o1) assert o1 not in sess def test_remove_pending_from_pending_parent(self): # test issue #4040 User, Order = self.classes.User, self.classes.Order with fixture_session() as sess: u = User(name="jack") o1 = Order() sess.add(o1) # object becomes an orphan, but parent is not in session u.orders.append(o1) u.orders.remove(o1) sess.add(u) assert o1 in sess sess.flush() assert o1 not in sess def test_delete(self): User, users, orders, Order = ( self.classes.User, self.tables.users, self.tables.orders, self.classes.Order, ) with fixture_session() as sess: u = User( name="jack", orders=[ Order(description="someorder"), Order(description="someotherorder"), ], ) sess.add(u) sess.flush() sess.delete(u) sess.flush() eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 0, ) eq_( sess.execute( select(func.count("*")).select_from(orders) ).scalar(), 0, ) def test_delete_unloaded_collections(self): """Unloaded collections are still included in a delete-cascade by default.""" User, addresses, users, Address = ( self.classes.User, self.tables.addresses, self.tables.users, self.classes.Address, ) with fixture_session() as sess: u = User( name="jack", addresses=[ Address(email_address="address1"), Address(email_address="address2"), ], ) sess.add(u) sess.flush() sess.expunge_all() eq_( sess.execute( select(func.count("*")).select_from(addresses) ).scalar(), 2, ) eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 1, ) u = sess.get(User, u.id) assert "addresses" not in u.__dict__ sess.delete(u) sess.flush() eq_( sess.execute( select(func.count("*")).select_from(addresses) ).scalar(), 0, ) eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 0, ) def test_cascades_onlycollection(self): """Cascade only reaches instances that are still part of the collection, not those that have been removed""" User, Order, users, orders = ( self.classes.User, self.classes.Order, self.tables.users, self.tables.orders, ) with fixture_session(autoflush=False) as sess: u = User( name="jack", orders=[ Order(description="someorder"), Order(description="someotherorder"), ], ) sess.add(u) sess.flush() o = u.orders[0] del u.orders[0] sess.delete(u) assert u in sess.deleted assert o not in sess.deleted assert o in sess u2 = User(name="newuser", orders=[o]) sess.add(u2) sess.flush() sess.expunge_all() eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 1, ) eq_( sess.execute( select(func.count("*")).select_from(orders) ).scalar(), 1, ) eq_( sess.query(User).all(), [ User( name="newuser", orders=[Order(description="someorder")] ) ], ) def test_cascade_nosideeffects(self): """test that cascade leaves the state of unloaded scalars/collections unchanged.""" Dingaling, User, Address = ( self.classes.Dingaling, self.classes.User, self.classes.Address, ) sess = fixture_session() u = User(name="jack") sess.add(u) assert "orders" not in u.__dict__ sess.flush() assert "orders" not in u.__dict__ a = Address(email_address="foo@bar.com") sess.add(a) assert "user" not in a.__dict__ a.user = u sess.flush() d = Dingaling(data="d1") d.address_id = a.id sess.add(d) assert "address" not in d.__dict__ sess.flush() assert d.address is a def test_cascade_delete_plusorphans(self): User, users, orders, Order = ( self.classes.User, self.tables.users, self.tables.orders, self.classes.Order, ) sess = fixture_session() u = User( name="jack", orders=[ Order(description="someorder"), Order(description="someotherorder"), ], ) sess.add(u) sess.flush() eq_( sess.execute(select(func.count("*")).select_from(users)).scalar(), 1, ) eq_( sess.execute(select(func.count("*")).select_from(orders)).scalar(), 2, ) del u.orders[0] sess.delete(u) sess.flush() eq_( sess.execute(select(func.count("*")).select_from(users)).scalar(), 0, ) eq_( sess.execute(select(func.count("*")).select_from(orders)).scalar(), 0, ) def test_collection_orphans(self): User, users, orders, Order = ( self.classes.User, self.tables.users, self.tables.orders, self.classes.Order, ) with fixture_session() as sess: u = User( name="jack", orders=[ Order(description="someorder"), Order(description="someotherorder"), ], ) sess.add(u) sess.flush() eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 1, ) eq_( sess.execute( select(func.count("*")).select_from(orders) ).scalar(), 2, ) u.orders[:] = [] sess.flush() eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 1, ) eq_( sess.execute( select(func.count("*")).select_from(orders) ).scalar(), 0, ) class O2MCascadeTest(fixtures.MappedTest): run_inserts = None @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30), nullable=False), ) Table( "addresses", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", Integer, ForeignKey("users.id")), Column("email_address", String(50), nullable=False), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Address(cls.Comparable): pass @classmethod def setup_mappers(cls): users, User, Address, addresses = ( cls.tables.users, cls.classes.User, cls.classes.Address, cls.tables.addresses, ) cls.mapper_registry.map_imperatively(Address, addresses) cls.mapper_registry.map_imperatively( User, users, properties={"addresses": relationship(Address, backref="user")}, ) def test_none_o2m_collection_assignment(self): User = self.classes.User s = fixture_session() u1 = User(name="u", addresses=[None]) s.add(u1) eq_(u1.addresses, [None]) assert_raises_message( orm_exc.FlushError, "Can't flush None value found in collection User.addresses", s.commit, ) eq_(u1.addresses, [None]) def test_none_o2m_collection_append(self): User = self.classes.User s = fixture_session() u1 = User(name="u") s.add(u1) u1.addresses.append(None) eq_(u1.addresses, [None]) assert_raises_message( orm_exc.FlushError, "Can't flush None value found in collection User.addresses", s.commit, ) eq_(u1.addresses, [None]) class O2MCascadeDeleteNoOrphanTest(fixtures.MappedTest): run_inserts = None @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), ) Table( "orders", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", Integer, ForeignKey("users.id")), Column("description", String(30)), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Order(cls.Comparable): pass @classmethod def setup_mappers(cls): User, Order, orders, users = ( cls.classes.User, cls.classes.Order, cls.tables.orders, cls.tables.users, ) cls.mapper_registry.map_imperatively( User, users, properties=dict( orders=relationship( cls.mapper_registry.map_imperatively(Order, orders), cascade="all", ) ), ) def test_cascade_delete_noorphans(self): User, Order, orders, users = ( self.classes.User, self.classes.Order, self.tables.orders, self.tables.users, ) with fixture_session() as sess: u = User( name="jack", orders=[ Order(description="someorder"), Order(description="someotherorder"), ], ) sess.add(u) sess.flush() eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 1, ) eq_( sess.execute( select(func.count("*")).select_from(orders) ).scalar(), 2, ) del u.orders[0] sess.delete(u) sess.flush() eq_( sess.execute( select(func.count("*")).select_from(users) ).scalar(), 0, ) eq_( sess.execute( select(func.count("*")).select_from(orders) ).scalar(), 1, ) class O2OSingleParentTest(_fixtures.FixtureTest): run_inserts = None @classmethod def setup_mappers(cls): Address, addresses, users, User = ( cls.classes.Address, cls.tables.addresses, cls.tables.users, cls.classes.User, ) cls.mapper_registry.map_imperatively(Address, addresses) cls.mapper_registry.map_imperatively( User, users, properties={ "address": relationship( Address, backref=backref("user", single_parent=True), uselist=False, ) }, ) def test_single_parent_raise(self): User, Address = self.classes.User, self.classes.Address a1 = Address(email_address="some address") u1 = User(name="u1", address=a1) assert_raises( sa_exc.InvalidRequestError, Address, email_address="asd", user=u1 ) a2 = Address(email_address="asd") u1.address = a2 assert u1.address is not a1 assert a1.user is None class O2OSingleParentNoFlushTest(fixtures.MappedTest): run_inserts = None @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30), nullable=False), ) Table( "addresses", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("user_id", None, ForeignKey("users.id"), nullable=False), Column("email_address", String(50), nullable=False), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Address(cls.Comparable): pass @classmethod def setup_mappers(cls): Address, addresses, users, User = ( cls.classes.Address, cls.tables.addresses, cls.tables.users, cls.classes.User, ) cls.mapper_registry.map_imperatively(Address, addresses) cls.mapper_registry.map_imperatively( User, users, properties={ "address": relationship( Address, backref=backref( "user", single_parent=True, cascade="all, delete-orphan", ), uselist=False, ) }, ) def test_replace_attribute_no_flush(self): # test [ticket:2921] User, Address = self.classes.User, self.classes.Address a1 = Address(email_address="some address") u1 = User(name="u1", address=a1) sess = fixture_session() sess.add(u1) sess.commit() # in this case, u1.address has active history set, because # this operation necessarily replaces the old object which must be # loaded. # the set operation requires that "u1" is unexpired, because the # replace operation wants to load the # previous value. The original test case for #2921 only included # that the lazyload operation passed a no autoflush flag through # to the operation, however in #5226 this has been enhanced to pass # the no autoflush flag down through to the unexpire of the attributes # as well, so that attribute unexpire can otherwise invoke autoflush. assert "id" not in u1.__dict__ a2 = Address(email_address="asdf") sess.add(a2) u1.address = a2 class M2OwNoUseGetCascadeTest( testing.AssertsExecutionResults, fixtures.TestBase ): @testing.fixture def fixture(self, metadata): Base = declarative_base(metadata=metadata) def go(lazy="select", cascade="save-update"): class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) email = Column(String(50), unique=True) bs = relationship( "B", back_populates="user", primaryjoin="A.email == B.email", ) class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) email = Column(String(50), ForeignKey("a.email")) user = relationship( "A", lazy=lazy, cascade=cascade, single_parent=True, back_populates="bs", primaryjoin="A.email == B.email", ) Base.metadata.create_all(testing.db) return A, B yield go Base.registry.dispose() def test_cascade_deletes_user(self, fixture): A, B = fixture(cascade="all, delete-orphan") sess = fixture_session() a1 = A(email="x") b1 = B(user=a1) sess.add_all([a1, b1]) sess.commit() b1 = sess.execute(select(B)).scalars().first() sess.delete(b1) self.assert_sql_execution( testing.db, sess.flush, # looking for other bs' CompiledSQL( "SELECT b.id AS b_id, b.email AS b_email " "FROM b WHERE :param_1 = b.email", lambda ctx: [{"param_1": "x"}], ), CompiledSQL( "DELETE FROM b WHERE b.id = :id", lambda ctx: [{"id": 1}] ), CompiledSQL( "DELETE FROM a WHERE a.id = :id", lambda ctx: [{"id": 1}] ), ) @testing.combinations(("select",), ("raise",), argnames="lazy") def test_ignores_user(self, fixture, lazy): A, B = fixture() sess = fixture_session() a1 = A(email="x") b1 = B(user=a1) sess.add_all([a1, b1]) sess.commit() b1 = sess.execute(select(B)).scalars().first() sess.delete(b1) self.assert_sql_execution( testing.db, sess.flush, # we would like it to be able to skip this SELECT but this is not # implemented right now CompiledSQL( "SELECT a.id AS a_id, a.email AS a_email FROM a " "WHERE a.email = :param_1", [{"param_1": "x"}], ), CompiledSQL( "DELETE FROM b WHERE b.id = :id", lambda ctx: [{"id": 1}] ), ) class NoSaveCascadeFlushTest(_fixtures.FixtureTest): """Test related item not present in session, commit proceeds.""" run_inserts = None def _one_to_many_fixture( self, o2m_cascade=True, m2o_cascade=True, o2m=False, m2o=False, ): Address, addresses, users, User = ( self.classes.Address, self.tables.addresses, self.tables.users, self.classes.User, ) if o2m: if m2o: addresses_rel = { "addresses": relationship( Address, cascade=o2m_cascade and "save-update" or "", backref=backref( "user", cascade=m2o_cascade and "save-update" or "", ), ) } else: addresses_rel = { "addresses": relationship( Address, cascade=o2m_cascade and "save-update" or "", ) } user_rel = {} elif m2o: user_rel = { "user": relationship( User, cascade=m2o_cascade and "save-update" or "", ) } addresses_rel = {} else: addresses_rel = {} user_rel = {} self.mapper_registry.map_imperatively( User, users, properties=addresses_rel ) self.mapper_registry.map_imperatively( Address, addresses, properties=user_rel ) def _many_to_many_fixture( self, fwd_cascade=True, bkd_cascade=True, fwd=False, bkd=False, ): keywords, items, item_keywords, Keyword, Item = ( self.tables.keywords, self.tables.items, self.tables.item_keywords, self.classes.Keyword, self.classes.Item, ) if fwd: if bkd: keywords_rel = { "keywords": relationship( Keyword, secondary=item_keywords, cascade=fwd_cascade and "save-update" or "", backref=backref( "items", cascade=bkd_cascade and "save-update" or "", ), ) } else: keywords_rel = { "keywords": relationship( Keyword, secondary=item_keywords, cascade=fwd_cascade and "save-update" or "", ) } items_rel = {} elif bkd: items_rel = { "items": relationship( Item, secondary=item_keywords, cascade=bkd_cascade and "save-update" or "", ) } keywords_rel = {} else: keywords_rel = {} items_rel = {} self.mapper_registry.map_imperatively( Item, items, properties=keywords_rel ) self.mapper_registry.map_imperatively( Keyword, keywords, properties=items_rel ) def test_o2m_only_child_pending(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") u1.addresses.append(a1) sess.add(u1) assert u1 in sess assert a1 in sess sess.flush() def test_o2m_only_child_transient(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=False, o2m_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") u1.addresses.append(a1) sess.add(u1) assert u1 in sess assert a1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_o2m_only_child_persistent(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=False, o2m_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") sess.add(a1) sess.flush() sess.expunge_all() u1.addresses.append(a1) sess.add(u1) assert u1 in sess assert a1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_o2m_backref_child_pending(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") u1.addresses.append(a1) sess.add(u1) assert u1 in sess assert a1 in sess sess.flush() def test_o2m_backref_child_transient(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, o2m_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") u1.addresses.append(a1) sess.add(u1) assert u1 in sess assert a1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_o2m_backref_child_transient_nochange(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, o2m_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") u1.addresses.append(a1) sess.add(u1) assert u1 in sess assert a1 not in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() eq_(u1.addresses, []) def test_o2m_backref_child_expunged(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, o2m_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") sess.add(a1) sess.flush() sess.add(u1) u1.addresses.append(a1) sess.expunge(a1) assert u1 in sess assert a1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_o2m_backref_child_expunged_nochange(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, o2m_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") sess.add(a1) sess.flush() sess.add(u1) u1.addresses.append(a1) sess.expunge(a1) assert u1 in sess assert a1 not in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() eq_(u1.addresses, []) def test_m2o_only_child_pending(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=False, m2o=True) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) assert u1 in sess assert a1 in sess sess.flush() def test_m2o_only_child_transient(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=False, m2o=True, m2o_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) assert u1 not in sess assert a1 in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2o_only_child_expunged(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=False, m2o=True, m2o_cascade=False) sess = fixture_session() u1 = User(name="u1") sess.add(u1) sess.flush() a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) sess.expunge(u1) assert u1 not in sess assert a1 in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2o_backref_child_pending(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) assert u1 in sess assert a1 in sess sess.flush() def test_m2o_backref_child_transient(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) assert u1 not in sess assert a1 in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2o_backref_child_expunged(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) with fixture_session() as sess: u1 = User(name="u1") sess.add(u1) sess.flush() a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) sess.expunge(u1) assert u1 not in sess assert a1 in sess assert_warns_message( sa_exc.SAWarning, "not in session", sess.flush ) def test_m2o_backref_future_child_expunged(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) with Session(testing.db, future=True) as sess: u1 = User(name="u1") sess.add(u1) sess.flush() a1 = Address(email_address="a1") a1.user = u1 assert a1 not in sess sess.add(a1) sess.expunge(u1) assert u1 not in sess assert a1 in sess assert_warns_message( sa_exc.SAWarning, "not in session", sess.flush ) def test_m2o_backref_child_pending_nochange(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) sess = fixture_session() u1 = User(name="u1") a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) assert u1 not in sess assert a1 in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() # didn't get flushed assert a1.user is None def test_m2o_backref_child_expunged_nochange(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) with fixture_session() as sess: u1 = User(name="u1") sess.add(u1) sess.flush() a1 = Address(email_address="a1") a1.user = u1 sess.add(a1) sess.expunge(u1) assert u1 not in sess assert a1 in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() # didn't get flushed assert a1.user is None def test_m2o_backref_future_child_expunged_nochange(self): User, Address = self.classes.User, self.classes.Address self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) with Session(testing.db, future=True) as sess: u1 = User(name="u1") sess.add(u1) sess.flush() a1 = Address(email_address="a1") a1.user = u1 assert a1 not in sess sess.add(a1) sess.expunge(u1) assert u1 not in sess assert a1 in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() # didn't get flushed assert a1.user is None def test_m2m_only_child_pending(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") i1.keywords.append(k1) sess.add(i1) assert i1 in sess assert k1 in sess sess.flush() def test_m2m_only_child_transient(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=False, fwd_cascade=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") i1.keywords.append(k1) sess.add(i1) assert i1 in sess assert k1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2m_only_child_persistent(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=False, fwd_cascade=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") sess.add(k1) sess.flush() sess.expunge_all() i1.keywords.append(k1) sess.add(i1) assert i1 in sess assert k1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2m_backref_child_pending(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=True) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") i1.keywords.append(k1) sess.add(i1) assert i1 in sess assert k1 in sess sess.flush() def test_m2m_backref_child_transient(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=True, fwd_cascade=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") i1.keywords.append(k1) sess.add(i1) assert i1 in sess assert k1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2m_backref_child_transient_nochange(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=True, fwd_cascade=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") i1.keywords.append(k1) sess.add(i1) assert i1 in sess assert k1 not in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() eq_(i1.keywords, []) def test_m2m_backref_child_expunged(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=True, fwd_cascade=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") sess.add(k1) sess.flush() sess.add(i1) i1.keywords.append(k1) sess.expunge(k1) assert i1 in sess assert k1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.flush) def test_m2m_backref_child_expunged_nochange(self): Item, Keyword = self.classes.Item, self.classes.Keyword self._many_to_many_fixture(fwd=True, bkd=True, fwd_cascade=False) sess = fixture_session() i1 = Item(description="i1") k1 = Keyword(name="k1") sess.add(k1) sess.flush() sess.add(i1) i1.keywords.append(k1) sess.expunge(k1) assert i1 in sess assert k1 not in sess @testing.emits_warning(r".*not in session") def go(): sess.commit() go() eq_(i1.keywords, []) class NoSaveCascadeBackrefTest(_fixtures.FixtureTest): """test that backrefs don't force save-update cascades to occur when the cascade initiated from the forwards side.""" def test_unidirectional_cascade_o2m(self): User, Order, users, orders = ( self.classes.User, self.classes.Order, self.tables.users, self.tables.orders, ) self.mapper_registry.map_imperatively(Order, orders) self.mapper_registry.map_imperatively( User, users, properties=dict( orders=relationship( Order, backref=backref("user", cascade=None) ) ), ) sess = fixture_session() o1 = Order() sess.add(o1) u1 = User(orders=[o1]) assert u1 not in sess assert o1 in sess sess.expunge_all() o1 = Order() u1 = User(orders=[o1]) sess.add(o1) assert u1 not in sess assert o1 in sess def test_unidirectional_cascade_m2o(self): User, Order, users, orders = ( self.classes.User, self.classes.Order, self.tables.users, self.tables.orders, ) self.mapper_registry.map_imperatively( Order, orders, properties={ "user": relationship( User, backref=backref("orders", cascade=None) ) }, ) self.mapper_registry.map_imperatively(User, users) sess = fixture_session() u1 = User() sess.add(u1) o1 = Order() o1.user = u1 assert o1 not in sess assert u1 in sess sess.expunge_all() u1 = User() o1 = Order() o1.user = u1 sess.add(u1) assert o1 not in sess assert u1 in sess def test_unidirectional_cascade_m2m(self): keywords, items, item_keywords, Keyword, Item = ( self.tables.keywords, self.tables.items, self.tables.item_keywords, self.classes.Keyword, self.classes.Item, ) self.mapper_registry.map_imperatively( Item, items, properties={ "keywords": relationship( Keyword, secondary=item_keywords, cascade="none", backref="items", ) }, ) self.mapper_registry.map_imperatively(Keyword, keywords) sess = fixture_session() i1 = Item() k1 = Keyword() sess.add(i1) i1.keywords.append(k1) assert i1 in sess assert k1 not in sess sess.expunge_all() i1 = Item() k1 = Keyword() sess.add(i1) k1.items.append(i1) assert i1 in sess assert k1 not in sess @testing.combinations( ( "legacy_style", True, ), ( "new_style", False, ), argnames="name, _legacy_inactive_history_style", id_="sa", ) class M2OCascadeDeleteOrphanTestOne(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "extra", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("prefs_id", Integer, ForeignKey("prefs.id")), ) Table( "prefs", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(40)), ) Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(40)), Column("pref_id", Integer, ForeignKey("prefs.id")), Column("foo_id", Integer, ForeignKey("foo.id")), ) Table( "foo", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(40)), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Pref(cls.Comparable): pass class Extra(cls.Comparable): pass class Foo(cls.Comparable): pass @classmethod def setup_mappers(cls): extra, foo, users, Extra, Pref, User, prefs, Foo = ( cls.tables.extra, cls.tables.foo, cls.tables.users, cls.classes.Extra, cls.classes.Pref, cls.classes.User, cls.tables.prefs, cls.classes.Foo, ) cls.mapper_registry.map_imperatively(Extra, extra) cls.mapper_registry.map_imperatively( Pref, prefs, properties=dict(extra=relationship(Extra, cascade="all, delete")), ) cls.mapper_registry.map_imperatively( User, users, properties=dict( pref=relationship( Pref, lazy="joined", cascade="all, delete-orphan", single_parent=True, ), foo=relationship( Foo, active_history=False, _legacy_inactive_history_style=( cls._legacy_inactive_history_style ), ), ), ) # straight m2o cls.mapper_registry.map_imperatively(Foo, foo) @classmethod def insert_data(cls, connection): Pref, User, Extra = ( cls.classes.Pref, cls.classes.User, cls.classes.Extra, ) u1 = User(name="ed", pref=Pref(data="pref 1", extra=[Extra()])) u2 = User(name="jack", pref=Pref(data="pref 2", extra=[Extra()])) u3 = User(name="foo", pref=Pref(data="pref 3", extra=[Extra()])) sess = Session(connection) sess.add_all((u1, u2, u3)) sess.flush() sess.close() def test_orphan(self): prefs, User, extra = ( self.tables.prefs, self.classes.User, self.tables.extra, ) sess = fixture_session() eq_( sess.execute(select(func.count("*")).select_from(prefs)).scalar(), 3, ) eq_( sess.execute(select(func.count("*")).select_from(extra)).scalar(), 3, ) jack = sess.query(User).filter_by(name="jack").one() jack.pref = None sess.flush() eq_( sess.execute(select(func.count("*")).select_from(prefs)).scalar(), 2, ) eq_( sess.execute(select(func.count("*")).select_from(extra)).scalar(), 2, ) def test_cascade_on_deleted(self): """test a bug introduced by #6711""" Foo, User = self.classes.Foo, self.classes.User sess = fixture_session(expire_on_commit=True) u1 = User(name="jack", foo=Foo(data="f1")) sess.add(u1) sess.commit() u1.foo = None # the error condition relies upon # these things being true assert User.foo.dispatch._active_history is False eq_(attributes.get_history(u1, "foo"), ([None], (), ())) sess.add(u1) assert u1 in sess sess.commit() def test_save_update_sends_pending(self): """test that newly added and deleted scalar items are cascaded on save-update""" Pref, User = self.classes.Pref, self.classes.User sess = fixture_session(expire_on_commit=False) p1, p2 = Pref(data="p1"), Pref(data="p2") u = User(name="jack", pref=p1) sess.add(u) sess.commit() sess.close() u.pref = p2 sess.add(u) assert p1 in sess assert p2 in sess sess.commit() def test_orphan_on_update(self): prefs, User, extra = ( self.tables.prefs, self.classes.User, self.tables.extra, ) sess = fixture_session() jack = sess.query(User).filter_by(name="jack").one() p = jack.pref e = jack.pref.extra[0] sess.expunge_all() jack.pref = None sess.add(jack) sess.add(p) sess.add(e) assert p in sess assert e in sess sess.flush() eq_( sess.execute(select(func.count("*")).select_from(prefs)).scalar(), 2, ) eq_( sess.execute(select(func.count("*")).select_from(extra)).scalar(), 2, ) def test_pending_expunge(self): Pref, User = self.classes.Pref, self.classes.User sess = fixture_session() someuser = User(name="someuser") sess.add(someuser) sess.flush() someuser.pref = p1 = Pref(data="somepref") assert p1 in sess someuser.pref = Pref(data="someotherpref") assert p1 not in sess sess.flush() eq_( sess.query(Pref).filter(with_parent(someuser, User.pref)).all(), [Pref(data="someotherpref")], ) def test_double_assignment(self): """Double assignment will not accidentally reset the 'parent' flag.""" Pref, User = self.classes.Pref, self.classes.User sess = fixture_session() jack = sess.query(User).filter_by(name="jack").one() newpref = Pref(data="newpref") jack.pref = newpref jack.pref = newpref sess.flush() eq_( sess.query(Pref).order_by(Pref.id).all(), [Pref(data="pref 1"), Pref(data="pref 3"), Pref(data="newpref")], ) class M2OCascadeDeleteOrphanTestTwo(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "t1", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), Column("t2id", Integer, ForeignKey("t2.id")), ) Table( "t2", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), Column("t3id", Integer, ForeignKey("t3.id")), ) Table( "t3", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), ) @classmethod def setup_classes(cls): class T1(cls.Comparable): pass class T2(cls.Comparable): pass class T3(cls.Comparable): pass @classmethod def setup_mappers(cls): t2, T2, T3, t1, t3, T1 = ( cls.tables.t2, cls.classes.T2, cls.classes.T3, cls.tables.t1, cls.tables.t3, cls.classes.T1, ) cls.mapper_registry.map_imperatively( T1, t1, properties=dict( t2=relationship( T2, cascade="all, delete-orphan", single_parent=True ) ), ) cls.mapper_registry.map_imperatively( T2, t2, properties=dict( t3=relationship( T3, cascade="all, delete-orphan", single_parent=True, backref=backref("t2", uselist=False), ) ), ) cls.mapper_registry.map_imperatively(T3, t3) def test_cascade_delete(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a"))) sess.add(x) sess.flush() sess.delete(x) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) def test_deletes_orphans_onelevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x2 = T1(data="t1b", t2=T2(data="t2b", t3=T3(data="t3b"))) sess.add(x2) sess.flush() x2.t2 = None sess.delete(x2) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) def test_deletes_orphans_twolevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a"))) sess.add(x) sess.flush() x.t2.t3 = None sess.delete(x) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) def test_finds_orphans_twolevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a"))) sess.add(x) sess.flush() x.t2.t3 = None sess.flush() eq_(sess.query(T1).all(), [T1()]) eq_(sess.query(T2).all(), [T2()]) eq_(sess.query(T3).all(), []) def test_single_parent_raise(self): T2, T1 = self.classes.T2, self.classes.T1 y = T2(data="T2a") T1(data="T1a", t2=y) assert_raises(sa_exc.InvalidRequestError, T1, data="T1b", t2=y) def test_single_parent_backref(self): T2, T3 = self.classes.T2, self.classes.T3 y = T3(data="T3a") x = T2(data="T2a", t3=y) # can't attach the T3 to another T2 assert_raises(sa_exc.InvalidRequestError, T2, data="T2b", t3=y) # set via backref tho is OK, unsets from previous parent # first z = T2(data="T2b") y.t2 = z assert z.t3 is y assert x.t3 is None class M2OCascadeDeleteNoOrphanTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "t1", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), Column("t2id", Integer, ForeignKey("t2.id")), ) Table( "t2", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), Column("t3id", Integer, ForeignKey("t3.id")), ) Table( "t3", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), ) @classmethod def setup_classes(cls): class T1(cls.Comparable): pass class T2(cls.Comparable): pass class T3(cls.Comparable): pass @classmethod def setup_mappers(cls): t2, T2, T3, t1, t3, T1 = ( cls.tables.t2, cls.classes.T2, cls.classes.T3, cls.tables.t1, cls.tables.t3, cls.classes.T1, ) cls.mapper_registry.map_imperatively( T1, t1, properties={"t2": relationship(T2, cascade="all")} ) cls.mapper_registry.map_imperatively( T2, t2, properties={"t3": relationship(T3, cascade="all")} ) cls.mapper_registry.map_imperatively(T3, t3) def test_cascade_delete(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a"))) sess.add(x) sess.flush() sess.delete(x) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) def test_cascade_delete_postappend_onelevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x1 = T1(data="t1") x2 = T2(data="t2") x3 = T3(data="t3") sess.add_all((x1, x2, x3)) sess.flush() sess.delete(x1) x1.t2 = x2 x2.t3 = x3 sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) def test_cascade_delete_postappend_twolevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x1 = T1(data="t1", t2=T2(data="t2")) x3 = T3(data="t3") sess.add_all((x1, x3)) sess.flush() sess.delete(x1) x1.t2.t3 = x3 sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) def test_preserves_orphans_onelevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x2 = T1(data="t1b", t2=T2(data="t2b", t3=T3(data="t3b"))) sess.add(x2) sess.flush() x2.t2 = None sess.delete(x2) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), [T2()]) eq_(sess.query(T3).all(), [T3()]) @testing.future() def test_preserves_orphans_onelevel_postremove(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x2 = T1(data="t1b", t2=T2(data="t2b", t3=T3(data="t3b"))) sess.add(x2) sess.flush() sess.delete(x2) x2.t2 = None sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), [T2()]) eq_(sess.query(T3).all(), [T3()]) def test_preserves_orphans_twolevel(self): T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1) sess = fixture_session() x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a"))) sess.add(x) sess.flush() x.t2.t3 = None sess.delete(x) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), [T3()]) class M2MCascadeTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "a", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(30)), test_needs_fk=True, ) Table( "b", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(30)), test_needs_fk=True, ) Table( "atob", metadata, Column("aid", Integer, ForeignKey("a.id")), Column("bid", Integer, ForeignKey("b.id")), test_needs_fk=True, ) Table( "c", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(30)), Column("bid", Integer, ForeignKey("b.id")), test_needs_fk=True, ) @classmethod def setup_classes(cls): class A(cls.Comparable): pass class B(cls.Comparable): pass class C(cls.Comparable): pass def test_delete_orphan(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) # if no backref here, delete-orphan failed until [ticket:427] # was fixed self.mapper_registry.map_imperatively( A, a, properties={ "bs": relationship( B, secondary=atob, cascade="all, delete-orphan", single_parent=True, ) }, ) self.mapper_registry.map_imperatively(B, b) sess = fixture_session() b1 = B(data="b1") a1 = A(data="a1", bs=[b1]) sess.add(a1) sess.flush() a1.bs.remove(b1) sess.flush() eq_( sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0 ) eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0) eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 1) def test_delete_orphan_dynamic(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, # if no backref here, delete-orphan properties={ "bs": relationship( B, secondary=atob, cascade="all, delete-orphan", single_parent=True, lazy="dynamic", ) }, ) # failed until [ticket:427] was fixed self.mapper_registry.map_imperatively(B, b) sess = fixture_session() b1 = B(data="b1") a1 = A(data="a1", bs=[b1]) sess.add(a1) sess.flush() a1.bs.remove(b1) sess.flush() eq_( sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0 ) eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0) eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 1) def test_delete_orphan_cascades(self): a, A, c, b, C, B, atob = ( self.tables.a, self.classes.A, self.tables.c, self.tables.b, self.classes.C, self.classes.B, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={ # if no backref here, delete-orphan failed until # # [ticket:427] was fixed "bs": relationship( B, secondary=atob, cascade="all, delete-orphan", single_parent=True, ) }, ) self.mapper_registry.map_imperatively( B, b, properties={"cs": relationship(C, cascade="all, delete-orphan")}, ) self.mapper_registry.map_imperatively(C, c) sess = fixture_session() b1 = B(data="b1", cs=[C(data="c1")]) a1 = A(data="a1", bs=[b1]) sess.add(a1) sess.flush() a1.bs.remove(b1) sess.flush() eq_( sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0 ) eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0) eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 1) eq_(sess.execute(select(func.count("*")).select_from(c)).scalar(), 0) def test_cascade_delete(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={ "bs": relationship( B, secondary=atob, cascade="all, delete-orphan", single_parent=True, ) }, ) self.mapper_registry.map_imperatively(B, b) sess = fixture_session() a1 = A(data="a1", bs=[B(data="b1")]) sess.add(a1) sess.flush() sess.delete(a1) sess.flush() eq_( sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0 ) eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0) eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 0) def test_single_parent_error(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={ "bs": relationship( B, secondary=atob, cascade="all, delete-orphan" ) }, ) self.mapper_registry.map_imperatively(B, b) assert_raises_message( sa_exc.ArgumentError, "For many-to-many relationship A.bs, delete-orphan cascade", configure_mappers, ) def test_single_parent_raise(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={ "bs": relationship( B, secondary=atob, cascade="all, delete-orphan", single_parent=True, ) }, ) self.mapper_registry.map_imperatively(B, b) b1 = B(data="b1") A(data="a1", bs=[b1]) assert_raises(sa_exc.InvalidRequestError, A, data="a2", bs=[b1]) def test_single_parent_backref(self): """test that setting m2m via a uselist=False backref bypasses the single_parent raise""" a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={ "bs": relationship( B, secondary=atob, cascade="all, delete-orphan", single_parent=True, backref=backref("a", uselist=False), ) }, ) self.mapper_registry.map_imperatively(B, b) b1 = B(data="b1") a1 = A(data="a1", bs=[b1]) assert_raises(sa_exc.InvalidRequestError, A, data="a2", bs=[b1]) a2 = A(data="a2") b1.a = a2 assert b1 not in a1.bs assert b1 in a2.bs def test_none_m2m_collection_assignment(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={"bs": relationship(B, secondary=atob, backref="as")}, ) self.mapper_registry.map_imperatively(B, b) s = fixture_session() a1 = A(bs=[None]) s.add(a1) eq_(a1.bs, [None]) assert_raises_message( orm_exc.FlushError, "Can't flush None value found in collection A.bs", s.commit, ) eq_(a1.bs, [None]) def test_none_m2m_collection_append(self): a, A, B, b, atob = ( self.tables.a, self.classes.A, self.classes.B, self.tables.b, self.tables.atob, ) self.mapper_registry.map_imperatively( A, a, properties={"bs": relationship(B, secondary=atob, backref="as")}, ) self.mapper_registry.map_imperatively(B, b) s = fixture_session() a1 = A() a1.bs.append(None) s.add(a1) eq_(a1.bs, [None]) assert_raises_message( orm_exc.FlushError, "Can't flush None value found in collection A.bs", s.commit, ) eq_(a1.bs, [None]) class O2MSelfReferentialDetelOrphanTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "node", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("parent_id", Integer, ForeignKey("node.id")), ) @classmethod def setup_classes(cls): class Node(cls.Basic): pass @classmethod def setup_mappers(cls): Node = cls.classes.Node node = cls.tables.node cls.mapper_registry.map_imperatively( Node, node, properties={ "children": relationship( Node, cascade="all, delete-orphan", backref=backref("parent", remote_side=node.c.id), ) }, ) def test_self_referential_delete(self): Node = self.classes.Node s = fixture_session() n1, n2, n3, n4 = Node(), Node(), Node(), Node() n1.children = [n2, n3] n3.children = [n4] s.add_all([n1, n2, n3, n4]) s.commit() eq_(s.query(Node).count(), 4) n1.children.remove(n3) s.commit() eq_(s.query(Node).count(), 2) class NoBackrefCascadeTest(_fixtures.FixtureTest): run_inserts = None @classmethod def setup_mappers(cls): addresses, Dingaling, User, dingalings, Address, users = ( cls.tables.addresses, cls.classes.Dingaling, cls.classes.User, cls.tables.dingalings, cls.classes.Address, cls.tables.users, ) cls.mapper_registry.map_imperatively(Address, addresses) cls.mapper_registry.map_imperatively( User, users, properties={"addresses": relationship(Address, backref="user")}, ) cls.mapper_registry.map_imperatively( Dingaling, dingalings, properties={ "address": relationship(Address, backref="dingalings") }, ) def test_o2m_basic(self): User, Address = self.classes.User, self.classes.Address sess = fixture_session() u1 = User(name="u1") sess.add(u1) a1 = Address(email_address="a1") a1.user = u1 assert a1 not in sess def test_o2m_commit_warns(self): User, Address = self.classes.User, self.classes.Address sess = fixture_session() u1 = User(name="u1") sess.add(u1) a1 = Address(email_address="a1") a1.user = u1 assert_warns_message(sa_exc.SAWarning, "not in session", sess.commit) assert a1 not in sess def test_o2m_on_backref_no_cascade(self): Dingaling, Address = self.classes.Dingaling, self.classes.Address sess = fixture_session() a1 = Address(email_address="a1") sess.add(a1) d1 = Dingaling() d1.address = a1 assert d1 in a1.dingalings assert d1 not in sess def test_m2o_basic(self): Dingaling, Address = self.classes.Dingaling, self.classes.Address sess = fixture_session() a1 = Address(email_address="a1") d1 = Dingaling() sess.add(d1) a1.dingalings.append(d1) assert a1 not in sess def test_m2o_on_backref_no_cascade(self): User, Address = self.classes.User, self.classes.Address sess = fixture_session() a1 = Address(email_address="a1") sess.add(a1) u1 = User(name="u1") u1.addresses.append(a1) assert u1 not in sess def test_m2o_commit_no_cascade(self): Dingaling, Address = self.classes.Dingaling, self.classes.Address sess = fixture_session() a1 = Address(email_address="a1") d1 = Dingaling() sess.add(d1) a1.dingalings.append(d1) assert a1 not in sess assert_warns_message(sa_exc.SAWarning, "not in session", sess.commit) class PendingOrphanTestSingleLevel(fixtures.MappedTest): """Pending entities that are orphans""" @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column( "user_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("name", String(40)), ) Table( "addresses", metadata, Column( "address_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("user_id", Integer, ForeignKey("users.user_id")), Column("email_address", String(40)), ) Table( "orders", metadata, Column( "order_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column( "user_id", Integer, ForeignKey("users.user_id"), nullable=False ), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Address(cls.Comparable): pass class Order(cls.Comparable): pass def test_pending_standalone_orphan(self): """Standalone 'orphan' objects can now be persisted, if the underlying constraints of the database allow it. This now supports persisting of objects based on foreign key values alone. """ users, orders, User, Address, Order, addresses = ( self.tables.users, self.tables.orders, self.classes.User, self.classes.Address, self.classes.Order, self.tables.addresses, ) self.mapper_registry.map_imperatively(Order, orders) self.mapper_registry.map_imperatively(Address, addresses) self.mapper_registry.map_imperatively( User, users, properties=dict( addresses=relationship( Address, cascade="all,delete-orphan", backref="user" ), orders=relationship(Order, cascade="all, delete-orphan"), ), ) s = fixture_session() # the standalone Address goes in, its foreign key # allows NULL a = Address() s.add(a) s.commit() # the standalone Order does not. o = Order() s.add(o) assert_raises(sa_exc.DBAPIError, s.commit) s.rollback() # can assign o.user_id by foreign key, # flush succeeds u = User() s.add(u) s.flush() o = Order(user_id=u.user_id) s.add(o) s.commit() assert o in s and o not in s.new def test_pending_collection_expunge(self): """Removing a pending item from a collection expunges it from the session.""" users, Address, addresses, User = ( self.tables.users, self.classes.Address, self.tables.addresses, self.classes.User, ) self.mapper_registry.map_imperatively(Address, addresses) self.mapper_registry.map_imperatively( User, users, properties=dict( addresses=relationship( Address, cascade="all,delete-orphan", backref="user" ) ), ) s = fixture_session() u = User() s.add(u) s.flush() a = Address() u.addresses.append(a) assert a in s u.addresses.remove(a) assert a not in s s.delete(u) s.flush() assert a.address_id is None, "Error: address should not be persistent" def test_nonorphans_ok(self): users, Address, addresses, User = ( self.tables.users, self.classes.Address, self.tables.addresses, self.classes.User, ) self.mapper_registry.map_imperatively(Address, addresses) self.mapper_registry.map_imperatively( User, users, properties=dict( addresses=relationship( Address, cascade="all,delete", backref="user" ) ), ) s = fixture_session() u = User(name="u1", addresses=[Address(email_address="ad1")]) s.add(u) a1 = u.addresses[0] u.addresses.remove(a1) assert a1 in s s.flush() s.expunge_all() eq_(s.query(Address).all(), [Address(email_address="ad1")]) class PendingOrphanTestTwoLevel(fixtures.MappedTest): """test usages stated at https://article.gmane.org/gmane.comp.python.sqlalchemy.user/3085 https://article.gmane.org/gmane.comp.python.sqlalchemy.user/3119 """ @classmethod def define_tables(cls, metadata): Table( "order", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), ) Table( "item", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column( "order_id", Integer, ForeignKey("order.id"), nullable=False ), ) Table( "attribute", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("item_id", Integer, ForeignKey("item.id"), nullable=False), ) @classmethod def setup_classes(cls): class Order(cls.Comparable): pass class Item(cls.Comparable): pass class Attribute(cls.Comparable): pass def test_singlelevel_remove(self): item, Order, order, Item = ( self.tables.item, self.classes.Order, self.tables.order, self.classes.Item, ) self.mapper_registry.map_imperatively( Order, order, properties={ "items": relationship(Item, cascade="all, delete-orphan") }, ) self.mapper_registry.map_imperatively(Item, item) s = fixture_session() o1 = Order() s.add(o1) i1 = Item() o1.items.append(i1) o1.items.remove(i1) s.commit() assert i1 not in o1.items def test_multilevel_remove(self): Item, Attribute, order, item, attribute, Order = ( self.classes.Item, self.classes.Attribute, self.tables.order, self.tables.item, self.tables.attribute, self.classes.Order, ) self.mapper_registry.map_imperatively( Order, order, properties={ "items": relationship(Item, cascade="all, delete-orphan") }, ) self.mapper_registry.map_imperatively( Item, item, properties={ "attributes": relationship( Attribute, cascade="all, delete-orphan" ) }, ) self.mapper_registry.map_imperatively(Attribute, attribute) s = fixture_session() o1 = Order() s.add(o1) i1 = Item() a1 = Attribute() i1.attributes.append(a1) o1.items.append(i1) assert i1 in s assert a1 in s # i1 is an orphan so the operation # removes 'i1'. The operation # cascades down to 'a1'. o1.items.remove(i1) assert i1 not in s assert a1 not in s s.commit() assert o1 in s assert a1 not in s assert i1 not in s assert a1 not in o1.items class DoubleParentO2MOrphanTest(fixtures.MappedTest): """Test orphan behavior on an entity that requires two parents via many-to-one (one-to-many collection.). """ @classmethod def define_tables(cls, meta): Table( "sales_reps", meta, Column( "sales_rep_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("name", String(50)), ) Table( "accounts", meta, Column( "account_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("balance", Integer), ) Table( "customers", meta, Column( "customer_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("name", String(50)), Column( "sales_rep_id", Integer, ForeignKey("sales_reps.sales_rep_id") ), Column("account_id", Integer, ForeignKey("accounts.account_id")), ) def _fixture(self, legacy_is_orphan, uselist): sales_reps, customers, accounts = ( self.tables.sales_reps, self.tables.customers, self.tables.accounts, ) class Customer(fixtures.ComparableEntity): pass class Account(fixtures.ComparableEntity): pass class SalesRep(fixtures.ComparableEntity): pass self.mapper_registry.map_imperatively( Customer, customers, legacy_is_orphan=legacy_is_orphan ) self.mapper_registry.map_imperatively( Account, accounts, properties=dict( customers=relationship( Customer, cascade="all,delete-orphan", backref="account", uselist=uselist, ) ), ) self.mapper_registry.map_imperatively( SalesRep, sales_reps, properties=dict( customers=relationship( Customer, cascade="all,delete-orphan", backref="sales_rep", uselist=uselist, ) ), ) s = fixture_session(expire_on_commit=False, autoflush=False) a = Account(balance=0) sr = SalesRep(name="John") s.add_all((a, sr)) s.commit() c = Customer(name="Jane") if uselist: a.customers.append(c) sr.customers.append(c) else: a.customers = c sr.customers = c assert c in s return s, c, a, sr def test_double_parent_expunge_o2m_legacy(self): """test the delete-orphan uow event for multiple delete-orphan parent relationships.""" s, c, a, sr = self._fixture(True, True) a.customers.remove(c) assert c in s, "Should not expunge customer yet, still has one parent" sr.customers.remove(c) assert c not in s, "Should expunge customer when both parents are gone" def test_double_parent_expunge_o2m_current(self): """test the delete-orphan uow event for multiple delete-orphan parent relationships.""" s, c, a, sr = self._fixture(False, True) a.customers.remove(c) assert c not in s, "Should expunge customer when either parent is gone" sr.customers.remove(c) assert c not in s, "Should expunge customer when both parents are gone" def test_double_parent_expunge_o2o_legacy(self): """test the delete-orphan uow event for multiple delete-orphan parent relationships.""" s, c, a, sr = self._fixture(True, False) a.customers = None assert c in s, "Should not expunge customer yet, still has one parent" sr.customers = None assert c not in s, "Should expunge customer when both parents are gone" def test_double_parent_expunge_o2o_current(self): """test the delete-orphan uow event for multiple delete-orphan parent relationships.""" s, c, a, sr = self._fixture(False, False) a.customers = None assert c not in s, "Should expunge customer when either parent is gone" sr.customers = None assert c not in s, "Should expunge customer when both parents are gone" class DoubleParentM2OOrphanTest(fixtures.MappedTest): """Test orphan behavior on an entity that requires two parents via one-to-many (many-to-one reference to the orphan). """ @classmethod def define_tables(cls, metadata): Table( "addresses", metadata, Column( "address_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("street", String(30)), ) Table( "homes", metadata, Column( "home_id", Integer, primary_key=True, key="id", test_needs_autoincrement=True, ), Column("description", String(30)), Column( "address_id", Integer, ForeignKey("addresses.address_id"), nullable=False, ), ) Table( "businesses", metadata, Column( "business_id", Integer, primary_key=True, key="id", test_needs_autoincrement=True, ), Column("description", String(30), key="description"), Column( "address_id", Integer, ForeignKey("addresses.address_id"), nullable=False, ), ) def test_non_orphan(self): """test that an entity can have two parent delete-orphan cascades, and persists normally.""" homes, businesses, addresses = ( self.tables.homes, self.tables.businesses, self.tables.addresses, ) class Address(fixtures.ComparableEntity): pass class Home(fixtures.ComparableEntity): pass class Business(fixtures.ComparableEntity): pass self.mapper_registry.map_imperatively(Address, addresses) self.mapper_registry.map_imperatively( Home, homes, properties={ "address": relationship( Address, cascade="all,delete-orphan", single_parent=True ) }, ) self.mapper_registry.map_imperatively( Business, businesses, properties={ "address": relationship( Address, cascade="all,delete-orphan", single_parent=True ) }, ) session = fixture_session() h1 = Home(description="home1", address=Address(street="address1")) b1 = Business( description="business1", address=Address(street="address2") ) session.add_all((h1, b1)) session.flush() session.expunge_all() eq_( session.get(Home, h1.id), Home(description="home1", address=Address(street="address1")), ) eq_( session.get(Business, b1.id), Business( description="business1", address=Address(street="address2") ), ) def test_orphan(self): """test that an entity can have two parent delete-orphan cascades, and is detected as an orphan when saved without a parent.""" homes, businesses, addresses = ( self.tables.homes, self.tables.businesses, self.tables.addresses, ) class Address(fixtures.ComparableEntity): pass class Home(fixtures.ComparableEntity): pass class Business(fixtures.ComparableEntity): pass self.mapper_registry.map_imperatively(Address, addresses) self.mapper_registry.map_imperatively( Home, homes, properties={ "address": relationship( Address, cascade="all,delete-orphan", single_parent=True ) }, ) self.mapper_registry.map_imperatively( Business, businesses, properties={ "address": relationship( Address, cascade="all,delete-orphan", single_parent=True ) }, ) session = fixture_session() a1 = Address() session.add(a1) session.flush() class CollectionAssignmentOrphanTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "table_a", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), ) Table( "table_b", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), Column("a_id", Integer, ForeignKey("table_a.id")), ) def test_basic(self): table_b, table_a = self.tables.table_b, self.tables.table_a class A(fixtures.ComparableEntity): pass class B(fixtures.ComparableEntity): pass self.mapper_registry.map_imperatively( A, table_a, properties={"bs": relationship(B, cascade="all, delete-orphan")}, ) self.mapper_registry.map_imperatively(B, table_b) a1 = A(name="a1", bs=[B(name="b1"), B(name="b2"), B(name="b3")]) sess = fixture_session() sess.add(a1) sess.flush() sess.expunge_all() eq_( sess.get(A, a1.id), A(name="a1", bs=[B(name="b1"), B(name="b2"), B(name="b3")]), ) a1 = sess.get(A, a1.id) assert not class_mapper(B)._is_orphan( attributes.instance_state(a1.bs[0]) ) a1.bs[0].foo = "b2modified" a1.bs[1].foo = "b3modified" sess.flush() sess.expunge_all() eq_( sess.get(A, a1.id), A(name="a1", bs=[B(name="b1"), B(name="b2"), B(name="b3")]), ) class OrphanCriterionTest(fixtures.MappedTest): @classmethod def define_tables(self, metadata): Table( "core", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("related_one_id", Integer, ForeignKey("related_one.id")), Column("related_two_id", Integer, ForeignKey("related_two.id")), ) Table( "related_one", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), ) Table( "related_two", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), ) def _fixture( self, legacy_is_orphan, persistent, r1_present, r2_present, detach_event=True, ): class Core: pass class RelatedOne: def __init__(self, cores): self.cores = cores class RelatedTwo: def __init__(self, cores): self.cores = cores self.mapper_registry.map_imperatively( Core, self.tables.core, legacy_is_orphan=legacy_is_orphan ) self.mapper_registry.map_imperatively( RelatedOne, self.tables.related_one, properties={ "cores": relationship( Core, cascade="all, delete-orphan", backref="r1" ) }, ) self.mapper_registry.map_imperatively( RelatedTwo, self.tables.related_two, properties={ "cores": relationship( Core, cascade="all, delete-orphan", backref="r2" ) }, ) c1 = Core() if detach_event: RelatedOne(cores=[c1]) RelatedTwo(cores=[c1]) else: if r1_present: RelatedOne(cores=[c1]) if r2_present: RelatedTwo(cores=[c1]) if persistent: s = fixture_session() s.add(c1) s.flush() if detach_event: if not r1_present: c1.r1 = None if not r2_present: c1.r2 = None return c1 def _assert_not_orphan(self, c1): mapper = object_mapper(c1) state = instance_state(c1) assert not mapper._is_orphan(state) def _assert_is_orphan(self, c1): mapper = object_mapper(c1) state = instance_state(c1) assert mapper._is_orphan(state) def test_leg_pers_r1_r2(self): c1 = self._fixture(True, True, True, True) self._assert_not_orphan(c1) def test_current_pers_r1_r2(self): c1 = self._fixture(False, True, True, True) self._assert_not_orphan(c1) def test_leg_pers_r1_notr2(self): c1 = self._fixture(True, True, True, False) self._assert_not_orphan(c1) def test_current_pers_r1_notr2(self): c1 = self._fixture(False, True, True, False) self._assert_is_orphan(c1) def test_leg_pers_notr1_notr2(self): c1 = self._fixture(True, True, False, False) self._assert_is_orphan(c1) def test_current_pers_notr1_notr2(self): c1 = self._fixture(False, True, True, False) self._assert_is_orphan(c1) def test_leg_transient_r1_r2(self): c1 = self._fixture(True, False, True, True) self._assert_not_orphan(c1) def test_current_transient_r1_r2(self): c1 = self._fixture(False, False, True, True) self._assert_not_orphan(c1) def test_leg_transient_r1_notr2(self): c1 = self._fixture(True, False, True, False) self._assert_not_orphan(c1) def test_current_transient_r1_notr2(self): c1 = self._fixture(False, False, True, False) self._assert_is_orphan(c1) def test_leg_transient_notr1_notr2(self): c1 = self._fixture(True, False, False, False) self._assert_is_orphan(c1) def test_current_transient_notr1_notr2(self): c1 = self._fixture(False, False, False, False) self._assert_is_orphan(c1) def test_leg_transient_notr1_notr2_noevent(self): c1 = self._fixture(True, False, False, False, False) self._assert_is_orphan(c1) def test_current_transient_notr1_notr2_noevent(self): c1 = self._fixture(False, False, False, False, False) self._assert_is_orphan(c1) def test_leg_persistent_notr1_notr2_noevent(self): c1 = self._fixture(True, True, False, False, False) self._assert_not_orphan(c1) def test_current_persistent_notr1_notr2_noevent(self): c1 = self._fixture(False, True, False, False, False) self._assert_not_orphan(c1) class O2MConflictTest(fixtures.MappedTest): """test that O2M dependency detects a change in parent, does the right thing, and updates the collection/attribute. """ @classmethod def define_tables(cls, metadata): Table( "parent", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), ) Table( "child", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column( "parent_id", Integer, ForeignKey("parent.id"), nullable=False ), ) @classmethod def setup_classes(cls): class Parent(cls.Comparable): pass class Child(cls.Comparable): pass def _do_move_test(self, delete_old): Parent, Child = self.classes.Parent, self.classes.Child with fixture_session(autoflush=False) as sess: p1, p2, c1 = Parent(), Parent(), Child() if Parent.child.property.uselist: p1.child.append(c1) else: p1.child = c1 sess.add_all([p1, c1]) sess.flush() if delete_old: sess.delete(p1) if Parent.child.property.uselist: p2.child.append(c1) else: p2.child = c1 sess.add(p2) sess.flush() eq_(sess.query(Child).filter(Child.parent_id == p2.id).all(), [c1]) def test_o2o_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively( Parent, parent, properties={"child": relationship(Child, uselist=False)}, ) self.mapper_registry.map_imperatively(Child, child) self._do_move_test(True) self._do_move_test(False) def test_o2m_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively( Parent, parent, properties={"child": relationship(Child, uselist=True)}, ) self.mapper_registry.map_imperatively(Child, child) self._do_move_test(True) self._do_move_test(False) def test_o2o_backref_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively( Parent, parent, properties={ "child": relationship( Child, uselist=False, backref=backref( "parent", ), ) }, ) self.mapper_registry.map_imperatively(Child, child) self._do_move_test(True) self._do_move_test(False) def test_o2o_delcascade_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively( Parent, parent, properties={ "child": relationship( Child, uselist=False, cascade="all, delete" ) }, ) self.mapper_registry.map_imperatively(Child, child) self._do_move_test(True) self._do_move_test(False) def test_o2o_delorphan_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively( Parent, parent, properties={ "child": relationship( Child, uselist=False, cascade="all, delete, delete-orphan" ) }, ) self.mapper_registry.map_imperatively(Child, child) self._do_move_test(True) self._do_move_test(False) def test_o2o_delorphan_backref_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively( Parent, parent, properties={ "child": relationship( Child, uselist=False, cascade="all, delete, delete-orphan", backref=backref("parent"), ) }, ) self.mapper_registry.map_imperatively(Child, child) self._do_move_test(True) self._do_move_test(False) def test_o2o_backref_delorphan_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively(Parent, parent) self.mapper_registry.map_imperatively( Child, child, properties={ "parent": relationship( Parent, uselist=False, single_parent=True, backref=backref("child", uselist=False), cascade="all,delete,delete-orphan", ) }, ) self._do_move_test(True) self._do_move_test(False) def test_o2m_backref_delorphan_delete_old(self): Child, Parent, parent, child = ( self.classes.Child, self.classes.Parent, self.tables.parent, self.tables.child, ) self.mapper_registry.map_imperatively(Parent, parent) self.mapper_registry.map_imperatively( Child, child, properties={ "parent": relationship( Parent, uselist=False, single_parent=True, backref=backref("child", uselist=True), cascade="all,delete,delete-orphan", ) }, ) self._do_move_test(True) self._do_move_test(False) class PartialFlushTest(fixtures.MappedTest): """test cascade behavior as it relates to object lists passed to flush(). """ @classmethod def define_tables(cls, metadata): Table( "base", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("descr", String(50)), ) Table( "noninh_child", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("base_id", Integer, ForeignKey("base.id")), ) Table( "parent", metadata, Column("id", Integer, ForeignKey("base.id"), primary_key=True), ) Table( "inh_child", metadata, Column("id", Integer, ForeignKey("base.id"), primary_key=True), Column("parent_id", Integer, ForeignKey("parent.id")), ) def test_o2m_m2o(self): base, noninh_child = self.tables.base, self.tables.noninh_child class Base(fixtures.ComparableEntity): pass class Child(fixtures.ComparableEntity): pass self.mapper_registry.map_imperatively( Base, base, properties={"children": relationship(Child, backref="parent")}, ) self.mapper_registry.map_imperatively(Child, noninh_child) sess = fixture_session() c1, c2 = Child(), Child() b1 = Base(descr="b1", children=[c1, c2]) sess.add(b1) assert c1 in sess.new assert c2 in sess.new sess.flush([b1]) # c1, c2 get cascaded into the session on o2m. # not sure if this is how I like this # to work but that's how it works for now. assert c1 in sess and c1 not in sess.new assert c2 in sess and c2 not in sess.new assert b1 in sess and b1 not in sess.new sess = fixture_session() c1, c2 = Child(), Child() b1 = Base(descr="b1", children=[c1, c2]) sess.add(b1) sess.flush([c1]) # m2o, otoh, doesn't cascade up the other way. assert c1 in sess and c1 not in sess.new assert c2 in sess and c2 in sess.new assert b1 in sess and b1 in sess.new sess = fixture_session() c1, c2 = Child(), Child() b1 = Base(descr="b1", children=[c1, c2]) sess.add(b1) sess.flush([c1, c2]) # m2o, otoh, doesn't cascade up the other way. assert c1 in sess and c1 not in sess.new assert c2 in sess and c2 not in sess.new assert b1 in sess and b1 in sess.new def test_circular_sort(self): """test ticket 1306""" base, inh_child, parent = ( self.tables.base, self.tables.inh_child, self.tables.parent, ) class Base(fixtures.ComparableEntity): pass class Parent(Base): pass class Child(Base): pass self.mapper_registry.map_imperatively(Base, base) self.mapper_registry.map_imperatively( Child, inh_child, inherits=Base, properties={ "parent": relationship( Parent, backref="children", primaryjoin=inh_child.c.parent_id == parent.c.id, ) }, ) self.mapper_registry.map_imperatively(Parent, parent, inherits=Base) sess = fixture_session() p1 = Parent() c1, c2, c3 = Child(), Child(), Child() p1.children = [c1, c2, c3] sess.add(p1) sess.flush([c1]) assert p1 in sess.new assert c1 not in sess.new assert c2 in sess.new class SubclassCascadeTest(fixtures.DeclarativeMappedTest): @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class Company(Base): __tablename__ = "company" id = Column(Integer, primary_key=True) name = Column(String(50)) employees = relationship("Employee", cascade="all, delete-orphan") class Employee(Base): __tablename__ = "employee" id = Column(Integer, primary_key=True) name = Column(String(50)) type = Column(String(50)) company_id = Column(ForeignKey("company.id")) __mapper_args__ = { "polymorphic_identity": "employee", "polymorphic_on": type, } class Engineer(Employee): __tablename__ = "engineer" id = Column(Integer, ForeignKey("employee.id"), primary_key=True) engineer_name = Column(String(30)) languages = relationship("Language", cascade="all, delete-orphan") __mapper_args__ = {"polymorphic_identity": "engineer"} class MavenBuild(Base): __tablename__ = "maven_build" id = Column(Integer, primary_key=True) java_language_id = Column( ForeignKey("java_language.id"), nullable=False ) class Manager(Employee): __tablename__ = "manager" id = Column(Integer, ForeignKey("employee.id"), primary_key=True) manager_name = Column(String(30)) __mapper_args__ = {"polymorphic_identity": "manager"} class Language(Base): __tablename__ = "language" id = Column(Integer, primary_key=True) engineer_id = Column(ForeignKey("engineer.id"), nullable=False) name = Column(String(50)) type = Column(String(50)) __mapper_args__ = { "polymorphic_on": type, "polymorphic_identity": "language", } class JavaLanguage(Language): __tablename__ = "java_language" id = Column(ForeignKey("language.id"), primary_key=True) maven_builds = relationship( "MavenBuild", cascade="all, delete-orphan" ) __mapper_args__ = {"polymorphic_identity": "java_language"} def test_cascade_iterator_polymorphic(self): ( Company, Employee, Engineer, Language, JavaLanguage, MavenBuild, ) = self.classes( "Company", "Employee", "Engineer", "Language", "JavaLanguage", "MavenBuild", ) obj = Company( employees=[ Engineer( languages=[ JavaLanguage(name="java", maven_builds=[MavenBuild()]) ] ) ] ) eng = obj.employees[0] lang = eng.languages[0] maven_build = lang.maven_builds[0] from sqlalchemy import inspect state = inspect(obj) it = inspect(Company).cascade_iterator("save-update", state) eq_({rec[0] for rec in it}, {eng, maven_build, lang}) state = inspect(eng) it = inspect(Employee).cascade_iterator("save-update", state) eq_({rec[0] for rec in it}, {maven_build, lang}) def test_delete_orphan_round_trip(self): ( Company, Employee, Engineer, Language, JavaLanguage, MavenBuild, ) = self.classes( "Company", "Employee", "Engineer", "Language", "JavaLanguage", "MavenBuild", ) obj = Company( employees=[ Engineer( languages=[ JavaLanguage(name="java", maven_builds=[MavenBuild()]) ] ) ] ) s = fixture_session() s.add(obj) s.commit() obj.employees = [] s.commit() eq_(s.query(Language).count(), 0) class ViewonlyCascadeUpdate(fixtures.MappedTest): """Test that cascades are trimmed accordingly when viewonly is set. Originally #4993 and #4994 this was raising an error for invalid cascades. in 2.0 this is simplified to just remove the write cascades, allows the default cascade to be reasonable. """ @classmethod def define_tables(cls, metadata): Table( "users", metadata, Column("id", Integer, primary_key=True), Column("name", String(30)), ) Table( "orders", metadata, Column("id", Integer, primary_key=True), Column("user_id", Integer), Column("description", String(30)), ) @classmethod def setup_classes(cls): class User(cls.Comparable): pass class Order(cls.Comparable): pass @testing.combinations( ({"delete"}, {"none"}), ( {"all, delete-orphan"}, {"refresh-expire", "expunge", "merge"}, ), ({"save-update, expunge"}, {"expunge"}), ) def test_write_cascades(self, setting, expected): Order = self.classes.Order r = relationship( Order, primaryjoin=( self.tables.users.c.id == foreign(self.tables.orders.c.user_id) ), cascade=", ".join(sorted(setting)), viewonly=True, ) eq_(r.cascade, CascadeOptions(expected)) def test_expunge_cascade(self): User, Order, orders, users = ( self.classes.User, self.classes.Order, self.tables.orders, self.tables.users, ) self.mapper_registry.map_imperatively(Order, orders) self.mapper_registry.map_imperatively( User, users, properties={ "orders": relationship( Order, primaryjoin=( self.tables.users.c.id == foreign(self.tables.orders.c.user_id) ), cascade="expunge", viewonly=True, ) }, ) sess = fixture_session() u = User(id=1, name="jack") sess.add(u) sess.add_all( [ Order(id=1, user_id=1, description="someorder"), Order(id=2, user_id=1, description="someotherorder"), ] ) sess.commit() u1 = sess.query(User).first() orders = u1.orders eq_(len(orders), 2) in_(orders[0], sess) in_(orders[1], sess) sess.expunge(u1) not_in(orders[0], sess) not_in(orders[1], sess) def test_default_none_cascade(self): User, Order, orders, users = ( self.classes.User, self.classes.Order, self.tables.orders, self.tables.users, ) self.mapper_registry.map_imperatively(Order, orders) self.mapper_registry.map_imperatively( User, users, properties={ "orders": relationship( Order, primaryjoin=( self.tables.users.c.id == foreign(self.tables.orders.c.user_id) ), viewonly=True, ) }, ) sess = fixture_session() u1 = User(id=1, name="jack") sess.add(u1) o1, o2 = ( Order(id=1, user_id=1, description="someorder"), Order(id=2, user_id=1, description="someotherorder"), ) u1.orders.append(o1) u1.orders.append(o2) not_in(o1, sess) not_in(o2, sess) @testing.combinations( "persistent", "pending", argnames="collection_status" ) def test_default_merge_cascade(self, collection_status): User, Order, orders, users = ( self.classes.User, self.classes.Order, self.tables.orders, self.tables.users, ) self.mapper_registry.map_imperatively(Order, orders) self.mapper_registry.map_imperatively( User, users, properties={ "orders": relationship( Order, primaryjoin=( self.tables.users.c.id == foreign(self.tables.orders.c.user_id) ), viewonly=True, ) }, ) sess = fixture_session() u1 = User(id=1, name="jack") o1, o2 = ( Order(id=1, user_id=1, description="someorder"), Order(id=2, user_id=1, description="someotherorder"), ) if collection_status == "pending": # technically this is pointless, one should not be appending # to this collection u1.orders.append(o1) u1.orders.append(o2) elif collection_status == "persistent": sess.add(u1) sess.flush() sess.add_all([o1, o2]) sess.flush() u1.orders else: assert False u1 = sess.merge(u1) # in 1.4, as of #4993 this was asserting that u1.orders would # not be present in the new object. However, as observed during # #8862, this defeats schemes that seek to restore fully loaded # objects from caches which may even have lazy="raise", but # in any case would want to not emit new SQL on those collections. # so we assert here that u1.orders is in fact present assert "orders" in u1.__dict__ assert u1.__dict__["orders"] assert u1.orders def test_default_cascade(self): User, Order, orders, users = ( self.classes.User, self.classes.Order, self.tables.orders, self.tables.users, ) self.mapper_registry.map_imperatively(Order, orders) umapper = self.mapper_registry.map_imperatively( User, users, properties={ "orders": relationship( Order, primaryjoin=( self.tables.users.c.id == foreign(self.tables.orders.c.user_id) ), viewonly=True, ) }, ) eq_(umapper.attrs["orders"].cascade, {"merge"}) class CollectionCascadesNoBackrefTest(fixtures.TestBase): """test the removal of cascade_backrefs behavior see test/orm/test_deprecations.py::CollectionCascadesDespiteBackrefTest for the deprecated version """ @testing.fixture def cascade_fixture(self, registry): def go(collection_class): @registry.mapped class A: __tablename__ = "a" id = Column(Integer, primary_key=True) bs = relationship( "B", backref="a", collection_class=collection_class, ) @registry.mapped class B: __tablename__ = "b_" id = Column(Integer, primary_key=True) a_id = Column(ForeignKey("a.id")) key = Column(String) return A, B yield go @testing.combinations( (set, "add"), (list, "append"), (attribute_keyed_dict("key"), "__setitem__"), (attribute_keyed_dict("key"), "setdefault"), (attribute_keyed_dict("key"), "update_dict"), (attribute_keyed_dict("key"), "update_kw"), argnames="collection_class,methname", ) def test_cascades_on_collection( self, cascade_fixture, collection_class, methname ): A, B = cascade_fixture(collection_class) s = Session() a1 = A() s.add(a1) b1 = B(key="b1") b2 = B(key="b2") b3 = B(key="b3") b1.a = a1 b3.a = a1 assert b1 not in s assert b3 not in s if methname == "__setitem__": meth = getattr(a1.bs, methname) meth(b1.key, b1) meth(b2.key, b2) elif methname == "setdefault": meth = getattr(a1.bs, methname) meth(b1.key, b1) meth(b2.key, b2) elif methname == "update_dict" and isinstance(a1.bs, dict): a1.bs.update({b1.key: b1, b2.key: b2}) elif methname == "update_kw" and isinstance(a1.bs, dict): a1.bs.update(b1=b1, b2=b2) else: meth = getattr(a1.bs, methname) meth(b1) meth(b2) assert b1 in s assert b2 in s assert b3 not in s # the event never triggers from reverse