summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/case_statement.py4
-rw-r--r--test/sql/columns.py8
-rw-r--r--test/sql/constraints.py10
-rw-r--r--test/sql/defaults.py6
-rw-r--r--test/sql/functions.py2
-rw-r--r--test/sql/generative.py214
-rw-r--r--test/sql/query.py6
-rw-r--r--test/sql/quote.py27
-rw-r--r--test/sql/select.py36
-rwxr-xr-xtest/sql/selectable.py12
-rw-r--r--test/sql/testtypes.py91
11 files changed, 250 insertions, 166 deletions
diff --git a/test/sql/case_statement.py b/test/sql/case_statement.py
index 6aecefd3c..876f820b5 100644
--- a/test/sql/case_statement.py
+++ b/test/sql/case_statement.py
@@ -2,7 +2,7 @@ import testenv; testenv.configure_for_tests()
import sys
from sqlalchemy import *
from testlib import *
-from sqlalchemy import util, exceptions
+from sqlalchemy import util, exc
from sqlalchemy.sql import table, column
@@ -91,7 +91,7 @@ class CaseTest(TestBase, AssertsCompiledSQL):
def test_literal_interpretation(self):
t = table('test', column('col1'))
- self.assertRaises(exceptions.ArgumentError, case, [("x", "y")])
+ self.assertRaises(exc.ArgumentError, case, [("x", "y")])
self.assert_compile(case([("x", "y")], value=t.c.col1), "CASE test.col1 WHEN :param_1 THEN :param_2 END")
self.assert_compile(case([(t.c.col1==7, "y")], else_="z"), "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END")
diff --git a/test/sql/columns.py b/test/sql/columns.py
index 76bf9b389..661be891a 100644
--- a/test/sql/columns.py
+++ b/test/sql/columns.py
@@ -1,6 +1,6 @@
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
-from sqlalchemy import exceptions, sql
+from sqlalchemy import exc, sql
from testlib import *
from sqlalchemy import Table, Column # don't use testlib's wrappers
@@ -37,7 +37,7 @@ class ColumnDefinitionTest(TestBase):
def test_incomplete(self):
c = self.columns()
- self.assertRaises(exceptions.ArgumentError, Table, 't', MetaData(), *c)
+ self.assertRaises(exc.ArgumentError, Table, 't', MetaData(), *c)
def test_incomplete_key(self):
c = Column(Integer)
@@ -52,8 +52,8 @@ class ColumnDefinitionTest(TestBase):
def test_bogus(self):
- self.assertRaises(exceptions.ArgumentError, Column, 'foo', name='bar')
- self.assertRaises(exceptions.ArgumentError, Column, 'foo', Integer,
+ self.assertRaises(exc.ArgumentError, Column, 'foo', name='bar')
+ self.assertRaises(exc.ArgumentError, Column, 'foo', Integer,
type_=Integer())
if __name__ == "__main__":
diff --git a/test/sql/constraints.py b/test/sql/constraints.py
index 2908e07da..966930ca9 100644
--- a/test/sql/constraints.py
+++ b/test/sql/constraints.py
@@ -1,6 +1,6 @@
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
-from sqlalchemy import exceptions
+from sqlalchemy import exc
from testlib import *
from testlib import config, engines
@@ -72,14 +72,14 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
try:
foo.insert().execute(id=2,x=5,y=9)
assert False
- except exceptions.SQLError:
+ except exc.SQLError:
assert True
bar.insert().execute(id=1,x=10)
try:
bar.insert().execute(id=2,x=5)
assert False
- except exceptions.SQLError:
+ except exc.SQLError:
assert True
def test_unique_constraint(self):
@@ -100,12 +100,12 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
try:
foo.insert().execute(id=3, value='value1')
assert False
- except exceptions.SQLError:
+ except exc.SQLError:
assert True
try:
bar.insert().execute(id=3, value='a', value2='b')
assert False
- except exceptions.SQLError:
+ except exc.SQLError:
assert True
def test_index_create(self):
diff --git a/test/sql/defaults.py b/test/sql/defaults.py
index 22660c060..e9ed21a65 100644
--- a/test/sql/defaults.py
+++ b/test/sql/defaults.py
@@ -1,7 +1,7 @@
import testenv; testenv.configure_for_tests()
import datetime
from sqlalchemy import *
-from sqlalchemy import exceptions, schema, util
+from sqlalchemy import exc, schema, util
from sqlalchemy.orm import mapper, create_session
from testlib import *
@@ -122,7 +122,7 @@ class DefaultTest(TestBase):
try:
c = ColumnDefault(fn)
assert False, str(fn)
- except exceptions.ArgumentError, e:
+ except exc.ArgumentError, e:
assert str(e) == ex_msg
def test_argsignature(self):
@@ -327,7 +327,7 @@ class AutoIncrementTest(TestBase):
nonai_table.insert().execute(data='row 1')
nonai_table.insert().execute(data='row 2')
assert False
- except exceptions.SQLError, e:
+ except exc.SQLError, e:
print "Got exception", str(e)
assert True
diff --git a/test/sql/functions.py b/test/sql/functions.py
index d1ce17c72..82814ef1b 100644
--- a/test/sql/functions.py
+++ b/test/sql/functions.py
@@ -2,7 +2,7 @@ import testenv; testenv.configure_for_tests()
import datetime
from sqlalchemy import *
from sqlalchemy.sql import table, column
-from sqlalchemy import databases, exceptions, sql, util
+from sqlalchemy import databases, sql, util
from sqlalchemy.sql.compiler import BIND_TEMPLATES
from sqlalchemy.engine import default
from sqlalchemy import types as sqltypes
diff --git a/test/sql/generative.py b/test/sql/generative.py
index 820474282..cf5ea8235 100644
--- a/test/sql/generative.py
+++ b/test/sql/generative.py
@@ -1,7 +1,7 @@
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
from sqlalchemy.sql import table, column, ClauseElement
-from sqlalchemy.sql.expression import _clone
+from sqlalchemy.sql.expression import _clone, _from_objects
from testlib import *
from sqlalchemy.sql.visitors import *
from sqlalchemy import util
@@ -82,14 +82,14 @@ class TraversalTest(TestBase, AssertsExecutionResults):
def test_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_a(self, a):
pass
def visit_b(self, b):
pass
vis = Vis()
- s2 = vis.traverse(struct, clone=True)
+ s2 = vis.traverse(struct)
assert struct == s2
assert not struct.is_other(s2)
@@ -103,7 +103,7 @@ class TraversalTest(TestBase, AssertsExecutionResults):
pass
vis = Vis()
- s2 = vis.traverse(struct, clone=False)
+ s2 = vis.traverse(struct)
assert struct == s2
assert struct.is_other(s2)
@@ -112,7 +112,7 @@ class TraversalTest(TestBase, AssertsExecutionResults):
struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), A("expr2b")), A("expr3"))
struct3 = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3"))
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_a(self, a):
if a.expr == "expr2":
a.expr = "expr2modified"
@@ -120,12 +120,12 @@ class TraversalTest(TestBase, AssertsExecutionResults):
pass
vis = Vis()
- s2 = vis.traverse(struct, clone=True)
+ s2 = vis.traverse(struct)
assert struct != s2
assert not struct.is_other(s2)
assert struct2 == s2
- class Vis2(ClauseVisitor):
+ class Vis2(CloningVisitor):
def visit_a(self, a):
if a.expr == "expr2b":
a.expr = "expr2bmodified"
@@ -133,7 +133,7 @@ class TraversalTest(TestBase, AssertsExecutionResults):
pass
vis2 = Vis2()
- s3 = vis2.traverse(struct, clone=True)
+ s3 = vis2.traverse(struct)
assert struct != s3
assert struct3 == s3
@@ -156,7 +156,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
def test_binary(self):
clause = t1.c.col2 == t2.c.col2
- assert str(clause) == ClauseVisitor().traverse(clause, clone=True)
+ assert str(clause) == CloningVisitor().traverse(clause)
def test_binary_anon_label_quirk(self):
t = table('t1', column('col1'))
@@ -175,25 +175,25 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
def test_join(self):
clause = t1.join(t2, t1.c.col2==t2.c.col2)
c1 = str(clause)
- assert str(clause) == str(ClauseVisitor().traverse(clause, clone=True))
+ assert str(clause) == str(CloningVisitor().traverse(clause))
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_binary(self, binary):
binary.right = t2.c.col3
- clause2 = Vis().traverse(clause, clone=True)
+ clause2 = Vis().traverse(clause)
assert c1 == str(clause)
assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
def test_text(self):
clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
c1 = str(clause)
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_textclause(self, text):
text.text = text.text + " SOME MODIFIER=:lala"
text.bindparams['lala'] = bindparam('lala')
- clause2 = Vis().traverse(clause, clone=True)
+ clause2 = Vis().traverse(clause)
assert c1 == str(clause)
assert str(clause2) == c1 + " SOME MODIFIER=:lala"
assert clause.bindparams.keys() == ['bar']
@@ -203,24 +203,27 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
s2 = select([t1])
s2_assert = str(s2)
s3_assert = str(select([t1], t1.c.col2==7))
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_select(self, select):
select.append_whereclause(t1.c.col2==7)
- s3 = Vis().traverse(s2, clone=True)
+ s3 = Vis().traverse(s2)
assert str(s3) == s3_assert
assert str(s2) == s2_assert
print str(s2)
print str(s3)
+ class Vis(ClauseVisitor):
+ def visit_select(self, select):
+ select.append_whereclause(t1.c.col2==7)
Vis().traverse(s2)
assert str(s2) == s3_assert
print "------------------"
s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9)))
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_select(self, select):
select.append_whereclause(t1.c.col3==9)
- s4 = Vis().traverse(s3, clone=True)
+ s4 = Vis().traverse(s3)
print str(s3)
print str(s4)
assert str(s4) == s4_assert
@@ -228,12 +231,12 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
print "------------------"
s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9)))
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_binary(self, binary):
if binary.left is t1.c.col3:
binary.left = t1.c.col1
binary.right = bindparam("col1", unique=True)
- s5 = Vis().traverse(s4, clone=True)
+ s5 = Vis().traverse(s4)
print str(s4)
print str(s5)
assert str(s5) == s5_assert
@@ -241,13 +244,13 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
def test_union(self):
u = union(t1.select(), t2.select())
- u2 = ClauseVisitor().traverse(u, clone=True)
+ u2 = CloningVisitor().traverse(u)
assert str(u) == str(u2)
assert [str(c) for c in u2.c] == [str(c) for c in u.c]
u = union(t1.select(), t2.select())
cols = [str(c) for c in u.c]
- u2 = ClauseVisitor().traverse(u, clone=True)
+ u2 = CloningVisitor().traverse(u)
assert str(u) == str(u2)
assert [str(c) for c in u2.c] == cols
@@ -265,7 +268,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
"""test that unique bindparams change their name upon clone() to prevent conflicts"""
s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias()
- s2 = ClauseVisitor().traverse(s, clone=True).alias()
+ s2 = CloningVisitor().traverse(s).alias()
s3 = select([s], s.c.col2==s2.c.col2)
self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\
@@ -274,7 +277,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
"WHERE anon_1.col2 = anon_2.col2")
s = select([t1], t1.c.col1==4).alias()
- s2 = ClauseVisitor().traverse(s, clone=True).alias()
+ s2 = CloningVisitor().traverse(s).alias()
s3 = select([s], s.c.col2==s2.c.col2)
self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1, "\
@@ -286,26 +289,51 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
orig = str(s)
- s2 = ClauseVisitor().traverse(s, clone=True)
+ s2 = CloningVisitor().traverse(s)
assert orig == str(s) == str(s2)
- s4 = ClauseVisitor().traverse(s2, clone=True)
+ s4 = CloningVisitor().traverse(s2)
assert orig == str(s) == str(s2) == str(s4)
- s3 = sql_util.ClauseAdapter(table('foo')).traverse(s, clone=True)
+ s3 = sql_util.ClauseAdapter(table('foo')).traverse(s)
assert orig == str(s) == str(s3)
- s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3, clone=True)
+ s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3)
assert orig == str(s) == str(s3) == str(s4)
def test_correlated_select(self):
s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2)
- class Vis(ClauseVisitor):
+ class Vis(CloningVisitor):
def visit_select(self, select):
select.append_whereclause(t1.c.col2==7)
- self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1")
-
+ self.assert_compile(Vis().traverse(s), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1")
+
+ def test_this_thing(self):
+ s = select([t1]).where(t1.c.col1=='foo').alias()
+ s2 = select([s.c.col1])
+
+ self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1")
+ t1a = t1.alias()
+ s2 = sql_util.ClauseAdapter(t1a).traverse(s2)
+ self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1_1.col1 AS col1, table1_1.col2 AS col2, table1_1.col3 AS col3 FROM table1 AS table1_1 WHERE table1_1.col1 = :col1_1) AS anon_1")
+
+ def test_select_fromtwice(self):
+ t1a = t1.alias()
+
+ s = select([1], t1.c.col1==t1a.c.col1, from_obj=t1a).correlate(t1)
+ self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1")
+
+ s = CloningVisitor().traverse(s)
+ self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1")
+
+ s = select([t1]).where(t1.c.col1=='foo').alias()
+
+ s2 = select([1], t1.c.col1==s.c.col1, from_obj=s).correlate(t1)
+ self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1")
+ s2 = ReplacingCloningVisitor().traverse(s2)
+ self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1")
+
class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
def setUpAll(self):
global t1, t2
@@ -330,69 +358,88 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
assert t1alias in s._froms
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
- s = vis.traverse(s, clone=True)
+ s = vis.traverse(s)
+
assert t2alias not in s._froms # not present because it's been cloned
+
assert t1alias in s._froms # present because the adapter placed it there
+
# correlate list on "s" needs to take into account the full _cloned_set for each element in _froms when correlating
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
s = select(['*'], from_obj=[t1alias, t2alias]).correlate(t2alias).as_scalar()
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
- s = vis.traverse(s, clone=True)
+ s = vis.traverse(s)
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
- s = ClauseVisitor().traverse(s, clone=True)
+ s = CloningVisitor().traverse(s)
self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)")
s = select(['*']).where(t1.c.col1==t2.c.col1).as_scalar()
self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1")
vis = sql_util.ClauseAdapter(t1alias)
- s = vis.traverse(s, clone=True)
+ s = vis.traverse(s)
self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
- s = ClauseVisitor().traverse(s, clone=True)
+ s = CloningVisitor().traverse(s)
self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
s = select(['*']).where(t1.c.col1==t2.c.col1).correlate(t1).as_scalar()
self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1")
vis = sql_util.ClauseAdapter(t1alias)
- s = vis.traverse(s, clone=True)
+ s = vis.traverse(s)
self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
- s = ClauseVisitor().traverse(s, clone=True)
+ s = CloningVisitor().traverse(s)
self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias")
-
+
+ @testing.fails_on_everything_except()
+ def test_joins_dont_adapt(self):
+ # adapting to a join, i.e. ClauseAdapter(t1.join(t2)), doesn't make much sense.
+ # ClauseAdapter doesn't make any changes if it's against a straight join.
+ users = table('users', column('id'))
+ addresses = table('addresses', column('id'), column('user_id'))
+
+ ualias = users.alias()
+
+ s = select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users) #.as_scalar().label(None)
+ s= sql_util.ClauseAdapter(ualias).traverse(s)
+
+ j1 = addresses.join(ualias, addresses.c.user_id==ualias.c.id)
+
+ self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), "SELECT count(addresses.id) AS count_1 FROM addresses WHERE users_1.id = addresses.user_id")
def test_table_to_alias(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
- assert ff._get_from_objects() == [t1alias]
+ ff = vis.traverse(func.count(t1.c.col1).label('foo'))
+ assert list(_from_objects(ff)) == [t1alias]
- self.assert_compile(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2")
+ self.assert_compile(vis.traverse(select(['*'], from_obj=[t1])), "SELECT * FROM table1 AS t1alias")
+ self.assert_compile(select(['*'], t1.c.col1==t2.c.col2), "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2)), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2])), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1)), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2)), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2")
s = select(['*'], from_obj=[t1]).alias('foo')
self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo")
- self.assert_compile(vis.traverse(s.select(), clone=True), "SELECT foo.* FROM (SELECT * FROM table1 AS t1alias) AS foo")
+ self.assert_compile(vis.traverse(s.select()), "SELECT foo.* FROM (SELECT * FROM table1 AS t1alias) AS foo")
self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo")
- ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
- self.assert_compile(ff, "count(t1alias.col1) AS foo")
- assert ff._get_from_objects() == [t1alias]
+ ff = vis.traverse(func.count(t1.c.col1).label('foo'))
+ self.assert_compile(select([ff]), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
+ assert list(_from_objects(ff)) == [t1alias]
# TODO:
# self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2)), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2])), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1)), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
+ self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2)), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
def test_include_exclude(self):
m = MetaData()
@@ -517,6 +564,65 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
"WHERE c.bid = anon_1.b_aid"
)
+class SpliceJoinsTest(TestBase, AssertsCompiledSQL):
+ def setUpAll(self):
+ global table1, table2, table3, table4
+ def _table(name):
+ return table(name, column("col1"), column("col2"),column("col3"))
+
+ table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")]
+
+ def test_splice(self):
+ (t1, t2, t3, t4) = (table1, table2, table1.alias(), table2.alias())
+
+ j = t1.join(t2, t1.c.col1==t2.c.col1).join(t3, t2.c.col1==t3.c.col1).join(t4, t4.c.col1==t1.c.col1)
+
+ s = select([t1]).where(t1.c.col2<5).alias()
+
+ self.assert_compile(sql_util.splice_joins(s, j),
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, "\
+ "table1.col3 AS col3 FROM table1 WHERE table1.col2 < :col2_1) AS anon_1 "\
+ "JOIN table2 ON anon_1.col1 = table2.col1 JOIN table1 AS table1_1 ON table2.col1 = table1_1.col1 "\
+ "JOIN table2 AS table2_1 ON table2_1.col1 = anon_1.col1")
+
+ def test_stop_on(self):
+ (t1, t2, t3) = (table1, table2, table3)
+
+ j1= t1.join(t2, t1.c.col1==t2.c.col1)
+ j2 = j1.join(t3, t2.c.col1==t3.c.col1)
+
+ s = select([t1]).select_from(j1).alias()
+
+ self.assert_compile(sql_util.splice_joins(s, j2),
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 JOIN table2 "\
+ "ON table1.col1 = table2.col1) AS anon_1 JOIN table2 ON anon_1.col1 = table2.col1 JOIN table3 "\
+ "ON table2.col1 = table3.col1"
+ )
+
+ self.assert_compile(sql_util.splice_joins(s, j2, j1),
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 "\
+ "JOIN table2 ON table1.col1 = table2.col1) AS anon_1 JOIN table3 ON table2.col1 = table3.col1")
+
+ def test_splice_2(self):
+ t2a = table2.alias()
+ t3a = table3.alias()
+ j1 = table1.join(t2a, table1.c.col1==t2a.c.col1).join(t3a, t2a.c.col2==t3a.c.col2)
+
+ t2b = table4.alias()
+ j2 = table1.join(t2b, table1.c.col3==t2b.c.col3)
+
+ self.assert_compile(sql_util.splice_joins(table1, j1),
+ "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\
+ "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2")
+
+ self.assert_compile(sql_util.splice_joins(table1, j2), "table1 JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3")
+
+ self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2),
+ "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\
+ "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2 "\
+ "JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3")
+
+
class SelectTest(TestBase, AssertsCompiledSQL):
"""tests the generative capability of Select"""
diff --git a/test/sql/query.py b/test/sql/query.py
index e6d6714c2..a305a5314 100644
--- a/test/sql/query.py
+++ b/test/sql/query.py
@@ -1,7 +1,7 @@
import testenv; testenv.configure_for_tests()
import datetime
from sqlalchemy import *
-from sqlalchemy import exceptions, sql
+from sqlalchemy import exc, sql
from sqlalchemy.engine import default
from testlib import *
@@ -426,7 +426,7 @@ class QueryTest(TestBase):
try:
print r['user_id']
assert False
- except exceptions.InvalidRequestError, e:
+ except exc.InvalidRequestError, e:
assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
@@ -466,7 +466,7 @@ class QueryTest(TestBase):
def test_cant_execute_join(self):
try:
users.join(addresses).execute()
- except exceptions.ArgumentError, e:
+ except exc.ArgumentError, e:
assert str(e).startswith('Not an executable clause: ')
diff --git a/test/sql/quote.py b/test/sql/quote.py
index 825e836ff..d137b44a3 100644
--- a/test/sql/quote.py
+++ b/test/sql/quote.py
@@ -4,7 +4,7 @@ from sqlalchemy import sql
from sqlalchemy.sql import compiler
from testlib import *
-class QuoteTest(TestBase):
+class QuoteTest(TestBase, AssertsCompiledSQL):
def setUpAll(self):
# TODO: figure out which databases/which identifiers allow special characters to be used,
# such as: spaces, quote characters, punctuation characters, set up tests for those as
@@ -67,7 +67,23 @@ class QuoteTest(TestBase):
res2 = select([table2.c.d123, table2.c.u123, table2.c.MixedCase], use_labels=True).execute().fetchall()
print res2
assert(res2==[(1,2,3),(2,2,3),(4,3,2)])
+
+ def test_quote_flag(self):
+ metadata = MetaData()
+ t1 = Table('TableOne', metadata,
+ Column('ColumnOne', Integer), schema="FooBar")
+ self.assert_compile(t1.select(), '''SELECT "FooBar"."TableOne"."ColumnOne" FROM "FooBar"."TableOne"''')
+
+ metadata = MetaData()
+ t1 = Table('t1', metadata,
+ Column('col1', Integer, quote=True), quote=True, schema="foo", quote_schema=True)
+ self.assert_compile(t1.select(), '''SELECT "foo"."t1"."col1" FROM "foo"."t1"''')
+ metadata = MetaData()
+ t1 = Table('TableOne', metadata,
+ Column('ColumnOne', Integer, quote=False), quote=False, schema="FooBar", quote_schema=False)
+ self.assert_compile(t1.select(), '''SELECT FooBar.TableOne.ColumnOne FROM FooBar.TableOne''')
+
@testing.unsupported('oracle')
def testlabels(self):
"""test the quoting of labels.
@@ -86,16 +102,19 @@ class QuoteTest(TestBase):
table = Table("ImATable", metadata,
Column("col1", Integer))
x = select([table.c.col1.label("ImATable_col1")]).alias("SomeAlias")
- assert str(select([x.c.ImATable_col1])) == '''SELECT "SomeAlias"."ImATable_col1" \nFROM (SELECT "ImATable".col1 AS "ImATable_col1" \nFROM "ImATable") AS "SomeAlias"'''
+ self.assert_compile(select([x.c.ImATable_col1]),
+ '''SELECT "SomeAlias"."ImATable_col1" FROM (SELECT "ImATable".col1 AS "ImATable_col1" FROM "ImATable") AS "SomeAlias"''')
# note that 'foo' and 'FooCol' are literals already quoted
x = select([sql.literal_column("'foo'").label("somelabel")], from_obj=[table]).alias("AnAlias")
x = x.select()
- assert str(x) == '''SELECT "AnAlias".somelabel \nFROM (SELECT 'foo' AS somelabel \nFROM "ImATable") AS "AnAlias"'''
+ self.assert_compile(x,
+ '''SELECT "AnAlias".somelabel FROM (SELECT 'foo' AS somelabel FROM "ImATable") AS "AnAlias"''')
x = select([sql.literal_column("'FooCol'").label("SomeLabel")], from_obj=[table])
x = x.select()
- assert str(x) == '''SELECT "SomeLabel" \nFROM (SELECT 'FooCol' AS "SomeLabel" \nFROM "ImATable")'''
+ self.assert_compile(x,
+ '''SELECT "SomeLabel" FROM (SELECT 'FooCol' AS "SomeLabel" FROM "ImATable")''')
class PreparerTest(TestBase):
diff --git a/test/sql/select.py b/test/sql/select.py
index bea862112..3ecf63d34 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -1,7 +1,7 @@
import testenv; testenv.configure_for_tests()
import datetime, re, operator
from sqlalchemy import *
-from sqlalchemy import exceptions, sql, util
+from sqlalchemy import exc, sql, util
from sqlalchemy.sql import table, column, compiler
from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
from testlib import *
@@ -154,7 +154,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
t2 = table('t2', column('c'), column('d'))
s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar()
s2 =select([t, t2, s])
- self.assertRaises(exceptions.InvalidRequestError, str, s2)
+ self.assertRaises(exc.InvalidRequestError, str, s2)
# intentional again
s = s.correlate(t, t2)
@@ -245,14 +245,14 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
try:
s = select([table1.c.myid, table1.c.name]).as_scalar()
assert False
- except exceptions.InvalidRequestError, err:
+ except exc.InvalidRequestError, err:
assert str(err) == "Scalar select can only be created from a Select object that has exactly one column expression.", str(err)
try:
# generic function which will look at the type of expression
func.coalesce(select([table1.c.myid]))
assert False
- except exceptions.InvalidRequestError, err:
+ except exc.InvalidRequestError, err:
assert str(err) == "Select objects don't have a type. Call as_scalar() on this Select object to return a 'scalar' version of this Select.", str(err)
s = select([table1.c.myid], scalar=True, correlate=False)
@@ -278,12 +278,12 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
s = select([table1.c.myid]).as_scalar()
try:
s.c.foo
- except exceptions.InvalidRequestError, err:
+ except exc.InvalidRequestError, err:
assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.'
try:
s.columns.foo
- except exceptions.InvalidRequestError, err:
+ except exc.InvalidRequestError, err:
assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.'
zips = table('zips',
@@ -807,8 +807,8 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl
self.assert_compile(
select(
- [join(join(table1, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid == table3.c.userid)
- ]),
+ [join(join(table1, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid == table3.c.userid)]
+ ),
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
)
@@ -854,7 +854,7 @@ EXISTS (select yay from foo where boo = lar)",
def test_compound_selects(self):
try:
union(table3.select(), table1.select())
- except exceptions.ArgumentError, err:
+ except exc.ArgumentError, err:
assert str(err) == "All selectables passed to CompoundSelect must have identical numbers of columns; select #1 has 2 columns, select #2 has 3"
x = union(
@@ -1048,10 +1048,10 @@ UNION SELECT mytable.myid FROM mytable"
# check that conflicts with "unique" params are caught
s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('myid_1')))
- self.assertRaisesMessage(exceptions.CompileError, "conflicts with unique bind parameter of the same name", str, s)
+ self.assertRaisesMessage(exc.CompileError, "conflicts with unique bind parameter of the same name", str, s)
s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('myid_1')))
- self.assertRaisesMessage(exceptions.CompileError, "conflicts with unique bind parameter of the same name", str, s)
+ self.assertRaisesMessage(exc.CompileError, "conflicts with unique bind parameter of the same name", str, s)
@@ -1153,20 +1153,6 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
self.assert_compile(select([table1], table1.c.myid.in_([])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
- @testing.uses_deprecated('passing in_')
- def test_in_deprecated_api(self):
- self.assert_compile(select([table1], table1.c.myid.in_('abc')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)")
-
- self.assert_compile(select([table1], table1.c.myid.in_(1)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)")
-
- self.assert_compile(select([table1], table1.c.myid.in_(1,2)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2)")
-
- self.assert_compile(select([table1], table1.c.myid.in_()),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
-
def test_cast(self):
tbl = table('casttest',
column('id', Integer),
diff --git a/test/sql/selectable.py b/test/sql/selectable.py
index b29ba8d5c..66793a25b 100755
--- a/test/sql/selectable.py
+++ b/test/sql/selectable.py
@@ -6,7 +6,7 @@ import testenv; testenv.configure_for_tests()
from sqlalchemy import *
from testlib import *
from sqlalchemy.sql import util as sql_util
-from sqlalchemy import exceptions
+from sqlalchemy import exc
metadata = MetaData()
table = Table('table1', metadata,
@@ -164,7 +164,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
print str(j)
self.assert_(criterion.compare(j.onclause))
- def testcolumnlabels(self):
+ def test_column_labels(self):
a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])
print str(a)
print [c for c in a.columns]
@@ -173,13 +173,13 @@ class SelectableTest(TestBase, AssertsExecutionResults):
criterion = a.c.acol1 == table2.c.col2
print str(j)
self.assert_(criterion.compare(j.onclause))
-
+
def test_labeled_select_correspoinding(self):
l1 = select([func.max(table.c.col1)]).label('foo')
s = select([l1])
assert s.corresponding_column(l1).name == s.c.foo
-
+
s = select([table.c.col1, l1])
assert s.corresponding_column(l1).name == s.c.foo
@@ -193,7 +193,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
print str(j.onclause)
self.assert_(criterion.compare(j.onclause))
- def testtablejoinedtoselectoftable(self):
+ def test_table_joined_to_select_of_table(self):
metadata = MetaData()
a = Table('a', metadata,
Column('id', Integer, primary_key=True))
@@ -242,7 +242,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
s = select([t2, t3], use_labels=True)
- self.assertRaises(exceptions.NoReferencedTableError, s.join, t1)
+ self.assertRaises(exc.NoReferencedTableError, s.join, t1)
class PrimaryKeyTest(TestBase, AssertsExecutionResults):
def test_join_pk_collapse_implicit(self):
diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py
index 09a3702ee..9cd6f9bdb 100644
--- a/test/sql/testtypes.py
+++ b/test/sql/testtypes.py
@@ -1,7 +1,7 @@
import testenv; testenv.configure_for_tests()
import datetime, os, pickleable, re
from sqlalchemy import *
-from sqlalchemy import exceptions, types, util
+from sqlalchemy import exc, types, util
from sqlalchemy.sql import operators
import sqlalchemy.engine.url as url
from sqlalchemy.databases import mssql, oracle, mysql, postgres, firebird
@@ -40,17 +40,6 @@ class AdaptTest(TestBase):
assert isinstance(dialect_type, mssql.MSNVarchar)
assert dialect_type.get_col_spec() == 'NVARCHAR(10)'
- def testoracletext(self):
- dialect = oracle.OracleDialect()
- class MyDecoratedType(types.TypeDecorator):
- impl = String
- def copy(self):
- return MyDecoratedType()
-
- col = Column('', MyDecoratedType)
- dialect_type = col.type.dialect_impl(dialect)
- assert isinstance(dialect_type.impl, oracle.OracleText), repr(dialect_type.impl)
-
def testoracletimestamp(self):
dialect = oracle.OracleDialect()
@@ -77,29 +66,29 @@ class AdaptTest(TestBase):
firebird_dialect = firebird.FBDialect()
for dialect, start, test in [
- (oracle_dialect, String(), oracle.OracleText),
+ (oracle_dialect, String(), oracle.OracleString),
(oracle_dialect, VARCHAR(), oracle.OracleString),
(oracle_dialect, String(50), oracle.OracleString),
- (oracle_dialect, Unicode(), oracle.OracleText),
+ (oracle_dialect, Unicode(), oracle.OracleString),
(oracle_dialect, UnicodeText(), oracle.OracleText),
(oracle_dialect, NCHAR(), oracle.OracleString),
(oracle_dialect, oracle.OracleRaw(50), oracle.OracleRaw),
- (mysql_dialect, String(), mysql.MSText),
+ (mysql_dialect, String(), mysql.MSString),
(mysql_dialect, VARCHAR(), mysql.MSString),
(mysql_dialect, String(50), mysql.MSString),
- (mysql_dialect, Unicode(), mysql.MSText),
+ (mysql_dialect, Unicode(), mysql.MSString),
(mysql_dialect, UnicodeText(), mysql.MSText),
(mysql_dialect, NCHAR(), mysql.MSNChar),
- (postgres_dialect, String(), postgres.PGText),
+ (postgres_dialect, String(), postgres.PGString),
(postgres_dialect, VARCHAR(), postgres.PGString),
(postgres_dialect, String(50), postgres.PGString),
- (postgres_dialect, Unicode(), postgres.PGText),
+ (postgres_dialect, Unicode(), postgres.PGString),
(postgres_dialect, UnicodeText(), postgres.PGText),
(postgres_dialect, NCHAR(), postgres.PGString),
- (firebird_dialect, String(), firebird.FBText),
+ (firebird_dialect, String(), firebird.FBString),
(firebird_dialect, VARCHAR(), firebird.FBString),
(firebird_dialect, String(50), firebird.FBString),
- (firebird_dialect, Unicode(), firebird.FBText),
+ (firebird_dialect, Unicode(), firebird.FBString),
(firebird_dialect, UnicodeText(), firebird.FBText),
(firebird_dialect, NCHAR(), firebird.FBString),
]:
@@ -118,9 +107,9 @@ class UserDefinedTest(TestBase):
def testprocessing(self):
global users
- users.insert().execute(user_id = 2, goofy = 'jack', goofy2='jack', goofy3='jack', goofy4=u'jack', goofy5=u'jack', goofy6='jack', goofy7=u'jack', goofy8=12, goofy9=12)
- users.insert().execute(user_id = 3, goofy = 'lala', goofy2='lala', goofy3='lala', goofy4=u'lala', goofy5=u'lala', goofy6='lala', goofy7=u'lala', goofy8=15, goofy9=15)
- users.insert().execute(user_id = 4, goofy = 'fred', goofy2='fred', goofy3='fred', goofy4=u'fred', goofy5=u'fred', goofy6='fred', goofy7=u'fred', goofy8=9, goofy9=9)
+ users.insert().execute(user_id = 2, goofy = 'jack', goofy2='jack', goofy4=u'jack', goofy5=u'jack', goofy6='jack', goofy7=u'jack', goofy8=12, goofy9=12)
+ users.insert().execute(user_id = 3, goofy = 'lala', goofy2='lala', goofy4=u'lala', goofy5=u'lala', goofy6='lala', goofy7=u'lala', goofy8=15, goofy9=15)
+ users.insert().execute(user_id = 4, goofy = 'fred', goofy2='fred', goofy4=u'fred', goofy5=u'fred', goofy6='fred', goofy7=u'fred', goofy8=9, goofy9=9)
l = users.select().execute().fetchall()
for assertstr, assertint, assertint2, row in zip(
@@ -130,11 +119,11 @@ class UserDefinedTest(TestBase):
l
):
- for col in row[1:8]:
+ for col in row[1:7]:
self.assertEquals(col, assertstr)
- self.assertEquals(row[8], assertint)
- self.assertEquals(row[9], assertint2)
- for col in (row[4], row[5], row[7]):
+ self.assertEquals(row[7], assertint)
+ self.assertEquals(row[8], assertint2)
+ for col in (row[3], row[4], row[6]):
assert isinstance(col, unicode)
def setUpAll(self):
@@ -250,13 +239,10 @@ class UserDefinedTest(TestBase):
# decorated type with an argument, so its a String
Column('goofy2', MyDecoratedType(50), nullable = False),
- # decorated type without an argument, it will adapt_args to TEXT
- Column('goofy3', MyDecoratedType, nullable = False),
-
- Column('goofy4', MyUnicodeType, nullable = False),
- Column('goofy5', LegacyUnicodeType, nullable = False),
+ Column('goofy4', MyUnicodeType(50), nullable = False),
+ Column('goofy5', LegacyUnicodeType(50), nullable = False),
Column('goofy6', LegacyType, nullable = False),
- Column('goofy7', MyNewUnicodeType, nullable = False),
+ Column('goofy7', MyNewUnicodeType(50), nullable = False),
Column('goofy8', MyNewIntType, nullable = False),
Column('goofy9', MyNewIntSubClass, nullable = False),
@@ -344,7 +330,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
try:
unicode_table.insert().execute(unicode_varchar='not unicode')
assert False
- except exceptions.SAWarning, e:
+ except exc.SAWarning, e:
assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
@@ -353,7 +339,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
try:
unicode_engine.execute(unicode_table.insert(), plain_varchar='im not unicode')
assert False
- except exceptions.InvalidRequestError, e:
+ except exc.InvalidRequestError, e:
assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
@testing.emits_warning('.*non-unicode bind')
@@ -664,33 +650,20 @@ class DateTest(TestBase, AssertsExecutionResults):
t.drop(checkfirst=True)
class StringTest(TestBase, AssertsExecutionResults):
- def test_nolen_string_deprecated(self):
+
+
+ def test_nolength_string(self):
+ # this tests what happens with String DDL with no length.
+ # seems like we need to decide amongst "VARCHAR" (sqlite, postgres), "TEXT" (mysql)
+ # i.e. theres some inconsisency here.
+
metadata = MetaData(testing.db)
foo =Table('foo', metadata,
Column('one', String))
-
- # no warning
- select([func.count("*")], bind=testing.db).execute()
-
- try:
- # warning during CREATE
- foo.create()
- assert False
- except exceptions.SADeprecationWarning, e:
- assert "Using String type with no length" in str(e)
- assert re.search(r'\bone\b', str(e))
-
- bar = Table('bar', metadata, Column('one', String(40)))
-
- try:
- # no warning
- bar.create()
-
- # no warning for non-lengthed string
- select([func.count("*")], from_obj=bar).execute()
- finally:
- bar.drop()
-
+
+ foo.create()
+ foo.drop()
+
def _missing_decimal():
"""Python implementation supports decimals"""
try: