Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/polymorphic/query_translate.py: 44%

129 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2PolymorphicQuerySet support functions 

3""" 

4import copy 

5from collections import deque 

6 

7from django.apps import apps 

8from django.contrib.contenttypes.models import ContentType 

9from django.core.exceptions import FieldDoesNotExist, FieldError 

10from django.db import models 

11from django.db.models import Q 

12from django.db.models.fields.related import ForeignObjectRel, RelatedField 

13from django.db.utils import DEFAULT_DB_ALIAS 

14 

15# These functions implement the additional filter- and Q-object functionality. 

16# They form a kind of small framework for easily adding more 

17# functionality to filters and Q objects. 

18# Probably a more general queryset enhancement class could be made out of them. 

19from polymorphic import compat 

20 

21################################################################################### 

22# PolymorphicQuerySet support functions 

23 

24 

25def translate_polymorphic_filter_definitions_in_kwargs( 

26 queryset_model, kwargs, using=DEFAULT_DB_ALIAS 

27): 

28 """ 

29 Translate the keyword argument list for PolymorphicQuerySet.filter() 

30 

31 Any kwargs with special polymorphic functionality are replaced in the kwargs 

32 dict with their vanilla django equivalents. 

33 

34 For some kwargs a direct replacement is not possible, as a Q object is needed 

35 instead to implement the required functionality. In these cases the kwarg is 

36 deleted from the kwargs dict and a Q object is added to the return list. 

37 

38 Modifies: kwargs dict 

39 Returns: a list of non-keyword-arguments (Q objects) to be added to the filter() query. 

40 """ 

41 additional_args = [] 

42 for field_path, val in kwargs.copy().items(): # Python 3 needs copy 

43 

44 new_expr = _translate_polymorphic_filter_definition( 

45 queryset_model, field_path, val, using=using 

46 ) 

47 

48 if isinstance(new_expr, tuple): 48 ↛ 50line 48 didn't jump to line 50, because the condition on line 48 was never true

49 # replace kwargs element 

50 del kwargs[field_path] 

51 kwargs[new_expr[0]] = new_expr[1] 

52 

53 elif isinstance(new_expr, models.Q): 

54 del kwargs[field_path] 

55 additional_args.append(new_expr) 

56 

57 return additional_args 

58 

59 

60def translate_polymorphic_Q_object(queryset_model, potential_q_object, using=DEFAULT_DB_ALIAS): 

61 def tree_node_correct_field_specs(my_model, node): 

62 "process all children of this Q node" 

63 for i in range(len(node.children)): 

64 child = node.children[i] 

65 

66 if isinstance(child, (tuple, list)): 66 ↛ 76line 66 didn't jump to line 76, because the condition on line 66 was never false

67 # this Q object child is a tuple => a kwarg like Q( instance_of=ModelB ) 

68 key, val = child 

69 new_expr = _translate_polymorphic_filter_definition( 

70 my_model, key, val, using=using 

71 ) 

72 if new_expr: 72 ↛ 73line 72 didn't jump to line 73, because the condition on line 72 was never true

73 node.children[i] = new_expr 

74 else: 

75 # this Q object child is another Q object, recursively process this as well 

76 tree_node_correct_field_specs(my_model, child) 

77 

78 if isinstance(potential_q_object, models.Q): 78 ↛ 81line 78 didn't jump to line 81, because the condition on line 78 was never false

79 tree_node_correct_field_specs(queryset_model, potential_q_object) 

80 

81 return potential_q_object 

82 

83 

84def translate_polymorphic_filter_definitions_in_args(queryset_model, args, using=DEFAULT_DB_ALIAS): 

85 """ 

86 Translate the non-keyword argument list for PolymorphicQuerySet.filter() 

87 

88 In the args list, we return all kwargs to Q-objects that contain special 

89 polymorphic functionality with their vanilla django equivalents. 

90 We traverse the Q object tree for this (which is simple). 

91 

92 

93 Returns: modified Q objects 

94 """ 

95 return [ 

96 translate_polymorphic_Q_object(queryset_model, copy.deepcopy(q), using=using) for q in args 

97 ] 

98 

99 

100def _translate_polymorphic_filter_definition( 

101 queryset_model, field_path, field_val, using=DEFAULT_DB_ALIAS 

102): 

103 """ 

