Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sqlparse/filters/reindent.py: 9%

173 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# 

2# Copyright (C) 2009-2020 the sqlparse authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of python-sqlparse and is released under 

6# the BSD License: https://opensource.org/licenses/BSD-3-Clause 

7 

8from sqlparse import sql, tokens as T 

9from sqlparse.utils import offset, indent 

10 

11 

class ReindentFilter:
    """Statement filter that re-indents a parsed SQL token tree in place.

    The filter walks the token tree of a statement and inserts whitespace
    tokens (a newline followed by indentation) in front of selected
    keywords, identifiers and sub-groups.  The amount of indentation is
    ``offset + indent * width`` repetitions of ``char`` (see
    :attr:`leading_ws`); ``offset`` and ``indent`` are adjusted temporarily
    by the ``offset``/``indent`` context managers imported from
    ``sqlparse.utils``.
    """

    def __init__(self, width=2, char=' ', wrap_after=0, n='\n',
                 comma_first=False, indent_after_first=False,
                 indent_columns=False):
        """Store the formatting options and reset the per-run state.

        :param width: repetitions of ``char`` per indentation level.
        :param char: string repeated to build the leading whitespace.
        :param wrap_after: length threshold for wrapping identifier lists.
            Outside of functions/VALUES a break is made whenever the
            accumulated length exceeds ``wrap_after - offset``; inside
            them wrapping only happens when ``wrap_after > 0``.
        :param n: newline string emitted by :meth:`nl`.
        :param comma_first: break *before* the separating comma instead of
            before the identifier that follows it.
        :param indent_after_first: start at indentation level 1 instead
            of 0.
        :param indent_columns: keep the first identifier of a list among
            the wrap candidates and offset the list by ``width`` instead
            of by the column of its first identifier.
        """
        self.n = n
        self.width = width
        self.char = char
        self.indent = 1 if indent_after_first else 0
        self.offset = 0
        self.wrap_after = wrap_after
        self.comma_first = comma_first
        self.indent_columns = indent_columns
        # Statement currently being processed; used to compute columns.
        self._curr_stmt = None
        # Previously processed statement; decides the statement separator.
        self._last_stmt = None
        # First token of the most recently visited function group.
        self._last_func = None

    def _flatten_up_to_token(self, token):
        """Yields all tokens up to token but excluding current."""
        if token.is_group:
            # Compare against the first leaf of the group, because the
            # flattened statement below only contains leaf tokens.
            token = next(token.flatten())

        for t in self._curr_stmt.flatten():
            if t == token:
                break
            yield t

    @property
    def leading_ws(self):
        """Number of ``char`` repetitions that start a fresh line."""
        return self.offset + self.indent * self.width

    def _get_offset(self, token):
        """Return the column of ``token`` relative to the current indent.

        The column is the length of the last line of the statement text
        that precedes ``token``, minus the length of the current leading
        whitespace.
        """
        raw = ''.join(map(str, self._flatten_up_to_token(token)))
        # ``raw`` is empty when ``token`` is the very first token; the
        # fallback keeps ``splitlines()`` from returning an empty list.
        line = (raw or '\n').splitlines()[-1]
        # Now take current offset into account and return relative offset.
        return len(line) - len(self.char * self.leading_ws)

    def nl(self, offset=0):
        """Return a whitespace token: newline plus current indentation.

        :param offset: extra (possibly negative) number of ``char``
            repetitions added to :attr:`leading_ws`; the total is clamped
            at zero.
        """
        return sql.Token(
            T.Whitespace,
            self.n + self.char * max(0, self.leading_ws + offset))

    def _next_token(self, tlist, idx=-1):
        """Return ``(index, token)`` of the next keyword to break before.

        The search starts after ``idx``.  A ``BETWEEN`` keyword is never
        returned itself; the search continues behind it, and if the next
        match is an ``AND`` (which belongs to the ``BETWEEN``), that one
        is skipped as well.
        """
        split_words = ('FROM', 'STRAIGHT_JOIN$', 'JOIN$', 'AND', 'OR',
                       'GROUP BY', 'ORDER BY', 'UNION', 'VALUES',
                       'SET', 'BETWEEN', 'EXCEPT', 'HAVING', 'LIMIT')
        m_split = T.Keyword, split_words, True
        tidx, token = tlist.token_next_by(m=m_split, idx=idx)

        if token and token.normalized == 'BETWEEN':
            tidx, token = self._next_token(tlist, tidx)

            if token and token.normalized == 'AND':
                tidx, token = self._next_token(tlist, tidx)

        return tidx, token

    def _split_kwds(self, tlist):
        """Put every split keyword found in ``tlist`` on a line of its own.

        Whitespace directly in front of the keyword is removed.  A newline
        token is inserted unless the text of the preceding token already
        ends with a line break.
        """
        tidx, token = self._next_token(tlist)
        while token:
            pidx, prev_ = tlist.token_prev(tidx, skip_ws=False)
            # Captured before ``prev_`` is possibly removed below.
            uprev = str(prev_)

            if prev_ and prev_.is_whitespace:
                del tlist.tokens[pidx]
                tidx -= 1

            if not uprev.endswith(('\n', '\r')):
                tlist.insert_before(tidx, self.nl())
                tidx += 1

            tidx, token = self._next_token(tlist, tidx)

    def _split_statements(self, tlist):
        """Start a new line before each DML/DDL keyword in ``tlist``."""
        ttypes = T.Keyword.DML, T.Keyword.DDL
        tidx, token = tlist.token_next_by(t=ttypes)
        while token:
            pidx, prev_ = tlist.token_prev(tidx, skip_ws=False)
            if prev_ and prev_.is_whitespace:
                del tlist.tokens[pidx]
                tidx -= 1
            # only break if it's not the first token
            if prev_:
                tlist.insert_before(tidx, self.nl())
                tidx += 1
            tidx, token = tlist.token_next_by(t=ttypes, idx=tidx)

    def _process(self, tlist):
        """Dispatch to ``_process_<classname>`` (lower-cased) if defined.

        Falls back to :meth:`_process_default` for group types without a
        dedicated handler.
        """
        func_name = '_process_{cls}'.format(cls=type(tlist).__name__)
        func = getattr(self, func_name.lower(), self._process_default)
        func(tlist)

    def _process_where(self, tlist):
        """Break before ``WHERE`` and process the clause one level deeper."""
        tidx, token = tlist.token_next_by(m=(T.Keyword, 'WHERE'))
        if not token:
            return
        # issue121, errors in statement fixed??
        tlist.insert_before(tidx, self.nl())
        with indent(self):
            self._process_default(tlist)

    def _process_parenthesis(self, tlist):
        """Re-indent the content of a parenthesis group.

        If the group contains a DML/DDL keyword, it is indented by one
        level and a newline token is put at the very front of the group.
        Otherwise only the offset is moved to the column right after the
        opening parenthesis, and statement splitting is skipped.
        """
        ttypes = T.Keyword.DML, T.Keyword.DDL
        _, is_dml_ddl = tlist.token_next_by(t=ttypes)
        _, first = tlist.token_next_by(m=sql.Parenthesis.M_OPEN)
        if first is None:
            return

        with indent(self, 1 if is_dml_ddl else 0):
            if is_dml_ddl:
                tlist.tokens.insert(0, self.nl())
            with offset(self, self._get_offset(first) + 1):
                self._process_default(tlist, not is_dml_ddl)

    def _process_function(self, tlist):
        """Remember the first token of the function, then process it."""
        self._last_func = tlist[0]
        self._process_default(tlist)

    def _process_identifierlist(self, tlist):
        """Wrap the members of an identifier list onto separate lines.

        Lists outside of a function call / VALUES clause are wrapped
        whenever the accumulated length exceeds ``wrap_after - offset``.
        Lists inside of one only get a blank ensured after each comma and
        are wrapped solely when ``wrap_after`` is positive.
        """
        identifiers = list(tlist.get_identifiers())
        if self.indent_columns:
            first = next(identifiers[0].flatten())
            num_offset = 1 if self.char == '\t' else self.width
        else:
            # The first identifier stays on the current line, so it is
            # removed from the wrap candidates.
            first = next(identifiers.pop(0).flatten())
            num_offset = 1 if self.char == '\t' else self._get_offset(first)

        if not tlist.within(sql.Function) and not tlist.within(sql.Values):
            with offset(self, num_offset):
                position = 0
                for token in identifiers:
                    # Add 1 for the "," separator
                    position += len(token.value) + 1
                    if position > (self.wrap_after - self.offset):
                        adjust = 0
                        if self.comma_first:
                            adjust = -2
                            # Break in front of the preceding token
                            # (presumably the separating comma) instead.
                            _, comma = tlist.token_prev(
                                tlist.token_index(token))
                            if comma is None:
                                continue
                            token = comma
                        tlist.insert_before(token, self.nl(offset=adjust))
                        if self.comma_first:
                            # Keep a blank between the leading comma and
                            # whatever follows it.
                            _, ws = tlist.token_next(
                                tlist.token_index(token), skip_ws=False)
                            if (ws is not None
                                    and ws.ttype is not T.Text.Whitespace):
                                tlist.insert_after(
                                    token, sql.Token(T.Whitespace, ' '))
                        position = 0
        else:
            # ensure whitespace
            # Iterate over a snapshot: the loop inserts into ``tlist``.
            for token in list(tlist):
                if token.value != ',':
                    continue
                _, next_ws = tlist.token_next(
                    tlist.token_index(token), skip_ws=False)
                # ``next_ws`` is None when the comma is the last token.
                if next_ws is not None and not next_ws.is_whitespace:
                    tlist.insert_after(
                        token, sql.Token(T.Whitespace, ' '))

            end_at = self.offset + sum(len(i.value) + 1 for i in identifiers)
            adjusted_offset = 0
            if (self.wrap_after > 0
                    and end_at > (self.wrap_after - self.offset)
                    and self._last_func):
                # Pull the arguments back by the length of the function
                # name plus one.
                adjusted_offset = -len(self._last_func.value) - 1

            with offset(self, adjusted_offset), indent(self):
                if adjusted_offset < 0:
                    tlist.insert_before(identifiers[0], self.nl())
                position = 0
                for token in identifiers:
                    # Add 1 for the "," separator
                    position += len(token.value) + 1
                    if (self.wrap_after > 0
                            and position > (self.wrap_after - self.offset)):
                        tlist.insert_before(token, self.nl())
                        position = 0
        self._process_default(tlist)

    def _process_case(self, tlist):
        """Put every branch of a CASE expression but the first on a new line.

        The closing token matched by ``sql.Case.M_CLOSE`` is moved to a
        new line at the column of the first token of the group.
        """
        iterable = iter(tlist.get_cases())
        # NOTE(review): assumes get_cases() yields at least one case whose
        # condition is a non-empty list - confirm against sql.Case.
        cond, _ = next(iterable)
        first = next(cond[0].flatten())

        with offset(self, self._get_offset(tlist[0])):
            with offset(self, self._get_offset(first)):
                for cond, value in iterable:
                    # A case without condition has only a value part.
                    token = value[0] if cond is None else cond[0]
                    tlist.insert_before(token, self.nl())

                # Line breaks on group level are done. let's add an offset of
                # len "when ", "then ", "else "
                with offset(self, len("WHEN ")):
                    self._process_default(tlist)
            end_idx, _ = tlist.token_next_by(m=sql.Case.M_CLOSE)
            if end_idx is not None:
                tlist.insert_before(end_idx, self.nl())

    def _process_values(self, tlist):
        """Place each parenthesised group of a VALUES clause on its own line.

        A newline token is inserted at the front of the clause.  After
        that, for every parenthesis followed by a comma, the line break
        goes before the comma (``comma_first``) or after it.
        """
        tlist.insert_before(0, self.nl())
        tidx, token = tlist.token_next_by(i=sql.Parenthesis)
        first_token = token
        while token:
            _, ptoken = tlist.token_next_by(m=(T.Punctuation, ','),
                                            idx=tidx)
            if ptoken:
                if self.comma_first:
                    # Shift left by two: the comma plus (presumably) the
                    # blank that follows it.
                    nl_offset = self._get_offset(first_token) - 2
                    tlist.insert_before(ptoken, self.nl(nl_offset))
                else:
                    tlist.insert_after(ptoken,
                                       self.nl(self._get_offset(token)))
            tidx, token = tlist.token_next_by(i=sql.Parenthesis, idx=tidx)

    def _process_default(self, tlist, stmts=True):
        """Split statements and keywords, then recurse into sub-groups.

        :param stmts: when false, the DML/DDL statement split is skipped.
        """
        if stmts:
            self._split_statements(tlist)
        self._split_kwds(tlist)
        for sgroup in tlist.get_sublists():
            self._process(sgroup)

    def process(self, stmt):
        """Re-indent ``stmt`` in place and return it.

        Every statement after the first one processed by this filter gets
        a separator token prepended: ``'\\n'`` if the text of the previous
        statement ends with a newline, ``'\\n\\n'`` otherwise.
        """
        self._curr_stmt = stmt
        self._process(stmt)

        if self._last_stmt is not None:
            # NOTE(review): the separator is a literal '\n', not self.n -
            # confirm that this is intended.
            nl = '\n' if str(self._last_stmt).endswith('\n') else '\n\n'
            stmt.tokens.insert(0, sql.Token(T.Whitespace, nl))

        self._last_stmt = stmt
        return stmt