summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGord Thompson <gord@gordthompson.com>2020-09-18 16:33:17 -0600
committerGord Thompson <gord@gordthompson.com>2020-09-19 10:18:52 -0600
commitcec14b77e62d301540a29e5a81a671231239849d (patch)
tree16afd857ccc8a9184b4f5480dbafc22f2488019f
parent29575552b04f4d4e4f7373a8ddcaa2572046029e (diff)
downloadsqlalchemy-cec14b77e62d301540a29e5a81a671231239849d.tar.gz
Fix has_table() for mssql temporary tables
Fixes: #5597 Fixes the issue where :meth:`_reflection.has_table` always returns ``False`` for temporary tables. Change-Id: I03ab04c849a157ce8fd28c07ec3bf4407b0f2c94
-rw-r--r--doc/build/changelog/unreleased_14/5597.rst6
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py41
-rw-r--r--test/dialect/mssql/test_reflection.py16
3 files changed, 49 insertions, 14 deletions
diff --git a/doc/build/changelog/unreleased_14/5597.rst b/doc/build/changelog/unreleased_14/5597.rst
new file mode 100644
index 000000000..ee9343bc8
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/5597.rst
@@ -0,0 +1,6 @@
+.. change::
+ :tags: mssql, bug, schema
+ :tickets: 5597
+
+ Fixed an issue where :meth:`_reflection.has_table` always returned
+ ``False`` for temporary tables. \ No newline at end of file
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index 7564536a5..2cbdc19aa 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -2756,21 +2756,34 @@ class MSDialect(default.DefaultDialect):
@_db_plus_owner
def has_table(self, connection, tablename, dbname, owner, schema):
- tables = ischema.tables
+ if tablename.startswith("#"): # temporary table
+ tables = ischema.mssql_temp_table_columns
+ result = connection.execute(
+ sql.select(tables.c.table_name)
+ .where(
+ tables.c.table_name.like(
+ self._temp_table_name_like_pattern(tablename)
+ )
+ )
+ .limit(1)
+ )
+ return result.scalar() is not None
+ else:
+ tables = ischema.tables
- s = sql.select(tables.c.table_name).where(
- sql.and_(
- tables.c.table_type == "BASE TABLE",
- tables.c.table_name == tablename,
+ s = sql.select(tables.c.table_name).where(
+ sql.and_(
+ tables.c.table_type == "BASE TABLE",
+ tables.c.table_name == tablename,
+ )
)
- )
- if owner:
- s = s.where(tables.c.table_schema == owner)
+ if owner:
+ s = s.where(tables.c.table_schema == owner)
- c = connection.execute(s)
+ c = connection.execute(s)
- return c.first() is not None
+ return c.first() is not None
@_db_plus_owner
def has_sequence(self, connection, sequencename, dbname, owner, schema):
@@ -2937,6 +2950,9 @@ class MSDialect(default.DefaultDialect):
view_def = rp.scalar()
return view_def
+ def _temp_table_name_like_pattern(self, tablename):
+ return tablename + (("___%") if not tablename.startswith("##") else "")
+
def _get_internal_temp_table_name(self, connection, tablename):
# it's likely that schema is always "dbo", but since we can
# get it here, let's get it.
@@ -2950,10 +2966,7 @@ class MSDialect(default.DefaultDialect):
"from tempdb.information_schema.tables "
"where table_name like :p1"
),
- {
- "p1": tablename
- + (("___%") if not tablename.startswith("##") else "")
- },
+ {"p1": self._temp_table_name_like_pattern(tablename)},
).one()
except exc.MultipleResultsFound as me:
util.raise_(
diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py
index bd64bedcb..c7d012f5b 100644
--- a/test/dialect/mssql/test_reflection.py
+++ b/test/dialect/mssql/test_reflection.py
@@ -300,6 +300,22 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
)
@testing.provide_metadata
+ @testing.combinations(
+ ("local_temp", "#tmp", True),
+ ("global_temp", "##tmp", True),
+ ("nonexistent", "#no_es_bueno", False),
+ id_="iaa",
+ argnames="table_name, exists",
+ )
+ def test_has_table_temporary(self, connection, table_name, exists):
+ if exists:
+ tt = Table(table_name, self.metadata, Column("id", Integer),)
+ tt.create(connection)
+
+ found_it = testing.db.dialect.has_table(connection, table_name)
+ eq_(found_it, exists)
+
+ @testing.provide_metadata
def test_db_qualified_items(self):
metadata = self.metadata
Table("foo", metadata, Column("id", Integer, primary_key=True))