diff options
author | Gord Thompson <gord@gordthompson.com> | 2020-09-18 16:33:17 -0600 |
---|---|---|
committer | Gord Thompson <gord@gordthompson.com> | 2020-09-19 10:18:52 -0600 |
commit | cec14b77e62d301540a29e5a81a671231239849d (patch) | |
tree | 16afd857ccc8a9184b4f5480dbafc22f2488019f | |
parent | 29575552b04f4d4e4f7373a8ddcaa2572046029e (diff) | |
download | sqlalchemy-cec14b77e62d301540a29e5a81a671231239849d.tar.gz |
Fix has_table() for mssql temporary tables
Fixes: #5597
Fixes the issue where :meth:`_reflection.has_table` always returns
``False`` for temporary tables.
Change-Id: I03ab04c849a157ce8fd28c07ec3bf4407b0f2c94
-rw-r--r-- | doc/build/changelog/unreleased_14/5597.rst | 6 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mssql/base.py | 41 | ||||
-rw-r--r-- | test/dialect/mssql/test_reflection.py | 16 |
3 files changed, 49 insertions, 14 deletions
diff --git a/doc/build/changelog/unreleased_14/5597.rst b/doc/build/changelog/unreleased_14/5597.rst new file mode 100644 index 000000000..ee9343bc8 --- /dev/null +++ b/doc/build/changelog/unreleased_14/5597.rst @@ -0,0 +1,6 @@ +.. change:: + :tags: mssql, bug, schema + :tickets: 5597 + + Fixed an issue where :meth:`_reflection.has_table` always returned + ``False`` for temporary tables.
\ No newline at end of file diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index 7564536a5..2cbdc19aa 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -2756,21 +2756,34 @@ class MSDialect(default.DefaultDialect): @_db_plus_owner def has_table(self, connection, tablename, dbname, owner, schema): - tables = ischema.tables + if tablename.startswith("#"): # temporary table + tables = ischema.mssql_temp_table_columns + result = connection.execute( + sql.select(tables.c.table_name) + .where( + tables.c.table_name.like( + self._temp_table_name_like_pattern(tablename) + ) + ) + .limit(1) + ) + return result.scalar() is not None + else: + tables = ischema.tables - s = sql.select(tables.c.table_name).where( - sql.and_( - tables.c.table_type == "BASE TABLE", - tables.c.table_name == tablename, + s = sql.select(tables.c.table_name).where( + sql.and_( + tables.c.table_type == "BASE TABLE", + tables.c.table_name == tablename, + ) ) - ) - if owner: - s = s.where(tables.c.table_schema == owner) + if owner: + s = s.where(tables.c.table_schema == owner) - c = connection.execute(s) + c = connection.execute(s) - return c.first() is not None + return c.first() is not None @_db_plus_owner def has_sequence(self, connection, sequencename, dbname, owner, schema): @@ -2937,6 +2950,9 @@ class MSDialect(default.DefaultDialect): view_def = rp.scalar() return view_def + def _temp_table_name_like_pattern(self, tablename): + return tablename + (("___%") if not tablename.startswith("##") else "") + def _get_internal_temp_table_name(self, connection, tablename): # it's likely that schema is always "dbo", but since we can # get it here, let's get it. @@ -2950,10 +2966,7 @@ class MSDialect(default.DefaultDialect): "from tempdb.information_schema.tables " "where table_name like :p1" ), - { - "p1": tablename - + (("___%") if not tablename.startswith("##") else "") - }, + {"p1": self._temp_table_name_like_pattern(tablename)}, ).one() except exc.MultipleResultsFound as me: util.raise_( diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py index bd64bedcb..c7d012f5b 100644 --- a/test/dialect/mssql/test_reflection.py +++ b/test/dialect/mssql/test_reflection.py @@ -300,6 +300,22 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): ) @testing.provide_metadata + @testing.combinations( + ("local_temp", "#tmp", True), + ("global_temp", "##tmp", True), + ("nonexistent", "#no_es_bueno", False), + id_="iaa", + argnames="table_name, exists", + ) + def test_has_table_temporary(self, connection, table_name, exists): + if exists: + tt = Table(table_name, self.metadata, Column("id", Integer),) + tt.create(connection) + + found_it = testing.db.dialect.has_table(connection, table_name) + eq_(found_it, exists) + + @testing.provide_metadata def test_db_qualified_items(self): metadata = self.metadata Table("foo", metadata, Column("id", Integer, primary_key=True)) |