Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework/schemas/coreapi.py: 14%
285 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import warnings
2from collections import Counter, OrderedDict
3from urllib import parse
5from django.db import models
6from django.utils.encoding import force_str
8from rest_framework import exceptions, serializers
9from rest_framework.compat import coreapi, coreschema, uritemplate
10from rest_framework.settings import api_settings
12from .generators import BaseSchemaGenerator
13from .inspectors import ViewInspector
14from .utils import get_pk_description, is_list_view
17def common_path(paths):
18 split_paths = [path.strip('/').split('/') for path in paths]
19 s1 = min(split_paths)
20 s2 = max(split_paths)
21 common = s1
22 for i, c in enumerate(s1):
23 if c != s2[i]:
24 common = s1[:i]
25 break
26 return '/' + '/'.join(common)
29def is_custom_action(action):
30 return action not in {
31 'retrieve', 'list', 'create', 'update', 'partial_update', 'destroy'
32 }
35def distribute_links(obj):
36 for key, value in obj.items():
37 distribute_links(value)
39 for preferred_key, link in obj.links:
40 key = obj.get_available_key(preferred_key)
41 obj[key] = link
44INSERT_INTO_COLLISION_FMT = """
45Schema Naming Collision.
47coreapi.Link for URL path {value_url} cannot be inserted into schema.
48Position conflicts with coreapi.Link for URL path {target_url}.
50Attempted to insert link with keys: {keys}.
52Adjust URLs to avoid naming collision or override `SchemaGenerator.get_keys()`
53to customise schema structure.
54"""
57class LinkNode(OrderedDict):
58 def __init__(self):
59 self.links = []
60 self.methods_counter = Counter()
61 super().__init__()
63 def get_available_key(self, preferred_key):
64 if preferred_key not in self:
65 return preferred_key
67 while True:
68 current_val = self.methods_counter[preferred_key]
69 self.methods_counter[preferred_key] += 1
71 key = '{}_{}'.format(preferred_key, current_val)
72 if key not in self:
73 return key
76def insert_into(target, keys, value):
77 """
78 Nested dictionary insertion.
80 >>> example = {}
81 >>> insert_into(example, ['a', 'b', 'c'], 123)
82 >>> example
83 LinkNode({'a': LinkNode({'b': LinkNode({'c': LinkNode(links=[123])}}})))
84 """
85 for key in keys[:-1]:
86 if key not in target:
87 target[key] = LinkNode()
88 target = target[key]
90 try:
91 target.links.append((keys[-1], value))
92 except TypeError:
93 msg = INSERT_INTO_COLLISION_FMT.format(
94 value_url=value.url,
95 target_url=target.url,
96 keys=keys
97 )
98 raise ValueError(msg)
101class SchemaGenerator(BaseSchemaGenerator):
102 """
103 Original CoreAPI version.
104 """
105 # Map HTTP methods onto actions.
106 default_mapping = {
107 'get': 'retrieve',
108 'post': 'create',
109 'put': 'update',
110 'patch': 'partial_update',
111 'delete': 'destroy',
112 }
114 # Map the method names we use for viewset actions onto external schema names.
115 # These give us names that are more suitable for the external representation.
116 # Set by 'SCHEMA_COERCE_METHOD_NAMES'.
117 coerce_method_names = None
119 def __init__(self, title=None, url=None, description=None, patterns=None, urlconf=None, version=None):
120 assert coreapi, '`coreapi` must be installed for schema support.'
121 assert coreschema, '`coreschema` must be installed for schema support.'
123 super().__init__(title, url, description, patterns, urlconf)
124 self.coerce_method_names = api_settings.SCHEMA_COERCE_METHOD_NAMES
126 def get_links(self, request=None):
127 """
128 Return a dictionary containing all the links that should be
129 included in the API schema.
130 """
131 links = LinkNode()
133 paths, view_endpoints = self._get_paths_and_endpoints(request)
135 # Only generate the path prefix for paths that will be included
136 if not paths:
137 return None
138 prefix = self.determine_path_prefix(paths)
140 for path, method, view in view_endpoints:
141 if not self.has_view_permissions(path, method, view):
142 continue
143 link = view.schema.get_link(path, method, base_url=self.url)
144 subpath = path[len(prefix):]
145 keys = self.get_keys(subpath, method, view)
146 insert_into(links, keys, link)
148 return links
150 def get_schema(self, request=None, public=False):
151 """
152 Generate a `coreapi.Document` representing the API schema.
153 """
154 self._initialise_endpoints()
156 links = self.get_links(None if public else request)
157 if not links:
158 return None
160 url = self.url
161 if not url and request is not None:
162 url = request.build_absolute_uri()
164 distribute_links(links)
165 return coreapi.Document(
166 title=self.title, description=self.description,
167 url=url, content=links
168 )
170 # Method for generating the link layout....
171 def get_keys(self, subpath, method, view):
172 """
173 Return a list of keys that should be used to layout a link within
174 the schema document.
176 /users/ ("users", "list"), ("users", "create")
177 /users/{pk}/ ("users", "read"), ("users", "update"), ("users", "delete")
178 /users/enabled/ ("users", "enabled") # custom viewset list action
179 /users/{pk}/star/ ("users", "star") # custom viewset detail action
180 /users/{pk}/groups/ ("users", "groups", "list"), ("users", "groups", "create")
181 /users/{pk}/groups/{pk}/ ("users", "groups", "read"), ("users", "groups", "update"), ("users", "groups", "delete")
182 """
183 if hasattr(view, 'action'):
184 # Viewsets have explicitly named actions.
185 action = view.action
186 else:
187 # Views have no associated action, so we determine one from the method.
188 if is_list_view(subpath, method, view):
189 action = 'list'
190 else:
191 action = self.default_mapping[method.lower()]
193 named_path_components = [
194 component for component
195 in subpath.strip('/').split('/')
196 if '{' not in component
197 ]
199 if is_custom_action(action):
200 # Custom action, eg "/users/{pk}/activate/", "/users/active/"
201 if len(view.action_map) > 1:
202 action = self.default_mapping[method.lower()]
203 if action in self.coerce_method_names:
204 action = self.coerce_method_names[action]
205 return named_path_components + [action]
206 else:
207 return named_path_components[:-1] + [action]
209 if action in self.coerce_method_names:
210 action = self.coerce_method_names[action]
212 # Default action, eg "/users/", "/users/{pk}/"
213 return named_path_components + [action]
215 def determine_path_prefix(self, paths):
216 """
217 Given a list of all paths, return the common prefix which should be
218 discounted when generating a schema structure.
220 This will be the longest common string that does not include that last
221 component of the URL, or the last component before a path parameter.
223 For example:
225 /api/v1/users/
226 /api/v1/users/{pk}/
228 The path prefix is '/api/v1'
229 """
230 prefixes = []
231 for path in paths:
232 components = path.strip('/').split('/')
233 initial_components = []
234 for component in components:
235 if '{' in component:
236 break
237 initial_components.append(component)
238 prefix = '/'.join(initial_components[:-1])
239 if not prefix:
240 # We can just break early in the case that there's at least
241 # one URL that doesn't have a path prefix.
242 return '/'
243 prefixes.append('/' + prefix + '/')
244 return common_path(prefixes)
246# View Inspectors #
249def field_to_schema(field):
250 title = force_str(field.label) if field.label else ''
251 description = force_str(field.help_text) if field.help_text else ''
253 if isinstance(field, (serializers.ListSerializer, serializers.ListField)):
254 child_schema = field_to_schema(field.child)
255 return coreschema.Array(
256 items=child_schema,
257 title=title,
258 description=description
259 )
260 elif isinstance(field, serializers.DictField):
261 return coreschema.Object(
262 title=title,
263 description=description
264 )
265 elif isinstance(field, serializers.Serializer):
266 return coreschema.Object(
267 properties=OrderedDict([
268 (key, field_to_schema(value))
269 for key, value
270 in field.fields.items()
271 ]),
272 title=title,
273 description=description
274 )
275 elif isinstance(field, serializers.ManyRelatedField):
276 related_field_schema = field_to_schema(field.child_relation)
278 return coreschema.Array(
279 items=related_field_schema,
280 title=title,
281 description=description
282 )
283 elif isinstance(field, serializers.PrimaryKeyRelatedField):
284 schema_cls = coreschema.String
285 model = getattr(field.queryset, 'model', None)
286 if model is not None:
287 model_field = model._meta.pk
288 if isinstance(model_field, models.AutoField):
289 schema_cls = coreschema.Integer
290 return schema_cls(title=title, description=description)
291 elif isinstance(field, serializers.RelatedField):
292 return coreschema.String(title=title, description=description)
293 elif isinstance(field, serializers.MultipleChoiceField):
294 return coreschema.Array(
295 items=coreschema.Enum(enum=list(field.choices)),
296 title=title,
297 description=description
298 )
299 elif isinstance(field, serializers.ChoiceField):
300 return coreschema.Enum(
301 enum=list(field.choices),
302 title=title,
303 description=description
304 )
305 elif isinstance(field, serializers.BooleanField):
306 return coreschema.Boolean(title=title, description=description)
307 elif isinstance(field, (serializers.DecimalField, serializers.FloatField)):
308 return coreschema.Number(title=title, description=description)
309 elif isinstance(field, serializers.IntegerField):
310 return coreschema.Integer(title=title, description=description)
311 elif isinstance(field, serializers.DateField):
312 return coreschema.String(
313 title=title,
314 description=description,
315 format='date'
316 )
317 elif isinstance(field, serializers.DateTimeField):
318 return coreschema.String(
319 title=title,
320 description=description,
321 format='date-time'
322 )
323 elif isinstance(field, serializers.JSONField):
324 return coreschema.Object(title=title, description=description)
326 if field.style.get('base_template') == 'textarea.html':
327 return coreschema.String(
328 title=title,
329 description=description,
330 format='textarea'
331 )
333 return coreschema.String(title=title, description=description)
336class AutoSchema(ViewInspector):
337 """
338 Default inspector for APIView
340 Responsible for per-view introspection and schema generation.
341 """
342 def __init__(self, manual_fields=None):
343 """
344 Parameters:
346 * `manual_fields`: list of `coreapi.Field` instances that
347 will be added to auto-generated fields, overwriting on `Field.name`
348 """
349 super().__init__()
350 if manual_fields is None: 350 ↛ 352line 350 didn't jump to line 352, because the condition on line 350 was never false
351 manual_fields = []
352 self._manual_fields = manual_fields
354 def get_link(self, path, method, base_url):
355 """
356 Generate `coreapi.Link` for self.view, path and method.
358 This is the main _public_ access point.
360 Parameters:
362 * path: Route path for view from URLConf.
363 * method: The HTTP request method.
364 * base_url: The project "mount point" as given to SchemaGenerator
365 """
366 fields = self.get_path_fields(path, method)
367 fields += self.get_serializer_fields(path, method)
368 fields += self.get_pagination_fields(path, method)
369 fields += self.get_filter_fields(path, method)
371 manual_fields = self.get_manual_fields(path, method)
372 fields = self.update_fields(fields, manual_fields)
374 if fields and any([field.location in ('form', 'body') for field in fields]):
375 encoding = self.get_encoding(path, method)
376 else:
377 encoding = None
379 description = self.get_description(path, method)
381 if base_url and path.startswith('/'):
382 path = path[1:]
384 return coreapi.Link(
385 url=parse.urljoin(base_url, path),
386 action=method.lower(),
387 encoding=encoding,
388 fields=fields,
389 description=description
390 )
392 def get_path_fields(self, path, method):
393 """
394 Return a list of `coreapi.Field` instances corresponding to any
395 templated path variables.
396 """
397 view = self.view
398 model = getattr(getattr(view, 'queryset', None), 'model', None)
399 fields = []
401 for variable in uritemplate.variables(path):
402 title = ''
403 description = ''
404 schema_cls = coreschema.String
405 kwargs = {}
406 if model is not None:
407 # Attempt to infer a field description if possible.
408 try:
409 model_field = model._meta.get_field(variable)
410 except Exception:
411 model_field = None
413 if model_field is not None and model_field.verbose_name:
414 title = force_str(model_field.verbose_name)
416 if model_field is not None and model_field.help_text:
417 description = force_str(model_field.help_text)
418 elif model_field is not None and model_field.primary_key:
419 description = get_pk_description(model, model_field)
421 if hasattr(view, 'lookup_value_regex') and view.lookup_field == variable:
422 kwargs['pattern'] = view.lookup_value_regex
423 elif isinstance(model_field, models.AutoField):
424 schema_cls = coreschema.Integer
426 field = coreapi.Field(
427 name=variable,
428 location='path',
429 required=True,
430 schema=schema_cls(title=title, description=description, **kwargs)
431 )
432 fields.append(field)
434 return fields
436 def get_serializer_fields(self, path, method):
437 """
438 Return a list of `coreapi.Field` instances corresponding to any
439 request body input, as determined by the serializer class.
440 """
441 view = self.view
443 if method not in ('PUT', 'PATCH', 'POST'):
444 return []
446 if not hasattr(view, 'get_serializer'):
447 return []
449 try:
450 serializer = view.get_serializer()
451 except exceptions.APIException:
452 serializer = None
453 warnings.warn('{}.get_serializer() raised an exception during '
454 'schema generation. Serializer fields will not be '
455 'generated for {} {}.'
456 .format(view.__class__.__name__, method, path))
458 if isinstance(serializer, serializers.ListSerializer):
459 return [
460 coreapi.Field(
461 name='data',
462 location='body',
463 required=True,
464 schema=coreschema.Array()
465 )
466 ]
468 if not isinstance(serializer, serializers.Serializer):
469 return []
471 fields = []
472 for field in serializer.fields.values():
473 if field.read_only or isinstance(field, serializers.HiddenField):
474 continue
476 required = field.required and method != 'PATCH'
477 field = coreapi.Field(
478 name=field.field_name,
479 location='form',
480 required=required,
481 schema=field_to_schema(field)
482 )
483 fields.append(field)
485 return fields
487 def get_pagination_fields(self, path, method):
488 view = self.view
490 if not is_list_view(path, method, view):
491 return []
493 pagination = getattr(view, 'pagination_class', None)
494 if not pagination:
495 return []
497 paginator = view.pagination_class()
498 return paginator.get_schema_fields(view)
500 def _allows_filters(self, path, method):
501 """
502 Determine whether to include filter Fields in schema.
504 Default implementation looks for ModelViewSet or GenericAPIView
505 actions/methods that cause filtering on the default implementation.
507 Override to adjust behaviour for your view.
509 Note: Introduced in v3.7: Initially "private" (i.e. with leading underscore)
510 to allow changes based on user experience.
511 """
512 if getattr(self.view, 'filter_backends', None) is None:
513 return False
515 if hasattr(self.view, 'action'):
516 return self.view.action in ["list", "retrieve", "update", "partial_update", "destroy"]
518 return method.lower() in ["get", "put", "patch", "delete"]
520 def get_filter_fields(self, path, method):
521 if not self._allows_filters(path, method):
522 return []
524 fields = []
525 for filter_backend in self.view.filter_backends:
526 fields += filter_backend().get_schema_fields(self.view)
527 return fields
529 def get_manual_fields(self, path, method):
530 return self._manual_fields
532 @staticmethod
533 def update_fields(fields, update_with):
534 """
535 Update list of coreapi.Field instances, overwriting on `Field.name`.
537 Utility function to handle replacing coreapi.Field fields
538 from a list by name. Used to handle `manual_fields`.
540 Parameters:
542 * `fields`: list of `coreapi.Field` instances to update
543 * `update_with: list of `coreapi.Field` instances to add or replace.
544 """
545 if not update_with:
546 return fields
548 by_name = OrderedDict((f.name, f) for f in fields)
549 for f in update_with:
550 by_name[f.name] = f
551 fields = list(by_name.values())
552 return fields
554 def get_encoding(self, path, method):
555 """
556 Return the 'encoding' parameter to use for a given endpoint.
557 """
558 view = self.view
560 # Core API supports the following request encodings over HTTP...
561 supported_media_types = {
562 'application/json',
563 'application/x-www-form-urlencoded',
564 'multipart/form-data',
565 }
566 parser_classes = getattr(view, 'parser_classes', [])
567 for parser_class in parser_classes:
568 media_type = getattr(parser_class, 'media_type', None)
569 if media_type in supported_media_types:
570 return media_type
571 # Raw binary uploads are supported with "application/octet-stream"
572 if media_type == '*/*':
573 return 'application/octet-stream'
575 return None
578class ManualSchema(ViewInspector):
579 """
580 Allows providing a list of coreapi.Fields,
581 plus an optional description.
582 """
583 def __init__(self, fields, description='', encoding=None):
584 """
585 Parameters:
587 * `fields`: list of `coreapi.Field` instances.
588 * `description`: String description for view. Optional.
589 """
590 super().__init__()
591 assert all(isinstance(f, coreapi.Field) for f in fields), "`fields` must be a list of coreapi.Field instances"
592 self._fields = fields
593 self._description = description
594 self._encoding = encoding
596 def get_link(self, path, method, base_url):
598 if base_url and path.startswith('/'):
599 path = path[1:]
601 return coreapi.Link(
602 url=parse.urljoin(base_url, path),
603 action=method.lower(),
604 encoding=self._encoding,
605 fields=self._fields,
606 description=self._description
607 )
610def is_enabled():
611 """Is CoreAPI Mode enabled?"""
612 return issubclass(api_settings.DEFAULT_SCHEMA_CLASS, AutoSchema)