diff options
34 files changed, 2534 insertions, 1627 deletions
diff --git a/doc/build/changelog/changelog_10.rst b/doc/build/changelog/changelog_10.rst index ef125eecf..a3b3e0092 100644 --- a/doc/build/changelog/changelog_10.rst +++ b/doc/build/changelog/changelog_10.rst @@ -19,6 +19,17 @@ :version: 1.0.9 .. change:: + :tags: bug, oracle + :versions: 1.1.0b1 + :tickets: 3548 + + Fixed bug in Oracle dialect where reflection of tables and other + symbols with names quoted to force all-lower-case would not be + identified properly in reflection queries. The :class:`.quoted_name` + construct is now applied to incoming symbol names that detect as + forced into all-lower-case within the "name normalize" process. + + .. change:: :tags: feature, orm :versions: 1.1.0b1 :pullreq: github:201 diff --git a/doc/build/changelog/changelog_11.rst b/doc/build/changelog/changelog_11.rst index e37fd1a69..dcd43f28d 100644 --- a/doc/build/changelog/changelog_11.rst +++ b/doc/build/changelog/changelog_11.rst @@ -22,6 +22,57 @@ :version: 1.1.0b1 .. change:: + :tags: change, sql, mysql + :tickets: 3216 + + The system by which a :class:`.Column` considers itself to be an + "auto increment" column has been changed, such that autoincrement + is no longer implicitly enabled for a :class:`.Table` that has a + composite primary key. In order to accommodate being able to enable + autoincrement for a composite PK member column while at the same time + maintaining SQLAlchemy's long standing behavior of enabling + implicit autoincrement for a single integer primary key, a third + state has been added to the :paramref:`.Column.autoincrement` parameter + ``"auto"``, which is now the default. + + .. seealso:: + + :ref:`change_3216` + + :ref:`change_mysql_3216` + + .. change:: + :tags: change, mysql + :tickets: 3216 + + The MySQL dialect no longer generates an extra "KEY" directive when + generating CREATE TABLE DDL for a table using InnoDB with a + composite primary key with AUTO_INCREMENT on a column that isn't the + first column; to overcome InnoDB's limitation here, the PRIMARY KEY + constraint is now generated with the AUTO_INCREMENT column placed + first in the list of columns. + + .. seealso:: + + :ref:`change_mysql_3216` + + :ref:`change_3216` + + .. change:: + :tags: change, sqlite + :pullreq: github:198 + + Added support to the SQLite dialect for the + :meth:`.Inspector.get_schema_names` method to work with SQLite; + pull request courtesy Brian Van Klaveren. Also repaired support + for creation of indexes with schemas as well as reflection of + foreign key constraints in schema-bound tables. + + .. seealso:: + + :ref:`change_sqlite_schemas` + + .. change:: :tags: change, mssql :tickets: 3434 diff --git a/doc/build/changelog/migration_11.rst b/doc/build/changelog/migration_11.rst index ca6c44165..5b7c8321a 100644 --- a/doc/build/changelog/migration_11.rst +++ b/doc/build/changelog/migration_11.rst @@ -16,7 +16,7 @@ What's New in SQLAlchemy 1.1? some issues may be moved to later milestones in order to allow for a timely release. - Document last updated: September 19, 2015 + Document last updated: October 7, 2015 Introduction ============ @@ -256,6 +256,103 @@ configuration of the existing object-level technique of assigning New Features and Improvements - Core ==================================== +.. _change_3216: + +The ``.autoincrement`` directive is no longer implicitly enabled for a composite primary key column +--------------------------------------------------------------------------------------------------- + +SQLAlchemy has always had the convenience feature of enabling the backend database's +"autoincrement" feature for a single-column integer primary key; by "autoincrement" +we mean that the database column will include whatever DDL directives the +database provides in order to indicate an auto-incrementing integer identifier, +such as the SERIAL keyword on Postgresql or AUTO_INCREMENT on MySQL, and additionally +that the dialect will recieve these generated values from the execution +of a :meth:`.Table.insert` construct using techniques appropriate to that +backend. + +What's changed is that this feature no longer turns on automatically for a +*composite* primary key; previously, a table definition such as:: + + Table( + 'some_table', metadata, + Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True) + ) + +Would have "autoincrement" semantics applied to the ``'x'`` column, only +because it's first in the list of primary key columns. In order to +disable this, one would have to turn off ``autoincrement`` on all columns:: + + # old way + Table( + 'some_table', metadata, + Column('x', Integer, primary_key=True, autoincrement=False), + Column('y', Integer, primary_key=True, autoincrement=False) + ) + +With the new behavior, the composite primary key will not have autoincrement +semantics unless a column is marked explcitly with ``autoincrement=True``:: + + # column 'y' will be SERIAL/AUTO_INCREMENT/ auto-generating + Table( + 'some_table', metadata, + Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True, autoincrement=True) + ) + +In order to anticipate some potential backwards-incompatible scenarios, +the :meth:`.Table.insert` construct will perform more thorough checks +for missing primary key values on composite primary key columns that don't +have autoincrement set up; given a table such as:: + + Table( + 'b', metadata, + Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True) + ) + +An INSERT emitted with no values for this table will produce the exception:: + + CompileError: Column 'b.x' is marked as a member of the primary + key for table 'b', but has no Python-side or server-side default + generator indicated, nor does it indicate 'autoincrement=True', + and no explicit value is passed. Primary key columns may not + store NULL. Note that as of SQLAlchemy 1.1, 'autoincrement=True' + must be indicated explicitly for composite (e.g. multicolumn) + primary keys if AUTO_INCREMENT/SERIAL/IDENTITY behavior is + expected for one of the columns in the primary key. CREATE TABLE + statements are impacted by this change as well on most backends. + +For a column that is receiving primary key values from a server-side +default or something less common such as a trigger, the presence of a +value generator can be indicated using :class:`.FetchedValue`:: + + Table( + 'b', metadata, + Column('x', Integer, primary_key=True, server_default=FetchedValue()), + Column('y', Integer, primary_key=True, server_default=FetchedValue()) + ) + +For the very unlikely case where a composite primary key is actually intended +to store NULL in one or more of its columns (only supported on SQLite and MySQL), +specify the column with ``nullable=True``:: + + Table( + 'b', metadata, + Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True, nullable=True) + ) + +In a related change, the ``autoincrement`` flag may be set to True +on a column that has a client-side or server-side default. This typically +will not have much impact on the behavior of the column during an INSERT. + + +.. seealso:: + + :ref:`change_mysql_3216` + +:ticket:`3216` .. _change_2528: @@ -787,10 +884,75 @@ emits:: Dialect Improvements and Changes - MySQL ============================================= +.. _change_mysql_3216: + +No more generation of an implicit KEY for composite primary key w/ AUTO_INCREMENT +--------------------------------------------------------------------------------- + +The MySQL dialect had the behavior such that if a composite primary key +on an InnoDB table featured AUTO_INCREMENT on one of its columns which was +not the first column, e.g.:: + + t = Table( + 'some_table', metadata, + Column('x', Integer, primary_key=True, autoincrement=False), + Column('y', Integer, primary_key=True, autoincrement=True), + mysql_engine='InnoDB' + ) + +DDL such as the following would be generated:: + + CREATE TABLE some_table ( + x INTEGER NOT NULL, + y INTEGER NOT NULL AUTO_INCREMENT, + PRIMARY KEY (x, y), + KEY idx_autoinc_y (y) + )ENGINE=InnoDB + +Note the above "KEY" with an auto-generated name; this is a change that +found its way into the dialect many years ago in response to the issue that +the AUTO_INCREMENT would otherwise fail on InnoDB without this additional KEY. + +This workaround has been removed and replaced with the much better system +of just stating the AUTO_INCREMENT column *first* within the primary key:: + + CREATE TABLE some_table ( + x INTEGER NOT NULL, + y INTEGER NOT NULL AUTO_INCREMENT, + PRIMARY KEY (y, x) + )ENGINE=InnoDB + +Along with the change :ref:`change_3216`, composite primary keys with +or without auto increment are now easier to specify; +:paramref:`.Column.autoincrement` +now defaults to the value ``"auto"`` and the ``autoincrement=False`` +directives are no longer needed:: + + t = Table( + 'some_table', metadata, + Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True, autoincrement=True), + mysql_engine='InnoDB' + ) + + Dialect Improvements and Changes - SQLite ============================================= +.. _change_sqlite_schemas: + +Improved Support for Remote Schemas +------------------------------------ + +The SQLite dialect now implements :meth:`.Inspector.get_schema_names` +and additionally has improved support for tables and indexes that are +created and reflected from a remote schema, which in SQLite is a +database that is assigned a name via the ``ATTACH`` statement; previously, +the ``CREATE INDEX`` DDL didn't work correctly for a schema-bound table +and the :meth:`.Inspector.get_foreign_keys` method will now indicate the +given schema in the results. Cross-schema foreign keys aren't supported. + Dialect Improvements and Changes - SQL Server ============================================= diff --git a/doc/build/orm/tutorial.rst b/doc/build/orm/tutorial.rst index ed8d05534..dff181f6b 100644 --- a/doc/build/orm/tutorial.rst +++ b/doc/build/orm/tutorial.rst @@ -795,11 +795,17 @@ Here's a rundown of some of the most common operators used in # or chain multiple filter()/filter_by() calls query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones') + .. note:: Make sure you use :func:`.and_` and **not** the + Python ``and`` operator! + * :func:`OR <.sql.expression.or_>`:: from sqlalchemy import or_ query.filter(or_(User.name == 'ed', User.name == 'wendy')) + .. note:: Make sure you use :func:`.or_` and **not** the + Python ``or`` operator! + * :meth:`MATCH <.ColumnOperators.match>`:: query.filter(User.name.match('wendy')) @@ -1350,6 +1356,16 @@ The reference documentation for :meth:`~.Query.join` contains detailed informati and examples of the calling styles accepted by this method; :meth:`~.Query.join` is an important method at the center of usage for any SQL-fluent application. +.. topic:: What does :class:`.Query` select from if there's multiple entities? + + The :meth:`.Query.join` method will **typically join from the leftmost + item** in the list of entities, when the ON clause is omitted, or if the + ON clause is a plain SQL expression. To control the first entity in the list + of JOINs, use the :meth:`.Query.select_from` method:: + + query = Session.query(User, Address).select_from(Address).join(User) + + .. _ormtutorial_aliases: Using Aliases diff --git a/lib/sqlalchemy/dialects/firebird/base.py b/lib/sqlalchemy/dialects/firebird/base.py index c34829cd3..acd419e85 100644 --- a/lib/sqlalchemy/dialects/firebird/base.py +++ b/lib/sqlalchemy/dialects/firebird/base.py @@ -648,7 +648,7 @@ class FBDialect(default.DefaultDialect): 'type': coltype, 'nullable': not bool(row['null_flag']), 'default': defvalue, - 'autoincrement': defvalue is None + 'autoincrement': 'auto', } if orig_colname.lower() == orig_colname: diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index e4f9ac3de..37e798014 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -186,7 +186,7 @@ CREATE TABLE statement for this column will yield:: LIMIT/OFFSET Support -------------------- -MSSQL has no support for the LIMIT or OFFSET keysowrds. LIMIT is +MSSQL has no support for the LIMIT or OFFSET keywords. LIMIT is supported directly through the ``TOP`` Transact SQL keyword:: select.limit diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index 4b3e5bcd1..2c78de2fc 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -1916,38 +1916,7 @@ class MySQLCompiler(compiler.SQLCompiler): return None -# ug. "InnoDB needs indexes on foreign keys and referenced keys [...]. -# Starting with MySQL 4.1.2, these indexes are created automatically. -# In older versions, the indexes must be created explicitly or the -# creation of foreign key constraints fails." - class MySQLDDLCompiler(compiler.DDLCompiler): - def create_table_constraints(self, table, **kw): - """Get table constraints.""" - constraint_string = super( - MySQLDDLCompiler, self).create_table_constraints(table, **kw) - - # why self.dialect.name and not 'mysql'? because of drizzle - is_innodb = 'engine' in table.dialect_options[self.dialect.name] and \ - table.dialect_options[self.dialect.name][ - 'engine'].lower() == 'innodb' - - auto_inc_column = table._autoincrement_column - - if is_innodb and \ - auto_inc_column is not None and \ - auto_inc_column is not list(table.primary_key)[0]: - if constraint_string: - constraint_string += ", \n\t" - constraint_string += "KEY %s (%s)" % ( - self.preparer.quote( - "idx_autoinc_%s" % auto_inc_column.name - ), - self.preparer.format_column(auto_inc_column) - ) - - return constraint_string - def get_column_specification(self, column, **kw): """Builds column DDL.""" diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index c605bd510..82ec72f2b 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -287,6 +287,7 @@ from sqlalchemy import util, sql from sqlalchemy.engine import default, reflection from sqlalchemy.sql import compiler, visitors, expression from sqlalchemy.sql import operators as sql_operators +from sqlalchemy.sql.elements import quoted_name from sqlalchemy import types as sqltypes, schema as sa_schema from sqlalchemy.types import VARCHAR, NVARCHAR, CHAR, \ BLOB, CLOB, TIMESTAMP, FLOAT @@ -1032,6 +1033,8 @@ class OracleDialect(default.DefaultDialect): if name.upper() == name and not \ self.identifier_preparer._requires_quotes(name.lower()): return name.lower() + elif name.lower() == name: + return quoted_name(name, quote=True) else: return name @@ -1280,7 +1283,7 @@ class OracleDialect(default.DefaultDialect): 'type': coltype, 'nullable': nullable, 'default': default, - 'autoincrement': default is None + 'autoincrement': 'auto', } if orig_colname.lower() == orig_colname: cdict['quote'] = True diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index e19047b76..a1786d16c 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -853,12 +853,20 @@ class SQLiteDDLCompiler(compiler.DDLCompiler): if not column.nullable: colspec += " NOT NULL" - if (column.primary_key and - column.table.dialect_options['sqlite']['autoincrement'] and - len(column.table.primary_key.columns) == 1 and - issubclass(column.type._type_affinity, sqltypes.Integer) and - not column.foreign_keys): - colspec += " PRIMARY KEY AUTOINCREMENT" + if column.primary_key: + if ( + column.autoincrement is True and + len(column.table.primary_key.columns) != 1 + ): + raise exc.CompileError( + "SQLite does not support autoincrement for " + "composite primary keys") + + if (column.table.dialect_options['sqlite']['autoincrement'] and + len(column.table.primary_key.columns) == 1 and + issubclass(column.type._type_affinity, sqltypes.Integer) and + not column.foreign_keys): + colspec += " PRIMARY KEY AUTOINCREMENT" return colspec @@ -894,11 +902,25 @@ class SQLiteDDLCompiler(compiler.DDLCompiler): return preparer.format_table(table, use_schema=False) - def visit_create_index(self, create): + def visit_create_index(self, create, include_schema=False, + include_table_schema=True): index = create.element - - text = super(SQLiteDDLCompiler, self).visit_create_index( - create, include_table_schema=False) + self._verify_index_table(index) + preparer = self.preparer + text = "CREATE " + if index.unique: + text += "UNIQUE " + text += "INDEX %s ON %s (%s)" \ + % ( + self._prepared_index_name(index, + include_schema=True), + preparer.format_table(index.table, + use_schema=False), + ', '.join( + self.sql_compiler.process( + expr, include_table=False, literal_binds=True) for + expr in index.expressions) + ) whereclause = index.dialect_options["sqlite"]["where"] if whereclause is not None: @@ -1095,6 +1117,13 @@ class SQLiteDialect(default.DefaultDialect): return None @reflection.cache + def get_schema_names(self, connection, **kw): + s = "PRAGMA database_list" + dl = connection.execute(s) + + return [db[1] for db in dl if db[1] != "temp"] + + @reflection.cache def get_table_names(self, connection, schema=None, **kw): if schema is not None: qschema = self.identifier_preparer.quote_identifier(schema) @@ -1190,7 +1219,7 @@ class SQLiteDialect(default.DefaultDialect): 'type': coltype, 'nullable': nullable, 'default': default, - 'autoincrement': default is None, + 'autoincrement': 'auto', 'primary_key': primary_key, } @@ -1283,7 +1312,7 @@ class SQLiteDialect(default.DefaultDialect): fk = fks[numerical_id] = { 'name': None, 'constrained_columns': [], - 'referred_schema': None, + 'referred_schema': schema, 'referred_table': rtbl, 'referred_columns': [], } diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 16ead950e..dc36f2f35 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -1292,7 +1292,9 @@ class Query(object): session.query(MyClass).filter(MyClass.name == 'some name') - Multiple criteria are joined together by AND:: + Multiple criteria may be specified as comma separated; the effect + is that they will be joined together using the :func:`.and_` + function:: session.query(MyClass).\\ filter(MyClass.name == 'some name', MyClass.id > 5) @@ -1301,9 +1303,6 @@ class Query(object): WHERE clause of a select. String expressions are coerced into SQL expression constructs via the :func:`.text` construct. - .. versionchanged:: 0.7.5 - Multiple criteria joined by AND. - .. seealso:: :meth:`.Query.filter_by` - filter on keyword expressions. @@ -1327,7 +1326,9 @@ class Query(object): session.query(MyClass).filter_by(name = 'some name') - Multiple criteria are joined together by AND:: + Multiple criteria may be specified as comma separated; the effect + is that they will be joined together using the :func:`.and_` + function:: session.query(MyClass).\\ filter_by(name = 'some name', id = 5) @@ -2463,6 +2464,12 @@ class Query(object): Calling ``first()`` results in an execution of the underlying query. + .. seealso:: + + :meth:`.Query.one` + + :meth:`.Query.one_or_none` + """ if self._statement is not None: ret = list(self)[0:1] @@ -2495,6 +2502,12 @@ class Query(object): Added :meth:`.Query.one_or_none` + .. seealso:: + + :meth:`.Query.first` + + :meth:`.Query.one` + """ ret = list(self) diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 691195772..f1220ce31 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -2381,7 +2381,7 @@ class DDLCompiler(Compiled): text += "CONSTRAINT %s " % formatted_name text += "PRIMARY KEY " text += "(%s)" % ', '.join(self.preparer.quote(c.name) - for c in constraint) + for c in constraint.columns_autoinc_first) text += self.define_constraint_deferrability(constraint) return text diff --git a/lib/sqlalchemy/sql/crud.py b/lib/sqlalchemy/sql/crud.py index e6f16b698..72b66c036 100644 --- a/lib/sqlalchemy/sql/crud.py +++ b/lib/sqlalchemy/sql/crud.py @@ -212,6 +212,7 @@ def _scan_cols( for c in cols: col_key = _getattr_col_key(c) + if col_key in parameters and col_key not in check_columns: _append_param_parameter( @@ -248,6 +249,10 @@ def _scan_cols( elif implicit_return_defaults and \ c in implicit_return_defaults: compiler.returning.append(c) + elif c.primary_key and \ + c is not stmt.table._autoincrement_column and \ + not c.nullable: + _raise_pk_with_no_anticipated_value(c) elif compiler.isupdate: _append_param_update( @@ -285,6 +290,22 @@ def _append_param_parameter( def _append_param_insert_pk_returning(compiler, stmt, c, values, kw): + """Create a primary key expression in the INSERT statement and + possibly a RETURNING clause for it. + + If the column has a Python-side default, we will create a bound + parameter for it and "pre-execute" the Python function. If + the column has a SQL expression default, or is a sequence, + we will add it directly into the INSERT statement and add a + RETURNING element to get the new value. If the column has a + server side default or is marked as the "autoincrement" column, + we will add a RETRUNING element to get at the value. + + If all the above tests fail, that indicates a primary key column with no + noted default generation capabilities that has no parameter passed; + raise an exception. + + """ if c.default is not None: if c.default.is_sequence: if compiler.dialect.supports_sequences and \ @@ -303,9 +324,12 @@ def _append_param_insert_pk_returning(compiler, stmt, c, values, kw): values.append( (c, _create_prefetch_bind_param(compiler, c)) ) - - else: + elif c is stmt.table._autoincrement_column or c.server_default is not None: compiler.returning.append(c) + elif not c.nullable: + # no .default, no .server_default, not autoincrement, we have + # no indication this primary key column will have any value + _raise_pk_with_no_anticipated_value(c) def _create_prefetch_bind_param(compiler, c, process=True, name=None): @@ -342,18 +366,46 @@ def _process_multiparam_default_bind(compiler, c, index, kw): def _append_param_insert_pk(compiler, stmt, c, values, kw): + """Create a bound parameter in the INSERT statement to receive a + 'prefetched' default value. + + The 'prefetched' value indicates that we are to invoke a Python-side + default function or expliclt SQL expression before the INSERT statement + proceeds, so that we have a primary key value available. + + if the column has no noted default generation capabilities, it has + no value passed in either; raise an exception. + + """ if ( - (c.default is not None and - (not c.default.is_sequence or - compiler.dialect.supports_sequences)) or - c is stmt.table._autoincrement_column and - (compiler.dialect.supports_sequences or - compiler.dialect. - preexecute_autoincrement_sequences) + ( + # column has a Python-side default + c.default is not None and + ( + # and it won't be a Sequence + not c.default.is_sequence or + compiler.dialect.supports_sequences + ) + ) + or + ( + # column is the "autoincrement column" + c is stmt.table._autoincrement_column and + ( + # and it's either a "sequence" or a + # pre-executable "autoincrement" sequence + compiler.dialect.supports_sequences or + compiler.dialect.preexecute_autoincrement_sequences + ) + ) ): values.append( (c, _create_prefetch_bind_param(compiler, c)) ) + elif c.default is None and c.server_default is None and not c.nullable: + # no .default, no .server_default, not autoincrement, we have + # no indication this primary key column will have any value + _raise_pk_with_no_anticipated_value(c) def _append_param_insert_hasdefault( @@ -555,3 +607,24 @@ def _get_returning_modifiers(compiler, stmt): return need_pks, implicit_returning, \ implicit_return_defaults, postfetch_lastrowid + + +def _raise_pk_with_no_anticipated_value(c): + msg = ( + "Column '%s.%s' is marked as a member of the " + "primary key for table '%s', " + "but has no Python-side or server-side default generator indicated, " + "nor does it indicate 'autoincrement=True' or 'nullable=True', " + "and no explicit value is passed. " + "Primary key columns typically may not store NULL." + % + (c.table.fullname, c.name, c.table.fullname)) + if len(c.table.primary_key.columns) > 1: + msg += ( + " Note that as of SQLAlchemy 1.1, 'autoincrement=True' must be " + "indicated explicitly for composite (e.g. multicolumn) primary " + "keys if AUTO_INCREMENT/SERIAL/IDENTITY " + "behavior is expected for one of the columns in the primary key. " + "CREATE TABLE statements are impacted by this change as well on " + "most backends.") + raise exc.CompileError(msg) diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index 137208584..e20545962 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -572,18 +572,9 @@ class Table(DialectKWArgs, SchemaItem, TableClause): def _init_collections(self): pass - @util.memoized_property + @property def _autoincrement_column(self): - for col in self.primary_key: - if (col.autoincrement and col.type._type_affinity is not None and - issubclass(col.type._type_affinity, - type_api.INTEGERTYPE._type_affinity) and - (not col.foreign_keys or - col.autoincrement == 'ignore_fk') and - isinstance(col.default, (type(None), Sequence)) and - (col.server_default is None or - col.server_default.reflected)): - return col + return self.primary_key._autoincrement_column @property def key(self): @@ -913,17 +904,40 @@ class Column(SchemaItem, ColumnClause): argument is available such as ``server_default``, ``default`` and ``unique``. - :param autoincrement: This flag may be set to ``False`` to - indicate an integer primary key column that should not be - considered to be the "autoincrement" column, that is - the integer primary key column which generates values - implicitly upon INSERT and whose value is usually returned - via the DBAPI cursor.lastrowid attribute. It defaults - to ``True`` to satisfy the common use case of a table - with a single integer primary key column. If the table - has a composite primary key consisting of more than one - integer column, set this flag to True only on the - column that should be considered "autoincrement". + :param autoincrement: Set up "auto increment" semantics for an integer + primary key column. The default value is the string ``"auto"`` + which indicates that a single-column primary key that is of + an INTEGER type with no stated client-side or python-side defaults + should receive auto increment semantics automatically; + all other varieties of primary key columns will not. This + includes that :term:`DDL` such as Postgresql SERIAL or MySQL + AUTO_INCREMENT will be emitted for this column during a table + create, as well as that the column is assumed to generate new + integer primary key values when an INSERT statement invokes which + will be retrieved by the dialect. + + The flag may be set to ``True`` to indicate that a column which + is part of a composite (e.g. multi-column) primary key should + have autoincrement semantics, though note that only one column + within a primary key may have this setting. It can also + be set to ``True`` to indicate autoincrement semantics on a + column that has a client-side or server-side default configured, + however note that not all dialects can accommodate all styles + of default as an "autoincrement". It can also be + set to ``False`` on a single-column primary key that has a + datatype of INTEGER in order to disable auto increment semantics + for that column. + + .. versionchanged:: 1.1 The autoincrement flag now defaults to + ``"auto"`` which indicates autoincrement semantics by default + for single-column integer primary keys only; for composite + (multi-column) primary keys, autoincrement is never implicitly + enabled; as always, ``autoincrement=True`` will allow for + at most one of those columns to be an "autoincrement" column. + ``autoincrement=True`` may also be set on a :class:`.Column` + that has an explicit client-side or server-side default, + subject to limitations of the backend database and dialect. + The setting *only* has an effect for columns which are: @@ -940,11 +954,8 @@ class Column(SchemaItem, ColumnClause): primary_key=True, autoincrement='ignore_fk') It is typically not desirable to have "autoincrement" enabled - on such a column as its value intends to mirror that of a - primary key column elsewhere. - - * have no server side or client side defaults (with the exception - of Postgresql SERIAL). + on a column that refers to another via foreign key, as such a column + is required to refer to a value that originates from elsewhere. The setting has these two effects on columns that meet the above criteria: @@ -961,20 +972,15 @@ class Column(SchemaItem, ColumnClause): :ref:`sqlite_autoincrement` - * The column will be considered to be available as - cursor.lastrowid or equivalent, for those dialects which - "post fetch" newly inserted identifiers after a row has - been inserted (SQLite, MySQL, MS-SQL). It does not have - any effect in this regard for databases that use sequences - to generate primary key identifiers (i.e. Firebird, Postgresql, - Oracle). - - .. versionchanged:: 0.7.4 - ``autoincrement`` accepts a special value ``'ignore_fk'`` - to indicate that autoincrementing status regardless of foreign - key references. This applies to certain composite foreign key - setups, such as the one demonstrated in the ORM documentation - at :ref:`post_update`. + * The column will be considered to be available using an + "autoincrement" method specific to the backend database, such + as calling upon ``cursor.lastrowid``, using RETURNING in an + INSERT statement to get at a sequence-generated value, or using + special functions such as "SELECT scope_identity()". + These methods are highly specific to the DBAPIs and databases in + use and vary greatly, so care should be taken when associating + ``autoincrement=True`` with a custom default generation function. + :param default: A scalar, Python callable, or :class:`.ColumnElement` expression representing the @@ -1128,7 +1134,7 @@ class Column(SchemaItem, ColumnClause): self.system = kwargs.pop('system', False) self.doc = kwargs.pop('doc', None) self.onupdate = kwargs.pop('onupdate', None) - self.autoincrement = kwargs.pop('autoincrement', True) + self.autoincrement = kwargs.pop('autoincrement', "auto") self.constraints = set() self.foreign_keys = set() @@ -1263,12 +1269,12 @@ class Column(SchemaItem, ColumnClause): if self.primary_key: table.primary_key._replace(self) - Table._autoincrement_column._reset(table) elif self.key in table.primary_key: raise exc.ArgumentError( "Trying to redefine primary-key column '%s' as a " "non-primary-key column on table '%s'" % ( self.key, table.fullname)) + self.table = table if self.index: @@ -3025,11 +3031,77 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): self.columns.extend(columns) + PrimaryKeyConstraint._autoincrement_column._reset(self) self._set_parent_with_dispatch(self.table) def _replace(self, col): + PrimaryKeyConstraint._autoincrement_column._reset(self) self.columns.replace(col) + @property + def columns_autoinc_first(self): + autoinc = self._autoincrement_column + + if autoinc is not None: + return [autoinc] + [c for c in self.columns if c is not autoinc] + else: + return list(self.columns) + + @util.memoized_property + def _autoincrement_column(self): + + def _validate_autoinc(col, autoinc_true): + if col.type._type_affinity is None or not issubclass( + col.type._type_affinity, + type_api.INTEGERTYPE._type_affinity): + if autoinc_true: + raise exc.ArgumentError( + "Column type %s on column '%s' is not " + "compatible with autoincrement=True" % ( + col.type, + col + )) + else: + return False + elif not isinstance(col.default, (type(None), Sequence)) and \ + not autoinc_true: + return False + elif col.server_default is not None and not autoinc_true: + return False + elif ( + col.foreign_keys and col.autoincrement + not in (True, 'ignore_fk')): + return False + return True + + if len(self.columns) == 1: + col = list(self.columns)[0] + + if col.autoincrement is True: + _validate_autoinc(col, True) + return col + elif ( + col.autoincrement in ('auto', 'ignore_fk') and + _validate_autoinc(col, False) + ): + return col + + else: + autoinc = None + for col in self.columns: + if col.autoincrement is True: + _validate_autoinc(col, True) + if autoinc is not None: + raise exc.ArgumentError( + "Only one Column may be marked " + "autoincrement=True, found both %s and %s." % + (col.name, autoinc.name) + ) + else: + autoinc = col + + return autoinc + class UniqueConstraint(ColumnCollectionConstraint): """A table-level UNIQUE constraint. diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py index 77527571b..3f9ddae73 100644 --- a/lib/sqlalchemy/testing/provision.py +++ b/lib/sqlalchemy/testing/provision.py @@ -2,7 +2,7 @@ from sqlalchemy.engine import url as sa_url from sqlalchemy import text from sqlalchemy.util import compat from . import config, engines - +import os FOLLOWER_IDENT = None @@ -52,6 +52,7 @@ def setup_config(db_url, options, file_config, follower_ident): db_opts = {} _update_db_opts(db_url, db_opts) eng = engines.testing_engine(db_url, db_opts) + _post_configure_engine(db_url, eng, follower_ident) eng.connect().close() cfg = config.Config.register(eng, db_opts, options, file_config) if follower_ident: @@ -106,6 +107,11 @@ def _configure_follower(cfg, ident): @register.init +def _post_configure_engine(url, engine, follower_ident): + pass + + +@register.init def _follower_url_from_main(url, ident): url = sa_url.make_url(url) url.database = ident @@ -126,6 +132,23 @@ def _sqlite_follower_url_from_main(url, ident): return sa_url.make_url("sqlite:///%s.db" % ident) +@_post_configure_engine.for_db("sqlite") +def _sqlite_post_configure_engine(url, engine, follower_ident): + from sqlalchemy import event + + @event.listens_for(engine, "connect") + def connect(dbapi_connection, connection_record): + # use file DBs in all cases, memory acts kind of strangely + # as an attached + if not follower_ident: + dbapi_connection.execute( + 'ATTACH DATABASE "test_schema.db" AS test_schema') + else: + dbapi_connection.execute( + 'ATTACH DATABASE "%s_test_schema.db" AS test_schema' + % follower_ident) + + @_create_db.for_db("postgresql") def _pg_create_db(cfg, eng, ident): with eng.connect().execution_options( @@ -176,8 +199,10 @@ def _pg_drop_db(cfg, eng, ident): @_drop_db.for_db("sqlite") def _sqlite_drop_db(cfg, eng, ident): - pass - #os.remove("%s.db" % ident) + if ident: + os.remove("%s_test_schema.db" % ident) + else: + os.remove("%s.db" % ident) @_drop_db.for_db("mysql") diff --git a/lib/sqlalchemy/testing/schema.py b/lib/sqlalchemy/testing/schema.py index 93b52ad58..257578668 100644 --- a/lib/sqlalchemy/testing/schema.py +++ b/lib/sqlalchemy/testing/schema.py @@ -71,9 +71,12 @@ def Column(*args, **kw): args = [arg for arg in args if not isinstance(arg, schema.ForeignKey)] col = schema.Column(*args, **kw) - if 'test_needs_autoincrement' in test_opts and \ + if test_opts.get('test_needs_autoincrement', False) and \ kw.get('primary_key', False): + if col.default is None and col.server_default is None: + col.autoincrement = True + # allow any test suite to pick up on this col.info['test_needs_autoincrement'] = True diff --git a/test/dialect/mysql/test_compiler.py b/test/dialect/mysql/test_compiler.py index 304c31012..60af82bab 100644 --- a/test/dialect/mysql/test_compiler.py +++ b/test/dialect/mysql/test_compiler.py @@ -511,9 +511,8 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(schema.CreateTable(t1), 'CREATE TABLE sometable (assigned_id ' 'INTEGER NOT NULL, id INTEGER NOT NULL ' - 'AUTO_INCREMENT, PRIMARY KEY (assigned_id, ' - 'id), KEY idx_autoinc_id (id))ENGINE=Inn' - 'oDB') + 'AUTO_INCREMENT, PRIMARY KEY (id, assigned_id)' + ')ENGINE=InnoDB') t1 = Table('sometable', MetaData(), Column('assigned_id', Integer(), primary_key=True, @@ -537,8 +536,7 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE TABLE sometable (' 'id INTEGER NOT NULL, ' '`order` INTEGER NOT NULL AUTO_INCREMENT, ' - 'PRIMARY KEY (id, `order`), ' - 'KEY idx_autoinc_order (`order`)' + 'PRIMARY KEY (`order`, id)' ')ENGINE=InnoDB') def test_create_table_with_partition(self): diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index 39b39e006..266fc335e 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -211,49 +211,55 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults): meta = MetaData(testing.db) try: Table('ai_1', meta, - Column('int_y', Integer, primary_key=True), + Column('int_y', Integer, primary_key=True, + autoincrement=True), Column('int_n', Integer, DefaultClause('0'), primary_key=True), - mysql_engine='MyISAM') + mysql_engine='MyISAM') Table('ai_2', meta, - Column('int_y', Integer, primary_key=True), + Column('int_y', Integer, primary_key=True, + autoincrement=True), Column('int_n', Integer, DefaultClause('0'), primary_key=True), - mysql_engine='MyISAM') + mysql_engine='MyISAM') Table('ai_3', meta, Column('int_n', Integer, DefaultClause('0'), primary_key=True, autoincrement=False), - Column('int_y', Integer, primary_key=True), - mysql_engine='MyISAM') + Column('int_y', Integer, primary_key=True, + autoincrement=True), + mysql_engine='MyISAM') Table('ai_4', meta, Column('int_n', Integer, DefaultClause('0'), primary_key=True, autoincrement=False), Column('int_n2', Integer, DefaultClause('0'), primary_key=True, autoincrement=False), - mysql_engine='MyISAM') + mysql_engine='MyISAM') Table('ai_5', meta, - Column('int_y', Integer, primary_key=True), + Column('int_y', Integer, primary_key=True, + autoincrement=True), Column('int_n', Integer, DefaultClause('0'), primary_key=True, autoincrement=False), - mysql_engine='MyISAM') + mysql_engine='MyISAM') Table('ai_6', meta, Column('o1', String(1), DefaultClause('x'), primary_key=True), - Column('int_y', Integer, primary_key=True), - mysql_engine='MyISAM') + Column('int_y', Integer, primary_key=True, + autoincrement=True), + mysql_engine='MyISAM') Table('ai_7', meta, Column('o1', String(1), DefaultClause('x'), primary_key=True), Column('o2', String(1), DefaultClause('x'), primary_key=True), - Column('int_y', Integer, primary_key=True), - mysql_engine='MyISAM') + Column('int_y', Integer, primary_key=True, + autoincrement=True), + mysql_engine='MyISAM') Table('ai_8', meta, Column('o1', String(1), DefaultClause('x'), primary_key=True), Column('o2', String(1), DefaultClause('x'), primary_key=True), - mysql_engine='MyISAM') + mysql_engine='MyISAM') meta.create_all() table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py index 4a33644e0..6c10d78cc 100644 --- a/test/dialect/postgresql/test_query.py +++ b/test/dialect/postgresql/test_query.py @@ -72,9 +72,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): engines.testing_engine(options={'implicit_returning': False}), engines.testing_engine(options={'implicit_returning': True}) ]: - assert_raises_message(exc.DBAPIError, - 'violates not-null constraint', - eng.execute, t2.insert()) + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + eng.execute, t2.insert() + ) + def test_sequence_insert(self): table = Table( @@ -494,26 +497,26 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): engines.testing_engine(options={'implicit_returning': False}) metadata.bind = self.engine table.insert().execute({'id': 30, 'data': 'd1'}) - if self.engine.driver == 'pg8000': - exception_cls = exc.ProgrammingError - elif self.engine.driver == 'pypostgresql': - exception_cls = Exception - else: - exception_cls = exc.IntegrityError - assert_raises_message(exception_cls, - 'violates not-null constraint', - table.insert().execute, {'data': 'd2'}) - assert_raises_message(exception_cls, - 'violates not-null constraint', - table.insert().execute, {'data': 'd2'}, - {'data': 'd3'}) - assert_raises_message(exception_cls, - 'violates not-null constraint', - table.insert().execute, {'data': 'd2'}) - assert_raises_message(exception_cls, - 'violates not-null constraint', - table.insert().execute, {'data': 'd2'}, - {'data': 'd3'}) + + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + table.insert().execute, {'data': 'd2'}) + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + table.insert().execute, {'data': 'd2'}, + {'data': 'd3'}) + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + table.insert().execute, {'data': 'd2'}) + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + table.insert().execute, {'data': 'd2'}, + {'data': 'd3'}) + table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, 'data': 'd3'}) table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) @@ -530,13 +533,15 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) table.insert().execute({'id': 30, 'data': 'd1'}) - assert_raises_message(exception_cls, - 'violates not-null constraint', - table.insert().execute, {'data': 'd2'}) - assert_raises_message(exception_cls, - 'violates not-null constraint', - table.insert().execute, {'data': 'd2'}, - {'data': 'd3'}) + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + table.insert().execute, {'data': 'd2'}) + assert_raises_message( + exc.CompileError, + ".*has no Python-side or server-side default.*", + table.insert().execute, {'data': 'd2'}, + {'data': 'd3'}) table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, 'data': 'd3'}) table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index e080568cf..dd4a888ff 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -5,6 +5,7 @@ from sqlalchemy.testing import eq_ from sqlalchemy import * from sqlalchemy import types as sqltypes, exc, schema from sqlalchemy.sql import table, column +from sqlalchemy.sql.elements import quoted_name from sqlalchemy.testing import fixtures, AssertsExecutionResults, AssertsCompiledSQL from sqlalchemy import testing from sqlalchemy.util import u, b @@ -1859,6 +1860,31 @@ class TableReflectionTest(fixtures.TestBase): tbl = Table('test_compress', m2, autoload=True) assert tbl.dialect_options['oracle']['compress'] == "OLTP" + @testing.provide_metadata + def test_reflect_lowercase_forced_tables(self): + metadata = self.metadata + + Table( + quoted_name('t1', quote=True), metadata, + Column('id', Integer, primary_key=True), + ) + Table( + quoted_name('t2', quote=True), metadata, + Column('id', Integer, primary_key=True), + Column('t1id', ForeignKey('t1.id')) + ) + metadata.create_all() + + m2 = MetaData(testing.db) + t2_ref = Table(quoted_name('t2', quote=True), m2, autoload=True) + t1_ref = m2.tables['t1'] + assert t2_ref.c.t1id.references(t1_ref.c.id) + + m3 = MetaData(testing.db) + m3.reflect(only=lambda name, m: name.lower() in ('t1', 't2')) + assert m3.tables['t2'].c.t1id.references(m3.tables['t1'].c.id) + + class RoundTripIndexTest(fixtures.TestBase): __only_on__ = 'oracle' diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index 17920c127..33903ff89 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -20,7 +20,7 @@ from sqlalchemy.engine.url import make_url from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ AssertsExecutionResults, engines from sqlalchemy import testing -from sqlalchemy.schema import CreateTable +from sqlalchemy.schema import CreateTable, FetchedValue from sqlalchemy.engine.reflection import Inspector from sqlalchemy.testing import mock @@ -535,29 +535,12 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): assert e.pool.__class__ is pool.NullPool - -class AttachedMemoryDBTest(fixtures.TestBase): +class AttachedDBTest(fixtures.TestBase): __only_on__ = 'sqlite' - dbname = None - - def setUp(self): - self.conn = conn = testing.db.connect() - if self.dbname is None: - dbname = ':memory:' - else: - dbname = self.dbname - conn.execute('ATTACH DATABASE "%s" AS test_schema' % dbname) - self.metadata = MetaData() - - def tearDown(self): - self.metadata.drop_all(self.conn) - self.conn.execute('DETACH DATABASE test_schema') - if self.dbname: - os.remove(self.dbname) - def _fixture(self): meta = self.metadata + self.conn = testing.db.connect() ct = Table( 'created', meta, Column('id', Integer), @@ -567,6 +550,14 @@ class AttachedMemoryDBTest(fixtures.TestBase): meta.create_all(self.conn) return ct + def setup(self): + self.conn = testing.db.connect() + self.metadata = MetaData() + + def teardown(self): + self.metadata.drop_all(self.conn) + self.conn.close() + def test_no_tables(self): insp = inspect(self.conn) eq_(insp.get_table_names("test_schema"), []) @@ -581,6 +572,18 @@ class AttachedMemoryDBTest(fixtures.TestBase): insp = inspect(self.conn) eq_(insp.get_table_names("test_schema"), ["created"]) + def test_schema_names(self): + self._fixture() + insp = inspect(self.conn) + eq_(insp.get_schema_names(), ["main", "test_schema"]) + + # implicitly creates a "temp" schema + self.conn.execute("select * from sqlite_temp_master") + + # we're not including it + insp = inspect(self.conn) + eq_(insp.get_schema_names(), ["main", "test_schema"]) + def test_reflect_system_table(self): meta = MetaData(self.conn) alt_master = Table( @@ -633,10 +636,6 @@ class AttachedMemoryDBTest(fixtures.TestBase): eq_(row['name'], 'foo') -class AttachedFileDBTest(AttachedMemoryDBTest): - dbname = 'attached_db.db' - - class SQLTest(fixtures.TestBase, AssertsCompiledSQL): """Tests SQLite-dialect specific compilation.""" @@ -752,6 +751,17 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE data > 'a' AND data < 'b''s'", dialect=sqlite.dialect()) + def test_no_autoinc_on_composite_pk(self): + m = MetaData() + t = Table( + 't', m, + Column('x', Integer, primary_key=True, autoincrement=True), + Column('y', Integer, primary_key=True)) + assert_raises_message( + exc.CompileError, + "SQLite does not support autoincrement for composite", + CreateTable(t).compile, dialect=sqlite.dialect() + ) class InsertTest(fixtures.TestBase, AssertsExecutionResults): @@ -782,23 +792,46 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_pk2(self): + # now raises CompileError due to [ticket:3216] assert_raises( - exc.DBAPIError, self._test_empty_insert, + exc.CompileError, self._test_empty_insert, Table( 'b', MetaData(testing.db), Column('x', Integer, primary_key=True), Column('y', Integer, primary_key=True))) @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') - def test_empty_insert_pk3(self): + def test_empty_insert_pk2_fv(self): assert_raises( exc.DBAPIError, self._test_empty_insert, Table( + 'b', MetaData(testing.db), + Column('x', Integer, primary_key=True, + server_default=FetchedValue()), + Column('y', Integer, primary_key=True, + server_default=FetchedValue()))) + + @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') + def test_empty_insert_pk3(self): + # now raises CompileError due to [ticket:3216] + assert_raises( + exc.CompileError, self._test_empty_insert, + Table( 'c', MetaData(testing.db), Column('x', Integer, primary_key=True), Column('y', Integer, DefaultClause('123'), primary_key=True))) @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') + def test_empty_insert_pk3_fv(self): + assert_raises( + exc.DBAPIError, self._test_empty_insert, + Table( + 'c', MetaData(testing.db), + Column('x', Integer, primary_key=True, + server_default=FetchedValue()), + Column('y', Integer, DefaultClause('123'), primary_key=True))) + + @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_pk4(self): self._test_empty_insert( Table( diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index 83650609d..b7bf87d63 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -311,22 +311,22 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): Don't mark this test as unsupported for any backend ! - (technically it fails with MySQL InnoDB since "id" comes before "id2") - """ meta = self.metadata - Table('test', meta, + Table( + 'test', meta, Column('id', sa.Integer, primary_key=True), Column('data', sa.String(50)), - mysql_engine='MyISAM' + mysql_engine='InnoDB' ) - Table('test2', meta, - Column('id', sa.Integer, sa.ForeignKey('test.id'), - primary_key=True), + Table( + 'test2', meta, + Column( + 'id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True), Column('id2', sa.Integer, primary_key=True), Column('data', sa.String(50)), - mysql_engine='MyISAM' + mysql_engine='InnoDB' ) meta.create_all() m2 = MetaData(testing.db) @@ -334,7 +334,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): assert t1a._autoincrement_column is t1a.c.id t2a = Table('test2', m2, autoload=True) - assert t2a._autoincrement_column is t2a.c.id2 + assert t2a._autoincrement_column is None + @skip('sqlite') @testing.provide_metadata diff --git a/test/ext/declarative/test_basic.py b/test/ext/declarative/test_basic.py index ab0de801c..5165d9cc9 100644 --- a/test/ext/declarative/test_basic.py +++ b/test/ext/declarative/test_basic.py @@ -1570,8 +1570,7 @@ class DeclarativeTest(DeclarativeTestBase): meta = MetaData(testing.db) t1 = Table( 't1', meta, - Column('id', String(50), - primary_key=True, test_needs_autoincrement=True), + Column('id', String(50), primary_key=True), Column('data', String(50))) meta.create_all() try: diff --git a/test/orm/test_composites.py b/test/orm/test_composites.py index 8b777dcdf..48027ec2d 100644 --- a/test/orm/test_composites.py +++ b/test/orm/test_composites.py @@ -313,8 +313,7 @@ class PrimaryKeyTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table('graphs', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + Column('id', Integer, primary_key=True), Column('version_id', Integer, primary_key=True, nullable=True), Column('name', String(30))) diff --git a/test/orm/test_query.py b/test/orm/test_query.py index d7069733e..a373f1482 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -579,8 +579,7 @@ class GetTest(QueryTest): table = Table( 'unicode_data', metadata, Column( - 'id', Unicode(40), primary_key=True, - test_needs_autoincrement=True), + 'id', Unicode(40), primary_key=True), Column('data', Unicode(40))) metadata.create_all() ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8') diff --git a/test/orm/test_relationships.py b/test/orm/test_relationships.py index 9e4b38a90..061187330 100644 --- a/test/orm/test_relationships.py +++ b/test/orm/test_relationships.py @@ -931,14 +931,12 @@ class SynonymsAsFKsTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table("tableA", metadata, - Column("id", Integer, primary_key=True, - test_needs_autoincrement=True), + Column("id", Integer, primary_key=True), Column("foo", Integer,), test_needs_fk=True) Table("tableB", metadata, - Column("id", Integer, primary_key=True, - test_needs_autoincrement=True), + Column("id", Integer, primary_key=True), Column("_a_id", Integer, key='a_id', primary_key=True), test_needs_fk=True) @@ -1093,7 +1091,7 @@ class FKsAsPksTest(fixtures.MappedTest): 'tablec', tableA.metadata, Column('id', Integer, primary_key=True), Column('a_id', Integer, ForeignKey('tableA.id'), - primary_key=True, autoincrement=False, nullable=True)) + primary_key=True, nullable=True)) tableC.create() class C(fixtures.BasicEntity): @@ -2703,8 +2701,7 @@ class ExplicitLocalRemoteTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table('t1', metadata, - Column('id', String(50), primary_key=True, - test_needs_autoincrement=True), + Column('id', String(50), primary_key=True), Column('data', String(50))) Table('t2', metadata, Column('id', Integer, primary_key=True, diff --git a/test/orm/test_unitofwork.py b/test/orm/test_unitofwork.py index 5a47903f0..2f67943f1 100644 --- a/test/orm/test_unitofwork.py +++ b/test/orm/test_unitofwork.py @@ -260,7 +260,7 @@ class PKTest(fixtures.MappedTest): def define_tables(cls, metadata): Table('multipk1', metadata, Column('multi_id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=not testing.against('sqlite')), Column('multi_rev', Integer, primary_key=True), Column('name', String(50), nullable=False), Column('value', String(100))) diff --git a/test/requirements.py b/test/requirements.py index c25b409d7..fa69a62f1 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -293,7 +293,6 @@ class DefaultRequirements(SuiteRequirements): named 'test_schema'.""" return skip_if([ - "sqlite", "firebird" ], "no schema support") diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index c957b2f8a..f6f2ec740 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -2916,6 +2916,21 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): "CREATE TABLE t (x INTEGER, z INTEGER)" ) + def test_composite_pk_constraint_autoinc_first(self): + m = MetaData() + t = Table( + 't', m, + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True, autoincrement=True) + ) + self.assert_compile( + schema.CreateTable(t), + "CREATE TABLE t (" + "a INTEGER NOT NULL, " + "b INTEGER NOT NULL, " + "PRIMARY KEY (b, a))" + ) + class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index 673085cf7..fc0b2bb80 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -732,7 +732,6 @@ class AutoIncrementTest(fixtures.TablesTest): ) assert x._autoincrement_column is None - @testing.fails_on('sqlite', 'FIXME: unknown') def test_non_autoincrement(self): # sqlite INT primary keys can be non-unique! (only for ints) nonai = Table( @@ -746,8 +745,9 @@ class AutoIncrementTest(fixtures.TablesTest): # mysql in legacy mode fails on second row nonai.insert().execute(data='row 1') nonai.insert().execute(data='row 2') - assert_raises( - sa.exc.DBAPIError, + assert_raises_message( + sa.exc.CompileError, + ".*has no Python-side or server-side default.*", go ) @@ -1080,6 +1080,23 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): assert not self._has_sequence('s1') assert not self._has_sequence('s2') + @testing.requires.returning + @testing.provide_metadata + def test_freestanding_sequence_via_autoinc(self): + t = Table( + 'some_table', self.metadata, + Column( + 'id', Integer, + autoincrement=True, + primary_key=True, + default=Sequence( + 'my_sequence', metadata=self.metadata).next_value()) + ) + self.metadata.create_all(testing.db) + + result = testing.db.execute(t.insert()) + eq_(result.inserted_primary_key, [1]) + cartitems = sometable = metadata = None diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py index f66f0b391..bdaf4f38c 100644 --- a/test/sql/test_insert.py +++ b/test/sql/test_insert.py @@ -390,6 +390,106 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): checkparams={"name_1": "foo"} ) + def test_anticipate_no_pk_composite_pk(self): + t = Table( + 't', MetaData(), Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True) + ) + assert_raises_message( + exc.CompileError, + "Column 't.y' is marked as a member.*" + "Note that as of SQLAlchemy 1.1,", + t.insert().compile, column_keys=['x'] + + ) + + def test_anticipate_no_pk_composite_pk_implicit_returning(self): + t = Table( + 't', MetaData(), Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True) + ) + d = postgresql.dialect() + d.implicit_returning = True + assert_raises_message( + exc.CompileError, + "Column 't.y' is marked as a member.*" + "Note that as of SQLAlchemy 1.1,", + t.insert().compile, dialect=d, column_keys=['x'] + + ) + + def test_anticipate_no_pk_composite_pk_prefetch(self): + t = Table( + 't', MetaData(), Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True) + ) + d = postgresql.dialect() + d.implicit_returning = False + assert_raises_message( + exc.CompileError, + "Column 't.y' is marked as a member.*" + "Note that as of SQLAlchemy 1.1,", + t.insert().compile, dialect=d, column_keys=['x'] + + ) + + def test_anticipate_nullable_composite_pk(self): + t = Table( + 't', MetaData(), Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True, nullable=True) + ) + self.assert_compile( + t.insert(), + "INSERT INTO t (x) VALUES (:x)", + params={'x': 5}, + ) + + def test_anticipate_no_pk_non_composite_pk(self): + t = Table( + 't', MetaData(), + Column('x', Integer, primary_key=True, autoincrement=False), + Column('q', Integer) + ) + assert_raises_message( + exc.CompileError, + "Column 't.x' is marked as a member.*" + "may not store NULL.$", + t.insert().compile, column_keys=['q'] + + ) + + def test_anticipate_no_pk_non_composite_pk_implicit_returning(self): + t = Table( + 't', MetaData(), + Column('x', Integer, primary_key=True, autoincrement=False), + Column('q', Integer) + ) + d = postgresql.dialect() + d.implicit_returning = True + assert_raises_message( + exc.CompileError, + "Column 't.x' is marked as a member.*" + "may not store NULL.$", + t.insert().compile, dialect=d, column_keys=['q'] + + ) + + def test_anticipate_no_pk_non_composite_pk_prefetch(self): + t = Table( + 't', MetaData(), + Column('x', Integer, primary_key=True, autoincrement=False), + Column('q', Integer) + ) + d = postgresql.dialect() + d.implicit_returning = False + assert_raises_message( + exc.CompileError, + "Column 't.x' is marked as a member.*" + "may not store NULL.$", + t.insert().compile, dialect=d, column_keys=['q'] + + ) + class InsertImplicitReturningTest( _InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py new file mode 100644 index 000000000..c49947425 --- /dev/null +++ b/test/sql/test_insert_exec.py @@ -0,0 +1,445 @@ +from sqlalchemy.testing import eq_, assert_raises_message, is_ +from sqlalchemy import testing +from sqlalchemy.testing import fixtures, engines +from sqlalchemy import ( + exc, sql, String, Integer, MetaData, and_, ForeignKey, + VARCHAR, INT, Sequence, func) +from sqlalchemy.testing.schema import Table, Column + + +class InsertExecTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + 'users', metadata, + Column( + 'user_id', INT, primary_key=True, + test_needs_autoincrement=True), + Column('user_name', VARCHAR(20)), + test_needs_acid=True + ) + + @testing.requires.multivalues_inserts + def test_multivalues_insert(self): + users = self.tables.users + users.insert( + values=[ + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}]).execute() + rows = users.select().order_by(users.c.user_id).execute().fetchall() + eq_(rows[0], (7, 'jack')) + eq_(rows[1], (8, 'ed')) + users.insert(values=[(9, 'jack'), (10, 'ed')]).execute() + rows = users.select().order_by(users.c.user_id).execute().fetchall() + eq_(rows[2], (9, 'jack')) + eq_(rows[3], (10, 'ed')) + + def test_insert_heterogeneous_params(self): + """test that executemany parameters are asserted to match the + parameter set of the first.""" + users = self.tables.users + + assert_raises_message( + exc.StatementError, + r"\(sqlalchemy.exc.InvalidRequestError\) A value is required for " + "bind parameter 'user_name', in " + "parameter group 2 " + r"\[SQL: u?'INSERT INTO users", + users.insert().execute, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9} + ) + + # this succeeds however. We aren't yet doing + # a length check on all subsequent parameters. + users.insert().execute( + {'user_id': 7}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9} + ) + + def _test_lastrow_accessor(self, table_, values, assertvalues): + """Tests the inserted_primary_key and lastrow_has_id() functions.""" + + def insert_values(engine, table_, values): + """ + Inserts a row into a table, returns the full list of values + INSERTed including defaults that fired off on the DB side and + detects rows that had defaults and post-fetches. + """ + + # verify implicit_returning is working + if engine.dialect.implicit_returning: + ins = table_.insert() + comp = ins.compile(engine, column_keys=list(values)) + if not set(values).issuperset( + c.key for c in table_.primary_key): + is_(bool(comp.returning), True) + + result = engine.execute(table_.insert(), **values) + ret = values.copy() + + for col, id in zip( + table_.primary_key, result.inserted_primary_key): + ret[col.key] = id + + if result.lastrow_has_defaults(): + criterion = and_( + *[ + col == id for col, id in + zip(table_.primary_key, result.inserted_primary_key)]) + row = engine.execute(table_.select(criterion)).first() + for c in table_.c: + ret[c.key] = row[c] + return ret + + if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): + assert testing.db.dialect.implicit_returning + + if testing.db.dialect.implicit_returning: + test_engines = [ + engines.testing_engine(options={'implicit_returning': False}), + engines.testing_engine(options={'implicit_returning': True}), + ] + else: + test_engines = [testing.db] + + for engine in test_engines: + try: + table_.create(bind=engine, checkfirst=True) + i = insert_values(engine, table_, values) + eq_(i, assertvalues) + finally: + table_.drop(bind=engine) + + @testing.skip_if('sqlite') + def test_lastrow_accessor_one(self): + metadata = MetaData() + self._test_lastrow_accessor( + Table( + "t1", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('foo', String(30), primary_key=True)), + {'foo': 'hi'}, + {'id': 1, 'foo': 'hi'} + ) + + @testing.skip_if('sqlite') + def test_lastrow_accessor_two(self): + metadata = MetaData() + self._test_lastrow_accessor( + Table( + "t2", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('foo', String(30), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'foo': 'hi'}, + {'id': 1, 'foo': 'hi', 'bar': 'hi'} + ) + + def test_lastrow_accessor_three(self): + metadata = MetaData() + self._test_lastrow_accessor( + Table( + "t3", metadata, + Column("id", String(40), primary_key=True), + Column('foo', String(30), primary_key=True), + Column("bar", String(30)) + ), + {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}, + {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"} + ) + + def test_lastrow_accessor_four(self): + metadata = MetaData() + self._test_lastrow_accessor( + Table( + "t4", metadata, + Column( + 'id', Integer, + Sequence('t4_id_seq', optional=True), + primary_key=True), + Column('foo', String(30), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'foo': 'hi', 'id': 1}, + {'id': 1, 'foo': 'hi', 'bar': 'hi'} + ) + + def test_lastrow_accessor_five(self): + metadata = MetaData() + self._test_lastrow_accessor( + Table( + "t5", metadata, + Column('id', String(10), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'id': 'id1'}, + {'id': 'id1', 'bar': 'hi'}, + ) + + @testing.skip_if('sqlite') + def test_lastrow_accessor_six(self): + metadata = MetaData() + self._test_lastrow_accessor( + Table( + "t6", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('bar', Integer, primary_key=True) + ), + {'bar': 0}, + {'id': 1, 'bar': 0}, + ) + + # TODO: why not in the sqlite suite? + @testing.only_on('sqlite+pysqlite') + @testing.provide_metadata + def test_lastrowid_zero(self): + from sqlalchemy.dialects import sqlite + eng = engines.testing_engine() + + class ExcCtx(sqlite.base.SQLiteExecutionContext): + + def get_lastrowid(self): + return 0 + eng.dialect.execution_ctx_cls = ExcCtx + t = Table( + 't', self.metadata, Column('x', Integer, primary_key=True), + Column('y', Integer)) + t.create(eng) + r = eng.execute(t.insert().values(y=5)) + eq_(r.inserted_primary_key, [0]) + + @testing.fails_on( + 'sqlite', "sqlite autoincremnt doesn't work with composite pks") + @testing.provide_metadata + def test_misordered_lastrow(self): + metadata = self.metadata + + related = Table( + 'related', metadata, + Column('id', Integer, primary_key=True), + mysql_engine='MyISAM' + ) + t6 = Table( + "t6", metadata, + Column( + 'manual_id', Integer, ForeignKey('related.id'), + primary_key=True), + Column( + 'auto_id', Integer, primary_key=True, + test_needs_autoincrement=True), + mysql_engine='MyISAM' + ) + + metadata.create_all() + r = related.insert().values(id=12).execute() + id_ = r.inserted_primary_key[0] + eq_(id_, 12) + + r = t6.insert().values(manual_id=id_).execute() + eq_(r.inserted_primary_key, [12, 1]) + + def test_implicit_id_insert_select_columns(self): + users = self.tables.users + stmt = users.insert().from_select( + (users.c.user_id, users.c.user_name), + users.select().where(users.c.user_id == 20)) + + testing.db.execute(stmt) + + def test_implicit_id_insert_select_keys(self): + users = self.tables.users + stmt = users.insert().from_select( + ["user_id", "user_name"], + users.select().where(users.c.user_id == 20)) + + testing.db.execute(stmt) + + @testing.requires.empty_inserts + @testing.requires.returning + def test_no_inserted_pk_on_returning(self): + users = self.tables.users + result = testing.db.execute(users.insert().returning( + users.c.user_id, users.c.user_name)) + assert_raises_message( + exc.InvalidRequestError, + r"Can't call inserted_primary_key when returning\(\) is used.", + getattr, result, 'inserted_primary_key' + ) + + +class TableInsertTest(fixtures.TablesTest): + + """test for consistent insert behavior across dialects + regarding the inline=True flag, lower-case 't' tables. + + """ + run_create_tables = 'each' + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + 'foo', metadata, + Column('id', Integer, Sequence('t_id_seq'), primary_key=True), + Column('data', String(50)), + Column('x', Integer) + ) + + def _fixture(self, types=True): + if types: + t = sql.table( + 'foo', sql.column('id', Integer), + sql.column('data', String), + sql.column('x', Integer)) + else: + t = sql.table( + 'foo', sql.column('id'), sql.column('data'), sql.column('x')) + return t + + def _test(self, stmt, row, returning=None, inserted_primary_key=False): + r = testing.db.execute(stmt) + + if returning: + returned = r.first() + eq_(returned, returning) + elif inserted_primary_key is not False: + eq_(r.inserted_primary_key, inserted_primary_key) + + eq_(testing.db.execute(self.tables.foo.select()).first(), row) + + def _test_multi(self, stmt, rows, data): + testing.db.execute(stmt, rows) + eq_( + testing.db.execute( + self.tables.foo.select(). + order_by(self.tables.foo.c.id)).fetchall(), + data) + + @testing.requires.sequences + def test_expicit_sequence(self): + t = self._fixture() + self._test( + t.insert().values( + id=func.next_value(Sequence('t_id_seq')), data='data', x=5), + (1, 'data', 5) + ) + + def test_uppercase(self): + t = self.tables.foo + self._test( + t.insert().values(id=1, data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[1] + ) + + def test_uppercase_inline(self): + t = self.tables.foo + self._test( + t.insert(inline=True).values(id=1, data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[1] + ) + + @testing.crashes( + "mssql+pyodbc", + "Pyodbc + SQL Server + Py3K, some decimal handling issue") + def test_uppercase_inline_implicit(self): + t = self.tables.foo + self._test( + t.insert(inline=True).values(data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[None] + ) + + def test_uppercase_implicit(self): + t = self.tables.foo + self._test( + t.insert().values(data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[1] + ) + + def test_uppercase_direct_params(self): + t = self.tables.foo + self._test( + t.insert().values(id=1, data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[1] + ) + + @testing.requires.returning + def test_uppercase_direct_params_returning(self): + t = self.tables.foo + self._test( + t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), + (1, 'data', 5), + returning=(1, 5) + ) + + @testing.fails_on( + 'mssql', "lowercase table doesn't support identity insert disable") + def test_direct_params(self): + t = self._fixture() + self._test( + t.insert().values(id=1, data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[] + ) + + @testing.fails_on( + 'mssql', "lowercase table doesn't support identity insert disable") + @testing.requires.returning + def test_direct_params_returning(self): + t = self._fixture() + self._test( + t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), + (1, 'data', 5), + returning=(1, 5) + ) + + @testing.requires.emulated_lastrowid + def test_implicit_pk(self): + t = self._fixture() + self._test( + t.insert().values(data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[] + ) + + @testing.requires.emulated_lastrowid + def test_implicit_pk_multi_rows(self): + t = self._fixture() + self._test_multi( + t.insert(), + [ + {'data': 'd1', 'x': 5}, + {'data': 'd2', 'x': 6}, + {'data': 'd3', 'x': 7}, + ], + [ + (1, 'd1', 5), + (2, 'd2', 6), + (3, 'd3', 7) + ], + ) + + @testing.requires.emulated_lastrowid + def test_implicit_pk_inline(self): + t = self._fixture() + self._test( + t.insert(inline=True).values(data='data', x=5), + (1, 'data', 5), + inserted_primary_key=[] + ) diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 2e51b9a91..501df4671 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -1361,6 +1361,123 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): assert not t1.c.x.nullable +class PKAutoIncrementTest(fixtures.TestBase): + def test_multi_integer_no_autoinc(self): + pk = PrimaryKeyConstraint( + Column('a', Integer), + Column('b', Integer) + ) + t = Table('t', MetaData()) + t.append_constraint(pk) + + is_(pk._autoincrement_column, None) + + def test_multi_integer_multi_autoinc(self): + pk = PrimaryKeyConstraint( + Column('a', Integer, autoincrement=True), + Column('b', Integer, autoincrement=True) + ) + t = Table('t', MetaData()) + t.append_constraint(pk) + + assert_raises_message( + exc.ArgumentError, + "Only one Column may be marked", + lambda: pk._autoincrement_column + ) + + def test_single_integer_no_autoinc(self): + pk = PrimaryKeyConstraint( + Column('a', Integer), + ) + t = Table('t', MetaData()) + t.append_constraint(pk) + + is_(pk._autoincrement_column, pk.columns['a']) + + def test_single_string_no_autoinc(self): + pk = PrimaryKeyConstraint( + Column('a', String), + ) + t = Table('t', MetaData()) + t.append_constraint(pk) + + is_(pk._autoincrement_column, None) + + def test_single_string_illegal_autoinc(self): + t = Table('t', MetaData(), Column('a', String, autoincrement=True)) + pk = PrimaryKeyConstraint( + t.c.a + ) + t.append_constraint(pk) + + assert_raises_message( + exc.ArgumentError, + "Column type VARCHAR on column 't.a'", + lambda: pk._autoincrement_column + ) + + def test_single_integer_default(self): + t = Table( + 't', MetaData(), + Column('a', Integer, autoincrement=True, default=lambda: 1)) + pk = PrimaryKeyConstraint( + t.c.a + ) + t.append_constraint(pk) + + is_(pk._autoincrement_column, t.c.a) + + def test_single_integer_server_default(self): + # new as of 1.1; now that we have three states for autoincrement, + # if the user puts autoincrement=True with a server_default, trust + # them on it + t = Table( + 't', MetaData(), + Column('a', Integer, + autoincrement=True, server_default=func.magic())) + pk = PrimaryKeyConstraint( + t.c.a + ) + t.append_constraint(pk) + + is_(pk._autoincrement_column, t.c.a) + + def test_implicit_autoinc_but_fks(self): + m = MetaData() + Table('t1', m, Column('id', Integer, primary_key=True)) + t2 = Table( + 't2', MetaData(), + Column('a', Integer, ForeignKey('t1.id'))) + pk = PrimaryKeyConstraint( + t2.c.a + ) + t2.append_constraint(pk) + is_(pk._autoincrement_column, None) + + def test_explicit_autoinc_but_fks(self): + m = MetaData() + Table('t1', m, Column('id', Integer, primary_key=True)) + t2 = Table( + 't2', MetaData(), + Column('a', Integer, ForeignKey('t1.id'), autoincrement=True)) + pk = PrimaryKeyConstraint( + t2.c.a + ) + t2.append_constraint(pk) + is_(pk._autoincrement_column, t2.c.a) + + t3 = Table( + 't3', MetaData(), + Column('a', Integer, + ForeignKey('t1.id'), autoincrement='ignore_fk')) + pk = PrimaryKeyConstraint( + t3.c.a + ) + t3.append_constraint(pk) + is_(pk._autoincrement_column, t3.c.a) + + class SchemaTypeTest(fixtures.TestBase): class MyType(sqltypes.SchemaType, sqltypes.TypeEngine): diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 0313a9cd0..aca933fc9 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -2,13 +2,12 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ is_, in_, not_in_ from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines -from sqlalchemy import util from sqlalchemy import ( exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey, - union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence, - bindparam, literal, not_, type_coerce, literal_column, desc, asc, - TypeDecorator, or_, cast, table, column) -from sqlalchemy.engine import default, result as _result + union, intersect, except_, union_all, VARCHAR, INT, text, + bindparam, literal, not_, literal_column, desc, asc, + TypeDecorator, or_, cast) +from sqlalchemy.engine import default from sqlalchemy.testing.schema import Table, Column # ongoing - these are old tests. those which are of general use @@ -62,260 +61,6 @@ class QueryTest(fixtures.TestBase): def teardown_class(cls): metadata.drop_all() - @testing.requires.multivalues_inserts - def test_multivalues_insert(self): - users.insert( - values=[ - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}]).execute() - rows = users.select().order_by(users.c.user_id).execute().fetchall() - self.assert_(rows[0] == (7, 'jack')) - self.assert_(rows[1] == (8, 'ed')) - users.insert(values=[(9, 'jack'), (10, 'ed')]).execute() - rows = users.select().order_by(users.c.user_id).execute().fetchall() - self.assert_(rows[2] == (9, 'jack')) - self.assert_(rows[3] == (10, 'ed')) - - def test_insert_heterogeneous_params(self): - """test that executemany parameters are asserted to match the - parameter set of the first.""" - - assert_raises_message( - exc.StatementError, - r"\(sqlalchemy.exc.InvalidRequestError\) A value is required for " - "bind parameter 'user_name', in " - "parameter group 2 " - r"\[SQL: u?'INSERT INTO query_users", - users.insert().execute, - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9} - ) - - # this succeeds however. We aren't yet doing - # a length check on all subsequent parameters. - users.insert().execute( - {'user_id': 7}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9} - ) - - def test_lastrow_accessor(self): - """Tests the inserted_primary_key and lastrow_has_id() functions.""" - - def insert_values(engine, table, values): - """ - Inserts a row into a table, returns the full list of values - INSERTed including defaults that fired off on the DB side and - detects rows that had defaults and post-fetches. - """ - - # verify implicit_returning is working - if engine.dialect.implicit_returning: - ins = table.insert() - comp = ins.compile(engine, column_keys=list(values)) - if not set(values).issuperset( - c.key for c in table.primary_key): - assert comp.returning - - result = engine.execute(table.insert(), **values) - ret = values.copy() - - for col, id in zip(table.primary_key, result.inserted_primary_key): - ret[col.key] = id - - if result.lastrow_has_defaults(): - criterion = and_( - *[ - col == id for col, id in - zip(table.primary_key, result.inserted_primary_key)]) - row = engine.execute(table.select(criterion)).first() - for c in table.c: - ret[c.key] = row[c] - return ret - - if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): - assert testing.db.dialect.implicit_returning - - if testing.db.dialect.implicit_returning: - test_engines = [ - engines.testing_engine(options={'implicit_returning': False}), - engines.testing_engine(options={'implicit_returning': True}), - ] - else: - test_engines = [testing.db] - - for engine in test_engines: - metadata = MetaData() - for supported, table, values, assertvalues in [ - ( - {'unsupported': ['sqlite']}, - Table( - "t1", metadata, - Column( - 'id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('foo', String(30), primary_key=True)), - {'foo': 'hi'}, - {'id': 1, 'foo': 'hi'} - ), - ( - {'unsupported': ['sqlite']}, - Table( - "t2", metadata, - Column( - 'id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'foo': 'hi'}, - {'id': 1, 'foo': 'hi', 'bar': 'hi'} - ), - ( - {'unsupported': []}, - Table( - "t3", metadata, - Column("id", String(40), primary_key=True), - Column('foo', String(30), primary_key=True), - Column("bar", String(30)) - ), - {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}, - {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"} - ), - ( - {'unsupported': []}, - Table( - "t4", metadata, - Column( - 'id', Integer, - Sequence('t4_id_seq', optional=True), - primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'foo': 'hi', 'id': 1}, - {'id': 1, 'foo': 'hi', 'bar': 'hi'} - ), - ( - {'unsupported': []}, - Table( - "t5", metadata, - Column('id', String(10), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'id': 'id1'}, - {'id': 'id1', 'bar': 'hi'}, - ), - ( - {'unsupported': ['sqlite']}, - Table( - "t6", metadata, - Column( - 'id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('bar', Integer, primary_key=True) - ), - {'bar': 0}, - {'id': 1, 'bar': 0}, - ), - ]: - if testing.db.name in supported['unsupported']: - continue - try: - table.create(bind=engine, checkfirst=True) - i = insert_values(engine, table, values) - assert i == assertvalues, "tablename: %s %r %r" % \ - (table.name, repr(i), repr(assertvalues)) - finally: - table.drop(bind=engine) - - # TODO: why not in the sqlite suite? - @testing.only_on('sqlite+pysqlite') - @testing.provide_metadata - def test_lastrowid_zero(self): - from sqlalchemy.dialects import sqlite - eng = engines.testing_engine() - - class ExcCtx(sqlite.base.SQLiteExecutionContext): - - def get_lastrowid(self): - return 0 - eng.dialect.execution_ctx_cls = ExcCtx - t = Table( - 't', self.metadata, Column('x', Integer, primary_key=True), - Column('y', Integer)) - t.create(eng) - r = eng.execute(t.insert().values(y=5)) - eq_(r.inserted_primary_key, [0]) - - @testing.fails_on( - 'sqlite', "sqlite autoincremnt doesn't work with composite pks") - def test_misordered_lastrow(self): - related = Table( - 'related', metadata, - Column('id', Integer, primary_key=True), - mysql_engine='MyISAM' - ) - t6 = Table( - "t6", metadata, - Column( - 'manual_id', Integer, ForeignKey('related.id'), - primary_key=True), - Column( - 'auto_id', Integer, primary_key=True, - test_needs_autoincrement=True), - mysql_engine='MyISAM' - ) - - metadata.create_all() - r = related.insert().values(id=12).execute() - id = r.inserted_primary_key[0] - assert id == 12 - - r = t6.insert().values(manual_id=id).execute() - eq_(r.inserted_primary_key, [12, 1]) - - def test_implicit_id_insert_select_columns(self): - stmt = users.insert().from_select( - (users.c.user_id, users.c.user_name), - users.select().where(users.c.user_id == 20)) - - testing.db.execute(stmt) - - def test_implicit_id_insert_select_keys(self): - stmt = users.insert().from_select( - ["user_id", "user_name"], - users.select().where(users.c.user_id == 20)) - - testing.db.execute(stmt) - - def test_row_iteration(self): - users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, - ) - r = users.select().execute() - l = [] - for row in r: - l.append(row) - self.assert_(len(l) == 3) - - @testing.requires.subqueries - def test_anonymous_rows(self): - users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, - ) - - sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ - as_scalar() - for row in select([sel + 1, sel + 3], bind=users.bind).execute(): - assert row['anon_1'] == 8 - assert row['anon_2'] == 10 - @testing.fails_on( 'firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): @@ -365,154 +110,6 @@ class QueryTest(fixtures.TestBase): [("test: ed",), ("test: fred",), ("test: jack",)] ) - def test_row_comparison(self): - users.insert().execute(user_id=7, user_name='jack') - rp = users.select().execute().first() - - self.assert_(rp == rp) - self.assert_(not(rp != rp)) - - equal = (7, 'jack') - - self.assert_(rp == equal) - self.assert_(equal == rp) - self.assert_(not (rp != equal)) - self.assert_(not (equal != equal)) - - def endless(): - while True: - yield 1 - self.assert_(rp != endless()) - self.assert_(endless() != rp) - - # test that everything compares the same - # as it would against a tuple - import operator - for compare in [False, 8, endless(), 'xyz', (7, 'jack')]: - for op in [ - operator.eq, operator.ne, operator.gt, - operator.lt, operator.ge, operator.le - ]: - - try: - control = op(equal, compare) - except TypeError: - # Py3K raises TypeError for some invalid comparisons - assert_raises(TypeError, op, rp, compare) - else: - eq_(control, op(rp, compare)) - - try: - control = op(compare, equal) - except TypeError: - # Py3K raises TypeError for some invalid comparisons - assert_raises(TypeError, op, compare, rp) - else: - eq_(control, op(compare, rp)) - - @testing.provide_metadata - def test_column_label_overlap_fallback(self): - content = Table( - 'content', self.metadata, - Column('type', String(30)), - ) - bar = Table( - 'bar', self.metadata, - Column('content_type', String(30)) - ) - self.metadata.create_all(testing.db) - testing.db.execute(content.insert().values(type="t1")) - - row = testing.db.execute(content.select(use_labels=True)).first() - assert content.c.type in row - assert bar.c.content_type not in row - assert sql.column('content_type') in row - - row = testing.db.execute( - select([content.c.type.label("content_type")])).first() - assert content.c.type in row - - assert bar.c.content_type not in row - - assert sql.column('content_type') in row - - row = testing.db.execute(select([func.now().label("content_type")])). \ - first() - assert content.c.type not in row - - assert bar.c.content_type not in row - - assert sql.column('content_type') in row - - def test_pickled_rows(self): - users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, - ) - - for pickle in False, True: - for use_labels in False, True: - result = users.select(use_labels=use_labels).order_by( - users.c.user_id).execute().fetchall() - - if pickle: - result = util.pickle.loads(util.pickle.dumps(result)) - - eq_( - result, - [(7, "jack"), (8, "ed"), (9, "fred")] - ) - if use_labels: - eq_(result[0]['query_users_user_id'], 7) - eq_( - list(result[0].keys()), - ["query_users_user_id", "query_users_user_name"]) - else: - eq_(result[0]['user_id'], 7) - eq_(list(result[0].keys()), ["user_id", "user_name"]) - - eq_(result[0][0], 7) - eq_(result[0][users.c.user_id], 7) - eq_(result[0][users.c.user_name], 'jack') - - if not pickle or use_labels: - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id]) - else: - # test with a different table. name resolution is - # causing 'user_id' to match when use_labels wasn't used. - eq_(result[0][addresses.c.user_id], 7) - - assert_raises( - exc.NoSuchColumnError, lambda: result[0]['fake key']) - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id]) - - def test_column_error_printing(self): - row = testing.db.execute(select([1])).first() - - class unprintable(object): - - def __str__(self): - raise ValueError("nope") - - msg = r"Could not locate column in row for column '%s'" - - for accessor, repl in [ - ("x", "x"), - (Column("q", Integer), "q"), - (Column("q", Integer) + 12, r"q \+ :q_1"), - (unprintable(), "unprintable element.*"), - ]: - assert_raises_message( - exc.NoSuchColumnError, - msg % repl, - lambda: row[accessor] - ) - @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -539,16 +136,6 @@ class QueryTest(fixtures.TestBase): assert row.x == True # noqa assert row.y == False # noqa - def test_fetchmany(self): - users.insert().execute(user_id=7, user_name='jack') - users.insert().execute(user_id=8, user_name='ed') - users.insert().execute(user_id=9, user_name='fred') - r = users.select().execute() - l = [] - for row in r.fetchmany(size=2): - l.append(row) - self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l)) - def test_like_ops(self): users.insert().execute( {'user_id': 1, 'user_name': 'apples'}, @@ -817,618 +404,6 @@ class QueryTest(fixtures.TestBase): use_labels=labels), [(3, 'a'), (2, 'b'), (1, None)]) - def test_column_slices(self): - users.insert().execute(user_id=1, user_name='john') - users.insert().execute(user_id=2, user_name='jack') - addresses.insert().execute( - address_id=1, user_id=2, address='foo@bar.com') - - r = text( - "select * from query_addresses", bind=testing.db).execute().first() - self.assert_(r[0:1] == (1,)) - self.assert_(r[1:] == (2, 'foo@bar.com')) - self.assert_(r[:-1] == (1, 2)) - - def test_column_accessor_basic_compiled(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') - ) - - r = users.select(users.c.user_id == 2).execute().first() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_( - r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - def test_column_accessor_basic_text(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') - ) - r = testing.db.execute( - text("select * from query_users where user_id=2")).first() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_( - r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - def test_column_accessor_textual_select(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') - ) - # this will create column() objects inside - # the select(), these need to match on name anyway - r = testing.db.execute( - select([ - column('user_id'), column('user_name') - ]).select_from(table('query_users')). - where(text('user_id=2')) - ).first() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_( - r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - def test_column_accessor_dotted_union(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - # test a little sqlite weirdness - with the UNION, - # cols come back as "query_users.user_id" in cursor.description - r = testing.db.execute( - text( - "select query_users.user_id, query_users.user_name " - "from query_users " - "UNION select query_users.user_id, " - "query_users.user_name from query_users" - ) - ).first() - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") - eq_(list(r.keys()), ["user_id", "user_name"]) - - @testing.only_on("sqlite", "sqlite specific feature") - def test_column_accessor_sqlite_raw(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - r = text( - "select query_users.user_id, query_users.user_name " - "from query_users " - "UNION select query_users.user_id, " - "query_users.user_name from query_users", - bind=testing.db).execution_options(sqlite_raw_colnames=True). \ - execute().first() - assert 'user_id' not in r - assert 'user_name' not in r - eq_(r['query_users.user_id'], 1) - eq_(r['query_users.user_name'], "john") - eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"]) - - @testing.only_on("sqlite", "sqlite specific feature") - def test_column_accessor_sqlite_translated(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - r = text( - "select query_users.user_id, query_users.user_name " - "from query_users " - "UNION select query_users.user_id, " - "query_users.user_name from query_users", - bind=testing.db).execute().first() - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") - eq_(r['query_users.user_id'], 1) - eq_(r['query_users.user_name'], "john") - eq_(list(r.keys()), ["user_id", "user_name"]) - - def test_column_accessor_labels_w_dots(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - # test using literal tablename.colname - r = text( - 'select query_users.user_id AS "query_users.user_id", ' - 'query_users.user_name AS "query_users.user_name" ' - 'from query_users', bind=testing.db).\ - execution_options(sqlite_raw_colnames=True).execute().first() - eq_(r['query_users.user_id'], 1) - eq_(r['query_users.user_name'], "john") - assert "user_name" not in r - eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"]) - - def test_column_accessor_unary(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - # unary experssions - r = select([users.c.user_name.distinct()]).order_by( - users.c.user_name).execute().first() - eq_(r[users.c.user_name], 'john') - eq_(r.user_name, 'john') - - def test_column_accessor_err(self): - r = testing.db.execute(select([1])).first() - assert_raises_message( - AttributeError, - "Could not locate column in row for column 'foo'", - getattr, r, "foo" - ) - assert_raises_message( - KeyError, - "Could not locate column in row for column 'foo'", - lambda: r['foo'] - ) - - def test_graceful_fetch_on_non_rows(self): - """test that calling fetchone() etc. on a result that doesn't - return rows fails gracefully. - - """ - - # these proxies don't work with no cursor.description present. - # so they don't apply to this test at the moment. - # result.FullyBufferedResultProxy, - # result.BufferedRowResultProxy, - # result.BufferedColumnResultProxy - - conn = testing.db.connect() - for meth in [ - lambda r: r.fetchone(), - lambda r: r.fetchall(), - lambda r: r.first(), - lambda r: r.scalar(), - lambda r: r.fetchmany(), - lambda r: r._getter('user'), - lambda r: r._has_key('user'), - ]: - trans = conn.begin() - result = conn.execute(users.insert(), user_id=1) - assert_raises_message( - exc.ResourceClosedError, - "This result object does not return rows. " - "It has been closed automatically.", - meth, result, - ) - trans.rollback() - - @testing.requires.empty_inserts - @testing.requires.returning - def test_no_inserted_pk_on_returning(self): - result = testing.db.execute(users.insert().returning( - users.c.user_id, users.c.user_name)) - assert_raises_message( - exc.InvalidRequestError, - r"Can't call inserted_primary_key when returning\(\) is used.", - getattr, result, 'inserted_primary_key' - ) - - def test_fetchone_til_end(self): - result = testing.db.execute("select * from query_users") - eq_(result.fetchone(), None) - eq_(result.fetchone(), None) - eq_(result.fetchone(), None) - result.close() - assert_raises_message( - exc.ResourceClosedError, - "This result object is closed.", - result.fetchone - ) - - def test_row_case_sensitive(self): - row = testing.db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive") - ]) - ).first() - - eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - not_in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - - assert_raises( - KeyError, - lambda: row["Case_insensitive"] - ) - assert_raises( - KeyError, - lambda: row["casesensitive"] - ) - - def test_row_case_sensitive_unoptimized(self): - ins_db = engines.testing_engine(options={"case_sensitive": True}) - row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols") - ]) - ).first() - - eq_( - list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - not_in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["screw_up_the_cols"], 3) - - assert_raises(KeyError, lambda: row["Case_insensitive"]) - assert_raises(KeyError, lambda: row["casesensitive"]) - assert_raises(KeyError, lambda: row["screw_UP_the_cols"]) - - def test_row_case_insensitive(self): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive") - ]) - ).first() - - eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["Case_insensitive"], 1) - eq_(row["casesensitive"], 2) - - def test_row_case_insensitive_unoptimized(self): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols") - ]) - ).first() - - eq_( - list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["screw_up_the_cols"], 3) - eq_(row["Case_insensitive"], 1) - eq_(row["casesensitive"], 2) - eq_(row["screw_UP_the_cols"], 3) - - def test_row_as_args(self): - users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id == 1).execute().first() - users.delete().execute() - users.insert().execute(r) - eq_(users.select().execute().fetchall(), [(1, 'john')]) - - def test_result_as_args(self): - users.insert().execute([ - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='ed')]) - r = users.select().execute() - users2.insert().execute(list(r)) - eq_( - users2.select().order_by(users2.c.user_id).execute().fetchall(), - [(1, 'john'), (2, 'ed')] - ) - - users2.delete().execute() - r = users.select().execute() - users2.insert().execute(*list(r)) - eq_( - users2.select().order_by(users2.c.user_id).execute().fetchall(), - [(1, 'john'), (2, 'ed')] - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column(self): - users.insert().execute(user_id=1, user_name='john') - result = users.outerjoin(addresses).select().execute() - r = result.first() - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r['user_id'] - ) - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r[users.c.user_id] - ) - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r[addresses.c.user_id] - ) - - # try to trick it - fake_table isn't in the result! - # we get the correct error - fake_table = Table('fake', MetaData(), Column('user_id', Integer)) - assert_raises_message( - exc.InvalidRequestError, - "Could not locate column in row for column 'fake.user_id'", - lambda: r[fake_table.c.user_id] - ) - - r = util.pickle.loads(util.pickle.dumps(r)) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r['user_id'] - ) - - result = users.outerjoin(addresses).select().execute() - result = _result.BufferedColumnResultProxy(result.context) - r = result.first() - assert isinstance(r, _result.BufferedColumnRow) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r['user_id'] - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_by_col(self): - users.insert().execute(user_id=1, user_name='john') - ua = users.alias() - u2 = users.alias() - result = select([users.c.user_id, ua.c.user_id]).execute() - row = result.first() - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row[users.c.user_id] - ) - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row[ua.c.user_id] - ) - - # Unfortunately, this fails - - # we'd like - # "Could not locate column in row" - # to be raised here, but the check for - # "common column" in _compare_name_for_result() - # has other requirements to be more liberal. - # Ultimately the - # expression system would need a way to determine - # if given two columns in a "proxy" relationship, if they - # refer to a different parent table - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row[u2.c.user_id] - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_contains(self): - # ticket 2702. in 0.7 we'd get True, False. - # in 0.8, both columns are present so it's True; - # but when they're fetched you'll get the ambiguous error. - users.insert().execute(user_id=1, user_name='john') - result = select([users.c.user_id, addresses.c.user_id]).\ - select_from(users.outerjoin(addresses)).execute() - row = result.first() - - eq_( - set([users.c.user_id in row, addresses.c.user_id in row]), - set([True]) - ) - - def test_ambiguous_column_by_col_plus_label(self): - users.insert().execute(user_id=1, user_name='john') - result = select( - [users.c.user_id, - type_coerce(users.c.user_id, Integer).label('foo')]).execute() - row = result.first() - eq_( - row[users.c.user_id], 1 - ) - eq_( - row[1], 1 - ) - - def test_fetch_partial_result_map(self): - users.insert().execute(user_id=7, user_name='ed') - - t = text("select * from query_users").columns( - user_name=String() - ) - eq_( - testing.db.execute(t).fetchall(), [(7, 'ed')] - ) - - def test_fetch_unordered_result_map(self): - users.insert().execute(user_id=7, user_name='ed') - - class Goofy1(TypeDecorator): - impl = String - - def process_result_value(self, value, dialect): - return value + "a" - - class Goofy2(TypeDecorator): - impl = String - - def process_result_value(self, value, dialect): - return value + "b" - - class Goofy3(TypeDecorator): - impl = String - - def process_result_value(self, value, dialect): - return value + "c" - - t = text( - "select user_name as a, user_name as b, " - "user_name as c from query_users").columns( - a=Goofy1(), b=Goofy2(), c=Goofy3() - ) - eq_( - testing.db.execute(t).fetchall(), [ - ('eda', 'edb', 'edc') - ] - ) - - @testing.requires.subqueries - def test_column_label_targeting(self): - users.insert().execute(user_id=7, user_name='ed') - - for s in ( - users.select().alias('foo'), - users.select().alias(users.name), - ): - row = s.select(use_labels=True).execute().first() - assert row[s.c.user_id] == 7 - assert row[s.c.user_name] == 'ed' - - def test_keys(self): - users.insert().execute(user_id=1, user_name='foo') - result = users.select().execute() - eq_( - result.keys(), - ['user_id', 'user_name'] - ) - row = result.first() - eq_( - row.keys(), - ['user_id', 'user_name'] - ) - - def test_keys_anon_labels(self): - """test [ticket:3483]""" - - users.insert().execute(user_id=1, user_name='foo') - result = testing.db.execute( - select([ - users.c.user_id, - users.c.user_name.label(None), - func.count(literal_column('1'))]). - group_by(users.c.user_id, users.c.user_name) - ) - - eq_( - result.keys(), - ['user_id', 'user_name_1', 'count_1'] - ) - row = result.first() - eq_( - row.keys(), - ['user_id', 'user_name_1', 'count_1'] - ) - - def test_items(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().first() - eq_( - [(x[0].lower(), x[1]) for x in list(r.items())], - [('user_id', 1), ('user_name', 'foo')]) - - def test_len(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().first() - eq_(len(r), 2) - - r = testing.db.execute('select user_name, user_id from query_users'). \ - first() - eq_(len(r), 2) - r = testing.db.execute('select user_name from query_users').first() - eq_(len(r), 1) - - def test_sorting_in_python(self): - users.insert().execute( - dict(user_id=1, user_name='foo'), - dict(user_id=2, user_name='bar'), - dict(user_id=3, user_name='def'), - ) - - rows = users.select().order_by(users.c.user_name).execute().fetchall() - - eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) - - eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) - - def test_column_order_with_simple_query(self): - # should return values in column definition order - users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id == 1).execute().first() - eq_(r[0], 1) - eq_(r[1], 'foo') - eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) - eq_(list(r.values()), [1, 'foo']) - - def test_column_order_with_text_query(self): - # should return values in query order - users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users'). \ - first() - eq_(r[0], 'foo') - eq_(r[1], 1) - eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) - eq_(list(r.values()), ['foo', 1]) - - @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') - @testing.crashes('firebird', 'An identifier must begin with a letter') - def test_column_accessor_shadow(self): - meta = MetaData(testing.db) - shadowed = Table( - 'test_shadowed', meta, - Column('shadow_id', INT, primary_key=True), - Column('shadow_name', VARCHAR(20)), - Column('parent', VARCHAR(20)), - Column('row', VARCHAR(40)), - Column('_parent', VARCHAR(20)), - Column('_row', VARCHAR(20)), - ) - shadowed.create(checkfirst=True) - try: - shadowed.insert().execute( - shadow_id=1, shadow_name='The Shadow', parent='The Light', - row='Without light there is no shadow', - _parent='Hidden parent', _row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() - self.assert_( - r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) - self.assert_( - r.shadow_name == r['shadow_name'] == - r[shadowed.c.shadow_name] == 'The Shadow') - self.assert_( - r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') - self.assert_( - r.row == r['row'] == r[shadowed.c.row] == - 'Without light there is no shadow') - self.assert_(r['_parent'] == 'Hidden parent') - self.assert_(r['_row'] == 'Hidden row') - finally: - shadowed.drop(checkfirst=True) - @testing.emits_warning('.*empty sequence.*') def test_in_filtering(self): """test the behavior of the in_() function.""" @@ -1578,393 +553,6 @@ class RequiredBindTest(fixtures.TablesTest): is_(bindparam('foo', callable_=c, required=False).required, False) -class TableInsertTest(fixtures.TablesTest): - - """test for consistent insert behavior across dialects - regarding the inline=True flag, lower-case 't' tables. - - """ - run_create_tables = 'each' - __backend__ = True - - @classmethod - def define_tables(cls, metadata): - Table( - 'foo', metadata, - Column('id', Integer, Sequence('t_id_seq'), primary_key=True), - Column('data', String(50)), - Column('x', Integer) - ) - - def _fixture(self, types=True): - if types: - t = sql.table( - 'foo', sql.column('id', Integer), - sql.column('data', String), - sql.column('x', Integer)) - else: - t = sql.table( - 'foo', sql.column('id'), sql.column('data'), sql.column('x')) - return t - - def _test(self, stmt, row, returning=None, inserted_primary_key=False): - r = testing.db.execute(stmt) - - if returning: - returned = r.first() - eq_(returned, returning) - elif inserted_primary_key is not False: - eq_(r.inserted_primary_key, inserted_primary_key) - - eq_(testing.db.execute(self.tables.foo.select()).first(), row) - - def _test_multi(self, stmt, rows, data): - testing.db.execute(stmt, rows) - eq_( - testing.db.execute( - self.tables.foo.select(). - order_by(self.tables.foo.c.id)).fetchall(), - data) - - @testing.requires.sequences - def test_expicit_sequence(self): - t = self._fixture() - self._test( - t.insert().values( - id=func.next_value(Sequence('t_id_seq')), data='data', x=5), - (1, 'data', 5) - ) - - def test_uppercase(self): - t = self.tables.foo - self._test( - t.insert().values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - def test_uppercase_inline(self): - t = self.tables.foo - self._test( - t.insert(inline=True).values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - @testing.crashes( - "mssql+pyodbc", - "Pyodbc + SQL Server + Py3K, some decimal handling issue") - def test_uppercase_inline_implicit(self): - t = self.tables.foo - self._test( - t.insert(inline=True).values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[None] - ) - - def test_uppercase_implicit(self): - t = self.tables.foo - self._test( - t.insert().values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - def test_uppercase_direct_params(self): - t = self.tables.foo - self._test( - t.insert().values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - @testing.requires.returning - def test_uppercase_direct_params_returning(self): - t = self.tables.foo - self._test( - t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), - (1, 'data', 5), - returning=(1, 5) - ) - - @testing.fails_on( - 'mssql', "lowercase table doesn't support identity insert disable") - def test_direct_params(self): - t = self._fixture() - self._test( - t.insert().values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[] - ) - - @testing.fails_on( - 'mssql', "lowercase table doesn't support identity insert disable") - @testing.requires.returning - def test_direct_params_returning(self): - t = self._fixture() - self._test( - t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), - (1, 'data', 5), - returning=(1, 5) - ) - - @testing.requires.emulated_lastrowid - def test_implicit_pk(self): - t = self._fixture() - self._test( - t.insert().values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[] - ) - - @testing.requires.emulated_lastrowid - def test_implicit_pk_multi_rows(self): - t = self._fixture() - self._test_multi( - t.insert(), - [ - {'data': 'd1', 'x': 5}, - {'data': 'd2', 'x': 6}, - {'data': 'd3', 'x': 7}, - ], - [ - (1, 'd1', 5), - (2, 'd2', 6), - (3, 'd3', 7) - ], - ) - - @testing.requires.emulated_lastrowid - def test_implicit_pk_inline(self): - t = self._fixture() - self._test( - t.insert(inline=True).values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[] - ) - - -class KeyTargetingTest(fixtures.TablesTest): - run_inserts = 'once' - run_deletes = None - __backend__ = True - - @classmethod - def define_tables(cls, metadata): - Table( - 'keyed1', metadata, Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q") - ) - Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) - Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) - Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) - Table('content', metadata, Column('t', String(30), key="type")) - Table('bar', metadata, Column('ctype', String(30), key="content_type")) - - if testing.requires.schemas.enabled: - Table( - 'wschema', metadata, - Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q"), - schema=testing.config.test_schema - ) - - @classmethod - def insert_data(cls): - cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) - cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) - cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) - cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) - cls.tables.content.insert().execute(type="t1") - - if testing.requires.schemas.enabled: - cls.tables['%s.wschema' % testing.config.test_schema].insert().execute( - dict(b="a1", q="c1")) - - @testing.requires.schemas - def test_keyed_accessor_wschema(self): - keyed1 = self.tables['%s.wschema' % testing.config.test_schema] - row = testing.db.execute(keyed1.select()).first() - - eq_(row.b, "a1") - eq_(row.q, "c1") - eq_(row.a, "a1") - eq_(row.c, "c1") - - def test_keyed_accessor_single(self): - keyed1 = self.tables.keyed1 - row = testing.db.execute(keyed1.select()).first() - - eq_(row.b, "a1") - eq_(row.q, "c1") - eq_(row.a, "a1") - eq_(row.c, "c1") - - def test_keyed_accessor_single_labeled(self): - keyed1 = self.tables.keyed1 - row = testing.db.execute(keyed1.select().apply_labels()).first() - - eq_(row.keyed1_b, "a1") - eq_(row.keyed1_q, "c1") - eq_(row.keyed1_a, "a1") - eq_(row.keyed1_c, "c1") - - @testing.requires.duplicate_names_in_cursor_description - def test_keyed_accessor_composite_conflict_2(self): - keyed1 = self.tables.keyed1 - keyed2 = self.tables.keyed2 - - row = testing.db.execute(select([keyed1, keyed2])).first() - # row.b is unambiguous - eq_(row.b, "b2") - # row.a is ambiguous - assert_raises_message( - exc.InvalidRequestError, - "Ambig", - getattr, row, "a" - ) - - def test_keyed_accessor_composite_names_precedent(self): - keyed1 = self.tables.keyed1 - keyed4 = self.tables.keyed4 - - row = testing.db.execute(select([keyed1, keyed4])).first() - eq_(row.b, "b4") - eq_(row.q, "q4") - eq_(row.a, "a1") - eq_(row.c, "c1") - - @testing.requires.duplicate_names_in_cursor_description - def test_keyed_accessor_composite_keys_precedent(self): - keyed1 = self.tables.keyed1 - keyed3 = self.tables.keyed3 - - row = testing.db.execute(select([keyed1, keyed3])).first() - eq_(row.q, "c1") - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name 'b'", - getattr, row, "b" - ) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name 'a'", - getattr, row, "a" - ) - eq_(row.d, "d3") - - def test_keyed_accessor_composite_labeled(self): - keyed1 = self.tables.keyed1 - keyed2 = self.tables.keyed2 - - row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ - first() - eq_(row.keyed1_b, "a1") - eq_(row.keyed1_a, "a1") - eq_(row.keyed1_q, "c1") - eq_(row.keyed1_c, "c1") - eq_(row.keyed2_a, "a2") - eq_(row.keyed2_b, "b2") - assert_raises(KeyError, lambda: row['keyed2_c']) - assert_raises(KeyError, lambda: row['keyed2_q']) - - def test_column_label_overlap_fallback(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute( - select([content.c.type.label("content_type")])).first() - assert content.c.type not in row - assert bar.c.content_type not in row - assert sql.column('content_type') in row - - row = testing.db.execute(select([func.now().label("content_type")])). \ - first() - assert content.c.type not in row - assert bar.c.content_type not in row - assert sql.column('content_type') in row - - def test_column_label_overlap_fallback_2(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(content.select(use_labels=True)).first() - assert content.c.type in row - assert bar.c.content_type not in row - assert sql.column('content_type') not in row - - def test_columnclause_schema_column_one(self): - keyed2 = self.tables.keyed2 - - # this is addressed by [ticket:2932] - # ColumnClause._compare_name_for_result allows the - # columns which the statement is against to be lightweight - # cols, which results in a more liberal comparison scheme - a, b = sql.column('a'), sql.column('b') - stmt = select([a, b]).select_from(table("keyed2")) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - - def test_columnclause_schema_column_two(self): - keyed2 = self.tables.keyed2 - - a, b = sql.column('a'), sql.column('b') - stmt = select([keyed2.c.a, keyed2.c.b]) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - - def test_columnclause_schema_column_three(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] - - a, b = sql.column('a'), sql.column('b') - stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - assert stmt.c.a in row - assert stmt.c.b in row - - def test_columnclause_schema_column_four(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] - - a, b = sql.column('keyed2_a'), sql.column('keyed2_b') - stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - a, b) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - assert stmt.c.keyed2_a in row - assert stmt.c.keyed2_b in row - - def test_columnclause_schema_column_five(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] - - stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - keyed2_a=CHAR, keyed2_b=CHAR) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert stmt.c.keyed2_a in row - assert stmt.c.keyed2_b in row - - class LimitTest(fixtures.TestBase): __backend__ = True diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py new file mode 100644 index 000000000..8461996ea --- /dev/null +++ b/test/sql/test_resultset.py @@ -0,0 +1,1136 @@ +from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ + in_, not_in_, is_, ne_ +from sqlalchemy import testing +from sqlalchemy.testing import fixtures, engines +from sqlalchemy import util +from sqlalchemy import ( + exc, sql, func, select, String, Integer, MetaData, ForeignKey, + VARCHAR, INT, CHAR, text, type_coerce, literal_column, + TypeDecorator, table, column) +from sqlalchemy.engine import result as _result +from sqlalchemy.testing.schema import Table, Column +import operator + + +class ResultProxyTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + 'users', metadata, + Column( + 'user_id', INT, primary_key=True, + test_needs_autoincrement=True), + Column('user_name', VARCHAR(20)), + test_needs_acid=True + ) + Table( + 'addresses', metadata, + Column( + 'address_id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('user_id', Integer, ForeignKey('users.user_id')), + Column('address', String(30)), + test_needs_acid=True + ) + + Table( + 'users2', metadata, + Column('user_id', INT, primary_key=True), + Column('user_name', VARCHAR(20)), + test_needs_acid=True + ) + + def test_row_iteration(self): + users = self.tables.users + + users.insert().execute( + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, + ) + r = users.select().execute() + l = [] + for row in r: + l.append(row) + eq_(len(l), 3) + + @testing.requires.subqueries + def test_anonymous_rows(self): + users = self.tables.users + + users.insert().execute( + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, + ) + + sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ + as_scalar() + for row in select([sel + 1, sel + 3], bind=users.bind).execute(): + eq_(row['anon_1'], 8) + eq_(row['anon_2'], 10) + + def test_row_comparison(self): + users = self.tables.users + + users.insert().execute(user_id=7, user_name='jack') + rp = users.select().execute().first() + + eq_(rp, rp) + is_(not(rp != rp), True) + + equal = (7, 'jack') + + eq_(rp, equal) + eq_(equal, rp) + is_((not (rp != equal)), True) + is_(not (equal != equal), True) + + def endless(): + while True: + yield 1 + ne_(rp, endless()) + ne_(endless(), rp) + + # test that everything compares the same + # as it would against a tuple + for compare in [False, 8, endless(), 'xyz', (7, 'jack')]: + for op in [ + operator.eq, operator.ne, operator.gt, + operator.lt, operator.ge, operator.le + ]: + + try: + control = op(equal, compare) + except TypeError: + # Py3K raises TypeError for some invalid comparisons + assert_raises(TypeError, op, rp, compare) + else: + eq_(control, op(rp, compare)) + + try: + control = op(compare, equal) + except TypeError: + # Py3K raises TypeError for some invalid comparisons + assert_raises(TypeError, op, compare, rp) + else: + eq_(control, op(compare, rp)) + + @testing.provide_metadata + def test_column_label_overlap_fallback(self): + content = Table( + 'content', self.metadata, + Column('type', String(30)), + ) + bar = Table( + 'bar', self.metadata, + Column('content_type', String(30)) + ) + self.metadata.create_all(testing.db) + testing.db.execute(content.insert().values(type="t1")) + + row = testing.db.execute(content.select(use_labels=True)).first() + in_(content.c.type, row) + not_in_(bar.c.content_type, row) + in_(sql.column('content_type'), row) + + row = testing.db.execute( + select([content.c.type.label("content_type")])).first() + in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + in_(sql.column('content_type'), row) + + row = testing.db.execute(select([func.now().label("content_type")])). \ + first() + not_in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + in_(sql.column('content_type'), row) + + def test_pickled_rows(self): + users = self.tables.users + addresses = self.tables.addresses + + users.insert().execute( + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, + ) + + for pickle in False, True: + for use_labels in False, True: + result = users.select(use_labels=use_labels).order_by( + users.c.user_id).execute().fetchall() + + if pickle: + result = util.pickle.loads(util.pickle.dumps(result)) + + eq_( + result, + [(7, "jack"), (8, "ed"), (9, "fred")] + ) + if use_labels: + eq_(result[0]['users_user_id'], 7) + eq_( + list(result[0].keys()), + ["users_user_id", "users_user_name"]) + else: + eq_(result[0]['user_id'], 7) + eq_(list(result[0].keys()), ["user_id", "user_name"]) + + eq_(result[0][0], 7) + eq_(result[0][users.c.user_id], 7) + eq_(result[0][users.c.user_name], 'jack') + + if not pickle or use_labels: + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id]) + else: + # test with a different table. name resolution is + # causing 'user_id' to match when use_labels wasn't used. + eq_(result[0][addresses.c.user_id], 7) + + assert_raises( + exc.NoSuchColumnError, lambda: result[0]['fake key']) + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.address_id]) + + def test_column_error_printing(self): + row = testing.db.execute(select([1])).first() + + class unprintable(object): + + def __str__(self): + raise ValueError("nope") + + msg = r"Could not locate column in row for column '%s'" + + for accessor, repl in [ + ("x", "x"), + (Column("q", Integer), "q"), + (Column("q", Integer) + 12, r"q \+ :q_1"), + (unprintable(), "unprintable element.*"), + ]: + assert_raises_message( + exc.NoSuchColumnError, + msg % repl, + lambda: row[accessor] + ) + + def test_fetchmany(self): + users = self.tables.users + + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='ed') + users.insert().execute(user_id=9, user_name='fred') + r = users.select().execute() + l = [] + for row in r.fetchmany(size=2): + l.append(row) + eq_(len(l), 2) + + def test_column_slices(self): + users = self.tables.users + addresses = self.tables.addresses + + users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=2, user_name='jack') + addresses.insert().execute( + address_id=1, user_id=2, address='foo@bar.com') + + r = text( + "select * from addresses", bind=testing.db).execute().first() + eq_(r[0:1], (1,)) + eq_(r[1:], (2, 'foo@bar.com')) + eq_(r[:-1], (1, 2)) + + def test_column_accessor_basic_compiled(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='jack') + ) + + r = users.select(users.c.user_id == 2).execute().first() + eq_(r.user_id, 2) + eq_(r['user_id'], 2) + eq_(r[users.c.user_id], 2) + + eq_(r.user_name, 'jack') + eq_(r['user_name'], 'jack') + eq_(r[users.c.user_name], 'jack') + + def test_column_accessor_basic_text(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='jack') + ) + r = testing.db.execute( + text("select * from users where user_id=2")).first() + + eq_(r.user_id, 2) + eq_(r['user_id'], 2) + eq_(r[users.c.user_id], 2) + + eq_(r.user_name, 'jack') + eq_(r['user_name'], 'jack') + eq_(r[users.c.user_name], 'jack') + + def test_column_accessor_textual_select(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='jack') + ) + # this will create column() objects inside + # the select(), these need to match on name anyway + r = testing.db.execute( + select([ + column('user_id'), column('user_name') + ]).select_from(table('users')). + where(text('user_id=2')) + ).first() + + eq_(r.user_id, 2) + eq_(r['user_id'], 2) + eq_(r[users.c.user_id], 2) + + eq_(r.user_name, 'jack') + eq_(r['user_name'], 'jack') + eq_(r[users.c.user_name], 'jack') + + def test_column_accessor_dotted_union(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + ) + + # test a little sqlite weirdness - with the UNION, + # cols come back as "users.user_id" in cursor.description + r = testing.db.execute( + text( + "select users.user_id, users.user_name " + "from users " + "UNION select users.user_id, " + "users.user_name from users" + ) + ).first() + eq_(r['user_id'], 1) + eq_(r['user_name'], "john") + eq_(list(r.keys()), ["user_id", "user_name"]) + + @testing.only_on("sqlite", "sqlite specific feature") + def test_column_accessor_sqlite_raw(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + ) + + r = text( + "select users.user_id, users.user_name " + "from users " + "UNION select users.user_id, " + "users.user_name from users", + bind=testing.db).execution_options(sqlite_raw_colnames=True). \ + execute().first() + not_in_('user_id', r) + not_in_('user_name', r) + eq_(r['users.user_id'], 1) + eq_(r['users.user_name'], "john") + eq_(list(r.keys()), ["users.user_id", "users.user_name"]) + + @testing.only_on("sqlite", "sqlite specific feature") + def test_column_accessor_sqlite_translated(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + ) + + r = text( + "select users.user_id, users.user_name " + "from users " + "UNION select users.user_id, " + "users.user_name from users", + bind=testing.db).execute().first() + eq_(r['user_id'], 1) + eq_(r['user_name'], "john") + eq_(r['users.user_id'], 1) + eq_(r['users.user_name'], "john") + eq_(list(r.keys()), ["user_id", "user_name"]) + + def test_column_accessor_labels_w_dots(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + ) + # test using literal tablename.colname + r = text( + 'select users.user_id AS "users.user_id", ' + 'users.user_name AS "users.user_name" ' + 'from users', bind=testing.db).\ + execution_options(sqlite_raw_colnames=True).execute().first() + eq_(r['users.user_id'], 1) + eq_(r['users.user_name'], "john") + not_in_("user_name", r) + eq_(list(r.keys()), ["users.user_id", "users.user_name"]) + + def test_column_accessor_unary(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='john'), + ) + + # unary experssions + r = select([users.c.user_name.distinct()]).order_by( + users.c.user_name).execute().first() + eq_(r[users.c.user_name], 'john') + eq_(r.user_name, 'john') + + def test_column_accessor_err(self): + r = testing.db.execute(select([1])).first() + assert_raises_message( + AttributeError, + "Could not locate column in row for column 'foo'", + getattr, r, "foo" + ) + assert_raises_message( + KeyError, + "Could not locate column in row for column 'foo'", + lambda: r['foo'] + ) + + def test_graceful_fetch_on_non_rows(self): + """test that calling fetchone() etc. on a result that doesn't + return rows fails gracefully. + + """ + + # these proxies don't work with no cursor.description present. + # so they don't apply to this test at the moment. + # result.FullyBufferedResultProxy, + # result.BufferedRowResultProxy, + # result.BufferedColumnResultProxy + + users = self.tables.users + + conn = testing.db.connect() + for meth in [ + lambda r: r.fetchone(), + lambda r: r.fetchall(), + lambda r: r.first(), + lambda r: r.scalar(), + lambda r: r.fetchmany(), + lambda r: r._getter('user'), + lambda r: r._has_key('user'), + ]: + trans = conn.begin() + result = conn.execute(users.insert(), user_id=1) + assert_raises_message( + exc.ResourceClosedError, + "This result object does not return rows. " + "It has been closed automatically.", + meth, result, + ) + trans.rollback() + + def test_fetchone_til_end(self): + result = testing.db.execute("select * from users") + eq_(result.fetchone(), None) + eq_(result.fetchone(), None) + eq_(result.fetchone(), None) + result.close() + assert_raises_message( + exc.ResourceClosedError, + "This result object is closed.", + result.fetchone + ) + + def test_row_case_sensitive(self): + row = testing.db.execute( + select([ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive") + ]) + ).first() + + eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) + + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + not_in_("casesensitive", row._keymap) + + eq_(row["case_insensitive"], 1) + eq_(row["CaseSensitive"], 2) + + assert_raises( + KeyError, + lambda: row["Case_insensitive"] + ) + assert_raises( + KeyError, + lambda: row["casesensitive"] + ) + + def test_row_case_sensitive_unoptimized(self): + ins_db = engines.testing_engine(options={"case_sensitive": True}) + row = ins_db.execute( + select([ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + text("3 AS screw_up_the_cols") + ]) + ).first() + + eq_( + list(row.keys()), + ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) + + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + not_in_("casesensitive", row._keymap) + + eq_(row["case_insensitive"], 1) + eq_(row["CaseSensitive"], 2) + eq_(row["screw_up_the_cols"], 3) + + assert_raises(KeyError, lambda: row["Case_insensitive"]) + assert_raises(KeyError, lambda: row["casesensitive"]) + assert_raises(KeyError, lambda: row["screw_UP_the_cols"]) + + def test_row_case_insensitive(self): + ins_db = engines.testing_engine(options={"case_sensitive": False}) + row = ins_db.execute( + select([ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive") + ]) + ).first() + + eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) + + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + in_("casesensitive", row._keymap) + + eq_(row["case_insensitive"], 1) + eq_(row["CaseSensitive"], 2) + eq_(row["Case_insensitive"], 1) + eq_(row["casesensitive"], 2) + + def test_row_case_insensitive_unoptimized(self): + ins_db = engines.testing_engine(options={"case_sensitive": False}) + row = ins_db.execute( + select([ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + text("3 AS screw_up_the_cols") + ]) + ).first() + + eq_( + list(row.keys()), + ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) + + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + in_("casesensitive", row._keymap) + + eq_(row["case_insensitive"], 1) + eq_(row["CaseSensitive"], 2) + eq_(row["screw_up_the_cols"], 3) + eq_(row["Case_insensitive"], 1) + eq_(row["casesensitive"], 2) + eq_(row["screw_UP_the_cols"], 3) + + def test_row_as_args(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name='john') + r = users.select(users.c.user_id == 1).execute().first() + users.delete().execute() + users.insert().execute(r) + eq_(users.select().execute().fetchall(), [(1, 'john')]) + + def test_result_as_args(self): + users = self.tables.users + users2 = self.tables.users2 + + users.insert().execute([ + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='ed')]) + r = users.select().execute() + users2.insert().execute(list(r)) + eq_( + users2.select().order_by(users2.c.user_id).execute().fetchall(), + [(1, 'john'), (2, 'ed')] + ) + + users2.delete().execute() + r = users.select().execute() + users2.insert().execute(*list(r)) + eq_( + users2.select().order_by(users2.c.user_id).execute().fetchall(), + [(1, 'john'), (2, 'ed')] + ) + + @testing.requires.duplicate_names_in_cursor_description + def test_ambiguous_column(self): + users = self.tables.users + addresses = self.tables.addresses + + users.insert().execute(user_id=1, user_name='john') + result = users.outerjoin(addresses).select().execute() + r = result.first() + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r[users.c.user_id] + ) + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r[addresses.c.user_id] + ) + + # try to trick it - fake_table isn't in the result! + # we get the correct error + fake_table = Table('fake', MetaData(), Column('user_id', Integer)) + assert_raises_message( + exc.InvalidRequestError, + "Could not locate column in row for column 'fake.user_id'", + lambda: r[fake_table.c.user_id] + ) + + r = util.pickle.loads(util.pickle.dumps(r)) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) + + result = users.outerjoin(addresses).select().execute() + result = _result.BufferedColumnResultProxy(result.context) + r = result.first() + assert isinstance(r, _result.BufferedColumnRow) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) + + @testing.requires.duplicate_names_in_cursor_description + def test_ambiguous_column_by_col(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name='john') + ua = users.alias() + u2 = users.alias() + result = select([users.c.user_id, ua.c.user_id]).execute() + row = result.first() + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[users.c.user_id] + ) + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[ua.c.user_id] + ) + + # Unfortunately, this fails - + # we'd like + # "Could not locate column in row" + # to be raised here, but the check for + # "common column" in _compare_name_for_result() + # has other requirements to be more liberal. + # Ultimately the + # expression system would need a way to determine + # if given two columns in a "proxy" relationship, if they + # refer to a different parent table + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[u2.c.user_id] + ) + + @testing.requires.duplicate_names_in_cursor_description + def test_ambiguous_column_contains(self): + users = self.tables.users + addresses = self.tables.addresses + + # ticket 2702. in 0.7 we'd get True, False. + # in 0.8, both columns are present so it's True; + # but when they're fetched you'll get the ambiguous error. + users.insert().execute(user_id=1, user_name='john') + result = select([users.c.user_id, addresses.c.user_id]).\ + select_from(users.outerjoin(addresses)).execute() + row = result.first() + + eq_( + set([users.c.user_id in row, addresses.c.user_id in row]), + set([True]) + ) + + def test_ambiguous_column_by_col_plus_label(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name='john') + result = select( + [users.c.user_id, + type_coerce(users.c.user_id, Integer).label('foo')]).execute() + row = result.first() + eq_( + row[users.c.user_id], 1 + ) + eq_( + row[1], 1 + ) + + def test_fetch_partial_result_map(self): + users = self.tables.users + + users.insert().execute(user_id=7, user_name='ed') + + t = text("select * from users").columns( + user_name=String() + ) + eq_( + testing.db.execute(t).fetchall(), [(7, 'ed')] + ) + + def test_fetch_unordered_result_map(self): + users = self.tables.users + + users.insert().execute(user_id=7, user_name='ed') + + class Goofy1(TypeDecorator): + impl = String + + def process_result_value(self, value, dialect): + return value + "a" + + class Goofy2(TypeDecorator): + impl = String + + def process_result_value(self, value, dialect): + return value + "b" + + class Goofy3(TypeDecorator): + impl = String + + def process_result_value(self, value, dialect): + return value + "c" + + t = text( + "select user_name as a, user_name as b, " + "user_name as c from users").columns( + a=Goofy1(), b=Goofy2(), c=Goofy3() + ) + eq_( + testing.db.execute(t).fetchall(), [ + ('eda', 'edb', 'edc') + ] + ) + + @testing.requires.subqueries + def test_column_label_targeting(self): + users = self.tables.users + + users.insert().execute(user_id=7, user_name='ed') + + for s in ( + users.select().alias('foo'), + users.select().alias(users.name), + ): + row = s.select(use_labels=True).execute().first() + eq_(row[s.c.user_id], 7) + eq_(row[s.c.user_name], 'ed') + + def test_keys(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name='foo') + result = users.select().execute() + eq_( + result.keys(), + ['user_id', 'user_name'] + ) + row = result.first() + eq_( + row.keys(), + ['user_id', 'user_name'] + ) + + def test_keys_anon_labels(self): + """test [ticket:3483]""" + + users = self.tables.users + + users.insert().execute(user_id=1, user_name='foo') + result = testing.db.execute( + select([ + users.c.user_id, + users.c.user_name.label(None), + func.count(literal_column('1'))]). + group_by(users.c.user_id, users.c.user_name) + ) + + eq_( + result.keys(), + ['user_id', 'user_name_1', 'count_1'] + ) + row = result.first() + eq_( + row.keys(), + ['user_id', 'user_name_1', 'count_1'] + ) + + def test_items(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name='foo') + r = users.select().execute().first() + eq_( + [(x[0].lower(), x[1]) for x in list(r.items())], + [('user_id', 1), ('user_name', 'foo')]) + + def test_len(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name='foo') + r = users.select().execute().first() + eq_(len(r), 2) + + r = testing.db.execute('select user_name, user_id from users'). \ + first() + eq_(len(r), 2) + r = testing.db.execute('select user_name from users').first() + eq_(len(r), 1) + + def test_sorting_in_python(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name='foo'), + dict(user_id=2, user_name='bar'), + dict(user_id=3, user_name='def'), + ) + + rows = users.select().order_by(users.c.user_name).execute().fetchall() + + eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) + + eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) + + def test_column_order_with_simple_query(self): + # should return values in column definition order + users = self.tables.users + + users.insert().execute(user_id=1, user_name='foo') + r = users.select(users.c.user_id == 1).execute().first() + eq_(r[0], 1) + eq_(r[1], 'foo') + eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) + eq_(list(r.values()), [1, 'foo']) + + def test_column_order_with_text_query(self): + # should return values in query order + users = self.tables.users + + users.insert().execute(user_id=1, user_name='foo') + r = testing.db.execute('select user_name, user_id from users'). \ + first() + eq_(r[0], 'foo') + eq_(r[1], 1) + eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) + eq_(list(r.values()), ['foo', 1]) + + @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') + @testing.crashes('firebird', 'An identifier must begin with a letter') + @testing.provide_metadata + def test_column_accessor_shadow(self): + shadowed = Table( + 'test_shadowed', self.metadata, + Column('shadow_id', INT, primary_key=True), + Column('shadow_name', VARCHAR(20)), + Column('parent', VARCHAR(20)), + Column('row', VARCHAR(40)), + Column('_parent', VARCHAR(20)), + Column('_row', VARCHAR(20)), + ) + self.metadata.create_all() + shadowed.insert().execute( + shadow_id=1, shadow_name='The Shadow', parent='The Light', + row='Without light there is no shadow', + _parent='Hidden parent', _row='Hidden row') + r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() + + eq_(r.shadow_id, 1) + eq_(r['shadow_id'], 1) + eq_(r[shadowed.c.shadow_id], 1) + + eq_(r.shadow_name, 'The Shadow') + eq_(r['shadow_name'], 'The Shadow') + eq_(r[shadowed.c.shadow_name], 'The Shadow') + + eq_(r.parent, 'The Light') + eq_(r['parent'], 'The Light') + eq_(r[shadowed.c.parent], 'The Light') + + eq_(r.row, 'Without light there is no shadow') + eq_(r['row'], 'Without light there is no shadow') + eq_(r[shadowed.c.row], 'Without light there is no shadow') + + eq_(r['_parent'], 'Hidden parent') + eq_(r['_row'], 'Hidden row') + + +class KeyTargetingTest(fixtures.TablesTest): + run_inserts = 'once' + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + 'keyed1', metadata, Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q") + ) + Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table('content', metadata, Column('t', String(30), key="type")) + Table('bar', metadata, Column('ctype', String(30), key="content_type")) + + if testing.requires.schemas.enabled: + Table( + 'wschema', metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + schema=testing.config.test_schema + ) + + @classmethod + def insert_data(cls): + cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) + cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) + cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) + cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) + cls.tables.content.insert().execute(type="t1") + + if testing.requires.schemas.enabled: + cls.tables[ + '%s.wschema' % testing.config.test_schema].insert().execute( + dict(b="a1", q="c1")) + + @testing.requires.schemas + def test_keyed_accessor_wschema(self): + keyed1 = self.tables['%s.wschema' % testing.config.test_schema] + row = testing.db.execute(keyed1.select()).first() + + eq_(row.b, "a1") + eq_(row.q, "c1") + eq_(row.a, "a1") + eq_(row.c, "c1") + + def test_keyed_accessor_single(self): + keyed1 = self.tables.keyed1 + row = testing.db.execute(keyed1.select()).first() + + eq_(row.b, "a1") + eq_(row.q, "c1") + eq_(row.a, "a1") + eq_(row.c, "c1") + + def test_keyed_accessor_single_labeled(self): + keyed1 = self.tables.keyed1 + row = testing.db.execute(keyed1.select().apply_labels()).first() + + eq_(row.keyed1_b, "a1") + eq_(row.keyed1_q, "c1") + eq_(row.keyed1_a, "a1") + eq_(row.keyed1_c, "c1") + + @testing.requires.duplicate_names_in_cursor_description + def test_keyed_accessor_composite_conflict_2(self): + keyed1 = self.tables.keyed1 + keyed2 = self.tables.keyed2 + + row = testing.db.execute(select([keyed1, keyed2])).first() + # row.b is unambiguous + eq_(row.b, "b2") + # row.a is ambiguous + assert_raises_message( + exc.InvalidRequestError, + "Ambig", + getattr, row, "a" + ) + + def test_keyed_accessor_composite_names_precedent(self): + keyed1 = self.tables.keyed1 + keyed4 = self.tables.keyed4 + + row = testing.db.execute(select([keyed1, keyed4])).first() + eq_(row.b, "b4") + eq_(row.q, "q4") + eq_(row.a, "a1") + eq_(row.c, "c1") + + @testing.requires.duplicate_names_in_cursor_description + def test_keyed_accessor_composite_keys_precedent(self): + keyed1 = self.tables.keyed1 + keyed3 = self.tables.keyed3 + + row = testing.db.execute(select([keyed1, keyed3])).first() + eq_(row.q, "c1") + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name 'b'", + getattr, row, "b" + ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name 'a'", + getattr, row, "a" + ) + eq_(row.d, "d3") + + def test_keyed_accessor_composite_labeled(self): + keyed1 = self.tables.keyed1 + keyed2 = self.tables.keyed2 + + row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ + first() + eq_(row.keyed1_b, "a1") + eq_(row.keyed1_a, "a1") + eq_(row.keyed1_q, "c1") + eq_(row.keyed1_c, "c1") + eq_(row.keyed2_a, "a2") + eq_(row.keyed2_b, "b2") + assert_raises(KeyError, lambda: row['keyed2_c']) + assert_raises(KeyError, lambda: row['keyed2_q']) + + def test_column_label_overlap_fallback(self): + content, bar = self.tables.content, self.tables.bar + row = testing.db.execute( + select([content.c.type.label("content_type")])).first() + + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + + in_(sql.column('content_type'), row) + + row = testing.db.execute(select([func.now().label("content_type")])). \ + first() + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + in_(sql.column('content_type'), row) + + def test_column_label_overlap_fallback_2(self): + content, bar = self.tables.content, self.tables.bar + row = testing.db.execute(content.select(use_labels=True)).first() + in_(content.c.type, row) + not_in_(bar.c.content_type, row) + not_in_(sql.column('content_type'), row) + + def test_columnclause_schema_column_one(self): + keyed2 = self.tables.keyed2 + + # this is addressed by [ticket:2932] + # ColumnClause._compare_name_for_result allows the + # columns which the statement is against to be lightweight + # cols, which results in a more liberal comparison scheme + a, b = sql.column('a'), sql.column('b') + stmt = select([a, b]).select_from(table("keyed2")) + row = testing.db.execute(stmt).first() + + in_(keyed2.c.a, row) + in_(keyed2.c.b, row) + in_(a, row) + in_(b, row) + + def test_columnclause_schema_column_two(self): + keyed2 = self.tables.keyed2 + + a, b = sql.column('a'), sql.column('b') + stmt = select([keyed2.c.a, keyed2.c.b]) + row = testing.db.execute(stmt).first() + + in_(keyed2.c.a, row) + in_(keyed2.c.b, row) + in_(a, row) + in_(b, row) + + def test_columnclause_schema_column_three(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + a, b = sql.column('a'), sql.column('b') + stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) + row = testing.db.execute(stmt).first() + + in_(keyed2.c.a, row) + in_(keyed2.c.b, row) + in_(a, row) + in_(b, row) + in_(stmt.c.a, row) + in_(stmt.c.b, row) + + def test_columnclause_schema_column_four(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + a, b = sql.column('keyed2_a'), sql.column('keyed2_b') + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + a, b) + row = testing.db.execute(stmt).first() + + in_(keyed2.c.a, row) + in_(keyed2.c.b, row) + in_(a, row) + in_(b, row) + in_(stmt.c.keyed2_a, row) + in_(stmt.c.keyed2_b, row) + + def test_columnclause_schema_column_five(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + keyed2_a=CHAR, keyed2_b=CHAR) + row = testing.db.execute(stmt).first() + + in_(keyed2.c.a, row) + in_(keyed2.c.b, row) + in_(stmt.c.keyed2_a, row) + in_(stmt.c.keyed2_b, row) |
