Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sqlparse/sql.py: 29%
330 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1#
2# Copyright (C) 2009-2020 the sqlparse authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of python-sqlparse and is released under
6# the BSD License: https://opensource.org/licenses/BSD-3-Clause
8"""This module contains classes representing syntactical elements of SQL."""
10import re
12from sqlparse import tokens as T
13from sqlparse.utils import imt, remove_quotes
class NameAliasMixin:
    """Mixin providing ``get_real_name`` and ``get_alias``."""

    def get_real_name(self):
        """Return the real name (object name) of this identifier."""
        # For dotted names like "a.b" the real name follows the dot.
        idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(idx, real_name=True)

    def get_alias(self):
        """Return the alias for this identifier or ``None``."""
        # Explicit form: "name AS alias"
        as_idx, as_kw = self.token_next_by(m=(T.Keyword, 'AS'))
        if as_kw is not None:
            return self._get_first_name(as_idx + 1, keywords=True)

        # Implicit form: "name alias" or "complicated column expression
        # alias" -- requires at least one whitespace separator.
        _, space = self.token_next_by(t=T.Whitespace)
        if space is None or len(self.tokens) <= 2:
            return None
        return self._get_first_name(reverse=True)
class Token:
    """Base class for all other classes in this module.

    It represents a single token and carries two instance attributes:
    ``value`` holds the unchanged text of the token and ``ttype`` its
    token type.
    """

    __slots__ = ('value', 'ttype', 'parent', 'normalized', 'is_keyword',
                 'is_group', 'is_whitespace')

    def __init__(self, ttype, value):
        text = str(value)
        self.value = text
        self.ttype = ttype
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = self.ttype in T.Whitespace
        # Keywords compare case-insensitively, so store them upper-cased.
        self.normalized = text.upper() if self.is_keyword else text

    def __str__(self):
        return self.value

    # NOTE: __len__ is deliberately not implemented (pending a tokenlist
    # __len__ bug fix upstream).

    def __repr__(self):
        name = self._get_repr_name()
        short = self._get_repr_value()

        # Use double quotes when the value itself is single-quoted.
        q = '"' if short.startswith("'") and short.endswith("'") else "'"
        return "<{cls} {q}{value}{q} at 0x{id:2X}>".format(
            cls=name, q=q, value=short, id=id(self))

    def _get_repr_name(self):
        # Last component of the ttype, e.g. "Keyword" for T.Keyword.
        return str(self.ttype).split('.')[-1]

    def _get_repr_value(self):
        # Shorten long values and collapse runs of whitespace for display.
        raw = str(self)
        raw = raw[:6] + '...' if len(raw) > 7 else raw
        return re.sub(r'\s+', ' ', raw)

    def flatten(self):
        """Resolve subgroups (a leaf token yields only itself)."""
        yield self

    def match(self, ttype, values, regex=False):
        """Check whether the token matches the given arguments.

        *ttype* is a token type; the token's type must be exactly that
        type for a match.
        *values* is a list of possible values for this token.  The values
        are OR'ed together, so a single matching value yields ``True``.
        Except for keyword tokens the comparison is case-sensitive.  For
        convenience a single string may be passed instead of a list.
        If *regex* is ``True`` (default ``False``) the values are treated
        as regular expressions.
        """
        same_type = self.ttype is ttype
        if values is None or not same_type:
            return same_type

        candidates = (values,) if isinstance(values, str) else values

        if regex:
            # TODO: Add test for regex with is_keyboard = false
            flags = re.IGNORECASE if self.is_keyword else 0
            return any(re.compile(c, flags).search(self.normalized)
                       for c in candidates)

        if self.is_keyword:
            return self.normalized in (c.upper() for c in candidates)
        return self.normalized in candidates

    def within(self, group_cls):
        """Return ``True`` if this token is within *group_cls*.

        Use this method for example to check if an identifier is within
        a function: ``t.within(sql.Function)``.
        """
        node = self.parent
        while node is not None:
            if isinstance(node, group_cls):
                return True
            node = node.parent
        return False

    def is_child_of(self, other):
        """Return ``True`` if this token is a direct child of *other*."""
        return self.parent == other

    def has_ancestor(self, other):
        """Return ``True`` if *other* is in this token's ancestry."""
        node = self.parent
        while node is not None:
            if node == other:
                return True
            node = node.parent
        return False
