Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/utils.py: 13%

137 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from coreapi import exceptions 

2from coreapi.compat import string_types, text_type, urlparse, _TemporaryFileWrapper 

3from collections import namedtuple 

4import os 

5import pkg_resources 

6import tempfile 

7 

8 

9def domain_matches(request, domain): 

10 """ 

11 Domain string matching against an outgoing request. 

12 Patterns starting with '*' indicate a wildcard domain. 

13 """ 

14 if (domain is None) or (domain == '*'): 

15 return True 

16 

17 host = urlparse.urlparse(request.url).hostname 

18 if domain.startswith('*'): 

19 return host.endswith(domain[1:]) 

20 return host == domain 

21 

22 

23def get_installed_codecs(): 

24 packages = [ 

25 (package, package.load()) for package in 

26 pkg_resources.iter_entry_points(group='coreapi.codecs') 

27 ] 

28 return { 

29 package.name: cls() for (package, cls) in packages 

30 } 

31 

32 

33# File utilities for upload and download support. 

34 

35File = namedtuple('File', 'name content content_type') 

36File.__new__.__defaults__ = (None,) 

37 

38 

39def is_file(obj): 

40 if isinstance(obj, File): 

41 return True 

42 

43 if hasattr(obj, '__iter__') and not isinstance(obj, (string_types, list, tuple, dict)): 

44 # A stream object. 

45 return True 

46 

47 return False 

48 

49 

50def guess_filename(obj): 

51 name = getattr(obj, 'name', None) 

52 if name and isinstance(name, string_types) and name[0] != '<' and name[-1] != '>': 

53 return os.path.basename(name) 

54 return None 

55 

56 

57def guess_extension(content_type): 

58 """ 

59 Python's `mimetypes.guess_extension` is no use because it simply returns 

60 the first of an unordered set. We use the same set of media types here, 

61 but take a reasonable preference on what extension to map to. 

62 """ 

63 return { 

64 'application/javascript': '.js', 

65 'application/msword': '.doc', 

66 'application/octet-stream': '.bin', 

67 'application/oda': '.oda', 

68 'application/pdf': '.pdf', 

69 'application/pkcs7-mime': '.p7c', 

70 'application/postscript': '.ps', 

71 'application/vnd.apple.mpegurl': '.m3u', 

72 'application/vnd.ms-excel': '.xls', 

73 'application/vnd.ms-powerpoint': '.ppt', 

74 'application/x-bcpio': '.bcpio', 

75 'application/x-cpio': '.cpio', 

76 'application/x-csh': '.csh', 

77 'application/x-dvi': '.dvi', 

78 'application/x-gtar': '.gtar', 

79 'application/x-hdf': '.hdf', 

80 'application/x-latex': '.latex', 

81 'application/x-mif': '.mif', 

82 'application/x-netcdf': '.nc', 

83 'application/x-pkcs12': '.p12', 

84 'application/x-pn-realaudio': '.ram', 

85 'application/x-python-code': '.pyc', 

86 'application/x-sh': '.sh', 

87 'application/x-shar': '.shar', 

88 'application/x-shockwave-flash': '.swf', 

89 'application/x-sv4cpio': '.sv4cpio', 

90 'application/x-sv4crc': '.sv4crc', 

91 'application/x-tar': '.tar', 

92 'application/x-tcl': '.tcl', 

93 'application/x-tex': '.tex', 

94 'application/x-texinfo': '.texinfo', 

95 'application/x-troff': '.tr', 

96 'application/x-troff-man': '.man', 

97 'application/x-troff-me': '.me', 

98 'application/x-troff-ms': '.ms', 

99 'application/x-ustar': '.ustar', 

100 'application/x-wais-source': '.src', 

101 'application/xml': '.xml', 

102 'application/zip': '.zip', 

103 'audio/basic': '.au', 

104 'audio/mpeg': '.mp3', 

105 'audio/x-aiff': '.aif', 

106 'audio/x-pn-realaudio': '.ra', 

107 'audio/x-wav': '.wav', 

108 'image/gif': '.gif', 

109 'image/ief': '.ief', 

110 'image/jpeg': '.jpe', 

111 'image/png': '.png', 

112 'image/svg+xml': '.svg', 

113 'image/tiff': '.tiff', 

114 'image/vnd.microsoft.icon': '.ico', 

115 'image/x-cmu-raster': '.ras', 

116 'image/x-ms-bmp': '.bmp', 

117 'image/x-portable-anymap': '.pnm', 

118 'image/x-portable-bitmap': '.pbm', 

119 'image/x-portable-graymap': '.pgm', 

120 'image/x-portable-pixmap': '.ppm', 

121 'image/x-rgb': '.rgb', 

122 'image/x-xbitmap': '.xbm', 

123 'image/x-xpixmap': '.xpm', 

124 'image/x-xwindowdump': '.xwd', 

125 'message/rfc822': '.eml', 

126 'text/css': '.css', 

127 'text/csv': '.csv', 

128 'text/html': '.html', 

129 'text/plain': '.txt', 

130 'text/richtext': '.rtx', 

131 'text/tab-separated-values': '.tsv', 

132 'text/x-python': '.py', 

133 'text/x-setext': '.etx', 

134 'text/x-sgml': '.sgml', 

135 'text/x-vcard': '.vcf', 

136 'text/xml': '.xml', 

137 'video/mp4': '.mp4', 

138 'video/mpeg': '.mpeg', 

139 'video/quicktime': '.mov', 

140 'video/webm': '.webm', 

141 'video/x-msvideo': '.avi', 

142 'video/x-sgi-movie': '.movie' 

143 }.get(content_type, '') 

