Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sqlparse/filters/reindent.py: 9% (173 statements; coverage.py v6.4.4, created 2023-07-17 14:22 -0600)
#
# Copyright (C) 2009-2020 the sqlparse authors and contributors
# <see AUTHORS file>
#
# This module is part of python-sqlparse and is released under
# the BSD License: https://opensource.org/licenses/BSD-3-Clause

from sqlparse import sql, tokens as T
from sqlparse.utils import offset, indent
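# Note (descriptive comment added for clarity): ``offset`` and ``indent``,
# imported above from sqlparse.utils, are small context managers that
# temporarily increase the filter's ``offset`` and ``indent`` attributes for
# the duration of a ``with`` block and restore them on exit; the methods
# below rely on that behaviour when recursing into token groups.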

class ReindentFilter:
    def __init__(self, width=2, char=' ', wrap_after=0, n='\n',
                 comma_first=False, indent_after_first=False,
                 indent_columns=False):
        self.n = n
        self.width = width
        self.char = char
        self.indent = 1 if indent_after_first else 0
        self.offset = 0
        self.wrap_after = wrap_after
        self.comma_first = comma_first
        self.indent_columns = indent_columns
        self._curr_stmt = None
        self._last_stmt = None
        self._last_func = None

    def _flatten_up_to_token(self, token):
        """Yields all tokens up to token but excluding current."""
        if token.is_group:
            token = next(token.flatten())

        for t in self._curr_stmt.flatten():
            if t == token:
                break
            yield t

    @property
    def leading_ws(self):
        return self.offset + self.indent * self.width

    def _get_offset(self, token):
        raw = ''.join(map(str, self._flatten_up_to_token(token)))
        line = (raw or '\n').splitlines()[-1]
        # Now take current offset into account and return relative offset.
        return len(line) - len(self.char * self.leading_ws)

    def nl(self, offset=0):
        return sql.Token(
            T.Whitespace,
            self.n + self.char * max(0, self.leading_ws + offset))

    def _next_token(self, tlist, idx=-1):
        split_words = ('FROM', 'STRAIGHT_JOIN$', 'JOIN$', 'AND', 'OR',
                       'GROUP BY', 'ORDER BY', 'UNION', 'VALUES',
                       'SET', 'BETWEEN', 'EXCEPT', 'HAVING', 'LIMIT')
        m_split = T.Keyword, split_words, True
        tidx, token = tlist.token_next_by(m=m_split, idx=idx)

        if token and token.normalized == 'BETWEEN':
            tidx, token = self._next_token(tlist, tidx)

            if token and token.normalized == 'AND':
                tidx, token = self._next_token(tlist, tidx)

        return tidx, token
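
    # Note (added for clarity, based on a reading of the code above): the
    # match tuple (T.Keyword, split_words, True) asks token_next_by() to
    # treat the listed keywords as regular expressions, so 'JOIN$' also
    # matches variants such as 'LEFT JOIN'. The nested BETWEEN/AND lookups
    # skip over the AND that belongs to a BETWEEN ... AND ... expression,
    # so that AND is not pushed onto its own line by _split_kwds() below.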

    def _split_kwds(self, tlist):
        tidx, token = self._next_token(tlist)
        while token:
            pidx, prev_ = tlist.token_prev(tidx, skip_ws=False)
            uprev = str(prev_)

            if prev_ and prev_.is_whitespace:
                del tlist.tokens[pidx]
                tidx -= 1

            if not (uprev.endswith('\n') or uprev.endswith('\r')):
                tlist.insert_before(tidx, self.nl())
                tidx += 1

            tidx, token = self._next_token(tlist, tidx)

    def _split_statements(self, tlist):
        ttypes = T.Keyword.DML, T.Keyword.DDL
        tidx, token = tlist.token_next_by(t=ttypes)
        while token:
            pidx, prev_ = tlist.token_prev(tidx, skip_ws=False)
            if prev_ and prev_.is_whitespace:
                del tlist.tokens[pidx]
                tidx -= 1
            # only break if it's not the first token
            if prev_:
                tlist.insert_before(tidx, self.nl())
                tidx += 1
            tidx, token = tlist.token_next_by(t=ttypes, idx=tidx)

    def _process(self, tlist):
        func_name = '_process_{cls}'.format(cls=type(tlist).__name__)
        func = getattr(self, func_name.lower(), self._process_default)
        func(tlist)

    def _process_where(self, tlist):
        tidx, token = tlist.token_next_by(m=(T.Keyword, 'WHERE'))
        if not token:
            return
        # issue121, errors in statement fixed??
        tlist.insert_before(tidx, self.nl())
        with indent(self):
            self._process_default(tlist)

    def _process_parenthesis(self, tlist):
        ttypes = T.Keyword.DML, T.Keyword.DDL
        _, is_dml_dll = tlist.token_next_by(t=ttypes)
        fidx, first = tlist.token_next_by(m=sql.Parenthesis.M_OPEN)
        if first is None:
            return

        with indent(self, 1 if is_dml_dll else 0):
            tlist.tokens.insert(0, self.nl()) if is_dml_dll else None
            with offset(self, self._get_offset(first) + 1):
                self._process_default(tlist, not is_dml_dll)

    def _process_function(self, tlist):
        self._last_func = tlist[0]
        self._process_default(tlist)

    def _process_identifierlist(self, tlist):
        identifiers = list(tlist.get_identifiers())
        if self.indent_columns:
            first = next(identifiers[0].flatten())
            num_offset = 1 if self.char == '\t' else self.width
        else:
            first = next(identifiers.pop(0).flatten())
            num_offset = 1 if self.char == '\t' else self._get_offset(first)

        if not tlist.within(sql.Function) and not tlist.within(sql.Values):
            with offset(self, num_offset):
                position = 0
                for token in identifiers:
                    # Add 1 for the "," separator
                    position += len(token.value) + 1
                    if position > (self.wrap_after - self.offset):
                        adjust = 0
                        if self.comma_first:
                            adjust = -2
                            _, comma = tlist.token_prev(
                                tlist.token_index(token))
                            if comma is None:
                                continue
                            token = comma
                        tlist.insert_before(token, self.nl(offset=adjust))
                        if self.comma_first:
                            _, ws = tlist.token_next(
                                tlist.token_index(token), skip_ws=False)
                            if (ws is not None
                                    and ws.ttype is not T.Text.Whitespace):
                                tlist.insert_after(
                                    token, sql.Token(T.Whitespace, ' '))
                        position = 0
        else:
            # ensure whitespace
            for token in tlist:
                _, next_ws = tlist.token_next(
                    tlist.token_index(token), skip_ws=False)
                if token.value == ',' and not next_ws.is_whitespace:
                    tlist.insert_after(
                        token, sql.Token(T.Whitespace, ' '))

            end_at = self.offset + sum(len(i.value) + 1 for i in identifiers)
            adjusted_offset = 0
            if (self.wrap_after > 0
                    and end_at > (self.wrap_after - self.offset)
                    and self._last_func):
                adjusted_offset = -len(self._last_func.value) - 1

            with offset(self, adjusted_offset), indent(self):
                if adjusted_offset < 0:
                    tlist.insert_before(identifiers[0], self.nl())

                position = 0
                for token in identifiers:
                    # Add 1 for the "," separator
                    position += len(token.value) + 1
                    if (self.wrap_after > 0
                            and position > (self.wrap_after - self.offset)):
                        adjust = 0
                        tlist.insert_before(token, self.nl(offset=adjust))
                        position = 0
        self._process_default(tlist)

    def _process_case(self, tlist):
        iterable = iter(tlist.get_cases())
        cond, _ = next(iterable)
        first = next(cond[0].flatten())

        with offset(self, self._get_offset(tlist[0])):
            with offset(self, self._get_offset(first)):
                for cond, value in iterable:
                    token = value[0] if cond is None else cond[0]
                    tlist.insert_before(token, self.nl())

                # Line breaks on the group level are done. Now add an offset
                # of len("WHEN "), which also covers "THEN " and "ELSE ".
                with offset(self, len("WHEN ")):
                    self._process_default(tlist)
        end_idx, end = tlist.token_next_by(m=sql.Case.M_CLOSE)
        if end_idx is not None:
            tlist.insert_before(end_idx, self.nl())

    def _process_values(self, tlist):
        tlist.insert_before(0, self.nl())
        tidx, token = tlist.token_next_by(i=sql.Parenthesis)
        first_token = token
        while token:
            ptidx, ptoken = tlist.token_next_by(m=(T.Punctuation, ','),
                                                idx=tidx)
            if ptoken:
                if self.comma_first:
                    adjust = -2
                    offset = self._get_offset(first_token) + adjust
                    tlist.insert_before(ptoken, self.nl(offset))
                else:
                    tlist.insert_after(ptoken,
                                       self.nl(self._get_offset(token)))
            tidx, token = tlist.token_next_by(i=sql.Parenthesis, idx=tidx)

    def _process_default(self, tlist, stmts=True):
        self._split_statements(tlist) if stmts else None
        self._split_kwds(tlist)
        for sgroup in tlist.get_sublists():
            self._process(sgroup)

    def process(self, stmt):
        self._curr_stmt = stmt
        self._process(stmt)

        if self._last_stmt is not None:
            nl = '\n' if str(self._last_stmt).endswith('\n') else '\n\n'
            stmt.tokens.insert(0, sql.Token(T.Whitespace, nl))

        self._last_stmt = stmt
        return stmt
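
# Usage sketch (illustrative addition, not part of the original module): the
# filter is normally driven through the public sqlparse.format() API, which
# wires a ReindentFilter into its filter stack when reindent=True is passed.
# The statement text below is a made-up example.
#
#     import sqlparse
#     raw = "select a, b from some_table where a = 1 and b = 2"
#     print(sqlparse.format(raw, reindent=True, keyword_case='upper'))
#
# format() calls this filter's process() on each parsed statement, and the
# whitespace tokens produced by nl() above become the line breaks and
# indentation visible in the formatted output.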