Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/codecs/corejson.py: 12%

182 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import unicode_literals 

2from collections import OrderedDict 

3from coreapi.codecs.base import BaseCodec 

4from coreapi.compat import force_bytes, string_types, urlparse 

5from coreapi.compat import COMPACT_SEPARATORS, VERBOSE_SEPARATORS 

6from coreapi.document import Document, Link, Array, Object, Error, Field 

7from coreapi.exceptions import ParseError 

8import coreschema 

9import json 

10 

11 

12# Schema encoding and decoding. 

13# Just a naive first-pass at this point. 

14 

15SCHEMA_CLASS_TO_TYPE_ID = { 

16 coreschema.Object: 'object', 

17 coreschema.Array: 'array', 

18 coreschema.Number: 'number', 

19 coreschema.Integer: 'integer', 

20 coreschema.String: 'string', 

21 coreschema.Boolean: 'boolean', 

22 coreschema.Null: 'null', 

23 coreschema.Enum: 'enum', 

24 coreschema.Anything: 'anything' 

25} 

26 

27TYPE_ID_TO_SCHEMA_CLASS = { 

28 value: key 

29 for key, value 

30 in SCHEMA_CLASS_TO_TYPE_ID.items() 

31} 

32 

33 

34def encode_schema_to_corejson(schema): 

35 if hasattr(schema, 'typename'): 

36 type_id = schema.typename 

37 else: 

38 type_id = SCHEMA_CLASS_TO_TYPE_ID.get(schema.__class__, 'anything') 

39 retval = { 

40 '_type': type_id, 

41 'title': schema.title, 

42 'description': schema.description 

43 } 

44 if hasattr(schema, 'enum'): 

45 retval['enum'] = schema.enum 

46 return retval 

47 

48 

49def decode_schema_from_corejson(data): 

50 type_id = _get_string(data, '_type') 

51 title = _get_string(data, 'title') 

52 description = _get_string(data, 'description') 

53 

54 kwargs = {} 

55 if type_id == 'enum': 

56 kwargs['enum'] = _get_list(data, 'enum') 

57 

58 schema_cls = TYPE_ID_TO_SCHEMA_CLASS.get(type_id, coreschema.Anything) 

59 return schema_cls(title=title, description=description, **kwargs) 

60 

61 

62# Robust dictionary lookups, that always return an item of the correct 

63# type, using an empty default if an incorrect type exists. 

64# Useful for liberal parsing of inputs. 

65 

66def _get_schema(item, key): 

67 schema_data = _get_dict(item, key) 

68 if schema_data: 

69 return decode_schema_from_corejson(schema_data) 

70 return None 

71 

72 

73def _get_string(item, key): 

74 value = item.get(key) 

75 if isinstance(value, string_types): 

76 return value 

77 return '' 

78 

79 

80def _get_dict(item, key): 

81 value = item.get(key) 

82 if isinstance(value, dict): 

83 return value 

84 return {} 

85 

86 

87def _get_list(item, key): 

88 value = item.get(key) 

89 if isinstance(value, list): 

90 return value 

91 return [] 

92 

93 

94def _get_bool(item, key): 

95 value = item.get(key) 

96 if isinstance(value, bool): 

97 return value 

98 return False 

99 

100 

101def _graceful_relative_url(base_url, url): 

102 """ 

103 Return a graceful link for a URL relative to a base URL. 

104 

105 * If they are the same, return an empty string. 

106 * If the have the same scheme and hostname, return the path & query params. 

107 * Otherwise return the full URL. 

108 """ 

109 if url == base_url: 

110 return '' 

111 base_prefix = '%s://%s' % urlparse.urlparse(base_url or '')[0:2] 

112 url_prefix = '%s://%s' % urlparse.urlparse(url or '')[0:2] 

113 if base_prefix == url_prefix and url_prefix != '://': 

114 return url[len(url_prefix):] 

115 return url 

116 

117 

118def _escape_key(string): 

119 """ 

120 The '_type' and '_meta' keys are reserved. 

121 Prefix with an additional '_' if they occur. 

122 """ 

123 if string.startswith('_') and string.lstrip('_') in ('type', 'meta'): 

124 return '_' + string 

125 return string 

126 

127 

128def _unescape_key(string): 

129 """ 

130 Unescape '__type' and '__meta' keys if they occur. 

131 """ 

132 if string.startswith('__') and string.lstrip('_') in ('type', 'meta'): 

133 return string[1:] 

134 return string 

135 

136 

137def _get_content(item, base_url=None): 

138 """ 

139 Return a dictionary of content, for documents, objects and errors. 

140 """ 

141 return { 

142 _unescape_key(key): _primitive_to_document(value, base_url) 

143 for key, value in item.items() 

144 if key not in ('_type', '_meta') 

145 } 

146 

147 

148def _document_to_primitive(node, base_url=None): 

149 """ 

150 Take a Core API document and return Python primitives 

151 ready to be rendered into the JSON style encoding. 

152 """ 

153 if isinstance(node, Document): 

154 ret = OrderedDict() 

155 ret['_type'] = 'document' 

156 

157 meta = OrderedDict() 

158 url = _graceful_relative_url(base_url, node.url) 

159 if url: 

160 meta['url'] = url 

161 if node.title: 

162 meta['title'] = node.title 

163 if node.description: 

164 meta['description'] = node.description 

165 if meta: 

166 ret['_meta'] = meta 

167 

168 # Fill in key-value content. 

169 ret.update([ 

170 (_escape_key(key), _document_to_primitive(value, base_url=url)) 

171 for key, value in node.items() 

172 ]) 

173 return ret 

174 

