Diffstat (limited to 'sqlparse/sql.py')
-rw-r--r--  sqlparse/sql.py  162
1 file changed, 95 insertions(+), 67 deletions(-)
diff --git a/sqlparse/sql.py b/sqlparse/sql.py
index 52b3bf1..9656390 100644
--- a/sqlparse/sql.py
+++ b/sqlparse/sql.py
@@ -209,109 +209,138 @@ class TokenList(Token):
if start is None:
return None
- if not isinstance(start, int):
- start = self.token_index(start) + 1
-
if not isinstance(funcs, (list, tuple)):
funcs = (funcs,)
if reverse:
- iterable = reversed(self.tokens[end:start - 1])
+ assert end is None
+ for idx in range(start - 2, -1, -1):
+ token = self.tokens[idx]
+ for func in funcs:
+ if func(token):
+ return idx, token
else:
- iterable = self.tokens[start:end]
+ for idx, token in enumerate(self.tokens[start:end], start=start):
+ for func in funcs:
+ if func(token):
+ return idx, token
+ return None, None
+
+ def token_first(self, skip_ws=True, skip_cm=False):
+ """Returns the first child token.
- for token in iterable:
- for func in funcs:
- if func(token):
- return token
+ If *skip_ws* is ``True`` (the default), whitespace
+ tokens are ignored.
- def token_next_by(self, i=None, m=None, t=None, idx=0, end=None):
+ If *skip_cm* is ``True`` (default: ``False``), comments are
+ ignored too.
+ """
+ # this one is inconsistent, using Comment instead of T.Comment...
+ funcs = lambda tk: not ((skip_ws and tk.is_whitespace()) or
+ (skip_cm and imt(tk, t=T.Comment, i=Comment)))
+ return self._token_matching(funcs)[1]
+
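Usage sketch for the new ``token_first`` (illustrative; it returns the token itself, not an ``(idx, token)`` pair):

    import sqlparse

    stmt = sqlparse.parse('  SELECT 1')[0]
    tok = stmt.token_first()            # skip_ws=True by default
    print(tok.ttype, tok.value)         # Token.Keyword.DML SELECT
    tok = stmt.token_first(skip_ws=False)
    print(tok.is_whitespace())          # True -- the leading spaces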
+ def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
funcs = lambda tk: imt(tk, i, m, t)
+ idx += 1
return self._token_matching(funcs, idx, end)
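``token_next_by`` now returns an ``(idx, token)`` pair instead of a bare token, ``(None, None)`` on a miss; the internal ``idx += 1`` offsets the new ``idx=-1`` default so the scan still starts at the first child. A sketch:

    import sqlparse
    from sqlparse import tokens as T

    stmt = sqlparse.parse('SELECT a FROM t')[0]
    fidx, from_kw = stmt.token_next_by(m=(T.Keyword, 'FROM'))
    print(fidx, from_kw.value)          # e.g. 4 FROM (index depends on grouping)
    midx, miss = stmt.token_next_by(m=(T.Keyword, 'WHERE'))
    print(midx, miss)                   # None None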
- def token_not_matching(self, idx, funcs):
+ def token_not_matching(self, funcs, idx):
funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
funcs = [lambda tk: not func(tk) for func in funcs]
return self._token_matching(funcs, idx)
- def token_matching(self, idx, funcs):
- return self._token_matching(funcs, idx)
+ def token_matching(self, funcs, idx):
+ return self._token_matching(funcs, idx)[1]
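Note the flipped argument order: both helpers now take the predicate(s) first, then the start index, and ``token_matching`` still returns a bare token (hence the ``[1]``). Sketch:

    import sqlparse
    from sqlparse import tokens as T

    stmt = sqlparse.parse('SELECT 1')[0]
    tok = stmt.token_matching(lambda tk: tk.ttype is T.Keyword.DML, 0)
    print(tok.value)                    # SELECT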
- def token_prev(self, idx=0, skip_ws=True, skip_cm=False):
+ def token_prev(self, idx, skip_ws=True, skip_cm=False):
"""Returns the previous token relative to *idx*.
If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
``None`` is returned if there's no previous token.
"""
+ if idx is None:
+ return None, None
+ idx += 1  # a lot of existing callers pre-compensate for this
funcs = lambda tk: not ((skip_ws and tk.is_whitespace()) or
(skip_cm and imt(tk, t=T.Comment, i=Comment)))
return self._token_matching(funcs, idx, reverse=True)
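``token_prev`` now requires an integer index, guards against ``None`` (so a failed lookup can be chained safely), and returns ``(idx, token)``. Sketch:

    import sqlparse
    from sqlparse import tokens as T

    stmt = sqlparse.parse('SELECT a FROM t')[0]
    fidx, _ = stmt.token_next_by(m=(T.Keyword, 'FROM'))
    pidx, prev_ = stmt.token_prev(fidx)  # whitespace before FROM is skipped
    print(prev_.value)                   # a
    print(stmt.token_prev(None))         # (None, None) -- safe chaining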
- def token_next(self, idx=0, skip_ws=True, skip_cm=False):
+ # TODO: May need to implement skip_cm for upstream changes.
+ # TODO: May need to re-add default value to idx
+ def token_next(self, idx, skip_ws=True, skip_cm=False):
"""Returns the next token relative to *idx*.
- If called with idx = 0. Returns the first child token.
If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
- If *skip_cm* is ``True`` (default: ``False``), comments are ignored.
``None`` is returned if there's no next token.
"""
- funcs = lambda tk: not ((skip_ws and tk.is_whitespace()) or
- (skip_cm and imt(tk, t=T.Comment, i=Comment)))
- return self._token_matching(funcs, idx)
+ if idx is None:
+ return None, None
+ idx += 1  # a lot of existing callers pre-compensate for this
+ try:
+ if not skip_ws:
+ return idx, self.tokens[idx]
+ else:
+ while True:
+ token = self.tokens[idx]
+ if not token.is_whitespace():
+ return idx, token
+ idx += 1
+ except IndexError:
+ return None, None
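``token_next`` mirrors ``token_prev``: integer index in, ``(idx, token)`` out, ``(None, None)`` past the end. That makes index-carrying iteration straightforward:

    import sqlparse

    stmt = sqlparse.parse('SELECT a FROM t')[0]
    idx, tok = 0, stmt.tokens[0]
    while idx is not None:
        print(idx, repr(tok.value))      # SELECT, a, FROM, t with their indices
        idx, tok = stmt.token_next(idx)  # skip_ws=True by default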
def token_index(self, token, start=0):
"""Return list index of token."""
start = start if isinstance(start, int) else self.token_index(start)
return start + self.tokens[start:].index(token)
- def tokens_between(self, start, end, include_end=True):
- """Return all tokens between (and including) start and end.
-
- If *include_end* is ``False`` (default is ``True``) the end token
- is excluded.
- """
- start_idx = self.token_index(start)
- end_idx = include_end + self.token_index(end)
- return self.tokens[start_idx:end_idx]
-
- def group_tokens(self, grp_cls, tokens, skip_ws=False, extend=False):
+ def group_tokens(self, grp_cls, start, end, include_end=True,
+ extend=False):
"""Replace tokens by an instance of *grp_cls*."""
+ start_idx = start
+ start = self.tokens[start_idx]
- while skip_ws and tokens and tokens[-1].is_whitespace():
- tokens = tokens[:-1]
+ end_idx = end + include_end
- left = tokens[0]
- idx = self.token_index(left)
+ # will be needed later for new group_clauses
+ # while skip_ws and tokens and tokens[-1].is_whitespace():
+ # tokens = tokens[:-1]
- if extend and isinstance(left, grp_cls):
- grp = left
- grp.tokens.extend(tokens[1:])
+ if extend and isinstance(start, grp_cls):
+ subtokens = self.tokens[start_idx + 1:end_idx]
+
+ grp = start
+ grp.tokens.extend(subtokens)
+ del self.tokens[start_idx + 1:end_idx]
+ grp.value = text_type(start)
else:
- grp = grp_cls(tokens)
+ subtokens = self.tokens[start_idx:end_idx]
+ grp = grp_cls(subtokens)
+ self.tokens[start_idx:end_idx] = [grp]
+ grp.parent = self
- for token in tokens:
+ for token in subtokens:
token.parent = grp
- # Improve performance. LOOP(list.remove()) is O(n**2) operation
- self.tokens = [token for token in self.tokens if token not in tokens]
-
- self.tokens.insert(idx, grp)
- grp.parent = self
return grp
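``group_tokens`` now takes start/end indices and splices the group in with a single slice assignment, replacing the old O(n**2) remove loop. An illustrative sketch (the indices assume the token layout in the comment):

    import sqlparse
    from sqlparse.sql import TokenList

    stmt = sqlparse.parse('SELECT a FROM t')[0]
    # tokens: [SELECT, ' ', a, ' ', FROM, ' ', t] -- group the first three
    grp = stmt.group_tokens(TokenList, 0, 2)   # include_end=True by default
    print(str(grp))                            # SELECT a
    print(len(stmt.tokens))                    # 5: three tokens became one group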
def insert_before(self, where, token):
"""Inserts *token* before *where*."""
+ if not isinstance(where, int):
+ where = self.token_index(where)
token.parent = self
- self.tokens.insert(self.token_index(where), token)
+ self.tokens.insert(where, token)
def insert_after(self, where, token, skip_ws=True):
"""Inserts *token* after *where*."""
- next_token = self.token_next(where, skip_ws=skip_ws)
+ if not isinstance(where, int):
+ where = self.token_index(where)
+ nidx, next_ = self.token_next(where, skip_ws=skip_ws)
token.parent = self
- if next_token is None:
+ if next_ is None:
self.tokens.append(token)
else:
- self.insert_before(next_token, token)
+ self.tokens.insert(nidx, token)
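Both insert helpers now accept either a token or a plain integer index for *where*, and ``insert_after`` inserts at the computed index directly instead of delegating to ``insert_before``. Sketch:

    import sqlparse
    from sqlparse.sql import Token
    from sqlparse import tokens as T

    stmt = sqlparse.parse('SELECT a')[0]
    stmt.insert_before(0, Token(T.Comment, '/* hint */'))
    stmt.insert_after(0, Token(T.Whitespace, ' '), skip_ws=False)
    print(''.join(t.value for t in stmt.flatten()))   # /* hint */ SELECT a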
def has_alias(self):
"""Returns ``True`` if an alias is present."""
@@ -321,12 +350,13 @@ class TokenList(Token):
"""Returns the alias for this identifier or ``None``."""
# "name AS alias"
- kw = self.token_next_by(m=(T.Keyword, 'AS'))
+ kw_idx, kw = self.token_next_by(m=(T.Keyword, 'AS'))
if kw is not None:
- return self._get_first_name(kw, keywords=True)
+ return self._get_first_name(kw_idx + 1, keywords=True)
# "name alias" or "complicated column expression alias"
- if len(self.tokens) > 2 and self.token_next_by(t=T.Whitespace):
+ _, ws = self.token_next_by(t=T.Whitespace)
+ if len(self.tokens) > 2 and ws is not None:
return self._get_first_name(reverse=True)
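From the caller's side ``get_alias`` is unchanged; only the internal plumbing now works on indices. For example:

    import sqlparse

    stmt = sqlparse.parse('SELECT col AS c FROM t')[0]
    ident = stmt.tokens[2]              # the Identifier "col AS c"
    print(ident.get_alias())            # c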
def get_name(self):
@@ -341,24 +371,21 @@ class TokenList(Token):
def get_real_name(self):
"""Returns the real name (object name) of this identifier."""
# a.b
- dot = self.token_next_by(m=(T.Punctuation, '.'))
- return self._get_first_name(dot)
+ dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
+ return self._get_first_name(dot_idx)
def get_parent_name(self):
"""Return name of the parent object if any.
A parent object is identified by the first occurring dot.
"""
- dot = self.token_next_by(m=(T.Punctuation, '.'))
- prev_ = self.token_prev(dot)
+ dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
+ _, prev_ = self.token_prev(dot_idx)
return remove_quotes(prev_.value) if prev_ is not None else None
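The dotted-name helpers keep their behavior while unpacking the new pairs internally, e.g.:

    import sqlparse

    ident = sqlparse.parse('SELECT a.b FROM t')[0].tokens[2]   # Identifier "a.b"
    print(ident.get_real_name())        # b
    print(ident.get_parent_name())      # a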
def _get_first_name(self, idx=None, reverse=False, keywords=False):
"""Returns the name of the first token with a name"""
- if idx and not isinstance(idx, int):
- idx = self.token_index(idx) + 1
-
tokens = self.tokens[idx:] if idx else self.tokens
tokens = reversed(tokens) if reverse else tokens
types = [T.Name, T.Wildcard, T.String.Symbol]
@@ -386,7 +413,7 @@ class Statement(TokenList):
Whitespaces and comments at the beginning of the statement
are ignored.
"""
- first_token = self.token_next(skip_cm=True)
+ first_token = self.token_first(skip_cm=True)
if first_token is None:
# An "empty" statement that either has not tokens at all
# or only whitespace tokens.
@@ -399,9 +426,10 @@ class Statement(TokenList):
# The WITH keyword should be followed by either an Identifier or
# an IdentifierList containing the CTE definitions; the actual
# DML keyword (e.g. SELECT, INSERT) will follow next.
- token = self.token_next(first_token, skip_ws=True)
+ fidx = self.token_index(first_token)
+ tidx, token = self.token_next(fidx, skip_ws=True)
if isinstance(token, (Identifier, IdentifierList)):
- dml_keyword = self.token_next(token, skip_ws=True)
+ _, dml_keyword = self.token_next(tidx, skip_ws=True)
if dml_keyword.ttype == T.Keyword.DML:
return dml_keyword.normalized
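``get_type`` still resolves the statement's DML keyword, including through a leading WITH clause or comment:

    import sqlparse

    print(sqlparse.parse('SELECT 1')[0].get_type())    # SELECT
    cte = 'WITH x AS (SELECT 1) SELECT * FROM x'
    print(sqlparse.parse(cte)[0].get_type())           # SELECT
    print(sqlparse.parse('-- c\nINSERT INTO t VALUES (1)')[0].get_type())  # INSERT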
@@ -418,18 +446,18 @@ class Identifier(TokenList):
def is_wildcard(self):
"""Return ``True`` if this identifier contains a wildcard."""
- token = self.token_next_by(t=T.Wildcard)
+ _, token = self.token_next_by(t=T.Wildcard)
return token is not None
def get_typecast(self):
"""Returns the typecast or ``None`` of this object as a string."""
- marker = self.token_next_by(m=(T.Punctuation, '::'))
- next_ = self.token_next(marker, skip_ws=False)
+ midx, marker = self.token_next_by(m=(T.Punctuation, '::'))
+ nidx, next_ = self.token_next(midx, skip_ws=False)
return next_.value if next_ else None
def get_ordering(self):
"""Returns the ordering or ``None`` as uppercase string."""
- ordering = self.token_next_by(t=T.Keyword.Order)
+ _, ordering = self.token_next_by(t=T.Keyword.Order)
return ordering.normalized if ordering else None
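These ``Identifier`` helpers likewise keep their return values, e.g.:

    import sqlparse

    ident = sqlparse.parse('SELECT foo::integer FROM t')[0].tokens[2]
    print(ident.get_typecast())              # integer
    stmt = sqlparse.parse('SELECT * FROM t ORDER BY name DESC')[0]
    print(stmt.tokens[-1].get_ordering())    # DESC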
def get_array_indices(self):
@@ -576,7 +604,7 @@ class Function(TokenList):
"""Return a list of parameters."""
parenthesis = self.tokens[-1]
for token in parenthesis.tokens:
- if imt(token, i=IdentifierList):
+ if isinstance(token, IdentifierList):
return token.get_identifiers()
elif imt(token, i=(Function, Identifier), t=T.Literal):
return [token, ]
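``get_parameters`` merely swaps the ``imt`` call for a plain ``isinstance`` check; usage is unchanged (the token position here is illustrative):

    import sqlparse

    func = sqlparse.parse('SELECT count(a, b) FROM t')[0].tokens[2]
    print([p.value for p in func.get_parameters()])    # ['a', 'b']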