Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/query_utils.py: 56%
178 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Various data structures used in query construction.
4Factored out from django.db.models.query to avoid making the main module very
5large and/or so that they can be used by other modules without getting into
6circular import difficulties.
7"""
8import copy
9import functools
10import inspect
11from collections import namedtuple
13from django.core.exceptions import FieldError
14from django.db.models.constants import LOOKUP_SEP
15from django.utils import tree
# PathInfo is used when converting lookups (fk__somecol). Its contents
# describe the relation in Model terms (model Options and Fields for both
# sides of the relation). The join_field is the field backing the relation.
PathInfo = namedtuple(
    "PathInfo",
    [
        "from_opts",
        "to_opts",
        "target_fields",
        "join_field",
        "m2m",
        "direct",
        "filtered_relation",
    ],
)
def subclasses(cls):
    """
    Yield ``cls`` followed by all of its direct and indirect subclasses.

    Classes are produced depth-first: a class is yielded before its own
    subclasses, and siblings come out in the order reported by
    ``__subclasses__()``.
    """
    pending = [cls]
    while pending:
        current = pending.pop()
        yield current
        # Push children reversed so the first child is popped (and therefore
        # yielded) first.
        pending.extend(reversed(current.__subclasses__()))
class Q(tree.Node):
    """
    Wrap filters in an object so that several of them can be combined
    logically (using `&` and `|`) or negated (using `~`).
    """

    # Connection types
    AND = "AND"
    OR = "OR"
    default = AND
    conditional = True

    def __init__(self, *args, _connector=None, _negated=False, **kwargs):
        # Positional children keep their order; keyword filters follow,
        # sorted by (name, value) pair.
        children = list(args)
        children.extend(sorted(kwargs.items()))
        super().__init__(children=children, connector=_connector, negated=_negated)

    def _combine(self, other, conn):
        """
        Return a new object joining ``self`` and ``other`` with connector
        ``conn``. Raise TypeError when ``other`` is neither a Q nor an object
        whose ``conditional`` attribute is exactly True.
        """
        other_is_q = isinstance(other, Q)
        if not other_is_q and getattr(other, "conditional", False) is not True:
            raise TypeError(other)

        if not self:
            # Nothing on this side: hand back a copy of the other operand.
            if hasattr(other, "copy"):
                return other.copy()
            return copy.copy(other)
        if other_is_q and not other:
            # Nothing on the other side: rebuild this object from its
            # deconstructed form.
            _, args, kwargs = self.deconstruct()
            return type(self)(*args, **kwargs)

        combined = type(self)()
        combined.connector = conn
        combined.add(self, conn)
        combined.add(other, conn)
        return combined

    def __or__(self, other):
        return self._combine(other, self.OR)

    def __and__(self, other):
        return self._combine(other, self.AND)

    def __invert__(self):
        inverted = type(self)()
        inverted.add(self, self.AND)
        inverted.negate()
        return inverted

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        # We must promote any new joins to left outer joins so that when Q is
        # used as an expression, rows aren't filtered due to joins.
        clause, joins = query._add_q(
            self,
            reuse,
            allow_joins=allow_joins,
            split_subq=False,
            check_filterable=False,
        )
        query.promote_joins(joins)
        return clause

    def deconstruct(self):
        """
        Return a ``(path, args, kwargs)`` triple describing this object.

        ``_connector`` and ``_negated`` are included in ``kwargs`` only when
        they differ from the defaults.
        """
        klass = self.__class__
        path = "%s.%s" % (klass.__module__, klass.__name__)
        if path.startswith("django.db.models.query_utils"):
            path = path.replace("django.db.models.query_utils", "django.db.models")
        kwargs = {}
        if self.connector != self.default:
            kwargs["_connector"] = self.connector
        if self.negated:
            kwargs["_negated"] = True
        return path, tuple(self.children), kwargs
class DeferredAttribute:
    """
    Descriptor standing in for a field whose loading was deferred. The value
    is fetched the first time it is read from an instance.
    """

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, cls=None):
        """
        Return the field's value for ``instance``, loading and caching it in
        the instance ``__dict__`` on first access. Accessed on the class
        (``instance`` is None), return the descriptor itself.
        """
        if instance is None:
            return self
        cache = instance.__dict__
        attname = self.field.attname
        if attname in cache:
            return cache[attname]
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        inherited = self._check_parent_chain(instance)
        if inherited is None:
            instance.refresh_from_db(fields=[attname])
        else:
            cache[attname] = inherited
        return cache[attname]

    def _check_parent_chain(self, instance):
        """
        Return the value of the ancestor link field already present on
        ``instance`` when this field is a primary key other than that link
        field; otherwise return None.
        """
        link_field = instance._meta.get_ancestor_link(self.field.model)
        reusable = self.field.primary_key and self.field != link_field
        return getattr(instance, link_field.attname) if reusable else None
class RegisterLookupMixin:
    """
    Mixin that lets classes register lookups/transforms by name and resolve
    them later, taking every class in the MRO into account.
    """

    @classmethod
    def _get_lookup(cls, lookup_name):
        """Return the entry registered under ``lookup_name``, or None."""
        return cls.get_lookups().get(lookup_name, None)

    @classmethod
    @functools.lru_cache(maxsize=None)
    def get_lookups(cls):
        """
        Return a dict merging the ``class_lookups`` of every class in the MRO
        of ``cls``; entries from classes earlier in the MRO win. The result is
        memoized, so any change to ``class_lookups`` must be followed by
        ``_clear_cached_lookups()``.
        """
        class_lookups = [
            parent.__dict__.get("class_lookups", {}) for parent in inspect.getmro(cls)
        ]
        return cls.merge_dicts(class_lookups)

    def get_lookup(self, lookup_name):
        """
        Return the Lookup subclass registered as ``lookup_name``. Delegate to
        ``self.output_field`` when nothing is registered here and such an
        attribute exists; return None when the entry is not a Lookup.
        """
        from django.db.models.lookups import Lookup

        found = self._get_lookup(lookup_name)
        if found is None and hasattr(self, "output_field"):
            return self.output_field.get_lookup(lookup_name)
        if found is not None and not issubclass(found, Lookup):
            return None
        return found

    def get_transform(self, lookup_name):
        """
        Return the Transform subclass registered as ``lookup_name``. Delegate
        to ``self.output_field`` when nothing is registered here and such an
        attribute exists; return None when the entry is not a Transform.
        """
        from django.db.models.lookups import Transform

        found = self._get_lookup(lookup_name)
        if found is None and hasattr(self, "output_field"):
            return self.output_field.get_transform(lookup_name)
        if found is not None and not issubclass(found, Transform):
            return None
        return found

    @staticmethod
    def merge_dicts(dicts):
        """
        Merge dicts in reverse to preference the order of the original list. e.g.,
        merge_dicts([a, b]) will preference the keys in 'a' over those in 'b'.
        """
        merged = {}
        for d in reversed(dicts):
            merged.update(d)
        return merged

    @classmethod
    def _clear_cached_lookups(cls):
        """Clear the ``get_lookups()`` cache for ``cls`` and its subclasses."""
        for subclass in subclasses(cls):
            subclass.get_lookups.cache_clear()

    @classmethod
    def register_lookup(cls, lookup, lookup_name=None):
        """
        Register ``lookup`` on ``cls`` under ``lookup_name`` (defaulting to
        ``lookup.lookup_name``) and return it, so the method can also be used
        as a decorator.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        # Give this class its own registry instead of mutating an inherited one.
        if "class_lookups" not in cls.__dict__:
            cls.class_lookups = {}
        cls.class_lookups[lookup_name] = lookup
        cls._clear_cached_lookups()
        return lookup

    @classmethod
    def _unregister_lookup(cls, lookup, lookup_name=None):
        """
        Remove given lookup from cls lookups. For use in tests only as it's
        not thread-safe.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        del cls.class_lookups[lookup_name]
        # get_lookups() is memoized; without this the removed lookup would
        # still be returned from the stale cache.
        cls._clear_cached_lookups()
def select_related_descend(field, restricted, requested, load_fields, reverse=False):
    """
    Decide whether select_related() should follow ``field`` one level deeper.
    Used by both the query construction code
    (sql.query.fill_related_selections()) and the model instance creation code
    (query.get_klass_info()).

    Arguments:
     * field - the field to be checked
     * restricted - a boolean, indicating if the field list has been
       manually restricted using a requested clause
     * requested - the select_related() dictionary
     * load_fields - the set of fields to be loaded on this model
     * reverse - boolean, True if we are checking a reverse select related

    Raise FieldError when a field explicitly requested for traversal is
    missing from ``load_fields`` (i.e. it is deferred).
    """
    if not field.remote_field:
        return False
    if field.remote_field.parent_link and not reverse:
        return False
    if restricted:
        # Reverse relations are requested by their related query name,
        # forward ones by the field name.
        key = field.related_query_name() if reverse else field.name
        if key not in requested:
            return False
    elif field.null:
        return False
    if (
        load_fields
        and field.attname not in load_fields
        and restricted
        and field.name in requested
    ):
        msg = (
            "Field %s.%s cannot be both deferred and traversed using "
            "select_related at the same time."
        ) % (field.model._meta.object_name, field.name)
        raise FieldError(msg)
    return True
def refs_expression(lookup_parts, annotations):
    """
    Look for a reference to one of the given annotations in lookup_parts.

    Annotation names may themselves contain LOOKUP_SEP, so every prefix of
    lookup_parts is joined and tried, shortest first. Return the matching
    (truthy) annotation together with the remaining lookup parts, or
    ``(False, ())`` when no prefix matches.
    """
    for end in range(1, len(lookup_parts) + 1):
        candidate = LOOKUP_SEP.join(lookup_parts[:end])
        if candidate not in annotations:
            continue
        annotation = annotations[candidate]
        if annotation:
            return annotation, lookup_parts[end:]
    return False, ()
def check_rel_lookup_compatibility(model, target_opts, field):
    """
    Check that model is compatible with target_opts. Compatibility
    is OK if:
      1) model and opts match (where proxy inheritance is removed)
      2) model is parent of opts' model or the other way around
    """
    model_opts = model._meta

    def compatible_with(opts):
        if model_opts.concrete_model == opts.concrete_model:
            return True
        if opts.concrete_model in model_opts.get_parent_list():
            return True
        return model in opts.get_parent_list()

    # If the field is a primary key, then doing a query against the field's
    # model is ok, too. Consider the case:
    # class Restaurant(models.Model):
    #     place = OneToOneField(Place, primary_key=True):
    # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
    # If we didn't have the primary key check, then pk__in (== place__in) would
    # give Place's opts as the target opts, but Restaurant isn't compatible
    # with that. This logic applies only to primary keys, as when doing __in=qs,
    # we are going to turn this into __in=qs.values('pk') later on.
    verdict = compatible_with(target_opts)
    if not verdict:
        verdict = getattr(field, "primary_key", False) and compatible_with(
            field.model._meta
        )
    return verdict
class FilteredRelation:
    """Specify custom filtering in the ON clause of SQL joins."""

    def __init__(self, relation_name, *, condition=Q()):
        # Validate both arguments up front; relation_name is checked first.
        if not relation_name:
            raise ValueError("relation_name cannot be empty.")
        if not isinstance(condition, Q):
            raise ValueError("condition argument must be a Q() instance.")
        self.relation_name = relation_name
        self.condition = condition
        self.alias = None
        self.path = []

    def __eq__(self, other):
        """Equal when relation name, alias and condition all match."""
        if isinstance(other, self.__class__):
            return (
                self.relation_name == other.relation_name
                and self.alias == other.alias
                and self.condition == other.condition
            )
        return NotImplemented

    def clone(self):
        """Return a new FilteredRelation with the same state; path is copied."""
        duplicate = FilteredRelation(self.relation_name, condition=self.condition)
        duplicate.alias = self.alias
        duplicate.path = self.path[:]
        return duplicate

    def resolve_expression(self, *args, **kwargs):
        """
        QuerySet.annotate() only accepts expression-like arguments
        (with a resolve_expression() method), so the method exists but is
        never meant to be called.
        """
        raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")

    def as_sql(self, compiler, connection):
        # Resolve the condition in Join.filtered_relation.
        where = compiler.query.build_filtered_relation_q(
            self.condition, reuse=set(self.path)
        )
        return compiler.compile(where)