175 elif isinstance(node, Error): 

176 ret = OrderedDict() 

177 ret['_type'] = 'error' 

178 

179 if node.title: 

180 ret['_meta'] = {'title': node.title} 

181 

182 # Fill in key-value content. 

183 ret.update([ 

184 (_escape_key(key), _document_to_primitive(value, base_url=base_url)) 

185 for key, value in node.items() 

186 ]) 

187 return ret 

188 

189 elif isinstance(node, Link): 

190 ret = OrderedDict() 

191 ret['_type'] = 'link' 

192 url = _graceful_relative_url(base_url, node.url) 

193 if url: 

194 ret['url'] = url 

195 if node.action: 

196 ret['action'] = node.action 

197 if node.encoding: 

198 ret['encoding'] = node.encoding 

199 if node.transform: 

200 ret['transform'] = node.transform 

201 if node.title: 

202 ret['title'] = node.title 

203 if node.description: 

204 ret['description'] = node.description 

205 if node.fields: 

206 ret['fields'] = [ 

207 _document_to_primitive(field) for field in node.fields 

208 ] 

209 return ret 

210 

211 elif isinstance(node, Field): 

212 ret = OrderedDict({'name': node.name}) 

213 if node.required: 

214 ret['required'] = node.required 

215 if node.location: 

216 ret['location'] = node.location 

217 if node.schema: 

218 ret['schema'] = encode_schema_to_corejson(node.schema) 

219 return ret 

220 

221 elif isinstance(node, Object): 

222 return OrderedDict([ 

223 (_escape_key(key), _document_to_primitive(value, base_url=base_url)) 

224 for key, value in node.items() 

225 ]) 

226 

227 elif isinstance(node, Array): 

228 return [_document_to_primitive(value) for value in node] 

229 

230 return node 

231 

232 

233def _primitive_to_document(data, base_url=None): 

234 """ 

235 Take Python primitives as returned from parsing JSON content, 

236 and return a Core API document. 

237 """ 

238 if isinstance(data, dict) and data.get('_type') == 'document': 

239 # Document 

240 meta = _get_dict(data, '_meta') 

241 url = _get_string(meta, 'url') 

242 url = urlparse.urljoin(base_url, url) 

243 title = _get_string(meta, 'title') 

244 description = _get_string(meta, 'description') 

245 content = _get_content(data, base_url=url) 

246 return Document( 

247 url=url, 

248 title=title, 

249 description=description, 

250 media_type='application/coreapi+json', 

251 content=content 

252 ) 

253 

254 if isinstance(data, dict) and data.get('_type') == 'error': 

255 # Error 

256 meta = _get_dict(data, '_meta') 

257 title = _get_string(meta, 'title') 

258 content = _get_content(data, base_url=base_url) 

259 return Error(title=title, content=content) 

260 

261 elif isinstance(data, dict) and data.get('_type') == 'link': 

262 # Link 

263 url = _get_string(data, 'url') 

264 url = urlparse.urljoin(base_url, url) 

265 action = _get_string(data, 'action') 

266 encoding = _get_string(data, 'encoding') 

267 transform = _get_string(data, 'transform') 

268 title = _get_string(data, 'title') 

269 description = _get_string(data, 'description') 

270 fields = _get_list(data, 'fields') 

271 fields = [ 

272 Field( 

273 name=_get_string(item, 'name'), 

274 required=_get_bool(item, 'required'), 

275 location=_get_string(item, 'location'), 

276 schema=_get_schema(item, 'schema') 

277 ) 

278 for item in fields if isinstance(item, dict) 

279 ] 

280 return Link( 

281 url=url, action=action, encoding=encoding, transform=transform, 

282 title=title, description=description, fields=fields 

283 ) 

284 

285 elif isinstance(data, dict): 

286 # Map 

287 content = _get_content(data, base_url=base_url) 

288 return Object(content) 

289 

290 elif isinstance(data, list): 

291 # Array 

292 content = [_primitive_to_document(item, base_url) for item in data] 

293 return Array(content) 

294 

295 # String, Integer, Number, Boolean, null. 

296 return data 

297 

298 

299class CoreJSONCodec(BaseCodec): 

300 media_type = 'application/coreapi+json' 

301 format = 'corejson' 

302 

303 # The following is due to be deprecated... 

304 media_types = ['application/coreapi+json', 'application/vnd.coreapi+json'] 

305 

306 def decode(self, bytestring, **options): 

307 """ 

308 Takes a bytestring and returns a document. 

309 """ 

310 base_url = options.get('base_url') 

311 

312 try: 

313 data = json.loads(bytestring.decode('utf-8')) 

314 except ValueError as exc: 

315 raise ParseError('Malformed JSON. %s' % exc) 

316 

317 doc = _primitive_to_document(data, base_url) 

318 

319 if isinstance(doc, Object): 

320 doc = Document(content=dict(doc)) 

321 elif not (isinstance(doc, Document) or isinstance(doc, Error)): 

322 raise ParseError('Top level node should be a document or error.') 

323 

324 return doc 

325 

326 def encode(self, document, **options): 

327 """ 

328 Takes a document and returns a bytestring. 

329 """ 

330 indent = options.get('indent') 

331 

332 if indent: 

333 kwargs = { 

334 'ensure_ascii': False, 

335 'indent': 4, 

336 'separators': VERBOSE_SEPARATORS 

337 } 

338 else: 

339 kwargs = { 

340 'ensure_ascii': False, 

341 'indent': None, 

342 'separators': COMPACT_SEPARATORS 

343 } 

344 

345 data = _document_to_primitive(document) 

346 return force_bytes(json.dumps(data, **kwargs))