summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-01-05 13:00:21 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2019-01-05 17:23:48 -0500
commitea4feb3cec459f184c57ce8361fbdcea112e6667 (patch)
tree0b177dcaadbd9d29188c6a113eb54678382ba302
parent5ba7cb3e130ea68434047926d4f330319ed72af1 (diff)
downloadsqlalchemy-ea4feb3cec459f184c57ce8361fbdcea112e6667.tar.gz
flake8 refactor - test_core
A full rewrite of all imports and pep8 formatting using zimports, black, commits are broken into sections. Directories included in this commit: test/sql/ test/aaa_profiling/ Change-Id: I2bad1bd95dcf5cf76aad242280587e541e44fa9b (cherry picked from commit a4df4fc8bdf556c4153026ac0aa1b55750eaa8da)
-rw-r--r--test/aaa_profiling/test_zoomark_orm.py5
-rw-r--r--test/sql/test_compiler.py10
-rw-r--r--test/sql/test_functions.py99
-rw-r--r--test/sql/test_generative.py6
-rw-r--r--test/sql/test_insert_exec.py8
-rw-r--r--test/sql/test_metadata.py32
-rw-r--r--test/sql/test_resultset.py2
-rw-r--r--test/sql/test_returning.py2
-rw-r--r--test/sql/test_selectable.py718
-rw-r--r--test/sql/test_types.py15
-rw-r--r--test/sql/test_unicode.py5
11 files changed, 101 insertions, 801 deletions
diff --git a/test/aaa_profiling/test_zoomark_orm.py b/test/aaa_profiling/test_zoomark_orm.py
index 0c5525a06..0b1d05cc3 100644
--- a/test/aaa_profiling/test_zoomark_orm.py
+++ b/test/aaa_profiling/test_zoomark_orm.py
@@ -358,9 +358,8 @@ class ZooMarkTest(replay_fixture.ReplayFixtureTest):
list(
self.session.query(Zoo).filter(
and_(
- Zoo.Founded != None, # noqa
- Zoo.Founded < func.now(),
- )
+ Zoo.Founded != None, Zoo.Founded < func.now()
+ ) # noqa
)
)
)
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index d526d9eee..0e81cc30b 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -959,14 +959,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"zips", column("zipcode"), column("latitude"), column("longitude")
)
places = table("places", column("id"), column("nm"))
- zip = "12345"
+ zipcode = "12345"
qlat = (
- select([zips.c.latitude], zips.c.zipcode == zip)
+ select([zips.c.latitude], zips.c.zipcode == zipcode)
.correlate(None)
.as_scalar()
)
qlng = (
- select([zips.c.longitude], zips.c.zipcode == zip)
+ select([zips.c.longitude], zips.c.zipcode == zipcode)
.correlate(None)
.as_scalar()
)
@@ -978,7 +978,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
zips.c.zipcode,
func.latlondist(qlat, qlng).label("dist"),
],
- zips.c.zipcode == zip,
+ zips.c.zipcode == zipcode,
order_by=["dist", places.c.nm],
)
@@ -3370,7 +3370,7 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
pass
@compiles(MyType)
- def compile(element, compiler, **kw):
+ def compile_(element, compiler, **kw):
raise exc.CompileError("Couldn't compile type")
return MyType
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index faaa36f03..73c03bbd4 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -730,7 +730,7 @@ class ExecuteTest(fixtures.TestBase):
type = Date()
@compiles(myfunc)
- def compile(elem, compiler, **kw):
+ def compile_(elem, compiler, **kw):
return compiler.process(func.current_date())
conn = testing.db.connect()
@@ -757,6 +757,7 @@ class ExecuteTest(fixtures.TestBase):
ret.close()
@engines.close_first
+ @testing.provide_metadata
def test_update(self):
"""
Tests sending functions and SQL expressions to the VALUES and SET
@@ -789,56 +790,52 @@ class ExecuteTest(fixtures.TestBase):
Column("stuff", String(20), onupdate="thisisstuff"),
)
meta.create_all()
- try:
- t.insert(values=dict(value=func.length("one"))).execute()
- assert t.select().execute().first()["value"] == 3
- t.update(values=dict(value=func.length("asfda"))).execute()
- assert t.select().execute().first()["value"] == 5
-
- r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
- id = r.inserted_primary_key[0]
- assert t.select(t.c.id == id).execute().first()["value"] == 9
- t.update(values={t.c.value: func.length("asdf")}).execute()
- assert t.select().execute().first()["value"] == 4
- t2.insert().execute()
- t2.insert(values=dict(value=func.length("one"))).execute()
- t2.insert(values=dict(value=func.length("asfda") + -19)).execute(
- stuff="hi"
- )
-
- res = exec_sorted(select([t2.c.value, t2.c.stuff]))
- eq_(res, [(-14, "hi"), (3, None), (7, None)])
-
- t2.update(values=dict(value=func.length("asdsafasd"))).execute(
- stuff="some stuff"
- )
- assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [
- (9, "some stuff"),
- (9, "some stuff"),
- (9, "some stuff"),
- ]
-
- t2.delete().execute()
-
- t2.insert(values=dict(value=func.length("one") + 8)).execute()
- assert t2.select().execute().first()["value"] == 11
-
- t2.update(values=dict(value=func.length("asfda"))).execute()
- eq_(
- select([t2.c.value, t2.c.stuff]).execute().first(),
- (5, "thisisstuff"),
- )
-
- t2.update(
- values={
- t2.c.value: func.length("asfdaasdf"),
- t2.c.stuff: "foo",
- }
- ).execute()
- print("HI", select([t2.c.value, t2.c.stuff]).execute().first())
- eq_(select([t2.c.value, t2.c.stuff]).execute().first(), (9, "foo"))
- finally:
- meta.drop_all()
+ t.insert(values=dict(value=func.length("one"))).execute()
+ assert t.select().execute().first()["value"] == 3
+ t.update(values=dict(value=func.length("asfda"))).execute()
+ assert t.select().execute().first()["value"] == 5
+
+ r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
+ id_ = r.inserted_primary_key[0]
+ assert t.select(t.c.id == id_).execute().first()["value"] == 9
+ t.update(values={t.c.value: func.length("asdf")}).execute()
+ assert t.select().execute().first()["value"] == 4
+ t2.insert().execute()
+ t2.insert(values=dict(value=func.length("one"))).execute()
+ t2.insert(values=dict(value=func.length("asfda") + -19)).execute(
+ stuff="hi"
+ )
+
+ res = exec_sorted(select([t2.c.value, t2.c.stuff]))
+ eq_(res, [(-14, "hi"), (3, None), (7, None)])
+
+ t2.update(values=dict(value=func.length("asdsafasd"))).execute(
+ stuff="some stuff"
+ )
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [
+ (9, "some stuff"),
+ (9, "some stuff"),
+ (9, "some stuff"),
+ ]
+
+ t2.delete().execute()
+
+ t2.insert(values=dict(value=func.length("one") + 8)).execute()
+ assert t2.select().execute().first()["value"] == 11
+
+ t2.update(values=dict(value=func.length("asfda"))).execute()
+ eq_(
+ select([t2.c.value, t2.c.stuff]).execute().first(),
+ (5, "thisisstuff"),
+ )
+
+ t2.update(
+ values={
+ t2.c.value: func.length("asfdaasdf"),
+ t2.c.stuff: "foo",
+ }
+ ).execute()
+ eq_(select([t2.c.value, t2.c.stuff]).execute().first(), (9, "foo"))
@testing.fails_on_everything_except("postgresql")
def test_as_from(self):
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index 350438f25..6c6ddec05 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -226,9 +226,9 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
assert CustomObj.__visit_name__ == Column.__visit_name__ == "column"
foo, bar = CustomObj("foo", String), CustomObj("bar", String)
- bin = foo == bar
- set(ClauseVisitor().iterate(bin))
- assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin])
+ bin_ = foo == bar
+ set(ClauseVisitor().iterate(bin_))
+ assert set(ClauseVisitor().iterate(bin_)) == set([foo, bar, bin_])
class BinaryEndpointTraversalTest(fixtures.TestBase):
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index e6b989e88..fafe7cc9b 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -96,16 +96,16 @@ class InsertExecTest(fixtures.TablesTest):
result = engine.execute(table_.insert(), **values)
ret = values.copy()
- for col, id in zip(
+ for col, id_ in zip(
table_.primary_key, result.inserted_primary_key
):
- ret[col.key] = id
+ ret[col.key] = id_
if result.lastrow_has_defaults():
criterion = and_(
*[
- col == id
- for col, id in zip(
+ col == id_
+ for col, id_ in zip(
table_.primary_key, result.inserted_primary_key
)
]
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index 72896c945..9c60cc981 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -530,7 +530,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
(
name,
metadata,
- schema,
+ schema_,
quote_schema,
exp_schema,
exp_quote_schema,
@@ -558,8 +558,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
]
):
kw = {}
- if schema is not None:
- kw["schema"] = schema
+ if schema_ is not None:
+ kw["schema"] = schema_
if quote_schema is not None:
kw["quote_schema"] = quote_schema
t = Table(name, metadata, **kw)
@@ -3516,12 +3516,12 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
from sqlalchemy.sql import select
class MyColumn(Column):
- def _constructor(self, name, type, **kw):
+ def _constructor(self, name, type_, **kw):
kw["name"] = name
- return MyColumn(type, **kw)
+ return MyColumn(type_, **kw)
- def __init__(self, type, **kw):
- Column.__init__(self, type, **kw)
+ def __init__(self, type_, **kw):
+ Column.__init__(self, type_, **kw)
def my_goofy_thing(self):
return "hi"
@@ -3531,11 +3531,11 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
s = compiler.visit_column(element, **kw)
return s + "-"
- id = MyColumn(Integer, primary_key=True)
- id.name = "id"
+ id_ = MyColumn(Integer, primary_key=True)
+ id_.name = "id"
name = MyColumn(String)
name.name = "name"
- t1 = Table("foo", MetaData(), id, name)
+ t1 = Table("foo", MetaData(), id_, name)
# goofy thing
eq_(t1.c.name.my_goofy_thing(), "hi")
@@ -3559,14 +3559,14 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
from sqlalchemy.sql import select
class MyColumn(Column):
- def __init__(self, type, **kw):
- Column.__init__(self, type, **kw)
+ def __init__(self, type_, **kw):
+ Column.__init__(self, type_, **kw)
- id = MyColumn(Integer, primary_key=True)
- id.name = "id"
+ id_ = MyColumn(Integer, primary_key=True)
+ id_.name = "id"
name = MyColumn(String)
name.name = "name"
- t1 = Table("foo", MetaData(), id, name)
+ t1 = Table("foo", MetaData(), id_, name)
assert_raises_message(
TypeError,
"Could not create a copy of this <class "
@@ -3581,7 +3581,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
from sqlalchemy.ext.compiler import compiles, deregister
@compiles(schema.CreateColumn)
- def compile(element, compiler, **kw):
+ def compile_(element, compiler, **kw):
column = element.element
if "special" not in column.info:
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index d9ce83838..3bd61b1f8 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -3,8 +3,8 @@ import operator
from sqlalchemy import CHAR
from sqlalchemy import column
-from sqlalchemy import exc as sa_exc
from sqlalchemy import exc
+from sqlalchemy import exc as sa_exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import INT
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 38ba3a43f..9f77a6783 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -311,7 +311,7 @@ class ReturnDefaultsTest(fixtures.TablesTest):
pass
@compiles(IncDefault)
- def compile(element, compiler, **kw):
+ def compile_(element, compiler, **kw):
return str(next(counter))
Table(
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 00b8fea4a..578f242a7 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -1,12 +1,10 @@
"""Test various algorithmic properties of selectables."""
-from sqlalchemy import alias
from sqlalchemy import and_
from sqlalchemy import bindparam
from sqlalchemy import Boolean
from sqlalchemy import cast
from sqlalchemy import Column
-from sqlalchemy import column
from sqlalchemy import exc
from sqlalchemy import exists
from sqlalchemy import ForeignKey
@@ -23,20 +21,15 @@ from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import Table
-from sqlalchemy import table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import type_coerce
from sqlalchemy import TypeDecorator
from sqlalchemy import union
from sqlalchemy import util
-from sqlalchemy.schema import Column
-from sqlalchemy.schema import MetaData
-from sqlalchemy.schema import Table
from sqlalchemy.sql import column
from sqlalchemy.sql import elements
from sqlalchemy.sql import expression
-from sqlalchemy.sql import null
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import visitors
@@ -718,701 +711,22 @@ class SelectableTest(
def test_multi_label_chain_naming_col(self):
# See [ticket:2167] for this one.
- l1 = table1.c.col1.label("a")
- l2 = select([l1]).label("b")
+ l1 = table1.c.col1.label('a')
+ l2 = select([l1]).label('b')
s = select([l2])
assert s.c.b is not None
self.assert_compile(
s.select(),
- "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)",
+ "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)"
)
- s2 = select([s.label("c")])
+ s2 = select([s.label('c')])
self.assert_compile(
s2.select(),
"SELECT c FROM (SELECT (SELECT ("
- "SELECT table1.col1 AS a FROM table1) AS b) AS c)",
+ "SELECT table1.col1 AS a FROM table1) AS b) AS c)"
)
- def test_self_referential_select_raises(self):
- t = table("t", column("x"))
-
- s = select([t])
-
- s.append_whereclause(s.c.x > 5)
- assert_raises_message(
- exc.InvalidRequestError,
- r"select\(\) construct refers to itself as a FROM",
- s.compile,
- )
-
- def test_unusual_column_elements_text(self):
- """test that .c excludes text()."""
-
- s = select([table1.c.col1, text("foo")])
- eq_(list(s.c), [s.c.col1])
-
- def test_unusual_column_elements_clauselist(self):
- """Test that raw ClauseList is expanded into .c."""
-
- from sqlalchemy.sql.expression import ClauseList
-
- s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)])
- eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3])
-
- def test_unusual_column_elements_boolean_clauselist(self):
- """test that BooleanClauseList is placed as single element in .c."""
-
- c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4)
- s = select([table1.c.col1, c2])
- eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)])
-
- def test_from_list_deferred_constructor(self):
- c1 = Column("c1", Integer)
- c2 = Column("c2", Integer)
-
- s = select([c1])
-
- t = Table("t", MetaData(), c1, c2)
-
- eq_(c1._from_objects, [t])
- eq_(c2._from_objects, [t])
-
- self.assert_compile(select([c1]), "SELECT t.c1 FROM t")
- self.assert_compile(select([c2]), "SELECT t.c2 FROM t")
-
- def test_from_list_deferred_whereclause(self):
- c1 = Column("c1", Integer)
- c2 = Column("c2", Integer)
-
- s = select([c1]).where(c1 == 5)
-
- t = Table("t", MetaData(), c1, c2)
-
- eq_(c1._from_objects, [t])
- eq_(c2._from_objects, [t])
-
- self.assert_compile(select([c1]), "SELECT t.c1 FROM t")
- self.assert_compile(select([c2]), "SELECT t.c2 FROM t")
-
- def test_from_list_deferred_fromlist(self):
- m = MetaData()
- t1 = Table("t1", m, Column("x", Integer))
-
- c1 = Column("c1", Integer)
- s = select([c1]).where(c1 == 5).select_from(t1)
-
- t2 = Table("t2", MetaData(), c1)
-
- eq_(c1._from_objects, [t2])
-
- self.assert_compile(select([c1]), "SELECT t2.c1 FROM t2")
-
- def test_from_list_deferred_cloning(self):
- c1 = Column("c1", Integer)
- c2 = Column("c2", Integer)
-
- s = select([c1])
- s2 = select([c2])
- s3 = sql_util.ClauseAdapter(s).traverse(s2)
-
- Table("t", MetaData(), c1, c2)
-
- self.assert_compile(s3, "SELECT t.c2 FROM t")
-
- def test_from_list_with_columns(self):
- table1 = table("t1", column("a"))
- table2 = table("t2", column("b"))
- s1 = select([table1.c.a, table2.c.b])
- self.assert_compile(s1, "SELECT t1.a, t2.b FROM t1, t2")
- s2 = s1.with_only_columns([table2.c.b])
- self.assert_compile(s2, "SELECT t2.b FROM t2")
-
- s3 = sql_util.ClauseAdapter(table1).traverse(s1)
- self.assert_compile(s3, "SELECT t1.a, t2.b FROM t1, t2")
- s4 = s3.with_only_columns([table2.c.b])
- self.assert_compile(s4, "SELECT t2.b FROM t2")
-
- def test_from_list_warning_against_existing(self):
- c1 = Column("c1", Integer)
- s = select([c1])
-
- # force a compile.
- self.assert_compile(s, "SELECT c1")
-
- Table("t", MetaData(), c1)
-
- self.assert_compile(s, "SELECT t.c1 FROM t")
-
- def test_from_list_recovers_after_warning(self):
- c1 = Column("c1", Integer)
- c2 = Column("c2", Integer)
-
- s = select([c1])
-
- # force a compile.
- eq_(str(s), "SELECT c1")
-
- @testing.emits_warning()
- def go():
- return Table("t", MetaData(), c1, c2)
-
- t = go()
-
- eq_(c1._from_objects, [t])
- eq_(c2._from_objects, [t])
-
- # 's' has been baked. Can't afford
- # not caching select._froms.
- # hopefully the warning will clue the user
- self.assert_compile(s, "SELECT t.c1 FROM t")
- self.assert_compile(select([c1]), "SELECT t.c1 FROM t")
- self.assert_compile(select([c2]), "SELECT t.c2 FROM t")
-
- def test_label_gen_resets_on_table(self):
- c1 = Column("c1", Integer)
- eq_(c1._label, "c1")
- Table("t1", MetaData(), c1)
- eq_(c1._label, "t1_c1")
-
-
-class RefreshForNewColTest(fixtures.TestBase):
- def test_join_uninit(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- j = a.join(b, a.c.x == b.c.y)
-
- q = column("q")
- b.append_column(q)
- j._refresh_for_new_column(q)
- assert j.c.b_q is q
-
- def test_join_init(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- j = a.join(b, a.c.x == b.c.y)
- j.c
- q = column("q")
- b.append_column(q)
- j._refresh_for_new_column(q)
- assert j.c.b_q is q
-
- def test_join_samename_init(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- j = a.join(b, a.c.x == b.c.y)
- j.c
- q = column("x")
- b.append_column(q)
- j._refresh_for_new_column(q)
- assert j.c.b_x is q
-
- def test_select_samename_init(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- s = select([a, b]).apply_labels()
- s.c
- q = column("x")
- b.append_column(q)
- s._refresh_for_new_column(q)
- assert q in s.c.b_x.proxy_set
-
- def test_aliased_select_samename_uninit(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- s = select([a, b]).apply_labels().alias()
- q = column("x")
- b.append_column(q)
- s._refresh_for_new_column(q)
- assert q in s.c.b_x.proxy_set
-
- def test_aliased_select_samename_init(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- s = select([a, b]).apply_labels().alias()
- s.c
- q = column("x")
- b.append_column(q)
- s._refresh_for_new_column(q)
- assert q in s.c.b_x.proxy_set
-
- def test_aliased_select_irrelevant(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- c = table("c", column("z"))
- s = select([a, b]).apply_labels().alias()
- s.c
- q = column("x")
- c.append_column(q)
- s._refresh_for_new_column(q)
- assert "c_x" not in s.c
-
- def test_aliased_select_no_cols_clause(self):
- a = table("a", column("x"))
- s = select([a.c.x]).apply_labels().alias()
- s.c
- q = column("q")
- a.append_column(q)
- s._refresh_for_new_column(q)
- assert "a_q" not in s.c
-
- def test_union_uninit(self):
- a = table("a", column("x"))
- s1 = select([a])
- s2 = select([a])
- s3 = s1.union(s2)
- q = column("q")
- a.append_column(q)
- s3._refresh_for_new_column(q)
- assert a.c.q in s3.c.q.proxy_set
-
- def test_union_init_raises(self):
- a = table("a", column("x"))
- s1 = select([a])
- s2 = select([a])
- s3 = s1.union(s2)
- s3.c
- q = column("q")
- a.append_column(q)
- assert_raises_message(
- NotImplementedError,
- "CompoundSelect constructs don't support addition of "
- "columns to underlying selectables",
- s3._refresh_for_new_column,
- q,
- )
-
- def test_nested_join_uninit(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- c = table("c", column("z"))
- j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
-
- q = column("q")
- b.append_column(q)
- j._refresh_for_new_column(q)
- assert j.c.b_q is q
-
- def test_nested_join_init(self):
- a = table("a", column("x"))
- b = table("b", column("y"))
- c = table("c", column("z"))
- j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
-
- j.c
- q = column("q")
- b.append_column(q)
- j._refresh_for_new_column(q)
- assert j.c.b_q is q
-
- def test_fk_table(self):
- m = MetaData()
- fk = ForeignKey("x.id")
- Table("x", m, Column("id", Integer))
- a = Table("a", m, Column("x", Integer, fk))
- a.c
-
- q = Column("q", Integer)
- a.append_column(q)
- a._refresh_for_new_column(q)
- eq_(a.foreign_keys, set([fk]))
-
- fk2 = ForeignKey("g.id")
- p = Column("p", Integer, fk2)
- a.append_column(p)
- a._refresh_for_new_column(p)
- eq_(a.foreign_keys, set([fk, fk2]))
-
- def test_fk_join(self):
- m = MetaData()
- fk = ForeignKey("x.id")
- Table("x", m, Column("id", Integer))
- a = Table("a", m, Column("x", Integer, fk))
- b = Table("b", m, Column("y", Integer))
- j = a.join(b, a.c.x == b.c.y)
- j.c
-
- q = Column("q", Integer)
- b.append_column(q)
- j._refresh_for_new_column(q)
- eq_(j.foreign_keys, set([fk]))
-
- fk2 = ForeignKey("g.id")
- p = Column("p", Integer, fk2)
- b.append_column(p)
- j._refresh_for_new_column(p)
- eq_(j.foreign_keys, set([fk, fk2]))
-
-
-class AnonLabelTest(fixtures.TestBase):
-
- """Test behaviors fixed by [ticket:2168]."""
-
- def test_anon_labels_named_column(self):
- c1 = column("x")
-
- assert c1.label(None) is not c1
- eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
-
- def test_anon_labels_literal_column(self):
- c1 = literal_column("x")
- assert c1.label(None) is not c1
- eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
-
- def test_anon_labels_func(self):
- c1 = func.count("*")
- assert c1.label(None) is not c1
-
- eq_(str(select([c1])), "SELECT count(:count_2) AS count_1")
- c2 = select([c1]).compile()
-
- eq_(str(select([c1.label(None)])), "SELECT count(:count_2) AS count_1")
-
- def test_named_labels_named_column(self):
- c1 = column("x")
- eq_(str(select([c1.label("y")])), "SELECT x AS y")
-
- def test_named_labels_literal_column(self):
- c1 = literal_column("x")
- eq_(str(select([c1.label("y")])), "SELECT x AS y")
-
-
-class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = "default"
-
- def test_flat_ok_on_non_join(self):
- a = table("a", column("a"))
- s = a.select()
- self.assert_compile(
- s.alias(flat=True).select(),
- "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1",
- )
-
- def test_join_alias(self):
- a = table("a", column("a"))
- b = table("b", column("b"))
- self.assert_compile(
- a.join(b, a.c.a == b.c.b).alias(),
- "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b",
- )
-
- def test_join_standalone_alias(self):
- a = table("a", column("a"))
- b = table("b", column("b"))
- self.assert_compile(
- alias(a.join(b, a.c.a == b.c.b)),
- "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b",
- )
-
- def test_join_alias_flat(self):
- a = table("a", column("a"))
- b = table("b", column("b"))
- self.assert_compile(
- a.join(b, a.c.a == b.c.b).alias(flat=True),
- "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b",
- )
-
- def test_join_standalone_alias_flat(self):
- a = table("a", column("a"))
- b = table("b", column("b"))
- self.assert_compile(
- alias(a.join(b, a.c.a == b.c.b), flat=True),
- "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b",
- )
-
- def test_composed_join_alias_flat(self):
- a = table("a", column("a"))
- b = table("b", column("b"))
- c = table("c", column("c"))
- d = table("d", column("d"))
-
- j1 = a.join(b, a.c.a == b.c.b)
- j2 = c.join(d, c.c.c == d.c.d)
- self.assert_compile(
- j1.join(j2, b.c.b == c.c.c).alias(flat=True),
- "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b JOIN "
- "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c",
- )
-
- def test_composed_join_alias(self):
- a = table("a", column("a"))
- b = table("b", column("b"))
- c = table("c", column("c"))
- d = table("d", column("d"))
-
- j1 = a.join(b, a.c.a == b.c.b)
- j2 = c.join(d, c.c.c == d.c.d)
- self.assert_compile(
- select([j1.join(j2, b.c.b == c.c.c).alias()]),
- "SELECT anon_1.a_a, anon_1.b_b, anon_1.c_c, anon_1.d_d "
- "FROM (SELECT a.a AS a_a, b.b AS b_b, c.c AS c_c, d.d AS d_d "
- "FROM a JOIN b ON a.a = b.b "
- "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1",
- )
-
-
-class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = "default"
-
- def test_join_condition(self):
- m = MetaData()
- t1 = Table("t1", m, Column("id", Integer))
- t2 = Table(
- "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
- )
- t3 = Table(
- "t3",
- m,
- Column("id", Integer),
- Column("t1id", ForeignKey("t1.id")),
- Column("t2id", ForeignKey("t2.id")),
- )
- t4 = Table(
- "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id"))
- )
- t5 = Table(
- "t5",
- m,
- Column("t1id1", ForeignKey("t1.id")),
- Column("t1id2", ForeignKey("t1.id")),
- )
-
- t1t2 = t1.join(t2)
- t2t3 = t2.join(t3)
-
- for (left, right, a_subset, expected) in [
- (t1, t2, None, t1.c.id == t2.c.t1id),
- (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id),
- (t2t3, t1, t3, t1.c.id == t3.c.t1id),
- (t2t3, t4, None, t2t3.c.t2_id == t4.c.t2id),
- (t2t3, t4, t3, t2t3.c.t2_id == t4.c.t2id),
- (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id),
- (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id),
- (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id),
- ]:
- assert expected.compare(
- sql_util.join_condition(left, right, a_subset=a_subset)
- )
-
- # these are ambiguous, or have no joins
- for left, right, a_subset in [
- (t1t2, t3, None),
- (t2t3, t1, None),
- (t1, t4, None),
- (t1t2, t2t3, None),
- (t5, t1, None),
- (t5.select(use_labels=True), t1, None),
- ]:
- assert_raises(
- exc.ArgumentError,
- sql_util.join_condition,
- left,
- right,
- a_subset=a_subset,
- )
-
- als = t2t3.alias()
- # test join's behavior, including natural
- for left, right, expected in [
- (t1, t2, t1.c.id == t2.c.t1id),
- (t1t2, t3, t1t2.c.t2_id == t3.c.t2id),
- (t2t3, t1, t1.c.id == t3.c.t1id),
- (t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
- (t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
- (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
- (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
- (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id),
- ]:
- assert expected.compare(left.join(right).onclause)
-
- # these are right-nested joins
- j = t1t2.join(t2t3)
- assert j.onclause.compare(t2.c.id == t3.c.t2id)
- self.assert_compile(
- j,
- "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
- "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id",
- )
-
- st2t3 = t2t3.select(use_labels=True)
- j = t1t2.join(st2t3)
- assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id)
- self.assert_compile(
- j,
- "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
- "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, "
- "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id "
- "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id",
- )
-
- def test_join_multiple_equiv_fks(self):
- m = MetaData()
- t1 = Table("t1", m, Column("id", Integer, primary_key=True))
- t2 = Table(
- "t2",
- m,
- Column("t1id", Integer, ForeignKey("t1.id"), ForeignKey("t1.id")),
- )
-
- assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id)
-
- def test_join_cond_no_such_unrelated_table(self):
- m = MetaData()
- # bounding the "good" column with two "bad" ones is so to
- # try to get coverage to get the "continue" statements
- # in the loop...
- t1 = Table(
- "t1",
- m,
- Column("y", Integer, ForeignKey("t22.id")),
- Column("x", Integer, ForeignKey("t2.id")),
- Column("q", Integer, ForeignKey("t22.id")),
- )
- t2 = Table("t2", m, Column("id", Integer))
- assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
- assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
-
- def test_join_cond_no_such_unrelated_column(self):
- m = MetaData()
- t1 = Table(
- "t1",
- m,
- Column("x", Integer, ForeignKey("t2.id")),
- Column("y", Integer, ForeignKey("t3.q")),
- )
- t2 = Table("t2", m, Column("id", Integer))
- Table("t3", m, Column("id", Integer))
- assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
- assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
-
- def test_join_cond_no_such_related_table(self):
- m1 = MetaData()
- m2 = MetaData()
- t1 = Table("t1", m1, Column("x", Integer, ForeignKey("t2.id")))
- t2 = Table("t2", m2, Column("id", Integer))
- assert_raises_message(
- exc.NoReferencedTableError,
- "Foreign key associated with column 't1.x' could not find "
- "table 't2' with which to generate a foreign key to "
- "target column 'id'",
- sql_util.join_condition,
- t1,
- t2,
- )
-
- assert_raises_message(
- exc.NoReferencedTableError,
- "Foreign key associated with column 't1.x' could not find "
- "table 't2' with which to generate a foreign key to "
- "target column 'id'",
- sql_util.join_condition,
- t2,
- t1,
- )
-
- def test_join_cond_no_such_related_column(self):
- m = MetaData()
- t1 = Table("t1", m, Column("x", Integer, ForeignKey("t2.q")))
- t2 = Table("t2", m, Column("id", Integer))
- assert_raises_message(
- exc.NoReferencedColumnError,
- "Could not initialize target column for "
- "ForeignKey 't2.q' on table 't1': "
- "table 't2' has no column named 'q'",
- sql_util.join_condition,
- t1,
- t2,
- )
-
- assert_raises_message(
- exc.NoReferencedColumnError,
- "Could not initialize target column for "
- "ForeignKey 't2.q' on table 't1': "
- "table 't2' has no column named 'q'",
- sql_util.join_condition,
- t2,
- t1,
- )
-
-
-class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
- def test_join_pk_collapse_implicit(self):
- """test that redundant columns in a join get 'collapsed' into a
- minimal primary key, which is the root column along a chain of
- foreign key relationships."""
-
- meta = MetaData()
- a = Table("a", meta, Column("id", Integer, primary_key=True))
- b = Table(
- "b",
- meta,
- Column("id", Integer, ForeignKey("a.id"), primary_key=True),
- )
- c = Table(
- "c",
- meta,
- Column("id", Integer, ForeignKey("b.id"), primary_key=True),
- )
- d = Table(
- "d",
- meta,
- Column("id", Integer, ForeignKey("c.id"), primary_key=True),
- )
- assert c.c.id.references(b.c.id)
- assert not d.c.id.references(a.c.id)
- assert list(a.join(b).primary_key) == [a.c.id]
- assert list(b.join(c).primary_key) == [b.c.id]
- assert list(a.join(b).join(c).primary_key) == [a.c.id]
- assert list(b.join(c).join(d).primary_key) == [b.c.id]
- assert list(d.join(c).join(b).primary_key) == [b.c.id]
- assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
-
- def test_join_pk_collapse_explicit(self):
- """test that redundant columns in a join get 'collapsed' into a
- minimal primary key, which is the root column along a chain of
- explicit join conditions."""
-
- meta = MetaData()
- a = Table(
- "a",
- meta,
- Column("id", Integer, primary_key=True),
- Column("x", Integer),
- )
- b = Table(
- "b",
- meta,
- Column("id", Integer, ForeignKey("a.id"), primary_key=True),
- Column("x", Integer),
- )
- c = Table(
- "c",
- meta,
- Column("id", Integer, ForeignKey("b.id"), primary_key=True),
- Column("x", Integer),
- )
- d = Table(
- "d",
- meta,
- Column("id", Integer, ForeignKey("c.id"), primary_key=True),
- Column("x", Integer),
- )
- print(list(a.join(b, a.c.x == b.c.id).primary_key))
- assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id]
- assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id]
- assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) == [a.c.id]
- assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) == [b.c.id]
- assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) == [b.c.id]
- assert list(
- d.join(b, d.c.id == b.c.id).join(c, b.c.id == c.c.x).primary_key
- ) == [b.c.id]
- assert list(
- a.join(b).join(c, c.c.id == b.c.x).join(d).primary_key
- ) == [a.c.id]
- assert list(
- a.join(b, and_(a.c.id == b.c.id, a.c.x == b.c.id)).primary_key
- ) == [a.c.id]
-
def test_init_doesnt_blowitaway(self):
meta = MetaData()
a = Table(
@@ -2013,17 +1327,17 @@ class AnnotationsTest(fixtures.TestBase):
def test_annotated_visit(self):
table1 = table("table1", column("col1"), column("col2"))
- bin = table1.c.col1 == bindparam("foo", value=None)
- assert str(bin) == "table1.col1 = :foo"
+ bin_ = table1.c.col1 == bindparam("foo", value=None)
+ assert str(bin_) == "table1.col1 = :foo"
def visit_binary(b):
b.right = table1.c.col2
- b2 = visitors.cloned_traverse(bin, {}, {"binary": visit_binary})
+ b2 = visitors.cloned_traverse(bin_, {}, {"binary": visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
b3 = visitors.cloned_traverse(
- bin._annotate({}), {}, {"binary": visit_binary}
+ bin_._annotate({}), {}, {"binary": visit_binary}
)
assert str(b3) == "table1.col1 = table1.col2"
@@ -2074,11 +1388,11 @@ class AnnotationsTest(fixtures.TestBase):
def test_deannotate(self):
table1 = table("table1", column("col1"), column("col2"))
- bin = table1.c.col1 == bindparam("foo", value=None)
+ bin_ = table1.c.col1 == bindparam("foo", value=None)
- b2 = sql_util._deep_annotate(bin, {"_orm_adapt": True})
+ b2 = sql_util._deep_annotate(bin_, {"_orm_adapt": True})
b3 = sql_util._deep_deannotate(b2)
- b4 = sql_util._deep_deannotate(bin)
+ b4 = sql_util._deep_deannotate(bin_)
for elem in (b2._annotations, b2.left._annotations):
assert "_orm_adapt" in elem
@@ -2091,12 +1405,12 @@ class AnnotationsTest(fixtures.TestBase):
):
assert elem == {}
- assert b2.left is not bin.left
- assert b3.left is not b2.left and b2.left is not bin.left
- assert b4.left is bin.left # since column is immutable
+ assert b2.left is not bin_.left
+ assert b3.left is not b2.left and b2.left is not bin_.left
+ assert b4.left is bin_.left # since column is immutable
# deannotate copies the element
assert (
- bin.right is not b2.right
+ bin_.right is not b2.right
and b2.right is not b3.right
and b3.right is not b4.right
)
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index f7d71ec44..c54fe1e54 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -64,6 +64,7 @@ from sqlalchemy.sql import column
from sqlalchemy.sql import ddl
from sqlalchemy.sql import null
from sqlalchemy.sql import operators
+from sqlalchemy.sql import sqltypes
from sqlalchemy.sql import table
from sqlalchemy.sql import visitors
from sqlalchemy.testing import assert_raises
@@ -200,8 +201,6 @@ class AdaptTest(fixtures.TestBase):
):
yield True, subcl, [typ]
- from sqlalchemy.sql import sqltypes
-
for is_down_adaption, typ, target_adaptions in adaptions():
if typ in (types.TypeDecorator, types.TypeEngine, types.Variant):
continue
@@ -2756,8 +2755,8 @@ class NumericRawSQLTest(fixtures.TestBase):
"""
- def _fixture(self, metadata, type, data):
- t = Table("t", metadata, Column("val", type))
+ def _fixture(self, metadata, type_, data):
+ t = Table("t", metadata, Column("val", type_))
metadata.create_all()
t.insert().execute(val=data)
@@ -3122,14 +3121,6 @@ class BooleanTest(
TypeError, "Not a boolean value: 'foo'", proc, "foo"
)
- def test_literal_processor_coercion_native_int_out_of_range(self):
- proc = Boolean().literal_processor(
- default.DefaultDialect(supports_native_boolean=True)
- )
- assert_raises_message(
- ValueError, "Value 15 is not None, True, or False", proc, 15
- )
-
class PickleTest(fixtures.TestBase):
def test_eq_comparison(self):
diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py
index fc4ac3de2..5b51644e6 100644
--- a/test/sql/test_unicode.py
+++ b/test/sql/test_unicode.py
@@ -1,12 +1,10 @@
# coding: utf-8
"""verrrrry basic unicode column name testing"""
-from sqlalchemy import Column
from sqlalchemy import desc
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import MetaData
-from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
@@ -198,7 +196,8 @@ class UnicodeSchemaTest(fixtures.TestBase):
repr(t),
(
"Table('\\u6e2c\\u8a66', MetaData(bind=None), "
- "Column('\\u6e2c\\u8a66_id', Integer(), table=<\u6e2c\u8a66>), "
+ "Column('\\u6e2c\\u8a66_id', Integer(), "
+ "table=<\u6e2c\u8a66>), "
"schema=None)"
),
)