Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/lookups.py: 65%

443 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import itertools 

2import math 

3 

4from django.core.exceptions import EmptyResultSet 

5from django.db.models.expressions import Case, Expression, Func, Value, When 

6from django.db.models.fields import ( 

7 BooleanField, 

8 CharField, 

9 DateTimeField, 

10 Field, 

11 IntegerField, 

12 UUIDField, 

13) 

14from django.db.models.query_utils import RegisterLookupMixin 

15from django.utils.datastructures import OrderedSet 

16from django.utils.functional import cached_property 

17from django.utils.hashable import make_hashable 

18 

19 

20class Lookup(Expression): 

21 lookup_name = None 

22 prepare_rhs = True 

23 can_use_none_as_rhs = False 

24 

25 def __init__(self, lhs, rhs): 

26 self.lhs, self.rhs = lhs, rhs 

27 self.rhs = self.get_prep_lookup() 

28 self.lhs = self.get_prep_lhs() 

29 if hasattr(self.lhs, "get_bilateral_transforms"): 

30 bilateral_transforms = self.lhs.get_bilateral_transforms() 

31 else: 

32 bilateral_transforms = [] 

33 if bilateral_transforms: 33 ↛ 36line 33 didn't jump to line 36, because the condition on line 33 was never true

34 # Warn the user as soon as possible if they are trying to apply 

35 # a bilateral transformation on a nested QuerySet: that won't work. 

36 from django.db.models.sql.query import Query # avoid circular import 

37 

38 if isinstance(rhs, Query): 

39 raise NotImplementedError( 

40 "Bilateral transformations on nested querysets are not implemented." 

41 ) 

42 self.bilateral_transforms = bilateral_transforms 

43 

44 def apply_bilateral_transforms(self, value): 

45 for transform in self.bilateral_transforms: 

46 value = transform(value) 

47 return value 

48 

49 def __repr__(self): 

50 return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})" 

51 

52 def batch_process_rhs(self, compiler, connection, rhs=None): 

53 if rhs is None: 53 ↛ 54line 53 didn't jump to line 54, because the condition on line 53 was never true

54 rhs = self.rhs 

55 if self.bilateral_transforms: 55 ↛ 56line 55 didn't jump to line 56, because the condition on line 55 was never true

56 sqls, sqls_params = [], [] 

57 for p in rhs: 

58 value = Value(p, output_field=self.lhs.output_field) 

59 value = self.apply_bilateral_transforms(value) 

60 value = value.resolve_expression(compiler.query) 

61 sql, sql_params = compiler.compile(value) 

62 sqls.append(sql) 

63 sqls_params.extend(sql_params) 

64 else: 

65 _, params = self.get_db_prep_lookup(rhs, connection) 

66 sqls, sqls_params = ["%s"] * len(params), params 

67 return sqls, sqls_params 

68 

69 def get_source_expressions(self): 

70 if self.rhs_is_direct_value(): 

71 return [self.lhs] 

72 return [self.lhs, self.rhs] 

73 

74 def set_source_expressions(self, new_exprs): 

75 if len(new_exprs) == 1: 75 ↛ 78line 75 didn't jump to line 78, because the condition on line 75 was never false

76 self.lhs = new_exprs[0] 

77 else: 

78 self.lhs, self.rhs = new_exprs 

79 

80 def get_prep_lookup(self): 

81 if not self.prepare_rhs or hasattr(self.rhs, "resolve_expression"): 

82 return self.rhs 

83 if hasattr(self.lhs, "output_field"): 83 ↛ 86line 83 didn't jump to line 86, because the condition on line 83 was never false

84 if hasattr(self.lhs.output_field, "get_prep_value"): 84 ↛ 88line 84 didn't jump to line 88, because the condition on line 84 was never false

85 return self.lhs.output_field.get_prep_value(self.rhs) 

86 elif self.rhs_is_direct_value(): 

87 return Value(self.rhs) 

88 return self.rhs 

89 

90 def get_prep_lhs(self): 

91 if hasattr(self.lhs, "resolve_expression"): 91 ↛ 93line 91 didn't jump to line 93, because the condition on line 91 was never false

92 return self.lhs 

93 return Value(self.lhs) 