144 

145 

146if _TemporaryFileWrapper: 146 ↛ 162line 146 didn't jump to line 162, because the condition on line 146 was never false

147 # Ideally we subclass this so that we can present a custom representation. 

148 class DownloadedFile(_TemporaryFileWrapper): 

149 basename = None 

150 

151 def __repr__(self): 

152 state = "closed" if self.closed else "open" 

153 mode = "" if self.closed else " '%s'" % self.file.mode 

154 return "<DownloadedFile '%s', %s%s>" % (self.name, state, mode) 

155 

156 def __str__(self): 

157 return self.__repr__() 

158else: 

159 # On some platforms (eg GAE) the private _TemporaryFileWrapper may not be 

160 # available, just use the standard `NamedTemporaryFile` function 

161 # in this case. 

162 DownloadedFile = tempfile.NamedTemporaryFile 

163 

164 

165# Negotiation utilities. USed to determine which codec or transport class 

166# should be used, given a list of supported instances. 

167 

168def determine_transport(transports, url): 

169 """ 

170 Given a URL determine the appropriate transport instance. 

171 """ 

172 url_components = urlparse.urlparse(url) 

173 scheme = url_components.scheme.lower() 

174 netloc = url_components.netloc 

175 

176 if not scheme: 

177 raise exceptions.NetworkError("URL missing scheme '%s'." % url) 

178 

179 if not netloc: 

180 raise exceptions.NetworkError("URL missing hostname '%s'." % url) 

181 

182 for transport in transports: 

183 if scheme in transport.schemes: 

184 return transport 

185 

186 raise exceptions.NetworkError("Unsupported URL scheme '%s'." % scheme) 

187 

188 

189def negotiate_decoder(decoders, content_type=None): 

190 """ 

191 Given the value of a 'Content-Type' header, return the appropriate 

192 codec for decoding the request content. 

193 """ 

194 if content_type is None: 

195 return decoders[0] 

196 

197 content_type = content_type.split(';')[0].strip().lower() 

198 main_type = content_type.split('/')[0] + '/*' 

199 wildcard_type = '*/*' 

200 

201 for codec in decoders: 

202 for media_type in codec.get_media_types(): 

203 if media_type in (content_type, main_type, wildcard_type): 

204 return codec 

205 

206 msg = "Unsupported media in Content-Type header '%s'" % content_type 

207 raise exceptions.NoCodecAvailable(msg) 

208 

209 

210def negotiate_encoder(encoders, accept=None): 

211 """ 

212 Given the value of a 'Accept' header, return the appropriate codec for 

213 encoding the response content. 

214 """ 

215 if accept is None: 

216 return encoders[0] 

217 

218 acceptable = set([ 

219 item.split(';')[0].strip().lower() 

220 for item in accept.split(',') 

221 ]) 