class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        self.tokens = tokens or []
        # Adopt every child so parent/ancestor lookups work.
        [setattr(token, 'parent', self) for token in self.tokens]
        super().__init__(None, str(self))
        self.is_group = True

    def __str__(self):
        # The value of a group is the concatenation of all leaf values.
        return ''.join(token.value for token in self.flatten())

    # weird bug
    # def __len__(self):
    #     return len(self.tokens)

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree."""
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print("{_pre}{pre}{idx} {cls} {q}{value}{q}"
                  .format(**locals()), file=f)

            # Recurse into groups until max_depth is reached.
            if token.is_group and (max_depth is None or depth < max_depth):
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Returns the token that is on position offset.

        Returns ``None`` when *offset* is past the end of the value.
        """
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token

    def get_sublists(self):
        """Yield only the grouped (non-leaf) children."""
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """next token that match functions

        Returns an ``(index, token)`` pair, or ``(None, None)`` when
        nothing matches.  NOTE(review): returns bare ``None`` when
        *start* is ``None`` -- callers that unpack must pass a real index.
        """
        if start is None:
            return None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            # Callers (token_next with _reverse=True) pre-increment idx
            # by one, so start - 2 begins scanning just before the
            # original position.
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        if *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # this on is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        """Return ``(index, token)`` of the next token matching *i*/*m*/*t*.

        The search starts *after* ``idx``; the default of -1 scans from
        the beginning.
        """
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        """Return ``(index, token)`` of the next token NOT matching *funcs*."""
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
        # Bind each func as a default argument: a plain closure here would
        # late-bind, making every lambda negate only the *last* func.
        funcs = [lambda tk, func=func: not func(tk) for func in funcs]
        return self._token_matching(funcs, idx)

    def token_matching(self, funcs, idx):
        """Return the next token (only) matching *funcs*, starting at *idx*."""
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``None`` is returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``None`` is returned if there's no next token.
        """
        if idx is None:
            return None, None
        idx += 1  # alot of code usage current pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return list index of token."""
        # *start* may itself be a token; resolve it to an index first.
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*.

        When *extend* is true and the token at *start* already is a
        *grp_cls*, the following tokens are absorbed into that group
        instead of creating a new one.
        """
        start_idx = start
        start = self.tokens[start_idx]

        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            grp.value = str(start)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        # Re-parent the moved tokens onto the group.
        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where* (an index or a child token)."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where* (an index or a child token)."""
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either it's alias or it's real name. The returned valued can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name"""
        # NOTE(review): ``if idx`` is falsy for idx == 0, so an explicit 0
        # scans the whole list -- callers rely on the current behavior.
        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
class Statement(TokenList):
    """Represents a SQL statement."""

    def get_type(self):
        """Return the type of a statement.

        The returned value is a string holding an upper-cased reprint of
        the first DML or DDL keyword.  If the first token in this group
        isn't a DML or DDL keyword "UNKNOWN" is returned.

        Whitespaces and comments at the beginning of the statement
        are ignored.
        """
        head = self.token_first(skip_cm=True)
        if head is None:
            # An "empty" statement: no tokens at all, or whitespace only.
            return 'UNKNOWN'

        if head.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return head.normalized

        if head.ttype == T.Keyword.CTE:
            # WITH is followed by an Identifier or an IdentifierList
            # holding the CTE definitions; the actual DML keyword
            # (e.g. SELECT, INSERT) comes after that.
            head_idx = self.token_index(head)
            def_idx, definition = self.token_next(head_idx, skip_ws=True)
            if isinstance(definition, (Identifier, IdentifierList)):
                _, keyword = self.token_next(def_idx, skip_ws=True)
                if keyword is not None and keyword.ttype == T.Keyword.DML:
                    return keyword.normalized

        # Hmm, probably invalid syntax, so return unknown.
        return 'UNKNOWN'
class Identifier(NameAliasMixin, TokenList):
    """Represents an identifier.

    Identifiers may have aliases or typecasts.
    """

    def is_wildcard(self):
        """Return ``True`` if this identifier contains a wildcard."""
        _, wildcard = self.token_next_by(t=T.Wildcard)
        return wildcard is not None

    def get_typecast(self):
        """Return the typecast of this object as a string, or ``None``."""
        marker_idx, _ = self.token_next_by(m=(T.Punctuation, '::'))
        _, cast = self.token_next(marker_idx, skip_ws=False)
        return cast.value if cast else None

    def get_ordering(self):
        """Return the ordering as an uppercase string, or ``None``."""
        _, order = self.token_next_by(t=T.Keyword.Order)
        return order.normalized if order else None

    def get_array_indices(self):
        """Yield index token lists, one per square-bracket group."""
        for child in self.tokens:
            if isinstance(child, SquareBrackets):
                # Strip the surrounding '[' and ']' punctuation tokens.
                yield child.tokens[1:-1]
class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier`\'s."""

    def get_identifiers(self):
        """Yield the identifiers.

        Whitespaces and punctuations are not included in this generator.
        """
        for child in self.tokens:
            separator = child.is_whitespace or child.match(T.Punctuation, ',')
            if not separator:
                yield child