94 

95 def get_db_prep_lookup(self, value, connection): 

96 return ("%s", [value]) 

97 

98 def process_lhs(self, compiler, connection, lhs=None): 

99 lhs = lhs or self.lhs 

100 if hasattr(lhs, "resolve_expression"): 100 ↛ 102line 100 didn't jump to line 102, because the condition on line 100 was never false

101 lhs = lhs.resolve_expression(compiler.query) 

102 sql, params = compiler.compile(lhs) 

103 if isinstance(lhs, Lookup): 103 ↛ 105line 103 didn't jump to line 105, because the condition on line 103 was never true

104 # Wrapped in parentheses to respect operator precedence. 

105 sql = f"({sql})" 

106 return sql, params 

107 

108 def process_rhs(self, compiler, connection): 

109 value = self.rhs 

110 if self.bilateral_transforms: 110 ↛ 111line 110 didn't jump to line 111, because the condition on line 110 was never true

111 if self.rhs_is_direct_value(): 

112 # Do not call get_db_prep_lookup here as the value will be 

113 # transformed before being used for lookup 

114 value = Value(value, output_field=self.lhs.output_field) 

115 value = self.apply_bilateral_transforms(value) 

116 value = value.resolve_expression(compiler.query) 

117 if hasattr(value, "as_sql"): 

118 sql, params = compiler.compile(value) 

119 # Ensure expression is wrapped in parentheses to respect operator 

120 # precedence but avoid double wrapping as it can be misinterpreted 

121 # on some backends (e.g. subqueries on SQLite). 

122 if sql and sql[0] != "(": 122 ↛ 123line 122 didn't jump to line 123, because the condition on line 122 was never true

123 sql = "(%s)" % sql 

124 return sql, params 

125 else: 

126 return self.get_db_prep_lookup(value, connection) 

127 

128 def rhs_is_direct_value(self): 

129 return not hasattr(self.rhs, "as_sql") 

130 

131 def get_group_by_cols(self, alias=None): 

132 cols = [] 

133 for source in self.get_source_expressions(): 

134 cols.extend(source.get_group_by_cols()) 

135 return cols 

136 

137 def as_oracle(self, compiler, connection): 

138 # Oracle doesn't allow EXISTS() and filters to be compared to another 

139 # expression unless they're wrapped in a CASE WHEN. 

140 wrapped = False 

141 exprs = [] 

142 for expr in (self.lhs, self.rhs): 

143 if connection.ops.conditional_expression_supported_in_where_clause(expr): 

144 expr = Case(When(expr, then=True), default=False) 

145 wrapped = True 

146 exprs.append(expr) 

147 lookup = type(self)(*exprs) if wrapped else self 

148 return lookup.as_sql(compiler, connection) 

149 

150 @cached_property 

151 def output_field(self): 

152 return BooleanField() 

153 

154 @property 

155 def identity(self): 

156 return self.__class__, self.lhs, self.rhs 

157 

158 def __eq__(self, other): 

159 if not isinstance(other, Lookup): 

160 return NotImplemented 

161 return self.identity == other.identity 

162 

163 def __hash__(self): 

164 return hash(make_hashable(self.identity)) 

165 

166 def resolve_expression( 

167 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

168 ): 

169 c = self.copy() 

170 c.is_summary = summarize 

171 c.lhs = self.lhs.resolve_expression( 

172 query, allow_joins, reuse, summarize, for_save 

173 ) 

174 if hasattr(self.rhs, "resolve_expression"): 

175 c.rhs = self.rhs.resolve_expression( 

176 query, allow_joins, reuse, summarize, for_save 

177 ) 

178 return c 

179 

180 def select_format(self, compiler, sql, params): 

181 # Wrap filters with a CASE WHEN expression if a database backend 

182 # (e.g. Oracle) doesn't support boolean expression in SELECT or GROUP 

183 # BY list. 

184 if not compiler.connection.features.supports_boolean_expr_in_select_clause: 

185 sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END" 

186 return sql, params 

187 

188 

189class Transform(RegisterLookupMixin, Func): 

190 """ 

191 RegisterLookupMixin() is first so that get_lookup() and get_transform() 

192 first examine self and then check output_field. 

193 """ 

