Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/aggregates.py: 62%
121 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Classes to represent the definitions of aggregate functions.
3"""
4from django.core.exceptions import FieldError
5from django.db.models.expressions import Case, Func, Star, When
6from django.db.models.fields import IntegerField
7from django.db.models.functions.comparison import Coalesce
8from django.db.models.functions.mixins import (
9 FixDurationInputMixin,
10 NumericOutputFieldMixin,
11)
# Names exported by ``from django.db.models.aggregates import *``.
__all__ = [
    "Aggregate", "Avg", "Count", "Max", "Min", "StdDev", "Sum", "Variance",
]
class Aggregate(Func):
    """
    Base class for SQL aggregate function expressions.

    Extends ``Func`` with three optional features handled in this class:
    a ``DISTINCT`` modifier, a ``filter`` condition, and a ``default`` value
    that the resolved aggregate is wrapped in ``Coalesce`` with.
    """

    # ``%(distinct)s`` is filled in by as_sql() with "DISTINCT " or "".
    template = "%(function)s(%(distinct)s%(expressions)s)"
    # Checked on source expressions in resolve_expression() to reject
    # aggregates nested inside other aggregates.
    contains_aggregate = True
    # Used by default_alias and in the nested-aggregate error message.
    name = None
    # Wraps the regular template; the doubled ``%%`` keeps ``%(filter)s``
    # intact through the first round of %-formatting in as_sql().
    filter_template = "%s FILTER (WHERE %%(filter)s)"
    window_compatible = True
    # Subclasses set this to True to accept ``distinct=True``.
    allow_distinct = False
    # When this is not None, passing ``default`` to __init__() is rejected.
    empty_result_set_value = None

    def __init__(
        self, *expressions, distinct=False, filter=None, default=None, **extra
    ):
        """
        Store the aggregate options and delegate the rest to ``Func``.

        Raises TypeError if ``distinct`` is requested on a class whose
        ``allow_distinct`` is false, or if ``default`` is given on a class
        that defines a non-None ``empty_result_set_value``.
        """
        if distinct and not self.allow_distinct:
            raise TypeError("%s does not allow distinct." % self.__class__.__name__)
        if default is not None and self.empty_result_set_value is not None:
            raise TypeError(f"{self.__class__.__name__} does not allow default.")
        self.distinct = distinct
        self.filter = filter
        self.default = default
        super().__init__(*expressions, **extra)

    def get_source_fields(self):
        """Return the output fields of the source expressions, filter excluded."""
        # Don't return the filter expression since it's not a source field.
        return [e._output_field_or_none for e in super().get_source_expressions()]

    def get_source_expressions(self):
        """Return the source expressions, with the filter appended when set."""
        source_expressions = super().get_source_expressions()
        if self.filter:
            return source_expressions + [self.filter]
        return source_expressions

    def set_source_expressions(self, exprs):
        """
        Mirror get_source_expressions(): when a filter is set, the last item
        of ``exprs`` is popped off and becomes the new filter; the remaining
        items are passed on to ``Func``.
        """
        self.filter = self.filter and exprs.pop()
        return super().set_source_expressions(exprs)

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """
        Resolve this aggregate (and its filter and default) against ``query``.

        Returns the resolved copy, or a ``Coalesce`` wrapping that copy and
        the default when ``default`` is not None. Raises FieldError when
        ``summarize`` is false and a source expression contains an aggregate.
        """
        # Aggregates are not allowed in UPDATE queries, so ignore for_save
        c = super().resolve_expression(query, allow_joins, reuse, summarize)
        c.filter = c.filter and c.filter.resolve_expression(
            query, allow_joins, reuse, summarize
        )
        if not summarize:
            # Call Aggregate.get_source_expressions() to avoid
            # returning self.filter and including that in this loop.
            expressions = super(Aggregate, c).get_source_expressions()
            for index, expr in enumerate(expressions):
                if expr.contains_aggregate:
                    # Report the unresolved expression at the same position
                    # so the message shows what the caller originally wrote.
                    before_resolved = self.get_source_expressions()[index]
                    name = (
                        before_resolved.name
                        if hasattr(before_resolved, "name")
                        else repr(before_resolved)
                    )
                    raise FieldError(
                        "Cannot compute %s('%s'): '%s' is an aggregate"
                        % (c.name, name, name)
                    )
        if (default := c.default) is None:
            return c
        if hasattr(default, "resolve_expression"):
            default = default.resolve_expression(query, allow_joins, reuse, summarize)
        c.default = None  # Reset the default argument before wrapping.
        coalesce = Coalesce(c, default, output_field=c._output_field_or_none)
        coalesce.is_summary = c.is_summary
        return coalesce

    @property
    def default_alias(self):
        """
        Return ``"<source name>__<aggregate name lowercased>"``.

        Only available when there is exactly one source expression and it has
        a ``name`` attribute; otherwise TypeError is raised.
        """
        expressions = self.get_source_expressions()
        if len(expressions) == 1 and hasattr(expressions[0], "name"):
            return "%s__%s" % (expressions[0].name, self.name.lower())
        raise TypeError("Complex expressions require an alias")

    def get_group_by_cols(self, alias=None):
        """Return an empty list: this expression adds no GROUP BY columns."""
        return []

    def as_sql(self, compiler, connection, **extra_context):
        """
        Compile the aggregate to ``(sql, params)``.

        With a filter set, either the FILTER (WHERE ...) template is used
        (when the backend reports ``supports_aggregate_filter_clause``) or the
        first source expression is rewritten as ``Case(When(filter, then=...))``
        on a copy of this aggregate.
        """
        extra_context["distinct"] = "DISTINCT " if self.distinct else ""
        if self.filter:
            if connection.features.supports_aggregate_filter_clause:
                filter_sql, filter_params = self.filter.as_sql(compiler, connection)
                # pop() rather than get(): a caller-supplied "template" is
                # folded into the filter template and must not remain in
                # extra_context, otherwise the call below would receive the
                # ``template`` keyword twice and raise TypeError.
                template = self.filter_template % extra_context.pop(
                    "template", self.template
                )
                sql, params = super().as_sql(
                    compiler,
                    connection,
                    template=template,
                    filter=filter_sql,
                    **extra_context,
                )
                return sql, params + filter_params
            else:
                copy = self.copy()
                # Clear the filter so the copy's source expressions below do
                # not include it.
                copy.filter = None
                source_expressions = copy.get_source_expressions()
                condition = When(self.filter, then=source_expressions[0])
                copy.set_source_expressions([Case(condition)] + source_expressions[1:])
                return super(Aggregate, copy).as_sql(
                    compiler, connection, **extra_context
                )
        return super().as_sql(compiler, connection, **extra_context)

    def _get_repr_options(self):
        """Add ``distinct`` and ``filter`` to the parent's repr options when set."""
        options = super()._get_repr_options()
        if self.distinct:
            options["distinct"] = self.distinct
        if self.filter:
            options["filter"] = self.filter
        return options
