diff options
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 251 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 6 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 82 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 83 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/persistence.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/sql/compiler.py | 94 | ||||
| -rw-r--r-- | lib/sqlalchemy/sql/crud.py | 9 | ||||
| -rw-r--r-- | lib/sqlalchemy/sql/dml.py | 36 | ||||
| -rw-r--r-- | lib/sqlalchemy/sql/elements.py | 16 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/assertions.py | 3 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/assertsql.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/requirements.py | 14 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_insert.py | 8 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_sequence.py | 6 |
14 files changed, 373 insertions, 241 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index a9408bcb0..6364838a6 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -61,9 +61,6 @@ psycopg2-specific keyword arguments which are accepted by :ref:`psycopg2_executemany_mode` -* ``use_batch_mode``: this is the previous setting used to affect "executemany" - mode and is now deprecated. - Unix Domain Connections ------------------------ @@ -155,66 +152,82 @@ Modern versions of psycopg2 include a feature known as <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which have been shown in benchmarking to improve psycopg2's executemany() performance, primarily with INSERT statements, by multiple orders of magnitude. -SQLAlchemy allows this extension to be used for all ``executemany()`` style -calls invoked by an :class:`_engine.Engine` -when used with :ref:`multiple parameter -sets <execute_multiple>`, which includes the use of this feature both by the -Core as well as by the ORM for inserts of objects with non-autogenerated -primary key values, by adding the ``executemany_mode`` flag to -:func:`_sa.create_engine`:: +SQLAlchemy internally makes use of these extensions for ``executemany()`` style +calls, which correspond to lists of parameters being passed to +:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter +sets <execute_multiple>`. The ORM also uses this mode internally whenever +possible. + +The two available extensions on the psycopg2 side are the ``execute_values()`` +and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the +``execute_values()`` extension for all qualifying INSERT statements. + +.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode + ``"values_only"`` for ``executemany_mode``, which allows an order of + magnitude performance improvement for INSERT statements, but does not + include "batch" mode for UPDATE and DELETE statements which removes the + ability of ``cursor.rowcount`` to function correctly. + +The use of these extensions is controlled by the ``executemany_mode`` flag +which may be passed to :func:`_sa.create_engine`:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", - executemany_mode='batch') - + executemany_mode='values_plus_batch') -.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded - by a new parameter ``executemany_mode`` which provides support both for - psycopg2's ``execute_batch`` helper as well as the ``execute_values`` - helper. Possible options for ``executemany_mode`` include: -* ``None`` - By default, psycopg2's extensions are not used, and the usual - ``cursor.executemany()`` method is used when invoking batches of statements. +* ``values_only`` - this is the default value. the psycopg2 execute_values() + extension is used for qualifying INSERT statements, which rewrites the INSERT + to include multiple VALUES clauses so that many parameter sets can be + inserted with one statement. -* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies + .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode`` + which is also now the default. + +* ``None`` - No psycopg2 extensions are not used, and the usual + ``cursor.executemany()`` method is used when invoking statements with + multiple parameter sets. + +* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying + INSERT, UPDATE and DELETE statements, so that multiple copies of a SQL query, each one corresponding to a parameter set passed to ``executemany()``, are joined into a single SQL string separated by a - semicolon. This is the same behavior as was provided by the - ``use_batch_mode=True`` flag. - -* ``'values'``- For Core :func:`_expression.insert` - constructs only (including those - emitted by the ORM automatically), the ``psycopg2.extras.execute_values`` - extension is used so that multiple parameter sets are grouped into a single - INSERT statement and joined together with multiple VALUES expressions. This - method requires that the string text of the VALUES clause inside the - INSERT statement is manipulated, so is only supported with a compiled - :func:`_expression.insert` construct where the format is predictable. - For all other - constructs, including plain textual INSERT statements not rendered by the - SQLAlchemy expression language compiler, the - ``psycopg2.extras.execute_batch`` method is used. It is therefore important - to note that **"values" mode implies that "batch" mode is also used for - all statements for which "values" mode does not apply**. - -For both strategies, the ``executemany_batch_page_size`` and -``executemany_values_page_size`` arguments control how many parameter sets -should be represented in each execution. Because "values" mode implies a -fallback down to "batch" mode for non-INSERT statements, there are two -independent page size arguments. For each, the default value of ``None`` means -to use psycopg2's defaults, which at the time of this writing are quite low at -100. For the ``execute_values`` method, a number as high as 10000 may prove -to be performant, whereas for ``execute_batch``, as the number represents -full statements repeated, a number closer to the default of 100 is likely -more appropriate:: + semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount` + attribute will not contain a value for executemany-style executions. + +* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT + statements, ``execute_batch`` is used for UPDATE and DELETE. + When using this mode, the :attr:`_engine.CursorResult.rowcount` + attribute will not contain a value for executemany-style executions against + UPDATE and DELETE statements. + +By "qualifying statements", we mean that the statement being executed +must be a Core :func:`_expression.insert`, :func:`_expression.update` +or :func:`_expression.delete` construct, and not a plain textual SQL +string or one constructed using :func:`_expression.text`. When using the +ORM, all insert/update/delete statements used by the ORM flush process +are qualifying. + +The "page size" for the "values" and "batch" strategies can be affected +by using the ``executemany_batch_page_size`` and +``executemany_values_page_size`` engine parameters. These +control how many parameter sets +should be represented in each execution. The "values" page size defaults +to 1000, which is different that psycopg2's default. The "batch" page +size defaults to 100. These can be affected by passing new values to +:func:`_engine.create_engine`:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", executemany_mode='values', executemany_values_page_size=10000, executemany_batch_page_size=500) +.. versionchanged:: 1.4 + + The default for ``executemany_values_page_size`` is now 1000, up from + 100. .. seealso:: @@ -223,10 +236,6 @@ more appropriate:: object to execute statements in such a way as to make use of the DBAPI ``.executemany()`` method. -.. versionchanged:: 1.3.7 - Added support for - ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is - superseded by the ``executemany_mode`` flag. - .. _psycopg2_unicode: @@ -474,6 +483,7 @@ from ... import exc from ... import processors from ... import types as sqltypes from ... import util +from ...engine import cursor as _cursor from ...util import collections_abc try: @@ -546,18 +556,12 @@ class _PGHStore(HSTORE): class _PGJSON(JSON): def result_processor(self, dialect, coltype): - if dialect._has_native_json: - return None - else: - return super(_PGJSON, self).result_processor(dialect, coltype) + return None class _PGJSONB(JSONB): def result_processor(self, dialect, coltype): - if dialect._has_native_jsonb: - return None - else: - return super(_PGJSONB, self).result_processor(dialect, coltype) + return None class _PGUUID(UUID): @@ -586,6 +590,8 @@ _server_side_id = util.counter() class PGExecutionContext_psycopg2(PGExecutionContext): + _psycopg2_fetched_rows = None + def create_server_side_cursor(self): # use server-side cursors: # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html @@ -593,6 +599,22 @@ class PGExecutionContext_psycopg2(PGExecutionContext): return self._dbapi_connection.cursor(ident) def post_exec(self): + if ( + self._psycopg2_fetched_rows + and self.compiled + and self.compiled.returning + ): + # psycopg2 execute_values will provide for a real cursor where + # cursor.description works correctly. however, it executes the + # INSERT statement multiple times for multiple pages of rows, so + # while this cursor also supports calling .fetchall() directly, in + # order to get the list of all rows inserted across multiple pages, + # we have to retrieve the aggregated list from the execute_values() + # function directly. + strat_cls = _cursor.FullyBufferedCursorFetchStrategy + self.cursor_fetch_strategy = strat_cls( + self.cursor, initial_buffer=self._psycopg2_fetched_rows + ) self._log_notices(self.cursor) def _log_notices(self, cursor): @@ -621,9 +643,13 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): pass -EXECUTEMANY_DEFAULT = util.symbol("executemany_default") -EXECUTEMANY_BATCH = util.symbol("executemany_batch") -EXECUTEMANY_VALUES = util.symbol("executemany_values") +EXECUTEMANY_DEFAULT = util.symbol("executemany_default", canonical=0) +EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1) +EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2) +EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol( + "executemany_values_plus_batch", + canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES, +) class PGDialect_psycopg2(PGDialect): @@ -641,17 +667,7 @@ class PGDialect_psycopg2(PGDialect): preparer = PGIdentifierPreparer_psycopg2 psycopg2_version = (0, 0) - FEATURE_VERSION_MAP = dict( - native_json=(2, 5), - native_jsonb=(2, 5, 4), - sane_multi_rowcount=(2, 0, 9), - array_oid=(2, 4, 3), - hstore_adapter=(2, 4), - ) - - _has_native_hstore = False - _has_native_json = False - _has_native_jsonb = False + _has_native_hstore = True engine_config_types = PGDialect.engine_config_types.union( {"use_native_unicode": util.asbool} @@ -671,13 +687,6 @@ class PGDialect_psycopg2(PGDialect): }, ) - @util.deprecated_params( - use_batch_mode=( - "1.3.7", - "The psycopg2 use_batch_mode flag is superseded by " - "executemany_mode='batch'", - ) - ) def __init__( self, server_side_cursors=False, @@ -685,15 +694,16 @@ class PGDialect_psycopg2(PGDialect): client_encoding=None, use_native_hstore=True, use_native_uuid=True, - executemany_mode=None, - executemany_batch_page_size=None, - executemany_values_page_size=None, - use_batch_mode=None, + executemany_mode="values_only", + executemany_batch_page_size=100, + executemany_values_page_size=1000, **kwargs ): PGDialect.__init__(self, **kwargs) self.server_side_cursors = server_side_cursors self.use_native_unicode = use_native_unicode + if not use_native_hstore: + self._has_native_hstore = False self.use_native_hstore = use_native_hstore self.use_native_uuid = use_native_uuid self.supports_unicode_binds = use_native_unicode @@ -706,12 +716,14 @@ class PGDialect_psycopg2(PGDialect): { EXECUTEMANY_DEFAULT: [None], EXECUTEMANY_BATCH: ["batch"], - EXECUTEMANY_VALUES: ["values"], + EXECUTEMANY_VALUES: ["values_only"], + EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"], }, "executemany_mode", ) - if use_batch_mode: - self.executemany_mode = EXECUTEMANY_BATCH + + if self.executemany_mode & EXECUTEMANY_VALUES: + self.insert_executemany_returning = True self.executemany_batch_page_size = executemany_batch_page_size self.executemany_values_page_size = executemany_values_page_size @@ -723,24 +735,21 @@ class PGDialect_psycopg2(PGDialect): int(x) for x in m.group(1, 2, 3) if x is not None ) + if self.psycopg2_version < (2, 7): + raise ImportError( + "psycopg2 version 2.7 or higher is required." + ) + def initialize(self, connection): super(PGDialect_psycopg2, self).initialize(connection) self._has_native_hstore = ( self.use_native_hstore and self._hstore_oids(connection.connection) is not None ) - self._has_native_json = ( - self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_json"] - ) - self._has_native_jsonb = ( - self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_jsonb"] - ) # http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9 - self.supports_sane_multi_rowcount = ( - self.psycopg2_version - >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"] - and self.executemany_mode is EXECUTEMANY_DEFAULT + self.supports_sane_multi_rowcount = not ( + self.executemany_mode & EXECUTEMANY_BATCH ) @classmethod @@ -830,11 +839,7 @@ class PGDialect_psycopg2(PGDialect): kw = {"oid": oid} if util.py2k: kw["unicode"] = True - if ( - self.psycopg2_version - >= self.FEATURE_VERSION_MAP["array_oid"] - ): - kw["array_oid"] = array_oid + kw["array_oid"] = array_oid extras.register_hstore(conn, **kw) fns.append(on_connect) @@ -842,14 +847,12 @@ class PGDialect_psycopg2(PGDialect): if self.dbapi and self._json_deserializer: def on_connect(conn): - if self._has_native_json: - extras.register_default_json( - conn, loads=self._json_deserializer - ) - if self._has_native_jsonb: - extras.register_default_jsonb( - conn, loads=self._json_deserializer - ) + extras.register_default_json( + conn, loads=self._json_deserializer + ) + extras.register_default_jsonb( + conn, loads=self._json_deserializer + ) fns.append(on_connect) @@ -864,12 +867,8 @@ class PGDialect_psycopg2(PGDialect): return None def do_executemany(self, cursor, statement, parameters, context=None): - if self.executemany_mode is EXECUTEMANY_DEFAULT: - cursor.executemany(statement, parameters) - return - if ( - self.executemany_mode is EXECUTEMANY_VALUES + self.executemany_mode & EXECUTEMANY_VALUES and context and context.isinsert and context.compiled.insert_single_values_expr @@ -893,15 +892,17 @@ class PGDialect_psycopg2(PGDialect): kwargs = {"page_size": self.executemany_values_page_size} else: kwargs = {} - self._psycopg2_extras().execute_values( + xtras = self._psycopg2_extras() + context._psycopg2_fetched_rows = xtras.execute_values( cursor, statement, parameters, template=executemany_values, + fetch=bool(context.compiled.returning), **kwargs ) - else: + elif self.executemany_mode & EXECUTEMANY_BATCH: if self.executemany_batch_page_size: kwargs = {"page_size": self.executemany_batch_page_size} else: @@ -909,15 +910,19 @@ class PGDialect_psycopg2(PGDialect): self._psycopg2_extras().execute_batch( cursor, statement, parameters, **kwargs ) + else: + cursor.executemany(statement, parameters) @util.memoized_instancemethod def _hstore_oids(self, conn): - if self.psycopg2_version >= self.FEATURE_VERSION_MAP["hstore_adapter"]: - extras = self._psycopg2_extras() - oids = extras.HstoreAdapter.get_oids(conn) - if oids is not None and oids[0]: - return oids[0:2] - return None + extras = self._psycopg2_extras() + if hasattr(conn, "connection"): + conn = conn.connection + oids = extras.HstoreAdapter.get_oids(conn) + if oids is not None and oids[0]: + return oids[0:2] + else: + return None def create_connect_args(self, url): opts = url.translate_connect_args(username="user") diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 81c0c9f58..c73f89a2b 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1163,10 +1163,10 @@ class Connection(Connectable): # ensure we don't retain a link to the view object for keys() # which links to the values, which we don't want to cache keys = sorted(distilled_params[0]) - inline = len(distilled_params) > 1 + for_executemany = len(distilled_params) > 1 else: keys = [] - inline = False + for_executemany = False dialect = self.dialect @@ -1182,7 +1182,7 @@ class Connection(Connectable): dialect=dialect, compiled_cache=compiled_cache, column_keys=keys, - inline=inline, + for_executemany=for_executemany, schema_translate_map=schema_translate_map, linting=self.dialect.compiler_linting | compiler.WARN_LINTING, ) diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index abffe0d1f..65cd92e6f 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -1077,7 +1077,7 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): __slots__ = ("_rowbuffer", "alternate_cursor_description") def __init__( - self, dbapi_cursor, alternate_description, initial_buffer=None + self, dbapi_cursor, alternate_description=None, initial_buffer=None ): self.alternate_cursor_description = alternate_description if initial_buffer is not None: @@ -1304,7 +1304,37 @@ class BaseCursorResult(object): self.connection._safe_close_cursor(cursor) self._soft_closed = True - @util.memoized_property + @property + def inserted_primary_key_rows(self): + """Return a list of tuples, each containing the primary key for each row + just inserted. + + Usually, this method will return at most a list with a single + entry which is the same row one would get back from + :attr:`_engine.CursorResult.inserted_primary_key`. To support + "executemany with INSERT" mode, multiple rows can be part of the + list returned. + + .. versionadded:: 1.4 + + """ + if not self.context.compiled: + raise exc.InvalidRequestError( + "Statement is not a compiled " "expression construct." + ) + elif not self.context.isinsert: + raise exc.InvalidRequestError( + "Statement is not an insert() " "expression construct." + ) + elif self.context._is_explicit_returning: + raise exc.InvalidRequestError( + "Can't call inserted_primary_key " + "when returning() " + "is used." + ) + return self.context.inserted_primary_key_rows + + @property def inserted_primary_key(self): """Return the primary key for the row just inserted. @@ -1331,22 +1361,18 @@ class BaseCursorResult(object): """ - if not self.context.compiled: + if self.context.executemany: raise exc.InvalidRequestError( - "Statement is not a compiled " "expression construct." - ) - elif not self.context.isinsert: - raise exc.InvalidRequestError( - "Statement is not an insert() " "expression construct." - ) - elif self.context._is_explicit_returning: - raise exc.InvalidRequestError( - "Can't call inserted_primary_key " - "when returning() " - "is used." + "This statement was an executemany call; if primary key " + "returning is supported, please " + "use .inserted_primary_key_rows." ) - return self.context.inserted_primary_key + ikp = self.inserted_primary_key_rows + if ikp: + return ikp[0] + else: + return None def last_updated_params(self): """Return the collection of updated parameters from this @@ -1393,6 +1419,19 @@ class BaseCursorResult(object): return self.context.compiled_parameters[0] @property + def returned_defaults_rows(self): + """Return a list of rows each containing the values of default + columns that were fetched using + the :meth:`.ValuesBase.return_defaults` feature. + + The return value is a list of :class:`.Row` objects. + + .. versionadded:: 1.4 + + """ + return self.context.returned_default_rows + + @property def returned_defaults(self): """Return the values of default columns that were fetched using the :meth:`.ValuesBase.return_defaults` feature. @@ -1408,7 +1447,18 @@ class BaseCursorResult(object): :meth:`.ValuesBase.return_defaults` """ - return self.context.returned_defaults + + if self.context.executemany: + raise exc.InvalidRequestError( + "This statement was an executemany call; if return defaults " + "is supported, please use .returned_defaults_rows." + ) + + rows = self.context.returned_default_rows + if rows: + return rows[0] + else: + return None def lastrow_has_defaults(self): """Return ``lastrow_has_defaults()`` from the underlying diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 1a8dbb4cd..414a1a9ab 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -68,6 +68,7 @@ class DefaultDialect(interfaces.Dialect): postfetch_lastrowid = True implicit_returning = False full_returning = False + insert_executemany_returning = False cte_follows_insert = False @@ -705,7 +706,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): compiled = None statement = None result_column_struct = None - returned_defaults = None + returned_default_rows = None execution_options = util.immutabledict() cursor_fetch_strategy = _cursor._DEFAULT_FETCH @@ -1323,12 +1324,14 @@ class DefaultExecutionContext(interfaces.ExecutionContext): if self.isinsert: if self._is_implicit_returning: - row = result.fetchone() - self.returned_defaults = row - self._setup_ins_pk_from_implicit_returning(row) + rows = result.all() - # test that it has a cursor metadata that is accurate. - # the first row will have been fetched and current assumptions + self.returned_default_rows = rows + + self._setup_ins_pk_from_implicit_returning(result, rows) + + # test that it has a cursor metadata that is accurate. the + # first row will have been fetched and current assumptions # are that the result has only one row, until executemany() # support is added here. assert result._metadata.returns_rows @@ -1344,7 +1347,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): elif self.isupdate and self._is_implicit_returning: row = result.fetchone() - self.returned_defaults = row + self.returned_default_rows = [row] result._soft_close() # test that it has a cursor metadata that is accurate. @@ -1360,61 +1363,33 @@ class DefaultExecutionContext(interfaces.ExecutionContext): return result def _setup_ins_pk_from_lastrowid(self): - key_getter = self.compiled._key_getters_for_crud_column[2] - table = self.compiled.statement.table - compiled_params = self.compiled_parameters[0] + + getter = self.compiled._inserted_primary_key_from_lastrowid_getter lastrowid = self.get_lastrowid() - if lastrowid is not None: - autoinc_col = table._autoincrement_column - if autoinc_col is not None: - # apply type post processors to the lastrowid - proc = autoinc_col.type._cached_result_processor( - self.dialect, None - ) - if proc is not None: - lastrowid = proc(lastrowid) - self.inserted_primary_key = [ - lastrowid - if c is autoinc_col - else compiled_params.get(key_getter(c), None) - for c in table.primary_key - ] - else: - # don't have a usable lastrowid, so - # do the same as _setup_ins_pk_from_empty - self.inserted_primary_key = [ - compiled_params.get(key_getter(c), None) - for c in table.primary_key - ] + self.inserted_primary_key_rows = [ + getter(lastrowid, self.compiled_parameters[0]) + ] def _setup_ins_pk_from_empty(self): - key_getter = self.compiled._key_getters_for_crud_column[2] - table = self.compiled.statement.table - compiled_params = self.compiled_parameters[0] - self.inserted_primary_key = [ - compiled_params.get(key_getter(c), None) for c in table.primary_key + + getter = self.compiled._inserted_primary_key_from_lastrowid_getter + + self.inserted_primary_key_rows = [ + getter(None, self.compiled_parameters[0]) ] - def _setup_ins_pk_from_implicit_returning(self, row): - if row is None: - self.inserted_primary_key = None + def _setup_ins_pk_from_implicit_returning(self, result, rows): + + if not rows: + self.inserted_primary_key_rows = [] return - key_getter = self.compiled._key_getters_for_crud_column[2] - table = self.compiled.statement.table - compiled_params = self.compiled_parameters[0] - - # TODO: why are we using keyed index here? can't we get the ints? - # can compiler build up the structure here as far as what was - # explicit and what comes back in returning? - row_mapping = row._mapping - self.inserted_primary_key = [ - row_mapping[col] if value is None else value - for col, value in [ - (col, compiled_params.get(key_getter(col), None)) - for col in table.primary_key - ] + getter = self.compiled._inserted_primary_key_from_returning_getter + compiled_params = self.compiled_parameters + + self.inserted_primary_key_rows = [ + getter(row, param) for row, param in zip(rows, compiled_params) ] def lastrow_has_defaults(self): diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py index bd8efe77f..88524dc49 100644 --- a/lib/sqlalchemy/orm/persistence.py +++ b/lib/sqlalchemy/orm/persistence.py @@ -1146,7 +1146,7 @@ def _emit_insert_statements( statement, params ) - primary_key = result.context.inserted_primary_key + primary_key = result.inserted_primary_key if primary_key is not None: # set primary key attributes for pk, col in zip( @@ -1527,7 +1527,7 @@ def _postfetch( load_evt_attrs = [] if returning_cols: - row = result.context.returned_defaults + row = result.returned_defaults if row is not None: for row_value, col in zip(row, returning_cols): # pk cols returned from insert are handled diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 61178291a..e09e60c2c 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -676,13 +676,15 @@ class SQLCompiler(Compiled): """ + inline = False + def __init__( self, dialect, statement, cache_key=None, column_keys=None, - inline=False, + for_executemany=False, linting=NO_LINTING, **kwargs ): @@ -695,8 +697,13 @@ class SQLCompiler(Compiled): :param column_keys: a list of column names to be compiled into an INSERT or UPDATE statement. - :param inline: whether to generate INSERT statements as "inline", e.g. - not formatted to return any generated defaults + :param for_executemany: whether INSERT / UPDATE statements should + expect that they are to be invoked in an "executemany" style, + which may impact how the statement will be expected to return the + values of defaults and autoincrement / sequences and similar. + Depending on the backend and driver in use, support for retreiving + these values may be disabled which means SQL expressions may + be rendered inline, RETURNING may not be rendered, etc. :param kwargs: additional keyword arguments to be consumed by the superclass. @@ -709,9 +716,10 @@ class SQLCompiler(Compiled): if cache_key: self._cache_key_bind_match = {b: b for b in cache_key[1]} - # compile INSERT/UPDATE defaults/sequences inlined (no pre- - # execute) - self.inline = inline or getattr(statement, "_inline", False) + # compile INSERT/UPDATE defaults/sequences to expect executemany + # style execution, which may mean no pre-execute of defaults, + # or no RETURNING + self.for_executemany = for_executemany self.linting = linting @@ -755,10 +763,21 @@ class SQLCompiler(Compiled): Compiled.__init__(self, dialect, statement, **kwargs) - if ( - self.isinsert or self.isupdate or self.isdelete - ) and statement._returning: - self.returning = statement._returning + if self.isinsert or self.isupdate or self.isdelete: + if statement._returning: + self.returning = statement._returning + + if self.isinsert or self.isupdate: + if statement._inline: + self.inline = True + elif self.for_executemany and ( + not self.isinsert + or ( + self.dialect.insert_executemany_returning + and statement._return_defaults + ) + ): + self.inline = True if self.positional and self._numeric_binds: self._apply_numbered_params() @@ -1088,6 +1107,61 @@ class SQLCompiler(Compiled): self._result_columns ) + @util.memoized_property + def _inserted_primary_key_from_lastrowid_getter(self): + key_getter = self._key_getters_for_crud_column[2] + table = self.statement.table + + getters = [ + (operator.methodcaller("get", key_getter(col), None), col) + for col in table.primary_key + ] + + autoinc_col = table._autoincrement_column + if autoinc_col is not None: + # apply type post processors to the lastrowid + proc = autoinc_col.type._cached_result_processor( + self.dialect, None + ) + else: + proc = None + + def get(lastrowid, parameters): + if proc is not None: + lastrowid = proc(lastrowid) + + if lastrowid is None: + return tuple(getter(parameters) for getter, col in getters) + else: + return tuple( + lastrowid if col is autoinc_col else getter(parameters) + for getter, col in getters + ) + + return get + + @util.memoized_property + def _inserted_primary_key_from_returning_getter(self): + key_getter = self._key_getters_for_crud_column[2] + table = self.statement.table + + ret = {col: idx for idx, col in enumerate(self.returning)} + + getters = [ + (operator.itemgetter(ret[col]), True) + if col in ret + else (operator.methodcaller("get", key_getter(col), None), False) + for col in table.primary_key + ] + + def get(row, parameters): + return tuple( + getter(row) if use_row else getter(parameters) + for getter, use_row in getters + ) + + return get + def default_from(self): """Called when a SELECT statement has no froms, and no FROM clause is to be appended. diff --git a/lib/sqlalchemy/sql/crud.py b/lib/sqlalchemy/sql/crud.py index 625183db3..c80d95a2c 100644 --- a/lib/sqlalchemy/sql/crud.py +++ b/lib/sqlalchemy/sql/crud.py @@ -781,7 +781,14 @@ def _get_returning_modifiers(compiler, stmt, compile_state): need_pks = ( compile_state.isinsert - and not compiler.inline + and not stmt._inline + and ( + not compiler.for_executemany + or ( + compiler.dialect.insert_executemany_returning + and stmt._return_defaults + ) + ) and not stmt._returning and not compile_state._has_multi_parameters ) diff --git a/lib/sqlalchemy/sql/dml.py b/lib/sqlalchemy/sql/dml.py index 50b2a935a..4e8430450 100644 --- a/lib/sqlalchemy/sql/dml.py +++ b/lib/sqlalchemy/sql/dml.py @@ -693,18 +693,18 @@ class ValuesBase(UpdateBase): This method differs from :meth:`.UpdateBase.returning` in these ways: - 1. :meth:`.ValuesBase.return_defaults` is only intended for use with - an INSERT or an UPDATE statement that matches exactly one row. - While the RETURNING construct in the general sense supports - multiple rows for a multi-row UPDATE or DELETE statement, or for - special cases of INSERT that return multiple rows (e.g. INSERT from - SELECT, multi-valued VALUES clause), + 1. :meth:`.ValuesBase.return_defaults` is only intended for use with an + INSERT or an UPDATE statement that matches exactly one row per + parameter set. While the RETURNING construct in the general sense + supports multiple rows for a multi-row UPDATE or DELETE statement, + or for special cases of INSERT that return multiple rows (e.g. + INSERT from SELECT, multi-valued VALUES clause), :meth:`.ValuesBase.return_defaults` is intended only for an - "ORM-style" single-row INSERT/UPDATE statement. The row returned - by the statement is also consumed implicitly when + "ORM-style" single-row INSERT/UPDATE statement. The row + returned by the statement is also consumed implicitly when :meth:`.ValuesBase.return_defaults` is used. By contrast, - :meth:`.UpdateBase.returning` leaves the RETURNING result-set - intact with a collection of any number of rows. + :meth:`.UpdateBase.returning` leaves the RETURNING result-set intact + with a collection of any number of rows. 2. It is compatible with the existing logic to fetch auto-generated primary key values, also known as "implicit returning". Backends @@ -718,6 +718,16 @@ class ValuesBase(UpdateBase): an exception. The return value of :attr:`_engine.CursorResult.returned_defaults` will be ``None`` + 4. An INSERT statement invoked with executemany() is supported if the + backend database driver supports the + ``insert_executemany_returning`` feature, currently this includes + PostgreSQL with psycopg2. When executemany is used, the + :attr:`_engine.CursorResult.returned_defaults_rows` and + :attr:`_engine.CursorResult.inserted_primary_key_rows` accessors + will return the inserted defaults and primary keys. + + .. versionadded:: 1.4 + :meth:`.ValuesBase.return_defaults` is used by the ORM to provide an efficient implementation for the ``eager_defaults`` feature of :func:`.mapper`. @@ -735,6 +745,12 @@ class ValuesBase(UpdateBase): :attr:`_engine.CursorResult.returned_defaults` + :attr:`_engine.CursorResult.returned_defaults_rows` + + :attr:`_engine.CursorResult.inserted_primary_key` + + :attr:`_engine.CursorResult.inserted_primary_key_rows` + """ self._return_defaults = cols or True diff --git a/lib/sqlalchemy/sql/elements.py b/lib/sqlalchemy/sql/elements.py index 60c816ee6..5cb411b85 100644 --- a/lib/sqlalchemy/sql/elements.py +++ b/lib/sqlalchemy/sql/elements.py @@ -482,14 +482,6 @@ class ClauseElement( 's bound engine, if any. - :param inline: Used for INSERT statements, for a dialect which does - not support inline retrieval of newly generated primary key - columns, will force the expression used to create the new primary - key value to be rendered inline within the INSERT statement's - VALUES clause. This typically refers to Sequence execution but may - also refer to any server-side default generation function - associated with a primary key `Column`. - :param compile_kwargs: optional dictionary of additional parameters that will be passed through to the compiler within all "visit" methods. This allows any custom flag to be passed through to @@ -528,7 +520,7 @@ class ClauseElement( dialect, compiled_cache=None, column_keys=None, - inline=False, + for_executemany=False, schema_translate_map=None, **kw ): @@ -546,7 +538,7 @@ class ClauseElement( cache_key, tuple(column_keys), bool(schema_translate_map), - inline, + for_executemany, ) compiled_sql = compiled_cache.get(key) @@ -555,7 +547,7 @@ class ClauseElement( dialect, cache_key=elem_cache_key, column_keys=column_keys, - inline=inline, + for_executemany=for_executemany, schema_translate_map=schema_translate_map, **kw ) @@ -568,7 +560,7 @@ class ClauseElement( dialect, cache_key=elem_cache_key, column_keys=column_keys, - inline=inline, + for_executemany=for_executemany, schema_translate_map=schema_translate_map, **kw ) diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 54da06a3d..1ea366dac 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -355,7 +355,6 @@ class AssertsCompiledSQL(object): schema_translate_map=None, render_schema_translate=False, default_schema_name=None, - inline_flag=None, ): if use_default_dialect: dialect = default.DefaultDialect() @@ -451,8 +450,6 @@ class AssertsCompiledSQL(object): }, check_post_param, ) - if inline_flag is not None: - eq_(c.inline, inline_flag) class ComparesTables(object): diff --git a/lib/sqlalchemy/testing/assertsql.py b/lib/sqlalchemy/testing/assertsql.py index 48cbb4694..ef324635e 100644 --- a/lib/sqlalchemy/testing/assertsql.py +++ b/lib/sqlalchemy/testing/assertsql.py @@ -106,7 +106,7 @@ class CompiledSQL(SQLMatchRule): compiled = execute_observed.clauseelement.compile( dialect=compare_dialect, column_keys=context.compiled.column_keys, - inline=context.compiled.inline, + for_executemany=context.compiled.for_executemany, schema_translate_map=map_, ) _received_statement = re.sub(r"[\n\t]", "", util.text_type(compiled)) diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index 163276ca9..3e20f8681 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -324,6 +324,20 @@ class SuiteRequirements(Requirements): ) @property + def insert_executemany_returning(self): + """target platform supports RETURNING when INSERT is used with + executemany(), e.g. multiple parameter sets, indicating + as many rows come back as do parameter sets were passed. + + """ + + return exclusions.only_if( + lambda config: config.db.dialect.insert_executemany_returning, + "%(database)s %(does_support)s 'RETURNING of " + "multiple rows with INSERT executemany'", + ) + + @property def returning(self): """target platform supports RETURNING for at least one row. diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py index 65741941f..5b8c343c4 100644 --- a/lib/sqlalchemy/testing/suite/test_insert.py +++ b/lib/sqlalchemy/testing/suite/test_insert.py @@ -56,7 +56,7 @@ class LastrowidTest(fixtures.TablesTest): self.tables.autoinc_pk.insert(), data="some data" ) pk = connection.scalar(select([self.tables.autoinc_pk.c.id])) - eq_(r.inserted_primary_key, [pk]) + eq_(r.inserted_primary_key, (pk,)) @requirements.dbapi_lastrowid def test_native_lastrowid_autoinc(self, connection): @@ -184,7 +184,7 @@ class InsertBehaviorTest(fixtures.TablesTest): ) ) - eq_(result.inserted_primary_key, [None]) + eq_(result.inserted_primary_key, (None,)) result = connection.execute( select([dest_table.c.data]).order_by(dest_table.c.data) @@ -204,7 +204,7 @@ class InsertBehaviorTest(fixtures.TablesTest): ), ) ) - eq_(result.inserted_primary_key, [None]) + eq_(result.inserted_primary_key, (None,)) result = connection.execute( select([dest_table.c.data]).order_by(dest_table.c.data) @@ -329,7 +329,7 @@ class ReturningTest(fixtures.TablesTest): self.tables.autoinc_pk.insert(), data="some data" ) pk = connection.scalar(select([self.tables.autoinc_pk.c.id])) - eq_(r.inserted_primary_key, [pk]) + eq_(r.inserted_primary_key, (pk,)) __all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest") diff --git a/lib/sqlalchemy/testing/suite/test_sequence.py b/lib/sqlalchemy/testing/suite/test_sequence.py index 55e8e8406..5a1876bc5 100644 --- a/lib/sqlalchemy/testing/suite/test_sequence.py +++ b/lib/sqlalchemy/testing/suite/test_sequence.py @@ -46,7 +46,9 @@ class SequenceTest(fixtures.TablesTest): def test_insert_lastrowid(self, connection): r = connection.execute(self.tables.seq_pk.insert(), data="some data") - eq_(r.inserted_primary_key, [testing.db.dialect.default_sequence_base]) + eq_( + r.inserted_primary_key, (testing.db.dialect.default_sequence_base,) + ) def test_nextval_direct(self, connection): r = connection.execute(self.tables.seq_pk.c.id.default) @@ -57,7 +59,7 @@ class SequenceTest(fixtures.TablesTest): r = connection.execute( self.tables.seq_opt_pk.insert(), data="some data" ) - eq_(r.inserted_primary_key, [1]) + eq_(r.inserted_primary_key, (1,)) def _assert_round_trip(self, table, conn): row = conn.execute(table.select()).first() |
