summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-12-03 09:11:14 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2022-12-04 10:45:35 -0500
commit1284fa377e53f03cec061d7af16f269ad73fa7b9 (patch)
tree40e421506cd20f3cf8c12cba834062cd1bb0505c /test/sql
parent96db7cdd53ee9004be66545989b4ac5632bb7ccf (diff)
downloadsqlalchemy-1284fa377e53f03cec061d7af16f269ad73fa7b9.tar.gz
disallow same-named columns, unchecked replacement in Table
Fixed issue where table reflection using :paramref:`.Table.extend_existing` would fail to deduplicate a same-named column if the existing :class:`.Table` used a separate key. The :paramref:`.Table.autoload_replace` parameter would allow the column to be skipped but under no circumstances should a :class:`.Table` ever have the same-named column twice. Additionally, changed deprecation warnings to exceptions as were implemented in I1d58c8ebe081079cb669e7ead60886ffc1b1a7f5 . Fixes: #8925 Change-Id: I83d0f8658177a7ffbb06e01dbca91377d1a98d49
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_metadata.py185
1 files changed, 154 insertions, 31 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index e50aa236a..56bb89541 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -59,6 +59,7 @@ from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing import Variation
from sqlalchemy.testing.assertions import expect_warnings
@@ -1825,8 +1826,12 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
is_(t2.c.contains_column(g), False)
def test_table_ctor_duplicated_column_name(self):
- def go():
- return Table(
+ # when it will raise
+ with testing.expect_raises_message(
+ exc.ArgumentError,
+ "A column with name 'col' is already present in table 't'",
+ ):
+ Table(
"t",
MetaData(),
Column("a", Integer),
@@ -1834,39 +1839,42 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
Column("col", String),
)
- with testing.expect_deprecated(
- "A column with name 'col' is already present in table 't'",
- ):
- t = go()
- is_true(isinstance(t.c.col.type, String))
- # when it will raise
- # with testing.expect_raises_message(
- # exc.ArgumentError,
- # "A column with name 'col' is already present in table 't'",
- # ):
- # go()
-
def test_append_column_existing_name(self):
t = Table("t", MetaData(), Column("col", Integer))
- with testing.expect_deprecated(
- "A column with name 'col' is already present in table 't'",
+ with testing.expect_raises_message(
+ exc.DuplicateColumnError,
+ r"A column with name 'col' is already present in table 't'. "
+ r"Specify replace_existing=True to Table.append_column\(\) to "
+ r"replace an existing column.",
):
t.append_column(Column("col", String))
- is_true(isinstance(t.c.col.type, String))
- # when it will raise
- # col = t.c.col
- # with testing.expect_raises_message(
- # exc.ArgumentError,
- # "A column with name 'col' is already present in table 't'",
- # ):
- # t.append_column(Column("col", String))
- # is_true(t.c.col is col)
-
- def test_append_column_replace_existing(self):
- t = Table("t", MetaData(), Column("col", Integer))
- t.append_column(Column("col", String), replace_existing=True)
- is_true(isinstance(t.c.col.type, String))
+
+ def test_append_column_existing_key(self):
+ t = Table("t", MetaData(), Column("col", Integer, key="c2"))
+
+ with testing.expect_raises_message(
+ exc.DuplicateColumnError,
+ r"A column with key 'c2' is already present in table 't'. "
+ r"Specify replace_existing=True to Table.append_column\(\) "
+ r"to replace an existing column.",
+ ):
+ t.append_column(Column("col", String, key="c2"))
+
+ @testing.variation("field", ["name", "key"])
+ def test_append_column_replace_existing(self, field: Variation):
+ if field.name:
+ t = Table("t", MetaData(), Column("col", Integer))
+ t.append_column(Column("col", String), replace_existing=True)
+ is_true(isinstance(t.c.col.type, String))
+ elif field.key:
+ t = Table("t", MetaData(), Column("col", Integer, key="c2"))
+ t.append_column(
+ Column("col", String, key="c2"), replace_existing=True
+ )
+ is_true(isinstance(t.c.c2.type, String))
+ else:
+ field.fail()
def test_autoincrement_replace(self):
m = MetaData()
@@ -2658,7 +2666,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
assert t2.index("CREATE TABLE someschema.table2") > -1
-class UseExistingTest(fixtures.TablesTest):
+class UseExistingTest(testing.AssertsCompiledSQL, fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
Table(
@@ -2678,6 +2686,121 @@ class UseExistingTest(fixtures.TablesTest):
def empty_meta(self):
return MetaData()
+ @testing.variation(
+ "scenario",
+ [
+ "inplace",
+ "inplace_ee",
+ "separate_ee_key_first",
+ "separate_ee_key_second",
+ "separate_ee_key_append_no_replace",
+ "separate_ee_key_append_replace",
+ ],
+ )
+ @testing.variation("both_have_keys", [True, False])
+ def test_table_w_two_same_named_columns(
+ self, empty_meta, scenario: Variation, both_have_keys: Variation
+ ):
+
+ if scenario.inplace:
+ with expect_raises_message(
+ exc.DuplicateColumnError,
+ "A column with name 'b' is already present in table 'users'.",
+ ):
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ Column("b", String, key="b2"),
+ )
+ return
+ elif scenario.inplace_ee:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ Column("b", String, key="b2"),
+ extend_existing=True,
+ )
+ elif scenario.separate_ee_key_first:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b2"),
+ )
+
+ expected_warnings = (
+ [
+ 'Column with user-specified key "b2" is being '
+ 'replaced with plain named column "b", key "b2" '
+ "is being removed."
+ ]
+ if not both_have_keys
+ else []
+ )
+ with expect_warnings(*expected_warnings):
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ extend_existing=True,
+ )
+ elif scenario.separate_ee_key_second:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ )
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b2"),
+ extend_existing=True,
+ )
+ elif scenario.separate_ee_key_append_no_replace:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ )
+ with expect_raises_message(
+ exc.DuplicateColumnError,
+ r"A column with name 'b' is already present in table 'users'. "
+ r"Specify replace_existing=True to Table.append_column\(\) "
+ r"to replace an existing column.",
+ ):
+ t1.append_column(Column("b", String, key="b2"))
+ return
+ elif scenario.separate_ee_key_append_replace:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ )
+ t1.append_column(
+ Column("b", String, key="b2"), replace_existing=True
+ )
+
+ else:
+ scenario.fail()
+
+ if scenario.separate_ee_key_first:
+ if both_have_keys:
+ eq_(t1.c.keys(), ["a", "b1"])
+ else:
+ eq_(t1.c.keys(), ["a", "b"])
+ else:
+ eq_(t1.c.keys(), ["a", "b2"])
+ self.assert_compile(select(t1), "SELECT users.a, users.b FROM users")
+
def test_exception_no_flags(self, existing_meta):
def go():
Table(