diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-18 15:08:37 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-09-24 11:15:32 -0400 |
| commit | 2bcc97da424eef7db9a5d02f81d02344925415ee (patch) | |
| tree | 13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /test | |
| parent | 332188e5680574368001ded52eb0a9d259ecdef5 (diff) | |
| download | sqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz | |
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends
when RETURNING is used,
except for Oracle that doesn't need it, and on
psycopg2 and mssql+pyodbc it is used for all INSERT statements,
not just those that use RETURNING.
third party dialects would need to opt in to the new feature
by setting use_insertmanyvalues to True.
Also adds dialect-level guards against using returning
with executemany where we dont have an implementation to
suit it. execute single w/ returning still defers to the
server without us checking.
Fixes: #6047
Fixes: #7907
Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'test')
| -rw-r--r-- | test/dialect/mssql/test_engine.py | 4 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 440 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_on_conflict.py | 13 | ||||
| -rw-r--r-- | test/engine/test_execute.py | 91 | ||||
| -rw-r--r-- | test/engine/test_logging.py | 119 | ||||
| -rw-r--r-- | test/orm/test_unitofwork.py | 2 | ||||
| -rw-r--r-- | test/requirements.py | 5 | ||||
| -rw-r--r-- | test/sql/test_insert.py | 59 | ||||
| -rw-r--r-- | test/sql/test_insert_exec.py | 239 | ||||
| -rw-r--r-- | test/sql/test_returning.py | 473 |
10 files changed, 959 insertions, 486 deletions
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py index 3fb078ef4..d19e591b4 100644 --- a/test/dialect/mssql/test_engine.py +++ b/test/dialect/mssql/test_engine.py @@ -402,7 +402,9 @@ class FastExecutemanyTest(fixtures.TestBase): ) t.create(testing.db) - eng = engines.testing_engine(options={"fast_executemany": True}) + eng = engines.testing_engine( + options={"fast_executemany": True, "use_insertmanyvalues": False} + ) @event.listens_for(eng, "after_cursor_execute") def after_cursor_execute( diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index fdb114d57..27d4a4cf9 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -1,7 +1,6 @@ # coding: utf-8 import dataclasses import datetime -import itertools import logging import logging.handlers @@ -18,7 +17,6 @@ from sqlalchemy import extract from sqlalchemy import func from sqlalchemy import Integer from sqlalchemy import literal -from sqlalchemy import literal_column from sqlalchemy import MetaData from sqlalchemy import Numeric from sqlalchemy import schema @@ -32,15 +30,14 @@ from sqlalchemy import text from sqlalchemy import TypeDecorator from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import HSTORE -from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.dialects.postgresql import psycopg as psycopg_dialect from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect from sqlalchemy.dialects.postgresql import Range -from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH -from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_PLAIN from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES -from sqlalchemy.engine import cursor as _cursor +from sqlalchemy.dialects.postgresql.psycopg2 import ( + EXECUTEMANY_VALUES_PLUS_BATCH, +) from sqlalchemy.engine import url from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL from sqlalchemy.testing import config @@ -60,11 +57,6 @@ from sqlalchemy.testing.assertions import eq_regex from sqlalchemy.testing.assertions import expect_raises from sqlalchemy.testing.assertions import ne_ -if True: - from sqlalchemy.dialects.postgresql.psycopg2 import ( - EXECUTEMANY_VALUES_PLUS_BATCH, - ) - class DialectTest(fixtures.TestBase): """python-side dialect tests.""" @@ -451,179 +443,6 @@ class ExecuteManyMode: Column("\u6e2c\u8a66", Integer), ) - @testing.combinations( - "insert", "pg_insert", "pg_insert_on_conflict", argnames="insert_type" - ) - def test_insert(self, connection, insert_type): - from psycopg2 import extras - - values_page_size = connection.dialect.executemany_values_page_size - batch_page_size = connection.dialect.executemany_batch_page_size - if connection.dialect.executemany_mode & EXECUTEMANY_VALUES: - meth = extras.execute_values - stmt = "INSERT INTO data (x, y) VALUES %s" - expected_kwargs = { - "template": "(%(x)s, %(y)s)", - "page_size": values_page_size, - "fetch": False, - } - elif connection.dialect.executemany_mode & EXECUTEMANY_BATCH: - meth = extras.execute_batch - stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)" - expected_kwargs = {"page_size": batch_page_size} - else: - assert False - - if insert_type == "pg_insert_on_conflict": - stmt += " ON CONFLICT DO NOTHING" - - with mock.patch.object( - extras, meth.__name__, side_effect=meth - ) as mock_exec: - if insert_type == "insert": - ins_stmt = self.tables.data.insert() - elif insert_type == "pg_insert": - ins_stmt = pg_insert(self.tables.data) - elif insert_type == "pg_insert_on_conflict": - ins_stmt = pg_insert(self.tables.data).on_conflict_do_nothing() - else: - assert False - - connection.execute( - ins_stmt, - [ - {"x": "x1", "y": "y1"}, - {"x": "x2", "y": "y2"}, - {"x": "x3", "y": "y3"}, - ], - ) - - eq_( - connection.execute(select(self.tables.data)).fetchall(), - [ - (1, "x1", "y1", 5), - (2, "x2", "y2", 5), - (3, "x3", "y3", 5), - ], - ) - eq_( - mock_exec.mock_calls, - [ - mock.call( - mock.ANY, - stmt, - [ - {"x": "x1", "y": "y1"}, - {"x": "x2", "y": "y2"}, - {"x": "x3", "y": "y3"}, - ], - **expected_kwargs, - ) - ], - ) - - def test_insert_no_page_size(self, connection): - from psycopg2 import extras - - values_page_size = connection.dialect.executemany_values_page_size - batch_page_size = connection.dialect.executemany_batch_page_size - - if connection.dialect.executemany_mode & EXECUTEMANY_VALUES: - meth = extras.execute_values - stmt = "INSERT INTO data (x, y) VALUES %s" - expected_kwargs = { - "template": "(%(x)s, %(y)s)", - "page_size": values_page_size, - "fetch": False, - } - elif connection.dialect.executemany_mode & EXECUTEMANY_BATCH: - meth = extras.execute_batch - stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)" - expected_kwargs = {"page_size": batch_page_size} - else: - assert False - - with mock.patch.object( - extras, meth.__name__, side_effect=meth - ) as mock_exec: - connection.execute( - self.tables.data.insert(), - [ - {"x": "x1", "y": "y1"}, - {"x": "x2", "y": "y2"}, - {"x": "x3", "y": "y3"}, - ], - ) - - eq_( - mock_exec.mock_calls, - [ - mock.call( - mock.ANY, - stmt, - [ - {"x": "x1", "y": "y1"}, - {"x": "x2", "y": "y2"}, - {"x": "x3", "y": "y3"}, - ], - **expected_kwargs, - ) - ], - ) - - def test_insert_page_size(self): - from psycopg2 import extras - - opts = self.options.copy() - opts["executemany_batch_page_size"] = 500 - opts["executemany_values_page_size"] = 1000 - - eng = engines.testing_engine(options=opts) - - if eng.dialect.executemany_mode & EXECUTEMANY_VALUES: - meth = extras.execute_values - stmt = "INSERT INTO data (x, y) VALUES %s" - expected_kwargs = { - "fetch": False, - "page_size": 1000, - "template": "(%(x)s, %(y)s)", - } - elif eng.dialect.executemany_mode & EXECUTEMANY_BATCH: - meth = extras.execute_batch - stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)" - expected_kwargs = {"page_size": 500} - else: - assert False - - with mock.patch.object( - extras, meth.__name__, side_effect=meth - ) as mock_exec: - with eng.begin() as conn: - conn.execute( - self.tables.data.insert(), - [ - {"x": "x1", "y": "y1"}, - {"x": "x2", "y": "y2"}, - {"x": "x3", "y": "y3"}, - ], - ) - - eq_( - mock_exec.mock_calls, - [ - mock.call( - mock.ANY, - stmt, - [ - {"x": "x1", "y": "y1"}, - {"x": "x2", "y": "y2"}, - {"x": "x3", "y": "y3"}, - ], - **expected_kwargs, - ) - ], - ) - def test_insert_unicode_keys(self, connection): table = self.tables["Unitéble2"] @@ -661,7 +480,10 @@ class ExecuteManyMode: ], ) - if connection.dialect.executemany_mode & EXECUTEMANY_BATCH: + if ( + connection.dialect.executemany_mode + is EXECUTEMANY_VALUES_PLUS_BATCH + ): eq_( mock_exec.mock_calls, [ @@ -680,7 +502,10 @@ class ExecuteManyMode: eq_(mock_exec.mock_calls, []) def test_not_sane_rowcount(self, connection): - if connection.dialect.executemany_mode & EXECUTEMANY_BATCH: + if ( + connection.dialect.executemany_mode + is EXECUTEMANY_VALUES_PLUS_BATCH + ): assert not connection.dialect.supports_sane_multi_rowcount else: assert connection.dialect.supports_sane_multi_rowcount @@ -709,257 +534,22 @@ class ExecuteManyMode: ) -class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest): - options = {"executemany_mode": "batch"} - - -class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest): - options = {"executemany_mode": "values_only"} - - def test_insert_returning_values(self, connection): - """the psycopg2 dialect needs to assemble a fully buffered result - with the return value of execute_values(). - - """ - t = self.tables.data - - conn = connection - page_size = conn.dialect.executemany_values_page_size or 100 - data = [ - {"x": "x%d" % i, "y": "y%d" % i} - for i in range(1, page_size * 5 + 27) - ] - result = conn.execute(t.insert().returning(t.c.x, t.c.y), data) - - eq_([tup[0] for tup in result.cursor.description], ["x", "y"]) - eq_(result.keys(), ["x", "y"]) - assert t.c.x in result.keys() - assert t.c.id not in result.keys() - assert not result._soft_closed - assert isinstance( - result.cursor_strategy, - _cursor.FullyBufferedCursorFetchStrategy, - ) - assert not result.cursor.closed - assert not result.closed - eq_(result.mappings().all(), data) - - assert result._soft_closed - # assert result.closed - assert result.cursor is None - - def test_insert_returning_preexecute_pk(self, metadata, connection): - counter = itertools.count(1) - - t = Table( - "t", - self.metadata, - Column( - "id", - Integer, - primary_key=True, - default=lambda: next(counter), - ), - Column("data", Integer), - ) - metadata.create_all(connection) - - result = connection.execute( - t.insert().return_defaults(), - [{"data": 1}, {"data": 2}, {"data": 3}], - ) - - eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)]) - - def test_insert_returning_defaults(self, connection): - t = self.tables.data - - conn = connection - - result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) - first_pk = result.inserted_primary_key[0] - - page_size = conn.dialect.executemany_values_page_size or 100 - total_rows = page_size * 5 + 27 - data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)] - result = conn.execute(t.insert().returning(t.c.id, t.c.z), data) - - eq_( - result.all(), - [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)], - ) - - def test_insert_return_pks_default_values(self, connection): - """test sending multiple, empty rows into an INSERT and getting primary - key values back. - - This has to use a format that indicates at least one DEFAULT in - multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES - (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col" - - """ - t = self.tables.data - - conn = connection - - result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) - first_pk = result.inserted_primary_key[0] - - page_size = conn.dialect.executemany_values_page_size or 100 - total_rows = page_size * 5 + 27 - data = [{} for i in range(1, total_rows)] - result = conn.execute(t.insert().returning(t.c.id), data) - - eq_( - result.all(), - [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)], - ) - - def test_insert_w_newlines(self, connection): - from psycopg2 import extras - - t = self.tables.data - - ins = ( - t.insert() - .inline() - .values( - id=bindparam("id"), - x=select(literal_column("5")) - .select_from(self.tables.data) - .scalar_subquery(), - y=bindparam("y"), - z=bindparam("z"), - ) - ) - # compiled SQL has a newline in it - eq_( - str(ins.compile(testing.db)), - "INSERT INTO data (id, x, y, z) VALUES (%(id)s, " - "(SELECT 5 \nFROM data), %(y)s, %(z)s)", - ) - meth = extras.execute_values - with mock.patch.object( - extras, "execute_values", side_effect=meth - ) as mock_exec: - - connection.execute( - ins, - [ - {"id": 1, "y": "y1", "z": 1}, - {"id": 2, "y": "y2", "z": 2}, - {"id": 3, "y": "y3", "z": 3}, - ], - ) - - eq_( - mock_exec.mock_calls, - [ - mock.call( - mock.ANY, - "INSERT INTO data (id, x, y, z) VALUES %s", - [ - {"id": 1, "y": "y1", "z": 1}, - {"id": 2, "y": "y2", "z": 2}, - {"id": 3, "y": "y3", "z": 3}, - ], - template="(%(id)s, (SELECT 5 \nFROM data), %(y)s, %(z)s)", - fetch=False, - page_size=connection.dialect.executemany_values_page_size, - ) - ], - ) - - def test_insert_modified_by_event(self, connection): - from psycopg2 import extras - - t = self.tables.data - - ins = ( - t.insert() - .inline() - .values( - id=bindparam("id"), - x=select(literal_column("5")) - .select_from(self.tables.data) - .scalar_subquery(), - y=bindparam("y"), - z=bindparam("z"), - ) - ) - # compiled SQL has a newline in it - eq_( - str(ins.compile(testing.db)), - "INSERT INTO data (id, x, y, z) VALUES (%(id)s, " - "(SELECT 5 \nFROM data), %(y)s, %(z)s)", - ) - meth = extras.execute_batch - with mock.patch.object( - extras, "execute_values" - ) as mock_values, mock.patch.object( - extras, "execute_batch", side_effect=meth - ) as mock_batch: - - # create an event hook that will change the statement to - # something else, meaning the dialect has to detect that - # insert_single_values_expr is no longer useful - @event.listens_for( - connection, "before_cursor_execute", retval=True - ) - def before_cursor_execute( - conn, cursor, statement, parameters, context, executemany - ): - statement = ( - "INSERT INTO data (id, y, z) VALUES " - "(%(id)s, %(y)s, %(z)s)" - ) - return statement, parameters - - connection.execute( - ins, - [ - {"id": 1, "y": "y1", "z": 1}, - {"id": 2, "y": "y2", "z": 2}, - {"id": 3, "y": "y3", "z": 3}, - ], - ) - - eq_(mock_values.mock_calls, []) - - if connection.dialect.executemany_mode & EXECUTEMANY_BATCH: - eq_( - mock_batch.mock_calls, - [ - mock.call( - mock.ANY, - "INSERT INTO data (id, y, z) VALUES " - "(%(id)s, %(y)s, %(z)s)", - ( - {"id": 1, "y": "y1", "z": 1}, - {"id": 2, "y": "y2", "z": 2}, - {"id": 3, "y": "y3", "z": 3}, - ), - ) - ], - ) - else: - eq_(mock_batch.mock_calls, []) - - class ExecutemanyValuesPlusBatchInsertsTest( ExecuteManyMode, fixtures.TablesTest ): options = {"executemany_mode": "values_plus_batch"} +class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest): + options = {"executemany_mode": "values_only"} + + class ExecutemanyFlagOptionsTest(fixtures.TablesTest): __only_on__ = "postgresql+psycopg2" __backend__ = True def test_executemany_correct_flag_options(self): for opt, expected in [ - (None, EXECUTEMANY_PLAIN), - ("batch", EXECUTEMANY_BATCH), ("values_only", EXECUTEMANY_VALUES), ("values_plus_batch", EXECUTEMANY_VALUES_PLUS_BATCH), ]: diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py index 8adc91a17..9c1aaf78e 100644 --- a/test/dialect/postgresql/test_on_conflict.py +++ b/test/dialect/postgresql/test_on_conflict.py @@ -206,7 +206,10 @@ class OnConflictTest(fixtures.TablesTest): [(1, "name1")], ) - def test_on_conflict_do_update_set_executemany(self, connection): + @testing.combinations(True, False, argnames="use_returning") + def test_on_conflict_do_update_set_executemany( + self, connection, use_returning + ): """test #6581""" users = self.tables.users @@ -221,7 +224,10 @@ class OnConflictTest(fixtures.TablesTest): index_elements=[users.c.id], set_={"id": i.excluded.id, "name": i.excluded.name + ".5"}, ) - connection.execute( + if use_returning: + i = i.returning(users.c.id, users.c.name) + + result = connection.execute( i, [ dict(id=1, name="name1"), @@ -230,6 +236,9 @@ class OnConflictTest(fixtures.TablesTest): ], ) + if use_returning: + eq_(result.all(), [(1, "name1.5"), (2, "name2.5"), (3, "name3")]) + eq_( connection.execute(users.select().order_by(users.c.id)).fetchall(), [(1, "name1.5"), (2, "name2.5"), (3, "name3")], diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 196b70340..2940a1e7f 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -3736,7 +3736,7 @@ class DialectEventTest(fixtures.TestBase): class SetInputSizesTest(fixtures.TablesTest): __backend__ = True - __requires__ = ("independent_connections",) + __requires__ = ("independent_connections", "insert_returning") @classmethod def define_tables(cls, metadata): @@ -3752,15 +3752,6 @@ class SetInputSizesTest(fixtures.TablesTest): canary = mock.Mock() def do_set_input_sizes(cursor, list_of_tuples, context): - if not engine.dialect.positional: - # sort by "user_id", "user_name", or otherwise - # param name for a non-positional dialect, so that we can - # confirm the ordering. mostly a py2 thing probably can't - # occur on py3.6+ since we are passing dictionaries with - # "user_id", "user_name" - list_of_tuples = sorted( - list_of_tuples, key=lambda elem: elem[0] - ) canary.do_set_input_sizes(cursor, list_of_tuples, context) def pre_exec(self): @@ -3786,15 +3777,21 @@ class SetInputSizesTest(fixtures.TablesTest): ): yield engine, canary - def test_set_input_sizes_no_event(self, input_sizes_fixture): + @testing.requires.insertmanyvalues + def test_set_input_sizes_insertmanyvalues_no_event( + self, input_sizes_fixture + ): engine, canary = input_sizes_fixture with engine.begin() as conn: conn.execute( - self.tables.users.insert(), + self.tables.users.insert().returning( + self.tables.users.c.user_id + ), [ {"user_id": 1, "user_name": "n1"}, {"user_id": 2, "user_name": "n2"}, + {"user_id": 3, "user_name": "n3"}, ], ) @@ -3805,6 +3802,58 @@ class SetInputSizesTest(fixtures.TablesTest): mock.ANY, [ ( + "user_id_0", + mock.ANY, + testing.eq_type_affinity(Integer), + ), + ( + "user_name_0", + mock.ANY, + testing.eq_type_affinity(String), + ), + ( + "user_id_1", + mock.ANY, + testing.eq_type_affinity(Integer), + ), + ( + "user_name_1", + mock.ANY, + testing.eq_type_affinity(String), + ), + ( + "user_id_2", + mock.ANY, + testing.eq_type_affinity(Integer), + ), + ( + "user_name_2", + mock.ANY, + testing.eq_type_affinity(String), + ), + ], + mock.ANY, + ) + ], + ) + + def test_set_input_sizes_no_event(self, input_sizes_fixture): + engine, canary = input_sizes_fixture + + with engine.begin() as conn: + conn.execute( + self.tables.users.update() + .where(self.tables.users.c.user_id == 15) + .values(user_id=15, user_name="n1"), + ) + + eq_( + canary.mock_calls, + [ + call.do_set_input_sizes( + mock.ANY, + [ + ( "user_id", mock.ANY, testing.eq_type_affinity(Integer), @@ -3814,6 +3863,11 @@ class SetInputSizesTest(fixtures.TablesTest): mock.ANY, testing.eq_type_affinity(String), ), + ( + "user_id_1", + mock.ANY, + testing.eq_type_affinity(Integer), + ), ], mock.ANY, ) @@ -3924,11 +3978,9 @@ class SetInputSizesTest(fixtures.TablesTest): with engine.begin() as conn: conn.execute( - self.tables.users.insert(), - [ - {"user_id": 1, "user_name": "n1"}, - {"user_id": 2, "user_name": "n2"}, - ], + self.tables.users.update() + .where(self.tables.users.c.user_id == 15) + .values(user_id=15, user_name="n1"), ) eq_( @@ -3947,6 +3999,11 @@ class SetInputSizesTest(fixtures.TablesTest): (SPECIAL_STRING, None, 0), testing.eq_type_affinity(String), ), + ( + "user_id_1", + mock.ANY, + testing.eq_type_affinity(Integer), + ), ], mock.ANY, ) diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index 38e1c436c..b1c487631 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -4,6 +4,7 @@ import re import sqlalchemy as tsa from sqlalchemy import bindparam from sqlalchemy import Column +from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import or_ from sqlalchemy import select @@ -31,7 +32,9 @@ class LogParamsTest(fixtures.TestBase): __requires__ = ("ad_hoc_engines",) def setup_test(self): - self.eng = engines.testing_engine(options={"echo": True}) + self.eng = engines.testing_engine( + options={"echo": True, "insertmanyvalues_page_size": 150} + ) self.no_param_engine = engines.testing_engine( options={"echo": True, "hide_parameters": True} ) @@ -45,6 +48,7 @@ class LogParamsTest(fixtures.TestBase): log.addHandler(self.buf) def teardown_test(self): + self.eng = engines.testing_engine(options={"echo": True}) exec_sql(self.eng, "drop table if exists foo") for log in [logging.getLogger("sqlalchemy.engine")]: log.removeHandler(self.buf) @@ -176,6 +180,21 @@ class LogParamsTest(fixtures.TestBase): repr(params), ) + def test_repr_params_huge_named_dict(self): + # given non-multi-params in a list. repr params with + # per-element truncation, mostly does the exact same thing + params = {"key_%s" % i: i for i in range(800)} + eq_( + repr(sql_util._repr_params(params, batches=10, ismulti=False)), + # this assertion is very hardcoded to exactly how many characters + # are in a Python dict repr() for the given name/value scheme + # in the sample dictionary. If for some strange reason + # Python dictionary repr() changes in some way, then this would + # have to be adjusted + f"{repr(params)[0:679]} ... 700 parameters truncated ... " + f"{repr(params)[-799:]}", + ) + def test_repr_params_ismulti_named_dict(self): # given non-multi-params in a list. repr params with # per-element truncation, mostly does the exact same thing @@ -219,6 +238,104 @@ class LogParamsTest(fixtures.TestBase): "298, 299], 5]]", ) + def test_log_insertmanyvalues(self): + """test the full logging for insertmanyvalues added for #6047. + + to make it as clear as possible what's going on, the "insertmanyvalues" + execute is noted explicitly and includes total number of batches, + batch count. The long SQL string as well as the long parameter list + is now truncated in the middle, which is a new logging capability + as of this feature (we had only truncation of many separate parameter + sets and truncation of long individual parameter values, not + a long single tuple/dict of parameters.) + + """ + t = Table( + "t", + MetaData(), + Column("id", Integer, primary_key=True), + Column("data", String), + ) + + with self.eng.begin() as connection: + t.create(connection) + + connection.execute( + t.insert().returning(t.c.id), + [{"data": f"d{i}"} for i in range(327)], + ) + + full_insert = ( + "INSERT INTO t (data) VALUES (?), (?), " + "(?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), " + "(? ... 439 characters truncated ... ?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?) " + "RETURNING id" + ) + eq_(self.buf.buffer[3].message, full_insert) + + eq_regex( + self.buf.buffer[4].message, + r"\[generated in .* \(insertmanyvalues\)\] \('d0', 'd1', " + r"'d2', 'd3', 'd4', 'd5', 'd6', 'd7', " + r"'d8', 'd9', 'd10', 'd11', 'd12', 'd13', 'd14', 'd15', " + r"'d16', 'd17', 'd18', 'd19', 'd20', 'd21', 'd22', 'd23', " + r"'d24', 'd25', 'd26', 'd27', 'd28', 'd29', 'd30', " + r"'d31', 'd32', 'd33', 'd34', 'd35', 'd36', 'd37', 'd38', " + r"'d39', 'd40', 'd41', 'd42', 'd43', 'd44', 'd45', 'd46', " + r"'d47', 'd48', " + r"'d49' ... 50 parameters truncated ... " + r"'d100', 'd101', 'd102', 'd103', 'd104', 'd105', 'd106', " + r"'d107', 'd108', 'd109', 'd110', 'd111', 'd112', 'd113', " + r"'d114', 'd115', 'd116', 'd117', 'd118', 'd119', 'd120', " + r"'d121', 'd122', 'd123', 'd124', 'd125', 'd126', 'd127', " + r"'d128', 'd129', 'd130', 'd131', 'd132', 'd133', 'd134', " + r"'d135', 'd136', 'd137', 'd138', 'd139', 'd140', " + r"'d141', 'd142', " + r"'d143', 'd144', 'd145', 'd146', 'd147', 'd148', 'd149'\)", + ) + eq_(self.buf.buffer[5].message, full_insert) + eq_( + self.buf.buffer[6].message, + "[insertmanyvalues batch 2 of 3] ('d150', 'd151', 'd152', " + "'d153', 'd154', 'd155', 'd156', 'd157', 'd158', 'd159', " + "'d160', 'd161', 'd162', 'd163', 'd164', 'd165', 'd166', " + "'d167', 'd168', 'd169', 'd170', 'd171', 'd172', 'd173', " + "'d174', 'd175', 'd176', 'd177', 'd178', 'd179', 'd180', " + "'d181', 'd182', 'd183', 'd184', 'd185', 'd186', 'd187', " + "'d188', 'd189', 'd190', 'd191', 'd192', 'd193', 'd194', " + "'d195', 'd196', 'd197', 'd198', 'd199' " + "... 50 parameters truncated ... 'd250', 'd251', 'd252', " + "'d253', 'd254', 'd255', 'd256', 'd257', 'd258', 'd259', " + "'d260', 'd261', 'd262', 'd263', 'd264', 'd265', 'd266', " + "'d267', 'd268', 'd269', 'd270', 'd271', 'd272', 'd273', " + "'d274', 'd275', 'd276', 'd277', 'd278', 'd279', 'd280', " + "'d281', 'd282', 'd283', 'd284', 'd285', 'd286', 'd287', " + "'d288', 'd289', 'd290', 'd291', 'd292', 'd293', 'd294', " + "'d295', 'd296', 'd297', 'd298', 'd299')", + ) + eq_( + self.buf.buffer[7].message, + "INSERT INTO t (data) VALUES (?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), " + "(?), (?), (?), (?), (?), (?) RETURNING id", + ) + eq_( + self.buf.buffer[8].message, + "[insertmanyvalues batch 3 of 3] ('d300', 'd301', 'd302', " + "'d303', 'd304', 'd305', 'd306', 'd307', 'd308', 'd309', " + "'d310', 'd311', 'd312', 'd313', 'd314', 'd315', 'd316', " + "'d317', 'd318', 'd319', 'd320', 'd321', 'd322', 'd323', " + "'d324', 'd325', 'd326')", + ) + def test_log_large_parameter_single(self): import random diff --git a/test/orm/test_unitofwork.py b/test/orm/test_unitofwork.py index 2821d3a39..b94998716 100644 --- a/test/orm/test_unitofwork.py +++ b/test/orm/test_unitofwork.py @@ -3523,7 +3523,7 @@ class NoRowInsertedTest(fixtures.TestBase): ): if statement.startswith("INSERT"): if statement.endswith("RETURNING my_table.id"): - if executemany: + if executemany and isinstance(parameters, list): # remove some rows, so the count is wrong parameters = parameters[0:1] else: diff --git a/test/requirements.py b/test/requirements.py index 031e0eb4a..c6979d663 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -894,6 +894,11 @@ class DefaultRequirements(SuiteRequirements): return skip_if(["mariadb+mariadbconnector"]) + self.empty_inserts @property + def provisioned_upsert(self): + """backend includes upsert() in its provisioning.py""" + return only_on(["postgresql", "sqlite", "mariadb"]) + + @property def expressions_against_unbounded_text(self): """target database supports use of an unbounded textual field in a WHERE clause.""" diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py index 61e0783e4..23a850f08 100644 --- a/test/sql/test_insert.py +++ b/test/sql/test_insert.py @@ -469,6 +469,65 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): dialect=postgresql.dialect(), ) + def test_heterogeneous_multi_values(self): + """for #6047, originally I thought we'd take any insert().values() + and be able to convert it to a "many" style execution that we can + cache. + + however, this test shows that we cannot, at least not in the + general case, because SQL expressions are not guaranteed to be in + the same position each time, therefore each ``VALUES`` clause is not + of the same structure. + + """ + + m = MetaData() + + t1 = Table( + "t", + m, + Column("id", Integer, primary_key=True), + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) + + stmt = t1.insert().values( + [ + {"x": 1, "y": func.sum(1, 2), "z": 2}, + {"x": func.sum(1, 2), "y": 2, "z": 3}, + {"x": func.sum(1, 2), "y": 2, "z": func.foo(10)}, + ] + ) + + # SQL expressions in the params at arbitrary locations means + # we have to scan them at compile time, and the shape of the bound + # parameters is not predictable. so for #6047 where I originally + # thought all of values() could be rewritten, this makes it not + # really worth it. + self.assert_compile( + stmt, + "INSERT INTO t (x, y, z) VALUES " + "(%(x_m0)s, sum(%(sum_1)s, %(sum_2)s), %(z_m0)s), " + "(sum(%(sum_3)s, %(sum_4)s), %(y_m1)s, %(z_m1)s), " + "(sum(%(sum_5)s, %(sum_6)s), %(y_m2)s, foo(%(foo_1)s))", + checkparams={ + "x_m0": 1, + "sum_1": 1, + "sum_2": 2, + "z_m0": 2, + "sum_3": 1, + "sum_4": 2, + "y_m1": 2, + "z_m1": 3, + "sum_5": 1, + "sum_6": 2, + "y_m2": 2, + "foo_1": 10, + }, + dialect=postgresql.dialect(), + ) + def test_insert_seq_pk_multi_values_seq_not_supported(self): m = MetaData() diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py index b6945813e..4ce093156 100644 --- a/test/sql/test_insert_exec.py +++ b/test/sql/test_insert_exec.py @@ -1,4 +1,7 @@ +import itertools + from sqlalchemy import and_ +from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import func @@ -10,8 +13,10 @@ from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import VARCHAR +from sqlalchemy.engine import cursor as _cursor from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import mock @@ -712,3 +717,237 @@ class TableInsertTest(fixtures.TablesTest): table=t, parameters=dict(id=None, data="data", x=5), ) + + +class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest): + __backend__ = True + __requires__ = ("insertmanyvalues",) + + @classmethod + def define_tables(cls, metadata): + Table( + "data", + metadata, + Column("id", Integer, primary_key=True), + Column("x", String(50)), + Column("y", String(50)), + Column("z", Integer, server_default="5"), + ) + + Table( + "Unitéble2", + metadata, + Column("méil", Integer, primary_key=True), + Column("\u6e2c\u8a66", Integer), + ) + + def test_insert_unicode_keys(self, connection): + table = self.tables["Unitéble2"] + + stmt = table.insert().returning(table.c["méil"]) + + connection.execute( + stmt, + [ + {"méil": 1, "\u6e2c\u8a66": 1}, + {"méil": 2, "\u6e2c\u8a66": 2}, + {"méil": 3, "\u6e2c\u8a66": 3}, + ], + ) + + eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)]) + + def test_insert_returning_values(self, connection): + t = self.tables.data + + conn = connection + page_size = conn.dialect.insertmanyvalues_page_size or 100 + data = [ + {"x": "x%d" % i, "y": "y%d" % i} + for i in range(1, page_size * 2 + 27) + ] + result = conn.execute(t.insert().returning(t.c.x, t.c.y), data) + + eq_([tup[0] for tup in result.cursor.description], ["x", "y"]) + eq_(result.keys(), ["x", "y"]) + assert t.c.x in result.keys() + assert t.c.id not in result.keys() + assert not result._soft_closed + assert isinstance( + result.cursor_strategy, + _cursor.FullyBufferedCursorFetchStrategy, + ) + assert not result.closed + eq_(result.mappings().all(), data) + + assert result._soft_closed + # assert result.closed + assert result.cursor is None + + def test_insert_returning_preexecute_pk(self, metadata, connection): + counter = itertools.count(1) + + t = Table( + "t", + self.metadata, + Column( + "id", + Integer, + primary_key=True, + default=lambda: next(counter), + ), + Column("data", Integer), + ) + metadata.create_all(connection) + + result = connection.execute( + t.insert().return_defaults(), + [{"data": 1}, {"data": 2}, {"data": 3}], + ) + + eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)]) + + def test_insert_returning_defaults(self, connection): + t = self.tables.data + + conn = connection + + result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) + first_pk = result.inserted_primary_key[0] + + page_size = conn.dialect.insertmanyvalues_page_size or 100 + total_rows = page_size * 5 + 27 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)] + result = conn.execute(t.insert().returning(t.c.id, t.c.z), data) + + eq_( + result.all(), + [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)], + ) + + def test_insert_return_pks_default_values(self, connection): + """test sending multiple, empty rows into an INSERT and getting primary + key values back. + + This has to use a format that indicates at least one DEFAULT in + multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES + (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col" + + if the database doesnt support this (like SQLite, mssql), it + actually runs the statement that many times on the cursor. + This is much less efficient, but is still more efficient than + how it worked previously where we'd run the statement that many + times anyway. + + There's ways to make it work for those, such as on SQLite + we can use "INSERT INTO table (pk_col) VALUES (NULL) RETURNING pk_col", + but that assumes an autoincrement pk_col, not clear how this + could be produced generically. + + """ + t = self.tables.data + + conn = connection + + result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) + first_pk = result.inserted_primary_key[0] + + page_size = conn.dialect.insertmanyvalues_page_size or 100 + total_rows = page_size * 2 + 27 + data = [{} for i in range(1, total_rows)] + result = conn.execute(t.insert().returning(t.c.id), data) + + eq_( + result.all(), + [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)], + ) + + @testing.combinations(None, 100, 329, argnames="batchsize") + @testing.combinations( + "engine", + "conn_execution_option", + "exec_execution_option", + "stmt_execution_option", + argnames="paramtype", + ) + def test_page_size_adjustment(self, testing_engine, batchsize, paramtype): + + t = self.tables.data + + if paramtype == "engine" and batchsize is not None: + e = testing_engine( + options={ + "insertmanyvalues_page_size": batchsize, + }, + ) + + # sqlite, since this is a new engine, re-create the table + if not testing.requires.independent_connections.enabled: + t.create(e, checkfirst=True) + else: + e = testing.db + + totalnum = 1275 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)] + + insert_count = 0 + + with e.begin() as conn: + + @event.listens_for(conn, "before_cursor_execute") + def go(conn, cursor, statement, parameters, context, executemany): + nonlocal insert_count + if statement.startswith("INSERT"): + insert_count += 1 + + stmt = t.insert() + if batchsize is None or paramtype == "engine": + conn.execute(stmt.returning(t.c.id), data) + elif paramtype == "conn_execution_option": + conn = conn.execution_options( + insertmanyvalues_page_size=batchsize + ) + conn.execute(stmt.returning(t.c.id), data) + elif paramtype == "stmt_execution_option": + stmt = stmt.execution_options( + insertmanyvalues_page_size=batchsize + ) + conn.execute(stmt.returning(t.c.id), data) + elif paramtype == "exec_execution_option": + conn.execute( + stmt.returning(t.c.id), + data, + execution_options=dict( + insertmanyvalues_page_size=batchsize + ), + ) + else: + assert False + + assert_batchsize = batchsize or 1000 + eq_( + insert_count, + totalnum // assert_batchsize + + (1 if totalnum % assert_batchsize else 0), + ) + + def test_disabled(self, testing_engine): + + e = testing_engine( + options={"use_insertmanyvalues": False}, + share_pool=True, + transfer_staticpool=True, + ) + totalnum = 1275 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)] + + t = self.tables.data + + with e.begin() as conn: + stmt = t.insert() + with expect_raises_message( + exc.StatementError, + "with current server capabilities does not support " + "INSERT..RETURNING when executemany", + ): + conn.execute(stmt.returning(t.c.id), data) diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py index c458e3262..f8cc32517 100644 --- a/test/sql/test_returning.py +++ b/test/sql/test_returning.py @@ -19,8 +19,12 @@ from sqlalchemy.sql.sqltypes import NullType from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import AssertsExecutionResults +from sqlalchemy.testing import config from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures +from sqlalchemy.testing import mock +from sqlalchemy.testing import provision from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table from sqlalchemy.types import TypeDecorator @@ -71,10 +75,12 @@ class ReturnCombinationTests(fixtures.TestBase, AssertsCompiledSQL): stmt = stmt.returning(t.c.x) + stmt = stmt.return_defaults() assert_raises_message( - sa_exc.InvalidRequestError, - "RETURNING is already configured on this statement", - stmt.return_defaults, + sa_exc.CompileError, + r"Can't compile statement that includes returning\(\) " + r"and return_defaults\(\) simultaneously", + stmt.compile, ) def test_return_defaults_no_returning(self, table_fixture): @@ -224,7 +230,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): cls.GoofyType = GoofyType Table( - "tables", + "returning_tbl", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True @@ -236,7 +242,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): ) def test_column_targeting(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl result = connection.execute( table.insert().returning(table.c.id, table.c.full), {"persons": 1, "full": False}, @@ -260,7 +266,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): eq_(row["goofy"], "FOOsomegoofyBAR") def test_labeling(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl result = connection.execute( table.insert() .values(persons=6) @@ -270,7 +276,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): assert row["lala"] == 6 def test_anon_expressions(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl GoofyType = self.GoofyType result = connection.execute( table.insert() @@ -286,27 +292,75 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): row = result.first() eq_(row[0], 30) - @testing.fails_on( - "mssql", - "driver has unknown issue with string concatenation " - "in INSERT RETURNING", + @testing.combinations( + (lambda table: (table.c.strval + "hi",), ("str1hi",)), + ( + lambda table: ( + table.c.persons, + table.c.full, + table.c.strval + "hi", + ), + ( + 5, + False, + "str1hi", + ), + ), + ( + lambda table: ( + table.c.persons, + table.c.strval + "hi", + table.c.full, + ), + (5, "str1hi", False), + ), + ( + lambda table: ( + table.c.strval + "hi", + table.c.persons, + table.c.full, + ), + ("str1hi", 5, False), + ), + argnames="testcase, expected_row", ) - def test_insert_returning_w_expression_one(self, connection): - table = self.tables.tables + def test_insert_returning_w_expression( + self, connection, testcase, expected_row + ): + table = self.tables.returning_tbl + + exprs = testing.resolve_lambda(testcase, table=table) result = connection.execute( - table.insert().returning(table.c.strval + "hi"), + table.insert().returning(*exprs), {"persons": 5, "full": False, "strval": "str1"}, ) - eq_(result.fetchall(), [("str1hi",)]) + eq_(result.fetchall(), [expected_row]) result2 = connection.execute( select(table.c.id, table.c.strval).order_by(table.c.id) ) eq_(result2.fetchall(), [(1, "str1")]) + def test_insert_explicit_pk_col(self, connection): + table = self.tables.returning_tbl + result = connection.execute( + table.insert().returning(table.c.id, table.c.strval), + {"id": 1, "strval": "str1"}, + ) + + eq_( + result.fetchall(), + [ + ( + 1, + "str1", + ) + ], + ) + def test_insert_returning_w_type_coerce_expression(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl result = connection.execute( table.insert().returning(type_coerce(table.c.goofy, String)), {"persons": 5, "goofy": "somegoofy"}, @@ -320,7 +374,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): eq_(result2.fetchall(), [(1, "FOOsomegoofyBAR")]) def test_no_ipk_on_returning(self, connection, close_result_when_finished): - table = self.tables.tables + table = self.tables.returning_tbl result = connection.execute( table.insert().returning(table.c.id), {"persons": 1, "full": False} ) @@ -334,7 +388,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): ) def test_insert_returning(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl result = connection.execute( table.insert().returning(table.c.id), {"persons": 1, "full": False} ) @@ -342,8 +396,8 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): eq_(result.fetchall(), [(1,)]) @testing.requires.multivalues_inserts - def test_multirow_returning(self, connection): - table = self.tables.tables + def test_multivalues_insert_returning(self, connection): + table = self.tables.returning_tbl ins = ( table.insert() .returning(table.c.id, table.c.persons) @@ -372,7 +426,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults): literal_true = "1" result4 = connection.exec_driver_sql( - "insert into tables (id, persons, %sfull%s) " + "insert into returning_tbl (id, persons, %sfull%s) " "values (5, 10, %s) returning persons" % (quote, quote, literal_true) ) @@ -388,7 +442,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults): define_tables = InsertReturningTest.define_tables def test_update_returning(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl connection.execute( table.insert(), [{"persons": 5, "full": False}, {"persons": 3, "full": False}], @@ -408,7 +462,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults): eq_(result2.fetchall(), [(1, True), (2, False)]) def test_update_returning_w_expression_one(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl connection.execute( table.insert(), [ @@ -431,7 +485,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults): eq_(result2.fetchall(), [(1, "str1"), (2, "str2")]) def test_update_returning_w_type_coerce_expression(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl connection.execute( table.insert(), [ @@ -457,7 +511,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults): ) def test_update_full_returning(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl connection.execute( table.insert(), [{"persons": 5, "full": False}, {"persons": 3, "full": False}], @@ -481,7 +535,7 @@ class DeleteReturningTest(fixtures.TablesTest, AssertsExecutionResults): define_tables = InsertReturningTest.define_tables def test_delete_returning(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl connection.execute( table.insert(), [{"persons": 5, "full": False}, {"persons": 3, "full": False}], @@ -536,7 +590,7 @@ class SequenceReturningTest(fixtures.TablesTest): def define_tables(cls, metadata): seq = Sequence("tid_seq") Table( - "tables", + "returning_tbl", metadata, Column( "id", @@ -549,7 +603,7 @@ class SequenceReturningTest(fixtures.TablesTest): cls.sequences.tid_seq = seq def test_insert(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl r = connection.execute( table.insert().values(data="hi").returning(table.c.id) ) @@ -570,7 +624,7 @@ class KeyReturningTest(fixtures.TablesTest, AssertsExecutionResults): @classmethod def define_tables(cls, metadata): Table( - "tables", + "returning_tbl", metadata, Column( "id", @@ -584,7 +638,7 @@ class KeyReturningTest(fixtures.TablesTest, AssertsExecutionResults): @testing.exclude("postgresql", "<", (8, 2), "8.2+ feature") def test_insert(self, connection): - table = self.tables.tables + table = self.tables.returning_tbl result = connection.execute( table.insert().returning(table.c.foo_id), dict(data="somedata") ) @@ -886,18 +940,359 @@ class InsertManyReturnDefaultsTest(fixtures.TablesTest): ], ) + if connection.dialect.insert_null_pk_still_autoincrements: + eq_( + [row._mapping for row in result.returned_defaults_rows], + [ + {"id": 10, "insdef": 0, "upddef": None}, + {"id": 11, "insdef": 0, "upddef": None}, + {"id": 12, "insdef": 0, "upddef": None}, + {"id": 13, "insdef": 0, "upddef": None}, + {"id": 14, "insdef": 0, "upddef": None}, + {"id": 15, "insdef": 0, "upddef": None}, + ], + ) + else: + eq_( + [row._mapping for row in result.returned_defaults_rows], + [ + {"insdef": 0, "upddef": None}, + {"insdef": 0, "upddef": None}, + {"insdef": 0, "upddef": None}, + {"insdef": 0, "upddef": None}, + {"insdef": 0, "upddef": None}, + {"insdef": 0, "upddef": None}, + ], + ) eq_( - [row._mapping for row in result.returned_defaults_rows], + result.inserted_primary_key_rows, + [(10,), (11,), (12,), (13,), (14,), (15,)], + ) + + +class InsertManyReturningTest(fixtures.TablesTest): + __requires__ = ("insert_executemany_returning",) + run_define_tables = "each" + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + from sqlalchemy.sql import ColumnElement + from sqlalchemy.ext.compiler import compiles + + counter = itertools.count() + + class IncDefault(ColumnElement): + pass + + @compiles(IncDefault) + def compile_(element, compiler, **kw): + return str(next(counter)) + + Table( + "default_cases", + metadata, + Column( + "id", Integer, primary_key=True, test_needs_autoincrement=True + ), + Column("data", String(50)), + Column("insdef", Integer, default=IncDefault()), + Column("upddef", Integer, onupdate=IncDefault()), + ) + + class GoofyType(TypeDecorator): + impl = String + cache_ok = True + + def process_bind_param(self, value, dialect): + if value is None: + return None + return "FOO" + value + + def process_result_value(self, value, dialect): + if value is None: + return None + return value + "BAR" + + cls.GoofyType = GoofyType + + Table( + "type_cases", + metadata, + Column( + "id", Integer, primary_key=True, test_needs_autoincrement=True + ), + Column("persons", Integer), + Column("full", Boolean), + Column("goofy", GoofyType(50)), + Column("strval", String(50)), + ) + + @testing.combinations( + ( + lambda table: (table.c.strval + "hi",), + [("str1hi",), ("str2hi",), ("str3hi",)], + ), + ( + lambda table: ( + table.c.persons, + table.c.full, + table.c.strval + "hi", + ), + [ + (5, False, "str1hi"), + (6, True, "str2hi"), + (7, False, "str3hi"), + ], + ), + ( + lambda table: ( + table.c.persons, + table.c.strval + "hi", + table.c.full, + ), [ - {"insdef": 0, "upddef": None}, - {"insdef": 0, "upddef": None}, - {"insdef": 0, "upddef": None}, - {"insdef": 0, "upddef": None}, - {"insdef": 0, "upddef": None}, - {"insdef": 0, "upddef": None}, + (5, "str1hi", False), + (6, "str2hi", True), + (7, "str3hi", False), + ], + ), + ( + lambda table: ( + table.c.strval + "hi", + table.c.persons, + table.c.full, + ), + [ + ("str1hi", 5, False), + ("str2hi", 6, True), + ("str3hi", 7, False), + ], + ), + argnames="testcase, expected_rows", + ) + def test_insert_returning_w_expression( + self, connection, testcase, expected_rows + ): + table = self.tables.type_cases + + exprs = testing.resolve_lambda(testcase, table=table) + result = connection.execute( + table.insert().returning(*exprs), + [ + {"persons": 5, "full": False, "strval": "str1"}, + {"persons": 6, "full": True, "strval": "str2"}, + {"persons": 7, "full": False, "strval": "str3"}, + ], + ) + + eq_(result.fetchall(), expected_rows) + + result2 = connection.execute( + select(table.c.id, table.c.strval).order_by(table.c.id) + ) + eq_(result2.fetchall(), [(1, "str1"), (2, "str2"), (3, "str3")]) + + @testing.fails_if( + # Oracle has native executemany() + returning and does not use + # insertmanyvalues to achieve this. so test that for + # that particular dialect, the exception expected is not raised + # in the case that the compiler vetoed insertmanyvalues ( + # since Oracle's compiler will always veto it) + lambda config: not config.db.dialect.use_insertmanyvalues + ) + def test_iie_supported_but_not_this_statement(self, connection): + """test the case where INSERT..RETURNING w/ executemany is used, + the dialect requires use_insertmanyreturning, but + the compiler vetoed the use of insertmanyvalues.""" + + t1 = self.tables.type_cases + + with mock.patch.object( + testing.db.dialect.statement_compiler, + "_insert_stmt_should_use_insertmanyvalues", + lambda *arg: False, + ): + with expect_raises_message( + sa_exc.StatementError, + r'Statement does not have "insertmanyvalues" enabled, ' + r"can\'t use INSERT..RETURNING with executemany in this case.", + ): + connection.execute( + t1.insert().returning(t1.c.id, t1.c.goofy, t1.c.full), + [ + {"persons": 5, "full": True}, + {"persons": 6, "full": True}, + {"persons": 7, "full": False}, + ], + ) + + def test_insert_executemany_type_test(self, connection): + t1 = self.tables.type_cases + result = connection.execute( + t1.insert().returning(t1.c.id, t1.c.goofy, t1.c.full), + [ + {"persons": 5, "full": True, "goofy": "row1", "strval": "s1"}, + {"persons": 6, "full": True, "goofy": "row2", "strval": "s2"}, + {"persons": 7, "full": False, "goofy": "row3", "strval": "s3"}, + {"persons": 8, "full": True, "goofy": "row4", "strval": "s4"}, ], ) eq_( - result.inserted_primary_key_rows, - [(10,), (11,), (12,), (13,), (14,), (15,)], + result.mappings().all(), + [ + {"id": 1, "goofy": "FOOrow1BAR", "full": True}, + {"id": 2, "goofy": "FOOrow2BAR", "full": True}, + {"id": 3, "goofy": "FOOrow3BAR", "full": False}, + {"id": 4, "goofy": "FOOrow4BAR", "full": True}, + ], ) + + def test_insert_executemany_default_generators(self, connection): + t1 = self.tables.default_cases + result = connection.execute( + t1.insert().returning(t1.c.id, t1.c.insdef, t1.c.upddef), + [ + {"data": "d1"}, + {"data": "d2"}, + {"data": "d3"}, + {"data": "d4"}, + {"data": "d5"}, + {"data": "d6"}, + ], + ) + + eq_( + result.mappings().all(), + [ + {"id": 1, "insdef": 0, "upddef": None}, + {"id": 2, "insdef": 0, "upddef": None}, + {"id": 3, "insdef": 0, "upddef": None}, + {"id": 4, "insdef": 0, "upddef": None}, + {"id": 5, "insdef": 0, "upddef": None}, + {"id": 6, "insdef": 0, "upddef": None}, + ], + ) + + @testing.combinations(True, False, argnames="update_cols") + @testing.requires.provisioned_upsert + def test_upsert_data_w_defaults(self, connection, update_cols): + t1 = self.tables.default_cases + + new_rows = connection.execute( + t1.insert().returning(t1.c.id, t1.c.insdef, t1.c.data), + [ + {"data": "d1"}, + {"data": "d2"}, + {"data": "d3"}, + {"data": "d4"}, + {"data": "d5"}, + {"data": "d6"}, + ], + ).all() + + eq_( + new_rows, + [ + (1, 0, "d1"), + (2, 0, "d2"), + (3, 0, "d3"), + (4, 0, "d4"), + (5, 0, "d5"), + (6, 0, "d6"), + ], + ) + + stmt = provision.upsert( + config, + t1, + (t1.c.id, t1.c.insdef, t1.c.data), + (lambda excluded: {"data": excluded.data + " excluded"}) + if update_cols + else None, + ) + + upserted_rows = connection.execute( + stmt, + [ + {"id": 1, "data": "d1 upserted"}, + {"id": 4, "data": "d4 upserted"}, + {"id": 5, "data": "d5 upserted"}, + {"id": 7, "data": "d7 upserted"}, + {"id": 8, "data": "d8 upserted"}, + {"id": 9, "data": "d9 upserted"}, + ], + ).all() + + if update_cols: + eq_( + upserted_rows, + [ + (1, 0, "d1 upserted excluded"), + (4, 0, "d4 upserted excluded"), + (5, 0, "d5 upserted excluded"), + (7, 1, "d7 upserted"), + (8, 1, "d8 upserted"), + (9, 1, "d9 upserted"), + ], + ) + else: + if testing.against("sqlite", "postgresql"): + eq_( + upserted_rows, + [ + (7, 1, "d7 upserted"), + (8, 1, "d8 upserted"), + (9, 1, "d9 upserted"), + ], + ) + elif testing.against("mariadb"): + # mariadb does not seem to have an "empty" upsert, + # so the provision.upsert() sets table.c.id to itself. + # this means we get all the rows back + eq_( + upserted_rows, + [ + (1, 0, "d1"), + (4, 0, "d4"), + (5, 0, "d5"), + (7, 1, "d7 upserted"), + (8, 1, "d8 upserted"), + (9, 1, "d9 upserted"), + ], + ) + + resulting_data = connection.execute( + t1.select().order_by(t1.c.id) + ).all() + + if update_cols: + eq_( + resulting_data, + [ + (1, "d1 upserted excluded", 0, None), + (2, "d2", 0, None), + (3, "d3", 0, None), + (4, "d4 upserted excluded", 0, None), + (5, "d5 upserted excluded", 0, None), + (6, "d6", 0, None), + (7, "d7 upserted", 1, None), + (8, "d8 upserted", 1, None), + (9, "d9 upserted", 1, None), + ], + ) + else: + eq_( + resulting_data, + [ + (1, "d1", 0, None), + (2, "d2", 0, None), + (3, "d3", 0, None), + (4, "d4", 0, None), + (5, "d5", 0, None), + (6, "d6", 0, None), + (7, "d7 upserted", 1, None), + (8, "d8 upserted", 1, None), + (9, "d9 upserted", 1, None), + ], + ) |
