Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/contrib/contenttypes/fields.py: 26%
344 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import functools
2import itertools
3from collections import defaultdict
5from django.contrib.contenttypes.models import ContentType
6from django.core import checks
7from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist
8from django.db import DEFAULT_DB_ALIAS, models, router, transaction
9from django.db.models import DO_NOTHING, ForeignObject, ForeignObjectRel
10from django.db.models.base import ModelBase, make_foreign_order_accessors
11from django.db.models.fields.mixins import FieldCacheMixin
12from django.db.models.fields.related import (
13 ReverseManyToOneDescriptor,
14 lazy_related_operation,
15)
16from django.db.models.query_utils import PathInfo
17from django.db.models.sql import AND
18from django.db.models.sql.where import WhereNode
19from django.utils.functional import cached_property
class GenericForeignKey(FieldCacheMixin):
    """
    Generic many-to-one relation expressed through two sibling model fields:
    ``content_type`` (which model) and ``object_id`` (which row).

    The instance installs itself on the model class as an attribute, so it is
    also the descriptor used to read and assign the related object (compare
    ForwardManyToOneDescriptor).
    """

    # Field flags
    auto_created = False
    concrete = False
    editable = False
    hidden = False

    is_relation = True
    many_to_many = False
    many_to_one = True
    one_to_many = False
    one_to_one = False
    related_model = None
    remote_field = None

    def __init__(
        self, ct_field="content_type", fk_field="object_id", for_concrete_model=True
    ):
        """Remember the names of the content-type and object-id fields."""
        self.ct_field, self.fk_field = ct_field, fk_field
        self.for_concrete_model = for_concrete_model
        self.editable = False
        self.rel = None
        self.column = None

    def contribute_to_class(self, cls, name, **kwargs):
        """Register as a private field of ``cls`` and as its ``name`` attribute."""
        self.name, self.model = name, cls
        cls._meta.add_field(self, private=True)
        setattr(cls, name, self)

    def get_filter_kwargs_for_object(self, obj):
        """See corresponding method on Field"""
        object_id = getattr(obj, self.fk_field)
        content_type = getattr(obj, self.ct_field)
        return {self.fk_field: object_id, self.ct_field: content_type}

    def get_forward_related_filter(self, obj):
        """See corresponding method on RelatedField"""
        object_id = obj.pk
        content_type_pk = ContentType.objects.get_for_model(obj).pk
        return {self.fk_field: object_id, self.ct_field: content_type_pk}

    def __str__(self):
        return "%s.%s" % (self.model._meta.label, self.name)

    def check(self, **kwargs):
        """Run every field-level check and return the collected messages."""
        collected = []
        for run_check in (
            self._check_field_name,
            self._check_object_id_field,
            self._check_content_type_field,
        ):
            collected.extend(run_check())
        return collected

    def _check_field_name(self):
        """Report fields.E001 when the field name ends with an underscore."""
        if not self.name.endswith("_"):
            return []
        return [
            checks.Error(
                "Field names must not end with an underscore.",
                obj=self,
                id="fields.E001",
            )
        ]

    def _check_object_id_field(self):
        """Report contenttypes.E001 when the object-id field is missing."""
        try:
            self.model._meta.get_field(self.fk_field)
        except FieldDoesNotExist:
            return [
                checks.Error(
                    "The GenericForeignKey object ID references the "
                    "nonexistent field '%s'." % self.fk_field,
                    obj=self,
                    id="contenttypes.E001",
                )
            ]
        return []

    def _check_content_type_field(self):
        """
        Verify that the field named by ``ct_field`` exists on the model and is
        a ForeignKey whose target is ContentType.
        """
        shared_hint = (
            "GenericForeignKeys must use a ForeignKey to "
            "'contenttypes.ContentType' as the 'content_type' field."
        )
        try:
            field = self.model._meta.get_field(self.ct_field)
        except FieldDoesNotExist:
            return [
                checks.Error(
                    "The GenericForeignKey content type references the "
                    "nonexistent field '%s.%s'."
                    % (self.model._meta.object_name, self.ct_field),
                    obj=self,
                    id="contenttypes.E002",
                )
            ]

        if not isinstance(field, models.ForeignKey):
            return [
                checks.Error(
                    "'%s.%s' is not a ForeignKey."
                    % (self.model._meta.object_name, self.ct_field),
                    hint=shared_hint,
                    obj=self,
                    id="contenttypes.E003",
                )
            ]
        if field.remote_field.model != ContentType:
            return [
                checks.Error(
                    "'%s.%s' is not a ForeignKey to 'contenttypes.ContentType'."
                    % (self.model._meta.object_name, self.ct_field),
                    hint=shared_hint,
                    obj=self,
                    id="contenttypes.E004",
                )
            ]
        return []

    def get_cache_name(self):
        """The cache key for this field is simply its name."""
        return self.name

    def get_content_type(self, obj=None, id=None, using=None):
        """
        Resolve a ContentType either from a model instance (``obj``) or from a
        content type primary key (``id``, looked up on database ``using``).
        ``obj`` takes precedence when both are given.
        """
        if obj is not None:
            manager = ContentType.objects.db_manager(obj._state.db)
            return manager.get_for_model(
                obj, for_concrete_model=self.for_concrete_model
            )
        if id is not None:
            return ContentType.objects.db_manager(using).get_for_id(id)
        # Neither an object nor an id was supplied.
        raise Exception("Impossible arguments to GFK.get_content_type!")

    def get_prefetch_queryset(self, instances, queryset=None):
        """
        Fetch the objects referenced by ``instances`` with one query per
        content type.

        Return a 6-tuple: the fetched objects, a key function for fetched
        objects, a key function for ``instances``, ``True``, this field's name
        and ``False``. (The two booleans are flags consumed by the prefetch
        machinery -- presumably "single" and "is_descriptor"; confirm against
        the caller.)

        Raise ValueError when a custom ``queryset`` is passed.
        """
        if queryset is not None:
            raise ValueError("Custom queryset can't be used for this lookup.")

        # Bucket the wanted primary keys by content type id, and keep one
        # representative instance per bucket so its database alias can be
        # used for the lookup.
        pks_by_ct = defaultdict(set)
        sample_by_ct = {}
        ct_attname = self.model._meta.get_field(self.ct_field).get_attname()
        for instance in instances:
            ct_id = getattr(instance, ct_attname)
            if ct_id is None:
                continue
            fk_val = getattr(instance, self.fk_field)
            if fk_val is None:
                continue
            pks_by_ct[ct_id].add(fk_val)
            sample_by_ct[ct_id] = instance

        fetched = []
        for ct_id, pk_set in pks_by_ct.items():
            sample = sample_by_ct[ct_id]
            ct = self.get_content_type(id=ct_id, using=sample._state.db)
            fetched.extend(ct.get_all_objects_for_this_type(pk__in=pk_set))

        # The Python-side join has to agree on both the key value and the
        # model, hence both key functions yield a (pk, class) pair.
        def related_key(obj):
            return (obj.pk, obj.__class__)

        def gfk_key(obj):
            ct_id = getattr(obj, ct_attname)
            if ct_id is None:
                return None
            model = self.get_content_type(
                id=ct_id, using=obj._state.db
            ).model_class()
            prepared_pk = model._meta.pk.get_prep_value(getattr(obj, self.fk_field))
            return (prepared_pk, model)

        return (fetched, related_key, gfk_key, True, self.name, False)

    def __get__(self, instance, cls=None):
        """
        Return the related object for ``instance``, using the per-instance
        cache when it still matches the stored content type and object id.
        Class-level access returns the field itself.
        """
        if instance is None:
            return self

        # Read the raw content type id rather than the ``ct_field`` attribute:
        # the attribute could reload the same ContentType repeatedly (#5570),
        # whereas ContentType.objects.get_for_id() has a global cache.
        ct_model_field = self.model._meta.get_field(self.ct_field)
        ct_id = getattr(instance, ct_model_field.get_attname(), None)
        pk_val = getattr(instance, self.fk_field)

        cached = self.get_cached_value(instance, default=None)
        if cached is None:
            # A cached None is a valid answer; only fall through on a miss.
            if self.is_cached(instance):
                return None
        else:
            same_ct = (
                ct_id == self.get_content_type(obj=cached, using=instance._state.db).id
            )
            same_pk = cached._meta.pk.to_python(pk_val) == cached.pk
            if same_ct and same_pk:
                return cached

        # Cache miss or stale entry: look the object up again.
        resolved = None
        if ct_id is not None:
            ct = self.get_content_type(id=ct_id, using=instance._state.db)
            try:
                resolved = ct.get_object_for_this_type(pk=pk_val)
            except ObjectDoesNotExist:
                resolved = None
        self.set_cached_value(instance, resolved)
        return resolved

    def __set__(self, instance, value):
        """Assign ``value`` by updating both underlying fields and the cache."""
        if value is None:
            ct, fk = None, None
        else:
            ct = self.get_content_type(obj=value)
            fk = value.pk

        setattr(instance, self.ct_field, ct)
        setattr(instance, self.fk_field, fk)
        self.set_cached_value(instance, value)
