diff options
Diffstat (limited to 'test/dialect/test_mssql.py')
| -rw-r--r-- | test/dialect/test_mssql.py | 532 |
1 files changed, 287 insertions, 245 deletions
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py index dd86ce0de..423310db6 100644 --- a/test/dialect/test_mssql.py +++ b/test/dialect/test_mssql.py @@ -2,17 +2,18 @@ from sqlalchemy.test.testing import eq_ import datetime, os, re from sqlalchemy import * -from sqlalchemy import types, exc +from sqlalchemy import types, exc, schema from sqlalchemy.orm import * from sqlalchemy.sql import table, column from sqlalchemy.databases import mssql -import sqlalchemy.engine.url as url +from sqlalchemy.dialects.mssql import pyodbc +from sqlalchemy.engine import url from sqlalchemy.test import * from sqlalchemy.test.testing import eq_ class CompileTest(TestBase, AssertsCompiledSQL): - __dialect__ = mssql.MSSQLDialect() + __dialect__ = mssql.dialect() def test_insert(self): t = table('sometable', column('somecolumn')) @@ -157,6 +158,45 @@ class CompileTest(TestBase, AssertsCompiledSQL): select([extract(field, t.c.col1)]), 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field) + def test_update_returning(self): + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, inserted.name") + + u = update(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, " + "inserted.name, inserted.description") + + u = update(table1, values=dict(name='foo')).returning(table1).where(table1.c.name=='bar') + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, " + "inserted.name, inserted.description WHERE mytable.name = :name_1") + + u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT LEN(inserted.name) AS length_1") + + def test_insert_returning(self): + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name) + self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, inserted.name VALUES (:name)") + + i = insert(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, " + "inserted.name, inserted.description VALUES (:name)") + + i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name)) + self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT LEN(inserted.name) AS length_1 VALUES (:name)") + + class IdentityInsertTest(TestBase, AssertsCompiledSQL): __only_on__ = 'mssql' @@ -189,9 +229,9 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL): eq_([(9, 'Python')], list(cats)) result = cattable.insert().values(description='PHP').execute() - eq_([10], result.last_inserted_ids()) + eq_([10], result.inserted_primary_key) lastcat = cattable.select().order_by(desc(cattable.c.id)).execute() - eq_((10, 'PHP'), lastcat.fetchone()) + eq_((10, 'PHP'), lastcat.first()) def test_executemany(self): cattable.insert().execute([ @@ -213,10 +253,51 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL): eq_([(91, 'Smalltalk'), (90, 'PHP')], list(lastcats)) -class ReflectionTest(TestBase): +class ReflectionTest(TestBase, ComparesTables): __only_on__ = 'mssql' - def testidentity(self): + def test_basic_reflection(self): + meta = MetaData(testing.db) + + users = Table('engine_users', meta, + Column('user_id', types.INT, primary_key=True), + Column('user_name', types.VARCHAR(20), nullable=False), + Column('test1', types.CHAR(5), nullable=False), + Column('test2', types.Float(5), nullable=False), + Column('test3', types.Text), + Column('test4', types.Numeric, nullable = False), + Column('test5', types.DateTime), + Column('parent_user_id', types.Integer, + ForeignKey('engine_users.user_id')), + Column('test6', types.DateTime, nullable=False), + Column('test7', types.Text), + Column('test8', types.Binary), + Column('test_passivedefault2', types.Integer, server_default='5'), + Column('test9', types.Binary(100)), + Column('test_numeric', types.Numeric()), + test_needs_fk=True, + ) + + addresses = Table('engine_email_addresses', meta, + Column('address_id', types.Integer, primary_key = True), + Column('remote_user_id', types.Integer, ForeignKey(users.c.user_id)), + Column('email_address', types.String(20)), + test_needs_fk=True, + ) + meta.create_all() + + try: + meta2 = MetaData() + reflected_users = Table('engine_users', meta2, autoload=True, + autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', meta2, + autoload=True, autoload_with=testing.db) + self.assert_tables_equal(users, reflected_users) + self.assert_tables_equal(addresses, reflected_addresses) + finally: + meta.drop_all() + + def test_identity(self): meta = MetaData(testing.db) table = Table( 'identity_test', meta, @@ -240,7 +321,7 @@ class QueryUnicodeTest(TestBase): meta = MetaData(testing.db) t1 = Table('unitest_table', meta, Column('id', Integer, primary_key=True), - Column('descr', mssql.MSText(200, convert_unicode=True))) + Column('descr', mssql.MSText(convert_unicode=True))) meta.create_all() con = testing.db.connect() @@ -248,7 +329,7 @@ class QueryUnicodeTest(TestBase): con.execute(u"insert into unitest_table values ('bien mangé')".encode('UTF-8')) try: - r = t1.select().execute().fetchone() + r = t1.select().execute().first() assert isinstance(r[1], unicode), '%s is %s instead of unicode, working on %s' % ( r[1], type(r[1]), meta.bind) @@ -262,7 +343,9 @@ class QueryTest(TestBase): meta = MetaData(testing.db) t1 = Table('t1', meta, Column('id', Integer, Sequence('fred', 100, 1), primary_key=True), - Column('descr', String(200))) + Column('descr', String(200)), + implicit_returning = False + ) t2 = Table('t2', meta, Column('id', Integer, Sequence('fred', 200, 1), primary_key=True), Column('descr', String(200))) @@ -274,9 +357,9 @@ class QueryTest(TestBase): try: tr = con.begin() r = con.execute(t2.insert(), descr='hello') - self.assert_(r.last_inserted_ids() == [200]) + self.assert_(r.inserted_primary_key == [200]) r = con.execute(t1.insert(), descr='hello') - self.assert_(r.last_inserted_ids() == [100]) + self.assert_(r.inserted_primary_key == [100]) finally: tr.commit() @@ -295,6 +378,19 @@ class QueryTest(TestBase): tbl.drop() con.execute('drop schema paj') + def test_returning_no_autoinc(self): + meta = MetaData(testing.db) + + table = Table('t1', meta, Column('id', Integer, primary_key=True), Column('data', String(50))) + table.create() + try: + result = table.insert().values(id=1, data=func.lower("SomeString")).returning(table.c.id, table.c.data).execute() + eq_(result.fetchall(), [(1, 'somestring',)]) + finally: + # this will hang if the "SET IDENTITY_INSERT t1 OFF" occurs before the + # result is fetched + table.drop() + def test_delete_schema(self): meta = MetaData(testing.db) con = testing.db.connect() @@ -371,36 +467,26 @@ class SchemaTest(TestBase): ) self.column = t.c.test_column + dialect = mssql.dialect() + self.ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t)) + + def _column_spec(self): + return self.ddl_compiler.get_column_specification(self.column) + def test_that_mssql_default_nullability_emits_null(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR NULL", column_specification) + eq_("test_column VARCHAR NULL", self._column_spec()) def test_that_mssql_none_nullability_does_not_emit_nullability(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) self.column.nullable = None - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR", column_specification) + eq_("test_column VARCHAR", self._column_spec()) def test_that_mssql_specified_nullable_emits_null(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) self.column.nullable = True - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR NULL", column_specification) + eq_("test_column VARCHAR NULL", self._column_spec()) def test_that_mssql_specified_not_nullable_emits_not_null(self): - schemagenerator = \ - mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None) self.column.nullable = False - column_specification = \ - schemagenerator.get_column_specification(self.column) - eq_("test_column VARCHAR NOT NULL", column_specification) + eq_("test_column VARCHAR NOT NULL", self._column_spec()) def full_text_search_missing(): @@ -515,79 +601,73 @@ class MatchTest(TestBase, AssertsCompiledSQL): class ParseConnectTest(TestBase, AssertsCompiledSQL): __only_on__ = 'mssql' + @classmethod + def setup_class(cls): + global dialect + dialect = pyodbc.MSDialect_pyodbc() + def test_pyodbc_connect_dsn_trusted(self): u = url.make_url('mssql://mydsn') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection) def test_pyodbc_connect_old_style_dsn_trusted(self): u = url.make_url('mssql:///?dsn=mydsn') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection) def test_pyodbc_connect_dsn_non_trusted(self): u = url.make_url('mssql://username:password@mydsn') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;UID=username;PWD=password'], {}], connection) def test_pyodbc_connect_dsn_extra(self): u = url.make_url('mssql://username:password@mydsn/?LANGUAGE=us_english&foo=bar') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;UID=username;PWD=password;LANGUAGE=us_english;foo=bar'], {}], connection) def test_pyodbc_connect(self): u = url.make_url('mssql://username:password@hostspec/database') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_connect_comma_port(self): u = url.make_url('mssql://username:password@hostspec:12345/database') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec,12345;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_connect_config_port(self): u = url.make_url('mssql://username:password@hostspec/database?port=12345') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;port=12345'], {}], connection) def test_pyodbc_extra_connect(self): u = url.make_url('mssql://username:password@hostspec/database?LANGUAGE=us_english&foo=bar') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;foo=bar;LANGUAGE=us_english'], {}], connection) def test_pyodbc_odbc_connect(self): u = url.make_url('mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_odbc_connect_with_dsn(self): u = url.make_url('mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['dsn=mydsn;Database=database;UID=username;PWD=password'], {}], connection) def test_pyodbc_odbc_connect_ignores_other_values(self): u = url.make_url('mssql://userdiff:passdiff@localhost/dbdiff?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword') - dialect = mssql.MSSQLDialect_pyodbc() connection = dialect.create_connect_args(u) eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection) -class TypesTest(TestBase): +class TypesTest(TestBase, AssertsExecutionResults, ComparesTables): __only_on__ = 'mssql' @classmethod def setup_class(cls): - global numeric_table, metadata + global metadata metadata = MetaData(testing.db) def teardown(self): @@ -601,26 +681,22 @@ class TypesTest(TestBase): ) metadata.create_all() - try: - test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000', - '-1500000.00000000000000000000', '1500000', - '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2', - '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234', - '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3', - '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2', - '-02452E-2', '45125E-2', - '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25', - '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12', - '00000000000000.1E+12', '000000000000.2E-32'] + test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000', + '-1500000.00000000000000000000', '1500000', + '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2', + '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234', + '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3', + '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2', + '-02452E-2', '45125E-2', + '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25', + '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12', + '00000000000000.1E+12', '000000000000.2E-32'] - for value in test_items: - numeric_table.insert().execute(numericcol=value) + for value in test_items: + numeric_table.insert().execute(numericcol=value) - for value in select([numeric_table.c.numericcol]).execute(): - assert value[0] in test_items, "%s not in test_items" % value[0] - - except Exception, e: - raise e + for value in select([numeric_table.c.numericcol]).execute(): + assert value[0] in test_items, "%s not in test_items" % value[0] def test_float(self): float_table = Table('float_table', metadata, @@ -643,11 +719,6 @@ class TypesTest(TestBase): raise e -class TypesTest2(TestBase, AssertsExecutionResults): - "Test Microsoft SQL Server column types" - - __only_on__ = 'mssql' - def test_money(self): "Exercise type specification for money types." @@ -659,13 +730,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'SMALLMONEY'), ] - table_args = ['test_mssql_money', MetaData(testing.db)] + table_args = ['test_mssql_money', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) money_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(money_table)) for col in money_table.c: index = int(col.name[1:]) @@ -688,15 +760,27 @@ class TypesTest2(TestBase, AssertsExecutionResults): (mssql.MSDateTime, [], {}, 'DATETIME', []), + (types.DATE, [], {}, + 'DATE', ['>=', (10,)]), + (types.Date, [], {}, + 'DATE', ['>=', (10,)]), + (types.Date, [], {}, + 'DATETIME', ['<', (10,)], mssql.MSDateTime), (mssql.MSDate, [], {}, 'DATE', ['>=', (10,)]), (mssql.MSDate, [], {}, 'DATETIME', ['<', (10,)], mssql.MSDateTime), + (types.TIME, [], {}, + 'TIME', ['>=', (10,)]), + (types.Time, [], {}, + 'TIME', ['>=', (10,)]), (mssql.MSTime, [], {}, 'TIME', ['>=', (10,)]), (mssql.MSTime, [1], {}, 'TIME(1)', ['>=', (10,)]), + (types.Time, [], {}, + 'DATETIME', ['<', (10,)], mssql.MSDateTime), (mssql.MSTime, [], {}, 'DATETIME', ['<', (10,)], mssql.MSDateTime), @@ -715,14 +799,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): ] - table_args = ['test_mssql_dates', MetaData(testing.db)] + table_args = ['test_mssql_dates', metadata] for index, spec in enumerate(columns): type_, args, kw, res, requires = spec[0:5] if (requires and testing._is_excluded('mssql', *requires)) or not requires: table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) dates_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + gen = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(dates_table)) for col in dates_table.c: index = int(col.name[1:]) @@ -730,49 +814,37 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - dates_table.create(checkfirst=True) - assert True - except: - raise + dates_table.create(checkfirst=True) reflected_dates = Table('test_mssql_dates', MetaData(testing.db), autoload=True) for col in reflected_dates.c: - index = int(col.name[1:]) - testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__, - len(columns[index]) > 5 and columns[index][5] or columns[index][0]) - dates_table.drop() - - def test_dates2(self): - meta = MetaData(testing.db) - t = Table('test_dates', meta, - Column('id', Integer, - Sequence('datetest_id_seq', optional=True), - primary_key=True), - Column('adate', Date), - Column('atime', Time), - Column('adatetime', DateTime)) - t.create(checkfirst=True) - try: - d1 = datetime.date(2007, 10, 30) - t1 = datetime.time(11, 2, 32) - d2 = datetime.datetime(2007, 10, 30, 11, 2, 32) - t.insert().execute(adate=d1, adatetime=d2, atime=t1) - t.insert().execute(adate=d2, adatetime=d2, atime=d2) + self.assert_types_base(col, dates_table.c[col.key]) - x = t.select().execute().fetchall()[0] - self.assert_(x.adate.__class__ == datetime.date) - self.assert_(x.atime.__class__ == datetime.time) - self.assert_(x.adatetime.__class__ == datetime.datetime) + def test_date_roundtrip(self): + t = Table('test_dates', metadata, + Column('id', Integer, + Sequence('datetest_id_seq', optional=True), + primary_key=True), + Column('adate', Date), + Column('atime', Time), + Column('adatetime', DateTime)) + metadata.create_all() + d1 = datetime.date(2007, 10, 30) + t1 = datetime.time(11, 2, 32) + d2 = datetime.datetime(2007, 10, 30, 11, 2, 32) + t.insert().execute(adate=d1, adatetime=d2, atime=t1) + t.insert().execute(adate=d2, adatetime=d2, atime=d2) - t.delete().execute() + x = t.select().execute().fetchall()[0] + self.assert_(x.adate.__class__ == datetime.date) + self.assert_(x.atime.__class__ == datetime.time) + self.assert_(x.adatetime.__class__ == datetime.datetime) - t.insert().execute(adate=d1, adatetime=d2, atime=t1) + t.delete().execute() - eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)]) + t.insert().execute(adate=d1, adatetime=d2, atime=t1) - finally: - t.drop(checkfirst=True) + eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)]) def test_binary(self): "Exercise type specification for binary types." @@ -781,6 +853,9 @@ class TypesTest2(TestBase, AssertsExecutionResults): # column type, args, kwargs, expected ddl (mssql.MSBinary, [], {}, 'BINARY'), + (types.Binary, [10], {}, + 'BINARY(10)'), + (mssql.MSBinary, [10], {}, 'BINARY(10)'), @@ -798,13 +873,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'BINARY(10)') ] - table_args = ['test_mssql_binary', MetaData(testing.db)] + table_args = ['test_mssql_binary', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) binary_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(binary_table)) for col in binary_table.c: index = int(col.name[1:]) @@ -812,22 +888,15 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - binary_table.create(checkfirst=True) - assert True - except: - raise + metadata.create_all() reflected_binary = Table('test_mssql_binary', MetaData(testing.db), autoload=True) for col in reflected_binary.c: - # don't test the MSGenericBinary since it's a special case and - # reflected it will map to a MSImage or MSBinary depending - if not testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__ == mssql.MSGenericBinary: - testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__, - testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__) + c1 =testing.db.dialect.type_descriptor(col.type).__class__ + c2 =testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__ + assert issubclass(c1, c2), "%r is not a subclass of %r" % (c1, c2) if binary_table.c[col.name].type.length: testing.eq_(col.type.length, binary_table.c[col.name].type.length) - binary_table.drop() def test_boolean(self): "Exercise type specification for boolean type." @@ -838,13 +907,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'BIT'), ] - table_args = ['test_mssql_boolean', MetaData(testing.db)] + table_args = ['test_mssql_boolean', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) boolean_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table)) for col in boolean_table.c: index = int(col.name[1:]) @@ -852,12 +922,7 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - boolean_table.create(checkfirst=True) - assert True - except: - raise - boolean_table.drop() + metadata.create_all() def test_numeric(self): "Exercise type specification and options for numeric types." @@ -865,40 +930,39 @@ class TypesTest2(TestBase, AssertsExecutionResults): columns = [ # column type, args, kwargs, expected ddl (mssql.MSNumeric, [], {}, - 'NUMERIC(10, 2)'), + 'NUMERIC'), (mssql.MSNumeric, [None], {}, 'NUMERIC'), - (mssql.MSNumeric, [12], {}, - 'NUMERIC(12, 2)'), (mssql.MSNumeric, [12, 4], {}, 'NUMERIC(12, 4)'), - (mssql.MSFloat, [], {}, - 'FLOAT(10)'), - (mssql.MSFloat, [None], {}, + (types.Float, [], {}, + 'FLOAT'), + (types.Float, [None], {}, 'FLOAT'), - (mssql.MSFloat, [12], {}, + (types.Float, [12], {}, 'FLOAT(12)'), (mssql.MSReal, [], {}, 'REAL'), - (mssql.MSInteger, [], {}, + (types.Integer, [], {}, 'INTEGER'), - (mssql.MSBigInteger, [], {}, + (types.BigInteger, [], {}, 'BIGINT'), (mssql.MSTinyInteger, [], {}, 'TINYINT'), - (mssql.MSSmallInteger, [], {}, + (types.SmallInteger, [], {}, 'SMALLINT'), ] - table_args = ['test_mssql_numeric', MetaData(testing.db)] + table_args = ['test_mssql_numeric', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) numeric_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table)) for col in numeric_table.c: index = int(col.name[1:]) @@ -906,20 +970,11 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - numeric_table.create(checkfirst=True) - assert True - except: - raise - numeric_table.drop() + metadata.create_all() def test_char(self): """Exercise COLLATE-ish options on string types.""" - # modify the text_as_varchar setting since we are not testing that behavior here - text_as_varchar = testing.db.dialect.text_as_varchar - testing.db.dialect.text_as_varchar = False - columns = [ (mssql.MSChar, [], {}, 'CHAR'), @@ -960,13 +1015,14 @@ class TypesTest2(TestBase, AssertsExecutionResults): 'NTEXT COLLATE Latin1_General_CI_AS'), ] - table_args = ['test_mssql_charset', MetaData(testing.db)] + table_args = ['test_mssql_charset', metadata] for index, spec in enumerate(columns): type_, args, kw, res = spec table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) charset_table = Table(*table_args) - gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + dialect = mssql.dialect() + gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table)) for col in charset_table.c: index = int(col.name[1:]) @@ -974,110 +1030,91 @@ class TypesTest2(TestBase, AssertsExecutionResults): "%s %s" % (col.name, columns[index][3])) self.assert_(repr(col)) - try: - charset_table.create(checkfirst=True) - assert True - except: - raise - charset_table.drop() - - testing.db.dialect.text_as_varchar = text_as_varchar + metadata.create_all() def test_timestamp(self): """Exercise TIMESTAMP column.""" - meta = MetaData(testing.db) - - try: - columns = [ - (TIMESTAMP, - 'TIMESTAMP'), - (mssql.MSTimeStamp, - 'TIMESTAMP'), - ] - for idx, (spec, expected) in enumerate(columns): - t = Table('mssql_ts%s' % idx, meta, - Column('id', Integer, primary_key=True), - Column('t', spec, nullable=None)) - testing.eq_(colspec(t.c.t), "t %s" % expected) - self.assert_(repr(t.c.t)) - try: - t.create(checkfirst=True) - assert True - except: - raise - t.drop() - finally: - meta.drop_all() + dialect = mssql.dialect() + spec, expected = (TIMESTAMP,'TIMESTAMP') + t = Table('mssql_ts', metadata, + Column('id', Integer, primary_key=True), + Column('t', spec, nullable=None)) + gen = dialect.ddl_compiler(dialect, schema.CreateTable(t)) + testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected) + self.assert_(repr(t.c.t)) + t.create(checkfirst=True) + def test_autoincrement(self): - meta = MetaData(testing.db) - try: - Table('ai_1', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True)) - Table('ai_2', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True)) - Table('ai_3', meta, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_y', Integer, primary_key=True)) - Table('ai_4', meta, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_n2', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_5', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_6', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_7', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_8', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True)) - meta.create_all() - - table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', - 'ai_5', 'ai_6', 'ai_7', 'ai_8'] - mr = MetaData(testing.db) - mr.reflect(only=table_names) - - for tbl in [mr.tables[name] for name in table_names]: - for c in tbl.c: - if c.name.startswith('int_y'): - assert c.autoincrement - elif c.name.startswith('int_n'): - assert not c.autoincrement - tbl.insert().execute() + Table('ai_1', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True)) + Table('ai_2', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True)) + Table('ai_3', metadata, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_y', Integer, primary_key=True)) + Table('ai_4', metadata, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_n2', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table('ai_5', metadata, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table('ai_6', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table('ai_7', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table('ai_8', metadata, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True)) + metadata.create_all() + + table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', + 'ai_5', 'ai_6', 'ai_7', 'ai_8'] + mr = MetaData(testing.db) + + for name in table_names: + tbl = Table(name, mr, autoload=True) + for c in tbl.c: + if c.name.startswith('int_y'): + assert c.autoincrement + elif c.name.startswith('int_n'): + assert not c.autoincrement + + for counter, engine in enumerate([ + engines.testing_engine(options={'implicit_returning':False}), + engines.testing_engine(options={'implicit_returning':True}), + ] + ): + engine.execute(tbl.insert()) if 'int_y' in tbl.c: - assert select([tbl.c.int_y]).scalar() == 1 - assert list(tbl.select().execute().fetchone()).count(1) == 1 + assert engine.scalar(select([tbl.c.int_y])) == counter + 1 + assert list(engine.execute(tbl.select()).first()).count(counter + 1) == 1 else: - assert 1 not in list(tbl.select().execute().fetchone()) - finally: - meta.drop_all() - -def colspec(c): - return testing.db.dialect.schemagenerator(testing.db.dialect, - testing.db, None, None).get_column_specification(c) - + assert 1 not in list(engine.execute(tbl.select()).first()) + engine.execute(tbl.delete()) class BinaryTest(TestBase, AssertsExecutionResults): """Test the Binary and VarBinary types""" + + __only_on__ = 'mssql' + @classmethod def setup_class(cls): global binary_table, MyPickleType @@ -1125,6 +1162,11 @@ class BinaryTest(TestBase, AssertsExecutionResults): stream2 =self.load_stream('binary_data_two.dat') binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_image=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3) binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_image=stream2, data_slice=stream2[0:99], pickled=testobj2) + + # TODO: pyodbc does not seem to accept "None" for a VARBINARY column (data=None). + # error: [Microsoft][ODBC SQL Server Driver][SQL Server]Implicit conversion from + # data type varchar to varbinary is not allowed. Use the CONVERT function to run this query. (257) + #binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_image=None, data_slice=stream2[0:99], pickled=None) binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data_image=None, data_slice=stream2[0:99], pickled=None) for stmt in ( |
