Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/functions/datetime.py: 57%

204 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from datetime import datetime 

2 

3from django.conf import settings 

4from django.db.models.expressions import Func 

5from django.db.models.fields import ( 

6 DateField, 

7 DateTimeField, 

8 DurationField, 

9 Field, 

10 IntegerField, 

11 TimeField, 

12) 

13from django.db.models.lookups import ( 

14 Transform, 

15 YearExact, 

16 YearGt, 

17 YearGte, 

18 YearLt, 

19 YearLte, 

20) 

21from django.utils import timezone 

22 

23 

class TimezoneMixin:
    """Mixin that resolves the time zone name to use for a database function.

    ``tzinfo`` defaults to ``None`` at class level; classes using the mixin
    may assign a tzinfo object per instance.
    """

    tzinfo = None

    def get_tzname(self):
        """Return the time zone name to apply, or ``None`` if USE_TZ is off.

        With ``settings.USE_TZ`` enabled, the current time zone's name is used
        when ``self.tzinfo`` is ``None``; otherwise the name is derived from
        ``self.tzinfo``.
        """
        # Timezone conversions must happen to the input datetime *before*
        # applying a function. 2015-12-31 23:00:00 -02:00 is stored in the
        # database as 2016-01-01 01:00:00 +00:00. Any results should be
        # based on the input datetime not the stored datetime.
        if not settings.USE_TZ:
            return None
        if self.tzinfo is None:
            return timezone.get_current_timezone_name()
        return timezone._get_timezone_name(self.tzinfo)

39 

40 

class Extract(TimezoneMixin, Transform):
    """Transform that extracts one component of a date/time value.

    The component is named by ``lookup_name`` (set on a subclass or passed to
    the constructor) and the result is exposed through an ``IntegerField``.
    ``tzinfo`` may only be supplied when the input is a ``DateTimeField``.
    """

    lookup_name = None
    output_field = IntegerField()

    def __init__(self, expression, lookup_name=None, tzinfo=None, **extra):
        """Store the component name and optional tzinfo.

        A ``lookup_name`` defined on the class takes precedence over the
        argument.

        Raises:
            ValueError: if neither the class nor the caller provides a
                ``lookup_name``.
        """
        if self.lookup_name is None:
            self.lookup_name = lookup_name
        if self.lookup_name is None:
            raise ValueError("lookup_name must be provided")
        self.tzinfo = tzinfo
        super().__init__(expression, **extra)

    def as_sql(self, compiler, connection):
        """Compile the left-hand side and wrap it in the backend's extract SQL.

        Returns:
            A ``(sql, params)`` tuple.

        Raises:
            ValueError: if ``lookup_name`` does not fully match the backend's
                ``extract_trunc_lookup_pattern``, if ``tzinfo`` is set for a
                non-DateTimeField input, or if a DurationField is used on a
                backend without native duration support.
            AssertionError: if the input field type is not one of the
                supported kinds.
        """
        if not connection.ops.extract_trunc_lookup_pattern.fullmatch(self.lookup_name):
            raise ValueError("Invalid lookup_name: %s" % self.lookup_name)
        sql, params = compiler.compile(self.lhs)
        lhs_output_field = self.lhs.output_field
        # DateTimeField must be tested before DateField: it is a DateField
        # subclass and would otherwise take the date-only branch.
        if isinstance(lhs_output_field, DateTimeField):
            tzname = self.get_tzname()
            sql = connection.ops.datetime_extract_sql(self.lookup_name, sql, tzname)
        elif self.tzinfo is not None:
            raise ValueError("tzinfo can only be used with DateTimeField.")
        elif isinstance(lhs_output_field, DateField):
            sql = connection.ops.date_extract_sql(self.lookup_name, sql)
        elif isinstance(lhs_output_field, TimeField):
            sql = connection.ops.time_extract_sql(self.lookup_name, sql)
        elif isinstance(lhs_output_field, DurationField):
            if not connection.features.has_native_duration_field:
                raise ValueError(
                    "Extract requires native DurationField database support."
                )
            sql = connection.ops.time_extract_sql(self.lookup_name, sql)
        else:
            # resolve_expression has already validated the output_field so
            # this should never be hit. Raise explicitly instead of using an
            # ``assert`` statement: asserts are removed under ``python -O``,
            # which would let the unmodified SQL be returned silently.
            raise AssertionError("Tried to Extract from an invalid type.")
        return sql, params

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """Resolve the expression and validate the input field/component pair.

        Validation is skipped when the resolved left-hand side exposes no
        ``output_field`` attribute.

        Raises:
            ValueError: if the input is not a DateField, DateTimeField,
                TimeField, or DurationField; if a time component is requested
                from a plain DateField; or if a calendar component is
                requested from a DurationField.
        """
        copy = super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        field = getattr(copy.lhs, "output_field", None)
        if field is None:
            return copy
        if not isinstance(field, (DateField, DateTimeField, TimeField, DurationField)):
            raise ValueError(
                "Extract input expression must be DateField, DateTimeField, "
                "TimeField, or DurationField."
            )
        # Passing dates to functions expecting datetimes is most likely a
        # mistake. The exact-type check is deliberate: DateTimeField is a
        # DateField subclass and legitimately has time components.
        if type(field) is DateField and copy.lookup_name in (
            "hour",
            "minute",
            "second",
        ):
            raise ValueError(
                "Cannot extract time component '%s' from DateField '%s'."
                % (copy.lookup_name, field.name)
            )
        if isinstance(field, DurationField) and copy.lookup_name in (
            "year",
            "iso_year",
            "month",
            "week",
            "week_day",
            "iso_week_day",
            "quarter",
        ):
            raise ValueError(
                "Cannot extract component '%s' from DurationField '%s'."
                % (copy.lookup_name, field.name)
            )
        return copy

