diff options
| author | Sławek Ehlert <slafs@op.pl> | 2015-01-27 22:04:38 +0100 |
|---|---|---|
| committer | Sławek Ehlert <slafs@op.pl> | 2015-01-27 22:04:38 +0100 |
| commit | 57b2bd5dcba6140b511c898c0f682234f13d5c51 (patch) | |
| tree | a0899b2a35d27e177001b163054c3c9a8f7f1c06 /test/dialect | |
| parent | 6a1f16d09958e549502a0991890d64964c71b357 (diff) | |
| parent | 8aaa8dd6bdfb85fa481efa3115b9080d935d344c (diff) | |
| download | sqlalchemy-pr/152.tar.gz | |
Merge branch 'master' into oracle-servicename-optionpr/152
Diffstat (limited to 'test/dialect')
| -rw-r--r-- | test/dialect/mssql/test_engine.py | 3 | ||||
| -rw-r--r-- | test/dialect/mssql/test_query.py | 76 | ||||
| -rw-r--r-- | test/dialect/mssql/test_reflection.py | 11 | ||||
| -rw-r--r-- | test/dialect/mssql/test_types.py | 312 | ||||
| -rw-r--r-- | test/dialect/mysql/test_query.py | 39 | ||||
| -rw-r--r-- | test/dialect/mysql/test_types.py | 345 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_compiler.py | 54 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 3 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_query.py | 327 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_reflection.py | 12 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_types.py | 20 | ||||
| -rw-r--r-- | test/dialect/test_oracle.py | 138 | ||||
| -rw-r--r-- | test/dialect/test_sqlite.py | 772 | ||||
| -rw-r--r-- | test/dialect/test_suite.py | 1 |
14 files changed, 1476 insertions, 637 deletions
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py index 4b4780d43..a994b1787 100644 --- a/test/dialect/mssql/test_engine.py +++ b/test/dialect/mssql/test_engine.py @@ -157,8 +157,7 @@ class ParseConnectTest(fixtures.TestBase): eq_(dialect.is_disconnect("not an error", None, None), False) - @testing.only_on(['mssql+pyodbc', 'mssql+pymssql'], - "FreeTDS specific test") + @testing.requires.mssql_freetds def test_bad_freetds_warning(self): engine = engines.testing_engine() diff --git a/test/dialect/mssql/test_query.py b/test/dialect/mssql/test_query.py index 715eebb84..e0affe831 100644 --- a/test/dialect/mssql/test_query.py +++ b/test/dialect/mssql/test_query.py @@ -7,6 +7,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import testing from sqlalchemy.util import ue from sqlalchemy import util +from sqlalchemy.testing.assertsql import CursorSQL @@ -163,7 +164,6 @@ class QueryUnicodeTest(fixtures.TestBase): finally: meta.drop_all() -from sqlalchemy.testing.assertsql import ExactSQL class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase): __only_on__ = 'mssql' @@ -232,27 +232,73 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase): con.execute("""drop trigger paj""") meta.drop_all() - @testing.fails_on_everything_except('mssql+pyodbc', 'pyodbc-specific feature') @testing.provide_metadata def test_disable_scope_identity(self): engine = engines.testing_engine(options={"use_scope_identity": False}) metadata = self.metadata - metadata.bind = engine - t1 = Table('t1', metadata, - Column('id', Integer, primary_key=True), - implicit_returning=False + t1 = Table( + 't1', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)), + implicit_returning=False ) - metadata.create_all() + metadata.create_all(engine) + + with self.sql_execution_asserter(engine) as asserter: + engine.execute(t1.insert(), {"data": "somedata"}) + + asserter.assert_( + CursorSQL( + "INSERT INTO t1 (data) VALUES (?)", + ("somedata", ) + ), + CursorSQL("SELECT @@identity AS lastrowid"), + ) + + @testing.provide_metadata + def test_enable_scope_identity(self): + engine = engines.testing_engine(options={"use_scope_identity": True}) + metadata = self.metadata + t1 = Table( + 't1', metadata, + Column('id', Integer, primary_key=True), + implicit_returning=False + ) + metadata.create_all(engine) + + with self.sql_execution_asserter(engine) as asserter: + engine.execute(t1.insert()) + + # even with pyodbc, we don't embed the scope identity on a + # DEFAULT VALUES insert + asserter.assert_( + CursorSQL("INSERT INTO t1 DEFAULT VALUES"), + CursorSQL("SELECT scope_identity() AS lastrowid"), + ) + + @testing.only_on('mssql+pyodbc') + @testing.provide_metadata + def test_embedded_scope_identity(self): + engine = engines.testing_engine(options={"use_scope_identity": True}) + metadata = self.metadata + t1 = Table( + 't1', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)), + implicit_returning=False + ) + metadata.create_all(engine) + + with self.sql_execution_asserter(engine) as asserter: + engine.execute(t1.insert(), {'data': 'somedata'}) - self.assert_sql_execution( - testing.db, - lambda: engine.execute(t1.insert()), - ExactSQL("INSERT INTO t1 DEFAULT VALUES"), - # we don't have an event for - # "SELECT @@IDENTITY" part here. - # this will be in 0.8 with #2459 + # pyodbc-specific system + asserter.assert_( + CursorSQL( + "INSERT INTO t1 (data) VALUES (?); select scope_identity()", + ("somedata", ) + ), ) - assert not engine.dialect.use_scope_identity def test_insertid_schema(self): meta = MetaData(testing.db) diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py index 0ef69f656..bee441586 100644 --- a/test/dialect/mssql/test_reflection.py +++ b/test/dialect/mssql/test_reflection.py @@ -24,14 +24,14 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): Column('user_name', types.VARCHAR(20), nullable=False), Column('test1', types.CHAR(5), nullable=False), Column('test2', types.Float(5), nullable=False), - Column('test3', types.Text), + Column('test3', types.Text('max')), Column('test4', types.Numeric, nullable=False), Column('test5', types.DateTime), Column('parent_user_id', types.Integer, ForeignKey('engine_users.user_id')), Column('test6', types.DateTime, nullable=False), - Column('test7', types.Text), - Column('test8', types.LargeBinary), + Column('test7', types.Text('max')), + Column('test8', types.LargeBinary('max')), Column('test_passivedefault2', types.Integer, server_default='5'), Column('test9', types.BINARY(100)), @@ -204,6 +204,11 @@ class InfoCoerceUnicodeTest(fixtures.TestBase, AssertsCompiledSQL): class ReflectHugeViewTest(fixtures.TestBase): __only_on__ = 'mssql' + # crashes on freetds 0.91, not worth it + __skip_if__ = ( + lambda: testing.requires.mssql_freetds.enabled, + ) + def setup(self): self.col_num = 150 diff --git a/test/dialect/mssql/test_types.py b/test/dialect/mssql/test_types.py index 9dc1983ae..5c9157379 100644 --- a/test/dialect/mssql/test_types.py +++ b/test/dialect/mssql/test_types.py @@ -2,12 +2,15 @@ from sqlalchemy.testing import eq_, engines, pickleable import datetime import os -from sqlalchemy import * +from sqlalchemy import Table, Column, MetaData, Float, \ + Integer, String, Boolean, TIMESTAMP, Sequence, Numeric, select, \ + Date, Time, DateTime, DefaultClause, PickleType, text, Text, \ + UnicodeText, LargeBinary from sqlalchemy import types, schema from sqlalchemy.databases import mssql from sqlalchemy.dialects.mssql.base import TIME from sqlalchemy.testing import fixtures, \ - AssertsExecutionResults, ComparesTables + AssertsExecutionResults, ComparesTables from sqlalchemy import testing from sqlalchemy.testing import emits_warning_on import decimal @@ -32,6 +35,7 @@ class TimeTypeTest(fixtures.TestBase): class TypeDDLTest(fixtures.TestBase): + def test_boolean(self): "Exercise type specification for boolean type." @@ -39,7 +43,7 @@ class TypeDDLTest(fixtures.TestBase): # column type, args, kwargs, expected ddl (Boolean, [], {}, 'BIT'), - ] + ] metadata = MetaData() table_args = ['test_mssql_boolean', metadata] @@ -54,11 +58,11 @@ class TypeDDLTest(fixtures.TestBase): for col in boolean_table.c: index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), - "%s %s" % (col.name, columns[index][3])) + testing.eq_( + gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - def test_numeric(self): "Exercise type specification and options for numeric types." @@ -88,7 +92,7 @@ class TypeDDLTest(fixtures.TestBase): 'TINYINT'), (types.SmallInteger, [], {}, 'SMALLINT'), - ] + ] metadata = MetaData() table_args = ['test_mssql_numeric', metadata] @@ -103,11 +107,11 @@ class TypeDDLTest(fixtures.TestBase): for col in numeric_table.c: index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), - "%s %s" % (col.name, columns[index][3])) + testing.eq_( + gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - def test_char(self): """Exercise COLLATE-ish options on string types.""" @@ -149,7 +153,7 @@ class TypeDDLTest(fixtures.TestBase): 'NTEXT'), (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'}, 'NTEXT COLLATE Latin1_General_CI_AS'), - ] + ] metadata = MetaData() table_args = ['test_mssql_charset', metadata] @@ -164,10 +168,48 @@ class TypeDDLTest(fixtures.TestBase): for col in charset_table.c: index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), - "%s %s" % (col.name, columns[index][3])) + testing.eq_( + gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) + def test_large_type_deprecation(self): + d1 = mssql.dialect(deprecate_large_types=True) + d2 = mssql.dialect(deprecate_large_types=False) + d3 = mssql.dialect() + d3.server_version_info = (11, 0) + d3._setup_version_attributes() + d4 = mssql.dialect() + d4.server_version_info = (10, 0) + d4._setup_version_attributes() + + for dialect in (d1, d3): + eq_( + str(Text().compile(dialect=dialect)), + "VARCHAR(max)" + ) + eq_( + str(UnicodeText().compile(dialect=dialect)), + "NVARCHAR(max)" + ) + eq_( + str(LargeBinary().compile(dialect=dialect)), + "VARBINARY(max)" + ) + + for dialect in (d2, d4): + eq_( + str(Text().compile(dialect=dialect)), + "TEXT" + ) + eq_( + str(UnicodeText().compile(dialect=dialect)), + "NTEXT" + ) + eq_( + str(LargeBinary().compile(dialect=dialect)), + "IMAGE" + ) def test_timestamp(self): """Exercise TIMESTAMP column.""" @@ -176,9 +218,10 @@ class TypeDDLTest(fixtures.TestBase): metadata = MetaData() spec, expected = (TIMESTAMP, 'TIMESTAMP') - t = Table('mssql_ts', metadata, - Column('id', Integer, primary_key=True), - Column('t', spec, nullable=None)) + t = Table( + 'mssql_ts', metadata, + Column('id', Integer, primary_key=True), + Column('t', spec, nullable=None)) gen = dialect.ddl_compiler(dialect, schema.CreateTable(t)) testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected) self.assert_(repr(t.c.t)) @@ -255,7 +298,11 @@ class TypeDDLTest(fixtures.TestBase): % (col.name, columns[index][3])) self.assert_(repr(col)) -class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTables): +metadata = None + + +class TypeRoundTripTest( + fixtures.TestBase, AssertsExecutionResults, ComparesTables): __only_on__ = 'mssql' @classmethod @@ -266,15 +313,18 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl def teardown(self): metadata.drop_all() - @testing.fails_on_everything_except('mssql+pyodbc', - 'this is some pyodbc-specific feature') + @testing.fails_on_everything_except( + 'mssql+pyodbc', + 'this is some pyodbc-specific feature') def test_decimal_notation(self): - numeric_table = Table('numeric_table', metadata, Column('id', - Integer, Sequence('numeric_id_seq', - optional=True), primary_key=True), - Column('numericcol', - Numeric(precision=38, scale=20, - asdecimal=True))) + numeric_table = Table( + 'numeric_table', metadata, + Column( + 'id', Integer, + Sequence('numeric_id_seq', optional=True), primary_key=True), + Column( + 'numericcol', + Numeric(precision=38, scale=20, asdecimal=True))) metadata.create_all() test_items = [decimal.Decimal(d) for d in ( '1500000.00000000000000000000', @@ -323,7 +373,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl '000000000000.32E12', '00000000000000.1E+12', '000000000000.2E-32', - )] + )] for value in test_items: numeric_table.insert().execute(numericcol=value) @@ -332,10 +382,13 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl assert value[0] in test_items, "%r not in test_items" % value[0] def test_float(self): - float_table = Table('float_table', metadata, Column('id', - Integer, Sequence('numeric_id_seq', - optional=True), primary_key=True), - Column('floatcol', Float())) + float_table = Table( + 'float_table', metadata, + Column( + 'id', Integer, + Sequence('numeric_id_seq', optional=True), primary_key=True), + Column('floatcol', Float())) + metadata.create_all() try: test_items = [float(d) for d in ( @@ -363,13 +416,12 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl '1E-6', '1E-7', '1E-8', - )] + )] for value in test_items: float_table.insert().execute(floatcol=value) except Exception as e: raise e - # todo this should suppress warnings, but it does not @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*') def test_dates(self): @@ -417,20 +469,20 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl (mssql.MSDateTime2, [1], {}, 'DATETIME2(1)', ['>=', (10,)]), - ] + ] table_args = ['test_mssql_dates', metadata] for index, spec in enumerate(columns): type_, args, kw, res, requires = spec[0:5] - if requires and testing._is_excluded('mssql', *requires) \ - or not requires: - c = Column('c%s' % index, type_(*args, - **kw), nullable=None) + if requires and \ + testing._is_excluded('mssql', *requires) or not requires: + c = Column('c%s' % index, type_(*args, **kw), nullable=None) testing.db.dialect.type_descriptor(c.type) table_args.append(c) dates_table = Table(*table_args) - gen = testing.db.dialect.ddl_compiler(testing.db.dialect, - schema.CreateTable(dates_table)) + gen = testing.db.dialect.ddl_compiler( + testing.db.dialect, + schema.CreateTable(dates_table)) for col in dates_table.c: index = int(col.name[1:]) testing.eq_(gen.get_column_specification(col), '%s %s' @@ -443,13 +495,14 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl self.assert_types_base(col, dates_table.c[col.key]) def test_date_roundtrip(self): - t = Table('test_dates', metadata, - Column('id', Integer, - Sequence('datetest_id_seq', optional=True), - primary_key=True), - Column('adate', Date), - Column('atime', Time), - Column('adatetime', DateTime)) + t = Table( + 'test_dates', metadata, + Column('id', Integer, + Sequence('datetest_id_seq', optional=True), + primary_key=True), + Column('adate', Date), + Column('atime', Time), + Column('adatetime', DateTime)) metadata.create_all() d1 = datetime.date(2007, 10, 30) t1 = datetime.time(11, 2, 32) @@ -471,18 +524,18 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*') @testing.provide_metadata - def test_binary_reflection(self): + def _test_binary_reflection(self, deprecate_large_types): "Exercise type specification for binary types." columns = [ - # column type, args, kwargs, expected ddl + # column type, args, kwargs, expected ddl from reflected (mssql.MSBinary, [], {}, - 'BINARY'), + 'BINARY(1)'), (mssql.MSBinary, [10], {}, 'BINARY(10)'), (types.BINARY, [], {}, - 'BINARY'), + 'BINARY(1)'), (types.BINARY, [10], {}, 'BINARY(10)'), @@ -503,10 +556,12 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl 'IMAGE'), (types.LargeBinary, [], {}, - 'IMAGE'), + 'IMAGE' if not deprecate_large_types else 'VARBINARY(max)'), ] metadata = self.metadata + metadata.bind = engines.testing_engine( + options={"deprecate_large_types": deprecate_large_types}) table_args = ['test_mssql_binary', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec @@ -516,59 +571,80 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl metadata.create_all() reflected_binary = Table('test_mssql_binary', MetaData(testing.db), autoload=True) - for col in reflected_binary.c: + for col, spec in zip(reflected_binary.c, columns): + eq_( + str(col.type), spec[3], + "column %s %s != %s" % (col.key, str(col.type), spec[3]) + ) c1 = testing.db.dialect.type_descriptor(col.type).__class__ c2 = \ testing.db.dialect.type_descriptor( binary_table.c[col.name].type).__class__ - assert issubclass(c1, c2), '%r is not a subclass of %r' \ - % (c1, c2) + assert issubclass(c1, c2), \ + 'column %s: %r is not a subclass of %r' \ + % (col.key, c1, c2) if binary_table.c[col.name].type.length: testing.eq_(col.type.length, binary_table.c[col.name].type.length) + def test_binary_reflection_legacy_large_types(self): + self._test_binary_reflection(False) + + @testing.only_on('mssql >= 11') + def test_binary_reflection_sql2012_large_types(self): + self._test_binary_reflection(True) def test_autoincrement(self): - Table('ai_1', metadata, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_2', metadata, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_3', metadata, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_y', Integer, primary_key=True)) - Table('ai_4', metadata, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_n2', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_5', metadata, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_6', metadata, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_7', metadata, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_8', metadata, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True)) + Table( + 'ai_1', metadata, + Column('int_y', Integer, primary_key=True), + Column( + 'int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table( + 'ai_2', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table( + 'ai_3', metadata, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_y', Integer, primary_key=True)) + + Table( + 'ai_4', metadata, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_n2', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table( + 'ai_5', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table( + 'ai_6', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table( + 'ai_7', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table( + 'ai_8', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True)) metadata.create_all() table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', - 'ai_5', 'ai_6', 'ai_7', 'ai_8'] + 'ai_5', 'ai_6', 'ai_7', 'ai_8'] mr = MetaData(testing.db) for name in table_names: @@ -586,27 +662,29 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl if testing.db.driver == 'mxodbc': eng = \ - [engines.testing_engine(options={'implicit_returning' - : True})] + [engines.testing_engine(options={ + 'implicit_returning': True})] else: eng = \ - [engines.testing_engine(options={'implicit_returning' - : False}), - engines.testing_engine(options={'implicit_returning' - : True})] + [engines.testing_engine(options={ + 'implicit_returning': False}), + engines.testing_engine(options={ + 'implicit_returning': True})] for counter, engine in enumerate(eng): engine.execute(tbl.insert()) if 'int_y' in tbl.c: assert engine.scalar(select([tbl.c.int_y])) \ == counter + 1 - assert list(engine.execute(tbl.select()).first()).\ - count(counter + 1) == 1 + assert list( + engine.execute(tbl.select()).first()).\ + count(counter + 1) == 1 else: assert 1 \ not in list(engine.execute(tbl.select()).first()) engine.execute(tbl.delete()) + class MonkeyPatchedBinaryTest(fixtures.TestBase): __only_on__ = 'mssql+pymssql' @@ -622,7 +700,12 @@ class MonkeyPatchedBinaryTest(fixtures.TestBase): result = module.Binary(input) eq_(result, expected_result) +binary_table = None +MyPickleType = None + + class BinaryTest(fixtures.TestBase, AssertsExecutionResults): + """Test the Binary and VarBinary types""" __only_on__ = 'mssql' @@ -655,7 +738,7 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults): Column('misc', String(30)), Column('pickled', PickleType), Column('mypickle', MyPickleType), - ) + ) binary_table.create() def teardown(self): @@ -679,7 +762,7 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults): data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3, - ) + ) binary_table.insert().execute( primary_id=2, misc='binary_data_two.dat', @@ -687,7 +770,7 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults): data_image=stream2, data_slice=stream2[0:99], pickled=testobj2, - ) + ) # TODO: pyodbc does not seem to accept "None" for a VARBINARY # column (data=None). error: [Microsoft][ODBC SQL Server @@ -697,17 +780,21 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults): # misc='binary_data_two.dat', data=None, data_image=None, # data_slice=stream2[0:99], pickled=None) - binary_table.insert().execute(primary_id=3, - misc='binary_data_two.dat', data_image=None, - data_slice=stream2[0:99], pickled=None) + binary_table.insert().execute( + primary_id=3, + misc='binary_data_two.dat', data_image=None, + data_slice=stream2[0:99], pickled=None) for stmt in \ binary_table.select(order_by=binary_table.c.primary_id), \ - text('select * from binary_table order by ' - 'binary_table.primary_id', - typemap=dict(data=mssql.MSVarBinary(8000), - data_image=mssql.MSImage, - data_slice=types.BINARY(100), pickled=PickleType, - mypickle=MyPickleType), bind=testing.db): + text( + 'select * from binary_table order by ' + 'binary_table.primary_id', + typemap=dict( + data=mssql.MSVarBinary(8000), + data_image=mssql.MSImage, + data_slice=types.BINARY(100), pickled=PickleType, + mypickle=MyPickleType), + bind=testing.db): l = stmt.execute().fetchall() eq_(list(stream1), list(l[0]['data'])) paddedstream = list(stream1[0:100]) @@ -721,7 +808,8 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults): eq_(l[0]['mypickle'].stuff, 'this is the right stuff') def load_stream(self, name, len=3000): - fp = open(os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb') + fp = open( + os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb') stream = fp.read(len) fp.close() return stream diff --git a/test/dialect/mysql/test_query.py b/test/dialect/mysql/test_query.py index e085d86c1..ccb501651 100644 --- a/test/dialect/mysql/test_query.py +++ b/test/dialect/mysql/test_query.py @@ -55,7 +55,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): ]) matchtable.insert().execute([ {'id': 1, - 'title': 'Agile Web Development with Rails', + 'title': 'Agile Web Development with Ruby On Rails', 'category_id': 2}, {'id': 2, 'title': 'Dive Into Python', @@ -76,7 +76,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): metadata.drop_all() @testing.fails_on('mysql+mysqlconnector', 'uses pyformat') - def test_expression(self): + def test_expression_format(self): format = testing.db.dialect.paramstyle == 'format' and '%s' or '?' self.assert_compile( matchtable.c.title.match('somstr'), @@ -88,7 +88,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): @testing.fails_on('mysql+oursql', 'uses format') @testing.fails_on('mysql+pyodbc', 'uses format') @testing.fails_on('mysql+zxjdbc', 'uses format') - def test_expression(self): + def test_expression_pyformat(self): format = '%(title_1)s' self.assert_compile( matchtable.c.title.match('somstr'), @@ -102,6 +102,14 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): fetchall()) eq_([2, 5], [r.id for r in results]) + def test_not_match(self): + results = (matchtable.select(). + where(~matchtable.c.title.match('python')). + order_by(matchtable.c.id). + execute(). + fetchall()) + eq_([1, 3, 4], [r.id for r in results]) + def test_simple_match_with_apostrophe(self): results = (matchtable.select(). where(matchtable.c.title.match("Matz's")). @@ -109,6 +117,26 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): fetchall()) eq_([3], [r.id for r in results]) + def test_return_value(self): + # test [ticket:3263] + result = testing.db.execute( + select([ + matchtable.c.title.match('Agile Ruby Programming').label('ruby'), + matchtable.c.title.match('Dive Python').label('python'), + matchtable.c.title + ]).order_by(matchtable.c.id) + ).fetchall() + eq_( + result, + [ + (2.0, 0.0, 'Agile Web Development with Ruby On Rails'), + (0.0, 2.0, 'Dive Into Python'), + (2.0, 0.0, "Programming Matz's Ruby"), + (0.0, 0.0, 'The Definitive Guide to Django'), + (0.0, 1.0, 'Python in a Nutshell') + ] + ) + def test_or_match(self): results1 = (matchtable.select(). where(or_(matchtable.c.title.match('nutshell'), @@ -116,14 +144,13 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): order_by(matchtable.c.id). execute(). fetchall()) - eq_([3, 5], [r.id for r in results1]) + eq_([1, 3, 5], [r.id for r in results1]) results2 = (matchtable.select(). where(matchtable.c.title.match('nutshell ruby')). order_by(matchtable.c.id). execute(). fetchall()) - eq_([3, 5], [r.id for r in results2]) - + eq_([1, 3, 5], [r.id for r in results2]) def test_and_match(self): results1 = (matchtable.select(). diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py index e65acc6db..13425dc10 100644 --- a/test/dialect/mysql/test_types.py +++ b/test/dialect/mysql/test_types.py @@ -1,6 +1,6 @@ # coding: utf-8 -from sqlalchemy.testing import eq_, assert_raises +from sqlalchemy.testing import eq_, assert_raises, assert_raises_message from sqlalchemy import * from sqlalchemy import sql, exc, schema from sqlalchemy.util import u @@ -295,9 +295,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): self.assert_compile(type_, expected) @testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature') - @testing.fails_if( - lambda: testing.against("mysql+oursql") and util.py3k, - 'some round trips fail, oursql bug ?') @testing.provide_metadata def test_bit_50_roundtrip(self): bit_table = Table('mysql_bits', self.metadata, @@ -550,13 +547,13 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): eq_(colspec(table.c.y5), 'y5 YEAR(4)') -class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): +class EnumSetTest( + fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): __only_on__ = 'mysql' __dialect__ = mysql.dialect() __backend__ = True - @testing.provide_metadata def test_enum(self): """Exercise the ENUM type.""" @@ -566,7 +563,8 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL e3 = mysql.ENUM("'a'", "'b'", strict=True) e4 = mysql.ENUM("'a'", "'b'", strict=True) - enum_table = Table('mysql_enum', self.metadata, + enum_table = Table( + 'mysql_enum', self.metadata, Column('e1', e1), Column('e2', e2, nullable=False), Column('e2generic', Enum("a", "b"), nullable=False), @@ -576,32 +574,43 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL Column('e5', mysql.ENUM("a", "b")), Column('e5generic', Enum("a", "b")), Column('e6', mysql.ENUM("'a'", "b")), - ) + ) - eq_(colspec(enum_table.c.e1), - "e1 ENUM('a','b')") - eq_(colspec(enum_table.c.e2), - "e2 ENUM('a','b') NOT NULL") - eq_(colspec(enum_table.c.e2generic), - "e2generic ENUM('a','b') NOT NULL") - eq_(colspec(enum_table.c.e3), - "e3 ENUM('a','b')") - eq_(colspec(enum_table.c.e4), - "e4 ENUM('a','b') NOT NULL") - eq_(colspec(enum_table.c.e5), - "e5 ENUM('a','b')") - eq_(colspec(enum_table.c.e5generic), - "e5generic ENUM('a','b')") - eq_(colspec(enum_table.c.e6), - "e6 ENUM('''a''','b')") + eq_( + colspec(enum_table.c.e1), + "e1 ENUM('a','b')") + eq_( + colspec(enum_table.c.e2), + "e2 ENUM('a','b') NOT NULL") + eq_( + colspec(enum_table.c.e2generic), + "e2generic ENUM('a','b') NOT NULL") + eq_( + colspec(enum_table.c.e3), + "e3 ENUM('a','b')") + eq_( + colspec(enum_table.c.e4), + "e4 ENUM('a','b') NOT NULL") + eq_( + colspec(enum_table.c.e5), + "e5 ENUM('a','b')") + eq_( + colspec(enum_table.c.e5generic), + "e5generic ENUM('a','b')") + eq_( + colspec(enum_table.c.e6), + "e6 ENUM('''a''','b')") enum_table.create() - assert_raises(exc.DBAPIError, enum_table.insert().execute, - e1=None, e2=None, e3=None, e4=None) + assert_raises( + exc.DBAPIError, enum_table.insert().execute, + e1=None, e2=None, e3=None, e4=None) - assert_raises(exc.StatementError, enum_table.insert().execute, - e1='c', e2='c', e2generic='c', e3='c', - e4='c', e5='c', e5generic='c', e6='c') + assert_raises( + exc.StatementError, + enum_table.insert().execute, + e1='c', e2='c', e2generic='c', e3='c', + e4='c', e5='c', e5generic='c', e6='c') enum_table.insert().execute() enum_table.insert().execute(e1='a', e2='a', e2generic='a', e3='a', @@ -617,67 +626,191 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL eq_(res, expected) - @testing.provide_metadata - def test_set(self): - + def _set_fixture_one(self): with testing.expect_deprecated('Manually quoting SET value literals'): e1, e2 = mysql.SET("'a'", "'b'"), mysql.SET("'a'", "'b'") e4 = mysql.SET("'a'", "b") e5 = mysql.SET("'a'", "'b'", quoting="quoted") - set_table = Table('mysql_set', self.metadata, + + set_table = Table( + 'mysql_set', self.metadata, Column('e1', e1), Column('e2', e2, nullable=False), Column('e3', mysql.SET("a", "b")), Column('e4', e4), Column('e5', e5) - ) + ) + return set_table + + def test_set_colspec(self): + self.metadata = MetaData() + set_table = self._set_fixture_one() + eq_( + colspec(set_table.c.e1), + "e1 SET('a','b')") + eq_(colspec( + set_table.c.e2), + "e2 SET('a','b') NOT NULL") + eq_( + colspec(set_table.c.e3), + "e3 SET('a','b')") + eq_( + colspec(set_table.c.e4), + "e4 SET('''a''','b')") + eq_( + colspec(set_table.c.e5), + "e5 SET('a','b')") - eq_(colspec(set_table.c.e1), - "e1 SET('a','b')") - eq_(colspec(set_table.c.e2), - "e2 SET('a','b') NOT NULL") - eq_(colspec(set_table.c.e3), - "e3 SET('a','b')") - eq_(colspec(set_table.c.e4), - "e4 SET('''a''','b')") - eq_(colspec(set_table.c.e5), - "e5 SET('a','b')") + @testing.provide_metadata + def test_no_null(self): + set_table = self._set_fixture_one() set_table.create() + assert_raises( + exc.DBAPIError, set_table.insert().execute, + e1=None, e2=None, e3=None, e4=None) - assert_raises(exc.DBAPIError, set_table.insert().execute, - e1=None, e2=None, e3=None, e4=None) + @testing.only_on('+oursql') + @testing.provide_metadata + def test_oursql_error_one(self): + set_table = self._set_fixture_one() + set_table.create() + assert_raises( + exc.StatementError, set_table.insert().execute, + e1='c', e2='c', e3='c', e4='c') + + @testing.fails_on("+oursql", "oursql raises on the truncate warning") + @testing.provide_metadata + def test_empty_set_no_empty_string(self): + t = Table( + 't', self.metadata, + Column('id', Integer), + Column('data', mysql.SET("a", "b")) + ) + t.create() + with testing.db.begin() as conn: + conn.execute( + t.insert(), + {'id': 1, 'data': set()}, + {'id': 2, 'data': set([''])}, + {'id': 3, 'data': set(['a', ''])}, + {'id': 4, 'data': set(['b'])}, + ) + eq_( + conn.execute(t.select().order_by(t.c.id)).fetchall(), + [ + (1, set()), + (2, set()), + (3, set(['a'])), + (4, set(['b'])), + ] + ) - if testing.against("+oursql"): - assert_raises(exc.StatementError, set_table.insert().execute, - e1='c', e2='c', e3='c', e4='c') + def test_bitwise_required_for_empty(self): + assert_raises_message( + exc.ArgumentError, + "Can't use the blank value '' in a SET without setting " + "retrieve_as_bitwise=True", + mysql.SET, "a", "b", '' + ) - set_table.insert().execute(e1='a', e2='a', e3='a', e4="'a'", e5="a,b") - set_table.insert().execute(e1='b', e2='b', e3='b', e4='b', e5="a,b") + @testing.provide_metadata + def test_empty_set_empty_string(self): + t = Table( + 't', self.metadata, + Column('id', Integer), + Column('data', mysql.SET("a", "b", '', retrieve_as_bitwise=True)) + ) + t.create() + with testing.db.begin() as conn: + conn.execute( + t.insert(), + {'id': 1, 'data': set()}, + {'id': 2, 'data': set([''])}, + {'id': 3, 'data': set(['a', ''])}, + {'id': 4, 'data': set(['b'])}, + ) + eq_( + conn.execute(t.select().order_by(t.c.id)).fetchall(), + [ + (1, set()), + (2, set([''])), + (3, set(['a', ''])), + (4, set(['b'])), + ] + ) - res = set_table.select().execute().fetchall() + @testing.provide_metadata + def test_string_roundtrip(self): + set_table = self._set_fixture_one() + set_table.create() + with testing.db.begin() as conn: + conn.execute( + set_table.insert(), + dict(e1='a', e2='a', e3='a', e4="'a'", e5="a,b")) + conn.execute( + set_table.insert(), + dict(e1='b', e2='b', e3='b', e4='b', e5="a,b")) + + expected = [ + (set(['a']), set(['a']), set(['a']), + set(["'a'"]), set(['a', 'b'])), + (set(['b']), set(['b']), set(['b']), + set(['b']), set(['a', 'b'])) + ] + res = conn.execute( + set_table.select() + ).fetchall() - if not testing.against("+oursql"): - # oursql receives this for first row: - # (set(['']), set(['']), set(['']), set(['']), None), - # but based on ...OS? MySQL version? not clear. - # not worth testing. + eq_(res, expected) - expected = [] + @testing.provide_metadata + def test_unicode_roundtrip(self): + set_table = Table( + 't', self.metadata, + Column('id', Integer, primary_key=True), + Column('data', mysql.SET( + u('réveillé'), u('drôle'), u('S’il'), convert_unicode=True)), + ) - expected.extend([ - (set(['a']), set(['a']), set(['a']), set(["'a'"]), set(['a', 'b'])), - (set(['b']), set(['b']), set(['b']), set(['b']), set(['a', 'b'])) - ]) + set_table.create() + with testing.db.begin() as conn: + conn.execute( + set_table.insert(), + {"data": set([u('réveillé'), u('drôle')])}) + + row = conn.execute( + set_table.select() + ).first() + + eq_( + row, + (1, set([u('réveillé'), u('drôle')])) + ) - eq_(res, expected) + @testing.provide_metadata + def test_int_roundtrip(self): + set_table = self._set_fixture_one() + set_table.create() + with testing.db.begin() as conn: + conn.execute( + set_table.insert(), + dict(e1=1, e2=2, e3=3, e4=3, e5=0) + ) + res = conn.execute(set_table.select()).first() + eq_( + res, + ( + set(['a']), set(['b']), set(['a', 'b']), + set(["'a'", 'b']), set([])) + ) @testing.provide_metadata def test_set_roundtrip_plus_reflection(self): - set_table = Table('mysql_set', self.metadata, - Column('s1', - mysql.SET("dq", "sq")), - Column('s2', mysql.SET("a")), - Column('s3', mysql.SET("5", "7", "9"))) + set_table = Table( + 'mysql_set', self.metadata, + Column('s1', mysql.SET("dq", "sq")), + Column('s2', mysql.SET("a")), + Column('s3', mysql.SET("5", "7", "9"))) eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')") eq_(colspec(set_table.c.s2), "s2 SET('a')") @@ -691,37 +824,34 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL expected = expected or store table.insert(store).execute() row = table.select().execute().first() - self.assert_(list(row) == expected) + eq_(row, tuple(expected)) table.delete().execute() roundtrip([None, None, None], [None] * 3) - roundtrip(['', '', ''], [set([''])] * 3) + roundtrip(['', '', ''], [set([])] * 3) roundtrip([set(['dq']), set(['a']), set(['5'])]) roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']), set(['5'])]) - roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5' - ])]) - roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7' - ])]) - set_table.insert().execute({'s3': set(['5'])}, - {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])}, - {'s3': set(['7', '9'])}) - - # NOTE: the string sent to MySQL here is sensitive to ordering. - # for some reason the set ordering is always "5, 7" when we test on - # MySQLdb but in Py3K this is not guaranteed. So basically our - # SET type doesn't do ordering correctly (not sure how it can, - # as we don't know how the SET was configured in the first place.) - rows = select([set_table.c.s3], - set_table.c.s3.in_([set(['5']), ['5', '7']]) - ).execute().fetchall() + roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5'])]) + roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'])]) + set_table.insert().execute( + {'s3': set(['5'])}, + {'s3': set(['5', '7'])}, + {'s3': set(['5', '7', '9'])}, + {'s3': set(['7', '9'])}) + + rows = select( + [set_table.c.s3], + set_table.c.s3.in_([set(['5']), ['5', '7']]) + ).execute().fetchall() found = set([frozenset(row[0]) for row in rows]) eq_(found, set([frozenset(['5']), frozenset(['5', '7'])])) @testing.provide_metadata def test_unicode_enum(self): metadata = self.metadata - t1 = Table('table', metadata, + t1 = Table( + 'table', metadata, Column('id', Integer, primary_key=True), Column('value', Enum(u('réveillé'), u('drôle'), u('S’il'))), Column('value2', mysql.ENUM(u('réveillé'), u('drôle'), u('S’il'))) @@ -731,9 +861,11 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL t1.insert().execute(value=u('réveillé'), value2=u('réveillé')) t1.insert().execute(value=u('S’il'), value2=u('S’il')) eq_(t1.select().order_by(t1.c.id).execute().fetchall(), - [(1, u('drôle'), u('drôle')), (2, u('réveillé'), u('réveillé')), - (3, u('S’il'), u('S’il'))] - ) + [ + (1, u('drôle'), u('drôle')), + (2, u('réveillé'), u('réveillé')), + (3, u('S’il'), u('S’il')) + ]) # test reflection of the enum labels @@ -743,11 +875,15 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL # TODO: what's wrong with the last element ? is there # latin-1 stuff forcing its way in ? - assert t2.c.value.type.enums[0:2] == \ - (u('réveillé'), u('drôle')) # u'S’il') # eh ? + eq_( + t2.c.value.type.enums[0:2], + (u('réveillé'), u('drôle')) # u'S’il') # eh ? + ) - assert t2.c.value2.type.enums[0:2] == \ - (u('réveillé'), u('drôle')) # u'S’il') # eh ? + eq_( + t2.c.value2.type.enums[0:2], + (u('réveillé'), u('drôle')) # u'S’il') # eh ? + ) def test_enum_compile(self): e1 = Enum('x', 'y', 'z', name='somename') @@ -767,7 +903,8 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL def test_enum_parse(self): with testing.expect_deprecated('Manually quoting ENUM value literals'): - enum_table = Table('mysql_enum', self.metadata, + enum_table = Table( + 'mysql_enum', self.metadata, Column('e1', mysql.ENUM("'a'")), Column('e2', mysql.ENUM("''")), Column('e3', mysql.ENUM('a')), @@ -795,14 +932,17 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL @testing.exclude('mysql', '<', (5,)) def test_set_parse(self): with testing.expect_deprecated('Manually quoting SET value literals'): - set_table = Table('mysql_set', self.metadata, + set_table = Table( + 'mysql_set', self.metadata, Column('e1', mysql.SET("'a'")), - Column('e2', mysql.SET("''")), + Column('e2', mysql.SET("''", retrieve_as_bitwise=True)), Column('e3', mysql.SET('a')), - Column('e4', mysql.SET('')), - Column('e5', mysql.SET("'a'", "''")), - Column('e6', mysql.SET("''", "'a'")), - Column('e7', mysql.SET("''", "'''a'''", "'b''b'", "''''"))) + Column('e4', mysql.SET('', retrieve_as_bitwise=True)), + Column('e5', mysql.SET("'a'", "''", retrieve_as_bitwise=True)), + Column('e6', mysql.SET("''", "'a'", retrieve_as_bitwise=True)), + Column('e7', mysql.SET( + "''", "'''a'''", "'b''b'", "''''", + retrieve_as_bitwise=True))) for col in set_table.c: self.assert_(repr(col)) @@ -821,7 +961,8 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL eq_(t.c.e6.type.values, ("", "a")) eq_(t.c.e7.type.values, ("", "'a'", "b'b", "'")) + def colspec(c): return testing.db.dialect.ddl_compiler( - testing.db.dialect, None).get_column_specification(c) + testing.db.dialect, None).get_column_specification(c) diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index 6c4f3c8cc..5717df9f7 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -5,7 +5,7 @@ from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \ from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing from sqlalchemy import Sequence, Table, Column, Integer, update, String,\ - insert, func, MetaData, Enum, Index, and_, delete, select, cast + insert, func, MetaData, Enum, Index, and_, delete, select, cast, text from sqlalchemy.dialects.postgresql import ExcludeConstraint, array from sqlalchemy import exc, schema from sqlalchemy.dialects.postgresql import base as postgresql @@ -296,6 +296,58 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): '(data text_pattern_ops, data2 int4_ops)', dialect=postgresql.dialect()) + def test_create_index_with_text_or_composite(self): + m = MetaData() + tbl = Table('testtbl', m, + Column('d1', String), + Column('d2', Integer)) + + idx = Index('test_idx1', text('x')) + tbl.append_constraint(idx) + + idx2 = Index('test_idx2', text('y'), tbl.c.d2) + + idx3 = Index( + 'test_idx2', tbl.c.d1, text('y'), tbl.c.d2, + postgresql_ops={'d1': 'x1', 'd2': 'x2'} + ) + + idx4 = Index( + 'test_idx2', tbl.c.d1, tbl.c.d2 > 5, text('q'), + postgresql_ops={'d1': 'x1', 'd2': 'x2'} + ) + + idx5 = Index( + 'test_idx2', tbl.c.d1, (tbl.c.d2 > 5).label('g'), text('q'), + postgresql_ops={'d1': 'x1', 'g': 'x2'} + ) + + self.assert_compile( + schema.CreateIndex(idx), + "CREATE INDEX test_idx1 ON testtbl (x)" + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl (y, d2)" + ) + self.assert_compile( + schema.CreateIndex(idx3), + "CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)" + ) + + # note that at the moment we do not expect the 'd2' op to + # pick up on the "d2 > 5" expression + self.assert_compile( + schema.CreateIndex(idx4), + "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)" + ) + + # however it does work if we label! + self.assert_compile( + schema.CreateIndex(idx5), + "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)" + ) + def test_create_index_with_using(self): m = MetaData() tbl = Table('testtbl', m, Column('data', String)) diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index b751bbcdd..9f86aaa7a 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -118,7 +118,8 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): eq_(c.connection.connection.encoding, test_encoding) @testing.only_on( - ['postgresql+psycopg2', 'postgresql+pg8000'], + ['postgresql+psycopg2', 'postgresql+pg8000', + 'postgresql+psycopg2cffi'], 'psycopg2 / pg8000 - specific feature') @engines.close_open_connections def test_autocommit_isolation_level(self): diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py index a512b56fa..27cb958fd 100644 --- a/test/dialect/postgresql/test_query.py +++ b/test/dialect/postgresql/test_query.py @@ -6,6 +6,7 @@ from sqlalchemy import Table, Column, MetaData, Integer, String, bindparam, \ Sequence, ForeignKey, text, select, func, extract, literal_column, \ tuple_, DateTime, Time, literal, and_, Date, or_ from sqlalchemy.testing import engines, fixtures +from sqlalchemy.testing.assertsql import DialectSQL, CursorSQL from sqlalchemy import testing from sqlalchemy import exc from sqlalchemy.dialects import postgresql @@ -170,7 +171,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): engines.testing_engine(options={'implicit_returning': False}) metadata.bind = self.engine - def go(): + with self.sql_execution_asserter(self.engine) as asserter: # execute with explicit id @@ -199,32 +200,41 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'data': 'd8'}) - # note that the test framework doesn't capture the "preexecute" - # of a seqeuence or default. we just see it in the bind params. + asserter.assert_( + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 1, 'data': 'd2'}), + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + DialectSQL( + 'INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd5'}, {'data': 'd6'}]), + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + DialectSQL( + 'INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd8'}]), + ) + + eq_( + table.select().execute().fetchall(), + [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + ) - self.assert_sql(self.engine, go, [], with_sequences=[ - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - {'id': 30, 'data': 'd1'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - {'id': 1, 'data': 'd2'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', - [{'data': 'd5'}, {'data': 'd6'}]), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), - ]) - assert table.select().execute().fetchall() == [ - (30, 'd1'), - (1, 'd2'), - (31, 'd3'), - (32, 'd4'), - (2, 'd5'), - (3, 'd6'), - (33, 'd7'), - (4, 'd8'), - ] table.delete().execute() # test the same series of events using a reflected version of @@ -233,7 +243,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) - def go(): + with self.sql_execution_asserter(self.engine) as asserter: table.insert().execute({'id': 30, 'data': 'd1'}) r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [5] @@ -243,29 +253,39 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) - self.assert_sql(self.engine, go, [], with_sequences=[ - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - {'id': 30, 'data': 'd1'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - {'id': 5, 'data': 'd2'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', - [{'data': 'd5'}, {'data': 'd6'}]), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', - [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), - ]) - assert table.select().execute().fetchall() == [ - (30, 'd1'), - (5, 'd2'), - (31, 'd3'), - (32, 'd4'), - (6, 'd5'), - (7, 'd6'), - (33, 'd7'), - (8, 'd8'), - ] + asserter.assert_( + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 5, 'data': 'd2'}), + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + DialectSQL( + 'INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd5'}, {'data': 'd6'}]), + DialectSQL( + 'INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + DialectSQL( + 'INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd8'}]), + ) + eq_( + table.select().execute().fetchall(), + [ + (30, 'd1'), + (5, 'd2'), + (31, 'd3'), + (32, 'd4'), + (6, 'd5'), + (7, 'd6'), + (33, 'd7'), + (8, 'd8'), + ] + ) table.delete().execute() def _assert_data_autoincrement_returning(self, table): @@ -273,7 +293,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): engines.testing_engine(options={'implicit_returning': True}) metadata.bind = self.engine - def go(): + with self.sql_execution_asserter(self.engine) as asserter: # execute with explicit id @@ -302,29 +322,34 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'data': 'd8'}) - self.assert_sql(self.engine, go, [], with_sequences=[ - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + asserter.assert_( + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), - ('INSERT INTO testtable (data) VALUES (:data) RETURNING ' + DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING ' 'testtable.id', {'data': 'd2'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', + DialectSQL('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd5'}, {'data': 'd6'}]), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), - ]) - assert table.select().execute().fetchall() == [ - (30, 'd1'), - (1, 'd2'), - (31, 'd3'), - (32, 'd4'), - (2, 'd5'), - (3, 'd6'), - (33, 'd7'), - (4, 'd8'), - ] + DialectSQL('INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd8'}]), + ) + + eq_( + table.select().execute().fetchall(), + [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + ) table.delete().execute() # test the same series of events using a reflected version of @@ -333,7 +358,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) - def go(): + with self.sql_execution_asserter(self.engine) as asserter: table.insert().execute({'id': 30, 'data': 'd1'}) r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [5] @@ -343,29 +368,32 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) - self.assert_sql(self.engine, go, [], with_sequences=[ - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + asserter.assert_( + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), - ('INSERT INTO testtable (data) VALUES (:data) RETURNING ' + DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING ' 'testtable.id', {'data': 'd2'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', + DialectSQL('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd5'}, {'data': 'd6'}]), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), - ]) - assert table.select().execute().fetchall() == [ - (30, 'd1'), - (5, 'd2'), - (31, 'd3'), - (32, 'd4'), - (6, 'd5'), - (7, 'd6'), - (33, 'd7'), - (8, 'd8'), - ] + DialectSQL('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), + ) + eq_( + table.select().execute().fetchall(), + [ + (30, 'd1'), + (5, 'd2'), + (31, 'd3'), + (32, 'd4'), + (6, 'd5'), + (7, 'd6'), + (33, 'd7'), + (8, 'd8'), + ] + ) table.delete().execute() def _assert_data_with_sequence(self, table, seqname): @@ -373,7 +401,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): engines.testing_engine(options={'implicit_returning': False}) metadata.bind = self.engine - def go(): + with self.sql_execution_asserter(self.engine) as asserter: table.insert().execute({'id': 30, 'data': 'd1'}) table.insert().execute({'data': 'd2'}) table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, @@ -382,30 +410,34 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) - self.assert_sql(self.engine, go, [], with_sequences=[ - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + asserter.assert_( + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + CursorSQL("select nextval('my_seq')"), + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 1, 'data': 'd2'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd8'}]), - ]) - assert table.select().execute().fetchall() == [ - (30, 'd1'), - (1, 'd2'), - (31, 'd3'), - (32, 'd4'), - (2, 'd5'), - (3, 'd6'), - (33, 'd7'), - (4, 'd8'), - ] + ) + eq_( + table.select().execute().fetchall(), + [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + ) # cant test reflection here since the Sequence must be # explicitly specified @@ -415,7 +447,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): engines.testing_engine(options={'implicit_returning': True}) metadata.bind = self.engine - def go(): + with self.sql_execution_asserter(self.engine) as asserter: table.insert().execute({'id': 30, 'data': 'd1'}) table.insert().execute({'data': 'd2'}) table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, @@ -424,31 +456,35 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) - self.assert_sql(self.engine, go, [], with_sequences=[ - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + asserter.assert_( + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), - ("INSERT INTO testtable (id, data) VALUES " + DialectSQL("INSERT INTO testtable (id, data) VALUES " "(nextval('my_seq'), :data) RETURNING testtable.id", {'data': 'd2'}), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]), - ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd8'}]), - ]) - assert table.select().execute().fetchall() == [ - (30, 'd1'), - (1, 'd2'), - (31, 'd3'), - (32, 'd4'), - (2, 'd5'), - (3, 'd6'), - (33, 'd7'), - (4, 'd8'), - ] + ) + + eq_( + table.select().execute().fetchall(), + [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + ) # cant test reflection here since the Sequence must be # explicitly specified @@ -693,6 +729,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): @testing.fails_on('postgresql+psycopg2', 'uses pyformat') @testing.fails_on('postgresql+pypostgresql', 'uses pyformat') @testing.fails_on('postgresql+zxjdbc', 'uses qmark') + @testing.fails_on('postgresql+psycopg2cffi', 'uses pyformat') def test_expression_positional(self): self.assert_compile(matchtable.c.title.match('somstr'), 'matchtable.title @@ to_tsquery(%s)') @@ -703,6 +740,12 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): matchtable.c.id).execute().fetchall() eq_([2, 5], [r.id for r in results]) + def test_not_match(self): + results = matchtable.select().where( + ~matchtable.c.title.match('python')).order_by( + matchtable.c.id).execute().fetchall() + eq_([1, 3, 4], [r.id for r in results]) + def test_simple_match_with_apostrophe(self): results = matchtable.select().where( matchtable.c.title.match("Matz's")).execute().fetchall() @@ -813,21 +856,23 @@ class ExtractTest(fixtures.TablesTest): def utcoffset(self, dt): return datetime.timedelta(hours=4) - conn = testing.db.connect() - - # we aren't resetting this at the moment but we don't have - # any other tests that are TZ specific - conn.execute("SET SESSION TIME ZONE 0") - conn.execute( - cls.tables.t.insert(), - { - 'dtme': datetime.datetime(2012, 5, 10, 12, 15, 25), - 'dt': datetime.date(2012, 5, 10), - 'tm': datetime.time(12, 15, 25), - 'intv': datetime.timedelta(seconds=570), - 'dttz': datetime.datetime(2012, 5, 10, 12, 15, 25, tzinfo=TZ()) - }, - ) + with testing.db.connect() as conn: + + # we aren't resetting this at the moment but we don't have + # any other tests that are TZ specific + conn.execute("SET SESSION TIME ZONE 0") + conn.execute( + cls.tables.t.insert(), + { + 'dtme': datetime.datetime(2012, 5, 10, 12, 15, 25), + 'dt': datetime.date(2012, 5, 10), + 'tm': datetime.time(12, 15, 25), + 'intv': datetime.timedelta(seconds=570), + 'dttz': + datetime.datetime(2012, 5, 10, 12, 15, 25, + tzinfo=TZ()) + }, + ) def _test(self, expr, field="all", overrides=None): t = self.tables.t diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index 8de71216e..0dda1fa45 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -323,6 +323,18 @@ class ReflectionTest(fixtures.TestBase): eq_([c.name for c in t2.primary_key], ['t_id']) @testing.provide_metadata + def test_has_temporary_table(self): + assert not testing.db.has_table("some_temp_table") + user_tmp = Table( + "some_temp_table", self.metadata, + Column("id", Integer, primary_key=True), + Column('name', String(50)), + prefixes=['TEMPORARY'] + ) + user_tmp.create(testing.db) + assert testing.db.has_table("some_temp_table") + + @testing.provide_metadata def test_cross_schema_reflection_one(self): meta1 = self.metadata diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py index 5c5da59b1..1f572c9a1 100644 --- a/test/dialect/postgresql/test_types.py +++ b/test/dialect/postgresql/test_types.py @@ -189,7 +189,7 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): try: self.assert_sql( - testing.db, go, [], with_sequences=[ + testing.db, go, [ ("CREATE TABLE foo (\tbar " "VARCHAR(5), \tCONSTRAINT myenum CHECK " "(bar IN ('one', 'two', 'three')))", {})]) @@ -259,9 +259,9 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): try: self.assert_sql( - engine, go, [], with_sequences=[ - ("CREATE TABLE foo (\tbar " - "VARCHAR(5), \tCONSTRAINT myenum CHECK " + engine, go, [ + ("CREATE TABLE foo (bar " + "VARCHAR(5), CONSTRAINT myenum CHECK " "(bar IN ('one', 'two', 'three')))", {})]) finally: metadata.drop_all(engine) @@ -379,10 +379,12 @@ class NumericInterpretationTest(fixtures.TestBase): __backend__ = True def test_numeric_codes(self): - from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base - - for dialect in (pg8000.dialect(), psycopg2.dialect()): + from sqlalchemy.dialects.postgresql import psycopg2cffi, pg8000, \ + psycopg2, base + dialects = (pg8000.dialect(), psycopg2.dialect(), + psycopg2cffi.dialect()) + for dialect in dialects: typ = Numeric().dialect_impl(dialect) for code in base._INT_TYPES + base._FLOAT_TYPES + \ base._DECIMAL_TYPES: @@ -1397,7 +1399,7 @@ class HStoreRoundTripTest(fixtures.TablesTest): use_native_hstore=False)) else: engine = testing.db - engine.connect() + engine.connect().close() return engine def test_reflect(self): @@ -2029,7 +2031,7 @@ class JSONRoundTripTest(fixtures.TablesTest): engine = engines.testing_engine(options=options) else: engine = testing.db - engine.connect() + engine.connect().close() return engine def test_reflect(self): diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 72decbdf3..3c67f1590 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -180,6 +180,51 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"' ) + def test_cte(self): + part = table( + 'part', + column('part'), + column('sub_part'), + column('quantity') + ) + + included_parts = select([ + part.c.sub_part, part.c.part, part.c.quantity + ]).where(part.c.part == "p1").\ + cte(name="included_parts", recursive=True).\ + suffix_with( + "search depth first by part set ord1", + "cycle part set y_cycle to 1 default 0", dialect='oracle') + + incl_alias = included_parts.alias("pr1") + parts_alias = part.alias("p") + included_parts = included_parts.union_all( + select([ + parts_alias.c.sub_part, + parts_alias.c.part, parts_alias.c.quantity + ]).where(parts_alias.c.part == incl_alias.c.sub_part) + ) + + q = select([ + included_parts.c.sub_part, + func.sum(included_parts.c.quantity).label('total_quantity')]).\ + group_by(included_parts.c.sub_part) + + self.assert_compile( + q, + "WITH included_parts(sub_part, part, quantity) AS " + "(SELECT part.sub_part AS sub_part, part.part AS part, " + "part.quantity AS quantity FROM part WHERE part.part = :part_1 " + "UNION ALL SELECT p.sub_part AS sub_part, p.part AS part, " + "p.quantity AS quantity FROM part p, included_parts pr1 " + "WHERE p.part = pr1.sub_part) " + "search depth first by part set ord1 cycle part set " + "y_cycle to 1 default 0 " + "SELECT included_parts.sub_part, sum(included_parts.quantity) " + "AS total_quantity FROM included_parts " + "GROUP BY included_parts.sub_part" + ) + def test_limit(self): t = table('sometable', column('col1'), column('col2')) s = select([t]) @@ -687,6 +732,34 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) + def test_create_table_compress(self): + m = MetaData() + tbl1 = Table('testtbl1', m, Column('data', Integer), + oracle_compress=True) + tbl2 = Table('testtbl2', m, Column('data', Integer), + oracle_compress="OLTP") + + self.assert_compile(schema.CreateTable(tbl1), + "CREATE TABLE testtbl1 (data INTEGER) COMPRESS") + self.assert_compile(schema.CreateTable(tbl2), + "CREATE TABLE testtbl2 (data INTEGER) " + "COMPRESS FOR OLTP") + + def test_create_index_bitmap_compress(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', Integer)) + idx1 = Index('idx1', tbl.c.data, oracle_compress=True) + idx2 = Index('idx2', tbl.c.data, oracle_compress=1) + idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True) + + self.assert_compile(schema.CreateIndex(idx1), + "CREATE INDEX idx1 ON testtbl (data) COMPRESS") + self.assert_compile(schema.CreateIndex(idx2), + "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1") + self.assert_compile(schema.CreateIndex(idx3), + "CREATE BITMAP INDEX idx3 ON testtbl (data)") + + class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): def _dialect(self, server_version, **kw): @@ -1727,6 +1800,58 @@ class UnsupportedIndexReflectTest(fixtures.TestBase): m2 = MetaData(testing.db) Table('test_index_reflect', m2, autoload=True) + +def all_tables_compression_missing(): + try: + testing.db.execute('SELECT compression FROM all_tables') + return False + except: + return True + + +def all_tables_compress_for_missing(): + try: + testing.db.execute('SELECT compress_for FROM all_tables') + return False + except: + return True + + +class TableReflectionTest(fixtures.TestBase): + __only_on__ = 'oracle' + + @testing.provide_metadata + @testing.fails_if(all_tables_compression_missing) + def test_reflect_basic_compression(self): + metadata = self.metadata + + tbl = Table('test_compress', metadata, + Column('data', Integer, primary_key=True), + oracle_compress=True) + metadata.create_all() + + m2 = MetaData(testing.db) + + tbl = Table('test_compress', m2, autoload=True) + # Don't hardcode the exact value, but it must be non-empty + assert tbl.dialect_options['oracle']['compress'] + + @testing.provide_metadata + @testing.fails_if(all_tables_compress_for_missing) + def test_reflect_oltp_compression(self): + metadata = self.metadata + + tbl = Table('test_compress', metadata, + Column('data', Integer, primary_key=True), + oracle_compress="OLTP") + metadata.create_all() + + m2 = MetaData(testing.db) + + tbl = Table('test_compress', m2, autoload=True) + assert tbl.dialect_options['oracle']['compress'] == "OLTP" + + class RoundTripIndexTest(fixtures.TestBase): __only_on__ = 'oracle' @@ -1744,6 +1869,10 @@ class RoundTripIndexTest(fixtures.TestBase): # "group" is a keyword, so lower case normalind = Index('tableind', table.c.id_b, table.c.group) + compress1 = Index('compress1', table.c.id_a, table.c.id_b, + oracle_compress=True) + compress2 = Index('compress2', table.c.id_a, table.c.id_b, table.c.col, + oracle_compress=1) metadata.create_all() mirror = MetaData(testing.db) @@ -1792,8 +1921,15 @@ class RoundTripIndexTest(fixtures.TestBase): ) assert (Index, ('id_b', ), True) in reflected assert (Index, ('col', 'group'), True) in reflected + + idx = reflected[(Index, ('id_a', 'id_b', ), False)] + assert idx.dialect_options['oracle']['compress'] == 2 + + idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)] + assert idx.dialect_options['oracle']['compress'] == 1 + eq_(len(reflectedtable.constraints), 1) - eq_(len(reflectedtable.indexes), 3) + eq_(len(reflectedtable.indexes), 5) class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index 124208dbe..44e4eda42 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -7,8 +7,8 @@ import datetime from sqlalchemy.testing import eq_, assert_raises, \ assert_raises_message, is_ from sqlalchemy import Table, select, bindparam, Column,\ - MetaData, func, extract, ForeignKey, text, DefaultClause, and_, create_engine,\ - UniqueConstraint + MetaData, func, extract, ForeignKey, text, DefaultClause, and_, \ + create_engine, UniqueConstraint from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time from sqlalchemy import types as sqltypes from sqlalchemy import event, inspect @@ -21,6 +21,9 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ AssertsExecutionResults, engines from sqlalchemy import testing from sqlalchemy.schema import CreateTable +from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.testing import mock + class TestTypes(fixtures.TestBase, AssertsExecutionResults): @@ -32,9 +35,10 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): """ meta = MetaData(testing.db) - t = Table('bool_table', meta, Column('id', Integer, - primary_key=True), Column('boo', - Boolean(create_constraint=False))) + t = Table( + 'bool_table', meta, + Column('id', Integer, primary_key=True), + Column('boo', Boolean(create_constraint=False))) try: meta.create_all() testing.db.execute("INSERT INTO bool_table (id, boo) " @@ -69,28 +73,31 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): ValueError, "Couldn't parse %s string." % disp, lambda: testing.db.execute( - text("select 'ASDF' as value", typemap={"value":typ}) + text("select 'ASDF' as value", typemap={"value": typ}) ).scalar() ) def test_native_datetime(self): dbapi = testing.db.dialect.dbapi - connect_args = {'detect_types': dbapi.PARSE_DECLTYPES \ - | dbapi.PARSE_COLNAMES} - engine = engines.testing_engine(options={'connect_args' - : connect_args, 'native_datetime': True}) - t = Table('datetest', MetaData(), - Column('id', Integer, primary_key=True), - Column('d1', Date), Column('d2', sqltypes.TIMESTAMP)) + connect_args = { + 'detect_types': dbapi.PARSE_DECLTYPES | dbapi.PARSE_COLNAMES} + engine = engines.testing_engine( + options={'connect_args': connect_args, 'native_datetime': True}) + t = Table( + 'datetest', MetaData(), + Column('id', Integer, primary_key=True), + Column('d1', Date), Column('d2', sqltypes.TIMESTAMP)) t.create(engine) try: - engine.execute(t.insert(), {'d1': datetime.date(2010, 5, - 10), - 'd2': datetime.datetime( 2010, 5, 10, 12, 15, 25, - )}) + engine.execute(t.insert(), { + 'd1': datetime.date(2010, 5, 10), + 'd2': datetime.datetime(2010, 5, 10, 12, 15, 25) + }) row = engine.execute(t.select()).first() - eq_(row, (1, datetime.date(2010, 5, 10), - datetime.datetime( 2010, 5, 10, 12, 15, 25, ))) + eq_( + row, + (1, datetime.date(2010, 5, 10), + datetime.datetime(2010, 5, 10, 12, 15, 25))) r = engine.execute(func.current_date()).scalar() assert isinstance(r, util.string_types) finally: @@ -100,15 +107,16 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): @testing.provide_metadata def test_custom_datetime(self): sqlite_date = sqlite.DATETIME( - # 2004-05-21T00:00:00 - storage_format="%(year)04d-%(month)02d-%(day)02d" - "T%(hour)02d:%(minute)02d:%(second)02d", - regexp=r"(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)", - ) + # 2004-05-21T00:00:00 + storage_format="%(year)04d-%(month)02d-%(day)02d" + "T%(hour)02d:%(minute)02d:%(second)02d", + regexp=r"(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)", + ) t = Table('t', self.metadata, Column('d', sqlite_date)) self.metadata.create_all(testing.db) - testing.db.execute(t.insert(). - values(d=datetime.datetime(2010, 10, 15, 12, 37, 0))) + testing.db.execute( + t.insert(). + values(d=datetime.datetime(2010, 10, 15, 12, 37, 0))) testing.db.execute("insert into t (d) values ('2004-05-21T00:00:00')") eq_( testing.db.execute("select * from t order by d").fetchall(), @@ -116,21 +124,70 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): ) eq_( testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(), - [(datetime.datetime(2004, 5, 21, 0, 0),), - (datetime.datetime(2010, 10, 15, 12, 37),)] + [ + (datetime.datetime(2004, 5, 21, 0, 0),), + (datetime.datetime(2010, 10, 15, 12, 37),)] + ) + + @testing.provide_metadata + def test_custom_datetime_text_affinity(self): + sqlite_date = sqlite.DATETIME( + storage_format="%(year)04d%(month)02d%(day)02d" + "%(hour)02d%(minute)02d%(second)02d", + regexp=r"(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})", + ) + t = Table('t', self.metadata, Column('d', sqlite_date)) + self.metadata.create_all(testing.db) + testing.db.execute( + t.insert(). + values(d=datetime.datetime(2010, 10, 15, 12, 37, 0))) + testing.db.execute("insert into t (d) values ('20040521000000')") + eq_( + testing.db.execute("select * from t order by d").fetchall(), + [('20040521000000',), ('20101015123700',)] + ) + eq_( + testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(), + [ + (datetime.datetime(2004, 5, 21, 0, 0),), + (datetime.datetime(2010, 10, 15, 12, 37),)] + ) + + @testing.provide_metadata + def test_custom_date_text_affinity(self): + sqlite_date = sqlite.DATE( + storage_format="%(year)04d%(month)02d%(day)02d", + regexp=r"(\d{4})(\d{2})(\d{2})", + ) + t = Table('t', self.metadata, Column('d', sqlite_date)) + self.metadata.create_all(testing.db) + testing.db.execute( + t.insert(). + values(d=datetime.date(2010, 10, 15))) + testing.db.execute("insert into t (d) values ('20040521')") + eq_( + testing.db.execute("select * from t order by d").fetchall(), + [('20040521',), ('20101015',)] + ) + eq_( + testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(), + [ + (datetime.date(2004, 5, 21),), + (datetime.date(2010, 10, 15),)] ) @testing.provide_metadata def test_custom_date(self): sqlite_date = sqlite.DATE( - # 2004-05-21T00:00:00 - storage_format="%(year)04d|%(month)02d|%(day)02d", - regexp=r"(\d+)\|(\d+)\|(\d+)", - ) + # 2004-05-21T00:00:00 + storage_format="%(year)04d|%(month)02d|%(day)02d", + regexp=r"(\d+)\|(\d+)\|(\d+)", + ) t = Table('t', self.metadata, Column('d', sqlite_date)) self.metadata.create_all(testing.db) - testing.db.execute(t.insert(). - values(d=datetime.date(2010, 10, 15))) + testing.db.execute( + t.insert(). + values(d=datetime.date(2010, 10, 15))) testing.db.execute("insert into t (d) values ('2004|05|21')") eq_( testing.db.execute("select * from t order by d").fetchall(), @@ -138,11 +195,11 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): ) eq_( testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(), - [(datetime.date(2004, 5, 21),), - (datetime.date(2010, 10, 15),)] + [ + (datetime.date(2004, 5, 21),), + (datetime.date(2010, 10, 15),)] ) - def test_no_convert_unicode(self): """test no utf-8 encoding occurs""" @@ -156,7 +213,7 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): sqltypes.CHAR(convert_unicode=True), sqltypes.Unicode(), sqltypes.UnicodeText(), - ): + ): bindproc = t.dialect_impl(dialect).bind_processor(dialect) assert not bindproc or \ isinstance(bindproc(util.u('some string')), util.text_type) @@ -198,6 +255,7 @@ class DateTimeTest(fixtures.TestBase, AssertsCompiledSQL): rp = sldt.result_processor(None, None) eq_(rp(bp(dt)), dt) + class DateTest(fixtures.TestBase, AssertsCompiledSQL): def test_default(self): @@ -221,6 +279,7 @@ class DateTest(fixtures.TestBase, AssertsCompiledSQL): rp = sldt.result_processor(None, None) eq_(rp(bp(dt)), dt) + class TimeTest(fixtures.TestBase, AssertsCompiledSQL): def test_default(self): @@ -333,8 +392,9 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): @testing.provide_metadata def test_boolean_default(self): - t = Table("t", self.metadata, - Column("x", Boolean, server_default=sql.false())) + t = Table( + "t", self.metadata, + Column("x", Boolean, server_default=sql.false())) t.create(testing.db) testing.db.execute(t.insert()) testing.db.execute(t.insert().values(x=True)) @@ -351,7 +411,6 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): eq_(info['default'], '3') - class DialectTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'sqlite' @@ -372,7 +431,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): Column('true', Integer), Column('false', Integer), Column('column', Integer), - ) + ) try: meta.create_all() t.insert().execute(safe=1) @@ -403,8 +462,8 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): table1 = Table('django_admin_log', metadata, autoload=True) table2 = Table('django_content_type', metadata, autoload=True) j = table1.join(table2) - assert j.onclause.compare(table1.c.content_type_id - == table2.c.id) + assert j.onclause.compare( + table1.c.content_type_id == table2.c.id) @testing.provide_metadata def test_quoted_identifiers_functional_two(self): @@ -426,8 +485,8 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): # unfortunately, still can't do this; sqlite quadruples # up the quotes on the table name here for pragma foreign_key_list - #testing.db.execute(r''' - #CREATE TABLE """b""" ( + # testing.db.execute(r''' + # CREATE TABLE """b""" ( # """id""" integer NOT NULL PRIMARY KEY, # """aid""" integer NULL # REFERENCES """a""" ("""id""") @@ -439,48 +498,25 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): #table2 = Table(r'"b"', metadata, autoload=True) #j = table1.join(table2) - #assert j.onclause.compare(table1.c['"id"'] + # assert j.onclause.compare(table1.c['"id"'] # == table2.c['"aid"']) - def test_legacy_quoted_identifiers_unit(self): - dialect = sqlite.dialect() - dialect._broken_fk_pragma_quotes = True - - - for row in [ - (0, 'target', 'tid', 'id'), - (0, '"target"', 'tid', 'id'), - (0, '[target]', 'tid', 'id'), - (0, "'target'", 'tid', 'id'), - (0, '`target`', 'tid', 'id'), - ]: - fks = {} - fkeys = [] - dialect._parse_fk(fks, fkeys, *row) - eq_(fkeys, [{ - 'referred_table': 'target', - 'referred_columns': ['id'], - 'referred_schema': None, - 'name': None, - 'constrained_columns': ['tid'] - }]) - @testing.provide_metadata def test_description_encoding(self): # amazingly, pysqlite seems to still deliver cursor.description # as encoded bytes in py2k - t = Table('x', self.metadata, - Column(u('méil'), Integer, primary_key=True), - Column(ue('\u6e2c\u8a66'), Integer), - ) + t = Table( + 'x', self.metadata, + Column(u('méil'), Integer, primary_key=True), + Column(ue('\u6e2c\u8a66'), Integer), + ) self.metadata.create_all(testing.db) result = testing.db.execute(t.select()) assert u('méil') in result.keys() assert ue('\u6e2c\u8a66') in result.keys() - def test_file_path_is_absolute(self): d = pysqlite_dialect.dialect() eq_( @@ -498,48 +534,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): e = create_engine('sqlite+pysqlite:///foo.db') assert e.pool.__class__ is pool.NullPool - def test_dont_reflect_autoindex(self): - meta = MetaData(testing.db) - t = Table('foo', meta, Column('bar', String, primary_key=True)) - meta.create_all() - from sqlalchemy.engine.reflection import Inspector - try: - inspector = Inspector(testing.db) - eq_(inspector.get_indexes('foo'), []) - eq_(inspector.get_indexes('foo', - include_auto_indexes=True), [{'unique': 1, 'name' - : 'sqlite_autoindex_foo_1', 'column_names': ['bar']}]) - finally: - meta.drop_all() - - def test_create_index_with_schema(self): - """Test creation of index with explicit schema""" - - meta = MetaData(testing.db) - t = Table('foo', meta, Column('bar', String, index=True), - schema='main') - try: - meta.create_all() - finally: - meta.drop_all() - - def test_get_unique_constraints(self): - meta = MetaData(testing.db) - t1 = Table('foo', meta, Column('f', Integer), - UniqueConstraint('f', name='foo_f')) - t2 = Table('bar', meta, Column('b', Integer), - UniqueConstraint('b', name='bar_b'), - prefixes=['TEMPORARY']) - meta.create_all() - from sqlalchemy.engine.reflection import Inspector - try: - inspector = Inspector(testing.db) - eq_(inspector.get_unique_constraints('foo'), - [{'column_names': [u'f'], 'name': u'foo_f'}]) - eq_(inspector.get_unique_constraints('bar'), - [{'column_names': [u'b'], 'name': u'bar_b'}]) - finally: - meta.drop_all() class AttachedMemoryDBTest(fixtures.TestBase): @@ -662,7 +656,7 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL): 'epoch': '%s', 'dow': '%w', 'week': '%W', - } + } for field, subst in mapping.items(): self.assert_compile(select([extract(field, t.c.col1)]), "SELECT CAST(STRFTIME('%s', t.col1) AS " @@ -685,53 +679,57 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL): def test_constraints_with_schemas(self): metadata = MetaData() - t1 = Table('t1', metadata, - Column('id', Integer, primary_key=True), - schema='master') - t2 = Table('t2', metadata, - Column('id', Integer, primary_key=True), - Column('t1_id', Integer, ForeignKey('master.t1.id')), - schema='master' - ) - t3 = Table('t3', metadata, - Column('id', Integer, primary_key=True), - Column('t1_id', Integer, ForeignKey('master.t1.id')), - schema='alternate' - ) - t4 = Table('t4', metadata, - Column('id', Integer, primary_key=True), - Column('t1_id', Integer, ForeignKey('master.t1.id')), - ) + Table( + 't1', metadata, + Column('id', Integer, primary_key=True), + schema='master') + t2 = Table( + 't2', metadata, + Column('id', Integer, primary_key=True), + Column('t1_id', Integer, ForeignKey('master.t1.id')), + schema='master' + ) + t3 = Table( + 't3', metadata, + Column('id', Integer, primary_key=True), + Column('t1_id', Integer, ForeignKey('master.t1.id')), + schema='alternate' + ) + t4 = Table( + 't4', metadata, + Column('id', Integer, primary_key=True), + Column('t1_id', Integer, ForeignKey('master.t1.id')), + ) # schema->schema, generate REFERENCES with no schema name self.assert_compile( schema.CreateTable(t2), - "CREATE TABLE master.t2 (" - "id INTEGER NOT NULL, " - "t1_id INTEGER, " - "PRIMARY KEY (id), " - "FOREIGN KEY(t1_id) REFERENCES t1 (id)" - ")" + "CREATE TABLE master.t2 (" + "id INTEGER NOT NULL, " + "t1_id INTEGER, " + "PRIMARY KEY (id), " + "FOREIGN KEY(t1_id) REFERENCES t1 (id)" + ")" ) # schema->different schema, don't generate REFERENCES self.assert_compile( schema.CreateTable(t3), - "CREATE TABLE alternate.t3 (" - "id INTEGER NOT NULL, " - "t1_id INTEGER, " - "PRIMARY KEY (id)" - ")" + "CREATE TABLE alternate.t3 (" + "id INTEGER NOT NULL, " + "t1_id INTEGER, " + "PRIMARY KEY (id)" + ")" ) # same for local schema self.assert_compile( schema.CreateTable(t4), - "CREATE TABLE t4 (" - "id INTEGER NOT NULL, " - "t1_id INTEGER, " - "PRIMARY KEY (id)" - ")" + "CREATE TABLE t4 (" + "id INTEGER NOT NULL, " + "t1_id INTEGER, " + "PRIMARY KEY (id)" + ")" ) @@ -756,30 +754,37 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_pk1(self): - self._test_empty_insert(Table('a', MetaData(testing.db), - Column('id', Integer, - primary_key=True))) + self._test_empty_insert( + Table( + 'a', MetaData(testing.db), + Column('id', Integer, primary_key=True))) @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_pk2(self): - assert_raises(exc.DBAPIError, self._test_empty_insert, Table('b' - , MetaData(testing.db), Column('x', Integer, - primary_key=True), Column('y', Integer, - primary_key=True))) + assert_raises( + exc.DBAPIError, self._test_empty_insert, + Table( + 'b', MetaData(testing.db), + Column('x', Integer, primary_key=True), + Column('y', Integer, primary_key=True))) @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_pk3(self): - assert_raises(exc.DBAPIError, self._test_empty_insert, Table('c' - , MetaData(testing.db), Column('x', Integer, - primary_key=True), Column('y', Integer, - DefaultClause('123'), primary_key=True))) + assert_raises( + exc.DBAPIError, self._test_empty_insert, + Table( + 'c', MetaData(testing.db), + Column('x', Integer, primary_key=True), + Column('y', Integer, DefaultClause('123'), primary_key=True))) @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_pk4(self): - self._test_empty_insert(Table('d', MetaData(testing.db), - Column('x', Integer, primary_key=True), - Column('y', Integer, DefaultClause('123' - )))) + self._test_empty_insert( + Table( + 'd', MetaData(testing.db), + Column('x', Integer, primary_key=True), + Column('y', Integer, DefaultClause('123')) + )) @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_nopk1(self): @@ -788,9 +793,10 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support') def test_empty_insert_nopk2(self): - self._test_empty_insert(Table('f', MetaData(testing.db), - Column('x', Integer), Column('y', - Integer))) + self._test_empty_insert( + Table( + 'f', MetaData(testing.db), + Column('x', Integer), Column('y', Integer))) def test_inserts_with_spaces(self): tbl = Table('tbl', MetaData('sqlite:///'), Column('with space', @@ -800,8 +806,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): tbl.insert().execute({'without': 123}) assert list(tbl.select().execute()) == [(None, 123)] tbl.insert().execute({'with space': 456}) - assert list(tbl.select().execute()) == [(None, 123), (456, - None)] + assert list(tbl.select().execute()) == [ + (None, 123), (456, None)] finally: tbl.drop() @@ -817,6 +823,8 @@ def full_text_search_missing(): except: return True +metadata = cattable = matchtable = None + class MatchTest(fixtures.TestBase, AssertsCompiledSQL): @@ -845,19 +853,20 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): """) matchtable = Table('matchtable', metadata, autoload=True) metadata.create_all() - cattable.insert().execute([{'id': 1, 'description': 'Python'}, - {'id': 2, 'description': 'Ruby'}]) - matchtable.insert().execute([{'id': 1, 'title' - : 'Agile Web Development with Rails' - , 'category_id': 2}, {'id': 2, - 'title': 'Dive Into Python', - 'category_id': 1}, {'id': 3, 'title' - : "Programming Matz's Ruby", - 'category_id': 2}, {'id': 4, 'title' - : 'The Definitive Guide to Django', - 'category_id': 1}, {'id': 5, 'title' - : 'Python in a Nutshell', - 'category_id': 1}]) + cattable.insert().execute( + [{'id': 1, 'description': 'Python'}, + {'id': 2, 'description': 'Ruby'}]) + matchtable.insert().execute( + [ + {'id': 1, 'title': 'Agile Web Development with Rails', + 'category_id': 2}, + {'id': 2, 'title': 'Dive Into Python', 'category_id': 1}, + {'id': 3, 'title': "Programming Matz's Ruby", + 'category_id': 2}, + {'id': 4, 'title': 'The Definitive Guide to Django', + 'category_id': 1}, + {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1} + ]) @classmethod def teardown_class(cls): @@ -869,35 +878,38 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): def test_simple_match(self): results = \ - matchtable.select().where(matchtable.c.title.match('python' - )).order_by(matchtable.c.id).execute().fetchall() + matchtable.select().where( + matchtable.c.title.match('python')).\ + order_by(matchtable.c.id).execute().fetchall() eq_([2, 5], [r.id for r in results]) def test_simple_prefix_match(self): results = \ - matchtable.select().where(matchtable.c.title.match('nut*' - )).execute().fetchall() + matchtable.select().where( + matchtable.c.title.match('nut*')).execute().fetchall() eq_([5], [r.id for r in results]) def test_or_match(self): results2 = \ matchtable.select().where( - matchtable.c.title.match('nutshell OR ruby' - )).order_by(matchtable.c.id).execute().fetchall() + matchtable.c.title.match('nutshell OR ruby')).\ + order_by(matchtable.c.id).execute().fetchall() eq_([3, 5], [r.id for r in results2]) def test_and_match(self): results2 = \ matchtable.select().where( - matchtable.c.title.match('python nutshell' - )).execute().fetchall() + matchtable.c.title.match('python nutshell') + ).execute().fetchall() eq_([5], [r.id for r in results2]) def test_match_across_joins(self): - results = matchtable.select().where(and_(cattable.c.id - == matchtable.c.category_id, - cattable.c.description.match('Ruby' - ))).order_by(matchtable.c.id).execute().fetchall() + results = matchtable.select().where( + and_( + cattable.c.id == matchtable.c.category_id, + cattable.c.description.match('Ruby') + ) + ).order_by(matchtable.c.id).execute().fetchall() eq_([1, 3], [r.id for r in results]) @@ -907,10 +919,11 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL): table = Table('autoinctable', MetaData(), Column('id', Integer, primary_key=True), Column('x', Integer, default=None), sqlite_autoincrement=True) - self.assert_compile(schema.CreateTable(table), - 'CREATE TABLE autoinctable (id INTEGER NOT ' - 'NULL PRIMARY KEY AUTOINCREMENT, x INTEGER)' - , dialect=sqlite.dialect()) + self.assert_compile( + schema.CreateTable(table), + 'CREATE TABLE autoinctable (id INTEGER NOT ' + 'NULL PRIMARY KEY AUTOINCREMENT, x INTEGER)', + dialect=sqlite.dialect()) def test_sqlite_autoincrement_constraint(self): table = Table( @@ -920,7 +933,7 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL): Column('x', Integer, default=None), UniqueConstraint('x'), sqlite_autoincrement=True, - ) + ) self.assert_compile(schema.CreateTable(table), 'CREATE TABLE autoinctable (id INTEGER NOT ' 'NULL PRIMARY KEY AUTOINCREMENT, x ' @@ -944,7 +957,7 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL): MetaData(), Column('id', MyInteger, primary_key=True), sqlite_autoincrement=True, - ) + ) self.assert_compile(schema.CreateTable(table), 'CREATE TABLE autoinctable (id INTEGER NOT ' 'NULL PRIMARY KEY AUTOINCREMENT)', @@ -958,7 +971,8 @@ class ReflectHeadlessFKsTest(fixtures.TestBase): testing.db.execute("CREATE TABLE a (id INTEGER PRIMARY KEY)") # this syntax actually works on other DBs perhaps we'd want to add # tests to test_reflection - testing.db.execute("CREATE TABLE b (id INTEGER PRIMARY KEY REFERENCES a)") + testing.db.execute( + "CREATE TABLE b (id INTEGER PRIMARY KEY REFERENCES a)") def teardown(self): testing.db.execute("drop table b") @@ -971,53 +985,312 @@ class ReflectHeadlessFKsTest(fixtures.TestBase): assert b.c.id.references(a.c.id) -class ReflectFKConstraintTest(fixtures.TestBase): + +class ConstraintReflectionTest(fixtures.TestBase): __only_on__ = 'sqlite' - def setup(self): - testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)") - testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)") - testing.db.execute("CREATE TABLE b (id INTEGER PRIMARY KEY, " - "FOREIGN KEY(id) REFERENCES a1(id)," - "FOREIGN KEY(id) REFERENCES a2(id)" - ")") - testing.db.execute("CREATE TABLE c (id INTEGER, " - "CONSTRAINT bar PRIMARY KEY(id)," - "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)," - "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)" - ")") + @classmethod + def setup_class(cls): + with testing.db.begin() as conn: + + conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)") + conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)") + conn.execute( + "CREATE TABLE b (id INTEGER PRIMARY KEY, " + "FOREIGN KEY(id) REFERENCES a1(id)," + "FOREIGN KEY(id) REFERENCES a2(id)" + ")") + conn.execute( + "CREATE TABLE c (id INTEGER, " + "CONSTRAINT bar PRIMARY KEY(id)," + "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)," + "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)" + ")") + conn.execute( + # the lower casing + inline is intentional here + "CREATE TABLE d (id INTEGER, x INTEGER unique)") + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d1 ' + '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)') + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)') + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)') + + conn.execute( + # lower casing + inline is intentional + "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))") + conn.execute( + 'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER ' + 'references a2 ("some ( STUPID n,ame"))') + conn.execute( + 'CREATE TABLE e2 (id INTEGER, ' + '"some ( STUPID n,ame" INTEGER NOT NULL ' + 'references a2 ("some ( STUPID n,ame"))') + + conn.execute( + "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))" + ) + conn.execute( + "CREATE TEMPORARY TABLE g " + "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))" + ) + conn.execute( + # intentional broken casing + "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))" + ) + conn.execute( + "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))" + ) + conn.execute( + "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, " + "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))" + ) + conn.execute( + "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, " + "PRIMARY KEY(id), " + "conSTRAINT my_fk FOreiGN KEY ( q , p ) " + "REFERENCes i ( x , y ))" + ) - def teardown(self): - testing.db.execute("drop table c") - testing.db.execute("drop table b") - testing.db.execute("drop table a1") - testing.db.execute("drop table a2") + meta = MetaData() + Table( + 'l', meta, Column('bar', String, index=True), + schema='main') + + Table( + 'm', meta, + Column('id', Integer, primary_key=True), + Column('x', String(30)), + UniqueConstraint('x') + ) + + Table( + 'n', meta, + Column('id', Integer, primary_key=True), + Column('x', String(30)), + UniqueConstraint('x'), + prefixes=['TEMPORARY'] + ) - def test_name_is_none(self): + meta.create_all(conn) + + # will contain an "autoindex" + conn.execute("create table o (foo varchar(20) primary key)") + + @classmethod + def teardown_class(cls): + with testing.db.begin() as conn: + for name in [ + "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1", + "d", "d1", "d2", "c", "b", "a1", "a2"]: + conn.execute("drop table %s" % name) + + def test_legacy_quoted_identifiers_unit(self): + dialect = sqlite.dialect() + dialect._broken_fk_pragma_quotes = True + + for row in [ + (0, None, 'target', 'tid', 'id', None), + (0, None, '"target"', 'tid', 'id', None), + (0, None, '[target]', 'tid', 'id', None), + (0, None, "'target'", 'tid', 'id', None), + (0, None, '`target`', 'tid', 'id', None), + ]: + def _get_table_pragma(*arg, **kw): + return [row] + + def _get_table_sql(*arg, **kw): + return "CREATE TABLE foo "\ + "(tid INTEGER, "\ + "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2] + with mock.patch.object( + dialect, "_get_table_pragma", _get_table_pragma): + with mock.patch.object( + dialect, '_get_table_sql', _get_table_sql): + + fkeys = dialect.get_foreign_keys(None, 'foo') + eq_( + fkeys, + [{ + 'referred_table': 'target', + 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, + 'constrained_columns': ['tid'] + }]) + + def test_foreign_key_name_is_none(self): # and not "0" - meta = MetaData() - b = Table('b', meta, autoload=True, autoload_with=testing.db) + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('b') eq_( - [con.name for con in b.constraints], - [None, None, None] + fks, + [ + {'referred_table': 'a1', 'referred_columns': ['id'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['id']}, + {'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['id']}, + ] ) - def test_name_not_none(self): - # we don't have names for PK constraints, - # it appears we get back None in the pragma for - # FKs also (also it doesn't even appear to be documented on sqlite's docs - # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list - # how did we ever know that's the "name" field ??) + def test_foreign_key_name_is_not_none(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('c') + eq_( + fks, + [ + { + 'referred_table': 'a1', 'referred_columns': ['id'], + 'referred_schema': None, 'name': 'foo1', + 'constrained_columns': ['id']}, + { + 'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, 'name': 'foo2', + 'constrained_columns': ['id']}, + ] + ) - meta = MetaData() - c = Table('c', meta, autoload=True, autoload_with=testing.db) + def test_unnamed_inline_foreign_key(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('e') eq_( - set([con.name for con in c.constraints]), - set([None, None]) + fks, + [{ + 'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['x'] + }] + ) + + def test_unnamed_inline_foreign_key_quoted(self): + inspector = Inspector(testing.db) + + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('e1') + eq_( + fks, + [{ + 'referred_table': 'a2', + 'referred_columns': ['some ( STUPID n,ame'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['some ( STUPID n,ame'] + }] + ) + fks = inspector.get_foreign_keys('e2') + eq_( + fks, + [{ + 'referred_table': 'a2', + 'referred_columns': ['some ( STUPID n,ame'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['some ( STUPID n,ame'] + }] + ) + + def test_foreign_key_composite_broken_casing(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('j') + eq_( + fks, + [{ + 'referred_table': 'i', + 'referred_columns': ['x', 'y'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['q', 'p']}] + ) + fks = inspector.get_foreign_keys('k') + eq_( + fks, + [{'referred_table': 'i', 'referred_columns': ['x', 'y'], + 'referred_schema': None, 'name': 'my_fk', + 'constrained_columns': ['q', 'p']}] + ) + + def test_dont_reflect_autoindex(self): + inspector = Inspector(testing.db) + eq_(inspector.get_indexes('o'), []) + eq_( + inspector.get_indexes('o', include_auto_indexes=True), + [{ + 'unique': 1, + 'name': 'sqlite_autoindex_o_1', + 'column_names': ['foo']}]) + + def test_create_index_with_schema(self): + """Test creation of index with explicit schema""" + + inspector = Inspector(testing.db) + eq_( + inspector.get_indexes('l', schema='main'), + [{'unique': 0, 'name': u'ix_main_l_bar', + 'column_names': [u'bar']}]) + + def test_unique_constraint_named(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("f"), + [{'column_names': ['x'], 'name': 'foo_fx'}] + ) + + def test_unique_constraint_named_broken_casing(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("h"), + [{'column_names': ['x'], 'name': 'foo_hx'}] + ) + + def test_unique_constraint_named_broken_temp(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("g"), + [{'column_names': ['x'], 'name': 'foo_gx'}] + ) + + def test_unique_constraint_unnamed_inline(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("d"), + [{'column_names': ['x'], 'name': None}] + ) + + def test_unique_constraint_unnamed_inline_quoted(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("d1"), + [{'column_names': ['some ( STUPID n,ame'], 'name': None}] + ) + eq_( + inspector.get_unique_constraints("d2"), + [{'column_names': ['some STUPID n,ame'], 'name': None}] + ) + eq_( + inspector.get_unique_constraints("d3"), + [{'column_names': ['some STUPID n,ame'], 'name': None}] + ) + + def test_unique_constraint_unnamed_normal(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("m"), + [{'column_names': ['x'], 'name': None}] + ) + + def test_unique_constraint_unnamed_normal_temporary(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("n"), + [{'column_names': ['x'], 'name': None}] ) class SavepointTest(fixtures.TablesTest): + """test that savepoints work when we use the correct event setup""" __only_on__ = 'sqlite' @@ -1081,7 +1354,7 @@ class SavepointTest(fixtures.TablesTest): connection = self.bind.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') - trans2 = connection.begin_nested() + connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans3 = connection.begin() connection.execute(users.insert(), user_id=3, user_name='user3') @@ -1127,6 +1400,16 @@ class TypeReflectionTest(fixtures.TestBase): (sqltypes.Time, sqltypes.TIME()), (sqltypes.BOOLEAN, sqltypes.BOOLEAN()), (sqltypes.Boolean, sqltypes.BOOLEAN()), + (sqlite.DATE( + storage_format="%(year)04d%(month)02d%(day)02d", + ), sqltypes.DATE()), + (sqlite.TIME( + storage_format="%(hour)02d%(minute)02d%(second)02d", + ), sqltypes.TIME()), + (sqlite.DATETIME( + storage_format="%(year)04d%(month)02d%(day)02d" + "%(hour)02d%(minute)02d%(second)02d", + ), sqltypes.DATETIME()), ] def _unsupported_args_fixture(self): @@ -1169,8 +1452,8 @@ class TypeReflectionTest(fixtures.TestBase): if warnings: def go(): return dialect._resolve_type_affinity(from_) - final_type = testing.assert_warnings(go, - ["Could not instantiate"], regex=True) + final_type = testing.assert_warnings( + go, ["Could not instantiate"], regex=True) else: final_type = dialect._resolve_type_affinity(from_) expected_type = type(to_) @@ -1186,8 +1469,8 @@ class TypeReflectionTest(fixtures.TestBase): if warnings: def go(): return inspector.get_columns("foo")[0] - col_info = testing.assert_warnings(go, - ["Could not instantiate"], regex=True) + col_info = testing.assert_warnings( + go, ["Could not instantiate"], regex=True) else: col_info = inspector.get_columns("foo")[0] expected_type = type(to_) @@ -1207,7 +1490,8 @@ class TypeReflectionTest(fixtures.TestBase): self._test_lookup_direct(self._fixed_lookup_fixture()) def test_lookup_direct_unsupported_args(self): - self._test_lookup_direct(self._unsupported_args_fixture(), warnings=True) + self._test_lookup_direct( + self._unsupported_args_fixture(), warnings=True) def test_lookup_direct_type_affinity(self): self._test_lookup_direct(self._type_affinity_fixture()) @@ -1216,8 +1500,8 @@ class TypeReflectionTest(fixtures.TestBase): self._test_round_trip(self._fixed_lookup_fixture()) def test_round_trip_direct_unsupported_args(self): - self._test_round_trip(self._unsupported_args_fixture(), warnings=True) + self._test_round_trip( + self._unsupported_args_fixture(), warnings=True) def test_round_trip_direct_type_affinity(self): self._test_round_trip(self._type_affinity_fixture()) - diff --git a/test/dialect/test_suite.py b/test/dialect/test_suite.py index e6d642ced..3820a7721 100644 --- a/test/dialect/test_suite.py +++ b/test/dialect/test_suite.py @@ -1,2 +1,3 @@ from sqlalchemy.testing.suite import * + |
