diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-09-17 15:15:21 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-09-17 15:15:21 -0400 |
| commit | cb23fa243f5138aac7acb2a134d567f1a297d42e (patch) | |
| tree | 7cd9b791c2574eb5d9f679da1c2d0ca0840f0ae9 /lib | |
| parent | 1217d6ce97bd469b3ec2c17f6f955730059d571f (diff) | |
| download | sqlalchemy-cb23fa243f5138aac7acb2a134d567f1a297d42e.tar.gz | |
- Added :meth:`.Inspector.get_temp_table_names` and
:meth:`.Inspector.get_temp_view_names`; currently, only the
SQLite dialect supports these methods. The return of temporary
table and view names has been **removed** from SQLite's version
of :meth:`.Inspector.get_table_names` and
:meth:`.Inspector.get_view_names`; other database backends cannot
support this information (such as MySQL), and the scope of operation
is different in that the tables can be local to a session and
typically aren't supported in remote schemas.
fixes #3204
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/sqlalchemy/dialects/sqlite/base.py | 49 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 18 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/reflection.py | 24 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/requirements.py | 14 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_reflection.py | 78 |
5 files changed, 156 insertions, 27 deletions
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index af793d275..b0bf670a6 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -829,20 +829,26 @@ class SQLiteDialect(default.DefaultDialect): if schema is not None: qschema = self.identifier_preparer.quote_identifier(schema) master = '%s.sqlite_master' % qschema - s = ("SELECT name FROM %s " - "WHERE type='table' ORDER BY name") % (master,) - rs = connection.execute(s) else: - try: - s = ("SELECT name FROM " - " (SELECT * FROM sqlite_master UNION ALL " - " SELECT * FROM sqlite_temp_master) " - "WHERE type='table' ORDER BY name") - rs = connection.execute(s) - except exc.DBAPIError: - s = ("SELECT name FROM sqlite_master " - "WHERE type='table' ORDER BY name") - rs = connection.execute(s) + master = "sqlite_master" + s = ("SELECT name FROM %s " + "WHERE type='table' ORDER BY name") % (master,) + rs = connection.execute(s) + return [row[0] for row in rs] + + @reflection.cache + def get_temp_table_names(self, connection, **kw): + s = "SELECT name FROM sqlite_temp_master "\ + "WHERE type='table' ORDER BY name " + rs = connection.execute(s) + + return [row[0] for row in rs] + + @reflection.cache + def get_temp_view_names(self, connection, **kw): + s = "SELECT name FROM sqlite_temp_master "\ + "WHERE type='view' ORDER BY name " + rs = connection.execute(s) return [row[0] for row in rs] @@ -869,20 +875,11 @@ class SQLiteDialect(default.DefaultDialect): if schema is not None: qschema = self.identifier_preparer.quote_identifier(schema) master = '%s.sqlite_master' % qschema - s = ("SELECT name FROM %s " - "WHERE type='view' ORDER BY name") % (master,) - rs = connection.execute(s) else: - try: - s = ("SELECT name FROM " - " (SELECT * FROM sqlite_master UNION ALL " - " SELECT * FROM sqlite_temp_master) " - "WHERE type='view' ORDER BY name") - rs = connection.execute(s) - except exc.DBAPIError: - s = ("SELECT name FROM sqlite_master " - "WHERE type='view' ORDER BY name") - rs = connection.execute(s) + master = "sqlite_master" + s = ("SELECT name FROM %s " + "WHERE type='view' ORDER BY name") % (master,) + rs = connection.execute(s) return [row[0] for row in rs] diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 71df29cac..0ad2efae0 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -308,7 +308,15 @@ class Dialect(object): def get_table_names(self, connection, schema=None, **kw): """Return a list of table names for `schema`.""" - raise NotImplementedError + raise NotImplementedError() + + def get_temp_table_names(self, connection, schema=None, **kw): + """Return a list of temporary table names on the given connection, + if supported by the underlying backend. + + """ + + raise NotImplementedError() def get_view_names(self, connection, schema=None, **kw): """Return a list of all view names available in the database. @@ -319,6 +327,14 @@ class Dialect(object): raise NotImplementedError() + def get_temp_view_names(self, connection, schema=None, **kw): + """Return a list of temporary view names on the given connection, + if supported by the underlying backend. + + """ + + raise NotImplementedError() + def get_view_definition(self, connection, view_name, schema=None, **kw): """Return view definition. diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py index cf1f2d3dd..c0a3240a5 100644 --- a/lib/sqlalchemy/engine/reflection.py +++ b/lib/sqlalchemy/engine/reflection.py @@ -201,6 +201,30 @@ class Inspector(object): tnames = list(topological.sort(tuples, tnames)) return tnames + def get_temp_table_names(self): + """return a list of temporary table names for the current bind. + + This method is unsupported by most dialects; currently + only SQLite implements it. + + .. versionadded:: 1.0.0 + + """ + return self.dialect.get_temp_table_names( + self.bind, info_cache=self.info_cache) + + def get_temp_view_names(self): + """return a list of temporary view names for the current bind. + + This method is unsupported by most dialects; currently + only SQLite implements it. + + .. versionadded:: 1.0.0 + + """ + return self.dialect.get_temp_view_names( + self.bind, info_cache=self.info_cache) + def get_table_options(self, table_name, schema=None, **kw): """Return a dictionary of options specified when the table of the given name was created. diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index a04bcbbdd..da3e3128a 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -314,6 +314,20 @@ class SuiteRequirements(Requirements): return exclusions.open() @property + def temp_table_reflection(self): + return exclusions.open() + + @property + def temp_table_names(self): + """target dialect supports listing of temporary table names""" + return exclusions.closed() + + @property + def temporary_views(self): + """target database supports temporary views""" + return exclusions.closed() + + @property def index_reflection(self): return exclusions.open() diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index 575a38db9..690a880bb 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -95,6 +95,27 @@ class ComponentReflectionTest(fixtures.TablesTest): cls.define_index(metadata, users) if testing.requires.view_column_reflection.enabled: cls.define_views(metadata, schema) + if not schema and testing.requires.temp_table_reflection.enabled: + cls.define_temp_tables(metadata) + + @classmethod + def define_temp_tables(cls, metadata): + temp_table = Table( + "user_tmp", metadata, + Column("id", sa.INT, primary_key=True), + Column('name', sa.VARCHAR(50)), + Column('foo', sa.INT), + sa.UniqueConstraint('name', name='user_tmp_uq'), + sa.Index("user_tmp_ix", "foo"), + prefixes=['TEMPORARY'] + ) + if testing.requires.view_reflection.enabled and \ + testing.requires.temporary_views.enabled: + event.listen( + temp_table, "after_create", + DDL("create temporary view user_tmp_v as " + "select * from user_tmp") + ) @classmethod def define_index(cls, metadata, users): @@ -147,6 +168,7 @@ class ComponentReflectionTest(fixtures.TablesTest): users, addresses, dingalings = self.tables.users, \ self.tables.email_addresses, self.tables.dingalings insp = inspect(meta.bind) + if table_type == 'view': table_names = insp.get_view_names(schema) table_names.sort() @@ -162,6 +184,20 @@ class ComponentReflectionTest(fixtures.TablesTest): answer = ['dingalings', 'email_addresses', 'users'] eq_(sorted(table_names), answer) + @testing.requires.temp_table_names + def test_get_temp_table_names(self): + insp = inspect(self.metadata.bind) + temp_table_names = insp.get_temp_table_names() + eq_(sorted(temp_table_names), ['user_tmp']) + + @testing.requires.view_reflection + @testing.requires.temp_table_names + @testing.requires.temporary_views + def test_get_temp_view_names(self): + insp = inspect(self.metadata.bind) + temp_table_names = insp.get_temp_view_names() + eq_(sorted(temp_table_names), ['user_tmp_v']) + @testing.requires.table_reflection def test_get_table_names(self): self._test_get_table_names() @@ -294,6 +330,28 @@ class ComponentReflectionTest(fixtures.TablesTest): def test_get_columns_with_schema(self): self._test_get_columns(schema=testing.config.test_schema) + @testing.requires.temp_table_reflection + def test_get_temp_table_columns(self): + meta = MetaData(testing.db) + user_tmp = self.tables.user_tmp + insp = inspect(meta.bind) + cols = insp.get_columns('user_tmp') + self.assert_(len(cols) > 0, len(cols)) + + for i, col in enumerate(user_tmp.columns): + eq_(col.name, cols[i]['name']) + + @testing.requires.temp_table_reflection + @testing.requires.view_column_reflection + @testing.requires.temporary_views + def test_get_temp_view_columns(self): + insp = inspect(self.metadata.bind) + cols = insp.get_columns('user_tmp_v') + eq_( + [col['name'] for col in cols], + ['id', 'name', 'foo'] + ) + @testing.requires.view_column_reflection def test_get_view_columns(self): self._test_get_columns(table_type='view') @@ -426,6 +484,26 @@ class ComponentReflectionTest(fixtures.TablesTest): def test_get_unique_constraints(self): self._test_get_unique_constraints() + @testing.requires.temp_table_reflection + def test_get_temp_table_unique_constraints(self): + insp = inspect(self.metadata.bind) + eq_( + insp.get_unique_constraints('user_tmp'), + [{'column_names': ['name'], 'name': 'user_tmp_uq'}] + ) + + @testing.requires.temp_table_reflection + def test_get_temp_table_indexes(self): + insp = inspect(self.metadata.bind) + indexes = insp.get_indexes('user_tmp') + eq_( + # TODO: we need to add better filtering for indexes/uq constraints + # that are doubled up + [idx for idx in indexes if idx['name'] == 'user_tmp_ix'], + [{'unique': False, 'column_names': ['foo'], 'name': 'user_tmp_ix'}] + ) + + @testing.requires.unique_constraint_reflection @testing.requires.schemas def test_get_unique_constraints_with_schema(self): |