class GenericRel(ForeignObjectRel):
    """
    Relation descriptor object that GenericRelation uses to hold the details
    of its relation.
    """

    def __init__(
        self,
        field,
        to,
        related_name=None,
        related_query_name=None,
        limit_choices_to=None,
    ):
        # ``related_name`` is accepted but not consulted: the name handed to
        # the parent is the related query name, or "+" when there is none.
        effective_related_name = related_query_name or "+"
        super().__init__(
            field,
            to,
            related_name=effective_related_name,
            related_query_name=related_query_name,
            limit_choices_to=limit_choices_to,
            on_delete=DO_NOTHING,
        )
class GenericRelation(ForeignObject):
    """
    The reverse side of a relation that a GenericForeignKey creates.
    """

    # Field flags
    auto_created = False
    empty_strings_allowed = False

    many_to_many = False
    many_to_one = False
    one_to_many = True
    one_to_one = False

    rel_class = GenericRel

    mti_inherited = False

    def __init__(
        self,
        to,
        object_id_field="object_id",
        content_type_field="content_type",
        for_concrete_model=True,
        related_query_name=None,
        limit_choices_to=None,
        **kwargs,
    ):
        # Reverse relations are always nullable: nothing can force a foreign
        # key on the related model to point at this model. The remaining
        # options are fixed for every GenericRelation.
        kwargs.update(
            rel=self.rel_class(
                self,
                to,
                related_query_name=related_query_name,
                limit_choices_to=limit_choices_to,
            ),
            null=True,
            blank=True,
            on_delete=models.CASCADE,
            editable=False,
            serialize=False,
        )

        # ForeignObject is being bent a little here. The field stands for a
        # relation from pk to the object_id field, but the join is produced
        # in reverse along the foreign key; so from_fields holds object_id,
        # and to_fields is filled with the pk later because of that reverse
        # join.
        super().__init__(to, from_fields=[object_id_field], to_fields=[], **kwargs)

        self.object_id_field_name = object_id_field
        self.content_type_field_name = content_type_field
        self.for_concrete_model = for_concrete_model

    def check(self, **kwargs):
        """Parent checks followed by the GenericForeignKey existence check."""
        messages = list(super().check(**kwargs))
        messages.extend(self._check_generic_foreign_key_existence())
        return messages

    def _is_matching_generic_foreign_key(self, field):
        """
        Tell whether ``field`` is a GenericForeignKey that uses the same
        content type and object id field names as this GenericRelation.
        """
        if not isinstance(field, GenericForeignKey):
            return False
        return (
            field.ct_field == self.content_type_field_name
            and field.fk_field == self.object_id_field_name
        )

    def _check_generic_foreign_key_existence(self):
        """
        Report contenttypes.E004 when the target model has no matching
        GenericForeignKey. Targets that are not model classes are skipped.
        """
        target = self.remote_field.model
        if not isinstance(target, ModelBase):
            return []
        candidates = target._meta.private_fields
        if any(self._is_matching_generic_foreign_key(f) for f in candidates):
            return []
        return [
            checks.Error(
                "The GenericRelation defines a relation with the model "
                "'%s', but that model does not have a GenericForeignKey."
                % target._meta.label,
                obj=self,
                id="contenttypes.E004",
            )
        ]

    def resolve_related_fields(self):
        """Pair the remote object-id field with this model's primary key."""
        self.to_fields = [self.model._meta.pk.name]
        remote_object_id = self.remote_field.model._meta.get_field(
            self.object_id_field_name
        )
        return [(remote_object_id, self.model._meta.pk)]

    def _get_path_info_with_parent(self, filtered_relation):
        """
        Build the join path for the case where the object-id field lives on a
        parent of the related model: first a join to that parent, then joins
        down to the child.
        """
        # Example: ChildTag inherits from Tag, Tag holds the
        # GenericForeignKey, and TaggedItem has a GenericRelation to ChildTag.
        # The path is TaggedItem -> Tag (Tag.object_id == TaggedItem.pk)
        # followed by Tag -> ChildTag, which is where the relation points.
        child_opts = self.remote_field.model._meta.concrete_model._meta
        parent_opts = child_opts.get_field(self.object_id_field_name).model._meta
        joins = [
            PathInfo(
                from_opts=self.model._meta,
                to_opts=parent_opts,
                target_fields=(parent_opts.pk,),
                join_field=self.remote_field,
                m2m=True,
                direct=False,
                filtered_relation=filtered_relation,
            )
        ]
        # The parent -> child links are easiest to find by walking from the
        # child up to the parent and then replaying them backwards through
        # each link's remote_field.get_path_info().
        upward_links = []
        cursor = child_opts
        while parent_opts != cursor:
            link = cursor.get_ancestor_link(parent_opts.model)
            upward_links.append(link)
            cursor = link.remote_field.model._meta
        for link in reversed(upward_links):
            joins.extend(link.remote_field.get_path_info())
        return joins

    def get_path_info(self, filtered_relation=None):
        """Return the join path from this model to the related model."""
        opts = self.remote_field.model._meta
        object_id_field = opts.get_field(self.object_id_field_name)
        if object_id_field.model != opts.model:
            # The object-id field is declared on another model.
            return self._get_path_info_with_parent(filtered_relation)
        return [
            PathInfo(
                from_opts=self.model._meta,
                to_opts=opts,
                target_fields=(opts.pk,),
                join_field=self.remote_field,
                m2m=True,
                direct=False,
                filtered_relation=filtered_relation,
            )
        ]

    def get_reverse_path_info(self, filtered_relation=None):
        """Return the join path from the related model back to this model."""
        own_opts = self.model._meta
        related_opts = self.remote_field.model._meta
        step = PathInfo(
            from_opts=related_opts,
            to_opts=own_opts,
            target_fields=(own_opts.pk,),
            join_field=self,
            m2m=not self.unique,
            direct=False,
            filtered_relation=filtered_relation,
        )
        return [step]

    def value_to_string(self, obj):
        """Render the primary keys of the related objects as a list string."""
        related = getattr(obj, self.name).all()
        return str([instance.pk for instance in related])

    def contribute_to_class(self, cls, name, **kwargs):
        """
        Attach the field to ``cls`` as a private field, install the reverse
        descriptor, and schedule creation of the order accessors.
        """
        kwargs["private_only"] = True
        super().contribute_to_class(cls, name, **kwargs)
        self.model = cls
        # A field inherited through multi-table inheritance gets no reverse
        # relation of its own; the reverse relation points to the base
        # model's field.
        if self.mti_inherited:
            self.remote_field.related_name = "+"
            self.remote_field.related_query_name = None
        setattr(cls, self.name, ReverseGenericManyToOneDescriptor(self.remote_field))

        if cls._meta.abstract:
            return

        # When the model at the other end is ordered with respect to its
        # matching GenericForeignKey, give this field's model the
        # get_RELATED_order() and set_RELATED_order() accessors.
        def install_order_accessors(related_model, model):
            if self._is_matching_generic_foreign_key(
                model._meta.order_with_respect_to
            ):
                make_foreign_order_accessors(model, related_model)

        lazy_related_operation(
            install_order_accessors,
            self.model,
            self.remote_field.model,
        )

    def set_attributes_from_rel(self):
        """Intentionally a no-op."""
        pass

    def get_internal_type(self):
        return "ManyToManyField"

    def get_content_type(self):
        """
        Look up the ContentType of the model this field is declared on.
        """
        return ContentType.objects.get_for_model(
            self.model, for_concrete_model=self.for_concrete_model
        )

    def get_extra_restriction(self, alias, remote_alias):
        """
        Return a WhereNode limiting the join to rows whose content type
        column equals this field's content type.
        """
        ct_field = self.remote_field.model._meta.get_field(
            self.content_type_field_name
        )
        contenttype_pk = self.get_content_type().pk
        exact_lookup_cls = ct_field.get_lookup("exact")
        condition = exact_lookup_cls(ct_field.get_col(remote_alias), contenttype_pk)
        return WhereNode([condition], connector=AND)

    def bulk_related_objects(self, objs, using=DEFAULT_DB_ALIAS):
        """
        Return every object linked to any of ``objs`` through this
        ``GenericRelation``.
        """
        manager = self.remote_field.model._base_manager.db_manager(using)
        content_type_pk = (
            ContentType.objects.db_manager(using)
            .get_for_model(self.model, for_concrete_model=self.for_concrete_model)
            .pk
        )
        object_pks = [obj.pk for obj in objs]
        return manager.filter(
            **{
                "%s__pk" % self.content_type_field_name: content_type_pk,
                "%s__in" % self.object_id_field_name: object_pks,
            }
        )