194 

195 bilateral = False 

196 arity = 1 

197 

198 @property 

199 def lhs(self): 

200 return self.get_source_expressions()[0] 

201 

202 def get_bilateral_transforms(self): 

203 if hasattr(self.lhs, "get_bilateral_transforms"): 203 ↛ 204line 203 didn't jump to line 204, because the condition on line 203 was never true

204 bilateral_transforms = self.lhs.get_bilateral_transforms() 

205 else: 

206 bilateral_transforms = [] 

207 if self.bilateral: 207 ↛ 208line 207 didn't jump to line 208, because the condition on line 207 was never true

208 bilateral_transforms.append(self.__class__) 

209 return bilateral_transforms 

210 

211 

212class BuiltinLookup(Lookup): 

213 def process_lhs(self, compiler, connection, lhs=None): 

214 lhs_sql, params = super().process_lhs(compiler, connection, lhs) 

215 field_internal_type = self.lhs.output_field.get_internal_type() 

216 db_type = self.lhs.output_field.db_type(connection=connection) 

217 lhs_sql = connection.ops.field_cast_sql(db_type, field_internal_type) % lhs_sql 

218 lhs_sql = ( 

219 connection.ops.lookup_cast(self.lookup_name, field_internal_type) % lhs_sql 

220 ) 

221 return lhs_sql, list(params) 

222 

223 def as_sql(self, compiler, connection): 

224 lhs_sql, params = self.process_lhs(compiler, connection) 

225 rhs_sql, rhs_params = self.process_rhs(compiler, connection) 

226 params.extend(rhs_params) 

227 rhs_sql = self.get_rhs_op(connection, rhs_sql) 

228 return "%s %s" % (lhs_sql, rhs_sql), params 

229 

230 def get_rhs_op(self, connection, rhs): 

231 return connection.operators[self.lookup_name] % rhs 

232 

233 

234class FieldGetDbPrepValueMixin: 

235 """ 

236 Some lookups require Field.get_db_prep_value() to be called on their 

237 inputs. 

238 """ 

239 

240 get_db_prep_lookup_value_is_iterable = False 

241 

242 def get_db_prep_lookup(self, value, connection): 

243 # For relational fields, use the 'target_field' attribute of the 

244 # output_field. 

245 field = getattr(self.lhs.output_field, "target_field", None) 

246 get_db_prep_value = ( 

247 getattr(field, "get_db_prep_value", None) 

248 or self.lhs.output_field.get_db_prep_value 

249 ) 

250 return ( 

251 "%s", 

252 [get_db_prep_value(v, connection, prepared=True) for v in value] 

253 if self.get_db_prep_lookup_value_is_iterable 

254 else [get_db_prep_value(value, connection, prepared=True)], 

255 ) 

256 

257 

258class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin): 

259 """ 

260 Some lookups require Field.get_db_prep_value() to be called on each value 

261 in an iterable. 

262 """ 

263 

264 get_db_prep_lookup_value_is_iterable = True 

265 

266 def get_prep_lookup(self): 

267 if hasattr(self.rhs, "resolve_expression"): 

268 return self.rhs 

269 prepared_values = [] 

270 for rhs_value in self.rhs: 

271 if hasattr(rhs_value, "resolve_expression"): 271 ↛ 274line 271 didn't jump to line 274, because the condition on line 271 was never true

272 # An expression will be handled by the database but can coexist 

273 # alongside real values. 

274 pass 

275 elif self.prepare_rhs and hasattr(self.lhs.output_field, "get_prep_value"): 275 ↛ 277line 275 didn't jump to line 277, because the condition on line 275 was never false

276 rhs_value = self.lhs.output_field.get_prep_value(rhs_value) 

277 prepared_values.append(rhs_value) 

278 return prepared_values 

279 

280 def process_rhs(self, compiler, connection): 

281 if self.rhs_is_direct_value(): 281 ↛ 284line 281 didn't jump to line 284, because the condition on line 281 was never true

282 # rhs should be an iterable of values. Use batch_process_rhs() 

283 # to prepare/transform those values. 

284 return self.batch_process_rhs(compiler, connection) 

