Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/codecs/corejson.py: 12%
182 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import unicode_literals
2from collections import OrderedDict
3from coreapi.codecs.base import BaseCodec
4from coreapi.compat import force_bytes, string_types, urlparse
5from coreapi.compat import COMPACT_SEPARATORS, VERBOSE_SEPARATORS
6from coreapi.document import Document, Link, Array, Object, Error, Field
7from coreapi.exceptions import ParseError
8import coreschema
9import json
# Schema encoding and decoding.
# This is just a naive first pass at this point.
# Maps each coreschema class to the '_type' identifier that represents it
# in the Core JSON schema encoding.
SCHEMA_CLASS_TO_TYPE_ID = {
    coreschema.Object: 'object',
    coreschema.Array: 'array',
    coreschema.Number: 'number',
    coreschema.Integer: 'integer',
    coreschema.String: 'string',
    coreschema.Boolean: 'boolean',
    coreschema.Null: 'null',
    coreschema.Enum: 'enum',
    coreschema.Anything: 'anything',
}
# Reverse lookup of SCHEMA_CLASS_TO_TYPE_ID: '_type' identifier -> schema class.
TYPE_ID_TO_SCHEMA_CLASS = {
    type_id: schema_cls
    for schema_cls, type_id in SCHEMA_CLASS_TO_TYPE_ID.items()
}
def encode_schema_to_corejson(schema):
    """
    Render a schema instance as a Core JSON schema dictionary.

    The '_type' entry is the schema's own `typename` attribute when it has
    one; otherwise it is looked up by class in SCHEMA_CLASS_TO_TYPE_ID,
    falling back to 'anything'. An 'enum' entry is added only for schemas
    that expose an `enum` attribute.
    """
    type_id = (
        schema.typename
        if hasattr(schema, 'typename')
        else SCHEMA_CLASS_TO_TYPE_ID.get(schema.__class__, 'anything')
    )
    encoded = {
        '_type': type_id,
        'title': schema.title,
        'description': schema.description,
    }
    if hasattr(schema, 'enum'):
        encoded['enum'] = schema.enum
    return encoded
def decode_schema_from_corejson(data):
    """
    Build a coreschema instance from a Core JSON schema dictionary.

    Missing or wrongly typed 'title' / 'description' values become empty
    strings. An unrecognised '_type' yields `coreschema.Anything`. The
    'enum' list is only read when '_type' is 'enum'.
    """
    type_id = _get_string(data, '_type')
    init_kwargs = {
        'title': _get_string(data, 'title'),
        'description': _get_string(data, 'description'),
    }
    if type_id == 'enum':
        init_kwargs['enum'] = _get_list(data, 'enum')

    schema_cls = TYPE_ID_TO_SCHEMA_CLASS.get(type_id, coreschema.Anything)
    return schema_cls(**init_kwargs)
# Robust dictionary lookups that always return an item of the correct
# type, falling back to an empty default if the value is missing or has
# the wrong type. Useful for liberal parsing of inputs.
def _get_schema(item, key):
    """
    Return the schema decoded from `item[key]`, or None when that entry is
    missing, not a dictionary, or an empty dictionary.
    """
    schema_data = _get_dict(item, key)
    return decode_schema_from_corejson(schema_data) if schema_data else None
def _get_string(item, key):
    """
    Return `item[key]` if it is a string, otherwise an empty string.
    """
    candidate = item.get(key)
    return candidate if isinstance(candidate, string_types) else ''
