-rw-r--r--  doc/build/changelog/changelog_10.rst      |   11
-rw-r--r--  doc/build/changelog/changelog_11.rst      |   51
-rw-r--r--  doc/build/changelog/migration_11.rst      |  164
-rw-r--r--  doc/build/orm/tutorial.rst                |   16
-rw-r--r--  lib/sqlalchemy/dialects/firebird/base.py  |    2
-rw-r--r--  lib/sqlalchemy/dialects/mssql/base.py     |    2
-rw-r--r--  lib/sqlalchemy/dialects/mysql/base.py     |   31
-rw-r--r--  lib/sqlalchemy/dialects/oracle/base.py    |    5
-rw-r--r--  lib/sqlalchemy/dialects/sqlite/base.py    |   53
-rw-r--r--  lib/sqlalchemy/orm/query.py               |   23
-rw-r--r--  lib/sqlalchemy/sql/compiler.py            |    2
-rw-r--r--  lib/sqlalchemy/sql/crud.py                |   91
-rw-r--r--  lib/sqlalchemy/sql/schema.py              |  158
-rw-r--r--  lib/sqlalchemy/testing/provision.py       |   31
-rw-r--r--  lib/sqlalchemy/testing/schema.py          |    5
-rw-r--r--  test/dialect/mysql/test_compiler.py       |    8
-rw-r--r--  test/dialect/mysql/test_reflection.py     |   34
-rw-r--r--  test/dialect/postgresql/test_query.py     |   65
-rw-r--r--  test/dialect/test_oracle.py               |   26
-rw-r--r--  test/dialect/test_sqlite.py               |   85
-rw-r--r--  test/engine/test_reflection.py            |   19
-rw-r--r--  test/ext/declarative/test_basic.py        |    3
-rw-r--r--  test/orm/test_composites.py               |    3
-rw-r--r--  test/orm/test_query.py                    |    3
-rw-r--r--  test/orm/test_relationships.py            |   11
-rw-r--r--  test/orm/test_unitofwork.py               |    2
-rw-r--r--  test/requirements.py                      |    1
-rw-r--r--  test/sql/test_compiler.py                 |   15
-rw-r--r--  test/sql/test_defaults.py                 |   23
-rw-r--r--  test/sql/test_insert.py                   |  100
-rw-r--r--  test/sql/test_insert_exec.py              |  445
-rw-r--r--  test/sql/test_metadata.py                 |  117
-rw-r--r--  test/sql/test_query.py                    | 1420
-rw-r--r--  test/sql/test_resultset.py                | 1136
34 files changed, 2534 insertions(+), 1627 deletions(-)
diff --git a/doc/build/changelog/changelog_10.rst b/doc/build/changelog/changelog_10.rst
index ef125eecf..a3b3e0092 100644
--- a/doc/build/changelog/changelog_10.rst
+++ b/doc/build/changelog/changelog_10.rst
@@ -19,6 +19,17 @@
:version: 1.0.9
.. change::
+ :tags: bug, oracle
+ :versions: 1.1.0b1
+ :tickets: 3548
+
+ Fixed bug in Oracle dialect where reflection of tables and other
+ symbols with names quoted to force all-lower-case would not be
+ identified properly in reflection queries. The :class:`.quoted_name`
+ construct is now applied to incoming symbol names that are detected
+ as forced into all-lower-case within the "name normalize" process.
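+
+ For example (a sketch mirroring the ``test_reflect_lowercase_forced_tables``
+ test added in this commit; ``oracle_engine`` is a placeholder for a real
+ Oracle :class:`.Engine`)::
+
+     from sqlalchemy import MetaData, Table, Column, Integer
+     from sqlalchemy.sql.elements import quoted_name
+
+     metadata = MetaData()
+     # quote=True forces the all-lower-case name to be quoted in DDL
+     Table(
+         quoted_name('t1', quote=True), metadata,
+         Column('id', Integer, primary_key=True))
+     metadata.create_all(oracle_engine)
+
+     # reflection now locates the forced-lower-case name
+     t1 = Table(
+         quoted_name('t1', quote=True), MetaData(),
+         autoload=True, autoload_with=oracle_engine)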
+
+ .. change::
:tags: feature, orm
:versions: 1.1.0b1
:pullreq: github:201
diff --git a/doc/build/changelog/changelog_11.rst b/doc/build/changelog/changelog_11.rst
index e37fd1a69..dcd43f28d 100644
--- a/doc/build/changelog/changelog_11.rst
+++ b/doc/build/changelog/changelog_11.rst
@@ -22,6 +22,57 @@
:version: 1.1.0b1
.. change::
+ :tags: change, sql, mysql
+ :tickets: 3216
+
+ The system by which a :class:`.Column` considers itself to be an
+ "auto increment" column has been changed, such that autoincrement
+ is no longer implicitly enabled for a :class:`.Table` that has a
+ composite primary key. In order to accommodate being able to enable
+ autoincrement for a composite PK member column while at the same time
+ maintaining SQLAlchemy's long standing behavior of enabling
+ implicit autoincrement for a single integer primary key, a third
+ state has been added to the :paramref:`.Column.autoincrement` parameter,
+ ``"auto"``, which is now the default.
+
+ .. seealso::
+
+ :ref:`change_3216`
+
+ :ref:`change_mysql_3216`
+
+ .. change::
+ :tags: change, mysql
+ :tickets: 3216
+
+ The MySQL dialect no longer generates an extra "KEY" directive when
+ generating CREATE TABLE DDL for a table using InnoDB with a
+ composite primary key with AUTO_INCREMENT on a column that isn't the
+ first column; to overcome InnoDB's limitation here, the PRIMARY KEY
+ constraint is now generated with the AUTO_INCREMENT column placed
+ first in the list of columns.
+
+ .. seealso::
+
+ :ref:`change_mysql_3216`
+
+ :ref:`change_3216`
+
+ .. change::
+ :tags: change, sqlite
+ :pullreq: github:198
+
+ Added support to the SQLite dialect for the
+ :meth:`.Inspector.get_schema_names` method; pull request courtesy
+ Brian Van Klaveren. Also repaired support
+ for creation of indexes with schemas as well as reflection of
+ foreign key constraints in schema-bound tables.
+
+ .. seealso::
+
+ :ref:`change_sqlite_schemas`
+
+ .. change::
:tags: change, mssql
:tickets: 3434
diff --git a/doc/build/changelog/migration_11.rst b/doc/build/changelog/migration_11.rst
index ca6c44165..5b7c8321a 100644
--- a/doc/build/changelog/migration_11.rst
+++ b/doc/build/changelog/migration_11.rst
@@ -16,7 +16,7 @@ What's New in SQLAlchemy 1.1?
some issues may be moved to later milestones in order to allow
for a timely release.
- Document last updated: September 19, 2015
+ Document last updated: October 7, 2015
Introduction
============
@@ -256,6 +256,103 @@ configuration of the existing object-level technique of assigning
New Features and Improvements - Core
====================================
+.. _change_3216:
+
+The ``.autoincrement`` directive is no longer implicitly enabled for a composite primary key column
+---------------------------------------------------------------------------------------------------
+
+SQLAlchemy has always had the convenience feature of enabling the backend database's
+"autoincrement" feature for a single-column integer primary key; by "autoincrement"
+we mean that the database column will include whatever DDL directives the
+database provides in order to indicate an auto-incrementing integer identifier,
+such as the SERIAL keyword on Postgresql or AUTO_INCREMENT on MySQL, and additionally
+that the dialect will receive these generated values from the execution
+of a :meth:`.Table.insert` construct using techniques appropriate to that
+backend.
+
+What's changed is that this feature no longer turns on automatically for a
+*composite* primary key; previously, a table definition such as::
+
+ Table(
+ 'some_table', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)
+ )
+
+Would have "autoincrement" semantics applied to the ``'x'`` column, only
+because it's first in the list of primary key columns. In order to
+disable this, one would have to turn off ``autoincrement`` on all columns::
+
+ # old way
+ Table(
+ 'some_table', metadata,
+ Column('x', Integer, primary_key=True, autoincrement=False),
+ Column('y', Integer, primary_key=True, autoincrement=False)
+ )
+
+With the new behavior, the composite primary key will not have autoincrement
+semantics unless a column is marked explicitly with ``autoincrement=True``::
+
+ # column 'y' will be SERIAL/AUTO_INCREMENT/auto-generating
+ Table(
+ 'some_table', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True, autoincrement=True)
+ )
+
+In order to anticipate some potential backwards-incompatible scenarios,
+the :meth:`.Table.insert` construct will perform more thorough checks
+for missing primary key values on composite primary key columns that don't
+have autoincrement set up; given a table such as::
+
+ Table(
+ 'b', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)
+ )
+
+An INSERT emitted with no values for this table will produce the exception::
+
+ CompileError: Column 'b.x' is marked as a member of the primary
+ key for table 'b', but has no Python-side or server-side default
+ generator indicated, nor does it indicate 'autoincrement=True',
+ and no explicit value is passed. Primary key columns may not
+ store NULL. Note that as of SQLAlchemy 1.1, 'autoincrement=True'
+ must be indicated explicitly for composite (e.g. multicolumn)
+ primary keys if AUTO_INCREMENT/SERIAL/IDENTITY behavior is
+ expected for one of the columns in the primary key. CREATE TABLE
+ statements are impacted by this change as well on most backends.
+
+For a column that is receiving primary key values from a server-side
+default or something less common such as a trigger, the presence of a
+value generator can be indicated using :class:`.FetchedValue`::
+
+ Table(
+ 'b', metadata,
+ Column('x', Integer, primary_key=True, server_default=FetchedValue()),
+ Column('y', Integer, primary_key=True, server_default=FetchedValue())
+ )
+
+For the very unlikely case where a composite primary key is actually intended
+to store NULL in one or more of its columns (only supported on SQLite and MySQL),
+specify the column with ``nullable=True``::
+
+ Table(
+ 'b', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True, nullable=True)
+ )
+
+In a related change, the ``autoincrement`` flag may be set to True
+on a column that has a client-side or server-side default. This typically
+will not have much impact on the behavior of the column during an INSERT.
+
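+For example, a freestanding :class:`.Sequence` applied as a client-side
+default may now also be marked ``autoincrement=True`` so that the dialect
+treats it as the "autoincrement" column (a sketch, mirroring the
+``test_freestanding_sequence_via_autoinc`` test added in this commit)::
+
+    from sqlalchemy import Column, Integer, MetaData, Sequence, Table
+
+    metadata = MetaData()
+    t = Table(
+        'some_table', metadata,
+        Column(
+            'id', Integer, autoincrement=True, primary_key=True,
+            default=Sequence(
+                'my_sequence', metadata=metadata).next_value())
+    )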
+
+.. seealso::
+
+ :ref:`change_mysql_3216`
+
+:ticket:`3216`
.. _change_2528:
@@ -787,10 +884,75 @@ emits::
Dialect Improvements and Changes - MySQL
=============================================
+.. _change_mysql_3216:
+
+No more generation of an implicit KEY for composite primary key w/ AUTO_INCREMENT
+---------------------------------------------------------------------------------
+
+The MySQL dialect had the behavior such that if a composite primary key
+on an InnoDB table featured AUTO_INCREMENT on one of its columns which was
+not the first column, e.g.::
+
+ t = Table(
+ 'some_table', metadata,
+ Column('x', Integer, primary_key=True, autoincrement=False),
+ Column('y', Integer, primary_key=True, autoincrement=True),
+ mysql_engine='InnoDB'
+ )
+
+DDL such as the following would be generated::
+
+ CREATE TABLE some_table (
+ x INTEGER NOT NULL,
+ y INTEGER NOT NULL AUTO_INCREMENT,
+ PRIMARY KEY (x, y),
+ KEY idx_autoinc_y (y)
+ )ENGINE=InnoDB
+
+Note the above "KEY" with an auto-generated name; this is a change that
+found its way into the dialect many years ago in response to the issue that
+the AUTO_INCREMENT would otherwise fail on InnoDB without this additional KEY.
+
+This workaround has been removed and replaced with the much better system
+of just stating the AUTO_INCREMENT column *first* within the primary key::
+
+ CREATE TABLE some_table (
+ x INTEGER NOT NULL,
+ y INTEGER NOT NULL AUTO_INCREMENT,
+ PRIMARY KEY (y, x)
+ )ENGINE=InnoDB
+
+Along with the change :ref:`change_3216`, composite primary keys with
+or without auto increment are now easier to specify;
+:paramref:`.Column.autoincrement`
+now defaults to the value ``"auto"`` and the ``autoincrement=False``
+directives are no longer needed::
+
+ t = Table(
+ 'some_table', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True, autoincrement=True),
+ mysql_engine='InnoDB'
+ )
+
+
Dialect Improvements and Changes - SQLite
=============================================
+.. _change_sqlite_schemas:
+
+Improved Support for Remote Schemas
+------------------------------------
+
+The SQLite dialect now implements :meth:`.Inspector.get_schema_names`
+and additionally has improved support for tables and indexes that are
+created and reflected from a remote schema, which in SQLite is a
+database that is assigned a name via the ``ATTACH`` statement; previously,
+the ``CREATE INDEX`` DDL didn't work correctly for a schema-bound table.
+Additionally, the :meth:`.Inspector.get_foreign_keys` method now indicates the
+given schema in the results. Cross-schema foreign keys aren't supported.
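+
+A minimal sketch (the file names are illustrative only)::
+
+    from sqlalchemy import create_engine, inspect
+
+    engine = create_engine("sqlite:///main.db")
+    conn = engine.connect()
+    conn.execute('ATTACH DATABASE "other.db" AS other')
+
+    insp = inspect(conn)
+    insp.get_schema_names()  # ['main', 'other']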
+
Dialect Improvements and Changes - SQL Server
=============================================
diff --git a/doc/build/orm/tutorial.rst b/doc/build/orm/tutorial.rst
index ed8d05534..dff181f6b 100644
--- a/doc/build/orm/tutorial.rst
+++ b/doc/build/orm/tutorial.rst
@@ -795,11 +795,17 @@ Here's a rundown of some of the most common operators used in
# or chain multiple filter()/filter_by() calls
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
+ .. note:: Make sure you use :func:`.and_` and **not** the
+ Python ``and`` operator!
+
* :func:`OR <.sql.expression.or_>`::
from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))
+ .. note:: Make sure you use :func:`.or_` and **not** the
+ Python ``or`` operator!
+
* :meth:`MATCH <.ColumnOperators.match>`::
query.filter(User.name.match('wendy'))
@@ -1350,6 +1356,16 @@ The reference documentation for :meth:`~.Query.join` contains detailed informati
and examples of the calling styles accepted by this method; :meth:`~.Query.join`
is an important method at the center of usage for any SQL-fluent application.
+.. topic:: What does :class:`.Query` select from if there are multiple entities?
+
+ The :meth:`.Query.join` method will **typically join from the leftmost
+ item** in the list of entities, when the ON clause is omitted, or if the
+ ON clause is a plain SQL expression. To control the first entity in the list
+ of JOINs, use the :meth:`.Query.select_from` method::
+
+ query = Session.query(User, Address).select_from(Address).join(User)
+
+
.. _ormtutorial_aliases:
Using Aliases
diff --git a/lib/sqlalchemy/dialects/firebird/base.py b/lib/sqlalchemy/dialects/firebird/base.py
index c34829cd3..acd419e85 100644
--- a/lib/sqlalchemy/dialects/firebird/base.py
+++ b/lib/sqlalchemy/dialects/firebird/base.py
@@ -648,7 +648,7 @@ class FBDialect(default.DefaultDialect):
'type': coltype,
'nullable': not bool(row['null_flag']),
'default': defvalue,
- 'autoincrement': defvalue is None
+ 'autoincrement': 'auto',
}
if orig_colname.lower() == orig_colname:
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index e4f9ac3de..37e798014 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -186,7 +186,7 @@ CREATE TABLE statement for this column will yield::
LIMIT/OFFSET Support
--------------------
-MSSQL has no support for the LIMIT or OFFSET keysowrds. LIMIT is
+MSSQL has no support for the LIMIT or OFFSET keywords. LIMIT is
supported directly through the ``TOP`` Transact SQL keyword::
select.limit
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py
index 4b3e5bcd1..2c78de2fc 100644
--- a/lib/sqlalchemy/dialects/mysql/base.py
+++ b/lib/sqlalchemy/dialects/mysql/base.py
@@ -1916,38 +1916,7 @@ class MySQLCompiler(compiler.SQLCompiler):
return None
-# ug. "InnoDB needs indexes on foreign keys and referenced keys [...].
-# Starting with MySQL 4.1.2, these indexes are created automatically.
-# In older versions, the indexes must be created explicitly or the
-# creation of foreign key constraints fails."
-
class MySQLDDLCompiler(compiler.DDLCompiler):
- def create_table_constraints(self, table, **kw):
- """Get table constraints."""
- constraint_string = super(
- MySQLDDLCompiler, self).create_table_constraints(table, **kw)
-
- # why self.dialect.name and not 'mysql'? because of drizzle
- is_innodb = 'engine' in table.dialect_options[self.dialect.name] and \
- table.dialect_options[self.dialect.name][
- 'engine'].lower() == 'innodb'
-
- auto_inc_column = table._autoincrement_column
-
- if is_innodb and \
- auto_inc_column is not None and \
- auto_inc_column is not list(table.primary_key)[0]:
- if constraint_string:
- constraint_string += ", \n\t"
- constraint_string += "KEY %s (%s)" % (
- self.preparer.quote(
- "idx_autoinc_%s" % auto_inc_column.name
- ),
- self.preparer.format_column(auto_inc_column)
- )
-
- return constraint_string
-
def get_column_specification(self, column, **kw):
"""Builds column DDL."""
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index c605bd510..82ec72f2b 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -287,6 +287,7 @@ from sqlalchemy import util, sql
from sqlalchemy.engine import default, reflection
from sqlalchemy.sql import compiler, visitors, expression
from sqlalchemy.sql import operators as sql_operators
+from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import types as sqltypes, schema as sa_schema
from sqlalchemy.types import VARCHAR, NVARCHAR, CHAR, \
BLOB, CLOB, TIMESTAMP, FLOAT
@@ -1032,6 +1033,8 @@ class OracleDialect(default.DefaultDialect):
if name.upper() == name and not \
self.identifier_preparer._requires_quotes(name.lower()):
return name.lower()
+ elif name.lower() == name:
+ return quoted_name(name, quote=True)
else:
return name
@@ -1280,7 +1283,7 @@ class OracleDialect(default.DefaultDialect):
'type': coltype,
'nullable': nullable,
'default': default,
- 'autoincrement': default is None
+ 'autoincrement': 'auto',
}
if orig_colname.lower() == orig_colname:
cdict['quote'] = True
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index e19047b76..a1786d16c 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -853,12 +853,20 @@ class SQLiteDDLCompiler(compiler.DDLCompiler):
if not column.nullable:
colspec += " NOT NULL"
- if (column.primary_key and
- column.table.dialect_options['sqlite']['autoincrement'] and
- len(column.table.primary_key.columns) == 1 and
- issubclass(column.type._type_affinity, sqltypes.Integer) and
- not column.foreign_keys):
- colspec += " PRIMARY KEY AUTOINCREMENT"
+ if column.primary_key:
+ if (
+ column.autoincrement is True and
+ len(column.table.primary_key.columns) != 1
+ ):
+ raise exc.CompileError(
+ "SQLite does not support autoincrement for "
+ "composite primary keys")
+
+ if (column.table.dialect_options['sqlite']['autoincrement'] and
+ len(column.table.primary_key.columns) == 1 and
+ issubclass(column.type._type_affinity, sqltypes.Integer) and
+ not column.foreign_keys):
+ colspec += " PRIMARY KEY AUTOINCREMENT"
return colspec
@@ -894,11 +902,25 @@ class SQLiteDDLCompiler(compiler.DDLCompiler):
return preparer.format_table(table, use_schema=False)
- def visit_create_index(self, create):
+ def visit_create_index(self, create, include_schema=False,
+ include_table_schema=True):
index = create.element
-
- text = super(SQLiteDDLCompiler, self).visit_create_index(
- create, include_table_schema=False)
+ self._verify_index_table(index)
+ preparer = self.preparer
+ text = "CREATE "
+ if index.unique:
+ text += "UNIQUE "
+ text += "INDEX %s ON %s (%s)" \
+ % (
+ self._prepared_index_name(index,
+ include_schema=True),
+ preparer.format_table(index.table,
+ use_schema=False),
+ ', '.join(
+ self.sql_compiler.process(
+ expr, include_table=False, literal_binds=True) for
+ expr in index.expressions)
+ )
whereclause = index.dialect_options["sqlite"]["where"]
if whereclause is not None:
@@ -1095,6 +1117,13 @@ class SQLiteDialect(default.DefaultDialect):
return None
@reflection.cache
+ def get_schema_names(self, connection, **kw):
+ s = "PRAGMA database_list"
+ dl = connection.execute(s)
+
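+ # each row of PRAGMA database_list is (seq, name, file);
+ # take the name at index 1, skipping SQLite's implicit "temp" schema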
+ return [db[1] for db in dl if db[1] != "temp"]
+
+ @reflection.cache
def get_table_names(self, connection, schema=None, **kw):
if schema is not None:
qschema = self.identifier_preparer.quote_identifier(schema)
@@ -1190,7 +1219,7 @@ class SQLiteDialect(default.DefaultDialect):
'type': coltype,
'nullable': nullable,
'default': default,
- 'autoincrement': default is None,
+ 'autoincrement': 'auto',
'primary_key': primary_key,
}
@@ -1283,7 +1312,7 @@ class SQLiteDialect(default.DefaultDialect):
fk = fks[numerical_id] = {
'name': None,
'constrained_columns': [],
- 'referred_schema': None,
+ 'referred_schema': schema,
'referred_table': rtbl,
'referred_columns': [],
}
diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
index 16ead950e..dc36f2f35 100644
--- a/lib/sqlalchemy/orm/query.py
+++ b/lib/sqlalchemy/orm/query.py
@@ -1292,7 +1292,9 @@ class Query(object):
session.query(MyClass).filter(MyClass.name == 'some name')
- Multiple criteria are joined together by AND::
+ Multiple criteria may be specified, comma separated; the effect
+ is that they will be joined together using the :func:`.and_`
+ function::
session.query(MyClass).\\
filter(MyClass.name == 'some name', MyClass.id > 5)
@@ -1301,9 +1303,6 @@ class Query(object):
WHERE clause of a select. String expressions are coerced
into SQL expression constructs via the :func:`.text` construct.
- .. versionchanged:: 0.7.5
- Multiple criteria joined by AND.
-
.. seealso::
:meth:`.Query.filter_by` - filter on keyword expressions.
@@ -1327,7 +1326,9 @@ class Query(object):
session.query(MyClass).filter_by(name = 'some name')
- Multiple criteria are joined together by AND::
+ Multiple criteria may be specified, comma separated; the effect
+ is that they will be joined together using the :func:`.and_`
+ function::
session.query(MyClass).\\
filter_by(name = 'some name', id = 5)
@@ -2463,6 +2464,12 @@ class Query(object):
Calling ``first()`` results in an execution of the underlying query.
+ .. seealso::
+
+ :meth:`.Query.one`
+
+ :meth:`.Query.one_or_none`
+
"""
if self._statement is not None:
ret = list(self)[0:1]
@@ -2495,6 +2502,12 @@ class Query(object):
Added :meth:`.Query.one_or_none`
+ .. seealso::
+
+ :meth:`.Query.first`
+
+ :meth:`.Query.one`
+
"""
ret = list(self)
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index 691195772..f1220ce31 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -2381,7 +2381,7 @@ class DDLCompiler(Compiled):
text += "CONSTRAINT %s " % formatted_name
text += "PRIMARY KEY "
text += "(%s)" % ', '.join(self.preparer.quote(c.name)
- for c in constraint)
+ for c in constraint.columns_autoinc_first)
text += self.define_constraint_deferrability(constraint)
return text
diff --git a/lib/sqlalchemy/sql/crud.py b/lib/sqlalchemy/sql/crud.py
index e6f16b698..72b66c036 100644
--- a/lib/sqlalchemy/sql/crud.py
+++ b/lib/sqlalchemy/sql/crud.py
@@ -212,6 +212,7 @@ def _scan_cols(
for c in cols:
col_key = _getattr_col_key(c)
+
if col_key in parameters and col_key not in check_columns:
_append_param_parameter(
@@ -248,6 +249,10 @@ def _scan_cols(
elif implicit_return_defaults and \
c in implicit_return_defaults:
compiler.returning.append(c)
+ elif c.primary_key and \
+ c is not stmt.table._autoincrement_column and \
+ not c.nullable:
+ _raise_pk_with_no_anticipated_value(c)
elif compiler.isupdate:
_append_param_update(
@@ -285,6 +290,22 @@ def _append_param_parameter(
def _append_param_insert_pk_returning(compiler, stmt, c, values, kw):
+ """Create a primary key expression in the INSERT statement and
+ possibly a RETURNING clause for it.
+
+ If the column has a Python-side default, we will create a bound
+ parameter for it and "pre-execute" the Python function. If
+ the column has a SQL expression default, or is a sequence,
+ we will add it directly into the INSERT statement and add a
+ RETURNING element to get the new value. If the column has a
+ server side default or is marked as the "autoincrement" column,
+ we will add a RETURNING element to get at the value.
+
+ If all the above tests fail, that indicates a primary key column with no
+ noted default generation capabilities that has no parameter passed;
+ raise an exception.
+
+ """
if c.default is not None:
if c.default.is_sequence:
if compiler.dialect.supports_sequences and \
@@ -303,9 +324,12 @@ def _append_param_insert_pk_returning(compiler, stmt, c, values, kw):
values.append(
(c, _create_prefetch_bind_param(compiler, c))
)
-
- else:
+ elif c is stmt.table._autoincrement_column or c.server_default is not None:
compiler.returning.append(c)
+ elif not c.nullable:
+ # no .default, no .server_default, not autoincrement, we have
+ # no indication this primary key column will have any value
+ _raise_pk_with_no_anticipated_value(c)
def _create_prefetch_bind_param(compiler, c, process=True, name=None):
@@ -342,18 +366,46 @@ def _process_multiparam_default_bind(compiler, c, index, kw):
def _append_param_insert_pk(compiler, stmt, c, values, kw):
+ """Create a bound parameter in the INSERT statement to receive a
+ 'prefetched' default value.
+
+ The 'prefetched' value indicates that we are to invoke a Python-side
+ default function or explicit SQL expression before the INSERT statement
+ proceeds, so that we have a primary key value available.
+
+ If the column has no noted default generation capabilities and no
+ value is passed in either, raise an exception.
+
+ """
if (
- (c.default is not None and
- (not c.default.is_sequence or
- compiler.dialect.supports_sequences)) or
- c is stmt.table._autoincrement_column and
- (compiler.dialect.supports_sequences or
- compiler.dialect.
- preexecute_autoincrement_sequences)
+ (
+ # column has a Python-side default
+ c.default is not None and
+ (
+ # and it won't be a Sequence
+ not c.default.is_sequence or
+ compiler.dialect.supports_sequences
+ )
+ )
+ or
+ (
+ # column is the "autoincrement column"
+ c is stmt.table._autoincrement_column and
+ (
+ # and it's either a "sequence" or a
+ # pre-executable "autoincrement" sequence
+ compiler.dialect.supports_sequences or
+ compiler.dialect.preexecute_autoincrement_sequences
+ )
+ )
):
values.append(
(c, _create_prefetch_bind_param(compiler, c))
)
+ elif c.default is None and c.server_default is None and not c.nullable:
+ # no .default, no .server_default, not autoincrement, we have
+ # no indication this primary key column will have any value
+ _raise_pk_with_no_anticipated_value(c)
def _append_param_insert_hasdefault(
@@ -555,3 +607,24 @@ def _get_returning_modifiers(compiler, stmt):
return need_pks, implicit_returning, \
implicit_return_defaults, postfetch_lastrowid
+
+
+def _raise_pk_with_no_anticipated_value(c):
+ msg = (
+ "Column '%s.%s' is marked as a member of the "
+ "primary key for table '%s', "
+ "but has no Python-side or server-side default generator indicated, "
+ "nor does it indicate 'autoincrement=True' or 'nullable=True', "
+ "and no explicit value is passed. "
+ "Primary key columns typically may not store NULL."
+ %
+ (c.table.fullname, c.name, c.table.fullname))
+ if len(c.table.primary_key.columns) > 1:
+ msg += (
+ " Note that as of SQLAlchemy 1.1, 'autoincrement=True' must be "
+ "indicated explicitly for composite (e.g. multicolumn) primary "
+ "keys if AUTO_INCREMENT/SERIAL/IDENTITY "
+ "behavior is expected for one of the columns in the primary key. "
+ "CREATE TABLE statements are impacted by this change as well on "
+ "most backends.")
+ raise exc.CompileError(msg)
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index 137208584..e20545962 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -572,18 +572,9 @@ class Table(DialectKWArgs, SchemaItem, TableClause):
def _init_collections(self):
pass
- @util.memoized_property
+ @property
def _autoincrement_column(self):
- for col in self.primary_key:
- if (col.autoincrement and col.type._type_affinity is not None and
- issubclass(col.type._type_affinity,
- type_api.INTEGERTYPE._type_affinity) and
- (not col.foreign_keys or
- col.autoincrement == 'ignore_fk') and
- isinstance(col.default, (type(None), Sequence)) and
- (col.server_default is None or
- col.server_default.reflected)):
- return col
+ return self.primary_key._autoincrement_column
@property
def key(self):
@@ -913,17 +904,40 @@ class Column(SchemaItem, ColumnClause):
argument is available such as ``server_default``, ``default``
and ``unique``.
- :param autoincrement: This flag may be set to ``False`` to
- indicate an integer primary key column that should not be
- considered to be the "autoincrement" column, that is
- the integer primary key column which generates values
- implicitly upon INSERT and whose value is usually returned
- via the DBAPI cursor.lastrowid attribute. It defaults
- to ``True`` to satisfy the common use case of a table
- with a single integer primary key column. If the table
- has a composite primary key consisting of more than one
- integer column, set this flag to True only on the
- column that should be considered "autoincrement".
+ :param autoincrement: Set up "auto increment" semantics for an integer
+ primary key column. The default value is the string ``"auto"``
+ which indicates that a single-column primary key that is of
+ an INTEGER type with no stated client-side or python-side defaults
+ should receive auto increment semantics automatically;
+ all other varieties of primary key columns will not. This
+ includes that :term:`DDL` such as Postgresql SERIAL or MySQL
+ AUTO_INCREMENT will be emitted for this column during a table
+ create, as well as that the column is assumed to generate new
+ integer primary key values when an INSERT statement is invoked; these
+ values will be retrieved by the dialect.
+
+ The flag may be set to ``True`` to indicate that a column which
+ is part of a composite (e.g. multi-column) primary key should
+ have autoincrement semantics, though note that only one column
+ within a primary key may have this setting. It can also
+ be set to ``True`` to indicate autoincrement semantics on a
+ column that has a client-side or server-side default configured,
+ however note that not all dialects can accommodate all styles
+ of default as an "autoincrement". It can also be
+ set to ``False`` on a single-column primary key that has a
+ datatype of INTEGER in order to disable auto increment semantics
+ for that column.
+
+ .. versionchanged:: 1.1 The autoincrement flag now defaults to
+ ``"auto"`` which indicates autoincrement semantics by default
+ for single-column integer primary keys only; for composite
+ (multi-column) primary keys, autoincrement is never implicitly
+ enabled; as always, ``autoincrement=True`` will allow for
+ at most one of those columns to be an "autoincrement" column.
+ ``autoincrement=True`` may also be set on a :class:`.Column`
+ that has an explicit client-side or server-side default,
+ subject to limitations of the backend database and dialect.
+
The setting *only* has an effect for columns which are:
@@ -940,11 +954,8 @@ class Column(SchemaItem, ColumnClause):
primary_key=True, autoincrement='ignore_fk')
It is typically not desirable to have "autoincrement" enabled
- on such a column as its value intends to mirror that of a
- primary key column elsewhere.
-
- * have no server side or client side defaults (with the exception
- of Postgresql SERIAL).
+ on a column that refers to another via foreign key, as such a column
+ is required to refer to a value that originates from elsewhere.
The setting has these two effects on columns that meet the
above criteria:
@@ -961,20 +972,15 @@ class Column(SchemaItem, ColumnClause):
:ref:`sqlite_autoincrement`
- * The column will be considered to be available as
- cursor.lastrowid or equivalent, for those dialects which
- "post fetch" newly inserted identifiers after a row has
- been inserted (SQLite, MySQL, MS-SQL). It does not have
- any effect in this regard for databases that use sequences
- to generate primary key identifiers (i.e. Firebird, Postgresql,
- Oracle).
-
- .. versionchanged:: 0.7.4
- ``autoincrement`` accepts a special value ``'ignore_fk'``
- to indicate that autoincrementing status regardless of foreign
- key references. This applies to certain composite foreign key
- setups, such as the one demonstrated in the ORM documentation
- at :ref:`post_update`.
+ * The column will be considered to be available using an
+ "autoincrement" method specific to the backend database, such
+ as calling upon ``cursor.lastrowid``, using RETURNING in an
+ INSERT statement to get at a sequence-generated value, or using
+ special functions such as "SELECT scope_identity()".
+ These methods are highly specific to the DBAPIs and databases in
+ use and vary greatly, so care should be taken when associating
+ ``autoincrement=True`` with a custom default generation function.
+
:param default: A scalar, Python callable, or
:class:`.ColumnElement` expression representing the
@@ -1128,7 +1134,7 @@ class Column(SchemaItem, ColumnClause):
self.system = kwargs.pop('system', False)
self.doc = kwargs.pop('doc', None)
self.onupdate = kwargs.pop('onupdate', None)
- self.autoincrement = kwargs.pop('autoincrement', True)
+ self.autoincrement = kwargs.pop('autoincrement', "auto")
self.constraints = set()
self.foreign_keys = set()
@@ -1263,12 +1269,12 @@ class Column(SchemaItem, ColumnClause):
if self.primary_key:
table.primary_key._replace(self)
- Table._autoincrement_column._reset(table)
elif self.key in table.primary_key:
raise exc.ArgumentError(
"Trying to redefine primary-key column '%s' as a "
"non-primary-key column on table '%s'" % (
self.key, table.fullname))
+
self.table = table
if self.index:
@@ -3025,11 +3031,77 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
self.columns.extend(columns)
+ PrimaryKeyConstraint._autoincrement_column._reset(self)
self._set_parent_with_dispatch(self.table)
def _replace(self, col):
+ PrimaryKeyConstraint._autoincrement_column._reset(self)
self.columns.replace(col)
+ @property
+ def columns_autoinc_first(self):
+ autoinc = self._autoincrement_column
+
+ if autoinc is not None:
+ return [autoinc] + [c for c in self.columns if c is not autoinc]
+ else:
+ return list(self.columns)
+
+ @util.memoized_property
+ def _autoincrement_column(self):
+
+ def _validate_autoinc(col, autoinc_true):
+ if col.type._type_affinity is None or not issubclass(
+ col.type._type_affinity,
+ type_api.INTEGERTYPE._type_affinity):
+ if autoinc_true:
+ raise exc.ArgumentError(
+ "Column type %s on column '%s' is not "
+ "compatible with autoincrement=True" % (
+ col.type,
+ col
+ ))
+ else:
+ return False
+ elif not isinstance(col.default, (type(None), Sequence)) and \
+ not autoinc_true:
+ return False
+ elif col.server_default is not None and not autoinc_true:
+ return False
+ elif (
+ col.foreign_keys and col.autoincrement
+ not in (True, 'ignore_fk')):
+ return False
+ return True
+
+ if len(self.columns) == 1:
+ col = list(self.columns)[0]
+
+ if col.autoincrement is True:
+ _validate_autoinc(col, True)
+ return col
+ elif (
+ col.autoincrement in ('auto', 'ignore_fk') and
+ _validate_autoinc(col, False)
+ ):
+ return col
+
+ else:
+ autoinc = None
+ for col in self.columns:
+ if col.autoincrement is True:
+ _validate_autoinc(col, True)
+ if autoinc is not None:
+ raise exc.ArgumentError(
+ "Only one Column may be marked "
+ "autoincrement=True, found both %s and %s." %
+ (col.name, autoinc.name)
+ )
+ else:
+ autoinc = col
+
+ return autoinc
+
class UniqueConstraint(ColumnCollectionConstraint):
"""A table-level UNIQUE constraint.
diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py
index 77527571b..3f9ddae73 100644
--- a/lib/sqlalchemy/testing/provision.py
+++ b/lib/sqlalchemy/testing/provision.py
@@ -2,7 +2,7 @@ from sqlalchemy.engine import url as sa_url
from sqlalchemy import text
from sqlalchemy.util import compat
from . import config, engines
-
+import os
FOLLOWER_IDENT = None
@@ -52,6 +52,7 @@ def setup_config(db_url, options, file_config, follower_ident):
db_opts = {}
_update_db_opts(db_url, db_opts)
eng = engines.testing_engine(db_url, db_opts)
+ _post_configure_engine(db_url, eng, follower_ident)
eng.connect().close()
cfg = config.Config.register(eng, db_opts, options, file_config)
if follower_ident:
@@ -106,6 +107,11 @@ def _configure_follower(cfg, ident):
@register.init
+def _post_configure_engine(url, engine, follower_ident):
+ pass
+
+
+@register.init
def _follower_url_from_main(url, ident):
url = sa_url.make_url(url)
url.database = ident
@@ -126,6 +132,23 @@ def _sqlite_follower_url_from_main(url, ident):
return sa_url.make_url("sqlite:///%s.db" % ident)
+@_post_configure_engine.for_db("sqlite")
+def _sqlite_post_configure_engine(url, engine, follower_ident):
+ from sqlalchemy import event
+
+ @event.listens_for(engine, "connect")
+ def connect(dbapi_connection, connection_record):
+ # use file DBs in all cases, memory acts kind of strangely
+ # as an attached
+ if not follower_ident:
+ dbapi_connection.execute(
+ 'ATTACH DATABASE "test_schema.db" AS test_schema')
+ else:
+ dbapi_connection.execute(
+ 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
+ % follower_ident)
+
+
@_create_db.for_db("postgresql")
def _pg_create_db(cfg, eng, ident):
with eng.connect().execution_options(
@@ -176,8 +199,10 @@ def _pg_drop_db(cfg, eng, ident):
@_drop_db.for_db("sqlite")
def _sqlite_drop_db(cfg, eng, ident):
- pass
- #os.remove("%s.db" % ident)
+ if ident:
+ os.remove("%s_test_schema.db" % ident)
+ else:
+ os.remove("%s.db" % ident)
@_drop_db.for_db("mysql")
diff --git a/lib/sqlalchemy/testing/schema.py b/lib/sqlalchemy/testing/schema.py
index 93b52ad58..257578668 100644
--- a/lib/sqlalchemy/testing/schema.py
+++ b/lib/sqlalchemy/testing/schema.py
@@ -71,9 +71,12 @@ def Column(*args, **kw):
args = [arg for arg in args if not isinstance(arg, schema.ForeignKey)]
col = schema.Column(*args, **kw)
- if 'test_needs_autoincrement' in test_opts and \
+ if test_opts.get('test_needs_autoincrement', False) and \
kw.get('primary_key', False):
+ if col.default is None and col.server_default is None:
+ col.autoincrement = True
+
# allow any test suite to pick up on this
col.info['test_needs_autoincrement'] = True
diff --git a/test/dialect/mysql/test_compiler.py b/test/dialect/mysql/test_compiler.py
index 304c31012..60af82bab 100644
--- a/test/dialect/mysql/test_compiler.py
+++ b/test/dialect/mysql/test_compiler.py
@@ -511,9 +511,8 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(schema.CreateTable(t1),
'CREATE TABLE sometable (assigned_id '
'INTEGER NOT NULL, id INTEGER NOT NULL '
- 'AUTO_INCREMENT, PRIMARY KEY (assigned_id, '
- 'id), KEY idx_autoinc_id (id))ENGINE=Inn'
- 'oDB')
+ 'AUTO_INCREMENT, PRIMARY KEY (id, assigned_id)'
+ ')ENGINE=InnoDB')
t1 = Table('sometable', MetaData(),
Column('assigned_id', Integer(), primary_key=True,
@@ -537,8 +536,7 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
'CREATE TABLE sometable ('
'id INTEGER NOT NULL, '
'`order` INTEGER NOT NULL AUTO_INCREMENT, '
- 'PRIMARY KEY (id, `order`), '
- 'KEY idx_autoinc_order (`order`)'
+ 'PRIMARY KEY (`order`, id)'
')ENGINE=InnoDB')
def test_create_table_with_partition(self):
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py
index 39b39e006..266fc335e 100644
--- a/test/dialect/mysql/test_reflection.py
+++ b/test/dialect/mysql/test_reflection.py
@@ -211,49 +211,55 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
meta = MetaData(testing.db)
try:
Table('ai_1', meta,
- Column('int_y', Integer, primary_key=True),
+ Column('int_y', Integer, primary_key=True,
+ autoincrement=True),
Column('int_n', Integer, DefaultClause('0'),
primary_key=True),
- mysql_engine='MyISAM')
+ mysql_engine='MyISAM')
Table('ai_2', meta,
- Column('int_y', Integer, primary_key=True),
+ Column('int_y', Integer, primary_key=True,
+ autoincrement=True),
Column('int_n', Integer, DefaultClause('0'),
primary_key=True),
- mysql_engine='MyISAM')
+ mysql_engine='MyISAM')
Table('ai_3', meta,
Column('int_n', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False),
- Column('int_y', Integer, primary_key=True),
- mysql_engine='MyISAM')
+ Column('int_y', Integer, primary_key=True,
+ autoincrement=True),
+ mysql_engine='MyISAM')
Table('ai_4', meta,
Column('int_n', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False),
Column('int_n2', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False),
- mysql_engine='MyISAM')
+ mysql_engine='MyISAM')
Table('ai_5', meta,
- Column('int_y', Integer, primary_key=True),
+ Column('int_y', Integer, primary_key=True,
+ autoincrement=True),
Column('int_n', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False),
- mysql_engine='MyISAM')
+ mysql_engine='MyISAM')
Table('ai_6', meta,
Column('o1', String(1), DefaultClause('x'),
primary_key=True),
- Column('int_y', Integer, primary_key=True),
- mysql_engine='MyISAM')
+ Column('int_y', Integer, primary_key=True,
+ autoincrement=True),
+ mysql_engine='MyISAM')
Table('ai_7', meta,
Column('o1', String(1), DefaultClause('x'),
primary_key=True),
Column('o2', String(1), DefaultClause('x'),
primary_key=True),
- Column('int_y', Integer, primary_key=True),
- mysql_engine='MyISAM')
+ Column('int_y', Integer, primary_key=True,
+ autoincrement=True),
+ mysql_engine='MyISAM')
Table('ai_8', meta,
Column('o1', String(1), DefaultClause('x'),
primary_key=True),
Column('o2', String(1), DefaultClause('x'),
primary_key=True),
- mysql_engine='MyISAM')
+ mysql_engine='MyISAM')
meta.create_all()
table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py
index 4a33644e0..6c10d78cc 100644
--- a/test/dialect/postgresql/test_query.py
+++ b/test/dialect/postgresql/test_query.py
@@ -72,9 +72,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engines.testing_engine(options={'implicit_returning': False}),
engines.testing_engine(options={'implicit_returning': True})
]:
- assert_raises_message(exc.DBAPIError,
- 'violates not-null constraint',
- eng.execute, t2.insert())
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ eng.execute, t2.insert()
+ )
+
def test_sequence_insert(self):
table = Table(
@@ -494,26 +497,26 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engines.testing_engine(options={'implicit_returning': False})
metadata.bind = self.engine
table.insert().execute({'id': 30, 'data': 'd1'})
- if self.engine.driver == 'pg8000':
- exception_cls = exc.ProgrammingError
- elif self.engine.driver == 'pypostgresql':
- exception_cls = Exception
- else:
- exception_cls = exc.IntegrityError
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'},
- {'data': 'd3'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'},
- {'data': 'd3'})
+
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ table.insert().execute, {'data': 'd2'})
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ table.insert().execute, {'data': 'd2'},
+ {'data': 'd3'})
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ table.insert().execute, {'data': 'd2'})
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ table.insert().execute, {'data': 'd2'},
+ {'data': 'd3'})
+
table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32,
'data': 'd3'})
table.insert(inline=True).execute({'id': 33, 'data': 'd4'})
@@ -530,13 +533,15 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
table.insert().execute({'id': 30, 'data': 'd1'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'},
- {'data': 'd3'})
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ table.insert().execute, {'data': 'd2'})
+ assert_raises_message(
+ exc.CompileError,
+ ".*has no Python-side or server-side default.*",
+ table.insert().execute, {'data': 'd2'},
+ {'data': 'd3'})
table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32,
'data': 'd3'})
table.insert(inline=True).execute({'id': 33, 'data': 'd4'})
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index e080568cf..dd4a888ff 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -5,6 +5,7 @@ from sqlalchemy.testing import eq_
from sqlalchemy import *
from sqlalchemy import types as sqltypes, exc, schema
from sqlalchemy.sql import table, column
+from sqlalchemy.sql.elements import quoted_name
from sqlalchemy.testing import fixtures, AssertsExecutionResults, AssertsCompiledSQL
from sqlalchemy import testing
from sqlalchemy.util import u, b
@@ -1859,6 +1860,31 @@ class TableReflectionTest(fixtures.TestBase):
tbl = Table('test_compress', m2, autoload=True)
assert tbl.dialect_options['oracle']['compress'] == "OLTP"
+ @testing.provide_metadata
+ def test_reflect_lowercase_forced_tables(self):
+ metadata = self.metadata
+
+ Table(
+ quoted_name('t1', quote=True), metadata,
+ Column('id', Integer, primary_key=True),
+ )
+ Table(
+ quoted_name('t2', quote=True), metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t1id', ForeignKey('t1.id'))
+ )
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+ t2_ref = Table(quoted_name('t2', quote=True), m2, autoload=True)
+ t1_ref = m2.tables['t1']
+ assert t2_ref.c.t1id.references(t1_ref.c.id)
+
+ m3 = MetaData(testing.db)
+ m3.reflect(only=lambda name, m: name.lower() in ('t1', 't2'))
+ assert m3.tables['t2'].c.t1id.references(m3.tables['t1'].c.id)
+
+
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = 'oracle'
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index 17920c127..33903ff89 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -20,7 +20,7 @@ from sqlalchemy.engine.url import make_url
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
AssertsExecutionResults, engines
from sqlalchemy import testing
-from sqlalchemy.schema import CreateTable
+from sqlalchemy.schema import CreateTable, FetchedValue
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import mock
@@ -535,29 +535,12 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
assert e.pool.__class__ is pool.NullPool
-
-class AttachedMemoryDBTest(fixtures.TestBase):
+class AttachedDBTest(fixtures.TestBase):
__only_on__ = 'sqlite'
- dbname = None
-
- def setUp(self):
- self.conn = conn = testing.db.connect()
- if self.dbname is None:
- dbname = ':memory:'
- else:
- dbname = self.dbname
- conn.execute('ATTACH DATABASE "%s" AS test_schema' % dbname)
- self.metadata = MetaData()
-
- def tearDown(self):
- self.metadata.drop_all(self.conn)
- self.conn.execute('DETACH DATABASE test_schema')
- if self.dbname:
- os.remove(self.dbname)
-
def _fixture(self):
meta = self.metadata
+ self.conn = testing.db.connect()
ct = Table(
'created', meta,
Column('id', Integer),
@@ -567,6 +550,14 @@ class AttachedMemoryDBTest(fixtures.TestBase):
meta.create_all(self.conn)
return ct
+ def setup(self):
+ self.conn = testing.db.connect()
+ self.metadata = MetaData()
+
+ def teardown(self):
+ self.metadata.drop_all(self.conn)
+ self.conn.close()
+
def test_no_tables(self):
insp = inspect(self.conn)
eq_(insp.get_table_names("test_schema"), [])
@@ -581,6 +572,18 @@ class AttachedMemoryDBTest(fixtures.TestBase):
insp = inspect(self.conn)
eq_(insp.get_table_names("test_schema"), ["created"])
+ def test_schema_names(self):
+ self._fixture()
+ insp = inspect(self.conn)
+ eq_(insp.get_schema_names(), ["main", "test_schema"])
+
+ # implicitly creates a "temp" schema
+ self.conn.execute("select * from sqlite_temp_master")
+
+ # we're not including it
+ insp = inspect(self.conn)
+ eq_(insp.get_schema_names(), ["main", "test_schema"])
+
def test_reflect_system_table(self):
meta = MetaData(self.conn)
alt_master = Table(
@@ -633,10 +636,6 @@ class AttachedMemoryDBTest(fixtures.TestBase):
eq_(row['name'], 'foo')
-class AttachedFileDBTest(AttachedMemoryDBTest):
- dbname = 'attached_db.db'
-
-
class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
"""Tests SQLite-dialect specific compilation."""
@@ -752,6 +751,17 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
"WHERE data > 'a' AND data < 'b''s'",
dialect=sqlite.dialect())
+ def test_no_autoinc_on_composite_pk(self):
+ m = MetaData()
+ t = Table(
+ 't', m,
+ Column('x', Integer, primary_key=True, autoincrement=True),
+ Column('y', Integer, primary_key=True))
+ assert_raises_message(
+ exc.CompileError,
+ "SQLite does not support autoincrement for composite",
+ CreateTable(t).compile, dialect=sqlite.dialect()
+ )
class InsertTest(fixtures.TestBase, AssertsExecutionResults):
@@ -782,23 +792,46 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_pk2(self):
+ # now raises CompileError due to [ticket:3216]
assert_raises(
- exc.DBAPIError, self._test_empty_insert,
+ exc.CompileError, self._test_empty_insert,
Table(
'b', MetaData(testing.db),
Column('x', Integer, primary_key=True),
Column('y', Integer, primary_key=True)))
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
- def test_empty_insert_pk3(self):
+ def test_empty_insert_pk2_fv(self):
assert_raises(
exc.DBAPIError, self._test_empty_insert,
Table(
+ 'b', MetaData(testing.db),
+ Column('x', Integer, primary_key=True,
+ server_default=FetchedValue()),
+ Column('y', Integer, primary_key=True,
+ server_default=FetchedValue())))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_pk3(self):
+ # now raises CompileError due to [ticket:3216]
+ assert_raises(
+ exc.CompileError, self._test_empty_insert,
+ Table(
'c', MetaData(testing.db),
Column('x', Integer, primary_key=True),
Column('y', Integer, DefaultClause('123'), primary_key=True)))
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_pk3_fv(self):
+ assert_raises(
+ exc.DBAPIError, self._test_empty_insert,
+ Table(
+ 'c', MetaData(testing.db),
+ Column('x', Integer, primary_key=True,
+ server_default=FetchedValue()),
+ Column('y', Integer, DefaultClause('123'), primary_key=True)))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_pk4(self):
self._test_empty_insert(
Table(
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index 83650609d..b7bf87d63 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -311,22 +311,22 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
Don't mark this test as unsupported for any backend !
- (technically it fails with MySQL InnoDB since "id" comes before "id2")
-
"""
meta = self.metadata
- Table('test', meta,
+ Table(
+ 'test', meta,
Column('id', sa.Integer, primary_key=True),
Column('data', sa.String(50)),
- mysql_engine='MyISAM'
+ mysql_engine='InnoDB'
)
- Table('test2', meta,
- Column('id', sa.Integer, sa.ForeignKey('test.id'),
- primary_key=True),
+ Table(
+ 'test2', meta,
+ Column(
+ 'id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True),
Column('id2', sa.Integer, primary_key=True),
Column('data', sa.String(50)),
- mysql_engine='MyISAM'
+ mysql_engine='InnoDB'
)
meta.create_all()
m2 = MetaData(testing.db)
@@ -334,7 +334,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
assert t1a._autoincrement_column is t1a.c.id
t2a = Table('test2', m2, autoload=True)
- assert t2a._autoincrement_column is t2a.c.id2
+ assert t2a._autoincrement_column is None
+
@skip('sqlite')
@testing.provide_metadata
diff --git a/test/ext/declarative/test_basic.py b/test/ext/declarative/test_basic.py
index ab0de801c..5165d9cc9 100644
--- a/test/ext/declarative/test_basic.py
+++ b/test/ext/declarative/test_basic.py
@@ -1570,8 +1570,7 @@ class DeclarativeTest(DeclarativeTestBase):
meta = MetaData(testing.db)
t1 = Table(
't1', meta,
- Column('id', String(50),
- primary_key=True, test_needs_autoincrement=True),
+ Column('id', String(50), primary_key=True),
Column('data', String(50)))
meta.create_all()
try:
diff --git a/test/orm/test_composites.py b/test/orm/test_composites.py
index 8b777dcdf..48027ec2d 100644
--- a/test/orm/test_composites.py
+++ b/test/orm/test_composites.py
@@ -313,8 +313,7 @@ class PrimaryKeyTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('graphs', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True),
Column('version_id', Integer, primary_key=True,
nullable=True),
Column('name', String(30)))
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index d7069733e..a373f1482 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -579,8 +579,7 @@ class GetTest(QueryTest):
table = Table(
'unicode_data', metadata,
Column(
- 'id', Unicode(40), primary_key=True,
- test_needs_autoincrement=True),
+ 'id', Unicode(40), primary_key=True),
Column('data', Unicode(40)))
metadata.create_all()
ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8')
diff --git a/test/orm/test_relationships.py b/test/orm/test_relationships.py
index 9e4b38a90..061187330 100644
--- a/test/orm/test_relationships.py
+++ b/test/orm/test_relationships.py
@@ -931,14 +931,12 @@ class SynonymsAsFKsTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("tableA", metadata,
- Column("id", Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column("id", Integer, primary_key=True),
Column("foo", Integer,),
test_needs_fk=True)
Table("tableB", metadata,
- Column("id", Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column("id", Integer, primary_key=True),
Column("_a_id", Integer, key='a_id', primary_key=True),
test_needs_fk=True)
@@ -1093,7 +1091,7 @@ class FKsAsPksTest(fixtures.MappedTest):
'tablec', tableA.metadata,
Column('id', Integer, primary_key=True),
Column('a_id', Integer, ForeignKey('tableA.id'),
- primary_key=True, autoincrement=False, nullable=True))
+ primary_key=True, nullable=True))
tableC.create()
class C(fixtures.BasicEntity):
@@ -2703,8 +2701,7 @@ class ExplicitLocalRemoteTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', String(50), primary_key=True,
- test_needs_autoincrement=True),
+ Column('id', String(50), primary_key=True),
Column('data', String(50)))
Table('t2', metadata,
Column('id', Integer, primary_key=True,
diff --git a/test/orm/test_unitofwork.py b/test/orm/test_unitofwork.py
index 5a47903f0..2f67943f1 100644
--- a/test/orm/test_unitofwork.py
+++ b/test/orm/test_unitofwork.py
@@ -260,7 +260,7 @@ class PKTest(fixtures.MappedTest):
def define_tables(cls, metadata):
Table('multipk1', metadata,
Column('multi_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=not testing.against('sqlite')),
Column('multi_rev', Integer, primary_key=True),
Column('name', String(50), nullable=False),
Column('value', String(100)))
diff --git a/test/requirements.py b/test/requirements.py
index c25b409d7..fa69a62f1 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -293,7 +293,6 @@ class DefaultRequirements(SuiteRequirements):
named 'test_schema'."""
return skip_if([
- "sqlite",
"firebird"
], "no schema support")
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index c957b2f8a..f6f2ec740 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -2916,6 +2916,21 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
"CREATE TABLE t (x INTEGER, z INTEGER)"
)
+ def test_composite_pk_constraint_autoinc_first(self):
+ m = MetaData()
+ t = Table(
+ 't', m,
+ Column('a', Integer, primary_key=True),
+ Column('b', Integer, primary_key=True, autoincrement=True)
+ )
+ self.assert_compile(
+ schema.CreateTable(t),
+ "CREATE TABLE t ("
+ "a INTEGER NOT NULL, "
+ "b INTEGER NOT NULL, "
+ "PRIMARY KEY (b, a))"
+ )
+
class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 673085cf7..fc0b2bb80 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -732,7 +732,6 @@ class AutoIncrementTest(fixtures.TablesTest):
)
assert x._autoincrement_column is None
- @testing.fails_on('sqlite', 'FIXME: unknown')
def test_non_autoincrement(self):
# sqlite INT primary keys can be non-unique! (only for ints)
nonai = Table(
@@ -746,8 +745,9 @@ class AutoIncrementTest(fixtures.TablesTest):
# mysql in legacy mode fails on second row
nonai.insert().execute(data='row 1')
nonai.insert().execute(data='row 2')
- assert_raises(
- sa.exc.DBAPIError,
+ assert_raises_message(
+ sa.exc.CompileError,
+ ".*has no Python-side or server-side default.*",
go
)
@@ -1080,6 +1080,23 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
assert not self._has_sequence('s1')
assert not self._has_sequence('s2')
+ @testing.requires.returning
+ @testing.provide_metadata
+ def test_freestanding_sequence_via_autoinc(self):
+ t = Table(
+ 'some_table', self.metadata,
+ Column(
+ 'id', Integer,
+ autoincrement=True,
+ primary_key=True,
+ default=Sequence(
+ 'my_sequence', metadata=self.metadata).next_value())
+ )
+ self.metadata.create_all(testing.db)
+
+ result = testing.db.execute(t.insert())
+ eq_(result.inserted_primary_key, [1])
+
cartitems = sometable = metadata = None
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index f66f0b391..bdaf4f38c 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -390,6 +390,106 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams={"name_1": "foo"}
)
+ def test_anticipate_no_pk_composite_pk(self):
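+ # as of 1.1, an INSERT that leaves a composite-PK member with no
+ # default unset raises CompileError up front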
+ t = Table(
+ 't', MetaData(), Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "Column 't.y' is marked as a member.*"
+ "Note that as of SQLAlchemy 1.1,",
+ t.insert().compile, column_keys=['x']
+ )
+
+ def test_anticipate_no_pk_composite_pk_implicit_returning(self):
+ t = Table(
+ 't', MetaData(), Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)
+ )
+ d = postgresql.dialect()
+ d.implicit_returning = True
+ assert_raises_message(
+ exc.CompileError,
+ "Column 't.y' is marked as a member.*"
+ "Note that as of SQLAlchemy 1.1,",
+ t.insert().compile, dialect=d, column_keys=['x']
+ )
+
+ def test_anticipate_no_pk_composite_pk_prefetch(self):
+ t = Table(
+ 't', MetaData(), Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)
+ )
+ d = postgresql.dialect()
+ d.implicit_returning = False
+ assert_raises_message(
+ exc.CompileError,
+ "Column 't.y' is marked as a member.*"
+ "Note that as of SQLAlchemy 1.1,",
+ t.insert().compile, dialect=d, column_keys=['x']
+ )
+
+ def test_anticipate_nullable_composite_pk(self):
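+ # a composite-PK member explicitly marked nullable=True is exempt
+ # from the check; the column is simply omitted from the INSERT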
+ t = Table(
+ 't', MetaData(), Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True, nullable=True)
+ )
+ self.assert_compile(
+ t.insert(),
+ "INSERT INTO t (x) VALUES (:x)",
+ params={'x': 5},
+ )
+
+ def test_anticipate_no_pk_non_composite_pk(self):
+ t = Table(
+ 't', MetaData(),
+ Column('x', Integer, primary_key=True, autoincrement=False),
+ Column('q', Integer)
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "Column 't.x' is marked as a member.*"
+ "may not store NULL.$",
+ t.insert().compile, column_keys=['q']
+ )
+
+ def test_anticipate_no_pk_non_composite_pk_implicit_returning(self):
+ t = Table(
+ 't', MetaData(),
+ Column('x', Integer, primary_key=True, autoincrement=False),
+ Column('q', Integer)
+ )
+ d = postgresql.dialect()
+ d.implicit_returning = True
+ assert_raises_message(
+ exc.CompileError,
+ "Column 't.x' is marked as a member.*"
+ "may not store NULL.$",
+ t.insert().compile, dialect=d, column_keys=['q']
+ )
+
+ def test_anticipate_no_pk_non_composite_pk_prefetch(self):
+ t = Table(
+ 't', MetaData(),
+ Column('x', Integer, primary_key=True, autoincrement=False),
+ Column('q', Integer)
+ )
+ d = postgresql.dialect()
+ d.implicit_returning = False
+ assert_raises_message(
+ exc.CompileError,
+ "Column 't.x' is marked as a member.*"
+ "may not store NULL.$",
+ t.insert().compile, dialect=d, column_keys=['q']
+ )
+
class InsertImplicitReturningTest(
_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
new file mode 100644
index 000000000..c49947425
--- /dev/null
+++ b/test/sql/test_insert_exec.py
@@ -0,0 +1,445 @@
+from sqlalchemy.testing import eq_, assert_raises_message, is_
+from sqlalchemy import testing
+from sqlalchemy.testing import fixtures, engines
+from sqlalchemy import (
+ exc, sql, String, Integer, MetaData, and_, ForeignKey,
+ VARCHAR, INT, Sequence, func)
+from sqlalchemy.testing.schema import Table, Column
+
+
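+# INSERT execution tests, moved here from test/sql/test_query.py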
+class InsertExecTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'users', metadata,
+ Column(
+ 'user_id', INT, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_name', VARCHAR(20)),
+ test_needs_acid=True
+ )
+
+ @testing.requires.multivalues_inserts
+ def test_multivalues_insert(self):
+ users = self.tables.users
+ users.insert(
+ values=[
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'}]).execute()
+ rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ eq_(rows[0], (7, 'jack'))
+ eq_(rows[1], (8, 'ed'))
+ users.insert(values=[(9, 'jack'), (10, 'ed')]).execute()
+ rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ eq_(rows[2], (9, 'jack'))
+ eq_(rows[3], (10, 'ed'))
+
+ def test_insert_heterogeneous_params(self):
+ """test that executemany parameters are asserted to match the
+ parameter set of the first."""
+ users = self.tables.users
+
+ assert_raises_message(
+ exc.StatementError,
+ r"\(sqlalchemy.exc.InvalidRequestError\) A value is required for "
+ "bind parameter 'user_name', in "
+ "parameter group 2 "
+ r"\[SQL: u?'INSERT INTO users",
+ users.insert().execute,
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9}
+ )
+
+ # this succeeds however. We aren't yet doing
+ # a length check on all subsequent parameters.
+ users.insert().execute(
+ {'user_id': 7},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9}
+ )
+
+ def _test_lastrow_accessor(self, table_, values, assertvalues):
+ """Tests the inserted_primary_key and lastrow_has_id() functions."""
+
+ def insert_values(engine, table_, values):
+ """
+ Inserts a row into a table and returns the full set of values
+ stored, including defaults that fired off on the DB side; rows
+ that had defaults are detected and post-fetched.
+ """
+
+ # verify implicit_returning is working
+ if engine.dialect.implicit_returning:
+ ins = table_.insert()
+ comp = ins.compile(engine, column_keys=list(values))
+ if not set(values).issuperset(
+ c.key for c in table_.primary_key):
+ is_(bool(comp.returning), True)
+
+ result = engine.execute(table_.insert(), **values)
+ ret = values.copy()
+
+ for col, id in zip(
+ table_.primary_key, result.inserted_primary_key):
+ ret[col.key] = id
+
+ if result.lastrow_has_defaults():
+ criterion = and_(
+ *[
+ col == id for col, id in
+ zip(table_.primary_key, result.inserted_primary_key)])
+ row = engine.execute(table_.select(criterion)).first()
+ for c in table_.c:
+ ret[c.key] = row[c]
+ return ret
+
+ if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ assert testing.db.dialect.implicit_returning
+
+ if testing.db.dialect.implicit_returning:
+ test_engines = [
+ engines.testing_engine(options={'implicit_returning': False}),
+ engines.testing_engine(options={'implicit_returning': True}),
+ ]
+ else:
+ test_engines = [testing.db]
+
+ for engine in test_engines:
+ try:
+ table_.create(bind=engine, checkfirst=True)
+ i = insert_values(engine, table_, values)
+ eq_(i, assertvalues)
+ finally:
+ table_.drop(bind=engine)
+
+ @testing.skip_if('sqlite')
+ def test_lastrow_accessor_one(self):
+ metadata = MetaData()
+ self._test_lastrow_accessor(
+ Table(
+ "t1", metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('foo', String(30), primary_key=True)),
+ {'foo': 'hi'},
+ {'id': 1, 'foo': 'hi'}
+ )
+
+ @testing.skip_if('sqlite')
+ def test_lastrow_accessor_two(self):
+ metadata = MetaData()
+ self._test_lastrow_accessor(
+ Table(
+ "t2", metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('foo', String(30), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'foo': 'hi'},
+ {'id': 1, 'foo': 'hi', 'bar': 'hi'}
+ )
+
+ def test_lastrow_accessor_three(self):
+ metadata = MetaData()
+ self._test_lastrow_accessor(
+ Table(
+ "t3", metadata,
+ Column("id", String(40), primary_key=True),
+ Column('foo', String(30), primary_key=True),
+ Column("bar", String(30))
+ ),
+ {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"},
+ {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}
+ )
+
+ def test_lastrow_accessor_four(self):
+ metadata = MetaData()
+ self._test_lastrow_accessor(
+ Table(
+ "t4", metadata,
+ Column(
+ 'id', Integer,
+ Sequence('t4_id_seq', optional=True),
+ primary_key=True),
+ Column('foo', String(30), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'foo': 'hi', 'id': 1},
+ {'id': 1, 'foo': 'hi', 'bar': 'hi'}
+ )
+
+ def test_lastrow_accessor_five(self):
+ metadata = MetaData()
+ self._test_lastrow_accessor(
+ Table(
+ "t5", metadata,
+ Column('id', String(10), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'id': 'id1'},
+ {'id': 'id1', 'bar': 'hi'},
+ )
+
+ @testing.skip_if('sqlite')
+ def test_lastrow_accessor_six(self):
+ metadata = MetaData()
+ self._test_lastrow_accessor(
+ Table(
+ "t6", metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('bar', Integer, primary_key=True)
+ ),
+ {'bar': 0},
+ {'id': 1, 'bar': 0},
+ )
+
+ # TODO: why not in the sqlite suite?
+ @testing.only_on('sqlite+pysqlite')
+ @testing.provide_metadata
+ def test_lastrowid_zero(self):
+ from sqlalchemy.dialects import sqlite
+ eng = engines.testing_engine()
+
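+ # stub context forces a cursor lastrowid of 0, ensuring a falsy but
+ # valid rowid still populates inserted_primary_key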
+ class ExcCtx(sqlite.base.SQLiteExecutionContext):
+
+ def get_lastrowid(self):
+ return 0
+ eng.dialect.execution_ctx_cls = ExcCtx
+ t = Table(
+ 't', self.metadata, Column('x', Integer, primary_key=True),
+ Column('y', Integer))
+ t.create(eng)
+ r = eng.execute(t.insert().values(y=5))
+ eq_(r.inserted_primary_key, [0])
+
+ @testing.fails_on(
+ 'sqlite', "sqlite autoincrement doesn't work with composite pks")
+ @testing.provide_metadata
+ def test_misordered_lastrow(self):
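+ # inserted_primary_key should preserve the table's PK column order
+ # even when the autoincrement column isn't the first PK column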
+ metadata = self.metadata
+
+ related = Table(
+ 'related', metadata,
+ Column('id', Integer, primary_key=True),
+ mysql_engine='MyISAM'
+ )
+ t6 = Table(
+ "t6", metadata,
+ Column(
+ 'manual_id', Integer, ForeignKey('related.id'),
+ primary_key=True),
+ Column(
+ 'auto_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ mysql_engine='MyISAM'
+ )
+
+ metadata.create_all()
+ r = related.insert().values(id=12).execute()
+ id_ = r.inserted_primary_key[0]
+ eq_(id_, 12)
+
+ r = t6.insert().values(manual_id=id_).execute()
+ eq_(r.inserted_primary_key, [12, 1])
+
+ def test_implicit_id_insert_select_columns(self):
+ users = self.tables.users
+ stmt = users.insert().from_select(
+ (users.c.user_id, users.c.user_name),
+ users.select().where(users.c.user_id == 20))
+
+ testing.db.execute(stmt)
+
+ def test_implicit_id_insert_select_keys(self):
+ users = self.tables.users
+ stmt = users.insert().from_select(
+ ["user_id", "user_name"],
+ users.select().where(users.c.user_id == 20))
+
+ testing.db.execute(stmt)
+
+ @testing.requires.empty_inserts
+ @testing.requires.returning
+ def test_no_inserted_pk_on_returning(self):
+ users = self.tables.users
+ result = testing.db.execute(users.insert().returning(
+ users.c.user_id, users.c.user_name))
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"Can't call inserted_primary_key when returning\(\) is used.",
+ getattr, result, 'inserted_primary_key'
+ )
+
+
+class TableInsertTest(fixtures.TablesTest):
+
+ """test for consistent insert behavior across dialects
+ regarding the inline=True flag, lower-case 't' tables.
+
+ """
+ run_create_tables = 'each'
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'foo', metadata,
+ Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
+ Column('data', String(50)),
+ Column('x', Integer)
+ )
+
+ def _fixture(self, types=True):
+ if types:
+ t = sql.table(
+ 'foo', sql.column('id', Integer),
+ sql.column('data', String),
+ sql.column('x', Integer))
+ else:
+ t = sql.table(
+ 'foo', sql.column('id'), sql.column('data'), sql.column('x'))
+ return t
+
+ def _test(self, stmt, row, returning=None, inserted_primary_key=False):
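+ # inserted_primary_key=False means "don't check"; an empty list is
+ # itself a legitimate expected value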
+ r = testing.db.execute(stmt)
+
+ if returning:
+ returned = r.first()
+ eq_(returned, returning)
+ elif inserted_primary_key is not False:
+ eq_(r.inserted_primary_key, inserted_primary_key)
+
+ eq_(testing.db.execute(self.tables.foo.select()).first(), row)
+
+ def _test_multi(self, stmt, rows, data):
+ testing.db.execute(stmt, rows)
+ eq_(
+ testing.db.execute(
+ self.tables.foo.select().
+ order_by(self.tables.foo.c.id)).fetchall(),
+ data)
+
+ @testing.requires.sequences
+ def test_explicit_sequence(self):
+ t = self._fixture()
+ self._test(
+ t.insert().values(
+ id=func.next_value(Sequence('t_id_seq')), data='data', x=5),
+ (1, 'data', 5)
+ )
+
+ def test_uppercase(self):
+ t = self.tables.foo
+ self._test(
+ t.insert().values(id=1, data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[1]
+ )
+
+ def test_uppercase_inline(self):
+ t = self.tables.foo
+ self._test(
+ t.insert(inline=True).values(id=1, data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[1]
+ )
+
+ @testing.crashes(
+ "mssql+pyodbc",
+ "Pyodbc + SQL Server + Py3K, some decimal handling issue")
+ def test_uppercase_inline_implicit(self):
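+ # with inline=True the id is generated wholly on the DB side and is
+ # not fetched, hence inserted_primary_key is [None]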
+ t = self.tables.foo
+ self._test(
+ t.insert(inline=True).values(data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[None]
+ )
+
+ def test_uppercase_implicit(self):
+ t = self.tables.foo
+ self._test(
+ t.insert().values(data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[1]
+ )
+
+ def test_uppercase_direct_params(self):
+ t = self.tables.foo
+ self._test(
+ t.insert().values(id=1, data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[1]
+ )
+
+ @testing.requires.returning
+ def test_uppercase_direct_params_returning(self):
+ t = self.tables.foo
+ self._test(
+ t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
+ (1, 'data', 5),
+ returning=(1, 5)
+ )
+
+ @testing.fails_on(
+ 'mssql', "lowercase table doesn't support identity insert disable")
+ def test_direct_params(self):
+ t = self._fixture()
+ self._test(
+ t.insert().values(id=1, data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[]
+ )
+
+ @testing.fails_on(
+ 'mssql', "lowercase table doesn't support identity insert disable")
+ @testing.requires.returning
+ def test_direct_params_returning(self):
+ t = self._fixture()
+ self._test(
+ t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
+ (1, 'data', 5),
+ returning=(1, 5)
+ )
+
+ @testing.requires.emulated_lastrowid
+ def test_implicit_pk(self):
+ t = self._fixture()
+ self._test(
+ t.insert().values(data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[]
+ )
+
+ @testing.requires.emulated_lastrowid
+ def test_implicit_pk_multi_rows(self):
+ t = self._fixture()
+ self._test_multi(
+ t.insert(),
+ [
+ {'data': 'd1', 'x': 5},
+ {'data': 'd2', 'x': 6},
+ {'data': 'd3', 'x': 7},
+ ],
+ [
+ (1, 'd1', 5),
+ (2, 'd2', 6),
+ (3, 'd3', 7)
+ ],
+ )
+
+ @testing.requires.emulated_lastrowid
+ def test_implicit_pk_inline(self):
+ t = self._fixture()
+ self._test(
+ t.insert(inline=True).values(data='data', x=5),
+ (1, 'data', 5),
+ inserted_primary_key=[]
+ )
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index 2e51b9a91..501df4671 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -1361,6 +1361,123 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
assert not t1.c.x.nullable
+class PKAutoIncrementTest(fixtures.TestBase):
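+ # exercises PrimaryKeyConstraint._autoincrement_column, the 1.1
+ # accessor that locates at most one autoincrement-eligible column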
+ def test_multi_integer_no_autoinc(self):
+ pk = PrimaryKeyConstraint(
+ Column('a', Integer),
+ Column('b', Integer)
+ )
+ t = Table('t', MetaData())
+ t.append_constraint(pk)
+
+ is_(pk._autoincrement_column, None)
+
+ def test_multi_integer_multi_autoinc(self):
+ pk = PrimaryKeyConstraint(
+ Column('a', Integer, autoincrement=True),
+ Column('b', Integer, autoincrement=True)
+ )
+ t = Table('t', MetaData())
+ t.append_constraint(pk)
+
+ assert_raises_message(
+ exc.ArgumentError,
+ "Only one Column may be marked",
+ lambda: pk._autoincrement_column
+ )
+
+ def test_single_integer_no_autoinc(self):
+ pk = PrimaryKeyConstraint(
+ Column('a', Integer),
+ )
+ t = Table('t', MetaData())
+ t.append_constraint(pk)
+
+ is_(pk._autoincrement_column, pk.columns['a'])
+
+ def test_single_string_no_autoinc(self):
+ pk = PrimaryKeyConstraint(
+ Column('a', String),
+ )
+ t = Table('t', MetaData())
+ t.append_constraint(pk)
+
+ is_(pk._autoincrement_column, None)
+
+ def test_single_string_illegal_autoinc(self):
+ t = Table('t', MetaData(), Column('a', String, autoincrement=True))
+ pk = PrimaryKeyConstraint(
+ t.c.a
+ )
+ t.append_constraint(pk)
+
+ assert_raises_message(
+ exc.ArgumentError,
+ "Column type VARCHAR on column 't.a'",
+ lambda: pk._autoincrement_column
+ )
+
+ def test_single_integer_default(self):
+ t = Table(
+ 't', MetaData(),
+ Column('a', Integer, autoincrement=True, default=lambda: 1))
+ pk = PrimaryKeyConstraint(
+ t.c.a
+ )
+ t.append_constraint(pk)
+
+ is_(pk._autoincrement_column, t.c.a)
+
+ def test_single_integer_server_default(self):
+ # new as of 1.1; now that we have three states for autoincrement,
+ # if the user puts autoincrement=True with a server_default, trust
+ # them on it
+ t = Table(
+ 't', MetaData(),
+ Column('a', Integer,
+ autoincrement=True, server_default=func.magic()))
+ pk = PrimaryKeyConstraint(
+ t.c.a
+ )
+ t.append_constraint(pk)
+
+ is_(pk._autoincrement_column, t.c.a)
+
+ def test_implicit_autoinc_but_fks(self):
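+ # an integer PK that is also a foreign key is presumed to get its
+ # value from the referenced table, so no implicit autoincrement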
+ m = MetaData()
+ Table('t1', m, Column('id', Integer, primary_key=True))
+ t2 = Table(
+ 't2', MetaData(),
+ Column('a', Integer, ForeignKey('t1.id')))
+ pk = PrimaryKeyConstraint(
+ t2.c.a
+ )
+ t2.append_constraint(pk)
+ is_(pk._autoincrement_column, None)
+
+ def test_explicit_autoinc_but_fks(self):
+ m = MetaData()
+ Table('t1', m, Column('id', Integer, primary_key=True))
+ t2 = Table(
+ 't2', MetaData(),
+ Column('a', Integer, ForeignKey('t1.id'), autoincrement=True))
+ pk = PrimaryKeyConstraint(
+ t2.c.a
+ )
+ t2.append_constraint(pk)
+ is_(pk._autoincrement_column, t2.c.a)
+
+ t3 = Table(
+ 't3', MetaData(),
+ Column('a', Integer,
+ ForeignKey('t1.id'), autoincrement='ignore_fk'))
+ pk = PrimaryKeyConstraint(
+ t3.c.a
+ )
+ t3.append_constraint(pk)
+ is_(pk._autoincrement_column, t3.c.a)
+
+
class SchemaTypeTest(fixtures.TestBase):
class MyType(sqltypes.SchemaType, sqltypes.TypeEngine):
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 0313a9cd0..aca933fc9 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -2,13 +2,12 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \
is_, in_, not_in_
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, engines
-from sqlalchemy import util
from sqlalchemy import (
exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey,
- union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence,
- bindparam, literal, not_, type_coerce, literal_column, desc, asc,
- TypeDecorator, or_, cast, table, column)
-from sqlalchemy.engine import default, result as _result
+ union, intersect, except_, union_all, VARCHAR, INT, text,
+ bindparam, literal, not_, literal_column, desc, asc,
+ TypeDecorator, or_, cast)
+from sqlalchemy.engine import default
from sqlalchemy.testing.schema import Table, Column
# ongoing - these are old tests. those which are of general use
@@ -62,260 +61,6 @@ class QueryTest(fixtures.TestBase):
def teardown_class(cls):
metadata.drop_all()
- @testing.requires.multivalues_inserts
- def test_multivalues_insert(self):
- users.insert(
- values=[
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'}]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
- self.assert_(rows[0] == (7, 'jack'))
- self.assert_(rows[1] == (8, 'ed'))
- users.insert(values=[(9, 'jack'), (10, 'ed')]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
- self.assert_(rows[2] == (9, 'jack'))
- self.assert_(rows[3] == (10, 'ed'))
-
- def test_insert_heterogeneous_params(self):
- """test that executemany parameters are asserted to match the
- parameter set of the first."""
-
- assert_raises_message(
- exc.StatementError,
- r"\(sqlalchemy.exc.InvalidRequestError\) A value is required for "
- "bind parameter 'user_name', in "
- "parameter group 2 "
- r"\[SQL: u?'INSERT INTO query_users",
- users.insert().execute,
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9}
- )
-
- # this succeeds however. We aren't yet doing
- # a length check on all subsequent parameters.
- users.insert().execute(
- {'user_id': 7},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9}
- )
-
- def test_lastrow_accessor(self):
- """Tests the inserted_primary_key and lastrow_has_id() functions."""
-
- def insert_values(engine, table, values):
- """
- Inserts a row into a table, returns the full list of values
- INSERTed including defaults that fired off on the DB side and
- detects rows that had defaults and post-fetches.
- """
-
- # verify implicit_returning is working
- if engine.dialect.implicit_returning:
- ins = table.insert()
- comp = ins.compile(engine, column_keys=list(values))
- if not set(values).issuperset(
- c.key for c in table.primary_key):
- assert comp.returning
-
- result = engine.execute(table.insert(), **values)
- ret = values.copy()
-
- for col, id in zip(table.primary_key, result.inserted_primary_key):
- ret[col.key] = id
-
- if result.lastrow_has_defaults():
- criterion = and_(
- *[
- col == id for col, id in
- zip(table.primary_key, result.inserted_primary_key)])
- row = engine.execute(table.select(criterion)).first()
- for c in table.c:
- ret[c.key] = row[c]
- return ret
-
- if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
- assert testing.db.dialect.implicit_returning
-
- if testing.db.dialect.implicit_returning:
- test_engines = [
- engines.testing_engine(options={'implicit_returning': False}),
- engines.testing_engine(options={'implicit_returning': True}),
- ]
- else:
- test_engines = [testing.db]
-
- for engine in test_engines:
- metadata = MetaData()
- for supported, table, values, assertvalues in [
- (
- {'unsupported': ['sqlite']},
- Table(
- "t1", metadata,
- Column(
- 'id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('foo', String(30), primary_key=True)),
- {'foo': 'hi'},
- {'id': 1, 'foo': 'hi'}
- ),
- (
- {'unsupported': ['sqlite']},
- Table(
- "t2", metadata,
- Column(
- 'id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
- ),
- {'foo': 'hi'},
- {'id': 1, 'foo': 'hi', 'bar': 'hi'}
- ),
- (
- {'unsupported': []},
- Table(
- "t3", metadata,
- Column("id", String(40), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column("bar", String(30))
- ),
- {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"},
- {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}
- ),
- (
- {'unsupported': []},
- Table(
- "t4", metadata,
- Column(
- 'id', Integer,
- Sequence('t4_id_seq', optional=True),
- primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
- ),
- {'foo': 'hi', 'id': 1},
- {'id': 1, 'foo': 'hi', 'bar': 'hi'}
- ),
- (
- {'unsupported': []},
- Table(
- "t5", metadata,
- Column('id', String(10), primary_key=True),
- Column('bar', String(30), server_default='hi')
- ),
- {'id': 'id1'},
- {'id': 'id1', 'bar': 'hi'},
- ),
- (
- {'unsupported': ['sqlite']},
- Table(
- "t6", metadata,
- Column(
- 'id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('bar', Integer, primary_key=True)
- ),
- {'bar': 0},
- {'id': 1, 'bar': 0},
- ),
- ]:
- if testing.db.name in supported['unsupported']:
- continue
- try:
- table.create(bind=engine, checkfirst=True)
- i = insert_values(engine, table, values)
- assert i == assertvalues, "tablename: %s %r %r" % \
- (table.name, repr(i), repr(assertvalues))
- finally:
- table.drop(bind=engine)
-
- # TODO: why not in the sqlite suite?
- @testing.only_on('sqlite+pysqlite')
- @testing.provide_metadata
- def test_lastrowid_zero(self):
- from sqlalchemy.dialects import sqlite
- eng = engines.testing_engine()
-
- class ExcCtx(sqlite.base.SQLiteExecutionContext):
-
- def get_lastrowid(self):
- return 0
- eng.dialect.execution_ctx_cls = ExcCtx
- t = Table(
- 't', self.metadata, Column('x', Integer, primary_key=True),
- Column('y', Integer))
- t.create(eng)
- r = eng.execute(t.insert().values(y=5))
- eq_(r.inserted_primary_key, [0])
-
- @testing.fails_on(
- 'sqlite', "sqlite autoincremnt doesn't work with composite pks")
- def test_misordered_lastrow(self):
- related = Table(
- 'related', metadata,
- Column('id', Integer, primary_key=True),
- mysql_engine='MyISAM'
- )
- t6 = Table(
- "t6", metadata,
- Column(
- 'manual_id', Integer, ForeignKey('related.id'),
- primary_key=True),
- Column(
- 'auto_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- mysql_engine='MyISAM'
- )
-
- metadata.create_all()
- r = related.insert().values(id=12).execute()
- id = r.inserted_primary_key[0]
- assert id == 12
-
- r = t6.insert().values(manual_id=id).execute()
- eq_(r.inserted_primary_key, [12, 1])
-
- def test_implicit_id_insert_select_columns(self):
- stmt = users.insert().from_select(
- (users.c.user_id, users.c.user_name),
- users.select().where(users.c.user_id == 20))
-
- testing.db.execute(stmt)
-
- def test_implicit_id_insert_select_keys(self):
- stmt = users.insert().from_select(
- ["user_id", "user_name"],
- users.select().where(users.c.user_id == 20))
-
- testing.db.execute(stmt)
-
- def test_row_iteration(self):
- users.insert().execute(
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9, 'user_name': 'fred'},
- )
- r = users.select().execute()
- l = []
- for row in r:
- l.append(row)
- self.assert_(len(l) == 3)
-
- @testing.requires.subqueries
- def test_anonymous_rows(self):
- users.insert().execute(
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9, 'user_name': 'fred'},
- )
-
- sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \
- as_scalar()
- for row in select([sel + 1, sel + 3], bind=users.bind).execute():
- assert row['anon_1'] == 8
- assert row['anon_2'] == 10
-
@testing.fails_on(
'firebird', "kinterbasdb doesn't send full type information")
def test_order_by_label(self):
@@ -365,154 +110,6 @@ class QueryTest(fixtures.TestBase):
[("test: ed",), ("test: fred",), ("test: jack",)]
)
- def test_row_comparison(self):
- users.insert().execute(user_id=7, user_name='jack')
- rp = users.select().execute().first()
-
- self.assert_(rp == rp)
- self.assert_(not(rp != rp))
-
- equal = (7, 'jack')
-
- self.assert_(rp == equal)
- self.assert_(equal == rp)
- self.assert_(not (rp != equal))
- self.assert_(not (equal != equal))
-
- def endless():
- while True:
- yield 1
- self.assert_(rp != endless())
- self.assert_(endless() != rp)
-
- # test that everything compares the same
- # as it would against a tuple
- import operator
- for compare in [False, 8, endless(), 'xyz', (7, 'jack')]:
- for op in [
- operator.eq, operator.ne, operator.gt,
- operator.lt, operator.ge, operator.le
- ]:
-
- try:
- control = op(equal, compare)
- except TypeError:
- # Py3K raises TypeError for some invalid comparisons
- assert_raises(TypeError, op, rp, compare)
- else:
- eq_(control, op(rp, compare))
-
- try:
- control = op(compare, equal)
- except TypeError:
- # Py3K raises TypeError for some invalid comparisons
- assert_raises(TypeError, op, compare, rp)
- else:
- eq_(control, op(compare, rp))
-
- @testing.provide_metadata
- def test_column_label_overlap_fallback(self):
- content = Table(
- 'content', self.metadata,
- Column('type', String(30)),
- )
- bar = Table(
- 'bar', self.metadata,
- Column('content_type', String(30))
- )
- self.metadata.create_all(testing.db)
- testing.db.execute(content.insert().values(type="t1"))
-
- row = testing.db.execute(content.select(use_labels=True)).first()
- assert content.c.type in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') in row
-
- row = testing.db.execute(
- select([content.c.type.label("content_type")])).first()
- assert content.c.type in row
-
- assert bar.c.content_type not in row
-
- assert sql.column('content_type') in row
-
- row = testing.db.execute(select([func.now().label("content_type")])). \
- first()
- assert content.c.type not in row
-
- assert bar.c.content_type not in row
-
- assert sql.column('content_type') in row
-
- def test_pickled_rows(self):
- users.insert().execute(
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9, 'user_name': 'fred'},
- )
-
- for pickle in False, True:
- for use_labels in False, True:
- result = users.select(use_labels=use_labels).order_by(
- users.c.user_id).execute().fetchall()
-
- if pickle:
- result = util.pickle.loads(util.pickle.dumps(result))
-
- eq_(
- result,
- [(7, "jack"), (8, "ed"), (9, "fred")]
- )
- if use_labels:
- eq_(result[0]['query_users_user_id'], 7)
- eq_(
- list(result[0].keys()),
- ["query_users_user_id", "query_users_user_name"])
- else:
- eq_(result[0]['user_id'], 7)
- eq_(list(result[0].keys()), ["user_id", "user_name"])
-
- eq_(result[0][0], 7)
- eq_(result[0][users.c.user_id], 7)
- eq_(result[0][users.c.user_name], 'jack')
-
- if not pickle or use_labels:
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.user_id])
- else:
- # test with a different table. name resolution is
- # causing 'user_id' to match when use_labels wasn't used.
- eq_(result[0][addresses.c.user_id], 7)
-
- assert_raises(
- exc.NoSuchColumnError, lambda: result[0]['fake key'])
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.address_id])
-
- def test_column_error_printing(self):
- row = testing.db.execute(select([1])).first()
-
- class unprintable(object):
-
- def __str__(self):
- raise ValueError("nope")
-
- msg = r"Could not locate column in row for column '%s'"
-
- for accessor, repl in [
- ("x", "x"),
- (Column("q", Integer), "q"),
- (Column("q", Integer) + 12, r"q \+ :q_1"),
- (unprintable(), "unprintable element.*"),
- ]:
- assert_raises_message(
- exc.NoSuchColumnError,
- msg % repl,
- lambda: row[accessor]
- )
-
@testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
@@ -539,16 +136,6 @@ class QueryTest(fixtures.TestBase):
assert row.x == True # noqa
assert row.y == False # noqa
- def test_fetchmany(self):
- users.insert().execute(user_id=7, user_name='jack')
- users.insert().execute(user_id=8, user_name='ed')
- users.insert().execute(user_id=9, user_name='fred')
- r = users.select().execute()
- l = []
- for row in r.fetchmany(size=2):
- l.append(row)
- self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
-
def test_like_ops(self):
users.insert().execute(
{'user_id': 1, 'user_name': 'apples'},
@@ -817,618 +404,6 @@ class QueryTest(fixtures.TestBase):
use_labels=labels),
[(3, 'a'), (2, 'b'), (1, None)])
- def test_column_slices(self):
- users.insert().execute(user_id=1, user_name='john')
- users.insert().execute(user_id=2, user_name='jack')
- addresses.insert().execute(
- address_id=1, user_id=2, address='foo@bar.com')
-
- r = text(
- "select * from query_addresses", bind=testing.db).execute().first()
- self.assert_(r[0:1] == (1,))
- self.assert_(r[1:] == (2, 'foo@bar.com'))
- self.assert_(r[:-1] == (1, 2))
-
- def test_column_accessor_basic_compiled(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='jack')
- )
-
- r = users.select(users.c.user_id == 2).execute().first()
- self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(
- r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- def test_column_accessor_basic_text(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='jack')
- )
- r = testing.db.execute(
- text("select * from query_users where user_id=2")).first()
- self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(
- r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- def test_column_accessor_textual_select(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='jack')
- )
- # this will create column() objects inside
- # the select(), these need to match on name anyway
- r = testing.db.execute(
- select([
- column('user_id'), column('user_name')
- ]).select_from(table('query_users')).
- where(text('user_id=2'))
- ).first()
- self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(
- r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- def test_column_accessor_dotted_union(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- # test a little sqlite weirdness - with the UNION,
- # cols come back as "query_users.user_id" in cursor.description
- r = testing.db.execute(
- text(
- "select query_users.user_id, query_users.user_name "
- "from query_users "
- "UNION select query_users.user_id, "
- "query_users.user_name from query_users"
- )
- ).first()
- eq_(r['user_id'], 1)
- eq_(r['user_name'], "john")
- eq_(list(r.keys()), ["user_id", "user_name"])
-
- @testing.only_on("sqlite", "sqlite specific feature")
- def test_column_accessor_sqlite_raw(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- r = text(
- "select query_users.user_id, query_users.user_name "
- "from query_users "
- "UNION select query_users.user_id, "
- "query_users.user_name from query_users",
- bind=testing.db).execution_options(sqlite_raw_colnames=True). \
- execute().first()
- assert 'user_id' not in r
- assert 'user_name' not in r
- eq_(r['query_users.user_id'], 1)
- eq_(r['query_users.user_name'], "john")
- eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"])
-
- @testing.only_on("sqlite", "sqlite specific feature")
- def test_column_accessor_sqlite_translated(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- r = text(
- "select query_users.user_id, query_users.user_name "
- "from query_users "
- "UNION select query_users.user_id, "
- "query_users.user_name from query_users",
- bind=testing.db).execute().first()
- eq_(r['user_id'], 1)
- eq_(r['user_name'], "john")
- eq_(r['query_users.user_id'], 1)
- eq_(r['query_users.user_name'], "john")
- eq_(list(r.keys()), ["user_id", "user_name"])
-
- def test_column_accessor_labels_w_dots(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
- # test using literal tablename.colname
- r = text(
- 'select query_users.user_id AS "query_users.user_id", '
- 'query_users.user_name AS "query_users.user_name" '
- 'from query_users', bind=testing.db).\
- execution_options(sqlite_raw_colnames=True).execute().first()
- eq_(r['query_users.user_id'], 1)
- eq_(r['query_users.user_name'], "john")
- assert "user_name" not in r
- eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"])
-
- def test_column_accessor_unary(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- # unary experssions
- r = select([users.c.user_name.distinct()]).order_by(
- users.c.user_name).execute().first()
- eq_(r[users.c.user_name], 'john')
- eq_(r.user_name, 'john')
-
- def test_column_accessor_err(self):
- r = testing.db.execute(select([1])).first()
- assert_raises_message(
- AttributeError,
- "Could not locate column in row for column 'foo'",
- getattr, r, "foo"
- )
- assert_raises_message(
- KeyError,
- "Could not locate column in row for column 'foo'",
- lambda: r['foo']
- )
-
- def test_graceful_fetch_on_non_rows(self):
- """test that calling fetchone() etc. on a result that doesn't
- return rows fails gracefully.
-
- """
-
- # these proxies don't work with no cursor.description present.
- # so they don't apply to this test at the moment.
- # result.FullyBufferedResultProxy,
- # result.BufferedRowResultProxy,
- # result.BufferedColumnResultProxy
-
- conn = testing.db.connect()
- for meth in [
- lambda r: r.fetchone(),
- lambda r: r.fetchall(),
- lambda r: r.first(),
- lambda r: r.scalar(),
- lambda r: r.fetchmany(),
- lambda r: r._getter('user'),
- lambda r: r._has_key('user'),
- ]:
- trans = conn.begin()
- result = conn.execute(users.insert(), user_id=1)
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows. "
- "It has been closed automatically.",
- meth, result,
- )
- trans.rollback()
-
- @testing.requires.empty_inserts
- @testing.requires.returning
- def test_no_inserted_pk_on_returning(self):
- result = testing.db.execute(users.insert().returning(
- users.c.user_id, users.c.user_name))
- assert_raises_message(
- exc.InvalidRequestError,
- r"Can't call inserted_primary_key when returning\(\) is used.",
- getattr, result, 'inserted_primary_key'
- )
-
- def test_fetchone_til_end(self):
- result = testing.db.execute("select * from query_users")
- eq_(result.fetchone(), None)
- eq_(result.fetchone(), None)
- eq_(result.fetchone(), None)
- result.close()
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object is closed.",
- result.fetchone
- )
-
- def test_row_case_sensitive(self):
- row = testing.db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive")
- ])
- ).first()
-
- eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- not_in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
-
- assert_raises(
- KeyError,
- lambda: row["Case_insensitive"]
- )
- assert_raises(
- KeyError,
- lambda: row["casesensitive"]
- )
-
- def test_row_case_sensitive_unoptimized(self):
- ins_db = engines.testing_engine(options={"case_sensitive": True})
- row = ins_db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols")
- ])
- ).first()
-
- eq_(
- list(row.keys()),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- not_in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
- eq_(row["screw_up_the_cols"], 3)
-
- assert_raises(KeyError, lambda: row["Case_insensitive"])
- assert_raises(KeyError, lambda: row["casesensitive"])
- assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
-
- def test_row_case_insensitive(self):
- ins_db = engines.testing_engine(options={"case_sensitive": False})
- row = ins_db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive")
- ])
- ).first()
-
- eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
- eq_(row["Case_insensitive"], 1)
- eq_(row["casesensitive"], 2)
-
- def test_row_case_insensitive_unoptimized(self):
- ins_db = engines.testing_engine(options={"case_sensitive": False})
- row = ins_db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols")
- ])
- ).first()
-
- eq_(
- list(row.keys()),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
- eq_(row["screw_up_the_cols"], 3)
- eq_(row["Case_insensitive"], 1)
- eq_(row["casesensitive"], 2)
- eq_(row["screw_UP_the_cols"], 3)
-
- def test_row_as_args(self):
- users.insert().execute(user_id=1, user_name='john')
- r = users.select(users.c.user_id == 1).execute().first()
- users.delete().execute()
- users.insert().execute(r)
- eq_(users.select().execute().fetchall(), [(1, 'john')])
-
- def test_result_as_args(self):
- users.insert().execute([
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='ed')])
- r = users.select().execute()
- users2.insert().execute(list(r))
- eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
- [(1, 'john'), (2, 'ed')]
- )
-
- users2.delete().execute()
- r = users.select().execute()
- users2.insert().execute(*list(r))
- eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
- [(1, 'john'), (2, 'ed')]
- )
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column(self):
- users.insert().execute(user_id=1, user_name='john')
- result = users.outerjoin(addresses).select().execute()
- r = result.first()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r['user_id']
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r[users.c.user_id]
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r[addresses.c.user_id]
- )
-
- # try to trick it - fake_table isn't in the result!
- # we get the correct error
- fake_table = Table('fake', MetaData(), Column('user_id', Integer))
- assert_raises_message(
- exc.InvalidRequestError,
- "Could not locate column in row for column 'fake.user_id'",
- lambda: r[fake_table.c.user_id]
- )
-
- r = util.pickle.loads(util.pickle.dumps(r))
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r['user_id']
- )
-
- result = users.outerjoin(addresses).select().execute()
- result = _result.BufferedColumnResultProxy(result.context)
- r = result.first()
- assert isinstance(r, _result.BufferedColumnRow)
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r['user_id']
- )
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_by_col(self):
- users.insert().execute(user_id=1, user_name='john')
- ua = users.alias()
- u2 = users.alias()
- result = select([users.c.user_id, ua.c.user_id]).execute()
- row = result.first()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row[users.c.user_id]
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row[ua.c.user_id]
- )
-
- # Unfortunately, this fails -
- # we'd like
- # "Could not locate column in row"
- # to be raised here, but the check for
- # "common column" in _compare_name_for_result()
- # has other requirements to be more liberal.
- # Ultimately the
- # expression system would need a way to determine
- # if given two columns in a "proxy" relationship, if they
- # refer to a different parent table
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row[u2.c.user_id]
- )
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_contains(self):
- # ticket 2702. in 0.7 we'd get True, False.
- # in 0.8, both columns are present so it's True;
- # but when they're fetched you'll get the ambiguous error.
- users.insert().execute(user_id=1, user_name='john')
- result = select([users.c.user_id, addresses.c.user_id]).\
- select_from(users.outerjoin(addresses)).execute()
- row = result.first()
-
- eq_(
- set([users.c.user_id in row, addresses.c.user_id in row]),
- set([True])
- )
-
- def test_ambiguous_column_by_col_plus_label(self):
- users.insert().execute(user_id=1, user_name='john')
- result = select(
- [users.c.user_id,
- type_coerce(users.c.user_id, Integer).label('foo')]).execute()
- row = result.first()
- eq_(
- row[users.c.user_id], 1
- )
- eq_(
- row[1], 1
- )
-
- def test_fetch_partial_result_map(self):
- users.insert().execute(user_id=7, user_name='ed')
-
- t = text("select * from query_users").columns(
- user_name=String()
- )
- eq_(
- testing.db.execute(t).fetchall(), [(7, 'ed')]
- )
-
- def test_fetch_unordered_result_map(self):
- users.insert().execute(user_id=7, user_name='ed')
-
- class Goofy1(TypeDecorator):
- impl = String
-
- def process_result_value(self, value, dialect):
- return value + "a"
-
- class Goofy2(TypeDecorator):
- impl = String
-
- def process_result_value(self, value, dialect):
- return value + "b"
-
- class Goofy3(TypeDecorator):
- impl = String
-
- def process_result_value(self, value, dialect):
- return value + "c"
-
- t = text(
- "select user_name as a, user_name as b, "
- "user_name as c from query_users").columns(
- a=Goofy1(), b=Goofy2(), c=Goofy3()
- )
- eq_(
- testing.db.execute(t).fetchall(), [
- ('eda', 'edb', 'edc')
- ]
- )
-
- @testing.requires.subqueries
- def test_column_label_targeting(self):
- users.insert().execute(user_id=7, user_name='ed')
-
- for s in (
- users.select().alias('foo'),
- users.select().alias(users.name),
- ):
- row = s.select(use_labels=True).execute().first()
- assert row[s.c.user_id] == 7
- assert row[s.c.user_name] == 'ed'
-
- def test_keys(self):
- users.insert().execute(user_id=1, user_name='foo')
- result = users.select().execute()
- eq_(
- result.keys(),
- ['user_id', 'user_name']
- )
- row = result.first()
- eq_(
- row.keys(),
- ['user_id', 'user_name']
- )
-
- def test_keys_anon_labels(self):
- """test [ticket:3483]"""
-
- users.insert().execute(user_id=1, user_name='foo')
- result = testing.db.execute(
- select([
- users.c.user_id,
- users.c.user_name.label(None),
- func.count(literal_column('1'))]).
- group_by(users.c.user_id, users.c.user_name)
- )
-
- eq_(
- result.keys(),
- ['user_id', 'user_name_1', 'count_1']
- )
- row = result.first()
- eq_(
- row.keys(),
- ['user_id', 'user_name_1', 'count_1']
- )
-
- def test_items(self):
- users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().first()
- eq_(
- [(x[0].lower(), x[1]) for x in list(r.items())],
- [('user_id', 1), ('user_name', 'foo')])
-
- def test_len(self):
- users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().first()
- eq_(len(r), 2)
-
- r = testing.db.execute('select user_name, user_id from query_users'). \
- first()
- eq_(len(r), 2)
- r = testing.db.execute('select user_name from query_users').first()
- eq_(len(r), 1)
-
- def test_sorting_in_python(self):
- users.insert().execute(
- dict(user_id=1, user_name='foo'),
- dict(user_id=2, user_name='bar'),
- dict(user_id=3, user_name='def'),
- )
-
- rows = users.select().order_by(users.c.user_name).execute().fetchall()
-
- eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')])
-
- eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')])
-
- def test_column_order_with_simple_query(self):
- # should return values in column definition order
- users.insert().execute(user_id=1, user_name='foo')
- r = users.select(users.c.user_id == 1).execute().first()
- eq_(r[0], 1)
- eq_(r[1], 'foo')
- eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name'])
- eq_(list(r.values()), [1, 'foo'])
-
- def test_column_order_with_text_query(self):
- # should return values in query order
- users.insert().execute(user_id=1, user_name='foo')
- r = testing.db.execute('select user_name, user_id from query_users'). \
- first()
- eq_(r[0], 'foo')
- eq_(r[1], 1)
- eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id'])
- eq_(list(r.values()), ['foo', 1])
-
- @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()')
- @testing.crashes('firebird', 'An identifier must begin with a letter')
- def test_column_accessor_shadow(self):
- meta = MetaData(testing.db)
- shadowed = Table(
- 'test_shadowed', meta,
- Column('shadow_id', INT, primary_key=True),
- Column('shadow_name', VARCHAR(20)),
- Column('parent', VARCHAR(20)),
- Column('row', VARCHAR(40)),
- Column('_parent', VARCHAR(20)),
- Column('_row', VARCHAR(20)),
- )
- shadowed.create(checkfirst=True)
- try:
- shadowed.insert().execute(
- shadow_id=1, shadow_name='The Shadow', parent='The Light',
- row='Without light there is no shadow',
- _parent='Hidden parent', _row='Hidden row')
- r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
- self.assert_(
- r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
- self.assert_(
- r.shadow_name == r['shadow_name'] ==
- r[shadowed.c.shadow_name] == 'The Shadow')
- self.assert_(
- r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
- self.assert_(
- r.row == r['row'] == r[shadowed.c.row] ==
- 'Without light there is no shadow')
- self.assert_(r['_parent'] == 'Hidden parent')
- self.assert_(r['_row'] == 'Hidden row')
- finally:
- shadowed.drop(checkfirst=True)
-
@testing.emits_warning('.*empty sequence.*')
def test_in_filtering(self):
"""test the behavior of the in_() function."""
@@ -1578,393 +553,6 @@ class RequiredBindTest(fixtures.TablesTest):
is_(bindparam('foo', callable_=c, required=False).required, False)
-class TableInsertTest(fixtures.TablesTest):
-
- """test for consistent insert behavior across dialects
- regarding the inline=True flag, lower-case 't' tables.
-
- """
- run_create_tables = 'each'
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- 'foo', metadata,
- Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
- Column('data', String(50)),
- Column('x', Integer)
- )
-
- def _fixture(self, types=True):
- if types:
- t = sql.table(
- 'foo', sql.column('id', Integer),
- sql.column('data', String),
- sql.column('x', Integer))
- else:
- t = sql.table(
- 'foo', sql.column('id'), sql.column('data'), sql.column('x'))
- return t
-
- def _test(self, stmt, row, returning=None, inserted_primary_key=False):
- r = testing.db.execute(stmt)
-
- if returning:
- returned = r.first()
- eq_(returned, returning)
- elif inserted_primary_key is not False:
- eq_(r.inserted_primary_key, inserted_primary_key)
-
- eq_(testing.db.execute(self.tables.foo.select()).first(), row)
-
- def _test_multi(self, stmt, rows, data):
- testing.db.execute(stmt, rows)
- eq_(
- testing.db.execute(
- self.tables.foo.select().
- order_by(self.tables.foo.c.id)).fetchall(),
- data)
-
- @testing.requires.sequences
- def test_expicit_sequence(self):
- t = self._fixture()
- self._test(
- t.insert().values(
- id=func.next_value(Sequence('t_id_seq')), data='data', x=5),
- (1, 'data', 5)
- )
-
- def test_uppercase(self):
- t = self.tables.foo
- self._test(
- t.insert().values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- def test_uppercase_inline(self):
- t = self.tables.foo
- self._test(
- t.insert(inline=True).values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- @testing.crashes(
- "mssql+pyodbc",
- "Pyodbc + SQL Server + Py3K, some decimal handling issue")
- def test_uppercase_inline_implicit(self):
- t = self.tables.foo
- self._test(
- t.insert(inline=True).values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[None]
- )
-
- def test_uppercase_implicit(self):
- t = self.tables.foo
- self._test(
- t.insert().values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- def test_uppercase_direct_params(self):
- t = self.tables.foo
- self._test(
- t.insert().values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- @testing.requires.returning
- def test_uppercase_direct_params_returning(self):
- t = self.tables.foo
- self._test(
- t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
- (1, 'data', 5),
- returning=(1, 5)
- )
-
- @testing.fails_on(
- 'mssql', "lowercase table doesn't support identity insert disable")
- def test_direct_params(self):
- t = self._fixture()
- self._test(
- t.insert().values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[]
- )
-
- @testing.fails_on(
- 'mssql', "lowercase table doesn't support identity insert disable")
- @testing.requires.returning
- def test_direct_params_returning(self):
- t = self._fixture()
- self._test(
- t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
- (1, 'data', 5),
- returning=(1, 5)
- )
-
- @testing.requires.emulated_lastrowid
- def test_implicit_pk(self):
- t = self._fixture()
- self._test(
- t.insert().values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[]
- )
-
- @testing.requires.emulated_lastrowid
- def test_implicit_pk_multi_rows(self):
- t = self._fixture()
- self._test_multi(
- t.insert(),
- [
- {'data': 'd1', 'x': 5},
- {'data': 'd2', 'x': 6},
- {'data': 'd3', 'x': 7},
- ],
- [
- (1, 'd1', 5),
- (2, 'd2', 6),
- (3, 'd3', 7)
- ],
- )
-
- @testing.requires.emulated_lastrowid
- def test_implicit_pk_inline(self):
- t = self._fixture()
- self._test(
- t.insert(inline=True).values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[]
- )
-
-
-class KeyTargetingTest(fixtures.TablesTest):
- run_inserts = 'once'
- run_deletes = None
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- 'keyed1', metadata, Column("a", CHAR(2), key="b"),
- Column("c", CHAR(2), key="q")
- )
- Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
- Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
- Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
- Table('content', metadata, Column('t', String(30), key="type"))
- Table('bar', metadata, Column('ctype', String(30), key="content_type"))
-
- if testing.requires.schemas.enabled:
- Table(
- 'wschema', metadata,
- Column("a", CHAR(2), key="b"),
- Column("c", CHAR(2), key="q"),
- schema=testing.config.test_schema
- )
-
- @classmethod
- def insert_data(cls):
- cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
- cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
- cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
- cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
- cls.tables.content.insert().execute(type="t1")
-
- if testing.requires.schemas.enabled:
- cls.tables['%s.wschema' % testing.config.test_schema].insert().execute(
- dict(b="a1", q="c1"))
-
- @testing.requires.schemas
- def test_keyed_accessor_wschema(self):
- keyed1 = self.tables['%s.wschema' % testing.config.test_schema]
- row = testing.db.execute(keyed1.select()).first()
-
- eq_(row.b, "a1")
- eq_(row.q, "c1")
- eq_(row.a, "a1")
- eq_(row.c, "c1")
-
- def test_keyed_accessor_single(self):
- keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select()).first()
-
- eq_(row.b, "a1")
- eq_(row.q, "c1")
- eq_(row.a, "a1")
- eq_(row.c, "c1")
-
- def test_keyed_accessor_single_labeled(self):
- keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select().apply_labels()).first()
-
- eq_(row.keyed1_b, "a1")
- eq_(row.keyed1_q, "c1")
- eq_(row.keyed1_a, "a1")
- eq_(row.keyed1_c, "c1")
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_conflict_2(self):
- keyed1 = self.tables.keyed1
- keyed2 = self.tables.keyed2
-
- row = testing.db.execute(select([keyed1, keyed2])).first()
- # row.b is unambiguous
- eq_(row.b, "b2")
- # row.a is ambiguous
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambig",
- getattr, row, "a"
- )
-
- def test_keyed_accessor_composite_names_precedent(self):
- keyed1 = self.tables.keyed1
- keyed4 = self.tables.keyed4
-
- row = testing.db.execute(select([keyed1, keyed4])).first()
- eq_(row.b, "b4")
- eq_(row.q, "q4")
- eq_(row.a, "a1")
- eq_(row.c, "c1")
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_keys_precedent(self):
- keyed1 = self.tables.keyed1
- keyed3 = self.tables.keyed3
-
- row = testing.db.execute(select([keyed1, keyed3])).first()
- eq_(row.q, "c1")
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name 'b'",
- getattr, row, "b"
- )
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name 'a'",
- getattr, row, "a"
- )
- eq_(row.d, "d3")
-
- def test_keyed_accessor_composite_labeled(self):
- keyed1 = self.tables.keyed1
- keyed2 = self.tables.keyed2
-
- row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \
- first()
- eq_(row.keyed1_b, "a1")
- eq_(row.keyed1_a, "a1")
- eq_(row.keyed1_q, "c1")
- eq_(row.keyed1_c, "c1")
- eq_(row.keyed2_a, "a2")
- eq_(row.keyed2_b, "b2")
- assert_raises(KeyError, lambda: row['keyed2_c'])
- assert_raises(KeyError, lambda: row['keyed2_q'])
-
- def test_column_label_overlap_fallback(self):
- content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(
- select([content.c.type.label("content_type")])).first()
- assert content.c.type not in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') in row
-
- row = testing.db.execute(select([func.now().label("content_type")])). \
- first()
- assert content.c.type not in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') in row
-
- def test_column_label_overlap_fallback_2(self):
- content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(content.select(use_labels=True)).first()
- assert content.c.type in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') not in row
-
- def test_columnclause_schema_column_one(self):
- keyed2 = self.tables.keyed2
-
- # this is addressed by [ticket:2932]
- # ColumnClause._compare_name_for_result allows the
- # columns which the statement is against to be lightweight
- # cols, which results in a more liberal comparison scheme
- a, b = sql.column('a'), sql.column('b')
- stmt = select([a, b]).select_from(table("keyed2"))
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
-
- def test_columnclause_schema_column_two(self):
- keyed2 = self.tables.keyed2
-
- a, b = sql.column('a'), sql.column('b')
- stmt = select([keyed2.c.a, keyed2.c.b])
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
-
- def test_columnclause_schema_column_three(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
-
- a, b = sql.column('a'), sql.column('b')
- stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
- assert stmt.c.a in row
- assert stmt.c.b in row
-
- def test_columnclause_schema_column_four(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
-
- a, b = sql.column('keyed2_a'), sql.column('keyed2_b')
- stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
- a, b)
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
- assert stmt.c.keyed2_a in row
- assert stmt.c.keyed2_b in row
-
- def test_columnclause_schema_column_five(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
-
- stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
- keyed2_a=CHAR, keyed2_b=CHAR)
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert stmt.c.keyed2_a in row
- assert stmt.c.keyed2_b in row
-
-
class LimitTest(fixtures.TestBase):
__backend__ = True
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
new file mode 100644
index 000000000..8461996ea
--- /dev/null
+++ b/test/sql/test_resultset.py
@@ -0,0 +1,1136 @@
+from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \
+ in_, not_in_, is_, ne_
+from sqlalchemy import testing
+from sqlalchemy.testing import fixtures, engines
+from sqlalchemy import util
+from sqlalchemy import (
+ exc, sql, func, select, String, Integer, MetaData, ForeignKey,
+ VARCHAR, INT, CHAR, text, type_coerce, literal_column,
+ TypeDecorator, table, column)
+from sqlalchemy.engine import result as _result
+from sqlalchemy.testing.schema import Table, Column
+import operator
+
+
+class ResultProxyTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'users', metadata,
+ Column(
+ 'user_id', INT, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_name', VARCHAR(20)),
+ test_needs_acid=True
+ )
+ Table(
+ 'addresses', metadata,
+ Column(
+ 'address_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_id', Integer, ForeignKey('users.user_id')),
+ Column('address', String(30)),
+ test_needs_acid=True
+ )
+
+ Table(
+ 'users2', metadata,
+ Column('user_id', INT, primary_key=True),
+ Column('user_name', VARCHAR(20)),
+ test_needs_acid=True
+ )
+
+ def test_row_iteration(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
+ )
+ r = users.select().execute()
+        rows = []
+        for row in r:
+            rows.append(row)
+        eq_(len(rows), 3)
+
+ @testing.requires.subqueries
+ def test_anonymous_rows(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
+ )
+
+ sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \
+ as_scalar()
+ for row in select([sel + 1, sel + 3], bind=users.bind).execute():
+ eq_(row['anon_1'], 8)
+ eq_(row['anon_2'], 10)
+
+ def test_row_comparison(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=7, user_name='jack')
+ rp = users.select().execute().first()
+
+ eq_(rp, rp)
+        is_(not (rp != rp), True)
+
+ equal = (7, 'jack')
+
+ eq_(rp, equal)
+ eq_(equal, rp)
+ is_((not (rp != equal)), True)
+ is_(not (equal != equal), True)
+
+ def endless():
+ while True:
+ yield 1
+ ne_(rp, endless())
+ ne_(endless(), rp)
+
+ # test that everything compares the same
+ # as it would against a tuple
+ for compare in [False, 8, endless(), 'xyz', (7, 'jack')]:
+ for op in [
+ operator.eq, operator.ne, operator.gt,
+ operator.lt, operator.ge, operator.le
+ ]:
+
+ try:
+ control = op(equal, compare)
+ except TypeError:
+ # Py3K raises TypeError for some invalid comparisons
+ assert_raises(TypeError, op, rp, compare)
+ else:
+ eq_(control, op(rp, compare))
+
+ try:
+ control = op(compare, equal)
+ except TypeError:
+ # Py3K raises TypeError for some invalid comparisons
+ assert_raises(TypeError, op, compare, rp)
+ else:
+ eq_(control, op(compare, rp))
+
+ @testing.provide_metadata
+ def test_column_label_overlap_fallback(self):
+ content = Table(
+ 'content', self.metadata,
+ Column('type', String(30)),
+ )
+ bar = Table(
+ 'bar', self.metadata,
+ Column('content_type', String(30))
+ )
+ self.metadata.create_all(testing.db)
+ testing.db.execute(content.insert().values(type="t1"))
+
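+        # "fallback" here: a plain sql.column() with a matching string
+        # name is located in the row, while a table-bound column from an
+        # unrelated table (bar) is not, even though its name also matches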
+ row = testing.db.execute(content.select(use_labels=True)).first()
+ in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+ in_(sql.column('content_type'), row)
+
+ row = testing.db.execute(
+ select([content.c.type.label("content_type")])).first()
+ in_(content.c.type, row)
+
+ not_in_(bar.c.content_type, row)
+
+ in_(sql.column('content_type'), row)
+
+ row = testing.db.execute(select([func.now().label("content_type")])). \
+ first()
+ not_in_(content.c.type, row)
+
+ not_in_(bar.c.content_type, row)
+
+ in_(sql.column('content_type'), row)
+
+ def test_pickled_rows(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ users.insert().execute(
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
+ )
+
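+        # exercise the matrix of pickle x use_labels; string keys,
+        # integer indexes and column-object targeting must all
+        # survive a pickle round trip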
+ for pickle in False, True:
+ for use_labels in False, True:
+ result = users.select(use_labels=use_labels).order_by(
+ users.c.user_id).execute().fetchall()
+
+ if pickle:
+ result = util.pickle.loads(util.pickle.dumps(result))
+
+ eq_(
+ result,
+ [(7, "jack"), (8, "ed"), (9, "fred")]
+ )
+ if use_labels:
+ eq_(result[0]['users_user_id'], 7)
+ eq_(
+ list(result[0].keys()),
+ ["users_user_id", "users_user_name"])
+ else:
+ eq_(result[0]['user_id'], 7)
+ eq_(list(result[0].keys()), ["user_id", "user_name"])
+
+ eq_(result[0][0], 7)
+ eq_(result[0][users.c.user_id], 7)
+ eq_(result[0][users.c.user_name], 'jack')
+
+ if not pickle or use_labels:
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.user_id])
+ else:
+ # test with a different table. name resolution is
+ # causing 'user_id' to match when use_labels wasn't used.
+ eq_(result[0][addresses.c.user_id], 7)
+
+ assert_raises(
+ exc.NoSuchColumnError, lambda: result[0]['fake key'])
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.address_id])
+
+ def test_column_error_printing(self):
+ row = testing.db.execute(select([1])).first()
+
+ class unprintable(object):
+
+ def __str__(self):
+ raise ValueError("nope")
+
+ msg = r"Could not locate column in row for column '%s'"
+
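+        # each accessor is rendered into the error message; the
+        # unprintable object falls back to a generic description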
+ for accessor, repl in [
+ ("x", "x"),
+ (Column("q", Integer), "q"),
+ (Column("q", Integer) + 12, r"q \+ :q_1"),
+ (unprintable(), "unprintable element.*"),
+ ]:
+ assert_raises_message(
+ exc.NoSuchColumnError,
+ msg % repl,
+ lambda: row[accessor]
+ )
+
+ def test_fetchmany(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='ed')
+ users.insert().execute(user_id=9, user_name='fred')
+ r = users.select().execute()
+        rows = []
+        for row in r.fetchmany(size=2):
+            rows.append(row)
+        eq_(len(rows), 2)
+
+ def test_column_slices(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ addresses.insert().execute(
+ address_id=1, user_id=2, address='foo@bar.com')
+
+ r = text(
+ "select * from addresses", bind=testing.db).execute().first()
+ eq_(r[0:1], (1,))
+ eq_(r[1:], (2, 'foo@bar.com'))
+ eq_(r[:-1], (1, 2))
+
+ def test_column_accessor_basic_compiled(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ dict(user_id=2, user_name='jack')
+ )
+
+ r = users.select(users.c.user_id == 2).execute().first()
+ eq_(r.user_id, 2)
+ eq_(r['user_id'], 2)
+ eq_(r[users.c.user_id], 2)
+
+ eq_(r.user_name, 'jack')
+ eq_(r['user_name'], 'jack')
+ eq_(r[users.c.user_name], 'jack')
+
+ def test_column_accessor_basic_text(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ dict(user_id=2, user_name='jack')
+ )
+ r = testing.db.execute(
+ text("select * from users where user_id=2")).first()
+
+ eq_(r.user_id, 2)
+ eq_(r['user_id'], 2)
+ eq_(r[users.c.user_id], 2)
+
+ eq_(r.user_name, 'jack')
+ eq_(r['user_name'], 'jack')
+ eq_(r[users.c.user_name], 'jack')
+
+ def test_column_accessor_textual_select(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ dict(user_id=2, user_name='jack')
+ )
+        # this will create column() objects inside
+        # the select(); these need to match on name anyway
+ r = testing.db.execute(
+ select([
+ column('user_id'), column('user_name')
+ ]).select_from(table('users')).
+ where(text('user_id=2'))
+ ).first()
+
+ eq_(r.user_id, 2)
+ eq_(r['user_id'], 2)
+ eq_(r[users.c.user_id], 2)
+
+ eq_(r.user_name, 'jack')
+ eq_(r['user_name'], 'jack')
+ eq_(r[users.c.user_name], 'jack')
+
+ def test_column_accessor_dotted_union(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ )
+
+ # test a little sqlite weirdness - with the UNION,
+ # cols come back as "users.user_id" in cursor.description
+ r = testing.db.execute(
+ text(
+ "select users.user_id, users.user_name "
+ "from users "
+ "UNION select users.user_id, "
+ "users.user_name from users"
+ )
+ ).first()
+ eq_(r['user_id'], 1)
+ eq_(r['user_name'], "john")
+ eq_(list(r.keys()), ["user_id", "user_name"])
+
+ @testing.only_on("sqlite", "sqlite specific feature")
+ def test_column_accessor_sqlite_raw(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ )
+
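+        # sqlite_raw_colnames disables the SQLite dialect's translation
+        # of dotted "table.colname" cursor names, so the raw names are
+        # the only keys available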
+ r = text(
+ "select users.user_id, users.user_name "
+ "from users "
+ "UNION select users.user_id, "
+ "users.user_name from users",
+ bind=testing.db).execution_options(sqlite_raw_colnames=True). \
+ execute().first()
+ not_in_('user_id', r)
+ not_in_('user_name', r)
+ eq_(r['users.user_id'], 1)
+ eq_(r['users.user_name'], "john")
+ eq_(list(r.keys()), ["users.user_id", "users.user_name"])
+
+ @testing.only_on("sqlite", "sqlite specific feature")
+ def test_column_accessor_sqlite_translated(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ )
+
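+        # without sqlite_raw_colnames, the dialect strips the "users."
+        # prefix for matching, while the raw dotted names remain
+        # addressable as well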
+ r = text(
+ "select users.user_id, users.user_name "
+ "from users "
+ "UNION select users.user_id, "
+ "users.user_name from users",
+ bind=testing.db).execute().first()
+ eq_(r['user_id'], 1)
+ eq_(r['user_name'], "john")
+ eq_(r['users.user_id'], 1)
+ eq_(r['users.user_name'], "john")
+ eq_(list(r.keys()), ["user_id", "user_name"])
+
+ def test_column_accessor_labels_w_dots(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ )
+ # test using literal tablename.colname
+ r = text(
+ 'select users.user_id AS "users.user_id", '
+ 'users.user_name AS "users.user_name" '
+ 'from users', bind=testing.db).\
+ execution_options(sqlite_raw_colnames=True).execute().first()
+ eq_(r['users.user_id'], 1)
+ eq_(r['users.user_name'], "john")
+ not_in_("user_name", r)
+ eq_(list(r.keys()), ["users.user_id", "users.user_name"])
+
+ def test_column_accessor_unary(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='john'),
+ )
+
+        # unary expressions
+ r = select([users.c.user_name.distinct()]).order_by(
+ users.c.user_name).execute().first()
+ eq_(r[users.c.user_name], 'john')
+ eq_(r.user_name, 'john')
+
+ def test_column_accessor_err(self):
+ r = testing.db.execute(select([1])).first()
+ assert_raises_message(
+ AttributeError,
+ "Could not locate column in row for column 'foo'",
+ getattr, r, "foo"
+ )
+ assert_raises_message(
+ KeyError,
+ "Could not locate column in row for column 'foo'",
+ lambda: r['foo']
+ )
+
+ def test_graceful_fetch_on_non_rows(self):
+ """test that calling fetchone() etc. on a result that doesn't
+ return rows fails gracefully.
+
+ """
+
+ # these proxies don't work with no cursor.description present.
+ # so they don't apply to this test at the moment.
+ # result.FullyBufferedResultProxy,
+ # result.BufferedRowResultProxy,
+ # result.BufferedColumnResultProxy
+
+ users = self.tables.users
+
+ conn = testing.db.connect()
+ for meth in [
+ lambda r: r.fetchone(),
+ lambda r: r.fetchall(),
+ lambda r: r.first(),
+ lambda r: r.scalar(),
+ lambda r: r.fetchmany(),
+ lambda r: r._getter('user'),
+ lambda r: r._has_key('user'),
+ ]:
+ trans = conn.begin()
+ result = conn.execute(users.insert(), user_id=1)
+ assert_raises_message(
+ exc.ResourceClosedError,
+ "This result object does not return rows. "
+ "It has been closed automatically.",
+ meth, result,
+ )
+ trans.rollback()
+
+ def test_fetchone_til_end(self):
+ result = testing.db.execute("select * from users")
+ eq_(result.fetchone(), None)
+ eq_(result.fetchone(), None)
+ eq_(result.fetchone(), None)
+ result.close()
+ assert_raises_message(
+ exc.ResourceClosedError,
+ "This result object is closed.",
+ result.fetchone
+ )
+
+ def test_row_case_sensitive(self):
+ row = testing.db.execute(
+ select([
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive")
+ ])
+ ).first()
+
+ eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
+
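+        # with case_sensitive=True (the default), only the exact-case
+        # names are present in the key map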
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ not_in_("casesensitive", row._keymap)
+
+ eq_(row["case_insensitive"], 1)
+ eq_(row["CaseSensitive"], 2)
+
+ assert_raises(
+ KeyError,
+ lambda: row["Case_insensitive"]
+ )
+ assert_raises(
+ KeyError,
+ lambda: row["casesensitive"]
+ )
+
+ def test_row_case_sensitive_unoptimized(self):
+        eng = engines.testing_engine(options={"case_sensitive": True})
+        row = eng.execute(
+ select([
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive"),
+ text("3 AS screw_up_the_cols")
+ ])
+ ).first()
+
+ eq_(
+ list(row.keys()),
+ ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
+
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ not_in_("casesensitive", row._keymap)
+
+ eq_(row["case_insensitive"], 1)
+ eq_(row["CaseSensitive"], 2)
+ eq_(row["screw_up_the_cols"], 3)
+
+ assert_raises(KeyError, lambda: row["Case_insensitive"])
+ assert_raises(KeyError, lambda: row["casesensitive"])
+ assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
+
+ def test_row_case_insensitive(self):
+ ins_db = engines.testing_engine(options={"case_sensitive": False})
+ row = ins_db.execute(
+ select([
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive")
+ ])
+ ).first()
+
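+        # with case_sensitive=False, lookups match regardless of case,
+        # though keys() still reports the original casing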
+ eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
+
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ in_("casesensitive", row._keymap)
+
+ eq_(row["case_insensitive"], 1)
+ eq_(row["CaseSensitive"], 2)
+ eq_(row["Case_insensitive"], 1)
+ eq_(row["casesensitive"], 2)
+
+ def test_row_case_insensitive_unoptimized(self):
+ ins_db = engines.testing_engine(options={"case_sensitive": False})
+ row = ins_db.execute(
+ select([
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive"),
+ text("3 AS screw_up_the_cols")
+ ])
+ ).first()
+
+ eq_(
+ list(row.keys()),
+ ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
+
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ in_("casesensitive", row._keymap)
+
+ eq_(row["case_insensitive"], 1)
+ eq_(row["CaseSensitive"], 2)
+ eq_(row["screw_up_the_cols"], 3)
+ eq_(row["Case_insensitive"], 1)
+ eq_(row["casesensitive"], 2)
+ eq_(row["screw_UP_the_cols"], 3)
+
+ def test_row_as_args(self):
+ users = self.tables.users
+
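+        # a RowProxy is accepted anywhere a dict of insert
+        # parameters is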
+ users.insert().execute(user_id=1, user_name='john')
+ r = users.select(users.c.user_id == 1).execute().first()
+ users.delete().execute()
+ users.insert().execute(r)
+ eq_(users.select().execute().fetchall(), [(1, 'john')])
+
+ def test_result_as_args(self):
+ users = self.tables.users
+ users2 = self.tables.users2
+
+ users.insert().execute([
+ dict(user_id=1, user_name='john'),
+ dict(user_id=2, user_name='ed')])
+ r = users.select().execute()
+ users2.insert().execute(list(r))
+ eq_(
+ users2.select().order_by(users2.c.user_id).execute().fetchall(),
+ [(1, 'john'), (2, 'ed')]
+ )
+
+ users2.delete().execute()
+ r = users.select().execute()
+ users2.insert().execute(*list(r))
+ eq_(
+ users2.select().order_by(users2.c.user_id).execute().fetchall(),
+ [(1, 'john'), (2, 'ed')]
+ )
+
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_ambiguous_column(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ users.insert().execute(user_id=1, user_name='john')
+ result = users.outerjoin(addresses).select().execute()
+ r = result.first()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r['user_id']
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r[users.c.user_id]
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r[addresses.c.user_id]
+ )
+
+ # try to trick it - fake_table isn't in the result!
+ # we get the correct error
+ fake_table = Table('fake', MetaData(), Column('user_id', Integer))
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Could not locate column in row for column 'fake.user_id'",
+ lambda: r[fake_table.c.user_id]
+ )
+
+ r = util.pickle.loads(util.pickle.dumps(r))
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r['user_id']
+ )
+
+ result = users.outerjoin(addresses).select().execute()
+ result = _result.BufferedColumnResultProxy(result.context)
+ r = result.first()
+ assert isinstance(r, _result.BufferedColumnRow)
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r['user_id']
+ )
+
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_ambiguous_column_by_col(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='john')
+ ua = users.alias()
+ u2 = users.alias()
+ result = select([users.c.user_id, ua.c.user_id]).execute()
+ row = result.first()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row[users.c.user_id]
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row[ua.c.user_id]
+ )
+
+        # Unfortunately, this fails -
+        # we'd like
+        # "Could not locate column in row"
+        # to be raised here, but the check for
+        # "common column" in _compare_name_for_result()
+        # has other requirements to be more liberal.
+        # Ultimately the expression system would need a way to
+        # determine, given two columns in a "proxy" relationship,
+        # whether they refer to different parent tables
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row[u2.c.user_id]
+ )
+
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_ambiguous_column_contains(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+        # [ticket:2702]. in 0.7 we'd get True, False.
+ # in 0.8, both columns are present so it's True;
+ # but when they're fetched you'll get the ambiguous error.
+ users.insert().execute(user_id=1, user_name='john')
+ result = select([users.c.user_id, addresses.c.user_id]).\
+ select_from(users.outerjoin(addresses)).execute()
+ row = result.first()
+
+ eq_(
+ set([users.c.user_id in row, addresses.c.user_id in row]),
+ set([True])
+ )
+
+ def test_ambiguous_column_by_col_plus_label(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='john')
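+        # the second occurrence of user_id is labeled 'foo', so both
+        # column-object and positional access stay unambiguous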
+ result = select(
+ [users.c.user_id,
+ type_coerce(users.c.user_id, Integer).label('foo')]).execute()
+ row = result.first()
+ eq_(
+ row[users.c.user_id], 1
+ )
+ eq_(
+ row[1], 1
+ )
+
+ def test_fetch_partial_result_map(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=7, user_name='ed')
+
+ t = text("select * from users").columns(
+ user_name=String()
+ )
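+        # only user_name is given a type here; columns omitted from
+        # .columns() still come through using default processing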
+ eq_(
+ testing.db.execute(t).fetchall(), [(7, 'ed')]
+ )
+
+ def test_fetch_unordered_result_map(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=7, user_name='ed')
+
+ class Goofy1(TypeDecorator):
+ impl = String
+
+ def process_result_value(self, value, dialect):
+ return value + "a"
+
+ class Goofy2(TypeDecorator):
+ impl = String
+
+ def process_result_value(self, value, dialect):
+ return value + "b"
+
+ class Goofy3(TypeDecorator):
+ impl = String
+
+ def process_result_value(self, value, dialect):
+ return value + "c"
+
+ t = text(
+ "select user_name as a, user_name as b, "
+ "user_name as c from users").columns(
+ a=Goofy1(), b=Goofy2(), c=Goofy3()
+ )
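+        # each TypeDecorator is matched to its cursor column by name,
+        # not position, as the distinct a/b/c suffixes demonstrate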
+ eq_(
+ testing.db.execute(t).fetchall(), [
+ ('eda', 'edb', 'edc')
+ ]
+ )
+
+ @testing.requires.subqueries
+ def test_column_label_targeting(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=7, user_name='ed')
+
+ for s in (
+ users.select().alias('foo'),
+ users.select().alias(users.name),
+ ):
+ row = s.select(use_labels=True).execute().first()
+ eq_(row[s.c.user_id], 7)
+ eq_(row[s.c.user_name], 'ed')
+
+ def test_keys(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='foo')
+ result = users.select().execute()
+ eq_(
+ result.keys(),
+ ['user_id', 'user_name']
+ )
+ row = result.first()
+ eq_(
+ row.keys(),
+ ['user_id', 'user_name']
+ )
+
+ def test_keys_anon_labels(self):
+ """test [ticket:3483]"""
+
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='foo')
+ result = testing.db.execute(
+ select([
+ users.c.user_id,
+ users.c.user_name.label(None),
+ func.count(literal_column('1'))]).
+ group_by(users.c.user_id, users.c.user_name)
+ )
+
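+        # the anonymous label() and the bare count() come back in
+        # keys() under their deanonymized names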
+ eq_(
+ result.keys(),
+ ['user_id', 'user_name_1', 'count_1']
+ )
+ row = result.first()
+ eq_(
+ row.keys(),
+ ['user_id', 'user_name_1', 'count_1']
+ )
+
+ def test_items(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().first()
+ eq_(
+ [(x[0].lower(), x[1]) for x in list(r.items())],
+ [('user_id', 1), ('user_name', 'foo')])
+
+ def test_len(self):
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().first()
+ eq_(len(r), 2)
+
+ r = testing.db.execute('select user_name, user_id from users'). \
+ first()
+ eq_(len(r), 2)
+ r = testing.db.execute('select user_name from users').first()
+ eq_(len(r), 1)
+
+ def test_sorting_in_python(self):
+ users = self.tables.users
+
+ users.insert().execute(
+ dict(user_id=1, user_name='foo'),
+ dict(user_id=2, user_name='bar'),
+ dict(user_id=3, user_name='def'),
+ )
+
+ rows = users.select().order_by(users.c.user_name).execute().fetchall()
+
+ eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')])
+
+ eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')])
+
+ def test_column_order_with_simple_query(self):
+ # should return values in column definition order
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select(users.c.user_id == 1).execute().first()
+ eq_(r[0], 1)
+ eq_(r[1], 'foo')
+ eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name'])
+ eq_(list(r.values()), [1, 'foo'])
+
+ def test_column_order_with_text_query(self):
+ # should return values in query order
+ users = self.tables.users
+
+ users.insert().execute(user_id=1, user_name='foo')
+ r = testing.db.execute('select user_name, user_id from users'). \
+ first()
+ eq_(r[0], 'foo')
+ eq_(r[1], 1)
+ eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id'])
+ eq_(list(r.values()), ['foo', 1])
+
+    @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on()')
+ @testing.crashes('firebird', 'An identifier must begin with a letter')
+ @testing.provide_metadata
+ def test_column_accessor_shadow(self):
+ shadowed = Table(
+ 'test_shadowed', self.metadata,
+ Column('shadow_id', INT, primary_key=True),
+ Column('shadow_name', VARCHAR(20)),
+ Column('parent', VARCHAR(20)),
+ Column('row', VARCHAR(40)),
+ Column('_parent', VARCHAR(20)),
+ Column('_row', VARCHAR(20)),
+ )
+ self.metadata.create_all()
+ shadowed.insert().execute(
+ shadow_id=1, shadow_name='The Shadow', parent='The Light',
+ row='Without light there is no shadow',
+ _parent='Hidden parent', _row='Hidden row')
+ r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
+
+ eq_(r.shadow_id, 1)
+ eq_(r['shadow_id'], 1)
+ eq_(r[shadowed.c.shadow_id], 1)
+
+ eq_(r.shadow_name, 'The Shadow')
+ eq_(r['shadow_name'], 'The Shadow')
+ eq_(r[shadowed.c.shadow_name], 'The Shadow')
+
+ eq_(r.parent, 'The Light')
+ eq_(r['parent'], 'The Light')
+ eq_(r[shadowed.c.parent], 'The Light')
+
+ eq_(r.row, 'Without light there is no shadow')
+ eq_(r['row'], 'Without light there is no shadow')
+ eq_(r[shadowed.c.row], 'Without light there is no shadow')
+
+ eq_(r['_parent'], 'Hidden parent')
+ eq_(r['_row'], 'Hidden row')
+
+
+class KeyTargetingTest(fixtures.TablesTest):
+ run_inserts = 'once'
+ run_deletes = None
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'keyed1', metadata, Column("a", CHAR(2), key="b"),
+ Column("c", CHAR(2), key="q")
+ )
+ Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
+ Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
+ Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
+ Table('content', metadata, Column('t', String(30), key="type"))
+ Table('bar', metadata, Column('ctype', String(30), key="content_type"))
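+        # keyed1, content, bar (and wschema below) assign a .key that
+        # differs from the DB column name, exercising key- vs.
+        # name-based row targeting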
+
+ if testing.requires.schemas.enabled:
+ Table(
+ 'wschema', metadata,
+ Column("a", CHAR(2), key="b"),
+ Column("c", CHAR(2), key="q"),
+ schema=testing.config.test_schema
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
+ cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
+ cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
+ cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
+ cls.tables.content.insert().execute(type="t1")
+
+ if testing.requires.schemas.enabled:
+ cls.tables[
+ '%s.wschema' % testing.config.test_schema].insert().execute(
+ dict(b="a1", q="c1"))
+
+ @testing.requires.schemas
+ def test_keyed_accessor_wschema(self):
+ keyed1 = self.tables['%s.wschema' % testing.config.test_schema]
+ row = testing.db.execute(keyed1.select()).first()
+
+ eq_(row.b, "a1")
+ eq_(row.q, "c1")
+ eq_(row.a, "a1")
+ eq_(row.c, "c1")
+
+ def test_keyed_accessor_single(self):
+ keyed1 = self.tables.keyed1
+ row = testing.db.execute(keyed1.select()).first()
+
+ eq_(row.b, "a1")
+ eq_(row.q, "c1")
+ eq_(row.a, "a1")
+ eq_(row.c, "c1")
+
+ def test_keyed_accessor_single_labeled(self):
+ keyed1 = self.tables.keyed1
+ row = testing.db.execute(keyed1.select().apply_labels()).first()
+
+ eq_(row.keyed1_b, "a1")
+ eq_(row.keyed1_q, "c1")
+ eq_(row.keyed1_a, "a1")
+ eq_(row.keyed1_c, "c1")
+
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_keyed_accessor_composite_conflict_2(self):
+ keyed1 = self.tables.keyed1
+ keyed2 = self.tables.keyed2
+
+ row = testing.db.execute(select([keyed1, keyed2])).first()
+ # row.b is unambiguous
+ eq_(row.b, "b2")
+ # row.a is ambiguous
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambig",
+ getattr, row, "a"
+ )
+
+ def test_keyed_accessor_composite_names_precedent(self):
+ keyed1 = self.tables.keyed1
+ keyed4 = self.tables.keyed4
+
+ row = testing.db.execute(select([keyed1, keyed4])).first()
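+        # keyed4's column *names* 'b' and 'q' take precedence over
+        # keyed1's matching *keys*; keyed1 remains reachable via its
+        # names 'a' and 'c'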
+ eq_(row.b, "b4")
+ eq_(row.q, "q4")
+ eq_(row.a, "a1")
+ eq_(row.c, "c1")
+
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_keyed_accessor_composite_keys_precedent(self):
+ keyed1 = self.tables.keyed1
+ keyed3 = self.tables.keyed3
+
+ row = testing.db.execute(select([keyed1, keyed3])).first()
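+        # keyed1's key 'q' still targets its 'c' column uniquely, but
+        # 'a' and 'b' collide: keyed1's key 'b' points to the name 'a',
+        # which keyed3 also has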
+ eq_(row.q, "c1")
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name 'b'",
+ getattr, row, "b"
+ )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name 'a'",
+ getattr, row, "a"
+ )
+ eq_(row.d, "d3")
+
+ def test_keyed_accessor_composite_labeled(self):
+ keyed1 = self.tables.keyed1
+ keyed2 = self.tables.keyed2
+
+ row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \
+ first()
+ eq_(row.keyed1_b, "a1")
+ eq_(row.keyed1_a, "a1")
+ eq_(row.keyed1_q, "c1")
+ eq_(row.keyed1_c, "c1")
+ eq_(row.keyed2_a, "a2")
+ eq_(row.keyed2_b, "b2")
+ assert_raises(KeyError, lambda: row['keyed2_c'])
+ assert_raises(KeyError, lambda: row['keyed2_q'])
+
+ def test_column_label_overlap_fallback(self):
+ content, bar = self.tables.content, self.tables.bar
+ row = testing.db.execute(
+ select([content.c.type.label("content_type")])).first()
+
+ not_in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+
+ in_(sql.column('content_type'), row)
+
+ row = testing.db.execute(select([func.now().label("content_type")])). \
+ first()
+ not_in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+ in_(sql.column('content_type'), row)
+
+ def test_column_label_overlap_fallback_2(self):
+ content, bar = self.tables.content, self.tables.bar
+ row = testing.db.execute(content.select(use_labels=True)).first()
+ in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+ not_in_(sql.column('content_type'), row)
+
+ def test_columnclause_schema_column_one(self):
+ keyed2 = self.tables.keyed2
+
+ # this is addressed by [ticket:2932]
+ # ColumnClause._compare_name_for_result allows the
+ # columns which the statement is against to be lightweight
+ # cols, which results in a more liberal comparison scheme
+ a, b = sql.column('a'), sql.column('b')
+ stmt = select([a, b]).select_from(table("keyed2"))
+ row = testing.db.execute(stmt).first()
+
+ in_(keyed2.c.a, row)
+ in_(keyed2.c.b, row)
+ in_(a, row)
+ in_(b, row)
+
+ def test_columnclause_schema_column_two(self):
+ keyed2 = self.tables.keyed2
+
+ a, b = sql.column('a'), sql.column('b')
+ stmt = select([keyed2.c.a, keyed2.c.b])
+ row = testing.db.execute(stmt).first()
+
+ in_(keyed2.c.a, row)
+ in_(keyed2.c.b, row)
+ in_(a, row)
+ in_(b, row)
+
+ def test_columnclause_schema_column_three(self):
+ keyed2 = self.tables.keyed2
+
+ # this is also addressed by [ticket:2932]
+
+ a, b = sql.column('a'), sql.column('b')
+ stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
+ row = testing.db.execute(stmt).first()
+
+ in_(keyed2.c.a, row)
+ in_(keyed2.c.b, row)
+ in_(a, row)
+ in_(b, row)
+ in_(stmt.c.a, row)
+ in_(stmt.c.b, row)
+
+ def test_columnclause_schema_column_four(self):
+ keyed2 = self.tables.keyed2
+
+ # this is also addressed by [ticket:2932]
+
+ a, b = sql.column('keyed2_a'), sql.column('keyed2_b')
+ stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
+ a, b)
+ row = testing.db.execute(stmt).first()
+
+ in_(keyed2.c.a, row)
+ in_(keyed2.c.b, row)
+ in_(a, row)
+ in_(b, row)
+ in_(stmt.c.keyed2_a, row)
+ in_(stmt.c.keyed2_b, row)
+
+ def test_columnclause_schema_column_five(self):
+ keyed2 = self.tables.keyed2
+
+ # this is also addressed by [ticket:2932]
+
+ stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
+ keyed2_a=CHAR, keyed2_b=CHAR)
+ row = testing.db.execute(stmt).first()
+
+ in_(keyed2.c.a, row)
+ in_(keyed2.c.b, row)
+ in_(stmt.c.keyed2_a, row)
+ in_(stmt.c.keyed2_b, row)