Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework/schemas/coreapi.py: 14%

285 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import warnings 

2from collections import Counter, OrderedDict 

3from urllib import parse 

4 

5from django.db import models 

6from django.utils.encoding import force_str 

7 

8from rest_framework import exceptions, serializers 

9from rest_framework.compat import coreapi, coreschema, uritemplate 

10from rest_framework.settings import api_settings 

11 

12from .generators import BaseSchemaGenerator 

13from .inspectors import ViewInspector 

14from .utils import get_pk_description, is_list_view 

15 

16 

def common_path(paths):
    """
    Return the longest leading run of path components shared by all `paths`.

    Each path is stripped of surrounding slashes and split on '/'. Because
    the split paths are compared as lists, the smallest and largest of them
    bracket every other path, so the components they share are shared by
    all paths.

    Parameters:

    * `paths`: iterable of URL path strings.

    Returns the shared prefix as a string with a leading '/' and no
    trailing '/'. When there are no paths, or nothing is shared, the
    result is '/'.
    """
    split_paths = [path.strip('/').split('/') for path in paths]
    if not split_paths:
        # min()/max() raise ValueError on an empty sequence; with no paths
        # the only sensible common prefix is the root.
        return '/'

    lowest = min(split_paths)
    highest = max(split_paths)

    shared = []
    for low_part, high_part in zip(lowest, highest):
        if low_part != high_part:
            break
        shared.append(low_part)
    return '/' + '/'.join(shared)

27 

28 

def is_custom_action(action):
    """
    Return True when `action` is not one of the six standard action names
    ('retrieve', 'list', 'create', 'update', 'partial_update', 'destroy').
    """
    standard_actions = frozenset((
        'retrieve', 'list', 'create', 'update', 'partial_update', 'destroy',
    ))
    return action not in standard_actions

33 

34 

def distribute_links(obj):
    """
    Move the pending `(preferred_key, link)` pairs held in `obj.links`
    into `obj` itself, after doing the same for every value already
    stored in `obj`.

    Each link is stored under the key returned by
    `obj.get_available_key(preferred_key)`. The function mutates `obj`
    in place and returns nothing.
    """
    # Handle the existing children first; the links inserted below are
    # added afterwards and are therefore never recursed into.
    for child in obj.values():
        distribute_links(child)

    for preferred_key, link in obj.links:
        obj[obj.get_available_key(preferred_key)] = link

42 

43 

# Message template for the ValueError raised by `insert_into()`.
# It is formatted with three fields: `value_url` (URL of the link being
# inserted), `target_url` (URL of the object already in that position)
# and `keys` (the key path that was attempted).
INSERT_INTO_COLLISION_FMT = """
Schema Naming Collision.

coreapi.Link for URL path {value_url} cannot be inserted into schema.
Position conflicts with coreapi.Link for URL path {target_url}.

Attempted to insert link with keys: {keys}.

Adjust URLs to avoid naming collision or override `SchemaGenerator.get_keys()`
to customise schema structure.
"""

55 

56 

class LinkNode(OrderedDict):
    """
    Ordered mapping used as one level of the schema link tree.

    Besides its dict contents, every node carries:

    * `links`: list of `(preferred_key, link)` pairs waiting to be placed
      into the node.
    * `methods_counter`: per-key counter used to build numbered
      replacement keys when a preferred key is already taken.
    """
    def __init__(self):
        super().__init__()
        self.links = []
        self.methods_counter = Counter()

    def get_available_key(self, preferred_key):
        """
        Return a key that is not yet present in this node.

        `preferred_key` is returned unchanged when it is free. Otherwise
        candidates of the form '<preferred_key>_<n>' are tried, with `n`
        taken from (and then advanced in) `methods_counter`, until an
        unused one is found.
        """
        candidate = preferred_key
        while candidate in self:
            suffix = self.methods_counter[preferred_key]
            self.methods_counter[preferred_key] = suffix + 1
            candidate = '{}_{}'.format(preferred_key, suffix)
        return candidate

74 

75 