285 else: 

286 return super().process_rhs(compiler, connection) 

287 

288 def resolve_expression_parameter(self, compiler, connection, sql, param): 

289 params = [param] 

290 if hasattr(param, "resolve_expression"): 290 ↛ 291line 290 didn't jump to line 291, because the condition on line 290 was never true

291 param = param.resolve_expression(compiler.query) 

292 if hasattr(param, "as_sql"): 292 ↛ 293line 292 didn't jump to line 293, because the condition on line 292 was never true

293 sql, params = compiler.compile(param) 

294 return sql, params 

295 

296 def batch_process_rhs(self, compiler, connection, rhs=None): 

297 pre_processed = super().batch_process_rhs(compiler, connection, rhs) 

298 # The params list may contain expressions which compile to a 

299 # sql/param pair. Zip them to get sql and param pairs that refer to the 

300 # same argument and attempt to replace them with the result of 

301 # compiling the param step. 

302 sql, params = zip( 

303 *( 

304 self.resolve_expression_parameter(compiler, connection, sql, param) 

305 for sql, param in zip(*pre_processed) 

306 ) 

307 ) 

308 params = itertools.chain.from_iterable(params) 

309 return sql, tuple(params) 

310 

311 

312class PostgresOperatorLookup(FieldGetDbPrepValueMixin, Lookup): 

313 """Lookup defined by operators on PostgreSQL.""" 

314 

315 postgres_operator = None 

316 

317 def as_postgresql(self, compiler, connection): 

318 lhs, lhs_params = self.process_lhs(compiler, connection) 

319 rhs, rhs_params = self.process_rhs(compiler, connection) 

320 params = tuple(lhs_params) + tuple(rhs_params) 

321 return "%s %s %s" % (lhs, self.postgres_operator, rhs), params 

322 

323 

324@Field.register_lookup 

325class Exact(FieldGetDbPrepValueMixin, BuiltinLookup): 

326 lookup_name = "exact" 

327 

328 def process_rhs(self, compiler, connection): 

329 from django.db.models.sql.query import Query 

330 

331 if isinstance(self.rhs, Query): 331 ↛ 332line 331 didn't jump to line 332, because the condition on line 331 was never true

332 if self.rhs.has_limit_one(): 

333 if not self.rhs.has_select_fields: 

334 self.rhs.clear_select_clause() 

335 self.rhs.add_fields(["pk"]) 

336 else: 

337 raise ValueError( 

338 "The QuerySet value for an exact lookup must be limited to " 

339 "one result using slicing." 

340 ) 

341 return super().process_rhs(compiler, connection) 

342 

343 def as_sql(self, compiler, connection): 

344 # Avoid comparison against direct rhs if lhs is a boolean value. That 

345 # turns "boolfield__exact=True" into "WHERE boolean_field" instead of 

346 # "WHERE boolean_field = True" when allowed. 

347 if ( 

348 isinstance(self.rhs, bool) 

349 and getattr(self.lhs, "conditional", False) 

350 and connection.ops.conditional_expression_supported_in_where_clause( 

351 self.lhs 

352 ) 

353 ): 

354 lhs_sql, params = self.process_lhs(compiler, connection) 

355 template = "%s" if self.rhs else "NOT %s" 

356 return template % lhs_sql, params 

357 return super().as_sql(compiler, connection) 

358 

359 

360@Field.register_lookup 

361class IExact(BuiltinLookup): 

362 lookup_name = "iexact" 

363 prepare_rhs = False 

364 

365 def process_rhs(self, qn, connection): 

366 rhs, params = super().process_rhs(qn, connection) 

367 if params: 

368 params[0] = connection.ops.prep_for_iexact_query(params[0]) 

369 return rhs, params 

370 

371 

372@Field.register_lookup 

373class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup): 

374 lookup_name = "gt" 

375 

376 

377@Field.register_lookup 

378class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup): 

379 lookup_name = "gte" 

380 

381 

382@Field.register_lookup 

383class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup): 

384 lookup_name = "lt" 

385 

386 

387@Field.register_lookup 

388class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup): 

389 lookup_name = "lte" 

390 

391 

392class IntegerFieldFloatRounding: 