117 

118 

class ExtractYear(Extract):
    """Extract with ``lookup_name`` fixed to ``"year"``."""

    lookup_name = "year"

121 

122 

class ExtractIsoYear(Extract):
    """Extract the ISO-8601 week-numbering year."""

    lookup_name = "iso_year"

127 

128 

class ExtractMonth(Extract):
    """Extract with ``lookup_name`` fixed to ``"month"``."""

    lookup_name = "month"

131 

132 

class ExtractDay(Extract):
    """Extract with ``lookup_name`` fixed to ``"day"``."""

    lookup_name = "day"

135 

136 

class ExtractWeek(Extract):
    """
    Extract the ISO-8601 week number (1-52 or 53); under ISO-8601, Monday is
    the first day of the week.
    """

    lookup_name = "week"

144 

145 

class ExtractWeekDay(Extract):
    """
    Extract the day of the week, numbered Sunday=1 through Saturday=7.

    Python equivalent: (mydatetime.isoweekday() % 7) + 1
    """

    lookup_name = "week_day"

154 

155 

class ExtractIsoWeekDay(Extract):
    """Extract the ISO-8601 day of the week, Monday=1 through Sunday=7."""

    lookup_name = "iso_week_day"

160 

161 

class ExtractQuarter(Extract):
    """Extract with ``lookup_name`` fixed to ``"quarter"``."""

    lookup_name = "quarter"

164 

165 

class ExtractHour(Extract):
    """Extract with ``lookup_name`` fixed to ``"hour"``."""

    lookup_name = "hour"

168 

169 

class ExtractMinute(Extract):
    """Extract with ``lookup_name`` fixed to ``"minute"``."""

    lookup_name = "minute"

172 

173 

class ExtractSecond(Extract):
    """Extract with ``lookup_name`` fixed to ``"second"``."""

    lookup_name = "second"

176 

177 

# Register the calendar-component transforms on DateField.
for _date_transform in (
    ExtractYear,
    ExtractMonth,
    ExtractDay,
    ExtractWeekDay,
    ExtractIsoWeekDay,
    ExtractWeek,
    ExtractIsoYear,
    ExtractQuarter,
):
    DateField.register_lookup(_date_transform)

# Register the clock-component transforms on TimeField, then DateTimeField.
for _time_transform in (ExtractHour, ExtractMinute, ExtractSecond):
    TimeField.register_lookup(_time_transform)

for _time_transform in (ExtractHour, ExtractMinute, ExtractSecond):
    DateTimeField.register_lookup(_time_transform)

# Both year transforms get the Year* comparison lookups.
for _year_transform in (ExtractYear, ExtractIsoYear):
    for _year_lookup in (YearExact, YearGt, YearGte, YearLt, YearLte):
        _year_transform.register_lookup(_year_lookup)

# Keep the loop helpers out of the module namespace.
del _date_transform, _time_transform, _year_transform, _year_lookup

