summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/sqlite/base.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-12-13 18:04:11 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2014-12-13 19:24:56 -0500
commit468db416dbf284f0e7dddde90ec9641dc89428c6 (patch)
tree061c600e0a2cd834d60137f901b5088a9c6eb664 /lib/sqlalchemy/dialects/sqlite/base.py
parent5b146e1bab7b440038c356f388e3362a669399c1 (diff)
downloadsqlalchemy-468db416dbf284f0e7dddde90ec9641dc89428c6.tar.gz
- rework sqlite FK and unique constraint system to combine both PRAGMA
and regexp parsing of SQL in order to form a complete picture of constraints + their names. fixes #3244 fixes #3261 - factor various PRAGMA work to be centralized into one call
Diffstat (limited to 'lib/sqlalchemy/dialects/sqlite/base.py')
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py299
1 files changed, 187 insertions, 112 deletions
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index 30d8a6ea3..e79299527 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -913,22 +913,9 @@ class SQLiteDialect(default.DefaultDialect):
return [row[0] for row in rs]
def has_table(self, connection, table_name, schema=None):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- qtable = quote(table_name)
- statement = "%stable_info(%s)" % (pragma, qtable)
- cursor = _pragma_cursor(connection.execute(statement))
- row = cursor.fetchone()
-
- # consume remaining rows, to work around
- # http://www.sqlite.org/cvstrac/tktview?tn=1884
- while not cursor.closed and cursor.fetchone() is not None:
- pass
-
- return row is not None
+ info = self._get_table_pragma(
+ connection, "table_info", table_name, schema=schema)
+ return bool(info)
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
@@ -970,18 +957,11 @@ class SQLiteDialect(default.DefaultDialect):
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- qtable = quote(table_name)
- statement = "%stable_info(%s)" % (pragma, qtable)
- c = _pragma_cursor(connection.execute(statement))
+ info = self._get_table_pragma(
+ connection, "table_info", table_name, schema=schema)
- rows = c.fetchall()
columns = []
- for row in rows:
+ for row in info:
(name, type_, nullable, default, primary_key) = (
row[1], row[2].upper(), not row[3], row[4], row[5])
@@ -1068,92 +1048,192 @@ class SQLiteDialect(default.DefaultDialect):
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- qtable = quote(table_name)
- statement = "%sforeign_key_list(%s)" % (pragma, qtable)
- c = _pragma_cursor(connection.execute(statement))
- fkeys = []
+ # sqlite makes this *extremely difficult*.
+ # First, use the pragma to get the actual FKs.
+ pragma_fks = self._get_table_pragma(
+ connection, "foreign_key_list",
+ table_name, schema=schema
+ )
+
fks = {}
- while True:
- row = c.fetchone()
- if row is None:
- break
+
+ for row in pragma_fks:
(numerical_id, rtbl, lcol, rcol) = (
row[0], row[2], row[3], row[4])
- self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol)
- return fkeys
+ if rcol is None:
+ rcol = lcol
- def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol):
- # sqlite won't return rcol if the table was created with REFERENCES
- # <tablename>, no col
- if rcol is None:
- rcol = lcol
+ if self._broken_fk_pragma_quotes:
+ rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
- if self._broken_fk_pragma_quotes:
- rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
+ if numerical_id in fks:
+ fk = fks[numerical_id]
+ else:
+ fk = fks[numerical_id] = {
+ 'name': None,
+ 'constrained_columns': [],
+ 'referred_schema': None,
+ 'referred_table': rtbl,
+ 'referred_columns': [],
+ }
+ fks[numerical_id] = fk
- try:
- fk = fks[numerical_id]
- except KeyError:
- fk = {
- 'name': None,
- 'constrained_columns': [],
- 'referred_schema': None,
- 'referred_table': rtbl,
- 'referred_columns': [],
- }
- fkeys.append(fk)
- fks[numerical_id] = fk
-
- if lcol not in fk['constrained_columns']:
fk['constrained_columns'].append(lcol)
- if rcol not in fk['referred_columns']:
fk['referred_columns'].append(rcol)
- return fk
+
+ def fk_sig(constrained_columns, referred_table, referred_columns):
+ return tuple(constrained_columns) + (referred_table,) + \
+ tuple(referred_columns)
+
+ # then, parse the actual SQL and attempt to find DDL that matches
+ # the names as well. SQLite saves the DDL in whatever format
+ # it was typed in as, so need to be liberal here.
+
+ keys_by_signature = dict(
+ (
+ fk_sig(
+ fk['constrained_columns'],
+ fk['referred_table'], fk['referred_columns']),
+ fk
+ ) for fk in fks.values()
+ )
+
+ table_data = self._get_table_sql(connection, table_name, schema=schema)
+ if table_data is None:
+ # system tables, etc.
+ return []
+
+ def parse_fks():
+ FK_PATTERN = (
+ '(?:CONSTRAINT (\w+) +)?'
+ 'FOREIGN KEY *\( *(.+?) *\) +'
+ 'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\)'
+ )
+
+ for match in re.finditer(FK_PATTERN, table_data, re.I):
+ (
+ constraint_name, constrained_columns,
+ referred_quoted_name, referred_name,
+ referred_columns) = match.group(1, 2, 3, 4, 5)
+ constrained_columns = list(
+ self._find_cols_in_sig(constrained_columns))
+ if not referred_columns:
+ referred_columns = constrained_columns
+ else:
+ referred_columns = list(
+ self._find_cols_in_sig(referred_columns))
+ referred_name = referred_quoted_name or referred_name
+ yield (
+ constraint_name, constrained_columns,
+ referred_name, referred_columns)
+ fkeys = []
+
+ for (
+ constraint_name, constrained_columns,
+ referred_name, referred_columns) in parse_fks():
+ sig = fk_sig(
+ constrained_columns, referred_name, referred_columns)
+ if sig not in keys_by_signature:
+ util.warn(
+ "WARNING: SQL-parsed foreign key constraint "
+ "'%s' could not be located in PRAGMA "
+ "foreign_keys for table %s" % (
+ sig,
+ table_name
+ ))
+ continue
+ key = keys_by_signature.pop(sig)
+ key['name'] = constraint_name
+ fkeys.append(key)
+ # assume the remainders are the unnamed, inline constraints, just
+ # use them as is as it's extremely difficult to parse inline
+ # constraints
+ fkeys.extend(keys_by_signature.values())
+ return fkeys
+
+ def _find_cols_in_sig(self, sig):
+ for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
+ yield match.group(1) or match.group(2)
+
+ @reflection.cache
+ def get_unique_constraints(self, connection, table_name,
+ schema=None, **kw):
+
+ auto_index_by_sig = {}
+ for idx in self.get_indexes(
+ connection, table_name, schema=schema,
+ include_auto_indexes=True, **kw):
+ if not idx['name'].startswith("sqlite_autoindex"):
+ continue
+ sig = tuple(idx['column_names'])
+ auto_index_by_sig[sig] = idx
+
+ table_data = self._get_table_sql(
+ connection, table_name, schema=schema, **kw)
+ if not table_data:
+ return []
+
+ unique_constraints = []
+
+ def parse_uqs():
+ UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) +)?UNIQUE *\((.+?)\)'
+ INLINE_UNIQUE_PATTERN = (
+ '(?:(".+?")|([a-z0-9]+)) '
+ '+[a-z0-9_ ]+? +UNIQUE')
+
+ for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
+ name, cols = match.group(1, 2)
+ yield name, list(self._find_cols_in_sig(cols))
+
+ # we need to match inlines as well, as we seek to differentiate
+ # a UNIQUE constraint from a UNIQUE INDEX, even though these
+ # are kind of the same thing :)
+ for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
+ cols = list(
+ self._find_cols_in_sig(match.group(1) or match.group(2)))
+ yield None, cols
+
+ for name, cols in parse_uqs():
+ sig = tuple(cols)
+ if sig in auto_index_by_sig:
+ auto_index_by_sig.pop(sig)
+ parsed_constraint = {
+ 'name': name,
+ 'column_names': cols
+ }
+ unique_constraints.append(parsed_constraint)
+ # NOTE: auto_index_by_sig might not be empty here,
+ # the PRIMARY KEY may have an entry.
+ return unique_constraints
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- include_auto_indexes = kw.pop('include_auto_indexes', False)
- qtable = quote(table_name)
- statement = "%sindex_list(%s)" % (pragma, qtable)
- c = _pragma_cursor(connection.execute(statement))
+ pragma_indexes = self._get_table_pragma(
+ connection, "index_list", table_name, schema=schema)
indexes = []
- while True:
- row = c.fetchone()
- if row is None:
- break
+
+ include_auto_indexes = kw.pop('include_auto_indexes', False)
+ for row in pragma_indexes:
# ignore implicit primary key index.
# http://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
- elif (not include_auto_indexes and
- row[1].startswith('sqlite_autoindex')):
+ if (not include_auto_indexes and
+ row[1].startswith('sqlite_autoindex')):
continue
indexes.append(dict(name=row[1], column_names=[], unique=row[2]))
+
# loop thru unique indexes to get the column names.
for idx in indexes:
- statement = "%sindex_info(%s)" % (pragma, quote(idx['name']))
- c = connection.execute(statement)
- cols = idx['column_names']
- while True:
- row = c.fetchone()
- if row is None:
- break
- cols.append(row[2])
+ pragma_index = self._get_table_pragma(
+ connection, "index_info", idx['name'])
+
+ for row in pragma_index:
+ idx['column_names'].append(row[2])
return indexes
@reflection.cache
- def get_unique_constraints(self, connection, table_name,
- schema=None, **kw):
+ def _get_table_sql(self, connection, table_name, schema=None, **kw):
try:
s = ("SELECT sql FROM "
" (SELECT * FROM sqlite_master UNION ALL "
@@ -1165,27 +1245,22 @@ class SQLiteDialect(default.DefaultDialect):
s = ("SELECT sql FROM sqlite_master WHERE name = '%s' "
"AND type = 'table'") % table_name
rs = connection.execute(s)
- row = rs.fetchone()
- if row is None:
- # sqlite won't return the schema for the sqlite_master or
- # sqlite_temp_master tables from this query. These tables
- # don't have any unique constraints anyway.
- return []
- table_data = row[0]
-
- UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) )?UNIQUE \(([^\)]+)\)'
- return [
- {'name': name,
- 'column_names': [col.strip(' "') for col in cols.split(',')]}
- for name, cols in re.findall(UNIQUE_PATTERN, table_data)
- ]
+ return rs.scalar()
-
-def _pragma_cursor(cursor):
- """work around SQLite issue whereby cursor.description
- is blank when PRAGMA returns no rows."""
-
- if cursor.closed:
- cursor.fetchone = lambda: None
- cursor.fetchall = lambda: []
- return cursor
+ def _get_table_pragma(self, connection, pragma, table_name, schema=None):
+ quote = self.identifier_preparer.quote_identifier
+ if schema is not None:
+ statement = "PRAGMA %s." % quote(schema)
+ else:
+ statement = "PRAGMA "
+ qtable = quote(table_name)
+ statement = "%s%s(%s)" % (statement, pragma, qtable)
+ cursor = connection.execute(statement)
+ if not cursor.closed:
+ # work around SQLite issue whereby cursor.description
+ # is blank when PRAGMA returns no rows:
+ # http://www.sqlite.org/cvstrac/tktview?tn=1884
+ result = cursor.fetchall()
+ else:
+ result = []
+ return result