393 """ 

394 Allow floats to work as query values for IntegerField. Without this, the 

395 decimal portion of the float would always be discarded. 

396 """ 

397 

398 def get_prep_lookup(self): 

399 if isinstance(self.rhs, float): 399 ↛ 400line 399 didn't jump to line 400, because the condition on line 399 was never true

400 self.rhs = math.ceil(self.rhs) 

401 return super().get_prep_lookup() 

402 

403 

404@IntegerField.register_lookup 

405class IntegerGreaterThanOrEqual(IntegerFieldFloatRounding, GreaterThanOrEqual): 

406 pass 

407 

408 

409@IntegerField.register_lookup 

410class IntegerLessThan(IntegerFieldFloatRounding, LessThan): 

411 pass 

412 

413 

414@Field.register_lookup 

415class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup): 

416 lookup_name = "in" 

417 

418 def process_rhs(self, compiler, connection): 

419 db_rhs = getattr(self.rhs, "_db", None) 

420 if db_rhs is not None and db_rhs != connection.alias: 420 ↛ 421line 420 didn't jump to line 421, because the condition on line 420 was never true

421 raise ValueError( 

422 "Subqueries aren't allowed across different databases. Force " 

423 "the inner query to be evaluated using `list(inner_query)`." 

424 ) 

425 

426 if self.rhs_is_direct_value(): 

427 # Remove None from the list as NULL is never equal to anything. 

428 try: 

429 rhs = OrderedSet(self.rhs) 

430 rhs.discard(None) 

431 except TypeError: # Unhashable items in self.rhs 

432 rhs = [r for r in self.rhs if r is not None] 

433 

434 if not rhs: 

435 raise EmptyResultSet 

436 

437 # rhs should be an iterable; use batch_process_rhs() to 

438 # prepare/transform those values. 

439 sqls, sqls_params = self.batch_process_rhs(compiler, connection, rhs) 

440 placeholder = "(" + ", ".join(sqls) + ")" 

441 return (placeholder, sqls_params) 

442 else: 

443 from django.db.models.sql.query import Query # avoid circular import 

444 

445 if isinstance(self.rhs, Query): 445 ↛ 452line 445 didn't jump to line 452, because the condition on line 445 was never false

446 query = self.rhs 

447 query.clear_ordering(clear_default=True) 

448 if not query.has_select_fields: 

449 query.clear_select_clause() 

450 query.add_fields(["pk"]) 

451 

452 return super().process_rhs(compiler, connection) 

453 

454 def get_group_by_cols(self, alias=None): 

455 cols = self.lhs.get_group_by_cols() 

456 if hasattr(self.rhs, "get_group_by_cols"): 

457 if not getattr(self.rhs, "has_select_fields", True): 

458 self.rhs.clear_select_clause() 

459 self.rhs.add_fields(["pk"]) 

460 cols.extend(self.rhs.get_group_by_cols()) 

461 return cols 

462 

463 def get_rhs_op(self, connection, rhs): 

464 return "IN %s" % rhs 

465 

466 def as_sql(self, compiler, connection): 

467 max_in_list_size = connection.ops.max_in_list_size() 

468 if ( 468 ↛ 473line 468 didn't jump to line 473

469 self.rhs_is_direct_value() 

470 and max_in_list_size 

471 and len(self.rhs) > max_in_list_size 

472 ): 

473 return self.split_parameter_list_as_sql(compiler, connection) 

474 return super().as_sql(compiler, connection) 

475 

476 def split_parameter_list_as_sql(self, compiler, connection): 

477 # This is a special case for databases which limit the number of 

478 # elements which can appear in an 'IN' clause. 

479 max_in_list_size = connection.ops.max_in_list_size() 

480 lhs, lhs_params = self.process_lhs(compiler, connection) 

481 rhs, rhs_params = self.batch_process_rhs(compiler, connection) 

482 in_clause_elements = ["("] 

483 params = [] 

484 for offset in range(0, len(rhs_params), max_in_list_size): 

485 if offset > 0: 

486 in_clause_elements.append(" OR ") 

487 in_clause_elements.append("%s IN (" % lhs) 

488 params.extend(lhs_params) 

