summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/postgresql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-07-18 15:08:37 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:15:32 -0400
commit2bcc97da424eef7db9a5d02f81d02344925415ee (patch)
tree13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /lib/sqlalchemy/dialects/postgresql
parent332188e5680574368001ded52eb0a9d259ecdef5 (diff)
downloadsqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends when RETURNING is used, except for Oracle that doesn't need it, and on psycopg2 and mssql+pyodbc it is used for all INSERT statements, not just those that use RETURNING. third party dialects would need to opt in to the new feature by setting use_insertmanyvalues to True. Also adds dialect-level guards against using returning with executemany where we dont have an implementation to suit it. execute single w/ returning still defers to the server without us checking. Fixes: #6047 Fixes: #7907 Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py2
-rw-r--r--lib/sqlalchemy/dialects/postgresql/base.py21
-rw-r--r--lib/sqlalchemy/dialects/postgresql/provision.py18
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py201
4 files changed, 82 insertions, 160 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index c953d3447..96bac59d9 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -418,7 +418,6 @@ class AsyncAdapt_asyncpg_cursor:
"description",
"arraysize",
"rowcount",
- "_inputsizes",
"_cursor",
"_invalidate_schema_cache_asof",
)
@@ -433,7 +432,6 @@ class AsyncAdapt_asyncpg_cursor:
self.description = None
self.arraysize = 1
self.rowcount = -1
- self._inputsizes = None
self._invalidate_schema_cache_asof = 0
def close(self):
diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py
index 18a7c0a86..3e43d601f 100644
--- a/lib/sqlalchemy/dialects/postgresql/base.py
+++ b/lib/sqlalchemy/dialects/postgresql/base.py
@@ -1330,7 +1330,6 @@ from typing import List
from typing import Optional
from . import array as _array
-from . import dml
from . import hstore as _hstore
from . import json as _json
from . import pg_catalog
@@ -1850,24 +1849,6 @@ class PGCompiler(compiler.SQLCompiler):
return target_text
- @util.memoized_property
- def _is_safe_for_fast_insert_values_helper(self):
- # don't allow fast executemany if _post_values_clause is
- # present and is not an OnConflictDoNothing. what this means
- # concretely is that the
- # "fast insert executemany helper" won't be used, in other
- # words we won't convert "executemany()" of many parameter
- # sets into a single INSERT with many elements in VALUES.
- # We can't apply that optimization safely if for example the
- # statement includes a clause like "ON CONFLICT DO UPDATE"
-
- return self.insert_single_values_expr is not None and (
- self.statement._post_values_clause is None
- or isinstance(
- self.statement._post_values_clause, dml.OnConflictDoNothing
- )
- )
-
def visit_on_conflict_do_nothing(self, on_conflict, **kw):
target_text = self._on_conflict_target(on_conflict, **kw)
@@ -2804,6 +2785,7 @@ class PGDialect(default.DefaultDialect):
sequences_optional = True
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False
+ use_insertmanyvalues = True
supports_comments = True
supports_constraint_comments = True
@@ -2813,6 +2795,7 @@ class PGDialect(default.DefaultDialect):
supports_empty_insert = False
supports_multivalues_insert = True
+
supports_identity_columns = True
default_paramstyle = "pyformat"
diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py
index 0d17f28e0..8dd8a4995 100644
--- a/lib/sqlalchemy/dialects/postgresql/provision.py
+++ b/lib/sqlalchemy/dialects/postgresql/provision.py
@@ -14,6 +14,7 @@ from ...testing.provision import log
from ...testing.provision import prepare_for_drop_tables
from ...testing.provision import set_default_schema_on_connection
from ...testing.provision import temp_table_keyword_args
+from ...testing.provision import upsert
@create_db.for_db("postgresql")
@@ -125,3 +126,20 @@ def prepare_for_drop_tables(config, connection):
"idle in transaction: %s"
% ("; ".join(row._mapping["query"] for row in rows))
)
+
+
+@upsert.for_db("postgresql")
+def _upsert(cfg, table, returning, set_lambda=None):
+ from sqlalchemy.dialects.postgresql import insert
+
+ stmt = insert(table)
+
+ if set_lambda:
+ stmt = stmt.on_conflict_do_update(
+ index_elements=table.primary_key, set_=set_lambda(stmt.excluded)
+ )
+ else:
+ stmt = stmt.on_conflict_do_nothing()
+
+ stmt = stmt.returning(*returning)
+ return stmt
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index a01f20e99..350f4b616 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -230,25 +230,23 @@ Modern versions of psycopg2 include a feature known as
`Fast Execution Helpers \
<https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
have been shown in benchmarking to improve psycopg2's executemany()
-performance, primarily with INSERT statements, by multiple orders of magnitude.
-SQLAlchemy internally makes use of these extensions for ``executemany()`` style
-calls, which correspond to lists of parameters being passed to
-:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
-sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever
-possible.
-
-The two available extensions on the psycopg2 side are the ``execute_values()``
-and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
-``execute_values()`` extension for all qualifying INSERT statements.
-
-.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
- ``"values_only"`` for ``executemany_mode``, which allows an order of
- magnitude performance improvement for INSERT statements, but does not
- include "batch" mode for UPDATE and DELETE statements which removes the
- ability of ``cursor.rowcount`` to function correctly.
-
-The use of these extensions is controlled by the ``executemany_mode`` flag
-which may be passed to :func:`_sa.create_engine`::
+performance, primarily with INSERT statements, by at least
+an order of magnitude.
+
+SQLAlchemy implements a native form of the "insert many values"
+handler that will rewrite a single-row INSERT statement to accommodate for
+many values at once within an extended VALUES clause; this handler is
+equivalent to psycopg2's ``execute_values()`` handler; an overview of this
+feature and its configuration are at :ref:`engine_insertmanyvalues`.
+
+.. versionadded:: 2.0 Replaced psycopg2's ``execute_values()`` fast execution
+ helper with a native SQLAlchemy mechanism referred towards as
+ :ref:`insertmanyvalues <engine_insertmanyvalues>`.
+
+The psycopg2 dialect retains the ability to use the psycopg2-specific
+``execute_batch()`` feature, although it is not expected that this is a widely
+used feature. The use of this extension may be enabled using the
+``executemany_mode`` flag which may be passed to :func:`_sa.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
@@ -257,59 +255,55 @@ which may be passed to :func:`_sa.create_engine`::
Possible options for ``executemany_mode`` include:
-* ``values_only`` - this is the default value. the psycopg2 execute_values()
- extension is used for qualifying INSERT statements, which rewrites the INSERT
- to include multiple VALUES clauses so that many parameter sets can be
- inserted with one statement.
-
- .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
- which is also now the default.
-
-* ``None`` - No psycopg2 extensions are not used, and the usual
- ``cursor.executemany()`` method is used when invoking statements with
- multiple parameter sets.
-
-* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
- INSERT, UPDATE and DELETE statements, so that multiple copies
- of a SQL query, each one corresponding to a parameter set passed to
- ``executemany()``, are joined into a single SQL string separated by a
- semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
- attribute will not contain a value for executemany-style executions.
-
-* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT
- statements, ``execute_batch`` is used for UPDATE and DELETE.
- When using this mode, the :attr:`_engine.CursorResult.rowcount`
+* ``values_only`` - this is the default value. SQLAlchemy's native
+ :ref:`insertmanyvalues <engine_insertmanyvalues>` handler is used for qualifying
+ INSERT statements, assuming
+ :paramref:`_sa.create_engine.use_insertmanyvalues` is left at
+ its default value of ``True``. This handler rewrites simple
+ INSERT statements to include multiple VALUES clauses so that many
+ parameter sets can be inserted with one statement.
+
+* ``'values_plus_batch'``- SQLAlchemy's native
+ :ref:`insertmanyvalues <engine_insertmanyvalues>` handler is used for qualifying
+ INSERT statements, assuming
+ :paramref:`_sa.create_engine.use_insertmanyvalues` is left at its default
+ value of ``True``. Then, psycopg2's ``execute_batch()`` handler is used for
+ qualifying UPDATE and DELETE statements when executed with multiple parameter
+ sets. When using this mode, the :attr:`_engine.CursorResult.rowcount`
attribute will not contain a value for executemany-style executions against
UPDATE and DELETE statements.
-By "qualifying statements", we mean that the statement being executed
-must be a Core :func:`_expression.insert`, :func:`_expression.update`
-or :func:`_expression.delete` construct, and not a plain textual SQL
-string or one constructed using :func:`_expression.text`. When using the
-ORM, all insert/update/delete statements used by the ORM flush process
+.. versionchanged:: 2.0 Removed the ``'batch'`` and ``'None'`` options
+ from psycopg2 ``executemany_mode``. Control over batching for INSERT
+ statements is now configured via the
+ :paramref:`_sa.create_engine.use_insertmanyvalues` engine-level parameter.
+
+The term "qualifying statements" refers to the statement being executed
+being a Core :func:`_expression.insert`, :func:`_expression.update`
+or :func:`_expression.delete` construct, and **not** a plain textual SQL
+string or one constructed using :func:`_expression.text`. It also may **not** be
+a special "extension" statement such as an "ON CONFLICT" "upsert" statement.
+When using the ORM, all insert/update/delete statements used by the ORM flush process
are qualifying.
-The "page size" for the "values" and "batch" strategies can be affected
-by using the ``executemany_batch_page_size`` and
-``executemany_values_page_size`` engine parameters. These
-control how many parameter sets
-should be represented in each execution. The "values" page size defaults
-to 1000, which is different that psycopg2's default. The "batch" page
-size defaults to 100. These can be affected by passing new values to
-:func:`_engine.create_engine`::
+The "page size" for the psycopg2 "batch" strategy can be affected
+by using the ``executemany_batch_page_size`` parameter, which defaults to
+100.
+
+For the "insertmanyvalues" feature, the page size can be controlled using the
+:paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter,
+which defaults to 1000. An example of modifying both parameters
+is below::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
- executemany_mode='values',
- executemany_values_page_size=10000, executemany_batch_page_size=500)
-
-.. versionchanged:: 1.4
-
- The default for ``executemany_values_page_size`` is now 1000, up from
- 100.
+ executemany_mode='values_plus_batch',
+ insertmanyvalues_page_size=5000, executemany_batch_page_size=500)
.. seealso::
+ :ref:`engine_insertmanyvalues` - background on "insertmanyvalues"
+
:ref:`tutorial_multiple_parameters` - General information on using the
:class:`_engine.Connection`
object to execute statements in such a way as to make
@@ -484,13 +478,11 @@ from typing import cast
from . import ranges
from ._psycopg_common import _PGDialect_common_psycopg
from ._psycopg_common import _PGExecutionContext_common_psycopg
-from .base import PGCompiler
from .base import PGIdentifierPreparer
from .json import JSON
from .json import JSONB
from ... import types as sqltypes
from ... import util
-from ...engine import cursor as _cursor
from ...util import FastIntFlag
from ...util import parse_user_argument_for_enum
@@ -561,22 +553,6 @@ class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg):
_psycopg2_fetched_rows = None
def post_exec(self):
- if (
- self._psycopg2_fetched_rows
- and self.compiled
- and self.compiled.effective_returning
- ):
- # psycopg2 execute_values will provide for a real cursor where
- # cursor.description works correctly. however, it executes the
- # INSERT statement multiple times for multiple pages of rows, so
- # while this cursor also supports calling .fetchall() directly, in
- # order to get the list of all rows inserted across multiple pages,
- # we have to retrieve the aggregated list from the execute_values()
- # function directly.
- strat_cls = _cursor.FullyBufferedCursorFetchStrategy
- self.cursor_fetch_strategy = strat_cls(
- self.cursor, initial_buffer=self._psycopg2_fetched_rows
- )
self._log_notices(self.cursor)
def _log_notices(self, cursor):
@@ -597,24 +573,16 @@ class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg):
cursor.connection.notices[:] = []
-class PGCompiler_psycopg2(PGCompiler):
- pass
-
-
class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
pass
class ExecutemanyMode(FastIntFlag):
- EXECUTEMANY_PLAIN = 0
- EXECUTEMANY_BATCH = 1
- EXECUTEMANY_VALUES = 2
- EXECUTEMANY_VALUES_PLUS_BATCH = EXECUTEMANY_BATCH | EXECUTEMANY_VALUES
+ EXECUTEMANY_VALUES = 0
+ EXECUTEMANY_VALUES_PLUS_BATCH = 1
(
- EXECUTEMANY_PLAIN,
- EXECUTEMANY_BATCH,
EXECUTEMANY_VALUES,
EXECUTEMANY_VALUES_PLUS_BATCH,
) = tuple(ExecutemanyMode)
@@ -630,9 +598,9 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
# set to true based on psycopg2 version
supports_sane_multi_rowcount = False
execution_ctx_cls = PGExecutionContext_psycopg2
- statement_compiler = PGCompiler_psycopg2
preparer = PGIdentifierPreparer_psycopg2
psycopg2_version = (0, 0)
+ use_insertmanyvalues_wo_returning = True
_has_native_hstore = True
@@ -655,7 +623,6 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
self,
executemany_mode="values_only",
executemany_batch_page_size=100,
- executemany_values_page_size=1000,
**kwargs,
):
_PGDialect_common_psycopg.__init__(self, **kwargs)
@@ -665,19 +632,13 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
self.executemany_mode = parse_user_argument_for_enum(
executemany_mode,
{
- EXECUTEMANY_PLAIN: [None],
- EXECUTEMANY_BATCH: ["batch"],
EXECUTEMANY_VALUES: ["values_only"],
- EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
+ EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch"],
},
"executemany_mode",
)
- if self.executemany_mode & EXECUTEMANY_VALUES:
- self.insert_executemany_returning = True
-
self.executemany_batch_page_size = executemany_batch_page_size
- self.executemany_values_page_size = executemany_values_page_size
if self.dbapi and hasattr(self.dbapi, "__version__"):
m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
@@ -699,14 +660,8 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
is not None
)
- # PGDialect.initialize() checks server version for <= 8.2 and sets
- # this flag to False if so
- if not self.insert_returning:
- self.insert_executemany_returning = False
- self.executemany_mode = EXECUTEMANY_PLAIN
-
- self.supports_sane_multi_rowcount = not (
- self.executemany_mode & EXECUTEMANY_BATCH
+ self.supports_sane_multi_rowcount = (
+ self.executemany_mode is not EXECUTEMANY_VALUES_PLUS_BATCH
)
@classmethod
@@ -806,39 +761,7 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
return None
def do_executemany(self, cursor, statement, parameters, context=None):
- if (
- self.executemany_mode & EXECUTEMANY_VALUES
- and context
- and context.isinsert
- and context.compiled._is_safe_for_fast_insert_values_helper
- ):
- executemany_values = (
- "(%s)" % context.compiled.insert_single_values_expr
- )
-
- # guard for statement that was altered via event hook or similar
- if executemany_values not in statement:
- executemany_values = None
- else:
- executemany_values = None
-
- if executemany_values:
- statement = statement.replace(executemany_values, "%s")
- if self.executemany_values_page_size:
- kwargs = {"page_size": self.executemany_values_page_size}
- else:
- kwargs = {}
- xtras = self._psycopg2_extras
- context._psycopg2_fetched_rows = xtras.execute_values(
- cursor,
- statement,
- parameters,
- template=executemany_values,
- fetch=bool(context.compiled.effective_returning),
- **kwargs,
- )
-
- elif self.executemany_mode & EXECUTEMANY_BATCH:
+ if self.executemany_mode is EXECUTEMANY_VALUES_PLUS_BATCH:
if self.executemany_batch_page_size:
kwargs = {"page_size": self.executemany_batch_page_size}
else: