diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-12-03 09:11:14 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-12-04 10:45:35 -0500 |
| commit | 1284fa377e53f03cec061d7af16f269ad73fa7b9 (patch) | |
| tree | 40e421506cd20f3cf8c12cba834062cd1bb0505c /test/sql | |
| parent | 96db7cdd53ee9004be66545989b4ac5632bb7ccf (diff) | |
| download | sqlalchemy-1284fa377e53f03cec061d7af16f269ad73fa7b9.tar.gz | |
disallow same-named columns, unchecked replacement in Table
Fixed issue where table reflection using :paramref:`.Table.extend_existing`
would fail to deduplicate a same-named column if the existing
:class:`.Table` used a separate key. The
:paramref:`.Table.autoload_replace` parameter would allow the column to be
skipped but under no circumstances should a :class:`.Table` ever have the
same-named column twice.
Additionally, changed deprecation warnings to exceptions
as were implemented in I1d58c8ebe081079cb669e7ead60886ffc1b1a7f5 .
Fixes: #8925
Change-Id: I83d0f8658177a7ffbb06e01dbca91377d1a98d49
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_metadata.py | 185 |
1 files changed, 154 insertions, 31 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index e50aa236a..56bb89541 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -59,6 +59,7 @@ from sqlalchemy.testing import is_ from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import mock +from sqlalchemy.testing import Variation from sqlalchemy.testing.assertions import expect_warnings @@ -1825,8 +1826,12 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): is_(t2.c.contains_column(g), False) def test_table_ctor_duplicated_column_name(self): - def go(): - return Table( + # when it will raise + with testing.expect_raises_message( + exc.ArgumentError, + "A column with name 'col' is already present in table 't'", + ): + Table( "t", MetaData(), Column("a", Integer), @@ -1834,39 +1839,42 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): Column("col", String), ) - with testing.expect_deprecated( - "A column with name 'col' is already present in table 't'", - ): - t = go() - is_true(isinstance(t.c.col.type, String)) - # when it will raise - # with testing.expect_raises_message( - # exc.ArgumentError, - # "A column with name 'col' is already present in table 't'", - # ): - # go() - def test_append_column_existing_name(self): t = Table("t", MetaData(), Column("col", Integer)) - with testing.expect_deprecated( - "A column with name 'col' is already present in table 't'", + with testing.expect_raises_message( + exc.DuplicateColumnError, + r"A column with name 'col' is already present in table 't'. " + r"Specify replace_existing=True to Table.append_column\(\) to " + r"replace an existing column.", ): t.append_column(Column("col", String)) - is_true(isinstance(t.c.col.type, String)) - # when it will raise - # col = t.c.col - # with testing.expect_raises_message( - # exc.ArgumentError, - # "A column with name 'col' is already present in table 't'", - # ): - # t.append_column(Column("col", String)) - # is_true(t.c.col is col) - - def test_append_column_replace_existing(self): - t = Table("t", MetaData(), Column("col", Integer)) - t.append_column(Column("col", String), replace_existing=True) - is_true(isinstance(t.c.col.type, String)) + + def test_append_column_existing_key(self): + t = Table("t", MetaData(), Column("col", Integer, key="c2")) + + with testing.expect_raises_message( + exc.DuplicateColumnError, + r"A column with key 'c2' is already present in table 't'. " + r"Specify replace_existing=True to Table.append_column\(\) " + r"to replace an existing column.", + ): + t.append_column(Column("col", String, key="c2")) + + @testing.variation("field", ["name", "key"]) + def test_append_column_replace_existing(self, field: Variation): + if field.name: + t = Table("t", MetaData(), Column("col", Integer)) + t.append_column(Column("col", String), replace_existing=True) + is_true(isinstance(t.c.col.type, String)) + elif field.key: + t = Table("t", MetaData(), Column("col", Integer, key="c2")) + t.append_column( + Column("col", String, key="c2"), replace_existing=True + ) + is_true(isinstance(t.c.c2.type, String)) + else: + field.fail() def test_autoincrement_replace(self): m = MetaData() @@ -2658,7 +2666,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): assert t2.index("CREATE TABLE someschema.table2") > -1 -class UseExistingTest(fixtures.TablesTest): +class UseExistingTest(testing.AssertsCompiledSQL, fixtures.TablesTest): @classmethod def define_tables(cls, metadata): Table( @@ -2678,6 +2686,121 @@ class UseExistingTest(fixtures.TablesTest): def empty_meta(self): return MetaData() + @testing.variation( + "scenario", + [ + "inplace", + "inplace_ee", + "separate_ee_key_first", + "separate_ee_key_second", + "separate_ee_key_append_no_replace", + "separate_ee_key_append_replace", + ], + ) + @testing.variation("both_have_keys", [True, False]) + def test_table_w_two_same_named_columns( + self, empty_meta, scenario: Variation, both_have_keys: Variation + ): + + if scenario.inplace: + with expect_raises_message( + exc.DuplicateColumnError, + "A column with name 'b' is already present in table 'users'.", + ): + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b1" if both_have_keys else None), + Column("b", String, key="b2"), + ) + return + elif scenario.inplace_ee: + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b1" if both_have_keys else None), + Column("b", String, key="b2"), + extend_existing=True, + ) + elif scenario.separate_ee_key_first: + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b2"), + ) + + expected_warnings = ( + [ + 'Column with user-specified key "b2" is being ' + 'replaced with plain named column "b", key "b2" ' + "is being removed." + ] + if not both_have_keys + else [] + ) + with expect_warnings(*expected_warnings): + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b1" if both_have_keys else None), + extend_existing=True, + ) + elif scenario.separate_ee_key_second: + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b1" if both_have_keys else None), + ) + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b2"), + extend_existing=True, + ) + elif scenario.separate_ee_key_append_no_replace: + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b1" if both_have_keys else None), + ) + with expect_raises_message( + exc.DuplicateColumnError, + r"A column with name 'b' is already present in table 'users'. " + r"Specify replace_existing=True to Table.append_column\(\) " + r"to replace an existing column.", + ): + t1.append_column(Column("b", String, key="b2")) + return + elif scenario.separate_ee_key_append_replace: + t1 = Table( + "users", + empty_meta, + Column("a", String), + Column("b", String, key="b1" if both_have_keys else None), + ) + t1.append_column( + Column("b", String, key="b2"), replace_existing=True + ) + + else: + scenario.fail() + + if scenario.separate_ee_key_first: + if both_have_keys: + eq_(t1.c.keys(), ["a", "b1"]) + else: + eq_(t1.c.keys(), ["a", "b"]) + else: + eq_(t1.c.keys(), ["a", "b2"]) + self.assert_compile(select(t1), "SELECT users.a, users.b FROM users") + def test_exception_no_flags(self, existing_meta): def go(): Table( |
