Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework/filters.py: 27%
157 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Provides generic filtering backends that can be used to filter the results
3returned by list views.
4"""
5import operator
6from functools import reduce
8from django.core.exceptions import ImproperlyConfigured
9from django.db import models
10from django.db.models.constants import LOOKUP_SEP
11from django.template import loader
12from django.utils.encoding import force_str
13from django.utils.translation import gettext_lazy as _
15from rest_framework.compat import coreapi, coreschema, distinct
16from rest_framework.settings import api_settings
19class BaseFilterBackend:
20 """
21 A base class from which all filter backend classes should inherit.
22 """
24 def filter_queryset(self, request, queryset, view):
25 """
26 Return a filtered queryset.
27 """
28 raise NotImplementedError(".filter_queryset() must be overridden.")
30 def get_schema_fields(self, view):
31 assert coreapi is not None, 'coreapi must be installed to use `get_schema_fields()`'
32 assert coreschema is not None, 'coreschema must be installed to use `get_schema_fields()`'
33 return []
35 def get_schema_operation_parameters(self, view):
36 return []
39class SearchFilter(BaseFilterBackend):
40 # The URL query parameter used for the search.
41 search_param = api_settings.SEARCH_PARAM
42 template = 'rest_framework/filters/search.html'
43 lookup_prefixes = {
44 '^': 'istartswith',
45 '=': 'iexact',
46 '@': 'search',
47 '$': 'iregex',
48 }
49 search_title = _('Search')
50 search_description = _('A search term.')
52 def get_search_fields(self, view, request):
53 """
54 Search fields are obtained from the view, but the request is always
55 passed to this method. Sub-classes can override this method to
56 dynamically change the search fields based on request content.
57 """
58 return getattr(view, 'search_fields', None)
60 def get_search_terms(self, request):
61 """
62 Search terms are set by a ?search=... query parameter,
63 and may be comma and/or whitespace delimited.
64 """
65 params = request.query_params.get(self.search_param, '')
66 params = params.replace('\x00', '') # strip null characters
67 params = params.replace(',', ' ')
68 return params.split()
70 def construct_search(self, field_name):
71 lookup = self.lookup_prefixes.get(field_name[0])
72 if lookup:
73 field_name = field_name[1:]
74 else:
75 lookup = 'icontains'
76 return LOOKUP_SEP.join([field_name, lookup])
78 def must_call_distinct(self, queryset, search_fields):
79 """
80 Return True if 'distinct()' should be used to query the given lookups.
81 """
82 for search_field in search_fields:
83 opts = queryset.model._meta
84 if search_field[0] in self.lookup_prefixes:
85 search_field = search_field[1:]
86 # Annotated fields do not need to be distinct
87 if isinstance(queryset, models.QuerySet) and search_field in queryset.query.annotations:
88 continue
89 parts = search_field.split(LOOKUP_SEP)
90 for part in parts:
91 field = opts.get_field(part)
92 if hasattr(field, 'get_path_info'):
93 # This field is a relation, update opts to follow the relation
94 path_info = field.get_path_info()
95 opts = path_info[-1].to_opts
96 if any(path.m2m for path in path_info):
97 # This field is a m2m relation so we know we need to call distinct
98 return True
99 else:
100 # This field has a custom __ query transform but is not a relational field.
101 break
102 return False
104 def filter_queryset(self, request, queryset, view):
105 search_fields = self.get_search_fields(view, request)
106 search_terms = self.get_search_terms(request)
108 if not search_fields or not search_terms: 108 ↛ 111line 108 didn't jump to line 111, because the condition on line 108 was never false
109 return queryset
111 orm_lookups = [
112 self.construct_search(str(search_field))
113 for search_field in search_fields
114 ]
116 base = queryset
117 conditions = []
118 for search_term in search_terms:
119 queries = [
120 models.Q(**{orm_lookup: search_term})
121 for orm_lookup in orm_lookups
122 ]
123 conditions.append(reduce(operator.or_, queries))
124 queryset = queryset.filter(reduce(operator.and_, conditions))
126 if self.must_call_distinct(queryset, search_fields):
127 # Filtering against a many-to-many field requires us to
128 # call queryset.distinct() in order to avoid duplicate items
129 # in the resulting queryset.
130 # We try to avoid this if possible, for performance reasons.
131 queryset = distinct(queryset, base)
132 return queryset
134 def to_html(self, request, queryset, view):
135 if not getattr(view, 'search_fields', None):
136 return ''
138 term = self.get_search_terms(request)
139 term = term[0] if term else ''
140 context = {
141 'param': self.search_param,
142 'term': term
143 }
144 template = loader.get_template(self.template)
145 return template.render(context)
147 def get_schema_fields(self, view):
148 assert coreapi is not None, 'coreapi must be installed to use `get_schema_fields()`'
149 assert coreschema is not None, 'coreschema must be installed to use `get_schema_fields()`'
150 return [
151 coreapi.Field(
152 name=self.search_param,
153 required=False,
154 location='query',
155 schema=coreschema.String(
156 title=force_str(self.search_title),
157 description=force_str(self.search_description)
158 )
159 )
160 ]
162 def get_schema_operation_parameters(self, view):
163 return [
164 {
165 'name': self.search_param,
166 'required': False,
167 'in': 'query',
168 'description': force_str(self.search_description),
169 'schema': {
170 'type': 'string',
171 },
172 },
173 ]
176class OrderingFilter(BaseFilterBackend):
177 # The URL query parameter used for the ordering.
178 ordering_param = api_settings.ORDERING_PARAM
179 ordering_fields = None
180 ordering_title = _('Ordering')
181 ordering_description = _('Which field to use when ordering the results.')
182 template = 'rest_framework/filters/ordering.html'
184 def get_ordering(self, request, queryset, view):
185 """
186 Ordering is set by a comma delimited ?ordering=... query parameter.
188 The `ordering` query parameter can be overridden by setting
189 the `ordering_param` value on the OrderingFilter or by
190 specifying an `ORDERING_PARAM` value in the API settings.
191 """
192 params = request.query_params.get(self.ordering_param)
193 if params:
194 fields = [param.strip() for param in params.split(',')]
195 ordering = self.remove_invalid_fields(queryset, fields, view, request)
196 if ordering:
197 return ordering
199 # No ordering was included, or all the ordering fields were invalid
200 return self.get_default_ordering(view)
202 def get_default_ordering(self, view):
203 ordering = getattr(view, 'ordering', None)
204 if isinstance(ordering, str):
205 return (ordering,)
206 return ordering
208 def get_default_valid_fields(self, queryset, view, context={}):
209 # If `ordering_fields` is not specified, then we determine a default
210 # based on the serializer class, if one exists on the view.
211 if hasattr(view, 'get_serializer_class'):
212 try:
213 serializer_class = view.get_serializer_class()
214 except AssertionError:
215 # Raised by the default implementation if
216 # no serializer_class was found
217 serializer_class = None
218 else:
219 serializer_class = getattr(view, 'serializer_class', None)
221 if serializer_class is None:
222 msg = (
223 "Cannot use %s on a view which does not have either a "
224 "'serializer_class', an overriding 'get_serializer_class' "
225 "or 'ordering_fields' attribute."
226 )
227 raise ImproperlyConfigured(msg % self.__class__.__name__)
229 model_class = queryset.model
230 model_property_names = [
231 # 'pk' is a property added in Django's Model class, however it is valid for ordering.
232 attr for attr in dir(model_class) if isinstance(getattr(model_class, attr), property) and attr != 'pk'
233 ]
235 return [
236 (field.source.replace('.', '__') or field_name, field.label)
237 for field_name, field in serializer_class(context=context).fields.items()
238 if (
239 not getattr(field, 'write_only', False) and
240 not field.source == '*' and
241 field.source not in model_property_names
242 )
243 ]
245 def get_valid_fields(self, queryset, view, context={}):
246 valid_fields = getattr(view, 'ordering_fields', self.ordering_fields)
248 if valid_fields is None:
249 # Default to allowing filtering on serializer fields
250 return self.get_default_valid_fields(queryset, view, context)
252 elif valid_fields == '__all__':
253 # View explicitly allows filtering on any model field
254 valid_fields = [
255 (field.name, field.verbose_name) for field in queryset.model._meta.fields
256 ]
257 valid_fields += [
258 (key, key.title().split('__'))
259 for key in queryset.query.annotations
260 ]
261 else:
262 valid_fields = [
263 (item, item) if isinstance(item, str) else item
264 for item in valid_fields
265 ]
267 return valid_fields
269 def remove_invalid_fields(self, queryset, fields, view, request):
270 valid_fields = [item[0] for item in self.get_valid_fields(queryset, view, {'request': request})]
272 def term_valid(term):
273 if term.startswith("-"):
274 term = term[1:]
275 return term in valid_fields
277 return [term for term in fields if term_valid(term)]
279 def filter_queryset(self, request, queryset, view):
280 ordering = self.get_ordering(request, queryset, view)
282 if ordering:
283 return queryset.order_by(*ordering)
285 return queryset
287 def get_template_context(self, request, queryset, view):
288 current = self.get_ordering(request, queryset, view)
289 current = None if not current else current[0]
290 options = []
291 context = {
292 'request': request,
293 'current': current,
294 'param': self.ordering_param,
295 }
296 for key, label in self.get_valid_fields(queryset, view, context):
297 options.append((key, '%s - %s' % (label, _('ascending'))))
298 options.append(('-' + key, '%s - %s' % (label, _('descending'))))
299 context['options'] = options
300 return context
302 def to_html(self, request, queryset, view):
303 template = loader.get_template(self.template)
304 context = self.get_template_context(request, queryset, view)
305 return template.render(context)
307 def get_schema_fields(self, view):
308 assert coreapi is not None, 'coreapi must be installed to use `get_schema_fields()`'
309 assert coreschema is not None, 'coreschema must be installed to use `get_schema_fields()`'
310 return [
311 coreapi.Field(
312 name=self.ordering_param,
313 required=False,
314 location='query',
315 schema=coreschema.String(
316 title=force_str(self.ordering_title),
317 description=force_str(self.ordering_description)
318 )
319 )
320 ]
322 def get_schema_operation_parameters(self, view):
323 return [
324 {
325 'name': self.ordering_param,
326 'required': False,
327 'in': 'query',
328 'description': force_str(self.ordering_description),
329 'schema': {
330 'type': 'string',
331 },
332 },
333 ]