Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/polymorphic/query.py: 58%

235 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2QuerySet for PolymorphicModel 

3""" 

import copy
from collections import defaultdict

from django import VERSION as DJANGO_VERSION
from django import get_version as get_django_version
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldDoesNotExist
from django.db.models import FilteredRelation
from django.db.models.query import ModelIterable, Q, QuerySet

from .query_translate import (
    translate_polymorphic_field_path,
    translate_polymorphic_filter_definitions_in_args,
    translate_polymorphic_filter_definitions_in_kwargs,
    translate_polymorphic_Q_object,
)

19 

# Chunk size: upper bound on how many base objects the polymorphic
# iterator takes from the base iterator per batch before resolving that
# batch to the real (derived) instances.
Polymorphic_QuerySet_objects_per_request = 100

23 

24 

class PolymorphicModelIterable(ModelIterable):
    """
    Iterable class used by polymorphic querysets.

    Produces the real (derived) instances unless the queryset has
    ``polymorphic_disabled`` set, in which case the plain ModelIterable
    iterator is returned untouched.
    """

    def __iter__(self):
        plain_iter = super().__iter__()
        if not self.queryset.polymorphic_disabled:
            return self._polymorphic_iterator(plain_iter)
        return plain_iter

    def _polymorphic_iterator(self, base_iter):
        """
        Generator equivalent to::

            for o in queryset._get_real_instances(list(base_iter)):
                yield o

        except that ``base_iter`` is consumed in batches of at most
        Polymorphic_QuerySet_objects_per_request objects, so a caller that
        stops early does not force the whole base iterator to be read.
        """
        exhausted_marker = object()
        while True:
            batch = []
            exhausted = False

            # Pull one batch; stop early when the base iterator runs dry.
            for _ in range(Polymorphic_QuerySet_objects_per_request):
                item = next(base_iter, exhausted_marker)
                if item is exhausted_marker:
                    exhausted = True
                    break
                batch.append(item)

            yield from self.queryset._get_real_instances(batch)

            if exhausted:
                return

71 

72 

def transmogrify(cls, obj):
    """
    Turn ``obj`` into an instance of ``cls`` without asking questions.

    When the instance ``__dict__`` holds no ``"__init__"`` entry, the object
    is re-typed in place by assigning ``__class__`` and the same object is
    returned. Otherwise a fresh ``cls()`` is constructed and every entry of
    ``obj.__dict__`` is copied onto it.
    """
    if "__init__" in obj.__dict__:
        # Run the constructor, then carry over the instance state.
        converted = cls()
        converted.__dict__.update(obj.__dict__)
        return converted

    # Cheap path: re-type the existing object in place.
    obj.__class__ = cls
    return obj

87 

88 

89################################################################################### 

90# PolymorphicQuerySet 

91 

92 

93class PolymorphicQuerySet(QuerySet): 

94 """ 

95 QuerySet for PolymorphicModel 

96 

97 Contains the core functionality for PolymorphicModel 

98 

99 Usually not explicitly needed, except if a custom queryset class 

100 is to be used. 

101 """ 

102 

103 def __init__(self, *args, **kwargs): 

104 super().__init__(*args, **kwargs) 

105 self._iterable_class = PolymorphicModelIterable 

106 

107 self.polymorphic_disabled = False 

108 # A parallel structure to django.db.models.query.Query.deferred_loading, 

109 # which we maintain with the untranslated field names passed to 

110 # .defer() and .only() in order to be able to retranslate them when 

111 # retrieving the real instance (so that the deferred fields apply 

112 # to that queryset as well). 

113 self.polymorphic_deferred_loading = (set(), True) 

114 

115 def _clone(self, *args, **kwargs): 

116 # Django's _clone only copies its own variables, so we need to copy ours here 

117 new = super()._clone(*args, **kwargs) 

118 new.polymorphic_disabled = self.polymorphic_disabled 

119 new.polymorphic_deferred_loading = ( 

120 copy.copy(self.polymorphic_deferred_loading[0]), 

121 self.polymorphic_deferred_loading[1], 

122 ) 

123 return new 

124 

125 def as_manager(cls): 

126 from .managers import PolymorphicManager 

127 

128 manager = PolymorphicManager.from_queryset(cls)() 

129 manager._built_with_as_manager = True 

130 return manager 

131 

132 as_manager.queryset_only = True 

133 as_manager = classmethod(as_manager) 

134 

135 def bulk_create(self, objs, batch_size=None, ignore_conflicts=False): 

136 objs = list(objs) 

137 for obj in objs: 

138 obj.pre_save_polymorphic() 

139 return super().bulk_create(objs, batch_size, ignore_conflicts=ignore_conflicts) 

140 

141 def non_polymorphic(self): 

142 """switch off polymorphic behaviour for this query. 