104 Translate a keyword argument (field_path=field_val), as used for 

105 PolymorphicQuerySet.filter()-like functions (and Q objects). 

106 

107 A kwarg with special polymorphic functionality is translated into 

108 its vanilla django equivalent, which is returned, either as tuple 

109 (field_path, field_val) or as Q object. 

110 

111 Returns: kwarg tuple or Q object or None (if no change is required) 

112 """ 

113 

114 # handle instance_of expressions or alternatively, 

115 # if this is a normal Django filter expression, return None 

116 if field_path == "instance_of": 

117 return create_instanceof_q(field_val, using=using) 

118 elif field_path == "not_instance_of": 118 ↛ 119line 118 didn't jump to line 119, because the condition on line 118 was never true

119 return create_instanceof_q(field_val, not_instance_of=True, using=using) 

120 elif "___" not in field_path: 120 ↛ 125line 120 didn't jump to line 125, because the condition on line 120 was never false

121 return None # no change 

122 

123 # filter expression contains '___' (i.e. filter for polymorphic field) 

124 # => get the model class specified in the filter expression 

125 newpath = translate_polymorphic_field_path(queryset_model, field_path) 

126 return (newpath, field_val) 

127 

128 

129def translate_polymorphic_field_path(queryset_model, field_path): 

130 """ 

131 Translate a field path from a keyword argument, as used for 

132 PolymorphicQuerySet.filter()-like functions (and Q objects). 

133 Supports leading '-' (for order_by args). 

134 

135 E.g.: if queryset_model is ModelA, then "ModelC___field3" is translated 

136 into modela__modelb__modelc__field3. 

137 Returns: translated path (unchanged, if no translation needed) 

138 """ 

139 if not isinstance(field_path, str): 139 ↛ 140line 139 didn't jump to line 140, because the condition on line 139 was never true

140 raise ValueError(f"Expected field name as string: {field_path}") 

141 

142 classname, sep, pure_field_path = field_path.partition("___") 

143 if not sep: 143 ↛ 145line 143 didn't jump to line 145, because the condition on line 143 was never false

144 return field_path 

145 assert classname, "PolymorphicModel: %s: bad field specification" % field_path 

146 

147 negated = False 

148 if classname[0] == "-": 

149 negated = True 

150 classname = classname.lstrip("-") 

151 

152 if "__" in classname: 

153 # the user has app label prepended to class name via __ => use Django's get_model function 

154 appname, sep, classname = classname.partition("__") 

155 model = apps.get_model(appname, classname) 

156 assert model, "PolymorphicModel: model {} (in app {}) not found!".format( 

157 model.__name__, 

158 appname, 

159 ) 

160 if not issubclass(model, queryset_model): 

161 e = ( 

162 'PolymorphicModel: queryset filter error: "' 

163 + model.__name__ 

164 + '" is not derived from "' 

165 + queryset_model.__name__ 

166 + '"' 

167 ) 

168 raise AssertionError(e) 

169 

170 else: 

171 # the user has only given us the class name via ___ 

172 # => select the model from the sub models of the queryset base model 

173 

174 # Test whether it's actually a regular relation__ _fieldname (the field starting with an _) 

175 # so no tripple ClassName___field was intended. 

176 try: 

177 # This also retreives M2M relations now (including reverse foreign key relations) 

178 field = queryset_model._meta.get_field(classname) 

179 

180 if isinstance(field, (RelatedField, ForeignObjectRel)): 

181 # Can also test whether the field exists in the related object to avoid ambiguity between 

182 # class names and field names, but that never happens when your class names are in CamelCase. 

183 return field_path # No exception raised, field does exist. 

184 except FieldDoesNotExist: 

185 pass 

186 

187 submodels = _get_all_sub_models(queryset_model) 

188 model = submodels.get(classname, None) 

189 assert model, "PolymorphicModel: model {} not found (not a subclass of {})!".format( 

190 classname, 

191 queryset_model.__name__, 

192 ) 

193 

194 basepath = _create_base_path(queryset_model, model) 

195 

196 if negated: 

197 newpath = "-" 

198 else: 

199 newpath = "" 

200 

201 newpath += basepath 

202 if basepath: 

203 newpath += "__" 

204 

205 newpath += pure_field_path 

206 return newpath 

207 

208 

209def _get_all_sub_models(base_model): 

210 """#Collect all sub-models, this should be optimized (cached)""" 

