diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-01-05 13:00:21 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-01-05 17:23:48 -0500 |
commit | ea4feb3cec459f184c57ce8361fbdcea112e6667 (patch) | |
tree | 0b177dcaadbd9d29188c6a113eb54678382ba302 | |
parent | 5ba7cb3e130ea68434047926d4f330319ed72af1 (diff) | |
download | sqlalchemy-ea4feb3cec459f184c57ce8361fbdcea112e6667.tar.gz |
flake8 refactor - test_core
A full rewrite of all imports and pep8 formatting using zimports, black,
commits are broken into sections.
Directories included in this commit:
test/sql/ test/aaa_profiling/
Change-Id: I2bad1bd95dcf5cf76aad242280587e541e44fa9b
(cherry picked from commit a4df4fc8bdf556c4153026ac0aa1b55750eaa8da)
-rw-r--r-- | test/aaa_profiling/test_zoomark_orm.py | 5 | ||||
-rw-r--r-- | test/sql/test_compiler.py | 10 | ||||
-rw-r--r-- | test/sql/test_functions.py | 99 | ||||
-rw-r--r-- | test/sql/test_generative.py | 6 | ||||
-rw-r--r-- | test/sql/test_insert_exec.py | 8 | ||||
-rw-r--r-- | test/sql/test_metadata.py | 32 | ||||
-rw-r--r-- | test/sql/test_resultset.py | 2 | ||||
-rw-r--r-- | test/sql/test_returning.py | 2 | ||||
-rw-r--r-- | test/sql/test_selectable.py | 718 | ||||
-rw-r--r-- | test/sql/test_types.py | 15 | ||||
-rw-r--r-- | test/sql/test_unicode.py | 5 |
11 files changed, 101 insertions, 801 deletions
diff --git a/test/aaa_profiling/test_zoomark_orm.py b/test/aaa_profiling/test_zoomark_orm.py index 0c5525a06..0b1d05cc3 100644 --- a/test/aaa_profiling/test_zoomark_orm.py +++ b/test/aaa_profiling/test_zoomark_orm.py @@ -358,9 +358,8 @@ class ZooMarkTest(replay_fixture.ReplayFixtureTest): list( self.session.query(Zoo).filter( and_( - Zoo.Founded != None, # noqa - Zoo.Founded < func.now(), - ) + Zoo.Founded != None, Zoo.Founded < func.now() + ) # noqa ) ) ) diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index d526d9eee..0e81cc30b 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -959,14 +959,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "zips", column("zipcode"), column("latitude"), column("longitude") ) places = table("places", column("id"), column("nm")) - zip = "12345" + zipcode = "12345" qlat = ( - select([zips.c.latitude], zips.c.zipcode == zip) + select([zips.c.latitude], zips.c.zipcode == zipcode) .correlate(None) .as_scalar() ) qlng = ( - select([zips.c.longitude], zips.c.zipcode == zip) + select([zips.c.longitude], zips.c.zipcode == zipcode) .correlate(None) .as_scalar() ) @@ -978,7 +978,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): zips.c.zipcode, func.latlondist(qlat, qlng).label("dist"), ], - zips.c.zipcode == zip, + zips.c.zipcode == zipcode, order_by=["dist", places.c.nm], ) @@ -3370,7 +3370,7 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): pass @compiles(MyType) - def compile(element, compiler, **kw): + def compile_(element, compiler, **kw): raise exc.CompileError("Couldn't compile type") return MyType diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index faaa36f03..73c03bbd4 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -730,7 +730,7 @@ class ExecuteTest(fixtures.TestBase): type = Date() @compiles(myfunc) - def compile(elem, compiler, **kw): + def compile_(elem, compiler, **kw): return compiler.process(func.current_date()) conn = testing.db.connect() @@ -757,6 +757,7 @@ class ExecuteTest(fixtures.TestBase): ret.close() @engines.close_first + @testing.provide_metadata def test_update(self): """ Tests sending functions and SQL expressions to the VALUES and SET @@ -789,56 +790,52 @@ class ExecuteTest(fixtures.TestBase): Column("stuff", String(20), onupdate="thisisstuff"), ) meta.create_all() - try: - t.insert(values=dict(value=func.length("one"))).execute() - assert t.select().execute().first()["value"] == 3 - t.update(values=dict(value=func.length("asfda"))).execute() - assert t.select().execute().first()["value"] == 5 - - r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute() - id = r.inserted_primary_key[0] - assert t.select(t.c.id == id).execute().first()["value"] == 9 - t.update(values={t.c.value: func.length("asdf")}).execute() - assert t.select().execute().first()["value"] == 4 - t2.insert().execute() - t2.insert(values=dict(value=func.length("one"))).execute() - t2.insert(values=dict(value=func.length("asfda") + -19)).execute( - stuff="hi" - ) - - res = exec_sorted(select([t2.c.value, t2.c.stuff])) - eq_(res, [(-14, "hi"), (3, None), (7, None)]) - - t2.update(values=dict(value=func.length("asdsafasd"))).execute( - stuff="some stuff" - ) - assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [ - (9, "some stuff"), - (9, "some stuff"), - (9, "some stuff"), - ] - - t2.delete().execute() - - t2.insert(values=dict(value=func.length("one") + 8)).execute() - assert t2.select().execute().first()["value"] == 11 - - t2.update(values=dict(value=func.length("asfda"))).execute() - eq_( - select([t2.c.value, t2.c.stuff]).execute().first(), - (5, "thisisstuff"), - ) - - t2.update( - values={ - t2.c.value: func.length("asfdaasdf"), - t2.c.stuff: "foo", - } - ).execute() - print("HI", select([t2.c.value, t2.c.stuff]).execute().first()) - eq_(select([t2.c.value, t2.c.stuff]).execute().first(), (9, "foo")) - finally: - meta.drop_all() + t.insert(values=dict(value=func.length("one"))).execute() + assert t.select().execute().first()["value"] == 3 + t.update(values=dict(value=func.length("asfda"))).execute() + assert t.select().execute().first()["value"] == 5 + + r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute() + id_ = r.inserted_primary_key[0] + assert t.select(t.c.id == id_).execute().first()["value"] == 9 + t.update(values={t.c.value: func.length("asdf")}).execute() + assert t.select().execute().first()["value"] == 4 + t2.insert().execute() + t2.insert(values=dict(value=func.length("one"))).execute() + t2.insert(values=dict(value=func.length("asfda") + -19)).execute( + stuff="hi" + ) + + res = exec_sorted(select([t2.c.value, t2.c.stuff])) + eq_(res, [(-14, "hi"), (3, None), (7, None)]) + + t2.update(values=dict(value=func.length("asdsafasd"))).execute( + stuff="some stuff" + ) + assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [ + (9, "some stuff"), + (9, "some stuff"), + (9, "some stuff"), + ] + + t2.delete().execute() + + t2.insert(values=dict(value=func.length("one") + 8)).execute() + assert t2.select().execute().first()["value"] == 11 + + t2.update(values=dict(value=func.length("asfda"))).execute() + eq_( + select([t2.c.value, t2.c.stuff]).execute().first(), + (5, "thisisstuff"), + ) + + t2.update( + values={ + t2.c.value: func.length("asfdaasdf"), + t2.c.stuff: "foo", + } + ).execute() + eq_(select([t2.c.value, t2.c.stuff]).execute().first(), (9, "foo")) @testing.fails_on_everything_except("postgresql") def test_as_from(self): diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 350438f25..6c6ddec05 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -226,9 +226,9 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): assert CustomObj.__visit_name__ == Column.__visit_name__ == "column" foo, bar = CustomObj("foo", String), CustomObj("bar", String) - bin = foo == bar - set(ClauseVisitor().iterate(bin)) - assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin]) + bin_ = foo == bar + set(ClauseVisitor().iterate(bin_)) + assert set(ClauseVisitor().iterate(bin_)) == set([foo, bar, bin_]) class BinaryEndpointTraversalTest(fixtures.TestBase): diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py index e6b989e88..fafe7cc9b 100644 --- a/test/sql/test_insert_exec.py +++ b/test/sql/test_insert_exec.py @@ -96,16 +96,16 @@ class InsertExecTest(fixtures.TablesTest): result = engine.execute(table_.insert(), **values) ret = values.copy() - for col, id in zip( + for col, id_ in zip( table_.primary_key, result.inserted_primary_key ): - ret[col.key] = id + ret[col.key] = id_ if result.lastrow_has_defaults(): criterion = and_( *[ - col == id - for col, id in zip( + col == id_ + for col, id_ in zip( table_.primary_key, result.inserted_primary_key ) ] diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 72896c945..9c60cc981 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -530,7 +530,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): ( name, metadata, - schema, + schema_, quote_schema, exp_schema, exp_quote_schema, @@ -558,8 +558,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): ] ): kw = {} - if schema is not None: - kw["schema"] = schema + if schema_ is not None: + kw["schema"] = schema_ if quote_schema is not None: kw["quote_schema"] = quote_schema t = Table(name, metadata, **kw) @@ -3516,12 +3516,12 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.sql import select class MyColumn(Column): - def _constructor(self, name, type, **kw): + def _constructor(self, name, type_, **kw): kw["name"] = name - return MyColumn(type, **kw) + return MyColumn(type_, **kw) - def __init__(self, type, **kw): - Column.__init__(self, type, **kw) + def __init__(self, type_, **kw): + Column.__init__(self, type_, **kw) def my_goofy_thing(self): return "hi" @@ -3531,11 +3531,11 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): s = compiler.visit_column(element, **kw) return s + "-" - id = MyColumn(Integer, primary_key=True) - id.name = "id" + id_ = MyColumn(Integer, primary_key=True) + id_.name = "id" name = MyColumn(String) name.name = "name" - t1 = Table("foo", MetaData(), id, name) + t1 = Table("foo", MetaData(), id_, name) # goofy thing eq_(t1.c.name.my_goofy_thing(), "hi") @@ -3559,14 +3559,14 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.sql import select class MyColumn(Column): - def __init__(self, type, **kw): - Column.__init__(self, type, **kw) + def __init__(self, type_, **kw): + Column.__init__(self, type_, **kw) - id = MyColumn(Integer, primary_key=True) - id.name = "id" + id_ = MyColumn(Integer, primary_key=True) + id_.name = "id" name = MyColumn(String) name.name = "name" - t1 = Table("foo", MetaData(), id, name) + t1 = Table("foo", MetaData(), id_, name) assert_raises_message( TypeError, "Could not create a copy of this <class " @@ -3581,7 +3581,7 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): from sqlalchemy.ext.compiler import compiles, deregister @compiles(schema.CreateColumn) - def compile(element, compiler, **kw): + def compile_(element, compiler, **kw): column = element.element if "special" not in column.info: diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index d9ce83838..3bd61b1f8 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -3,8 +3,8 @@ import operator from sqlalchemy import CHAR from sqlalchemy import column -from sqlalchemy import exc as sa_exc from sqlalchemy import exc +from sqlalchemy import exc as sa_exc from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy import INT diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py index 38ba3a43f..9f77a6783 100644 --- a/test/sql/test_returning.py +++ b/test/sql/test_returning.py @@ -311,7 +311,7 @@ class ReturnDefaultsTest(fixtures.TablesTest): pass @compiles(IncDefault) - def compile(element, compiler, **kw): + def compile_(element, compiler, **kw): return str(next(counter)) Table( diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 00b8fea4a..578f242a7 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,12 +1,10 @@ """Test various algorithmic properties of selectables.""" -from sqlalchemy import alias from sqlalchemy import and_ from sqlalchemy import bindparam from sqlalchemy import Boolean from sqlalchemy import cast from sqlalchemy import Column -from sqlalchemy import column from sqlalchemy import exc from sqlalchemy import exists from sqlalchemy import ForeignKey @@ -23,20 +21,15 @@ from sqlalchemy import select from sqlalchemy import Sequence from sqlalchemy import String from sqlalchemy import Table -from sqlalchemy import table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import type_coerce from sqlalchemy import TypeDecorator from sqlalchemy import union from sqlalchemy import util -from sqlalchemy.schema import Column -from sqlalchemy.schema import MetaData -from sqlalchemy.schema import Table from sqlalchemy.sql import column from sqlalchemy.sql import elements from sqlalchemy.sql import expression -from sqlalchemy.sql import null from sqlalchemy.sql import table from sqlalchemy.sql import util as sql_util from sqlalchemy.sql import visitors @@ -718,701 +711,22 @@ class SelectableTest( def test_multi_label_chain_naming_col(self): # See [ticket:2167] for this one. - l1 = table1.c.col1.label("a") - l2 = select([l1]).label("b") + l1 = table1.c.col1.label('a') + l2 = select([l1]).label('b') s = select([l2]) assert s.c.b is not None self.assert_compile( s.select(), - "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)", + "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)" ) - s2 = select([s.label("c")]) + s2 = select([s.label('c')]) self.assert_compile( s2.select(), "SELECT c FROM (SELECT (SELECT (" - "SELECT table1.col1 AS a FROM table1) AS b) AS c)", + "SELECT table1.col1 AS a FROM table1) AS b) AS c)" ) - def test_self_referential_select_raises(self): - t = table("t", column("x")) - - s = select([t]) - - s.append_whereclause(s.c.x > 5) - assert_raises_message( - exc.InvalidRequestError, - r"select\(\) construct refers to itself as a FROM", - s.compile, - ) - - def test_unusual_column_elements_text(self): - """test that .c excludes text().""" - - s = select([table1.c.col1, text("foo")]) - eq_(list(s.c), [s.c.col1]) - - def test_unusual_column_elements_clauselist(self): - """Test that raw ClauseList is expanded into .c.""" - - from sqlalchemy.sql.expression import ClauseList - - s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)]) - eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3]) - - def test_unusual_column_elements_boolean_clauselist(self): - """test that BooleanClauseList is placed as single element in .c.""" - - c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4) - s = select([table1.c.col1, c2]) - eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)]) - - def test_from_list_deferred_constructor(self): - c1 = Column("c1", Integer) - c2 = Column("c2", Integer) - - s = select([c1]) - - t = Table("t", MetaData(), c1, c2) - - eq_(c1._from_objects, [t]) - eq_(c2._from_objects, [t]) - - self.assert_compile(select([c1]), "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), "SELECT t.c2 FROM t") - - def test_from_list_deferred_whereclause(self): - c1 = Column("c1", Integer) - c2 = Column("c2", Integer) - - s = select([c1]).where(c1 == 5) - - t = Table("t", MetaData(), c1, c2) - - eq_(c1._from_objects, [t]) - eq_(c2._from_objects, [t]) - - self.assert_compile(select([c1]), "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), "SELECT t.c2 FROM t") - - def test_from_list_deferred_fromlist(self): - m = MetaData() - t1 = Table("t1", m, Column("x", Integer)) - - c1 = Column("c1", Integer) - s = select([c1]).where(c1 == 5).select_from(t1) - - t2 = Table("t2", MetaData(), c1) - - eq_(c1._from_objects, [t2]) - - self.assert_compile(select([c1]), "SELECT t2.c1 FROM t2") - - def test_from_list_deferred_cloning(self): - c1 = Column("c1", Integer) - c2 = Column("c2", Integer) - - s = select([c1]) - s2 = select([c2]) - s3 = sql_util.ClauseAdapter(s).traverse(s2) - - Table("t", MetaData(), c1, c2) - - self.assert_compile(s3, "SELECT t.c2 FROM t") - - def test_from_list_with_columns(self): - table1 = table("t1", column("a")) - table2 = table("t2", column("b")) - s1 = select([table1.c.a, table2.c.b]) - self.assert_compile(s1, "SELECT t1.a, t2.b FROM t1, t2") - s2 = s1.with_only_columns([table2.c.b]) - self.assert_compile(s2, "SELECT t2.b FROM t2") - - s3 = sql_util.ClauseAdapter(table1).traverse(s1) - self.assert_compile(s3, "SELECT t1.a, t2.b FROM t1, t2") - s4 = s3.with_only_columns([table2.c.b]) - self.assert_compile(s4, "SELECT t2.b FROM t2") - - def test_from_list_warning_against_existing(self): - c1 = Column("c1", Integer) - s = select([c1]) - - # force a compile. - self.assert_compile(s, "SELECT c1") - - Table("t", MetaData(), c1) - - self.assert_compile(s, "SELECT t.c1 FROM t") - - def test_from_list_recovers_after_warning(self): - c1 = Column("c1", Integer) - c2 = Column("c2", Integer) - - s = select([c1]) - - # force a compile. - eq_(str(s), "SELECT c1") - - @testing.emits_warning() - def go(): - return Table("t", MetaData(), c1, c2) - - t = go() - - eq_(c1._from_objects, [t]) - eq_(c2._from_objects, [t]) - - # 's' has been baked. Can't afford - # not caching select._froms. - # hopefully the warning will clue the user - self.assert_compile(s, "SELECT t.c1 FROM t") - self.assert_compile(select([c1]), "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), "SELECT t.c2 FROM t") - - def test_label_gen_resets_on_table(self): - c1 = Column("c1", Integer) - eq_(c1._label, "c1") - Table("t1", MetaData(), c1) - eq_(c1._label, "t1_c1") - - -class RefreshForNewColTest(fixtures.TestBase): - def test_join_uninit(self): - a = table("a", column("x")) - b = table("b", column("y")) - j = a.join(b, a.c.x == b.c.y) - - q = column("q") - b.append_column(q) - j._refresh_for_new_column(q) - assert j.c.b_q is q - - def test_join_init(self): - a = table("a", column("x")) - b = table("b", column("y")) - j = a.join(b, a.c.x == b.c.y) - j.c - q = column("q") - b.append_column(q) - j._refresh_for_new_column(q) - assert j.c.b_q is q - - def test_join_samename_init(self): - a = table("a", column("x")) - b = table("b", column("y")) - j = a.join(b, a.c.x == b.c.y) - j.c - q = column("x") - b.append_column(q) - j._refresh_for_new_column(q) - assert j.c.b_x is q - - def test_select_samename_init(self): - a = table("a", column("x")) - b = table("b", column("y")) - s = select([a, b]).apply_labels() - s.c - q = column("x") - b.append_column(q) - s._refresh_for_new_column(q) - assert q in s.c.b_x.proxy_set - - def test_aliased_select_samename_uninit(self): - a = table("a", column("x")) - b = table("b", column("y")) - s = select([a, b]).apply_labels().alias() - q = column("x") - b.append_column(q) - s._refresh_for_new_column(q) - assert q in s.c.b_x.proxy_set - - def test_aliased_select_samename_init(self): - a = table("a", column("x")) - b = table("b", column("y")) - s = select([a, b]).apply_labels().alias() - s.c - q = column("x") - b.append_column(q) - s._refresh_for_new_column(q) - assert q in s.c.b_x.proxy_set - - def test_aliased_select_irrelevant(self): - a = table("a", column("x")) - b = table("b", column("y")) - c = table("c", column("z")) - s = select([a, b]).apply_labels().alias() - s.c - q = column("x") - c.append_column(q) - s._refresh_for_new_column(q) - assert "c_x" not in s.c - - def test_aliased_select_no_cols_clause(self): - a = table("a", column("x")) - s = select([a.c.x]).apply_labels().alias() - s.c - q = column("q") - a.append_column(q) - s._refresh_for_new_column(q) - assert "a_q" not in s.c - - def test_union_uninit(self): - a = table("a", column("x")) - s1 = select([a]) - s2 = select([a]) - s3 = s1.union(s2) - q = column("q") - a.append_column(q) - s3._refresh_for_new_column(q) - assert a.c.q in s3.c.q.proxy_set - - def test_union_init_raises(self): - a = table("a", column("x")) - s1 = select([a]) - s2 = select([a]) - s3 = s1.union(s2) - s3.c - q = column("q") - a.append_column(q) - assert_raises_message( - NotImplementedError, - "CompoundSelect constructs don't support addition of " - "columns to underlying selectables", - s3._refresh_for_new_column, - q, - ) - - def test_nested_join_uninit(self): - a = table("a", column("x")) - b = table("b", column("y")) - c = table("c", column("z")) - j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z) - - q = column("q") - b.append_column(q) - j._refresh_for_new_column(q) - assert j.c.b_q is q - - def test_nested_join_init(self): - a = table("a", column("x")) - b = table("b", column("y")) - c = table("c", column("z")) - j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z) - - j.c - q = column("q") - b.append_column(q) - j._refresh_for_new_column(q) - assert j.c.b_q is q - - def test_fk_table(self): - m = MetaData() - fk = ForeignKey("x.id") - Table("x", m, Column("id", Integer)) - a = Table("a", m, Column("x", Integer, fk)) - a.c - - q = Column("q", Integer) - a.append_column(q) - a._refresh_for_new_column(q) - eq_(a.foreign_keys, set([fk])) - - fk2 = ForeignKey("g.id") - p = Column("p", Integer, fk2) - a.append_column(p) - a._refresh_for_new_column(p) - eq_(a.foreign_keys, set([fk, fk2])) - - def test_fk_join(self): - m = MetaData() - fk = ForeignKey("x.id") - Table("x", m, Column("id", Integer)) - a = Table("a", m, Column("x", Integer, fk)) - b = Table("b", m, Column("y", Integer)) - j = a.join(b, a.c.x == b.c.y) - j.c - - q = Column("q", Integer) - b.append_column(q) - j._refresh_for_new_column(q) - eq_(j.foreign_keys, set([fk])) - - fk2 = ForeignKey("g.id") - p = Column("p", Integer, fk2) - b.append_column(p) - j._refresh_for_new_column(p) - eq_(j.foreign_keys, set([fk, fk2])) - - -class AnonLabelTest(fixtures.TestBase): - - """Test behaviors fixed by [ticket:2168].""" - - def test_anon_labels_named_column(self): - c1 = column("x") - - assert c1.label(None) is not c1 - eq_(str(select([c1.label(None)])), "SELECT x AS x_1") - - def test_anon_labels_literal_column(self): - c1 = literal_column("x") - assert c1.label(None) is not c1 - eq_(str(select([c1.label(None)])), "SELECT x AS x_1") - - def test_anon_labels_func(self): - c1 = func.count("*") - assert c1.label(None) is not c1 - - eq_(str(select([c1])), "SELECT count(:count_2) AS count_1") - c2 = select([c1]).compile() - - eq_(str(select([c1.label(None)])), "SELECT count(:count_2) AS count_1") - - def test_named_labels_named_column(self): - c1 = column("x") - eq_(str(select([c1.label("y")])), "SELECT x AS y") - - def test_named_labels_literal_column(self): - c1 = literal_column("x") - eq_(str(select([c1.label("y")])), "SELECT x AS y") - - -class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = "default" - - def test_flat_ok_on_non_join(self): - a = table("a", column("a")) - s = a.select() - self.assert_compile( - s.alias(flat=True).select(), - "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1", - ) - - def test_join_alias(self): - a = table("a", column("a")) - b = table("b", column("b")) - self.assert_compile( - a.join(b, a.c.a == b.c.b).alias(), - "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b", - ) - - def test_join_standalone_alias(self): - a = table("a", column("a")) - b = table("b", column("b")) - self.assert_compile( - alias(a.join(b, a.c.a == b.c.b)), - "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b", - ) - - def test_join_alias_flat(self): - a = table("a", column("a")) - b = table("b", column("b")) - self.assert_compile( - a.join(b, a.c.a == b.c.b).alias(flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b", - ) - - def test_join_standalone_alias_flat(self): - a = table("a", column("a")) - b = table("b", column("b")) - self.assert_compile( - alias(a.join(b, a.c.a == b.c.b), flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b", - ) - - def test_composed_join_alias_flat(self): - a = table("a", column("a")) - b = table("b", column("b")) - c = table("c", column("c")) - d = table("d", column("d")) - - j1 = a.join(b, a.c.a == b.c.b) - j2 = c.join(d, c.c.c == d.c.d) - self.assert_compile( - j1.join(j2, b.c.b == c.c.c).alias(flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b JOIN " - "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c", - ) - - def test_composed_join_alias(self): - a = table("a", column("a")) - b = table("b", column("b")) - c = table("c", column("c")) - d = table("d", column("d")) - - j1 = a.join(b, a.c.a == b.c.b) - j2 = c.join(d, c.c.c == d.c.d) - self.assert_compile( - select([j1.join(j2, b.c.b == c.c.c).alias()]), - "SELECT anon_1.a_a, anon_1.b_b, anon_1.c_c, anon_1.d_d " - "FROM (SELECT a.a AS a_a, b.b AS b_b, c.c AS c_c, d.d AS d_d " - "FROM a JOIN b ON a.a = b.b " - "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1", - ) - - -class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = "default" - - def test_join_condition(self): - m = MetaData() - t1 = Table("t1", m, Column("id", Integer)) - t2 = Table( - "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) - ) - t3 = Table( - "t3", - m, - Column("id", Integer), - Column("t1id", ForeignKey("t1.id")), - Column("t2id", ForeignKey("t2.id")), - ) - t4 = Table( - "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id")) - ) - t5 = Table( - "t5", - m, - Column("t1id1", ForeignKey("t1.id")), - Column("t1id2", ForeignKey("t1.id")), - ) - - t1t2 = t1.join(t2) - t2t3 = t2.join(t3) - - for (left, right, a_subset, expected) in [ - (t1, t2, None, t1.c.id == t2.c.t1id), - (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id), - (t2t3, t1, t3, t1.c.id == t3.c.t1id), - (t2t3, t4, None, t2t3.c.t2_id == t4.c.t2id), - (t2t3, t4, t3, t2t3.c.t2_id == t4.c.t2id), - (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id), - (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id), - (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id), - ]: - assert expected.compare( - sql_util.join_condition(left, right, a_subset=a_subset) - ) - - # these are ambiguous, or have no joins - for left, right, a_subset in [ - (t1t2, t3, None), - (t2t3, t1, None), - (t1, t4, None), - (t1t2, t2t3, None), - (t5, t1, None), - (t5.select(use_labels=True), t1, None), - ]: - assert_raises( - exc.ArgumentError, - sql_util.join_condition, - left, - right, - a_subset=a_subset, - ) - - als = t2t3.alias() - # test join's behavior, including natural - for left, right, expected in [ - (t1, t2, t1.c.id == t2.c.t1id), - (t1t2, t3, t1t2.c.t2_id == t3.c.t2id), - (t2t3, t1, t1.c.id == t3.c.t1id), - (t2t3, t4, t2t3.c.t2_id == t4.c.t2id), - (t2t3, t4, t2t3.c.t2_id == t4.c.t2id), - (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id), - (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id), - (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id), - ]: - assert expected.compare(left.join(right).onclause) - - # these are right-nested joins - j = t1t2.join(t2t3) - assert j.onclause.compare(t2.c.id == t3.c.t2id) - self.assert_compile( - j, - "t1 JOIN t2 ON t1.id = t2.t1id JOIN " - "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id", - ) - - st2t3 = t2t3.select(use_labels=True) - j = t1t2.join(st2t3) - assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id) - self.assert_compile( - j, - "t1 JOIN t2 ON t1.id = t2.t1id JOIN " - "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, " - "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id " - "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id", - ) - - def test_join_multiple_equiv_fks(self): - m = MetaData() - t1 = Table("t1", m, Column("id", Integer, primary_key=True)) - t2 = Table( - "t2", - m, - Column("t1id", Integer, ForeignKey("t1.id"), ForeignKey("t1.id")), - ) - - assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id) - - def test_join_cond_no_such_unrelated_table(self): - m = MetaData() - # bounding the "good" column with two "bad" ones is so to - # try to get coverage to get the "continue" statements - # in the loop... - t1 = Table( - "t1", - m, - Column("y", Integer, ForeignKey("t22.id")), - Column("x", Integer, ForeignKey("t2.id")), - Column("q", Integer, ForeignKey("t22.id")), - ) - t2 = Table("t2", m, Column("id", Integer)) - assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) - assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) - - def test_join_cond_no_such_unrelated_column(self): - m = MetaData() - t1 = Table( - "t1", - m, - Column("x", Integer, ForeignKey("t2.id")), - Column("y", Integer, ForeignKey("t3.q")), - ) - t2 = Table("t2", m, Column("id", Integer)) - Table("t3", m, Column("id", Integer)) - assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) - assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) - - def test_join_cond_no_such_related_table(self): - m1 = MetaData() - m2 = MetaData() - t1 = Table("t1", m1, Column("x", Integer, ForeignKey("t2.id"))) - t2 = Table("t2", m2, Column("id", Integer)) - assert_raises_message( - exc.NoReferencedTableError, - "Foreign key associated with column 't1.x' could not find " - "table 't2' with which to generate a foreign key to " - "target column 'id'", - sql_util.join_condition, - t1, - t2, - ) - - assert_raises_message( - exc.NoReferencedTableError, - "Foreign key associated with column 't1.x' could not find " - "table 't2' with which to generate a foreign key to " - "target column 'id'", - sql_util.join_condition, - t2, - t1, - ) - - def test_join_cond_no_such_related_column(self): - m = MetaData() - t1 = Table("t1", m, Column("x", Integer, ForeignKey("t2.q"))) - t2 = Table("t2", m, Column("id", Integer)) - assert_raises_message( - exc.NoReferencedColumnError, - "Could not initialize target column for " - "ForeignKey 't2.q' on table 't1': " - "table 't2' has no column named 'q'", - sql_util.join_condition, - t1, - t2, - ) - - assert_raises_message( - exc.NoReferencedColumnError, - "Could not initialize target column for " - "ForeignKey 't2.q' on table 't1': " - "table 't2' has no column named 'q'", - sql_util.join_condition, - t2, - t1, - ) - - -class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): - def test_join_pk_collapse_implicit(self): - """test that redundant columns in a join get 'collapsed' into a - minimal primary key, which is the root column along a chain of - foreign key relationships.""" - - meta = MetaData() - a = Table("a", meta, Column("id", Integer, primary_key=True)) - b = Table( - "b", - meta, - Column("id", Integer, ForeignKey("a.id"), primary_key=True), - ) - c = Table( - "c", - meta, - Column("id", Integer, ForeignKey("b.id"), primary_key=True), - ) - d = Table( - "d", - meta, - Column("id", Integer, ForeignKey("c.id"), primary_key=True), - ) - assert c.c.id.references(b.c.id) - assert not d.c.id.references(a.c.id) - assert list(a.join(b).primary_key) == [a.c.id] - assert list(b.join(c).primary_key) == [b.c.id] - assert list(a.join(b).join(c).primary_key) == [a.c.id] - assert list(b.join(c).join(d).primary_key) == [b.c.id] - assert list(d.join(c).join(b).primary_key) == [b.c.id] - assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] - - def test_join_pk_collapse_explicit(self): - """test that redundant columns in a join get 'collapsed' into a - minimal primary key, which is the root column along a chain of - explicit join conditions.""" - - meta = MetaData() - a = Table( - "a", - meta, - Column("id", Integer, primary_key=True), - Column("x", Integer), - ) - b = Table( - "b", - meta, - Column("id", Integer, ForeignKey("a.id"), primary_key=True), - Column("x", Integer), - ) - c = Table( - "c", - meta, - Column("id", Integer, ForeignKey("b.id"), primary_key=True), - Column("x", Integer), - ) - d = Table( - "d", - meta, - Column("id", Integer, ForeignKey("c.id"), primary_key=True), - Column("x", Integer), - ) - print(list(a.join(b, a.c.x == b.c.id).primary_key)) - assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id] - assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) == [a.c.id] - assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) == [b.c.id] - assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) == [b.c.id] - assert list( - d.join(b, d.c.id == b.c.id).join(c, b.c.id == c.c.x).primary_key - ) == [b.c.id] - assert list( - a.join(b).join(c, c.c.id == b.c.x).join(d).primary_key - ) == [a.c.id] - assert list( - a.join(b, and_(a.c.id == b.c.id, a.c.x == b.c.id)).primary_key - ) == [a.c.id] - def test_init_doesnt_blowitaway(self): meta = MetaData() a = Table( @@ -2013,17 +1327,17 @@ class AnnotationsTest(fixtures.TestBase): def test_annotated_visit(self): table1 = table("table1", column("col1"), column("col2")) - bin = table1.c.col1 == bindparam("foo", value=None) - assert str(bin) == "table1.col1 = :foo" + bin_ = table1.c.col1 == bindparam("foo", value=None) + assert str(bin_) == "table1.col1 = :foo" def visit_binary(b): b.right = table1.c.col2 - b2 = visitors.cloned_traverse(bin, {}, {"binary": visit_binary}) + b2 = visitors.cloned_traverse(bin_, {}, {"binary": visit_binary}) assert str(b2) == "table1.col1 = table1.col2" b3 = visitors.cloned_traverse( - bin._annotate({}), {}, {"binary": visit_binary} + bin_._annotate({}), {}, {"binary": visit_binary} ) assert str(b3) == "table1.col1 = table1.col2" @@ -2074,11 +1388,11 @@ class AnnotationsTest(fixtures.TestBase): def test_deannotate(self): table1 = table("table1", column("col1"), column("col2")) - bin = table1.c.col1 == bindparam("foo", value=None) + bin_ = table1.c.col1 == bindparam("foo", value=None) - b2 = sql_util._deep_annotate(bin, {"_orm_adapt": True}) + b2 = sql_util._deep_annotate(bin_, {"_orm_adapt": True}) b3 = sql_util._deep_deannotate(b2) - b4 = sql_util._deep_deannotate(bin) + b4 = sql_util._deep_deannotate(bin_) for elem in (b2._annotations, b2.left._annotations): assert "_orm_adapt" in elem @@ -2091,12 +1405,12 @@ class AnnotationsTest(fixtures.TestBase): ): assert elem == {} - assert b2.left is not bin.left - assert b3.left is not b2.left and b2.left is not bin.left - assert b4.left is bin.left # since column is immutable + assert b2.left is not bin_.left + assert b3.left is not b2.left and b2.left is not bin_.left + assert b4.left is bin_.left # since column is immutable # deannotate copies the element assert ( - bin.right is not b2.right + bin_.right is not b2.right and b2.right is not b3.right and b3.right is not b4.right ) diff --git a/test/sql/test_types.py b/test/sql/test_types.py index f7d71ec44..c54fe1e54 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -64,6 +64,7 @@ from sqlalchemy.sql import column from sqlalchemy.sql import ddl from sqlalchemy.sql import null from sqlalchemy.sql import operators +from sqlalchemy.sql import sqltypes from sqlalchemy.sql import table from sqlalchemy.sql import visitors from sqlalchemy.testing import assert_raises @@ -200,8 +201,6 @@ class AdaptTest(fixtures.TestBase): ): yield True, subcl, [typ] - from sqlalchemy.sql import sqltypes - for is_down_adaption, typ, target_adaptions in adaptions(): if typ in (types.TypeDecorator, types.TypeEngine, types.Variant): continue @@ -2756,8 +2755,8 @@ class NumericRawSQLTest(fixtures.TestBase): """ - def _fixture(self, metadata, type, data): - t = Table("t", metadata, Column("val", type)) + def _fixture(self, metadata, type_, data): + t = Table("t", metadata, Column("val", type_)) metadata.create_all() t.insert().execute(val=data) @@ -3122,14 +3121,6 @@ class BooleanTest( TypeError, "Not a boolean value: 'foo'", proc, "foo" ) - def test_literal_processor_coercion_native_int_out_of_range(self): - proc = Boolean().literal_processor( - default.DefaultDialect(supports_native_boolean=True) - ) - assert_raises_message( - ValueError, "Value 15 is not None, True, or False", proc, 15 - ) - class PickleTest(fixtures.TestBase): def test_eq_comparison(self): diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py index fc4ac3de2..5b51644e6 100644 --- a/test/sql/test_unicode.py +++ b/test/sql/test_unicode.py @@ -1,12 +1,10 @@ # coding: utf-8 """verrrrry basic unicode column name testing""" -from sqlalchemy import Column from sqlalchemy import desc from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy import MetaData -from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ @@ -198,7 +196,8 @@ class UnicodeSchemaTest(fixtures.TestBase): repr(t), ( "Table('\\u6e2c\\u8a66', MetaData(bind=None), " - "Column('\\u6e2c\\u8a66_id', Integer(), table=<\u6e2c\u8a66>), " + "Column('\\u6e2c\\u8a66_id', Integer(), " + "table=<\u6e2c\u8a66>), " "schema=None)" ), ) |