# mysql/reflection.py # Copyright (C) 2005-2018 the SQLAlchemy authors and contributors # # # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php import re from ... import log, util from ... import types as sqltypes from .enumerated import _EnumeratedValues, SET from .types import DATETIME, TIME, TIMESTAMP class ReflectedState(object): """Stores raw information about a SHOW CREATE TABLE statement.""" def __init__(self): self.columns = [] self.table_options = {} self.table_name = None self.keys = [] self.fk_constraints = [] self.ck_constraints = [] @log.class_logger class MySQLTableDefinitionParser(object): """Parses the results of a SHOW CREATE TABLE statement.""" def __init__(self, dialect, preparer): self.dialect = dialect self.preparer = preparer self._prep_regexes() def parse(self, show_create, charset): state = ReflectedState() state.charset = charset for line in re.split(r"\r?\n", show_create): if line.startswith(" " + self.preparer.initial_quote): self._parse_column(line, state) # a regular table options line elif line.startswith(") "): self._parse_table_options(line, state) # an ANSI-mode table options line elif line == ")": pass elif line.startswith("CREATE "): self._parse_table_name(line, state) # Not present in real reflection, but may be if # loading from a file. elif not line: pass else: type_, spec = self._parse_constraints(line) if type_ is None: util.warn("Unknown schema content: %r" % line) elif type_ == "key": state.keys.append(spec) elif type_ == "fk_constraint": state.fk_constraints.append(spec) elif type_ == "ck_constraint": state.ck_constraints.append(spec) else: pass return state def _parse_constraints(self, line): """Parse a KEY or CONSTRAINT line. :param line: A line of SHOW CREATE TABLE output """ # KEY m = self._re_key.match(line) if m: spec = m.groupdict() # convert columns into name, length pairs # NOTE: we may want to consider SHOW INDEX as the # format of indexes in MySQL becomes more complex spec["columns"] = self._parse_keyexprs(spec["columns"]) if spec["version_sql"]: m2 = self._re_key_version_sql.match(spec["version_sql"]) if m2 and m2.groupdict()["parser"]: spec["parser"] = m2.groupdict()["parser"] if spec["parser"]: spec["parser"] = self.preparer.unformat_identifiers( spec["parser"] )[0] return "key", spec # FOREIGN KEY CONSTRAINT m = self._re_fk_constraint.match(line) if m: spec = m.groupdict() spec["table"] = self.preparer.unformat_identifiers(spec["table"]) spec["local"] = [c[0] for c in self._parse_keyexprs(spec["local"])] spec["foreign"] = [ c[0] for c in self._parse_keyexprs(spec["foreign"]) ] return "fk_constraint", spec # CHECK constraint m = self._re_ck_constraint.match(line) if m: spec = m.groupdict() return "ck_constraint", spec # PARTITION and SUBPARTITION m = self._re_partition.match(line) if m: # Punt! return "partition", line # No match. return (None, line) def _parse_table_name(self, line, state): """Extract the table name. :param line: The first line of SHOW CREATE TABLE """ regex, cleanup = self._pr_name m = regex.match(line) if m: state.table_name = cleanup(m.group("name")) def _parse_table_options(self, line, state): """Build a dictionary of all reflected table-level options. :param line: The final line of SHOW CREATE TABLE output. """ options = {} if not line or line == ")": pass else: rest_of_line = line[:] for regex, cleanup in self._pr_options: m = regex.search(rest_of_line) if not m: continue directive, value = m.group("directive"), m.group("val") if cleanup: value = cleanup(value) options[directive.lower()] = value rest_of_line = regex.sub("", rest_of_line) for nope in ("auto_increment", "data directory", "index directory"): options.pop(nope, None) for opt, val in options.items(): state.table_options["%s_%s" % (self.dialect.name, opt)] = val def _parse_column(self, line, state): """Extract column details. Falls back to a 'minimal support' variant if full parse fails. :param line: Any column-bearing line from SHOW CREATE TABLE """ spec = None m = self._re_column.match(line) if m: spec = m.groupdict() spec["full"] = True else: m = self._re_column_loose.match(line) if m: spec = m.groupdict() spec["full"] = False if not spec: util.warn("Unknown column definition %r" % line) return if not spec["full"]: util.warn("Incomplete reflection of column definition %r" % line) name, type_, args = spec["name"], spec["coltype"], spec["arg"] try: col_type = self.dialect.ischema_names[type_] except KeyError: util.warn( "Did not recognize type '%s' of column '%s'" % (type_, name) ) col_type = sqltypes.NullType # Column type positional arguments eg. varchar(32) if args is None or args == "": type_args = [] elif args[0] == "'" and args[-1] == "'": type_args = self._re_csv_str.findall(args) else: type_args = [int(v) for v in self._re_csv_int.findall(args)] # Column type keyword options type_kw = {} if issubclass(col_type, (DATETIME, TIME, TIMESTAMP)): if type_args: type_kw["fsp"] = type_args.pop(0) for kw in ("unsigned", "zerofill"): if spec.get(kw, False): type_kw[kw] = True for kw in ("charset", "collate"): if spec.get(kw, False): type_kw[kw] = spec[kw] if issubclass(col_type, _EnumeratedValues): type_args = _EnumeratedValues._strip_values(type_args) if issubclass(col_type, SET) and "" in type_args: type_kw["retrieve_as_bitwise"] = True type_instance = col_type(*type_args, **type_kw) col_kw = {} # NOT NULL col_kw["nullable"] = True # this can be "NULL" in the case of TIMESTAMP if spec.get("notnull", False) == "NOT NULL": col_kw["nullable"] = False # AUTO_INCREMENT if spec.get("autoincr", False): col_kw["autoincrement"] = True elif issubclass(col_type, sqltypes.Integer): col_kw["autoincrement"] = False # DEFAULT default = spec.get("default", None) if default == "NULL": # eliminates the need to deal with this later. default = None comment = spec.get("comment", None) if comment is not None: comment = comment.replace("\\\\", "\\").replace("''", "'") col_d = dict( name=name, type=type_instance, default=default, comment=comment ) col_d.update(col_kw) state.columns.append(col_d) def _describe_to_create(self, table_name, columns): """Re-format DESCRIBE output as a SHOW CREATE TABLE string. DESCRIBE is a much simpler reflection and is sufficient for reflecting views for runtime use. This method formats DDL for columns only- keys are omitted. :param columns: A sequence of DESCRIBE or SHOW COLUMNS 6-tuples. SHOW FULL COLUMNS FROM rows must be rearranged for use with this function. """ buffer = [] for row in columns: (name, col_type, nullable, default, extra) = [ row[i] for i in (0, 1, 2, 4, 5) ] line = [" "] line.append(self.preparer.quote_identifier(name)) line.append(col_type) if not nullable: line.append("NOT NULL") if default: if "auto_increment" in default: pass elif col_type.startswith("timestamp") and default.startswith( "C" ): line.append("DEFAULT") line.append(default) elif default == "NULL": line.append("DEFAULT") line.append(default) else: line.append("DEFAULT") line.append("'%s'" % default.replace("'", "''")) if extra: line.append(extra) buffer.append(" ".join(line)) return "".join( [ ( "CREATE TABLE %s (\n" % self.preparer.quote_identifier(table_name) ), ",\n".join(buffer), "\n) ", ] ) def _parse_keyexprs(self, identifiers): """Unpack '"col"(2),"col" ASC'-ish strings into components.""" return self._re_keyexprs.findall(identifiers) def _prep_regexes(self): """Pre-compile regular expressions.""" self._re_columns = [] self._pr_options = [] _final = self.preparer.final_quote quotes = dict( zip( ("iq", "fq", "esc_fq"), [ re.escape(s) for s in ( self.preparer.initial_quote, _final, self.preparer._escape_identifier(_final), ) ], ) ) self._pr_name = _pr_compile( r"^CREATE (?:\w+ +)?TABLE +" r"%(iq)s(?P(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +\($" % quotes, self.preparer._unescape_identifier, ) # `col`,`col2`(32),`col3`(15) DESC # self._re_keyexprs = _re_compile( r"(?:" r"(?:%(iq)s((?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)" r"(?:\((\d+)\))?(?: +(ASC|DESC))?(?=\,|$))+" % quotes ) # 'foo' or 'foo','bar' or 'fo,o','ba''a''r' self._re_csv_str = _re_compile(r"\x27(?:\x27\x27|[^\x27])*\x27") # 123 or 123,456 self._re_csv_int = _re_compile(r"\d+") # `colname` [type opts] # (NOT NULL | NULL) # DEFAULT ('value' | CURRENT_TIMESTAMP...) # COMMENT 'comment' # COLUMN_FORMAT (FIXED|DYNAMIC|DEFAULT) # STORAGE (DISK|MEMORY) self._re_column = _re_compile( r" " r"%(iq)s(?P(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" r"(?P\w+)" r"(?:\((?P(?:\d+|\d+,\d+|" r"(?:'(?:''|[^'])*',?)+))\))?" r"(?: +(?PUNSIGNED))?" r"(?: +(?PZEROFILL))?" r"(?: +CHARACTER SET +(?P[\w_]+))?" r"(?: +COLLATE +(?P[\w_]+))?" r"(?: +(?P(?:NOT )?NULL))?" r"(?: +DEFAULT +(?P" r"(?:NULL|'(?:''|[^'])*'|[\w\(\)]+" r"(?: +ON UPDATE [\w\(\)]+)?)" r"))?" r"(?: +(?PAUTO_INCREMENT))?" r"(?: +COMMENT +'(?P(?:''|[^'])*)')?" r"(?: +COLUMN_FORMAT +(?P\w+))?" r"(?: +STORAGE +(?P\w+))?" r"(?: +(?P.*))?" r",?$" % quotes ) # Fallback, try to parse as little as possible self._re_column_loose = _re_compile( r" " r"%(iq)s(?P(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" r"(?P\w+)" r"(?:\((?P(?:\d+|\d+,\d+|\x27(?:\x27\x27|[^\x27])+\x27))\))?" r".*?(?P(?:NOT )NULL)?" % quotes ) # (PRIMARY|UNIQUE|FULLTEXT|SPATIAL) INDEX `name` (USING (BTREE|HASH))? # (`col` (ASC|DESC)?, `col` (ASC|DESC)?) # KEY_BLOCK_SIZE size | WITH PARSER name /*!50100 WITH PARSER name */ self._re_key = _re_compile( r" " r"(?:(?P\S+) )?KEY" r"(?: +%(iq)s(?P(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)?" r"(?: +USING +(?P\S+))?" r" +\((?P.+?)\)" r"(?: +USING +(?P\S+))?" r"(?: +KEY_BLOCK_SIZE *[ =]? *(?P\S+))?" r"(?: +WITH PARSER +(?P\S+))?" r"(?: +COMMENT +(?P(\x27\x27|\x27([^\x27])*?\x27)+))?" r"(?: +/\*(?P.+)\*/ +)?" r",?$" % quotes ) # https://forums.mysql.com/read.php?20,567102,567111#msg-567111 # It means if the MySQL version >= \d+, execute what's in the comment self._re_key_version_sql = _re_compile( r"\!\d+ " r"(?: *WITH PARSER +(?P\S+) *)?" ) # CONSTRAINT `name` FOREIGN KEY (`local_col`) # REFERENCES `remote` (`remote_col`) # MATCH FULL | MATCH PARTIAL | MATCH SIMPLE # ON DELETE CASCADE ON UPDATE RESTRICT # # unique constraints come back as KEYs kw = quotes.copy() kw["on"] = "RESTRICT|CASCADE|SET NULL|NOACTION" self._re_fk_constraint = _re_compile( r" " r"CONSTRAINT +" r"%(iq)s(?P(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" r"FOREIGN KEY +" r"\((?P[^\)]+?)\) REFERENCES +" r"(?P%(iq)s[^%(fq)s]+%(fq)s" r"(?:\.%(iq)s[^%(fq)s]+%(fq)s)?) +" r"\((?P[^\)]+?)\)" r"(?: +(?PMATCH \w+))?" r"(?: +ON DELETE (?P%(on)s))?" r"(?: +ON UPDATE (?P%(on)s))?" % kw ) # CONSTRAINT `CONSTRAINT_1` CHECK (`x` > 5)' # testing on MariaDB 10.2 shows that the CHECK constraint # is returned on a line by itself, so to match without worrying # about parenthesis in the expresion we go to the end of the line self._re_ck_constraint = _re_compile( r" " r"CONSTRAINT +" r"%(iq)s(?P(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" r"CHECK +" r"\((?P.+)\),?" % kw ) # PARTITION # # punt! self._re_partition = _re_compile(r"(?:.*)(?:SUB)?PARTITION(?:.*)") # Table-level options (COLLATE, ENGINE, etc.) # Do the string options first, since they have quoted # strings we need to get rid of. for option in _options_of_type_string: self._add_option_string(option) for option in ( "ENGINE", "TYPE", "AUTO_INCREMENT", "AVG_ROW_LENGTH", "CHARACTER SET", "DEFAULT CHARSET", "CHECKSUM", "COLLATE", "DELAY_KEY_WRITE", "INSERT_METHOD", "MAX_ROWS", "MIN_ROWS", "PACK_KEYS", "ROW_FORMAT", "KEY_BLOCK_SIZE", ): self._add_option_word(option) self._add_option_regex("UNION", r"\([^\)]+\)") self._add_option_regex("TABLESPACE", r".*? STORAGE DISK") self._add_option_regex( "RAID_TYPE", r"\w+\s+RAID_CHUNKS\s*\=\s*\w+RAID_CHUNKSIZE\s*=\s*\w+", ) _optional_equals = r"(?:\s*(?:=\s*)|\s+)" def _add_option_string(self, directive): regex = r"(?P%s)%s" r"'(?P(?:[^']|'')*?)'(?!')" % ( re.escape(directive), self._optional_equals, ) self._pr_options.append( _pr_compile( regex, lambda v: v.replace("\\\\", "\\").replace("''", "'") ) ) def _add_option_word(self, directive): regex = r"(?P%s)%s" r"(?P\w+)" % ( re.escape(directive), self._optional_equals, ) self._pr_options.append(_pr_compile(regex)) def _add_option_regex(self, directive, regex): regex = r"(?P%s)%s" r"(?P%s)" % ( re.escape(directive), self._optional_equals, regex, ) self._pr_options.append(_pr_compile(regex)) _options_of_type_string = ( "COMMENT", "DATA DIRECTORY", "INDEX DIRECTORY", "PASSWORD", "CONNECTION", ) def _pr_compile(regex, cleanup=None): """Prepare a 2-tuple of compiled regex and callable.""" return (_re_compile(regex), cleanup) def _re_compile(regex): """Compile a string to regex, I and UNICODE.""" return re.compile(regex, re.I | re.UNICODE)