143 When the queryset is evaluated, only objects of the type of the 

144 base class used for this query are returned.""" 

145 qs = self._clone() 

146 qs.polymorphic_disabled = True 

147 if issubclass(qs._iterable_class, PolymorphicModelIterable): 147 ↛ 149line 147 didn't jump to line 149, because the condition on line 147 was never false

148 qs._iterable_class = ModelIterable 

149 return qs 

150 

151 def instance_of(self, *args): 

152 """Filter the queryset to only include the classes in args (and their subclasses).""" 

153 # Implementation in _translate_polymorphic_filter_defnition. 

154 return self.filter(instance_of=args) 

155 

156 def not_instance_of(self, *args): 

157 """Filter the queryset to exclude the classes in args (and their subclasses).""" 

158 # Implementation in _translate_polymorphic_filter_defnition.""" 

159 return self.filter(not_instance_of=args) 

160 

161 # Makes _filter_or_exclude compatible with the change in signature introduced in django at 9c9a3fe 

162 if get_django_version() >= "3.2": 162 ↛ 178line 162 didn't jump to line 178, because the condition on line 162 was never false

163 

164 def _filter_or_exclude(self, negate, args, kwargs): 

165 # We override this internal Django function as it is used for all filter member functions. 

166 q_objects = translate_polymorphic_filter_definitions_in_args( 

167 queryset_model=self.model, args=args, using=self.db 

168 ) 

169 # filter_field='data' 

170 additional_args = translate_polymorphic_filter_definitions_in_kwargs( 

171 queryset_model=self.model, kwargs=kwargs, using=self.db 

172 ) 

173 args = list(q_objects) + additional_args 

174 return super()._filter_or_exclude(negate=negate, args=args, kwargs=kwargs) 

175 

176 else: 

177 

178 def _filter_or_exclude(self, negate, *args, **kwargs): 

179 # We override this internal Django function as it is used for all filter member functions. 

180 q_objects = translate_polymorphic_filter_definitions_in_args( 

181 self.model, args, using=self.db 

182 ) 

183 # filter_field='data' 

184 additional_args = translate_polymorphic_filter_definitions_in_kwargs( 

185 self.model, kwargs, using=self.db 

186 ) 

187 return super()._filter_or_exclude( 

188 negate, *(list(q_objects) + additional_args), **kwargs 

189 ) 

190 

191 def order_by(self, *field_names): 

192 """translate the field paths in the args, then call vanilla order_by.""" 

193 field_names = [ 

194 translate_polymorphic_field_path(self.model, a) 

195 if isinstance(a, str) 

196 else a # allow expressions to pass unchanged 

197 for a in field_names 

198 ] 

199 return super().order_by(*field_names) 

200 

201 def defer(self, *fields): 

202 """ 

203 Translate the field paths in the args, then call vanilla defer. 

204 

205 Also retain a copy of the original fields passed, which we'll need 

206 when we're retrieving the real instance (since we'll need to translate 

207 them again, as the model will have changed). 

208 """ 

209 new_fields = [translate_polymorphic_field_path(self.model, a) for a in fields] 

210 clone = super().defer(*new_fields) 

211 clone._polymorphic_add_deferred_loading(fields) 

212 return clone 

213 

214 def only(self, *fields): 

215 """ 

216 Translate the field paths in the args, then call vanilla only. 

217 

218 Also retain a copy of the original fields passed, which we'll need 

219 when we're retrieving the real instance (since we'll need to translate 

220 them again, as the model will have changed). 

221 """ 

222 new_fields = [translate_polymorphic_field_path(self.model, a) for a in fields] 

223 clone = super().only(*new_fields) 

224 clone._polymorphic_add_immediate_loading(fields) 

225 return clone 

226 

227 def _polymorphic_add_deferred_loading(self, field_names): 

228 """ 

229 Follows the logic of django.db.models.query.Query.add_deferred_loading(), 

230 but for the non-translated field names that were passed to self.defer(). 

231 """ 

232 existing, defer = self.polymorphic_deferred_loading 

233 if defer: 

234 # Add to existing deferred names. 

235 self.polymorphic_deferred_loading = existing.union(field_names), True 

236 else: 

237 # Remove names from the set of any existing "immediate load" names. 

238 self.polymorphic_deferred_loading = existing.difference(field_names), False 

239 

240 def _polymorphic_add_immediate_loading(self, field_names): 