class ReverseGenericManyToOneDescriptor(ReverseManyToOneDescriptor):
    """
    Descriptor giving access to the related-objects manager of the
    one-to-many relation that a GenericRelation sets up.

    Given::

        class Post(Model):
            comments = GenericRelation(Comment)

    ``post.comments`` is a ReverseGenericManyToOneDescriptor instance.
    """

    @cached_property
    def related_manager_cls(self):
        # Derive the manager class from the related model's default manager.
        default_manager_cls = self.rel.model._default_manager.__class__
        return create_generic_related_manager(default_manager_cls, self.rel)
def create_generic_related_manager(superclass, rel):
    """
    Build and return a manager class that derives from ``superclass``
    (usually the default manager class of a model) and layers the behavior
    needed for the generic relation ``rel`` on top of it.
    """

    class GenericRelatedObjectManager(superclass):
        def __init__(self, instance=None):
            super().__init__()

            self.instance = instance
            self.model = rel.model

            ct_manager = ContentType.objects.db_manager(instance._state.db)
            self.get_content_type = functools.partial(
                ct_manager.get_for_model,
                for_concrete_model=rel.field.for_concrete_model,
            )
            self.content_type = self.get_content_type(instance)
            self.content_type_field_name = rel.field.content_type_field_name
            self.object_id_field_name = rel.field.object_id_field_name
            self.prefetch_cache_name = rel.field.attname
            self.pk_val = instance.pk

            # Filters that pin every queryset to the bound instance.
            self.core_filters = {
                "%s__pk" % self.content_type_field_name: self.content_type.id,
                self.object_id_field_name: self.pk_val,
            }

        def __call__(self, *, manager):
            """Return an equivalent manager built on the model's ``manager``."""
            named_manager = getattr(self.model, manager)
            manager_cls = create_generic_related_manager(
                named_manager.__class__, rel
            )
            return manager_cls(instance=self.instance)

        do_not_call_in_templates = True

        def __str__(self):
            return repr(self)

        def _apply_rel_filters(self, queryset):
            """
            Restrict ``queryset`` to the objects of the bound instance.
            """
            db = self._db or router.db_for_read(self.model, instance=self.instance)
            return queryset.using(db).filter(**self.core_filters)

        def _remove_prefetched_objects(self):
            """Drop this relation's entry from the instance prefetch cache."""
            try:
                self.instance._prefetched_objects_cache.pop(self.prefetch_cache_name)
            except (AttributeError, KeyError):
                # No cache, or no entry for this relation: nothing to do.
                pass

        def get_queryset(self):
            """Return the prefetched result if present, else a filtered queryset."""
            try:
                return self.instance._prefetched_objects_cache[self.prefetch_cache_name]
            except (AttributeError, KeyError):
                return self._apply_rel_filters(super().get_queryset())

        def get_prefetch_queryset(self, instances, queryset=None):
            """
            Build the queryset and the key functions used to prefetch this
            relation for all of ``instances`` at once.
            """
            if queryset is None:
                queryset = super().get_queryset()

            queryset._add_hints(instance=instances[0])
            queryset = queryset.using(queryset._db or self._db)

            def content_type_pk(obj):
                return self.get_content_type(obj).pk

            # One Q object per content type, covering that type's instances.
            per_type_conditions = []
            grouped = itertools.groupby(
                sorted(instances, key=content_type_pk), content_type_pk
            )
            for content_type_id, objs in grouped:
                per_type_conditions.append(
                    models.Q(
                        (f"{self.content_type_field_name}__pk", content_type_id),
                        (f"{self.object_id_field_name}__in", {obj.pk for obj in objs}),
                    )
                )
            query = models.Q(*per_type_conditions, _connector=models.Q.OR)

            # Object ids may need converting to the type of the instances' PK
            # before they can be matched up with the instances.
            object_id_converter = instances[0]._meta.pk.to_python
            content_type_id_field_name = "%s_id" % self.content_type_field_name

            def related_object_key(relobj):
                return (
                    object_id_converter(getattr(relobj, self.object_id_field_name)),
                    getattr(relobj, content_type_id_field_name),
                )

            def instance_key(obj):
                return (obj.pk, self.get_content_type(obj).pk)

            return (
                queryset.filter(query),
                related_object_key,
                instance_key,
                False,
                self.prefetch_cache_name,
                False,
            )

        def add(self, *objs, bulk=True):
            """
            Point each of ``objs`` at the bound instance. With ``bulk`` the
            rows are updated in one query and must already be saved on the
            write database; otherwise each object is saved individually.
            """
            self._remove_prefetched_objects()
            db = router.db_for_write(self.model, instance=self.instance)

            def check_and_update_obj(obj):
                if not isinstance(obj, self.model):
                    raise TypeError(
                        "'%s' instance expected, got %r"
                        % (self.model._meta.object_name, obj)
                    )
                setattr(obj, self.content_type_field_name, self.content_type)
                setattr(obj, self.object_id_field_name, self.pk_val)

            if not bulk:
                with transaction.atomic(using=db, savepoint=False):
                    for obj in objs:
                        check_and_update_obj(obj)
                        obj.save()
                return

            pks = []
            for obj in objs:
                if obj._state.adding or obj._state.db != db:
                    raise ValueError(
                        "%r instance isn't saved. Use bulk=False or save "
                        "the object first." % obj
                    )
                check_and_update_obj(obj)
                pks.append(obj.pk)

            new_values = {
                self.content_type_field_name: self.content_type,
                self.object_id_field_name: self.pk_val,
            }
            self.model._base_manager.using(db).filter(pk__in=pks).update(**new_values)

        add.alters_data = True

        def remove(self, *objs, bulk=True):
            """Delete the given related objects; no-op when none are given."""
            if not objs:
                return
            doomed = self.filter(pk__in=[o.pk for o in objs])
            self._clear(doomed, bulk)

        remove.alters_data = True

        def clear(self, *, bulk=True):
            """Delete every object related to the bound instance."""
            self._clear(self, bulk)

        clear.alters_data = True

        def _clear(self, queryset, bulk):
            """Delete ``queryset`` in one call (bulk) or object by object."""
            self._remove_prefetched_objects()
            db = router.db_for_write(self.model, instance=self.instance)
            queryset = queryset.using(db)
            if not bulk:
                with transaction.atomic(using=db, savepoint=False):
                    for obj in queryset:
                        obj.delete()
                return
            # `QuerySet.delete()` opens its own atomic block, which also
            # wraps the `pre_delete` and `post_delete` signal handlers.
            queryset.delete()

        _clear.alters_data = True

        def set(self, objs, *, bulk=True, clear=False):
            """
            Make ``objs`` the complete set of related objects, either by
            clearing first or by removing and adding only the differences.
            """
            # Evaluate `objs` up front: it may be a queryset whose contents
            # `manager.clear()` would change. Refs #19816.
            objs = tuple(objs)

            db = router.db_for_write(self.model, instance=self.instance)
            with transaction.atomic(using=db, savepoint=False):
                if clear:
                    self.clear()
                    self.add(*objs, bulk=bulk)
                    return

                stale = set(self.using(db).all())
                missing = []
                for obj in objs:
                    try:
                        stale.remove(obj)
                    except KeyError:
                        missing.append(obj)

                self.remove(*stale)
                self.add(*missing, bulk=bulk)

        set.alters_data = True

        def create(self, **kwargs):
            """Create a related object bound to the instance."""
            self._remove_prefetched_objects()
            kwargs[self.content_type_field_name] = self.content_type
            kwargs[self.object_id_field_name] = self.pk_val
            write_db = router.db_for_write(self.model, instance=self.instance)
            return super().using(write_db).create(**kwargs)

        create.alters_data = True

        def get_or_create(self, **kwargs):
            """``get_or_create`` restricted to the bound instance."""
            kwargs[self.content_type_field_name] = self.content_type
            kwargs[self.object_id_field_name] = self.pk_val
            write_db = router.db_for_write(self.model, instance=self.instance)
            return super().using(write_db).get_or_create(**kwargs)

        get_or_create.alters_data = True

        def update_or_create(self, **kwargs):
            """``update_or_create`` restricted to the bound instance."""
            kwargs[self.content_type_field_name] = self.content_type
            kwargs[self.object_id_field_name] = self.pk_val
            write_db = router.db_for_write(self.model, instance=self.instance)
            return super().using(write_db).update_or_create(**kwargs)

        update_or_create.alters_data = True

    return GenericRelatedObjectManager