class TypedLiteral(TokenList):
    """A typed literal, such as "date '2001-09-28'" or "interval '2 hours'"."""
    # Opens on a builtin type name (any value) or the TIMESTAMP keyword.
    M_OPEN = [(T.Name.Builtin, None), (T.Keyword, "TIMESTAMP")]
    # Closes on a single-quoted string literal (any value).
    M_CLOSE = T.String.Single, None
    # Trailing interval-unit keywords may extend the literal.
    M_EXTEND = T.Keyword, ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR")
class Parenthesis(TokenList):
    """Tokens between parenthesis."""
    M_OPEN = T.Punctuation, '('
    M_CLOSE = T.Punctuation, ')'

    @property
    def _groupable_tokens(self):
        # Exclude the opening and closing parenthesis tokens themselves.
        return self.tokens[1:-1]
class SquareBrackets(TokenList):
    """Tokens between square brackets"""
    M_OPEN = T.Punctuation, '['
    M_CLOSE = T.Punctuation, ']'

    @property
    def _groupable_tokens(self):
        # Exclude the opening and closing bracket tokens themselves.
        return self.tokens[1:-1]
class Assignment(TokenList):
    """An assignment like 'var := val;'"""
class If(TokenList):
    """An 'if' clause with possible 'else if' or 'else' parts."""
    M_OPEN = T.Keyword, 'IF'
    M_CLOSE = T.Keyword, 'END IF'
class For(TokenList):
    """A 'FOR' loop."""
    M_OPEN = T.Keyword, ('FOR', 'FOREACH')
    M_CLOSE = T.Keyword, 'END LOOP'
class Comparison(TokenList):
    """A comparison used for example in WHERE clauses."""

    @property
    def left(self):
        # Left-hand side: the first child token.
        return self.tokens[0]

    @property
    def right(self):
        # Right-hand side: the last child token.
        return self.tokens[-1]
class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        # Truthy when the first child is a multiline comment token.
        # NOTE: returns the bare ``and`` result (possibly an empty
        # token list), not a strict bool -- callers use it in truth tests.
        return self.tokens and self.tokens[0].ttype == T.Comment.Multiline
class Where(TokenList):
    """A WHERE clause."""
    M_OPEN = T.Keyword, 'WHERE'
    # Keywords that terminate the WHERE clause.
    M_CLOSE = T.Keyword, (
        'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
        'HAVING', 'RETURNING', 'INTO')
class Having(TokenList):
    """A HAVING clause."""
    M_OPEN = T.Keyword, 'HAVING'
    # Keywords that terminate the HAVING clause.
    M_CLOSE = T.Keyword, ('ORDER BY', 'LIMIT')
class Case(TokenList):
    """A CASE statement with one or more WHEN and possibly an ELSE part."""
    M_OPEN = T.Keyword, 'CASE'
    M_CLOSE = T.Keyword, 'END'

    def get_cases(self, skip_ws=False):
        """Returns a list of 2-tuples (condition, value).

        If an ELSE exists condition is None.
        """
        # Simple two-state machine: tokens are appended to either the
        # condition list (between WHEN and THEN) or the value list
        # (between THEN/ELSE and the next WHEN or END).
        CONDITION = 1
        VALUE = 2

        ret = []
        mode = CONDITION

        for token in self.tokens:
            # Set mode from the current statement
            if token.match(T.Keyword, 'CASE'):
                continue

            elif skip_ws and token.ttype in T.Whitespace:
                continue

            elif token.match(T.Keyword, 'WHEN'):
                # Start a new (condition, value) pair.
                ret.append(([], []))
                mode = CONDITION

            elif token.match(T.Keyword, 'THEN'):
                mode = VALUE

            elif token.match(T.Keyword, 'ELSE'):
                # ELSE has no condition, hence None.
                ret.append((None, []))
                mode = VALUE

            elif token.match(T.Keyword, 'END'):
                mode = None

            # First condition without preceding WHEN
            if mode and not ret:
                ret.append(([], []))

            # Append token depending of the current mode
            if mode == CONDITION:
                ret[-1][0].append(token)

            elif mode == VALUE:
                ret[-1][1].append(token)

        # Return cases list
        return ret
class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return a list of parameters."""
        # The argument list lives in the trailing parenthesis group.
        parens = self.tokens[-1]
        for child in parens.tokens:
            if isinstance(child, IdentifierList):
                return child.get_identifiers()
            if imt(child, i=(Function, Identifier), t=T.Literal):
                return [child]
        return []
class Begin(TokenList):
    """A BEGIN/END block."""
    M_OPEN = T.Keyword, 'BEGIN'
    M_CLOSE = T.Keyword, 'END'
class Operation(TokenList):
    """Grouping of operations"""
class Values(TokenList):
    """Grouping of values"""
class Command(TokenList):
    """Grouping of CLI commands."""