206 

207 

class Now(Func):
    """Database function rendered as ``CURRENT_TIMESTAMP``, typed as a
    DateTimeField."""

    template = "CURRENT_TIMESTAMP"
    output_field = DateTimeField()

    def as_postgresql(self, compiler, connection, **extra_context):
        """Compile for PostgreSQL using ``STATEMENT_TIMESTAMP()`` instead."""
        # PostgreSQL's CURRENT_TIMESTAMP means "the time at the start of the
        # transaction". Use STATEMENT_TIMESTAMP to be cross-compatible with
        # other databases.
        compiled = self.as_sql(
            compiler,
            connection,
            template="STATEMENT_TIMESTAMP()",
            **extra_context,
        )
        return compiled

219 

220 

class TruncBase(TimezoneMixin, Transform):
    """Base transform for truncating a date/time expression to ``kind``.

    Subclasses set ``kind``; the output field may be a DateField,
    DateTimeField, or TimeField.
    """

    kind = None
    tzinfo = None

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(
        self,
        expression,
        output_field=None,
        tzinfo=None,
        is_dst=timezone.NOT_PASSED,
        **extra,
    ):
        """Record ``tzinfo`` and ``is_dst`` and initialise the transform."""
        self.tzinfo = tzinfo
        self.is_dst = is_dst
        super().__init__(expression, output_field=output_field, **extra)

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the truncation on this backend.

        Raises:
            ValueError: if ``kind`` does not fully match the backend's
                ``extract_trunc_lookup_pattern``, if ``tzinfo`` is set for a
                non-DateTimeField input, or if the output field is not a
                date, time, or datetime field.
        """
        if not connection.ops.extract_trunc_lookup_pattern.fullmatch(self.kind):
            raise ValueError("Invalid kind: %s" % self.kind)
        inner_sql, inner_params = compiler.compile(self.lhs)

        # A time zone name is only resolved for DateTimeField inputs.
        if isinstance(self.lhs.output_field, DateTimeField):
            tzname = self.get_tzname()
        elif self.tzinfo is not None:
            raise ValueError("tzinfo can only be used with DateTimeField.")
        else:
            tzname = None

        # Choose the backend operation from the *output* field type;
        # DateTimeField is tested first because it subclasses DateField.
        target = self.output_field
        if isinstance(target, DateTimeField):
            trunc_sql = connection.ops.datetime_trunc_sql
        elif isinstance(target, DateField):
            trunc_sql = connection.ops.date_trunc_sql
        elif isinstance(target, TimeField):
            trunc_sql = connection.ops.time_trunc_sql
        else:
            raise ValueError(
                "Trunc only valid on DateField, TimeField, or DateTimeField."
            )
        return trunc_sql(self.kind, inner_sql, tzname), inner_params

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """Resolve the expression and validate input/output field types.

        Raises:
            TypeError: if the input is not a DateField, TimeField, or
                DateTimeField.
            ValueError: if the output field is not a date/time field, or the
                requested truncation is incompatible with the input field.
        """
        copy = super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        field = copy.lhs.output_field
        # DateTimeField is a subclass of DateField so this works for both.
        if not isinstance(field, (DateField, TimeField)):
            raise TypeError(
                "%r isn't a DateField, TimeField, or DateTimeField." % field.name
            )
        # If self.output_field was None, then accessing the field will trigger
        # the resolver to assign it to self.lhs.output_field.
        if not isinstance(copy.output_field, (DateField, DateTimeField, TimeField)):
            raise ValueError(
                "output_field must be either DateField, TimeField, or DateTimeField"
            )
        # Passing dates or times to functions expecting datetimes is most
        # likely a mistake.
        declared = self.__class__.output_field
        class_output_field = declared if isinstance(declared, Field) else None
        output_field = class_output_field or copy.output_field
        has_explicit_output_field = (
            class_output_field or field.__class__ is not copy.output_field.__class__
        )
        to_datetime = isinstance(output_field, DateTimeField)
        # Name shown in the error message for the truncation target.
        if has_explicit_output_field:
            target_name = output_field.__class__.__name__
        else:
            target_name = "DateTimeField"

        # Exact-type check: a DateTimeField input must not match here.
        if type(field) is DateField and (
            to_datetime or copy.kind in ("hour", "minute", "second", "time")
        ):
            raise ValueError(
                "Cannot truncate DateField '%s' to %s." % (field.name, target_name)
            )
        if isinstance(field, TimeField) and (
            to_datetime
            or copy.kind in ("year", "quarter", "month", "week", "day", "date")
        ):
            raise ValueError(
                "Cannot truncate TimeField '%s' to %s." % (field.name, target_name)
            )
        return copy

    def convert_value(self, value, expression, connection):
        """Convert a database value to match the output field type.

        For DateTimeField output with USE_TZ enabled, the value's tzinfo is
        dropped and it is made aware in ``self.tzinfo``. For other output
        fields, a ``datetime`` value is reduced to its date or time part.

        Raises:
            ValueError: if a DateTimeField result is ``None`` under USE_TZ and
                the backend reports no zoneinfo database.
        """
        target = self.output_field
        if isinstance(target, DateTimeField):
            if settings.USE_TZ:
                if value is not None:
                    naive = value.replace(tzinfo=None)
                    return timezone.make_aware(
                        naive, self.tzinfo, is_dst=self.is_dst
                    )
                if not connection.features.has_zoneinfo_database:
                    raise ValueError(
                        "Database returned an invalid datetime value. Are time "
                        "zone definitions for your database installed?"
                    )
            return value
        if isinstance(value, datetime):
            if isinstance(target, DateField):
                return value.date()
            if isinstance(target, TimeField):
                return value.time()
        return value

337 

338 

class Trunc(TruncBase):
    """Truncation transform whose ``kind`` is supplied per instance."""

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(
        self,
        expression,
        kind,
        output_field=None,
        tzinfo=None,
        is_dst=timezone.NOT_PASSED,
        **extra,
    ):
        """Set ``kind`` on the instance, then defer to ``TruncBase``."""
        self.kind = kind
        base_kwargs = {
            "output_field": output_field,
            "tzinfo": tzinfo,
            "is_dst": is_dst,
        }
        super().__init__(expression, **base_kwargs, **extra)

356 

357 

class TruncYear(TruncBase):
    """TruncBase with ``kind`` fixed to ``"year"``."""

    kind = "year"

360 

361 

class TruncQuarter(TruncBase):
    """TruncBase with ``kind`` fixed to ``"quarter"``."""

    kind = "quarter"

364 

365 

class TruncMonth(TruncBase):
    """TruncBase with ``kind`` fixed to ``"month"``."""

    kind = "month"

368 

369 

class TruncWeek(TruncBase):
    """Truncate to midnight on the Monday of the containing week."""

    kind = "week"

374 

375 

class TruncDay(TruncBase):
    """TruncBase with ``kind`` fixed to ``"day"``."""

    kind = "day"

378 

379 

class TruncDate(TruncBase):
    """Cast an expression to a date; also usable as the ``date`` lookup."""

    kind = "date"
    lookup_name = "date"
    output_field = DateField()

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` using the backend's datetime-to-date cast."""
        # Cast to date rather than truncate to date.
        inner_sql, inner_params = compiler.compile(self.lhs)
        tzname = self.get_tzname()
        return connection.ops.datetime_cast_date_sql(inner_sql, tzname), inner_params

391 

392 

class TruncTime(TruncBase):
    """Cast an expression to a time; also usable as the ``time`` lookup."""

    kind = "time"
    lookup_name = "time"
    output_field = TimeField()

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` using the backend's datetime-to-time cast."""
        # Cast to time rather than truncate to time.
        inner_sql, inner_params = compiler.compile(self.lhs)
        tzname = self.get_tzname()
        return connection.ops.datetime_cast_time_sql(inner_sql, tzname), inner_params

404 

405 

class TruncHour(TruncBase):
    """TruncBase with ``kind`` fixed to ``"hour"``."""

    kind = "hour"

408 

409 

class TruncMinute(TruncBase):
    """TruncBase with ``kind`` fixed to ``"minute"``."""

    kind = "minute"

412 

413 

class TruncSecond(TruncBase):
    """TruncBase with ``kind`` fixed to ``"second"``."""

    kind = "second"

416 

417 

# Register the date/time cast transforms as lookups on DateTimeField.
for _cast_transform in (TruncDate, TruncTime):
    DateTimeField.register_lookup(_cast_transform)

# Keep the loop helper out of the module namespace.
del _cast_transform