#
# Copyright (C) 2009-2020 the sqlparse authors and contributors
# <see AUTHORS file>
#
# This module is part of python-sqlparse and is released under
# the BSD License: https://opensource.org/licenses/BSD-3-Clause

"""SQL Lexer"""

import re

# This code is based on the SqlLexer in pygments.
# http://pygments.org/
# It's separated from the rest of pygments to increase performance
# and to allow some customizations.

from io import TextIOBase

from sqlparse import tokens, keywords
from sqlparse.utils import consume

class Lexer:
    """The Lexer supports configurable syntax.

    To add support for additional keywords, use the `add_keywords` method."""

    _default_instance = None

    # Development notes:
    # - This class is prepared to be able to support additional SQL dialects
    #   in the future by adding additional functions that take the place of
    #   the function default_initialization().
    # - The lexer class uses an explicit singleton behavior with the
    #   instance-getter method get_default_instance(). This mechanism has
    #   the advantage that the call signatures of the entry points to the
    #   sqlparse library are not affected. Also, usage of sqlparse in third
    #   party code does not need to be adapted. On the other hand, singleton
    #   behavior is not thread safe, and the current implementation does not
    #   easily allow for multiple SQL dialects to be parsed in the same
    #   process. Such behavior can be supported in the future by passing a
    #   suitably initialized lexer object as an additional parameter to the
    #   entry-point functions (such as `parse`). Code will need to be written
    #   to pass down and utilize such an object. The current implementation
    #   is prepared to support this thread-safe approach without the
    #   default_instance part needing to change its interface.
    @classmethod
    def get_default_instance(cls):
        """Returns the lexer instance used internally
        by the sqlparse core functions."""
        if cls._default_instance is None:
            cls._default_instance = cls()
            cls._default_instance.default_initialization()
        return cls._default_instance
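    # A minimal usage sketch (illustrative only): the getter always hands back
    # the same lazily initialized instance, so customizations made on it
    # affect all subsequent sqlparse calls in the same process.
    #
    #     lexer = Lexer.get_default_instance()
    #     assert lexer is Lexer.get_default_instance()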
    def default_initialization(self):
        """Initialize the lexer with default dictionaries.

        Useful if you need to revert custom syntax settings."""
        self.clear()
        self.set_SQL_REGEX(keywords.SQL_REGEX)
        self.add_keywords(keywords.KEYWORDS_COMMON)
        self.add_keywords(keywords.KEYWORDS_ORACLE)
        self.add_keywords(keywords.KEYWORDS_PLPGSQL)
        self.add_keywords(keywords.KEYWORDS_HQL)
        self.add_keywords(keywords.KEYWORDS_MSACCESS)
        self.add_keywords(keywords.KEYWORDS)
    def clear(self):
        """Clear all syntax configurations.

        Useful if you want to load a reduced set of syntax configurations.
        After this call, regexps and keyword dictionaries need to be loaded
        to make the lexer functional again."""
        self._SQL_REGEX = []
        self._keywords = []
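    # A sketch of loading a reduced configuration (illustrative only; it
    # assumes the standard dictionaries from sqlparse.keywords and keeps just
    # the common keyword list while dropping the dialect-specific ones):
    #
    #     lexer = Lexer.get_default_instance()
    #     lexer.clear()
    #     lexer.set_SQL_REGEX(keywords.SQL_REGEX)
    #     lexer.add_keywords(keywords.KEYWORDS_COMMON)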
    def set_SQL_REGEX(self, SQL_REGEX):
        """Set the list of regex that will parse the SQL."""
        FLAGS = re.IGNORECASE | re.UNICODE
        self._SQL_REGEX = [
            (re.compile(rx, FLAGS).match, tt)
            for rx, tt in SQL_REGEX
        ]
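    # SQL_REGEX is expected to be an iterable of (regex string, token type)
    # pairs, tried in order against the input. A hypothetical extension (the
    # GREETING pattern below is made up for illustration and is not part of
    # the shipped keywords module) could look like:
    #
    #     custom_regex = list(keywords.SQL_REGEX) + [
    #         (r'\bGREETING\b', tokens.Keyword),
    #     ]
    #     Lexer.get_default_instance().set_SQL_REGEX(custom_regex)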
    def add_keywords(self, keywords):
        """Add keyword dictionaries. Keywords are looked up in the same order
        that dictionaries were added."""
        self._keywords.append(keywords)
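    # A hedged example of registering additional keywords (the keyword name
    # below is made up for illustration): the dictionary maps the uppercase
    # keyword string to the token type it should be lexed as.
    #
    #     Lexer.get_default_instance().add_keywords(
    #         {'MY_CUSTOM_KEYWORD': tokens.Keyword})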
    def is_keyword(self, value):
        """Checks for a keyword.

        If the given value is in one of the registered keyword dictionaries,
        the corresponding token type is returned. Otherwise, tokens.Name is
        returned.
        """
        val = value.upper()
        for kwdict in self._keywords:
            if val in kwdict:
                return kwdict[val], value
        else:
            return tokens.Name, value
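    # For example (illustrative; the exact token type depends on the loaded
    # keyword dictionaries), a common keyword resolves to its token type,
    # while an unknown identifier falls back to tokens.Name:
    #
    #     lexer = Lexer.get_default_instance()
    #     lexer.is_keyword('select')     # -> (tokens.Keyword.DML, 'select')
    #     lexer.is_keyword('my_column')  # -> (tokens.Name, 'my_column')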
    def get_tokens(self, text, encoding=None):
        """Return an iterable of (tokentype, value) pairs generated from
        ``text``.

        ``text`` may be a ``str``, ``bytes`` or a file-like object
        (``TextIOBase``). Byte strings are decoded using ``encoding`` if
        given, otherwise UTF-8 with a ``unicode-escape`` fallback.
        """
        if isinstance(text, TextIOBase):
            text = text.read()

        if isinstance(text, str):
            pass
        elif isinstance(text, bytes):
            if encoding:
                text = text.decode(encoding)
            else:
                try:
                    text = text.decode('utf-8')
                except UnicodeDecodeError:
                    text = text.decode('unicode-escape')
        else:
            raise TypeError("Expected text or file-like object, got {!r}".
                            format(type(text)))

        iterable = enumerate(text)
        for pos, char in iterable:
            for rexmatch, action in self._SQL_REGEX:
                m = rexmatch(text, pos)

                if not m:
                    continue
                elif isinstance(action, tokens._TokenType):
                    yield action, m.group()
                elif action is keywords.PROCESS_AS_KEYWORD:
                    yield self.is_keyword(m.group())

                # Advance the character iterator past the matched span so the
                # outer loop resumes scanning right after this token.
                consume(iterable, m.end() - pos - 1)
                break
            else:
                # No regex matched at this position: emit a one-character
                # error token and move on.
                yield tokens.Error, char
def tokenize(sql, encoding=None):
    """Tokenize sql.

    Tokenize *sql* using the :class:`Lexer` and return a stream of
    ``(token type, value)`` 2-tuples.
    """
    return Lexer.get_default_instance().get_tokens(sql, encoding)
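# A quick usage sketch (illustrative; the token types shown depend on the
# keyword dictionaries loaded into the default lexer instance):
#
#     for ttype, value in tokenize('select foo from bar'):
#         print(ttype, repr(value))
#     # e.g. Token.Keyword.DML 'select', Token.Text.Whitespace ' ', ...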