Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sqlparse/sql.py: 29% of 330 statements (coverage.py v6.4.4, created at 2023-07-17 14:22 -0600)

#
# Copyright (C) 2009-2020 the sqlparse authors and contributors
# <see AUTHORS file>
#
# This module is part of python-sqlparse and is released under
# the BSD License: https://opensource.org/licenses/BSD-3-Clause

"""This module contains classes representing syntactical elements of SQL."""

import re

from sqlparse import tokens as T
from sqlparse.utils import imt, remove_quotes


class NameAliasMixin:
    """Implements get_real_name and get_alias."""

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        # a.b
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(dot_idx, real_name=True)

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""

        # "name AS alias"
        kw_idx, kw = self.token_next_by(m=(T.Keyword, 'AS'))
        if kw is not None:
            return self._get_first_name(kw_idx + 1, keywords=True)

        # "name alias" or "complicated column expression alias"
        _, ws = self.token_next_by(t=T.Whitespace)
        if len(self.tokens) > 2 and ws is not None:
            return self._get_first_name(reverse=True)
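
    # Example (illustrative sketch, not part of the upstream source): on a
    # parsed Identifier such as ``foo.bar AS baz`` the mixin splits the
    # dotted real name from the alias, assuming the usual tokenization of
    # a simple SELECT list.
    #
    #     >>> import sqlparse
    #     >>> ident = sqlparse.parse('SELECT foo.bar AS baz FROM t')[0].tokens[2]
    #     >>> ident.get_real_name(), ident.get_alias()
    #     ('bar', 'baz')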


class Token:
    """Base class for all other classes in this module.

    It represents a single token and has two instance attributes:
    ``value`` is the unchanged value of the token and ``ttype`` is
    the type of the token.
    """

    __slots__ = ('value', 'ttype', 'parent', 'normalized', 'is_keyword',
                 'is_group', 'is_whitespace')

    def __init__(self, ttype, value):
        value = str(value)
        self.value = value
        self.ttype = ttype
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = self.ttype in T.Whitespace
        self.normalized = value.upper() if self.is_keyword else value

    def __str__(self):
        return self.value

    # Pending tokenlist __len__ bug fix
    # def __len__(self):
    #     return len(self.value)

    def __repr__(self):
        cls = self._get_repr_name()
        value = self._get_repr_value()

        q = '"' if value.startswith("'") and value.endswith("'") else "'"
        return "<{cls} {q}{value}{q} at 0x{id:2X}>".format(
            id=id(self), **locals())

    def _get_repr_name(self):
        return str(self.ttype).split('.')[-1]

    def _get_repr_value(self):
        raw = str(self)
        if len(raw) > 7:
            raw = raw[:6] + '...'
        return re.sub(r'\s+', ' ', raw)

    def flatten(self):
        """Resolve subgroups."""
        yield self

    def match(self, ttype, values, regex=False):
        """Checks whether the token matches the given arguments.

        *ttype* is a token type. If this token doesn't match the given
        token type, ``False`` is returned.
        *values* is a list of possible values for this token. The values
        are OR'ed together so if only one of the values matches ``True``
        is returned. Except for keyword tokens the comparison is
        case-sensitive. For convenience it's OK to pass in a single string.
        If *regex* is ``True`` (default is ``False``) the given values are
        treated as regular expressions.
        """
        type_matched = self.ttype is ttype
        if not type_matched or values is None:
            return type_matched

        if isinstance(values, str):
            values = (values,)

        if regex:
            # TODO: Add test for regex with is_keyword = false
            flag = re.IGNORECASE if self.is_keyword else 0
            values = (re.compile(v, flag) for v in values)

            for pattern in values:
                if pattern.search(self.normalized):
                    return True
            return False

        if self.is_keyword:
            values = (v.upper() for v in values)

        return self.normalized in values
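
    # Example (illustrative sketch): keyword values compare case-insensitively
    # because keyword tokens are normalized to upper case.
    #
    #     >>> from sqlparse import tokens as T
    #     >>> from sqlparse.sql import Token
    #     >>> Token(T.Keyword, 'select').match(T.Keyword, ['SELECT'])
    #     True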

    def within(self, group_cls):
        """Returns ``True`` if this token is within *group_cls*.

        Use this method for example to check if an identifier is within
        a function: ``t.within(sql.Function)``.
        """
        parent = self.parent
        while parent:
            if isinstance(parent, group_cls):
                return True
            parent = parent.parent
        return False
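
    # Example (illustrative sketch): the leaf Name token of ``max`` sits
    # inside a Function group, assuming the usual grouping of a call.
    #
    #     >>> import sqlparse
    #     >>> from sqlparse import sql
    #     >>> stmt = sqlparse.parse('SELECT max(a) FROM t')[0]
    #     >>> list(stmt.flatten())[2].within(sql.Function)
    #     True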

    def is_child_of(self, other):
        """Returns ``True`` if this token is a direct child of *other*."""
        return self.parent == other

    def has_ancestor(self, other):
        """Returns ``True`` if *other* is in this token's ancestry."""
        parent = self.parent
        while parent:
            if parent == other:
                return True
            parent = parent.parent
        return False


class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        self.tokens = tokens or []
        for token in self.tokens:
            token.parent = self
        super().__init__(None, str(self))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # weird bug
    # def __len__(self):
    #     return len(self.tokens)

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree."""
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print("{_pre}{pre}{idx} {cls} {q}{value}{q}"
                  .format(**locals()), file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Returns the token that is on position offset."""
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end
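
    # Example (illustrative sketch): offsets count characters of the
    # flattened string value.
    #
    #     >>> import sqlparse
    #     >>> stmt = sqlparse.parse('SELECT a FROM t')[0]
    #     >>> stmt.get_token_at_offset(7).value
    #     'a'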

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token
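
    # Example (illustrative sketch): flatten() yields only leaf tokens, so
    # joining their values reproduces the original SQL text.
    #
    #     >>> import sqlparse
    #     >>> stmt = sqlparse.parse('SELECT a, b FROM t')[0]
    #     >>> ''.join(tok.value for tok in stmt.flatten()) == str(stmt)
    #     True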

    def get_sublists(self):
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Return the next token that matches one of the given functions."""
        if start is None:
            return None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        If *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # this one is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
        # Bind each func at definition time; a plain closure would late-bind
        # and every lambda would negate only the last function.
        funcs = [lambda tk, func=func: not func(tk) for func in funcs]
        return self._token_matching(funcs, idx)

    def token_matching(self, funcs, idx):
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``None`` is returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``None`` is returned if there's no next token.
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)
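
    # Example (illustrative sketch): sibling navigation skips whitespace and
    # returns an (index, token) pair.
    #
    #     >>> import sqlparse
    #     >>> stmt = sqlparse.parse('SELECT a FROM t')[0]
    #     >>> idx, tok = stmt.token_next(0)  # first non-whitespace after tokens[0]
    #     >>> str(tok)
    #     'a'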

    def token_index(self, token, start=0):
        """Return list index of token."""
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*."""
        start_idx = start
        start = self.tokens[start_idx]

        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            grp.value = str(start)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where*."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where*."""
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None
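
    # Example (illustrative sketch): for a dotted identifier the part before
    # the first dot is the parent object.
    #
    #     >>> import sqlparse
    #     >>> ident = sqlparse.parse('SELECT a.b FROM t')[0].tokens[2]
    #     >>> ident.get_parent_name(), ident.get_real_name()
    #     ('a', 'b')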

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name."""

        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()


class Statement(TokenList):
    """Represents a SQL statement."""

    def get_type(self):
        """Returns the type of a statement.

        The returned value is a string holding an upper-cased reprint of
        the first DML or DDL keyword. If the first token in this group
        isn't a DML or DDL keyword, "UNKNOWN" is returned.

        Whitespaces and comments at the beginning of the statement
        are ignored.
        """
        first_token = self.token_first(skip_cm=True)
        if first_token is None:
            # An "empty" statement that either has no tokens at all
            # or only whitespace tokens.
            return 'UNKNOWN'

        elif first_token.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return first_token.normalized

        elif first_token.ttype == T.Keyword.CTE:
            # The WITH keyword should be followed by either an Identifier or
            # an IdentifierList containing the CTE definitions; the actual
            # DML keyword (e.g. SELECT, INSERT) will follow next.
            fidx = self.token_index(first_token)
            tidx, token = self.token_next(fidx, skip_ws=True)
            if isinstance(token, (Identifier, IdentifierList)):
                _, dml_keyword = self.token_next(tidx, skip_ws=True)

                if dml_keyword is not None \
                        and dml_keyword.ttype == T.Keyword.DML:
                    return dml_keyword.normalized

        # Hmm, probably invalid syntax, so return unknown.
        return 'UNKNOWN'
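
    # Example (illustrative sketch): the first DML/DDL keyword determines the
    # type, and a leading CTE is looked through.
    #
    #     >>> import sqlparse
    #     >>> sqlparse.parse('insert into t values (1)')[0].get_type()
    #     'INSERT'
    #     >>> sqlparse.parse('WITH x AS (SELECT 1) SELECT * FROM x')[0].get_type()
    #     'SELECT'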


class Identifier(NameAliasMixin, TokenList):
    """Represents an identifier.

    Identifiers may have aliases or typecasts.
    """

    def is_wildcard(self):
        """Return ``True`` if this identifier contains a wildcard."""
        _, token = self.token_next_by(t=T.Wildcard)
        return token is not None

    def get_typecast(self):
        """Returns the typecast of this object as a string, or ``None``."""
        midx, marker = self.token_next_by(m=(T.Punctuation, '::'))
        nidx, next_ = self.token_next(midx, skip_ws=False)
        return next_.value if next_ else None

    def get_ordering(self):
        """Returns the ordering or ``None`` as uppercase string."""
        _, ordering = self.token_next_by(t=T.Keyword.Order)
        return ordering.normalized if ordering else None

    def get_array_indices(self):
        """Returns an iterator of index token lists."""

        for token in self.tokens:
            if isinstance(token, SquareBrackets):
                # Use [1:-1] index to discard the square brackets
                yield token.tokens[1:-1]
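
    # Example (illustrative sketch): typecast extraction on a parsed
    # identifier.
    #
    #     >>> import sqlparse
    #     >>> ident = sqlparse.parse('SELECT col::integer FROM t')[0].tokens[2]
    #     >>> ident.get_typecast()
    #     'integer'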


class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier`\'s."""

    def get_identifiers(self):
        """Returns the identifiers.

        Whitespace and punctuation are not included in this generator.
        """
        for token in self.tokens:
            if not (token.is_whitespace or token.match(T.Punctuation, ',')):
                yield token
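
    # Example (illustrative sketch): commas and whitespace are filtered out.
    #
    #     >>> import sqlparse
    #     >>> id_list = sqlparse.parse('SELECT a, b, c FROM t')[0].tokens[2]
    #     >>> [str(t) for t in id_list.get_identifiers()]
    #     ['a', 'b', 'c']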


class TypedLiteral(TokenList):
    """A typed literal, such as "date '2001-09-28'" or "interval '2 hours'"."""
    M_OPEN = [(T.Name.Builtin, None), (T.Keyword, "TIMESTAMP")]
    M_CLOSE = T.String.Single, None
    M_EXTEND = T.Keyword, ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR")


class Parenthesis(TokenList):
    """Tokens between parenthesis."""
    M_OPEN = T.Punctuation, '('
    M_CLOSE = T.Punctuation, ')'

    @property
    def _groupable_tokens(self):
        return self.tokens[1:-1]


class SquareBrackets(TokenList):
    """Tokens between square brackets."""
    M_OPEN = T.Punctuation, '['
    M_CLOSE = T.Punctuation, ']'

    @property
    def _groupable_tokens(self):
        return self.tokens[1:-1]


class Assignment(TokenList):
    """An assignment like 'var := val;'"""


class If(TokenList):
    """An 'if' clause with possible 'else if' or 'else' parts."""
    M_OPEN = T.Keyword, 'IF'
    M_CLOSE = T.Keyword, 'END IF'


class For(TokenList):
    """A 'FOR' loop."""
    M_OPEN = T.Keyword, ('FOR', 'FOREACH')
    M_CLOSE = T.Keyword, 'END LOOP'


class Comparison(TokenList):
    """A comparison used for example in WHERE clauses."""

    @property
    def left(self):
        return self.tokens[0]

    @property
    def right(self):
        return self.tokens[-1]
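
    # Example (illustrative sketch): the outermost operands of a comparison,
    # assuming the statement groups into a single Comparison.
    #
    #     >>> import sqlparse
    #     >>> comp = sqlparse.parse('a = b')[0].tokens[0]
    #     >>> str(comp.left), str(comp.right)
    #     ('a', 'b')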


class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        return self.tokens and self.tokens[0].ttype == T.Comment.Multiline


class Where(TokenList):
    """A WHERE clause."""
    M_OPEN = T.Keyword, 'WHERE'
    M_CLOSE = T.Keyword, (
        'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
        'HAVING', 'RETURNING', 'INTO')


class Having(TokenList):
    """A HAVING clause."""
    M_OPEN = T.Keyword, 'HAVING'
    M_CLOSE = T.Keyword, ('ORDER BY', 'LIMIT')


class Case(TokenList):
    """A CASE statement with one or more WHEN and possibly an ELSE part."""
    M_OPEN = T.Keyword, 'CASE'
    M_CLOSE = T.Keyword, 'END'

    def get_cases(self, skip_ws=False):
        """Returns a list of 2-tuples (condition, value).

        If an ELSE exists, the condition is ``None``.
        """
        CONDITION = 1
        VALUE = 2

        ret = []
        mode = CONDITION

        for token in self.tokens:
            # Set mode from the current statement
            if token.match(T.Keyword, 'CASE'):
                continue

            elif skip_ws and token.ttype in T.Whitespace:
                continue

            elif token.match(T.Keyword, 'WHEN'):
                ret.append(([], []))
                mode = CONDITION

            elif token.match(T.Keyword, 'THEN'):
                mode = VALUE

            elif token.match(T.Keyword, 'ELSE'):
                ret.append((None, []))
                mode = VALUE

            elif token.match(T.Keyword, 'END'):
                mode = None

            # First condition without preceding WHEN
            if mode and not ret:
                ret.append(([], []))

            # Append token depending on the current mode
            if mode == CONDITION:
                ret[-1][0].append(token)

            elif mode == VALUE:
                ret[-1][1].append(token)

        # Return cases list
        return ret
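
    # Example (illustrative sketch): one 2-tuple per WHEN plus one for ELSE,
    # assuming the statement groups into a single CASE block.
    #
    #     >>> import sqlparse
    #     >>> case = sqlparse.parse('CASE WHEN a THEN 1 ELSE 2 END')[0].tokens[0]
    #     >>> len(case.get_cases(skip_ws=True))
    #     2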


class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return a list of parameters."""
        parenthesis = self.tokens[-1]
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                return token.get_identifiers()
            elif imt(token, i=(Function, Identifier), t=T.Literal):
                return [token, ]
        return []
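
    # Example (illustrative sketch): parameters of a simple call.
    #
    #     >>> import sqlparse
    #     >>> func = sqlparse.parse('foo(a, b)')[0].tokens[0]
    #     >>> [str(p) for p in func.get_parameters()]
    #     ['a', 'b']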


class Begin(TokenList):
    """A BEGIN/END block."""
    M_OPEN = T.Keyword, 'BEGIN'
    M_CLOSE = T.Keyword, 'END'


class Operation(TokenList):
    """Grouping of operations."""


class Values(TokenList):
    """Grouping of values."""


class Command(TokenList):
    """Grouping of CLI commands."""