def insert_into(target, keys, value):
    """
    Nested insertion into a tree of `LinkNode` objects.

    Walks `target` along every key except the last, creating a `LinkNode`
    for each missing key, then appends `(keys[-1], value)` to the `links`
    list of the node reached.

    >>> example = LinkNode()
    >>> insert_into(example, ['a', 'b', 'c'], 123)
    >>> example['a']['b'].links
    [('c', 123)]

    Raises `ValueError` (built from `INSERT_INTO_COLLISION_FMT`) when the
    append fails with a `TypeError`.
    """
    leaf_key = keys[-1]

    node = target
    for step in keys[:-1]:
        if step not in node:
            node[step] = LinkNode()
        node = node[step]

    try:
        node.links.append((leaf_key, value))
    except TypeError:
        raise ValueError(INSERT_INTO_COLLISION_FMT.format(
            value_url=value.url,
            target_url=node.url,
            keys=keys
        ))

99 

100 

class SchemaGenerator(BaseSchemaGenerator):
    """
    Original CoreAPI version.

    Asks each permitted endpoint's view schema for a link, arranges the
    links in a nested `LinkNode` tree keyed by path components and action
    name, and wraps that tree in a `coreapi.Document`.
    """
    # Map HTTP methods onto actions.
    default_mapping = {
        'get': 'retrieve',
        'post': 'create',
        'put': 'update',
        'patch': 'partial_update',
        'delete': 'destroy',
    }

    # Map the method names we use for viewset actions onto external schema names.
    # These give us names that are more suitable for the external representation.
    # Set by 'SCHEMA_COERCE_METHOD_NAMES'.
    coerce_method_names = None

    def __init__(self, title=None, url=None, description=None, patterns=None, urlconf=None, version=None):
        """
        Requires both `coreapi` and `coreschema` to be importable, then
        defers to the base generator and loads the method-name coercion
        table from `api_settings.SCHEMA_COERCE_METHOD_NAMES`.
        """
        assert coreapi, '`coreapi` must be installed for schema support.'
        assert coreschema, '`coreschema` must be installed for schema support.'

        # NOTE(review): `version` is accepted but not forwarded to the base
        # initialiser -- confirm whether that is intentional.
        super().__init__(title, url, description, patterns, urlconf)
        self.coerce_method_names = api_settings.SCHEMA_COERCE_METHOD_NAMES

    def get_links(self, request=None):
        """
        Return a `LinkNode` tree containing all the links that should be
        included in the API schema, or None when there are no paths.
        """
        paths, view_endpoints = self._get_paths_and_endpoints(request)

        # Only generate the path prefix for paths that will be included
        if not paths:
            return None
        prefix_length = len(self.determine_path_prefix(paths))

        root = LinkNode()
        for path, method, view in view_endpoints:
            if self.has_view_permissions(path, method, view):
                link = view.schema.get_link(path, method, base_url=self.url)
                keys = self.get_keys(path[prefix_length:], method, view)
                insert_into(root, keys, link)
        return root

    def get_schema(self, request=None, public=False):
        """
        Generate a `coreapi.Document` representing the API schema.

        With `public=True` the links are collected without the request.
        Returns None when `get_links()` yields nothing truthy.
        """
        self._initialise_endpoints()

        scoped_request = None if public else request
        links = self.get_links(scoped_request)
        if not links:
            return None

        # Fall back to the request's absolute URI when no URL was configured.
        url = self.url
        if request is not None and not url:
            url = request.build_absolute_uri()

        distribute_links(links)
        return coreapi.Document(
            title=self.title,
            description=self.description,
            url=url,
            content=links
        )

    # Method for generating the link layout....
    def get_keys(self, subpath, method, view):
        """
        Return a list of keys that should be used to layout a link within
        the schema document.

        /users/                   ("users", "list"), ("users", "create")
        /users/{pk}/              ("users", "read"), ("users", "update"), ("users", "delete")
        /users/enabled/           ("users", "enabled")  # custom viewset list action
        /users/{pk}/star/         ("users", "star")     # custom viewset detail action
        /users/{pk}/groups/       ("users", "groups", "list"), ("users", "groups", "create")
        /users/{pk}/groups/{pk}/  ("users", "groups", "read"), ("users", "groups", "update"), ("users", "groups", "delete")
        """
        if hasattr(view, 'action'):
            # Viewsets have explicitly named actions.
            action = view.action
        elif is_list_view(subpath, method, view):
            # Views have no associated action, so we determine one from the method.
            action = 'list'
        else:
            action = self.default_mapping[method.lower()]

        # Templated components such as "{pk}" never become keys.
        components = [
            part for part in subpath.strip('/').split('/')
            if '{' not in part
        ]

        if is_custom_action(action):
            # Custom action, eg "/users/{pk}/activate/", "/users/active/"
            if len(view.action_map) <= 1:
                # Single-method custom action: the action name replaces the
                # last path component.
                return components[:-1] + [action]
            # Several methods share the route, so name the link after the
            # HTTP method's default action instead.
            action = self.default_mapping[method.lower()]

        if action in self.coerce_method_names:
            action = self.coerce_method_names[action]

        # Default action, eg "/users/", "/users/{pk}/"
        return components + [action]

    def determine_path_prefix(self, paths):
        """
        Given a list of all paths, return the common prefix which should be
        discounted when generating a schema structure.

        This will be the longest common string that does not include that last
        component of the URL, or the last component before a path parameter.

        For example:

        /api/v1/users/
        /api/v1/users/{pk}/

        The path prefix is '/api/v1'
        """
        prefixes = []
        for path in paths:
            parts = path.strip('/').split('/')
            # Index of the first templated component, or the end of the path.
            cut = next(
                (index for index, part in enumerate(parts) if '{' in part),
                len(parts)
            )
            prefix = '/'.join(parts[:cut][:-1])
            if not prefix:
                # We can just break early in the case that there's at least
                # one URL that doesn't have a path prefix.
                return '/'
            prefixes.append('/{}/'.format(prefix))
        return common_path(prefixes)