241 """ 

242 Follows the logic of django.db.models.query.Query.add_immediate_loading(), 

243 but for the non-translated field names that were passed to self.only() 

244 """ 

245 existing, defer = self.polymorphic_deferred_loading 

246 field_names = set(field_names) 

247 if "pk" in field_names: 247 ↛ 248line 247 didn't jump to line 248, because the condition on line 247 was never true

248 field_names.remove("pk") 

249 field_names.add(self.model._meta.pk.name) 

250 

251 if defer: 251 ↛ 257line 251 didn't jump to line 257, because the condition on line 251 was never false

252 # Remove any existing deferred names from the current set before 

253 # setting the new names. 

254 self.polymorphic_deferred_loading = field_names.difference(existing), False 

255 else: 

256 # Replace any existing "immediate load" field names. 

257 self.polymorphic_deferred_loading = field_names, False 

258 

259 def _process_aggregate_args(self, args, kwargs): 

260 """for aggregate and annotate kwargs: allow ModelX___field syntax for kwargs, forbid it for args. 

261 Modifies kwargs if needed (these are Aggregate objects, we translate the lookup member variable)""" 

262 ___lookup_assert_msg = "PolymorphicModel: annotate()/aggregate(): ___ model lookup supported for keyword arguments only" 

263 

264 def patch_lookup(a): 

265 # The field on which the aggregate operates is 

266 # stored inside a complex query expression. 

267 if isinstance(a, Q): 267 ↛ 268line 267 didn't jump to line 268, because the condition on line 267 was never true

268 translate_polymorphic_Q_object(self.model, a) 

269 elif isinstance(a, FilteredRelation): 269 ↛ 270line 269 didn't jump to line 270, because the condition on line 269 was never true

270 patch_lookup(a.condition) 

271 elif hasattr(a, "get_source_expressions"): 

272 for source_expression in a.get_source_expressions(): 

273 if source_expression is not None: 273 ↛ 272line 273 didn't jump to line 272, because the condition on line 273 was never false

274 patch_lookup(source_expression) 

275 else: 

276 a.name = translate_polymorphic_field_path(self.model, a.name) 

277 

278 def test___lookup(a): 

279 """*args might be complex expressions too in django 1.8 so 

280 the testing for a '___' is rather complex on this one""" 

281 if isinstance(a, Q): 

282 

283 def tree_node_test___lookup(my_model, node): 

284 "process all children of this Q node" 

285 for i in range(len(node.children)): 

286 child = node.children[i] 

287 

288 if type(child) == tuple: 

289 # this Q object child is a tuple => a kwarg like Q( instance_of=ModelB ) 

290 assert "___" not in child[0], ___lookup_assert_msg 

291 else: 

292 # this Q object child is another Q object, recursively process this as well 

293 tree_node_test___lookup(my_model, child) 

294 

295 tree_node_test___lookup(self.model, a) 

296 elif hasattr(a, "get_source_expressions"): 

297 for source_expression in a.get_source_expressions(): 

298 test___lookup(source_expression) 

299 else: 

300 assert "___" not in a.name, ___lookup_assert_msg 

301 

302 for a in args: 302 ↛ 303line 302 didn't jump to line 303, because the loop on line 302 never started

303 test___lookup(a) 

304 for a in kwargs.values(): 

305 patch_lookup(a) 

306 

307 def annotate(self, *args, **kwargs): 

308 """translate the polymorphic field paths in the kwargs, then call vanilla annotate. 

309 _get_real_instances will do the rest of the job after executing the query.""" 

310 self._process_aggregate_args(args, kwargs) 

311 return super().annotate(*args, **kwargs) 

312 

313 def aggregate(self, *args, **kwargs): 

314 """translate the polymorphic field paths in the kwargs, then call vanilla aggregate. 

315 We need no polymorphic object retrieval for aggregate => switch it off.""" 

316 self._process_aggregate_args(args, kwargs) 

317 qs = self.non_polymorphic() 

318 return super(PolymorphicQuerySet, qs).aggregate(*args, **kwargs) 

319 

320 # Starting with Django 1.9, the copy returned by 'qs.values(...)' has the 

321 # same class as 'qs', so our polymorphic modifications would apply. 

322 # We want to leave values queries untouched, so we set 'polymorphic_disabled'. 

323 def _values(self, *args, **kwargs): 

324 clone = super()._values(*args, **kwargs) 

325 clone.polymorphic_disabled = True 

326 return clone 

327 

328 # Since django_polymorphic 'V1.0 beta2', extra() always returns polymorphic results. 

329 # The resulting objects are required to have a unique primary key within the result set 

330 # (otherwise an error is thrown). 