80def _get_dict(item, key):
81 value = item.get(key)
82 if isinstance(value, dict):
83 return value
84 return {}
87def _get_list(item, key):
88 value = item.get(key)
89 if isinstance(value, list):
90 return value
91 return []
94def _get_bool(item, key):
95 value = item.get(key)
96 if isinstance(value, bool):
97 return value
98 return False
101def _graceful_relative_url(base_url, url):
102 """
103 Return a graceful link for a URL relative to a base URL.
105 * If they are the same, return an empty string.
106 * If the have the same scheme and hostname, return the path & query params.
107 * Otherwise return the full URL.
108 """
109 if url == base_url:
110 return ''
111 base_prefix = '%s://%s' % urlparse.urlparse(base_url or '')[0:2]
112 url_prefix = '%s://%s' % urlparse.urlparse(url or '')[0:2]
113 if base_prefix == url_prefix and url_prefix != '://':
114 return url[len(url_prefix):]
115 return url
118def _escape_key(string):
119 """
120 The '_type' and '_meta' keys are reserved.
121 Prefix with an additional '_' if they occur.
122 """
123 if string.startswith('_') and string.lstrip('_') in ('type', 'meta'):
124 return '_' + string
125 return string
128def _unescape_key(string):
129 """
130 Unescape '__type' and '__meta' keys if they occur.
131 """
132 if string.startswith('__') and string.lstrip('_') in ('type', 'meta'):
133 return string[1:]
134 return string
137def _get_content(item, base_url=None):
138 """
139 Return a dictionary of content, for documents, objects and errors.
140 """
141 return {
142 _unescape_key(key): _primitive_to_document(value, base_url)
143 for key, value in item.items()
144 if key not in ('_type', '_meta')
145 }
def _document_to_primitive(node, base_url=None):
    """
    Take a Core API document and return Python primitives
    ready to be rendered into the JSON style encoding.

    `base_url` is the URL that link and document URLs inside `node` are
    written relative to (see `_graceful_relative_url`). It is passed down
    the same way `_primitive_to_document` passes it when decoding:
    unchanged through errors, objects and arrays, and replaced by the
    document's own URL for the content of a document.

    Values that are not Core API nodes are returned unchanged.
    """
    if isinstance(node, Document):
        ret = OrderedDict()
        ret['_type'] = 'document'

        # Only non-empty metadata is emitted.
        meta = OrderedDict()
        url = _graceful_relative_url(base_url, node.url)
        if url:
            meta['url'] = url
        if node.title:
            meta['title'] = node.title
        if node.description:
            meta['description'] = node.description
        if meta:
            ret['_meta'] = meta

        # Fill in key-value content. Children are written relative to the
        # document's own URL rather than the shortened form stored in
        # '_meta', because the decoder resolves them against the document's
        # resolved URL.
        ret.update([
            (
                _escape_key(key),
                _document_to_primitive(value, base_url=node.url)
            )
            for key, value in node.items()
        ])
        return ret

    elif isinstance(node, Error):
        ret = OrderedDict()
        ret['_type'] = 'error'

        if node.title:
            ret['_meta'] = {'title': node.title}

        # Fill in key-value content.
        ret.update([
            (
                _escape_key(key),
                _document_to_primitive(value, base_url=base_url)
            )
            for key, value in node.items()
        ])
        return ret

    elif isinstance(node, Link):
        ret = OrderedDict()
        ret['_type'] = 'link'
        url = _graceful_relative_url(base_url, node.url)
        # Empty attributes are omitted from the output.
        if url:
            ret['url'] = url
        if node.action:
            ret['action'] = node.action
        if node.encoding:
            ret['encoding'] = node.encoding
        if node.transform:
            ret['transform'] = node.transform
        if node.title:
            ret['title'] = node.title
        if node.description:
            ret['description'] = node.description
        if node.fields:
            ret['fields'] = [
                _document_to_primitive(field) for field in node.fields
            ]
        return ret

    elif isinstance(node, Field):
        ret = OrderedDict({'name': node.name})
        if node.required:
            ret['required'] = node.required
        if node.location:
            ret['location'] = node.location
        if node.schema:
            ret['schema'] = encode_schema_to_corejson(node.schema)
        return ret

    elif isinstance(node, Object):
        return OrderedDict([
            (
                _escape_key(key),
                _document_to_primitive(value, base_url=base_url)
            )
            for key, value in node.items()
        ])

    elif isinstance(node, Array):
        # Pass the base URL on to the items, as the decoder does for lists,
        # so links inside arrays are written like links anywhere else.
        return [
            _document_to_primitive(value, base_url=base_url)
            for value in node
        ]

    return node
