Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework/filters.py: 27%

157 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2Provides generic filtering backends that can be used to filter the results 

3returned by list views. 

4""" 

5import operator 

6from functools import reduce 

7 

8from django.core.exceptions import ImproperlyConfigured 

9from django.db import models 

10from django.db.models.constants import LOOKUP_SEP 

11from django.template import loader 

12from django.utils.encoding import force_str 

13from django.utils.translation import gettext_lazy as _ 

14 

15from rest_framework.compat import coreapi, coreschema, distinct 

16from rest_framework.settings import api_settings 

17 

18 

19class BaseFilterBackend: 

20 """ 

21 A base class from which all filter backend classes should inherit. 

22 """ 

23 

24 def filter_queryset(self, request, queryset, view): 

25 """ 

26 Return a filtered queryset. 

27 """ 

28 raise NotImplementedError(".filter_queryset() must be overridden.") 

29 

30 def get_schema_fields(self, view): 

31 assert coreapi is not None, 'coreapi must be installed to use `get_schema_fields()`' 

32 assert coreschema is not None, 'coreschema must be installed to use `get_schema_fields()`' 

33 return [] 

34 

35 def get_schema_operation_parameters(self, view): 

36 return [] 

37 

38 

39class SearchFilter(BaseFilterBackend): 

40 # The URL query parameter used for the search. 

41 search_param = api_settings.SEARCH_PARAM 

42 template = 'rest_framework/filters/search.html' 

43 lookup_prefixes = { 

44 '^': 'istartswith', 

45 '=': 'iexact', 

46 '@': 'search', 

47 '$': 'iregex', 

48 } 

49 search_title = _('Search') 

50 search_description = _('A search term.') 

51 

52 def get_search_fields(self, view, request): 

53 """ 

54 Search fields are obtained from the view, but the request is always 

55 passed to this method. Sub-classes can override this method to 

56 dynamically change the search fields based on request content. 

57 """ 

58 return getattr(view, 'search_fields', None) 

59 

60 def get_search_terms(self, request): 

61 """ 

62 Search terms are set by a ?search=... query parameter, 

63 and may be comma and/or whitespace delimited. 

64 """ 

65 params = request.query_params.get(self.search_param, '') 

66 params = params.replace('\x00', '') # strip null characters 

67 params = params.replace(',', ' ') 

68 return params.split() 

69 

70 def construct_search(self, field_name): 

71 lookup = self.lookup_prefixes.get(field_name[0]) 

72 if lookup: 

73 field_name = field_name[1:] 

74 else: 

75 lookup = 'icontains' 

76 return LOOKUP_SEP.join([field_name, lookup]) 

77 

78 def must_call_distinct(self, queryset, search_fields): 

79 """ 

80 Return True if 'distinct()' should be used to query the given lookups. 