331 # The "polymorphic" keyword argument is not supported anymore. 

332 # def extra(self, *args, **kwargs): 

333 

334 def _get_real_instances(self, base_result_objects): 

335 """ 

336 Polymorphic object loader 

337 

338 Does the same as: 

339 

340 return [ o.get_real_instance() for o in base_result_objects ] 

341 

342 but more efficiently. 

343 

344 The list base_result_objects contains the objects from the executed 

345 base class query. The class of all of them is self.model (our base model). 

346 

347 Some, many or all of these objects were not created and stored as 

348 class self.model, but as a class derived from self.model. We want to re-fetch 

349 these objects from the db as their original class so we can return them 

350 just as they were created/saved. 

351 

352 We identify these objects by looking at o.polymorphic_ctype, which specifies 

353 the real class of these objects (the class at the time they were saved). 

354 

355 First, we sort the result objects in base_result_objects for their 

356 subclass (from o.polymorphic_ctype), and then we execute one db query per 

357 subclass of objects. Here, we handle any annotations from annotate(). 

358 

359 Finally we re-sort the resulting objects into the correct order and 

360 return them as a list. 

361 """ 

362 resultlist = [] # polymorphic list of result-objects 

363 

364 # dict contains one entry per unique model type occurring in result, 

365 # in the format idlist_per_model[modelclass]=[list-of-object-ids] 

366 idlist_per_model = defaultdict(list) 

367 indexlist_per_model = defaultdict(list) 

368 

369 # django's automatic ".pk" field does not always work correctly for 

370 # custom fields in derived objects (unclear yet who to put the blame on). 

371 # We get different type(o.pk) in this case. 

372 # We work around this by using the real name of the field directly 

373 # for accessing the primary key of the the derived objects. 

374 # We might assume that self.model._meta.pk.name gives us the name of the primary key field, 

375 # but it doesn't. Therefore we use polymorphic_primary_key_name, which we set up in base.py. 

376 pk_name = self.model.polymorphic_primary_key_name 

377 

378 # - sort base_result_object ids into idlist_per_model lists, depending on their real class; 

379 # - store objects that already have the correct class into "results" 

380 content_type_manager = ContentType.objects.db_manager(self.db) 

381 self_model_class_id = content_type_manager.get_for_model( 

382 self.model, for_concrete_model=False 

383 ).pk 

384 self_concrete_model_class_id = content_type_manager.get_for_model( 

385 self.model, for_concrete_model=True 

386 ).pk 

387 

388 for i, base_object in enumerate(base_result_objects): 

389 

390 if base_object.polymorphic_ctype_id == self_model_class_id: 

391 # Real class is exactly the same as base class, go straight to results 

392 resultlist.append(base_object) 

393 else: 

394 real_concrete_class = base_object.get_real_instance_class() 

395 real_concrete_class_id = base_object.get_real_concrete_instance_class_id() 

396 

397 if real_concrete_class_id is None: 397 ↛ 399line 397 didn't jump to line 399, because the condition on line 397 was never true

398 # Dealing with a stale content type 

399 continue 

400 elif real_concrete_class_id == self_concrete_model_class_id: 

401 # Real and base classes share the same concrete ancestor, 

402 # upcast it and put it in the results 

403 resultlist.append(transmogrify(real_concrete_class, base_object)) 

404 else: 

405 # This model has a concrete derived class, track it for bulk retrieval. 

406 real_concrete_class = content_type_manager.get_for_id( 

407 real_concrete_class_id 

408 ).model_class() 

409 idlist_per_model[real_concrete_class].append(getattr(base_object, pk_name)) 

410 indexlist_per_model[real_concrete_class].append((i, len(resultlist))) 

411 resultlist.append(None) 

412 

413 # For each model in "idlist_per_model" request its objects (the real model) 

414 # from the db and store them in results[]. 

415 # Then we copy the annotate fields from the base objects to the real objects. 

416 # Then we copy the extra() select fields from the base objects to the real objects. 

417 # TODO: defer(), only(): support for these would be around here 

418 for real_concrete_class, idlist in idlist_per_model.items(): 

419 indices = indexlist_per_model[real_concrete_class] 

420 real_objects = real_concrete_class._base_objects.db_manager(self.db).filter( 

421 **{("%s__in" % pk_name): idlist} 

422 ) 

423 # copy select related configuration to new qs 

424 real_objects.query.select_related = self.query.select_related 

425 

426 # Copy deferred fields configuration to the new queryset 

427 deferred_loading_fields = [] 

428 existing_fields = self.polymorphic_deferred_loading[0] 

429 for field in existing_fields: 429 ↛ 430line 429 didn't jump to line 430, because the loop on line 429 never started

