Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/aggregates.py: 62%

121 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2Classes to represent the definitions of aggregate functions. 

3""" 

4from django.core.exceptions import FieldError 

5from django.db.models.expressions import Case, Func, Star, When 

6from django.db.models.fields import IntegerField 

7from django.db.models.functions.comparison import Coalesce 

8from django.db.models.functions.mixins import ( 

9 FixDurationInputMixin, 

10 NumericOutputFieldMixin, 

11) 

12 

13__all__ = [ 

14 "Aggregate", 

15 "Avg", 

16 "Count", 

17 "Max", 

18 "Min", 

19 "StdDev", 

20 "Sum", 

21 "Variance", 

22] 

23 

24 

25class Aggregate(Func): 

26 template = "%(function)s(%(distinct)s%(expressions)s)" 

27 contains_aggregate = True 

28 name = None 

29 filter_template = "%s FILTER (WHERE %%(filter)s)" 

30 window_compatible = True 

31 allow_distinct = False 

32 empty_result_set_value = None 

33 

34 def __init__( 

35 self, *expressions, distinct=False, filter=None, default=None, **extra 

36 ): 

37 if distinct and not self.allow_distinct: 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true

38 raise TypeError("%s does not allow distinct." % self.__class__.__name__) 

39 if default is not None and self.empty_result_set_value is not None: 39 ↛ 40line 39 didn't jump to line 40, because the condition on line 39 was never true

40 raise TypeError(f"{self.__class__.__name__} does not allow default.") 

41 self.distinct = distinct 

42 self.filter = filter 

43 self.default = default 

44 super().__init__(*expressions, **extra) 

45 

46 def get_source_fields(self): 

47 # Don't return the filter expression since it's not a source field. 

48 return [e._output_field_or_none for e in super().get_source_expressions()] 

49 

50 def get_source_expressions(self): 

51 source_expressions = super().get_source_expressions() 

52 if self.filter: 52 ↛ 53line 52 didn't jump to line 53, because the condition on line 52 was never true

53 return source_expressions + [self.filter] 

54 return source_expressions 

55 

56 def set_source_expressions(self, exprs): 

57 self.filter = self.filter and exprs.pop() 

58 return super().set_source_expressions(exprs) 

59 

60 def resolve_expression( 

61 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

62 ): 

63 # Aggregates are not allowed in UPDATE queries, so ignore for_save 

64 c = super().resolve_expression(query, allow_joins, reuse, summarize) 

65 c.filter = c.filter and c.filter.resolve_expression( 

66 query, allow_joins, reuse, summarize 

67 ) 

68 if not summarize: 68 ↛ 71line 68 didn't jump to line 71, because the condition on line 68 was never true

69 # Call Aggregate.get_source_expressions() to avoid 

70 # returning self.filter and including that in this loop. 

71 expressions = super(Aggregate, c).get_source_expressions() 

72 for index, expr in enumerate(expressions): 

73 if expr.contains_aggregate: 

74 before_resolved = self.get_source_expressions()[index] 

75 name = ( 

76 before_resolved.name 

77 if hasattr(before_resolved, "name") 

78 else repr(before_resolved) 

79 ) 

80 raise FieldError( 

81 "Cannot compute %s('%s'): '%s' is an aggregate" 

82 % (c.name, name, name) 

83 ) 

84 if (default := c.default) is None: 84 ↛ 86line 84 didn't jump to line 86, because the condition on line 84 was never false

85 return c 

86 if hasattr(default, "resolve_expression"): 

87 default = default.resolve_expression(query, allow_joins, reuse, summarize) 

88 c.default = None # Reset the default argument before wrapping. 

89 coalesce = Coalesce(c, default, output_field=c._output_field_or_none) 

90 coalesce.is_summary = c.is_summary 

91 return coalesce 

92 

93 @property 

94 def default_alias(self): 

95 expressions = self.get_source_expressions() 

96 if len(expressions) == 1 and hasattr(expressions[0], "name"): 96 ↛ 98line 96 didn't jump to line 98, because the condition on line 96 was never false

97 return "%s__%s" % (expressions[0].name, self.name.lower()) 

98 raise TypeError("Complex expressions require an alias") 

99 

100 def get_group_by_cols(self, alias=None): 

101 return [] 

102 

103 def as_sql(self, compiler, connection, **extra_context): 

104 extra_context["distinct"] = "DISTINCT " if self.distinct else "" 

105 if self.filter: 105 ↛ 106line 105 didn't jump to line 106, because the condition on line 105 was never true

106 if connection.features.supports_aggregate_filter_clause: 

107 filter_sql, filter_params = self.filter.as_sql(compiler, connection) 

108 template = self.filter_template % extra_context.get( 

109 "template", self.template 

110 ) 

111 sql, params = super().as_sql( 

112 compiler, 

113 connection, 

114 template=template, 

115 filter=filter_sql, 

116 **extra_context, 

117 ) 

118 return sql, params + filter_params 

119 else: 

120 copy = self.copy() 

121 copy.filter = None 

122 source_expressions = copy.get_source_expressions() 

123 condition = When(self.filter, then=source_expressions[0]) 

124 copy.set_source_expressions([Case(condition)] + source_expressions[1:]) 

125 return super(Aggregate, copy).as_sql( 

126 compiler, connection, **extra_context 

127 ) 

128 return super().as_sql(compiler, connection, **extra_context) 

129 

130 def _get_repr_options(self): 

131 options = super()._get_repr_options() 

132 if self.distinct: 

133 options["distinct"] = self.distinct 

134 if self.filter: 

135 options["filter"] = self.filter 

136 return options 

137 

138 

139class Avg(FixDurationInputMixin, NumericOutputFieldMixin, Aggregate): 

140 function = "AVG" 

141 name = "Avg" 

142 allow_distinct = True 

143 

144 

145class Count(Aggregate): 

146 function = "COUNT" 

147 name = "Count" 

148 output_field = IntegerField() 

149 allow_distinct = True 

150 empty_result_set_value = 0 

151 

152 def __init__(self, expression, filter=None, **extra): 

153 if expression == "*": 153 ↛ 155line 153 didn't jump to line 155, because the condition on line 153 was never false

154 expression = Star() 

155 if isinstance(expression, Star) and filter is not None: 155 ↛ 156line 155 didn't jump to line 156, because the condition on line 155 was never true

156 raise ValueError("Star cannot be used with filter. Please specify a field.") 

157 super().__init__(expression, filter=filter, **extra) 

158 

159 

160class Max(Aggregate): 

161 function = "MAX" 

162 name = "Max" 

163 

164 

165class Min(Aggregate): 

166 function = "MIN" 

167 name = "Min" 

168 

169 

170class StdDev(NumericOutputFieldMixin, Aggregate): 

171 name = "StdDev" 

172 

173 def __init__(self, expression, sample=False, **extra): 

174 self.function = "STDDEV_SAMP" if sample else "STDDEV_POP" 

175 super().__init__(expression, **extra) 

176 

177 def _get_repr_options(self): 

178 return {**super()._get_repr_options(), "sample": self.function == "STDDEV_SAMP"} 

179 

180 

181class Sum(FixDurationInputMixin, Aggregate): 

182 function = "SUM" 

183 name = "Sum" 

184 allow_distinct = True 

185 

186 

187class Variance(NumericOutputFieldMixin, Aggregate): 

188 name = "Variance" 

189 

190 def __init__(self, expression, sample=False, **extra): 

191 self.function = "VAR_SAMP" if sample else "VAR_POP" 

192 super().__init__(expression, **extra) 

193 

194 def _get_repr_options(self): 

195 return {**super()._get_repr_options(), "sample": self.function == "VAR_SAMP"}