81 """ 

82 for search_field in search_fields: 

83 opts = queryset.model._meta 

84 if search_field[0] in self.lookup_prefixes: 

85 search_field = search_field[1:] 

86 # Annotated fields do not need to be distinct 

87 if isinstance(queryset, models.QuerySet) and search_field in queryset.query.annotations: 

88 continue 

89 parts = search_field.split(LOOKUP_SEP) 

90 for part in parts: 

91 field = opts.get_field(part) 

92 if hasattr(field, 'get_path_info'): 

93 # This field is a relation, update opts to follow the relation 

94 path_info = field.get_path_info() 

95 opts = path_info[-1].to_opts 

96 if any(path.m2m for path in path_info): 

97 # This field is a m2m relation so we know we need to call distinct 

98 return True 

99 else: 

100 # This field has a custom __ query transform but is not a relational field. 

101 break 

102 return False 

103 

104 def filter_queryset(self, request, queryset, view): 

105 search_fields = self.get_search_fields(view, request) 

106 search_terms = self.get_search_terms(request) 

107 

108 if not search_fields or not search_terms: 108 ↛ 111line 108 didn't jump to line 111, because the condition on line 108 was never false

109 return queryset 

110 

111 orm_lookups = [ 

112 self.construct_search(str(search_field)) 

113 for search_field in search_fields 

114 ] 

115 

116 base = queryset 

117 conditions = [] 

118 for search_term in search_terms: 

119 queries = [ 

120 models.Q(**{orm_lookup: search_term}) 

121 for orm_lookup in orm_lookups 

122 ] 

123 conditions.append(reduce(operator.or_, queries)) 

124 queryset = queryset.filter(reduce(operator.and_, conditions)) 

125 

126 if self.must_call_distinct(queryset, search_fields): 

127 # Filtering against a many-to-many field requires us to 

128 # call queryset.distinct() in order to avoid duplicate items 

129 # in the resulting queryset. 

130 # We try to avoid this if possible, for performance reasons. 

131 queryset = distinct(queryset, base) 

132 return queryset 

133 

134 def to_html(self, request, queryset, view): 

135 if not getattr(view, 'search_fields', None): 

136 return '' 

137 

138 term = self.get_search_terms(request) 

139 term = term[0] if term else '' 

140 context = { 

141 'param': self.search_param, 

142 'term': term 

143 } 

144 template = loader.get_template(self.template) 

145 return template.render(context) 

146 

147 def get_schema_fields(self, view): 

148 assert coreapi is not None, 'coreapi must be installed to use `get_schema_fields()`' 

149 assert coreschema is not None, 'coreschema must be installed to use `get_schema_fields()`' 

150 return [ 

151 coreapi.Field( 

152 name=self.search_param, 

153 required=False, 

154 location='query', 

155 schema=coreschema.String( 

156 title=force_str(self.search_title), 

157 description=force_str(self.search_description) 

158 ) 

159 ) 

160 ] 

161 

162 def get_schema_operation_parameters(self, view): 

163 return [ 

164 { 

165 'name': self.search_param, 

166 'required': False, 

167 'in': 'query', 

168 'description': force_str(self.search_description), 

169 'schema': { 

170 'type': 'string', 

171 }, 

172 }, 

173 ] 

174 

175 

176class OrderingFilter(BaseFilterBackend): 

177 # The URL query parameter used for the ordering. 

178 ordering_param = api_settings.ORDERING_PARAM 

179 ordering_fields = None 

180 ordering_title = _('Ordering') 

181 ordering_description = _('Which field to use when ordering the results.') 

182 template = 'rest_framework/filters/ordering.html' 

183 

184 def get_ordering(self, request, queryset, view): 

185 """ 

186 Ordering is set by a comma delimited ?ordering=... query parameter. 

187 

188 The `ordering` query parameter can be overridden by setting 

189 the `ordering_param` value on the OrderingFilter or by 

190 specifying an `ORDERING_PARAM` value in the API settings. 

