Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/query_utils.py: 56%

178 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2Various data structures used in query construction. 

3 

4Factored out from django.db.models.query to avoid making the main module very 

5large and/or so that they can be used by other modules without getting into 

6circular import difficulties. 

7""" 

8import copy 

9import functools 

10import inspect 

11from collections import namedtuple 

12 

13from django.core.exceptions import FieldError 

14from django.db.models.constants import LOOKUP_SEP 

15from django.utils import tree 

16 

# PathInfo is used when converting lookups (fk__somecol). Its contents
# describe the relation in Model terms (the model Options and Fields for both
# sides of the relation). The join_field is the field backing the relation.
PathInfo = namedtuple(
    "PathInfo",
    [
        "from_opts",
        "to_opts",
        "target_fields",
        "join_field",
        "m2m",
        "direct",
        "filtered_relation",
    ],
)

24 

25 

def subclasses(cls):
    """
    Yield `cls` followed by every class reachable through __subclasses__(),
    in depth-first pre-order. A class reachable along several inheritance
    paths is yielded once per path.
    """
    pending = [cls]
    while pending:
        current = pending.pop()
        yield current
        # Push the children reversed so that they are popped (and therefore
        # yielded) in the order __subclasses__() reported them.
        pending.extend(reversed(current.__subclasses__()))

30 

31 

class Q(tree.Node):
    """
    Encapsulate filters as objects that can then be combined logically (using
    `&` and `|`).
    """

    # Connection types
    AND = "AND"
    OR = "OR"
    default = AND
    conditional = True

    def __init__(self, *args, _connector=None, _negated=False, **kwargs):
        # Positional children come first, followed by the keyword filters as
        # (name, value) pairs in sorted order.
        children = list(args)
        children.extend(sorted(kwargs.items()))
        super().__init__(children=children, connector=_connector, negated=_negated)

    def _combine(self, other, conn):
        """
        Return a new object joining `self` and `other` with connector `conn`.

        Raise TypeError when `other` is neither a Q nor an object whose
        `conditional` attribute is exactly True.
        """
        other_is_q = isinstance(other, Q)
        if not other_is_q and getattr(other, "conditional", False) is not True:
            raise TypeError(other)

        if not self:
            # Nothing on this side: the result is just a copy of `other`.
            if hasattr(other, "copy"):
                return other.copy()
            return copy.copy(other)
        if other_is_q and not other:
            # Nothing on the other side: rebuild this object from its
            # deconstructed form.
            _, args, kwargs = self.deconstruct()
            return type(self)(*args, **kwargs)

        combined = type(self)()
        combined.connector = conn
        combined.add(self, conn)
        combined.add(other, conn)
        return combined

    def __or__(self, other):
        return self._combine(other, self.OR)

    def __and__(self, other):
        return self._combine(other, self.AND)

    def __invert__(self):
        # Wrap this object in a fresh node and negate the wrapper.
        inverted = type(self)()
        inverted.add(self, self.AND)
        inverted.negate()
        return inverted

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        # We must promote any new joins to left outer joins so that when Q is
        # used as an expression, rows aren't filtered due to joins.
        clause, joins = query._add_q(
            self,
            reuse,
            allow_joins=allow_joins,
            split_subq=False,
            check_filterable=False,
        )
        query.promote_joins(joins)
        return clause

    def deconstruct(self):
        """
        Return a (path, args, kwargs) triple describing this object.

        `args` is a tuple of the children; `kwargs` carries `_connector`
        and/or `_negated` only when they differ from the defaults.
        """
        klass = self.__class__
        path = "%s.%s" % (klass.__module__, klass.__name__)
        if path.startswith("django.db.models.query_utils"):
            path = path.replace("django.db.models.query_utils", "django.db.models")
        kwargs = {}
        if self.connector != self.default:
            kwargs["_connector"] = self.connector
        if self.negated:
            kwargs["_negated"] = True
        return path, tuple(self.children), kwargs

105 

106 

class DeferredAttribute:
    """
    A wrapper for a deferred-loading field. When the value is read from this
    object the first time, the query is executed.
    """

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, cls=None):
        """
        Return the field's value for `instance`, loading and caching it in
        the instance's __dict__ on first access. Accessed on the class
        (instance is None), return the descriptor itself.
        """
        if instance is None:
            return self
        attname = self.field.attname
        values = instance.__dict__
        if attname in values:
            return values[attname]
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        inherited = self._check_parent_chain(instance)
        if inherited is None:
            instance.refresh_from_db(fields=[attname])
        else:
            values[attname] = inherited
        return values[attname]

    def _check_parent_chain(self, instance):
        """
        Return the value of the ancestor link attribute on `instance` when
        the wrapped field is a primary key other than the link field itself;
        otherwise return None.
        """
        field = self.field
        link_field = instance._meta.get_ancestor_link(field.model)
        reusable = field.primary_key and field != link_field
        return getattr(instance, link_field.attname) if reusable else None

146 

147 

class RegisterLookupMixin:
    """
    Mixin that lets a class carry a registry of named lookups.

    Each class stores its own entries in a `class_lookups` dict; the
    effective registry for a class is the merge of those dicts along its MRO,
    with entries from earlier classes in the MRO taking precedence.
    """

    @classmethod
    def _get_lookup(cls, lookup_name):
        """Return the registered entry for `lookup_name`, or None."""
        return cls.get_lookups().get(lookup_name, None)

    @classmethod
    @functools.lru_cache(maxsize=None)
    def get_lookups(cls):
        """
        Return the merged `class_lookups` of every class in cls's MRO.

        The result is memoized; register_lookup() and _unregister_lookup()
        clear the cache so that changes become visible.
        """
        class_lookups = [
            parent.__dict__.get("class_lookups", {}) for parent in inspect.getmro(cls)
        ]
        return cls.merge_dicts(class_lookups)

    def get_lookup(self, lookup_name):
        """
        Return the Lookup subclass registered as `lookup_name`.

        Fall back to `self.output_field` when nothing is registered here and
        that attribute exists. Return None when the registered entry is not a
        Lookup subclass.
        """
        from django.db.models.lookups import Lookup

        found = self._get_lookup(lookup_name)
        if found is None and hasattr(self, "output_field"):
            return self.output_field.get_lookup(lookup_name)
        if found is not None and not issubclass(found, Lookup):
            return None
        return found

    def get_transform(self, lookup_name):
        """
        Return the Transform subclass registered as `lookup_name`.

        Fall back to `self.output_field` when nothing is registered here and
        that attribute exists. Return None when the registered entry is not a
        Transform subclass.
        """
        from django.db.models.lookups import Transform

        found = self._get_lookup(lookup_name)
        if found is None and hasattr(self, "output_field"):
            return self.output_field.get_transform(lookup_name)
        if found is not None and not issubclass(found, Transform):
            return None
        return found

    @staticmethod
    def merge_dicts(dicts):
        """
        Merge dicts in reverse to preference the order of the original list. e.g.,
        merge_dicts([a, b]) will preference the keys in 'a' over those in 'b'.
        """
        merged = {}
        for d in reversed(dicts):
            merged.update(d)
        return merged

    @classmethod
    def _clear_cached_lookups(cls):
        """Clear the get_lookups() cache for cls and all of its subclasses."""
        for subclass in subclasses(cls):
            subclass.get_lookups.cache_clear()

    @classmethod
    def register_lookup(cls, lookup, lookup_name=None):
        """
        Register `lookup` on cls under `lookup_name` (defaulting to
        `lookup.lookup_name`) and return `lookup`.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        # Give cls its own dict so that the registration doesn't land in a
        # dict inherited from a parent class.
        if "class_lookups" not in cls.__dict__:
            cls.class_lookups = {}
        cls.class_lookups[lookup_name] = lookup
        cls._clear_cached_lookups()
        return lookup

    @classmethod
    def _unregister_lookup(cls, lookup, lookup_name=None):
        """
        Remove given lookup from cls lookups. For use in tests only as it's
        not thread-safe.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        del cls.class_lookups[lookup_name]
        # get_lookups() is memoized: without this, the removed lookup would
        # still be served from the cache. Mirrors register_lookup().
        cls._clear_cached_lookups()

216 

217 

def select_related_descend(field, restricted, requested, load_fields, reverse=False):
    """
    Decide whether select_related() should follow `field` one level deeper.
    Used by both the query construction code
    (sql.query.fill_related_selections()) and the model instance creation code
    (query.get_klass_info()).

    Arguments:
     * field - the field to be checked
     * restricted - a boolean, indicating if the field list has been
       manually restricted using a requested clause
     * requested - The select_related() dictionary.
     * load_fields - the set of fields to be loaded on this model
     * reverse - boolean, True if we are checking a reverse select related

    Return True to descend, False otherwise. Raise FieldError when the field
    is explicitly requested but absent from a non-empty `load_fields`.
    """
    remote = field.remote_field
    if not remote:
        return False
    if remote.parent_link and not reverse:
        return False
    if restricted:
        # Reverse relations are requested by their related query name,
        # forward relations by the field name.
        wanted = field.related_query_name() if reverse else field.name
        if wanted not in requested:
            return False
    elif field.null:
        return False
    if (
        load_fields
        and field.attname not in load_fields
        and restricted
        and field.name in requested
    ):
        raise FieldError(
            (
                "Field %s.%s cannot be both deferred and traversed using "
                "select_related at the same time."
            )
            % (field.model._meta.object_name, field.name)
        )
    return True

253 

254 

def refs_expression(lookup_parts, annotations):
    """
    Look for a reference to one of `annotations` at the start of
    `lookup_parts`.

    Because annotation names may themselves contain LOOKUP_SEP, every prefix
    of `lookup_parts` is tried, shortest first. Return a pair of the first
    truthy matching annotation and the remaining parts, or (False, ()) when
    no prefix matches.
    """
    total = len(lookup_parts)
    for split_at in range(1, total + 1):
        candidate = LOOKUP_SEP.join(lookup_parts[:split_at])
        if candidate not in annotations:
            continue
        annotation = annotations[candidate]
        if annotation:
            return annotation, lookup_parts[split_at:]
    return False, ()

266 

267 

def check_rel_lookup_compatibility(model, target_opts, field):
    """
    Check that `model` is compatible with `target_opts`. Compatibility
    is OK if:
      1) model and opts match (where proxy inheritance is removed)
      2) model is parent of opts' model or the other way around
    """
    model_opts = model._meta

    def is_compatible(opts):
        if model_opts.concrete_model == opts.concrete_model:
            return True
        if opts.concrete_model in model_opts.get_parent_list():
            return True
        return model in opts.get_parent_list()

    if is_compatible(target_opts):
        return True
    # If the field is a primary key, then doing a query against the field's
    # model is ok, too. Consider the case:
    # class Restaurant(models.Model):
    #     place = OneToOneField(Place, primary_key=True):
    # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
    # If we didn't have the primary key check, then pk__in (== place__in) would
    # give Place's opts as the target opts, but Restaurant isn't compatible
    # with that. This logic applies only to primary keys, as when doing __in=qs,
    # we are going to turn this into __in=qs.values('pk') later on.
    return getattr(field, "primary_key", False) and is_compatible(field.model._meta)

295 

296 

class FilteredRelation:
    """Specify custom filtering in the ON clause of SQL joins."""

    def __init__(self, relation_name, *, condition=Q()):
        """
        Store the relation name and its filtering condition.

        Raise ValueError when `relation_name` is empty or `condition` is not
        a Q instance.
        """
        if not relation_name:
            raise ValueError("relation_name cannot be empty.")
        self.relation_name = relation_name
        self.alias = None
        if not isinstance(condition, Q):
            raise ValueError("condition argument must be a Q() instance.")
        self.condition = condition
        self.path = []

    def __eq__(self, other):
        # Equal when relation name, alias and condition all match; `path`
        # takes no part in the comparison.
        if not isinstance(other, self.__class__):
            return NotImplemented
        if not self.relation_name == other.relation_name:
            return False
        if not self.alias == other.alias:
            return False
        return self.condition == other.condition

    def clone(self):
        """Return a new FilteredRelation with the same name, condition and
        alias, and its own copy of the path list."""
        duplicate = FilteredRelation(self.relation_name, condition=self.condition)
        duplicate.alias = self.alias
        duplicate.path = list(self.path)
        return duplicate

    def resolve_expression(self, *args, **kwargs):
        """
        QuerySet.annotate() only accepts expression-like arguments
        (with a resolve_expression() method).
        """
        raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")

    def as_sql(self, compiler, connection):
        # Resolve the condition in Join.filtered_relation.
        where = compiler.query.build_filtered_relation_q(
            self.condition, reuse=set(self.path)
        )
        return compiler.compile(where)