245 

246# View Inspectors # 

247 

248 

def field_to_schema(field):
    """
    Translate a serializer field into the matching `coreschema` object.

    The field's `label` and `help_text` become the schema's title and
    description (empty strings when unset). The field type is matched
    against the checks below in order and the first match wins; anything
    unmatched becomes a String, with format 'textarea' when the field's
    style names the 'textarea.html' base template.
    """
    meta = {
        'title': force_str(field.label) if field.label else '',
        'description': force_str(field.help_text) if field.help_text else '',
    }

    if isinstance(field, (serializers.ListSerializer, serializers.ListField)):
        return coreschema.Array(items=field_to_schema(field.child), **meta)

    if isinstance(field, serializers.DictField):
        return coreschema.Object(**meta)

    if isinstance(field, serializers.Serializer):
        # Nested serializer: one property per declared field.
        properties = OrderedDict(
            (name, field_to_schema(child))
            for name, child in field.fields.items()
        )
        return coreschema.Object(properties=properties, **meta)

    if isinstance(field, serializers.ManyRelatedField):
        return coreschema.Array(
            items=field_to_schema(field.child_relation),
            **meta
        )

    if isinstance(field, serializers.PrimaryKeyRelatedField):
        # Integer only when the related model's primary key is an AutoField.
        model = getattr(field.queryset, 'model', None)
        if model is not None and isinstance(model._meta.pk, models.AutoField):
            return coreschema.Integer(**meta)
        return coreschema.String(**meta)

    if isinstance(field, serializers.RelatedField):
        return coreschema.String(**meta)

    if isinstance(field, serializers.MultipleChoiceField):
        return coreschema.Array(
            items=coreschema.Enum(enum=list(field.choices)),
            **meta
        )

    if isinstance(field, serializers.ChoiceField):
        return coreschema.Enum(enum=list(field.choices), **meta)

    if isinstance(field, serializers.BooleanField):
        return coreschema.Boolean(**meta)

    if isinstance(field, (serializers.DecimalField, serializers.FloatField)):
        return coreschema.Number(**meta)

    if isinstance(field, serializers.IntegerField):
        return coreschema.Integer(**meta)

    if isinstance(field, serializers.DateField):
        return coreschema.String(format='date', **meta)

    if isinstance(field, serializers.DateTimeField):
        return coreschema.String(format='date-time', **meta)

    if isinstance(field, serializers.JSONField):
        return coreschema.Object(**meta)

    if field.style.get('base_template') == 'textarea.html':
        return coreschema.String(format='textarea', **meta)

    return coreschema.String(**meta)

