from sqlalchemy.testing.assertions import eq_, assert_raises from sqlalchemy.testing import fixtures from sqlalchemy import exc, testing from sqlalchemy.dialects.mysql import insert from sqlalchemy import Table, Column, Boolean, Integer, String, func class OnDuplicateTest(fixtures.TablesTest): __only_on__ = ("mysql",) __backend__ = True run_define_tables = "each" @classmethod def define_tables(cls, metadata): Table( "foos", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("bar", String(10)), Column("baz", String(10)), Column("updated_once", Boolean, default=False), ) def test_bad_args(self): assert_raises( ValueError, insert(self.tables.foos, values={}).on_duplicate_key_update, ) assert_raises( exc.ArgumentError, insert(self.tables.foos, values={}).on_duplicate_key_update, {"id": 1, "bar": "b"}, id=1, bar="b", ) assert_raises( exc.ArgumentError, insert(self.tables.foos, values={}).on_duplicate_key_update, {"id": 1, "bar": "b"}, {"id": 2, "bar": "baz"}, ) def test_on_duplicate_key_update(self): foos = self.tables.foos with testing.db.connect() as conn: conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) stmt = insert(foos).values( [dict(id=1, bar="ab"), dict(id=2, bar="b")] ) stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar) result = conn.execute(stmt) eq_(result.inserted_primary_key, [2]) eq_( conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), [(1, "ab", "bz", False)], ) def test_on_duplicate_key_update_preserve_order(self): foos = self.tables.foos with testing.db.connect() as conn: conn.execute( insert( foos, [ dict(id=1, bar="b", baz="bz"), dict(id=2, bar="b", baz="bz2"), ], ) ) stmt = insert(foos) update_condition = foos.c.updated_once == False # The following statements show importance of the columns update ordering # as old values being referenced in UPDATE clause are getting replaced one # by one from left to right with their new values. stmt1 = stmt.on_duplicate_key_update( [ ( "bar", func.if_( update_condition, func.values(foos.c.bar), foos.c.bar, ), ), ( "updated_once", func.if_(update_condition, True, foos.c.updated_once), ), ] ) stmt2 = stmt.on_duplicate_key_update( [ ( "updated_once", func.if_(update_condition, True, foos.c.updated_once), ), ( "bar", func.if_( update_condition, func.values(foos.c.bar), foos.c.bar, ), ), ] ) # First statement should succeed updating column bar conn.execute(stmt1, dict(id=1, bar="ab")) eq_( conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), [(1, "ab", "bz", True)], ) # Second statement will do noop update of column bar conn.execute(stmt2, dict(id=2, bar="ab")) eq_( conn.execute(foos.select().where(foos.c.id == 2)).fetchall(), [(2, "b", "bz2", True)], ) def test_last_inserted_id(self): foos = self.tables.foos with testing.db.connect() as conn: stmt = insert(foos).values({"bar": "b", "baz": "bz"}) result = conn.execute( stmt.on_duplicate_key_update( bar=stmt.inserted.bar, baz="newbz" ) ) eq_(result.inserted_primary_key, [1]) stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"}) result = conn.execute( stmt.on_duplicate_key_update( bar=stmt.inserted.bar, baz="newbz" ) ) eq_(result.inserted_primary_key, [1])