Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/polymorphic/query.py: 58%
235 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2QuerySet for PolymorphicModel
3"""
4import copy
5from collections import defaultdict
7from django import get_version as get_django_version
8from django.contrib.contenttypes.models import ContentType
9from django.core.exceptions import FieldDoesNotExist
10from django.db.models import FilteredRelation
11from django.db.models.query import ModelIterable, Q, QuerySet
13from .query_translate import (
14 translate_polymorphic_field_path,
15 translate_polymorphic_filter_definitions_in_args,
16 translate_polymorphic_filter_definitions_in_kwargs,
17 translate_polymorphic_Q_object,
18)
20# chunk-size: maximum number of objects requested per db-request
21# by the polymorphic queryset.iterator() implementation
22Polymorphic_QuerySet_objects_per_request = 100
25class PolymorphicModelIterable(ModelIterable):
26 """
27 ModelIterable for PolymorphicModel
29 Yields real instances if qs.polymorphic_disabled is False,
30 otherwise acts like a regular ModelIterable.
31 """
33 def __iter__(self):
34 base_iter = super().__iter__()
35 if self.queryset.polymorphic_disabled: 35 ↛ 36line 35 didn't jump to line 36, because the condition on line 35 was never true
36 return base_iter
37 return self._polymorphic_iterator(base_iter)
39 def _polymorphic_iterator(self, base_iter):
40 """
41 Here we do the same as::
43 real_results = queryset._get_real_instances(list(base_iter))
44 for o in real_results: yield o
46 but it requests the objects in chunks from the database,
47 with Polymorphic_QuerySet_objects_per_request per chunk
48 """
49 while True:
50 base_result_objects = []
51 reached_end = False
53 # Make sure the base iterator is read in chunks instead of
54 # reading it completely, in case our caller read only a few objects.
55 for i in range(Polymorphic_QuerySet_objects_per_request): 55 ↛ 64line 55 didn't jump to line 64, because the loop on line 55 didn't complete
57 try:
58 o = next(base_iter)
59 base_result_objects.append(o)
60 except StopIteration:
61 reached_end = True
62 break
64 real_results = self.queryset._get_real_instances(base_result_objects)
66 for o in real_results:
67 yield o
69 if reached_end: 69 ↛ 50line 69 didn't jump to line 50, because the condition on line 69 was never false
70 return
73def transmogrify(cls, obj):
74 """
75 Upcast a class to a different type without asking questions.
76 """
77 if "__init__" not in obj.__dict__: 77 ↛ 83line 77 didn't jump to line 83, because the condition on line 77 was never false
78 # Just assign __class__ to a different value.
79 new = obj
80 new.__class__ = cls
81 else:
82 # Run constructor, reassign values
83 new = cls()
84 for k, v in obj.__dict__.items():
85 new.__dict__[k] = v
86 return new
89###################################################################################
90# PolymorphicQuerySet
93class PolymorphicQuerySet(QuerySet):
94 """
95 QuerySet for PolymorphicModel
97 Contains the core functionality for PolymorphicModel
99 Usually not explicitly needed, except if a custom queryset class
100 is to be used.
101 """
103 def __init__(self, *args, **kwargs):
104 super().__init__(*args, **kwargs)
105 self._iterable_class = PolymorphicModelIterable
107 self.polymorphic_disabled = False
108 # A parallel structure to django.db.models.query.Query.deferred_loading,
109 # which we maintain with the untranslated field names passed to
110 # .defer() and .only() in order to be able to retranslate them when
111 # retrieving the real instance (so that the deferred fields apply
112 # to that queryset as well).
113 self.polymorphic_deferred_loading = (set(), True)
115 def _clone(self, *args, **kwargs):
116 # Django's _clone only copies its own variables, so we need to copy ours here
117 new = super()._clone(*args, **kwargs)
118 new.polymorphic_disabled = self.polymorphic_disabled
119 new.polymorphic_deferred_loading = (
120 copy.copy(self.polymorphic_deferred_loading[0]),
121 self.polymorphic_deferred_loading[1],
122 )
123 return new
125 def as_manager(cls):
126 from .managers import PolymorphicManager
128 manager = PolymorphicManager.from_queryset(cls)()
129 manager._built_with_as_manager = True
130 return manager
132 as_manager.queryset_only = True
133 as_manager = classmethod(as_manager)
135 def bulk_create(self, objs, batch_size=None, ignore_conflicts=False):
136 objs = list(objs)
137 for obj in objs:
138 obj.pre_save_polymorphic()
139 return super().bulk_create(objs, batch_size, ignore_conflicts=ignore_conflicts)
141 def non_polymorphic(self):
142 """switch off polymorphic behaviour for this query.
143 When the queryset is evaluated, only objects of the type of the
144 base class used for this query are returned."""
145 qs = self._clone()
146 qs.polymorphic_disabled = True
147 if issubclass(qs._iterable_class, PolymorphicModelIterable): 147 ↛ 149line 147 didn't jump to line 149, because the condition on line 147 was never false
148 qs._iterable_class = ModelIterable
149 return qs
151 def instance_of(self, *args):
152 """Filter the queryset to only include the classes in args (and their subclasses)."""
153 # Implementation in _translate_polymorphic_filter_defnition.
154 return self.filter(instance_of=args)
156 def not_instance_of(self, *args):
157 """Filter the queryset to exclude the classes in args (and their subclasses)."""
158 # Implementation in _translate_polymorphic_filter_defnition."""
159 return self.filter(not_instance_of=args)
161 # Makes _filter_or_exclude compatible with the change in signature introduced in django at 9c9a3fe
162 if get_django_version() >= "3.2": 162 ↛ 178line 162 didn't jump to line 178, because the condition on line 162 was never false
164 def _filter_or_exclude(self, negate, args, kwargs):
165 # We override this internal Django function as it is used for all filter member functions.
166 q_objects = translate_polymorphic_filter_definitions_in_args(
167 queryset_model=self.model, args=args, using=self.db
168 )
169 # filter_field='data'
170 additional_args = translate_polymorphic_filter_definitions_in_kwargs(
171 queryset_model=self.model, kwargs=kwargs, using=self.db
172 )
173 args = list(q_objects) + additional_args
174 return super()._filter_or_exclude(negate=negate, args=args, kwargs=kwargs)
176 else:
178 def _filter_or_exclude(self, negate, *args, **kwargs):
179 # We override this internal Django function as it is used for all filter member functions.
180 q_objects = translate_polymorphic_filter_definitions_in_args(
181 self.model, args, using=self.db
182 )
183 # filter_field='data'
184 additional_args = translate_polymorphic_filter_definitions_in_kwargs(
185 self.model, kwargs, using=self.db
186 )
187 return super()._filter_or_exclude(
188 negate, *(list(q_objects) + additional_args), **kwargs
189 )
191 def order_by(self, *field_names):
192 """translate the field paths in the args, then call vanilla order_by."""
193 field_names = [
194 translate_polymorphic_field_path(self.model, a)
195 if isinstance(a, str)
196 else a # allow expressions to pass unchanged
197 for a in field_names
198 ]
199 return super().order_by(*field_names)
201 def defer(self, *fields):
202 """
203 Translate the field paths in the args, then call vanilla defer.
205 Also retain a copy of the original fields passed, which we'll need
206 when we're retrieving the real instance (since we'll need to translate
207 them again, as the model will have changed).
208 """
209 new_fields = [translate_polymorphic_field_path(self.model, a) for a in fields]
210 clone = super().defer(*new_fields)
211 clone._polymorphic_add_deferred_loading(fields)
212 return clone
214 def only(self, *fields):
215 """
216 Translate the field paths in the args, then call vanilla only.
218 Also retain a copy of the original fields passed, which we'll need
219 when we're retrieving the real instance (since we'll need to translate
220 them again, as the model will have changed).
221 """
222 new_fields = [translate_polymorphic_field_path(self.model, a) for a in fields]
223 clone = super().only(*new_fields)
224 clone._polymorphic_add_immediate_loading(fields)
225 return clone
227 def _polymorphic_add_deferred_loading(self, field_names):
228 """
229 Follows the logic of django.db.models.query.Query.add_deferred_loading(),
230 but for the non-translated field names that were passed to self.defer().
231 """
232 existing, defer = self.polymorphic_deferred_loading
233 if defer:
234 # Add to existing deferred names.
235 self.polymorphic_deferred_loading = existing.union(field_names), True
236 else:
237 # Remove names from the set of any existing "immediate load" names.
238 self.polymorphic_deferred_loading = existing.difference(field_names), False
240 def _polymorphic_add_immediate_loading(self, field_names):
241 """
242 Follows the logic of django.db.models.query.Query.add_immediate_loading(),
243 but for the non-translated field names that were passed to self.only()
244 """
245 existing, defer = self.polymorphic_deferred_loading
246 field_names = set(field_names)
247 if "pk" in field_names: 247 ↛ 248line 247 didn't jump to line 248, because the condition on line 247 was never true
248 field_names.remove("pk")
249 field_names.add(self.model._meta.pk.name)
251 if defer: 251 ↛ 257line 251 didn't jump to line 257, because the condition on line 251 was never false
252 # Remove any existing deferred names from the current set before
253 # setting the new names.
254 self.polymorphic_deferred_loading = field_names.difference(existing), False
255 else:
256 # Replace any existing "immediate load" field names.
257 self.polymorphic_deferred_loading = field_names, False
259 def _process_aggregate_args(self, args, kwargs):
260 """for aggregate and annotate kwargs: allow ModelX___field syntax for kwargs, forbid it for args.
261 Modifies kwargs if needed (these are Aggregate objects, we translate the lookup member variable)"""
262 ___lookup_assert_msg = "PolymorphicModel: annotate()/aggregate(): ___ model lookup supported for keyword arguments only"
264 def patch_lookup(a):
265 # The field on which the aggregate operates is
266 # stored inside a complex query expression.
267 if isinstance(a, Q): 267 ↛ 268line 267 didn't jump to line 268, because the condition on line 267 was never true
268 translate_polymorphic_Q_object(self.model, a)
269 elif isinstance(a, FilteredRelation): 269 ↛ 270line 269 didn't jump to line 270, because the condition on line 269 was never true
270 patch_lookup(a.condition)
271 elif hasattr(a, "get_source_expressions"):
272 for source_expression in a.get_source_expressions():
273 if source_expression is not None: 273 ↛ 272line 273 didn't jump to line 272, because the condition on line 273 was never false
274 patch_lookup(source_expression)
275 else:
276 a.name = translate_polymorphic_field_path(self.model, a.name)
278 def test___lookup(a):
279 """*args might be complex expressions too in django 1.8 so
280 the testing for a '___' is rather complex on this one"""
281 if isinstance(a, Q):
283 def tree_node_test___lookup(my_model, node):
284 "process all children of this Q node"
285 for i in range(len(node.children)):
286 child = node.children[i]
288 if type(child) == tuple:
289 # this Q object child is a tuple => a kwarg like Q( instance_of=ModelB )
290 assert "___" not in child[0], ___lookup_assert_msg
291 else:
292 # this Q object child is another Q object, recursively process this as well
293 tree_node_test___lookup(my_model, child)
295 tree_node_test___lookup(self.model, a)
296 elif hasattr(a, "get_source_expressions"):
297 for source_expression in a.get_source_expressions():
298 test___lookup(source_expression)
299 else:
300 assert "___" not in a.name, ___lookup_assert_msg
302 for a in args: 302 ↛ 303line 302 didn't jump to line 303, because the loop on line 302 never started
303 test___lookup(a)
304 for a in kwargs.values():
305 patch_lookup(a)
307 def annotate(self, *args, **kwargs):
308 """translate the polymorphic field paths in the kwargs, then call vanilla annotate.
309 _get_real_instances will do the rest of the job after executing the query."""
310 self._process_aggregate_args(args, kwargs)
311 return super().annotate(*args, **kwargs)
313 def aggregate(self, *args, **kwargs):
314 """translate the polymorphic field paths in the kwargs, then call vanilla aggregate.
315 We need no polymorphic object retrieval for aggregate => switch it off."""
316 self._process_aggregate_args(args, kwargs)
317 qs = self.non_polymorphic()
318 return super(PolymorphicQuerySet, qs).aggregate(*args, **kwargs)
320 # Starting with Django 1.9, the copy returned by 'qs.values(...)' has the
321 # same class as 'qs', so our polymorphic modifications would apply.
322 # We want to leave values queries untouched, so we set 'polymorphic_disabled'.
323 def _values(self, *args, **kwargs):
324 clone = super()._values(*args, **kwargs)
325 clone.polymorphic_disabled = True
326 return clone
328 # Since django_polymorphic 'V1.0 beta2', extra() always returns polymorphic results.
329 # The resulting objects are required to have a unique primary key within the result set
330 # (otherwise an error is thrown).
331 # The "polymorphic" keyword argument is not supported anymore.
332 # def extra(self, *args, **kwargs):
334 def _get_real_instances(self, base_result_objects):
335 """
336 Polymorphic object loader
338 Does the same as:
340 return [ o.get_real_instance() for o in base_result_objects ]
342 but more efficiently.
344 The list base_result_objects contains the objects from the executed
345 base class query. The class of all of them is self.model (our base model).
347 Some, many or all of these objects were not created and stored as
348 class self.model, but as a class derived from self.model. We want to re-fetch
349 these objects from the db as their original class so we can return them
350 just as they were created/saved.
352 We identify these objects by looking at o.polymorphic_ctype, which specifies
353 the real class of these objects (the class at the time they were saved).
355 First, we sort the result objects in base_result_objects for their
356 subclass (from o.polymorphic_ctype), and then we execute one db query per
357 subclass of objects. Here, we handle any annotations from annotate().
359 Finally we re-sort the resulting objects into the correct order and
360 return them as a list.
361 """
362 resultlist = [] # polymorphic list of result-objects
364 # dict contains one entry per unique model type occurring in result,
365 # in the format idlist_per_model[modelclass]=[list-of-object-ids]
366 idlist_per_model = defaultdict(list)
367 indexlist_per_model = defaultdict(list)
369 # django's automatic ".pk" field does not always work correctly for
370 # custom fields in derived objects (unclear yet who to put the blame on).
371 # We get different type(o.pk) in this case.
372 # We work around this by using the real name of the field directly
373 # for accessing the primary key of the the derived objects.
374 # We might assume that self.model._meta.pk.name gives us the name of the primary key field,
375 # but it doesn't. Therefore we use polymorphic_primary_key_name, which we set up in base.py.
376 pk_name = self.model.polymorphic_primary_key_name
378 # - sort base_result_object ids into idlist_per_model lists, depending on their real class;
379 # - store objects that already have the correct class into "results"
380 content_type_manager = ContentType.objects.db_manager(self.db)
381 self_model_class_id = content_type_manager.get_for_model(
382 self.model, for_concrete_model=False
383 ).pk
384 self_concrete_model_class_id = content_type_manager.get_for_model(
385 self.model, for_concrete_model=True
386 ).pk
388 for i, base_object in enumerate(base_result_objects):
390 if base_object.polymorphic_ctype_id == self_model_class_id:
391 # Real class is exactly the same as base class, go straight to results
392 resultlist.append(base_object)
393 else:
394 real_concrete_class = base_object.get_real_instance_class()
395 real_concrete_class_id = base_object.get_real_concrete_instance_class_id()
397 if real_concrete_class_id is None: 397 ↛ 399line 397 didn't jump to line 399, because the condition on line 397 was never true
398 # Dealing with a stale content type
399 continue
400 elif real_concrete_class_id == self_concrete_model_class_id:
401 # Real and base classes share the same concrete ancestor,
402 # upcast it and put it in the results
403 resultlist.append(transmogrify(real_concrete_class, base_object))
404 else:
405 # This model has a concrete derived class, track it for bulk retrieval.
406 real_concrete_class = content_type_manager.get_for_id(
407 real_concrete_class_id
408 ).model_class()
409 idlist_per_model[real_concrete_class].append(getattr(base_object, pk_name))
410 indexlist_per_model[real_concrete_class].append((i, len(resultlist)))
411 resultlist.append(None)
413 # For each model in "idlist_per_model" request its objects (the real model)
414 # from the db and store them in results[].
415 # Then we copy the annotate fields from the base objects to the real objects.
416 # Then we copy the extra() select fields from the base objects to the real objects.
417 # TODO: defer(), only(): support for these would be around here
418 for real_concrete_class, idlist in idlist_per_model.items():
419 indices = indexlist_per_model[real_concrete_class]
420 real_objects = real_concrete_class._base_objects.db_manager(self.db).filter(
421 **{("%s__in" % pk_name): idlist}
422 )
423 # copy select related configuration to new qs
424 real_objects.query.select_related = self.query.select_related
426 # Copy deferred fields configuration to the new queryset
427 deferred_loading_fields = []
428 existing_fields = self.polymorphic_deferred_loading[0]
429 for field in existing_fields: 429 ↛ 430line 429 didn't jump to line 430, because the loop on line 429 never started
430 try:
431 translated_field_name = translate_polymorphic_field_path(
432 real_concrete_class, field
433 )
434 except AssertionError:
435 if "___" in field:
436 # The originally passed argument to .defer() or .only()
437 # was in the form Model2B___field2, where Model2B is
438 # now a superclass of real_concrete_class. Thus it's
439 # sufficient to just use the field name.
440 translated_field_name = field.rpartition("___")[-1]
442 # Check if the field does exist.
443 # Ignore deferred fields that don't exist in this subclass type.
444 try:
445 real_concrete_class._meta.get_field(translated_field_name)
446 except FieldDoesNotExist:
447 continue
448 else:
449 raise
451 deferred_loading_fields.append(translated_field_name)
452 real_objects.query.deferred_loading = (
453 set(deferred_loading_fields),
454 self.query.deferred_loading[1],
455 )
457 real_objects_dict = {
458 getattr(real_object, pk_name): real_object for real_object in real_objects
459 }
461 for i, j in indices:
462 base_object = base_result_objects[i]
463 o_pk = getattr(base_object, pk_name)
464 real_object = real_objects_dict.get(o_pk)
465 if real_object is None: 465 ↛ 466line 465 didn't jump to line 466, because the condition on line 465 was never true
466 continue
468 # need shallow copy to avoid duplication in caches (see PR #353)
469 real_object = copy.copy(real_object)
470 real_class = real_object.get_real_instance_class()
472 # If the real class is a proxy, upcast it
473 if real_class != real_concrete_class: 473 ↛ 474line 473 didn't jump to line 474, because the condition on line 473 was never true
474 real_object = transmogrify(real_class, real_object)
476 if self.query.annotations: 476 ↛ 477line 476 didn't jump to line 477, because the condition on line 476 was never true
477 for anno_field_name in self.query.annotations.keys():
478 attr = getattr(base_object, anno_field_name)
479 setattr(real_object, anno_field_name, attr)
481 if self.query.extra_select: 481 ↛ 482line 481 didn't jump to line 482, because the condition on line 481 was never true
482 for select_field_name in self.query.extra_select.keys():
483 attr = getattr(base_object, select_field_name)
484 setattr(real_object, select_field_name, attr)
486 resultlist[j] = real_object
488 resultlist = [i for i in resultlist if i]
490 # set polymorphic_annotate_names in all objects (currently just used for debugging/printing)
491 if self.query.annotations: 491 ↛ 493line 491 didn't jump to line 493, because the condition on line 491 was never true
492 # get annotate field list
493 annotate_names = list(self.query.annotations.keys())
494 for real_object in resultlist:
495 real_object.polymorphic_annotate_names = annotate_names
497 # set polymorphic_extra_select_names in all objects (currently just used for debugging/printing)
498 if self.query.extra_select: 498 ↛ 500line 498 didn't jump to line 500, because the condition on line 498 was never true
499 # get extra select field list
500 extra_select_names = list(self.query.extra_select.keys())
501 for real_object in resultlist:
502 real_object.polymorphic_extra_select_names = extra_select_names
504 return resultlist
506 def __repr__(self, *args, **kwargs):
507 if self.model.polymorphic_query_multiline_output:
508 result = [repr(o) for o in self.all()]
509 return "[ " + ",\n ".join(result) + " ]"
510 else:
511 return super().__repr__(*args, **kwargs)
513 class _p_list_class(list):
514 def __repr__(self, *args, **kwargs):
515 result = [repr(o) for o in self]
516 return "[ " + ",\n ".join(result) + " ]"
518 def get_real_instances(self, base_result_objects=None):
519 """
520 Cast a list of objects to their actual classes.
522 This does roughly the same as::
524 return [ o.get_real_instance() for o in base_result_objects ]
526 but more efficiently.
528 :rtype: PolymorphicQuerySet
529 """
530 "same as _get_real_instances, but make sure that __repr__ for ShowField... creates correct output"
531 if base_result_objects is None:
532 base_result_objects = self
533 olist = self._get_real_instances(base_result_objects)
534 if not self.model.polymorphic_query_multiline_output:
535 return olist
536 clist = PolymorphicQuerySet._p_list_class(olist)
537 return clist