489 sqls = rhs[offset : offset + max_in_list_size] 

490 sqls_params = rhs_params[offset : offset + max_in_list_size] 

491 param_group = ", ".join(sqls) 

492 in_clause_elements.append(param_group) 

493 in_clause_elements.append(")") 

494 params.extend(sqls_params) 

495 in_clause_elements.append(")") 

496 return "".join(in_clause_elements), params 

497 

498 

499class PatternLookup(BuiltinLookup): 

500 param_pattern = "%%%s%%" 

501 prepare_rhs = False 

502 

503 def get_rhs_op(self, connection, rhs): 

504 # Assume we are in startswith. We need to produce SQL like: 

505 # col LIKE %s, ['thevalue%'] 

506 # For python values we can (and should) do that directly in Python, 

507 # but if the value is for example reference to other column, then 

508 # we need to add the % pattern match to the lookup by something like 

509 # col LIKE othercol || '%%' 

510 # So, for Python values we don't need any special pattern, but for 

511 # SQL reference values or SQL transformations we need the correct 

512 # pattern added. 

513 if hasattr(self.rhs, "as_sql") or self.bilateral_transforms: 

514 pattern = connection.pattern_ops[self.lookup_name].format( 

515 connection.pattern_esc 

516 ) 

517 return pattern.format(rhs) 

518 else: 

519 return super().get_rhs_op(connection, rhs) 

520 

521 def process_rhs(self, qn, connection): 

522 rhs, params = super().process_rhs(qn, connection) 

523 if self.rhs_is_direct_value() and params and not self.bilateral_transforms: 

524 params[0] = self.param_pattern % connection.ops.prep_for_like_query( 

525 params[0] 

526 ) 

527 return rhs, params 

528 

529 

530@Field.register_lookup 

531class Contains(PatternLookup): 

532 lookup_name = "contains" 

533 

534 

535@Field.register_lookup 

536class IContains(Contains): 

537 lookup_name = "icontains" 

538 

539 

540@Field.register_lookup 

541class StartsWith(PatternLookup): 

542 lookup_name = "startswith" 

543 param_pattern = "%s%%" 

544 

545 

546@Field.register_lookup 

547class IStartsWith(StartsWith): 

548 lookup_name = "istartswith" 

549 

550 

551@Field.register_lookup 

552class EndsWith(PatternLookup): 

553 lookup_name = "endswith" 

554 param_pattern = "%%%s" 

555 

556 

557@Field.register_lookup 

558class IEndsWith(EndsWith): 

559 lookup_name = "iendswith" 

560 

561 

562@Field.register_lookup 

563class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup): 

564 lookup_name = "range" 

565 

566 def get_rhs_op(self, connection, rhs): 

567 return "BETWEEN %s AND %s" % (rhs[0], rhs[1]) 

568 

569 

570@Field.register_lookup 

571class IsNull(BuiltinLookup): 

572 lookup_name = "isnull" 

573 prepare_rhs = False 

574 

575 def as_sql(self, compiler, connection): 

576 if not isinstance(self.rhs, bool): 576 ↛ 577line 576 didn't jump to line 577, because the condition on line 576 was never true

577 raise ValueError( 

578 "The QuerySet value for an isnull lookup must be True or False." 

579 ) 

580 sql, params = compiler.compile(self.lhs) 

581 if self.rhs: 

582 return "%s IS NULL" % sql, params 

583 else: 

584 return "%s IS NOT NULL" % sql, params 

585 

586 

587@Field.register_lookup 

588class Regex(BuiltinLookup): 

589 lookup_name = "regex" 

590 prepare_rhs = False 

591 

592 def as_sql(self, compiler, connection): 

593 if self.lookup_name in connection.operators: 

594 return super().as_sql(compiler, connection) 

595 else: 

596 lhs, lhs_params = self.process_lhs(compiler, connection) 

597 rhs, rhs_params = self.process_rhs(compiler, connection) 

598 sql_template = connection.ops.regex_lookup(self.lookup_name) 

599 return sql_template % (lhs, rhs), lhs_params + rhs_params 

600 

601 

602@Field.register_lookup 

603class IRegex(Regex): 

604 lookup_name = "iregex" 

605 

606 

