Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/sql/where.py: 62%

163 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2Code to manage the creation and SQL rendering of 'where' constraints. 

3""" 

4 

5from django.core.exceptions import EmptyResultSet 

6from django.utils import tree 

7from django.utils.functional import cached_property 

8 

# Connection types: the connector strings a WhereNode places between the SQL
# of its children. AND is the default connector for WhereNode.
AND = "AND"
OR = "OR"

12 

13 

class WhereNode(tree.Node):
    """
    A tree node that renders as an SQL WHERE clause.

    The SQL of the children is joined with this node's connector (AND unless
    stated otherwise) and the joined result may be negated. A child is
    normally an expression producing a boolean value, most likely a Lookup
    instance, or another tree node.

    Any other object can be used as a child as long as it offers as_sql() and
    a contains_aggregate attribute, plus either relabeled_clone() or the pair
    relabel_aliases() and clone(). The contains_over_clause property of this
    class additionally reads a contains_over_clause attribute from each leaf.
    """

    default = AND
    resolved = False
    conditional = True

    def split_having(self, negated=False):
        """
        Partition this node between the WHERE and the HAVING clause.

        Return a (where_node, having_node) pair. Either element may be None
        when nothing belongs to that clause.
        """
        if not self.contains_aggregate:
            # No aggregate anywhere below: the node stays in WHERE untouched.
            return self, None
        in_negated = negated ^ self.negated
        # When the effective connector is OR (a plain OR, or an AND seen
        # through a negation) and an aggregate is involved, the branch can't
        # be divided child by child and goes to HAVING as a whole.
        if in_negated:
            may_need_split = self.connector == AND
        else:
            may_need_split = self.connector == OR
        if may_need_split and self.contains_aggregate:
            return None, self
        where_parts, having_parts = [], []
        for child in self.children:
            if not hasattr(child, "split_having"):
                # A leaf is routed as a unit according to its aggregate flag.
                if child.contains_aggregate:
                    having_parts.append(child)
                else:
                    where_parts.append(child)
                continue
            child_where, child_having = child.split_having(in_negated)
            if child_where is not None:
                where_parts.append(child_where)
            if child_having is not None:
                having_parts.append(child_having)
        having_node = None
        if having_parts:
            having_node = self.__class__(having_parts, self.connector, self.negated)
        where_node = None
        if where_parts:
            where_node = self.__class__(where_parts, self.connector, self.negated)
        return where_node, having_node

    def as_sql(self, compiler, connection):
        """
        Compile this node and return a (sql, params) pair.

        The SQL is the empty string (with an empty parameter list) when the
        node matches everything, and also when no child produced any SQL.
        Raise EmptyResultSet when the node can't match anything.
        """
        fragments = []
        fragment_params = []
        child_count = len(self.children)
        # full_needed: how many children must match everything before the
        # whole node does; empty_needed: how many must match nothing before
        # the whole node does.
        if self.connector == AND:
            full_needed, empty_needed = child_count, 1
        else:
            full_needed, empty_needed = 1, child_count

        for child in self.children:
            try:
                sql, params = compiler.compile(child)
            except EmptyResultSet:
                empty_needed -= 1
            else:
                if sql:
                    fragments.append(sql)
                    fragment_params.extend(params)
                else:
                    full_needed -= 1
            # After every child, use the counters to decide whether the node
            # as a whole already matches nothing or everything. Negation
            # swaps the two outcomes.
            if empty_needed == 0:
                if self.negated:
                    return "", []
                raise EmptyResultSet
            if full_needed == 0:
                if self.negated:
                    raise EmptyResultSet
                return "", []

        conn = " %s " % self.connector
        sql_string = conn.join(fragments)
        if not sql_string:
            return sql_string, fragment_params
        if self.negated:
            # Some backends (Oracle at least) need parentheses around the
            # inner SQL in the negated case, even if the inner SQL contains
            # just a single expression.
            sql_string = "NOT (%s)" % sql_string
        elif len(fragments) > 1 or self.resolved:
            sql_string = "(%s)" % sql_string
        return sql_string, fragment_params

    def get_group_by_cols(self, alias=None):
        """Return the GROUP BY columns of all children, in child order."""
        return [col for child in self.children for col in child.get_group_by_cols()]

    def get_source_expressions(self):
        """Return a shallow copy of the list of children."""
        return list(self.children)

    def set_source_expressions(self, children):
        """Replace the children with a list of the same length."""
        assert len(children) == len(self.children)
        self.children = children

    def relabel_aliases(self, change_map):
        """
        Apply an alias renaming to every child, modifying this node in place.

        'change_map' is a dictionary mapping old (current) alias values to
        the new values.
        """
        for index, child in enumerate(self.children):
            if hasattr(child, "relabel_aliases"):
                # For example another WhereNode, which relabels itself.
                child.relabel_aliases(change_map)
                continue
            if hasattr(child, "relabeled_clone"):
                self.children[index] = child.relabeled_clone(change_map)

    def clone(self):
        """
        Return a copy of this tree.

        The copy is a new node with the same connector and negation. Children
        that have a clone() method are cloned; all other children are shared
        with the original.
        """
        duplicate = self.__class__._new_instance(
            children=None,
            connector=self.connector,
            negated=self.negated,
        )
        duplicate.children.extend(
            child.clone() if hasattr(child, "clone") else child
            for child in self.children
        )
        return duplicate

    def relabeled_clone(self, change_map):
        """Return a clone whose aliases are relabeled using 'change_map'."""
        relabeled = self.clone()
        relabeled.relabel_aliases(change_map)
        return relabeled

    def copy(self):
        """Return a clone of this node (same as clone())."""
        return self.clone()

    @classmethod
    def _contains_aggregate(cls, obj):
        """Return whether 'obj', or any leaf below it, has an aggregate."""
        if not isinstance(obj, tree.Node):
            return obj.contains_aggregate
        return any(cls._contains_aggregate(child) for child in obj.children)

    @cached_property
    def contains_aggregate(self):
        return self._contains_aggregate(self)

    @classmethod
    def _contains_over_clause(cls, obj):
        """Return whether 'obj', or any leaf below it, has an OVER clause."""
        if not isinstance(obj, tree.Node):
            return obj.contains_over_clause
        return any(cls._contains_over_clause(child) for child in obj.children)

    @cached_property
    def contains_over_clause(self):
        return self._contains_over_clause(self)

    @staticmethod
    def _resolve_leaf(expr, query, *args, **kwargs):
        """Resolve 'expr' against 'query' if it is resolvable, else return it."""
        if not hasattr(expr, "resolve_expression"):
            return expr
        return expr.resolve_expression(query, *args, **kwargs)

    @classmethod
    def _resolve_node(cls, node, query, *args, **kwargs):
        """
        Resolve 'node' in place: first its children (recursively), then its
        lhs and rhs attributes when it has them.
        """
        if hasattr(node, "children"):
            for child in node.children:
                cls._resolve_node(child, query, *args, **kwargs)
        for side in ("lhs", "rhs"):
            if hasattr(node, side):
                resolved_side = cls._resolve_leaf(
                    getattr(node, side), query, *args, **kwargs
                )
                setattr(node, side, resolved_side)

    def resolve_expression(self, *args, **kwargs):
        """Return a resolved clone of this node, flagged as resolved."""
        resolved_clone = self.clone()
        resolved_clone._resolve_node(resolved_clone, *args, **kwargs)
        resolved_clone.resolved = True
        return resolved_clone

    @cached_property
    def output_field(self):
        # Imported here rather than at module level.
        from django.db.models import BooleanField

        return BooleanField()

    def select_format(self, compiler, sql, params):
        """
        Return (sql, params) adjusted for use in a SELECT or GROUP BY list.

        Filters are wrapped in a CASE WHEN expression if the database backend
        (e.g. Oracle) doesn't support boolean expressions there.
        """
        features = compiler.connection.features
        if not features.supports_boolean_expr_in_select_clause:
            sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END"
        return sql, params

    def get_db_converters(self, connection):
        """Delegate to the converters of the boolean output field."""
        return self.output_field.get_db_converters(connection)

    def get_lookup(self, lookup):
        """Delegate lookup resolution to the boolean output field."""
        return self.output_field.get_lookup(lookup)

238 

239 

class NothingNode:
    """A node that matches nothing."""

    # Flags read from leaf children by WhereNode._contains_aggregate() and
    # WhereNode._contains_over_clause(). Both must exist, because those
    # helpers access the attribute unconditionally on every non-tree child.
    contains_aggregate = False
    contains_over_clause = False

    def as_sql(self, compiler=None, connection=None):
        """Always raise EmptyResultSet; the arguments are ignored."""
        raise EmptyResultSet

247 

248 

class ExtraWhere:
    """A WHERE fragment made of raw SQL strings and their parameters."""

    # The contents are a black box - assume no aggregates and no window
    # (OVER) clauses are used. Both flags must exist, because
    # WhereNode._contains_aggregate() and WhereNode._contains_over_clause()
    # read them unconditionally on every non-tree child.
    contains_aggregate = False
    contains_over_clause = False

    def __init__(self, sqls, params):
        """
        Store the raw SQL snippets and their parameters.

        'sqls' is an iterable of SQL strings; 'params' is an iterable of
        parameters for them, or None for no parameters.
        """
        self.sqls = sqls
        self.params = params

    def as_sql(self, compiler=None, connection=None):
        """
        Return (sql, params): every snippet wrapped in parentheses and joined
        with AND, and the parameters as a new list (empty if none were given).
        The arguments are ignored.
        """
        sqls = ["(%s)" % sql for sql in self.sqls]
        return " AND ".join(sqls), list(self.params or ())

260 

261 

class SubqueryConstraint:
    """A WHERE condition that is rendered by compiling a subquery."""

    # Even if aggregates or window (OVER) clauses would be used in a
    # subquery, the outer query isn't interested about those. Both flags must
    # exist, because WhereNode._contains_aggregate() and
    # WhereNode._contains_over_clause() read them unconditionally on every
    # non-tree child.
    contains_aggregate = False
    contains_over_clause = False

    def __init__(self, alias, columns, targets, query_object):
        """
        Store the pieces needed to render the condition.

        Note that 'query_object' is modified right away: its ordering is
        cleared, including the default ordering.
        """
        self.alias = alias
        self.columns = columns
        self.targets = targets
        query_object.clear_ordering(clear_default=True)
        self.query_object = query_object

    def as_sql(self, compiler, connection):
        """
        Return what the subquery's compiler produces for the condition.

        The stored query is restricted to the target values, a compiler is
        obtained for 'connection', and its as_subquery_condition() is called
        with the stored alias and columns and the outer 'compiler'.
        """
        query = self.query_object
        # set_values() modifies the stored query on every call.
        query.set_values(self.targets)
        query_compiler = query.get_compiler(connection=connection)
        return query_compiler.as_subquery_condition(self.alias, self.columns, compiler)