Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/functions/datetime.py: 57%
204 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from datetime import datetime
3from django.conf import settings
4from django.db.models.expressions import Func
5from django.db.models.fields import (
6 DateField,
7 DateTimeField,
8 DurationField,
9 Field,
10 IntegerField,
11 TimeField,
12)
13from django.db.models.lookups import (
14 Transform,
15 YearExact,
16 YearGt,
17 YearGte,
18 YearLt,
19 YearLte,
20)
21from django.utils import timezone
class TimezoneMixin:
    """Mixin that resolves the time zone name a database function should use."""

    # Explicit time zone for the expression; None means "use the current one".
    tzinfo = None

    def get_tzname(self):
        """
        Return the time zone name to apply, or None when USE_TZ is disabled.

        Timezone conversions must happen to the input datetime *before*
        applying a function. 2015-12-31 23:00:00 -02:00 is stored in the
        database as 2016-01-01 01:00:00 +00:00. Any results should be
        based on the input datetime not the stored datetime.
        """
        if not settings.USE_TZ:
            return None
        if self.tzinfo is not None:
            # An explicit tzinfo takes precedence over the current time zone.
            return timezone._get_timezone_name(self.tzinfo)
        return timezone.get_current_timezone_name()
class Extract(TimezoneMixin, Transform):
    """
    Extract a single component, named by ``lookup_name``, from a date, time,
    datetime, or duration expression. The result is an integer field.

    Subclasses normally pin ``lookup_name`` as a class attribute. When it is
    left as None, the ``lookup_name`` constructor argument is used instead.
    """

    lookup_name = None
    output_field = IntegerField()

    def __init__(self, expression, lookup_name=None, tzinfo=None, **extra):
        """
        Store the component name and optional time zone.

        Raise ValueError if neither the class nor the caller provides a
        lookup_name.
        """
        # A class-level lookup_name wins; the argument is only a fallback.
        if self.lookup_name is None:
            self.lookup_name = lookup_name
        if self.lookup_name is None:
            raise ValueError("lookup_name must be provided")
        self.tzinfo = tzinfo
        super().__init__(expression, **extra)

    def as_sql(self, compiler, connection):
        """
        Compile the inner expression and wrap it in the backend's extract SQL.

        Return a ``(sql, params)`` pair. Raise ValueError if lookup_name does
        not match the backend's allowed pattern, if tzinfo is set for a
        non-datetime input, or if a DurationField is used on a backend
        without native duration support.
        """
        if not connection.ops.extract_trunc_lookup_pattern.fullmatch(self.lookup_name):
            raise ValueError("Invalid lookup_name: %s" % self.lookup_name)
        sql, params = compiler.compile(self.lhs)
        lhs_output_field = self.lhs.output_field
        # DateTimeField must be tested before DateField because it is a
        # subclass of DateField.
        if isinstance(lhs_output_field, DateTimeField):
            tzname = self.get_tzname()
            sql = connection.ops.datetime_extract_sql(self.lookup_name, sql, tzname)
        elif self.tzinfo is not None:
            raise ValueError("tzinfo can only be used with DateTimeField.")
        elif isinstance(lhs_output_field, DateField):
            sql = connection.ops.date_extract_sql(self.lookup_name, sql)
        elif isinstance(lhs_output_field, TimeField):
            sql = connection.ops.time_extract_sql(self.lookup_name, sql)
        elif isinstance(lhs_output_field, DurationField):
            if not connection.features.has_native_duration_field:
                raise ValueError(
                    "Extract requires native DurationField database support."
                )
            sql = connection.ops.time_extract_sql(self.lookup_name, sql)
        else:
            # resolve_expression has already validated the output_field so
            # this branch should never be hit. Raise explicitly instead of
            # using ``assert False`` so the check is not stripped when Python
            # runs with -O, which would return the un-extracted SQL.
            raise AssertionError("Tried to Extract from an invalid type.")
        return sql, params

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """
        Resolve the expression and validate the input field type against the
        requested component.

        Return the resolved copy. Raise ValueError for an unsupported input
        field type, for a time component requested from a plain DateField, or
        for a calendar component requested from a DurationField.
        """
        copy = super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        field = getattr(copy.lhs, "output_field", None)
        if field is None:
            return copy
        if not isinstance(field, (DateField, DateTimeField, TimeField, DurationField)):
            raise ValueError(
                "Extract input expression must be DateField, DateTimeField, "
                "TimeField, or DurationField."
            )
        # Passing dates to functions expecting datetimes is most likely a
        # mistake. The exact-type check is deliberate: isinstance() would also
        # match DateTimeField, which does carry a time component.
        if type(field) is DateField and copy.lookup_name in (
            "hour",
            "minute",
            "second",
        ):
            raise ValueError(
                "Cannot extract time component '%s' from DateField '%s'."
                % (copy.lookup_name, field.name)
            )
        if isinstance(field, DurationField) and copy.lookup_name in (
            "year",
            "iso_year",
            "month",
            "week",
            "week_day",
            "iso_week_day",
            "quarter",
        ):
            raise ValueError(
                "Cannot extract component '%s' from DurationField '%s'."
                % (copy.lookup_name, field.name)
            )
        return copy