607class YearLookup(Lookup): 

608 def year_lookup_bounds(self, connection, year): 

609 from django.db.models.functions import ExtractIsoYear 

610 

611 iso_year = isinstance(self.lhs, ExtractIsoYear) 

612 output_field = self.lhs.lhs.output_field 

613 if isinstance(output_field, DateTimeField): 

614 bounds = connection.ops.year_lookup_bounds_for_datetime_field( 

615 year, 

616 iso_year=iso_year, 

617 ) 

618 else: 

619 bounds = connection.ops.year_lookup_bounds_for_date_field( 

620 year, 

621 iso_year=iso_year, 

622 ) 

623 return bounds 

624 

625 def as_sql(self, compiler, connection): 

626 # Avoid the extract operation if the rhs is a direct value to allow 

627 # indexes to be used. 

628 if self.rhs_is_direct_value(): 

629 # Skip the extract part by directly using the originating field, 

630 # that is self.lhs.lhs. 

631 lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs) 

632 rhs_sql, _ = self.process_rhs(compiler, connection) 

633 rhs_sql = self.get_direct_rhs_sql(connection, rhs_sql) 

634 start, finish = self.year_lookup_bounds(connection, self.rhs) 

635 params.extend(self.get_bound_params(start, finish)) 

636 return "%s %s" % (lhs_sql, rhs_sql), params 

637 return super().as_sql(compiler, connection) 

638 

639 def get_direct_rhs_sql(self, connection, rhs): 

640 return connection.operators[self.lookup_name] % rhs 

641 

642 def get_bound_params(self, start, finish): 

643 raise NotImplementedError( 

644 "subclasses of YearLookup must provide a get_bound_params() method" 

645 ) 

646 

647 

648class YearExact(YearLookup, Exact): 

649 def get_direct_rhs_sql(self, connection, rhs): 

650 return "BETWEEN %s AND %s" 

651 

652 def get_bound_params(self, start, finish): 

653 return (start, finish) 

654 

655 

656class YearGt(YearLookup, GreaterThan): 

657 def get_bound_params(self, start, finish): 

658 return (finish,) 

659 

660 

661class YearGte(YearLookup, GreaterThanOrEqual): 

662 def get_bound_params(self, start, finish): 

663 return (start,) 

664 

665 

666class YearLt(YearLookup, LessThan): 

667 def get_bound_params(self, start, finish): 

668 return (start,) 

669 

670 

671class YearLte(YearLookup, LessThanOrEqual): 

672 def get_bound_params(self, start, finish): 

673 return (finish,) 

674 

675 

676class UUIDTextMixin: 

677 """ 

678 Strip hyphens from a value when filtering a UUIDField on backends without 

679 a native datatype for UUID. 

680 """ 

681 

682 def process_rhs(self, qn, connection): 

683 if not connection.features.has_native_uuid_field: 

684 from django.db.models.functions import Replace 

685 

686 if self.rhs_is_direct_value(): 

687 self.rhs = Value(self.rhs) 

688 self.rhs = Replace( 

689 self.rhs, Value("-"), Value(""), output_field=CharField() 

690 ) 

691 rhs, params = super().process_rhs(qn, connection) 

692 return rhs, params 

693 

694 

695@UUIDField.register_lookup 

696class UUIDIExact(UUIDTextMixin, IExact): 

697 pass 

698 

699 

700@UUIDField.register_lookup 

701class UUIDContains(UUIDTextMixin, Contains): 

702 pass 

703 

704 

705@UUIDField.register_lookup 

706class UUIDIContains(UUIDTextMixin, IContains): 

707 pass 

708 

709 

710@UUIDField.register_lookup 

711class UUIDStartsWith(UUIDTextMixin, StartsWith): 

712 pass 

713 

714 

715@UUIDField.register_lookup 

716class UUIDIStartsWith(UUIDTextMixin, IStartsWith): 

717 pass 

718 

719 

720@UUIDField.register_lookup 

721class UUIDEndsWith(UUIDTextMixin, EndsWith): 

722 pass 

723 

724 

725@UUIDField.register_lookup 

726class UUIDIEndsWith(UUIDTextMixin, IEndsWith): 

727 pass