334 

335 

class AutoSchema(ViewInspector):
    """
    Default inspector for APIView

    Responsible for per-view introspection and schema generation.
    """
    def __init__(self, manual_fields=None):
        """
        Parameters:

        * `manual_fields`: list of `coreapi.Field` instances that
            will be added to auto-generated fields, overwriting on `Field.name`
        """
        super().__init__()
        self._manual_fields = [] if manual_fields is None else manual_fields

    def get_link(self, path, method, base_url):
        """
        Generate `coreapi.Link` for self.view, path and method.

        This is the main _public_ access point.

        Parameters:

        * path: Route path for view from URLConf.
        * method: The HTTP request method.
        * base_url: The project "mount point" as given to SchemaGenerator
        """
        fields = self.get_path_fields(path, method)
        for collect in (
            self.get_serializer_fields,
            self.get_pagination_fields,
            self.get_filter_fields,
        ):
            fields += collect(path, method)

        fields = self.update_fields(fields, self.get_manual_fields(path, method))

        # An encoding is only relevant when something is sent in the
        # request body or as form data.
        locations = [field.location for field in fields]
        if any(location in ('form', 'body') for location in locations):
            encoding = self.get_encoding(path, method)
        else:
            encoding = None

        description = self.get_description(path, method)

        # Make the path relative so that it is joined onto `base_url`.
        relative_path = path[1:] if base_url and path.startswith('/') else path

        return coreapi.Link(
            url=parse.urljoin(base_url, relative_path),
            action=method.lower(),
            encoding=encoding,
            fields=fields,
            description=description
        )

    def get_path_fields(self, path, method):
        """
        Return a list of `coreapi.Field` instances corresponding to any
        templated path variables.
        """
        view = self.view
        model = getattr(getattr(view, 'queryset', None), 'model', None)
        fields = []

        for variable in uritemplate.variables(path):
            title = ''
            description = ''
            schema_cls = coreschema.String
            kwargs = {}

            if model is not None:
                # Attempt to infer a field description if possible.
                try:
                    model_field = model._meta.get_field(variable)
                except Exception:
                    model_field = None

                if model_field is not None:
                    if model_field.verbose_name:
                        title = force_str(model_field.verbose_name)

                    if model_field.help_text:
                        description = force_str(model_field.help_text)
                    elif model_field.primary_key:
                        description = get_pk_description(model, model_field)

                # An explicit lookup regex on the view takes precedence
                # over inferring an integer from an AutoField.
                if hasattr(view, 'lookup_value_regex') and view.lookup_field == variable:
                    kwargs['pattern'] = view.lookup_value_regex
                elif isinstance(model_field, models.AutoField):
                    schema_cls = coreschema.Integer

            schema = schema_cls(title=title, description=description, **kwargs)
            fields.append(coreapi.Field(
                name=variable,
                location='path',
                required=True,
                schema=schema
            ))

        return fields

    def get_serializer_fields(self, path, method):
        """
        Return a list of `coreapi.Field` instances corresponding to any
        request body input, as determined by the serializer class.
        """
        view = self.view

        if method not in ('PUT', 'PATCH', 'POST') or not hasattr(view, 'get_serializer'):
            return []

        try:
            serializer = view.get_serializer()
        except exceptions.APIException:
            serializer = None
            warnings.warn('{}.get_serializer() raised an exception during '
                          'schema generation. Serializer fields will not be '
                          'generated for {} {}.'
                          .format(view.__class__.__name__, method, path))

        if isinstance(serializer, serializers.ListSerializer):
            # A list serializer is described as one required array body.
            body_field = coreapi.Field(
                name='data',
                location='body',
                required=True,
                schema=coreschema.Array()
            )
            return [body_field]

        if not isinstance(serializer, serializers.Serializer):
            return []

        fields = []
        for serializer_field in serializer.fields.values():
            skipped = (
                serializer_field.read_only
                or isinstance(serializer_field, serializers.HiddenField)
            )
            if skipped:
                continue

            # Nothing is mandatory on a partial update.
            required = serializer_field.required and method != 'PATCH'
            fields.append(coreapi.Field(
                name=serializer_field.field_name,
                location='form',
                required=required,
                schema=field_to_schema(serializer_field)
            ))

        return fields

    def get_pagination_fields(self, path, method):
        """
        Return the paginator's schema fields for list views that declare a
        pagination class, and an empty list otherwise.
        """
        view = self.view

        if not is_list_view(path, method, view):
            return []

        if not getattr(view, 'pagination_class', None):
            return []

        return view.pagination_class().get_schema_fields(view)

    def _allows_filters(self, path, method):
        """
        Determine whether to include filter Fields in schema.

        Default implementation looks for ModelViewSet or GenericAPIView
        actions/methods that cause filtering on the default implementation.

        Override to adjust behaviour for your view.

        Note: Introduced in v3.7: Initially "private" (i.e. with leading underscore)
            to allow changes based on user experience.
        """
        view = self.view
        if getattr(view, 'filter_backends', None) is None:
            return False

        if hasattr(view, 'action'):
            filtering_actions = ("list", "retrieve", "update", "partial_update", "destroy")
            return view.action in filtering_actions

        filtering_methods = ("get", "put", "patch", "delete")
        return method.lower() in filtering_methods

    def get_filter_fields(self, path, method):
        """
        Return the schema fields of every filter backend on the view, or an
        empty list when `_allows_filters()` says filters do not apply.
        """
        if not self._allows_filters(path, method):
            return []

        return [
            field
            for backend in self.view.filter_backends
            for field in backend().get_schema_fields(self.view)
        ]

    def get_manual_fields(self, path, method):
        """
        Return the manual fields given at construction time.
        """
        return self._manual_fields

    @staticmethod
    def update_fields(fields, update_with):
        """
        Update list of coreapi.Field instances, overwriting on `Field.name`.

        Utility function to handle replacing coreapi.Field fields
        from a list by name. Used to handle `manual_fields`.

        Parameters:

        * `fields`: list of `coreapi.Field` instances to update
        * `update_with`: list of `coreapi.Field` instances to add or replace.
        """
        if not update_with:
            return fields

        merged = OrderedDict((field.name, field) for field in fields)
        merged.update((field.name, field) for field in update_with)
        return list(merged.values())

    def get_encoding(self, path, method):
        """
        Return the 'encoding' parameter to use for a given endpoint.

        The view's parser classes are checked in order and the first one
        with a usable media type decides; None is returned when none does.
        """
        # Core API supports the following request encodings over HTTP...
        supported_media_types = frozenset((
            'application/json',
            'application/x-www-form-urlencoded',
            'multipart/form-data',
        ))

        for parser_class in getattr(self.view, 'parser_classes', []):
            media_type = getattr(parser_class, 'media_type', None)
            if media_type in supported_media_types:
                return media_type
            # Raw binary uploads are supported with "application/octet-stream"
            if media_type == '*/*':
                return 'application/octet-stream'

        return None