class Avg(FixDurationInputMixin, NumericOutputFieldMixin, Aggregate):
    """Aggregate rendered with the SQL ``AVG`` function; accepts ``distinct``."""

    allow_distinct = True
    name = "Avg"
    function = "AVG"
class Count(Aggregate):
    """Aggregate rendered with the SQL ``COUNT`` function."""

    name = "Count"
    function = "COUNT"
    allow_distinct = True
    empty_result_set_value = 0
    output_field = IntegerField()

    def __init__(self, expression, filter=None, **extra):
        """
        Accept the string ``"*"`` as shorthand for ``Star()``.

        Raises ValueError when a ``Star`` expression is combined with a
        filter.
        """
        if expression == "*":
            expression = Star()
        star_with_filter = isinstance(expression, Star) and filter is not None
        if star_with_filter:
            raise ValueError("Star cannot be used with filter. Please specify a field.")
        super().__init__(expression, filter=filter, **extra)
class Max(Aggregate):
    """Aggregate rendered with the SQL ``MAX`` function."""

    name = "Max"
    function = "MAX"
class Min(Aggregate):
    """Aggregate rendered with the SQL ``MIN`` function."""

    name = "Min"
    function = "MIN"
class StdDev(NumericOutputFieldMixin, Aggregate):
    """Standard deviation aggregate; ``sample`` picks the SQL function used."""

    name = "StdDev"

    def __init__(self, expression, sample=False, **extra):
        """Use ``STDDEV_SAMP`` when ``sample`` is truthy, else ``STDDEV_POP``."""
        if sample:
            self.function = "STDDEV_SAMP"
        else:
            self.function = "STDDEV_POP"
        super().__init__(expression, **extra)

    def _get_repr_options(self):
        """Return the parent's repr options plus a boolean ``sample`` entry."""
        options = dict(super()._get_repr_options())
        options["sample"] = self.function == "STDDEV_SAMP"
        return options
class Sum(FixDurationInputMixin, Aggregate):
    """Aggregate rendered with the SQL ``SUM`` function; accepts ``distinct``."""

    allow_distinct = True
    name = "Sum"
    function = "SUM"
class Variance(NumericOutputFieldMixin, Aggregate):
    """Variance aggregate; ``sample`` picks the SQL function used."""

    name = "Variance"

    def __init__(self, expression, sample=False, **extra):
        """Use ``VAR_SAMP`` when ``sample`` is truthy, else ``VAR_POP``."""
        if sample:
            self.function = "VAR_SAMP"
        else:
            self.function = "VAR_POP"
        super().__init__(expression, **extra)

    def _get_repr_options(self):
        """Return the parent's repr options plus a boolean ``sample`` entry."""
        options = dict(super()._get_repr_options())
        options["sample"] = self.function == "VAR_SAMP"
        return options