Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/sql/where.py: 62%
163 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Code to manage the creation and SQL rendering of 'where' constraints.
3"""
5from django.core.exceptions import EmptyResultSet
6from django.utils import tree
7from django.utils.functional import cached_property
9# Connection types
10AND = "AND"
11OR = "OR"
14class WhereNode(tree.Node):
15 """
16 An SQL WHERE clause.
18 The class is tied to the Query class that created it (in order to create
19 the correct SQL).
21 A child is usually an expression producing boolean values. Most likely the
22 expression is a Lookup instance.
24 However, a child could also be any class with as_sql() and either
25 relabeled_clone() method or relabel_aliases() and clone() methods and
26 contains_aggregate attribute.
27 """
29 default = AND
30 resolved = False
31 conditional = True
33 def split_having(self, negated=False):
34 """
35 Return two possibly None nodes: one for those parts of self that
36 should be included in the WHERE clause and one for those parts of
37 self that must be included in the HAVING clause.
38 """
39 if not self.contains_aggregate: 39 ↛ 41line 39 didn't jump to line 41, because the condition on line 39 was never false
40 return self, None
41 in_negated = negated ^ self.negated
42 # If the effective connector is OR and this node contains an aggregate,
43 # then we need to push the whole branch to HAVING clause.
44 may_need_split = (in_negated and self.connector == AND) or (
45 not in_negated and self.connector == OR
46 )
47 if may_need_split and self.contains_aggregate:
48 return None, self
49 where_parts = []
50 having_parts = []
51 for c in self.children:
52 if hasattr(c, "split_having"):
53 where_part, having_part = c.split_having(in_negated)
54 if where_part is not None:
55 where_parts.append(where_part)
56 if having_part is not None:
57 having_parts.append(having_part)
58 elif c.contains_aggregate:
59 having_parts.append(c)
60 else:
61 where_parts.append(c)
62 having_node = (
63 self.__class__(having_parts, self.connector, self.negated)
64 if having_parts
65 else None
66 )
67 where_node = (
68 self.__class__(where_parts, self.connector, self.negated)
69 if where_parts
70 else None
71 )
72 return where_node, having_node
74 def as_sql(self, compiler, connection):
75 """
76 Return the SQL version of the where clause and the value to be
77 substituted in. Return '', [] if this node matches everything,
78 None, [] if this node is empty, and raise EmptyResultSet if this
79 node can't match anything.
80 """
81 result = []
82 result_params = []
83 if self.connector == AND: 83 ↛ 86line 83 didn't jump to line 86, because the condition on line 83 was never false
84 full_needed, empty_needed = len(self.children), 1
85 else:
86 full_needed, empty_needed = 1, len(self.children)
88 for child in self.children:
89 try:
90 sql, params = compiler.compile(child)
91 except EmptyResultSet:
92 empty_needed -= 1
93 else:
94 if sql: 94 ↛ 98line 94 didn't jump to line 98, because the condition on line 94 was never false
95 result.append(sql)
96 result_params.extend(params)
97 else:
98 full_needed -= 1
99 # Check if this node matches nothing or everything.
100 # First check the amount of full nodes and empty nodes
101 # to make this node empty/full.
102 # Now, check if this node is full/empty using the
103 # counts.
104 if empty_needed == 0:
105 if self.negated: 105 ↛ 106line 105 didn't jump to line 106, because the condition on line 105 was never true
106 return "", []
107 else:
108 raise EmptyResultSet
109 if full_needed == 0: 109 ↛ 110line 109 didn't jump to line 110, because the condition on line 109 was never true
110 if self.negated:
111 raise EmptyResultSet
112 else:
113 return "", []
114 conn = " %s " % self.connector
115 sql_string = conn.join(result)
116 if sql_string:
117 if self.negated:
118 # Some backends (Oracle at least) need parentheses
119 # around the inner SQL in the negated case, even if the
120 # inner SQL contains just a single expression.
121 sql_string = "NOT (%s)" % sql_string
122 elif len(result) > 1 or self.resolved:
123 sql_string = "(%s)" % sql_string
124 return sql_string, result_params
126 def get_group_by_cols(self, alias=None):
127 cols = []
128 for child in self.children:
129 cols.extend(child.get_group_by_cols())
130 return cols
132 def get_source_expressions(self):
133 return self.children[:]
135 def set_source_expressions(self, children):
136 assert len(children) == len(self.children)
137 self.children = children
139 def relabel_aliases(self, change_map):
140 """
141 Relabel the alias values of any children. 'change_map' is a dictionary
142 mapping old (current) alias values to the new values.
143 """
144 for pos, child in enumerate(self.children):
145 if hasattr(child, "relabel_aliases"): 145 ↛ 147line 145 didn't jump to line 147, because the condition on line 145 was never true
146 # For example another WhereNode
147 child.relabel_aliases(change_map)
148 elif hasattr(child, "relabeled_clone"): 148 ↛ 144line 148 didn't jump to line 144, because the condition on line 148 was never false
149 self.children[pos] = child.relabeled_clone(change_map)
151 def clone(self):
152 """
153 Create a clone of the tree. Must only be called on root nodes (nodes
154 with empty subtree_parents). Childs must be either (Constraint, lookup,
155 value) tuples, or objects supporting .clone().
156 """
157 clone = self.__class__._new_instance(
158 children=None,
159 connector=self.connector,
160 negated=self.negated,
161 )
162 for child in self.children:
163 if hasattr(child, "clone"):
164 clone.children.append(child.clone())
165 else:
166 clone.children.append(child)
167 return clone
169 def relabeled_clone(self, change_map):
170 clone = self.clone()
171 clone.relabel_aliases(change_map)
172 return clone
174 def copy(self):
175 return self.clone()
177 @classmethod
178 def _contains_aggregate(cls, obj):
179 if isinstance(obj, tree.Node):
180 return any(cls._contains_aggregate(c) for c in obj.children)
181 return obj.contains_aggregate
183 @cached_property
184 def contains_aggregate(self):
185 return self._contains_aggregate(self)
187 @classmethod
188 def _contains_over_clause(cls, obj):
189 if isinstance(obj, tree.Node):
190 return any(cls._contains_over_clause(c) for c in obj.children)
191 return obj.contains_over_clause
193 @cached_property
194 def contains_over_clause(self):
195 return self._contains_over_clause(self)
197 @staticmethod
198 def _resolve_leaf(expr, query, *args, **kwargs):
199 if hasattr(expr, "resolve_expression"):
200 expr = expr.resolve_expression(query, *args, **kwargs)
201 return expr
203 @classmethod
204 def _resolve_node(cls, node, query, *args, **kwargs):
205 if hasattr(node, "children"):
206 for child in node.children:
207 cls._resolve_node(child, query, *args, **kwargs)
208 if hasattr(node, "lhs"):
209 node.lhs = cls._resolve_leaf(node.lhs, query, *args, **kwargs)
210 if hasattr(node, "rhs"):
211 node.rhs = cls._resolve_leaf(node.rhs, query, *args, **kwargs)
213 def resolve_expression(self, *args, **kwargs):
214 clone = self.clone()
215 clone._resolve_node(clone, *args, **kwargs)
216 clone.resolved = True
217 return clone
219 @cached_property
220 def output_field(self):
221 from django.db.models import BooleanField
223 return BooleanField()
225 def select_format(self, compiler, sql, params):
226 # Wrap filters with a CASE WHEN expression if a database backend
227 # (e.g. Oracle) doesn't support boolean expression in SELECT or GROUP
228 # BY list.
229 if not compiler.connection.features.supports_boolean_expr_in_select_clause:
230 sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END"
231 return sql, params
233 def get_db_converters(self, connection):
234 return self.output_field.get_db_converters(connection)
236 def get_lookup(self, lookup):
237 return self.output_field.get_lookup(lookup)
240class NothingNode:
241 """A node that matches nothing."""
243 contains_aggregate = False
245 def as_sql(self, compiler=None, connection=None):
246 raise EmptyResultSet
249class ExtraWhere:
250 # The contents are a black box - assume no aggregates are used.
251 contains_aggregate = False
253 def __init__(self, sqls, params):
254 self.sqls = sqls
255 self.params = params
257 def as_sql(self, compiler=None, connection=None):
258 sqls = ["(%s)" % sql for sql in self.sqls]
259 return " AND ".join(sqls), list(self.params or ())
262class SubqueryConstraint:
263 # Even if aggregates would be used in a subquery, the outer query isn't
264 # interested about those.
265 contains_aggregate = False
267 def __init__(self, alias, columns, targets, query_object):
268 self.alias = alias
269 self.columns = columns
270 self.targets = targets
271 query_object.clear_ordering(clear_default=True)
272 self.query_object = query_object
274 def as_sql(self, compiler, connection):
275 query = self.query_object
276 query.set_values(self.targets)
277 query_compiler = query.get_compiler(connection=connection)
278 return query_compiler.as_subquery_condition(self.alias, self.columns, compiler)