summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_case_statement.py18
-rw-r--r--test/sql/test_compiler.py226
-rw-r--r--test/sql/test_constraints.py44
-rw-r--r--test/sql/test_defaults.py26
-rw-r--r--test/sql/test_functions.py30
-rw-r--r--test/sql/test_generative.py110
-rw-r--r--test/sql/test_labels.py18
-rw-r--r--test/sql/test_metadata.py63
-rw-r--r--test/sql/test_query.py132
-rw-r--r--test/sql/test_quote.py4
-rw-r--r--test/sql/test_returning.py28
-rw-r--r--test/sql/test_rowcount.py4
-rw-r--r--test/sql/test_selectable.py68
-rw-r--r--test/sql/test_types.py272
-rw-r--r--test/sql/test_unicode.py4
15 files changed, 523 insertions, 524 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py
index 7bc3ab31f..97220d4dd 100644
--- a/test/sql/test_case_statement.py
+++ b/test/sql/test_case_statement.py
@@ -94,31 +94,31 @@ class CaseTest(TestBase, AssertsCompiledSQL):
def test_literal_interpretation(self):
t = table('test', column('col1'))
-
+
assert_raises(exc.ArgumentError, case, [("x", "y")])
-
+
self.assert_compile(case([("x", "y")], value=t.c.col1), "CASE test.col1 WHEN :param_1 THEN :param_2 END")
self.assert_compile(case([(t.c.col1==7, "y")], else_="z"), "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END")
-
+
def test_text_doesnt_explode(self):
for s in [
select([case([(info_table.c.info == 'pk_4_data',
text("'yes'"))], else_=text("'no'"
))]).order_by(info_table.c.info),
-
+
select([case([(info_table.c.info == 'pk_4_data',
literal_column("'yes'"))], else_=literal_column("'no'"
))]).order_by(info_table.c.info),
-
+
]:
eq_(s.execute().fetchall(), [
(u'no', ), (u'no', ), (u'no', ), (u'yes', ),
(u'no', ), (u'no', ),
])
-
-
-
+
+
+
@testing.fails_on('firebird', 'FIXME: unknown')
@testing.fails_on('maxdb', 'FIXME: unknown')
def testcase_with_dict(self):
@@ -146,7 +146,7 @@ class CaseTest(TestBase, AssertsCompiledSQL):
],
whereclause=info_table.c.pk < 4,
from_obj=[info_table])
-
+
assert simple_query.execute().fetchall() == [
('one', 1),
('two', 2),
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index b34eaeaae..d63e41e90 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -85,15 +85,15 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable")
-
+
def test_invalid_col_argument(self):
assert_raises(exc.ArgumentError, select, table1)
assert_raises(exc.ArgumentError, select, table1.c.myid)
-
+
def test_from_subquery(self):
"""tests placing select statements in the column clause of another select, for the
purposes of selecting from the exported columns of that select."""
-
+
s = select([table1], table1.c.name == 'jack')
self.assert_compile(
select(
@@ -163,7 +163,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
select([ClauseList(column('a'), column('b'))]).select_from('sometable'),
'SELECT a, b FROM sometable'
)
-
+
def test_use_labels(self):
self.assert_compile(
select([table1.c.myid==5], use_labels=True),
@@ -184,15 +184,15 @@ class SelectTest(TestBase, AssertsCompiledSQL):
select([cast("data", Integer)], use_labels=True),
"SELECT CAST(:param_1 AS INTEGER) AS anon_1"
)
-
+
self.assert_compile(
select([func.sum(func.lala(table1.c.myid).label('foo')).label('bar')]),
"SELECT sum(lala(mytable.myid)) AS bar FROM mytable"
)
-
+
def test_paramstyles(self):
stmt = text("select :foo, :bar, :bat from sometable")
-
+
self.assert_compile(
stmt,
"select ?, ?, ? from sometable"
@@ -218,10 +218,10 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"select %(foo)s, %(bar)s, %(bat)s from sometable"
, dialect=default.DefaultDialect(paramstyle='pyformat')
)
-
+
def test_dupe_columns(self):
"""test that deduping is performed against clause element identity, not rendered result."""
-
+
self.assert_compile(
select([column('a'), column('a'), column('a')]),
"SELECT a, a, a"
@@ -241,7 +241,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT a, b"
, dialect=default.DefaultDialect()
)
-
+
self.assert_compile(
select([bindparam('a'), bindparam('b'), bindparam('c')]),
"SELECT :a, :b, :c"
@@ -258,11 +258,11 @@ class SelectTest(TestBase, AssertsCompiledSQL):
select(["a", "a", "a"]),
"SELECT a, a, a"
)
-
+
s = select([bindparam('a'), bindparam('b'), bindparam('c')])
s = s.compile(dialect=default.DefaultDialect(paramstyle='qmark'))
eq_(s.positiontup, ['a', 'b', 'c'])
-
+
def test_nested_uselabels(self):
"""test nested anonymous label generation. this
essentially tests the ANONYMOUS_LABEL regex.
@@ -285,7 +285,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
'mytable.name AS name, mytable.description '
'AS description FROM mytable) AS anon_2) '
'AS anon_1')
-
+
def test_dont_overcorrelate(self):
self.assert_compile(select([table1], from_obj=[table1,
table1.select()]),
@@ -294,7 +294,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"mytable.myid AS myid, mytable.name AS "
"name, mytable.description AS description "
"FROM mytable)")
-
+
def test_full_correlate(self):
# intentional
t = table('t', column('a'), column('b'))
@@ -302,7 +302,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
s2 = select([t.c.a, s])
self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t""")
-
+
# unintentional
t2 = table('t2', column('c'), column('d'))
s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar()
@@ -313,18 +313,18 @@ class SelectTest(TestBase, AssertsCompiledSQL):
s = s.correlate(t, t2)
s2 =select([t, t2, s])
self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d")
-
+
def test_exists(self):
s = select([table1.c.myid]).where(table1.c.myid==5)
-
+
self.assert_compile(exists(s),
"EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
)
-
+
self.assert_compile(exists(s.as_scalar()),
"EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
)
-
+
self.assert_compile(exists([table1.c.myid], table1.c.myid
== 5).select(),
'SELECT EXISTS (SELECT mytable.myid FROM '
@@ -375,7 +375,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
'WHERE EXISTS (SELECT * FROM myothertable '
'AS myothertable_1 WHERE '
'myothertable_1.otherid = mytable.myid)')
-
+
self.assert_compile(
select([
or_(
@@ -388,7 +388,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"OR (EXISTS (SELECT * FROM myothertable WHERE "
"myothertable.otherid = :otherid_2)) AS anon_1"
)
-
+
def test_where_subquery(self):
s = select([addresses.c.street], addresses.c.user_id
@@ -619,7 +619,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
self.assert_compile(
label('bar', column('foo', type_=String))+ 'foo',
'foo || :param_1')
-
+
def test_conjunctions(self):
a, b, c = 'a', 'b', 'c'
@@ -630,7 +630,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
select([x.label('foo')]),
'SELECT a AND b AND c AS foo'
)
-
+
self.assert_compile(
and_(table1.c.myid == 12, table1.c.name=='asdf',
table2.c.othername == 'foo', "sysdate() = today()"),
@@ -651,7 +651,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
'today()',
checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
)
-
+
def test_distinct(self):
self.assert_compile(
@@ -678,7 +678,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
select([func.count(distinct(table1.c.myid))]),
"SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
)
-
+
def test_operators(self):
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
(operator.sub, '-'),
@@ -730,7 +730,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
self.assert_(compiled == fwd_sql or compiled == rev_sql,
"\n'" + compiled + "'\n does not match\n'" +
fwd_sql + "'\n or\n'" + rev_sql + "'")
-
+
for (py_op, op) in (
(operator.neg, '-'),
(operator.inv, 'NOT '),
@@ -739,11 +739,11 @@ class SelectTest(TestBase, AssertsCompiledSQL):
(table1.c.myid, "mytable.myid"),
(literal("foo"), ":param_1"),
):
-
+
compiled = str(py_op(expr))
sql = "%s%s" % (op, sql)
eq_(compiled, sql)
-
+
self.assert_compile(
table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
@@ -837,7 +837,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
postgresql.PGDialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
-
+
def test_match(self):
for expr, check, dialect in [
(table1.c.myid.match('somstr'),
@@ -853,7 +853,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
postgresql.dialect()),
(table1.c.myid.match('somstr'),
"CONTAINS (mytable.myid, :myid_1)",
- oracle.dialect()),
+ oracle.dialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
@@ -1160,7 +1160,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT column1 AS foobar, column2 AS hoho, myid FROM "
"(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
)
-
+
self.assert_compile(
select(['col1','col2'], from_obj='tablename').alias('myalias'),
"SELECT col1, col2 FROM tablename"
@@ -1189,7 +1189,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
checkparams={'bar':4, 'whee': 7},
dialect=dialect
)
-
+
# test escaping out text() params with a backslash
self.assert_compile(
text("select * from foo where clock='05:06:07' and mork='\:mindy'"),
@@ -1243,23 +1243,23 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
checkparams={'y': None, 'x': None, 'z': None}
)
-
+
self.assert_compile(
s.params(x=5, y=6, z=7),
"SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
checkparams={'y': 6, 'x': 5, 'z': 7}
)
-
+
@testing.emits_warning('.*empty sequence.*')
def test_render_binds_as_literal(self):
"""test a compiler that renders binds inline into
SQL in the columns clause."""
-
+
dialect = default.DefaultDialect()
class Compiler(dialect.statement_compiler):
ansi_bind_rules = True
dialect.statement_compiler = Compiler
-
+
self.assert_compile(
select([literal("someliteral")]),
"SELECT 'someliteral'",
@@ -1283,23 +1283,23 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
dialect=dialect
)
-
+
self.assert_compile(
select([literal("foo").in_([])]),
"SELECT 'foo' != 'foo' AS anon_1",
dialect=dialect
)
-
+
assert_raises(
exc.CompileError,
bindparam("foo").in_([]).compile, dialect=dialect
)
-
-
+
+
def test_literal(self):
-
+
self.assert_compile(select([literal('foo')]), "SELECT :param_1")
-
+
self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
"SELECT :param_1 || :param_2 AS anon_1 FROM mytable")
@@ -1334,7 +1334,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
expr, "SELECT mytable.name COLLATE latin1_german2_ci AS anon_1 FROM mytable")
assert table1.c.name.collate('latin1_german2_ci').type is table1.c.name.type
-
+
expr = select([table1.c.name.collate('latin1_german2_ci').label('k1')]).order_by('k1')
self.assert_compile(expr,"SELECT mytable.name COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1")
@@ -1384,8 +1384,8 @@ class SelectTest(TestBase, AssertsCompiledSQL):
'''"table%name"."spaces % more spaces" AS "table%name_spaces % '''\
'''more spaces" FROM "table%name"'''
)
-
-
+
+
def test_joins(self):
self.assert_compile(
join(table2, table1, table1.c.myid == table2.c.otherid).select(),
@@ -1473,7 +1473,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"select #1 has 2 columns, select #2 has 3",
union, table3.select(), table1.select()
)
-
+
x = union(
select([table1], table1.c.myid == 5),
select([table1], table1.c.myid == 12),
@@ -1494,7 +1494,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"FROM mytable UNION SELECT mytable.myid, mytable.name, "
"mytable.description FROM mytable) UNION SELECT mytable.myid,"
" mytable.name, mytable.description FROM mytable")
-
+
u1 = union(
select([table1.c.myid, table1.c.name]),
select([table2]),
@@ -1507,7 +1507,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"FROM thirdtable")
assert u1.corresponding_column(table2.c.otherid) is u1.c.myid
-
+
self.assert_compile(
union(
select([table1.c.myid, table1.c.name]),
@@ -1557,7 +1557,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT thirdtable.userid FROM thirdtable)"
)
-
+
s = select([column('foo'), column('bar')])
# ORDER BY's even though not supported by all DB's, are rendered if requested
@@ -1569,9 +1569,9 @@ class SelectTest(TestBase, AssertsCompiledSQL):
union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()),
"(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT :param_1)",
{'param_1':10}
-
+
)
-
+
def test_compound_grouping(self):
s = select([column('foo'), column('bar')]).select_from('bat')
@@ -1580,19 +1580,19 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) "
"UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat"
)
-
+
self.assert_compile(
union(s, s, s, s),
"SELECT foo, bar FROM bat UNION SELECT foo, bar "
"FROM bat UNION SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat"
)
-
+
self.assert_compile(
union(s, union(s, union(s, s))),
"SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat "
"UNION (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat))"
)
-
+
self.assert_compile(
select([s.alias()]),
'SELECT anon_1.foo, anon_1.bar FROM (SELECT foo, bar FROM bat) AS anon_1'
@@ -1654,8 +1654,8 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"UNION SELECT foo, bar FROM bat) "
"UNION (SELECT foo, bar FROM bat "
"UNION SELECT foo, bar FROM bat)")
-
-
+
+
self.assert_compile(
union(
intersect(s, s),
@@ -1817,9 +1817,9 @@ class SelectTest(TestBase, AssertsCompiledSQL):
def test_binds_no_hash_collision(self):
"""test that construct_params doesn't corrupt dict due to hash collisions"""
-
+
total_params = 100000
-
+
in_clause = [':in%d' % i for i in range(total_params)]
params = dict(('in%d' % i, i) for i in range(total_params))
sql = 'text clause %s' % ', '.join(in_clause)
@@ -1829,14 +1829,14 @@ class SelectTest(TestBase, AssertsCompiledSQL):
pp = c.construct_params(params)
eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp)))
eq_(len(set(pp.values())), total_params)
-
+
def test_bind_as_col(self):
t = table('foo', column('id'))
s = select([t, literal('lala').label('hoho')])
self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
-
+
assert [str(c) for c in s.c] == ["id", "hoho"]
def test_bind_callable(self):
@@ -1846,8 +1846,8 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"x = :key",
{'x':12}
)
-
-
+
+
@testing.emits_warning('.*empty sequence.*')
def test_in(self):
self.assert_compile(table1.c.myid.in_(['a']),
@@ -1969,7 +1969,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
),
"(mytable.myid, mytable.name) IN ((myothertable.otherid, myothertable.othername))"
)
-
+
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
select([table2.c.otherid, table2.c.othername])
@@ -1977,8 +1977,8 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"(mytable.myid, mytable.name) IN (SELECT "
"myothertable.otherid, myothertable.othername FROM myothertable)"
)
-
-
+
+
def test_cast(self):
tbl = table('casttest',
column('id', Integer),
@@ -2039,7 +2039,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
self.assert_compile(cast(literal_column('NULL'), Integer),
'CAST(NULL AS INTEGER)',
dialect=sqlite.dialect())
-
+
def test_date_between(self):
import datetime
table = Table('dt', metadata,
@@ -2085,15 +2085,15 @@ class SelectTest(TestBase, AssertsCompiledSQL):
"SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)),
"SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
-
+
def test_associativity(self):
f = column('f')
self.assert_compile( f - f, "f - f" )
self.assert_compile( f - f - f, "(f - f) - f" )
-
+
self.assert_compile( (f - f) - f, "(f - f) - f" )
self.assert_compile( (f - f).label('foo') - f, "(f - f) - f" )
-
+
self.assert_compile( f - (f - f), "f - (f - f)" )
self.assert_compile( f - (f - f).label('foo'), "f - (f - f)" )
@@ -2104,54 +2104,54 @@ class SelectTest(TestBase, AssertsCompiledSQL):
self.assert_compile( f / f - f, "f / f - f" )
self.assert_compile( (f / f) - f, "f / f - f" )
self.assert_compile( (f / f).label('foo') - f, "f / f - f" )
-
+
# because / more precedent than -
self.assert_compile( f - (f / f), "f - f / f" )
self.assert_compile( f - (f / f).label('foo'), "f - f / f" )
self.assert_compile( f - f / f, "f - f / f" )
self.assert_compile( (f - f) / f, "(f - f) / f" )
-
+
self.assert_compile( ((f - f) / f) - f, "(f - f) / f - f")
self.assert_compile( (f - f) / (f - f), "(f - f) / (f - f)")
-
+
# higher precedence
self.assert_compile( (f / f) - (f / f), "f / f - f / f")
self.assert_compile( (f / f) - (f - f), "f / f - (f - f)")
self.assert_compile( (f / f) / (f - f), "(f / f) / (f - f)")
self.assert_compile( f / (f / (f - f)), "f / (f / (f - f))")
-
-
+
+
def test_delayed_col_naming(self):
my_str = Column(String)
-
+
sel1 = select([my_str])
-
+
assert_raises_message(
exc.InvalidRequestError,
"Cannot initialize a sub-selectable with this Column",
lambda: sel1.c
)
-
+
# calling label or as_scalar doesn't compile
- # anything.
+ # anything.
sel2 = select([func.substr(my_str, 2, 3)]).label('my_substr')
-
+
assert_raises_message(
exc.CompileError,
"Cannot compile Column object until it's 'name' is assigned.",
str, sel2
)
-
+
sel3 = select([my_str]).as_scalar()
assert_raises_message(
exc.CompileError,
"Cannot compile Column object until it's 'name' is assigned.",
str, sel3
)
-
+
my_str.name = 'foo'
-
+
self.assert_compile(
sel1,
"SELECT foo",
@@ -2160,18 +2160,18 @@ class SelectTest(TestBase, AssertsCompiledSQL):
sel2,
'(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)',
)
-
+
self.assert_compile(
sel3,
"(SELECT foo)"
)
-
+
def test_naming(self):
f1 = func.hoho(table1.c.name)
s1 = select([table1.c.myid, table1.c.myid.label('foobar'),
f1,
func.lala(table1.c.name).label('gg')])
-
+
eq_(
s1.c.keys(),
['myid', 'foobar', str(f1), 'gg']
@@ -2179,7 +2179,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
meta = MetaData()
t1 = Table('mytable', meta, Column('col1', Integer))
-
+
exprs = (
table1.c.myid==12,
func.hoho(table1.c.myid),
@@ -2197,15 +2197,15 @@ class SelectTest(TestBase, AssertsCompiledSQL):
t = col.table
else:
t = table1
-
+
s1 = select([col], from_obj=t)
assert s1.c.keys() == [key], s1.c.keys()
-
+
if label:
self.assert_compile(s1, "SELECT %s AS %s FROM mytable" % (expr, label))
else:
self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,))
-
+
s1 = select([s1])
if label:
self.assert_compile(s1,
@@ -2220,7 +2220,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
self.assert_compile(s1,
"SELECT %s FROM (SELECT %s FROM mytable)" %
(expr,expr))
-
+
def test_hints(self):
s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s")
@@ -2230,12 +2230,12 @@ class SelectTest(TestBase, AssertsCompiledSQL):
a1 = table1.alias()
s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)")
-
+
subs4 = select([
table1, table2
]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).\
with_hint(table1, 'hint1')
-
+
s4 = select([table3]).select_from(
table3.join(
subs4,
@@ -2243,7 +2243,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
)
).\
with_hint(table3, 'hint3')
-
+
subs5 = select([
table1, table2
]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid))
@@ -2255,12 +2255,12 @@ class SelectTest(TestBase, AssertsCompiledSQL):
).\
with_hint(table3, 'hint3').\
with_hint(table1, 'hint1')
-
+
t1 = table('QuotedName', column('col1'))
s6 = select([t1.c.col1]).where(t1.c.col1>10).with_hint(t1, '%(name)s idx1')
a2 = t1.alias('SomeName')
s7 = select([a2.c.col1]).where(a2.c.col1>10).with_hint(a2, '%(name)s idx1')
-
+
mysql_d, oracle_d, sybase_d = \
mysql.dialect(), \
oracle.dialect(), \
@@ -2336,13 +2336,13 @@ class SelectTest(TestBase, AssertsCompiledSQL):
and_("a", "b"),
"a AND b"
)
-
+
def test_literal_as_text_nonstring_raise(self):
assert_raises(exc.ArgumentError,
and_, ("a",), ("b",)
)
-
-
+
+
class CRUDTest(TestBase, AssertsCompiledSQL):
def test_insert(self):
# generic insert, will create bind params for all columns
@@ -2514,7 +2514,7 @@ class CRUDTest(TestBase, AssertsCompiledSQL):
where(table1.c.name=='somename'),
"DELETE FROM mytable WHERE mytable.myid = :myid_1 "
"AND mytable.name = :name_1")
-
+
def test_correlated_delete(self):
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
@@ -2529,26 +2529,26 @@ class CRUDTest(TestBase, AssertsCompiledSQL):
"DELETE FROM mytable WHERE mytable.name = (SELECT "
"myothertable.othername FROM myothertable WHERE "
"myothertable.otherid = mytable.myid)")
-
+
def test_binds_that_match_columns(self):
"""test bind params named after column names
replace the normal SET/VALUES generation."""
-
+
t = table('foo', column('x'), column('y'))
u = t.update().where(t.c.x==bindparam('x'))
-
+
assert_raises(exc.CompileError, u.compile)
-
+
self.assert_compile(u, "UPDATE foo SET WHERE foo.x = :x", params={})
assert_raises(exc.CompileError, u.values(x=7).compile)
-
+
self.assert_compile(u.values(y=7), "UPDATE foo SET y=:y WHERE foo.x = :x")
-
+
assert_raises(exc.CompileError, u.values(x=7).compile, column_keys=['x', 'y'])
assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y'])
-
+
self.assert_compile(u.values(x=3 + bindparam('x')),
"UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
@@ -2574,7 +2574,7 @@ class CRUDTest(TestBase, AssertsCompiledSQL):
i = t.insert().values(x=3 + bindparam('y'), y=5)
assert_raises(exc.CompileError, i.compile)
-
+
i = t.insert().values(x=3 + bindparam('x2'))
self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={})
@@ -2582,11 +2582,11 @@ class CRUDTest(TestBase, AssertsCompiledSQL):
params={'x':1, 'y':2})
self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
params={'x2':1, 'y':2})
-
+
def test_labels_no_collision(self):
-
+
t = table('foo', column('id'), column('foo_id'))
-
+
self.assert_compile(
t.update().where(t.c.id==5),
"UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :id_1"
@@ -2596,7 +2596,7 @@ class CRUDTest(TestBase, AssertsCompiledSQL):
t.update().where(t.c.id==bindparam(key=t.c.id._label)),
"UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1"
)
-
+
class InlineDefaultTest(TestBase, AssertsCompiledSQL):
def test_insert(self):
m = MetaData()
@@ -2634,7 +2634,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL):
self.assert_compile(table4.select(),
"SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
" remote_owner.remotetable.value FROM remote_owner.remotetable")
-
+
self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
"SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
" remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "
@@ -2664,7 +2664,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL):
' "dbo.remote_owner".remotetable.value AS dbo_remote_owner_remotetable_value FROM'
' "dbo.remote_owner".remotetable'
)
-
+
def test_alias(self):
a = alias(table4, 'remtable')
self.assert_compile(a.select(a.c.datatype_id==7),
diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py
index 56c5c6205..fb07bf437 100644
--- a/test/sql/test_constraints.py
+++ b/test/sql/test_constraints.py
@@ -36,10 +36,10 @@ class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
def test_double_fk_usage_raises(self):
f = ForeignKey('b.id')
-
+
Column('x', Integer, f)
assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
-
+
def test_circular_constraint(self):
a = Table("a", metadata,
Column('id', Integer, primary_key=True),
@@ -192,22 +192,22 @@ class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
('sometable', 'this_name_is_too_long', 'ix_sometable_t_09aa'),
('sometable', 'this_name_alsois_long', 'ix_sometable_t_3cf1'),
]:
-
+
t1 = Table(tname, MetaData(),
Column(cname, Integer, index=True),
)
ix1 = list(t1.indexes)[0]
-
+
self.assert_compile(
schema.CreateIndex(ix1),
"CREATE INDEX %s "
"ON %s (%s)" % (exp, tname, cname),
dialect=dialect
)
-
+
dialect.max_identifier_length = 22
dialect.max_index_name_length = None
-
+
t1 = Table('t', MetaData(), Column('c', Integer))
assert_raises(
exc.IdentifierError,
@@ -217,7 +217,7 @@ class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
dialect=dialect
)
-
+
class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
def _test_deferrable(self, constraint_factory):
@@ -225,11 +225,11 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
Column('a', Integer),
Column('b', Integer),
constraint_factory(deferrable=True))
-
+
sql = str(schema.CreateTable(t).compile(bind=testing.db))
assert 'DEFERRABLE' in sql, sql
assert 'NOT DEFERRABLE' not in sql, sql
-
+
t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer),
@@ -291,18 +291,18 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
CheckConstraint('a < b',
deferrable=True,
initially='DEFERRED')))
-
+
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) DEFERRABLE INITIALLY DEFERRED)"
)
-
+
def test_use_alter(self):
m = MetaData()
t = Table('t', m,
Column('a', Integer),
)
-
+
t2 = Table('t2', m,
Column('a', Integer, ForeignKey('t.a', use_alter=True, name='fk_ta')),
Column('b', Integer, ForeignKey('t.a', name='fk_tb')), # to ensure create ordering ...
@@ -320,25 +320,25 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
'DROP TABLE t2',
'DROP TABLE t'
])
-
-
+
+
def test_add_drop_constraint(self):
m = MetaData()
-
+
t = Table('tbl', m,
Column('a', Integer),
Column('b', Integer)
)
-
+
t2 = Table('t2', m,
Column('a', Integer),
Column('b', Integer)
)
-
+
constraint = CheckConstraint('a < b',name="my_test_constraint",
deferrable=True,initially='DEFERRED', table=t)
-
+
# before we create an AddConstraint,
# the CONSTRAINT comes out inline
self.assert_compile(
@@ -397,13 +397,13 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
schema.AddConstraint(constraint),
"ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)"
)
-
+
constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2")
self.assert_compile(
schema.AddConstraint(constraint),
"ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)"
)
-
+
assert t.c.a.primary_key is False
constraint = PrimaryKeyConstraint(t.c.a)
assert t.c.a.primary_key is True
@@ -411,5 +411,5 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
schema.AddConstraint(constraint),
"ALTER TABLE tbl ADD PRIMARY KEY (a)"
)
-
-
+
+
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 7ec43f8d2..31759a709 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -147,7 +147,7 @@ class DefaultTest(testing.TestBase):
assert_raises_message(sa.exc.ArgumentError,
ex_msg,
sa.ColumnDefault, fn)
-
+
def test_arg_signature(self):
def fn1(): pass
def fn2(): pass
@@ -277,7 +277,7 @@ class DefaultTest(testing.TestBase):
assert r.lastrow_has_defaults()
eq_(set(r.context.postfetch_cols),
set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
-
+
eq_(t.select(t.c.col1==54).execute().fetchall(),
[(54, 'imthedefault', f, ts, ts, ctexec, True, False,
12, today, None)])
@@ -301,7 +301,7 @@ class DefaultTest(testing.TestBase):
12, today, 'py'),
(53, 'imthedefault', f, ts, ts, ctexec, True, False,
12, today, 'py')])
-
+
def test_missing_many_param(self):
assert_raises_message(exc.InvalidRequestError,
"A value is required for bind parameter 'col7', in parameter group 1",
@@ -310,7 +310,7 @@ class DefaultTest(testing.TestBase):
{'col4':7, 'col8':19},
{'col4':7, 'col7':12, 'col8':19},
)
-
+
def test_insert_values(self):
t.insert(values={'col3':50}).execute()
l = t.select().execute()
@@ -366,7 +366,7 @@ class DefaultTest(testing.TestBase):
l = l.first()
eq_(55, l['col3'])
-
+
class PKDefaultTest(_base.TablesTest):
__requires__ = ('subqueries',)
@@ -379,14 +379,14 @@ class PKDefaultTest(_base.TablesTest):
Column('id', Integer, primary_key=True,
default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
Column('data', String(30)))
-
+
@testing.requires.returning
def test_with_implicit_returning(self):
self._test(True)
-
+
def test_regular(self):
self._test(False)
-
+
@testing.resolve_artifact_names
def _test(self, returning):
if not returning and not testing.db.dialect.implicit_returning:
@@ -442,7 +442,7 @@ class PKIncrementTest(_base.TablesTest):
ids.add(last)
eq_(ids, set([1,2,3,4]))
-
+
eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
[(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
@@ -477,7 +477,7 @@ class EmptyInsertTest(testing.TestBase):
t1 = Table('t1', metadata,
Column('is_true', Boolean, server_default=('1')))
metadata.create_all()
-
+
try:
result = t1.insert().execute()
eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
@@ -588,7 +588,7 @@ class SequenceTest(testing.TestBase, testing.AssertsCompiledSQL):
"DROP SEQUENCE foo_seq",
use_default_dialect=True,
)
-
+
@testing.fails_on('firebird', 'no FB support for start/increment')
@testing.requires.sequences
@@ -605,10 +605,10 @@ class SequenceTest(testing.TestBase, testing.AssertsCompiledSQL):
start = seq.start or 1
inc = seq.increment or 1
assert values == list(xrange(start, start + inc * 3, inc))
-
+
finally:
seq.drop(testing.db)
-
+
@testing.requires.sequences
def test_seq_nonpk(self):
"""test sequences fire off as defaults on non-pk columns"""
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index 0fb2ca5f7..13116a4bb 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -26,8 +26,8 @@ class CompileTest(TestBase, AssertsCompiledSQL):
self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect)
else:
self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect)
-
- # test generic function compile
+
+ # test generic function compile
class fake_func(GenericFunction):
__return_type__ = sqltypes.Integer
@@ -39,14 +39,14 @@ class CompileTest(TestBase, AssertsCompiledSQL):
"fake_func(%s)" %
bindtemplate % {'name':'param_1', 'position':1},
dialect=dialect)
-
+
def test_use_labels(self):
self.assert_compile(select([func.foo()], use_labels=True),
"SELECT foo() AS foo_1"
)
def test_underscores(self):
self.assert_compile(func.if_(), "if()")
-
+
def test_generic_now(self):
assert isinstance(func.now().type, sqltypes.DateTime)
@@ -69,10 +69,10 @@ class CompileTest(TestBase, AssertsCompiledSQL):
('random', oracle.dialect())
]:
self.assert_compile(func.random(), ret, dialect=dialect)
-
+
def test_namespacing_conflicts(self):
self.assert_compile(func.text('foo'), 'text(:text_1)')
-
+
def test_generic_count(self):
assert isinstance(func.count().type, sqltypes.Integer)
@@ -101,7 +101,7 @@ class CompileTest(TestBase, AssertsCompiledSQL):
assert True
def test_return_type_detection(self):
-
+
for fn in [func.coalesce, func.max, func.min, func.sum]:
for args, type_ in [
((datetime.date(2007, 10, 5),
@@ -113,7 +113,7 @@ class CompileTest(TestBase, AssertsCompiledSQL):
datetime.datetime(2005, 10, 15, 14, 45, 33)), sqltypes.DateTime)
]:
assert isinstance(fn(*args).type, type_), "%s / %s" % (fn(), type_)
-
+
assert isinstance(func.concat("foo", "bar").type, sqltypes.String)
@@ -193,7 +193,7 @@ class ExecuteTest(TestBase):
@engines.close_first
def tearDown(self):
pass
-
+
def test_standalone_execute(self):
x = testing.db.func.current_date().execute().scalar()
y = testing.db.func.current_date().select().execute().scalar()
@@ -208,10 +208,10 @@ class ExecuteTest(TestBase):
def test_conn_execute(self):
from sqlalchemy.sql.expression import FunctionElement
from sqlalchemy.ext.compiler import compiles
-
+
class myfunc(FunctionElement):
type = Date()
-
+
@compiles(myfunc)
def compile(elem, compiler, **kw):
return compiler.process(func.current_date())
@@ -229,17 +229,17 @@ class ExecuteTest(TestBase):
def test_exec_options(self):
f = func.foo()
eq_(f._execution_options, {})
-
+
f = f.execution_options(foo='bar')
eq_(f._execution_options, {'foo':'bar'})
s = f.select()
eq_(s._execution_options, {'foo':'bar'})
-
+
ret = testing.db.execute(func.now().execution_options(foo='bar'))
eq_(ret.context.execution_options, {'foo':'bar'})
ret.close()
-
-
+
+
@engines.close_first
def test_update(self):
"""
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index e129f69c4..627736370 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -27,7 +27,7 @@ class TraversalTest(TestBase, AssertsExecutionResults):
return other is self
__hash__ = ClauseElement.__hash__
-
+
def __eq__(self, other):
return other.expr == self.expr
@@ -100,7 +100,7 @@ class TraversalTest(TestBase, AssertsExecutionResults):
s2 = vis.traverse(struct)
assert struct == s2
assert not struct.is_other(s2)
-
+
def test_no_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
@@ -151,9 +151,9 @@ class TraversalTest(TestBase, AssertsExecutionResults):
class CustomObj(Column):
pass
-
+
assert CustomObj.__visit_name__ == Column.__visit_name__ == 'column'
-
+
foo, bar = CustomObj('foo', String), CustomObj('bar', String)
bin = foo == bar
s = set(ClauseVisitor().iterate(bin))
@@ -193,7 +193,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
f = sql_util.ClauseAdapter(a).traverse(f)
self.assert_compile(select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1")
-
+
def test_join(self):
clause = t1.join(t2, t1.c.col2==t2.c.col2)
c1 = str(clause)
@@ -206,20 +206,20 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
clause2 = Vis().traverse(clause)
assert c1 == str(clause)
assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
-
+
def test_aliased_column_adapt(self):
clause = t1.select()
-
+
aliased = t1.select().alias()
aliased2 = t1.alias()
adapter = sql_util.ColumnAdapter(aliased)
-
+
f = select([
adapter.columns[c]
for c in aliased2.c
]).select_from(aliased)
-
+
s = select([aliased2]).select_from(aliased)
eq_(str(s), str(f))
@@ -230,8 +230,8 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
str(select([func.count(aliased2.c.col1)]).select_from(aliased)),
str(f)
)
-
-
+
+
def test_text(self):
clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
c1 = str(clause)
@@ -288,7 +288,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
print str(s5)
assert str(s5) == s5_assert
assert str(s4) == s4_assert
-
+
def test_union(self):
u = union(t1.select(), t2.select())
u2 = CloningVisitor().traverse(u)
@@ -300,27 +300,27 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
u2 = CloningVisitor().traverse(u)
assert str(u) == str(u2)
assert [str(c) for c in u2.c] == cols
-
+
s1 = select([t1], t1.c.col1 == bindparam('id_param'))
s2 = select([t2])
u = union(s1, s2)
-
+
u2 = u.params(id_param=7)
u3 = u.params(id_param=10)
assert str(u) == str(u2) == str(u3)
assert u2.compile().params == {'id_param':7}
assert u3.compile().params == {'id_param':10}
-
+
def test_in(self):
expr = t1.c.col1.in_(['foo', 'bar'])
expr2 = CloningVisitor().traverse(expr)
assert str(expr) == str(expr2)
-
+
def test_adapt_union(self):
u = union(t1.select().where(t1.c.col1==4), t1.select().where(t1.c.col1==5)).alias()
-
+
assert sql_util.ClauseAdapter(u).traverse(t1) is u
-
+
def test_binds(self):
"""test that unique bindparams change their name upon clone() to prevent conflicts"""
@@ -340,17 +340,17 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1, "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
-
+
def test_extract(self):
s = select([extract('foo', t1.c.col1).label('col1')])
self.assert_compile(s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1")
-
+
s2 = CloningVisitor().traverse(s).alias()
s3 = select([s2.c.col1])
self.assert_compile(s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1")
self.assert_compile(s3, "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1) AS anon_1")
-
-
+
+
@testing.emits_warning('.*replaced by another column with the same key')
def test_alias(self):
subq = t2.select().alias('subq')
@@ -372,7 +372,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
s5 = CloningVisitor().traverse(s)
assert orig == str(s) == str(s5)
-
+
def test_correlated_select(self):
s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2)
class Vis(CloningVisitor):
@@ -380,32 +380,32 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
select.append_whereclause(t1.c.col2==7)
self.assert_compile(Vis().traverse(s), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1")
-
+
def test_this_thing(self):
s = select([t1]).where(t1.c.col1=='foo').alias()
s2 = select([s.c.col1])
-
+
self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1")
t1a = t1.alias()
s2 = sql_util.ClauseAdapter(t1a).traverse(s2)
self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1_1.col1 AS col1, table1_1.col2 AS col2, table1_1.col3 AS col3 FROM table1 AS table1_1 WHERE table1_1.col1 = :col1_1) AS anon_1")
-
+
def test_select_fromtwice(self):
t1a = t1.alias()
-
+
s = select([1], t1.c.col1==t1a.c.col1, from_obj=t1a).correlate(t1)
self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1")
-
+
s = CloningVisitor().traverse(s)
self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1")
-
+
s = select([t1]).where(t1.c.col1=='foo').alias()
-
+
s2 = select([1], t1.c.col1==s.c.col1, from_obj=s).correlate(t1)
self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1")
s2 = ReplacingCloningVisitor().traverse(s2)
self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1")
-
+
class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
@classmethod
def setup_class(cls):
@@ -446,7 +446,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
s = CloningVisitor().traverse(s)
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
-
+
s = select(['*']).where(t1.c.col1==t2.c.col1).as_scalar()
self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1")
vis = sql_util.ClauseAdapter(t1alias)
@@ -478,7 +478,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
j1 = addresses.join(ualias, addresses.c.user_id==ualias.c.id)
self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), "SELECT count(addresses.id) AS count_1 FROM addresses WHERE users_1.id = addresses.user_id")
-
+
def test_table_to_alias(self):
t1alias = t1.alias('t1alias')
@@ -543,7 +543,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
a = Table('a', m, Column('x', Integer), Column('y', Integer))
b = Table('b', m, Column('x', Integer), Column('y', Integer))
c = Table('c', m, Column('x', Integer), Column('y', Integer))
-
+
# force a recursion overflow, by linking a.c.x<->c.c.x, and
# asking for a nonexistent col. corresponding_column should prevent
# endless depth.
@@ -557,13 +557,13 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
c = Table('c', m, Column('x', Integer), Column('y', Integer))
alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias()
-
+
# two levels of indirection from c.x->b.x->a.x, requires recursive
# corresponding_column call
adapt = sql_util.ClauseAdapter(alias, equivalents= {b.c.x: set([ a.c.x]), c.c.x:set([b.c.x])})
assert adapt._corresponding_column(a.c.x, False) is alias.c.x
assert adapt._corresponding_column(c.c.x, False) is alias.c.x
-
+
def test_join_to_alias(self):
metadata = MetaData()
a = Table('a', metadata,
@@ -642,13 +642,13 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT :param_1 OFFSET :param_2) AS anon_1 "\
"LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1",
{'param_1':5, 'param_2':10})
-
+
def test_functions(self):
self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), "count(table1_1.col1)")
s = select([func.count(t1.c.col1)])
self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), "SELECT count(table1_1.col1) AS count_1 FROM table1 AS table1_1")
-
+
def test_recursive(self):
metadata = MetaData()
a = Table('a', metadata,
@@ -670,8 +670,8 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
u = union(
a.join(b).select().apply_labels(),
a.join(d).select().apply_labels()
- ).alias()
-
+ ).alias()
+
self.assert_compile(
sql_util.ClauseAdapter(u).traverse(select([c.c.bid]).where(c.c.bid==u.c.b_aid)),
"SELECT c.bid "\
@@ -687,16 +687,16 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL):
global table1, table2, table3, table4
def _table(name):
return table(name, column("col1"), column("col2"),column("col3"))
-
- table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")]
+
+ table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")]
def test_splice(self):
(t1, t2, t3, t4) = (table1, table2, table1.alias(), table2.alias())
-
+
j = t1.join(t2, t1.c.col1==t2.c.col1).join(t3, t2.c.col1==t3.c.col1).join(t4, t4.c.col1==t1.c.col1)
-
+
s = select([t1]).where(t1.c.col2<5).alias()
-
+
self.assert_compile(sql_util.splice_joins(s, j),
"(SELECT table1.col1 AS col1, table1.col2 AS col2, "\
"table1.col3 AS col3 FROM table1 WHERE table1.col2 < :col2_1) AS anon_1 "\
@@ -705,12 +705,12 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL):
def test_stop_on(self):
(t1, t2, t3) = (table1, table2, table3)
-
+
j1= t1.join(t2, t1.c.col1==t2.c.col1)
j2 = j1.join(t3, t2.c.col1==t3.c.col1)
-
+
s = select([t1]).select_from(j1).alias()
-
+
self.assert_compile(sql_util.splice_joins(s, j2),
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 JOIN table2 "\
"ON table1.col1 = table2.col1) AS anon_1 JOIN table2 ON anon_1.col1 = table2.col1 JOIN table3 "\
@@ -720,27 +720,27 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL):
self.assert_compile(sql_util.splice_joins(s, j2, j1),
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 "\
"JOIN table2 ON table1.col1 = table2.col1) AS anon_1 JOIN table3 ON table2.col1 = table3.col1")
-
+
def test_splice_2(self):
t2a = table2.alias()
t3a = table3.alias()
j1 = table1.join(t2a, table1.c.col1==t2a.c.col1).join(t3a, t2a.c.col2==t3a.c.col2)
-
+
t2b = table4.alias()
j2 = table1.join(t2b, table1.c.col3==t2b.c.col3)
-
+
self.assert_compile(sql_util.splice_joins(table1, j1),
"table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\
"JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2")
-
+
self.assert_compile(sql_util.splice_joins(table1, j2), "table1 JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3")
self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2),
"table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\
"JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2 "\
"JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3")
-
-
+
+
class SelectTest(TestBase, AssertsCompiledSQL):
"""tests the generative capability of Select"""
@@ -838,7 +838,7 @@ class SelectTest(TestBase, AssertsCompiledSQL):
assert s._execution_options == dict(foo='bar')
# s2 should have its execution_options based on s, though.
assert s2._execution_options == dict(foo='bar', bar='baz')
-
+
# this feature not available yet
def _NOTYET_test_execution_options_in_text(self):
s = text('select 42', execution_options=dict(foo='bar'))
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index 0f84c30a0..9160aa3ee 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -51,7 +51,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
assert_raises(exceptions.IdentifierError, m.drop_all)
assert_raises(exceptions.IdentifierError, t1.create)
assert_raises(exceptions.IdentifierError, t1.drop)
-
+
def test_result(self):
table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"})
table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"})
@@ -82,7 +82,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
(1, "data1"),
(2, "data2"),
], repr(result)
-
+
@testing.requires.offset
def go():
r = s.limit(2).offset(1).execute()
@@ -94,7 +94,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
(3, "data3"),
], repr(result)
go()
-
+
def test_table_alias_names(self):
if testing.against('oracle'):
self.assert_compile(
@@ -113,7 +113,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
self.assert_compile(
select([table1, ta]).select_from(table1.join(ta, table1.c.this_is_the_data_column==ta.c.this_is_the_data_column)).\
where(ta.c.this_is_the_data_column=='data3'),
-
+
"SELECT some_large_named_table.this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column, "
"table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM "
"some_large_named_table JOIN table_with_exactly_29_characs AS table_with_exactly_29_c_1 ON "
@@ -121,17 +121,17 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
"WHERE table_with_exactly_29_c_1.this_is_the_data_column = :this_is_the_data_column_1",
dialect=dialect
)
-
+
table2.insert().execute(
{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"},
{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"},
{"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"},
{"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"},
)
-
+
r = table2.alias().select().execute()
assert r.fetchall() == [(x, "data%d" % x) for x in range(1, 5)]
-
+
def test_colbinds(self):
table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"})
table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"})
@@ -201,5 +201,5 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
self.assert_compile(x, "SELECT _1.this_is_the_primarykey_column AS _1, _1.this_is_the_data_column AS _2 FROM "
"(SELECT some_large_named_table.this_is_the_primarykey_column AS _3, some_large_named_table.this_is_the_data_column AS _4 "
"FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :_1) AS _1", dialect=compile_dialect)
-
-
+
+
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index c5ef48154..75eac9ee8 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -32,7 +32,7 @@ class MetaDataTest(TestBase, ComparesTables):
t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
t3 = Table('t2', MetaData(), Column('x', Integer))
t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')
-
+
assert "t1" in metadata
assert "foo.t2" in metadata
assert "t2" not in metadata
@@ -41,7 +41,7 @@ class MetaDataTest(TestBase, ComparesTables):
assert t2 in metadata
assert t3 not in metadata
assert t4 not in metadata
-
+
def test_uninitialized_column_copy(self):
for col in [
Column('foo', String(), nullable=False),
@@ -76,24 +76,24 @@ class MetaDataTest(TestBase, ComparesTables):
cx = c1.copy()
t = Table('foo%d' % i, m, cx)
eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])
-
+
def test_schema_collection_add(self):
metadata = MetaData()
-
+
t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
t2 = Table('t2', metadata, Column('x', Integer), schema='bar')
t3 = Table('t3', metadata, Column('x', Integer))
-
+
eq_(metadata._schemas, set(['foo', 'bar']))
eq_(len(metadata.tables), 3)
-
+
def test_schema_collection_remove(self):
metadata = MetaData()
-
+
t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
t2 = Table('t2', metadata, Column('x', Integer), schema='bar')
t3 = Table('t3', metadata, Column('x', Integer), schema='bar')
-
+
metadata.remove(t3)
eq_(metadata._schemas, set(['foo', 'bar']))
eq_(len(metadata.tables), 2)
@@ -101,28 +101,28 @@ class MetaDataTest(TestBase, ComparesTables):
metadata.remove(t1)
eq_(metadata._schemas, set(['bar']))
eq_(len(metadata.tables), 1)
-
+
def test_schema_collection_remove_all(self):
metadata = MetaData()
-
+
t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
t2 = Table('t2', metadata, Column('x', Integer), schema='bar')
metadata.clear()
eq_(metadata._schemas, set())
eq_(len(metadata.tables), 0)
-
+
def test_metadata_tables_immutable(self):
metadata = MetaData()
-
+
t1 = Table('t1', metadata, Column('x', Integer))
assert 't1' in metadata.tables
-
+
assert_raises(
TypeError,
lambda: metadata.tables.pop('t1')
)
-
+
@testing.provide_metadata
def test_dupe_tables(self):
t1 = Table('table1', metadata,
@@ -143,28 +143,28 @@ class MetaDataTest(TestBase, ComparesTables):
"Table object.",
go
)
-
+
def test_fk_copy(self):
c1 = Column('foo', Integer)
c2 = Column('bar', Integer)
m = MetaData()
t1 = Table('t', m, c1, c2)
-
+
kw = dict(onupdate="X",
ondelete="Y", use_alter=True, name='f1',
deferrable="Z", initially="Q", link_to_name=True)
-
+
fk1 = ForeignKey(c1, **kw)
fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)
-
+
t1.append_constraint(fk2)
fk1c = fk1.copy()
fk2c = fk2.copy()
-
+
for k in kw:
eq_(getattr(fk1c, k), kw[k])
eq_(getattr(fk2c, k), kw[k])
-
+
def test_fk_construct(self):
c1 = Column('foo', Integer)
c2 = Column('bar', Integer)
@@ -172,7 +172,7 @@ class MetaDataTest(TestBase, ComparesTables):
t1 = Table('t', m, c1, c2)
fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
assert fk1 in t1.constraints
-
+
@testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
def test_to_metadata(self):
meta = MetaData()
@@ -264,7 +264,7 @@ class MetaDataTest(TestBase, ComparesTables):
assert not c.columns.contains_column(table.c.name)
finally:
meta.drop_all(testing.db)
-
+
def test_tometadata_with_schema(self):
meta = MetaData()
@@ -314,7 +314,7 @@ class MetaDataTest(TestBase, ComparesTables):
Column('data2', Integer),
)
Index('multi',table.c.data1,table.c.data2),
-
+
meta2 = MetaData()
table_c = table.tometadata(meta2)
@@ -322,7 +322,7 @@ class MetaDataTest(TestBase, ComparesTables):
return [i.name,i.unique] + \
sorted(i.kwargs.items()) + \
i.columns.keys()
-
+
eq_(
sorted([_get_key(i) for i in table.indexes]),
sorted([_get_key(i) for i in table_c.indexes])
@@ -330,7 +330,7 @@ class MetaDataTest(TestBase, ComparesTables):
@emits_warning("Table '.+' already exists within the given MetaData")
def test_tometadata_already_there(self):
-
+
meta1 = MetaData()
table1 = Table('mytable', meta1,
Column('myid', Integer, primary_key=True),
@@ -341,7 +341,7 @@ class MetaDataTest(TestBase, ComparesTables):
)
meta3 = MetaData()
-
+
table_c = table1.tometadata(meta2)
table_d = table2.tometadata(meta2)
@@ -384,7 +384,7 @@ class MetaDataTest(TestBase, ComparesTables):
c = Table('c', meta, Column('foo', Integer))
d = Table('d', meta, Column('foo', Integer))
e = Table('e', meta, Column('foo', Integer))
-
+
e.add_is_dependent_on(c)
a.add_is_dependent_on(b)
b.add_is_dependent_on(d)
@@ -394,7 +394,7 @@ class MetaDataTest(TestBase, ComparesTables):
meta.sorted_tables,
[d, b, a, c, e]
)
-
+
def test_tometadata_strip_schema(self):
meta = MetaData()
@@ -433,7 +433,7 @@ class TableTest(TestBase, AssertsCompiledSQL):
table1 = Table("temporary_table_1", MetaData(),
Column("col1", Integer),
prefixes = ["TEMPORARY"])
-
+
self.assert_compile(
schema.CreateTable(table1),
"CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)"
@@ -480,8 +480,8 @@ class TableTest(TestBase, AssertsCompiledSQL):
TypeError,
assign
)
-
-
+
+
class ColumnDefinitionTest(TestBase):
"""Test Column() construction."""
@@ -570,4 +570,3 @@ class ColumnOptionsTest(TestBase):
c.info['bar'] = 'zip'
assert c.info['bar'] == 'zip'
- \ No newline at end of file
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 085845bc6..6a9055887 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -23,7 +23,7 @@ class QueryTest(TestBase):
Column('address', String(30)),
test_needs_acid=True
)
-
+
users2 = Table('u2', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
@@ -47,7 +47,7 @@ class QueryTest(TestBase):
def test_insert_heterogeneous_params(self):
"""test that executemany parameters are asserted to match the parameter set of the first."""
-
+
assert_raises_message(exc.InvalidRequestError,
"A value is required for bind parameter 'user_name', in parameter group 2",
users.insert().execute,
@@ -87,10 +87,10 @@ class QueryTest(TestBase):
comp = ins.compile(engine, column_keys=list(values))
if not set(values).issuperset(c.key for c in table.primary_key):
assert comp.returning
-
+
result = engine.execute(table.insert(), **values)
ret = values.copy()
-
+
for col, id in zip(table.primary_key, result.inserted_primary_key):
ret[col.key] = id
@@ -104,7 +104,7 @@ class QueryTest(TestBase):
if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
assert testing.db.dialect.implicit_returning
-
+
if testing.db.dialect.implicit_returning:
test_engines = [
engines.testing_engine(options={'implicit_returning':False}),
@@ -112,7 +112,7 @@ class QueryTest(TestBase):
]
else:
test_engines = [testing.db]
-
+
for engine in test_engines:
metadata = MetaData()
for supported, table, values, assertvalues in [
@@ -208,14 +208,14 @@ class QueryTest(TestBase):
]
else:
test_engines = [testing.db]
-
+
for engine in test_engines:
-
+
r = engine.execute(users.insert(),
{'user_name':'jack'},
)
assert r.closed
-
+
def test_row_iteration(self):
users.insert().execute(
{'user_id':7, 'user_name':'jack'},
@@ -245,25 +245,25 @@ class QueryTest(TestBase):
@testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
-
+
This test should be modified to support [ticket:1068] when that ticket
is implemented. For now, you need to put the actual string in the
ORDER BY.
-
+
"""
users.insert().execute(
{'user_id':7, 'user_name':'jack'},
{'user_id':8, 'user_name':'ed'},
{'user_id':9, 'user_name':'fred'},
)
-
+
concat = ("test: " + users.c.user_name).label('thedata')
print select([concat]).order_by("thedata")
eq_(
select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
-
+
eq_(
select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
@@ -285,8 +285,8 @@ class QueryTest(TestBase):
[("test: ed",), ("test: fred",), ("test: jack",)]
)
go()
-
-
+
+
def test_row_comparison(self):
users.insert().execute(user_id = 7, user_name = 'jack')
rp = users.select().execute().first()
@@ -311,10 +311,10 @@ class QueryTest(TestBase):
for pickle in False, True:
for use_labels in False, True:
result = users.select(use_labels=use_labels).order_by(users.c.user_id).execute().fetchall()
-
+
if pickle:
result = util.pickle.loads(util.pickle.dumps(result))
-
+
eq_(
result,
[(7, "jack"), (8, "ed"), (9, "fred")]
@@ -325,27 +325,27 @@ class QueryTest(TestBase):
else:
eq_(result[0]['user_id'], 7)
eq_(result[0].keys(), ["user_id", "user_name"])
-
+
eq_(result[0][0], 7)
eq_(result[0][users.c.user_id], 7)
eq_(result[0][users.c.user_name], 'jack')
-
+
if use_labels:
assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id])
else:
# test with a different table. name resolution is
# causing 'user_id' to match when use_labels wasn't used.
eq_(result[0][addresses.c.user_id], 7)
-
+
assert_raises(exc.NoSuchColumnError, lambda: result[0]['fake key'])
assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.address_id])
-
+
@testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
-
+
eq_(testing.db.execute(select([and_(true, false)])).scalar(), False)
eq_(testing.db.execute(select([and_(true, true)])).scalar(), True)
eq_(testing.db.execute(select([or_(true, false)])).scalar(), True)
@@ -359,7 +359,7 @@ class QueryTest(TestBase):
row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == True
assert row.y == False
-
+
def test_fetchmany(self):
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'ed')
@@ -394,7 +394,7 @@ class QueryTest(TestBase):
), [(5,)]),
):
eq_(expr.execute().fetchall(), result)
-
+
@testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text")
@testing.fails_on("oracle", "neither % nor %% are accepted")
@testing.fails_on("informix", "neither % nor %% are accepted")
@@ -410,7 +410,7 @@ class QueryTest(TestBase):
(text("select 'hello % world'"), "hello % world")
):
eq_(testing.db.scalar(expr), result)
-
+
def test_ilike(self):
users.insert().execute(
{'user_id':1, 'user_name':'one'},
@@ -642,7 +642,7 @@ class QueryTest(TestBase):
self.assert_(r[0:1] == (1,))
self.assert_(r[1:] == (2, 'foo@bar.com'))
self.assert_(r[:-1] == (1, 2))
-
+
def test_column_accessor(self):
users.insert().execute(user_id=1, user_name='john')
users.insert().execute(user_id=2, user_name='jack')
@@ -651,11 +651,11 @@ class QueryTest(TestBase):
r = users.select(users.c.user_id==2).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+
r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+
# test a little sqlite weirdness - with the UNION,
# cols come back as "query_users.user_id" in cursor.description
r = text("select query_users.user_id, query_users.user_name from query_users "
@@ -682,14 +682,14 @@ class QueryTest(TestBase):
users.insert(),
{'user_id':1, 'user_name':'ed'}
)
-
+
eq_(r.lastrowid, 1)
-
-
+
+
def test_graceful_fetch_on_non_rows(self):
"""test that calling fetchone() etc. on a result that doesn't
return rows fails gracefully.
-
+
"""
# these proxies don't work with no cursor.description present.
@@ -709,7 +709,7 @@ class QueryTest(TestBase):
getattr(result, meth),
)
trans.rollback()
-
+
def test_no_inserted_pk_on_non_insert(self):
result = testing.db.execute("select * from query_users")
assert_raises_message(
@@ -717,7 +717,7 @@ class QueryTest(TestBase):
r"Statement is not an insert\(\) expression construct.",
getattr, result, 'inserted_primary_key'
)
-
+
@testing.requires.returning
def test_no_inserted_pk_on_returning(self):
result = testing.db.execute(users.insert().returning(users.c.user_id, users.c.user_name))
@@ -726,7 +726,7 @@ class QueryTest(TestBase):
r"Can't call inserted_primary_key when returning\(\) is used.",
getattr, result, 'inserted_primary_key'
)
-
+
def test_fetchone_til_end(self):
result = testing.db.execute("select * from query_users")
eq_(result.fetchone(), None)
@@ -738,17 +738,17 @@ class QueryTest(TestBase):
def test_result_case_sensitivity(self):
"""test name normalization for result sets."""
-
+
row = testing.db.execute(
select([
literal_column("1").label("case_insensitive"),
literal_column("2").label("CaseSensitive")
])
).first()
-
+
assert row.keys() == ["case_insensitive", "CaseSensitive"]
-
+
def test_row_as_args(self):
users.insert().execute(user_id=1, user_name='john')
r = users.select(users.c.user_id==1).execute().first()
@@ -761,12 +761,12 @@ class QueryTest(TestBase):
r = users.select().execute()
users2.insert().execute(list(r))
assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
-
+
users2.delete().execute()
r = users.select().execute()
users2.insert().execute(*list(r))
assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
-
+
def test_ambiguous_column(self):
users.insert().execute(user_id=1, user_name='john')
r = users.outerjoin(addresses).select().execute().first()
@@ -782,7 +782,7 @@ class QueryTest(TestBase):
"Ambiguous column name",
lambda: r['user_id']
)
-
+
result = users.outerjoin(addresses).select().execute()
result = base.BufferedColumnResultProxy(result.context)
r = result.first()
@@ -821,7 +821,7 @@ class QueryTest(TestBase):
users.insert().execute(user_id=1, user_name='foo')
r = users.select().execute().first()
eq_(len(r), 2)
-
+
r = testing.db.execute('select user_name, user_id from query_users').first()
eq_(len(r), 2)
r = testing.db.execute('select user_name from query_users').first()
@@ -924,10 +924,10 @@ class QueryTest(TestBase):
"uses sql-92 rules")
def test_bind_in(self):
"""test calling IN against a bind parameter.
-
+
this isn't allowed on several platforms since we
generate ? = ?.
-
+
"""
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')
@@ -940,7 +940,7 @@ class QueryTest(TestBase):
assert len(r) == 3
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
-
+
@testing.emits_warning('.*empty sequence.*')
@testing.fails_on('firebird', 'uses sql-92 bind rules')
def test_literal_in(self):
@@ -953,15 +953,15 @@ class QueryTest(TestBase):
s = users.select(not_(literal("john").in_([])))
r = s.execute().fetchall()
assert len(r) == 3
-
-
+
+
@testing.emits_warning('.*empty sequence.*')
@testing.requires.boolean_col_expressions
def test_in_filtering_advanced(self):
"""test the behavior of the in_() function when
comparing against an empty collection, specifically
that a proper boolean value is generated.
-
+
"""
users.insert().execute(user_id = 7, user_name = 'jack')
@@ -980,11 +980,11 @@ class QueryTest(TestBase):
class PercentSchemaNamesTest(TestBase):
"""tests using percent signs, spaces in table and column names.
-
+
Doesn't pass for mysql, postgresql, but this is really a
SQLAlchemy bug - we should be escaping out %% signs for this
operation the same way we do for text() and column labels.
-
+
"""
@classmethod
@@ -999,11 +999,11 @@ class PercentSchemaNamesTest(TestBase):
def teardown(self):
percent_table.delete().execute()
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
-
+
def test_single_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, 'spaces % more spaces':12},
@@ -1018,7 +1018,7 @@ class PercentSchemaNamesTest(TestBase):
{'percent%':11, 'spaces % more spaces':9},
)
self._assert_table()
-
+
@testing.crashes('mysql+mysqldb', 'MySQLdb handles executemany() inconsistently vs. execute()')
def test_executemany_roundtrip(self):
percent_table.insert().execute(
@@ -1030,7 +1030,7 @@ class PercentSchemaNamesTest(TestBase):
{'percent%':11, 'spaces % more spaces':9},
)
self._assert_table()
-
+
def _assert_table(self):
for table in (percent_table, percent_table.alias()):
eq_(
@@ -1073,9 +1073,9 @@ class PercentSchemaNamesTest(TestBase):
(11, 15)
]
)
-
-
-
+
+
+
class LimitTest(TestBase):
@classmethod
@@ -1106,7 +1106,7 @@ class LimitTest(TestBase):
addresses.insert().execute(address_id=6, user_id=6, address='addr5')
users.insert().execute(user_id=7, user_name='fido')
addresses.insert().execute(address_id=7, user_id=7, address='addr5')
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -1189,11 +1189,11 @@ class CompoundTest(TestBase):
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
])
-
+
@engines.close_first
def teardown(self):
pass
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -1274,13 +1274,13 @@ class CompoundTest(TestBase):
"""like test_union_all, but breaks the sub-union into
a subquery with an explicit column reference on the outside,
more palatable to a wider variety of engines.
-
+
"""
u = union(
select([t1.c.col3]),
select([t1.c.col3]),
).alias()
-
+
e = union_all(
select([t1.c.col3]),
select([u.c.col3])
@@ -1327,7 +1327,7 @@ class CompoundTest(TestBase):
def test_except_style2(self):
# same as style1, but add alias().select() to the except_().
# sqlite can handle it now.
-
+
e = except_(union(
select([t1.c.col3, t1.c.col4]),
select([t2.c.col3, t2.c.col4]),
@@ -1368,7 +1368,7 @@ class CompoundTest(TestBase):
select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc
).alias().select()
)
-
+
eq_(e.execute().fetchall(), [('ccc',)])
eq_(
e.alias().select().execute().fetchall(),
@@ -1409,7 +1409,7 @@ class CompoundTest(TestBase):
found = self._fetchall_sorted(u.execute())
eq_(found, wanted)
-
+
@testing.requires.intersect
def test_intersect_unions_3(self):
u = intersect(
@@ -1733,7 +1733,7 @@ class OperatorTest(TestBase):
@classmethod
def teardown_class(cls):
metadata.drop_all()
-
+
# TODO: seems like more tests warranted for this setup.
def test_modulo(self):
diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py
index e880388d7..2aa5086c6 100644
--- a/test/sql/test_quote.py
+++ b/test/sql/test_quote.py
@@ -96,7 +96,7 @@ class QuoteTest(TestBase, AssertsCompiledSQL):
'''SELECT 1 FROM (SELECT "foo"."t1"."col1" AS "col1" FROM '''\
'''"foo"."t1") AS anon WHERE anon."col1" = :col1_1'''
)
-
+
metadata = MetaData()
t1 = Table('TableOne', metadata,
Column('ColumnOne', Integer, quote=False), quote=False, schema="FooBar", quote_schema=False)
@@ -105,7 +105,7 @@ class QuoteTest(TestBase, AssertsCompiledSQL):
self.assert_compile(t1.select().apply_labels(),
"SELECT FooBar.TableOne.ColumnOne AS "\
"FooBar_TableOne_ColumnOne FROM FooBar.TableOne" # TODO: is this what we really want here ? what if table/schema
- # *are* quoted?
+ # *are* quoted?
)
a = t1.select().alias('anon')
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 632a739f1..2a8e6fc2f 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -10,10 +10,10 @@ class ReturningTest(TestBase, AssertsExecutionResults):
def setup(self):
meta = MetaData(testing.db)
global table, GoofyType
-
+
class GoofyType(TypeDecorator):
impl = String
-
+
def process_bind_param(self, value, dialect):
if value is None:
return None
@@ -23,7 +23,7 @@ class ReturningTest(TestBase, AssertsExecutionResults):
if value is None:
return None
return value + "BAR"
-
+
table = Table('tables', meta,
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('persons', Integer),
@@ -31,19 +31,19 @@ class ReturningTest(TestBase, AssertsExecutionResults):
Column('goofy', GoofyType(50))
)
table.create(checkfirst=True)
-
+
def teardown(self):
table.drop()
-
+
@testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
@testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
def test_column_targeting(self):
result = table.insert().returning(table.c.id, table.c.full).execute({'persons': 1, 'full': False})
-
+
row = result.first()
assert row[table.c.id] == row['id'] == 1
assert row[table.c.full] == row['full'] == False
-
+
result = table.insert().values(persons=5, full=True, goofy="somegoofy").\
returning(table.c.persons, table.c.full, table.c.goofy).execute()
row = result.first()
@@ -52,7 +52,7 @@ class ReturningTest(TestBase, AssertsExecutionResults):
eq_(row[table.c.goofy], row['goofy'])
eq_(row['goofy'], "FOOsomegoofyBAR")
-
+
@testing.fails_on('firebird', "fb can't handle returning x AS y")
@testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
@testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
@@ -76,7 +76,7 @@ class ReturningTest(TestBase, AssertsExecutionResults):
returning(table.c.persons + 18).execute()
row = result.first()
assert row[0] == 30
-
+
@testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
@testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
def test_update_returning(self):
@@ -115,8 +115,8 @@ class ReturningTest(TestBase, AssertsExecutionResults):
test_executemany()
-
-
+
+
@testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
@testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
@testing.fails_on_everything_except('postgresql', 'firebird')
@@ -164,7 +164,7 @@ class SequenceReturningTest(TestBase):
class KeyReturningTest(TestBase, AssertsExecutionResults):
"""test returning() works with columns that define 'key'."""
-
+
__unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access')
def setup(self):
@@ -186,8 +186,8 @@ class KeyReturningTest(TestBase, AssertsExecutionResults):
result = table.insert().returning(table.c.foo_id).execute(data='somedata')
row = result.first()
assert row[table.c.foo_id] == row['id'] == 1
-
+
result = table.select().execute().first()
assert row[table.c.foo_id] == row['id'] == 1
-
+
diff --git a/test/sql/test_rowcount.py b/test/sql/test_rowcount.py
index ed40f2801..fc74e8467 100644
--- a/test/sql/test_rowcount.py
+++ b/test/sql/test_rowcount.py
@@ -4,9 +4,9 @@ from test.lib import *
class FoundRowsTest(TestBase, AssertsExecutionResults):
"""tests rowcount functionality"""
-
+
__requires__ = ('sane_rowcount', )
-
+
@classmethod
def setup_class(cls):
global employees_table, metadata
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 77acf896b..243c56dcf 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -31,7 +31,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_indirect_correspondence_on_labels(self):
# this test depends upon 'distance' to
# get the right result
-
+
# same column three times
s = select([table1.c.col1.label('c2'), table1.c.col1,
@@ -48,7 +48,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_direct_correspondence_on_labels(self):
# this test depends on labels being part
# of the proxy set to get the right result
-
+
l1, l2 = table1.c.col1.label('foo'), table1.c.col1.label('bar')
sel = select([l1, l2])
@@ -85,20 +85,20 @@ class SelectableTest(TestBase, AssertsExecutionResults):
j2 = jjj.alias('foo')
assert j2.corresponding_column(table1.c.col1) \
is j2.c.table1_col1
-
+
def test_against_cloned_non_table(self):
# test that corresponding column digs across
# clone boundaries with anonymous labeled elements
col = func.count().label('foo')
sel = select([col])
-
+
sel2 = visitors.ReplacingCloningVisitor().traverse(sel)
assert sel2.corresponding_column(col) is sel2.c.foo
sel3 = visitors.ReplacingCloningVisitor().traverse(sel2)
assert sel3.corresponding_column(col) is sel3.c.foo
-
+
def test_select_on_table(self):
sel = select([table1, table2], use_labels=True)
@@ -153,22 +153,22 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_union_precedence(self):
# conflicting column correspondence should be resolved based on
# the order of the select()s in the union
-
+
s1 = select([table1.c.col1, table1.c.col2])
s2 = select([table1.c.col2, table1.c.col1])
s3 = select([table1.c.col3, table1.c.colx])
s4 = select([table1.c.colx, table1.c.col3])
-
+
u1 = union(s1, s2)
assert u1.corresponding_column(table1.c.col1) is u1.c.col1
assert u1.corresponding_column(table1.c.col2) is u1.c.col2
-
+
u1 = union(s1, s2, s3, s4)
assert u1.corresponding_column(table1.c.col1) is u1.c.col1
assert u1.corresponding_column(table1.c.col2) is u1.c.col2
assert u1.corresponding_column(table1.c.colx) is u1.c.col2
assert u1.corresponding_column(table1.c.col3) is u1.c.col1
-
+
def test_singular_union(self):
u = union(select([table1.c.col1, table1.c.col2,
@@ -254,13 +254,13 @@ class SelectableTest(TestBase, AssertsExecutionResults):
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
-
+
def test_labeled_select_correspoinding(self):
l1 = select([func.max(table1.c.col1)]).label('foo')
s = select([l1])
eq_(s.corresponding_column(l1), s.c.foo)
-
+
s = select([table1.c.col1, l1])
eq_(s.corresponding_column(l1), s.c.foo)
@@ -300,7 +300,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
s = select([t2, t3], use_labels=True)
assert_raises(exc.NoReferencedTableError, s.join, t1)
-
+
def test_join_condition(self):
m = MetaData()
@@ -326,7 +326,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
]:
assert expected.compare(sql_util.join_condition(left,
right, a_subset=a_subset))
-
+
# these are ambiguous, or have no joins
for left, right, a_subset in [
(t1t2, t3, None),
@@ -339,7 +339,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
sql_util.join_condition,
left, right, a_subset=a_subset
)
-
+
als = t2t3.alias()
# test join's behavior, including natural
for left, right, expected in [
@@ -370,7 +370,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
"Perhaps you meant to convert the right "
"side to a subquery using alias\(\)\?",
t1t2.join, t2t3.select(use_labels=True))
-
+
class PrimaryKeyTest(TestBase, AssertsExecutionResults):
@@ -488,7 +488,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
t3 = Table('t3', meta,
Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True),
Column('t3data', String(30)))
-
+
eq_(util.column_set(sql_util.reduce_columns([
t1.c.t1id,
t1.c.t1data,
@@ -498,7 +498,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
t3.c.t3data,
])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data,
t3.c.t3data]))
-
+
def test_reduce_selectable(self):
metadata = MetaData()
@@ -514,7 +514,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
util.column_set([s.c.engineer_id, s.c.engineer_name,
s.c.manager_id]))
-
+
def test_reduce_aliased_join(self):
metadata = MetaData()
@@ -543,7 +543,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id,
pjoin.c.engineers_person_id, pjoin.c.managers_person_id])),
util.column_set([pjoin.c.people_person_id]))
-
+
def test_reduce_aliased_union(self):
metadata = MetaData()
@@ -567,7 +567,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
item_join.c.dummy, item_join.c.child_name])),
util.column_set([item_join.c.id, item_join.c.dummy,
item_join.c.child_name]))
-
+
def test_reduce_aliased_union_2(self):
metadata = MetaData()
@@ -597,7 +597,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
select_from(
page_table.join(magazine_page_table).
join(classified_page_table)),
-
+
select([
page_table.c.id,
magazine_page_table.c.page_id,
@@ -622,7 +622,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
cast(null(), Integer).label('magazine_page_id')
]).
select_from(page_table.join(magazine_page_table)),
-
+
select([
page_table.c.id,
magazine_page_table.c.page_id,
@@ -684,7 +684,7 @@ class AnnotationsTest(TestBase):
def __init__(self):
Column.__init__(self, 'foo', Integer)
_constructor = Column
-
+
t1 = Table('t1', MetaData(), MyColumn())
s1 = t1.select()
assert isinstance(t1.c.foo, MyColumn)
@@ -695,14 +695,14 @@ class AnnotationsTest(TestBase):
assert isinstance(s2.c.foo, Column)
annot_2 = s1._annotate({})
assert isinstance(annot_2.c.foo, Column)
-
+
def test_annotated_corresponding_column(self):
table1 = table('table1', column("col1"))
-
+
s1 = select([table1.c.col1])
t1 = s1._annotate({})
t2 = s1
-
+
# t1 needs to share the same _make_proxy() columns as t2, even
# though it's annotated. otherwise paths will diverge once they
# are corresponded against "inner" below.
@@ -723,12 +723,12 @@ class AnnotationsTest(TestBase):
def test_annotated_visit(self):
table1 = table('table1', column("col1"), column("col2"))
-
+
bin = table1.c.col1 == bindparam('foo', value=None)
assert str(bin) == "table1.col1 = :foo"
def visit_binary(b):
b.right = table1.c.col2
-
+
b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
@@ -739,13 +739,13 @@ class AnnotationsTest(TestBase):
def visit_binary(b):
b.left = bindparam('bar')
-
+
b4 = visitors.cloned_traverse(b2, {}, {'binary':visit_binary})
assert str(b4) == ":bar = table1.col2"
b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
assert str(b5) == ":bar = table1.col2"
-
+
def test_annotate_expressions(self):
table1 = table('table1', column('col1'), column('col2'))
@@ -760,10 +760,10 @@ class AnnotationsTest(TestBase):
eq_(str(sql_util._deep_annotate(expr, {})), expected)
eq_(str(sql_util._deep_annotate(expr, {},
exclude=[table1.c.col1])), expected)
-
+
def test_deannotate(self):
table1 = table('table1', column("col1"), column("col2"))
-
+
bin = table1.c.col1 == bindparam('foo', value=None)
b2 = sql_util._deep_annotate(bin, {'_orm_adapt':True})
@@ -772,7 +772,7 @@ class AnnotationsTest(TestBase):
for elem in (b2._annotations, b2.left._annotations):
assert '_orm_adapt' in elem
-
+
for elem in b3._annotations, b3.left._annotations, \
b4._annotations, b4.left._annotations:
assert elem == {}
@@ -781,4 +781,4 @@ class AnnotationsTest(TestBase):
assert b3.left is not b2.left is not bin.left
assert b4.left is bin.left # since column is immutable
assert b4.right is not bin.right is not b2.right is not b3.right
-
+
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 700f7b7de..1665e35c8 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -22,11 +22,11 @@ class AdaptTest(TestBase):
for d in dialects.__all__
if not d.startswith('_')
]
-
+
def _all_dialects(self):
return [d.base.dialect() for d in
self._all_dialect_modules()]
-
+
def _all_types(self):
def types_for_mod(mod):
for key in dir(mod):
@@ -34,24 +34,24 @@ class AdaptTest(TestBase):
if not isinstance(typ, type) or not issubclass(typ, types.TypeEngine):
continue
yield typ
-
+
for typ in types_for_mod(types):
yield typ
for dialect in self._all_dialect_modules():
for typ in types_for_mod(dialect):
yield typ
-
+
def test_uppercase_rendering(self):
"""Test that uppercase types from types.py always render as their
type.
-
+
As of SQLA 0.6, using an uppercase type means you want specifically
that type. If the database in use doesn't support that DDL, it (the DB
backend) should raise an error - it means you should be using a
lowercased (genericized) type.
-
+
"""
-
+
for dialect in self._all_dialects():
for type_, expected in (
(FLOAT, "FLOAT"),
@@ -77,29 +77,29 @@ class AdaptTest(TestBase):
compiled = types.to_instance(type_).\
compile(dialect=dialect)
-
+
assert compiled in expected, \
"%r matches none of %r for dialect %s" % \
(compiled, expected, dialect.name)
-
+
assert str(types.to_instance(type_)) in expected, \
"default str() of type %r not expected, %r" % \
(type_, expected)
-
+
@testing.uses_deprecated()
def test_adapt_method(self):
"""ensure all types have a working adapt() method,
- which creates a distinct copy.
-
+ which creates a distinct copy.
+
The distinct copy ensures that when we cache
the adapted() form of a type against the original
in a weak key dictionary, a cycle is not formed.
-
+
This test doesn't test type-specific arguments of
adapt() beyond their defaults.
-
+
"""
-
+
for typ in self._all_types():
if typ in (types.TypeDecorator, types.TypeEngine):
continue
@@ -117,8 +117,8 @@ class AdaptTest(TestBase):
if k == 'impl':
continue
eq_(getattr(t2, k), t1.__dict__[k])
-
-
+
+
class TypeAffinityTest(TestBase):
def test_type_affinity(self):
for type_, affin in [
@@ -128,7 +128,7 @@ class TypeAffinityTest(TestBase):
(LargeBinary(), types._Binary)
]:
eq_(type_._type_affinity, affin)
-
+
for t1, t2, comp in [
(Integer(), SmallInteger(), True),
(Integer(), String(), False),
@@ -169,7 +169,7 @@ class PickleMetadataTest(TestBase):
Table('foo', meta, column_type)
ct = loads(dumps(column_type))
mt = loads(dumps(meta))
-
+
class UserDefinedTest(TestBase, AssertsCompiledSQL):
"""tests user-defined types."""
@@ -210,37 +210,37 @@ class UserDefinedTest(TestBase, AssertsCompiledSQL):
]:
for dialect_ in (dialects.postgresql, dialects.mssql, dialects.mysql):
dialect_ = dialect_.dialect()
-
+
raw_impl = types.to_instance(impl_, **kw)
-
+
class MyType(types.TypeDecorator):
impl = impl_
-
+
dec_type = MyType(**kw)
-
+
eq_(dec_type.impl.__class__, raw_impl.__class__)
-
+
raw_dialect_impl = raw_impl.dialect_impl(dialect_)
dec_dialect_impl = dec_type.dialect_impl(dialect_)
eq_(dec_dialect_impl.__class__, MyType)
eq_(raw_dialect_impl.__class__ , dec_dialect_impl.impl.__class__)
-
+
self.assert_compile(
MyType(**kw),
exp,
dialect=dialect_
)
-
+
def test_user_defined_typedec_impl(self):
class MyType(types.TypeDecorator):
impl = Float
-
+
def load_dialect_impl(self, dialect):
if dialect.name == 'sqlite':
return String(50)
else:
return super(MyType, self).load_dialect_impl(dialect)
-
+
sl = dialects.sqlite.dialect()
pg = dialects.postgresql.dialect()
t = MyType()
@@ -254,35 +254,35 @@ class UserDefinedTest(TestBase, AssertsCompiledSQL):
t.dialect_impl(dialect=pg).impl.__class__,
Float().dialect_impl(pg).__class__
)
-
+
@testing.provide_metadata
def test_type_coerce(self):
"""test ad-hoc usage of custom types with type_coerce()."""
-
+
class MyType(types.TypeDecorator):
impl = String
def process_bind_param(self, value, dialect):
return value[0:-8]
-
+
def process_result_value(self, value, dialect):
return value + "BIND_OUT"
-
+
t = Table('t', metadata, Column('data', String(50)))
metadata.create_all()
-
+
t.insert().values(data=type_coerce('d1BIND_OUT',MyType)).execute()
eq_(
select([type_coerce(t.c.data, MyType)]).execute().fetchall(),
[('d1BIND_OUT', )]
)
-
+
eq_(
select([t.c.data, type_coerce(t.c.data, MyType)]).execute().fetchall(),
[('d1', 'd1BIND_OUT')]
)
-
+
eq_(
select([t.c.data, type_coerce(t.c.data, MyType)]).\
where(type_coerce(t.c.data, MyType) == 'd1BIND_OUT').\
@@ -310,7 +310,7 @@ class UserDefinedTest(TestBase, AssertsCompiledSQL):
execute().fetchall(),
[]
)
-
+
@classmethod
def setup_class(cls):
global users, metadata
@@ -432,7 +432,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
Column('unicode_text', UnicodeText),
)
metadata.create_all()
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -440,26 +440,26 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
@engines.close_first
def teardown(self):
unicode_table.delete().execute()
-
+
def test_native_unicode(self):
"""assert expected values for 'native unicode' mode"""
-
+
if \
(testing.against('mssql+pyodbc') and not testing.db.dialect.freetds):
assert testing.db.dialect.returns_unicode_strings == 'conditional'
return
-
+
if testing.against('mssql+pymssql'):
assert testing.db.dialect.returns_unicode_strings == ('charset' in testing.db.url.query)
return
-
+
assert testing.db.dialect.returns_unicode_strings == \
((testing.db.name, testing.db.driver) in \
(
('postgresql','psycopg2'),
('postgresql','pypostgresql'),
('postgresql','pg8000'),
- ('postgresql','zxjdbc'),
+ ('postgresql','zxjdbc'),
('mysql','oursql'),
('mysql','zxjdbc'),
('mysql','mysqlconnector'),
@@ -471,14 +471,14 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
(testing.db.name,
testing.db.driver,
testing.db.dialect.returns_unicode_strings)
-
+
def test_round_trip(self):
unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, "\
u"quand une drôle de petit voix m’a réveillé. Elle "\
u"disait: « S’il vous plaît… dessine-moi un mouton! »"
-
+
unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
-
+
x = unicode_table.select().execute().first()
assert isinstance(x['unicode_varchar'], unicode)
assert isinstance(x['unicode_text'], unicode)
@@ -488,7 +488,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
def test_round_trip_executemany(self):
# cx_oracle was producing different behavior for cursor.executemany()
# vs. cursor.execute()
-
+
unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
u"une drôle de petit voix m’a réveillé. "\
u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
@@ -512,12 +512,12 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
-
+
x = union(
select([unicode_table.c.unicode_varchar]),
select([unicode_table.c.unicode_varchar])
).execute().first()
-
+
assert isinstance(x['unicode_varchar'], unicode)
eq_(x['unicode_varchar'], unicodedata)
@@ -529,13 +529,13 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
def test_unicode_warnings(self):
"""test the warnings raised when SQLA must coerce unicode binds,
*and* is using the Unicode type.
-
+
"""
unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
u"une drôle de petit voix m’a réveillé. "\
u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
-
+
# using Unicode explicly - warning should be emitted
u = Unicode()
uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
@@ -557,14 +557,14 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
assert_raises(exc.SAWarning, uni, 'x')
assert isinstance(uni(unicodedata), str)
# end Py2K
-
+
eq_(uni(unicodedata), unicodedata.encode('utf-8'))
-
+
# using convert unicode at engine level -
# this should not be raising a warning
unicode_engine = engines.utf8_engine(options={'convert_unicode':True,})
unicode_engine.dialect.supports_unicode_binds = False
-
+
s = String()
uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect)
# this is not the unicode type - no warning
@@ -575,36 +575,36 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
uni('x')
assert isinstance(uni(unicodedata), str)
# end Py2K
-
+
eq_(uni(unicodedata), unicodedata.encode('utf-8'))
-
+
@testing.fails_if(
lambda: testing.db_spec("postgresql+pg8000")(testing.db) and util.py3k,
"pg8000 appropriately does not accept 'bytes' for a VARCHAR column."
)
def test_ignoring_unicode_error(self):
"""checks String(unicode_error='ignore') is passed to underlying codec."""
-
+
unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
u"une drôle de petit voix m’a réveillé. "\
u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
-
+
asciidata = unicodedata.encode('ascii', 'ignore')
-
+
m = MetaData()
table = Table('unicode_err_table', m,
Column('sort', Integer),
Column('plain_varchar_no_coding_error', \
String(248, convert_unicode='force', unicode_error='ignore'))
)
-
+
m2 = MetaData()
utf8_table = Table('unicode_err_table', m2,
Column('sort', Integer),
Column('plain_varchar_no_coding_error', \
String(248, convert_unicode=True))
)
-
+
engine = engines.testing_engine(options={'encoding':'ascii'})
m.create_all(engine)
try:
@@ -619,7 +619,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
# switch to utf-8
engine.dialect.encoding = 'utf-8'
from binascii import hexlify
-
+
# the row that we put in was stored as hexlified ascii
row = engine.execute(utf8_table.select()).first()
x = row['plain_varchar_no_coding_error']
@@ -629,7 +629,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
a = hexlify(x)
b = hexlify(asciidata)
eq_(a, b)
-
+
# insert another row which will be stored with
# utf-8 only chars
engine.execute(
@@ -649,7 +649,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
ascii_row = result.fetchone()
utf8_row = result.fetchone()
result.close()
-
+
x = ascii_row['plain_varchar_no_coding_error']
# on python3 "x" comes back as string (i.e. unicode),
# hexlify requires bytes
@@ -671,7 +671,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
else:
a = hexlify(x)
eq_(a, b)
-
+
finally:
m.drop_all(engine)
@@ -692,11 +692,11 @@ class EnumTest(TestBase):
)
metadata.create_all()
-
+
def teardown(self):
enum_table.delete().execute()
non_native_enum_table.delete().execute()
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -713,7 +713,7 @@ class EnumTest(TestBase):
{'id':2, 'someenum':'two'},
{'id':3, 'someenum':'one'},
])
-
+
eq_(
enum_table.select().order_by(enum_table.c.id).execute().fetchall(),
[
@@ -739,7 +739,7 @@ class EnumTest(TestBase):
(3, 'one'),
]
)
-
+
def test_adapt(self):
from sqlalchemy.dialects.postgresql import ENUM
e1 = Enum('one','two','three', native_enum=False)
@@ -749,7 +749,7 @@ class EnumTest(TestBase):
e1 = Enum('one','two','three', name='foo', schema='bar')
eq_(e1.adapt(ENUM).name, 'foo')
eq_(e1.adapt(ENUM).schema, 'bar')
-
+
@testing.fails_on('mysql+mysqldb', "MySQL seems to issue a 'data truncated' warning.")
def test_constraint(self):
assert_raises(exc.DBAPIError,
@@ -764,7 +764,7 @@ class EnumTest(TestBase):
non_native_enum_table.insert().execute,
{'id':4, 'someenum':'four'}
)
-
+
class BinaryTest(TestBase, AssertsExecutionResults):
__excluded_on__ = (
('mysql', '<', (4, 1, 1)), # screwy varbinary types
@@ -786,7 +786,7 @@ class BinaryTest(TestBase, AssertsExecutionResults):
if value:
value.stuff = 'this is the right stuff'
return value
-
+
metadata = MetaData(testing.db)
binary_table = Table('binary_table', metadata,
Column('primary_id', Integer, primary_key=True, test_needs_autoincrement=True),
@@ -855,15 +855,15 @@ class BinaryTest(TestBase, AssertsExecutionResults):
'data, not really known how to make this work')
def test_comparison(self):
"""test that type coercion occurs on comparison for binary"""
-
+
expr = binary_table.c.data == 'foo'
assert isinstance(expr.right.type, LargeBinary)
-
+
data = os.urandom(32)
binary_table.insert().execute(data=data)
eq_(binary_table.select().where(binary_table.c.data==data).alias().count().scalar(), 1)
-
-
+
+
def load_stream(self, name):
f = os.path.join(os.path.dirname(__file__), "..", name)
return open(f, mode='rb').read()
@@ -886,16 +886,16 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
return process
def adapt_operator(self, op):
return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op)
-
+
class MyTypeDec(types.TypeDecorator):
impl = String
-
+
def process_bind_param(self, value, dialect):
return "BIND_IN" + str(value)
def process_result_value(self, value, dialect):
return value + "BIND_OUT"
-
+
meta = MetaData(testing.db)
test_table = Table('test', meta,
Column('id', Integer, primary_key=True),
@@ -951,14 +951,14 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
expr = test_table.c.bvalue == bindparam("somevalue")
eq_(expr.right.type._type_affinity, String)
-
+
eq_(
testing.db.execute(test_table.select().where(expr),
{"somevalue":"foo"}).fetchall(),
[(1, 'somedata',
datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')]
)
-
+
def test_literal_adapt(self):
# literals get typed based on the types dictionary, unless
# compatible with the left side type
@@ -974,8 +974,8 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
expr = column('foo', CHAR) == "asdf"
eq_(expr.right.type.__class__, CHAR)
-
-
+
+
@testing.fails_on('firebird', 'Data type unknown on the parameter')
def test_operator_adapt(self):
"""test type-based overloading of operators"""
@@ -996,7 +996,7 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert testing.db.execute(select([expr.label('foo')])).scalar() == 21
expr = test_table.c.avalue + literal(40, type_=MyCustomType)
-
+
# + operator converted to -
# value is calculated as: (250 - (40 * 10)) / 10 == -15
assert testing.db.execute(select([expr.label('foo')])).scalar() == -15
@@ -1007,10 +1007,10 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
def test_typedec_operator_adapt(self):
expr = test_table.c.bvalue + "hi"
-
+
assert expr.type.__class__ is MyTypeDec
assert expr.right.type.__class__ is MyTypeDec
-
+
eq_(
testing.db.execute(select([expr.label('foo')])).scalar(),
"BIND_INfooBIND_INhiBIND_OUT"
@@ -1019,7 +1019,7 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
def test_typedec_righthand_coercion(self):
class MyTypeDec(types.TypeDecorator):
impl = String
-
+
def process_bind_param(self, value, dialect):
return "BIND_IN" + str(value)
@@ -1028,34 +1028,34 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
tab = table('test', column('bvalue', MyTypeDec))
expr = tab.c.bvalue + 6
-
+
self.assert_compile(
expr,
"test.bvalue || :bvalue_1",
use_default_dialect=True
)
-
+
assert expr.type.__class__ is MyTypeDec
eq_(
testing.db.execute(select([expr.label('foo')])).scalar(),
"BIND_INfooBIND_IN6BIND_OUT"
)
-
-
+
+
def test_bind_typing(self):
from sqlalchemy.sql import column
-
+
class MyFoobarType(types.UserDefinedType):
pass
-
+
class Foo(object):
pass
-
+
# unknown type + integer, right hand bind
# is an Integer
expr = column("foo", MyFoobarType) + 5
assert expr.right.type._type_affinity is types.Integer
-
+
# untyped bind - it gets assigned MyFoobarType
expr = column("foo", MyFoobarType) + bindparam("foo")
assert expr.right.type._type_affinity is MyFoobarType
@@ -1067,30 +1067,30 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
# coerces to the left
expr = column("foo", MyFoobarType) + Foo()
assert expr.right.type._type_affinity is MyFoobarType
-
+
# including for non-commutative ops
expr = column("foo", MyFoobarType) - Foo()
assert expr.right.type._type_affinity is MyFoobarType
expr = column("foo", MyFoobarType) - datetime.date(2010, 8, 25)
assert expr.right.type._type_affinity is types.Date
-
+
def test_date_coercion(self):
from sqlalchemy.sql import column
-
+
expr = column('bar', types.NULLTYPE) - column('foo', types.TIMESTAMP)
eq_(expr.type._type_affinity, types.NullType)
-
+
expr = func.sysdate() - column('foo', types.TIMESTAMP)
eq_(expr.type._type_affinity, types.Interval)
expr = func.current_date() - column('foo', types.TIMESTAMP)
eq_(expr.type._type_affinity, types.Interval)
-
+
def test_numerics_coercion(self):
from sqlalchemy.sql import column
import operator
-
+
for op in (
operator.add,
operator.mul,
@@ -1114,15 +1114,15 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
str(column('a', types.NullType()) + column('b', types.NullType())),
"a + b"
)
-
+
def test_expression_typing(self):
expr = column('bar', Integer) - 3
-
+
eq_(expr.type._type_affinity, Integer)
expr = bindparam('bar') + bindparam('foo')
eq_(expr.type, types.NULLTYPE)
-
+
def test_distinct(self):
s = select([distinct(test_table.c.avalue)])
eq_(testing.db.execute(s).scalar(), 25)
@@ -1132,12 +1132,12 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert distinct(test_table.c.data).type == test_table.c.data.type
assert test_table.c.data.distinct().type == test_table.c.data.type
-
+
class CompileTest(TestBase, AssertsCompiledSQL):
def test_default_compile(self):
"""test that the base dialect of the type object is used
for default compilation.
-
+
"""
for type_, expected in (
(String(), "VARCHAR"),
@@ -1207,8 +1207,8 @@ class DateTest(TestBase, AssertsExecutionResults):
datetime.time(23, 59, 59, time_micro)),
(10, 'colber', None, None, None),
]
-
-
+
+
fnames = ['user_id', 'user_name', 'user_datetime',
'user_date', 'user_time']
@@ -1300,10 +1300,10 @@ class NumericTest(TestBase):
def setup(self):
global metadata
metadata = MetaData(testing.db)
-
+
def teardown(self):
metadata.drop_all()
-
+
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def _do_test(self, type_, input_, output, filter_ = None):
t = Table('t', metadata, Column('x', type_))
@@ -1318,7 +1318,7 @@ class NumericTest(TestBase):
#print result
#print output
eq_(result, output)
-
+
def test_numeric_as_decimal(self):
self._do_test(
Numeric(precision=8, scale=4),
@@ -1354,7 +1354,7 @@ class NumericTest(TestBase):
[15.7563],
filter_ = lambda n:n is not None and round(n, 5) or None
)
-
+
@testing.fails_on('mssql+pymssql', 'FIXME: improve pymssql dec handling')
def test_precision_decimal(self):
numbers = set([
@@ -1362,7 +1362,7 @@ class NumericTest(TestBase):
decimal.Decimal("0.004354"),
decimal.Decimal("900.0"),
])
-
+
self._do_test(
Numeric(precision=18, scale=12),
numbers,
@@ -1372,12 +1372,12 @@ class NumericTest(TestBase):
@testing.fails_on('mssql+pymssql', 'FIXME: improve pymssql dec handling')
def test_enotation_decimal(self):
"""test exceedingly small decimals.
-
+
Decimal reports values with E notation when the exponent
is greater than 6.
-
+
"""
-
+
numbers = set([
decimal.Decimal('1E-2'),
decimal.Decimal('1E-3'),
@@ -1397,7 +1397,7 @@ class NumericTest(TestBase):
numbers,
numbers
)
-
+
@testing.fails_on("sybase+pyodbc",
"Don't know how do get these values through FreeTDS + Sybase")
@testing.fails_on("firebird", "Precision must be from 1 to 18")
@@ -1417,7 +1417,7 @@ class NumericTest(TestBase):
numbers,
numbers
)
-
+
@testing.fails_on('sqlite', 'TODO')
@testing.fails_on('postgresql+pg8000', 'TODO')
@testing.fails_on("firebird", "Precision must be from 1 to 18")
@@ -1434,11 +1434,11 @@ class NumericTest(TestBase):
numbers,
numbers
)
-
+
class NumericRawSQLTest(TestBase):
"""Test what DBAPIs and dialects return without any typing
information supplied at the SQLA level.
-
+
"""
def _fixture(self, metadata, type, data):
t = Table('t', metadata,
@@ -1446,7 +1446,7 @@ class NumericRawSQLTest(TestBase):
)
metadata.create_all()
t.insert().execute(val=data)
-
+
@testing.fails_on('sqlite', "Doesn't provide Decimal results natively")
@testing.provide_metadata
def test_decimal_fp(self):
@@ -1475,16 +1475,16 @@ class NumericRawSQLTest(TestBase):
t = self._fixture(metadata, Float, 46.583)
val = testing.db.execute("select val from t").scalar()
assert isinstance(val, float)
-
+
# some DBAPIs have unusual float handling
if testing.against('oracle+cx_oracle', 'mysql+oursql'):
eq_(round_decimal(val, 3), 46.583)
else:
eq_(val, 46.583)
-
-
-
-
+
+
+
+
class IntervalTest(TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
@@ -1529,8 +1529,8 @@ class IntervalTest(TestBase, AssertsExecutionResults):
eq_(row['native_interval'], None)
eq_(row['native_interval_args'], None)
eq_(row['non_native_interval'], None)
-
-
+
+
class BooleanTest(TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
@@ -1542,14 +1542,14 @@ class BooleanTest(TestBase, AssertsExecutionResults):
Column('unconstrained_value', Boolean(create_constraint=False)),
)
bool_table.create()
-
+
@classmethod
def teardown_class(cls):
bool_table.drop()
-
+
def teardown(self):
bool_table.delete().execute()
-
+
def test_boolean(self):
bool_table.insert().execute(id=1, value=True)
bool_table.insert().execute(id=2, value=False)
@@ -1573,11 +1573,11 @@ class BooleanTest(TestBase, AssertsExecutionResults):
eq_(res3, [(1, True), (2, False),
(3, True), (4, True),
(5, True), (6, None)])
-
+
# ensure we're getting True/False, not just ints
assert res3[0][1] is True
assert res3[1][1] is False
-
+
@testing.fails_on('mysql',
"The CHECK clause is parsed but ignored by all storage engines.")
@testing.fails_on('mssql',
@@ -1592,11 +1592,11 @@ class BooleanTest(TestBase, AssertsExecutionResults):
def test_unconstrained(self):
testing.db.execute(
"insert into booltest (id, unconstrained_value) values (1, 5)")
-
+
class PickleTest(TestBase):
def test_eq_comparison(self):
p1 = PickleType()
-
+
for obj in (
{'1':'2'},
pickleable.Bar(5, 6),
@@ -1608,7 +1608,7 @@ class PickleTest(TestBase):
p1.compare_values,
pickleable.BrokenComparable('foo'),
pickleable.BrokenComparable('foo'))
-
+
def test_nonmutable_comparison(self):
p1 = PickleType()
@@ -1618,7 +1618,7 @@ class PickleTest(TestBase):
pickleable.OldSchool(10, 11)
):
assert p1.compare_values(p1.copy_value(obj), obj)
-
+
class CallableTest(TestBase):
@classmethod
def setup_class(cls):
diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py
index 2eda4ffa8..d6757caf1 100644
--- a/test/sql/test_unicode.py
+++ b/test/sql/test_unicode.py
@@ -122,13 +122,13 @@ class EscapesDefaultsTest(testing.TestBase):
try:
engine = metadata.bind
-
+
# reset the identifier preparer, so that we can force it to cache
# a unicode identifier
engine.dialect.identifier_preparer = engine.dialect.preparer(engine.dialect)
select([column(u'special_col')]).select_from(t1).execute().close()
assert isinstance(engine.dialect.identifier_preparer.format_sequence(Sequence('special_col')), unicode)
-
+
# now execute, run the sequence. it should run in u"Special_col.nextid" or similar as
# a unicode object; cx_oracle asserts that this is None or a String (postgresql lets it pass thru).
# ensure that executioncontext._exec_default() is encoding.