222 

223 for codec in encoders: 

224 for media_type in codec.get_media_types(): 

225 if media_type in acceptable: 

226 return codec 

227 

228 for codec in encoders: 

229 for media_type in codec.get_media_types(): 

230 if codec.media_type.split('/')[0] + '/*' in acceptable: 

231 return codec 

232 

233 if '*/*' in acceptable: 

234 return encoders[0] 

235 

236 msg = "Unsupported media in Accept header '%s'" % accept 

237 raise exceptions.NoCodecAvailable(msg) 

238 

239 

240# Validation utilities. Used to ensure that we get consistent validation 

241# exceptions when invalid types are passed as a parameter, rather than 

242# an exception occuring when the request is made. 

243 

244def validate_path_param(value): 

245 value = _validate_form_field(value, allow_list=False) 

246 if not value: 

247 msg = 'Parameter %s: May not be empty.' 

248 raise exceptions.ParameterError(msg) 

249 return value 

250 

251 

252def validate_query_param(value): 

253 return _validate_form_field(value) 

254 

255 

256def validate_body_param(value, encoding): 

257 if encoding == 'application/json': 

258 return _validate_json_data(value) 

259 elif encoding == 'multipart/form-data': 

260 return _validate_form_object(value, allow_files=True) 

261 elif encoding == 'application/x-www-form-urlencoded': 

262 return _validate_form_object(value) 

263 elif encoding == 'application/octet-stream': 

264 if not is_file(value): 

265 msg = 'Must be an file upload.' 

266 raise exceptions.ParameterError(msg) 

267 return value 

268 msg = 'Unsupported encoding "%s" for outgoing request.' 

269 raise exceptions.NetworkError(msg % encoding) 

270 

271 

272def validate_form_param(value, encoding): 

273 if encoding == 'application/json': 

274 return _validate_json_data(value) 

275 elif encoding == 'multipart/form-data': 

276 return _validate_form_field(value, allow_files=True) 

277 elif encoding == 'application/x-www-form-urlencoded': 

278 return _validate_form_field(value) 

279 msg = 'Unsupported encoding "%s" for outgoing request.' 

280 raise exceptions.NetworkError(msg % encoding) 

281 

282 

283def _validate_form_object(value, allow_files=False): 

284 """ 

285 Ensure that `value` can be encoded as form data or as query parameters. 

286 """ 

287 if not isinstance(value, dict): 

288 msg = 'Must be an object.' 

289 raise exceptions.ParameterError(msg) 

290 return { 

291 text_type(item_key): _validate_form_field(item_val, allow_files=allow_files) 

292 for item_key, item_val in value.items() 

293 } 

294 

295 

296def _validate_form_field(value, allow_files=False, allow_list=True): 

297 """ 

298 Ensure that `value` can be encoded as a single form data or a query parameter. 

299 Basic types that has a simple string representation are supported. 

300 A list of basic types is also valid. 

301 """ 

302 if isinstance(value, string_types): 

303 return value 

304 elif isinstance(value, bool) or (value is None): 

305 return {True: 'true', False: 'false', None: ''}[value] 

306 elif isinstance(value, (int, float)): 

307 return "%s" % value 

308 elif allow_list and isinstance(value, (list, tuple)) and not is_file(value): 

309 # Only the top-level element may be a list. 

310 return [ 

311 _validate_form_field(item, allow_files=False, allow_list=False) 

312 for item in value 

313 ] 

314 elif allow_files and is_file(value): 

315 return value 

316 

317 msg = 'Must be a primitive type.' 

318 raise exceptions.ParameterError(msg) 

319 

320 

321def _validate_json_data(value): 

322 """ 

323 Ensure that `value` can be encoded into JSON. 

324 """ 

325 if (value is None) or isinstance(value, (bool, int, float, string_types)): 

326 return value 

327 elif isinstance(value, (list, tuple)) and not is_file(value): 

328 return [_validate_json_data(item) for item in value] 

329 elif isinstance(value, dict): 

330 return { 

331 text_type(item_key): _validate_json_data(item_val) 

332 for item_key, item_val in value.items() 

333 } 

334 

335 msg = 'Must be a JSON primitive.' 

336 raise exceptions.ParameterError(msg)