# -*- coding: utf-8 -*-
# Copyright (C) 2008 Andi Albrecht, albrecht.andi@gmail.com
#
# This module is part of python-sqlparse and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php.

"""SQL Lexer"""

# This code is based on the SqlLexer in pygments.
# http://pygments.org/
# It's separated from the rest of pygments to increase performance
# and to allow some customizations.

import re
import sys

from sqlparse import tokens
from sqlparse.keywords import SQL_REGEX
from sqlparse.compat import StringIO, string_types, text_type


class Lexer(object):
    encoding = 'utf-8'
    flags = re.IGNORECASE | re.UNICODE

    def __init__(self):
        self._tokens = {}

        for state in SQL_REGEX:
            self._tokens[state] = []

            for tdef in SQL_REGEX[state]:
                rex = re.compile(tdef[0], self.flags).match

                # An optional third element names the follow-up state;
                # only multiline comments use it.  '#pop' is encoded as
                # -1 so the scanner can slice the state stack.
                new_state = None
                if len(tdef) > 2:
                    if tdef[2] == '#pop':
                        new_state = -1
                    elif tdef[2] in SQL_REGEX:
                        new_state = (tdef[2],)

                self._tokens[state].append((rex, tdef[1], new_state))

    def _decode(self, text):
        # On Python 3 a str is already decoded; bytes fall through to
        # the decoding logic below.
        if sys.version_info[0] == 3:
            if isinstance(text, str):
                return text

        if self.encoding == 'guess':
            try:
                text = text.decode('utf-8')
                # Strip a leading UTF-8 BOM if present.
                if text.startswith(u'\ufeff'):
                    text = text[len(u'\ufeff'):]
            except UnicodeDecodeError:
                text = text.decode('latin1')
        else:
            try:
                text = text.decode(self.encoding)
            except UnicodeDecodeError:
                text = text.decode('unicode-escape')
        return text

    def get_tokens(self, text):
        """Return an iterable of (tokentype, value) pairs from `text`.

        `text` may be a string or a file-like object with a ``read()``
        method.  On Python 2, unicode input is encoded to UTF-8 before
        it is tokenized.
        """
        if isinstance(text, string_types):
            if sys.version_info[0] < 3 and isinstance(text, text_type):
                text = StringIO(text.encode('utf-8'))
                self.encoding = 'utf-8'
            else:
                text = StringIO(text)

        def streamer():
            for i, t, v in self.get_tokens_unprocessed(text):
                yield t, v

        return streamer()
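    # Worked example (illustrative; the exact token types depend on the
    # SQL_REGEX definitions in sqlparse.keywords): each entry built in
    # __init__ is a (match, action, new_state) triple consumed by the
    # scanner below, and tokenizing u'select 1' should yield triples
    # roughly like
    #
    #     (0, tokens.Keyword.DML, u'select')
    #     (6, tokens.Whitespace, u' ')
    #     (7, tokens.Number.Integer, u'1')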
    def get_tokens_unprocessed(self, stream):
        """Split the text read from ``stream`` into (pos, tokentype,
        value) triples.

        Scanning starts in the ``'root'`` state; the state stack is
        only ever deepened for multiline comments.
        """
        pos = 0
        tokendefs = self._tokens
        statestack = ['root']
        statetokens = tokendefs[statestack[-1]]
        known_names = {}
        text = stream.read()
        text = self._decode(text)
        while True:
            for rexmatch, action, new_state in statetokens:
                m = rexmatch(text, pos)
                if not m:
                    continue
                value = m.group()
                if value in known_names:
                    yield pos, known_names[value], value
                elif type(action) is tokens._TokenType:
                    yield pos, action, value
                elif hasattr(action, '__call__'):
                    ttype, value = action(value)
                    known_names[value] = ttype
                    yield pos, ttype, value
                else:
                    for item in action(self, m):
                        yield item
                pos = m.end()
                if new_state is not None:
                    # state transition
                    if isinstance(new_state, tuple):
                        for state in new_state:
                            if state == '#pop':
                                statestack.pop()
                            elif state == '#push':
                                statestack.append(statestack[-1])
                            elif (
                                # Ugly hack: multiline comments
                                # are not stackable.
                                state != 'multiline-comments'
                                or not statestack
                                or statestack[-1] != 'multiline-comments'
                            ):
                                statestack.append(state)
                    elif isinstance(new_state, int):
                        # pop states off the stack (new_state is -1 here)
                        del statestack[new_state:]
                    elif new_state == '#push':
                        statestack.append(statestack[-1])
                    statetokens = tokendefs[statestack[-1]]
                break
            else:
                try:
                    if text[pos] == '\n':
                        # at EOL, reset state to "root"
                        statestack = ['root']
                        statetokens = tokendefs['root']
                        yield pos, tokens.Text, u'\n'
                        pos += 1
                        continue
                    yield pos, tokens.Error, text[pos]
                    pos += 1
                except IndexError:
                    break


def tokenize(sql, encoding=None):
    """Tokenize *sql*.

    Tokenize *sql* using the :class:`Lexer` and return a 2-tuple stream
    of ``(token type, value)`` items.
    """
    lexer = Lexer()
    if encoding is not None:
        lexer.encoding = encoding
    return lexer.get_tokens(sql)
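

# A minimal usage sketch (assumes the stock SQL_REGEX definitions in
# sqlparse.keywords; customized keyword tables may produce different
# token types):
if __name__ == '__main__':
    for ttype, value in tokenize('select * from foo;'):
        print('%s %r' % (ttype, value))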