class ExtractYear(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"year"``."""

    lookup_name = "year"
class ExtractIsoYear(Extract):
    """Extract the week-numbering year as defined by ISO-8601."""

    lookup_name = "iso_year"
class ExtractMonth(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"month"``."""

    lookup_name = "month"
class ExtractDay(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"day"``."""

    lookup_name = "day"
class ExtractWeek(Extract):
    """
    Extract the ISO-8601 week number: 1-52, or 53. Under ISO-8601, Monday is
    the first day of the week.
    """

    lookup_name = "week"
class ExtractWeekDay(Extract):
    """
    Extract the day of the week, numbered Sunday=1 through Saturday=7.

    The equivalent in Python is: (mydatetime.isoweekday() % 7) + 1
    """

    lookup_name = "week_day"
class ExtractIsoWeekDay(Extract):
    """Extract the ISO-8601 day of the week: Monday=1 through Sunday=7."""

    lookup_name = "iso_week_day"
class ExtractQuarter(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"quarter"``."""

    lookup_name = "quarter"
class ExtractHour(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"hour"``."""

    lookup_name = "hour"
class ExtractMinute(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"minute"``."""

    lookup_name = "minute"
class ExtractSecond(Extract):
    """Extract transform with ``lookup_name`` fixed to ``"second"``."""

    lookup_name = "second"
# Calendar-component transforms are registered on DateField.
for _transform in (
    ExtractYear,
    ExtractMonth,
    ExtractDay,
    ExtractWeekDay,
    ExtractIsoWeekDay,
    ExtractWeek,
    ExtractIsoYear,
    ExtractQuarter,
):
    DateField.register_lookup(_transform)

# Time-of-day transforms are registered on TimeField first, then DateTimeField.
for _field in (TimeField, DateTimeField):
    for _transform in (ExtractHour, ExtractMinute, ExtractSecond):
        _field.register_lookup(_transform)

# Both year transforms get the same set of year comparison lookups.
for _transform in (ExtractYear, ExtractIsoYear):
    for _lookup in (YearExact, YearGt, YearGte, YearLt, YearLte):
        _transform.register_lookup(_lookup)

# Keep the loop variables out of the module namespace.
del _transform, _field, _lookup
class Now(Func):
    """Database function that renders as CURRENT_TIMESTAMP by default."""

    output_field = DateTimeField()
    template = "CURRENT_TIMESTAMP"

    def as_postgresql(self, compiler, connection, **extra_context):
        """
        Compile for PostgreSQL with STATEMENT_TIMESTAMP() as the template.

        PostgreSQL's CURRENT_TIMESTAMP means "the time at the start of the
        transaction". Use STATEMENT_TIMESTAMP to be cross-compatible with
        other databases.
        """
        return self.as_sql(
            compiler,
            connection,
            template="STATEMENT_TIMESTAMP()",
            **extra_context,
        )
class TruncBase(TimezoneMixin, Transform):
    """
    Base transform that truncates a date, time, or datetime expression to the
    precision named by ``kind``.

    Subclasses set ``kind`` as a class attribute or assign it per instance.
    """

    kind = None
    tzinfo = None

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(
        self,
        expression,
        output_field=None,
        tzinfo=None,
        is_dst=timezone.NOT_PASSED,
        **extra,
    ):
        """Store the time zone options and pass the rest on to Transform."""
        self.tzinfo = tzinfo
        self.is_dst = is_dst
        super().__init__(expression, output_field=output_field, **extra)

    def as_sql(self, compiler, connection):
        """
        Compile the inner expression and wrap it in the backend's truncation
        SQL, chosen by the type of ``output_field``.

        Return a ``(sql, params)`` pair. Raise ValueError if ``kind`` does not
        match the backend's allowed pattern, if tzinfo is set for a
        non-datetime input, or if output_field is not a date, time, or
        datetime field.
        """
        if not connection.ops.extract_trunc_lookup_pattern.fullmatch(self.kind):
            raise ValueError("Invalid kind: %s" % self.kind)
        inner_sql, inner_params = compiler.compile(self.lhs)
        tzname = None
        if isinstance(self.lhs.output_field, DateTimeField):
            tzname = self.get_tzname()
        elif self.tzinfo is not None:
            raise ValueError("tzinfo can only be used with DateTimeField.")
        # DateTimeField must be tested before DateField because it is a
        # subclass of DateField.
        if isinstance(self.output_field, DateTimeField):
            sql = connection.ops.datetime_trunc_sql(self.kind, inner_sql, tzname)
        elif isinstance(self.output_field, DateField):
            sql = connection.ops.date_trunc_sql(self.kind, inner_sql, tzname)
        elif isinstance(self.output_field, TimeField):
            sql = connection.ops.time_trunc_sql(self.kind, inner_sql, tzname)
        else:
            raise ValueError(
                "Trunc only valid on DateField, TimeField, or DateTimeField."
            )
        return sql, inner_params

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """
        Resolve the expression and check that the input field, the output
        field, and ``kind`` are compatible.

        Return the resolved copy. Raise TypeError if the input is not a date,
        time, or datetime field. Raise ValueError if output_field is not one
        of those types, or if the truncation does not fit the input field.
        """
        copy = super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        field = copy.lhs.output_field
        # DateTimeField is a subclass of DateField so this works for both.
        if not isinstance(field, (DateField, TimeField)):
            raise TypeError(
                "%r isn't a DateField, TimeField, or DateTimeField." % field.name
            )
        # If self.output_field was None, then accessing the field will trigger
        # the resolver to assign it to self.lhs.output_field.
        if not isinstance(copy.output_field, (DateField, DateTimeField, TimeField)):
            raise ValueError(
                "output_field must be either DateField, TimeField, or DateTimeField"
            )
        # Passing dates or times to functions expecting datetimes is most
        # likely a mistake.
        class_output_field = (
            self.__class__.output_field
            if isinstance(self.__class__.output_field, Field)
            else None
        )
        output_field = class_output_field or copy.output_field
        has_explicit_output_field = (
            class_output_field or field.__class__ is not copy.output_field.__class__
        )
        # Target type named in the error messages below. It falls back to
        # "DateTimeField" when no output field was given explicitly.
        target_name = (
            output_field.__class__.__name__
            if has_explicit_output_field
            else "DateTimeField"
        )
        # The exact-type check is deliberate: isinstance() would also match
        # DateTimeField, for which these truncations are valid.
        if type(field) is DateField and (
            isinstance(output_field, DateTimeField)
            or copy.kind in ("hour", "minute", "second", "time")
        ):
            raise ValueError(
                "Cannot truncate DateField '%s' to %s." % (field.name, target_name)
            )
        elif isinstance(field, TimeField) and (
            isinstance(output_field, DateTimeField)
            or copy.kind in ("year", "quarter", "month", "week", "day", "date")
        ):
            raise ValueError(
                "Cannot truncate TimeField '%s' to %s." % (field.name, target_name)
            )
        return copy

    def convert_value(self, value, expression, connection):
        """
        Convert the value returned by the database to match ``output_field``.

        For a DateTimeField output with USE_TZ enabled, the value is made
        aware in ``self.tzinfo``. For other outputs, a datetime value is
        reduced to its date or time part. Raise ValueError when a datetime
        output is None and the backend reports no zoneinfo database.
        """
        if isinstance(self.output_field, DateTimeField):
            if settings.USE_TZ:
                if value is not None:
                    # Drop any tzinfo on the value, then attach the requested
                    # time zone.
                    value = value.replace(tzinfo=None)
                    value = timezone.make_aware(
                        value, self.tzinfo, is_dst=self.is_dst
                    )
                elif not connection.features.has_zoneinfo_database:
                    raise ValueError(
                        "Database returned an invalid datetime value. Are time "
                        "zone definitions for your database installed?"
                    )
        elif isinstance(value, datetime):
            # value cannot be None here, so no None check is needed.
            if isinstance(self.output_field, DateField):
                value = value.date()
            elif isinstance(self.output_field, TimeField):
                value = value.time()
        return value
class Trunc(TruncBase):
    """Truncation transform whose ``kind`` is supplied per instance."""

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(
        self,
        expression,
        kind,
        output_field=None,
        tzinfo=None,
        is_dst=timezone.NOT_PASSED,
        **extra,
    ):
        """Record ``kind`` on the instance, then defer to TruncBase."""
        self.kind = kind
        super().__init__(
            expression,
            output_field=output_field,
            tzinfo=tzinfo,
            is_dst=is_dst,
            **extra,
        )
class TruncYear(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"year"``."""

    kind = "year"
class TruncQuarter(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"quarter"``."""

    kind = "quarter"
class TruncMonth(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"month"``."""

    kind = "month"
class TruncWeek(TruncBase):
    """Truncate to the Monday of the week, at midnight."""

    kind = "week"
class TruncDay(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"day"``."""

    kind = "day"
class TruncDate(TruncBase):
    """Transform that casts a datetime expression to a date."""

    kind = "date"
    lookup_name = "date"
    output_field = DateField()

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the backend's datetime-to-date cast."""
        # Cast to date rather than truncate to date.
        inner_sql, inner_params = compiler.compile(self.lhs)
        tzname = self.get_tzname()
        cast_sql = connection.ops.datetime_cast_date_sql(inner_sql, tzname)
        return cast_sql, inner_params
class TruncTime(TruncBase):
    """Transform that casts a datetime expression to a time."""

    kind = "time"
    lookup_name = "time"
    output_field = TimeField()

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the backend's datetime-to-time cast."""
        # Cast to time rather than truncate to time.
        inner_sql, inner_params = compiler.compile(self.lhs)
        tzname = self.get_tzname()
        cast_sql = connection.ops.datetime_cast_time_sql(inner_sql, tzname)
        return cast_sql, inner_params
class TruncHour(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"hour"``."""

    kind = "hour"
class TruncMinute(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"minute"``."""

    kind = "minute"
class TruncSecond(TruncBase):
    """Truncation transform with ``kind`` fixed to ``"second"``."""

    kind = "second"
# Register the date and time cast transforms as lookups on DateTimeField.
for _cast_transform in (TruncDate, TruncTime):
    DateTimeField.register_lookup(_cast_transform)

# Keep the loop variable out of the module namespace.
del _cast_transform