211 result = {} 

212 queue = deque([base_model]) 

213 

214 while queue: 

215 model = queue.popleft() 

216 if issubclass(model, models.Model) and model != models.Model: 

217 # model name is occurring twice in submodel inheritance tree => Error 

218 if model.__name__ in result and model != result[model.__name__]: 

219 raise FieldError( 

220 "PolymorphicModel: model name alone is ambiguous: %s.%s and %s.%s match!\n" 

221 "In this case, please use the syntax: applabel__ModelName___field" 

222 % ( 

223 model._meta.app_label, 

224 model.__name__, 

225 result[model.__name__]._meta.app_label, 

226 result[model.__name__].__name__, 

227 ) 

228 ) 

229 

230 result[model.__name__] = model 

231 queue.extend(model.__subclasses__()) 

232 

233 return result 

234 

235 

236def _create_base_path(baseclass, myclass): 

237 # create new field path for expressions, e.g. for baseclass=ModelA, myclass=ModelC 

238 # 'modelb__modelc" is returned 

239 for b in myclass.__bases__: 

240 if b == baseclass: 

241 return _get_query_related_name(myclass) 

242 

243 path = _create_base_path(baseclass, b) 

244 if path: 

245 if b._meta.abstract or b._meta.proxy: 

246 return _get_query_related_name(myclass) 

247 else: 

248 return path + "__" + _get_query_related_name(myclass) 

249 return "" 

250 

251 

252def _get_query_related_name(myclass): 

253 for f in myclass._meta.local_fields: 

254 if isinstance(f, models.OneToOneField) and f.remote_field.parent_link: 

255 return f.related_query_name() 

256 

257 # Fallback to undetected name, 

258 # this happens on proxy models (e.g. SubclassSelectorProxyModel) 

259 return myclass.__name__.lower() 

260 

261 

262def create_instanceof_q(modellist, not_instance_of=False, using=DEFAULT_DB_ALIAS): 

263 """ 

264 Helper function for instance_of / not_instance_of 

265 Creates and returns a Q object that filters for the models in modellist, 

266 including all subclasses of these models (as we want to do the same 

267 as pythons isinstance() ). 

268 . 

269 We recursively collect all __subclasses__(), create a Q filter for each, 

270 and or-combine these Q objects. This could be done much more 

271 efficiently however (regarding the resulting sql), should an optimization 

272 be needed. 

273 """ 

274 if not modellist: 274 ↛ 275line 274 didn't jump to line 275, because the condition on line 274 was never true

275 return None 

276 

277 if not isinstance(modellist, (list, tuple)): 277 ↛ 278line 277 didn't jump to line 278, because the condition on line 277 was never true

278 from .models import PolymorphicModel 

279 

280 if issubclass(modellist, PolymorphicModel): 

281 modellist = [modellist] 

282 else: 

283 raise TypeError( 

284 "PolymorphicModel: instance_of expects a list of (polymorphic) " 

285 "models or a single (polymorphic) model" 

286 ) 

287 

288 contenttype_ids = _get_mro_content_type_ids(modellist, using) 

289 q = Q(polymorphic_ctype__in=sorted(contenttype_ids)) 

290 if not_instance_of: 290 ↛ 291line 290 didn't jump to line 291, because the condition on line 290 was never true

291 q = ~q 

292 return q 

293 

294 

295def _get_mro_content_type_ids(models, using): 

296 contenttype_ids = set() 

297 for model in models: 

298 ct = ContentType.objects.db_manager(using).get_for_model(model, for_concrete_model=False) 

299 contenttype_ids.add(ct.pk) 

300 subclasses = model.__subclasses__() 

301 if subclasses: 

302 contenttype_ids.update(_get_mro_content_type_ids(subclasses, using)) 

303 return contenttype_ids