430 try: 

431 translated_field_name = translate_polymorphic_field_path( 

432 real_concrete_class, field 

433 ) 

434 except AssertionError: 

435 if "___" in field: 

436 # The originally passed argument to .defer() or .only() 

437 # was in the form Model2B___field2, where Model2B is 

438 # now a superclass of real_concrete_class. Thus it's 

439 # sufficient to just use the field name. 

440 translated_field_name = field.rpartition("___")[-1] 

441 

442 # Check if the field does exist. 

443 # Ignore deferred fields that don't exist in this subclass type. 

444 try: 

445 real_concrete_class._meta.get_field(translated_field_name) 

446 except FieldDoesNotExist: 

447 continue 

448 else: 

449 raise 

450 

451 deferred_loading_fields.append(translated_field_name) 

452 real_objects.query.deferred_loading = ( 

453 set(deferred_loading_fields), 

454 self.query.deferred_loading[1], 

455 ) 

456 

457 real_objects_dict = { 

458 getattr(real_object, pk_name): real_object for real_object in real_objects 

459 } 

460 

461 for i, j in indices: 

462 base_object = base_result_objects[i] 

463 o_pk = getattr(base_object, pk_name) 

464 real_object = real_objects_dict.get(o_pk) 

465 if real_object is None: 465 ↛ 466line 465 didn't jump to line 466, because the condition on line 465 was never true

466 continue 

467 

468 # need shallow copy to avoid duplication in caches (see PR #353) 

469 real_object = copy.copy(real_object) 

470 real_class = real_object.get_real_instance_class() 

471 

472 # If the real class is a proxy, upcast it 

473 if real_class != real_concrete_class: 473 ↛ 474line 473 didn't jump to line 474, because the condition on line 473 was never true

474 real_object = transmogrify(real_class, real_object) 

475 

476 if self.query.annotations: 476 ↛ 477line 476 didn't jump to line 477, because the condition on line 476 was never true

477 for anno_field_name in self.query.annotations.keys(): 

478 attr = getattr(base_object, anno_field_name) 

479 setattr(real_object, anno_field_name, attr) 

480 

481 if self.query.extra_select: 481 ↛ 482line 481 didn't jump to line 482, because the condition on line 481 was never true

482 for select_field_name in self.query.extra_select.keys(): 

483 attr = getattr(base_object, select_field_name) 

484 setattr(real_object, select_field_name, attr) 

485 

486 resultlist[j] = real_object 

487 

488 resultlist = [i for i in resultlist if i] 

489 

490 # set polymorphic_annotate_names in all objects (currently just used for debugging/printing) 

491 if self.query.annotations: 491 ↛ 493line 491 didn't jump to line 493, because the condition on line 491 was never true

492 # get annotate field list 

493 annotate_names = list(self.query.annotations.keys()) 

494 for real_object in resultlist: 

495 real_object.polymorphic_annotate_names = annotate_names 

496 

497 # set polymorphic_extra_select_names in all objects (currently just used for debugging/printing) 

498 if self.query.extra_select: 498 ↛ 500line 498 didn't jump to line 500, because the condition on line 498 was never true

499 # get extra select field list 

500 extra_select_names = list(self.query.extra_select.keys()) 

501 for real_object in resultlist: 

502 real_object.polymorphic_extra_select_names = extra_select_names 

503 

504 return resultlist 

505 

506 def __repr__(self, *args, **kwargs): 

507 if self.model.polymorphic_query_multiline_output: 

508 result = [repr(o) for o in self.all()] 

509 return "[ " + ",\n ".join(result) + " ]" 

510 else: 

511 return super().__repr__(*args, **kwargs) 

512 

513 class _p_list_class(list): 

514 def __repr__(self, *args, **kwargs): 

515 result = [repr(o) for o in self] 

516 return "[ " + ",\n ".join(result) + " ]" 

517 

518 def get_real_instances(self, base_result_objects=None): 

519 """ 

520 Cast a list of objects to their actual classes. 

521 

522 This does roughly the same as:: 

523 

524 return [ o.get_real_instance() for o in base_result_objects ] 

525 

526 but more efficiently. 

527 

528 :rtype: PolymorphicQuerySet 

529 """ 

530 "same as _get_real_instances, but make sure that __repr__ for ShowField... creates correct output" 

531 if base_result_objects is None: 

532 base_result_objects = self 

533 olist = self._get_real_instances(base_result_objects) 

534 if not self.model.polymorphic_query_multiline_output: 

535 return olist 

536 clist = PolymorphicQuerySet._p_list_class(olist) 

537 return clist