576 

577 

class ManualSchema(ViewInspector):
    """
    Allows providing a list of coreapi.Fields,
    plus an optional description.
    """
    def __init__(self, fields, description='', encoding=None):
        """
        Parameters:

        * `fields`: list of `coreapi.Field` instances.
        * `description`: String description for view. Optional.
        * `encoding`: Encoding passed through to the generated link. Optional.
        """
        super().__init__()
        fields_are_valid = all(isinstance(f, coreapi.Field) for f in fields)
        assert fields_are_valid, "`fields` must be a list of coreapi.Field instances"
        self._fields = fields
        self._description = description
        self._encoding = encoding

    def get_link(self, path, method, base_url):
        """
        Build the `coreapi.Link` for `path` and `method` from the stored
        fields, description and encoding.
        """
        # Make the path relative so that it is joined onto `base_url`.
        relative_path = path[1:] if base_url and path.startswith('/') else path

        return coreapi.Link(
            url=parse.urljoin(base_url, relative_path),
            action=method.lower(),
            encoding=self._encoding,
            fields=self._fields,
            description=self._description
        )

608 

609 

def is_enabled():
    """
    Is CoreAPI Mode enabled?

    True when the configured `DEFAULT_SCHEMA_CLASS` is `AutoSchema` or a
    subclass of it.
    """
    schema_class = api_settings.DEFAULT_SCHEMA_CLASS
    return issubclass(schema_class, AutoSchema)