summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-07-18 15:08:37 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:15:32 -0400
commit2bcc97da424eef7db9a5d02f81d02344925415ee (patch)
tree13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /test
parent332188e5680574368001ded52eb0a9d259ecdef5 (diff)
downloadsqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built-in backends when RETURNING is used, except for Oracle, which doesn't need it; on psycopg2 and mssql+pyodbc it is used for all INSERT statements, not just those that use RETURNING. Third-party dialects would need to opt in to the new feature by setting use_insertmanyvalues to True. Also adds dialect-level guards against using RETURNING with executemany where we don't have an implementation to suit it. A single execute with RETURNING still defers to the server without us checking. Fixes: #6047 Fixes: #7907 Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'test')
-rw-r--r--test/dialect/mssql/test_engine.py4
-rw-r--r--test/dialect/postgresql/test_dialect.py440
-rw-r--r--test/dialect/postgresql/test_on_conflict.py13
-rw-r--r--test/engine/test_execute.py91
-rw-r--r--test/engine/test_logging.py119
-rw-r--r--test/orm/test_unitofwork.py2
-rw-r--r--test/requirements.py5
-rw-r--r--test/sql/test_insert.py59
-rw-r--r--test/sql/test_insert_exec.py239
-rw-r--r--test/sql/test_returning.py473
10 files changed, 959 insertions, 486 deletions
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py
index 3fb078ef4..d19e591b4 100644
--- a/test/dialect/mssql/test_engine.py
+++ b/test/dialect/mssql/test_engine.py
@@ -402,7 +402,9 @@ class FastExecutemanyTest(fixtures.TestBase):
)
t.create(testing.db)
- eng = engines.testing_engine(options={"fast_executemany": True})
+ eng = engines.testing_engine(
+ options={"fast_executemany": True, "use_insertmanyvalues": False}
+ )
@event.listens_for(eng, "after_cursor_execute")
def after_cursor_execute(
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index fdb114d57..27d4a4cf9 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -1,7 +1,6 @@
# coding: utf-8
import dataclasses
import datetime
-import itertools
import logging
import logging.handlers
@@ -18,7 +17,6 @@ from sqlalchemy import extract
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import literal
-from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import Numeric
from sqlalchemy import schema
@@ -32,15 +30,14 @@ from sqlalchemy import text
from sqlalchemy import TypeDecorator
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import HSTORE
-from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import psycopg as psycopg_dialect
from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect
from sqlalchemy.dialects.postgresql import Range
-from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
-from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_PLAIN
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
-from sqlalchemy.engine import cursor as _cursor
+from sqlalchemy.dialects.postgresql.psycopg2 import (
+ EXECUTEMANY_VALUES_PLUS_BATCH,
+)
from sqlalchemy.engine import url
from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy.testing import config
@@ -60,11 +57,6 @@ from sqlalchemy.testing.assertions import eq_regex
from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.assertions import ne_
-if True:
- from sqlalchemy.dialects.postgresql.psycopg2 import (
- EXECUTEMANY_VALUES_PLUS_BATCH,
- )
-
class DialectTest(fixtures.TestBase):
"""python-side dialect tests."""
@@ -451,179 +443,6 @@ class ExecuteManyMode:
Column("\u6e2c\u8a66", Integer),
)
- @testing.combinations(
- "insert", "pg_insert", "pg_insert_on_conflict", argnames="insert_type"
- )
- def test_insert(self, connection, insert_type):
- from psycopg2 import extras
-
- values_page_size = connection.dialect.executemany_values_page_size
- batch_page_size = connection.dialect.executemany_batch_page_size
- if connection.dialect.executemany_mode & EXECUTEMANY_VALUES:
- meth = extras.execute_values
- stmt = "INSERT INTO data (x, y) VALUES %s"
- expected_kwargs = {
- "template": "(%(x)s, %(y)s)",
- "page_size": values_page_size,
- "fetch": False,
- }
- elif connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
- meth = extras.execute_batch
- stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
- expected_kwargs = {"page_size": batch_page_size}
- else:
- assert False
-
- if insert_type == "pg_insert_on_conflict":
- stmt += " ON CONFLICT DO NOTHING"
-
- with mock.patch.object(
- extras, meth.__name__, side_effect=meth
- ) as mock_exec:
- if insert_type == "insert":
- ins_stmt = self.tables.data.insert()
- elif insert_type == "pg_insert":
- ins_stmt = pg_insert(self.tables.data)
- elif insert_type == "pg_insert_on_conflict":
- ins_stmt = pg_insert(self.tables.data).on_conflict_do_nothing()
- else:
- assert False
-
- connection.execute(
- ins_stmt,
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- )
-
- eq_(
- connection.execute(select(self.tables.data)).fetchall(),
- [
- (1, "x1", "y1", 5),
- (2, "x2", "y2", 5),
- (3, "x3", "y3", 5),
- ],
- )
- eq_(
- mock_exec.mock_calls,
- [
- mock.call(
- mock.ANY,
- stmt,
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- **expected_kwargs,
- )
- ],
- )
-
- def test_insert_no_page_size(self, connection):
- from psycopg2 import extras
-
- values_page_size = connection.dialect.executemany_values_page_size
- batch_page_size = connection.dialect.executemany_batch_page_size
-
- if connection.dialect.executemany_mode & EXECUTEMANY_VALUES:
- meth = extras.execute_values
- stmt = "INSERT INTO data (x, y) VALUES %s"
- expected_kwargs = {
- "template": "(%(x)s, %(y)s)",
- "page_size": values_page_size,
- "fetch": False,
- }
- elif connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
- meth = extras.execute_batch
- stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
- expected_kwargs = {"page_size": batch_page_size}
- else:
- assert False
-
- with mock.patch.object(
- extras, meth.__name__, side_effect=meth
- ) as mock_exec:
- connection.execute(
- self.tables.data.insert(),
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- )
-
- eq_(
- mock_exec.mock_calls,
- [
- mock.call(
- mock.ANY,
- stmt,
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- **expected_kwargs,
- )
- ],
- )
-
- def test_insert_page_size(self):
- from psycopg2 import extras
-
- opts = self.options.copy()
- opts["executemany_batch_page_size"] = 500
- opts["executemany_values_page_size"] = 1000
-
- eng = engines.testing_engine(options=opts)
-
- if eng.dialect.executemany_mode & EXECUTEMANY_VALUES:
- meth = extras.execute_values
- stmt = "INSERT INTO data (x, y) VALUES %s"
- expected_kwargs = {
- "fetch": False,
- "page_size": 1000,
- "template": "(%(x)s, %(y)s)",
- }
- elif eng.dialect.executemany_mode & EXECUTEMANY_BATCH:
- meth = extras.execute_batch
- stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
- expected_kwargs = {"page_size": 500}
- else:
- assert False
-
- with mock.patch.object(
- extras, meth.__name__, side_effect=meth
- ) as mock_exec:
- with eng.begin() as conn:
- conn.execute(
- self.tables.data.insert(),
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- )
-
- eq_(
- mock_exec.mock_calls,
- [
- mock.call(
- mock.ANY,
- stmt,
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- **expected_kwargs,
- )
- ],
- )
-
def test_insert_unicode_keys(self, connection):
table = self.tables["Unitéble2"]
@@ -661,7 +480,10 @@ class ExecuteManyMode:
],
)
- if connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ if (
+ connection.dialect.executemany_mode
+ is EXECUTEMANY_VALUES_PLUS_BATCH
+ ):
eq_(
mock_exec.mock_calls,
[
@@ -680,7 +502,10 @@ class ExecuteManyMode:
eq_(mock_exec.mock_calls, [])
def test_not_sane_rowcount(self, connection):
- if connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ if (
+ connection.dialect.executemany_mode
+ is EXECUTEMANY_VALUES_PLUS_BATCH
+ ):
assert not connection.dialect.supports_sane_multi_rowcount
else:
assert connection.dialect.supports_sane_multi_rowcount
@@ -709,257 +534,22 @@ class ExecuteManyMode:
)
-class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
- options = {"executemany_mode": "batch"}
-
-
-class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
- options = {"executemany_mode": "values_only"}
-
- def test_insert_returning_values(self, connection):
- """the psycopg2 dialect needs to assemble a fully buffered result
- with the return value of execute_values().
-
- """
- t = self.tables.data
-
- conn = connection
- page_size = conn.dialect.executemany_values_page_size or 100
- data = [
- {"x": "x%d" % i, "y": "y%d" % i}
- for i in range(1, page_size * 5 + 27)
- ]
- result = conn.execute(t.insert().returning(t.c.x, t.c.y), data)
-
- eq_([tup[0] for tup in result.cursor.description], ["x", "y"])
- eq_(result.keys(), ["x", "y"])
- assert t.c.x in result.keys()
- assert t.c.id not in result.keys()
- assert not result._soft_closed
- assert isinstance(
- result.cursor_strategy,
- _cursor.FullyBufferedCursorFetchStrategy,
- )
- assert not result.cursor.closed
- assert not result.closed
- eq_(result.mappings().all(), data)
-
- assert result._soft_closed
- # assert result.closed
- assert result.cursor is None
-
- def test_insert_returning_preexecute_pk(self, metadata, connection):
- counter = itertools.count(1)
-
- t = Table(
- "t",
- self.metadata,
- Column(
- "id",
- Integer,
- primary_key=True,
- default=lambda: next(counter),
- ),
- Column("data", Integer),
- )
- metadata.create_all(connection)
-
- result = connection.execute(
- t.insert().return_defaults(),
- [{"data": 1}, {"data": 2}, {"data": 3}],
- )
-
- eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)])
-
- def test_insert_returning_defaults(self, connection):
- t = self.tables.data
-
- conn = connection
-
- result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
- first_pk = result.inserted_primary_key[0]
-
- page_size = conn.dialect.executemany_values_page_size or 100
- total_rows = page_size * 5 + 27
- data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)]
- result = conn.execute(t.insert().returning(t.c.id, t.c.z), data)
-
- eq_(
- result.all(),
- [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)],
- )
-
- def test_insert_return_pks_default_values(self, connection):
- """test sending multiple, empty rows into an INSERT and getting primary
- key values back.
-
- This has to use a format that indicates at least one DEFAULT in
- multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES
- (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col"
-
- """
- t = self.tables.data
-
- conn = connection
-
- result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
- first_pk = result.inserted_primary_key[0]
-
- page_size = conn.dialect.executemany_values_page_size or 100
- total_rows = page_size * 5 + 27
- data = [{} for i in range(1, total_rows)]
- result = conn.execute(t.insert().returning(t.c.id), data)
-
- eq_(
- result.all(),
- [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)],
- )
-
- def test_insert_w_newlines(self, connection):
- from psycopg2 import extras
-
- t = self.tables.data
-
- ins = (
- t.insert()
- .inline()
- .values(
- id=bindparam("id"),
- x=select(literal_column("5"))
- .select_from(self.tables.data)
- .scalar_subquery(),
- y=bindparam("y"),
- z=bindparam("z"),
- )
- )
- # compiled SQL has a newline in it
- eq_(
- str(ins.compile(testing.db)),
- "INSERT INTO data (id, x, y, z) VALUES (%(id)s, "
- "(SELECT 5 \nFROM data), %(y)s, %(z)s)",
- )
- meth = extras.execute_values
- with mock.patch.object(
- extras, "execute_values", side_effect=meth
- ) as mock_exec:
-
- connection.execute(
- ins,
- [
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ],
- )
-
- eq_(
- mock_exec.mock_calls,
- [
- mock.call(
- mock.ANY,
- "INSERT INTO data (id, x, y, z) VALUES %s",
- [
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ],
- template="(%(id)s, (SELECT 5 \nFROM data), %(y)s, %(z)s)",
- fetch=False,
- page_size=connection.dialect.executemany_values_page_size,
- )
- ],
- )
-
- def test_insert_modified_by_event(self, connection):
- from psycopg2 import extras
-
- t = self.tables.data
-
- ins = (
- t.insert()
- .inline()
- .values(
- id=bindparam("id"),
- x=select(literal_column("5"))
- .select_from(self.tables.data)
- .scalar_subquery(),
- y=bindparam("y"),
- z=bindparam("z"),
- )
- )
- # compiled SQL has a newline in it
- eq_(
- str(ins.compile(testing.db)),
- "INSERT INTO data (id, x, y, z) VALUES (%(id)s, "
- "(SELECT 5 \nFROM data), %(y)s, %(z)s)",
- )
- meth = extras.execute_batch
- with mock.patch.object(
- extras, "execute_values"
- ) as mock_values, mock.patch.object(
- extras, "execute_batch", side_effect=meth
- ) as mock_batch:
-
- # create an event hook that will change the statement to
- # something else, meaning the dialect has to detect that
- # insert_single_values_expr is no longer useful
- @event.listens_for(
- connection, "before_cursor_execute", retval=True
- )
- def before_cursor_execute(
- conn, cursor, statement, parameters, context, executemany
- ):
- statement = (
- "INSERT INTO data (id, y, z) VALUES "
- "(%(id)s, %(y)s, %(z)s)"
- )
- return statement, parameters
-
- connection.execute(
- ins,
- [
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ],
- )
-
- eq_(mock_values.mock_calls, [])
-
- if connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
- eq_(
- mock_batch.mock_calls,
- [
- mock.call(
- mock.ANY,
- "INSERT INTO data (id, y, z) VALUES "
- "(%(id)s, %(y)s, %(z)s)",
- (
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ),
- )
- ],
- )
- else:
- eq_(mock_batch.mock_calls, [])
-
-
class ExecutemanyValuesPlusBatchInsertsTest(
ExecuteManyMode, fixtures.TablesTest
):
options = {"executemany_mode": "values_plus_batch"}
+class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
+ options = {"executemany_mode": "values_only"}
+
+
class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
__only_on__ = "postgresql+psycopg2"
__backend__ = True
def test_executemany_correct_flag_options(self):
for opt, expected in [
- (None, EXECUTEMANY_PLAIN),
- ("batch", EXECUTEMANY_BATCH),
("values_only", EXECUTEMANY_VALUES),
("values_plus_batch", EXECUTEMANY_VALUES_PLUS_BATCH),
]:
diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py
index 8adc91a17..9c1aaf78e 100644
--- a/test/dialect/postgresql/test_on_conflict.py
+++ b/test/dialect/postgresql/test_on_conflict.py
@@ -206,7 +206,10 @@ class OnConflictTest(fixtures.TablesTest):
[(1, "name1")],
)
- def test_on_conflict_do_update_set_executemany(self, connection):
+ @testing.combinations(True, False, argnames="use_returning")
+ def test_on_conflict_do_update_set_executemany(
+ self, connection, use_returning
+ ):
"""test #6581"""
users = self.tables.users
@@ -221,7 +224,10 @@ class OnConflictTest(fixtures.TablesTest):
index_elements=[users.c.id],
set_={"id": i.excluded.id, "name": i.excluded.name + ".5"},
)
- connection.execute(
+ if use_returning:
+ i = i.returning(users.c.id, users.c.name)
+
+ result = connection.execute(
i,
[
dict(id=1, name="name1"),
@@ -230,6 +236,9 @@ class OnConflictTest(fixtures.TablesTest):
],
)
+ if use_returning:
+ eq_(result.all(), [(1, "name1.5"), (2, "name2.5"), (3, "name3")])
+
eq_(
connection.execute(users.select().order_by(users.c.id)).fetchall(),
[(1, "name1.5"), (2, "name2.5"), (3, "name3")],
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 196b70340..2940a1e7f 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -3736,7 +3736,7 @@ class DialectEventTest(fixtures.TestBase):
class SetInputSizesTest(fixtures.TablesTest):
__backend__ = True
- __requires__ = ("independent_connections",)
+ __requires__ = ("independent_connections", "insert_returning")
@classmethod
def define_tables(cls, metadata):
@@ -3752,15 +3752,6 @@ class SetInputSizesTest(fixtures.TablesTest):
canary = mock.Mock()
def do_set_input_sizes(cursor, list_of_tuples, context):
- if not engine.dialect.positional:
- # sort by "user_id", "user_name", or otherwise
- # param name for a non-positional dialect, so that we can
- # confirm the ordering. mostly a py2 thing probably can't
- # occur on py3.6+ since we are passing dictionaries with
- # "user_id", "user_name"
- list_of_tuples = sorted(
- list_of_tuples, key=lambda elem: elem[0]
- )
canary.do_set_input_sizes(cursor, list_of_tuples, context)
def pre_exec(self):
@@ -3786,15 +3777,21 @@ class SetInputSizesTest(fixtures.TablesTest):
):
yield engine, canary
- def test_set_input_sizes_no_event(self, input_sizes_fixture):
+ @testing.requires.insertmanyvalues
+ def test_set_input_sizes_insertmanyvalues_no_event(
+ self, input_sizes_fixture
+ ):
engine, canary = input_sizes_fixture
with engine.begin() as conn:
conn.execute(
- self.tables.users.insert(),
+ self.tables.users.insert().returning(
+ self.tables.users.c.user_id
+ ),
[
{"user_id": 1, "user_name": "n1"},
{"user_id": 2, "user_name": "n2"},
+ {"user_id": 3, "user_name": "n3"},
],
)
@@ -3805,6 +3802,58 @@ class SetInputSizesTest(fixtures.TablesTest):
mock.ANY,
[
(
+ "user_id_0",
+ mock.ANY,
+ testing.eq_type_affinity(Integer),
+ ),
+ (
+ "user_name_0",
+ mock.ANY,
+ testing.eq_type_affinity(String),
+ ),
+ (
+ "user_id_1",
+ mock.ANY,
+ testing.eq_type_affinity(Integer),
+ ),
+ (
+ "user_name_1",
+ mock.ANY,
+ testing.eq_type_affinity(String),
+ ),
+ (
+ "user_id_2",
+ mock.ANY,
+ testing.eq_type_affinity(Integer),
+ ),
+ (
+ "user_name_2",
+ mock.ANY,
+ testing.eq_type_affinity(String),
+ ),
+ ],
+ mock.ANY,
+ )
+ ],
+ )
+
+ def test_set_input_sizes_no_event(self, input_sizes_fixture):
+ engine, canary = input_sizes_fixture
+
+ with engine.begin() as conn:
+ conn.execute(
+ self.tables.users.update()
+ .where(self.tables.users.c.user_id == 15)
+ .values(user_id=15, user_name="n1"),
+ )
+
+ eq_(
+ canary.mock_calls,
+ [
+ call.do_set_input_sizes(
+ mock.ANY,
+ [
+ (
"user_id",
mock.ANY,
testing.eq_type_affinity(Integer),
@@ -3814,6 +3863,11 @@ class SetInputSizesTest(fixtures.TablesTest):
mock.ANY,
testing.eq_type_affinity(String),
),
+ (
+ "user_id_1",
+ mock.ANY,
+ testing.eq_type_affinity(Integer),
+ ),
],
mock.ANY,
)
@@ -3924,11 +3978,9 @@ class SetInputSizesTest(fixtures.TablesTest):
with engine.begin() as conn:
conn.execute(
- self.tables.users.insert(),
- [
- {"user_id": 1, "user_name": "n1"},
- {"user_id": 2, "user_name": "n2"},
- ],
+ self.tables.users.update()
+ .where(self.tables.users.c.user_id == 15)
+ .values(user_id=15, user_name="n1"),
)
eq_(
@@ -3947,6 +3999,11 @@ class SetInputSizesTest(fixtures.TablesTest):
(SPECIAL_STRING, None, 0),
testing.eq_type_affinity(String),
),
+ (
+ "user_id_1",
+ mock.ANY,
+ testing.eq_type_affinity(Integer),
+ ),
],
mock.ANY,
)
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index 38e1c436c..b1c487631 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -4,6 +4,7 @@ import re
import sqlalchemy as tsa
from sqlalchemy import bindparam
from sqlalchemy import Column
+from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy import select
@@ -31,7 +32,9 @@ class LogParamsTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
def setup_test(self):
- self.eng = engines.testing_engine(options={"echo": True})
+ self.eng = engines.testing_engine(
+ options={"echo": True, "insertmanyvalues_page_size": 150}
+ )
self.no_param_engine = engines.testing_engine(
options={"echo": True, "hide_parameters": True}
)
@@ -45,6 +48,7 @@ class LogParamsTest(fixtures.TestBase):
log.addHandler(self.buf)
def teardown_test(self):
+ self.eng = engines.testing_engine(options={"echo": True})
exec_sql(self.eng, "drop table if exists foo")
for log in [logging.getLogger("sqlalchemy.engine")]:
log.removeHandler(self.buf)
@@ -176,6 +180,21 @@ class LogParamsTest(fixtures.TestBase):
repr(params),
)
+ def test_repr_params_huge_named_dict(self):
+ # given non-multi-params in a list. repr params with
+ # per-element truncation, mostly does the exact same thing
+ params = {"key_%s" % i: i for i in range(800)}
+ eq_(
+ repr(sql_util._repr_params(params, batches=10, ismulti=False)),
+ # this assertion is very hardcoded to exactly how many characters
+ # are in a Python dict repr() for the given name/value scheme
+ # in the sample dictionary. If for some strange reason
+ # Python dictionary repr() changes in some way, then this would
+ # have to be adjusted
+ f"{repr(params)[0:679]} ... 700 parameters truncated ... "
+ f"{repr(params)[-799:]}",
+ )
+
def test_repr_params_ismulti_named_dict(self):
# given non-multi-params in a list. repr params with
# per-element truncation, mostly does the exact same thing
@@ -219,6 +238,104 @@ class LogParamsTest(fixtures.TestBase):
"298, 299], 5]]",
)
+ def test_log_insertmanyvalues(self):
+ """test the full logging for insertmanyvalues added for #6047.
+
+ to make it as clear as possible what's going on, the "insertmanyvalues"
+ execute is noted explicitly and includes total number of batches,
+ batch count. The long SQL string as well as the long parameter list
+ is now truncated in the middle, which is a new logging capability
+ as of this feature (we had only truncation of many separate parameter
+ sets and truncation of long individual parameter values, not
+ a long single tuple/dict of parameters.)
+
+ """
+ t = Table(
+ "t",
+ MetaData(),
+ Column("id", Integer, primary_key=True),
+ Column("data", String),
+ )
+
+ with self.eng.begin() as connection:
+ t.create(connection)
+
+ connection.execute(
+ t.insert().returning(t.c.id),
+ [{"data": f"d{i}"} for i in range(327)],
+ )
+
+ full_insert = (
+ "INSERT INTO t (data) VALUES (?), (?), "
+ "(?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), "
+ "(? ... 439 characters truncated ... ?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?) "
+ "RETURNING id"
+ )
+ eq_(self.buf.buffer[3].message, full_insert)
+
+ eq_regex(
+ self.buf.buffer[4].message,
+ r"\[generated in .* \(insertmanyvalues\)\] \('d0', 'd1', "
+ r"'d2', 'd3', 'd4', 'd5', 'd6', 'd7', "
+ r"'d8', 'd9', 'd10', 'd11', 'd12', 'd13', 'd14', 'd15', "
+ r"'d16', 'd17', 'd18', 'd19', 'd20', 'd21', 'd22', 'd23', "
+ r"'d24', 'd25', 'd26', 'd27', 'd28', 'd29', 'd30', "
+ r"'d31', 'd32', 'd33', 'd34', 'd35', 'd36', 'd37', 'd38', "
+ r"'d39', 'd40', 'd41', 'd42', 'd43', 'd44', 'd45', 'd46', "
+ r"'d47', 'd48', "
+ r"'d49' ... 50 parameters truncated ... "
+ r"'d100', 'd101', 'd102', 'd103', 'd104', 'd105', 'd106', "
+ r"'d107', 'd108', 'd109', 'd110', 'd111', 'd112', 'd113', "
+ r"'d114', 'd115', 'd116', 'd117', 'd118', 'd119', 'd120', "
+ r"'d121', 'd122', 'd123', 'd124', 'd125', 'd126', 'd127', "
+ r"'d128', 'd129', 'd130', 'd131', 'd132', 'd133', 'd134', "
+ r"'d135', 'd136', 'd137', 'd138', 'd139', 'd140', "
+ r"'d141', 'd142', "
+ r"'d143', 'd144', 'd145', 'd146', 'd147', 'd148', 'd149'\)",
+ )
+ eq_(self.buf.buffer[5].message, full_insert)
+ eq_(
+ self.buf.buffer[6].message,
+ "[insertmanyvalues batch 2 of 3] ('d150', 'd151', 'd152', "
+ "'d153', 'd154', 'd155', 'd156', 'd157', 'd158', 'd159', "
+ "'d160', 'd161', 'd162', 'd163', 'd164', 'd165', 'd166', "
+ "'d167', 'd168', 'd169', 'd170', 'd171', 'd172', 'd173', "
+ "'d174', 'd175', 'd176', 'd177', 'd178', 'd179', 'd180', "
+ "'d181', 'd182', 'd183', 'd184', 'd185', 'd186', 'd187', "
+ "'d188', 'd189', 'd190', 'd191', 'd192', 'd193', 'd194', "
+ "'d195', 'd196', 'd197', 'd198', 'd199' "
+ "... 50 parameters truncated ... 'd250', 'd251', 'd252', "
+ "'d253', 'd254', 'd255', 'd256', 'd257', 'd258', 'd259', "
+ "'d260', 'd261', 'd262', 'd263', 'd264', 'd265', 'd266', "
+ "'d267', 'd268', 'd269', 'd270', 'd271', 'd272', 'd273', "
+ "'d274', 'd275', 'd276', 'd277', 'd278', 'd279', 'd280', "
+ "'d281', 'd282', 'd283', 'd284', 'd285', 'd286', 'd287', "
+ "'d288', 'd289', 'd290', 'd291', 'd292', 'd293', 'd294', "
+ "'d295', 'd296', 'd297', 'd298', 'd299')",
+ )
+ eq_(
+ self.buf.buffer[7].message,
+ "INSERT INTO t (data) VALUES (?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), "
+ "(?), (?), (?), (?), (?), (?) RETURNING id",
+ )
+ eq_(
+ self.buf.buffer[8].message,
+ "[insertmanyvalues batch 3 of 3] ('d300', 'd301', 'd302', "
+ "'d303', 'd304', 'd305', 'd306', 'd307', 'd308', 'd309', "
+ "'d310', 'd311', 'd312', 'd313', 'd314', 'd315', 'd316', "
+ "'d317', 'd318', 'd319', 'd320', 'd321', 'd322', 'd323', "
+ "'d324', 'd325', 'd326')",
+ )
+
def test_log_large_parameter_single(self):
import random
diff --git a/test/orm/test_unitofwork.py b/test/orm/test_unitofwork.py
index 2821d3a39..b94998716 100644
--- a/test/orm/test_unitofwork.py
+++ b/test/orm/test_unitofwork.py
@@ -3523,7 +3523,7 @@ class NoRowInsertedTest(fixtures.TestBase):
):
if statement.startswith("INSERT"):
if statement.endswith("RETURNING my_table.id"):
- if executemany:
+ if executemany and isinstance(parameters, list):
# remove some rows, so the count is wrong
parameters = parameters[0:1]
else:
diff --git a/test/requirements.py b/test/requirements.py
index 031e0eb4a..c6979d663 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -894,6 +894,11 @@ class DefaultRequirements(SuiteRequirements):
return skip_if(["mariadb+mariadbconnector"]) + self.empty_inserts
@property
+ def provisioned_upsert(self):
+ """backend includes upsert() in its provisioning.py"""
+ return only_on(["postgresql", "sqlite", "mariadb"])
+
+ @property
def expressions_against_unbounded_text(self):
"""target database supports use of an unbounded textual field in a
WHERE clause."""
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index 61e0783e4..23a850f08 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -469,6 +469,65 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
dialect=postgresql.dialect(),
)
+ def test_heterogeneous_multi_values(self):
+ """for #6047, originally I thought we'd take any insert().values()
+ and be able to convert it to a "many" style execution that we can
+ cache.
+
+ however, this test shows that we cannot, at least not in the
+ general case, because SQL expressions are not guaranteed to be in
+ the same position each time, therefore each ``VALUES`` clause is not
+ of the same structure.
+
+ """
+
+ m = MetaData()
+
+ t1 = Table(
+ "t",
+ m,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ Column("z", Integer),
+ )
+
+ stmt = t1.insert().values(
+ [
+ {"x": 1, "y": func.sum(1, 2), "z": 2},
+ {"x": func.sum(1, 2), "y": 2, "z": 3},
+ {"x": func.sum(1, 2), "y": 2, "z": func.foo(10)},
+ ]
+ )
+
+ # SQL expressions in the params at arbitrary locations means
+ # we have to scan them at compile time, and the shape of the bound
+ # parameters is not predictable. so for #6047 where I originally
+ # thought all of values() could be rewritten, this makes it not
+ # really worth it.
+ self.assert_compile(
+ stmt,
+ "INSERT INTO t (x, y, z) VALUES "
+ "(%(x_m0)s, sum(%(sum_1)s, %(sum_2)s), %(z_m0)s), "
+ "(sum(%(sum_3)s, %(sum_4)s), %(y_m1)s, %(z_m1)s), "
+ "(sum(%(sum_5)s, %(sum_6)s), %(y_m2)s, foo(%(foo_1)s))",
+ checkparams={
+ "x_m0": 1,
+ "sum_1": 1,
+ "sum_2": 2,
+ "z_m0": 2,
+ "sum_3": 1,
+ "sum_4": 2,
+ "y_m1": 2,
+ "z_m1": 3,
+ "sum_5": 1,
+ "sum_6": 2,
+ "y_m2": 2,
+ "foo_1": 10,
+ },
+ dialect=postgresql.dialect(),
+ )
+
def test_insert_seq_pk_multi_values_seq_not_supported(self):
m = MetaData()
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index b6945813e..4ce093156 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -1,4 +1,7 @@
+import itertools
+
from sqlalchemy import and_
+from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
@@ -10,8 +13,10 @@ from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import VARCHAR
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
@@ -712,3 +717,237 @@ class TableInsertTest(fixtures.TablesTest):
table=t,
parameters=dict(id=None, data="data", x=5),
)
+
+
+class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest):
+ __backend__ = True
+ __requires__ = ("insertmanyvalues",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "data",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(50)),
+ Column("y", String(50)),
+ Column("z", Integer, server_default="5"),
+ )
+
+ Table(
+ "Unitéble2",
+ metadata,
+ Column("méil", Integer, primary_key=True),
+ Column("\u6e2c\u8a66", Integer),
+ )
+
+ def test_insert_unicode_keys(self, connection):
+ table = self.tables["Unitéble2"]
+
+ stmt = table.insert().returning(table.c["méil"])
+
+ connection.execute(
+ stmt,
+ [
+ {"méil": 1, "\u6e2c\u8a66": 1},
+ {"méil": 2, "\u6e2c\u8a66": 2},
+ {"méil": 3, "\u6e2c\u8a66": 3},
+ ],
+ )
+
+ eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)])
+
+ def test_insert_returning_values(self, connection):
+ t = self.tables.data
+
+ conn = connection
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ data = [
+ {"x": "x%d" % i, "y": "y%d" % i}
+ for i in range(1, page_size * 2 + 27)
+ ]
+ result = conn.execute(t.insert().returning(t.c.x, t.c.y), data)
+
+ eq_([tup[0] for tup in result.cursor.description], ["x", "y"])
+ eq_(result.keys(), ["x", "y"])
+ assert t.c.x in result.keys()
+ assert t.c.id not in result.keys()
+ assert not result._soft_closed
+ assert isinstance(
+ result.cursor_strategy,
+ _cursor.FullyBufferedCursorFetchStrategy,
+ )
+ assert not result.closed
+ eq_(result.mappings().all(), data)
+
+ assert result._soft_closed
+ # assert result.closed
+ assert result.cursor is None
+
+ def test_insert_returning_preexecute_pk(self, metadata, connection):
+ counter = itertools.count(1)
+
+ t = Table(
+ "t",
+ self.metadata,
+ Column(
+ "id",
+ Integer,
+ primary_key=True,
+ default=lambda: next(counter),
+ ),
+ Column("data", Integer),
+ )
+ metadata.create_all(connection)
+
+ result = connection.execute(
+ t.insert().return_defaults(),
+ [{"data": 1}, {"data": 2}, {"data": 3}],
+ )
+
+ eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)])
+
+ def test_insert_returning_defaults(self, connection):
+ t = self.tables.data
+
+ conn = connection
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ total_rows = page_size * 5 + 27
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)]
+ result = conn.execute(t.insert().returning(t.c.id, t.c.z), data)
+
+ eq_(
+ result.all(),
+ [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
+
+ def test_insert_return_pks_default_values(self, connection):
+ """test sending multiple, empty rows into an INSERT and getting primary
+ key values back.
+
+ This has to use a format that indicates at least one DEFAULT in
+ multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES
+ (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col"
+
+        if the database doesn't support this (like SQLite, mssql), it
+ actually runs the statement that many times on the cursor.
+ This is much less efficient, but is still more efficient than
+ how it worked previously where we'd run the statement that many
+ times anyway.
+
+        There are ways to make it work for those, such as on SQLite
+ we can use "INSERT INTO table (pk_col) VALUES (NULL) RETURNING pk_col",
+ but that assumes an autoincrement pk_col, not clear how this
+ could be produced generically.
+
+ """
+ t = self.tables.data
+
+ conn = connection
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ total_rows = page_size * 2 + 27
+ data = [{} for i in range(1, total_rows)]
+ result = conn.execute(t.insert().returning(t.c.id), data)
+
+ eq_(
+ result.all(),
+ [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
+
+ @testing.combinations(None, 100, 329, argnames="batchsize")
+ @testing.combinations(
+ "engine",
+ "conn_execution_option",
+ "exec_execution_option",
+ "stmt_execution_option",
+ argnames="paramtype",
+ )
+ def test_page_size_adjustment(self, testing_engine, batchsize, paramtype):
+
+ t = self.tables.data
+
+ if paramtype == "engine" and batchsize is not None:
+ e = testing_engine(
+ options={
+ "insertmanyvalues_page_size": batchsize,
+ },
+ )
+
+ # sqlite, since this is a new engine, re-create the table
+ if not testing.requires.independent_connections.enabled:
+ t.create(e, checkfirst=True)
+ else:
+ e = testing.db
+
+ totalnum = 1275
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
+
+ insert_count = 0
+
+ with e.begin() as conn:
+
+ @event.listens_for(conn, "before_cursor_execute")
+ def go(conn, cursor, statement, parameters, context, executemany):
+ nonlocal insert_count
+ if statement.startswith("INSERT"):
+ insert_count += 1
+
+ stmt = t.insert()
+ if batchsize is None or paramtype == "engine":
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "conn_execution_option":
+ conn = conn.execution_options(
+ insertmanyvalues_page_size=batchsize
+ )
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "stmt_execution_option":
+ stmt = stmt.execution_options(
+ insertmanyvalues_page_size=batchsize
+ )
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "exec_execution_option":
+ conn.execute(
+ stmt.returning(t.c.id),
+ data,
+ execution_options=dict(
+ insertmanyvalues_page_size=batchsize
+ ),
+ )
+ else:
+ assert False
+
+ assert_batchsize = batchsize or 1000
+ eq_(
+ insert_count,
+ totalnum // assert_batchsize
+ + (1 if totalnum % assert_batchsize else 0),
+ )
+
+ def test_disabled(self, testing_engine):
+
+ e = testing_engine(
+ options={"use_insertmanyvalues": False},
+ share_pool=True,
+ transfer_staticpool=True,
+ )
+ totalnum = 1275
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
+
+ t = self.tables.data
+
+ with e.begin() as conn:
+ stmt = t.insert()
+ with expect_raises_message(
+ exc.StatementError,
+ "with current server capabilities does not support "
+ "INSERT..RETURNING when executemany",
+ ):
+ conn.execute(stmt.returning(t.c.id), data)
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index c458e3262..f8cc32517 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -19,8 +19,12 @@ from sqlalchemy.sql.sqltypes import NullType
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import AssertsExecutionResults
+from sqlalchemy.testing import config
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import mock
+from sqlalchemy.testing import provision
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.types import TypeDecorator
@@ -71,10 +75,12 @@ class ReturnCombinationTests(fixtures.TestBase, AssertsCompiledSQL):
stmt = stmt.returning(t.c.x)
+ stmt = stmt.return_defaults()
assert_raises_message(
- sa_exc.InvalidRequestError,
- "RETURNING is already configured on this statement",
- stmt.return_defaults,
+ sa_exc.CompileError,
+ r"Can't compile statement that includes returning\(\) "
+ r"and return_defaults\(\) simultaneously",
+ stmt.compile,
)
def test_return_defaults_no_returning(self, table_fixture):
@@ -224,7 +230,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
cls.GoofyType = GoofyType
Table(
- "tables",
+ "returning_tbl",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
@@ -236,7 +242,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
)
def test_column_targeting(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
result = connection.execute(
table.insert().returning(table.c.id, table.c.full),
{"persons": 1, "full": False},
@@ -260,7 +266,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
eq_(row["goofy"], "FOOsomegoofyBAR")
def test_labeling(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
result = connection.execute(
table.insert()
.values(persons=6)
@@ -270,7 +276,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
assert row["lala"] == 6
def test_anon_expressions(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
GoofyType = self.GoofyType
result = connection.execute(
table.insert()
@@ -286,27 +292,75 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
row = result.first()
eq_(row[0], 30)
- @testing.fails_on(
- "mssql",
- "driver has unknown issue with string concatenation "
- "in INSERT RETURNING",
+ @testing.combinations(
+ (lambda table: (table.c.strval + "hi",), ("str1hi",)),
+ (
+ lambda table: (
+ table.c.persons,
+ table.c.full,
+ table.c.strval + "hi",
+ ),
+ (
+ 5,
+ False,
+ "str1hi",
+ ),
+ ),
+ (
+ lambda table: (
+ table.c.persons,
+ table.c.strval + "hi",
+ table.c.full,
+ ),
+ (5, "str1hi", False),
+ ),
+ (
+ lambda table: (
+ table.c.strval + "hi",
+ table.c.persons,
+ table.c.full,
+ ),
+ ("str1hi", 5, False),
+ ),
+ argnames="testcase, expected_row",
)
- def test_insert_returning_w_expression_one(self, connection):
- table = self.tables.tables
+ def test_insert_returning_w_expression(
+ self, connection, testcase, expected_row
+ ):
+ table = self.tables.returning_tbl
+
+ exprs = testing.resolve_lambda(testcase, table=table)
result = connection.execute(
- table.insert().returning(table.c.strval + "hi"),
+ table.insert().returning(*exprs),
{"persons": 5, "full": False, "strval": "str1"},
)
- eq_(result.fetchall(), [("str1hi",)])
+ eq_(result.fetchall(), [expected_row])
result2 = connection.execute(
select(table.c.id, table.c.strval).order_by(table.c.id)
)
eq_(result2.fetchall(), [(1, "str1")])
+ def test_insert_explicit_pk_col(self, connection):
+ table = self.tables.returning_tbl
+ result = connection.execute(
+ table.insert().returning(table.c.id, table.c.strval),
+ {"id": 1, "strval": "str1"},
+ )
+
+ eq_(
+ result.fetchall(),
+ [
+ (
+ 1,
+ "str1",
+ )
+ ],
+ )
+
def test_insert_returning_w_type_coerce_expression(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
result = connection.execute(
table.insert().returning(type_coerce(table.c.goofy, String)),
{"persons": 5, "goofy": "somegoofy"},
@@ -320,7 +374,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
eq_(result2.fetchall(), [(1, "FOOsomegoofyBAR")])
def test_no_ipk_on_returning(self, connection, close_result_when_finished):
- table = self.tables.tables
+ table = self.tables.returning_tbl
result = connection.execute(
table.insert().returning(table.c.id), {"persons": 1, "full": False}
)
@@ -334,7 +388,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
)
def test_insert_returning(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
result = connection.execute(
table.insert().returning(table.c.id), {"persons": 1, "full": False}
)
@@ -342,8 +396,8 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
eq_(result.fetchall(), [(1,)])
@testing.requires.multivalues_inserts
- def test_multirow_returning(self, connection):
- table = self.tables.tables
+ def test_multivalues_insert_returning(self, connection):
+ table = self.tables.returning_tbl
ins = (
table.insert()
.returning(table.c.id, table.c.persons)
@@ -372,7 +426,7 @@ class InsertReturningTest(fixtures.TablesTest, AssertsExecutionResults):
literal_true = "1"
result4 = connection.exec_driver_sql(
- "insert into tables (id, persons, %sfull%s) "
+ "insert into returning_tbl (id, persons, %sfull%s) "
"values (5, 10, %s) returning persons"
% (quote, quote, literal_true)
)
@@ -388,7 +442,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults):
define_tables = InsertReturningTest.define_tables
def test_update_returning(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
connection.execute(
table.insert(),
[{"persons": 5, "full": False}, {"persons": 3, "full": False}],
@@ -408,7 +462,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults):
eq_(result2.fetchall(), [(1, True), (2, False)])
def test_update_returning_w_expression_one(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
connection.execute(
table.insert(),
[
@@ -431,7 +485,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults):
eq_(result2.fetchall(), [(1, "str1"), (2, "str2")])
def test_update_returning_w_type_coerce_expression(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
connection.execute(
table.insert(),
[
@@ -457,7 +511,7 @@ class UpdateReturningTest(fixtures.TablesTest, AssertsExecutionResults):
)
def test_update_full_returning(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
connection.execute(
table.insert(),
[{"persons": 5, "full": False}, {"persons": 3, "full": False}],
@@ -481,7 +535,7 @@ class DeleteReturningTest(fixtures.TablesTest, AssertsExecutionResults):
define_tables = InsertReturningTest.define_tables
def test_delete_returning(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
connection.execute(
table.insert(),
[{"persons": 5, "full": False}, {"persons": 3, "full": False}],
@@ -536,7 +590,7 @@ class SequenceReturningTest(fixtures.TablesTest):
def define_tables(cls, metadata):
seq = Sequence("tid_seq")
Table(
- "tables",
+ "returning_tbl",
metadata,
Column(
"id",
@@ -549,7 +603,7 @@ class SequenceReturningTest(fixtures.TablesTest):
cls.sequences.tid_seq = seq
def test_insert(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
r = connection.execute(
table.insert().values(data="hi").returning(table.c.id)
)
@@ -570,7 +624,7 @@ class KeyReturningTest(fixtures.TablesTest, AssertsExecutionResults):
@classmethod
def define_tables(cls, metadata):
Table(
- "tables",
+ "returning_tbl",
metadata,
Column(
"id",
@@ -584,7 +638,7 @@ class KeyReturningTest(fixtures.TablesTest, AssertsExecutionResults):
@testing.exclude("postgresql", "<", (8, 2), "8.2+ feature")
def test_insert(self, connection):
- table = self.tables.tables
+ table = self.tables.returning_tbl
result = connection.execute(
table.insert().returning(table.c.foo_id), dict(data="somedata")
)
@@ -886,18 +940,359 @@ class InsertManyReturnDefaultsTest(fixtures.TablesTest):
],
)
+ if connection.dialect.insert_null_pk_still_autoincrements:
+ eq_(
+ [row._mapping for row in result.returned_defaults_rows],
+ [
+ {"id": 10, "insdef": 0, "upddef": None},
+ {"id": 11, "insdef": 0, "upddef": None},
+ {"id": 12, "insdef": 0, "upddef": None},
+ {"id": 13, "insdef": 0, "upddef": None},
+ {"id": 14, "insdef": 0, "upddef": None},
+ {"id": 15, "insdef": 0, "upddef": None},
+ ],
+ )
+ else:
+ eq_(
+ [row._mapping for row in result.returned_defaults_rows],
+ [
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ ],
+ )
eq_(
- [row._mapping for row in result.returned_defaults_rows],
+ result.inserted_primary_key_rows,
+ [(10,), (11,), (12,), (13,), (14,), (15,)],
+ )
+
+
+class InsertManyReturningTest(fixtures.TablesTest):
+ __requires__ = ("insert_executemany_returning",)
+ run_define_tables = "each"
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ from sqlalchemy.sql import ColumnElement
+ from sqlalchemy.ext.compiler import compiles
+
+ counter = itertools.count()
+
+ class IncDefault(ColumnElement):
+ pass
+
+ @compiles(IncDefault)
+ def compile_(element, compiler, **kw):
+ return str(next(counter))
+
+ Table(
+ "default_cases",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ Column("insdef", Integer, default=IncDefault()),
+ Column("upddef", Integer, onupdate=IncDefault()),
+ )
+
+ class GoofyType(TypeDecorator):
+ impl = String
+ cache_ok = True
+
+ def process_bind_param(self, value, dialect):
+ if value is None:
+ return None
+ return "FOO" + value
+
+ def process_result_value(self, value, dialect):
+ if value is None:
+ return None
+ return value + "BAR"
+
+ cls.GoofyType = GoofyType
+
+ Table(
+ "type_cases",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("persons", Integer),
+ Column("full", Boolean),
+ Column("goofy", GoofyType(50)),
+ Column("strval", String(50)),
+ )
+
+ @testing.combinations(
+ (
+ lambda table: (table.c.strval + "hi",),
+ [("str1hi",), ("str2hi",), ("str3hi",)],
+ ),
+ (
+ lambda table: (
+ table.c.persons,
+ table.c.full,
+ table.c.strval + "hi",
+ ),
+ [
+ (5, False, "str1hi"),
+ (6, True, "str2hi"),
+ (7, False, "str3hi"),
+ ],
+ ),
+ (
+ lambda table: (
+ table.c.persons,
+ table.c.strval + "hi",
+ table.c.full,
+ ),
[
- {"insdef": 0, "upddef": None},
- {"insdef": 0, "upddef": None},
- {"insdef": 0, "upddef": None},
- {"insdef": 0, "upddef": None},
- {"insdef": 0, "upddef": None},
- {"insdef": 0, "upddef": None},
+ (5, "str1hi", False),
+ (6, "str2hi", True),
+ (7, "str3hi", False),
+ ],
+ ),
+ (
+ lambda table: (
+ table.c.strval + "hi",
+ table.c.persons,
+ table.c.full,
+ ),
+ [
+ ("str1hi", 5, False),
+ ("str2hi", 6, True),
+ ("str3hi", 7, False),
+ ],
+ ),
+ argnames="testcase, expected_rows",
+ )
+ def test_insert_returning_w_expression(
+ self, connection, testcase, expected_rows
+ ):
+ table = self.tables.type_cases
+
+ exprs = testing.resolve_lambda(testcase, table=table)
+ result = connection.execute(
+ table.insert().returning(*exprs),
+ [
+ {"persons": 5, "full": False, "strval": "str1"},
+ {"persons": 6, "full": True, "strval": "str2"},
+ {"persons": 7, "full": False, "strval": "str3"},
+ ],
+ )
+
+ eq_(result.fetchall(), expected_rows)
+
+ result2 = connection.execute(
+ select(table.c.id, table.c.strval).order_by(table.c.id)
+ )
+ eq_(result2.fetchall(), [(1, "str1"), (2, "str2"), (3, "str3")])
+
+ @testing.fails_if(
+ # Oracle has native executemany() + returning and does not use
+ # insertmanyvalues to achieve this. so test that for
+ # that particular dialect, the exception expected is not raised
+ # in the case that the compiler vetoed insertmanyvalues (
+ # since Oracle's compiler will always veto it)
+ lambda config: not config.db.dialect.use_insertmanyvalues
+ )
+ def test_iie_supported_but_not_this_statement(self, connection):
+ """test the case where INSERT..RETURNING w/ executemany is used,
+        the dialect requires use_insertmanyvalues, but
+ the compiler vetoed the use of insertmanyvalues."""
+
+ t1 = self.tables.type_cases
+
+ with mock.patch.object(
+ testing.db.dialect.statement_compiler,
+ "_insert_stmt_should_use_insertmanyvalues",
+ lambda *arg: False,
+ ):
+ with expect_raises_message(
+ sa_exc.StatementError,
+ r'Statement does not have "insertmanyvalues" enabled, '
+ r"can\'t use INSERT..RETURNING with executemany in this case.",
+ ):
+ connection.execute(
+ t1.insert().returning(t1.c.id, t1.c.goofy, t1.c.full),
+ [
+ {"persons": 5, "full": True},
+ {"persons": 6, "full": True},
+ {"persons": 7, "full": False},
+ ],
+ )
+
+ def test_insert_executemany_type_test(self, connection):
+ t1 = self.tables.type_cases
+ result = connection.execute(
+ t1.insert().returning(t1.c.id, t1.c.goofy, t1.c.full),
+ [
+ {"persons": 5, "full": True, "goofy": "row1", "strval": "s1"},
+ {"persons": 6, "full": True, "goofy": "row2", "strval": "s2"},
+ {"persons": 7, "full": False, "goofy": "row3", "strval": "s3"},
+ {"persons": 8, "full": True, "goofy": "row4", "strval": "s4"},
],
)
eq_(
- result.inserted_primary_key_rows,
- [(10,), (11,), (12,), (13,), (14,), (15,)],
+ result.mappings().all(),
+ [
+ {"id": 1, "goofy": "FOOrow1BAR", "full": True},
+ {"id": 2, "goofy": "FOOrow2BAR", "full": True},
+ {"id": 3, "goofy": "FOOrow3BAR", "full": False},
+ {"id": 4, "goofy": "FOOrow4BAR", "full": True},
+ ],
)
+
+ def test_insert_executemany_default_generators(self, connection):
+ t1 = self.tables.default_cases
+ result = connection.execute(
+ t1.insert().returning(t1.c.id, t1.c.insdef, t1.c.upddef),
+ [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ {"data": "d4"},
+ {"data": "d5"},
+ {"data": "d6"},
+ ],
+ )
+
+ eq_(
+ result.mappings().all(),
+ [
+ {"id": 1, "insdef": 0, "upddef": None},
+ {"id": 2, "insdef": 0, "upddef": None},
+ {"id": 3, "insdef": 0, "upddef": None},
+ {"id": 4, "insdef": 0, "upddef": None},
+ {"id": 5, "insdef": 0, "upddef": None},
+ {"id": 6, "insdef": 0, "upddef": None},
+ ],
+ )
+
+ @testing.combinations(True, False, argnames="update_cols")
+ @testing.requires.provisioned_upsert
+ def test_upsert_data_w_defaults(self, connection, update_cols):
+ t1 = self.tables.default_cases
+
+ new_rows = connection.execute(
+ t1.insert().returning(t1.c.id, t1.c.insdef, t1.c.data),
+ [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ {"data": "d4"},
+ {"data": "d5"},
+ {"data": "d6"},
+ ],
+ ).all()
+
+ eq_(
+ new_rows,
+ [
+ (1, 0, "d1"),
+ (2, 0, "d2"),
+ (3, 0, "d3"),
+ (4, 0, "d4"),
+ (5, 0, "d5"),
+ (6, 0, "d6"),
+ ],
+ )
+
+ stmt = provision.upsert(
+ config,
+ t1,
+ (t1.c.id, t1.c.insdef, t1.c.data),
+ (lambda excluded: {"data": excluded.data + " excluded"})
+ if update_cols
+ else None,
+ )
+
+ upserted_rows = connection.execute(
+ stmt,
+ [
+ {"id": 1, "data": "d1 upserted"},
+ {"id": 4, "data": "d4 upserted"},
+ {"id": 5, "data": "d5 upserted"},
+ {"id": 7, "data": "d7 upserted"},
+ {"id": 8, "data": "d8 upserted"},
+ {"id": 9, "data": "d9 upserted"},
+ ],
+ ).all()
+
+ if update_cols:
+ eq_(
+ upserted_rows,
+ [
+ (1, 0, "d1 upserted excluded"),
+ (4, 0, "d4 upserted excluded"),
+ (5, 0, "d5 upserted excluded"),
+ (7, 1, "d7 upserted"),
+ (8, 1, "d8 upserted"),
+ (9, 1, "d9 upserted"),
+ ],
+ )
+ else:
+ if testing.against("sqlite", "postgresql"):
+ eq_(
+ upserted_rows,
+ [
+ (7, 1, "d7 upserted"),
+ (8, 1, "d8 upserted"),
+ (9, 1, "d9 upserted"),
+ ],
+ )
+ elif testing.against("mariadb"):
+ # mariadb does not seem to have an "empty" upsert,
+ # so the provision.upsert() sets table.c.id to itself.
+ # this means we get all the rows back
+ eq_(
+ upserted_rows,
+ [
+ (1, 0, "d1"),
+ (4, 0, "d4"),
+ (5, 0, "d5"),
+ (7, 1, "d7 upserted"),
+ (8, 1, "d8 upserted"),
+ (9, 1, "d9 upserted"),
+ ],
+ )
+
+ resulting_data = connection.execute(
+ t1.select().order_by(t1.c.id)
+ ).all()
+
+ if update_cols:
+ eq_(
+ resulting_data,
+ [
+ (1, "d1 upserted excluded", 0, None),
+ (2, "d2", 0, None),
+ (3, "d3", 0, None),
+ (4, "d4 upserted excluded", 0, None),
+ (5, "d5 upserted excluded", 0, None),
+ (6, "d6", 0, None),
+ (7, "d7 upserted", 1, None),
+ (8, "d8 upserted", 1, None),
+ (9, "d9 upserted", 1, None),
+ ],
+ )
+ else:
+ eq_(
+ resulting_data,
+ [
+ (1, "d1", 0, None),
+ (2, "d2", 0, None),
+ (3, "d3", 0, None),
+ (4, "d4", 0, None),
+ (5, "d5", 0, None),
+ (6, "d6", 0, None),
+ (7, "d7 upserted", 1, None),
+ (8, "d8 upserted", 1, None),
+ (9, "d9 upserted", 1, None),
+ ],
+ )