summaryrefslogtreecommitdiff
path: root/test/dialect/test_mssql.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/test_mssql.py')
-rw-r--r--test/dialect/test_mssql.py532
1 files changed, 287 insertions, 245 deletions
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py
index dd86ce0de..423310db6 100644
--- a/test/dialect/test_mssql.py
+++ b/test/dialect/test_mssql.py
@@ -2,17 +2,18 @@
from sqlalchemy.test.testing import eq_
import datetime, os, re
from sqlalchemy import *
-from sqlalchemy import types, exc
+from sqlalchemy import types, exc, schema
from sqlalchemy.orm import *
from sqlalchemy.sql import table, column
from sqlalchemy.databases import mssql
-import sqlalchemy.engine.url as url
+from sqlalchemy.dialects.mssql import pyodbc
+from sqlalchemy.engine import url
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_
class CompileTest(TestBase, AssertsCompiledSQL):
- __dialect__ = mssql.MSSQLDialect()
+ __dialect__ = mssql.dialect()
def test_insert(self):
t = table('sometable', column('somecolumn'))
@@ -157,6 +158,45 @@ class CompileTest(TestBase, AssertsCompiledSQL):
select([extract(field, t.c.col1)]),
'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field)
+ def test_update_returning(self):
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String(128)),
+ column('description', String(128)),
+ )
+
+ u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, inserted.name")
+
+ u = update(table1, values=dict(name='foo')).returning(table1)
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, "
+ "inserted.name, inserted.description")
+
+ u = update(table1, values=dict(name='foo')).returning(table1).where(table1.c.name=='bar')
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT inserted.myid, "
+ "inserted.name, inserted.description WHERE mytable.name = :name_1")
+
+ u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(u, "UPDATE mytable SET name=:name OUTPUT LEN(inserted.name) AS length_1")
+
+ def test_insert_returning(self):
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String(128)),
+ column('description', String(128)),
+ )
+
+ i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
+ self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, inserted.name VALUES (:name)")
+
+ i = insert(table1, values=dict(name='foo')).returning(table1)
+ self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT inserted.myid, "
+ "inserted.name, inserted.description VALUES (:name)")
+
+ i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
+ self.assert_compile(i, "INSERT INTO mytable (name) OUTPUT LEN(inserted.name) AS length_1 VALUES (:name)")
+
+
class IdentityInsertTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'mssql'
@@ -189,9 +229,9 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL):
eq_([(9, 'Python')], list(cats))
result = cattable.insert().values(description='PHP').execute()
- eq_([10], result.last_inserted_ids())
+ eq_([10], result.inserted_primary_key)
lastcat = cattable.select().order_by(desc(cattable.c.id)).execute()
- eq_((10, 'PHP'), lastcat.fetchone())
+ eq_((10, 'PHP'), lastcat.first())
def test_executemany(self):
cattable.insert().execute([
@@ -213,10 +253,51 @@ class IdentityInsertTest(TestBase, AssertsCompiledSQL):
eq_([(91, 'Smalltalk'), (90, 'PHP')], list(lastcats))
-class ReflectionTest(TestBase):
+class ReflectionTest(TestBase, ComparesTables):
__only_on__ = 'mssql'
- def testidentity(self):
+ def test_basic_reflection(self):
+ meta = MetaData(testing.db)
+
+ users = Table('engine_users', meta,
+ Column('user_id', types.INT, primary_key=True),
+ Column('user_name', types.VARCHAR(20), nullable=False),
+ Column('test1', types.CHAR(5), nullable=False),
+ Column('test2', types.Float(5), nullable=False),
+ Column('test3', types.Text),
+ Column('test4', types.Numeric, nullable = False),
+ Column('test5', types.DateTime),
+ Column('parent_user_id', types.Integer,
+ ForeignKey('engine_users.user_id')),
+ Column('test6', types.DateTime, nullable=False),
+ Column('test7', types.Text),
+ Column('test8', types.Binary),
+ Column('test_passivedefault2', types.Integer, server_default='5'),
+ Column('test9', types.Binary(100)),
+ Column('test_numeric', types.Numeric()),
+ test_needs_fk=True,
+ )
+
+ addresses = Table('engine_email_addresses', meta,
+ Column('address_id', types.Integer, primary_key = True),
+ Column('remote_user_id', types.Integer, ForeignKey(users.c.user_id)),
+ Column('email_address', types.String(20)),
+ test_needs_fk=True,
+ )
+ meta.create_all()
+
+ try:
+ meta2 = MetaData()
+ reflected_users = Table('engine_users', meta2, autoload=True,
+ autoload_with=testing.db)
+ reflected_addresses = Table('engine_email_addresses', meta2,
+ autoload=True, autoload_with=testing.db)
+ self.assert_tables_equal(users, reflected_users)
+ self.assert_tables_equal(addresses, reflected_addresses)
+ finally:
+ meta.drop_all()
+
+ def test_identity(self):
meta = MetaData(testing.db)
table = Table(
'identity_test', meta,
@@ -240,7 +321,7 @@ class QueryUnicodeTest(TestBase):
meta = MetaData(testing.db)
t1 = Table('unitest_table', meta,
Column('id', Integer, primary_key=True),
- Column('descr', mssql.MSText(200, convert_unicode=True)))
+ Column('descr', mssql.MSText(convert_unicode=True)))
meta.create_all()
con = testing.db.connect()
@@ -248,7 +329,7 @@ class QueryUnicodeTest(TestBase):
con.execute(u"insert into unitest_table values ('bien mangé')".encode('UTF-8'))
try:
- r = t1.select().execute().fetchone()
+ r = t1.select().execute().first()
assert isinstance(r[1], unicode), '%s is %s instead of unicode, working on %s' % (
r[1], type(r[1]), meta.bind)
@@ -262,7 +343,9 @@ class QueryTest(TestBase):
meta = MetaData(testing.db)
t1 = Table('t1', meta,
Column('id', Integer, Sequence('fred', 100, 1), primary_key=True),
- Column('descr', String(200)))
+ Column('descr', String(200)),
+ implicit_returning = False
+ )
t2 = Table('t2', meta,
Column('id', Integer, Sequence('fred', 200, 1), primary_key=True),
Column('descr', String(200)))
@@ -274,9 +357,9 @@ class QueryTest(TestBase):
try:
tr = con.begin()
r = con.execute(t2.insert(), descr='hello')
- self.assert_(r.last_inserted_ids() == [200])
+ self.assert_(r.inserted_primary_key == [200])
r = con.execute(t1.insert(), descr='hello')
- self.assert_(r.last_inserted_ids() == [100])
+ self.assert_(r.inserted_primary_key == [100])
finally:
tr.commit()
@@ -295,6 +378,19 @@ class QueryTest(TestBase):
tbl.drop()
con.execute('drop schema paj')
+ def test_returning_no_autoinc(self):
+ meta = MetaData(testing.db)
+
+ table = Table('t1', meta, Column('id', Integer, primary_key=True), Column('data', String(50)))
+ table.create()
+ try:
+ result = table.insert().values(id=1, data=func.lower("SomeString")).returning(table.c.id, table.c.data).execute()
+ eq_(result.fetchall(), [(1, 'somestring',)])
+ finally:
+ # this will hang if the "SET IDENTITY_INSERT t1 OFF" occurs before the
+ # result is fetched
+ table.drop()
+
def test_delete_schema(self):
meta = MetaData(testing.db)
con = testing.db.connect()
@@ -371,36 +467,26 @@ class SchemaTest(TestBase):
)
self.column = t.c.test_column
+ dialect = mssql.dialect()
+ self.ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
+
+ def _column_spec(self):
+ return self.ddl_compiler.get_column_specification(self.column)
+
def test_that_mssql_default_nullability_emits_null(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR NULL", column_specification)
+ eq_("test_column VARCHAR NULL", self._column_spec())
def test_that_mssql_none_nullability_does_not_emit_nullability(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
self.column.nullable = None
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR", column_specification)
+ eq_("test_column VARCHAR", self._column_spec())
def test_that_mssql_specified_nullable_emits_null(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
self.column.nullable = True
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR NULL", column_specification)
+ eq_("test_column VARCHAR NULL", self._column_spec())
def test_that_mssql_specified_not_nullable_emits_not_null(self):
- schemagenerator = \
- mssql.MSSQLDialect().schemagenerator(mssql.MSSQLDialect(), None)
self.column.nullable = False
- column_specification = \
- schemagenerator.get_column_specification(self.column)
- eq_("test_column VARCHAR NOT NULL", column_specification)
+ eq_("test_column VARCHAR NOT NULL", self._column_spec())
def full_text_search_missing():
@@ -515,79 +601,73 @@ class MatchTest(TestBase, AssertsCompiledSQL):
class ParseConnectTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'mssql'
+ @classmethod
+ def setup_class(cls):
+ global dialect
+ dialect = pyodbc.MSDialect_pyodbc()
+
def test_pyodbc_connect_dsn_trusted(self):
u = url.make_url('mssql://mydsn')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection)
def test_pyodbc_connect_old_style_dsn_trusted(self):
u = url.make_url('mssql:///?dsn=mydsn')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;TrustedConnection=Yes'], {}], connection)
def test_pyodbc_connect_dsn_non_trusted(self):
u = url.make_url('mssql://username:password@mydsn')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;UID=username;PWD=password'], {}], connection)
def test_pyodbc_connect_dsn_extra(self):
u = url.make_url('mssql://username:password@mydsn/?LANGUAGE=us_english&foo=bar')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;UID=username;PWD=password;LANGUAGE=us_english;foo=bar'], {}], connection)
def test_pyodbc_connect(self):
u = url.make_url('mssql://username:password@hostspec/database')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_connect_comma_port(self):
u = url.make_url('mssql://username:password@hostspec:12345/database')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec,12345;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_connect_config_port(self):
u = url.make_url('mssql://username:password@hostspec/database?port=12345')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;port=12345'], {}], connection)
def test_pyodbc_extra_connect(self):
u = url.make_url('mssql://username:password@hostspec/database?LANGUAGE=us_english&foo=bar')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password;foo=bar;LANGUAGE=us_english'], {}], connection)
def test_pyodbc_odbc_connect(self):
u = url.make_url('mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_odbc_connect_with_dsn(self):
u = url.make_url('mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['dsn=mydsn;Database=database;UID=username;PWD=password'], {}], connection)
def test_pyodbc_odbc_connect_ignores_other_values(self):
u = url.make_url('mssql://userdiff:passdiff@localhost/dbdiff?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword')
- dialect = mssql.MSSQLDialect_pyodbc()
connection = dialect.create_connect_args(u)
eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UID=username;PWD=password'], {}], connection)
-class TypesTest(TestBase):
+class TypesTest(TestBase, AssertsExecutionResults, ComparesTables):
__only_on__ = 'mssql'
@classmethod
def setup_class(cls):
- global numeric_table, metadata
+ global metadata
metadata = MetaData(testing.db)
def teardown(self):
@@ -601,26 +681,22 @@ class TypesTest(TestBase):
)
metadata.create_all()
- try:
- test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000',
- '-1500000.00000000000000000000', '1500000',
- '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2',
- '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234',
- '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
- '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2',
- '-02452E-2', '45125E-2',
- '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25',
- '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12',
- '00000000000000.1E+12', '000000000000.2E-32']
+ test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000',
+ '-1500000.00000000000000000000', '1500000',
+ '0.0000000000000000002', '0.2', '-0.0000000000000000002', '-2E-2',
+ '156666.458923543', '-156666.458923543', '1', '-1', '-1234', '1234',
+ '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
+ '1E-4', '1E-5', '1E-6', '1E-7', '1E-1', '1E-8', '0.2732E2', '-0.2432E2', '4.35656E2',
+ '-02452E-2', '45125E-2',
+ '1234.58965E-2', '1.521E+15', '-1E-25', '1E-25', '1254E-25', '-1203E-25',
+ '0', '-0.00', '-0', '4585E12', '000000000000000000012', '000000000000.32E12',
+ '00000000000000.1E+12', '000000000000.2E-32']
- for value in test_items:
- numeric_table.insert().execute(numericcol=value)
+ for value in test_items:
+ numeric_table.insert().execute(numericcol=value)
- for value in select([numeric_table.c.numericcol]).execute():
- assert value[0] in test_items, "%s not in test_items" % value[0]
-
- except Exception, e:
- raise e
+ for value in select([numeric_table.c.numericcol]).execute():
+ assert value[0] in test_items, "%s not in test_items" % value[0]
def test_float(self):
float_table = Table('float_table', metadata,
@@ -643,11 +719,6 @@ class TypesTest(TestBase):
raise e
-class TypesTest2(TestBase, AssertsExecutionResults):
- "Test Microsoft SQL Server column types"
-
- __only_on__ = 'mssql'
-
def test_money(self):
"Exercise type specification for money types."
@@ -659,13 +730,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'SMALLMONEY'),
]
- table_args = ['test_mssql_money', MetaData(testing.db)]
+ table_args = ['test_mssql_money', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
money_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(money_table))
for col in money_table.c:
index = int(col.name[1:])
@@ -688,15 +760,27 @@ class TypesTest2(TestBase, AssertsExecutionResults):
(mssql.MSDateTime, [], {},
'DATETIME', []),
+ (types.DATE, [], {},
+ 'DATE', ['>=', (10,)]),
+ (types.Date, [], {},
+ 'DATE', ['>=', (10,)]),
+ (types.Date, [], {},
+ 'DATETIME', ['<', (10,)], mssql.MSDateTime),
(mssql.MSDate, [], {},
'DATE', ['>=', (10,)]),
(mssql.MSDate, [], {},
'DATETIME', ['<', (10,)], mssql.MSDateTime),
+ (types.TIME, [], {},
+ 'TIME', ['>=', (10,)]),
+ (types.Time, [], {},
+ 'TIME', ['>=', (10,)]),
(mssql.MSTime, [], {},
'TIME', ['>=', (10,)]),
(mssql.MSTime, [1], {},
'TIME(1)', ['>=', (10,)]),
+ (types.Time, [], {},
+ 'DATETIME', ['<', (10,)], mssql.MSDateTime),
(mssql.MSTime, [], {},
'DATETIME', ['<', (10,)], mssql.MSDateTime),
@@ -715,14 +799,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
]
- table_args = ['test_mssql_dates', MetaData(testing.db)]
+ table_args = ['test_mssql_dates', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res, requires = spec[0:5]
if (requires and testing._is_excluded('mssql', *requires)) or not requires:
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
dates_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ gen = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(dates_table))
for col in dates_table.c:
index = int(col.name[1:])
@@ -730,49 +814,37 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- dates_table.create(checkfirst=True)
- assert True
- except:
- raise
+ dates_table.create(checkfirst=True)
reflected_dates = Table('test_mssql_dates', MetaData(testing.db), autoload=True)
for col in reflected_dates.c:
- index = int(col.name[1:])
- testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__,
- len(columns[index]) > 5 and columns[index][5] or columns[index][0])
- dates_table.drop()
-
- def test_dates2(self):
- meta = MetaData(testing.db)
- t = Table('test_dates', meta,
- Column('id', Integer,
- Sequence('datetest_id_seq', optional=True),
- primary_key=True),
- Column('adate', Date),
- Column('atime', Time),
- Column('adatetime', DateTime))
- t.create(checkfirst=True)
- try:
- d1 = datetime.date(2007, 10, 30)
- t1 = datetime.time(11, 2, 32)
- d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
- t.insert().execute(adate=d2, adatetime=d2, atime=d2)
+ self.assert_types_base(col, dates_table.c[col.key])
- x = t.select().execute().fetchall()[0]
- self.assert_(x.adate.__class__ == datetime.date)
- self.assert_(x.atime.__class__ == datetime.time)
- self.assert_(x.adatetime.__class__ == datetime.datetime)
+ def test_date_roundtrip(self):
+ t = Table('test_dates', metadata,
+ Column('id', Integer,
+ Sequence('datetest_id_seq', optional=True),
+ primary_key=True),
+ Column('adate', Date),
+ Column('atime', Time),
+ Column('adatetime', DateTime))
+ metadata.create_all()
+ d1 = datetime.date(2007, 10, 30)
+ t1 = datetime.time(11, 2, 32)
+ d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.insert().execute(adate=d2, adatetime=d2, atime=d2)
- t.delete().execute()
+ x = t.select().execute().fetchall()[0]
+ self.assert_(x.adate.__class__ == datetime.date)
+ self.assert_(x.atime.__class__ == datetime.time)
+ self.assert_(x.adatetime.__class__ == datetime.datetime)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.delete().execute()
- eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)])
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1)
- finally:
- t.drop(checkfirst=True)
+ eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)])
def test_binary(self):
"Exercise type specification for binary types."
@@ -781,6 +853,9 @@ class TypesTest2(TestBase, AssertsExecutionResults):
# column type, args, kwargs, expected ddl
(mssql.MSBinary, [], {},
'BINARY'),
+ (types.Binary, [10], {},
+ 'BINARY(10)'),
+
(mssql.MSBinary, [10], {},
'BINARY(10)'),
@@ -798,13 +873,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'BINARY(10)')
]
- table_args = ['test_mssql_binary', MetaData(testing.db)]
+ table_args = ['test_mssql_binary', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
binary_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(binary_table))
for col in binary_table.c:
index = int(col.name[1:])
@@ -812,22 +888,15 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- binary_table.create(checkfirst=True)
- assert True
- except:
- raise
+ metadata.create_all()
reflected_binary = Table('test_mssql_binary', MetaData(testing.db), autoload=True)
for col in reflected_binary.c:
- # don't test the MSGenericBinary since it's a special case and
- # reflected it will map to a MSImage or MSBinary depending
- if not testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__ == mssql.MSGenericBinary:
- testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__,
- testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__)
+ c1 =testing.db.dialect.type_descriptor(col.type).__class__
+ c2 =testing.db.dialect.type_descriptor(binary_table.c[col.name].type).__class__
+ assert issubclass(c1, c2), "%r is not a subclass of %r" % (c1, c2)
if binary_table.c[col.name].type.length:
testing.eq_(col.type.length, binary_table.c[col.name].type.length)
- binary_table.drop()
def test_boolean(self):
"Exercise type specification for boolean type."
@@ -838,13 +907,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'BIT'),
]
- table_args = ['test_mssql_boolean', MetaData(testing.db)]
+ table_args = ['test_mssql_boolean', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
boolean_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table))
for col in boolean_table.c:
index = int(col.name[1:])
@@ -852,12 +922,7 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- boolean_table.create(checkfirst=True)
- assert True
- except:
- raise
- boolean_table.drop()
+ metadata.create_all()
def test_numeric(self):
"Exercise type specification and options for numeric types."
@@ -865,40 +930,39 @@ class TypesTest2(TestBase, AssertsExecutionResults):
columns = [
# column type, args, kwargs, expected ddl
(mssql.MSNumeric, [], {},
- 'NUMERIC(10, 2)'),
+ 'NUMERIC'),
(mssql.MSNumeric, [None], {},
'NUMERIC'),
- (mssql.MSNumeric, [12], {},
- 'NUMERIC(12, 2)'),
(mssql.MSNumeric, [12, 4], {},
'NUMERIC(12, 4)'),
- (mssql.MSFloat, [], {},
- 'FLOAT(10)'),
- (mssql.MSFloat, [None], {},
+ (types.Float, [], {},
+ 'FLOAT'),
+ (types.Float, [None], {},
'FLOAT'),
- (mssql.MSFloat, [12], {},
+ (types.Float, [12], {},
'FLOAT(12)'),
(mssql.MSReal, [], {},
'REAL'),
- (mssql.MSInteger, [], {},
+ (types.Integer, [], {},
'INTEGER'),
- (mssql.MSBigInteger, [], {},
+ (types.BigInteger, [], {},
'BIGINT'),
(mssql.MSTinyInteger, [], {},
'TINYINT'),
- (mssql.MSSmallInteger, [], {},
+ (types.SmallInteger, [], {},
'SMALLINT'),
]
- table_args = ['test_mssql_numeric', MetaData(testing.db)]
+ table_args = ['test_mssql_numeric', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
numeric_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table))
for col in numeric_table.c:
index = int(col.name[1:])
@@ -906,20 +970,11 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- numeric_table.create(checkfirst=True)
- assert True
- except:
- raise
- numeric_table.drop()
+ metadata.create_all()
def test_char(self):
"""Exercise COLLATE-ish options on string types."""
- # modify the text_as_varchar setting since we are not testing that behavior here
- text_as_varchar = testing.db.dialect.text_as_varchar
- testing.db.dialect.text_as_varchar = False
-
columns = [
(mssql.MSChar, [], {},
'CHAR'),
@@ -960,13 +1015,14 @@ class TypesTest2(TestBase, AssertsExecutionResults):
'NTEXT COLLATE Latin1_General_CI_AS'),
]
- table_args = ['test_mssql_charset', MetaData(testing.db)]
+ table_args = ['test_mssql_charset', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
charset_table = Table(*table_args)
- gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+ dialect = mssql.dialect()
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table))
for col in charset_table.c:
index = int(col.name[1:])
@@ -974,110 +1030,91 @@ class TypesTest2(TestBase, AssertsExecutionResults):
"%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
- try:
- charset_table.create(checkfirst=True)
- assert True
- except:
- raise
- charset_table.drop()
-
- testing.db.dialect.text_as_varchar = text_as_varchar
+ metadata.create_all()
def test_timestamp(self):
"""Exercise TIMESTAMP column."""
- meta = MetaData(testing.db)
-
- try:
- columns = [
- (TIMESTAMP,
- 'TIMESTAMP'),
- (mssql.MSTimeStamp,
- 'TIMESTAMP'),
- ]
- for idx, (spec, expected) in enumerate(columns):
- t = Table('mssql_ts%s' % idx, meta,
- Column('id', Integer, primary_key=True),
- Column('t', spec, nullable=None))
- testing.eq_(colspec(t.c.t), "t %s" % expected)
- self.assert_(repr(t.c.t))
- try:
- t.create(checkfirst=True)
- assert True
- except:
- raise
- t.drop()
- finally:
- meta.drop_all()
+ dialect = mssql.dialect()
+ spec, expected = (TIMESTAMP,'TIMESTAMP')
+ t = Table('mssql_ts', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t', spec, nullable=None))
+ gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
+ testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
+ self.assert_(repr(t.c.t))
+ t.create(checkfirst=True)
+
def test_autoincrement(self):
- meta = MetaData(testing.db)
- try:
- Table('ai_1', meta,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True))
- Table('ai_2', meta,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True))
- Table('ai_3', meta,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_y', Integer, primary_key=True))
- Table('ai_4', meta,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_n2', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_5', meta,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_6', meta,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_7', meta,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_8', meta,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True))
- meta.create_all()
-
- table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
- 'ai_5', 'ai_6', 'ai_7', 'ai_8']
- mr = MetaData(testing.db)
- mr.reflect(only=table_names)
-
- for tbl in [mr.tables[name] for name in table_names]:
- for c in tbl.c:
- if c.name.startswith('int_y'):
- assert c.autoincrement
- elif c.name.startswith('int_n'):
- assert not c.autoincrement
- tbl.insert().execute()
+ Table('ai_1', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True))
+ Table('ai_2', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True))
+ Table('ai_3', metadata,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_4', metadata,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_n2', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table('ai_5', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table('ai_6', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_7', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_8', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True))
+ metadata.create_all()
+
+ table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
+ 'ai_5', 'ai_6', 'ai_7', 'ai_8']
+ mr = MetaData(testing.db)
+
+ for name in table_names:
+ tbl = Table(name, mr, autoload=True)
+ for c in tbl.c:
+ if c.name.startswith('int_y'):
+ assert c.autoincrement
+ elif c.name.startswith('int_n'):
+ assert not c.autoincrement
+
+ for counter, engine in enumerate([
+ engines.testing_engine(options={'implicit_returning':False}),
+ engines.testing_engine(options={'implicit_returning':True}),
+ ]
+ ):
+ engine.execute(tbl.insert())
if 'int_y' in tbl.c:
- assert select([tbl.c.int_y]).scalar() == 1
- assert list(tbl.select().execute().fetchone()).count(1) == 1
+ assert engine.scalar(select([tbl.c.int_y])) == counter + 1
+ assert list(engine.execute(tbl.select()).first()).count(counter + 1) == 1
else:
- assert 1 not in list(tbl.select().execute().fetchone())
- finally:
- meta.drop_all()
-
-def colspec(c):
- return testing.db.dialect.schemagenerator(testing.db.dialect,
- testing.db, None, None).get_column_specification(c)
-
+ assert 1 not in list(engine.execute(tbl.select()).first())
+ engine.execute(tbl.delete())
class BinaryTest(TestBase, AssertsExecutionResults):
"""Test the Binary and VarBinary types"""
+
+ __only_on__ = 'mssql'
+
@classmethod
def setup_class(cls):
global binary_table, MyPickleType
@@ -1125,6 +1162,11 @@ class BinaryTest(TestBase, AssertsExecutionResults):
stream2 =self.load_stream('binary_data_two.dat')
binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_image=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_image=stream2, data_slice=stream2[0:99], pickled=testobj2)
+
+ # TODO: pyodbc does not seem to accept "None" for a VARBINARY column (data=None).
+ # error: [Microsoft][ODBC SQL Server Driver][SQL Server]Implicit conversion from
+ # data type varchar to varbinary is not allowed. Use the CONVERT function to run this query. (257)
+ #binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_image=None, data_slice=stream2[0:99], pickled=None)
binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data_image=None, data_slice=stream2[0:99], pickled=None)
for stmt in (