233def _primitive_to_document(data, base_url=None):
234 """
235 Take Python primitives as returned from parsing JSON content,
236 and return a Core API document.
237 """
238 if isinstance(data, dict) and data.get('_type') == 'document':
239 # Document
240 meta = _get_dict(data, '_meta')
241 url = _get_string(meta, 'url')
242 url = urlparse.urljoin(base_url, url)
243 title = _get_string(meta, 'title')
244 description = _get_string(meta, 'description')
245 content = _get_content(data, base_url=url)
246 return Document(
247 url=url,
248 title=title,
249 description=description,
250 media_type='application/coreapi+json',
251 content=content
252 )
254 if isinstance(data, dict) and data.get('_type') == 'error':
255 # Error
256 meta = _get_dict(data, '_meta')
257 title = _get_string(meta, 'title')
258 content = _get_content(data, base_url=base_url)
259 return Error(title=title, content=content)
261 elif isinstance(data, dict) and data.get('_type') == 'link':
262 # Link
263 url = _get_string(data, 'url')
264 url = urlparse.urljoin(base_url, url)
265 action = _get_string(data, 'action')
266 encoding = _get_string(data, 'encoding')
267 transform = _get_string(data, 'transform')
268 title = _get_string(data, 'title')
269 description = _get_string(data, 'description')
270 fields = _get_list(data, 'fields')
271 fields = [
272 Field(
273 name=_get_string(item, 'name'),
274 required=_get_bool(item, 'required'),
275 location=_get_string(item, 'location'),
276 schema=_get_schema(item, 'schema')
277 )
278 for item in fields if isinstance(item, dict)
279 ]
280 return Link(
281 url=url, action=action, encoding=encoding, transform=transform,
282 title=title, description=description, fields=fields
283 )
285 elif isinstance(data, dict):
286 # Map
287 content = _get_content(data, base_url=base_url)
288 return Object(content)
290 elif isinstance(data, list):
291 # Array
292 content = [_primitive_to_document(item, base_url) for item in data]
293 return Array(content)
295 # String, Integer, Number, Boolean, null.
296 return data
class CoreJSONCodec(BaseCodec):
    """
    Codec for the 'application/coreapi+json' media type.
    """
    media_type = 'application/coreapi+json'
    format = 'corejson'

    # The following is due to be deprecated...
    media_types = ['application/coreapi+json', 'application/vnd.coreapi+json']

    def decode(self, bytestring, **options):
        """
        Takes a bytestring and returns a document.

        The optional 'base_url' option is used to resolve relative URLs.
        A top-level plain object is wrapped in a Document. ParseError is
        raised for malformed JSON, or when the top-level node is neither
        an object, a document nor an error.
        """
        base_url = options.get('base_url')

        try:
            text = bytestring.decode('utf-8')
            data = json.loads(text)
        except ValueError as exc:
            raise ParseError('Malformed JSON. %s' % exc)

        doc = _primitive_to_document(data, base_url)

        if isinstance(doc, Object):
            return Document(content=dict(doc))
        if isinstance(doc, (Document, Error)):
            return doc
        raise ParseError('Top level node should be a document or error.')

    def encode(self, document, **options):
        """
        Takes a document and returns a bytestring.

        A truthy 'indent' option selects the verbose layout with a fixed
        4-space indent; otherwise the compact layout is used.
        """
        verbose = options.get('indent')

        kwargs = {
            'ensure_ascii': False,
            'indent': 4 if verbose else None,
            'separators': (
                VERBOSE_SEPARATORS if verbose else COMPACT_SEPARATORS
            ),
        }

        data = _document_to_primitive(document)
        return force_bytes(json.dumps(data, **kwargs))