191 """ 

192 params = request.query_params.get(self.ordering_param) 

193 if params: 

194 fields = [param.strip() for param in params.split(',')] 

195 ordering = self.remove_invalid_fields(queryset, fields, view, request) 

196 if ordering: 

197 return ordering 

198 

199 # No ordering was included, or all the ordering fields were invalid 

200 return self.get_default_ordering(view) 

201 

202 def get_default_ordering(self, view): 

203 ordering = getattr(view, 'ordering', None) 

204 if isinstance(ordering, str): 

205 return (ordering,) 

206 return ordering 

207 

208 def get_default_valid_fields(self, queryset, view, context={}): 

209 # If `ordering_fields` is not specified, then we determine a default 

210 # based on the serializer class, if one exists on the view. 

211 if hasattr(view, 'get_serializer_class'): 

212 try: 

213 serializer_class = view.get_serializer_class() 

214 except AssertionError: 

215 # Raised by the default implementation if 

216 # no serializer_class was found 

217 serializer_class = None 

218 else: 

219 serializer_class = getattr(view, 'serializer_class', None) 

220 

221 if serializer_class is None: 

222 msg = ( 

223 "Cannot use %s on a view which does not have either a " 

224 "'serializer_class', an overriding 'get_serializer_class' " 

225 "or 'ordering_fields' attribute." 

226 ) 

227 raise ImproperlyConfigured(msg % self.__class__.__name__) 

228 

229 model_class = queryset.model 

230 model_property_names = [ 

231 # 'pk' is a property added in Django's Model class, however it is valid for ordering. 

232 attr for attr in dir(model_class) if isinstance(getattr(model_class, attr), property) and attr != 'pk' 

233 ] 

234 

235 return [ 

236 (field.source.replace('.', '__') or field_name, field.label) 

237 for field_name, field in serializer_class(context=context).fields.items() 

238 if ( 

239 not getattr(field, 'write_only', False) and 

240 not field.source == '*' and 

241 field.source not in model_property_names 

242 ) 

243 ] 

244 

245 def get_valid_fields(self, queryset, view, context={}): 

246 valid_fields = getattr(view, 'ordering_fields', self.ordering_fields) 

247 

248 if valid_fields is None: 

249 # Default to allowing filtering on serializer fields 

250 return self.get_default_valid_fields(queryset, view, context) 

251 

252 elif valid_fields == '__all__': 

253 # View explicitly allows filtering on any model field 

254 valid_fields = [ 

255 (field.name, field.verbose_name) for field in queryset.model._meta.fields 

256 ] 

257 valid_fields += [ 

258 (key, key.title().split('__')) 

259 for key in queryset.query.annotations 

260 ] 

261 else: 

262 valid_fields = [ 

263 (item, item) if isinstance(item, str) else item 

264 for item in valid_fields 

265 ] 

266 

267 return valid_fields 

268 

269 def remove_invalid_fields(self, queryset, fields, view, request): 

270 valid_fields = [item[0] for item in self.get_valid_fields(queryset, view, {'request': request})] 

271 

272 def term_valid(term): 

273 if term.startswith("-"): 

274 term = term[1:] 

275 return term in valid_fields 

276 

277 return [term for term in fields if term_valid(term)] 

278 

279 def filter_queryset(self, request, queryset, view): 

280 ordering = self.get_ordering(request, queryset, view) 

281 

282 if ordering: 

283 return queryset.order_by(*ordering) 

284 

285 return queryset 

286 

287 def get_template_context(self, request, queryset, view): 

288 current = self.get_ordering(request, queryset, view) 

289 current = None if not current else current[0] 

290 options = [] 

291 context = { 

292 'request': request, 

293 'current': current, 

294 'param': self.ordering_param, 

295 } 

296 for key, label in self.get_valid_fields(queryset, view, context): 

297 options.append((key, '%s - %s' % (label, _('ascending')))) 

298 options.append(('-' + key, '%s - %s' % (label, _('descending')))) 

299 context['options'] = options 

300 return context 

301 

302 def to_html(self, request, queryset, view): 

303 template = loader.get_template(self.template) 

304 context = self.get_template_context(request, queryset, view) 

305 return template.render(context) 

306 

307 def get_schema_fields(self, view): 

308 assert coreapi is not None, 'coreapi must be installed to use `get_schema_fields()`' 

309 assert coreschema is not None, 'coreschema must be installed to use `get_schema_fields()`' 

310 return [ 

311 coreapi.Field( 

312 name=self.ordering_param, 

313 required=False, 

314 location='query', 

315 schema=coreschema.String( 

316 title=force_str(self.ordering_title), 

317 description=force_str(self.ordering_description) 

318 ) 

319 ) 

320 ] 

321 

322 def get_schema_operation_parameters(self, view): 

323 return [ 

324 { 

325 'name': self.ordering_param, 

326 'required': False, 

327 'in': 'query', 

328 'description': force_str(self.ordering_description), 

329 'schema': { 

330 'type': 'string', 

331 }, 

332 }, 

333 ]